Quartz是一款性能强大的定时任务调度器。开发人员可以使用Quartz让任务在特定时间特定阶段进行运行。比如对特定类型新闻或股指期货指数等内容的爬取,可以编写爬虫程序然后使用Quartz在后台指定特定时间点对任务进行执行,来自动收集信息。大型系统间数据的按时批量导入任务也可由Quartz进行调度。Quartz提供两种类型的任务触发方式,一种是按指定时间间隔触发任务,另一种是按指定日历时间触发任务。下面将对Quartz进行详细介绍。
一、Hello Quartz
下面首先实现一个简单实例。
新建maven项目,在pom.xml导入Quartz的jar包:org.quartz-scheduler quartz 2.2.1
定义HelloJob类,实现Job接口并定义具体的任务逻辑。
public class HelloJob implements Job { @Override public void execute(JobExecutionContext context) throws JobExecutionException { System.out.println("hello"); }}
实例化Scheduler、Triggle和Job对象,并执行定时任务。
public class QuartzConsole { public static void main(String[] args) throws SchedulerException { SchedulerFactory factory=new org.quartz.impl.StdSchedulerFactory(); Scheduler scheduler=factory.getScheduler();//通过SchedulerFactory实例化Scheduler对象 scheduler.start(); JobDetail job=newJob(HelloJob.class)//指定Job的运行类 .withIdentity("myJob","group1") .build();// name "myJob", group "group1"两个变量作为一个job的key Trigger trigger=newTrigger().withIdentity("myTrigger","group1")// name "myTrigger", group "group1" 两个变量作为一个trigger的key .startNow() .withSchedule(simpleSchedule() .withIntervalInSeconds(5) .repeatForever())//定义任务触发方式,每5秒执行一次,一直重复。 .build(); scheduler.scheduleJob(job,trigger); }}
运行程序每5秒执行一次Job。
Quartz通过Job、Triggle和Schedule实现任务的调度。三者关系如图所示。Job定义:开发者实现Job接口,重写execute()方法定义具体Job实现。JobDetail接口定义一个job的相关配置细节。通过JobBuilder构建一个实现JobDetail接口的JobDetailImpl类,传入Scheduler对象。
**Triggle定义:**Triggle有两种触发器实现,SimpleTriggle按指定时间间隔进行触发,CronTriggle按指定日历时间进行触发。Triggle接口同Job类似定义了触发器的具体配置细节,由TriggleBuilder构建触发器实例。 **Scheduler定义:**Scheduler调度器由SchedulerFactory产生,start()方法定义schedule的执行,将实例化的Job和Triggle对象作为scheduleJob()的入参,由该方法执行具体任务的触发执行。二、SimpleTriggle和CronTriggle触发器。
SimTriggle触发器可以指定某一个任务在一个特定时刻执行一次,或者在某一时刻开始执行然后重复若干次。
SimpleTriggle的代码实现如下。public class SimpleTriggerJob implements Job { @Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { SimpleDateFormat df=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("Hello SimpleTriggerJob -- "+df.format(new Date())); }}
//SimpleTrigger触发器按指定时间间隔调度任务public class SimpleTriggerTester { public static void main(String[] args) throws SchedulerException { JobDetail jobDetail=newJob(SimpleTriggerJob.class) .withIdentity("simpleTriggerJob","group1") .build(); //未来5秒后执行一次// SimpleTrigger trigger=(SimpleTrigger) newTrigger()// .withIdentity("simpleTrigger","group1")// .startAt(futureDate(5, DateBuilder.IntervalUnit.SECOND))// .forJob("simpleTriggerJob","group1")// .build(); //从特定时间开始,然后每隔十秒执行1次,重复3次:// SimpleTrigger trigger=(SimpleTrigger) newTrigger()// .withIdentity("simpleTrigger","group1")// .startAt(dateOf(18,11,40))// .withSchedule(simpleSchedule()// .withIntervalInSeconds(10)// .withRepeatCount(3))// .forJob("simpleTriggerJob","group1")// .build(); //在每分钟开始时执行,每分钟执行一次,永远重复,直到指定时间点停止 SimpleTrigger trigger=(SimpleTrigger) newTrigger() .withIdentity("simpleTrigger","group1") .startAt(evenMinuteDate(null)) .withSchedule(simpleSchedule() .withIntervalInMinutes(1) .repeatForever()) .endAt(dateOf(18,22,0)) .forJob("simpleTriggerJob","group1") .build(); SchedulerFactory factory=new StdSchedulerFactory(); Scheduler scheduler=factory.getScheduler(); scheduler.start(); scheduler.scheduleJob(jobDetail,trigger); }}
CronTriggle触发器作用范围更广,它是基于日历的概念而不是像SimpleTriggle触发器基于较短的一段特定时间间隔。
例如:可以使用CronTriggle触发器,指定任务在每个周五晚上7点执行一次;在每个月的倒数第二天早上7点执行三次;按照时区的变换对任务运行进行动态调整。 通过向cronSchedule()构造方法传递特定格式字符串配置任务的执行。 字符串格式如“Seconds Minutes Hours Day-of-Month Month Day-of-Week Year” 例如: “0 30 10-12 ? * WED,FRI”表示每周三和周五的10:30,11:30,12:30各执行一次 “0 0/30 8-9 5,20 * ?”表示每个月第五天和第二十天的8点、9点每半个小时执行一次。 取值范围: Seconds:0-60 Minutes :0-60 Hours:0-23 Day-of-Month:1-31 Month:1-12 Day-of-Week:1-7或SUN, MON, TUE, WED, THU, FRI 和SAT. “-”可代表从A到B时间段 “/”代表一个递增时间,A/B指在当前的时间域,从A开始每B个当前时间单位执行一次,等价于在该时间域的第A,A+B,A+2B…时刻各触发任务一次。 “?”用于day-of-month和day-of-week时间域,表示没有特别的设置。 “L”用于day-of-month和day-of-week时间域,指定每个月或每周的倒数第n天。day-of-month的“6L”或者“FRIL”代表每个月的最后一个周五。“L-3”代表从每个月的第三天到最后一天。 “A#B”在day-of-week时间域代表每个月的第B周的星期A。 CronTriggle的代码实现如下。 “*”在时间域上代表“每个”或者无限重复的意思。 CronTrigger实例代码如下://CronTrigger按指定日历时间运行job// cronSchedule("_ _ _ _ _ _") 方法的6个参数代表的含义// Seconds// Minutes// Hours// Day-of-Month// Month// Day-of-Week// Year (optional field)// 具体参数设置见http://www.quartz-scheduler.org/documentation/quartz-2.2.x/tutorials/tutorial-lesson-06.htmlpublic class CronTriggerTester { public static void main(String[] args) throws SchedulerException { JobDetail jobDetail=newJob(CronTriggerJob.class) .withIdentity("CronTriggerJob","group1") .build(); //每天的指定小时分钟点执行一次// Trigger trigger=newTrigger()// .withIdentity("CronTrigger","group1")// .forJob("CronTriggerJob","group1")// .withSchedule(dailyAtHourAndMinute(19,7))// .build(); //每周日每分钟的第五秒开始执行,每5秒执行一次// Trigger trigger=newTrigger()// .withIdentity("CronTrigger","group1")// .forJob("CronTriggerJob","group1")// .withSchedule(cronSchedule("0/5 * * ? * 1"))// .build(); //按指定时区的时间执行 Trigger trigger=newTrigger() .withIdentity("CronTrigger","group1") .forJob("CronTriggerJob","group1") .withSchedule(dailyAtHourAndMinute(19,23) .inTimeZone(TimeZone.getTimeZone("Asia/Shanghai"))) .build(); SchedulerFactory factory=new StdSchedulerFactory(); Scheduler scheduler= factory.getScheduler(); scheduler.scheduleJob(jobDetail,trigger); scheduler.start(); }}
三、Listeners ——TriggerListeners、JobListeners和SchedulerListeners
监听器用来对Job、Trigger和Schedule运行过程中的所处的运行状态和运行行为进行监听。TriggerListeners、JobListeners和SchedulerListeners分别为一组接口。实现接口并重写接口方法,实现对监听器的定制化开发。 然后通过ListenerManager对监听器进行注册。
关于监听器的实例代码如下: 定制化的JobListner类:public class MyJobListener implements JobListener{ @Override public String getName() { return "jobListener name is MyJobListener"; } @Override public void jobToBeExecuted(JobExecutionContext context) { System.out.println("当前任务将要执行。"); } @Override public void jobExecutionVetoed(JobExecutionContext context) { System.out.println("当前任务执行被否决。"); } @Override public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) { System.out.println("当前任务执行完毕。"); }}
定制化的TriggerListener类:
public class MyTriggerListener implements TriggerListener { @Override public String getName() { return "TriggerListener name is MyTriggerListener"; } @Override public void triggerFired(Trigger trigger, JobExecutionContext context) { System.out.println("触发器正在触发"); } @Override public boolean vetoJobExecution(Trigger trigger, JobExecutionContext context) { return false; } @Override public void triggerMisfired(Trigger trigger) { System.out.println("触发器错过触发"); } @Override public void triggerComplete(Trigger trigger, JobExecutionContext context, Trigger.CompletedExecutionInstruction triggerInstructionCode) { System.out.println("触发器触发完毕"); }}
定制化的SchedulerListener类:
public class MySchedulerListener implements SchedulerListener { @Override public void jobScheduled(Trigger trigger) { System.out.println("jobScheduled"); } @Override public void jobUnscheduled(TriggerKey triggerKey) { System.out.println("jobScheduled"); } @Override public void triggerFinalized(Trigger trigger) { System.out.println("triggerFinalized"); } @Override public void triggerPaused(TriggerKey triggerKey) { System.out.println("triggerPaused"); } @Override public void triggersPaused(String triggerGroup) { System.out.println("triggersPaused"); } @Override public void triggerResumed(TriggerKey triggerKey) { System.out.println("triggerResumed"); } @Override public void triggersResumed(String triggerGroup) { System.out.println("triggersResumed"); } @Override public void jobAdded(JobDetail jobDetail) { System.out.println("jobAdded"); } @Override public void jobDeleted(JobKey jobKey) { System.out.println("jobDeleted"); } @Override public void jobPaused(JobKey jobKey) { System.out.println("jobPaused"); } @Override public void jobsPaused(String jobGroup) { System.out.println("jobsPaused"); } @Override public void jobResumed(JobKey jobKey) { System.out.println("jobResumed"); } @Override public void jobsResumed(String jobGroup) { System.out.println("jobsResumed"); } @Override public void schedulerError(String msg, SchedulerException cause) { System.out.println("schedulerError"); } @Override public void schedulerInStandbyMode() { System.out.println("schedulerInStandbyMode"); } @Override public void schedulerStarted() { System.out.println("schedulerStarted"); } @Override public void schedulerStarting() { System.out.println("schedulerStarting"); } @Override public void schedulerShutdown() { System.out.println("schedulerShutdown"); } @Override public void schedulerShuttingdown() { System.out.println("schedulerShuttingdown"); } @Override public void schedulingDataCleared() { System.out.println("schedulingDataCleared"); }}
监听器测试类,Job使用HelloQuartz一节中的HelloJob类:
public class ListenerTester { public static void main(String[] args) throws SchedulerException { //初始化调度器 SchedulerFactory factory=new StdSchedulerFactory(); Scheduler scheduler=factory.getScheduler(); JobDetail jobDetail=newJob(HelloJob.class) .withIdentity("printerJob","group2") .build(); Trigger trigger=newTrigger().withIdentity("jobListenerTrigger","group2") .startNow() .withSchedule(simpleSchedule().withIntervalInSeconds(5)) .build(); //实例化监听器对象 MyJobListener listener=new MyJobListener(); MyTriggerListener triggerListener=new MyTriggerListener(); MySchedulerListener schedulerListener=new MySchedulerListener(); //通过调度器的ListenerManager注册JobListener和TriggerListener //scheduler.getListenerManager().addJobListener(listener,and(jobGroupEquals("group2"),keyEquals(jobKey("printerJob","group2")))); scheduler.getListenerManager().addJobListener(listener,keyEquals(jobKey("printerJob","group2"))); scheduler.getListenerManager().addTriggerListener(triggerListener,keyEquals(triggerKey("jobListenerTrigger","group2"))); scheduler.getListenerManager().addSchedulerListener(schedulerListener);//删除JobListener//scheduler.getListenerManager().removeJobListener(listener.getName());//删除TriggerListener//scheduler.getListenerManager().removeTriggerListener(triggerListener.getName());//删除SchedulerListener//scheduler.getListenerManager().removeSchedulerListener(schedulerListener); scheduler.start(); scheduler.scheduleJob(jobDetail,trigger); }
四、Quartz的持久化配置
Quartz提供两种持久化方式,基于内存的RAMJobStore方式和基于磁盘介质的JDBCJobStore方式。上文实例使用的是Quartz的基于内存的持久化方式,优点是内存存储执行高效,缺点很明显,当操作系统崩溃或其他异常导致定时器终止将无法恢复之前状态。
下面介绍Quartz的JDBCJobStore持久化配置,Quartz提供基于多种数据库的持久化配置形式。本文以mySql 5.6为例对Quartz进行配置。 官网下载。 首先建立数据存储表,Quartz压缩包下的\docs\dbTables提供对多种数据库的sql建表语句支持。使用tables_mysql_innodb.sql在mysql数据库中建立相关数据表。注意Quartz默认数据表以QRTZ_开头,可以修改为自己的命名规则。 一共建立11张表,根据名称可猜测大致 QRTZ_FIRED_TRIGGERS; QRTZ_PAUSED_TRIGGER_GRPS; QRTZ_SCHEDULER_STATE; QRTZ_LOCKS; QRTZ_SIMPLE_TRIGGERS; QRTZ_SIMPROP_TRIGGERS; QRTZ_CRON_TRIGGERS; QRTZ_BLOB_TRIGGERS; QRTZ_TRIGGERS; QRTZ_JOB_DETAILS; QRTZ_CALENDARS;在项目中进行配置,Quartz使用JDBC进行数据库连接。导入最新的mysql jdbc connector数据源。因为使用的是较新的5.6版本mysql,建议使用最新的msql myconnector,不然有可能会报sql格式错误异常。
mysql mysql-connector-java 5.1.31
resources目录下建立quartz.properties进行配置,Quartz会自动加载。关键配置参数和相关解释如下:
#集群配置org.quartz.scheduler.instanceName: DefaultQuartzScheduler #如果运行在非集群环境中,自动产生值将会是 NON_CLUSTERED。假如是在集群环境下,将会是主机名加上当前的日期和时间。org.quartz.scheduler.instanceId:AUTOorg.quartz.scheduler.rmi.export: false org.quartz.scheduler.rmi.proxy: false org.quartz.scheduler.wrapJobExecutionInUserTransaction: false #Quartz 自带的线程池实现类是 org.quartz.smpl.SimpleThreadPoolorg.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool #根据任务的多少灵活配置线程池中线程的数量org.quartz.threadPool.threadCount: 10 org.quartz.threadPool.threadPriority: 5 org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true org.quartz.jobStore.misfireThreshold: 60000 #============================================================================# Configure JobStore#============================================================================#默认配置,数据保存到内存#org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore#持久化配置org.quartz.jobStore.class:org.quartz.impl.jdbcjobstore.JobStoreTX org.quartz.jobStore.driverDelegateClass:org.quartz.impl.jdbcjobstore.StdJDBCDelegate org.quartz.jobStore.useProperties:true#数据库表前缀#org.quartz.jobStore.tablePrefix:qrtz_#注意这里设定的数据源名称为dbqzorg.quartz.jobStore.dataSource:dbqz#============================================================================# Configure Datasources#============================================================================#org.quartz.jobStore.selectWithLockSQL = SELECT * FROM {0}LOCKS WHERE LOCK_NAME = ? FOR UPDATE#org.quartz.dataSource.dbqz.validationQuery=SELECT 1#JDBC驱动org.quartz.dataSource.dbqz.driver:com.mysql.jdbc.Driverorg.quartz.dataSource.dbqz.URL:jdbc:mysql://127.0.0.1:3306/quartz?useUnicode=true&characterEncoding=UTF-8org.quartz.dataSource.dbqz.user:数据库用户名org.quartz.dataSource.dbqz.password:密码org.quartz.dataSource.dbqz.maxConnection:10
实例代码:
public class DBScheduleTest { private static String JOB_GROUP_NAME = "ddlib"; private static String TRIGGER_GROUP_NAME = "ddlibTrigger"; public static void main(String[] args) throws SchedulerException, ParseException {// startJob(); resumeJob(); } public static void startJob() throws SchedulerException { SchedulerFactory factory = new StdSchedulerFactory(); Scheduler scheduler=factory.getScheduler(); JobDetail jobDetail=newJob(PersistenceJob.class) .withIdentity("job_1","jobGroup1") .build(); Trigger trigger=newTrigger() .withIdentity("trigger_1","triggerGroup1") .startNow() .withSchedule(simpleSchedule().repeatSecondlyForTotalCount(100)) .build(); scheduler.scheduleJob(jobDetail,trigger); scheduler.start(); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } scheduler.shutdown(); } public static void resumeJob() throws SchedulerException { SchedulerFactory factory = new StdSchedulerFactory(); Scheduler scheduler = factory.getScheduler(); // 获取调度器中所有的触发器组 ListtriggerGroups = scheduler.getTriggerGroupNames(); // 重新恢复在triggerGroup1组中,名为trigger_1触发器的运行 for (int i = 0; i < triggerGroups.size(); i++) { List triggers = scheduler.getTriggerGroupNames(); for (int j = 0; j < triggers.size(); j++) { Trigger tg = scheduler.getTrigger(new TriggerKey(triggers .get(j), triggerGroups.get(i))); // 根据名称判断 if (tg instanceof SimpleTrigger && tg.getDescription().equals("triggerGroup1.trigger_1")) { // 恢复运行 scheduler.resumeJob(new JobKey(triggers.get(j), triggerGroups.get(i))); } } } scheduler.start(); }}
自定义Job类PersistenceJob:
public class PersistenceJob implements Job { private static int i=0; @Override public void execute(JobExecutionContext context) throws JobExecutionException { System.out.println("job执行--"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+"--"+i++); }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
startJob()执行一个job,并设置触发器,每隔一秒执行一次,一共执行100次,在10秒之后,线程终止并让schedule关闭。观察数据库表结构。该job以及job的执行情况已经更新进数据表。
resumeJob()重新创建schedule,并从数据库中查找拥有相同key的触发器,schedule.resuemeJob()恢复任务的运行。当任务结束删除数据表中的Job相关注册信息。五、Spring集成Quartz
spring提供对quartz的集成。通过对quartz相关bean的配置实现对quartz的加载。以spring boot为例,首先在maven项目的pom.xml中导入相关包:
org.springframework.boot spring-boot-starter-parent 1.5.6.RELEASE UTF-8 UTF-8 1.8 org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-test test org.springframework.boot spring-boot-maven-plugin
然后同上文配置quartz的jdbc持久化存储。
在resources下添加quartz-context.xml,对quartz进行配置。 其中对job的构建方式有两种,一种是通过org.springframework.scheduling.quartz.JobDetailFactoryBean进行job构建,要实现Job接口。另一种是通过org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean对job进行构建,不用实现job接口。 quartz-context.xml配置如下:service
六、基于Spring QuartzJobBean的Quartz Job配置方式
在实际情况中,自定义的job往往需要调用service和dao层的方法,对相关操作进行持久化。为了避免各模块间的高耦合。引入Spring QuartzJobBean,然后通过反射机制对具体业务逻辑方法进行调用。Spring QuartzJobBean是一个实现Job接口的抽象类,阅读源码发现executeInternal()在重写excute()的同时,将JobDetail中定义的DataMap键值映射为继承其子类的成员变量。我们通过继承QuartzJobBean定义自己的JobBean,然后设置与xml中对应job dataMap键值对相同的配置项为成员变量。通过设置jobData的targetClass和targetMethod两个键值对,来传递需要调用的业务类中的具体方法信息,最后在自定义的JobBean中通过反射机制获取该方法。具体实例如下:
首先定义继承QuartzJobBean的JobBean,MyQuartzJobBean.javapublic class MyQuartzJobBean extends QuartzJobBean{ /**目标类*/ private String targetObject; /**要执行的方法*/ private String targetMethod; @Override protected void executeInternal(JobExecutionContext context)//executeInternal方法会将配置的job Data 键-值 作为该类的成员变量 throws JobExecutionException { try { ApplicationContext ctx=new ClassPathXmlApplicationContext("applicationContext.xml"); Object targetClass=ctx.getBean(targetObject); Method method=null; //反射机制调用 method=targetClass.getClass().getMethod(targetMethod,new Class[]{JobExecutionContext.class}); method.invoke(targetClass,new Object[]{context}); } catch (NoSuchMethodException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); }catch (Exception e){ e.printStackTrace(); } } //不可删除,QuartzJobBean通过set的方法,将job data值赋值给成员变量 public void setTargetObject(String targetObject) { this.targetObject = targetObject; } public void setTargetMethod(String targetMethod) { this.targetMethod = targetMethod; }}
定义具体业务类SpringJobTester.java,实现具体Job的业务逻辑。
public class SpringJobTester{ //实现Job的具体业务方法 public void someService(JobExecutionContext context) { SimpleDateFormat df=new SimpleDateFormat("HH:mm:ss"); System.out.println("Hello SpringJobTester!----"+df.format(new Date())); }}
quartz-context.xml配置如下: