基于springboot实现动态定时任务,包括添加、删除、启动、关闭、日志
实现定时任务的动态控制com.studioustiger.job.entitycom.studioustiger.job.mappercom.studioustiger.job.servicecom.studioustiger.job.executorcom.studioustiger.job.configcom.studioustiger.job.scheduledcom.studioustig
·
表结构
-- 定时任务调度表
CREATE TABLE `scheduled_task` (
`id` varchar(20) NOT NULL COMMENT 'id',
`name` varchar(255) DEFAULT NULL COMMENT '定时任务名称',
`task_type` varchar(100) DEFAULT NULL COMMENT '定时任务分类',
`task_describe` varchar(255) DEFAULT NULL COMMENT '定时任务描述',
`cron` varchar(50) NOT NULL COMMENT 'cron策略',
`method` varchar(10) NOT NULL COMMENT '请求方法',
`url` varchar(500) NOT NULL COMMENT '请求路径',
`enable` varchar(2) NOT NULL DEFAULT '0' COMMENT '是否启用定时任务',
`open_log` varchar(2) DEFAULT '0' COMMENT '是否开启日志',
`create_user` varchar(40) DEFAULT NULL COMMENT '创建人',
`create_time` varchar(40) DEFAULT NULL COMMENT '创建时间',
`update_user` varchar(40) DEFAULT NULL COMMENT '更新人',
`update_time` varchar(40) DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `idx_url` (`url`) BLOCK_SIZE 16384 LOCAL
) COMMENT = '定时任务调度表';
-- 定时任务调度日志表
CREATE TABLE `scheduled_log` (
`id` varchar(20) NOT NULL COMMENT 'id',
`task_id` varchar(20) DEFAULT NULL COMMENT '定时任务ID',
`task_name` varchar(255) DEFAULT NULL COMMENT '定时任务名称',
`execute_status` varchar(40) DEFAULT NULL COMMENT '执行状态',
`content` longtext DEFAULT NULL COMMENT '内容',
`execute_time` varchar(40) DEFAULT NULL COMMENT '执行时间',
PRIMARY KEY (`id`)
) COMMENT = '定时任务调度日志表';
com.studioustiger.job.constant
/**
* @ClassName JobConstant
* @Description 常量类,定时任务操作触发器的key
* @Author huxuehao
**/
public class JobConstant {
public static final String OPEN_SCHEDULE = "openSchedule";
public static final String CLOSE_SCHEDULE = "closeSchedule";
public static final String DELETE_SCHEDULE = "deleteSchedule";
}
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* @ClassName LogProperties
* @Description 日志配置文件
* @Author huxuehao
**/
@Data
@Component
@ConfigurationProperties(prefix = "task.log", ignoreUnknownFields = false)
public class LogProperties {
/* 启用操作成功日志 */
private boolean successOpen = true;
/* 启用操作失败日志 */
private boolean failOpen = true;
/* 日志保存天数 */
private int saveDays = 2;
}
com.studioustiger.job.entity
import lombok.Data;
import java.io.Serializable;
/**
* @ClassName BaseDto
* @Description 基础dto
* @Author huxuehao
**/
@Data
public class BaseDto implements Serializable {
private static final long serialVersionUID = -153746138274322843L;
private String createUser;
private String createTime;
private String updateUser;
private String updateTime;
}
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* @ClassName ScheduleLogDto
* @Description TODO
* @Author huxuehao
**/
@Data
@AllArgsConstructor
@NoArgsConstructor
@TableName("scheduled_log")
public class ScheduleLogDto implements Serializable {
private static final long serialVersionUID = -153746138274322843L;
private String id;
private String taskId;
private String taskName;
private String executeStatus;
private String content;
private String executeTime;
}
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import org.springframework.scheduling.support.CronTrigger;
/**
* @ClassName ScheduleTask
* @Description 定时任务信息表
* @Author huxuehao
**/
@Data
@TableName("scheduled_task")
public class ScheduleTaskDto extends BaseDto {
private String id;
private String name;
private String taskType;
private String taskDescribe;
private String cron;
private String method;
private String url;
private String enable;
private String openLog;
// 提供转换为CronTrigger的工具方法
public CronTrigger toCronTrigger() {
return new CronTrigger(this.cron);
}
}
import lombok.Data;
/**
* @ClassName ScheduleTaskPo
* @Description TODO
* @Author huxuehao
**/
@Data
public class ScheduleTaskPo extends BaseDto {
private String id;
private String name;
private String taskType;
private String taskDescribe;
private String cron;
private String method;
private String url;
private String enable;
private String openLog;
private String total;
}
com.studioustiger.job.mapper
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.studioustiger.job.entity.ScheduleLogDto;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Repository;
import java.util.List;
/**
* @InterfaceName ScheduleLogMapper
* @Description TODO
* @Author huxuehao
**/
@Repository
public interface ScheduleLogMapper extends BaseMapper<ScheduleLogDto> {
/* 分页 */
List<ScheduleLogDto> getPage();
/* 添加 */
int add(@Param("log") ScheduleLogDto log);
/* 清除日志 */
int clearLog(@Param("endTime") String endTime);
/* 根据定时任务获取最新的错误日志*/
ScheduleLogDto latestLogByTask(@Param("taskId") String taskId);
/* 上一条日志 */
ScheduleLogDto lastLog(String taskId, String executeTime);
/* 下一条日志 */
ScheduleLogDto nextLog(String taskId, String executeTime);
}
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.studioustiger.job.mapper.ScheduleLogMapper">
<resultMap id="scheduleLogMap" type="com.studioustiger.job.entity.ScheduleLogDto">
<result column="id" property="id"/>
<result column="task_id" property="taskId"/>
<result column="task_name" property="taskName"/>
<result column="execute_status" property="executeStatus"/>
<result column="content" property="content"/>
<result column="execute_time" property="executeTime"/>
</resultMap>
<select id="getPage" parameterType="string" resultMap="scheduleLogMap">
select
`id`,
`task_id`,
`task_name`,
`execute_status`,
`content`,
`execute_time`
from
`scheduled_log`
order by `name` desc, `execute_time` desc, id
</select>
<insert id="add">
insert into `scheduled_log`(`id`,`task_id`,`task_name`,`execute_status`,`content`,`execute_time`)
values (
#{log.id},
#{log.taskId},
#{log.taskName},
#{log.executeStatus},
#{log.content},
#{log.executeTime}
)
</insert>
<delete id="clearLog">
delete from
`scheduled_log`
where
`execute_time` <= #{endTime}
</delete>
<select id="latestLogByTask" resultMap="scheduleLogMap">
select
`id`,
`task_id`,
`task_name`,
`execute_status`,
`content`,
`execute_time`
from
`scheduled_log`
where
`task_id` = #{taskId}
and execute_status = 'fail'
order by `execute_time` desc, `id` desc
limit 1
</select>
<select id="lastLog" resultMap="scheduleLogMap">
select
`id`,
`task_id`,
`task_name`,
`execute_status`,
`content`,
`execute_time`
from
`scheduled_log`
where
`task_id` = #{taskId}
and `execute_time` > #{executeTime}
and `execute_status` = 'fail'
order by `execute_time` asc, `id` asc
limit 1
</select>
<select id="nextLog" resultMap="scheduleLogMap">
select
`id`,
`task_id`,
`task_name`,
`execute_status`,
`content`,
`execute_time`
from
`scheduled_log`
where
`task_id` = #{taskId}
and `execute_time` < #{executeTime}
and `execute_status` = 'fail'
order by `execute_time` desc, `id` desc
limit 1
</select>
</mapper>
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.studioustiger.job.entity.ScheduleTaskDto;
import com.studioustiger.job.entity.ScheduleTaskPo;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Repository;
import java.util.List;
/**
* @ClassName ScheduleTaskMapper
* @Description TODO
* @Author huxuehao
**/
@Repository
public interface ScheduleTaskMapper extends BaseMapper<ScheduleTaskDto> {
/* 添加定时任务信息 */
int add(@Param("task") ScheduleTaskDto task);
/* 更新定时任务信息 */
int update(@Param("task") ScheduleTaskDto task);
/* 获取总数 */
int getTotals(@Param("taskName") String taskName,
@Param("taskType") String taskType,
@Param("taskStatus") String taskStatus);
List<ScheduleTaskPo> getPage(@Param("current") Integer current,
@Param("size") Integer size,
@Param("taskName") String taskName,
@Param("taskType") String taskType,
@Param("taskStatus") String taskStatus);
/* 批量开启定时任务信息 */
int enableByIds(@Param("tasks") List<ScheduleTaskDto> tasks);
/* 批量关闭定时任务信息 */
int disableByIds(@Param("tasks") List<ScheduleTaskDto> tasks);
/* 批量删除定时任务信息 */
int deleteByIds(@Param("ids") List<String> ids);
/* 根据定时任务id获取最新的定时任务信息*/
ScheduleTaskPo refreshResult(@Param("taskId")String taskId);
}
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.studioustiger.job.mapper.ScheduleTaskMapper">
<resultMap id="scheduleTaskMap" type="com.studioustiger.job.entity.ScheduleTaskPo">
<result column="id" property="id"/>
<result column="name" property="name"/>
<result column="task_type" property="taskType"/>
<result column="task_describe" property="taskDescribe"/>
<result column="url" property="url"/>
<result column="cron" property="cron"/>
<result column="method" property="method"/>
<result column="enable" property="enable"/>
<result column="open_log" property="openLog"/>
<result column="create_user" property="createUser"/>
<result column="create_time" property="createTime"/>
<result column="update_user" property="updateUser"/>
<result column="update_time" property="updateTime"/>
<result column="total" property="total"/>
</resultMap>
<select id="getPage" parameterType="string" resultMap="scheduleTaskMap">
select
a.`id`,
a.`name`,
b.`dict_label` as task_type,
a.`task_describe`,
a.`url`,
a.`cron`,
a.`method`,
a.`enable`,
a.`open_log`,
a.`create_user`,
a.`create_time`,
a.`update_user`,
a.`update_time`,
concat(ifnull(c.fail_count,0),'/',ifnull(d.success_count,0)) as total
from
`scheduled_task` as a
left join `sys_dict` as b
on a.`task_type` = b.`dict_value`
and b.`code` = 'schedule_task'
and is_sealed = 0
and b.`del_flag` = 0
left join (select task_id, count(id) as fail_count from `scheduled_log` where `execute_status` = 'fail' group by task_id) c
on c.task_id = a.id
left join (select task_id, count(id) as success_count from `scheduled_log` where `execute_status` = 'success' group by task_id) d
on d.task_id = a.id
<where>
<if test="taskName != null and taskName != ''">
and a.`name` like concat('%',#{taskName},'%')
</if>
<if test="taskType != null and taskType != ''">
and a.`task_type` like concat('%',#{taskType},'%')
</if>
<if test="taskStatus != null and taskStatus != ''">
and a.`enable` like concat('%',#{taskStatus},'%')
</if>
</where>
order by a.`task_type` desc, a.`name` desc, `create_time` desc, id
<if test="current != null">
limit #{current} , #{size}
</if>
</select>
<select id="getTotals" parameterType="string" resultType="int">
select
count(*)
from
`scheduled_task`
<where>
<if test="taskName != null and taskName != ''">
and `name` like concat('%',#{taskName},'%')
</if>
<if test="taskType != null and taskType != ''">
and `task_type` like concat('%',#{taskType},'%')
</if>
<if test="taskStatus != null and taskStatus != ''">
and `enable` like concat('%',#{taskStatus},'%')
</if>
</where>
</select>
<insert id="add">
insert into
`scheduled_task`(
`id`,
`name`,
`task_type`,
`task_describe`,
`url`,
`cron`,
`method`,
`enable`,
`open_log`,
`create_user`,
`create_time`
)
values (
#{task.id},
#{task.name},
#{task.taskType},
#{task.taskDescribe},
#{task.url},
#{task.cron},
#{task.method},
#{task.enable},
#{task.openLog},
#{task.createUser},
#{task.createTime}
)
</insert>
<update id="update">
update
`scheduled_task`
set
`name` = #{task.name},
`task_type` = #{task.taskType},
`task_describe` = #{task.taskDescribe},
`url` = #{task.url},
`cron` = #{task.cron},
`method` = #{task.method},
`enable` = #{task.enable},
`open_log` = #{task.openLog},
`update_user` = #{task.updateUser},
`update_time` = #{task.updateTime}
where
`id` = #{task.id}
</update>
<update id="enableByIds">
update
`scheduled_task`
set
`enable` = '1',
`update_user` = #{tasks[0].updateUser},
`update_time` = #{tasks[0].updateTime}
where
`id` in
<foreach collection="tasks" item="task" open="(" separator="," close=")">
#{task.id}
</foreach>
</update>
<update id="disableByIds">
update
`scheduled_task`
set
`enable` = '0',
`update_user` = #{tasks[0].updateUser},
`update_time` = #{tasks[0].updateTime}
where
`id` in
<foreach collection="tasks" item="task" open="(" separator="," close=")">
#{task.id}
</foreach>
</update>
<delete id="deleteByIds" parameterType="list">
delete from
`scheduled_task`
where
`id` in
<foreach collection="ids" item="val" open="(" separator="," close=")">
#{val}
</foreach>
</delete>
<select id="refreshResult" resultMap="scheduleTaskMap">
select
a.`id`,
a.`name`,
b.`dict_label` as task_type,
a.`task_describe`,
a.`url`,
a.`cron`,
a.`method`,
a.`enable`,
a.`open_log`,
a.`create_user`,
a.`create_time`,
a.`update_user`,
a.`update_time`,
concat(ifnull(c.fail_count,0),'/',ifnull(d.success_count,0)) as total
from
`scheduled_task` as a
left join `sys_dict` as b
on a.`task_type` = b.`dict_value`
and b.`code` = 'schedule_task'
and is_sealed = 0
and b.`del_flag` = 0
left join (select task_id, count(id) as fail_count from `scheduled_log` where `execute_status` = 'fail' group by task_id) c
on c.task_id = a.id
left join (select task_id, count(id) as success_count from `scheduled_log` where `execute_status` = 'success' group by task_id) d
on d.task_id = a.id
where a.id = #{taskId}
</select>
</mapper>
com.studioustiger.job.service
import com.baomidou.mybatisplus.extension.service.IService;
import com.studioustiger.job.entity.ScheduleTaskDto;
import com.studioustiger.job.entity.ScheduleTaskPo;
import java.util.List;
/**
* @InterfaceName ScheduleTaskService
* @Description TODO
* @Author huxuehao
**/
public interface ScheduleTaskService extends IService<ScheduleTaskDto> {
/* 新增 */
int add(ScheduleTaskDto scheduleTask);
/* 更新 */
int update(ScheduleTaskDto scheduleTask);
/* 总数 */
int getTotals(String taskName, String taskType, String taskStatus);
/* 分页 */
List<ScheduleTaskPo> getPage(Integer current, Integer size, String taskName, String taskType, String taskStatus);
/* 启用 */
int enableByIds(List<String> ids);
/* 禁用 */
int disableByIds(List<String> ids);
/* 删除 */
int deleteByIds(List<String> ids);
ScheduleTaskPo refreshResult(String tasId);
}
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.studioustiger.core.tool.utils.DateUtil;
import com.studioustiger.dataapi.function.MeFunctions;
import com.studioustiger.job.entity.ScheduleTaskDto;
import com.studioustiger.job.entity.ScheduleTaskPo;
import com.studioustiger.job.mapper.ScheduleTaskMapper;
import com.studioustiger.job.service.ScheduleTaskService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* @ClassName ScheduleTaskServiceImpl
* @Description TODO
* @Author huxuehao
**/
@Service
public class ScheduleTaskServiceImpl extends ServiceImpl<ScheduleTaskMapper, ScheduleTaskDto> implements ScheduleTaskService {
@Autowired
ScheduleTaskMapper scheduleTaskMapper;
@Autowired
MeFunctions me;
/* 新增*/
public int add(ScheduleTaskDto scheduleTask){
String id = String.valueOf(me.nextId());
String createUser = String.valueOf(me.id());
String createTime = DateUtil.formatDateTime(new Date());
scheduleTask.setId(id);
scheduleTask.setEnable("0");
scheduleTask.setCreateUser(createUser);
scheduleTask.setCreateTime(createTime);
return scheduleTaskMapper.add(scheduleTask);
}
/* 更新*/
public int update(ScheduleTaskDto scheduleTask){
String updateUser = String.valueOf(me.id());
String updateTime = DateUtil.formatDateTime(new Date());
scheduleTask.setUpdateUser(updateUser);
scheduleTask.setUpdateTime(updateTime);
return scheduleTaskMapper.update(scheduleTask);
}
@Override
public int getTotals(String taskName, String taskType, String taskStatus) {
return scheduleTaskMapper.getTotals(taskName, taskType, taskStatus);
}
/* 分页 */
public List<ScheduleTaskPo> getPage(Integer current, Integer size, String taskName, String taskType, String taskStatus) {
return scheduleTaskMapper.getPage(current, size, taskName, taskType, taskStatus);
}
/* 启用 */
public int enableByIds(List<String> ids){
return scheduleTaskMapper.enableByIds(fillTaskInfo(ids));
}
/* 禁用 */
public int disableByIds(List<String> ids){
return scheduleTaskMapper.disableByIds(fillTaskInfo(ids));
}
/* 填充id、更新人、更新时间 */
private List<ScheduleTaskDto> fillTaskInfo (List<String> ids) {
String updateUser = String.valueOf(me.id());
String updateTime = DateUtil.formatDateTime(new Date());
List<ScheduleTaskDto> tasks = new ArrayList<>();
ids.forEach(item -> {
ScheduleTaskDto task = new ScheduleTaskDto();
task.setId(item);
task.setUpdateUser(updateUser);
task.setUpdateTime(updateTime);
tasks.add(task);
});
return tasks;
}
/* 删除 */
public int deleteByIds(List<String> ids) {
return scheduleTaskMapper.deleteByIds(ids);
}
@Override
public ScheduleTaskPo refreshResult(String taskId) {
return scheduleTaskMapper.refreshResult(taskId);
}
}
import com.studioustiger.job.entity.ScheduleLogDto;
import java.util.List;
/**
* @InterfaceName ScheduleLogService
* @Description TODO
* @Author huxuehao
**/
public interface ScheduleLogService {
/* 分页 */
List<ScheduleLogDto> getPage();
/* 添加 */
int add(ScheduleLogDto scheduleLog);
/* 清除日志 */
int clearLog(String endTime);
/* 根据定时任务获取最新的错误日志*/
ScheduleLogDto latestLogByTask(String taskId);
/* 上一条日志 */
ScheduleLogDto lastLog(String taskId, String executeTime);
/* 下一条日志 */
ScheduleLogDto nextLog(String taskId, String executeTime);
}
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.studioustiger.job.entity.ScheduleLogDto;
import com.studioustiger.job.mapper.ScheduleLogMapper;
import com.studioustiger.job.service.ScheduleLogService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* @ClassName ScheduleLogServiceImpl
* @Description TODO
* @Author huxuehao
**/
@Service
public class ScheduleLogServiceImpl extends ServiceImpl<ScheduleLogMapper, ScheduleLogDto> implements ScheduleLogService {
@Autowired
ScheduleLogMapper scheduleLogMapper;
@Override
public List<ScheduleLogDto> getPage() {
return scheduleLogMapper.getPage();
}
@Override
public int add(ScheduleLogDto scheduleLog) {
return scheduleLogMapper.add(scheduleLog);
}
@Override
public int clearLog(String endTime) {
return scheduleLogMapper.clearLog(endTime);
}
@Override
/* 根据定时任务获取最新的错误日志*/
public ScheduleLogDto latestLogByTask(String taskId) {
return scheduleLogMapper.latestLogByTask(taskId);
}
@Override
public ScheduleLogDto lastLog(String taskId, String executeTime) {
return scheduleLogMapper.lastLog(taskId, executeTime);
}
@Override
public ScheduleLogDto nextLog(String taskId, String executeTime) {
return scheduleLogMapper.nextLog(taskId, executeTime);
}
}
com.studioustiger.job.executor
import com.studioustiger.job.entity.ScheduleTaskDto;
/**
* @InterfaceName TaskExecutor
* @Description 任务执行器
* @Author huxuehao
**/
public interface TaskExecutor {
/**
* 执行任务
* @param task
* @return
*/
boolean execute(ScheduleTaskDto task);
}
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.studioustiger.core.tool.utils.DateUtil;
import com.qstudioustigeranmo.core.tool.utils.StringUtil;
import com.studioustiger.dataapi.function.MeFunctions;
import com.studioustiger.job.constant.JobConstant;
import com.studioustiger.job.constant.LogProperties;
import com.studioustiger.job.entity.ScheduleLogDto;
import com.studioustiger.job.entity.ScheduleTaskDto;
import com.studioustiger.job.executor.TaskExecutor;
import com.studioustiger.job.service.ScheduleLogService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Date;
/**
* @ClassName TaskExecutorImpl
* @Description 定时任务执行器,最终定时任务会通过触发execute方法被执行
* @Author huxuehao
**/
@Service
public class TaskExecutorImpl implements TaskExecutor {
private static final Log log = LogFactory.get(TaskExecutorImpl.class);
@Autowired
ScheduleLogService scheduleLogService;
@Autowired
MeFunctions me;
@Autowired
LogProperties logProperties;
@Override
public boolean execute(ScheduleTaskDto task) {
return this.doJob(task);
}
/* 用于定时任务执行 */
public boolean doJob(ScheduleTaskDto task) {
String message = null;
try {
/**
* 此处执行定时任务内容,具体应该怎么执行,下面提供两种思路
* 思路1:基于bean去执行定时任务,定时任务与bean一一对应,将定时任务的逻辑写在@Bean中,ScheduleTaskDto中需要存储 读取 数据库存储的Bean名称。
* 思路2:基于接口Controller去执行定时任务,将定时任务的逻辑写在接口中,ScheduleTaskDto中需要存储 读取 数据库存储的接口路径。
*/
log.info("定时任务[{}]执行成功", task.getName());
} catch (Exception e) {
log.error("定时任务[{}]执行失败", task.getName());
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
message = sw.toString();
} finally {
genLog(task, message);
}
return message == null;
}
/* 用户日志采集 */
private void genLog(ScheduleTaskDto task, String message){
String status = message == null ? "success" : "fail";
if (!logProperties.isFailOpen() && !logProperties.isSuccessOpen()) {
return;
}
if ("success".equals(status) && !logProperties.isSuccessOpen()) {
return;
}
if ("fail".equals(status) && !logProperties.isFailOpen()) {
return;
}
if ("0".equals(task.getOpenLog())) {
return;
}
try {
ScheduleLogDto scheduleLog = new ScheduleLogDto(
String.valueOf(me.nextId()),
task.getId(),
task.getName(),
status,
message,
DateUtil.formatDateTime(new Date())
);
scheduleLogService.add(scheduleLog);
} catch (Exception e) {
log.error("日志持久化错误");
}
}
}
com.studioustiger.job.config
import com.studioustiger.job.entity.ScheduleTaskDto;
import com.studioustiger.job.executor.TaskExecutor;
import org.springframework.stereotype.Component;
/**
* @InterfaceName Worker
* @Description 开启定时任务需要传入Runnable接口的实现类,作为scheduledFuture.schedule(worker, cronTrigger)中的第一个参数,
* 所以"工作内容"实现Runnable接口是必须要做的。在run()中要执行的内容,我们可以封装成任务执行器TaskExecutor。
*
* 注意:对于任务执行器(TaskExecutor)我们最好不使用(也要看实际情况)spring的bean进行注入,因为使用bean后,上下文中始终使用的是一个
* 对象,可能会给你带来额外的问题(反正我是遇到了),建议使用构造函数传参。
* @Author huxuehao
**/
@Component
public class Worker implements Runnable {
private TaskExecutor taskExecutor;
private ScheduleTaskDto scheduleTask;
public Worker() {
}
public Worker(ScheduleTaskDto scheduleTask, TaskExecutor taskExecutor) {
this.scheduleTask = scheduleTask;
this.taskExecutor = taskExecutor;
}
public void run() {
taskExecutor.execute(scheduleTask);
}
}
import com.studioustiger.job.entity.ScheduleTaskDto;
import lombok.AllArgsConstructor;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Consumer;
/**
* @ClassName TaskConfig
* @Description 定时任务配置
* @Author huxuehao
**/
@AllArgsConstructor
@SpringBootConfiguration
public class TaskConfig {
/**
* 描述:我们没开启一个定时任务,就会产生一个ScheduledFuture实体,当我们需要动态的操作定时任务(上述的实体)时,
* 我们需要调用scheduledFuture.cancel(true)或scheduledFuture.schedule(worker, cronTrigger),这就意味着
* 我们需要将每一个ScheduledFuture实体存储起来,当需要动态的控制定时任务时,我们去改Map中取出对应的实体进行上述的
* cancel或schedule操作即可。所以我们需要在spring容器中维护一个ScheduledFuture的注册表注册表。
*/
@Bean(name = "scheduledFutureMap")
public Map<String, ScheduledFuture> scheduledFutureMap() {
return new ConcurrentHashMap<>();
}
/**
* 描述:对于定时任务的动态操作(添加、开启、停止、删除)我们需要将上述操作封装成触发器,想要使用的时候
* 我们只需要取出已经初始化好的代码逻辑,取出对应的代码逻辑,传入参数,执行即可。上述描述是一种预处理的思想。
*
* 在这里我们使用java8的Consumer(消费型函数式接口)进行时间,并将实现了不同代码逻辑的Consumer存储到Map中,
* 这样我们一方面可以实现动态扩展,另一方面省去了多 if-else 的操作。
*/
@Bean(name = "operationMap")
public Map<String, Consumer<ScheduleTaskDto>> operationMap() {
return new ConcurrentHashMap<>();
}
/**
* 一个用于开启定时任务的线程池;我们关注的核心方法是:threadPoolTaskScheduler.schedule(工作内容, 触发器)
*/
@Bean(name = "threadPoolTaskScheduler")
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize(Runtime.getRuntime().availableProcessors());
threadPoolTaskScheduler.setThreadNamePrefix("WorKerThread:");
threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true);
threadPoolTaskScheduler.setAwaitTerminationSeconds(30);
return threadPoolTaskScheduler;
}
}
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.studioustiger.job.constant.JobConstant;
import com.studioustiger.job.entity.ScheduleTaskDto;
import com.studioustiger.job.executor.TaskExecutor;
import com.studioustiger.job.service.ScheduleTaskService;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
* @ClassName InitConfig
* @Description 初始化配置
* scheduledFutureMap:作为ScheduledFuture的注册表,用于我们来操作其开启关闭,key为定时任务id。
* operationMap: 存储的定时任务操作触发器,参数是ScheduleTaskDto。
* threadPoolTaskScheduler:定时任务线程池,线程池的大小使用的是Runtime.getRuntime().availableProcessors()。
* scheduleTaskService:操作schedule_task的Service。
* initSchedule():从数据表中寻找有效的定时任务,并执行定时任务。
* initOperationMap():初始化定时任务操作触发器,最终初始化好的定时任务操作会被填充到operationMap中,
* 其目的是为了动态的控制定时任务的新增、开启、关闭。使用Consumer作为定时任务预处理载体。
* @Author huxuehao
**/
@SpringBootConfiguration
@RequiredArgsConstructor
public class InitConfig {
private static final Log log = LogFactory.get(TaskConfig.class);
/* 走构造注入,使用lombok生成全参数构造 */
private final Map<String, ScheduledFuture> scheduledFutureMap;
private final Map<String, Consumer<ScheduleTaskDto>> operationMap;
private final ThreadPoolTaskScheduler threadPoolTaskScheduler;
private final ScheduleTaskService scheduleTaskService;
private final TaskExecutor taskExecutor;
/**
* 描述:初始化定时任务
*/
@PostConstruct
public void initSchedule() {
List<ScheduleTaskDto> taskList = scheduleTaskService.list();
List<ScheduleTaskDto> enableTask = taskList.stream().filter(item -> "1".equals(item.getEnable())).collect(Collectors.toList());
enableTask.forEach(item -> {
Worker worker = new Worker(item, taskExecutor);
CronTrigger cronTrigger = item.toCronTrigger();
ScheduledFuture<?> schedule = threadPoolTaskScheduler.schedule(worker, cronTrigger);
scheduledFutureMap.put(item.getId(), schedule);
log.info("定时任务:[{}]初始化完成", item.getName());
});
initOperationMap();
}
/**
* 描述:初始化定时任务操作触发器
* 我们关注的核心方法是:
* threadPoolTaskScheduler.schedule(工作内容, 触发器) //此方法用于开启一个定时任务
* scheduledFuture.cancel(true) //此方法用于取消一个定时任务
*/
private void initOperationMap() {
/* 定时任务:打开/更新操作 */
Consumer<ScheduleTaskDto> openSchedule = item -> {
String scheduleId = item.getId();
/* 当定时任务已经存在与scheduledFutureMap中*/
if (scheduledFutureMap.containsKey(scheduleId)) {
/* 重新计算scheduledFutureMap中key为scheduledId的value的值 */
scheduledFutureMap.compute(scheduleId, (k, v) -> {
/* 先判空,如果对象(ScheduledFuture)存在,则将其先停跳 */
Optional.ofNullable(v).ifPresent(v0 -> v0.cancel(true));
/* 开启一个新的定时 */
Worker worker = new Worker(item, taskExecutor);
CronTrigger cronTrigger = item.toCronTrigger();
return threadPoolTaskScheduler.schedule(worker, cronTrigger);
});
}
/* 当定时任务不存在scheduledFutureMap中,则新建定时任务,并添加到map中 */
else {
Worker worker = new Worker(item, taskExecutor);
if (worker != null) {
CronTrigger cronTrigger = item.toCronTrigger();
ScheduledFuture<?> schedule = threadPoolTaskScheduler.schedule(worker, cronTrigger);
scheduledFutureMap.put(scheduleId, schedule);
}
}
};
operationMap.put(JobConstant.OPEN_SCHEDULE, openSchedule);
log.info("定时任务操作注册表:[{}]初始化完成",JobConstant.OPEN_SCHEDULE);
// 定时任务:关闭操作
Consumer<ScheduleTaskDto> closeSchedule = item -> {
String scheduleId = item.getId();
// 从scheduledFutureMap中获取scheduledId对应的定时任务
ScheduledFuture scheduledFuture = scheduledFutureMap.get(scheduleId);
/* 先判空,如果对象(ScheduledFuture)存在,则停止定时 */
Optional.ofNullable(scheduledFuture).ifPresent(schedule -> schedule.cancel(true));
};
operationMap.put(JobConstant.CLOSE_SCHEDULE, closeSchedule);
log.info("定时任务操作注册表:[{}]初始化完成",JobConstant.CLOSE_SCHEDULE);
// 定时任务:删除操作
Consumer<ScheduleTaskDto> deleteSchedule = item -> {
String scheduleId = item.getId();
// 从scheduledFutureMap中获取scheduledId对应的定时任务
ScheduledFuture scheduledFuture = scheduledFutureMap.get(scheduleId);
/* 先判空,如果对象(ScheduledFuture)存在,则停止定时 */
Optional.ofNullable(scheduledFuture).ifPresent(schedule -> schedule.cancel(true));
// 从注册表中移除
scheduledFutureMap.remove(scheduleId);
};
operationMap.put(JobConstant.DELETE_SCHEDULE, deleteSchedule);
log.info("定时任务操作注册表:[{}]初始化完成",JobConstant.DELETE_SCHEDULE);
}
}
com.studioustiger.job.scheduled
import com.studioustiger.job.constant.LogProperties;
import com.studioustiger.job.service.ScheduleLogService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
/**
* @ClassName defaultScheduled
* @Description 默认定时任务,可以在该类中进行一些常规的定时任务操作
* @Author huxuehao
**/
@Component
public class defaultScheduled {
@Autowired
ScheduleLogService scheduleLogService;
@Autowired
LogProperties logProperties;
/* 每天1点30分清除一次日志 */
@Scheduled(cron = "0 30 1 * * ?")
public void clearLog() {
String endTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis() - 86400000 * logProperties.getSaveDays());
scheduleLogService.clearLog(endTime);
}
}
com.studioustiger.job.controller
import com.studioustiger.core.tool.api.R;
import com.studioustiger.job.constant.JobConstant;
import com.studioustiger.job.entity.ScheduleTaskDto;
import com.studioustiger.job.entity.ScheduleTaskPo;
import com.studioustiger.job.executor.TaskExecutor;
import com.studioustiger.job.service.ScheduleLogService;
import com.studioustiger.job.service.ScheduleTaskService;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Description;
import org.springframework.scheduling.support.CronExpression;
import org.springframework.web.bind.annotation.*;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
* @ClassName TaskManagerController
* @Description 任务管理接口,这里我就使用构造函数进行依赖注入了
* @Author huxuehao
**/
@RequiredArgsConstructor
@RestController
@RequestMapping(value = "/task")
public class TaskManagerController {
private final ScheduleTaskService scheduleTaskService;
private final ScheduleLogService scheduleLogService;
private final Map<String, Consumer<ScheduleTaskDto>> operationMap;
private final TaskExecutor taskExecutor;
@Description(value = "获取最新的错误日志")
@GetMapping(value = "/log/by-task-id")
public R latestLogByTask(@RequestParam("taskId") String taskId) {
return R.data(scheduleLogService.latestLogByTask(taskId));
}
@Description(value = "上一条日志")
@GetMapping(value = "/log/last")
public R lastLog(@RequestParam("taskId") String taskId,
@RequestParam("executeTime") String executeTime) {
return R.data(scheduleLogService.lastLog(taskId, executeTime));
}
@Description(value = "下一条日志")
@GetMapping(value = "/log/next")
public R nextLog(@RequestParam("taskId") String taskId,
@RequestParam("executeTime") String executeTime) {
return R.data(scheduleLogService.nextLog(taskId, executeTime));
}
@Description(value = "执行任务")
@GetMapping(value = "/execute")
public R execute(@RequestParam("id") String id) {
ScheduleTaskDto task = scheduleTaskService.getById(id);
boolean execute = taskExecutor.execute(task);
if (execute) {
return R.data("success");
}
return R.data("fail");
}
@Description(value = "刷新执行结果")
@GetMapping(value = "/refresh-result")
public R refreshResult(@RequestParam("id") String id) {
return R.data(scheduleTaskService.refreshResult(id));
}
@Description(value = "添加任务")
@PostMapping(value = "/add")
public R addTask(@RequestBody ScheduleTaskDto scheduleTask) {
if (!isValidCron(scheduleTask.getCron())) {
return R.fail("cron表达式校验失败");
}
scheduleTaskService.add(scheduleTask);
return R.success("操作成功");
}
@Description(value = "更新任务")
@PostMapping(value = "/update")
public R updateTask(@RequestBody ScheduleTaskDto scheduleTask) {
if (!isValidCron(scheduleTask.getCron())) {
return R.fail("cron表达式校验失败");
}
scheduleTaskService.update(scheduleTask);
if ("1".equals(scheduleTask.getEnable())) {
openSchedule(Arrays.asList(scheduleTask.getId()));
}
return R.success("操作成功");
}
@Description(value = "删除任务")
@PostMapping(value = "/delete")
public R deleteTask(@RequestBody List<String> ids) {
if (ids != null && ids.size() > 0) {
deleteSchedule(ids);
scheduleTaskService.deleteByIds(ids);
}
return R.success("操作成功");
}
@Description(value = "暂停任务")
@PostMapping(value = "/disable")
public R disableTask(@RequestBody List<String> ids) {
if (ids != null && ids.size() > 0) {
scheduleTaskService.disableByIds(ids);
closeSchedule(ids);
}
return R.success("操作成功");
}
@Description(value = "启动任务")
@PostMapping(value = "/enable")
public R enableTask(@RequestBody List<String> ids) {
if (ids != null && ids.size() > 0) {
scheduleTaskService.enableByIds(ids);
openSchedule(ids);
}
return R.success("操作成功");
}
@Description(value = "启动全部任务")
@GetMapping(value = "/enable-all")
public R enableAllTask() {
List<ScheduleTaskDto> list = scheduleTaskService.list();
if (list != null && list.size() >0) {
List<String> ids = list.stream().map(item -> item.getId()).collect(Collectors.toList());
openSchedule(ids);
}
return R.success("操作成功");
}
@Description(value = "关闭全部任务")
@GetMapping(value = "/disable-all")
public R disableAllTask() {
List<ScheduleTaskDto> list = scheduleTaskService.list();
if (list != null && list.size() >0) {
List<String> ids = list.stream().map(item -> item.getId()).collect(Collectors.toList());
closeSchedule(ids);
}
return R.success("操作成功");
}
@Description(value = "获取任务详情")
@GetMapping(value = "/detail")
public R taskDetail(@RequestParam("id") String id) {
return R.data(scheduleTaskService.getById(id));
}
@Description(value = "获取任务分页")
@GetMapping(value = "/page")
public R taskPage(@RequestParam(required = false, value = "current") Integer current,
@RequestParam(required = false, value = "size") Integer size,
@RequestParam(required = false, value = "taskName") String taskName,
@RequestParam(required = false, value = "taskType") String taskType,
@RequestParam(required = false, value = "taskStatus") String taskStatus) {
int total = scheduleTaskService.getTotals(taskName, taskType, taskStatus);
List<ScheduleTaskPo> page = scheduleTaskService.getPage((current - 1) * size, size, taskName, taskType, taskStatus);
Map pageRes = new LinkedHashMap<>();
int pages = total / size + (total % size == 0 ? 0: 1);
pageRes.put("pages",pages);
pageRes.put("records",page);
pageRes.put("size",size);
pageRes.put("current",current);
pageRes.put("total",total);
pageRes.put("optimizeCountSql",true);
pageRes.put("searchCount",true);
return R.data(pageRes);
}
@Description(value = "获取任务列表")
@GetMapping(value = "/list")
public R taskList() {
List<ScheduleTaskDto> list = scheduleTaskService.list();
return R.data(list);
}
/* 开启定时 */
private void openSchedule(List<String> ids) {
List<ScheduleTaskDto> taskList = scheduleTaskService.listByIds(ids);
if (taskList != null && taskList.size() > 0) {
taskList.forEach(item -> {
operationMap.get(JobConstant.OPEN_SCHEDULE).accept(item);
});
}
}
/* 关闭定时 */
private void closeSchedule(List<String> ids) {
System.out.println(operationMap.toString());
List<ScheduleTaskDto> taskList = scheduleTaskService.listByIds(ids);
if (taskList != null && taskList.size() > 0) {
taskList.forEach(item -> {
operationMap.get(JobConstant.CLOSE_SCHEDULE).accept(item);
});
}
}
/* 删除定时 */
private void deleteSchedule(List<String> ids) {
List<ScheduleTaskDto> taskList = scheduleTaskService.listByIds(ids);
if (taskList != null && taskList.size() > 0) {
taskList.forEach(item -> {
operationMap.get(JobConstant.DELETE_SCHEDULE).accept(item);
});
}
}
private boolean isValidCron(String cronStr) {
return CronExpression.isValidExpression(cronStr);
}
}
效果图
抱歉,没有前端代码(我的前端是基于配置文件配出来的),大家使用elementUI搭即可。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
已为社区贡献4条内容
所有评论(0)