深入理解java线程池(1)——ThreadPoolExecutor

简介: 简介:梳理线程池核心原理,探讨对ThreadPoolExecutor的理解和总结使用线程池的常用tips

1. 线程池有多重要🤔🤔🤔

线程是一个程序“媛”一定会涉及到的一个概念,但是线程的创建和切换代价都比较大的。所以,我们有没有一个好的方案能做到线程的复用呢?这就涉及到一个概念——线程池。线程池解决的核心问题就是资源管理的问题,合理的使用线程池能够带来3个很明显的好处:

  1. 降低资源消耗:通过重用已经创建的线程,来降低线程创建和销毁造成的资源消耗;
  2. 提高响应速度:在高并发场景下,任务到达时不需要等待线程创建就可以立即执行;
  3. 提高线程的可管理性:线程在系统中是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为不合理的分配线程导致资源调度失衡,降低系统的可用性,而使用线程池则可以统一管理、分配、调优和监控。

综上所述,了解线程池的原理和使用在生产中是非常重要的。本文主要探讨JDK提供的ThreadPoolExecutor类。

2. Java对多线程的支持⚙️——ThreadPoolExecutor

Java的线程池支持主要通过ThreadPoolExecutor来实现,我们使用的ExecutorService的各种线程池策略都是基于ThreadPoolExecutor实现的,所以要弄明白各种线程池策略,必须先弄明白ThreadPoolExecutor。

2.1 核心原理

首先了解ThreadPoolExecutor的继承关系:

  1. 顶层接口Executor:定义 execute 方法来执行任务,入参是 Runnable,无出参
publicinterfaceExecutor {
/*** Executes the given command at some time in the future.  The command* may execute in a new thread, in a pooled thread, or in the calling* thread, at the discretion of the {@code Executor} implementation.** @param command the runnable task* @throws RejectedExecutionException if this task cannot be* accepted for execution* @throws NullPointerException if command is null*/voidexecute(Runnablecommand);
}

将任务的提交和执行进行解耦,使用者无需关注如何创建线程和如何调度线程,仅需要提供Runnable,关注核心任务的逻辑,将Runnable提交到Executor,由Executor完成线程的调配和任务的执行。

  1. ExecutorServiceExecutor 的功能太弱,ExecutorService 丰富了对任务的执行和管理的功能,补充可以为一个或者一批异步任务生成Future的方法,并提供了管理线程池的方法,比如shutdown()和shutdownNow()
publicinterfaceExecutorServiceextendsExecutor {
voidshutdown();
List<Runnable>shutdownNow();
booleanisShutdown();
booleanisTerminated();
booleanawaitTermination(longtimeout, TimeUnitunit)
throwsInterruptedException;
<T>Future<T>submit(Callable<T>task);
<T>Future<T>submit(Runnabletask, Tresult);
Future<?>submit(Runnabletask);
<T>List<Future<T>>invokeAll(Collection<?extendsCallable<T>>tasks)
throwsInterruptedException;
<T>List<Future<T>>invokeAll(Collection<?extendsCallable<T>>tasks,
longtimeout, TimeUnitunit)
throwsInterruptedException;
<T>TinvokeAny(Collection<?extendsCallable<T>>tasks)
throwsInterruptedException, ExecutionException;
<T>TinvokeAny(Collection<?extendsCallable<T>>tasks,
longtimeout, TimeUnitunit)
throwsInterruptedException, ExecutionException, TimeoutException;
}
  1. AbstractExecutorService:封装了Executor很多通用功能的抽象类,实现了部分ExecutorService方法。可以串联任务的流程,保证下次的实现仅关注一个执行任务的方法即可。
  2. ThreadPoolExecutor:是实现ExecutorService接口的实现类之一(当然实现ExecutorService接口的常用类还有ScheduledThreadPoolExecutor、TimedExecutorService等),主要是维护线程的生命周期。

那么ThreadPoolExecutor是如何做到的呢?其运行原理如下:


