Quartz 动态调度 Job

  1. 1. 1. 找到调度类
  2. 2. 2. 扩展方式选择
  3. 3. 3. Job 和 Trigger 的理解
  4. 4. 4. Pause 动作
    1. 4.1. 4.1 Pause Trigger 暂停 Trigger
    2. 4.2. 4.2 Pause Job 暂停 Job
  5. 5. 5. 创建新的 Job
  6. 6. 6. Trigger 操作
    1. 6.1. 6.1 Trigger 新增
    2. 6.2. 6.2 Trigger Cron 修改
  7. 7. 7. 重新加载 Job 和 Trigger
    1. 7.1. 7.0 配置 Quartz 不覆盖已经存在的 Job
    2. 7.2. 7.1 reload jobs
  8. 8. 8. 总结

最近在开发 Quartz Job 任务时有一种很迫切的需求,希望能动态调整生产的 Job 的运行,开启或停止 Job、调整 Job 的运行频率。翻了一下源码,基于 Quartz 的 Scheduler 扩展一个。

1. 找到调度类

从 QuartzAutoConfiguration 配置类中可以看到,该类注册了 SchedulerFactoryBean 类型的 Bean。SchedulerFactoryBean 类是个 FactoryBean,在 Spring 中专们用于产生 Bean 的。SchedulerFactoryBean 产生的就是 Scheduler Bean。

Scheduler 的注释可以看出,Quartz 中使用 Scheduler 接口的实现类作为 job 和 trigger 的管理、调度和执行的管理器。

1
2
3
4
5
6
7
* This is the main interface of a Quartz Scheduler.
*
* <p>
* A <code>Scheduler</code> maintains a registry of <code>{@link org.quartz.JobDetail}</code>s
* and <code>{@link Trigger}</code>s. Once registered, the <code>Scheduler</code>
* is responsible for executing <code>Job</code> s when their associated
* <code>Trigger</code> s fire (when their scheduled time arrives).

2. 扩展方式选择

找到了实现功能的类,接下来应该思考用何种方式扩展。通常有:

  1. 继承该类,实现自己需要的功能
  2. 包装一层,组合原有功能实现需要功能

在当前场景下,继承 Scheduler 的子类是非常麻烦的。不光是要考虑如何使用 Scheduler 的扩展点,还要实现 QuartzAutoConfiguration 的部分逻辑,增加了复杂度,还不容易跟随 Quartz 后继的升级。

包装一层除了没有以上的问题外还可以隔离业务层和底层组件,业务层不需要知道低层使用的是什么方式取到这些信息,将来优化调整也容易。

Scheduler 对象由 SchedulerFactoryBean 的 bean 产生,自动注册到了 Spring Context 中。所以我们的包装类直接注入该类既可使用,以下是通过构造器注入的 JobScheduler,该类是我们在业务层使用的 Service。

1
2
3
4
5
6
7
8
9
@Service
@Slf4j
public class JobScheduler {
private final Scheduler scheduler;

public JobScheduler(Scheduler scheduler) {
this.scheduler = scheduler;
}
}

3. Job 和 Trigger 的理解

Quartz 中的 Job 和 Trigger 的作用和字面含意一样:

  1. Job 表示的是要执行的作业任务,Trigger 表示的是任务触发的时间
  2. 一个 Job 可以有多个 Trigger

当操作暂停任务的执行时要确定任务操作的对象,暂停 Job 相当于暂停 Job 下所有的 Trigger。如果 Job 只有一个 Trigger,暂停 Job 相当于操作 Trigger。如果 Job 下有多个 Trigger,暂停一个 Trigger,Job 还会执行。

所以在操作调度的时候,要确定是操作的目标,以免生产产生出乎意料的现象。

4. Pause 动作

4.1 Pause Trigger 暂停 Trigger

一个 Job 可能会有几个执行时间的要求,所以通常配置了多个 Trigger,暂停 Trigger 只会影响 Job 在一个执行时间的配置。

Scheduler 提供了 pauseTrigger 的接口,传入 TriggerKey 就可以暂停:

1
2
3
4
5
6
7
/**
* Pause the <code>{@link Trigger}</code> with the given key.
*
* @see #resumeTrigger(TriggerKey)
*/
void pauseTrigger(TriggerKey triggerKey)
throws SchedulerException;

TriggerKey 通常用 TriggerKey.triggerKey(name, group) 来构造,我们应避免手动 new 对象。实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 暂停 trigger
*
* @param name trigger name
* @param group trigger group
* @return true 成功
*/
public boolean pauseTrigger(String name, String group) {
try {
scheduler.pauseTrigger(TriggerKey.triggerKey(name, group));
return true;
} catch (SchedulerException e) {
log.warn("Pause trigger error", e);
}

return false;
}

4.2 Pause Job 暂停 Job

Job 的暂停也直接使用 Scheduler 的接口,这里就不再赘述。

5. 创建新的 Job

Quartz 中创建新的 Job 调用 scheduleJob 接口实现:

1
Date scheduleJob(JobDetail jobDetail, Trigger trigger) throws SchedulerException;

