xxl-job架构源码解析
XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。1 xxljob架构2xxljob的源码目录3xxljob的源码解析xxl-job的源码主要分为两大部分,xxl-job-core和xxl-job-admin。其中xxl-job-core是它交互的核心,而xxl-job-admin则是后台管理的一些接口。
XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。
1 xxljob架构
2xxljob的源码目录
3xxljob的源码解析
xxl-job的源码主要分为两大部分,xxl-job-core和xxl-job-admin。其中xxl-job-core是它交互的核心,而xxl-job-admin则是后台管理的一些接口。本文主要分析xxl-job-core设计的核心思想。
设计思想:xxl-job通过在调度中心操作各个执行器的调度方法,使用http作为rpc,调度执行执行器的业务,从而使得业务和调度相对隔离。
- 执行器注册和发现
XxlJobSpringExecutor
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {
private static final Logger logger = LoggerFactory.getLogger(XxlJobSpringExecutor.class);
// start
//开始通过applicationContext上下文中获取到相应的执行器
@Override
public void afterSingletonsInstantiated() {
// init JobHandler Repository
/*initJobHandlerRepository(applicationContext);*/
// init JobHandler Repository (for method)
initJobHandlerMethodRepository(applicationContext);
// refresh GlueFactory
GlueFactory.refreshInstance(1);
// super start
try {
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
//扫描初始化执行器的方法,并存储在ConcurrentMap里面,方便调用
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
if (applicationContext == null) {
return;
}
// init job handler from method
String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
for (String beanDefinitionName : beanDefinitionNames) {
Object bean = applicationContext.getBean(beanDefinitionName);
Map<Method, XxlJob> annotatedMethods = null; // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
try {
annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
new MethodIntrospector.MetadataLookup<XxlJob>() {
@Override
public XxlJob inspect(Method method) {
return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
}
});
} catch (Throwable ex) {
logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
}
if (annotatedMethods==null || annotatedMethods.isEmpty()) {
continue;
}
for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
Method method = methodXxlJobEntry.getKey();
XxlJob xxlJob = methodXxlJobEntry.getValue();
if (xxlJob == null) {
continue;
}
String name = xxlJob.value();
if (name.trim().length() == 0) {
throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
}
if (loadJobHandler(name) != null) {
throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
}
// execute method
if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {
throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
"The correct method format like \" public ReturnT<String> execute(String param) \" .");
}
if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
"The correct method format like \" public ReturnT<String> execute(String param) \" .");
}
method.setAccessible(true);
// init and destory
Method initMethod = null;
Method destroyMethod = null;
if (xxlJob.init().trim().length() > 0) {
try {
initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
initMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
}
}
if (xxlJob.destroy().trim().length() > 0) {
try {
destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
destroyMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
}
}
// registry jobhandler
registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));
}
}
}
- 执行器和调度中心的注册
XxlJobExecutor开始和调度中心进行注册,包含一些端口等信息,同时开启一些线程,进行其他业务等。
// ---------------------- start + stop ----------------------
public void start() throws Exception {
// init logpath
XxlJobFileAppender.initLogPath(logPath);
// init invoker, admin-client
initAdminBizList(adminAddresses, accessToken);
// init JobLogFileCleanThread
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// init TriggerCallbackThread
TriggerCallbackThread.getInstance().start();
// init executor-server
//初始化netty,绑定端口,交互调度中心,采用netty进行业务http交互,监控调度中心的调度请求,并分配到特定的执行通道进行执行
initEmbedServer(address, ip, port, appname, accessToken);
}
- 执行器的执行业务
执行器通过netty接收请求,然后分配到其他通道进行解耦合执行执行业务代码,然后封装返回执行的信息以及一些日志操作。
-
/** * netty_http * * Copy from : https://github.com/xuxueli/xxl-rpc * * @author xuxueli 2015-11-24 22:25:15 */ public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> { private static final Logger logger = LoggerFactory.getLogger(EmbedHttpServerHandler.class); private ExecutorBiz executorBiz; private String accessToken; private ThreadPoolExecutor bizThreadPool; public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ThreadPoolExecutor bizThreadPool) { this.executorBiz = executorBiz; this.accessToken = accessToken; this.bizThreadPool = bizThreadPool; } @Override protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception { // request parse //final byte[] requestBytes = ByteBufUtil.getBytes(msg.content()); // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8); String requestData = msg.content().toString(CharsetUtil.UTF_8); String uri = msg.uri(); HttpMethod httpMethod = msg.method(); boolean keepAlive = HttpUtil.isKeepAlive(msg); String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN); // invoke bizThreadPool.execute(new Runnable() { @Override public void run() { // do invoke Object responseObj = process(httpMethod, uri, requestData, accessTokenReq); // to json String responseJson = GsonTool.toJson(responseObj); // write response writeResponse(ctx, keepAlive, responseJson); } }); } private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) { // valid if (HttpMethod.POST != httpMethod) { return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support."); } if (uri==null || uri.trim().length()==0) { return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty."); } if (accessToken!=null && accessToken.trim().length()>0 && !accessToken.equals(accessTokenReq)) { return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong."); } // services mapping try { if ("/beat".equals(uri)) { return executorBiz.beat(); } else if ("/idleBeat".equals(uri)) { IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class); return executorBiz.idleBeat(idleBeatParam); } else if ("/run".equals(uri)) { TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class); return executorBiz.run(triggerParam); } else if ("/kill".equals(uri)) { KillParam killParam = GsonTool.fromJson(requestData, KillParam.class); return executorBiz.kill(killParam); } else if ("/log".equals(uri)) { LogParam logParam = GsonTool.fromJson(requestData, LogParam.class); return executorBiz.log(logParam); } else { return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found."); } } catch (Exception e) { logger.error(e.getMessage(), e); return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e)); } } /** * write response */ private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) { // write response FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8)); // Unpooled.wrappedBuffer(responseJson) response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8"); // HttpHeaderValues.TEXT_PLAIN.toString() response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); if (keepAlive) { response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); } ctx.writeAndFlush(response); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { logger.error(">>>>>>>>>>> xxl-job provider netty_http server caught exception", cause); ctx.close(); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { ctx.channel().close(); // beat 3N, close if idle logger.debug(">>>>>>>>>>> xxl-job provider netty_http server close an idle channel."); } else { super.userEventTriggered(ctx, evt); } } }
- xxl-job-rpc的核心
public class ExecutorBizClient implements ExecutorBiz {
public ExecutorBizClient() {
}
public ExecutorBizClient(String addressUrl, String accessToken) {
this.addressUrl = addressUrl;
this.accessToken = accessToken;
// valid
if (!this.addressUrl.endsWith("/")) {
this.addressUrl = this.addressUrl + "/";
}
}
private String addressUrl ;
private String accessToken;
private int timeout = 3;
@Override
public ReturnT<String> beat() {
return XxlJobRemotingUtil.postBody(addressUrl+"beat", accessToken, timeout, null, String.class);
}
@Override
public ReturnT<String> idleBeat(IdleBeatParam idleBeatParam){
return XxlJobRemotingUtil.postBody(addressUrl+"idleBeat", accessToken, timeout, idleBeatParam, String.class);
}
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}
@Override
public ReturnT<String> kill(KillParam killParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "kill", accessToken, timeout, killParam, String.class);
}
@Override
public ReturnT<LogResult> log(LogParam logParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "log", accessToken, timeout, logParam, LogResult.class);
}
}
4 xxl-job-admin核心之业务触发和路由策略
public static void trigger(int jobId,
TriggerTypeEnum triggerType,
int failRetryCount,
String executorShardingParam,
String executorParam,
String addressList) {
// load data
XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
if (jobInfo == null) {
logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
return;
}
if (executorParam != null) {
jobInfo.setExecutorParam(executorParam);
}
int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
// cover addressList
if (addressList!=null && addressList.trim().length()>0) {
group.setAddressType(1);
group.setAddressList(addressList.trim());
}
// sharding param
int[] shardingParam = null;
if (executorShardingParam!=null){
String[] shardingArr = executorShardingParam.split("/");
if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
shardingParam = new int[2];
shardingParam[0] = Integer.valueOf(shardingArr[0]);
shardingParam[1] = Integer.valueOf(shardingArr[1]);
}
}
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
&& shardingParam==null) {
for (int i = 0; i < group.getRegistryList().size(); i++) {
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
}
} else {
if (shardingParam == null) {
shardingParam = new int[]{0, 1};
}
//真正执行触发业务
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
}
}
private static boolean isNumeric(String str){
try {
int result = Integer.valueOf(str);
return true;
} catch (NumberFormatException e) {
return false;
}
}
processTrigger各种路由策略的业务代码
String address = null;
ReturnT<String> routeAddressResult = null;
if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
if (index < group.getRegistryList().size()) {
address = group.getRegistryList().get(index);
} else {
address = group.getRegistryList().get(0);
}
} else {
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
address = routeAddressResult.getContent();
}
}
} else {
routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
}
触发的业务是实现过程
package com.xxl.job.admin.core.thread;
import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
import com.xxl.job.admin.core.cron.CronExpression;
import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.trigger.TriggerTypeEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.text.ParseException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* @author xuxueli 2019-05-21
*/
public class JobScheduleHelper {
private static Logger logger = LoggerFactory.getLogger(JobScheduleHelper.class);
private static JobScheduleHelper instance = new JobScheduleHelper();
public static JobScheduleHelper getInstance(){
return instance;
}
public static final long PRE_READ_MS = 5000; // pre read
private Thread scheduleThread;
private Thread ringThread;
private volatile boolean scheduleThreadToStop = false;
private volatile boolean ringThreadToStop = false;
private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();
public void start(){
// schedule thread
scheduleThread = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
// pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
while (!scheduleThreadToStop) {
// Scan Job
long start = System.currentTimeMillis();
Connection conn = null;
Boolean connAutoCommit = null;
PreparedStatement preparedStatement = null;
boolean preReadSuc = true;
try {
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
// tx start
// 1、pre read
long nowTime = System.currentTimeMillis();
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList!=null && scheduleList.size()>0) {
// 2、push time-ring
for (XxlJobInfo jobInfo: scheduleList) {
// time-ring jump
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
// 2.1、trigger-expire > 5s:pass && make next-trigger-time
logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
// fresh next
refreshNextValidTime(jobInfo, new Date());
} else if (nowTime > jobInfo.getTriggerNextTime()) {
// 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
// 1、trigger
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
// 2、fresh next
refreshNextValidTime(jobInfo, new Date());
// next-trigger-time in 5s, pre-read again
if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
} else {
// 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
}
// 3、update trigger info
for (XxlJobInfo jobInfo: scheduleList) {
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
}
} else {
preReadSuc = false;
}
// tx stop
} catch (Exception e) {
if (!scheduleThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
}
} finally {
// commit
if (conn != null) {
try {
conn.commit();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
conn.setAutoCommit(connAutoCommit);
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
conn.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
// close PreparedStatement
if (null != preparedStatement) {
try {
preparedStatement.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
}
long cost = System.currentTimeMillis()-start;
// Wait seconds, align second
if (cost < 1000) { // scan-overtime, not wait
try {
// pre-read period: success > scan each second; fail > skip this period;
TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
}
});
scheduleThread.setDaemon(true);
scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
scheduleThread.start();
// ring thread
ringThread = new Thread(new Runnable() {
@Override
public void run() {
// align second
try {
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}
while (!ringThreadToStop) {
try {
// second data
List<Integer> ringItemData = new ArrayList<>();
int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
for (int i = 0; i < 2; i++) {
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
if (tmpData != null) {
ringItemData.addAll(tmpData);
}
}
// ring trigger
logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
if (ringItemData.size() > 0) {
// do trigger
for (int jobId: ringItemData) {
// do trigger
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
}
// clear
ringItemData.clear();
}
} catch (Exception e) {
if (!ringThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
}
}
// next second, align second
try {
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
}
});
ringThread.setDaemon(true);
ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
ringThread.start();
}
private void refreshNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws ParseException {
Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(fromTime);
if (nextValidTime != null) {
jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
jobInfo.setTriggerNextTime(nextValidTime.getTime());
} else {
jobInfo.setTriggerStatus(0);
jobInfo.setTriggerLastTime(0);
jobInfo.setTriggerNextTime(0);
}
}
private void pushTimeRing(int ringSecond, int jobId){
// push async ring
List<Integer> ringItemData = ringData.get(ringSecond);
if (ringItemData == null) {
ringItemData = new ArrayList<Integer>();
ringData.put(ringSecond, ringItemData);
}
ringItemData.add(jobId);
logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) );
}
public void toStop(){
// 1、stop schedule
scheduleThreadToStop = true;
try {
TimeUnit.SECONDS.sleep(1); // wait
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
if (scheduleThread.getState() != Thread.State.TERMINATED){
// interrupt and wait
scheduleThread.interrupt();
try {
scheduleThread.join();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
// if has ring data
boolean hasRingData = false;
if (!ringData.isEmpty()) {
for (int second : ringData.keySet()) {
List<Integer> tmpData = ringData.get(second);
if (tmpData!=null && tmpData.size()>0) {
hasRingData = true;
break;
}
}
}
if (hasRingData) {
try {
TimeUnit.SECONDS.sleep(8);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
// stop ring (wait job-in-memory stop)
ringThreadToStop = true;
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
if (ringThread.getState() != Thread.State.TERMINATED){
// interrupt and wait
ringThread.interrupt();
try {
ringThread.join();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper stop");
}
}
5总结
xxl-job是一款轻量级的任务调度工具,可以做到不侵害原有系统的业务代码。很好的做到了执行器和调度中心分离的思想,使得源代码更加简洁明了,是一款非常优秀的任务调度框架。在这框架的设计的源代码中涉及了大量的线程的知识以及netty等网络知识。其中设计结构简单,架构清晰,值得学习。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)