美团动态线程池实践思路开源项目(DynamicTp),线程池源码解析及通知告警篇

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: 大家好,这篇文章我们来聊下动态线程池开源项目(DynamicTp)的通知告警模块。目前项目提供以下通知告警功能,每一个通知项都可以独立配置是否开启、告警阈值、告警间隔时间、平台等,具体代码请看core模块notify包。

大家好,这篇文章我们来聊下动态线程池开源项目(DynamicTp)的通知告警模块。目前项目提供以下通知告警功能,每一个通知项都可以独立配置是否开启、告警阈值、告警间隔时间、平台等,具体代码请看core模块notify包。

1.核心参数变更通知

2.线程池活跃度告警

3.队列容量告警

4.拒绝策略告警

5.任务执行超时告警

6.任务排队超时告警


DynamicTp项目地址

目前700star,感谢你的star,欢迎pr,业务之余一起给开源贡献一份力量

gitee地址https://gitee.com/yanhom/dynamic-tp

github地址https://github.com/lyh200/dynamic-tp


系列文章

美团动态线程池实践思路,开源了

动态线程池框架(DynamicTp),监控及源码解析篇

动态线程池(DynamicTp),动态调整Tomcat、Jetty、Undertow线程池参数篇


线程池解读

上篇文章里大概讲到了JUC线程池的执行流程,我们这里再仔细回顾下,上图是JUC下线程池ThreadPoolExecutor类的继承体系。

顶级接口Executor提供了一种方式,解耦任务的提交和执行,只定义了一个execute(Runnable command)方法用来提交任务,至于具体任务怎么执行则交给他的实现者去自定义实现。

ExecutorService接口继承Executor,且扩展了生命周期管理的方法、返回Futrue的方法、批量提交任务的方法

void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

AbstractExecutorService抽象类继承ExecutorService接口,对ExecutorService相关方法提供了默认实现,用RunnableFuture的实现类FutureTask包装Runnable任务,交给execute()方法执行,然后可以从该FutureTask阻塞获取执行结果,并且对批量任务的提交做了编排

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}
    
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

ThreadPoolExecutor继承AbstractExecutorService,采用池化思想管理一定数量的线程来调度执行提交的任务,且定义了一套线程池的生命周期状态,用一个ctl变量来同时保存当前池状态(高3位)和当前池线程数(低29位)。看过源码的小伙伴会发现,ThreadPoolExecutor类里的方法大量有同时需要获取或更新池状态和池当前线程数的场景,放一个原子变量里,可以很好的保证数据的一致性以及代码的简洁性。

  // 用此变量保存当前池状态(高3位)和当前线程数(低29位)
  private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 
  private static final int COUNT_BITS = Integer.SIZE - 3;
  private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

  // runState is stored in the high-order bits
  // 可以接受新任务提交,也会处理任务队列中的任务
  // 结果:111 00000000000000000000000000000
  private static final int RUNNING    = -1 << COUNT_BITS;
  
  // 不接受新任务提交,但会处理任务队列中的任务
  // 结果:000 00000000000000000000000000000
  private static final int SHUTDOWN   =  0 << COUNT_BITS;
  
  // 不接受新任务,不执行队列中的任务,且会中断正在执行的任务
  // 结果:001 00000000000000000000000000000
  private static final int STOP       =  1 << COUNT_BITS;
  
  // 任务队列为空,workerCount = 0,线程池的状态在转换为TIDYING状态时,会执行钩子方法terminated()
  // 结果:010 00000000000000000000000000000
  private static final int TIDYING    =  2 << COUNT_BITS;
  
  // 调用terminated()钩子方法后进入TERMINATED状态
  // 结果:010 00000000000000000000000000000
  private static final int TERMINATED =  3 << COUNT_BITS;

  // Packing and unpacking ctl
  // 低29位变为0,得到了线程池的状态
  private static int runStateOf(int c)     { return c & ~CAPACITY; }
  // 高3位变为为0,得到了线程池中的线程数
  private static int workerCountOf(int c)  { return c & CAPACITY; }
  private static int ctlOf(int rs, int wc) { return rs | wc; }

核心入口execute()方法执行逻辑如下:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

可以总结出如下主要执行流程,当然看上述代码会有一些异常分支判断,可以自己顺理加到下述执行主流程里

1.判断线程池的状态,如果不是RUNNING状态,直接执行拒绝策略

2.如果当前线程数 < 核心线程池,则新建一个线程来处理提交的任务

3.如果当前线程数 > 核心线程数且任务队列没满,则将任务放入任务队列等待执行

4.如果 核心线程池 < 当前线程池数 < 最大线程数,且任务队列已满,则创建新的线程执行提交的任务

5.如果当前线程数 > 最大线程数,且队列已满,则拒绝该任务

