Java定时任务调度原理解析

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: 随着互联网应用的快速普及,开发者们往往会遇到业务逻辑复杂、时间驱动类型业务、数据处理、离线分析等场景,比如整点发送优惠券、按月批量统计报表等,为了减少对核心系统的影响,我们通常会采用定时任务框架来处理。定时任务顾名思义就是预先设定任务执行时间,到点后任务自动被调度执行,下面列出几种常见的定时任务框架并简单介绍其实现原理。

1,定时任务管理简介

随着互联网应用的快速普及,开发者们往往会遇到业务逻辑复杂、时间驱动类型业务、数据处理、离线分析等场景,比如整点发送优惠券、按月批量统计报表等,为了减少对核心系统的影响,我们通常会采用定时任务框架来处理。定时任务顾名思义就是预先设定任务执行时间,到点后任务自动被调度执行,下面列出几种常见的定时任务框架并简单介绍其实现原理。

2 Java 原生定时任务调度器

2.1 Timer

2.1.1 简介

Timer是从Java SDK1.3开始提供的最原生定时任务执行解决方案,位于java.util包下,主要包括如下四类角色:

  • Timer:任务调度器
  • TimerThread:任务执行器
  • TimerTask:定时任务
  • TaskQueue:任务队列,队列中的任务按执行时间先后顺序排序,队首执行时间最靠前

2.1.2 关键源码解析

Timer的实现非常简单,翻看源代码我们来看下它的核心调度和执行处理过程:

privatevoidmainLoop() {
while (true) {
try {
TimerTasktask;
booleantaskFired;
synchronized(queue) {  // 同步锁住队列                 .....
task=queue.getMin();  // 取出队首任务 synchronized(task.lock)  { 
                    .....
if (taskFired= (executionTime<=currentTime)) {  // 如果达到执行时间设置可执行标志if (task.period==0) { 
queue.removeMin();   // 移除队列task.state=TimerTask.EXECUTED;   // 标记完成状态                        } else {
queue.rescheduleMin(
task.period<0?currentTime-task.period : executionTime+task.period);
                        }
                    }
                }
if (!taskFired) queue.wait(executionTime-currentTime); // 如果时间没达到,等待△time            }
if (taskFired)  // 如果可执行则runtask.run();
        } catch(InterruptedExceptione) {
        }
    }
}


借助同步悲观锁机制,整个执行器采用单线程无限遍历队列来实现,不断取出队首任务,判断是否达到执行时间,如果达到则执行,如果没有达到则等待直到达到执行时间。执行过程可简化为如下流程:

2.1.3 优缺点

优点:简单易用。

缺陷:不支持多线程;对系统时钟敏感;当前任务异常会终止队列中后续任务执行;不支持定时表达式。

2.2 ScheduledExecutorService

2.2.1 简介

我们知道Java1.5是Java历史版本上的一个重大转折点,天才并发大师Doug Lee为Java带来了完整的线程池

编程框架J.U.C,结束了Java只能手动创建线程的历史,使得多线程编程更加简单、安全和高效。我们先来看下J.U.C包下线程池的类图:

其中有两个可直接使用的实例化类,ThreadPoolExecutor和ScheduledThreadPoolExecutor,前者是最基础的通用线程池,使用者可以通过灵活的构造函数传参创建所需要的线程池。后者就是我们要介绍的定时任务管理器。ScheduledExecutorService是基于线程池设计的定时任务类,每个调度任务都会分配到线程池中的一个线程去执行,也就是说任务是并发执行,互不影响。需要注意的是只有当调度任务来的时候,ScheduledExecutorService才会真正启动一个线程,其余时间ScheduledExecutorService都是出于轮询任务的状态。

ScheduledExecutorService定时任务框架体系也拥有定时任务的4个主要角色:

  • ScheduledThreadPoolExecutor:调度器
  • ThreadPoolExecutor:执行器
  • ScheduledFutureTask:定时任务,记录执行时间和周期
  • DelayedWorkQueue:任务队列

2.2.2 关键源码解析

在深入了解Java并发包下的定时任务调度之前,强烈建议先认真阅读2.1中Timer的核心思路,因为ScheduledExecutorService实现定时任务调度的最本质思路和Timer基本如出一辙,最大的改变就是将Timer的单线程变成了多线程。

我们先来看下其实现类ScheduledThreadPoolExecutor的创建过程:

publicScheduledThreadPoolExecutor(intcorePoolSize,
ThreadFactorythreadFactory,
RejectedExecutionHandlerhandler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
newDelayedWorkQueue(), threadFactory, handler);
}

