一、前言

大家在日常工作中,一定使用过 Spring 的 @Scheduled 注解吧,通过该注解可以非常方便的帮助我们实现任务的定时执行。

但是该注解是不支持运行时动态修改执行间隔的,不知道你在业务中有没有这些需求和痛点:

  •     在服务运行时能够动态修改定时任务的执行频率和执行开关,而无需重启服务和修改代码
  •     能够基于配置,在不同环境/机器上,实现定时任务执行频率的差异化

这些都可以通过 Spring 的 SchedulingConfigurer 注解来实现

这个注解其实大家并不陌生,如果有使用过 @Scheduled 的话,因为 @Scheduled 默认是单线程执行的,因此如果存在多个任务同时触发,可能触发阻塞。使用 SchedulingConfigurer 可以配置用于执行 @Scheduled 的线程池,来避免这个问题。

@Configuration
public class ScheduleConfig implements SchedulingConfigurer {
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        //设定一个长度10的定时任务线程池
        taskRegistrar.setScheduler(Executors.newScheduledThreadPool(10));
    }
}

但其实这个接口,还可以实现动态定时任务的功能,下面来演示如何实现。


二、功能实现

    后续定义的类开头的 DS 是 Dynamic Schedule 的缩写。

使用到的依赖,除了 Spring 外,还包括:

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
</dependency>

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-collections4</artifactId>
    <version>4.4</version>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <scope>provided</scope>
    <version>1.18.18</version>
</dependency>

2.1 @EnableScheduling

首先需要开启 @EnableScheduling 注解,直接在启动类添加即可:

@EnableScheduling
@SpringBootApplication
public class DSApplication {
    public static void main(String[] args) {
        SpringApplication.run(DSApplication.class, args);
    }
}

2.2 IDSTaskInfo

定义一个任务信息的接口,后续所有用于动态调整的任务信息对象,都需要实现该接口。

    id:该任务信息的唯一 ID,用于唯一标识一个任务
    cron:该任务执行的 cron 表达式。
    isValid:任务开关
    isChange:用于标识任务参数是否发生了改变

public interface IDSTaskInfo {
    /**
     * 任务 ID
     */
    long getId();

    /**
     * 任务执行 cron 表达式
     */
    String getCron();

    /**
     * 任务是否有效
     */
    boolean isValid();

    /**
     * 判断任务是否发生变化
     */
    boolean isChange(IDSTaskInfo oldTaskInfo);
}

2.3 DSContainer

顾名思义,是存放 IDSTaskInfo 的容器。

具有以下成员变量:

  •     scheduleMap:用于暂存 IDSTaskInfo 和实际任务 ScheduledTask 的映射关系。其中:
    •         task_id:作为主键,确保一个 IDSTaskInfo 只会被注册进一次
    •         T:暂存当初注册时的 IDSTaskInfo,用于跟最新的 IDSTaskInfo 比较参数是否发生变化
    •         ScheduledTask:暂存当初注册时生成的任务,如果需要取消任务的话,需要拿到该对象
    •         Semaphore:确保每个任务实际执行时只有一个线程执行,不会产生并发问题
  •     taskRegistrar:Spring 的任务注册管理器,用于注册任务到 Spring 容器中
  •     name:调用方提供的类名

具有以下成员方法:

  •     void checkTask(final T taskInfo, final TriggerTask triggerTask):检查 IDSTaskInfo,判断是否需要注册/取消任务。具体的逻辑包括:

        如果任务已经注册:
            如果任务无效:则取消任务
            如果任务有效:
                如果任务配置发生了变化:则取消任务并重新注册任务
        如果任务没有注册:
            如果任务有效:则注册任务

  •     Semaphore getSemaphore():获取信号量属性。
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.scheduling.config.ScheduledTask;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.config.TriggerTask;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

/**
 * 存放 IDSTaskInfo 容器
 * @author jitwxs
 * @date 2021年03月27日 16:29
 */
@Slf4j
public class DSContainer<T extends IDSTaskInfo> {
    /**
     * IDSTaskInfo和真实任务的关联关系
     *
     * <task_id, <Task, <Scheduled, Semaphore>>>
     */
    private final Map<Long, Pair<T, Pair<ScheduledTask, Semaphore>>> scheduleMap = new ConcurrentHashMap<>();

    private final ScheduledTaskRegistrar taskRegistrar;

    private final String name;

    public DSContainer(ScheduledTaskRegistrar scheduledTaskRegistrar, final String name) {
        this.taskRegistrar = scheduledTaskRegistrar;
        this.name = name;
    }

    /**
     * 注册任务
     * @param taskInfo 任务信息
     * @param triggerTask 任务的触发规则
     */
    public void checkTask(final T taskInfo, final TriggerTask triggerTask) {
        final long taskId = taskInfo.getId();

        if (scheduleMap.containsKey(taskId)) {
            if (taskInfo.isValid()) {
                final T oldTaskInfo = scheduleMap.get(taskId).getLeft();

                if(oldTaskInfo.isChange(taskInfo)) {
                    log.info("DSContainer will register {} again because task config change, taskId: {}", name, taskId);
                    cancelTask(taskId);
                    registerTask(taskInfo, triggerTask);
                }
            } else {
                log.info("DSContainer will cancelTask {} because task not valid, taskId: {}", name, taskId);
                cancelTask(taskId);
            }
        } else {
            if (taskInfo.isValid()) {
                log.info("DSContainer will register {} task, taskId: {}", name, taskId);
                registerTask(taskInfo, triggerTask);
            }
        }
    }

    /**
     * 获取 Semaphore,确保任务不会被多个线程同时执行
     */
    public Semaphore getSemaphore(final long taskId) {
        return this.scheduleMap.get(taskId).getRight().getRight();
    }

    private void registerTask(final T taskInfo, final TriggerTask triggerTask) {
        final ScheduledTask latestTask = taskRegistrar.scheduleTriggerTask(triggerTask);
        this.scheduleMap.put(taskInfo.getId(), Pair.of(taskInfo, Pair.of(latestTask, new Semaphore(1))));
    }

    private void cancelTask(final long taskId) {
        final Pair<T, Pair<ScheduledTask, Semaphore>> pair = this.scheduleMap.remove(taskId);
        if (pair != null) {
            pair.getRight().getLeft().cancel();
        }
    }
}

2.4 AbstractDSHandler

下面定义实际的动态线程池处理方法,这里采用抽象类实现,将共用逻辑封装起来,方便扩展。

具有以下抽象方法:

  •     List<T> listTaskInfo():获取所有的任务信息。
  •     void doProcess(T taskInfo):实现实际执行任务的业务逻辑。

具有以下公共方法:

  •     void configureTasks(ScheduledTaskRegistrar taskRegistrar):创建 DSContainer 对象,并创建一个单线程的任务定时执行,调用 scheduleTask() 方法处理实际逻辑。
  •     void scheduleTask():首先加载所有任务信息,然后基于 cron 表达式生成 TriggerTask 对象,调用 checkTask() 方法确认是否需要注册/取消任务。当达到执行时间时,调用 execute() 方法,执行任务逻辑。
  •     void execute(final T taskInfo):获取信号量,成功后执行任务逻辑。
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.config.TriggerTask;
import org.springframework.scheduling.support.CronTrigger;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * 抽象 Dynamic Schedule 实现,基于 SchedulingConfigurer 实现
 * @author jitwxs
 * @date 2021年03月27日 16:41
 */
@Slf4j
public abstract class AbstractDSHandler<T extends IDSTaskInfo> implements SchedulingConfigurer {

    private DSContainer<T> dsContainer;
    
    private final String CLASS_NAME = getClass().getSimpleName();

    /**
     * 获取所有的任务信息
     */
    protected abstract List<T> listTaskInfo();

    /**
     * 做具体的任务逻辑
     *
     * <p/> 该方法执行时位于跟 SpringBoot @Scheduled 注解相同的线程池内。如果内部仍需要开子线程池执行,请务必同步等待子线程池执行完毕,否则可能会影响预期效果。
     */
    protected abstract void doProcess(T taskInfo) throws Throwable;

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        dsContainer = new DSContainer<>(taskRegistrar, CLASS_NAME);
        // 每隔 100ms 调度一次,用于读取所有任务
        taskRegistrar.addFixedDelayTask(this::scheduleTask, 1000);
    }

    /**
     * 调度任务,加载所有任务并注册
     */
    private void scheduleTask() {
        CollectionUtils.emptyIfNull(listTaskInfo()).forEach(taskInfo ->
                dsContainer.checkTask(taskInfo, new TriggerTask(() ->
                        this.execute(taskInfo), triggerContext -> new CronTrigger(taskInfo.getCron()).nextExecutionTime(triggerContext)
                ))
        );
    }

    private void execute(final T taskInfo) {
        final long taskId = taskInfo.getId();

        try {
            Semaphore semaphore = dsContainer.getSemaphore(taskId);
            if (Objects.isNull(semaphore)) {
                log.error("{} semaphore is null, taskId: {}", CLASS_NAME, taskId);
                return;
            }
            if (semaphore.tryAcquire(3, TimeUnit.SECONDS)) {
                try {
                    doProcess(taskInfo);
                } catch (Throwable throwable) {
                    log.error("{} doProcess error, taskId: {}", CLASS_NAME, taskId, throwable);
                } finally {
                    semaphore.release();
                }
            } else {
                log.warn("{} too many executor, taskId: {}", CLASS_NAME, taskId);
            }
        } catch (InterruptedException e) {
            log.warn("{} interruptedException error, taskId: {}", CLASS_NAME, taskId);
        } catch (Exception e) {
            log.error("{} execute error, taskId: {}", CLASS_NAME, taskId, e);
        }
    }
}

三、快速测试

至此就完成了动态任务的框架搭建,下面让我们来快速测试下。为了尽量减少其他技术带来的复杂度,本次测试不涉及数据库和真实的定时任务,完全采用模拟实现。

3.1 模拟定时任务

为了模拟一个定时任务,我定义了一个 foo() 方法,其中只输出一句话。后续我将通过定时调用该方法,来模拟定时任务。

import lombok.extern.slf4j.Slf4j;

import java.time.LocalTime;

@Slf4j
public class SchedulerTest {
    public void foo() {
        log.info("{} Execute com.github.jitwxs.sample.ds.test.SchedulerTest#foo", LocalTime.now());
    }
}

3.2 实现 IDSTaskInfo

首先定义 IDSTaskInfo,我这里想通过反射来实现调用 foo() 方法,因此 reference 表示的是要调用方法的全路径。另外我实现了 isChange() 方法,只要 cron、isValid、reference 发生了变动,就认为该任务的配置发生了改变。

import com.github.jitwxs.sample.ds.config.IDSTaskInfo;
import lombok.Builder;
import lombok.Data;

@Data
@Builder
public class SchedulerTestTaskInfo implements IDSTaskInfo {
    private long id;

    private String cron;

    private boolean isValid;

    private String reference;

    @Override
    public boolean isChange(IDSTaskInfo oldTaskInfo) {
        if(oldTaskInfo instanceof SchedulerTestTaskInfo) {
            final SchedulerTestTaskInfo obj = (SchedulerTestTaskInfo) oldTaskInfo;
            return !this.cron.equals(obj.cron) || this.isValid != obj.isValid || !this.reference.equals(obj.getReference());
        } else {
            throw new IllegalArgumentException("Not Support SchedulerTestTaskInfo type");
        }
    }
}