addWorker()方法逻辑

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            // 获取当前池状态
            int rs = runStateOf(c);

            // 1.判断如果线程池状态 > SHUTDOWN,直接返回false,否则2
            // 2.如果线程池状态 = SHUTDOWN,并且firstTask不为null则直接返回false,因为SHUTDOWN状态的线程池不能在接受新任务,否则3
            // 3.如果线程池状态 = SHUTDOWN,并且firstTask == null,此时如果任务队列为空,则直接返回false
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                // 1.如果当前线程池线程数大于等于CAPACITY(理论上的最大值5亿),则返回fasle
                // 2.如果创建核心线程情况下当前池线程数 >= corePoolSize,则返回false
                // 3.如果创建非核心线程情况下当前池线程数 >= maximumPoolSize,则返回false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // cas 增加当前池线程数量,成功则退出循环    
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                // cas 增加当前池线程数量失败(多线程并发),则重新获取ctl,计算出当前线程池状态,如果不等于上述计算的状态rs,则说明线程池状态发生了改变,需要跳到外层循环重新进行状态判断,否则执行内部循环
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 至此说明线程池状态校验通过,且增加池线程数量成功,则创建一个Worker线程来执行任务
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // 访问worker set时需要获取mainLock全局锁
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    // 1.当前池状态 < SHUTDOWN,也就是RUNNING状态,如果已经started,抛出异常
                    // 2.当前池状态 = SHUTDOWN,且firstTask == null,需要处理任务队列中的任务,如果已经started,抛出异常
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 刚创建线程添加到workers集合中
                        workers.add(w);
                        int s = workers.size();
                        // 判断更新历史最大线程数量
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // 启动新建线程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                // 启动失败,workerCount--,workers里移除该worker
                addWorkerFailed(w);
        }
        return workerStarted;
    }

线程池中的线程并不是直接用的Thread类,而是定义了一个内部工作线程Worker类,实现了AQS以及Runnable接口,然后持有一个Thread类的引用及一个firstTask(创建后第一个要执行的任务),每个Worker线程启动后会执行run()方法,该方法会调用执行外层runWorker(Worker w)方法

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 1.如果task不为空,则作为该线程的第一个任务直接执行
        // 2.如果task为空,则通过getTask()方法从任务队列中获取任务执行
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // 线程池状态 >= STOP,则中断线程
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 实际执行任务前调用的钩子方法
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 实际执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 任务执行后调用的钩子方法
                    afterExecute(task, thrown);
                }
            } finally {
                // 任务置为null,重新获取新任务,完成数++
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 无任务可执行,执行worker销毁逻辑
        processWorkerExit(w, completedAbruptly);
    }
}

getTask()方法逻辑

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        
        // 以下两种情况递减工作线程数量
        // 1. rs >= STOP
        // 2. rs == SHUTDOWN && workQueue.isEmpty()
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // 允许核心线程超时 或者 当前线程数 > 核心线程数,有可能发生超时关闭
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // wc什么情况 > maximumPoolSize,调用setMaximumPoolSize()方法将maximumPoolSize调小了,会发生这种情况,此时需要关闭多余线程
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 阻塞队列获取任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            // 发生中断,进行重试
            timedOut = false;
        }
    }
}

以上内容比较详细的介绍了ThreadPoolExecutor的继承体系,以及相关的核心源码,基于此,现在我们来看DynamicTp提供的告警通知能力。


核心参数变更通知

对应配置中心的监听端监听到配置变更后,封装到DtpProperties中然后交由DtpRegistry类中的refresh()方法去做配置更新,同时通知时会高亮显示有变更的字段


线程池活跃度告警

活跃度 = activeCount / maximumPoolSize

服务启动后会开启一个定时监控任务,每隔一定时间(可配置)去计算线程池的活跃度,达到配置的threshold阈值后会触发一次告警,告警间隔内多次触发不会发送告警通知


队列容量告警

容量使用率 = queueSize / queueCapacity

服务启动后会开启一个定时监控任务,每隔一定时间去计算任务队列的使用率,达到配置的threshold阈值后会触发一次告警,告警间隔内多次触发不会发送告警通知


拒绝策略告警

/**
 * Do sth before reject.
 * @param executor ThreadPoolExecutor instance
 */
default void beforeReject(ThreadPoolExecutor executor) {
    if (executor instanceof DtpExecutor) {
        DtpExecutor dtpExecutor = (DtpExecutor) executor;
        dtpExecutor.incRejectCount(1);
        Runnable runnable = () -> AlarmManager.doAlarm(dtpExecutor, REJECT);
        AlarmManager.triggerAlarm(dtpExecutor.getThreadPoolName(), REJECT.getValue(), runnable);
    }
}

