BodyBalanceEvaluation/backend/devices/imu_test.py

178 lines
5.8 KiB
Python
Raw Normal View History

import argparse
2026-01-12 15:21:44 +08:00
import asyncio
import time
from statistics import mean
2026-01-12 15:21:44 +08:00
import bleak
import device_model
2026-01-12 15:21:44 +08:00
async def find_device_by_address(address: str, timeout_s: float):
2026-01-12 15:21:44 +08:00
try:
return await bleak.BleakScanner.find_device_by_address(address, timeout=timeout_s)
except TypeError:
return await bleak.BleakScanner.find_device_by_address(address, cb=dict(use_bdaddr=False))
async def find_device_by_name(name: str, timeout_s: float):
scanner_fn = getattr(bleak.BleakScanner, "find_device_by_name", None)
if callable(scanner_fn):
return await scanner_fn(name, timeout=timeout_s)
devices = await bleak.BleakScanner.discover(timeout=timeout_s)
for d in devices:
if (getattr(d, "name", None) or "") == name:
return d
return None
async def run_trials(label: str, finder, runs: int, cooldown_s: float):
ok_times = []
fail = 0
for i in range(1, runs + 1):
start = time.perf_counter()
device = await finder()
ms = (time.perf_counter() - start) * 1000
if device is None:
fail += 1
print(f"[{label}] [{i:03d}] FAIL {ms:.1f}ms")
2026-01-12 15:21:44 +08:00
else:
addr = getattr(device, "address", None)
name = getattr(device, "name", None)
ok_times.append(ms)
print(f"[{label}] [{i:03d}] OK {ms:.1f}ms address={addr} name={name}")
if cooldown_s > 0:
await asyncio.sleep(cooldown_s)
if ok_times:
print(f"[{label}] runs={runs} success={len(ok_times)} fail={fail} avg={mean(ok_times):.1f}ms min={min(ok_times):.1f}ms max={max(ok_times):.1f}ms")
2026-01-12 15:21:44 +08:00
else:
print(f"[{label}] runs={runs} success=0 fail={fail}")
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--address", default="FA:E8:88:06:FE:F3")
parser.add_argument("--name", default="WT901BLE67")
parser.add_argument("--runs", type=int, default=10)
parser.add_argument("--timeout", type=float, default=30.0)
parser.add_argument("--cooldown", type=float, default=0.3)
parser.add_argument("--mode", choices=["mac", "name", "both", "isopen", "battery"], default="both")
return parser.parse_args()
async def main():
args = parse_args()
if args.mode == "isopen":
device = await find_device_by_name(args.name, args.timeout)
if device is None:
device = await find_device_by_address(args.address, args.timeout)
if device is None:
print("FAIL: 未发现设备")
return
addr = getattr(device, "address", None)
name = getattr(device, "name", None)
if args.address and addr and addr.lower() != args.address.lower():
print(f"FAIL: 发现设备地址不匹配 found={addr} expected={args.address}")
return
print(f"FOUND address={addr} name={name}")
first_frame = asyncio.Event()
def on_update(dm):
if not first_frame.is_set():
first_frame.set()
dm = device_model.DeviceModel("imu_test", device, on_update)
task = asyncio.create_task(dm.openDevice())
try:
await asyncio.wait_for(first_frame.wait(), timeout=30.0)
print(f"OPENED isOpen={dm.isOpen} client_connected={bool(getattr(getattr(dm, 'client', None), 'is_connected', False))}")
except Exception:
print(f"OPEN_TIMEOUT isOpen={dm.isOpen} client_connected={bool(getattr(getattr(dm, 'client', None), 'is_connected', False))}")
while True:
client_connected = bool(getattr(getattr(dm, "client", None), "is_connected", False))
print(f"STATUS isOpen={dm.isOpen} client_connected={client_connected}")
if not dm.isOpen:
break
await asyncio.sleep(1.0)
try:
await asyncio.wait_for(task, timeout=5.0)
except Exception:
try:
task.cancel()
except Exception:
pass
print("DONE")
return
if args.mode == "battery":
device = await find_device_by_name(args.name, args.timeout)
if device is None:
device = await find_device_by_address(args.address, args.timeout)
if device is None:
print("FAIL: 未发现设备")
return
addr = getattr(device, "address", None)
name = getattr(device, "name", None)
if args.address and addr and addr.lower() != args.address.lower():
print(f"FAIL: 发现设备地址不匹配 found={addr} expected={args.address}")
return
print(f"FOUND address={addr} name={name}")
first_frame = asyncio.Event()
def on_update(dm):
if not first_frame.is_set():
first_frame.set()
dm = device_model.DeviceModel("imu_test", device, on_update)
task = asyncio.create_task(dm.openDevice())
try:
await asyncio.wait_for(first_frame.wait(), timeout=30.0)
print("OPENED")
except Exception:
print("OPEN_TIMEOUT")
info = await dm.readBattery(timeout=3.0)
print(f"BATTERY {info}")
try:
dm.closeDevice()
except Exception:
pass
try:
await asyncio.wait_for(task, timeout=5.0)
except Exception:
try:
task.cancel()
except Exception:
pass
print("DONE")
return
if args.mode in ("mac", "both"):
await run_trials(
"mac",
lambda: find_device_by_address(args.address, args.timeout),
args.runs,
args.cooldown,
)
if args.mode in ("name", "both"):
await run_trials(
"name",
lambda: find_device_by_name(args.name, args.timeout),
args.runs,
args.cooldown,
)
if __name__ == "__main__":
asyncio.run(main())