#!/usr/bin/env python3
"""
Breathing belt capture script for ADS1115.

Wiring:
- Belt: red = power, green = GND, yellow = signal1, orange = signal2,
  blue = extra signal (optional)
- ADS1115: A0 = yellow, A1 = orange, A2 = blue
- Each of A0/A1/A2 has a 100k bleed to GND (stops floating/charge-hold
  behaviour)
- ADS1115 VDD -> Pi 3V3, ADS1115 GND -> Pi GND
- Belt green -> Pi/ADS GND
- Belt red -> Pi 3V3 (if the belt needs power)

IMPORTANT: do NOT use 5V for the belt unless you are sure it expects 5V.

Goal:
- Continuously sample A0/A1/A2 and also compute a few helpful derived signals.
- Log to CSV with a label column indicating inhale/exhale state.
- Mark inhale/exhale in realtime:
    * SPACE toggles inhale_state (press at inhale start, press again at
      exhale start)
    * 'i' sets inhale_state=1
    * 'e' sets inhale_state=0
    * 'm' drops a manual marker event
    * 'q' (or Ctrl-C) quits

Why derived signals:
- Raw signals can drift. We also compute:
    - diff01 = A0 - A1 (often more stable if those are two ends of the
      sensor output)
    - hp (high-pass) to remove slow baseline drift
    - dv/dt approximate derivative (helps identify transition points)

These help us learn how inhale/exhale shapes look, so later we can plot and
detect them.
"""

import csv
import os
import select
import sys
import termios
import time
import tty
from datetime import datetime
from typing import Optional

# ---------- ADS1115 low-level I2C ----------

ADDR = 0x48  # change if i2cdetect shows different

REG_CONV = 0x00
REG_CFG = 0x01

# Writing 1 to the top config bit starts a single conversion
OS_START = 0x8000

# PGA gain bits: +/-4.096V => 0.125mV/LSB (good default, matches 3.3V systems)
PGA_4_096 = 0x0200

# Data rate: 250 samples/sec internal conversion rate (fast enough for smooth data)
DR_250SPS = 0x00A0

# Single-shot mode
MODE_SINGLE = 0x0100

# Disable comparator
COMP_DISABLE = 0x0003

# LSB size for PGA +/-4.096V
LSB_V = 0.000125

# Wait for conversion. At 250SPS, max ~4ms; give it 6ms margin.
CONVERSION_WAIT_S = 0.006

# MUX single-ended inputs (AINx vs GND)
MUX_SE = {
    0: 0x4000,  # AIN0 vs GND
    1: 0x5000,  # AIN1 vs GND
    2: 0x6000,  # AIN2 vs GND
    3: 0x7000,  # AIN3 vs GND
}


def write_u16(bus, reg, value):
    """Write the 16-bit *value* to register *reg*, most significant byte first."""
    bus.write_i2c_block_data(ADDR, reg, [(value >> 8) & 0xFF, value & 0xFF])


def read_u16(bus, reg):
    """Read register *reg* and return it as an unsigned 16-bit integer."""
    hi, lo = bus.read_i2c_block_data(ADDR, reg, 2)
    return (hi << 8) | lo


def read_signed_conv(bus):
    """Return the conversion register as a signed (two's complement) integer."""
    raw = read_u16(bus, REG_CONV)
    if raw & 0x8000:
        raw -= 1 << 16
    return raw


def read_se_voltage(bus, ch):
    """Read single-ended voltage on ADS channel ch (0..3).

    Starts one single-shot conversion, waits CONVERSION_WAIT_S for it to
    finish, and returns the result in volts.

    Raises:
        KeyError: if *ch* is not one of the keys of MUX_SE.
    """
    cfg = OS_START | MUX_SE[ch] | PGA_4_096 | MODE_SINGLE | DR_250SPS | COMP_DISABLE
    write_u16(bus, REG_CFG, cfg)
    time.sleep(CONVERSION_WAIT_S)
    return read_signed_conv(bus) * LSB_V


# ---------- Keyboard (non-blocking) ----------

class RawKeyReader:
    """Read single keypresses from stdin without blocking or echo.

    Used as a context manager: the terminal is switched to cbreak mode on
    entry and the saved settings are restored on exit. cbreak (rather than
    full raw mode) keeps Ctrl-C delivering SIGINT and keeps newline
    translation on output, so the capture can always be interrupted and
    printed text is not staircased.
    """

    def __init__(self):
        self.fd = sys.stdin.fileno()
        self.old = termios.tcgetattr(self.fd)

    def __enter__(self):
        tty.setcbreak(self.fd)
        return self

    def __exit__(self, exc_type, exc, tb):
        termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old)

    def read_key(self) -> Optional[str]:
        """Return a single character if available, else None."""
        ready, _, _ = select.select([self.fd], [], [], 0)
        if not ready:
            return None
        # Read straight from the descriptor: select() only knows about the
        # OS-level queue, so going through the buffered sys.stdin object
        # could strand keypresses inside Python's own buffer.
        data = os.read(self.fd, 1)
        if not data:  # end of input
            return None
        # latin-1 maps every byte to exactly one character and never fails.
        return data.decode("latin-1")


