提交代码
This commit is contained in:
parent
ff1e18aee9
commit
a3f1191e7d
@ -2,6 +2,7 @@ package com.yfd.platform.component;
|
||||
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
@Component
|
||||
@ -14,6 +15,12 @@ public class TaskStatusHolder {
|
||||
return taskId + ":" + nodeId;
|
||||
}
|
||||
|
||||
public String generateKeybyId(List<String> dataset) {
|
||||
return dataset + ":" ;
|
||||
}
|
||||
|
||||
|
||||
|
||||
// 原子性检查并标记任务开始
|
||||
public boolean startTaskIfAbsent(String key) {
|
||||
return taskStatusMap.putIfAbsent(key, "IN_PROGRESS") == null;
|
||||
|
@ -324,6 +324,30 @@ public class TsFilesController {
|
||||
}
|
||||
}
|
||||
|
||||
/**********************************
|
||||
* 用途说明: 对比本地和minio的文件差异
|
||||
* 参数说明 taskId 节点ID
|
||||
* 参数说明 nodeId 任务ID
|
||||
* 参数说明 id 文件id
|
||||
* 返回值说明: com.yfd.platform.config.ResponseResult 返回成功或者失败
|
||||
***********************************/
|
||||
@Log(module = "实验数据管理", value = "对比本地和minio的文件差异返回集合!")
|
||||
@PostMapping("/compareMd5List")
|
||||
@ApiOperation("对比本地和minio的文件差异返回集合")
|
||||
public ResponseResult compareMd5List(String id, String nodeId, String taskId) {
|
||||
try {
|
||||
List<String> dataset = new ArrayList<>();
|
||||
if (StrUtil.isNotEmpty(id)) {
|
||||
String[] splitIds = id.split(",");
|
||||
// 数组转集合
|
||||
dataset = Arrays.asList(splitIds);
|
||||
}
|
||||
return ResponseResult.successData(tsFilesService.compareMd5List(dataset, nodeId, taskId));
|
||||
} catch (Exception e) {
|
||||
return ResponseResult.error("对比失败");
|
||||
}
|
||||
}
|
||||
|
||||
/**********************************
|
||||
* 用途说明: 对比本地有minio没有的文件差异
|
||||
* 参数说明 taskId 节点ID
|
||||
@ -353,6 +377,31 @@ public class TsFilesController {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**********************************
|
||||
* 用途说明: 对比本地有minio没有的文件差异
|
||||
* 参数说明 taskId 节点ID
|
||||
* 参数说明 nodeId 任务ID
|
||||
* 参数说明 id 文件id
|
||||
* 返回值说明: com.yfd.platform.config.ResponseResult 返回成功或者失败
|
||||
***********************************/
|
||||
@Log(module = "实验数据管理", value = "对比本地有minio没有的文件差异返回集合!")
|
||||
@PostMapping("/compareLocalList")
|
||||
@ApiOperation("对比本地有minio没有的文件差异返回集合")
|
||||
public ResponseResult compareLocalList(String id, String nodeId, String taskId) {
|
||||
try {
|
||||
List<String> dataset = new ArrayList<>();
|
||||
if (StrUtil.isNotEmpty(id)) {
|
||||
String[] splitIds = id.split(",");
|
||||
// 数组转集合
|
||||
dataset = Arrays.asList(splitIds);
|
||||
}
|
||||
return ResponseResult.successData(tsFilesService.compareLocalList(dataset, nodeId, taskId));
|
||||
} catch (Exception e) {
|
||||
return ResponseResult.error("对比失败");
|
||||
}
|
||||
}
|
||||
|
||||
/**********************************
|
||||
* 用途说明: 对比本地没有minio有的文件差异
|
||||
* 参数说明 taskId 节点ID
|
||||
@ -372,11 +421,33 @@ public class TsFilesController {
|
||||
dataset = Arrays.asList(splitIds);
|
||||
}
|
||||
Page<TsFiles> tsfilesPage = tsFilesService.compareMinio(dataset, nodeId, taskId, page);
|
||||
|
||||
//DualTreeResponse response = tsFilesService.compareMinio(dataset, nodeId, taskId);
|
||||
return ResponseResult.successData(tsfilesPage);
|
||||
// List<TsFilesDTO> dtos = tsFilesService.compareMinio(dataset, nodeId, taskId);
|
||||
// return ResponseResult.successData(dtos);
|
||||
} catch (Exception e) {
|
||||
return ResponseResult.error("对比失败");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**********************************
|
||||
* 用途说明: 对比本地没有minio有的文件差异
|
||||
* 参数说明 taskId 节点ID
|
||||
* 参数说明 nodeId 任务ID
|
||||
* 参数说明 id 文件id
|
||||
* 返回值说明: com.yfd.platform.config.ResponseResult 返回成功或者失败
|
||||
***********************************/
|
||||
@Log(module = "实验数据管理", value = "对比本地没有minio有的文件差异返回集合!")
|
||||
@PostMapping("/compareMinioList")
|
||||
@ApiOperation("对比本地没有minio有的文件差异返回集合")
|
||||
public ResponseResult compareMinioList(String id, String nodeId, String taskId) {
|
||||
try {
|
||||
List<String> dataset = new ArrayList<>();
|
||||
if (StrUtil.isNotEmpty(id)) {
|
||||
String[] splitIds = id.split(",");
|
||||
// 数组转集合
|
||||
dataset = Arrays.asList(splitIds);
|
||||
}
|
||||
return ResponseResult.successData(tsFilesService.compareMinioList(dataset, nodeId, taskId));
|
||||
|
||||
} catch (Exception e) {
|
||||
return ResponseResult.error("对比失败");
|
||||
}
|
||||
@ -466,7 +537,7 @@ public class TsFilesController {
|
||||
***********************************/
|
||||
@Log(module = "实验数据管理", value = "文件自动备份!")
|
||||
@PostMapping("/automaticFileBackup")
|
||||
@ApiOperation("自动备份本地文件到备份空间")
|
||||
@ApiOperation("自动备份本地文件到备份空间通过节点和任务")
|
||||
public ResponseResult automaticFileBackup(String taskId, String nodeId) throws IOException {
|
||||
|
||||
|
||||
@ -492,7 +563,41 @@ public class TsFilesController {
|
||||
} else {
|
||||
return ResponseResult.success("任务已由其他请求启动");
|
||||
}
|
||||
// return ResponseResult.success(tsFilesService.automaticFileBackup(taskId, nodeId));
|
||||
}
|
||||
|
||||
/**********************************
|
||||
* 用途说明: 文件自动备份通过ID
|
||||
* 参数说明 taskId 节点ID
|
||||
* 参数说明 nodeId 任务ID
|
||||
* 返回值说明: com.yfd.platform.config.ResponseResult 返回成功或者失败
|
||||
***********************************/
|
||||
@Log(module = "实验数据管理", value = "文件自动备份!")
|
||||
@PostMapping("/automaticFileBackupByIds")
|
||||
@ApiOperation("自动备份本地文件到备份空间通过ID")
|
||||
public ResponseResult automaticFileBackupByIds(String id) throws IOException {
|
||||
if (StrUtil.isEmpty(id) ) {
|
||||
return ResponseResult.error("参数为空");
|
||||
}
|
||||
List<String> dataset = StrUtil.split(id, ",");
|
||||
|
||||
// 生成唯一Key
|
||||
String asyncKey = taskStatusHolder.generateKeybyId(dataset);
|
||||
|
||||
// 检查任务是否已存在
|
||||
String existingStatus = taskStatusHolder.getStatus(asyncKey);
|
||||
if ("IN_PROGRESS".equals(existingStatus)) {
|
||||
return ResponseResult.success("任务正在处理中!");
|
||||
} else if ("COMPLETED".equals(existingStatus)) {
|
||||
return ResponseResult.success("任务已完成!");
|
||||
}
|
||||
// 原子性启动新任务
|
||||
if (taskStatusHolder.startTaskIfAbsent(asyncKey)) {
|
||||
// 直接异步执行并推送结果
|
||||
tsFilesService.automaticFileBackupAsyncByIds(dataset);
|
||||
return ResponseResult.success("任务开始处理!");
|
||||
} else {
|
||||
return ResponseResult.success("任务已由其他请求启动");
|
||||
}
|
||||
}
|
||||
|
||||
/**********************************
|
||||
|
@ -237,4 +237,12 @@ public interface ITsFilesService extends IService<TsFiles> {
|
||||
* 返回值说明: com.yfd.platform.config.ResponseResult
|
||||
***********************************/
|
||||
void stopSimpleNavi(String token);
|
||||
|
||||
List<TsFiles> compareLocalList(List<String> dataset, String nodeId, String taskId);
|
||||
|
||||
Object compareMinioList(List<String> dataset, String nodeId, String taskId);
|
||||
|
||||
Object compareMd5List(List<String> dataset, String nodeId, String taskId);
|
||||
|
||||
void automaticFileBackupAsyncByIds(List<String> dataset);
|
||||
}
|
||||
|
@ -3,13 +3,10 @@ package com.yfd.platform.modules.experimentalData.service.impl;
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
|
||||
|
||||
import java.nio.channels.Channels;
|
||||
import java.nio.channels.ClosedByInterruptException;
|
||||
import java.nio.channels.FileChannel;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.*;
|
||||
import java.nio.file.attribute.BasicFileAttributes;
|
||||
import java.nio.file.attribute.PosixFilePermission;
|
||||
import java.nio.file.attribute.PosixFilePermissions;
|
||||
import java.security.MessageDigest;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
@ -2566,6 +2563,48 @@ public class TsFilesServiceImpl extends ServiceImpl<TsFilesMapper, TsFiles> impl
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public List<TsFiles> compareLocalList(List<String> dataset, String nodeId, String taskId) {
|
||||
|
||||
// ==================== 1. 构建查询条件 ====================
|
||||
QueryWrapper<TsFiles> queryWrapper = new QueryWrapper<>();
|
||||
queryWrapper.select("id", "node_id", "task_id", "is_file", "parent_id", "file_name",
|
||||
"file_size", "work_path", "backup_path", "upload_time"); // 移除无用字段
|
||||
|
||||
if (StringUtils.isNoneEmpty(nodeId, taskId)) {
|
||||
queryWrapper.eq("node_id", nodeId)
|
||||
.eq("task_id", taskId)
|
||||
.isNotNull("work_path")
|
||||
.ne("work_path", "")
|
||||
.and(wq -> wq.isNull("backup_path").or().eq("backup_path", ""));
|
||||
} else {
|
||||
if (CollectionUtils.isEmpty(dataset)) {
|
||||
throw new IllegalArgumentException("dataset参数不可为空"); // 优化点9:提前校验参数
|
||||
}
|
||||
queryWrapper.in("id", dataset);
|
||||
}
|
||||
List<TsFiles> records = tsFilesMapper.selectList(queryWrapper);
|
||||
if (records == null) {
|
||||
return records;
|
||||
}
|
||||
|
||||
// 递归查询每个记录的子节点,并添加到 records 中
|
||||
List<TsFiles> allFiles = new ArrayList<>();
|
||||
for (TsFiles tsFiles : records) {
|
||||
tsFiles.setWorkPath(processingPath(tsFiles.getWorkPath(), tsFiles.getNodeId())); // 优化点11:路径处理内聚
|
||||
// 如果备份路径为空 增加 将当前节点加入结果列表
|
||||
if (StringUtils.isEmpty(tsFiles.getBackupPath())) {
|
||||
allFiles.add(tsFiles);
|
||||
|
||||
}
|
||||
// 查询该节点的所有子节点并递归添加
|
||||
if ("FOLDER".equals(tsFiles.getIsFile())) {
|
||||
List<TsFiles> childFiles = getChildFilesRecursiveLocal(tsFiles.getId(), allFiles);
|
||||
}
|
||||
}
|
||||
return allFiles;
|
||||
}
|
||||
|
||||
/**********************************
|
||||
* 用途说明: 对比本地没有但MinIO有的文件差异(优化版)
|
||||
* 优化点:
|
||||
@ -2655,6 +2694,50 @@ public class TsFilesServiceImpl extends ServiceImpl<TsFilesMapper, TsFiles> impl
|
||||
return allFiles;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<TsFiles> compareMinioList(List<String> dataset, String nodeId, String taskId) {
|
||||
// ==================== 1. 构建查询条件 ====================
|
||||
QueryWrapper<TsFiles> queryWrapper = new QueryWrapper<>();
|
||||
queryWrapper.select("id", "node_id", "task_id", "is_file", "parent_id", "file_name",
|
||||
"file_size", "work_path", "backup_path", "upload_time");
|
||||
|
||||
|
||||
if (StringUtils.isNoneEmpty(nodeId, taskId)) {
|
||||
// 场景1:根据 nodeId + taskId 查询(确保数据库有联合索引)
|
||||
queryWrapper.eq("node_id", nodeId)
|
||||
.eq("task_id", taskId)
|
||||
.isNotNull("backup_path")
|
||||
.ne("backup_path", "")
|
||||
.and(wq -> wq.isNull("work_path").or().eq("work_path", ""));
|
||||
} else {
|
||||
// 场景2:根据 id 列表查询(id字段需有索引)
|
||||
queryWrapper.in("id", dataset);
|
||||
// .isNotNull("backup_path")
|
||||
// .ne("backup_path", "")
|
||||
// .and(wq -> wq.isNull("work_path").or().eq("work_path", ""));
|
||||
}
|
||||
List<TsFiles> records = tsFilesMapper.selectList(queryWrapper);
|
||||
if (records == null) {
|
||||
return records;
|
||||
|
||||
}
|
||||
// 递归查询每个记录的子节点,并添加到 records 中
|
||||
List<TsFiles> allFiles = new ArrayList<>();
|
||||
for (TsFiles tsFiles : records) {
|
||||
tsFiles.setBackupPath(processingPath(tsFiles.getBackupPath(), tsFiles.getNodeId()));
|
||||
// 如果工作路径为空 增加 将当前节点加入结果列表
|
||||
if (StringUtils.isEmpty(tsFiles.getWorkPath())) {
|
||||
allFiles.add(tsFiles);
|
||||
|
||||
}
|
||||
// 查询该节点的所有子节点并递归添加
|
||||
if ("FOLDER".equals(tsFiles.getIsFile())) {
|
||||
List<TsFiles> childFiles = getChildFilesRecursiveMinio(tsFiles.getId(), allFiles);
|
||||
}
|
||||
}
|
||||
return allFiles;
|
||||
}
|
||||
|
||||
|
||||
/**********************************
|
||||
* 用途说明: 对比本地和minio的文件差异
|
||||
@ -2740,6 +2823,68 @@ public class TsFilesServiceImpl extends ServiceImpl<TsFilesMapper, TsFiles> impl
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public List<TsFiles> compareMd5List(List<String> dataset, String nodeId, String taskId) {
|
||||
// 获取本地文件路径根目录和存储空间名称
|
||||
StorageSourceConfig filePathConfig = getStorageConfig("filePath");
|
||||
StorageSourceConfig bucketConfig = getStorageConfig("bucketName");
|
||||
|
||||
// ================ 1. 执行原始分页查询 ================
|
||||
QueryWrapper<TsFiles> queryWrapper = buildQueryWrapper(dataset, nodeId, taskId);
|
||||
List<TsFiles> records = tsFilesMapper.selectList(queryWrapper);
|
||||
|
||||
if (StringUtils.isEmpty(nodeId) || StringUtils.isEmpty(taskId)) {
|
||||
if (records == null) {
|
||||
return records;
|
||||
}
|
||||
|
||||
// 递归查询每个记录的子节点,并添加到 records 中
|
||||
List<TsFiles> allFiles = new ArrayList<>();
|
||||
for (TsFiles tsFiles : records) {
|
||||
// 将当前节点加入结果列表
|
||||
allFiles.add(tsFiles);
|
||||
// 查询该节点的所有子节点并递归添加
|
||||
if ("FOLDER".equals(tsFiles.getIsFile())) {
|
||||
List<TsFiles> childFiles = getChildFilesRecursiveMd5(tsFiles.getId(), allFiles);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ================ 2. 过滤并处理符合条件的记录 ================
|
||||
List<TsFiles> filteredRecords = records.stream()
|
||||
.filter(tsFile -> {
|
||||
try {
|
||||
// 计算本地文件MD5
|
||||
File localFile = new File(filePathConfig.getValue() + tsFile.getWorkPath(), tsFile.getFileName());
|
||||
String localMD5 = calculateMD5Data(new FileInputStream(localFile));
|
||||
|
||||
// 计算MinIO文件MD5
|
||||
|
||||
String minioMD5 = getMinioMD5Data(bucketConfig.getValue(), tsFile.getBackupPath(), tsFile.getFileName());
|
||||
|
||||
// 路径处理
|
||||
tsFile.setWorkPath(processingPath(tsFile.getWorkPath(), tsFile.getNodeId()));
|
||||
tsFile.setBackupPath(processingPath(tsFile.getBackupPath(), tsFile.getNodeId()));
|
||||
|
||||
// 设置MD5字段(即使不满足条件也保留字段)
|
||||
tsFile.setLocatMd5(localMD5);
|
||||
tsFile.setMinioMd5(minioMD5);
|
||||
|
||||
// 返回是否满足过滤条件
|
||||
return StringUtils.isNoneEmpty(localMD5, minioMD5) && !localMD5.equals(minioMD5);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("MD5计算失败: {}", tsFile.getFileName(), e);
|
||||
return false;
|
||||
}
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
|
||||
return filteredRecords;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
// 递归查询所有子节点
|
||||
private List<TsFiles> getChildFilesRecursiveMd5(String parentId, List<TsFiles> allFiles) {
|
||||
// 构建查询条件,获取当前parentId的所有子节点
|
||||
@ -3073,11 +3218,25 @@ public class TsFilesServiceImpl extends ServiceImpl<TsFilesMapper, TsFiles> impl
|
||||
taskStatusHolder.finishTask(asyncKey);
|
||||
}
|
||||
}
|
||||
@Override
|
||||
@Async("asyncExecutor")
|
||||
public void automaticFileBackupAsyncByIds(List<String> dataset) {
|
||||
try {
|
||||
// 执行实际备份逻辑
|
||||
this.automaticFileBackupByIds(dataset);
|
||||
} finally {
|
||||
// 生成唯一Key
|
||||
String asyncKey = taskStatusHolder.generateKeybyId(dataset);
|
||||
// 无论成功失败都标记完成
|
||||
taskStatusHolder.finishTask(asyncKey);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/**********************************
|
||||
* 用途说明: 文件自动备份
|
||||
* 用途说明: 文件自动备份通过节点和任务
|
||||
* 参数说明 taskId 节点ID
|
||||
* 参数说明 nodeId 任务ID
|
||||
* 返回值说明: com.yfd.platform.config.ResponseResult 返回成功或者失败
|
||||
@ -3134,6 +3293,73 @@ public class TsFilesServiceImpl extends ServiceImpl<TsFilesMapper, TsFiles> impl
|
||||
return "本地有新增文件 " + FileCount + " 个, 新增文件夹 " + FolderCount + " 个, 已将 " + BackupsFileCount + " 个文件, " + BackupsFolderCount + " 个文件夹备份成功";
|
||||
}
|
||||
|
||||
/**********************************
|
||||
* 用途说明: 文件自动备份通过ID
|
||||
* 参数说明 taskId 节点ID
|
||||
* 参数说明 nodeId 任务ID
|
||||
* 返回值说明: com.yfd.platform.config.ResponseResult 返回成功或者失败
|
||||
***********************************/
|
||||
public String automaticFileBackupByIds(List<String> dataset) {
|
||||
|
||||
List<TsFiles> allFiles = new ArrayList<>();
|
||||
QueryWrapper<TsFiles> queryWrapper = new QueryWrapper<>();
|
||||
queryWrapper.select("id", "node_id", "task_id", "is_file", "parent_id", "file_name",
|
||||
"file_size", "work_path", "backup_path", "upload_time"); // 移除无用字段
|
||||
queryWrapper.in("id", dataset);
|
||||
List<TsFiles> records = tsFilesMapper.selectList(queryWrapper);
|
||||
for (TsFiles tsFiles : records) {
|
||||
// 如果备份路径为空 增加 将当前节点加入结果列表
|
||||
if (StringUtils.isEmpty(tsFiles.getBackupPath())) {
|
||||
allFiles.add(tsFiles);
|
||||
}
|
||||
// 查询该节点的所有子节点并递归添加
|
||||
if ("FOLDER".equals(tsFiles.getIsFile())) {
|
||||
List<TsFiles> childFiles = getChildFilesRecursiveLocal(tsFiles.getId(), allFiles);
|
||||
}
|
||||
}
|
||||
int FileCount = 0, FolderCount = 0;
|
||||
int BackupsFileCount = 0, BackupsFolderCount = 0;
|
||||
|
||||
if (allFiles.isEmpty()) {
|
||||
return "本地有新增文件 " + FileCount + " 个, 文件夹 " + FolderCount + " 个, 已将 " + BackupsFileCount + " 个文件, " + BackupsFolderCount + " 个文件夹备份成功";
|
||||
}
|
||||
|
||||
for (TsFiles tsFiles : allFiles) {
|
||||
ParameterList parameterList = new ParameterList();
|
||||
Parameter fileParameter = new Parameter();
|
||||
Parameter FolderParameter = new Parameter();
|
||||
List<ParameterList> fileParameterlist = new ArrayList<>();
|
||||
List<ParameterList> FolderParameterlist = new ArrayList<>();
|
||||
|
||||
if ("FILE".equals(tsFiles.getIsFile())) {
|
||||
parameterList.setName(tsFiles.getFileName());
|
||||
parameterList.setPath(tsFiles.getWorkPath());
|
||||
parameterList.setSize(tsFiles.getFileSize());
|
||||
parameterList.setType(tsFiles.getIsFile());
|
||||
fileParameterlist.add(parameterList);
|
||||
fileParameter.setParameterLists(fileParameterlist);
|
||||
Boolean value = uploadToBackup(fileParameter);
|
||||
if (value) {
|
||||
BackupsFileCount++;
|
||||
}
|
||||
FileCount++;
|
||||
} else {
|
||||
parameterList.setName(tsFiles.getFileName());
|
||||
parameterList.setPath(tsFiles.getWorkPath());
|
||||
parameterList.setSize(tsFiles.getFileSize());
|
||||
parameterList.setType(tsFiles.getIsFile());
|
||||
FolderParameterlist.add(parameterList);
|
||||
FolderParameter.setParameterLists(FolderParameterlist);
|
||||
Boolean value = uploadToBackup(FolderParameter);
|
||||
if (value) {
|
||||
BackupsFolderCount++;
|
||||
}
|
||||
FolderCount++;
|
||||
}
|
||||
}
|
||||
return "本地有新增文件 " + FileCount + " 个, 新增文件夹 " + FolderCount + " 个, 已将 " + BackupsFileCount + " 个文件, " + BackupsFolderCount + " 个文件夹备份成功";
|
||||
}
|
||||
|
||||
|
||||
/**********************************
|
||||
* 用途说明: 从备份空间下载到工作空间
|
||||
@ -4605,6 +4831,8 @@ public class TsFilesServiceImpl extends ServiceImpl<TsFilesMapper, TsFiles> impl
|
||||
ServerSendEventServer.removeUser(token);
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user