From 7daeb5692f45446738967c33ea4b2089944e8d22 Mon Sep 17 00:00:00 2001 From: zhaozilong12 <405241463@qq.com> Date: Wed, 10 Sep 2025 18:11:11 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=B1=E5=BA=A6=E7=9B=B8=E6=9C=BA=E8=B7=9D?= =?UTF-8?q?=E7=A6=BB=EF=BC=8Copencv=E6=B8=B2=E6=9F=93=E3=80=81=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=E8=93=9D=E7=89=99IMU?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/requirements.txt | 3 +- backend/tests/test44femtobolt.py | 4 +- backend/tests/testblueimu.py | 281 ++++++++++++++++++++++++++++++ backend/tests/testfemtobolt.py | 120 ++++++++----- backend/tests/testpltfemtobolt.py | 140 +++++++++++++++ 5 files changed, 505 insertions(+), 43 deletions(-) create mode 100644 backend/tests/testblueimu.py create mode 100644 backend/tests/testpltfemtobolt.py diff --git a/backend/requirements.txt b/backend/requirements.txt index e8cf6207..8716eecb 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -48,4 +48,5 @@ debugpy pyyaml click colorama -tqdm \ No newline at end of file +tqdm +bleak \ No newline at end of file diff --git a/backend/tests/test44femtobolt.py b/backend/tests/test44femtobolt.py index af8b7b91..79646c95 100644 --- a/backend/tests/test44femtobolt.py +++ b/backend/tests/test44femtobolt.py @@ -24,7 +24,7 @@ class FemtoBoltContourViewer: def _load_sdk(self): """加载并初始化 FemtoBolt SDK""" base_dir = os.path.dirname(os.path.abspath(__file__)) - dll_path = os.path.join(base_dir, "..", "dll", "femtobolt", "bin", "k4a.dll") + dll_path = os.path.join(base_dir, "..", "dll", "femtobolt", "k4a.dll") self.pykinect = pykinect self.pykinect.initialize_libraries(track_body=False, module_k4a_path=dll_path) @@ -90,5 +90,5 @@ class FemtoBoltContourViewer: if __name__ == "__main__": - viewer = FemtoBoltContourViewer(depth_min=900, depth_max=1100) + viewer = FemtoBoltContourViewer(depth_min=500, depth_max=700) viewer.run() diff --git a/backend/tests/testblueimu.py b/backend/tests/testblueimu.py new file mode 100644 index 00000000..0b8b8630 --- /dev/null +++ b/backend/tests/testblueimu.py @@ -0,0 +1,281 @@ +import asyncio +from bleak import BleakClient, BleakScanner +from bleak.backends.characteristic import BleakGATTCharacteristic +from array import array +import numpy as np + +#设备的Characteristic UUID +# par_notification_characteristic="0000ae02-0000-1000-8000-00805f9b34fb" +par_notification_characteristic=0x0007 +#设备的Characteristic UUID(具备写属性Write) +# par_write_characteristic="0000ae01-0000-1000-8000-00805f9b34fb" +par_write_characteristic=0x0005 + +par_device_addr="ef:3c:1a:0a:fe:02" #设备的MAC地址 此处需要填入设备的mac地址 + +#准备发送的消息,为“hi world\n”的HEX形式(包括回车符0x0A 0x0D) +# send_str=bytearray([0x68,0x69,0x20,0x77,0x6F,0x72,0x6C,0x64,0x0A,0x0D]) + +#监听回调函数,此处为打印消息 +def notification_handler(characteristic: BleakGATTCharacteristic, data: bytearray): + #print("rev data:",data) + parse_imu(data) + +def parse_imu(buf): + scaleAccel = 0.00478515625 # 加速度 [-16g~+16g] 9.8*16/32768 + scaleQuat = 0.000030517578125 # 四元数 [-1~+1] 1/32768 + scaleAngle = 0.0054931640625 # 角度 [-180~+180] 180/32768 + scaleAngleSpeed = 0.06103515625 # 角速度 [-2000~+2000] 2000/32768 + scaleMag = 0.15106201171875 # 磁场 [-4950~+4950] 4950/32768 + scaleTemperature = 0.01 # 温度 + scaleAirPressure = 0.0002384185791 # 气压 [-2000~+2000] 2000/8388608 + scaleHeight = 0.0010728836 # 高度 [-9000~+9000] 9000/8388608 + + imu_dat = array('f',[0.0 for i in range(0,34)]) + + if buf[0] == 0x11: + ctl = (buf[2] << 8) | buf[1] + # print("\n subscribe tag: 0x%04x"%ctl) + # print(" ms: ", ((buf[6]<<24) | (buf[5]<<16) | (buf[4]<<8) | (buf[3]<<0))) + + L =7 # 从第7字节开始根据 订阅标识tag来解析剩下的数据 + if ((ctl & 0x0001) != 0): + tmpX = np.short((np.short(buf[L+1])<<8) | buf[L]) * scaleAccel; L += 2 + # print("\taX: %.3f"%tmpX); # x加速度aX + tmpY = np.short((np.short(buf[L+1])<<8) | buf[L]) * scaleAccel; L += 2 + # print("\taY: %.3f"%tmpY); # y加速度aY + tmpZ = np.short((np.short(buf[L+1])<<8) | buf[L]) * scaleAccel; L += 2 + # print("\taZ: %.3f"%tmpZ); # z加速度aZ + + imu_dat[0] = float(tmpX) + imu_dat[1] = float(tmpY) + imu_dat[2] = float(tmpZ) + + if ((ctl & 0x0002) != 0): + tmpX = np.short((np.short(buf[L+1])<<8) | buf[L]) * scaleAccel; L += 2 + # print("\tAX: %.3f"%tmpX) # x加速度AX + tmpY = np.short((np.short(buf[L+1])<<8) | buf[L]) * scaleAccel; L += 2 + # print("\tAY: %.3f"%tmpY) # y加速度AY + tmpZ = np.short((np.short(buf[L+1])<<8) | buf[L]) * scaleAccel; L += 2 + # print("\tAZ: %.3f"%tmpZ) # z加速度AZ + + imu_dat[3] = float(tmpX) + imu_dat[4] = float(tmpY) + imu_dat[5] = float(tmpZ) + + if ((ctl & 0x0004) != 0): + tmpX = np.short((np.short(buf[L+1])<<8) | buf[L]) * scaleAngleSpeed; L += 2 + # print("\tGX: %.3f"%tmpX) # x角速度GX + tmpY = np.short((np.short(buf[L+1])<<8) | buf[L]) * scaleAngleSpeed; L += 2 + # print("\tGY: %.3f"%tmpY) # y角速度GY + tmpZ = np.short((np.short(buf[L+1])<<8) | buf[L]) * scaleAngleSpeed; L += 2 + # print("\tGZ: %.3f"%tmpZ) # z角速度GZ + + imu_dat[6] = float(tmpX) + imu_dat[7] = float(tmpY) + imu_dat[8] = float(tmpZ) + + if ((ctl & 0x0008) != 0): + tmpX = np.short((np.short(buf[L+1])<<8) | buf[L]) * scaleMag; L += 2 + # print("\tCX: %.3f"%tmpX); # x磁场CX + tmpY = np.short((np.short(buf[L+1])<<8) | buf[L]) * scaleMag; L += 2 + # print("\tCY: %.3f"%tmpY); # y磁场CY + tmpZ = np.short((np.short(buf[L+1])<<8) | buf[L]) * scaleMag; L += 2 + # print("\tCZ: %.3f"%tmpZ); # z磁场CZ + + imu_dat[9] = float(tmpX) + imu_dat[10] = float(tmpY) + imu_dat[11] = float(tmpZ) + + if ((ctl & 0x0010) != 0): + tmpX = np.short((np.short(buf[L+1])<<8) | buf[L]) * scaleTemperature; L += 2 + # print("\ttemperature: %.2f"%tmpX) # 温度 + + tmpU32 = np.uint32(((np.uint32(buf[L+2]) << 16) | (np.uint32(buf[L+1]) << 8) | np.uint32(buf[L]))) + if ((tmpU32 & 0x800000) == 0x800000): # 若24位数的最高位为1则该数值为负数,需转为32位负数,直接补上ff即可 + tmpU32 = (tmpU32 | 0xff000000) + tmpY = np.int32(tmpU32) * scaleAirPressure; L += 3 + # print("\tairPressure: %.3f"%tmpY); # 气压 + + tmpU32 = np.uint32((np.uint32(buf[L+2]) << 16) | (np.uint32(buf[L+1]) << 8) | np.uint32(buf[L])) + if ((tmpU32 & 0x800000) == 0x800000): # 若24位数的最高位为1则该数值为负数,需转为32位负数,直接补上ff即可 + tmpU32 = (tmpU32 | 0xff000000) + tmpZ = np.int32(tmpU32) * scaleHeight; L += 3 + # print("\theight: %.3f"%tmpZ); # 高度 + + imu_dat[12] = float(tmpX) + imu_dat[13] = float(tmpY) + imu_dat[14] = float(tmpZ) + + if ((ctl & 0x0020) != 0): + tmpAbs = np.short((np.short(buf[L+1])<<8) | buf[L]) * scaleQuat; L += 2 + # print("\tw: %.3f"%tmpAbs); # w + tmpX = np.short((np.short(buf[L+1])<<8) | buf[L]) * scaleQuat; L += 2 + # print("\tx: %.3f"%tmpX); # x + tmpY = np.short((np.short(buf[L+1])<<8) | buf[L]) * scaleQuat; L += 2 + # print("\ty: %.3f"%tmpY); # y + tmpZ = np.short((np.short(buf[L+1])<<8) | buf[L]) * scaleQuat; L += 2 + # print("\tz: %.3f"%tmpZ); # z + + imu_dat[15] = float(tmpAbs) + imu_dat[16] = float(tmpX) + imu_dat[17] = float(tmpY) + imu_dat[18] = float(tmpZ) + + if ((ctl & 0x0040) != 0): + tmpX = np.short((np.short(buf[L+1])<<8) | buf[L]) * scaleAngle; L += 2 + # print("\tangleX: %.3f"%tmpX); # x角度 + tmpY = np.short((np.short(buf[L+1])<<8) | buf[L]) * scaleAngle; L += 2 + # print("\tangleY: %.3f"%tmpY); # y角度 + tmpZ = np.short((np.short(buf[L+1])<<8) | buf[L]) * scaleAngle; L += 2 + # print("\tangleZ: %.3f"%tmpZ); # z角度 + print(f"\tangleX: {tmpX:.3f}, angleY: {tmpY:.3f}, angleZ: {tmpZ:.3f}") + imu_dat[19] = float(tmpX) + imu_dat[20] = float(tmpY) + imu_dat[21] = float(tmpZ) + + if ((ctl & 0x0080) != 0): + tmpX = np.short((np.short(buf[L+1])<<8) | buf[L]) / 1000.0; L += 2 + # print("\toffsetX: %.3f"%tmpX); # x坐标 + tmpY = np.short((np.short(buf[L+1])<<8) | buf[L]) / 1000.0; L += 2 + # print("\toffsetY: %.3f"%tmpY); # y坐标 + tmpZ = np.short((np.short(buf[L+1])<<8) | buf[L]) / 1000.0; L += 2 + # print("\toffsetZ: %.3f"%tmpZ); # z坐标 + + imu_dat[22] = float(tmpX) + imu_dat[23] = float(tmpY) + imu_dat[24] = float(tmpZ) + + # if ((ctl & 0x0100) != 0): + # tmpU32 = ((buf[L+3]<<24) | (buf[L+2]<<16) | (buf[L+1]<<8) | (buf[L]<<0)); L += 4 + # print("\tsteps: %u"%tmpU32); # 计步数 + # tmpU8 = buf[L]; L += 1 + # if (tmpU8 & 0x01):# 是否在走路 + # print("\t walking yes") + # imu_dat[25] = 100 + # else: + # print("\t walking no") + # imu_dat[25] = 0 + # if (tmpU8 & 0x02):# 是否在跑步 + # print("\t running yes") + # imu_dat[26] = 100 + # else: + # print("\t running no") + # imu_dat[26] = 0 + # if (tmpU8 & 0x04):# 是否在骑车 + # print("\t biking yes") + # imu_dat[27] = 100 + # else: + # print("\t biking no") + # imu_dat[27] = 0 + # if (tmpU8 & 0x08):# 是否在开车 + # print("\t driving yes") + # imu_dat[28] = 100 + # else: + # print("\t driving no") + # imu_dat[28] = 0 + + if ((ctl & 0x0200) != 0): + tmpX = np.short((np.short(buf[L+1])<<8) | buf[L]) * scaleAccel; L += 2 + # print("\tasX: %.3f"%tmpX); # x加速度asX + tmpY = np.short((np.short(buf[L+1])<<8) | buf[L]) * scaleAccel; L += 2 + # print("\tasY: %.3f"%tmpY); # y加速度asY + tmpZ = np.short((np.short(buf[L+1])<<8) | buf[L]) * scaleAccel; L += 2 + # print("\tasZ: %.3f"%tmpZ); # z加速度asZ + + imu_dat[29] = float(tmpX) + imu_dat[30] = float(tmpY) + imu_dat[31] = float(tmpZ) + + if ((ctl & 0x0400) != 0): + tmpU16 = ((buf[L+1]<<8) | (buf[L]<<0)); L += 2 + # print("\tadc: %u"%tmpU16); # adc测量到的电压值,单位为mv + imu_dat[32] = float(tmpU16) + + if ((ctl & 0x0800) != 0): + tmpU8 = buf[L]; L += 1 + # print("\t GPIO1 M:%X, N:%X"%((tmpU8>>4)&0x0f, (tmpU8)&0x0f)) + imu_dat[33] = float(tmpU8) + + else: + print("[error] data head not define") + +async def main(): + print("starting scan...") + + #基于MAC地址查找设备 + device = await BleakScanner.find_device_by_address( + par_device_addr, cb=dict(use_bdaddr=False) #use_bdaddr判断是否是MOC系统 + ) + if device is None: + print("could not find device with address '%s'", par_device_addr) + return + + #事件定义 + disconnected_event = asyncio.Event() + + #断开连接事件回调 + def disconnected_callback(client): + print("Disconnected callback called!") + disconnected_event.set() + + print("connecting to device...") + async with BleakClient(device,disconnected_callback=disconnected_callback) as client: + print("Connected") + await client.start_notify(par_notification_characteristic, notification_handler) + + # 保持连接 0x29 + wakestr=bytes([0x29]) + await client.write_gatt_char(par_write_characteristic, wakestr) + await asyncio.sleep(0.2) + print("------------------------------------------------") + # 尝试采用蓝牙高速通信特性 0x46 + fast=bytes([0x46]) + await client.write_gatt_char(par_write_characteristic, fast) + await asyncio.sleep(0.2) + + # GPIO 上拉 + #upstr=bytes([0x27,0x10]) + #await client.write_gatt_char(par_write_characteristic, upstr) + #await asyncio.sleep(0.2) + + # 参数设置 + isCompassOn = 0 #1=使用磁场融合姿态,0=不使用 + barometerFilter = 2 + Cmd_ReportTag = 0x0FFF # 功能订阅标识 + params = bytearray([0x00 for i in range(0,11)]) + params[0] = 0x12 + params[1] = 5 #静止状态加速度阀值 + params[2] = 255 #静止归零速度(单位cm/s) 0:不归零 255:立即归零 + params[3] = 0 #动态归零速度(单位cm/s) 0:不归零 + params[4] = ((barometerFilter&3)<<1) | (isCompassOn&1); + params[5] = 60 #数据主动上报的传输帧率[取值0-250HZ], 0表示0.5HZ + params[6] = 1 #陀螺仪滤波系数[取值0-2],数值越大越平稳但实时性越差 + params[7] = 3 #加速计滤波系数[取值0-4],数值越大越平稳但实时性越差 + params[8] = 5 #磁力计滤波系数[取值0-9],数值越大越平稳但实时性越差 + params[9] = Cmd_ReportTag&0xff + params[10] = (Cmd_ReportTag>>8)&0xff + await client.write_gatt_char(par_write_characteristic, params) + await asyncio.sleep(0.2) + + notes=bytes([0x19]) + await client.write_gatt_char(par_write_characteristic, notes) + + #await asyncio.sleep(2.0) #延迟一下等角度稳定后,再进行下一步的清零操作 + #await client.write_gatt_char(par_write_characteristic, bytes([0x05])) # z轴角归零 0x05 有需要的用户可开启 + #await asyncio.sleep(0.3) + #await client.write_gatt_char(par_write_characteristic, bytes([0x06])) # xyz坐标系清零 0x06 有需要的用户可开启 + + + #await asyncio.sleep(0.2) + #await client.write_gatt_char(par_write_characteristic, bytes([0x51,0xAA,0xBB])) # 用总圈数代替欧拉角传输 并清零圈数 0x51 + #await client.write_gatt_char(par_write_characteristic, bytes([0x51,0x00,0x00])) # 输出欧拉角 0x51 + + # 添加一个循环,使程序在接收数据时不会退出 + while not disconnected_event.is_set(): + await asyncio.sleep(1.0) + + #await disconnected_event.wait() #休眠直到设备断开连接,有延迟。此处为监听设备直到断开为止 + #await client.stop_notify(par_notification_characteristic) + +asyncio.run(main()) \ No newline at end of file diff --git a/backend/tests/testfemtobolt.py b/backend/tests/testfemtobolt.py index 823265da..8f361630 100644 --- a/backend/tests/testfemtobolt.py +++ b/backend/tests/testfemtobolt.py @@ -11,27 +11,31 @@ class FemtoBoltViewer: # 自定义彩虹色 colormap colors = ['fuchsia', 'red', 'yellow', 'lime', 'cyan', 'blue', + 'fuchsia', 'red', 'yellow', 'lime', 'cyan', 'blue', + 'fuchsia', 'red', 'yellow', 'lime', 'cyan', 'blue', 'fuchsia', 'red', 'yellow', 'lime', 'cyan', 'blue'] - self.cmap = LinearSegmentedColormap.from_list("custom_cmap", colors) + self.custom_cmap = LinearSegmentedColormap.from_list("custom_cmap", colors) # SDK 设备句柄和配置 self.device_handle = None self.pykinect = None self.config = None - # 缓存背景+网格图像(仅生成一次) + # 缓存数组 self.background = None + self.output_buffer = None + self._depth_filtered = None # 用于复用深度图过滤结果 + self._blur_buffer = None # 用于复用高斯模糊结果 # OpenCV 窗口 - cv2.namedWindow("Depth CV") + cv2.namedWindow("Depth CV", cv2.WINDOW_NORMAL) def _load_sdk(self): - """加载并初始化 FemtoBolt SDK""" try: import pykinect_azure as pykinect self.pykinect = pykinect base_dir = os.path.dirname(os.path.abspath(__file__)) - dll_path = os.path.join(base_dir, "..", "dll", "femtobolt", "bin", "k4a.dll") + dll_path = os.path.join(base_dir, "..", "dll", "femtobolt", "k4a.dll") self.pykinect.initialize_libraries(track_body=False, module_k4a_path=dll_path) return True except Exception as e: @@ -39,42 +43,72 @@ class FemtoBoltViewer: return False def _configure_device(self): - """配置 FemtoBolt 深度相机""" self.config = self.pykinect.default_configuration self.config.depth_mode = self.pykinect.K4A_DEPTH_MODE_NFOV_UNBINNED self.config.camera_fps = self.pykinect.K4A_FRAMES_PER_SECOND_15 self.config.synchronized_images_only = False - self.config.color_resolution = 0 self.device_handle = self.pykinect.start_device(config=self.config) - def _get_color_image(self, depth_image): - """将原始深度图转换为叠加背景网格后的 RGB 彩色图像""" - h, w = depth_image.shape - # 第一次调用时生成灰色背景和白色网格 - if self.background is None: - self.background = np.full((h, w, 3), 128, dtype=np.uint8) # 灰色 (0.5 -> 128) - # 绘制网格线 - for x in range(w): - cv2.line(self.background, (x, 0), (x, h-1), (255, 255, 255), 1) - for y in range(h): - cv2.line(self.background, (0, y), (w-1, y), (255, 255, 255), 1) + def _generate_contour_image(self, depth): + """改进版 OpenCV 等高线渲染,梯度平滑、局部对比增强""" + try: + # 初始化 depth_filtered 缓冲区 + if self._depth_filtered is None or self._depth_filtered.shape != depth.shape: + self._depth_filtered = np.zeros_like(depth, dtype=np.uint16) - # 生成深度掩码,仅保留指定范围内的像素 - mask_valid = (depth_image >= self.depth_min) & (depth_image <= self.depth_max) - depth_clipped = np.clip(depth_image, self.depth_min, self.depth_max) - normed = (depth_clipped.astype(np.float32) - self.depth_min) / (self.depth_max - self.depth_min) + np.copyto(self._depth_filtered, depth) # 直接覆盖,不生成新数组 + depth_filtered = self._depth_filtered + depth_filtered[depth_filtered > self.depth_max] = 0 + depth_filtered[depth_filtered < self.depth_min] = 0 + height, width = depth_filtered.shape - # 反转映射,保证颜色方向与之前一致 - normed = 1.0 - normed + # 背景缓存 + if self.background is None or self.background.shape[:2] != (height, width): + background_gray = int(0.5 * 255 * 0.3 + 255 * (1 - 0.3)) + self.background = np.ones((height, width, 3), dtype=np.uint8) * background_gray + grid_spacing = max(height // 20, width // 20, 10) + for x in range(0, width, grid_spacing): + cv2.line(self.background, (x, 0), (x, height-1), (255, 255, 255), 1) + for y in range(0, height, grid_spacing): + cv2.line(self.background, (0, y), (width-1, y), (255, 255, 255), 1) - # 应用自定义 colormap,将深度值映射到 RGB - rgba = self.cmap(normed) - rgb = (rgba[..., :3] * 255).astype(np.uint8) + # 初始化输出缓存和模糊缓存 + self.output_buffer = np.empty_like(self.background) + self._blur_buffer = np.empty_like(self.background) - # 叠加:在背景上覆盖彩色深度图(掩码处不覆盖,保留灰色背景+网格) - final_img = self.background.copy() - final_img[mask_valid] = rgb[mask_valid] - return final_img + # 复用输出缓存,避免 copy() + np.copyto(self.output_buffer, self.background) + output = self.output_buffer + valid_mask = depth_filtered > 0 + + if np.any(valid_mask): + # 连续归一化深度值 + norm_depth = np.zeros_like(depth_filtered, dtype=np.float32) + norm_depth[valid_mask] = (depth_filtered[valid_mask] - self.depth_min) / (self.depth_max - self.depth_min) + norm_depth = np.clip(norm_depth, 0, 1) ** 0.8 # Gamma增强 + + # 使用 colormap 映射 + cmap_colors = (self.custom_cmap(norm_depth)[..., :3] * 255).astype(np.uint8) + output[valid_mask] = cmap_colors[valid_mask] + + # Sobel 边界检测 + cv2.magnitude 替换 np.hypot + depth_uint8 = (norm_depth * 255).astype(np.uint8) + gx = cv2.Sobel(depth_uint8, cv2.CV_32F, 1, 0, ksize=3) + gy = cv2.Sobel(depth_uint8, cv2.CV_32F, 0, 1, ksize=3) + grad_mag = cv2.magnitude(gx, gy) + grad_mag = grad_mag.astype(np.uint8) + + # 自适应局部对比度增强(向量化) + edge_mask = grad_mag > 30 + output[edge_mask] = np.clip(output[edge_mask].astype(np.float32) * 1.5, 0, 255).astype(np.uint8) + + # 高斯平滑,复用 dst 缓冲区 + cv2.GaussianBlur(output, (3, 3), 0.3, dst=self._blur_buffer) + return self._blur_buffer + + except Exception as e: + print(f"等高线渲染失败: {e}") + return None def run(self): if not self._load_sdk(): @@ -82,7 +116,7 @@ class FemtoBoltViewer: return self._configure_device() - print("FemtoBolt 深度相机启动成功,按 Ctrl+C 或 ESC 退出") + print("FemtoBolt 深度相机启动成功,按 Ctrl+C 或 ESC 退出", self.config) try: while True: @@ -93,14 +127,20 @@ class FemtoBoltViewer: if not ret or depth_image is None: continue - # 转换并渲染当前帧 - final_img = self._get_color_image(depth_image) + final_img = self._generate_contour_image(depth_image) + if final_img is not None: + # 推迟裁剪到显示阶段 + h, w = final_img.shape[:2] + target_width = h // 2 + if w > target_width: + left = (w - target_width) // 2 + right = left + target_width + cv2.imshow("Depth CV", final_img[:, left:right]) + else: + cv2.imshow("Depth CV", final_img) - # OpenCV 显示 - cv2.imshow("Depth CV", final_img) - # 按 ESC 键退出 - if cv2.waitKey(1) & 0xFF == 27: - break + if cv2.waitKey(1) & 0xFF == 27: + break except KeyboardInterrupt: print("检测到退出信号,结束程序") @@ -112,5 +152,5 @@ class FemtoBoltViewer: if __name__ == "__main__": - viewer = FemtoBoltViewer(depth_min=900, depth_max=1100) + viewer = FemtoBoltViewer(depth_min=500, depth_max=700) viewer.run() diff --git a/backend/tests/testpltfemtobolt.py b/backend/tests/testpltfemtobolt.py new file mode 100644 index 00000000..cd440ae7 --- /dev/null +++ b/backend/tests/testpltfemtobolt.py @@ -0,0 +1,140 @@ +import os +import io +import numpy as np +import cv2 +import matplotlib.pyplot as plt +from matplotlib.colors import LinearSegmentedColormap + +class FemtoBoltViewer: + def __init__(self, depth_min=900, depth_max=1300): + self.depth_range_min = depth_min + self.depth_range_max = depth_max + + # 自定义彩虹色 colormap + colors = ['fuchsia', 'red', 'yellow', 'lime', 'cyan', 'blue', + 'fuchsia', 'red', 'yellow', 'lime', 'cyan', 'blue'] + self.custom_cmap = LinearSegmentedColormap.from_list("custom_cmap", colors) + + # Matplotlib 图形初始化 + self.fig, self.ax = plt.subplots(figsize=(6, 6), dpi=75) + self.ax.axis('off') # 隐藏坐标轴 + + # SDK 设备句柄和配置 + self.device_handle = None + self.pykinect = None + self.config = None + + # OpenCV 窗口 + cv2.namedWindow("Depth CV", cv2.WINDOW_NORMAL) + + def _load_sdk(self): + try: + import pykinect_azure as pykinect + self.pykinect = pykinect + base_dir = os.path.dirname(os.path.abspath(__file__)) + dll_path = os.path.join(base_dir, "..", "dll", "femtobolt", "k4a.dll") + self.pykinect.initialize_libraries(track_body=False, module_k4a_path=dll_path) + return True + except Exception as e: + print(f"加载 SDK 失败: {e}") + return False + + def _configure_device(self): + self.config = self.pykinect.default_configuration + self.config.depth_mode = self.pykinect.K4A_DEPTH_MODE_NFOV_UNBINNED + self.config.camera_fps = self.pykinect.K4A_FRAMES_PER_SECOND_15 + self.config.synchronized_images_only = False + self.device_handle = self.pykinect.start_device(config=self.config) + + def _generate_contour_image_plt(self, depth): + """使用 matplotlib 生成等高线图像(完全采用 display_x.py 的逻辑)""" + try: + # 清除之前的绘图 + self.ax.clear() + self.ax.axis('off') + + # 深度数据过滤 + depth_filtered = depth.copy() + depth_filtered[depth_filtered > self.depth_range_max] = 0 + depth_filtered[depth_filtered < self.depth_range_min] = 0 + + # 背景图 + background = np.ones_like(depth_filtered) * 0.5 # 灰色背景 + self.ax.imshow(background, origin='lower', cmap='gray', alpha=0.3) + + # 屏蔽深度为0 + depth_masked = np.ma.masked_equal(depth_filtered, 0) + + # 绘制白色栅格线(底层) + self.ax.grid(True, which='both', axis='both', color='white', linestyle='-', linewidth=0.5, zorder=0) + self.ax.minorticks_on() + self.ax.grid(True, which='minor', axis='both', color='white', linestyle='-', linewidth=0.3, zorder=0) + + # 绘制等高线图 + self.ax.contourf(depth_masked, levels=100, cmap=self.custom_cmap, + vmin=self.depth_range_min, vmax=self.depth_range_max, origin='upper', zorder=2) + + # 保存到 BytesIO 缓冲区 + buf = io.BytesIO() + self.fig.savefig(buf, format='png', bbox_inches='tight', pad_inches=0, dpi=75) + buf.seek(0) + + # 转为 numpy 数组 + img_array = np.frombuffer(buf.getvalue(), dtype=np.uint8) + buf.close() + + # 使用 OpenCV 解码 PNG + img = cv2.imdecode(img_array, cv2.IMREAD_COLOR) + + # 裁剪宽度 + if img is not None: + height, width = img.shape[:2] + target_width = round(height // 2) + if width > target_width: + left = (width - target_width) // 2 + right = left + target_width + img = img[:, left:right] + return img + else: + print("无法解码matplotlib生成的PNG图像") + return None + + except Exception as e: + print(f"生成等高线图像失败: {e}") + return None + + def run(self): + if not self._load_sdk(): + print("SDK 加载失败,程序退出") + return + + self._configure_device() + print("FemtoBolt 深度相机启动成功,按 Ctrl+C 或 ESC 退出") + + try: + while True: + capture = self.device_handle.update() + if capture is None: + continue + ret, depth_image = capture.get_depth_image() + if not ret or depth_image is None: + continue + + final_img = self._generate_contour_image_plt(depth_image) + if final_img is not None: + cv2.imshow("Depth CV", final_img) + if cv2.waitKey(1) & 0xFF == 27: + break + + except KeyboardInterrupt: + print("检测到退出信号,结束程序") + finally: + if self.device_handle: + self.device_handle.stop() + self.device_handle.close() + cv2.destroyAllWindows() + + +if __name__ == "__main__": + viewer = FemtoBoltViewer(depth_min=500, depth_max=700) + viewer.run()