Sign up
Login
New
Trending
Archive
English
English
Sign up
Login
New Paste
Add Image
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
RPi 3 B+ DUT mode controller for NI PXI power measurements (SAFE EDITION).

Four GPIO inputs (BCM numbering, internal pull-downs) select the load the
board presents.  The main loop samples the pins twice, DEBOUNCE_SEC apart,
and only acts when both samples agree.  When several pins are high the
priority is CPU > WIFI > BLE > IDLE; when none is high the current state
is kept.

States, as configured by the ``enter_*`` functions:

* IDLE - radios blocked, background workers stopped, "powersave" governor.
* CPU  - both radios on, BLE scan + Wi-Fi traffic + PBKDF2 stress threads,
         "performance" governor.
* BLE  - Wi-Fi blocked, Bluetooth on with a repeating scan, "ondemand".
* WIFI - Bluetooth blocked, Wi-Fi on with ping/wget traffic, "ondemand".
"""

import fcntl
import hashlib
import os
import re
import secrets
import shlex
import signal
import subprocess
import sys
import threading
import time
from typing import Dict, Optional, Tuple

# RPi.GPIO only exists (and only imports cleanly) on a Raspberry Pi.  Guard
# the import so the pure helpers in this module stay importable elsewhere;
# main() refuses to run when GPIO is unavailable.
try:
    import RPi.GPIO as GPIO
except (ImportError, RuntimeError) as _exc:
    GPIO = None
    _GPIO_IMPORT_ERROR = _exc
else:
    _GPIO_IMPORT_ERROR = None

# ========== CONFIG ==========
# Mode-select inputs (BCM pin numbers, see setup_gpio()).
PIN_IDLE = 17
PIN_CPU = 23
PIN_BLE = 27
PIN_WIFI = 22
INPUT_PINS = [PIN_IDLE, PIN_CPU, PIN_BLE, PIN_WIFI]

DEBOUNCE_SEC = 0.05        # gap between the two pin samples in main()
POLL_INTERVAL_SEC = 0.20   # sleep at the end of every main-loop iteration

# NOTE(review): VIDEO_FILE is not referenced anywhere in this script.
VIDEO_FILE = os.environ.get("VIDEO_FILE", "/home/niindia/test_video.mp4")


# ========== HELPERS ==========
def run_cmd(cmd: str) -> Tuple[int, str, str]:
    """Run *cmd* through the shell and capture its output.

    Returns:
        ``(returncode, stdout, stderr)``.  If the command cannot be run at
        all, the exception is swallowed on purpose (best effort) and
        ``(1, "", str(exception))`` is returned instead.
    """
    try:
        done = subprocess.run(
            cmd,
            shell=True,
            text=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
    except Exception as e:  # best-effort helper: report, never raise
        return 1, "", str(e)
    return done.returncode, done.stdout, done.stderr


# A single backslash inside a raw string is what yields the regex class \s.
_DEFAULT_ROUTE_RE = re.compile(r"default\s+via\s+([0-9.]+)")


def parse_default_gateway(route_output: str) -> Optional[str]:
    """Extract the gateway address from ``ip route`` style text.

    Returns the address following ``default via``, or ``None`` when
    *route_output* contains no such entry.
    """
    match = _DEFAULT_ROUTE_RE.search(route_output)
    return match.group(1) if match else None


def detect_gateway() -> Optional[str]:
    """Return the default gateway reported by ``ip route``, or ``None``."""
    rc, out, _ = run_cmd("ip route | grep -m1 '^default ' || true")
    if rc == 0 and out:
        return parse_default_gateway(out)
    return None


# Resolved once at import time; falls back to loopback without a default route.
WIFI_PING_TARGET = detect_gateway() or "127.0.0.1"
WIFI_IPERF_SERVER = None  # NOTE(review): not referenced in this script
WIFI_WGET_URL = os.environ.get("WIFI_WGET_URL", "")

STRESS_THREADS = os.cpu_count() or 4
STRESS_ITERATIONS = 200_000
STRESS_KEYLEN = 32

current_state = None
_lock_fh = None
_video_proc = None  # NOTE(review): not referenced in this script
_stress_threads = []
_stress_stop_evt = threading.Event()
_bt_stop_evt = threading.Event()
_bt_thread = None
_wifi_stop_evt = threading.Event()
_wifi_thread = None

# ========== SINGLE INSTANCE ==========
LOCK_FILE = "/tmp/rpi_modes.lock"


def acquire_single_instance_lock() -> None:
    """Take an exclusive, non-blocking lock on LOCK_FILE or exit(1).

    The file handle is kept in ``_lock_fh`` and deliberately left open so
    the lock is held for the lifetime of the process.
    """
    global _lock_fh
    # "a+" rather than "w": merely opening the file must not wipe the PID
    # written by an instance that already holds the lock.
    _lock_fh = open(LOCK_FILE, "a+")
    try:
        fcntl.lockf(_lock_fh, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError:
        _lock_fh.close()
        _lock_fh = None
        print("Another instance is already running.")
        sys.exit(1)
    # We own the lock now, so it is safe to replace the recorded PID.
    _lock_fh.seek(0)
    _lock_fh.truncate()
    _lock_fh.write(str(os.getpid()))
    _lock_fh.flush()


# ========== GOVERNOR / RADIOS ==========
def set_governor(mode: str) -> None:
    """Write *mode* to every CPU's ``scaling_governor`` file via sudo tee."""
    cmd = (
        "for g in /sys/devices/system/cpu/cpu*/cpufreq/scaling_governor; "
        f"do echo {mode} | sudo tee $g >/dev/null; done"
    )
    run_cmd(cmd)


def bt_on() -> None:
    """Unblock Bluetooth, start the bluetooth service and bring hci0 up."""
    run_cmd("sudo rfkill unblock bluetooth")
    run_cmd("sudo systemctl start bluetooth || true")
    run_cmd("sudo hciconfig hci0 up || true")


def bt_off() -> None:
    """Bring hci0 down, stop the bluetooth service and rfkill-block it."""
    run_cmd("sudo hciconfig hci0 down || true")
    run_cmd("sudo systemctl stop bluetooth || true")
    run_cmd("sudo rfkill block bluetooth")


def wifi_on() -> None:
    """Unblock Wi-Fi and bring wlan0 up."""
    run_cmd("sudo rfkill unblock wifi")
    run_cmd("sudo ip link set wlan0 up || true")


def wifi_off() -> None:
    """Bring wlan0 down and rfkill-block Wi-Fi."""
    run_cmd("sudo ip link set wlan0 down || true")
    run_cmd("sudo rfkill block wifi")


# ========== CPU STRESS ==========
def _cpu_stress_worker(stop_evt: Optional[threading.Event] = None) -> None:
    """Run PBKDF2-HMAC-SHA256 on random inputs until *stop_evt* is set.

    With no argument the module-level ``_stress_stop_evt`` is used.
    """
    evt = _stress_stop_evt if stop_evt is None else stop_evt
    while not evt.is_set():
        pwd = secrets.token_bytes(32)
        salt = secrets.token_bytes(16)
        hashlib.pbkdf2_hmac(
            "sha256", pwd, salt, STRESS_ITERATIONS, dklen=STRESS_KEYLEN
        )


def start_stress() -> None:
    """Start STRESS_THREADS stress workers; no-op if some are registered."""
    global _stress_threads, _stress_stop_evt
    if _stress_threads:
        return
    # Fresh event per run: a worker that outlived stop_stress()'s join
    # timeout still sees its own (set) event and exits, instead of being
    # revived by a clear() on a shared event.
    _stress_stop_evt = threading.Event()
    workers = []
    for _ in range(STRESS_THREADS):
        t = threading.Thread(
            target=_cpu_stress_worker, args=(_stress_stop_evt,), daemon=True
        )
        t.start()
        workers.append(t)
    _stress_threads = workers


def stop_stress() -> None:
    """Signal the stress workers to stop and wait up to 1.5 s for each."""
    global _stress_threads
    _stress_stop_evt.set()
    for t in _stress_threads:
        t.join(timeout=1.5)
    _stress_threads = []


# ========== BLE ==========
def _ble_scan_worker(stop_evt: Optional[threading.Event] = None) -> None:
    """Repeat a 5 s ``bluetoothctl scan on`` until *stop_evt* is set.

    With no argument the module-level ``_bt_stop_evt`` is used.
    """
    evt = _bt_stop_evt if stop_evt is None else stop_evt
    while not evt.is_set():
        run_cmd(
            "timeout 5s bash -lc "
            "'bluetoothctl --timeout 5 scan on >/dev/null 2>&1 || true'"
        )
        # Same 1 s pause as a sleep, but returns at once when stopped.
        evt.wait(1)