step1:调用ThreadPoolExecutor的execute提交线程,首先检查CorePool,如果CorePool内的线程小于CorePoolSize,新创建线程执行任务。
step2:如果当前CorePool内的线程大于等于CorePoolSize,那么将线程加入到BlockingQueue。
step3:如果不能加入BlockingQueue,在小于MaxPoolSize的情况下创建线程执行任务。
step4:如果线程数大于等于MaxPoolSize,那么执行拒绝策略

实际上线程池的运行是分为两部分:任务管理、线程管理。任务管理相当于生产者角色,提交任务即生产消息;线程管理相当于消费者角色,线程被统一维护在线程池内,根据任务请求的情况进行线程的分配,当线程执行完任务后会继续获取新的任务执行,最终获取不到任务的时候,线程会被回收。


了解完整体的设计和核心原理,再具体看下线程池是如何管理任务和线程的?接下来从线程池的生命周期开始分析。

2.2 生命周期

线程池的运行状态,不是由用户显式设置的,而是伴随着线程池的运行,由内部来维 护。线程池内部使用一个变量维护两个值:运行状态 (runState) 和线程数量 (workerCount)。在具体实现中,线程池将运行状态 (runState)、线程数量 (workerCount)。两个关键参数的维护放在了一起,如下代码所示:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl 这个 AtomicInteger 类型,是对线程池的运行状态和线程池中有效线程的数量 进行控制的一个字段,它同时包含两部分的信息:运行状态 (runState) 、有效线程的数量 (workerCount),高 3 位保存 runState,低 29 位保存 workerCount,两个变量之间互不干扰。用一个变量去存储两个值,可避免在做相关 决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。通过阅读线 程池源代码也可以发现,经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算的方式,相比于基本运算,速度也会快很多。关于内部封装的获取生命周期状态、获取线程池线程数量的计算方法如以下代码 所示:

// 计算当前运行状态privatestaticintrunStateOf(intc)     { returnc&~CAPACITY; }
// 计算当前线程数据privatestaticintworkerCountOf(intc)  { returnc&CAPACITY; }
// 通过状态和线程数计算ctlprivatestaticintctlOf(intrs, intwc) { returnrs|wc; }


ThreadPoolExecutor 的运行状态有 5 种,分别为:

  • RUNNING:运行状态,可以接受新提交的任务,并且能够处理阻塞队列中的任务;
  • SHUTDOWN:关闭状态,不再接受新提交的任务,可以继续处理阻塞队列中的任务;
  • STOP:停止状态,不再接受新的任务,也不处理阻塞队列中的任务,中断正在处理的任务线程;
  • TIDYING:整理状态,所有的任务都已经终止,workCount(有效线程数为0);
  • TERMINATED:终止状态,在terminated方法执行之后进入该状态。


分析完线程池的生命周期,那么在线程池中是如何实现调度呢?

2.3 线程调度

线程池的本质是对任务和线程的管理,而做到这一点的核心思想就是将任务和线程两者解耦,不让两者直接关联。前面小节有提到,线程池中是以生产者消费者模式,通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。那么缓冲模块是线程池能够管理任务的核心部分。

