业务上提出了一个流程的批量发起,要求在 1 分钟内异步发起 1000 个任务。因此构造了如下的线程池模型 1 、调度线程池 单例、防止并发过多
//调度线程池
public static final ThreadPoolTaskExecutor dispatchThreadPool = threadPoolDispatchTaskExecutor();
private static ThreadPoolTaskExecutor threadPoolDispatchTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(5);
threadPoolTaskExecutor.setMaxPoolSize(10);
threadPoolTaskExecutor.setQueueCapacity(10);
threadPoolTaskExecutor.setThreadNamePrefix("all-task-dispatch-");
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
2 、任务执行线程池、每次执行获取
public static ThreadPoolTaskExecutor threadPoolHandleTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(5);
threadPoolTaskExecutor.setMaxPoolSize(10);
threadPoolTaskExecutor.setQueueCapacity(10);
threadPoolTaskExecutor.setThreadNamePrefix("task-handler-");
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
3 、 执行任务方法
public void doSomething(List<String> taskList) {
ThreadPoolTaskExecutor threadPoolTaskExecutor = ThreadPoolConfig.threadPoolHandleTaskExecutor();
//分配处理线程池的 max + queue 可以充分利用线程,防止进入拒绝策略
List<List<String>> taskGroup = ListUtil.split(taskList, threadPoolTaskExecutor.getMaxPoolSize() + threadPoolTaskExecutor.getQueueSize());
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (List<String> taskMembers : taskGroup) {
for (String task : taskMembers) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(5000);
log.info("执行完任务 :{}", task);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, threadPoolTaskExecutor).exceptionally(e -> {
log.error("异常: {}, {}", e.getMessage(), e);
return null;
});
log.debug("分发 task: {}, 完毕", task);
futures.add(future);
}
//阻塞每组任务线程 防止超发
log.debug("阻塞每组任务线程 防止超发");
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
log.debug("每组任务线程执行结束");
}
log.debug("执行结束");
threadPoolTaskExecutor.shutdown();
}
4 、 调度线程池分发任务
TaskHandler taskHandler = new TaskHandler();
ThreadPoolTaskExecutor dispatch = ThreadPoolConfig.dispatchThreadPool;
dispatch.submit(
() -> taskHandler.doSomething(taskList)
);
这样设计,经过测试是可以达到 1 分钟 1000 条任务、但不知道是否合理,还有如果多个并发的开销是否过大
1
zhlxsh 2022-10-28 23:39:58 +08:00 via iPhone
是不是应该看 CPU 上下文切换?然后判断最大的创建线程数量?
|
2
uselessVisitor OP @zhlxsh 这个我想要看上线后的效果再调整。。
|