目录

一.Quartz理论基础

(一)Timer

二.线程池

(一)ScheduledThreadPoolExcutor

(二)SingleThreadScheduledExecutor

二.quartz使用

(一) Job:封装成JobDetail设置属性

(二)Trigger:触发器:指定执行时间,开始结束时间等

(三)Scheduler:调度器,基于trigger的设定执行job

(四)JobDetailMap:保存任务实例的状态信息

三.Quartz集群原理

注意问题

四.Quartz实现异步通知

1. 一个Job绑定多个触发器

五.创建一个任务

(一)JobDetailFactoryBean或者CronTriggerFactoryBean

(二)使用builder创建Job或者Trigger

六.监听机制

1.JobListener

2.TriggerListener

3.SchedulerListener

七.基础代码实现

八.数据库各表及其作用


一.Quartz理论基础

(一)Timer

1.Quartz中的TaskQueue就是小顶堆的结构,用来存放timeTask

2.TimerThread:任务执行线程,死循环不断检查是否有有任务开始执行了,有就还是在这个线程执行它

3.单线程执行任务,有任务肯互相阻塞

schedule:执行任务超时,会导致后面的任务往后推移,预想在这个间隔内存存在的任务执行就不会执行了,就没有了

scheduleAtFixedRate:任务超时可能导致下一个任务马上开始执行

4.运行时异常会导致timer线程终止

5.任务调度是基于时间的,对系统时间敏感

二.线程池

(一)ScheduledThreadPoolExcutor

1.使用多线程执行任务,不会互相阻塞

2.如果线程失活,会新建线程执行任务

线程抛异常,任务会被丢弃,需要做捕获处理

3.DelayedWorkQueue:小顶堆,无界队列

在定时线程池当中,最大线程数是没有意义的

执行时间距离当前时间越近的任务在队列的前面

用于添加ScheduleFutureTask(继承于FutureTask,实现RunnableScheduledFuture接口)【提供异步执行的能力,并且可以返回执行】

线程池中的线程从DelayQueue中获取ScheduleFutureTask,然后执行任务

实现了Delayed接口,可以通过getDelay方法来获取延迟时间

4.Leader-Follower模式:避免没必要的唤醒和阻塞操作,这样会更加有效,且节省资源

5.应用场景:适用于多个后台线程执行周期性任务,同时为了满足资源管理的需求二需要限制后台线程数量

(二)SingleThreadScheduledExecutor

1.单线程的ScheduleThreadPoolExector

2.应用场景:适用于需要单个后台线程执行周期任务,同时需要保证任务顺序执行

二.quartz使用

(一) Job:封装成JobDetail设置属性

@DisallowConcurrentExecution//禁止并发的执行同一个job的定义的多个实例(jobDetial定义的)@PersistJobDataAfterExecution//持久化JobDetail中的JobDetailMap(对Trigger中的dataMap无效)//如果一个任务不是持久化的,则当没有触发器关联他的时候,Quartz会从scheduler中删除掉他
boolean recovering = jobExecutionContext.isRecovering();//如果一个任务请求恢复,一般是该任务执行期间发生了系统崩溃或其他关闭进程的操作//当服务再次启动的时候,会再次执行该任务,此时jobExecutionContext.isRecovering()会返回true
@DisallowConcurrentExecution@PersistJobDataAfterExecutionpublic class TestJob implements Job {@Overridepublic void execute(JobExecutionContext jobExecutionContext)throws JobExecutionException {}}

(二)Trigger:触发器:指定执行时间,开始结束时间等

1.优先级:同时触发的Trigger之间才会比较优先级

如果trigger是可恢复的,再恢复后再调度时,优先级不变

2.错过触发:

判断misfire的条件:

(1)job到达触发时间没有被执行

(2)被执行的延迟时间超过Quartz配置的misfireThreshold阈值

可能原因:

1.当job达到触发时间时,所有的线程都被其他job占用,没有可用线程

2.在job需要触发的时间点,schedule停止了(可能是意外停止的)

3.job使用了@DisallowConcurrentExecution注解,job不能并发执行,当达到下一个job执行点的时候,上一个任务还没没有完成