从存储结构、是否有界、是否线程安全,逐个分析常用的几种类型阻塞队列:

  • ArrayBlockingQueue:是一个基于数组的阻塞队列
  • 底层存储结构:数组
  • 是否有界:有(创建该阻塞队列实例需要指定队列容量)
  • 是否线程安全:是(在并发控制层面,无论是入队还是出队操作,均使用同一个ReentrantLock可重入锁进行控制,换言之生产者线程与消费者线程间无法同时操作)
  • LinkedBlockingQueue:是一个基于链表的阻塞队列,
  • 底层存储结构:链表
  • 是否有界:可有可无(该阻塞队列容量默认为 Integer.MAX_VALUE,即如果未显式设置队列容量时可以视为是一个无界队列;反之构建实例过程中指定队列容量,则其就是一个有界队列。)
  • 是否线程安全:是(在并发控制层面,其使用了两个ReentrantLock可重入锁来分别控制对入队、出队这两种类型的操作。使得生产者线程与消费者线程间可以同时操作提高效率。特别地对于链表这种结构而言,Java还提供了一个实现BlockingDeque接口的LinkedBlockingDeque类——其是一个基于链表的双向阻塞队列)
  • PriorityBlockingQueue:优先级队列
  • 底层存储结构:使用数组实现元素的存储、最小堆的表示(默认使用元素的自然排序,即要求元素实现Comparable接口;或者显式指定比较器Comparator。)
  • 是否有界:无
  • 是否线程安全:否(在并发控制层面,无论是入队还是出队操作,均使用同一个ReentrantLock可重入锁进行控制。在创建该队列实例时虽然可以指定容量。但这并不是队列的最终容量,而只是该队列实例的初始容量。一旦后续过程队列容量不足,其会自动进行扩容。值得一提的是,为了保证同时只有一个线程进行扩容,其内部是通过CAS方式来实现的,而不是利用ReentrantLock可重入锁来控制。)

示例:

@Testpublicvoidtest1() {
BlockingQueue<Integer>blockingQueue=newPriorityBlockingQueue<>(2);
blockingQueue.offer(13);
blockingQueue.offer(5);
blockingQueue.offer(7);
Integersize=blockingQueue.size();
System.out.println("blockingQueue: "+blockingQueue+", size: "+size);
Integere1=blockingQueue.poll();
System.out.println("e1: "+e1);
Integere2=blockingQueue.poll();
System.out.println("e2: "+e2);
Integere3=blockingQueue.poll();
System.out.println("e3: "+e3);
}

运行结果:

  • DelayQueue:延迟队列
  • 底层存储结构:使用PriorityQueue实现元素的存储
  • 是否有界:无
  • 是否线程安全:否(使用ReentrantLock实现线程同步)

示例:

publicclassBlockingQueueTest {
privatestaticDateTimeFormatterformatter=DateTimeFormatter.ofPattern("HH:mm:ss");
@Testpublicvoidtest2() throwsException {
BlockingQueue<Cache>blockingQueue=newDelayQueue<>();
newThread(() -> {
while (true) {
try {
Cachecache=blockingQueue.take();
info("消费者: "+cache.toString());
                } catch (Exceptione) {
System.out.println("Happen Exception: "+e.getMessage());
                }
            }
        }).start();
LongtimeStamp=System.currentTimeMillis();
Cachecache1=newCache("name", "Aaron", timeStamp+15*1000);
blockingQueue.put(cache1);
Cachecache2=newCache("age", "18", timeStamp+27*1000);
blockingQueue.put(cache2);
Cachecache3=newCache("country", "China", timeStamp+7*1000);
blockingQueue.put(cache3);
Thread.sleep(120*1000);
    }
/*** 打印信息*/privatestaticvoidinfo(Stringmsg) {
Stringtime=formatter.format(LocalTime.now());
Stringthread=Thread.currentThread().getName();
Stringlog="["+time+"] "+msg;
System.out.println(log);
    }
@AllArgsConstructor@DataprivatestaticclassCacheimplementsDelayed {
// 缓存 KeyprivateStringkey;
// 缓存 ValueprivateStringvalue;
// 缓存到期时间privateLongexpire;
/*** 计算当前延迟时间* @param unit* @return*/@OverridepubliclonggetDelay(TimeUnitunit) {
// 缓存有效的剩余毫秒数longdelta=expire-System.currentTimeMillis();
returnunit.convert(delta, TimeUnit.MILLISECONDS);
        }
/*** 定义比较规则, 延迟时间按从小到大进行排序* @param o* @return*/@OverridepublicintcompareTo(Delayedo) {
Cacheother= (Cache) o;
returnthis.getExpire().compareTo(other.getExpire());
        }
@OverridepublicStringtoString() {
Datetime=newDate(expire);
SimpleDateFormatformatter=newSimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
StringtimeStr=formatter.format(time);
return"Cache, key: "+key+", expire: "+timeStr;
        }
    }
}

