BodyBalanceEvaluation/backend/database.py

1516 lines
59 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
数据库管理模块
负责SQLite数据库的创建、连接和数据操作
"""
import sqlite3
import json
import uuid
from datetime import datetime, timezone, timedelta
from typing import List, Dict, Optional, Any
import logging
logger = logging.getLogger(__name__)
class DatabaseManager:
"""数据库管理器"""
def __init__(self, db_path: str):
self.db_path = db_path
self.connection = None
# 设置中国上海时区 (UTC+8)
self.china_tz = timezone(timedelta(hours=8))
def get_china_time(self) -> str:
"""获取中国时区的当前时间字符串"""
return datetime.now(self.china_tz).strftime('%Y-%m-%d %H:%M:%S')
def generate_patient_id(self) -> str:
"""生成患者唯一标识(YYYYMMDD0000)年月日+四位序号"""
conn = self.get_connection()
cursor = conn.cursor()
try:
# 获取当前日期
china_tz = timezone(timedelta(hours=8))
today = datetime.now(china_tz).strftime('%Y%m%d')
# 使用循环确保生成唯一ID防止并发冲突
for attempt in range(10): # 最多尝试10次
# 查询今天已有的最大序号
cursor.execute('''
SELECT id FROM patients
WHERE id LIKE ?
ORDER BY id DESC
LIMIT 1
''', (f'{today}%',))
result = cursor.fetchone()
if result:
# 提取序号部分并加1
last_id = result[0]
if len(last_id) >= 12 and last_id[:8] == today:
last_sequence = int(last_id[8:12])
sequence = str(last_sequence + 1).zfill(4)
else:
sequence = '0001'
else:
# 今天第一个患者
sequence = '0001'
patient_id = f'{today}{sequence}'
# 检查ID是否已存在
cursor.execute('SELECT COUNT(*) FROM patients WHERE id = ?', (patient_id,))
if cursor.fetchone()[0] == 0:
return patient_id
# 如果10次尝试都失败使用UUID作为备用方案
logger.warning('患者ID生成重试次数过多使用UUID备用方案')
return str(uuid.uuid4())
except Exception as e:
logger.error(f'生成患者ID失败: {e}')
# 如果生成失败使用UUID作为备用方案
return str(uuid.uuid4())
def generate_user_id(self) -> str:
"""生成用户唯一标识,格式为六位数字序号(000001)"""
conn = self.get_connection()
cursor = conn.cursor()
try:
# 使用循环确保生成唯一ID防止并发冲突
for attempt in range(10): # 最多尝试10次
# 获取当前最大的用户ID只考虑六位数字格式的ID
cursor.execute('SELECT MAX(CAST(id AS INTEGER)) FROM users WHERE id GLOB "[0-9][0-9][0-9][0-9][0-9][0-9]"')
result = cursor.fetchone()[0]
if result is None:
# 如果没有用户记录从000001开始
next_id = 1
else:
next_id = result + 1
# 格式化为六位数字,前面补零
user_id = f"{next_id:06d}"
# 检查ID是否已存在
cursor.execute('SELECT COUNT(*) FROM users WHERE id = ?', (user_id,))
if cursor.fetchone()[0] == 0:
return user_id
# 如果10次尝试都失败使用时间戳+随机数作为备用方案
logger.warning('用户ID生成重试次数过多使用备用方案')
import time
import random
timestamp_suffix = str(int(time.time()))[-4:]
random_suffix = str(random.randint(10, 99))
return f"{timestamp_suffix}{random_suffix}"
except Exception as e:
logger.error(f'生成用户ID失败: {e}')
# 如果出错,使用时间戳作为备用方案
import time
return str(int(time.time()))[-6:]
def generate_session_id(self) -> str:
"""生成会话唯一标识(YYYYMMDDHHMMSS)年月日时分秒"""
conn = self.get_connection()
cursor = conn.cursor()
try:
# 获取当前时间
china_tz = timezone(timedelta(hours=8))
# 使用循环确保生成唯一ID防止并发冲突
for attempt in range(10): # 最多尝试10次
current_time = datetime.now(china_tz)
session_id = current_time.strftime('%Y%m%d%H%M%S')
# 检查ID是否已存在
cursor.execute('SELECT COUNT(*) FROM detection_sessions WHERE id = ?', (session_id,))
if cursor.fetchone()[0] == 0:
return session_id
# 如果存在冲突等待1秒后重试
import time
time.sleep(0.001) # 等待1毫秒
# 如果10次尝试都失败在时间后添加随机后缀
import random
current_time = datetime.now(china_tz)
base_id = current_time.strftime('%Y%m%d%H%M%S')
suffix = str(random.randint(10, 99))
session_id = f'{base_id}{suffix}'
# 最后检查一次
cursor.execute('SELECT COUNT(*) FROM detection_sessions WHERE id = ?', (session_id,))
if cursor.fetchone()[0] == 0:
return session_id
# 如果还是冲突使用UUID作为备用方案
logger.warning('会话ID生成重试次数过多使用UUID备用方案')
return str(uuid.uuid4())
except Exception as e:
logger.error(f'生成会话ID失败: {e}')
# 如果生成失败使用UUID作为备用方案
return str(uuid.uuid4())
def generate_detection_data_id(self) -> str:
"""生成检测数据记录唯一标识(YYYYMMDDHHMMSS)年月日时分秒"""
conn = self.get_connection()
cursor = conn.cursor()
try:
# 获取当前时间
china_tz = timezone(timedelta(hours=8))
# 使用循环确保生成唯一ID防止并发冲突
for attempt in range(10): # 最多尝试10次
current_time = datetime.now(china_tz)
data_id = current_time.strftime('%Y%m%d%H%M%S')
# 检查ID是否已存在
cursor.execute('SELECT COUNT(*) FROM detection_data WHERE id = ?', (data_id,))
if cursor.fetchone()[0] == 0:
return data_id
# 如果存在冲突等待1毫秒后重试
import time
time.sleep(0.001)
# 如果10次尝试都失败在时间后添加随机后缀
import random
current_time = datetime.now(china_tz)
base_id = current_time.strftime('%Y%m%d%H%M%S')
suffix = str(random.randint(100, 999))
data_id = f'{base_id}{suffix}'
# 最后检查一次
cursor.execute('SELECT COUNT(*) FROM detection_data WHERE id = ?', (data_id,))
if cursor.fetchone()[0] == 0:
return data_id
# 如果还是冲突使用UUID作为备用方案
logger.warning('检测数据ID生成重试次数过多使用UUID备用方案')
return str(uuid.uuid4())
except Exception as e:
logger.error(f'生成检测数据ID失败: {e}')
# 如果生成失败使用UUID作为备用方案
return str(uuid.uuid4())
def get_connection(self) -> sqlite3.Connection:
"""获取数据库连接"""
if not self.connection:
self.connection = sqlite3.connect(
self.db_path,
check_same_thread=False,
timeout=30.0
)
self.connection.row_factory = sqlite3.Row
return self.connection
def init_database(self):
"""初始化数据库表结构"""
conn = self.get_connection()
cursor = conn.cursor()
try:
# 创建用户表(医生)
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id TEXT PRIMARY KEY, -- 用户唯一标识(0000000)六位序号
name TEXT NOT NULL, -- 用户真实姓名
username TEXT UNIQUE NOT NULL, -- 用户名(登录名)
password TEXT NOT NULL, -- 密码
register_date TIMESTAMP, -- 注册日期
is_active BOOLEAN DEFAULT 1, -- 账户是否激活0=未激活1=已激活)
user_type TEXT DEFAULT 'user', -- 用户类型user/admin/doctor
phone TEXT DEFAULT '', -- 联系电话
created_at TIMESTAMP, -- 记录创建时间
updated_at TIMESTAMP -- 记录更新时间
)
''')
# 创建患者表
cursor.execute('''
CREATE TABLE IF NOT EXISTS patients (
id TEXT PRIMARY KEY, -- 患者唯一标识(YYYYMMDD0000)年月日+四位序号
name TEXT NOT NULL, -- 患者姓名
gender TEXT, -- 性别
birth_date TIMESTAMP, -- 出生日期
nationality TEXT, -- 民族
residence TEXT, -- 居住地
height REAL, -- 身高cm
weight REAL, -- 体重kg
shoe_size TEXT, -- 鞋码
phone TEXT, -- 电话号码
email TEXT, -- 电子邮箱
occupation TEXT, -- 职业
workplace TEXT, -- 工作单位
medical_history TEXT, -- 病史
notes TEXT, -- 备注信息
created_at TIMESTAMP, -- 记录创建时间
updated_at TIMESTAMP -- 记录更新时间
)
''')
# 创建检测会话表
cursor.execute('''
CREATE TABLE IF NOT EXISTS detection_sessions (
id TEXT PRIMARY KEY, -- 会话唯一标识(YYYYMMDDHHMMSS)年月日时分秒
patient_id TEXT NOT NULL, -- 患者ID外键
creator_id TEXT, -- 创建人ID医生ID外键
start_time TIMESTAMP, -- 检测开始时间
end_time TIMESTAMP, -- 检测结束时间
duration INTEGER, -- 检测持续时间(秒)
settings TEXT, -- 检测设置JSON格式
normal_video_path TEXT, -- 足部检测视频文件路径
femtobolt_video_path TEXT, -- 深度相机视频文件路径
screen_video_path TEXT, -- 屏幕录制视频路径
diagnosis_info TEXT, -- 诊断信息
treatment_info TEXT, -- 处理信息
suggestion_info TEXT, -- 建议信息
status TEXT DEFAULT 'created', -- 会话状态created/running/diagnosed/completed)
created_at TIMESTAMP, -- 记录创建时间
FOREIGN KEY (patient_id) REFERENCES patients (id), -- 患者表外键约束
FOREIGN KEY (creator_id) REFERENCES users (id) -- 用户表外键约束
)
''')
# 创建检测数据表
cursor.execute('''
CREATE TABLE IF NOT EXISTS detection_data (
id TEXT PRIMARY KEY, -- 记录唯一标识(YYYYMMDDHHMMSS)年月日时分秒
session_id TEXT NOT NULL, -- 检测会话ID外键
head_pose TEXT , -- 头部姿态数据JSON格式
body_pose TEXT , -- 身体姿态数据JSON格式
body_image TEXT, -- 身体视频截图存储路径
foot_data TEXT , -- 足部姿态数据JSON格式
foot_image TEXT, -- 足部监测视频截图存储路径
foot_data_image TEXT, -- 足底压力数据图存储路径
screen_image TEXT, -- 屏幕录制视频截图存储路径
timestamp TIMESTAMP, -- 数据记录时间戳
FOREIGN KEY (session_id) REFERENCES detection_sessions (id) -- 检测会话表外键约束
)
''')
# 创建系统设置表
cursor.execute('''
CREATE TABLE IF NOT EXISTS system_settings (
key TEXT PRIMARY KEY, -- 设置项键名(唯一标识)
value TEXT NOT NULL, -- 设置项值
description TEXT, -- 设置项描述说明
updated_at TIMESTAMP -- 最后更新时间
)
''')
# 创建索引以提高查询性能
# 患者表索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_patients_name ON patients (name)') # 患者姓名索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_patients_phone ON patients (phone)') # 患者电话索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_patients_email ON patients (email)') # 患者邮箱索引
# 检测会话表索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_sessions_patient ON detection_sessions (patient_id)') # 患者ID索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_sessions_creator ON detection_sessions (creator_id)') # 创建人ID索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_sessions_time ON detection_sessions (start_time)') # 开始时间索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_sessions_status ON detection_sessions (status)') # 会话状态索引
# 检测数据表索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_data_session ON detection_data (session_id)') # 会话ID索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_data_timestamp ON detection_data (timestamp)') # 时间戳索引
# 用户表索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_users_username ON users (username)') # 用户名索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_users_type ON users (user_type)') # 用户类型索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_users_phone ON users (phone)') # 用户电话索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_users_active ON users (is_active)') # 激活状态索引
# 插入默认管理员账户(如果不存在)
cursor.execute('SELECT COUNT(*) FROM users WHERE username = ?', ('admin',))
admin_exists = cursor.fetchone()[0]
if admin_exists == 0:
admin_id = self.generate_user_id()
# 默认密码为 admin123明文存储
admin_password = 'admin123'
# 使用中国时区时间
china_time = self.get_china_time()
cursor.execute('''
INSERT INTO users (id, name, username, password, is_active, user_type, register_date, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (admin_id, '系统管理员', 'admin', admin_password, 1, 'admin', china_time, china_time, china_time))
logger.info('创建默认管理员账户: admin/admin123')
# 插入默认系统设置
china_time = self.get_china_time()
# 检查并插入默认摄像头设备索引配置
cursor.execute('SELECT COUNT(*) FROM system_settings WHERE key = ?', ('monitor_device_index',))
monitor_config_exists = cursor.fetchone()[0]
if monitor_config_exists == 0:
cursor.execute('''
INSERT INTO system_settings (key, value, description, updated_at)
VALUES (?, ?, ?, ?)
''', ('monitor_device_index', '1', '足部监视摄像头设备索引号', china_time))
logger.info('创建默认摄像头设备索引配置: 1')
conn.commit()
logger.info('数据库初始化完成')
except Exception as e:
conn.rollback()
logger.error(f'数据库初始化失败: {e}')
raise
# ==================== 患者管理 ====================
def create_patient(self, patient_data: Dict[str, Any]) -> str:
"""创建患者记录"""
# 验证必填字段
if not patient_data.get('name'):
raise ValueError('患者姓名不能为空')
conn = self.get_connection()
cursor = conn.cursor()
try:
patient_id = self.generate_patient_id()
# 使用中国时区时间
china_time = self.get_china_time()
cursor.execute('''
INSERT INTO patients (
id, name, gender, birth_date, nationality, residence,
height, weight, shoe_size, phone, email, occupation, workplace,
medical_history, notes, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
patient_id,
patient_data.get('name'),
patient_data.get('gender'),
patient_data.get('birth_date'),
patient_data.get('nationality'),
patient_data.get('residence'),
patient_data.get('height'),
patient_data.get('weight'),
patient_data.get('shoe_size'),
patient_data.get('phone'),
patient_data.get('email'),
patient_data.get('occupation'),
patient_data.get('workplace'),
patient_data.get('medical_history'),
patient_data.get('notes'),
china_time,
china_time
))
conn.commit()
logger.info(f'创建患者记录: {patient_id}')
return patient_id
except Exception as e:
conn.rollback()
logger.error(f'创建患者记录失败: {e}')
raise
def get_patients(self, page: int = 1, size: int = 10, keyword: str = '') -> List[Dict]:
"""获取患者列表"""
# 验证分页参数
if page < 1:
page = 1
if size < 1 or size > 100:
size = 10
conn = self.get_connection()
cursor = conn.cursor()
try:
offset = (page - 1) * size
if keyword:
cursor.execute('''
SELECT * FROM patients
WHERE name LIKE ? OR phone LIKE ? OR email LIKE ?
ORDER BY created_at DESC
LIMIT ? OFFSET ?
''', (f'%{keyword}%', f'%{keyword}%', f'%{keyword}%', size, offset))
else:
cursor.execute('''
SELECT * FROM patients
ORDER BY created_at DESC
LIMIT ? OFFSET ?
''', (size, offset))
rows = cursor.fetchall()
return [dict(row) for row in rows]
except Exception as e:
logger.error(f'获取患者列表失败: {e}')
raise
def get_patients_count(self, keyword: str = '') -> int:
"""获取患者总数"""
conn = self.get_connection()
cursor = conn.cursor()
try:
if keyword:
cursor.execute('''
SELECT COUNT(*) FROM patients
WHERE name LIKE ? OR phone LIKE ?
''', (f'%{keyword}%', f'%{keyword}%'))
else:
cursor.execute('SELECT COUNT(*) FROM patients')
return cursor.fetchone()[0]
except Exception as e:
logger.error(f'获取患者总数失败: {e}')
return 0
def get_patient(self, patient_id: str) -> Optional[Dict]:
"""获取单个患者信息"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('SELECT * FROM patients WHERE id = ?', (patient_id,))
row = cursor.fetchone()
return dict(row) if row else None
except Exception as e:
logger.error(f'获取患者信息失败: {e}')
return None
def update_patient(self, patient_id: str, patient_data: Dict[str, Any]):
"""更新患者信息"""
# 验证必填字段
if not patient_data.get('name'):
raise ValueError('患者姓名不能为空')
# 验证患者是否存在
if not self.get_patient(patient_id):
raise ValueError(f'患者不存在: {patient_id}')
conn = self.get_connection()
cursor = conn.cursor()
try:
# 使用中国时区时间
china_time = self.get_china_time()
cursor.execute('''
UPDATE patients SET
name = ?, gender = ?, birth_date = ?, nationality = ?,
residence = ?, height = ?, weight = ?, shoe_size = ?, phone = ?, email = ?,
occupation = ?, workplace = ?, medical_history = ?, notes = ?, updated_at = ?
WHERE id = ?
''', (
patient_data.get('name'),
patient_data.get('gender'),
patient_data.get('birth_date'),
patient_data.get('nationality'),
patient_data.get('residence'),
patient_data.get('height'),
patient_data.get('weight'),
patient_data.get('shoe_size'),
patient_data.get('phone'),
patient_data.get('email'),
patient_data.get('occupation'),
patient_data.get('workplace'),
patient_data.get('medical_history'),
patient_data.get('notes'),
china_time,
patient_id
))
conn.commit()
logger.info(f'更新患者信息: {patient_id}')
except Exception as e:
conn.rollback()
logger.error(f'更新患者信息失败: {e}')
raise
def delete_patient(self, patient_id: str):
"""删除患者记录"""
# 验证患者是否存在
if not self.get_patient(patient_id):
raise ValueError(f'患者不存在: {patient_id}')
conn = self.get_connection()
cursor = conn.cursor()
try:
# 删除相关的检测数据
cursor.execute('''
DELETE FROM detection_data
WHERE session_id IN (
SELECT id FROM detection_sessions WHERE patient_id = ?
)
''', (patient_id,))
# 删除检测会话
cursor.execute('DELETE FROM detection_sessions WHERE patient_id = ?', (patient_id,))
# 删除患者记录
cursor.execute('DELETE FROM patients WHERE id = ?', (patient_id,))
conn.commit()
logger.info(f'删除患者记录: {patient_id}')
except Exception as e:
conn.rollback()
logger.error(f'删除患者记录失败: {e}')
raise
def batch_delete_patients(self, patient_ids: List[str]):
"""批量删除患者记录"""
if not patient_ids:
return
conn = self.get_connection()
cursor = conn.cursor()
try:
# 验证所有患者是否存在
placeholders = ','.join(['?' for _ in patient_ids])
cursor.execute(f'SELECT COUNT(*) FROM patients WHERE id IN ({placeholders})', patient_ids)
existing_count = cursor.fetchone()[0]
if existing_count != len(patient_ids):
raise ValueError('部分患者记录不存在')
# 批量删除相关的检测数据
cursor.execute(f'''
DELETE FROM detection_data
WHERE session_id IN (
SELECT id FROM detection_sessions WHERE patient_id IN ({placeholders})
)
''', patient_ids)
# 批量删除检测会话
cursor.execute(f'DELETE FROM detection_sessions WHERE patient_id IN ({placeholders})', patient_ids)
# 批量删除患者记录
cursor.execute(f'DELETE FROM patients WHERE id IN ({placeholders})', patient_ids)
conn.commit()
logger.info(f'批量删除患者记录: {len(patient_ids)}')
except Exception as e:
conn.rollback()
logger.error(f'批量删除患者记录失败: {e}')
raise
# ==================== 检测会话管理 ====================
def create_detection_session(self, patient_id: str, settings: Dict[str, Any], creator_id: str = None) -> str:
"""创建检测会话"""
conn = self.get_connection()
cursor = conn.cursor()
try:
session_id = self.generate_session_id()
# 使用中国时区时间
china_time = self.get_china_time()
cursor.execute('''
INSERT INTO detection_sessions (
id, patient_id, creator_id, duration, settings, status,
diagnosis_info, treatment_info, suggestion_info, start_time, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
session_id,
patient_id,
creator_id,
settings.get('duration', 60),
json.dumps(settings),
'running',
settings.get('diagnosis_info', ''),
settings.get('treatment_info', ''),
settings.get('suggestion_info', ''),
china_time,
china_time
))
conn.commit()
logger.info(f'创建检测会话: {session_id}')
return session_id
except Exception as e:
conn.rollback()
logger.error(f'创建检测会话失败: {e}')
raise
def update_session_status(self, session_id: str, status: str) -> bool:
"""更新会话状态
Returns:
bool: 更新成功返回True失败返回False
"""
conn = self.get_connection()
cursor = conn.cursor()
try:
if status in ['completed', 'stopped', 'error']:
# 使用中国时区时间
china_time = self.get_china_time()
cursor.execute('''
UPDATE detection_sessions SET
status = ?, end_time = ?
WHERE id = ?
''', (status, china_time, session_id))
else:
cursor.execute('''
UPDATE detection_sessions SET
status = ?
WHERE id = ?
''', (status, session_id))
conn.commit()
logger.info(f'更新会话状态: {session_id} -> {status}')
return True
except Exception as e:
conn.rollback()
logger.error(f'更新会话状态失败: {e}')
return False
def update_session_duration(self, session_id: str, duration: int):
"""更新会话持续时间"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('''
UPDATE detection_sessions SET duration = ?
WHERE id = ?
''', (duration, session_id))
conn.commit()
logger.info(f'更新会话持续时间: {session_id} -> {duration}')
except Exception as e:
conn.rollback()
logger.error(f'更新会话持续时间失败: {e}')
raise
def update_session_normal_video_path(self, session_id: str, video_path: str):
"""更新会话足部检测视频路径"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('''
UPDATE detection_sessions SET normal_video_path = ?
WHERE id = ?
''', (video_path, session_id))
conn.commit()
logger.info(f'更新会话足部检测视频路径: {session_id} -> {video_path}')
except Exception as e:
conn.rollback()
logger.error(f'更新会话足部检测视频路径失败: {e}')
raise
def update_session_femtobolt_video_path(self, session_id: str, video_path: str):
"""更新会话深度相机视频路径"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('''
UPDATE detection_sessions SET femtobolt_video_path = ?
WHERE id = ?
''', (video_path, session_id))
conn.commit()
logger.info(f'更新会话深度相机视频路径: {session_id} -> {video_path}')
except Exception as e:
conn.rollback()
logger.error(f'更新会话深度相机视频路径失败: {e}')
raise
def update_session_screen_video_path(self, session_id: str, video_path: str):
"""更新会话屏幕录制视频路径"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('''
UPDATE detection_sessions SET screen_video_path = ?
WHERE id = ?
''', (video_path, session_id))
conn.commit()
logger.info(f'更新会话屏幕录制视频路径: {session_id} -> {video_path}')
except Exception as e:
conn.rollback()
logger.error(f'更新会话屏幕录制视频路径失败: {e}')
raise
def update_session_diagnosis_info(self, session_id: str, diagnosis_info: str):
"""更新会话诊断信息"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('''
UPDATE detection_sessions SET diagnosis_info = ?
WHERE id = ?
''', (diagnosis_info, session_id))
conn.commit()
logger.info(f'更新会话诊断信息: {session_id}')
except Exception as e:
conn.rollback()
logger.error(f'更新会话诊断信息失败: {e}')
raise
def update_session_treatment_info(self, session_id: str, treatment_info: str):
"""更新会话处理信息"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('''
UPDATE detection_sessions SET treatment_info = ?
WHERE id = ?
''', (treatment_info, session_id))
conn.commit()
logger.info(f'更新会话处理信息: {session_id}')
except Exception as e:
conn.rollback()
logger.error(f'更新会话处理信息失败: {e}')
raise
def update_session_suggestion_info(self, session_id: str, suggestion_info: str):
"""更新会话建议信息"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('''
UPDATE detection_sessions SET suggestion_info = ?
WHERE id = ?
''', (suggestion_info, session_id))
conn.commit()
logger.info(f'更新会话建议信息: {session_id}')
except Exception as e:
conn.rollback()
logger.error(f'更新会话建议信息失败: {e}')
raise
def update_session_all_info(self, session_id: str, diagnosis_info: str = None, treatment_info: str = None, suggestion_info: str = None, status: str = None):
"""同时更新会话的诊断信息、处理信息、建议信息和状态"""
conn = self.get_connection()
cursor = conn.cursor()
try:
# 构建动态SQL语句只更新非None的字段
update_fields = []
update_values = []
if diagnosis_info is not None:
update_fields.append('diagnosis_info = ?')
update_values.append(diagnosis_info)
if treatment_info is not None:
update_fields.append('treatment_info = ?')
update_values.append(treatment_info)
if suggestion_info is not None:
update_fields.append('suggestion_info = ?')
update_values.append(suggestion_info)
if status is not None:
update_fields.append('status = ?')
update_values.append(status)
# 如果状态是完成、停止或错误,同时更新结束时间
if status in ['completed', 'stopped', 'error']:
update_fields.append('end_time = ?')
update_values.append(self.get_china_time())
if not update_fields:
logger.warning(f'没有提供要更新的信息: {session_id}')
return
# 添加session_id到参数列表
update_values.append(session_id)
sql = f'''
UPDATE detection_sessions SET {', '.join(update_fields)}
WHERE id = ?
'''
cursor.execute(sql, update_values)
conn.commit()
updated_info = []
if diagnosis_info is not None:
updated_info.append('诊断信息')
if treatment_info is not None:
updated_info.append('处理信息')
if suggestion_info is not None:
updated_info.append('建议信息')
if status is not None:
updated_info.append(f'状态({status})')
logger.info(f'批量更新会话信息成功: {session_id}, 更新字段: {", ".join(updated_info)}')
except Exception as e:
conn.rollback()
logger.error(f'批量更新会话信息失败: {e}')
raise
def get_detection_sessions(self, page: int = 1, size: int = 10, patient_id: str = None) -> List[Dict]:
"""获取检测会话列表"""
conn = self.get_connection()
cursor = conn.cursor()
try:
offset = (page - 1) * size
if patient_id:
cursor.execute('''
SELECT s.*, p.name as patient_name, u.name as creator_name
FROM detection_sessions s
LEFT JOIN patients p ON s.patient_id = p.id
LEFT JOIN users u ON s.creator_id = u.id
WHERE s.patient_id = ?
ORDER BY s.start_time DESC
LIMIT ? OFFSET ?
''', (patient_id, size, offset))
else:
cursor.execute('''
SELECT s.*, p.name as patient_name, u.name as creator_name
FROM detection_sessions s
LEFT JOIN patients p ON s.patient_id = p.id
LEFT JOIN users u ON s.creator_id = u.id
ORDER BY s.start_time DESC
LIMIT ? OFFSET ?
''', (size, offset))
rows = cursor.fetchall()
sessions = []
for row in rows:
session = dict(row)
# 解析设置JSON
if session['settings']:
try:
session['settings'] = json.loads(session['settings'])
except:
session['settings'] = {}
sessions.append(session)
return sessions
except Exception as e:
logger.error(f'获取检测会话列表失败: {e}')
raise
def get_sessions_count(self, patient_id: str = None) -> int:
"""获取会话总数"""
conn = self.get_connection()
cursor = conn.cursor()
try:
if patient_id:
cursor.execute('SELECT COUNT(*) FROM detection_sessions WHERE patient_id = ?', (patient_id,))
else:
cursor.execute('SELECT COUNT(*) FROM detection_sessions')
return cursor.fetchone()[0]
except Exception as e:
logger.error(f'获取会话总数失败: {e}')
return 0
def get_session_data(self, session_id: str) -> Optional[Dict]:
"""获取会话详细数据"""
conn = self.get_connection()
cursor = conn.cursor()
try:
# 获取会话基本信息
cursor.execute('''
SELECT s.*, p.name as patient_name, u.name as creator_name
FROM detection_sessions s
LEFT JOIN patients p ON s.patient_id = p.id
LEFT JOIN users u ON s.creator_id = u.id
WHERE s.id = ?
''', (session_id,))
session_row = cursor.fetchone()
if not session_row:
return None
session = dict(session_row)
# 解析设置JSON
if session['settings']:
try:
session['settings'] = json.loads(session['settings'])
except:
session['settings'] = {}
# 获取检测数据
cursor.execute('''
SELECT * FROM detection_data
WHERE session_id = ?
ORDER BY timestamp
''', (session_id,))
data_rows = cursor.fetchall()
session['data'] = []
for row in data_rows:
data_point = dict(row)
# 解析数据JSON
try:
data_point['data_value'] = json.loads(data_point['data_value'])
except:
pass
session['data'].append(data_point)
return session
except Exception as e:
logger.error(f'获取会话数据失败: {e}')
return None
# ==================== 检测记录数据管理 ====================
def save_detection_data(self, session_id: str, data: Dict[str, Any]):
"""保存检测数据"""
conn = self.get_connection()
cursor = conn.cursor()
try:
# 使用中国时区时间
china_time = self.get_china_time()
# 生成检测数据记录ID
data_id = self.generate_detection_data_id()
# 根据新的表结构保存数据
cursor.execute('''
INSERT INTO detection_data (
id, session_id, head_pose, body_pose, body_image,
foot_data, foot_image, foot_data_image, screen_image, timestamp
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
data_id,
session_id,
json.dumps(data.get('head_pose')) if data.get('head_pose') else None,
json.dumps(data.get('body_pose')) if data.get('body_pose') else None,
data.get('body_image'),
json.dumps(data.get('foot_data')) if data.get('foot_data') else None,
data.get('foot_image'),
data.get('foot_data_image'),
data.get('screen_image'),
china_time
))
conn.commit()
logger.info(f'保存检测数据: {data_id}')
except Exception as e:
conn.rollback()
logger.error(f'保存检测数据失败: {e}')
raise
def get_latest_detection_data(self, session_id: str, limit: int = 10) -> List[Dict]:
"""获取最新的检测数据"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('''
SELECT * FROM detection_data
WHERE session_id = ?
ORDER BY timestamp DESC
LIMIT ?
''', (session_id, limit))
rows = cursor.fetchall()
data_points = []
for row in rows:
data_point = dict(row)
# 解析JSON字段
try:
if data_point.get('head_pose'):
data_point['head_pose'] = json.loads(data_point['head_pose'])
except:
pass
try:
if data_point.get('body_pose'):
data_point['body_pose'] = json.loads(data_point['body_pose'])
except:
pass
try:
if data_point.get('foot_data'):
data_point['foot_data'] = json.loads(data_point['foot_data'])
except:
pass
data_points.append(data_point)
return data_points
except Exception as e:
logger.error(f'获取最新检测数据失败: {e}')
return []
def get_detection_data_by_id(self, data_id: str) -> Optional[Dict]:
"""根据主键ID查询检测数据详情"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('''
SELECT * FROM detection_data
WHERE id = ?
''', (data_id,))
row = cursor.fetchone()
if not row:
return None
data_point = dict(row)
# 解析JSON字段
try:
if data_point.get('head_pose'):
data_point['head_pose'] = json.loads(data_point['head_pose'])
except:
pass
try:
if data_point.get('body_pose'):
data_point['body_pose'] = json.loads(data_point['body_pose'])
except:
pass
try:
if data_point.get('foot_data'):
data_point['foot_data'] = json.loads(data_point['foot_data'])
except:
pass
return data_point
except Exception as e:
logger.error(f'根据ID获取检测数据失败: {e}')
return None
def delete_detection_data(self, data_id: str):
"""按主键删除检测数据记录"""
conn = self.get_connection()
cursor = conn.cursor()
try:
# 验证记录是否存在
cursor.execute('SELECT COUNT(*) FROM detection_data WHERE id = ?', (data_id,))
if cursor.fetchone()[0] == 0:
raise ValueError(f'检测数据记录不存在: {data_id}')
# 删除检测数据记录
cursor.execute('DELETE FROM detection_data WHERE id = ?', (data_id,))
conn.commit()
logger.info(f'删除检测数据记录: {data_id}')
except Exception as e:
conn.rollback()
logger.error(f'删除检测数据记录失败: {e}')
raise
# ==================== 系统设置管理 ====================
def get_setting(self, key: str, default_value: Any = None) -> Any:
"""获取系统设置"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('SELECT value FROM system_settings WHERE key = ?', (key,))
row = cursor.fetchone()
if row:
try:
return json.loads(row['value'])
except:
return row['value']
return default_value
except Exception as e:
logger.error(f'获取系统设置失败: {e}')
return default_value
def set_setting(self, key: str, value: Any, description: str = ''):
"""设置系统设置"""
conn = self.get_connection()
cursor = conn.cursor()
try:
value_str = json.dumps(value) if not isinstance(value, str) else value
# 使用中国时区时间
china_time = self.get_china_time()
cursor.execute('''
INSERT OR REPLACE INTO system_settings (key, value, description, updated_at)
VALUES (?, ?, ?, ?)
''', (key, value_str, description, china_time))
conn.commit()
logger.info(f'设置系统设置: {key}')
except Exception as e:
conn.rollback()
logger.error(f'设置系统设置失败: {e}')
raise
# ==================== 用户管理 ====================
def register_user(self, user_data: Dict[str, Any]) -> Dict[str, Any]:
"""用户注册"""
conn = self.get_connection()
cursor = conn.cursor()
try:
# 检查手机号是否已存在(如果提供了手机号)
if user_data.get('phone'):
cursor.execute('SELECT COUNT(*) FROM users WHERE phone = ?', (user_data['phone'],))
if cursor.fetchone()[0] > 0:
return {
'success': False,
'message': '手机号已存在'
}
user_id = self.generate_user_id()
# 密码明文存储
password = user_data['password']
# 使用中国时区时间
china_time = self.get_china_time()
cursor.execute('''
INSERT INTO users (id, name, username, password, phone, is_active, user_type, register_date, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
user_id,
user_data['name'],
user_data['username'],
password,
user_data.get('phone'), # 手机号可选
1, # 新注册用户默认激活
'user',
china_time,
china_time,
china_time
))
conn.commit()
logger.info(f'用户注册成功: {user_data["username"]}')
return {
'success': True,
'user_id': user_id,
'message': '注册成功'
}
except sqlite3.IntegrityError:
conn.rollback()
logger.error(f'用户名已存在: {user_data["username"]}')
return {
'success': False,
'message': '用户名已存在'
}
except Exception as e:
conn.rollback()
logger.error(f'用户注册失败: {e}')
return {
'success': False,
'message': '注册失败'
}
def authenticate_user(self, username: str, password: str) -> Optional[Dict]:
"""用户登录验证"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('''
SELECT * FROM users
WHERE username = ? AND password = ? AND is_active = 1
''', (username, password))
row = cursor.fetchone()
if row:
user = dict(row)
# 不返回密码
del user['password']
logger.info(f'用户登录成功: {username}')
return user
else:
logger.warning(f'用户登录失败: {username}')
return None
except Exception as e:
logger.error(f'用户验证失败: {e}')
return None
def get_user_by_phone(self, phone: str) -> Optional[Dict]:
"""根据手机号查询用户"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('SELECT * FROM users WHERE phone = ?', (phone,))
row = cursor.fetchone()
if row:
user = dict(row)
# 不返回密码
del user['password']
return user
return None
except Exception as e:
logger.error(f'根据手机号查询用户失败: {e}')
return None
def get_users(self, page: int = 1, size: int = 10, status: str = 'all') -> List[Dict]:
"""获取用户列表"""
conn = self.get_connection()
cursor = conn.cursor()
try:
offset = (page - 1) * size
if status == 'pending':
cursor.execute('''
SELECT id, name, username, phone, register_date, is_active, user_type, created_at
FROM users
WHERE is_active = 0 AND user_type = 'user'
ORDER BY created_at DESC
LIMIT ? OFFSET ?
''', (size, offset))
elif status == 'active':
cursor.execute('''
SELECT id, name, username, phone, register_date, is_active, user_type, created_at
FROM users
WHERE is_active = 1
ORDER BY created_at DESC
LIMIT ? OFFSET ?
''', (size, offset))
else:
cursor.execute('''
SELECT id, name, username, phone, register_date, is_active, user_type, created_at
FROM users
ORDER BY created_at DESC
LIMIT ? OFFSET ?
''', (size, offset))
rows = cursor.fetchall()
return [dict(row) for row in rows]
except Exception as e:
logger.error(f'获取用户列表失败: {e}')
return []
def get_users_count(self, status: str = 'all') -> int:
"""获取用户总数"""
conn = self.get_connection()
cursor = conn.cursor()
try:
if status == 'pending':
cursor.execute('SELECT COUNT(*) FROM users WHERE is_active = 0 AND user_type = "user"')
elif status == 'active':
cursor.execute('SELECT COUNT(*) FROM users WHERE is_active = 1')
else:
cursor.execute('SELECT COUNT(*) FROM users')
return cursor.fetchone()[0]
except Exception as e:
logger.error(f'获取用户总数失败: {e}')
return 0
def approve_user(self, user_id: str, approved: bool = True):
"""审核用户"""
conn = self.get_connection()
cursor = conn.cursor()
try:
# 使用中国时区时间
china_time = self.get_china_time()
cursor.execute('''
UPDATE users SET
is_active = ?,
updated_at = ?
WHERE id = ?
''', (1 if approved else 0, china_time, user_id))
conn.commit()
status = '通过' if approved else '拒绝'
logger.info(f'用户审核{status}: {user_id}')
except Exception as e:
conn.rollback()
logger.error(f'用户审核失败: {e}')
raise
def get_user(self, user_id: str) -> Optional[Dict]:
"""获取单个用户信息"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('''
SELECT id, name, username, phone, register_date, is_active, user_type, created_at, updated_at
FROM users WHERE id = ?
''', (user_id,))
row = cursor.fetchone()
return dict(row) if row else None
except Exception as e:
logger.error(f'获取用户信息失败: {e}')
return None
def update_user(self, user_id: str, user_data: Dict[str, Any]):
"""更新用户信息"""
conn = self.get_connection()
cursor = conn.cursor()
try:
# 密码明文存储,无需加密处理
# 构建更新语句
fields = []
values = []
for key, value in user_data.items():
if key in ['name', 'username', 'password', 'is_active', 'user_type','phone']:
fields.append(f'{key} = ?')
values.append(value)
if fields:
# 使用中国时区时间
china_time = self.get_china_time()
fields.append('updated_at = ?')
values.append(china_time)
values.append(user_id)
sql = f"UPDATE users SET {', '.join(fields)} WHERE id = ?"
cursor.execute(sql, values)
conn.commit()
logger.info(f'更新用户信息: {user_id}')
except Exception as e:
conn.rollback()
logger.error(f'更新用户信息失败: {e}')
raise
def delete_user(self, user_id: str):
"""删除用户"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('DELETE FROM users WHERE id = ?', (user_id,))
conn.commit()
logger.info(f'删除用户: {user_id}')
except Exception as e:
conn.rollback()
logger.error(f'删除用户失败: {e}')
raise
def get_system_setting(self, key: str, default_value: str = None) -> Optional[str]:
"""获取系统设置值"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('SELECT value FROM system_settings WHERE key = ?', (key,))
result = cursor.fetchone()
if result:
return result[0]
else:
logger.warning(f'系统设置项不存在: {key}')
return default_value
except Exception as e:
logger.error(f'获取系统设置失败: {e}')
return default_value
def set_system_setting(self, key: str, value: str, description: str = None):
"""设置系统设置值"""
conn = self.get_connection()
cursor = conn.cursor()
try:
china_time = self.get_china_time()
# 检查设置项是否存在
cursor.execute('SELECT COUNT(*) FROM system_settings WHERE key = ?', (key,))
exists = cursor.fetchone()[0]
if exists:
# 更新现有设置
cursor.execute('''
UPDATE system_settings SET value = ?, updated_at = ?
WHERE key = ?
''', (value, china_time, key))
else:
# 插入新设置
cursor.execute('''
INSERT INTO system_settings (key, value, description, updated_at)
VALUES (?, ?, ?, ?)
''', (key, value, description, china_time))
conn.commit()
logger.info(f'设置系统配置: {key} = {value}')
except Exception as e:
conn.rollback()
logger.error(f'设置系统配置失败: {e}')
raise
def close(self):
"""关闭数据库连接"""
if self.connection:
self.connection.close()
self.connection = None
logger.info('数据库连接已关闭')