可以看出它直接调用了通用父类线程池ThreadPoolExecutor来实例化自己,也就说任务的执行是通过ThreadPoolExecutor来实现,另外值得关注的是任务队列它采用了延迟队列DelayedWorkQueue,DelayedWorkQueue是ScheduledThreadPoolExecutor的内部类。我们来看下DelayedWorkQueue对定时任务是如何管理的:

// 添加定时任务publicbooleanoffer(Runnablex) {
    ...
finalReentrantLocklock=this.lock;
lock.lock();
try {
inti=size;
if (i>=queue.length) grow();
size=i+1;
if (i==0) {  // 队列为空,则直接加到队首queue[0] =e;
setIndex(e, 0);
        } else {
siftUp(i, e);   // 队列不对空,调用siftUp方法决定x在队列中的顺序,shiftUp通过调用任务的compareTo来实现将定时执行时间最早的放在最前面        }
if (queue[0] ==e) {
leader=null;
available.signal();
        }
    } finally {
lock.unlock();
    }
returntrue;
}

上述的siftUp方法使得任务队列和Timer中的任务队列达到了同样的效果,定时执行时间最早的放在队列最前面,我们再来看下它是如何达成定时执行的。

在ScheduledThreadPoolExecutor中我们可以看到所有提交的定时任务最后都调用了ensurePrestart()方法,它是父类线程池的方法,调用了addWorker()方法,该方法是线程池对线程的核心管理方法,不过不是本文章的重点不细讲,ensurePrestart方法中调用addWorker方法传递的firstTask都是null,也就是说给线程池提交了一个空任务,那么线程池执行任务都需要从队列中拉取的任务,那我们来看下队列拿任务的take方法的实现:

publicRunnableScheduledFuture<?>take() throwsInterruptedException {
finalReentrantLocklock=this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?>first=queue[0]; // 直接获取队首任务if (first==null)
available.await();
else {
longdelay=first.getDelay(NANOSECONDS);  // 队首任务执行时间和当前时间的时间差if (delay<=0)
returnfinishPoll(first); // 小于0则直接弹出执行first=null; 
if (leader!=null)
available.await();
else {
ThreadthisThread=Thread.currentThread();
leader=thisThread;
try {
available.awaitNanos(delay);  // 否则等待delay时间                    } finally {
if (leader==thisThread)
leader=null;
                    }
                }
            }
        }
    } finally {
if (leader==null&&queue[0] !=null)
available.signal();
lock.unlock();
    }
}

到这里大家发现没有,任务定时执行的思路和Timer其实是一样的,通过判断队首的定时任务的执行时间是否达到,达到则弹出执行,否则等待阻塞。

2.2.3 优缺点

优点:支持多定时任务并发执行;支持延迟执行;当前任务异常不会终止队列中后续任务执行。

缺点:不支持定时表达式;不支持分布式。

3 第三方定时任务调度框架

定时任务处理方式的核心思路其实都差不多,而第2章节介绍的是Java对其最原始最底层的处理方式,因此通过剖析源码的方式较为详细的介绍。但是实际企业生产使用过程中,他们并没有那么方便,比如不支持定时表达式,不支持界面化操作,不支持自动告警,不支持定时时间实时修改等,从而产生了大量的企业级定时任务框架,比如经典的Quartz,开源的xxl-job,阿里巴巴的schduleX等,这些框架通常功能丰富,但是往往又很重,对于轻量级应用而言引用并不划算,感兴趣的同学可以查阅更多资料详细了解。下面将介绍一款Spring自实现的定时任务调度方案Spring Scheduler。

4 Spring定时任务调度器

4.1 简介

为了简化使用者对动态任务的调度,Spring自实现了一个轻量级任务调度管理器进行动态任务管理与调度,使用者只需要在配置类上加入@EnableSchedule注解即可开启对计划任务的支持,然后在要执行计划任务的方法上加上@Schedule即可。Spring通过接口TaskScheduler和TaskExecutor这两个接口的方式为异步定时任务提供了一种抽象,前者拥有任务调度能力,后者拥有任务执行能力。

4.2 关键源码解析

4.2.1 @EnableSchedule注解到底做了什么

查看这个注解的元注解可以看到@Configuration和@Import(SchedulingConfiguration.class),@Import是用来导入配置类的,查看SchedulingConfiguration.class发现它向Spring容器声明了一个Bean:ScheduledAnnotationBeanPostProceessor,看一下这个类的解释:

* <p>This post-processor is automatically registered by Spring's

* {@code <task:annotation-driven>} XML element, and also by the

* {@link EnableScheduling @EnableScheduling} annotation.

这个类在初始化时主要会做两件事情:

(1)初始化TaskScheduler