运行结果:

  • SynchronousQueue:同步队列(该队列没有容量无法存储元素,故生产者添加的数据会直接被消费者获取并且立刻消费。所以当生产者线程添加数据时,如果此时恰好有一个消费者已经准备好获取队头元素了,则会添加成功;否则要么添加失败返回false要么被阻塞。)Executors.newCachedThreadPool()创建的线程池实例,其内部任务队列使用的就是SynchronousQueue,故offer方法添加任务到队列失败后则会开启新的线程来进行处理。

示例:

@Testpublicvoidtest3() {
BlockingQueue<Integer>blockingQueue=newSynchronousQueue<>();
Booleanb1=blockingQueue.offer(237);
info("生产者 b1: "+b1);
// 消费者线程newThread( ()->{
try{
Integere=blockingQueue.take();
info("消费者:"+e);
        } catch (Exceptione) {
info("Happen Exception: "+e.getMessage());
        }
    } ).start();
// 确保消费者线程已经准备完毕try { Thread.sleep(2000); } catch (Exceptione) {}
Booleanb2=blockingQueue.offer(996);
info("生产者 b2: "+b2);
try { Thread.sleep(120*1000); } catch (Exceptione) {}
}

测试结果如下,符合预期。生产者第一次添加元素结果失败,原因很简单。因为同步队列没有存储元素的能力,故如果没有消费者直接取走,则生产者即会添加失败;第二次添加时,消费者线程已经在阻塞等待了,故添加成功:

下面利用阻塞的put方法来添加元素,示例代码如下所示:

@Testpublicvoidtest4() {
BlockingQueue<Integer>blockingQueue=newSynchronousQueue<>();
// 生产者线程newThread(() -> {
try {
info("生产者: Start");
while (true) {
Integernum= (int) (Math.random() *100+1);
info("生产者: put "+num);
blockingQueue.put(num);
                }
            } catch (Exceptione) {
info("Happen Exception: "+e.getMessage());
            }
        }).start();
// 消费者线程newThread(() -> {
try {
info("消费者: Start");
while (true) {
try {
Thread.sleep(5000);
                    } catch (Exceptione) {
                    }
Integere=blockingQueue.take();
info("消费者: "+e);
                }
            } catch (Exceptione) {
info("Happen Exception: "+e.getMessage());
            }
        }).start();
try { Thread.sleep(120*1000); } catch (Exceptione) {}
    }

3. 小结

本片仅从线程池的源码出发,分析其原理,具体使用过程中整理了几条使用TIPS如下:

使用TIPS:

1.shutdown()和shutdownNow()

shutdown做了几件事:

  • 检查是否能操作目标线程
  • 将线程池状态转为SHUTDOWN
  • 中断所有空闲线程

 

publicvoidshutdown() {
finalReentrantLockmainLock=this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor        } finally {
mainLock.unlock();
        }
tryTerminate();
 }

什么是空闲线程?interruptIdleWorkers的核心逻辑如下:

privatevoidinterruptIdleWorkers(booleanonlyOne) {
finalReentrantLockmainLock=this.mainLock;
mainLock.lock();
try {
for (Workerw : workers) {
Threadt=w.thread;
if (!t.isInterrupted() &&w.tryLock()) {
try {
t.interrupt();
                    } catch (SecurityExceptionignore) {
                    } finally {
w.unlock();
                    }
                }
if (onlyOne)
break;
            }
        } finally {
mainLock.unlock();
        }
}

这里主要是为了中断worker,但是中断之前需要先获取锁,这就意味着正在运行的Worker不能中断。但是上面的代码有w.tryLock(),那么获取不到锁就不会中断,shutdown的Interrupt只是对所有的空闲Worker(正在从workQueue中取Task,此时Worker没有加锁)发送中断信号。

当调用ShutDown方法时,首先设置了线程池的状态为ShutDown,此时1阶段的worker进入到状态判断时会返回null,此时Worker退出。

