辅助控制线程使用优化

This commit is contained in:
weitang 2025-04-24 09:32:24 +08:00
parent 1248e5543b
commit 9ba1be252f
8 changed files with 184 additions and 109 deletions

View File

@ -17,7 +17,7 @@ package com.yfd.platform.component.iec104.client;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.yfd.platform.component.iec104.config.Iec104Config;
import com.yfd.platform.component.iec104.core.CachedThreadPool;
import com.yfd.platform.component.iec104.core.OptimizedThreadPool;
import com.yfd.platform.component.iec104.server.Iec104Master;
import com.yfd.platform.component.iec104.server.Iec104MasterFactory;
import com.yfd.platform.modules.auxcontrol.domain.GatewayDevice;
@ -70,7 +70,8 @@ public class IEC104ClientRunner implements ApplicationRunner {
iec104Config.setTerminnalAddress(terminnalAddress);
iec104Config.setSlaveCode(gatewayDevice.getDeviceCode());
iec104Config.setSlaveIP(gatewayDevice.getIpAddr());
Runnable runnable = () -> {
OptimizedThreadPool threadPool = OptimizedThreadPool.getInstance();
threadPool.execute(() -> {
Iec104Master iec104server = Iec104MasterFactory.createTcpClientMaster(iec104Config.getSlaveIP(),
2404);
try {
@ -79,8 +80,8 @@ public class IEC104ClientRunner implements ApplicationRunner {
} catch (Exception e) {
log.error(String.format("%s-对应的终端连接失败!", iec104Config.getSlaveIP()));
}
};
CachedThreadPool.getCachedThreadPool().execute(runnable);
});
threadPool.shutdown();
}
}

View File

@ -1,26 +0,0 @@
package com.yfd.platform.component.iec104.core;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 线程池
*/
public final class CachedThreadPool {
private static CachedThreadPool cachedThreadPool = new CachedThreadPool();
private ExecutorService executorService;
private CachedThreadPool() {
executorService = Executors.newCachedThreadPool();
}
public static CachedThreadPool getCachedThreadPool() {
return cachedThreadPool;
}
public void execute(Runnable runnable) {
executorService.execute(runnable);
}
}

View File

