一、背景
在《分布式锁主动续期的入门级实现-自省 | 简约而不简单》中通过【自省】的方式讨论了关于分布式锁自动续期功能的入门级实现方式,简单同步一下上下文:
- 客户端抢到分布式锁之后开始执行任务,执行完毕后再释放分布式锁。
- 持锁后因客户端异常未能把锁释放,会导致锁成为永恒锁。
- 为了避免这种情况,在创建锁的时候给锁指定一个过期时间。
- 到期之后锁会被自动删除掉,这个角度看是对锁资源的一种保护。
- 重点:但若锁过期被删除后,任务还没结束怎么办?
- 可以通过在一个额外的线程中主动推迟分布式锁的过期时间,下文也用续期一词来表述;避免当任务还没执行完,锁就被删除了
二、理还乱?
逻辑看很简单,也很清晰,但任何事情都有两面性,每个锁配一个额外的线程做watchDog
专门去处理,实现起来自然简单清晰,但肯定也有弊端。如果要把锁的功能做的健壮,总要从不断地自我质疑、自我反思中,理顺思路,寻找答案,我认为这属于自省式学习,以后也想尝试这种模式,一起来试试吧:
- 问题:如果同时有成百上千个锁呢?
同时有成百上千个锁,按照上篇中的实现方式,就会对应创建成百上千个线程在做续期工作,但实际上间歇性的续租操作并非高并发操作,只需要几个线程即可。类比一下一群羊只要少数牧羊犬看护的情景?
- 问题:什么场景下会有同时出现这么多锁呢?
如运营要做抢购活动,那么就会瞬间有成百上千的下单请求进入服务中,在高并发场景下特别容易出现超时而导致 rpc 重试 ,而这时需要拥有一种防重入的自保护机制的。对防重入感兴趣的这里提供一个直通车:《分布式锁中-基于 Redis 的实现如何防重入》。 - 问题:如何避免创建这么多线程呢?
池化机制,Java 中提供了用于执行调度任务的线程池,如ScheduledExecutorService#scheduleAtFixedRate
。 - 问题:如何构建
示例:
new ScheduledThreadPoolExecutor(corePoolSize, new NamedThreadFactory("defaultKeepAlivePool-", true), new ThreadPoolExecutor.AbortPolicy() 复制代码
- 构造函数详情:
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); } 复制代码
- 问题:线程池的构建居然敢用
Integer.MAX_VALUE
?
这构造方法明显有问题,你该知道Integer.MAX_VALUE
违反了阿里 Java 代码规范。 - 问题:哪一条规范?
看下边这条规范中,明确指出允许的请求队列长度为Integer.MAX_VALUE
,可能会 xxx 导致 OOM
- 问题:有
Integer.MAX_VALUE
就能 OOM 了?
是的,交给线程池执行的是一个个任务对象,每个任务对象都会占用一定的内存,当线程池处理任务的能力降低,任务数越来越多的时候就 OOM 了。 - 问题:能举个例子嘛?
设置 JVM 内存-Xms200m -Xmx200m
,JVM 内存上限设定小一些,每个任务里占用的内存给大一些,加速 OOM 报错。
// 调整VM参数,加速OOM:-Xms200m -Xmx200m public static void testOOM(){ ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); try { for(int i = 0;i<10000000;i++){ scheduledExecutorService.scheduleAtFixedRate(() -> { ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor)scheduledExecutorService; int size = executor.getQueue().size(); System.out.println("size : " + size); //为了快速复现,任务里给个大内存占用 byte[] array = new byte[1024*1024*10]; try { TimeUnit.MINUTES.sleep(10); int length = array.length; System.out.println(length); } catch (InterruptedException e) { e.printStackTrace(); } },5,5,TimeUnit.SECONDS); } TimeUnit.HOURS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } 复制代码
- 问题:结果啥样?
果然模拟出了 OOM,结果如下:
size : 1637400 size : 1637778 ... Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded at java.util.concurrent.Executors.callable(Executors.java:407) at java.util.concurrent.FutureTask.<init>(FutureTask.java:152) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.<init>(ScheduledThreadPoolExecutor.java:219) at java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:570) at TestSchedule.testOOM(TestSchedule.java:107 at TestSchedule.main(TestSchedule.java:8) 复制代码
- 问题:那
ScheduledThreadPool
就不能用了?
看场景,对于分布式锁看门口的场景下,一个系统中除非恶意写 bug;否则按照常规并发设置,如 Dubbo Provider 实例中,一般线程数是设置为 200,那并发情况下同时也就存在 200 个锁,从续期任务的提交看,最多只有 200 个,这个任务数,远远不到 Integer.MAX。 - 问题:那要是就无限创建锁呢? 这种不讲场景的无限 xxx 的操作是 bug。很多操作被无限 xxx 后都能 OOM 。
- 问题:那我只能用它了?
从网络中搜集到的资料情况来看,ScheduledThreadPool
最多,读者老师若熟悉其它的池化调度组件,也烦请留言告知。
- 问题:虽然只提交了 200 个任务,但任务是定时触发的,这有风险的呀?
定时重复执行的任务,如果每 5 秒执行一次,一个任务执行的耗时在 30 秒,那任务数就越来越多了。 - 问题:定时任务数会因执行慢而越来越多嘛?
public static void testInterval(){ ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); try { System.out.println("start : " + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)); scheduledExecutorService.scheduleAtFixedRate(() -> { try { System.out.println("enter : " + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)); TimeUnit.SECONDS.sleep(10); System.out.println("exist : " + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)); } catch (InterruptedException e) { e.printStackTrace(); } },5,5,TimeUnit.SECONDS); TimeUnit.HOURS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } 复制代码
- 任务没结束时,下个任务并未开始,而是等任务结束后才开始,那就没有了耗时大于定时间隔,而导致任务越来越多的风险了。
start : 2022-12-06T17:33:39.177 // 任务开始 enter : 2022-12-06T17:33:44.239 // 5秒后首次执行 exist : 2022-12-06T17:33:54.239 // 任务耗时10秒 enter : 2022-12-06T17:33:54.239 // 要求任务间隔是5秒,但已耗时10秒,超过了5秒,任务没结束时,下个任务并未开始,而是等任务结束后才开始 exist : 2022-12-06T17:34:04.24 enter : 2022-12-06T17:34:04.24 exist : 2022-12-06T17:34:14.241 enter : 2022-12-06T17:34:14.241 exist : 2022-12-06T17:34:24.241 复制代码
- 问题:那任务执行耗时 小于 定时间隔的,什么时机开始下一次任务呢?
public static void testInterval(){ ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); try { System.out.println("start : " + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)); scheduledExecutorService.scheduleAtFixedRate(() -> { try { System.out.println("enter : " + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)); TimeUnit.SECONDS.sleep(5); System.out.println("exist : " + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)); } catch (InterruptedException e) { e.printStackTrace(); } },10,10,TimeUnit.SECONDS); TimeUnit.HOURS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } ``` ``` start : 2022-12-06T17:31:01.771 //任务开始 enter : 2022-12-06T17:31:11.846 //10秒后首次执行 exist : 2022-12-06T17:31:16.846 //任务耗时5秒 enter : 2022-12-06T17:31:21.845 //因间隔10秒,任务耗时5秒,所以5秒后继续重新执行任务 exist : 2022-12-06T17:31:26.846 enter : 2022-12-06T17:31:31.846 exist : 2022-12-06T17:31:36.846 enter : 2022-12-06T17:31:41.847 exist : 2022-12-06T17:31:46.847 复制代码
- 问题:所以
scheduleAtFixedRate
的逻辑结论是?
- 如果上一个任务的执行时间大于等待时间,任务结束后,下一个任务马上执行。
- 如果上一个任务的执行时间小于等待时间,任务结束后,下一个任务在(间隔时间-执行时间)后开始执行。
- 问题:那
scheduleWithFixedDealy
呢?
- 如果上个任务的执行时间大于等待时间,任务结束后也会等待相应的时间才执行下一个任务
三、新的思考
- 问题:那阿里的规范写的有问题?
可再认真阅读此规范,微妙之处请读者老师自行品鉴,也可留言讨论。
问题:那是不是就可以放心大胆的霍霍了?
那肯定是不能随心所欲的。
- 问题:那还有什么注意事项?
还不累嘛?休息休息,日更该发稿了,要不审核就下班了。 - 问题:掘金审核还下班?
好问题,审核是什么机制呢,欢迎留言讨论,咱们下一篇再聊。
四、最后说一句
我是石页兄,如果这篇文章对您有帮助,或者有所启发的话,欢迎关注笔者的微信公众号【 架构染色 】进行交流和学习。您的支持是我坚持写作最大的动力。