4.job指定了过去开始的执行时间,例如当前时间是8:00:00指定开始的时间为7:00:00

策略:

1.默认使用 MISFIRE_INSTRUCTION_SMART_POLICY策略

2.SimpleTrigger:

1)now*相关的策略,会立即执行第一个misfire任务,同时修改startTime和repeatCount, 一次会重新计算finalFireTime,原计划时间会被打乱

2)next*相关的策略,不会立即执行misfire的任务,也不会修改startTime和repeatCount, 因此finalFireTime也没有发生改变,发生了misfire也是按照原计划进行执行

3.CronTrigger:

1)MISFIRE_INSTRUCTION_SMART_POLICY: Quartz不会判断发生了misfire,立即执行所有发生了misfire的任务,然后按照原计划进行执行。 例如:10:15分立即执行9:00和10:O0的任务,然后等待下一个任务在11:00执行,后续按照原计划执行。 2)MISFIRE_INSTRUCTION_FIRE_ONCE_NOW: 立即执行第一个发生misfire的任务,忽略其他发生misfire的任务,然后按照原计划继续执行。 例如:在10:15立即执行9:00任务,忽略10:00任务,然后等待下一个任务在11:00执行,后续按照原计划执行。 3)MISFIRE_INSTRUCTION_DO_NOTHING: 所有发生的misfire的任务都被忽略,只是按照原计划继续执行

calender: 用于排除时间段

SimpleTrigger:具体时间点,指定间隔重复执行

CronTrigger:cron表达式

(三)Scheduler:调度器,基于trigger的设定执行job

1.SchedulerFactory:

(1)创建Scheduler

(2)DirectSchedulerFactory:在代码里定制Schedule参数

(3)StdSchedulerFactory:读取classpath下的quartz.properties配置来实例化Scheduler

2.JobStore:

存储运行时信息的,包括Trigger,Scheduler,JobDetail,业务锁等

RAMJobStore(内存实现)

JobStoreTX(JDBC,事务由Quartz管理)

JobStoreCMT(JDBC,使用容器事务)

ClusteredJobStore(集群实现)

TerracottaJobStore(Terracotta中间件)

3.ThreadPool:SimpleThreadPool 、自定义线程池

(四)JobDetailMap:保存任务实例的状态信息

1.jobDetail:

默认只在Job被添加到调度程序(任务执行计划表)schedule的时候,存储一次关于该任务的状态信息数据,可以使用注解@PersistJobDataAfterExecution标明在一个任务执行完毕后就存储一次

2.trigger

任务被多个触发器引用的时候,根据不同的触发时机,可以输入不同的输入条件

所以,JobDataMap在Job和Trigger存储的数据并不一致。

三.Quartz集群原理

Quartz急群众每个节点是一个独立的Quartz任务应用,它又管理者其他节点。该集群需要分别对每个节点分别启动或停止,独立的Quartz节点并不与另一个节点或是管理节点通信。Quartz应用是通过共有相同数据库表来感知到另一应用。也就是说只有使用持久化JobStore存储Job和Trigger才能完成Quartz集群。

Quartz的集群部署方案是分布式的,没有负责集中管理的节点,而是利用数据库杭锁的方式来实现集群环境下的并发控制。

一个scheduler实例在集群模式下首先获取{0}LOCKS表中的行锁;

Quartz主要由两个行级锁。

lock_namedesc
STATE_ACCESS状态访问锁
TRIGGER_ACCESS触发器访问锁

Quartz集群争用触发器行锁,锁被占用只能等待,获取触发器行锁之后,先获取需要等待触发的其他触发器信息。数据库更新触发器状态信息,及时是否触发器行锁,供其他调度实例获取,然后在进行触发器任务调度操作,对数据库操作就要先获取行锁。

同一集群下,instanceName必须相同,instanceId可自动生成,isClustered为true,持久化存储,指定数据库类型对应的驱动类和数据源连接。

注意问题

1. 时间同步问题

Quartz实际并不关心你是在相同还是不同的机器上运行节点。当集群放置在不同的机器上时,称之为水平集群。节点跑在同一台机器上时,称之为垂直集群。对于垂直集群,存在着单点故障的问题。这对高可用性的应用来说是无法接受的,因为一旦机器崩溃了,所有的节点也就被终止了。对于水平集群,存在着时间同步问题。