因为getTask的时候是不加锁的,所以在shutdown时可以调用worker.Interrupt.此时会中断退出,Loop到状态判断时,同时workQueue为empty。那么抛出中断异常,导致重新Loop,在检测线程池状态时,Worker退出。如果workQueue不为null就不会退出,此处有些疑问,因为没有看见中断标志位清除的逻辑,那么这里就会不停的循环直到workQueue为Empty退出。

这里也能看出来shutdown()只是清除一些空闲Worker,并且拒绝新Task加入,对于workQueue中的线程还是继续处理的。
对于shutdown中获取mainLock而addWorker中也做了mainLock的获取,这么做主要是因为Works是HashSet类型的,是线程不安全的,可以看到在addWorker后面也是对线程池状态做了判断,将Worker添加和中断逻辑分离开。

再看shutdownNow:

publicList<Runnable>shutdownNow() {
List<Runnable>tasks;
finalReentrantLockmainLock=this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks=drainQueue();
        } finally {
mainLock.unlock();
        }
tryTerminate();
returntasks;
}

shutdownNow和shutdown代码类似,但是实现却很不相同。首先是设置线程池状态为STOP,前面的代码可以看到,是对SHUTDOWN有一些额外的判断逻辑,但是对于>=STOP,基本都是reject,STOP也是比SHUTDOWN更加严格的一种状态。此时不会有新Worker加入,所有刚执行完一个线程后去GetTask的Worker都会退出。
之后调用interruptWorkers

 

privatevoidinterruptWorkers() {
finalReentrantLockmainLock=this.mainLock;
mainLock.lock();
try {
for (Workerw : workers)
w.interruptIfStarted();
        } finally {
mainLock.unlock();
        }
 }

这里可以看出来,此方法目的是中断所有的Worker,而不是像shutdown中那样只中断空闲线程。这样体现了STOP的特点,中断所有线程,同时workQueue中的Task也不会执行了。

所以接下来drainQueue:

 

privateList<Runnable>drainQueue() {
BlockingQueue<Runnable>q=workQueue;
ArrayList<Runnable>taskList=newArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnabler : q.toArray(newRunnable[0])) {
if (q.remove(r))
taskList.add(r);
            }
        }
returntaskList;
 }

获取所有没有执行的Task,并且返回。
这也体现了STOP的特点:拒绝所有新Task的加入,同时中断所有线程,WorkerQueue中没有执行的线程全部抛弃。所以此时Pool是空的,WorkerQueue也是空的。

2.work和Task

Worker是当前线程池中的线程,而task虽然是runnable,但是并没有真正执行,只是被Worker调用了run方法,后面会看到这部分的实现。

3.maximumPoolSize和corePoolSize

maximumPoolSize为线程池最大容量,也就是说线程池最多能起多少Worker。corePoolSize是核心线程池的大小,当corePoolSize满了时,同时workQueue full(ArrayBolckQueue是可能满的) 那么此时允许新建Worker去处理workQueue中的Task,但是不能超过maximumPoolSize。超过corePoolSize之外的线程会在空闲超时后终止。

4.Keep-alive times 参数:

作用: 如果当前线程池中有超过 coreSize 的线程,并且线程空闲的时间超过 keepAliveTime,当前线程就会被回收,这样可以避免线程没有被使用时的资源浪费;

通过 setKeepAliveTime 方法可以动态的设置 keepAliveTime 的值;

如果设置 allowCoreThreadTimeOut 为 ture 的话,core thread 空闲时间超过 keepAliveTime 的话,也会被回收

5.拒绝策略:

在 Executor 已经关闭或对最大线程和最大队列都使用饱和时,可以使用 RejectedExecutionHandler 类进行异常捕捉。有如下四种处理策略:

  • AbortPolicy(默认):抛出异常
  • CallerRunsPolicy:不使用线程池,主线程来执行
  • DiscardPolicy:直接丢弃任务
  • DiscardOldestPolicy:丢弃队列中最老任务