TaskScheduler是实际任务的调度器。ScheduledAnnotationBeanPostProceessor这个类实现了ApplicationContextAware接口,重写了onApplicationEvent方法,这个方法会调用finishRegistration()方法,finishRegistration()方法在最后通过一个register调用了afterPropertiesSet()方法,这里先不讨论register是做什么的,afterPropertiesSet()方法第一步就是判断调度器是否为空,显然如果我们只单纯的加了一个@EnableSchedule注解,调度器为空,那么这里就会创建调度器,看这里创建的思路:

if (this.taskScheduler==null) {
this.localExecutor=Executors.newSingleThreadScheduledExecutor();
this.taskScheduler=newConcurrentTaskScheduler(this.localExecutor);
}

注意了,这里首先创建的是一个单线程的任务执行器,然后把这个执行器传递给新建的调度器,这个时候该任务调度器拥有了单线程任务执行能力,这也是为什么如果你只是单纯的引入此注解,多任务在执行的时候会发生阻塞。

(2)寻找所有加了@Scheduled和@Schedules注解的方法

postProcessAfterInitialization()方法会在所有bean初始化完后了找到所有加了@Scheduled和@Schedules注解的方法,并解析定时表达式进行任务初始化准备工作。

4.2.2 如何支持任务的并发执行性

从上面的分析中可以看出,@EnableSchedule注解默认对任务的执行采用的是单线程的方式,即所有任务都必须等待当前任务执行完成后才可以继续调度执行,在多任务系统中默认实现方式显然不能满足要求。如果需要支持多任务的并发执行,Spring也为我们提供了实现方式,实现SchedulingConfigurer接口,为调度器TaskScheduler创建自定义的任务执行器,创建方式如下:

@Configuration@EnableSchedulingpublicclassTaskScheduleConfigimplementsSchedulingConfigurer {
privateThreadPoolTaskSchedulertaskScheduler;
@OverridepublicvoidconfigureTasks(ScheduledTaskRegistrartaskRegistrar) {
taskRegistrar.setScheduler(taskScheduler());
    }
@Bean(destroyMethod="shutdown")
publicThreadPoolTaskSchedulertaskScheduler(){
//创建一个线程池调度器taskScheduler=newThreadPoolTaskScheduler();
//设置线程池容量taskScheduler.setPoolSize(2);
//线程名前缀taskScheduler.setThreadNamePrefix("task-");
//等待时常taskScheduler.setAwaitTerminationSeconds(60);
//当调度器shutdown被调用时等待当前被调度的任务完成taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
//设置当任务被取消的同时从当前调度器移除的策略taskScheduler.setRemoveOnCancelPolicy(true);
//设置任务注册器的调度器returntaskScheduler;
    }
}

4.3,Spring如何实现任务异步化

Spring的定时任务执行都是同步的,有时候我们会碰到单任务执行时间很长的问题,需要将任务的执行异步化,可以将@Async注解和@Scheduled注解联合起来使用。但是这里会有一个问题,当任务被异步化的时候他会直接告诉任务调度器当前任务已经执行完了,因此下一次任务会根据定时时间准点执行,有可能实际上上一次任务并没有执行完。在某些场景下会导致数据异常,比如:

@Async@Scheduled(fixedRate=10*1000L)
publicvoidtaskMonitor() {
List<Task>taskList=taskBO.findUnfinishedTaskList();
for (Tasktask : taskList) {
executeInLock("monitor_"+task.getId(), () -> {
List<Subtask>subtaskList=taskBO.findUnfinishedSubtaskListByTaskId(task.getId());
if (subtaskList.isEmpty()) {
taskBO.completeTask(task);
            } else {
//未完成的subtask,true表示任务执行失败,记录错误log、发通知if (checkUnfinishSubtaskList(subtaskList)) {
logger.error("task execute fail:"+JSON.toJSONString(task));
taskBO.failTask(task, subtaskList);
sendDingtalk("任务执行超时,请关注:"+JSON.toJSONString(task));
                }
            }
returntrue;
        }, 8L, false);
    }
}

如果taskMonitor第一次没有执行完,针对某个taskA,正在执行completeTask方法,completeTask里面有insert DB和更新taskA状态的操作,但是数据还没有插入到DB,然后锁超时释放,这个时候taskMonitor又开始调度执行了,而taskA又被查询出来,再次执行completeTask操作,这样就会有重复数据插入到DB,因此尽量避免两个注解同时使用。如果确实需要将定时任务异步化,可以把锁的超时释放时间设置长一些,保证任务执行完在释放。