线程池线程数达到配置的最大线程数,且任务队列已满,再提交任务会触发拒绝策略。DtpExecutor线程池用到的RejectedExecutionHandler是经过动态代理包装过的,在执行具体的拒绝策略之前会执行RejectedAware类beforeReject()方法,此方法会去做拒绝数量累加(总数值累加、周期值累加)。且判断如果周期累计值达到配置的阈值,则会触发一次告警通知(同时重置周期累加值为0及上次告警时间为当前时间),告警间隔内多次触发不会发送告警通知


任务队列超时告警

重写ThreadPoolExecutor的execute()方法和beforeExecute()方法,如果配置了执行超时或排队超时值,则会用DtpRunnable包装任务,同时记录任务的提交时间submitTime,beforeExecute根据当前时间和submitTime的差值就可以计算到该任务在队列中的等待时间,然后判断如果差值大于配置的queueTimeout则累加排队超时任务数量(总数值累加、周期值累加)。且判断如果周期累计值达到配置的阈值,则会触发一次告警通知(同时重置周期累加值为0及上次告警时间为当前时间),告警间隔内多次触发不会发送告警通知

@Override
public void execute(Runnable command) {
    if (CollUtil.isNotEmpty(taskWrappers)) {
        for (TaskWrapper t : taskWrappers) {
            command = t.wrap(command);
        }
    }

    if (runTimeout > 0 || queueTimeout > 0) {
        command = new DtpRunnable(command);
    }
    super.execute(command);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
    if (!(r instanceof DtpRunnable)) {
        super.beforeExecute(t, r);
        return;
    }
    DtpRunnable runnable = (DtpRunnable) r;
    long currTime = System.currentTimeMillis();
    if (runTimeout > 0) {
        runnable.setStartTime(currTime);
    }
    if (queueTimeout > 0) {
        long waitTime = currTime - runnable.getSubmitTime();
        if (waitTime > queueTimeout) {
            queueTimeoutCount.incrementAndGet();
            Runnable alarmTask = () -> AlarmManager.doAlarm(this, QUEUE_TIMEOUT);
            AlarmManager.triggerAlarm(this.getThreadPoolName(), QUEUE_TIMEOUT.getValue(), alarmTask);
        }
    }

    super.beforeExecute(t, r);
}


任务执行超时告警

重写ThreadPoolExecutor的afterExecute()方法,根据当前时间和beforeExecute()中设置的startTime的差值即可算出任务的实际执行时间,然后判断如果差值大于配置的runTimeout则累加排队超时任务数量(总数值累加、周期值累加)。且判断如果周期累计值达到配置的阈值,则会触发一次告警通知(同时重置周期累加值为0及上次告警时间为当前时间),告警间隔内多次触发不会发送告警通知

@Override
protected void afterExecute(Runnable r, Throwable t) {

    if (runTimeout > 0) {
        DtpRunnable runnable = (DtpRunnable) r;
        long runTime = System.currentTimeMillis() - runnable.getStartTime();
        if (runTime > runTimeout) {
            runTimeoutCount.incrementAndGet();
            Runnable alarmTask = () -> AlarmManager.doAlarm(this, RUN_TIMEOUT);
            AlarmManager.triggerAlarm(this.getThreadPoolName(), RUN_TIMEOUT.getValue(), alarmTask);
        }
    }

    super.afterExecute(r, t);
}


告警通知相关配置项

如果想使用通知告警功能,配置文件必须要配置platforms字段,且可以配置多个平台,如钉钉、企微等;notifyItems配置具体告警项,包括阈值、平台、告警间隔等。

spring:
  dynamic:
    tp:
      # 省略其他项
      platforms:                         # 通知平台
        - platform: wechat
          urlKey: 38a98-0c5c3b649c
          receivers: test
        - platform: ding
          urlKey: f80db3e801d593604f4a08dcd6a
          secret: SECb5444a6f375d5b9d21
          receivers: 17811511815
      executors:                                   # 动态线程池配置,都有默认值,采用默认值的可以不配置该项,减少配置量
        - threadPoolName: dtpExecutor1
          executorType: common                          # 线程池类型common、eager:适用于io密集型
          corePoolSize: 2
          maximumPoolSize: 4
          queueCapacity: 200
          queueType: VariableLinkedBlockingQueue       # 任务队列,查看源码QueueTypeEnum枚举类
          rejectedHandlerType: CallerRunsPolicy        # 拒绝策略,查看RejectedTypeEnum枚举类
          keepAliveTime: 50
          allowCoreThreadTimeOut: false
          threadNamePrefix: dtp1                         # 线程名前缀
          waitForTasksToCompleteOnShutdown: false        # 参考spring线程池设计
          awaitTerminationSeconds: 5                     # 单位(s)
          preStartAllCoreThreads: false                  # 是否预热核心线程,默认false
          runTimeout: 200                                # 任务执行超时阈值,目前只做告警用,单位(ms)
          queueTimeout: 100                              # 任务在队列等待超时阈值,目前只做告警用,单位(ms)
          taskWrapperNames: ["ttl"]                      # 任务包装器名称,集成TaskWrapper接口
          notifyItems:                     # 报警项,不配置自动会按默认值配置(变更通知、容量报警、活性报警、拒绝报警、任务超时报警)
            - type: capacity               # 报警项类型,查看源码 NotifyTypeEnum枚举类
              threshold: 80                # 报警阈值
              platforms: [ding,wechat]     # 可选配置,不配置默认拿上层platforms配置的所以平台
              interval: 120                # 报警间隔(单位:s)
            - type: change
            - type: liveness
              threshold: 80
              interval: 120
            - type: reject
              threshold: 1
              interval: 160
            - type: run_timeout
              threshold: 1
              interval: 120
            - type: queue_timeout
              threshold: 1
              interval: 140

