SpringBoot全局配置线程池并发处理socket

spring全局配置线程池,先创建一个名为ExecutorConfig的类,之后在类中添加如下代码:

@Configuration
@EnableAsync
public class ExecutorConfig {

    @Bean
    public Executor carSocketExecutor() {
        //获取当前机器的核数
        int cpuNum = Runtime.getRuntime().availableProcessors();
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(cpuNum);
        //配置最大线程数
        executor.setMaxPoolSize(cpuNum * 2);
        //配置队列大小
        executor.setQueueCapacity(300);
        //线程存活时间
        executor.setKeepAliveSeconds(60);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("carSocketExecutor");
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        executor.initialize();
        return executor;
    }

}

首先获取当前运行设备的CPU核数,不管程序部署在windows系统还是Liunx系统,都能够获取到运行系统的CPU核数,把CPU核数设为核心线程数,最大程度减小线程切换。最大线程数量是当前运行设备的CPU核数的两倍,目的也是为了减小线程切换。缓存队列设置为300,如果并发更高点可以设置的更大些。

@Async("carSocketExecutor")
    public void listenerSignal() {
        try {
            int port = 9876;
            ServerSocket serverSocket = new ServerSocket();
            serverSocket.bind(new InetSocketAddress(port));
            log.info("开启监听线程,线程名:" + Thread.currentThread().getName());
            while (true) {
                log.info("等待接收数据。。。");
                Socket socket = serverSocket.accept();
                log.info("开始解析数据。。。");
                socketServerService.readSignalData(socket);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

添加@Async("carSocketExecutor"),当代码执行到listenerSignal()方法的时候就会开启一个线程,加入到名为carSocketExecutor的线程池中,也就是在全局中配置的线程池名。

@Async("carSocketExecutor")
    public void socketExecutor(Socket socket)  {
        try {
            log.info("线程名:" + Thread.currentThread().getName() + " 开始处理。。。");
            Thread.sleep(2000);
            log.info(" 处理完成。。。");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


    }

程序执行到serverSocket.accept()的时候socket会进行阻塞,当一个socket处理完成才会放下一个socket进来,当同时有多个socket进来的时候,sokcet就会在这里进行阻塞,这样的并发处理的效果很差。因此利用线程池到listenerRadio()可以开启一个线程处理的sokcet,再来一个socket可以再开一个线程并发的去处理,这样可以达到同一时间处理多个socket的目的。

同时开启多个线程发送socket消息。

这里我们让处理socket的线程休眠两秒做个测试,可以看出是在并发处理socket。

经验分享 程序员 微信小程序 职场和发展