该接口通过传入构造的 JobDetail 和 Trigger 注册 Job,返回值是第一次执行时间。

JobDetail 对象我们可以用 JobBuilder 来构造:

1
2
3
4
5
6
public static JobDetail createJobDetail(String name, String group, Class<?> jobClass) {
return JobBuilder.newJob((Class<? extends Job>) jobClass)
.storeDurably()
.withIdentity(name, group)
.build();
}

storeDurably 表示当 Job 没有 Trigger 时依然保留。

Trigger 同样也有构造器 TriggerBuilder:

1
2
3
4
5
6
7
8
9
10
11
12
public static Trigger createTrigger(String name, String group, JobKey jobKey, String cronExp, Map<String, Object> jobData) {
TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger()
.withIdentity(name, group)
.forJob(jobKey)
.startNow();

if (jobData != null)
triggerBuilder.usingJobData(new JobDataMap(jobData));

return triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cronExp))
.build();
}
  1. startNow 表示 trigger 一创建就立即启用
  2. jobData 表示 trigger 启动 Job 的时候传入特定数据(这个数据我们可以做成接口,运行时调整 trigger 的 job data)
  3. withSchedule(CronScheduleBuilder.cronSchedule(cronExp)) 表示我们调度时间配置方式使用 cron(建议使用 cron,方便、强大)

创建好以上两个对象后,直接传到 scheduler 对象中就可以产生新 Job。

6. Trigger 操作

6.1 Trigger 新增

Scheduler 类中还有以下方法:

1
Date scheduleJob(Trigger trigger) throws SchedulerException;

scheduleJob 方法单传入 trigger 就可以给 Job 增加一个执行时间配置。

6.2 Trigger Cron 修改

Trigger 的修改的场景大多是修改 cron,修改 trigger 用到的方法是 rescheduleJob

1
Date rescheduleJob(TriggerKey triggerKey, Trigger newTrigger) throws SchedulerException;

7. 重新加载 Job 和 Trigger

当设置 Quartz 启动的时候不自动刷新 Job 信息,动态调度的配置就不被代码里配置的 Job 影响,哪怕代码里修改了 Job 的执行时间。

7.0 配置 Quartz 不覆盖已经存在的 Job

1
2
3
4
5
6
7
8
@Bean
public SchedulerFactoryBeanCustomizer schedulerFactoryBeanCustomizer() {
return schedulerFactoryBean -> {
schedulerFactoryBean.setOverwriteExistingJobs(false);
schedulerFactoryBean.setExposeSchedulerInRepository(true);
schedulerFactoryBean.setWaitForJobsToCompleteOnShutdown(true);
};
}

如果配置了以上的 setOverwriteExistingJobs 配置,那在代码修改后应该执行 reload 操作以实现 Job 重新加载。

7.1 reload jobs

首先要清除所有 Job

1
scheduler.deleteJobs(Lists.newArrayList(scheduler.getJobKeys(GroupMatcher.anyJobGroup())));

重新初始化

1
schedulerAccessorBean.afterPropertiesSet();

SchedulerAccessorBean 类的作用:

1
2
* Spring bean-style class for accessing a Quartz Scheduler, i.e. for registering jobs,
* triggers and listeners on a given {@link org.quartz.Scheduler} instance.

简单的说就是操作 Scheduler 对象的方法,SchedulerFactoryBean 的 afterPropertiesSet 调用了父类的 registerListeners()registerJobsAndTriggers() 方法来注册和加载 Job 和 Trigger,SchedulerAccessorBean 类同样调是相同方法。

因此,通过该类我们可以实现重新初始化 Job。当然,SchedulerAccessorBean 默认是没有注册到 Spring Context 的,需要我们自己配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Bean
public SchedulerAccessorBean schedulerAccessorBean() {
val schedulerAccessorBean = new SchedulerAccessorBean();

schedulerAccessorBean.setBeanFactory(this.applicationContext.getAutowireCapableBeanFactory());

applicationContext.getBeansWithAnnotation(ScheduledJob.class);

if (this.jobDetails.getIfAvailable() != null)
schedulerAccessorBean.setJobDetails(this.jobDetails.getIfAvailable());

if (this.calendars.getIfAvailable() != null)
schedulerAccessorBean.setCalendars(this.calendars.getIfAvailable());

if (this.triggers.getIfAvailable() != null)
schedulerAccessorBean.setTriggers(this.triggers.getIfAvailable());

return schedulerAccessorBean;
}
  1. ScheduledJob 注解是我加的,用于通过注解自动发现并注册 Job。applicationContext.getBeansWithAnnotation 在这里的作用是让 Spring 容器先处理 ScheduledJob 的注册,因为下面我要用注册产生的 Job 和 Trigger 信息。

  2. this.jobDetails 等几个变量是 ObjectProvider 对象(Spring 用于包装注入对象,在代码中根据需要注入对象)

8. 总结

通过代码,我了解了 Quartz 文档通常不会说明的内容,同时对 Quartz 内部实现机制有了更深入的了解。