相关文章
|
23天前
|
存储 缓存 算法
HashMap深度解析:从原理到实战
HashMap,作为Java集合框架中的一个核心组件,以其高效的键值对存储和检索机制,在软件开发中扮演着举足轻重的角色。作为一名资深的AI工程师,深入理解HashMap的原理、历史、业务场景以及实战应用,对于提升数据处理和算法实现的效率至关重要。本文将通过手绘结构图、流程图,结合Java代码示例,全方位解析HashMap,帮助读者从理论到实践全面掌握这一关键技术。
72 13
|
10天前
|
人工智能 自然语言处理 Java
FastExcel:开源的 JAVA 解析 Excel 工具,集成 AI 通过自然语言处理 Excel 文件,完全兼容 EasyExcel
FastExcel 是一款基于 Java 的高性能 Excel 处理工具,专注于优化大规模数据处理,提供简洁易用的 API 和流式操作能力,支持从 EasyExcel 无缝迁移。
67 9
FastExcel:开源的 JAVA 解析 Excel 工具,集成 AI 通过自然语言处理 Excel 文件,完全兼容 EasyExcel
|
17天前
|
存储 缓存 Java
Java 并发编程——volatile 关键字解析
本文介绍了Java线程中的`volatile`关键字及其与`synchronized`锁的区别。`volatile`保证了变量的可见性和一定的有序性,但不能保证原子性。它通过内存屏障实现,避免指令重排序,确保线程间数据一致。相比`synchronized`,`volatile`性能更优,适用于简单状态标记和某些特定场景,如单例模式中的双重检查锁定。文中还解释了Java内存模型的基本概念,包括主内存、工作内存及并发编程中的原子性、可见性和有序性。
Java 并发编程——volatile 关键字解析
|
9天前
|
存储 物联网 大数据
探索阿里云 Flink 物化表:原理、优势与应用场景全解析
阿里云Flink的物化表是流批一体化平台中的关键特性,支持低延迟实时更新、灵活查询性能、无缝流批处理和高容错性。它广泛应用于电商、物联网和金融等领域,助力企业高效处理实时数据,提升业务决策能力。实践案例表明,物化表显著提高了交易欺诈损失率的控制和信贷审批效率,推动企业在数字化转型中取得竞争优势。
50 14
|
15天前
|
Java 数据库连接 Spring
反射-----浅解析(Java)
在java中,我们可以通过反射机制,知道任何一个类的成员变量(成员属性)和成员方法,也可以堆任何一个对象,调用这个对象的任何属性和方法,更进一步我们还可以修改部分信息和。
|
17天前
|
监控 Java API
探索Java NIO:究竟在哪些领域能大显身手?揭秘原理、应用场景与官方示例代码
Java NIO(New IO)自Java SE 1.4引入,提供比传统IO更高效、灵活的操作,支持非阻塞IO和选择器特性,适用于高并发、高吞吐量场景。NIO的核心概念包括通道(Channel)、缓冲区(Buffer)和选择器(Selector),能实现多路复用和异步操作。其应用场景涵盖网络通信、文件操作、进程间通信及数据库操作等。NIO的优势在于提高并发性和性能,简化编程;但学习成本较高,且与传统IO存在不兼容性。尽管如此,NIO在构建高性能框架如Netty、Mina和Jetty中仍广泛应用。
28 3
|
17天前
|
安全 算法 Java
Java CAS原理和应用场景大揭秘:你掌握了吗?
CAS(Compare and Swap)是一种乐观锁机制,通过硬件指令实现原子操作,确保多线程环境下对共享变量的安全访问。它避免了传统互斥锁的性能开销和线程阻塞问题。CAS操作包含三个步骤:获取期望值、比较当前值与期望值是否相等、若相等则更新为新值。CAS广泛应用于高并发场景,如数据库事务、分布式锁、无锁数据结构等,但需注意ABA问题。Java中常用`java.util.concurrent.atomic`包下的类支持CAS操作。
46 2
|
17天前
|
网络协议 安全 网络安全
探索网络模型与协议:从OSI到HTTPs的原理解析
OSI七层网络模型和TCP/IP四层模型是理解和设计计算机网络的框架。OSI模型包括物理层、数据链路层、网络层、传输层、会话层、表示层和应用层,而TCP/IP模型则简化为链路层、网络层、传输层和 HTTPS协议基于HTTP并通过TLS/SSL加密数据,确保安全传输。其连接过程涉及TCP三次握手、SSL证书验证、对称密钥交换等步骤,以保障通信的安全性和完整性。数字信封技术使用非对称加密和数字证书确保数据的机密性和身份认证。 浏览器通过Https访问网站的过程包括输入网址、DNS解析、建立TCP连接、发送HTTPS请求、接收响应、验证证书和解析网页内容等步骤,确保用户与服务器之间的安全通信。
74 1
|
1月前
|
Java 编译器
Java 泛型详细解析
本文将带你详细解析 Java 泛型,了解泛型的原理、常见的使用方法以及泛型的局限性,让你对泛型有更深入的了解。
54 2
Java 泛型详细解析
|
2月前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
60 12

推荐镜像

更多
下一篇
开通oss服务