总结

本文开头介绍了线程池ThreadPoolExecutor的继承体系,核心流程的源码解读。然后介绍了DynamicTp提供的以上6种告警通知能力,希望通过监控+告警可以让我们及时感知到我们业务线程池的执行负载情况,第一时间做出调整,防止事故的发生。


联系我

对项目有什么想法或者建议,可以加我微信交流,或者创建issues,一起完善项目

公众号:CodeFox

微信:yanhom1314

目录
相关文章
|
2月前
|
安全 虚拟化
在数字化时代,网络项目的重要性日益凸显。本文从前期准备、方案内容和注意事项三个方面,详细解析了如何撰写一个优质高效的网络项目实施方案,帮助企业和用户实现更好的体验和竞争力
在数字化时代,网络项目的重要性日益凸显。本文从前期准备、方案内容和注意事项三个方面,详细解析了如何撰写一个优质高效的网络项目实施方案,帮助企业和用户实现更好的体验和竞争力。通过具体案例,展示了方案的制定和实施过程,强调了目标明确、技术先进、计划周密、风险可控和预算合理的重要性。
47 5
|
2天前
|
监控 安全 数据可视化
哪些项目适合采用BOT+EPC模式?深度解析
2分钟了解什么是BOT+EPC项目管理模式以及该模式适用于哪些类型的项目。
19 1
哪些项目适合采用BOT+EPC模式?深度解析
|
1月前
|
缓存 Java 调度
多线程编程核心:上下文切换深度解析
在现代计算机系统中,多线程编程已成为提高程序性能和响应速度的关键技术。然而,多线程编程中一个不可避免的概念就是上下文切换(Context Switching)。本文将深入探讨上下文切换的概念、原因、影响以及优化策略,帮助你在工作和学习中深入理解这一技术干货。
47 10
|
2月前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
57 12
|
1月前
|
调度 开发者
核心概念解析:进程与线程的对比分析
在操作系统和计算机编程领域,进程和线程是两个基本而核心的概念。它们是程序执行和资源管理的基础,但它们之间存在显著的差异。本文将深入探讨进程与线程的区别,并分析它们在现代软件开发中的应用和重要性。
57 4
|
1月前
|
算法 调度 开发者
多线程编程核心:上下文切换深度解析
在多线程编程中,上下文切换是一个至关重要的概念,它直接影响到程序的性能和响应速度。本文将深入探讨上下文切换的含义、原因、影响以及如何优化,帮助你在工作和学习中更好地理解和应用多线程技术。
42 4
|
1月前
|
Java 调度 Android开发
安卓与iOS开发中的线程管理差异解析
在移动应用开发的广阔天地中,安卓和iOS两大平台各自拥有独特的魅力。如同东西方文化的差异,它们在处理多线程任务时也展现出不同的哲学。本文将带你穿梭于这两个平台之间,比较它们在线程管理上的核心理念、实现方式及性能考量,助你成为跨平台的编程高手。
|
2月前
|
存储 缓存 监控
Java中的线程池深度解析####
本文深入探讨了Java并发编程中的核心组件——线程池,从其基本概念、工作原理、核心参数解析到应用场景与最佳实践,全方位剖析了线程池在提升应用性能、资源管理和任务调度方面的重要作用。通过实例演示和性能对比,揭示合理配置线程池对于构建高效Java应用的关键意义。 ####
|
2月前
|
Java
.如何根据 CPU 核心数设计线程池线程数量
IO 密集型:核心数*2 计算密集型: 核心数+1 为什么加 1?即使当计算密集型的线程偶尔由于缺失故障或者其他原因而暂停时,这个额外的线程也能确保 CPU 的时钟周期不会被浪费。
65 4
|
2月前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####

推荐镜像

更多