6.ThreadPoolExecutor初始化参数设置思路:

publicThreadPoolExecutor(intcorePoolSize,
intmaximumPoolSize,
longkeepAliveTime,
TimeUnitunit,
BlockingQueue<Runnable>workQueue,
ThreadFactorythreadFactory,
RejectedExecutionHandlerhandler) {
if (corePoolSize<0||maximumPoolSize<=0||maximumPoolSize<corePoolSize||keepAliveTime<0)
thrownewIllegalArgumentException();
if (workQueue==null||threadFactory==null||handler==null)
thrownewNullPointerException();
this.acc=System.getSecurityManager() ==null?null :
AccessController.getContext();
this.corePoolSize=corePoolSize;
this.maximumPoolSize=maximumPoolSize;
this.workQueue=workQueue;
this.keepAliveTime=unit.toNanos(keepAliveTime);
this.threadFactory=threadFactory;
this.handler=handler;
    }
  • 线程大小的设置,常见实现:
  • coreSize == maxSize:

如上设置,随着请求的不断增加,会是这样的现象:

  • 请求数 < coreSize 时,新增线程;
  • 请求数 >= coreSize && 队列不满时,添加任务入队;
  • 队列满时,此时因为 coreSize 和 maxSize 相等,任务会被直接拒绝。

这么写的最大目的:是让线程一下子增加到 maxSize,并且不要回收线程,防止线程回收,避免不断增加回收的损耗,一般来说业务流量都有波峰低谷,在流量低谷时,线程不会被回收;流量波峰时,maxSize 的线程可以应对波峰,不需要慢慢初始化到 maxSize 的过程。

如上设置有两个前提条件:

  1. allowCoreThreadTimeOut 采取默认 false,而不会主动设置成 true,allowCoreThreadTimeOut 是 false 的话,当线程空闲时,就不会回收核心线程;
  2. keepAliveTime 和 TimeUnit 都会设置很大,这样线程空闲的时间就很长,线程就不会轻易的被回收

大多数情况下机器的资源是比较充足,不需要担心线程空闲会浪费机器的资源,所以这种写法目前比较常见。

  • maxSize 无界 + SynchronousQueue:

SynchronousQueue 其内部有堆栈和队列两种形式,默认是堆栈的形式,其内部是没有存储的容器的,放元素和拿元素是一一对应的,比如我使用 put 方法放元素,如果此时没有对应的 take 操作的话,put 操作就会阻塞,需要有线程过来执行 take 操作后,put 操作才会返回。

maxSize 无界 + SynchronousQueue 这样的组合方式优缺点都很明显:

  • 优点:阻塞队列没有存储空间,只要请求到来,就必须找到一条空闲线程去处理这个请求,找不到则在线程池新开辟一条线程去执行。如果是其他的队列的话,只知道任务已经被提交成功了,但无法知道当前任务是在被消费中,还是正在队列中堆积。
  • 缺点:
  1. 比较消耗资源,大量请求到来时,会新建大量的线程来处理请求
  2. 正是因为 SynchronousQueue 没有存储空间,若线程池中的线程数已经达到了 maxSize 且没有空闲线程,那么第 maxSize+1 个任务就会被reject。所以如果请求的量难以预估的话,maxSize 的大小也很难设置
  • maxSize 有界 + Queue 无界:

适用场景:对实时性要求不大,但流量忽高忽低的场景。

比如设置 maxSize 为 20,Queue 选择默认构造器的 LinkedBlockingQueue,这样做的优缺点如下:

  • 优点:
  1. 电脑 cpu 固定的情况下,每秒能同时工作的线程数是有限的,此时开很多的线程其实也是浪费,还不如把这些请求放到队列中去等待,这样可以减少线程之间的 CPU 的竞争;
  2. LinkedBlockingQueue 默认构造器构造出来的链表的最大容量是 Integer 的最大值,非常适合流量忽高忽低的场景,当流量高峰时,大量的请求被阻塞在队列中,让有限的线程可以慢慢消费。
  • 缺点:流量高峰时,大量的请求被阻塞在队列中,对于请求的实时性难以保证,所以当对请求的实时性要求较高的场景,不能使用该组合。
  • maxSize 有界 + Queue 有界:

这种组合是对「maxSize 有界 + Queue 无界」缺点的补充,把队列从无界修改成有界,只要排队的任务在要求的时间内,能够完成任务即可。需要把线程和队列的大小进行配合计算,保证大多数请求都可以在要求的时间内,有响应返回。


  • 如何设置空闲线程不被回收?

keepAliveTime 设置成 0 时,线程使用 poll 方法在队列上进行超时阻塞时,会立马返回 null,也就是空闲线程会立马被回收。所以这种设置是错误🙅的。

可以设置 keepAliveTime 为无穷大值,并且设置 TimeUnit 为时间的大单位,比如设置 keepAliveTime 为 365,TimeUnit 为 TimeUnit.DAYS,意思是线程空闲 1 年内都不会被回收。

假设机器的内存都够大,合理设置 maxSize 后,即使线程空闲,也不希望线程被回收,所以也可以设置 keepAliveTime 为无穷大。


  • 什么时候应该使用公用线程池?

是否使用公用线程池,一般有以下原则:

  1. 查询和写入不公用线程池:一般来说,对于互联网应用,查询量远远大于写入的量,如果查询和写入都要走线程池的话,一定不要公用线程池,也就是说查询走查询的线程池,写入走写入的线程池。如果公用的话,当查询量很大时,写入的请求可能会到队列中去排队,无法及时被处理;
  2. 多个写入业务场景看情况是否需要公用线程池:原则上来说,每个业务场景都独自使用自己的线程池,绝不共用,这样在业务治理、限流、熔断方面都比较容易,一旦多个业务场景公用线程池,可能就会造成业务场景之间的互相影响,每个写入业务场景独立使用自己的线程池是比较合理的;
  3. 多个查询业务场景是可以公用线程池的:查询的请求一般来说有几个特点:查询的场景多、rt 时间短、查询的量比较大,如果给每个查询场景都弄一个单独的线程池的话,第一个比较耗资源,第二个很难定义线程池中线程和队列的大小,比较复杂,所以多个相似的查询业务场景是可以公用线程池的。

  • 如何算线程大小和队列大小?

主要从几个方面入手:

  • 根据业务进行考虑,初始化线程池时,需要考虑所有业务的并发情况:
  • 如果目前所有的业务同时都有很大流量,那么在对于当前业务设置线程池时,尽量把线程大小、队列大小都设置小;
  • 如果所有业务基本上都不会同时有流量,那么就可以稍微设置大一点
  • 根据业务的实时性要求:
  • 如果实时性要求高的话,把队列设置小一点,coreSize == maxSize,并且设置 maxSize 大一点;
  • 如果实时性要求低的话,就可以把队列设置大一点


举个🌰:假设现在机器上某一时间段只会运行一种业务,业务的实时性要求较高,每个请求的平均 rt(ResponseTime) 是 300ms,请求超时时间是 3000ms,机器是 4 核 CPU,内存 16G,一台机器的 qps 是 100,这时候可以模拟一下如何设置:

4 核 CPU,假设 CPU 能够跑满,每个请求的 rt 是 300ms,就是 300 ms 能执行 4 条请求,3000ms 内能执行 3000/300 * 4 = 40 条请求;

300 ms 能执行 4 条请求,实际上 4 核 CPU 的性能远远高于这个,可以拍脑袋假设增加 15条,也就是说 3000ms 内预估能够执行 55 条;

一台机器的 qps 是 100,此时计算出一台机器 3 秒内最多处理 55 条请求,所以此时如果不进行 rt 优化的话,需要加至少一台机器。

线程池可以大概这么设置:

ThreadPoolExecutorexecutor=newThreadPoolExecutor(20, 20, 365L, TimeUnit.DAYS,
newLinkedBlockingQueue(35));