3.3 实现 AbstractDSHandler

有几个需要关注的:

(1)listTaskInfo() 返回值我使用了 volatile 变量,便于我修改它,模拟任务信息数据的改变。

(2)doProcess() 方法中,读取到 reference 后,使用反射进行调用,模拟定时任务的执行。

(3)额外实现了 ApplicationListener 接口,当服务启动后,每隔一段时间修改下任务信息,模拟业务中调整配置。

  •     服务启动后,foo() 定时任务将每 10s 执行一次。
  •     10s 后,将 foo() 定时任务执行周期从每 10s 执行调整为 1s 执行。
  •     10s 后,关闭 foo() 定时任务执行。
  •     10s 后,开启 foo() 定时任务执行。
import com.github.jitwxs.sample.ds.config.AbstractDSHandler;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

/**
 * @author jitwxs
 * @date 2021年03月27日 21:54
 */
@Component
public class SchedulerTestDSHandler extends AbstractDSHandler<SchedulerTestTaskInfo> implements ApplicationListener {
    public volatile List<SchedulerTestTaskInfo> taskInfoList = Collections.singletonList(
            SchedulerTestTaskInfo.builder()
                    .id(1)
                    .cron("0/10 * * * * ? ")
                    .isValid(true)
                    .reference("com.github.jitwxs.sample.ds.test.SchedulerTest#foo")
                    .build()
    );

    @Override
    protected List<SchedulerTestTaskInfo> listTaskInfo() {
        return taskInfoList;
    }

    @Override
    protected void doProcess(SchedulerTestTaskInfo taskInfo) throws Throwable {
        final String reference = taskInfo.getReference();
        final String[] split = reference.split("#");
        if(split.length != 2) {
            return;
        }

       try {
           final Class<?> clazz = Class.forName(split[0]);
           final Method method = clazz.getMethod(split[1]);
           method.invoke(clazz.newInstance());
       } catch (Exception e) {
           e.printStackTrace();
       }
    }

    @Override
    public void onApplicationEvent(ApplicationEvent applicationEvent) {
        Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> {
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10));

            // setting 1 seconds execute
            taskInfoList = Collections.singletonList(
                    SchedulerTestTaskInfo.builder()
                            .id(1)
                            .cron("0/1 * * * * ? ")
                            .isValid(true)
                            .reference("com.github.jitwxs.sample.ds.test.SchedulerTest#foo")
                            .build()
            );

            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10));

            // setting not valid
            taskInfoList = Collections.singletonList(
                    SchedulerTestTaskInfo.builder()
                            .id(1)
                            .cron("0/1 * * * * ? ")
                            .isValid(false)
                            .reference("com.github.jitwxs.sample.ds.test.SchedulerTest#foo")
                            .build()
            );

            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10));

            // setting valid
            taskInfoList = Collections.singletonList(
                    SchedulerTestTaskInfo.builder()
                            .id(1)
                            .cron("0/1 * * * * ? ")
                            .isValid(true)
                            .reference("com.github.jitwxs.sample.ds.test.SchedulerTest#foo")
                            .build()
            );
        }, 12, 86400, TimeUnit.SECONDS);
    }
}

3.4 运行程序

整个应用包结构如下:

 运行程序后,在控制台可以观测到如下输出:

四、后记

以上完成了动态定时任务的介绍,你能够根据本篇文章,实现以下需求吗:

  •     本文基于 cron 表达式实现了频率控制,你能改用 fixedDelay 或 fixedRate 实现吗?
  •     基于数据库/配置文件/配置中心,实现对服务中定时任务的动态频率调整和任务的启停。
  •     开发一个数据表历史数据清理功能,能够动态配置要清理的表、清理的规则、清理的周期。
  •     开发一个数据表异常数据告警功能,能够动态配置要扫描的表、告警的规则、扫描的周期。
Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