def start_ble_scan() -> None:
    """Start the BLE scan worker unless one is already alive."""
    global _bt_thread, _bt_stop_evt
    if _bt_thread and _bt_thread.is_alive():
        return
    # Fresh event per run so a straggler from the previous run (a scan
    # cycle can outlast the 1.5 s join) is not resurrected.
    _bt_stop_evt = threading.Event()
    _bt_thread = threading.Thread(
        target=_ble_scan_worker, args=(_bt_stop_evt,), daemon=True
    )
    _bt_thread.start()


def stop_ble_scan() -> None:
    """Signal the BLE scan worker to stop and wait up to 1.5 s for it."""
    global _bt_thread
    _bt_stop_evt.set()
    if _bt_thread:
        _bt_thread.join(timeout=1.5)
    _bt_thread = None


# ========== WIFI ==========
def _wifi_worker(stop_evt: Optional[threading.Event] = None) -> None:
    """Generate Wi-Fi traffic until *stop_evt* is set.

    Each cycle pings WIFI_PING_TARGET five times and, when WIFI_WGET_URL is
    non-empty, downloads it with a 10 s limit.  With no argument the
    module-level ``_wifi_stop_evt`` is used.
    """
    evt = _wifi_stop_evt if stop_evt is None else stop_evt
    while not evt.is_set():
        run_cmd(
            f"ping -c 5 -i 0.2 -W 1 {WIFI_PING_TARGET} "
            ">/dev/null 2>&1 || true"
        )
        if WIFI_WGET_URL:
            # Quote at both shell levels: an unquoted URL containing '&'
            # or a quote would be split/backgrounded by the inner bash.
            fetch = f"wget -q -O - {shlex.quote(WIFI_WGET_URL)} >/dev/null"
            run_cmd(f"timeout 10s bash -lc {shlex.quote(fetch)} || true")
        # Same 1 s pause as a sleep, but returns at once when stopped.
        evt.wait(1)


def start_wifi_traffic() -> None:
    """Start the Wi-Fi traffic worker unless one is already alive."""
    global _wifi_thread, _wifi_stop_evt
    if _wifi_thread and _wifi_thread.is_alive():
        return
    # Fresh event per run; see start_ble_scan() for the rationale.
    _wifi_stop_evt = threading.Event()
    _wifi_thread = threading.Thread(
        target=_wifi_worker, args=(_wifi_stop_evt,), daemon=True
    )
    _wifi_thread.start()


def stop_wifi_traffic() -> None:
    """Signal the Wi-Fi worker to stop and wait up to 1.5 s for it."""
    global _wifi_thread
    _wifi_stop_evt.set()
    if _wifi_thread:
        _wifi_thread.join(timeout=1.5)
    _wifi_thread = None


# ========== GPIO ==========
def setup_gpio() -> None:
    """Configure every pin in INPUT_PINS as a pulled-down input (BCM)."""
    GPIO.setwarnings(False)
    GPIO.cleanup()
    GPIO.setmode(GPIO.BCM)
    for pin in INPUT_PINS:
        GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)


def read_pins() -> Dict[int, int]:
    """Return ``{pin: level}`` for every pin in INPUT_PINS."""
    return {pin: GPIO.input(pin) for pin in INPUT_PINS}


# ========== STATES ==========
def enter_idle() -> None:
    """Stop all workers, block both radios, select "powersave"."""
    global current_state
    if current_state == "IDLE":
        return
    stop_wifi_traffic()
    stop_ble_scan()
    stop_stress()
    bt_off()
    wifi_off()
    set_governor("powersave")
    current_state = "IDLE"


def enter_cpu() -> None:
    """Enable both radios and all workers, select "performance"."""
    global current_state
    if current_state == "CPU":
        return
    bt_on()
    wifi_on()
    start_ble_scan()
    start_wifi_traffic()
    set_governor("performance")
    start_stress()
    current_state = "CPU"


def enter_ble() -> None:
    """Block Wi-Fi, stop stress, run the BLE scan, select "ondemand"."""
    global current_state
    if current_state == "BLE":
        return
    stop_wifi_traffic()
    stop_stress()
    wifi_off()
    bt_on()
    start_ble_scan()
    set_governor("ondemand")
    current_state = "BLE"


def enter_wifi() -> None:
    """Block Bluetooth, stop stress, run Wi-Fi traffic, select "ondemand"."""
    global current_state
    if current_state == "WIFI":
        return
    stop_ble_scan()
    stop_stress()
    bt_off()
    wifi_on()
    start_wifi_traffic()
    set_governor("ondemand")
    current_state = "WIFI"