线程数最大为 20,队列最大为 35,这样机器差不多可以在 3000ms 内处理最大的请求 55 条,当然根据你机器的性能和实时性要求,你可以调整线程数和队列的大小占比,只要总和小于 55 即可。

以上只是很粗糙的设置,在实际项目中,还需要根据实际情况不断的观察和调整。

随着云巧资产市场的业务功能逐渐丰富和复杂,串行实现已无法有效支持业务,因此需要灵活的运用多线程进行优化,下一篇将结合资产市场核心业务——下行篇,总结多线程的有效实践和踩坑经验,敬请期待👻👻👻。

参考:

相关文章
|
2月前
|
安全 Java 测试技术
Java并行流陷阱:为什么指定线程池可能是个坏主意
本文探讨了Java并行流的使用陷阱,尤其是指定线程池的问题。文章分析了并行流的设计思想,指出了指定线程池的弊端,并提供了使用CompletableFuture等替代方案。同时,介绍了Parallel Collector库在处理阻塞任务时的优势和特点。
|
15天前
|
监控 Java
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
72 17
|
5月前
|
Java 调度 数据库
Java并发编程:深入理解线程池
在Java并发编程的海洋中,线程池是一艘强大的船,它不仅提高了性能,还简化了代码结构。本文将带你潜入线程池的深海,探索其核心组件、工作原理及如何高效利用线程池来优化你的并发应用。
|
5月前
|
存储 监控 Java
Java多线程优化:提高线程池性能的技巧与实践
Java多线程优化:提高线程池性能的技巧与实践
151 1
|
2月前
|
Prometheus 监控 Cloud Native
JAVA线程池监控以及动态调整线程池
【10月更文挑战第22天】在 Java 中,线程池的监控和动态调整是非常重要的,它可以帮助我们更好地管理系统资源,提高应用的性能和稳定性。
231 64
|
5月前
|
安全 Java 数据库
一天十道Java面试题----第四天(线程池复用的原理------>spring事务的实现方式原理以及隔离级别)
这篇文章是关于Java面试题的笔记,涵盖了线程池复用原理、Spring框架基础、AOP和IOC概念、Bean生命周期和作用域、单例Bean的线程安全性、Spring中使用的设计模式、以及Spring事务的实现方式和隔离级别等知识点。
|
1月前
|
存储 监控 小程序
Java中的线程池优化实践####
本文深入探讨了Java中线程池的工作原理,分析了常见的线程池类型及其适用场景,并通过实际案例展示了如何根据应用需求进行线程池的优化配置。文章首先介绍了线程池的基本概念和核心参数,随后详细阐述了几种常见的线程池实现(如FixedThreadPool、CachedThreadPool、ScheduledThreadPool等)的特点及使用场景。接着,通过一个电商系统订单处理的实际案例,分析了线程池参数设置不当导致的性能问题,并提出了相应的优化策略。最终,总结了线程池优化的最佳实践,旨在帮助开发者更好地利用Java线程池提升应用性能和稳定性。 ####
|
1月前
|
监控 Java 开发者
深入理解Java中的线程池实现原理及其性能优化####
本文旨在揭示Java中线程池的核心工作机制,通过剖析其背后的设计思想与实现细节,为读者提供一份详尽的线程池性能优化指南。不同于传统的技术教程,本文将采用一种互动式探索的方式,带领大家从理论到实践,逐步揭开线程池高效管理线程资源的奥秘。无论你是Java并发编程的初学者,还是寻求性能调优技巧的资深开发者,都能在本文中找到有价值的内容。 ####
|
2月前
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
126 38
|
2月前
|
存储 缓存 监控
Java中的线程池深度解析####
本文深入探讨了Java并发编程中的核心组件——线程池,从其基本概念、工作原理、核心参数解析到应用场景与最佳实践,全方位剖析了线程池在提升应用性能、资源管理和任务调度方面的重要作用。通过实例演示和性能对比,揭示合理配置线程池对于构建高效Java应用的关键意义。 ####

热门文章

最新文章