# ---------- Capture / filtering ----------

def _apply_key(key, inhale_state):
    """Map one keypress to (new_inhale_state, marker_or_None, quit_requested).

    A marker of None means the key does not produce an event and the
    caller's pending marker must be left untouched.
    """
    lowered = key.lower()
    if lowered == "q":
        return inhale_state, None, True
    if key == " ":
        return (0 if inhale_state else 1), "toggle", False
    if lowered == "i":
        return 1, "inhale_start", False
    if lowered == "e":
        return 0, "exhale_start", False
    if lowered == "m":
        return inhale_state, "mark", False
    return inhale_state, None, False


def main():
    """Sample the belt at a nominal 50 Hz and log rows to a timestamped CSV.

    Runs until 'q' is pressed or Ctrl-C is received. Each row holds the
    three channel voltages, diff01 = a0 - a1, its high-passed value and its
    time derivative, together with the current inhale state and any marker
    set by a keypress since the previous row.
    """
    # Imported here rather than at module level so the register helpers
    # above stay importable on a machine without the I2C library.
    from smbus2 import SMBus

    sample_hz = 50.0
    dt = 1.0 / sample_hz

    # High-pass filter to remove slow drift:
    # avg tracks baseline; hp = x - avg
    hp_alpha = 0.02  # smaller = slower baseline tracking
    avg = None

    # Derivative state: previous diff01 and the time it was sampled
    prev_x = None
    prev_t = None

    inhale_state = 0  # 0=exhale, 1=inhale
    marker = ""       # free-form event marker for the current row (usually empty)

    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    out_path = f"breath_capture_{ts}.csv"

    print("Starting capture.")
    print("Keys: SPACE=toggle inhale/exhale, i=inhale, e=exhale, m=marker, q=quit")
    print(f"Writing: {out_path}")

    with SMBus(1) as bus, open(out_path, "w", newline="") as f, RawKeyReader() as kr:
        w = csv.writer(f)
        w.writerow([
            "t_unix",
            "t_s",
            "inhale_state",
            "event",
            "a0_yellow_v",
            "a1_orange_v",
            "a2_blue_v",
            "diff01_v",
            "hp_diff01_v",
            "d_diff01_v_per_s",
        ])

        # Scheduling and elapsed time use the monotonic clock so a wall-clock
        # adjustment cannot stall or burst the loop; time.time() is only
        # used for the t_unix column.
        t0 = time.monotonic()
        next_t = t0
        rows = 0

        try:
            while True:
                # Handle keypresses (note: terminal cannot detect key *release*)
                k = kr.read_key()
                if k:
                    inhale_state, event, quit_requested = _apply_key(k, inhale_state)
                    if quit_requested:
                        print("\nQuit requested.")
                        break
                    if event is not None:
                        marker = event

                now = time.monotonic()
                if now < next_t:
                    time.sleep(next_t - now)
                    continue
                # Three reads sleep 3 * CONVERSION_WAIT_S = 18 ms of the
                # 20 ms period before any bus time, so the loop can run
                # late. Never schedule into the past: that would build up
                # an ever-growing backlog.
                next_t = max(next_t + dt, now)
                t_unix = time.time()

                # Read channels
                a0 = read_se_voltage(bus, 0)  # yellow
                a1 = read_se_voltage(bus, 1)  # orange
                a2 = read_se_voltage(bus, 2)  # blue (optional signal)

                diff01 = a0 - a1

                # High-pass on diff01 to remove baseline drift
                if avg is None:
                    avg = diff01
                avg = (1.0 - hp_alpha) * avg + hp_alpha * diff01
                hp = diff01 - avg

                # Derivative of diff01 (helps spot transitions). Divide by
                # the measured interval, not the nominal dt, so a late
                # sample does not distort the slope.
                if prev_x is None or now <= prev_t:
                    d = 0.0
                else:
                    d = (diff01 - prev_x) / (now - prev_t)
                prev_x = diff01
                prev_t = now

                t_s = now - t0
                w.writerow([t_unix, t_s, inhale_state, marker, a0, a1, a2, diff01, hp, d])
                rows += 1

                # Live status line (simple, readable)
                if rows % 5 == 0:
                    state = "INHALE" if inhale_state else "EXHALE"
                    sys.stdout.write(
                        f"\r{state:6s} a0={a0*1000:7.1f}mV a1={a1*1000:7.1f}mV a2={a2*1000:7.1f}mV"
                        f" diff01={diff01*1000:+7.1f}mV hp={hp*1000:+7.1f}mV d={d:+.3f}V/s "
                    )
                    sys.stdout.flush()

                # Clear marker after it has been recorded once
                marker = ""
        except KeyboardInterrupt:
            # Ctrl-C: fall through so the file is closed and the terminal
            # settings are restored by the with-block.
            print("\nInterrupted.")

    print(f"Saved {out_path}")


if __name__ == "__main__":
    main()