def apply_state_from_pins(s: Dict[int, int]) -> None:
    """Enter the state selected by pin levels *s* (as from read_pins()).

    Priority when several pins are high: CPU, WIFI, BLE, IDLE.  When no
    pin is high nothing happens and the current state is kept.
    """
    if s[PIN_CPU]:
        enter_cpu()
    elif s[PIN_WIFI]:
        enter_wifi()
    elif s[PIN_BLE]:
        enter_ble()
    elif s[PIN_IDLE]:
        enter_idle()


# ========== MAIN ==========
def graceful_exit(*_) -> None:
    """Signal handler: stop workers, block radios, release GPIO, exit(0)."""
    stop_wifi_traffic()
    stop_ble_scan()
    stop_stress()
    bt_off()
    wifi_off()
    GPIO.cleanup()
    sys.exit(0)


def main() -> None:
    """Take the instance lock, start in IDLE, then follow the pins forever."""
    if GPIO is None:
        sys.exit(f"RPi.GPIO is unavailable: {_GPIO_IMPORT_ERROR}")
    acquire_single_instance_lock()
    signal.signal(signal.SIGINT, graceful_exit)
    signal.signal(signal.SIGTERM, graceful_exit)
    setup_gpio()
    enter_idle()
    while True:
        # Debounce: act only when two samples DEBOUNCE_SEC apart agree.
        s1 = read_pins()
        time.sleep(DEBOUNCE_SEC)
        s2 = read_pins()
        if s1 == s2:
            apply_state_from_pins(s2)
        time.sleep(POLL_INTERVAL_SEC)


if __name__ == "__main__":
    main()
Settings
Title :
[Optional]
Paste Folder :
[Optional]
Select
Syntax :
[Optional]
Select
Markup
CSS
JavaScript
Bash
C
C#
C++
Java
JSON
Lua
Plaintext
C-like
ABAP
ActionScript
Ada
Apache Configuration
APL
AppleScript
Arduino
ARFF
AsciiDoc
6502 Assembly
ASP.NET (C#)
AutoHotKey
AutoIt
Basic
Batch
Bison
Brainfuck
Bro
CoffeeScript
Clojure
Crystal
Content-Security-Policy
CSS Extras
D
Dart
Diff
Django/Jinja2
Docker
Eiffel
Elixir
Elm
ERB
Erlang
F#
Flow
Fortran
GEDCOM
Gherkin
Git
GLSL
GameMaker Language
Go
GraphQL
Groovy
Haml
Handlebars
Haskell
Haxe
HTTP
HTTP Public-Key-Pins
HTTP Strict-Transport-Security
IchigoJam
Icon
Inform 7
INI
IO
J
Jolie
Julia
Keyman
Kotlin
LaTeX
Less
Liquid
Lisp
LiveScript
LOLCODE
Makefile
Markdown
Markup templating
MATLAB
MEL
Mizar
Monkey
N4JS
NASM
nginx
Nim
Nix
NSIS
Objective-C
OCaml
OpenCL
Oz
PARI/GP
Parser
Pascal
Perl
PHP
PHP Extras
PL/SQL
PowerShell
Processing
Prolog
.properties
Protocol Buffers
Pug
Puppet
Pure
Python
Q (kdb+ database)
Qore
R
React JSX
React TSX
Ren'py
Reason
reST (reStructuredText)
Rip
Roboconf
Ruby
Rust
SAS
Sass (Sass)
Sass (Scss)
Scala
Scheme
Smalltalk
Smarty
SQL
Soy (Closure Template)
Stylus
Swift
TAP
Tcl
Textile
Template Toolkit 2
Twig
TypeScript
VB.Net
Velocity
Verilog
VHDL
vim
Visual Basic
WebAssembly
Wiki markup
Xeora
Xojo (REALbasic)
XQuery
YAML
HTML
Expiration :
[Optional]
Never
Self Destroy
10 Minutes
1 Hour
1 Day
1 Week
2 Weeks
1 Month
6 Months
1 Year
Status :
[Optional]
Public
Unlisted
Private (members only)
Password :
[Optional]
Description:
[Optional]
Tags:
[Optional]
Encrypt Paste
(
?
)
Create Paste
You are currently not logged in; this means you cannot edit or delete anything you paste.
Sign Up
or
Login
Site Languages
×
English