2026-01-13 18:15:51 +08:00
|
|
|
import argparse
|
2026-01-12 15:21:44 +08:00
|
|
|
import asyncio
|
2026-01-13 18:15:51 +08:00
|
|
|
import time
|
|
|
|
|
from statistics import mean
|
2026-01-12 15:21:44 +08:00
|
|
|
|
2026-01-13 18:15:51 +08:00
|
|
|
import bleak
|
2026-01-12 15:21:44 +08:00
|
|
|
|
|
|
|
|
|
2026-01-13 18:15:51 +08:00
|
|
|
async def find_device_by_address(address: str, timeout_s: float):
    """Look up a BLE device by its address through ``bleak.BleakScanner``.

    The first attempt forwards ``timeout_s`` as the ``timeout`` keyword. If
    that call raises ``TypeError``, the lookup is retried with
    ``cb={"use_bdaddr": False}`` instead.

    NOTE(review): the retry does not forward ``timeout_s``, so the scanner's
    own default applies on that path -- confirm this is intended.

    Args:
        address: Device address handed to the scanner.
        timeout_s: Scan timeout in seconds for the first attempt.

    Returns:
        Whatever ``BleakScanner.find_device_by_address`` returns.
    """
    lookup = bleak.BleakScanner.find_device_by_address
    try:
        found = await lookup(address, timeout=timeout_s)
    except TypeError:
        # Retry with the alternate keyword set only.
        retry_options = {"use_bdaddr": False}
        found = await lookup(address, cb=retry_options)
    return found
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def find_device_by_name(name: str, timeout_s: float):
    """Look up a BLE device by its advertised name.

    Uses ``BleakScanner.find_device_by_name`` when that attribute exists and
    is callable. Otherwise it runs ``BleakScanner.discover`` and returns the
    first discovered device whose ``name`` equals ``name`` exactly (a missing
    or ``None`` name is compared as the empty string).

    Args:
        name: Exact device name to match.
        timeout_s: Scan timeout in seconds, forwarded as ``timeout``.

    Returns:
        The matching device, or ``None`` if the fallback scan finds no match.
    """
    native_lookup = getattr(bleak.BleakScanner, "find_device_by_name", None)
    if not callable(native_lookup):
        # No dedicated helper available: scan everything and filter locally.
        discovered = await bleak.BleakScanner.discover(timeout=timeout_s)
        return next(
            (
                candidate
                for candidate in discovered
                if (getattr(candidate, "name", None) or "") == name
            ),
            None,
        )
    return await native_lookup(name, timeout=timeout_s)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def run_trials(label: str, finder, runs: int, cooldown_s: float):
    """Run ``finder`` repeatedly, timing each call and printing the results.

    Each attempt awaits ``finder()`` and measures the elapsed wall time with
    ``time.perf_counter``. A ``None`` result counts as a failure; anything
    else counts as a success and its ``address`` / ``name`` attributes are
    printed. After all attempts a one-line summary is printed.

    Args:
        label: Tag printed at the start of every output line.
        finder: Zero-argument callable returning an awaitable that resolves
            to a device object or ``None``.
        runs: Number of attempts to make.
        cooldown_s: Seconds to sleep after each attempt; skipped when not
            greater than zero.
    """
    durations_ms = []
    misses = 0

    for attempt in range(1, runs + 1):
        began = time.perf_counter()
        found = await finder()
        elapsed_ms = (time.perf_counter() - began) * 1000

        if found is not None:
            found_address = getattr(found, "address", None)
            found_name = getattr(found, "name", None)
            durations_ms.append(elapsed_ms)
            print(
                f"[{label}] [{attempt:03d}] OK {elapsed_ms:.1f}ms "
                f"address={found_address} name={found_name}"
            )
        else:
            misses += 1
            print(f"[{label}] [{attempt:03d}] FAIL {elapsed_ms:.1f}ms")

        if cooldown_s > 0:
            await asyncio.sleep(cooldown_s)

    if not durations_ms:
        # Nothing succeeded, so there are no timing statistics to report.
        print(f"[{label}] runs={runs} success=0 fail={misses}")
        return

    average_ms = mean(durations_ms)
    fastest_ms = min(durations_ms)
    slowest_ms = max(durations_ms)
    print(
        f"[{label}] runs={runs} success={len(durations_ms)} fail={misses} "
        f"avg={average_ms:.1f}ms min={fastest_ms:.1f}ms max={slowest_ms:.1f}ms"
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def parse_args(argv=None):
    """Parse the command-line options for the scan benchmark.

    Args:
        argv: Optional sequence of argument strings to parse. When ``None``
            (the default), argparse reads ``sys.argv[1:]``, which matches the
            previous zero-argument behaviour. Passing a list lets callers and
            tests drive the parser without touching process state.

    Returns:
        An ``argparse.Namespace`` with ``address``, ``name``, ``runs``,
        ``timeout``, ``cooldown`` and ``mode`` attributes.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--address",
        default="FA:E8:88:06:FE:F3",
        help="device address used for the address-based lookup",
    )
    parser.add_argument(
        "--name",
        default="WT901BLE67",
        help="device name used for the name-based lookup",
    )
    parser.add_argument(
        "--runs",
        type=int,
        default=10,
        help="number of lookup attempts per mode",
    )
    parser.add_argument(
        "--timeout",
        type=float,
        default=30.0,
        help="scan timeout in seconds for each attempt",
    )
    parser.add_argument(
        "--cooldown",
        type=float,
        default=0.3,
        help="pause in seconds after each attempt",
    )
    parser.add_argument(
        "--mode",
        choices=["mac", "name", "both"],
        default="both",
        help="which lookup(s) to benchmark",
    )
    return parser.parse_args(argv)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def main():
    """Parse the CLI options and run the selected lookup benchmarks.

    ``--mode mac`` runs only the address lookup, ``--mode name`` runs only
    the name lookup, and ``--mode both`` runs the address lookup followed by
    the name lookup. Each benchmark uses the same ``runs`` and ``cooldown``
    settings.
    """
    args = parse_args()

    # Collect (label, finder) pairs first; order here is the execution order.
    plan = []
    if args.mode in ("mac", "both"):
        plan.append(
            ("mac", lambda: find_device_by_address(args.address, args.timeout))
        )
    if args.mode in ("name", "both"):
        plan.append(
            ("name", lambda: find_device_by_name(args.name, args.timeout))
        )

    for label, finder in plan:
        await run_trials(label, finder, args.runs, args.cooldown)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: drive the async main() to completion on a new
    # event loop.
    entry_coroutine = main()
    asyncio.run(entry_coroutine)
|