BodyBalanceEvaluation/backend/database.py

1542 lines
60 KiB
Python
Raw Normal View History

2025-07-28 11:59:56 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
数据库管理模块
负责SQLite数据库的创建连接和数据操作
"""
import sqlite3
import json
import uuid
2025-08-02 16:52:17 +08:00
from datetime import datetime, timezone, timedelta
2025-07-28 11:59:56 +08:00
from typing import List, Dict, Optional, Any
import logging
logger = logging.getLogger(__name__)
class DatabaseManager:
"""数据库管理器"""
def __init__(self, db_path: str):
self.db_path = db_path
self.connection = None
2025-08-02 16:52:17 +08:00
# 设置中国上海时区 (UTC+8)
self.china_tz = timezone(timedelta(hours=8))
def get_china_time(self) -> str:
"""获取中国时区的当前时间字符串"""
return datetime.now(self.china_tz).strftime('%Y-%m-%d %H:%M:%S')
2025-07-28 11:59:56 +08:00
def generate_patient_id(self) -> str:
"""生成患者唯一标识(YYYYMMDD0000)年月日+四位序号"""
conn = self.get_connection()
cursor = conn.cursor()
try:
# 获取当前日期
china_tz = timezone(timedelta(hours=8))
today = datetime.now(china_tz).strftime('%Y%m%d')
# 使用循环确保生成唯一ID防止并发冲突
for attempt in range(10): # 最多尝试10次
# 查询今天已有的最大序号
cursor.execute('''
SELECT id FROM patients
WHERE id LIKE ?
ORDER BY id DESC
LIMIT 1
''', (f'{today}%',))
result = cursor.fetchone()
if result:
# 提取序号部分并加1
last_id = result[0]
if len(last_id) >= 12 and last_id[:8] == today:
last_sequence = int(last_id[8:12])
sequence = str(last_sequence + 1).zfill(4)
else:
sequence = '0001'
else:
# 今天第一个患者
sequence = '0001'
patient_id = f'{today}{sequence}'
# 检查ID是否已存在
cursor.execute('SELECT COUNT(*) FROM patients WHERE id = ?', (patient_id,))
if cursor.fetchone()[0] == 0:
return patient_id
# 如果10次尝试都失败使用UUID作为备用方案
logger.warning('患者ID生成重试次数过多使用UUID备用方案')
return str(uuid.uuid4())
except Exception as e:
logger.error(f'生成患者ID失败: {e}')
# 如果生成失败使用UUID作为备用方案
return str(uuid.uuid4())
def generate_user_id(self) -> str:
"""生成用户唯一标识,格式为六位数字序号(000001)"""
conn = self.get_connection()
cursor = conn.cursor()
try:
# 使用循环确保生成唯一ID防止并发冲突
for attempt in range(10): # 最多尝试10次
# 获取当前最大的用户ID只考虑六位数字格式的ID
cursor.execute('SELECT MAX(CAST(id AS INTEGER)) FROM users WHERE id GLOB "[0-9][0-9][0-9][0-9][0-9][0-9]"')
result = cursor.fetchone()[0]
if result is None:
# 如果没有用户记录从000001开始
next_id = 1
else:
next_id = result + 1
# 格式化为六位数字,前面补零
user_id = f"{next_id:06d}"
# 检查ID是否已存在
cursor.execute('SELECT COUNT(*) FROM users WHERE id = ?', (user_id,))
if cursor.fetchone()[0] == 0:
return user_id
# 如果10次尝试都失败使用时间戳+随机数作为备用方案
logger.warning('用户ID生成重试次数过多使用备用方案')
import time
import random
timestamp_suffix = str(int(time.time()))[-4:]
random_suffix = str(random.randint(10, 99))
return f"{timestamp_suffix}{random_suffix}"
except Exception as e:
logger.error(f'生成用户ID失败: {e}')
# 如果出错,使用时间戳作为备用方案
import time
return str(int(time.time()))[-6:]
def generate_session_id(self) -> str:
"""生成会话唯一标识(YYYYMMDDHHMMSS)年月日时分秒"""
conn = self.get_connection()
cursor = conn.cursor()
try:
# 获取当前时间
china_tz = timezone(timedelta(hours=8))
# 使用循环确保生成唯一ID防止并发冲突
for attempt in range(10): # 最多尝试10次
current_time = datetime.now(china_tz)
session_id = current_time.strftime('%Y%m%d%H%M%S')
# 检查ID是否已存在
cursor.execute('SELECT COUNT(*) FROM detection_sessions WHERE id = ?', (session_id,))
if cursor.fetchone()[0] == 0:
return session_id
# 如果存在冲突等待1秒后重试
import time
time.sleep(0.001) # 等待1毫秒
# 如果10次尝试都失败在时间后添加随机后缀
import random
current_time = datetime.now(china_tz)
base_id = current_time.strftime('%Y%m%d%H%M%S')
suffix = str(random.randint(10, 99))
session_id = f'{base_id}{suffix}'
# 最后检查一次
cursor.execute('SELECT COUNT(*) FROM detection_sessions WHERE id = ?', (session_id,))
if cursor.fetchone()[0] == 0:
return session_id
# 如果还是冲突使用UUID作为备用方案
logger.warning('会话ID生成重试次数过多使用UUID备用方案')
return str(uuid.uuid4())
except Exception as e:
logger.error(f'生成会话ID失败: {e}')
# 如果生成失败使用UUID作为备用方案
return str(uuid.uuid4())
def generate_detection_data_id(self) -> str:
"""生成检测数据记录唯一标识(YYYYMMDDHHMMSS)年月日时分秒"""
conn = self.get_connection()
cursor = conn.cursor()
try:
# 获取当前时间
china_tz = timezone(timedelta(hours=8))
# 使用循环确保生成唯一ID防止并发冲突
for attempt in range(10): # 最多尝试10次
current_time = datetime.now(china_tz)
data_id = current_time.strftime('%Y%m%d%H%M%S')
# 检查ID是否已存在
cursor.execute('SELECT COUNT(*) FROM detection_data WHERE id = ?', (data_id,))
if cursor.fetchone()[0] == 0:
return data_id
# 如果存在冲突等待1毫秒后重试
import time
time.sleep(0.001)
# 如果10次尝试都失败在时间后添加随机后缀
import random
current_time = datetime.now(china_tz)
base_id = current_time.strftime('%Y%m%d%H%M%S')
suffix = str(random.randint(100, 999))
data_id = f'{base_id}{suffix}'
# 最后检查一次
cursor.execute('SELECT COUNT(*) FROM detection_data WHERE id = ?', (data_id,))
if cursor.fetchone()[0] == 0:
return data_id
# 如果还是冲突使用UUID作为备用方案
logger.warning('检测数据ID生成重试次数过多使用UUID备用方案')
return str(uuid.uuid4())
except Exception as e:
logger.error(f'生成检测数据ID失败: {e}')
# 如果生成失败使用UUID作为备用方案
return str(uuid.uuid4())
2025-07-28 11:59:56 +08:00
def get_connection(self) -> sqlite3.Connection:
"""获取数据库连接"""
if not self.connection:
self.connection = sqlite3.connect(
self.db_path,
check_same_thread=False,
timeout=30.0
)
self.connection.row_factory = sqlite3.Row
return self.connection
def init_database(self):
"""初始化数据库表结构"""
conn = self.get_connection()
cursor = conn.cursor()
try:
# 创建用户表(医生)
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id TEXT PRIMARY KEY, -- 用户唯一标识(0000000)六位序号
name TEXT NOT NULL, -- 用户真实姓名
username TEXT UNIQUE NOT NULL, -- 用户名登录名
password TEXT NOT NULL, -- 密码
register_date TIMESTAMP, -- 注册日期
is_active BOOLEAN DEFAULT 1, -- 账户是否激活0=未激活1=已激活
user_type TEXT DEFAULT 'user', -- 用户类型user/admin/doctor
phone TEXT DEFAULT '', -- 联系电话
created_at TIMESTAMP, -- 记录创建时间
updated_at TIMESTAMP -- 记录更新时间
)
''')
2025-07-28 11:59:56 +08:00
# 创建患者表
cursor.execute('''
CREATE TABLE IF NOT EXISTS patients (
id TEXT PRIMARY KEY, -- 患者唯一标识(YYYYMMDD0000)年月日+四位序号
name TEXT NOT NULL, -- 患者姓名
gender TEXT, -- 性别
birth_date TIMESTAMP, -- 出生日期
nationality TEXT, -- 民族
residence TEXT, -- 居住地
height REAL, -- 身高cm
weight REAL, -- 体重kg
shoe_size TEXT, -- 鞋码
phone TEXT, -- 电话号码
email TEXT, -- 电子邮箱
occupation TEXT, -- 职业
workplace TEXT, -- 工作单位
medical_history TEXT, -- 病史
notes TEXT, -- 备注信息
created_at TIMESTAMP, -- 记录创建时间
updated_at TIMESTAMP -- 记录更新时间
2025-07-28 11:59:56 +08:00
)
''')
# 创建检测会话表
cursor.execute('''
CREATE TABLE IF NOT EXISTS detection_sessions (
id TEXT PRIMARY KEY, -- 会话唯一标识(YYYYMMDDHHMMSS)年月日时分秒
patient_id TEXT NOT NULL, -- 患者ID外键
creator_id TEXT, -- 创建人ID医生ID外键
start_time TIMESTAMP, -- 检测开始时间
end_time TIMESTAMP, -- 检测结束时间
duration INTEGER, -- 检测持续时间
settings TEXT, -- 检测设置JSON格式
normal_video_path TEXT, -- 足部检测视频文件路径
femtobolt_video_path TEXT, -- 深度相机视频文件路径
screen_video_path TEXT, -- 屏幕录制视频路径
diagnosis_info TEXT, -- 诊断信息
treatment_info TEXT, -- 处理信息
suggestion_info TEXT, -- 建议信息
2025-08-06 17:13:05 +08:00
status TEXT DEFAULT 'created', -- 会话状态created/running/diagnosed/completed)
created_at TIMESTAMP, -- 记录创建时间
FOREIGN KEY (patient_id) REFERENCES patients (id), -- 患者表外键约束
FOREIGN KEY (creator_id) REFERENCES users (id) -- 用户表外键约束
2025-07-28 11:59:56 +08:00
)
''')
# 创建检测数据表
cursor.execute('''
CREATE TABLE IF NOT EXISTS detection_data (
id TEXT PRIMARY KEY, -- 记录唯一标识(YYYYMMDDHHMMSS)年月日时分秒
session_id TEXT NOT NULL, -- 检测会话ID外键
head_pose TEXT , -- 头部姿态数据JSON格式
body_pose TEXT , -- 身体姿态数据JSON格式
body_image TEXT, -- 身体视频截图存储路径
foot_data TEXT , -- 足部姿态数据JSON格式
foot_image TEXT, -- 足部监测视频截图存储路径
foot_data_image TEXT, -- 足底压力数据图存储路径
screen_image TEXT, -- 屏幕录制视频截图存储路径
timestamp TIMESTAMP, -- 数据记录时间戳
FOREIGN KEY (session_id) REFERENCES detection_sessions (id) -- 检测会话表外键约束
2025-07-28 11:59:56 +08:00
)
''')
2025-07-28 11:59:56 +08:00
# 创建系统设置表
cursor.execute('''
CREATE TABLE IF NOT EXISTS system_settings (
key TEXT PRIMARY KEY, -- 设置项键名唯一标识
value TEXT NOT NULL, -- 设置项值
description TEXT, -- 设置项描述说明
updated_at TIMESTAMP -- 最后更新时间
2025-07-28 11:59:56 +08:00
)
''')
2025-07-31 17:19:17 +08:00
# 创建索引以提高查询性能
# 患者表索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_patients_name ON patients (name)') # 患者姓名索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_patients_phone ON patients (phone)') # 患者电话索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_patients_email ON patients (email)') # 患者邮箱索引
# 检测会话表索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_sessions_patient ON detection_sessions (patient_id)') # 患者ID索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_sessions_creator ON detection_sessions (creator_id)') # 创建人ID索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_sessions_time ON detection_sessions (start_time)') # 开始时间索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_sessions_status ON detection_sessions (status)') # 会话状态索引
# 检测数据表索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_data_session ON detection_data (session_id)') # 会话ID索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_data_timestamp ON detection_data (timestamp)') # 时间戳索引
# 用户表索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_users_username ON users (username)') # 用户名索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_users_type ON users (user_type)') # 用户类型索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_users_phone ON users (phone)') # 用户电话索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_users_active ON users (is_active)') # 激活状态索引
2025-07-31 17:19:17 +08:00
# 插入默认管理员账户(如果不存在)
cursor.execute('SELECT COUNT(*) FROM users WHERE username = ?', ('admin',))
admin_exists = cursor.fetchone()[0]
if admin_exists == 0:
admin_id = self.generate_user_id()
2025-08-02 16:52:17 +08:00
# 默认密码为 admin123明文存储
admin_password = 'admin123'
# 使用中国时区时间
china_time = self.get_china_time()
2025-07-31 17:19:17 +08:00
cursor.execute('''
INSERT INTO users (id, name, username, password, is_active, user_type, register_date, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (admin_id, '系统管理员', 'admin', admin_password, 1, 'admin', china_time, china_time, china_time))
2025-07-31 17:19:17 +08:00
logger.info('创建默认管理员账户: admin/admin123')
2025-07-28 11:59:56 +08:00
# 插入默认系统设置
china_time = self.get_china_time()
# 检查并插入默认摄像头设备索引配置
cursor.execute('SELECT COUNT(*) FROM system_settings WHERE key = ?', ('monitor_device_index',))
monitor_config_exists = cursor.fetchone()[0]
if monitor_config_exists == 0:
cursor.execute('''
INSERT INTO system_settings (key, value, description, updated_at)
VALUES (?, ?, ?, ?)
''', ('monitor_device_index', '1', '足部监视摄像头设备索引号', china_time))
logger.info('创建默认摄像头设备索引配置: 1')
2025-07-28 11:59:56 +08:00
conn.commit()
logger.info('数据库初始化完成')
except Exception as e:
conn.rollback()
logger.error(f'数据库初始化失败: {e}')
raise
# ==================== 患者管理 ====================
def create_patient(self, patient_data: Dict[str, Any]) -> str:
"""创建患者记录"""
# 验证必填字段
if not patient_data.get('name'):
raise ValueError('患者姓名不能为空')
2025-07-28 11:59:56 +08:00
conn = self.get_connection()
cursor = conn.cursor()
try:
patient_id = self.generate_patient_id()
2025-08-02 16:52:17 +08:00
# 使用中国时区时间
china_time = self.get_china_time()
2025-07-28 11:59:56 +08:00
cursor.execute('''
INSERT INTO patients (
id, name, gender, birth_date, nationality, residence,
height, weight, shoe_size, phone, email, occupation, workplace,
medical_history, notes, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
2025-07-28 11:59:56 +08:00
''', (
patient_id,
patient_data.get('name'),
patient_data.get('gender'),
2025-07-31 17:23:05 +08:00
patient_data.get('birth_date'),
patient_data.get('nationality'),
patient_data.get('residence'),
2025-07-28 11:59:56 +08:00
patient_data.get('height'),
patient_data.get('weight'),
2025-07-31 17:23:05 +08:00
patient_data.get('shoe_size'),
patient_data.get('phone'),
patient_data.get('email'),
patient_data.get('occupation'),
patient_data.get('workplace'),
2025-07-28 11:59:56 +08:00
patient_data.get('medical_history'),
2025-08-02 16:52:17 +08:00
patient_data.get('notes'),
china_time,
china_time
2025-07-28 11:59:56 +08:00
))
conn.commit()
logger.info(f'创建患者记录: {patient_id}')
return patient_id
except Exception as e:
conn.rollback()
logger.error(f'创建患者记录失败: {e}')
raise
def get_patients(self, page: int = 1, size: int = 10, keyword: str = '') -> List[Dict]:
"""获取患者列表"""
# 验证分页参数
if page < 1:
page = 1
if size < 1 or size > 100:
size = 10
2025-07-28 11:59:56 +08:00
conn = self.get_connection()
cursor = conn.cursor()
try:
offset = (page - 1) * size
if keyword:
cursor.execute('''
SELECT * FROM patients
WHERE name LIKE ? OR phone LIKE ? OR email LIKE ?
2025-07-28 11:59:56 +08:00
ORDER BY created_at DESC
LIMIT ? OFFSET ?
''', (f'%{keyword}%', f'%{keyword}%', f'%{keyword}%', size, offset))
2025-07-28 11:59:56 +08:00
else:
cursor.execute('''
SELECT * FROM patients
ORDER BY created_at DESC
LIMIT ? OFFSET ?
''', (size, offset))
rows = cursor.fetchall()
return [dict(row) for row in rows]
except Exception as e:
logger.error(f'获取患者列表失败: {e}')
raise
def get_patients_count(self, keyword: str = '') -> int:
"""获取患者总数"""
conn = self.get_connection()
cursor = conn.cursor()
try:
if keyword:
cursor.execute('''
SELECT COUNT(*) FROM patients
WHERE name LIKE ? OR phone LIKE ?
''', (f'%{keyword}%', f'%{keyword}%'))
else:
cursor.execute('SELECT COUNT(*) FROM patients')
return cursor.fetchone()[0]
except Exception as e:
logger.error(f'获取患者总数失败: {e}')
return 0
def get_patient(self, patient_id: str) -> Optional[Dict]:
"""获取单个患者信息"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('SELECT * FROM patients WHERE id = ?', (patient_id,))
row = cursor.fetchone()
return dict(row) if row else None
except Exception as e:
logger.error(f'获取患者信息失败: {e}')
return None
def update_patient(self, patient_id: str, patient_data: Dict[str, Any]):
"""更新患者信息"""
# 验证必填字段
if not patient_data.get('name'):
raise ValueError('患者姓名不能为空')
# 验证患者是否存在
if not self.get_patient(patient_id):
raise ValueError(f'患者不存在: {patient_id}')
2025-07-28 11:59:56 +08:00
conn = self.get_connection()
cursor = conn.cursor()
try:
2025-08-02 16:52:17 +08:00
# 使用中国时区时间
china_time = self.get_china_time()
2025-07-28 11:59:56 +08:00
cursor.execute('''
UPDATE patients SET
name = ?, gender = ?, birth_date = ?, nationality = ?,
residence = ?, height = ?, weight = ?, shoe_size = ?, phone = ?, email = ?,
occupation = ?, workplace = ?, medical_history = ?, notes = ?, updated_at = ?
2025-07-28 11:59:56 +08:00
WHERE id = ?
''', (
patient_data.get('name'),
patient_data.get('gender'),
2025-07-31 17:23:05 +08:00
patient_data.get('birth_date'),
patient_data.get('nationality'),
patient_data.get('residence'),
2025-07-28 11:59:56 +08:00
patient_data.get('height'),
patient_data.get('weight'),
2025-07-31 17:23:05 +08:00
patient_data.get('shoe_size'),
patient_data.get('phone'),
patient_data.get('email'),
patient_data.get('occupation'),
patient_data.get('workplace'),
2025-07-28 11:59:56 +08:00
patient_data.get('medical_history'),
patient_data.get('notes'),
2025-08-02 16:52:17 +08:00
china_time,
2025-07-28 11:59:56 +08:00
patient_id
))
conn.commit()
logger.info(f'更新患者信息: {patient_id}')
except Exception as e:
conn.rollback()
logger.error(f'更新患者信息失败: {e}')
raise
def delete_patient(self, patient_id: str):
"""删除患者记录"""
# 验证患者是否存在
if not self.get_patient(patient_id):
raise ValueError(f'患者不存在: {patient_id}')
2025-07-28 11:59:56 +08:00
conn = self.get_connection()
cursor = conn.cursor()
try:
# 删除相关的检测数据
cursor.execute('''
DELETE FROM detection_data
WHERE session_id IN (
SELECT id FROM detection_sessions WHERE patient_id = ?
)
''', (patient_id,))
# 删除检测会话
cursor.execute('DELETE FROM detection_sessions WHERE patient_id = ?', (patient_id,))
# 删除患者记录
cursor.execute('DELETE FROM patients WHERE id = ?', (patient_id,))
conn.commit()
logger.info(f'删除患者记录: {patient_id}')
except Exception as e:
conn.rollback()
logger.error(f'删除患者记录失败: {e}')
raise
def batch_delete_patients(self, patient_ids: List[str]):
"""批量删除患者记录"""
if not patient_ids:
return
conn = self.get_connection()
cursor = conn.cursor()
try:
# 验证所有患者是否存在
placeholders = ','.join(['?' for _ in patient_ids])
cursor.execute(f'SELECT COUNT(*) FROM patients WHERE id IN ({placeholders})', patient_ids)
existing_count = cursor.fetchone()[0]
if existing_count != len(patient_ids):
raise ValueError('部分患者记录不存在')
# 批量删除相关的检测数据
cursor.execute(f'''
DELETE FROM detection_data
WHERE session_id IN (
SELECT id FROM detection_sessions WHERE patient_id IN ({placeholders})
)
''', patient_ids)
# 批量删除检测会话
cursor.execute(f'DELETE FROM detection_sessions WHERE patient_id IN ({placeholders})', patient_ids)
# 批量删除患者记录
cursor.execute(f'DELETE FROM patients WHERE id IN ({placeholders})', patient_ids)
conn.commit()
logger.info(f'批量删除患者记录: {len(patient_ids)}')
except Exception as e:
conn.rollback()
logger.error(f'批量删除患者记录失败: {e}')
raise
2025-07-28 11:59:56 +08:00
# ==================== 检测会话管理 ====================
def create_detection_session(self, patient_id: str, settings: Dict[str, Any], creator_id: str = None) -> str:
2025-07-28 11:59:56 +08:00
"""创建检测会话"""
conn = self.get_connection()
cursor = conn.cursor()
try:
session_id = self.generate_session_id()
2025-07-28 11:59:56 +08:00
2025-08-02 16:52:17 +08:00
# 使用中国时区时间
china_time = self.get_china_time()
2025-07-28 11:59:56 +08:00
cursor.execute('''
INSERT INTO detection_sessions (
2025-08-06 08:48:38 +08:00
id, patient_id, creator_id, duration, settings, status,
diagnosis_info, treatment_info, suggestion_info, start_time, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
2025-07-28 11:59:56 +08:00
''', (
session_id,
patient_id,
creator_id,
2025-07-28 11:59:56 +08:00
settings.get('duration', 60),
json.dumps(settings),
2025-08-06 08:48:38 +08:00
'running',
settings.get('diagnosis_info', ''),
settings.get('treatment_info', ''),
settings.get('suggestion_info', ''),
2025-08-02 16:52:17 +08:00
china_time,
china_time
2025-07-28 11:59:56 +08:00
))
conn.commit()
logger.info(f'创建检测会话: {session_id}')
return session_id
except Exception as e:
conn.rollback()
logger.error(f'创建检测会话失败: {e}')
raise
2025-08-07 14:38:08 +08:00
def update_session_status(self, session_id: str, status: str) -> bool:
"""更新会话状态
Returns:
bool: 更新成功返回True失败返回False
"""
2025-07-28 11:59:56 +08:00
conn = self.get_connection()
cursor = conn.cursor()
try:
if status in ['completed', 'stopped', 'error']:
2025-08-02 16:52:17 +08:00
# 使用中国时区时间
china_time = self.get_china_time()
2025-07-28 11:59:56 +08:00
cursor.execute('''
UPDATE detection_sessions SET
2025-08-06 08:48:38 +08:00
status = ?, end_time = ?
2025-07-28 11:59:56 +08:00
WHERE id = ?
2025-08-06 08:48:38 +08:00
''', (status, china_time, session_id))
2025-07-28 11:59:56 +08:00
else:
cursor.execute('''
UPDATE detection_sessions SET
2025-08-06 08:48:38 +08:00
status = ?
2025-07-28 11:59:56 +08:00
WHERE id = ?
2025-08-06 08:48:38 +08:00
''', (status, session_id))
2025-07-28 11:59:56 +08:00
conn.commit()
logger.info(f'更新会话状态: {session_id} -> {status}')
2025-08-07 14:38:08 +08:00
return True
2025-07-28 11:59:56 +08:00
except Exception as e:
conn.rollback()
logger.error(f'更新会话状态失败: {e}')
2025-08-07 14:38:08 +08:00
return False
2025-07-28 11:59:56 +08:00
2025-07-31 17:23:05 +08:00
def update_session_duration(self, session_id: str, duration: int):
"""更新会话持续时间"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('''
UPDATE detection_sessions SET duration = ?
WHERE id = ?
''', (duration, session_id))
conn.commit()
logger.info(f'更新会话持续时间: {session_id} -> {duration}')
except Exception as e:
conn.rollback()
logger.error(f'更新会话持续时间失败: {e}')
raise
def update_session_normal_video_path(self, session_id: str, video_path: str):
"""更新会话足部检测视频路径"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('''
UPDATE detection_sessions SET normal_video_path = ?
WHERE id = ?
''', (video_path, session_id))
conn.commit()
logger.info(f'更新会话足部检测视频路径: {session_id} -> {video_path}')
except Exception as e:
conn.rollback()
logger.error(f'更新会话足部检测视频路径失败: {e}')
raise
def update_session_femtobolt_video_path(self, session_id: str, video_path: str):
"""更新会话深度相机视频路径"""
2025-07-31 17:23:05 +08:00
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('''
UPDATE detection_sessions SET femtobolt_video_path = ?
2025-07-31 17:23:05 +08:00
WHERE id = ?
''', (video_path, session_id))
conn.commit()
logger.info(f'更新会话深度相机视频路径: {session_id} -> {video_path}')
2025-07-31 17:23:05 +08:00
except Exception as e:
conn.rollback()
logger.error(f'更新会话深度相机视频路径失败: {e}')
2025-07-31 17:23:05 +08:00
raise
def update_session_screen_video_path(self, session_id: str, video_path: str):
"""更新会话屏幕录制视频路径"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('''
UPDATE detection_sessions SET screen_video_path = ?
WHERE id = ?
''', (video_path, session_id))
conn.commit()
logger.info(f'更新会话屏幕录制视频路径: {session_id} -> {video_path}')
except Exception as e:
conn.rollback()
logger.error(f'更新会话屏幕录制视频路径失败: {e}')
raise
def update_session_diagnosis_info(self, session_id: str, diagnosis_info: str):
"""更新会话诊断信息"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('''
UPDATE detection_sessions SET diagnosis_info = ?
WHERE id = ?
''', (diagnosis_info, session_id))
conn.commit()
logger.info(f'更新会话诊断信息: {session_id}')
except Exception as e:
conn.rollback()
logger.error(f'更新会话诊断信息失败: {e}')
raise
def update_session_treatment_info(self, session_id: str, treatment_info: str):
"""更新会话处理信息"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('''
UPDATE detection_sessions SET treatment_info = ?
WHERE id = ?
''', (treatment_info, session_id))
conn.commit()
logger.info(f'更新会话处理信息: {session_id}')
except Exception as e:
conn.rollback()
logger.error(f'更新会话处理信息失败: {e}')
raise
def update_session_suggestion_info(self, session_id: str, suggestion_info: str):
"""更新会话建议信息"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('''
UPDATE detection_sessions SET suggestion_info = ?
WHERE id = ?
''', (suggestion_info, session_id))
conn.commit()
logger.info(f'更新会话建议信息: {session_id}')
except Exception as e:
conn.rollback()
logger.error(f'更新会话建议信息失败: {e}')
raise
2025-08-06 17:13:05 +08:00
def update_session_all_info(self, session_id: str, diagnosis_info: str = None, treatment_info: str = None, suggestion_info: str = None, status: str = None):
"""同时更新会话的诊断信息、处理信息、建议信息和状态"""
conn = self.get_connection()
cursor = conn.cursor()
try:
# 构建动态SQL语句只更新非None的字段
update_fields = []
update_values = []
if diagnosis_info is not None:
update_fields.append('diagnosis_info = ?')
update_values.append(diagnosis_info)
if treatment_info is not None:
update_fields.append('treatment_info = ?')
update_values.append(treatment_info)
if suggestion_info is not None:
update_fields.append('suggestion_info = ?')
update_values.append(suggestion_info)
if status is not None:
update_fields.append('status = ?')
update_values.append(status)
# 如果状态是完成、停止或错误,同时更新结束时间
if status in ['completed', 'stopped', 'error']:
update_fields.append('end_time = ?')
update_values.append(self.get_china_time())
if not update_fields:
logger.warning(f'没有提供要更新的信息: {session_id}')
return
# 添加session_id到参数列表
update_values.append(session_id)
sql = f'''
UPDATE detection_sessions SET {', '.join(update_fields)}
WHERE id = ?
'''
cursor.execute(sql, update_values)
conn.commit()
updated_info = []
if diagnosis_info is not None:
updated_info.append('诊断信息')
if treatment_info is not None:
updated_info.append('处理信息')
if suggestion_info is not None:
updated_info.append('建议信息')
if status is not None:
updated_info.append(f'状态({status})')
logger.info(f'批量更新会话信息成功: {session_id}, 更新字段: {", ".join(updated_info)}')
except Exception as e:
conn.rollback()
logger.error(f'批量更新会话信息失败: {e}')
raise
2025-07-28 11:59:56 +08:00
def get_detection_sessions(self, page: int = 1, size: int = 10, patient_id: str = None) -> List[Dict]:
"""获取检测会话列表"""
conn = self.get_connection()
cursor = conn.cursor()
try:
offset = (page - 1) * size
if patient_id:
cursor.execute('''
SELECT s.*, p.name as patient_name, u.name as creator_name
2025-07-28 11:59:56 +08:00
FROM detection_sessions s
LEFT JOIN patients p ON s.patient_id = p.id
LEFT JOIN users u ON s.creator_id = u.id
2025-07-28 11:59:56 +08:00
WHERE s.patient_id = ?
ORDER BY s.start_time DESC
LIMIT ? OFFSET ?
''', (patient_id, size, offset))
else:
cursor.execute('''
SELECT s.*, p.name as patient_name, u.name as creator_name
2025-07-28 11:59:56 +08:00
FROM detection_sessions s
LEFT JOIN patients p ON s.patient_id = p.id
LEFT JOIN users u ON s.creator_id = u.id
2025-07-28 11:59:56 +08:00
ORDER BY s.start_time DESC
LIMIT ? OFFSET ?
''', (size, offset))
rows = cursor.fetchall()
sessions = []
for row in rows:
session = dict(row)
# 解析设置JSON
if session['settings']:
try:
session['settings'] = json.loads(session['settings'])
except:
session['settings'] = {}
sessions.append(session)
return sessions
except Exception as e:
logger.error(f'获取检测会话列表失败: {e}')
raise
def get_sessions_count(self, patient_id: str = None) -> int:
"""获取会话总数"""
conn = self.get_connection()
cursor = conn.cursor()
try:
if patient_id:
cursor.execute('SELECT COUNT(*) FROM detection_sessions WHERE patient_id = ?', (patient_id,))
else:
cursor.execute('SELECT COUNT(*) FROM detection_sessions')
return cursor.fetchone()[0]
except Exception as e:
logger.error(f'获取会话总数失败: {e}')
return 0
def delete_detection_session(self, session_id: str):
"""删除检测会话及其相关的检测数据"""
conn = self.get_connection()
cursor = conn.cursor()
try:
# 验证会话是否存在
cursor.execute('SELECT COUNT(*) FROM detection_sessions WHERE id = ?', (session_id,))
if cursor.fetchone()[0] == 0:
raise ValueError(f'检测会话不存在: {session_id}')
# 先删除相关的检测数据
cursor.execute('DELETE FROM detection_data WHERE session_id = ?', (session_id,))
deleted_data_count = cursor.rowcount
# 再删除检测会话
cursor.execute('DELETE FROM detection_sessions WHERE id = ?', (session_id,))
conn.commit()
logger.info(f'删除检测会话: {session_id}, 同时删除了 {deleted_data_count} 条检测数据')
except Exception as e:
conn.rollback()
logger.error(f'删除检测会话失败: {e}')
raise
2025-07-28 11:59:56 +08:00
def get_session_data(self, session_id: str) -> Optional[Dict]:
"""获取会话详细数据"""
conn = self.get_connection()
cursor = conn.cursor()
try:
# 获取会话基本信息
cursor.execute('''
SELECT s.*, p.name as patient_name, u.name as creator_name
2025-07-28 11:59:56 +08:00
FROM detection_sessions s
LEFT JOIN patients p ON s.patient_id = p.id
LEFT JOIN users u ON s.creator_id = u.id
2025-07-28 11:59:56 +08:00
WHERE s.id = ?
''', (session_id,))
session_row = cursor.fetchone()
if not session_row:
return None
session = dict(session_row)
# 解析设置JSON
if session['settings']:
try:
session['settings'] = json.loads(session['settings'])
except:
session['settings'] = {}
# 获取检测数据
cursor.execute('''
SELECT * FROM detection_data
WHERE session_id = ?
ORDER BY timestamp
''', (session_id,))
data_rows = cursor.fetchall()
session['data'] = []
for row in data_rows:
data_point = dict(row)
# 解析数据JSON
try:
data_point['data_value'] = json.loads(data_point['data_value'])
except:
pass
session['data'].append(data_point)
return session
except Exception as e:
logger.error(f'获取会话数据失败: {e}')
return None
# ==================== 检测记录数据管理 ====================
2025-07-28 11:59:56 +08:00
def save_detection_data(self, session_id: str, data: Dict[str, Any]):
"""保存检测数据"""
conn = self.get_connection()
cursor = conn.cursor()
try:
2025-08-02 16:52:17 +08:00
# 使用中国时区时间
china_time = self.get_china_time()
# 生成检测数据记录ID
data_id = self.generate_detection_data_id()
# 根据新的表结构保存数据
cursor.execute('''
INSERT INTO detection_data (
id, session_id, head_pose, body_pose, body_image,
foot_data, foot_image, foot_data_image, screen_image, timestamp
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
data_id,
session_id,
json.dumps(data.get('head_pose')) if data.get('head_pose') else None,
json.dumps(data.get('body_pose')) if data.get('body_pose') else None,
data.get('body_image'),
json.dumps(data.get('foot_data')) if data.get('foot_data') else None,
data.get('foot_image'),
data.get('foot_data_image'),
data.get('screen_image'),
china_time
))
2025-07-28 11:59:56 +08:00
conn.commit()
logger.info(f'保存检测数据: {data_id}')
2025-07-28 11:59:56 +08:00
except Exception as e:
conn.rollback()
logger.error(f'保存检测数据失败: {e}')
raise
def get_latest_detection_data(self, session_id: str, limit: int = 10) -> List[Dict]:
"""获取最新的检测数据"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('''
SELECT * FROM detection_data
WHERE session_id = ?
ORDER BY timestamp DESC
LIMIT ?
''', (session_id, limit))
rows = cursor.fetchall()
data_points = []
for row in rows:
data_point = dict(row)
# 解析JSON字段
2025-07-28 11:59:56 +08:00
try:
if data_point.get('head_pose'):
data_point['head_pose'] = json.loads(data_point['head_pose'])
except:
pass
try:
if data_point.get('body_pose'):
data_point['body_pose'] = json.loads(data_point['body_pose'])
except:
pass
try:
if data_point.get('foot_data'):
data_point['foot_data'] = json.loads(data_point['foot_data'])
2025-07-28 11:59:56 +08:00
except:
pass
data_points.append(data_point)
return data_points
except Exception as e:
logger.error(f'获取最新检测数据失败: {e}')
return []
def get_detection_data_by_id(self, data_id: str) -> Optional[Dict]:
"""根据主键ID查询检测数据详情"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('''
SELECT * FROM detection_data
WHERE id = ?
''', (data_id,))
row = cursor.fetchone()
if not row:
return None
data_point = dict(row)
# 解析JSON字段
try:
if data_point.get('head_pose'):
data_point['head_pose'] = json.loads(data_point['head_pose'])
except:
pass
try:
if data_point.get('body_pose'):
data_point['body_pose'] = json.loads(data_point['body_pose'])
except:
pass
try:
if data_point.get('foot_data'):
data_point['foot_data'] = json.loads(data_point['foot_data'])
except:
pass
return data_point
except Exception as e:
logger.error(f'根据ID获取检测数据失败: {e}')
return None
def delete_detection_data(self, data_id: str):
"""按主键删除检测数据记录"""
2025-07-28 11:59:56 +08:00
conn = self.get_connection()
cursor = conn.cursor()
try:
# 验证记录是否存在
cursor.execute('SELECT COUNT(*) FROM detection_data WHERE id = ?', (data_id,))
if cursor.fetchone()[0] == 0:
raise ValueError(f'检测数据记录不存在: {data_id}')
# 删除检测数据记录
cursor.execute('DELETE FROM detection_data WHERE id = ?', (data_id,))
2025-07-28 11:59:56 +08:00
conn.commit()
logger.info(f'删除检测数据记录: {data_id}')
2025-07-28 11:59:56 +08:00
except Exception as e:
conn.rollback()
logger.error(f'删除检测数据记录失败: {e}')
2025-07-28 11:59:56 +08:00
raise
# ==================== 系统设置管理 ====================
def get_setting(self, key: str, default_value: Any = None) -> Any:
"""获取系统设置"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('SELECT value FROM system_settings WHERE key = ?', (key,))
row = cursor.fetchone()
if row:
try:
return json.loads(row['value'])
except:
return row['value']
return default_value
except Exception as e:
logger.error(f'获取系统设置失败: {e}')
return default_value
def set_setting(self, key: str, value: Any, description: str = ''):
"""设置系统设置"""
conn = self.get_connection()
cursor = conn.cursor()
try:
value_str = json.dumps(value) if not isinstance(value, str) else value
2025-08-02 16:52:17 +08:00
# 使用中国时区时间
china_time = self.get_china_time()
2025-07-28 11:59:56 +08:00
cursor.execute('''
INSERT OR REPLACE INTO system_settings (key, value, description, updated_at)
2025-08-02 16:52:17 +08:00
VALUES (?, ?, ?, ?)
''', (key, value_str, description, china_time))
2025-07-28 11:59:56 +08:00
conn.commit()
logger.info(f'设置系统设置: {key}')
except Exception as e:
conn.rollback()
logger.error(f'设置系统设置失败: {e}')
raise
2025-07-31 17:19:17 +08:00
# ==================== 用户管理 ====================
def register_user(self, user_data: Dict[str, Any]) -> Dict[str, Any]:
"""用户注册"""
conn = self.get_connection()
cursor = conn.cursor()
try:
# 检查手机号是否已存在(如果提供了手机号)
if user_data.get('phone'):
cursor.execute('SELECT COUNT(*) FROM users WHERE phone = ?', (user_data['phone'],))
if cursor.fetchone()[0] > 0:
return {
'success': False,
'message': '手机号已存在'
}
user_id = self.generate_user_id()
2025-08-02 16:52:17 +08:00
# 密码明文存储
password = user_data['password']
# 使用中国时区时间
china_time = self.get_china_time()
2025-07-31 17:19:17 +08:00
cursor.execute('''
2025-08-02 16:52:17 +08:00
INSERT INTO users (id, name, username, password, phone, is_active, user_type, register_date, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
2025-07-31 17:19:17 +08:00
''', (
user_id,
user_data['name'],
user_data['username'],
2025-08-02 16:52:17 +08:00
password,
2025-07-31 17:19:17 +08:00
user_data.get('phone'), # 手机号可选
2025-08-02 16:52:17 +08:00
1, # 新注册用户默认激活
'user',
china_time,
china_time,
china_time
2025-07-31 17:19:17 +08:00
))
conn.commit()
logger.info(f'用户注册成功: {user_data["username"]}')
return {
'success': True,
'user_id': user_id,
'message': '注册成功'
}
except sqlite3.IntegrityError:
conn.rollback()
logger.error(f'用户名已存在: {user_data["username"]}')
return {
'success': False,
'message': '用户名已存在'
}
except Exception as e:
conn.rollback()
logger.error(f'用户注册失败: {e}')
return {
'success': False,
'message': '注册失败'
}
def authenticate_user(self, username: str, password: str) -> Optional[Dict]:
"""用户登录验证"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('''
SELECT * FROM users
WHERE username = ? AND password = ? AND is_active = 1
2025-08-02 16:52:17 +08:00
''', (username, password))
2025-07-31 17:19:17 +08:00
row = cursor.fetchone()
if row:
user = dict(row)
# 不返回密码
del user['password']
logger.info(f'用户登录成功: {username}')
return user
else:
logger.warning(f'用户登录失败: {username}')
return None
except Exception as e:
logger.error(f'用户验证失败: {e}')
return None
def get_user_by_phone(self, phone: str) -> Optional[Dict]:
"""根据手机号查询用户"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('SELECT * FROM users WHERE phone = ?', (phone,))
row = cursor.fetchone()
if row:
user = dict(row)
# 不返回密码
del user['password']
return user
return None
except Exception as e:
logger.error(f'根据手机号查询用户失败: {e}')
return None
def get_users(self, page: int = 1, size: int = 10, status: str = 'all') -> List[Dict]:
"""获取用户列表"""
conn = self.get_connection()
cursor = conn.cursor()
try:
offset = (page - 1) * size
if status == 'pending':
cursor.execute('''
SELECT id, name, username, phone, register_date, is_active, user_type, created_at
FROM users
WHERE is_active = 0 AND user_type = 'user'
ORDER BY created_at DESC
LIMIT ? OFFSET ?
''', (size, offset))
elif status == 'active':
cursor.execute('''
SELECT id, name, username, phone, register_date, is_active, user_type, created_at
FROM users
WHERE is_active = 1
ORDER BY created_at DESC
LIMIT ? OFFSET ?
''', (size, offset))
else:
cursor.execute('''
SELECT id, name, username, phone, register_date, is_active, user_type, created_at
FROM users
ORDER BY created_at DESC
LIMIT ? OFFSET ?
''', (size, offset))
rows = cursor.fetchall()
return [dict(row) for row in rows]
except Exception as e:
logger.error(f'获取用户列表失败: {e}')
return []
def get_users_count(self, status: str = 'all') -> int:
"""获取用户总数"""
conn = self.get_connection()
cursor = conn.cursor()
try:
if status == 'pending':
cursor.execute('SELECT COUNT(*) FROM users WHERE is_active = 0 AND user_type = "user"')
elif status == 'active':
cursor.execute('SELECT COUNT(*) FROM users WHERE is_active = 1')
else:
cursor.execute('SELECT COUNT(*) FROM users')
return cursor.fetchone()[0]
except Exception as e:
logger.error(f'获取用户总数失败: {e}')
return 0
def approve_user(self, user_id: str, approved: bool = True):
"""审核用户"""
conn = self.get_connection()
cursor = conn.cursor()
try:
2025-08-02 16:52:17 +08:00
# 使用中国时区时间
china_time = self.get_china_time()
2025-07-31 17:19:17 +08:00
cursor.execute('''
UPDATE users SET
is_active = ?,
2025-08-02 16:52:17 +08:00
updated_at = ?
2025-07-31 17:19:17 +08:00
WHERE id = ?
2025-08-02 16:52:17 +08:00
''', (1 if approved else 0, china_time, user_id))
2025-07-31 17:19:17 +08:00
conn.commit()
status = '通过' if approved else '拒绝'
logger.info(f'用户审核{status}: {user_id}')
except Exception as e:
conn.rollback()
logger.error(f'用户审核失败: {e}')
raise
def get_user(self, user_id: str) -> Optional[Dict]:
"""获取单个用户信息"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('''
SELECT id, name, username, phone, register_date, is_active, user_type, created_at, updated_at
FROM users WHERE id = ?
''', (user_id,))
row = cursor.fetchone()
return dict(row) if row else None
except Exception as e:
logger.error(f'获取用户信息失败: {e}')
return None
def update_user(self, user_id: str, user_data: Dict[str, Any]):
"""更新用户信息"""
conn = self.get_connection()
cursor = conn.cursor()
try:
2025-08-02 16:52:17 +08:00
# 密码明文存储,无需加密处理
2025-07-31 17:19:17 +08:00
# 构建更新语句
fields = []
values = []
for key, value in user_data.items():
if key in ['name', 'username', 'password', 'is_active', 'user_type','phone']:
fields.append(f'{key} = ?')
values.append(value)
if fields:
2025-08-02 16:52:17 +08:00
# 使用中国时区时间
china_time = self.get_china_time()
fields.append('updated_at = ?')
values.append(china_time)
2025-07-31 17:19:17 +08:00
values.append(user_id)
2025-07-31 17:53:09 +08:00
sql = f"UPDATE users SET {', '.join(fields)} WHERE id = ?"
2025-07-31 17:19:17 +08:00
cursor.execute(sql, values)
conn.commit()
logger.info(f'更新用户信息: {user_id}')
except Exception as e:
conn.rollback()
logger.error(f'更新用户信息失败: {e}')
raise
def delete_user(self, user_id: str):
"""删除用户"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('DELETE FROM users WHERE id = ?', (user_id,))
conn.commit()
logger.info(f'删除用户: {user_id}')
except Exception as e:
conn.rollback()
logger.error(f'删除用户失败: {e}')
raise
def get_system_setting(self, key: str, default_value: str = None) -> Optional[str]:
"""获取系统设置值"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('SELECT value FROM system_settings WHERE key = ?', (key,))
result = cursor.fetchone()
if result:
return result[0]
else:
logger.warning(f'系统设置项不存在: {key}')
return default_value
except Exception as e:
logger.error(f'获取系统设置失败: {e}')
return default_value
def set_system_setting(self, key: str, value: str, description: str = None):
"""设置系统设置值"""
conn = self.get_connection()
cursor = conn.cursor()
try:
china_time = self.get_china_time()
# 检查设置项是否存在
cursor.execute('SELECT COUNT(*) FROM system_settings WHERE key = ?', (key,))
exists = cursor.fetchone()[0]
if exists:
# 更新现有设置
cursor.execute('''
UPDATE system_settings SET value = ?, updated_at = ?
WHERE key = ?
''', (value, china_time, key))
else:
# 插入新设置
cursor.execute('''
INSERT INTO system_settings (key, value, description, updated_at)
VALUES (?, ?, ?, ?)
''', (key, value, description, china_time))
conn.commit()
logger.info(f'设置系统配置: {key} = {value}')
except Exception as e:
conn.rollback()
logger.error(f'设置系统配置失败: {e}')
raise
2025-07-28 11:59:56 +08:00
def close(self):
"""关闭数据库连接"""
if self.connection:
self.connection.close()
self.connection = None
logger.info('数据库连接已关闭')