节点用时间戳来通知其他实例它自己的最后检入时间。假如节点的时钟被设置为将来的时间,那么运行中的Scheduler将再也意识不到那个结点已经宕掉了。另一方面,如果某个节点的时钟被设置为过去的时间,也许另一节点就会认定那个节点已宕掉并试图接过它的Job重运行。最简单的同步计算机时钟的方式是使用某一个Internet时间服务器(Internet Time Server ITS)。

2. 节点争抢Job问题

因为Quartz使用了一个随机的负载均衡算法,Job以随机的方式由不同的实例执行。Quartz官网上提到当前,还不存在一个方法来指派(钉住) 一个 Job 到集群中特定的节点。

四.Quartz实现异步通知

1. 一个Job绑定多个触发器

一个Trigger只能绑定一个Job,但是一个Job可以绑定多个Trigger。

那么,若是我们将一个Job上绑定多个触发器,且每个触发器只是触发一次的话,那么,实际上可以实现阶梯式的异步通知,下面的基础代码实现中监听器StartApplicationListener就使用了异步输出。

五.创建一个任务

(一)JobDetailFactoryBean或者CronTriggerFactoryBean

JobDetailFactoryBean或者CronTriggerFactoryBean都实现了FactoryBean接口和InitializingBean接口。虽然我们new JobDetailFactoryBean(),但是实际上是将JobDetail交由的IOC管理。而InitializingBean接口会在属性装载完毕之后,自动的回调afterPropertiesSet()方法,完成Bean对象的最终构建:

@Beanpublic JobDetailFactoryBean jobDetail(){//查询数据库或者配置文件JobDetailFactoryBean jobDetailFactoryBean=new JobDetailFactoryBean();jobDetailFactoryBean.setName("");jobDetailFactoryBean.setBeanName("");jobDetailFactoryBean.setJobClass((Class

或者最后直接进行

 JobDetail jobDetail= jobFactory.getObject();

(二)使用builder创建Job或者Trigger

// 定义任务调度实例, 并与TestJob绑定JobDetail job = JobBuilder.newJob(TestJob.class).withIdentity("testJob", "testJobGroup")//调度器的名字和组别.usingJobData("job","jobDetail")//给调度器当中定义key并赋值,相当于参数,之后可以在job中取到.storeDurably()//表示当没有触发器与之关联时,仍然将job继续保存在Scheduler中.build();// 定义触发器, 会马上执行一次, 接着5秒执行一次Trigger trigger = TriggerBuilder.newTrigger().withIdentity("testTrigger", "testTriggerGroup")//触发器的名字和分组.startNow()//定义好后马上就启动.withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(5))//按照一定的策略触发.usingJobData("job","jobDedfafafafafdsfal")//给触发器当中定义key并赋值,相当于参数,之后可以在job中取到//.repeatForever()表示一直执行.forJob(jobKey.getName(), jobKey.getGroup()) //制定Trigger和Job的关联关系.usingJobData(jobDataMap)//具体执行的方法中可以拿到这个传进去的信息。.build();

六.监听机制

Quartz的监听器用于当任务调度中关注的事件发生时,能够及时获取这一事件的通知。

  1. Quartz监听的种类:
  • JobListener:任务监听;
  • TriggerListener:触发器监听;
  • SchedulerListener:调度器监听;
  1. 监听器的作用域
  • 全局监听器:能够接收到所有的Job/Trigger的事件通知;
  • 局部监听器:只能接收在其上注册Job或者Trigger的事件;

1.JobListener

public interface JobListener {//获取该JobListener的名称String getName();//Scheduler在JobDetail将要被执行时调用该方法void jobToBeExecuted(JobExecutionContext context);//Scheduler在JobDetail将要被执行时,但又被TriggerListener否决调用void jobExecutionVetoed(JobExecutionContext context);//任务执行完毕调用该方法void jobWasExecuted(JobExecutionContext context,JobExecutionException jobException);}

将JobListener绑定到Scheduler中:

//监听所有的Jobscheduler.getListenerManager().addJobListener(new SimpleJobListener(), EverythingMatcher.allJobs());//监听特定的Jobscheduler.getListenerManager().addJobListener(new SimpleJobListener(), KeyMatcher.keyEquals(JobKey.jobKey("HelloWorld1_Job", "HelloWorld1_Group")));//监听同一任务组的Jobscheduler.getListenerManager().addJobListener(new SimpleJobListener(), GroupMatcher.jobGroupEquals("HelloWorld2_Group"));//监听两个任务组的Jobscheduler.getListenerManager().addJobListener(new SimpleJobListener(), OrMatcher.or(GroupMatcher.jobGroupEquals("HelloWorld1_Group"), GroupMatcher.jobGroupEquals("HelloWorld2_Group")));

2.TriggerListener

触发器监听,即在任务调度过程中,与触发器Trigger相关的事件:触发器触发、触发器未正常触发、触发器触发完成等。

public interface TriggerListener {//获取触发器的名字public String getName();//Job的execute()方法被调用时调用该方法。public void triggerFired(Trigger trigger, JobExecutionContext context);//Trigger触发后,TriggerListener给了一个选择否定Job的执行。假如该方法返回true,该Job将不会被触发public boolean vetoJobExecution(Trigger trigger, JobExecutionContext context);//Trigger错过触发时间触发该方法,此方法不应该含有长时间的处理逻辑。public void triggerMisfired(Trigger trigger);//Trigger被触发并且完成Job后触发。public void triggerComplete(Trigger trigger, JobExecutionContext context,int triggerInstructionCode);}

将TriggerListener绑定到Scheduler中:

//监听所有的Triggerscheduler.getListenerManager().addTriggerListener(new SimpleTriggerListener("SimpleTrigger"), EverythingMatcher.allTriggers());//监听特定的Triggerscheduler.getListenerManager().addTriggerListener(new SimpleTriggerListener("SimpleTrigger"), KeyMatcher.keyEquals(TriggerKey.triggerKey("HelloWord1_Job", "HelloWorld1_Group")));//监听一组Triggerscheduler.getListenerManager().addTriggerListener(new SimpleTriggerListener("SimpleTrigger"), GroupMatcher.groupEquals("HelloWorld1_Group"));//移除监听器scheduler.getListenerManager().removeTriggerListener("SimpleTrigger");

3.SchedulerListener

SchedulerListener会在Scheduler的生命周期关键事件发生时调用。与Scheduler有关的事件包括:增加一个job/trigger,删除一个job/Trigger,scheduler发生严重错误,关闭Scheduler等。

public interface SchedulerListener { //用于部署JobDetail时调用public void jobScheduled(Trigger trigger);//用于卸载JobDetail时调用public void jobUnscheduled(String triggerName, String triggerGroup);//当一个Trigger没有触发次数时调用。public void triggerFinalized(Trigger trigger);public void triggersPaused(String triggerName, String triggerGroup);public void triggersResumed(String triggerName, String triggerGroup);//当一个或一组 JobDetail 暂停时调用这个方法。public void jobsPaused(String jobName, String jobGroup);//当一个或一组 Job 从暂停上恢复时调用这个方法。假如是一个 Job 组,jobName 参数将为 null。public void jobsResumed(String jobName, String jobGroup);//在 Scheduler 的正常运行期间产生一个严重错误时调用这个方法。public void schedulerError(String msg, SchedulerException cause);//当Scheduler 开启时,调用该方法public void schedulerStarted();//当Scheduler处于StandBy模式时,调用该方法public void schedulerInStandbyMode();//当Scheduler停止时,调用该方法public void schedulerShutdown();//当Scheduler中的数据被清除时,调用该方法。public void schedulingDataCleared();}

将SchedulerListener绑定到Scheduler中:

//创建监听scheduler.getListenerManager().addSchedulerListener(new SimpleSchedulerListener());//移除监听scheduler.getListenerManager().removeSchedulerListener(new SimpleSchedulerListener());

七.基础代码实现

Start.java

@SpringBootApplicationpublic class Start {public static void main(String[] args) {SpringApplication.run(Start.class,args);}}
QuartzConfiguration.java
@Configurationpublic class QuartzConfiguration {@Autowiredprivate DataSource dataSource;@Beanpublic Scheduler scheduler() throws IOException {return schedulerFactoryBean().getScheduler();}@Beanpublic Properties quartzProperties() throws IOException {PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();propertiesFactoryBean.setLocation(new ClassPathResource("/application.yml"));propertiesFactoryBean.afterPropertiesSet();return propertiesFactoryBean.getObject();}@Beanpublic Executor schedulerThreadPool(){ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors());executor.setQueueCapacity(Runtime.getRuntime().availableProcessors());return executor;}@Beanpublic SchedulerFactoryBean schedulerFactoryBean() throws IOException {SchedulerFactoryBean factory = new SchedulerFactoryBean();factory.setSchedulerName("cluster_scheduler");factory.setDataSource(dataSource);factory.setApplicationContextSchedulerContextKey("application");factory.setQuartzProperties(quartzProperties());factory.setTaskExecutor(schedulerThreadPool());factory.setStartupDelay(10);return factory;}}
StartApplicationListener.java
@Component//@Controller也可以public class StartApplicationListener implements ApplicationListener {@Autowiredprivate Scheduler scheduler;@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {try {TriggerKey triggerKey1 = TriggerKey.triggerKey("trigger1","group1");Trigger trigger1 = scheduler.getTrigger(triggerKey1);if (trigger1 ==null){trigger1 = TriggerBuilder.newTrigger().withIdentity(triggerKey1).withSchedule(CronScheduleBuilder.cronSchedule("0/3 * * * * ?"))//cron表达式,每个10秒执行一次.build();JobDetail jobDetail = JobBuilder.newJob(TestJob.class).withIdentity("job1","group1").build();scheduler.scheduleJob(jobDetail,trigger1);}TriggerKey triggerKey2 = TriggerKey.triggerKey("trigger2","group2");Trigger trigger2 = scheduler.getTrigger(triggerKey2);if (trigger2 ==null) {trigger2 = TriggerBuilder.newTrigger().withIdentity(triggerKey2).withSchedule(CronScheduleBuilder.cronSchedule("0/3 * * * * ?"))//cron表达式,每个10秒执行一次.build();JobDetail jobDetail = JobBuilder.newJob(TestJob.class).withIdentity("job2", "group2").build();scheduler.scheduleJob(jobDetail, trigger2);}scheduler.start();} catch (SchedulerException e) {e.printStackTrace();}}}

从Start.java程序入口进入程序,俩个trigger一起异步输出如下

//间隔3s输出一次

开启时间 :2022-10-28 16:50:27
开启时间 :2022-10-28 16:50:27
开启时间 :2022-10-28 16:50:30
开启时间 :2022-10-28 16:50:30
开启时间 :2022-10-28 16:50:33
开启时间 :2022-10-28 16:50:33
开启时间 :2022-10-28 16:50:36
开启时间 :2022-10-28 16:50:36

进程已结束,退出代码为 -1

八.数据库各表及其作用

1.qrtz_job_details:记录每个任务的详细信息。2.qrtz_triggers:记录每个触发器的详细信息。3.qrtz_corn_triggers:记录cornTrigger的信息。4.qrtz_scheduler_state:记录 调度器(每个机器节点)的生命状态。5.qrtz_fired_triggers:记录每个正在执行的触发器。6. qrtz_simple_triggers:存储简单的trigger,包括重复次数,间隔,以及触发次数。7. qrtz_simprop_triggers:存储CalendarIntervalTrigger和DailyTimeIntervalTrigger两种类型的触发器,8. qrtz_blob_triggers:自定义的triggers使用blog类型进行存储,非自定义的triggers不会存放在此表中,Quartz提供的triggers包括:CronTrigger,CalendarIntervalTrigger,DailyTimeIntervalTrigger以及SimpleTrigger。9. qrtz_calendars:以 Blob 类型存储 Quartz 的 Calendar 信息10. qrtz_paused_trigger_grps:存储已暂停的 Trigger 组的信息11.qrtz_locks:记录程序的悲观锁(防止多个节点同时执行同一个定时任务)。

本笔记基于图灵的课堂和https://www.jianshu.com/p/ece144448134这位大佬的笔记