线程池常用参数介绍
corePoolSize核心线程数,当往线程池内部提交任务的时候,线程池会创建一个线程来执行任务。即使此时有空闲的工作线程能够处理当前任务,只要总的工作线程数小于corePoolSize,也会创建新的工作线程。
maximumPoolSize当任务的堵塞队列满了之后,如果还有新的任务提交到线程池内部,此时倘若工作线程数小于maximumPoolSize,则会创建新的工作线程。
keepAliveTime上边我们说到了工作线程
Worker(java.util.concurrent.ThreadPoolExecutor.Worker),当工作线程处于空闲状态中,如果超过了keepAliveTime依然没有任务,那么就会销毁当前工作线程。如果工作线程需要一直处于执行任务,每个任务的连续间隔都比较短,那么这个keepAliveTime 属性可以适当地调整大一些。
unitkeepAliveTime对应的时间单位
workQueue工作队列,当工作线程数达到了核心线程数,那么此时新来的线程就会被放入到工作队列中。线程池内部的工作队列全部都是继承自阻塞队列的接口,对于常用的阻塞队列类型为:
- ArrayBlockingQueue
- LinkedBlockingQueue
- SynchronousQueue
- PriorityBlockingQueue
RejectedExecutionHandlerJDK内部的线程拒绝策略包含了多种许多种,这里我罗列一些常见的拒绝策略给大家认识下:
- AbortPolicy 直接抛出异常
- CallerRunsPolicy 任务的执行由注入的线程自己执行
- DiscardOldestPolicy 直接抛弃掉堵塞队列中队列头部的任务,然后执行尝试将当前任务提交到堵塞队列中。
- DiscardPolicy 直接抛弃这个任务
从线程池设计中的一些启发
多消费队列的设计场景应用:业务上游提交任务,然后任务被放进一个堵塞队列中,接下来消费者需要从堵塞队列中提取元素,并且将它们转发到多个子队列中,各个子队列分别交给不同的子消费者处理数据。例如下图所示:
public interface AsyncHandlerService { /** * 任务放入队列中 * * @param asyncHandlerData */ boolean putTask(AsyncHandlerData asyncHandlerData); /** * 启动消费 */ void startJob(); }
多消费者分发处理实现类:
@Component("asyncMultiConsumerHandlerHandler") public class AsyncMultiConsumerHandlerHandler implements AsyncHandlerService{ private volatile TaskQueueHandler taskQueueHandler = new TaskQueueHandler(10); @Override public boolean putTask(AsyncHandlerData asyncHandlerData) { return taskQueueHandler.addTask(asyncHandlerData); } @Override public void startJob(){ Thread thread = new Thread(taskQueueHandler); thread.setDaemon(true); thread.start(); } /** * 将任务分发给各个子队列去处理 */ static class TaskQueueHandler implements Runnable { private static BlockingQueue<AsyncHandlerData> tasks = new ArrayBlockingQueue<>(11); public static BlockingQueue<AsyncHandlerData> getAllTaskInfo() { return tasks; } private TaskDispatcherHandler[] taskDispatcherHandlers; private int childConsumerSize = 0; public TaskQueueHandler(int childConsumerSize) { this.childConsumerSize = childConsumerSize; taskDispatcherHandlers = new TaskDispatcherHandler[childConsumerSize]; for (int i = 0; i < taskDispatcherHandlers.length; i++) { taskDispatcherHandlers[i] = new TaskDispatcherHandler(new ArrayBlockingQueue<>(100), "child-worker-" + i); Thread thread = new Thread(taskDispatcherHandlers[i]); thread.setDaemon(false); thread.setName("taskQueueHandler-child-"+i); thread.start(); } } public boolean addTask(AsyncHandlerData asyncHandlerData) { return tasks.offer(asyncHandlerData); } @Override public void run() { int index = 0; for (; ; ) { try { AsyncHandlerData asyncHandlerData = tasks.take(); index = (index == taskDispatcherHandlers.length) ? 0 : index; taskDispatcherHandlers[index].addAsyncHandlerData(asyncHandlerData); index++; } catch (InterruptedException e) { e.printStackTrace(); } } } } static class TaskDispatcherHandler implements Runnable { private BlockingQueue<AsyncHandlerData> subTaskQueue; private String childName; private AtomicLong taskCount = new AtomicLong(0); public TaskDispatcherHandler(BlockingQueue<AsyncHandlerData> blockingQueue, String childName) { this.subTaskQueue = blockingQueue; this.childName = childName; } public void addAsyncHandlerData(AsyncHandlerData asyncHandlerData) { subTaskQueue.add(asyncHandlerData); } @Override public void run() { for (; ; ) { try { AsyncHandlerData asyncHandlerData = subTaskQueue.take(); long count = taskCount.incrementAndGet(); System.out.println("【" + childName + "】子任务队列处理:" + asyncHandlerData.getDataInfo() + count); Thread.sleep(3000); System.out.println("【" + childName + "】子任务队列处理:" + asyncHandlerData.getDataInfo()+" 任务处理结束" + count); } catch (Exception e) { e.printStackTrace(); } } } } }
测试接口:
@GetMapping(value = "/send-async-data") public boolean sendAsyncData(){ AsyncHandlerData asyncHandlerData = new AsyncHandlerData(); asyncHandlerData.setDataInfo("data info"); boolean status = asyncMultiConsumerHandlerHandler.putTask(asyncHandlerData); if(!status){ throw new RuntimeException("insert fail"); } return status; }
这种设计模型适合用于对于请求吞吐量要求较高,每个请求都比较耗时的场景中。
自定义拒绝策略的应用根据具体的应用场景,通过实现java.util.concurrent.RejectedExecutionHandler
接口,自定义拒绝策略,例如对于当抛出拒绝异常的时候,往数据库中记录一些信息或者日志。
相关案例代码:
public class MyRejectPolicy{ static class MyTask implements Runnable{ @Override public void run() { System.out.println("this is test"); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS , new LinkedBlockingQueue<>(10), Executors.defaultThreadFactory(), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("任务被拒绝:" + r.toString()); //记录一些信息 } }); for(int i=0;i<100;i++){ Thread thread = new Thread(new MyTask()); threadPoolExecutor.execute(thread); } Thread.yield(); } }
统计线程池的详细信息
通过阅读线程池的源代码之后,可以借助重写beforeExecute、afterExecute、terminated 方法去对线程池的每个线程耗时做统计。以及通过继承 ThreadPoolExecutor 对象之后,对当前线程池的coreSIze、maxiMumSize等等属性进行监控。
相关案例代码:
public class SysThreadPool extends ThreadPoolExecutor { private final ThreadLocal<Long> startTime = new ThreadLocal<>(); private Logger logger = LoggerFactory.getLogger(SysThreadPool.class); public SysThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } public SysThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); } public SysThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); } public SysThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); startTime.set(System.currentTimeMillis()); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); long endTime = System.currentTimeMillis(); long executeTime = endTime - startTime.get(); logger.info("Thread {}: ExecuteTime {}", r, executeTime); } @Override public void shutdown() { super.shutdown(); } @Override public void execute(Runnable command) { super.execute(command); } public void getTaskInfo(){ logger.info("coreSize: {}, maxSize: {}, activeCount:{},blockQueueSize:{}",super.getCorePoolSize(),super.getMaximumPoolSize(),super.getActiveCount(),super.getQueue().size()); } static class MyTestTask implements Runnable{ @Override public void run() { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { SysThreadPool sysThreadPool = new SysThreadPool(2,5,5000,TimeUnit.MILLISECONDS,new ArrayBlockingQueue(2)); sysThreadPool.getTaskInfo(); System.out.println("------------"); for(int i=0;i<10;i++){ Thread thread = new Thread(new MyTestTask()); sysThreadPool.submit(thread); sysThreadPool.getTaskInfo(); } System.out.println("------------"); Thread.sleep(3000); } }
通过日志打印记录线程池的参数变化:
通过这份案例代码不妨可以设想下通过一些定时上报逻辑来实现线程池的监控功能。