Spring中的任务调度和线程池支持
1.Spring与QuartzQuartz是Open Symphony开发的一款开源的任务调度框架。相对于JDK的Timer之类的简单任务调度程序来说,Quartz拥有更为全面的功能支持:1)允许批处理任务状态的持久化,并且提供不同的持久化策略支持2)批处理任务的远程调度3)提供基于Web的监控接口4)集群支持5)插件式的可扩展性 2.Quartz拥有明确的角色划分...
1.Spring与Quartz
Quartz是Open Symphony开发的一款开源的任务调度框架。相对于JDK的Timer之类的简单任务调度程序来说,Quartz拥有更为全面的功能支持:
1)允许批处理任务状态的持久化,并且提供不同的持久化策略支持
2)批处理任务的远程调度
3)提供基于Web的监控接口
4)集群支持
5)插件式的可扩展性
2.Quartz拥有明确的角色划分,分别对应处理任务调度过程中的各种相关关注点:
1)Job:代表将要被调度的任务
2)JobDetail:主要职能是提供Job执行的上下文信息,Job所依赖的某些配置信息,可以通过JobDetail提供,二者通过JobDataMap进行数据交互
3)Trigger:用于触发被调度任务的执行,可以根据不同的触发条件,提供不同的Trigger实现
4)Scheduler:Quartz的核心调度程序,负责管理所有相关的Trgger和JobDetail
3.一个Trigger只能用于一个Job,但多个Trigger可以用于同一Job
Quartz中最经常使用的主要的两种Trigger实现类:
一个是SimpleTrigger,可以指定简单的基于时间间隔的调度规则
一个是CronTrigger,可以类似于Unix/Linux操作系统中Cron程序多使用的表达式来指定调度规则
public class FXNewsJob implements Job{
private FXNewsProvider fxNewsProvider;
//getXXX();
//setXXX();
public void execute(JobExecutionContext jobExecCtx) throws JobExecutionException{
getFxNewsProvider().getAndPersistNews();
}
}
Trigger simpleTrigger = new SimpleTrigger("triggerName",Scheduler.DEFAULT_GROUP,new Date(),null,SimpleTrigger.REPEAT_INDEFINITELY,60000);
Trigger cronTrigger = new CronTrigger("cronTriggerName",Scheduler.DEFAULT_GROUP,"0 0/1 * * * ?");
在Job和Trigger都具备后,通过Scheduler将它们关联起来并进行最终的任务调度
Scheduler scheduler = new StdSchedulerFactory().getScheduler();
scheduler.start();
JobDetail jobDetail = new JobDetail("jobName",Scheduler.DEFAULT_GROUP,FXNewsJob.class);
Trigger cronTrigger = new CronTrigger("cronTriggerName",Scheduler.DEFAULT_GROUP,"0 0/1 * * * ?");
scheduler.scheduleJob(jobDetail,cronTrigger);
4.融入Spring大家庭的Quartz
1)Job的实现策略
在Quartz中,每一Job所需要的执行上下文(Execution Context)信息是由其对应的JobDetail提供的,二者通过JobDataMap进行数据通信
JobDetail jobDetail = new JobDetail("jobName",Scheduler.DEFAULT_GROUP,HelloWorldJob.class);
jobDetail.getJobDataMap().put("message","helloworld");
jobDetail.getJobDataMap().put("counter",10);
public class HelloWorldJob implements Job{
public void execute(JobExecutionContext ctx) throws JobExecutionException{
JobDataMap jobDataMap = ctx.getJobDetail().getJobDataMap();
String message = jobDataMap.getString("message");
int counter = jobDataMap.getInt("counter");
//...
}
}
Spring提供了org.springframework.scheduling.quartz.QuartzJobBean。在实现Job的时候,通过继承QuartzJobBean,让我们在Job实现类中,直接以bean属性的形式访问当前Job执行上下文信息
public class HelloWorldJobExtendingQuartzJobBean extends QuartzJobBean{
private String message;
private int counter;
//getXXX();
//setXXX();
public void executeInternal(JobExecutionContext ctx) throws JobExecutionException{
//getMessage()并使用
//getCounter()并使用
}
}
Spring提供了org.springframework.scheduling.quartz.SpringBeanJobFactory是一种更好的选择,允许对要执行的Job进行定制
2)JobDetail的更多选择
通过org.springframework.scheduling.quartz.JobDetailBean来创建和配置相应Job所对应的JobDetail实例
<bean id="fxNewsJobDetail" class="org.springframework.scheduling.quartz.JobDetailBean">
<property name="jobClass" value="HelloWorldJob | HelloWorldJobExtendingQuartzJobBean" />
<property name="jobDataAsMap">
<map>
<entry key="message">
<value>HelloWorld</value>
</entry>
<entry key="counter">
<value>10</value>
</entry>
</map>
</property>
</bean>
一个比较优雅的MethodInvokingJobDetailFactoryBean
<bean id="jobDetail" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
<property name="targetObject" ref="fxNewsProvider" />
<property name="targetMethod" value="getAndPersistNews" />
</bean>
MethodInvokingJobDetailFactoryBean会在内部构建相应的Job实现类(MethodInvokingJob和StatefulMethodInvokingJob),这些Job实现类会在合适的时机调用指定给MethodInvokingJobDetailFactoryBean的业务对象上的业务方法。但是,通过它所返回的JobDetail信息是不可序列化的,从而也就无法保存或者说持久化。
3)Trigger的可配置化
Spring分别提供了SimpleTriggerBean和CronTrggerBean封装类,同样采用FactoryBean机制实现
<bean id="simpleTrigger" class="org.springframework.scheduling.quartz.SimpleTriggerBean">
<property name="jobDetail" ref="jobDetail"/>
<property name="repeatInterval" value="3000"/>
</bean>
<bean id="cronTrigger" class="org.springframework.scheduling.quartz.CronTrggerBean">
<property name="jobDetail" ref="jobDetail"/>
<property name="cronExpression" value="0 0/1 * * * ?"/>
</bean>
可以结合Trgger与Quartz提供的Calender来达成最终所需要的调度规则。Trigger负责提供覆盖范围足够广的调度规则,而Calender负责排除这一规则范围内不需要的部分
4)Scheduler的新家
Spring提供了org.springframework.scheduling.quartz.SchedulerFactoryBean对Quartz的Scheduler进行管理
<bean id="scheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
<property name="triggers">
<ref bean="newsTrigger" />
<ref bean="simpleTrigger" />
</property>
</bean>
FXNewsProvider相关任务调度类配置代码示例
<bean id="fxNewsProvider" class="com.xxx.news.FXNewsProvider" p:newsListener-ref="newsListener" p:newsListener-ref="newsPersister"/>
<bean id="jobDetail" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
<property name="targetObject" ref="fxNewsProvider" />
<property name="targetMethod" value="getAndPersistNews" />
</bean>
<bean id="newsTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
<property name="jobDetail" ref="jobDetail" />
<property name="cronExpression" value="0 0/1 * * * ?" />
</bean>
<bean id="scheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
<property name="triggers">
<list>
<ref bean="newsTrigger"/>
</list>
</property>
</bean>
5.Spring对JDK Timer的集成
JDK Timer小记:所有经由Timer进行调度的任务都需要继承TimerTask
public class FXNewsTimerTask extends TimerTask{
private FXNewsProvider fxNewsProvider;
//getXXX();
//setXXX();
public void run(){
getFxNewsProvider().getAndPersistNews();
}
}
FXNewsTimerTask task = new FXNewsTimerTask();
Timer timer = new Timer("schedulerName[optional]");
timer.schedule(task,0,60000);
1)逃离TimerTask的魔咒
<bean id="fxNewsProvider" class="com.xxx.news.FXNewsProvider" p:newListener-ref="newsListener" p:newPersistencer-ref="newsPersister">
<bean id="task" class="org.springframework.scheduling.timer.MehodInvokingTimerTaskFactoryBean">
<property name="targetObject" ref="fxNewsProvider"/>
<property name="targetMethod" value="getAndPersistNews"/>
</bean>
2)TimerTask的模块化封装-ScheduledTimerTask
通过一个ScheduledTimerTask将对应的TimerTask和相关的Trigger信息封装到一个可以统一管理的实体中
<bean id="scheduledTask" class="org.springframework.scheduling.timer.ScheduledTimerTask">
<property name="timerTask" ref="task" />
<property name="period" value="3000">
</bean>
3)Timer的新家-TimerFactoryBean
TimerFactoryBean实现代码摘录
ScheduledTimerTask[] stt = ... ;
for(ScheduledTimerTask task : stt){
if(task.isOneTimeTask()){
timer.schedule(task.getTimerTask(),task.getDelay());
}else{
if(task.isFixedRate()){
timer.scheduleAtFixedRate(task.getTimerTask(),task.getDelay(),task.getPeriod());
}else{
timer.schedule(task.getTimerTask(),task.getDelay(),task.getPeriod());
}
}
}
<bean id="scheduler" class="org.springframework.scheduling.timer.TimerFactoryBean">
<property name="scheduledTimerTasks">
<list>
<ref bean="scheduledTask" />
</list>
</property>
</bean>
6.Executor的孪生兄弟TaskExecutor
Java5为我们带来了一套新的任务执行框架,以Executor为首:
public interface Executor{
void execute(Runnable task);
}
Executor这一抽象的意义在于,可以将任务的提交(task commission)和任务的执行(task execution)策略分隔开来,解除二者之间的耦合性,以Runnable类型界定的任务提交之后,最终会以什么样的策略执行(什么时间执行,交给谁执行,等等),完全由不同的Executor实现类负责,提交任务的客户端完全可以忽略后者的实现细节。
Spring提供的TaskExecutor完成Executor同样的功能
public interface TaskExecutor{
void execute(Runnable task);
}
7.可用的TaskExecutor
1)SyncTaskExecutor
提交给SyncTaskExecutor的任务将直接在当前线程中执行
public class SyncTaskExecutor implements TaskExecutor,Serializable{
public void execute(Runnable task){
Assert.notNull(task,"Runnable must not be null");
task.run();
}
}
2)SimpleAsyncTaskExecutor
提供最基本的异步执行能力,实现的方式则是为每个任务都创建新的线程
public class SimpleAsyncTaskExecutor implements TaskExecutor{
public void execute(Runnable task){
new Thread(task).start();
}
}
SimpleAsyncTaskExecutor提供了相应的属性以控制创建的线程数目上限
<bean id="simpleAsyncTaskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor">
<property name="concurrencyLimit" value="100" />
</bean>
3)ThreadPoolTaskExecutor
用线程池来管理并重用处理任务异步执行的工作线程,ThreadPoolTaskExecutor是对标准的java.util.concurrent.ThreadPoolExecutor进行了封装
Spring提供的ThreadPoolTaskExecutor一共有两个:
org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor是对java.util.concurrent.ThreadPoolExecutor进行了封装
org.springframework.scheduling.backportconcurrent.ThreadPoolTaskExecutor是对JSR-166 backport的ThreadPoolTaskExecutor进行封装
<bean id="threadPoolTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="10">
<property name="maxPoolSize" value="20">
<property name="queueCapacity" value="20">
</bean>
4)ConcurrentTaskExecutor
为Java5的Executor和Spring的TaskExecutor搭建了一道桥梁。构建自己需要的Executor实例,通过Executor.newXXXThreadPool(),然后以ConcurrentTaskExecutor对其进行封装,封装后获得的ConcurrentTaskExecutor即获得响应Executor的能力。
Executor executor = Executors.newScheduledThreadPool(10);
TaskExecutor taskExecutor = new ConcurrentTaskExecutor(executor);
5)TimeTaskExecutor、SimpleThreadPoolTaskExecutor和WorkManagerTaskExecutor
都是使用特定的调度程序(Job Scheduler)来执行提交给他们的任务
public class PrototypeTimerTaskExecutor implements TaskExecutor{
private Timer timer = new Timer();
public void execute(final Runnable task){
timer.schedule(new TimerTask(){ public void run(){task.run();}},new Date());
}
}
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)