#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ RPi 3 B+ DUT mode controller for NI PXI power measurements (SAFE EDITION). """ import os, re, sys, time, threading, subprocess, hashlib, secrets, fcntl, signal import RPi.GPIO as GPIO # ========== CONFIG ========== PIN_IDLE = 17 PIN_CPU = 23 PIN_BLE = 27 PIN_WIFI = 22 INPUT_PINS = [PIN_IDLE, PIN_CPU, PIN_BLE, PIN_WIFI] DEBOUNCE_SEC = 0.05 POLL_INTERVAL_SEC = 0.20 VIDEO_FILE = os.environ.get("VIDEO_FILE", "/home/niindia/test_video.mp4") # ========== HELPERS ========== def run_cmd(cmd: str): try: c = subprocess.run( cmd, shell=True, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) return c.returncode, c.stdout, c.stderr except Exception as e: return 1, "", str(e) def detect_gateway(): rc, out, _ = run_cmd("ip route | grep -m1 '^default ' || true") if rc == 0 and out: m = re.search(r"default\\s+via\\s+([0-9.]+)", out) if m: return m.group(1) return None WIFI_PING_TARGET = detect_gateway() or "127.0.0.1" WIFI_IPERF_SERVER = None WIFI_WGET_URL = os.environ.get("WIFI_WGET_URL", "") STRESS_THREADS = os.cpu_count() or 4 STRESS_ITERATIONS = 200_000 STRESS_KEYLEN = 32 current_state = None _lock_fh = None _video_proc = None _stress_threads = [] _stress_stop_evt = threading.Event() _bt_stop_evt = threading.Event() _bt_thread = None _wifi_stop_evt = threading.Event() _wifi_thread = None # ========== SINGLE INSTANCE ========== LOCK_FILE = "/tmp/rpi_modes.lock" def acquire_single_instance_lock(): global _lock_fh _lock_fh = open(LOCK_FILE, "w") try: fcntl.lockf(_lock_fh, fcntl.LOCK_EX | fcntl.LOCK_NB) _lock_fh.write(str(os.getpid())) _lock_fh.flush() except OSError: print("Another instance is already running.") sys.exit(1) # ========== GOVERNOR / RADIOS ========== def set_governor(mode: str): cmd = ( "for g in /sys/devices/system/cpu/cpu*/cpufreq/scaling_governor; " f"do echo {mode} | sudo tee $g >/dev/null; done" ) run_cmd(cmd) def bt_on(): run_cmd("sudo rfkill unblock bluetooth") run_cmd("sudo systemctl start bluetooth || true") run_cmd("sudo hciconfig hci0 up || true") def bt_off(): run_cmd("sudo hciconfig hci0 down || true") run_cmd("sudo systemctl stop bluetooth || true") run_cmd("sudo rfkill block bluetooth") def wifi_on(): run_cmd("sudo rfkill unblock wifi") run_cmd("sudo ip link set wlan0 up || true") def wifi_off(): run_cmd("sudo ip link set wlan0 down || true") run_cmd("sudo rfkill block wifi") # ========== CPU STRESS ========== def _cpu_stress_worker(): while not _stress_stop_evt.is_set(): pwd = secrets.token_bytes(32) salt = secrets.token_bytes(16) hashlib.pbkdf2_hmac( 'sha256', pwd, salt, STRESS_ITERATIONS, dklen=STRESS_KEYLEN ) def start_stress(): global _stress_threads if _stress_threads: return _stress_stop_evt.clear() _stress_threads = [] for _ in range(STRESS_THREADS): t = threading.Thread(target=_cpu_stress_worker, daemon=True) t.start() _stress_threads.append(t) def stop_stress(): global _stress_threads _stress_stop_evt.set() for t in _stress_threads: t.join(timeout=1.5) _stress_threads = [] # ========== BLE ========== def _ble_scan_worker(): while not _bt_stop_evt.is_set(): run_cmd( "timeout 5s bash -lc " "'bluetoothctl --timeout 5 scan on >/dev/null 2>&1 || true'" ) time.sleep(1) def start_ble_scan(): global _bt_thread if _bt_thread and _bt_thread.is_alive(): return _bt_stop_evt.clear() _bt_thread = threading.Thread(target=_ble_scan_worker, daemon=True) _bt_thread.start() def stop_ble_scan(): global _bt_thread _bt_stop_evt.set() if _bt_thread: _bt_thread.join(timeout=1.5) _bt_thread = None # ========== WIFI ========== def _wifi_worker(): while not _wifi_stop_evt.is_set(): run_cmd( f"ping -c 5 -i 0.2 -W 1 {WIFI_PING_TARGET} " ">/dev/null 2>&1 || true" ) if WIFI_WGET_URL: run_cmd( f"timeout 10s bash -lc " f"'wget -q -O - {WIFI_WGET_URL} >/dev/null' || true" ) time.sleep(1) def start_wifi_traffic(): global _wifi_thread if _wifi_thread and _wifi_thread.is_alive(): return _wifi_stop_evt.clear() _wifi_thread = threading.Thread(target=_wifi_worker, daemon=True) _wifi_thread.start() def stop_wifi_traffic(): global _wifi_thread _wifi_stop_evt.set() if _wifi_thread: _wifi_thread.join(timeout=1.5) _wifi_thread = None # ========== GPIO ========== def setup_gpio(): GPIO.setwarnings(False) GPIO.cleanup() GPIO.setmode(GPIO.BCM) for pin in INPUT_PINS: GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) def read_pins(): return {pin: GPIO.input(pin) for pin in INPUT_PINS} # ========== STATES ========== def enter_idle(): global current_state if current_state == "IDLE": return stop_wifi_traffic() stop_ble_scan() stop_stress() bt_off() wifi_off() set_governor("powersave") current_state = "IDLE" def enter_cpu(): global current_state if current_state == "CPU": return bt_on() wifi_on() start_ble_scan() start_wifi_traffic() set_governor("performance") start_stress() current_state = "CPU" def enter_ble(): global current_state if current_state == "BLE": return stop_wifi_traffic() stop_stress() wifi_off() bt_on() start_ble_scan() set_governor("ondemand") current_state = "BLE" def enter_wifi(): global current_state if current_state == "WIFI": return stop_ble_scan() stop_stress() bt_off() wifi_on() start_wifi_traffic() set_governor("ondemand") current_state = "WIFI" def apply_state_from_pins(s): if s[PIN_CPU]: enter_cpu() elif s[PIN_WIFI]: enter_wifi() elif s[PIN_BLE]: enter_ble() elif s[PIN_IDLE]: enter_idle() # ========== MAIN ========== def graceful_exit(*_): stop_wifi_traffic() stop_ble_scan() stop_stress() bt_off() wifi_off() GPIO.cleanup() sys.exit(0) def main(): acquire_single_instance_lock() signal.signal(signal.SIGINT, graceful_exit) signal.signal(signal.SIGTERM, graceful_exit) setup_gpio() enter_idle() while True: s1 = read_pins() time.sleep(DEBOUNCE_SEC) s2 = read_pins() if s1 == s2: apply_state_from_pins(s2) time.sleep(POLL_INTERVAL_SEC) if __name__ == "__main__": main()