@ -1,8 +1,11 @@
package com.yfd.platform.component.iec104.core;
import com.yfd.platform.component.iec104.client.MasterSysDataHandler;
import com.yfd.platform.component.iec104.common.Iec104Constant;
import com.yfd.platform.component.iec104.message.MessageDetail;
import com.ydl.iec.util.Iec104Util;
import com.yfd.platform.component.iec104.server.Iec104Master;
import com.yfd.platform.component.iec104.server.Iec104MasterFactory;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -59,7 +62,8 @@ public class ControlManageUtil {
* 启动S发送S确认帧 的任务
*/
public void startSendFrameTask() {
Runnable runnable = () -> {
OptimizedThreadPool threadPool = OptimizedThreadPool.getInstance();
threadPool.execute(() -> {
while (true) {
try {
synchronized (sendSframeLock) {
@ -76,8 +80,9 @@ public class ControlManageUtil {
LOGGER.error("Exception caught", e);
}
}
};
CachedThreadPool.getCachedThreadPool().execute(runnable);
});
threadPool.shutdown();
}

View File

@ -0,0 +1,125 @@
package com.yfd.platform.component.iec104.core;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 优化后的线程池管理类
* 说明使用可控的自定义线程池替代无界的CachedThreadPool
*/
public final class OptimizedThreadPool {
// 使用静态内部类实现线程安全的单例模式
private static class InstanceHolder {
private static final OptimizedThreadPool INSTANCE = new OptimizedThreadPool();
}
private final ThreadPoolExecutor executorService;
// 私有构造函数防止外部实例化
private OptimizedThreadPool() {
// 获取处理器核心数动态适配不同机器
int corePoolSize = Runtime.getRuntime().availableProcessors();
// 配置线程池参数生产环境建议参数可配置化
this.executorService = new ThreadPoolExecutor(
// 核心线程数建议等于CPU核心数
corePoolSize,
// 最大线程数建议2-4倍核心数
corePoolSize * 2,
// 空闲线程存活时间
60L,
// 时间单位
TimeUnit.SECONDS,
// 有界任务队列防止OOM
new LinkedBlockingQueue<>(1000),
// 自定义线程工厂便于问题排查
new CustomThreadFactory(),
// 拒绝策略主线程执行
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
/**
* 获取线程池单例实例线程安全
*/
public static OptimizedThreadPool getInstance() {
return InstanceHolder.INSTANCE;
}
/**
* 执行异步任务增强版
*/
public void execute(Runnable task) {
// 添加基础校验生产环境可扩展
if (task == null) {
throw new IllegalArgumentException("Task cannot be null");
}
// 添加异常处理包装
executorService.execute(wrapTask(task));
}
/**
* 任务包装器统一异常处理
*/
private Runnable wrapTask(Runnable task) {
return () -> {
try {
task.run();
} catch (Exception e) {
// 统一的异常处理建议接入日志系统
System.err.println("Task execution failed: " + e.getMessage());
e.printStackTrace();
}
};
}
/**
* 优雅关闭线程池重要必须调用
*/
public void shutdown() {
executorService.shutdown();
try {
// 等待任务完成可配置超时时间
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
/**
* 获取线程池状态监控用
*/
public String getPoolStatus() {
return String.format(
"Pool Status: [Active: %d, Core: %d, Max: %d, Queue: %d/%d]",
executorService.getActiveCount(),
executorService.getCorePoolSize(),
executorService.getMaximumPoolSize(),
executorService.getQueue().size(),
executorService.getQueue().remainingCapacity()
);
}
/**
* 自定义线程工厂命名规范化
*/
private static class CustomThreadFactory implements ThreadFactory {
private final AtomicInteger threadCounter = new AtomicInteger(1);
private static final String THREAD_NAME_PREFIX = "optimized-pool-thread-";
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, THREAD_NAME_PREFIX + threadCounter.getAndIncrement());
thread.setDaemon(false); // 非守护线程
thread.setPriority(Thread.NORM_PRIORITY);
return thread;
}
}
}

View File

@ -1,8 +1,11 @@
package com.yfd.platform.component.iec104.core;
import cn.hutool.core.util.ObjUtil;
import com.yfd.platform.component.iec104.client.MasterSysDataHandler;
import com.yfd.platform.component.iec104.common.BasicInstruction104;
import com.yfd.platform.component.iec104.message.MessageDetail;
import com.yfd.platform.component.iec104.server.Iec104Master;
import com.yfd.platform.component.iec104.server.Iec104MasterFactory;
import com.yfd.platform.component.iec104.server.master.BootNettyClientChannel;
import com.yfd.platform.component.iec104.server.master.BootNettyClientChannelCache;
import io.netty.channel.ChannelHandlerContext;
@ -71,12 +74,13 @@ public class ScheduledTaskPool {
* @Description: 发送测试帧
*/
private void sendTestFrame() {
Runnable runnable = () -> {
OptimizedThreadPool threadPool = OptimizedThreadPool.getInstance();
threadPool.execute(() -> {
try {
BootNettyClientChannel Channel= BootNettyClientChannelCache.get(ctx.channel().id().asShortText());
if(ObjUtil.isNotEmpty(Channel)){
String slave_ip= Channel.getCode();
LOGGER.info(String.format("向从站[%s]发送测试链路指令!",slave_ip));
BootNettyClientChannel channel = BootNettyClientChannelCache.get(ctx.channel().id().asShortText());
if(ObjUtil.isNotEmpty(channel )){
String slaveIp = channel .getCode();
LOGGER.info(String.format("向从站[%s]发送测试链路指令!",slaveIp ));
ctx.channel().writeAndFlush(BasicInstruction104.TESTFR);
//对时指令
ctx.channel().writeAndFlush(BasicInstruction104.getTimeScale104());
@ -84,8 +88,8 @@ public class ScheduledTaskPool {
} catch (Exception e) {
LOGGER.error("Exception caught", e);
}
};
CachedThreadPool.getCachedThreadPool().execute(runnable);
});
threadPool.shutdown();
}
@ -99,7 +103,8 @@ public class ScheduledTaskPool {
}
private void sendGeneralCall() {
Runnable runnable = () -> {
OptimizedThreadPool threadPool = OptimizedThreadPool.getInstance();
threadPool.execute(() -> {
try {
BootNettyClientChannel channel = BootNettyClientChannelCache.get(ctx.channel().id().asShortText());
if (ObjUtil.isNotEmpty(channel)) {
@ -111,8 +116,8 @@ public class ScheduledTaskPool {
} catch (Exception e) {
LOGGER.error("Exception caught", e);
}
};
CachedThreadPool.getCachedThreadPool().execute(runnable);
});
threadPool.shutdown();
}
public void stopSendCommandTask() {

View File

@ -1,10 +1,8 @@
package com.yfd.platform.component.iec104.server.master.handler;
import cn.hutool.core.util.ObjUtil;
import com.yfd.platform.component.iec104.core.CachedThreadPool;
import com.yfd.platform.component.iec104.core.ControlManageUtil;
import com.yfd.platform.component.iec104.core.Iec104ThreadLocal;
import com.yfd.platform.component.iec104.core.ScheduledTaskPool;
import com.yfd.platform.component.iec104.common.BasicInstruction104;
import com.yfd.platform.component.iec104.core.*;
import com.yfd.platform.component.iec104.message.MessageDetail;
import com.yfd.platform.component.iec104.server.handler.ChannelHandlerImpl;
import com.yfd.platform.component.iec104.server.handler.DataHandler;
@ -52,16 +50,15 @@ public class Iec104ClientHandler extends SimpleChannelInboundHandler<MessageDeta
BootNettyClientChannelCache.save(ctx.channel().id().asShortText(),Channel);
if (dataHandler != null) {
CachedThreadPool.getCachedThreadPool().execute(new Runnable() {
@Override
public void run() {
try {
dataHandler.handlerAdded(new ChannelHandlerImpl(ctx));
} catch (Exception e) {
LOGGER.error("Exception caught", e);
}
OptimizedThreadPool threadPool = OptimizedThreadPool.getInstance();
threadPool.execute(() -> {
try {
dataHandler.handlerAdded(new ChannelHandlerImpl(ctx));
} catch (Exception e) {
LOGGER.error("Exception caught", e);
}
});
threadPool.shutdown();
}
}
@ -80,17 +77,16 @@ public class Iec104ClientHandler extends SimpleChannelInboundHandler<MessageDeta
@Override
public void channelRead0(ChannelHandlerContext ctx, MessageDetail ruleDetail104) throws IOException {
if (dataHandler != null) {
CachedThreadPool.getCachedThreadPool().execute(new Runnable() {
@Override
public void run() {
try {
dataHandler.channelRead(new ChannelHandlerImpl(ctx), ruleDetail104);
} catch (Exception e) {
// TODO Auto-generated catch block
LOGGER.error("Exception caught", e);
}
}
});
OptimizedThreadPool threadPool = OptimizedThreadPool.getInstance();
threadPool.execute(() -> {
try {
dataHandler.channelRead(new ChannelHandlerImpl(ctx), ruleDetail104);
} catch (Exception e) {
// TODO Auto-generated catch block
LOGGER.error("Exception caught", e);
}
});
threadPool.shutdown();
}
}

View File

@ -1,18 +1,17 @@
package com.yfd.platform.component.iec104.server.slave.handler;
import com.yfd.platform.component.iec104.core.CachedThreadPool;
import com.yfd.platform.component.iec104.core.ControlManageUtil;
import com.yfd.platform.component.iec104.core.Iec104ThreadLocal;
import com.yfd.platform.component.iec104.core.OptimizedThreadPool;
import com.yfd.platform.component.iec104.message.MessageDetail;
import com.yfd.platform.component.iec104.server.handler.ChannelHandlerImpl;
import com.yfd.platform.component.iec104.server.handler.DataHandler;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
*
@ -42,15 +41,15 @@ public class Iec104TcpSlaveHandler extends SimpleChannelInboundHandler<MessageDe
*/
Iec104ThreadLocal.getControlPool().startSendFrameTask();
if (dataHandler != null) {
Runnable runnable = () -> {
OptimizedThreadPool threadPool = OptimizedThreadPool.getInstance();
threadPool.execute(() -> {
try {
dataHandler.handlerAdded(new ChannelHandlerImpl(ctx));
} catch (Exception e) {
LOGGER.error("Exception caught", e);
}
};
CachedThreadPool.getCachedThreadPool().execute(runnable);
});
threadPool.shutdown();
}
}
@ -67,15 +66,17 @@ public class Iec104TcpSlaveHandler extends SimpleChannelInboundHandler<MessageDe
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageDetail ruleDetail104) throws Exception {
if (dataHandler != null) {
Runnable runnable = () -> {
OptimizedThreadPool threadPool = OptimizedThreadPool.getInstance();
threadPool.execute(() -> {
try {
dataHandler.channelRead(new ChannelHandlerImpl(ctx), ruleDetail104);
} catch (Exception e) {
LOGGER.error("Exception caught", e);
}
};
CachedThreadPool.getCachedThreadPool().execute(runnable);
}
});
threadPool.shutdown();
}
}
/**

View File

@ -1,32 +0,0 @@
package com.yfd.platform.uavsystem.conf.thread;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MyThread {
public static ThreadPoolExecutor getThread(String threadName){
ThreadPoolExecutor threadPoolExecutor= new ThreadPoolExecutor(3,12,60, TimeUnit.SECONDS,new LinkedBlockingQueue<>(1000),new MyThreadFactory(threadName), new MyAbortPolicy());
return threadPoolExecutor;
}
/**
*
* @param threadNum 核心线程数
* @param threadMaxNum 最大线程数
* @param timeOut 超时时间单位秒
* @param QueueNum 队列数量
* @param threadName 线程名称(前缀)
* @return ThreadPoolExecutor
*/
public static ThreadPoolExecutor getThread(Integer threadNum,Integer threadMaxNum,Long timeOut,Integer QueueNum,String threadName){
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(threadNum,threadMaxNum,timeOut, TimeUnit.SECONDS,new LinkedBlockingQueue<>(QueueNum),new MyThreadFactory(threadName), new MyAbortPolicy());
return threadPoolExecutor;
}
}