【使用redisson完成延迟队列的功能】使用redisson配合线程池完成异步执行功能,延迟队列和不需要延迟的队列
添加延迟队列时使用,监测扫描时也会用这个工具类进行获取消息。
·
1. 使用redisson完成延迟队列的功能
引入依赖
spring-boot-starter-actuator是Spring Boot提供的一个用于监控和管理应用程序的模块
用于查看应用程序的健康状况、审计信息、指标和其他有用的信息。这些端点可以帮助你监控应用程序的运行状态、性能指标和健康状况。
已经有了其他的监控和管理工具,不需要使用Spring Boot Actuator提供的功能。
<!-- redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</exclusion>
</exclusions>
</dependency>
1.1 延时队列工具类
添加延迟队列时使用,监测扫描时也会用这个工具类进行获取消息
package cn.creatoo.common.redis.queue;
import cn.creatoo.common.core.utils.StringUtils;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* 分布式延时队列工具类
* @author
*/
@Component
@ConditionalOnBean({RedissonClient.class})
public class RedisDelayQueueUtil {
private static final Logger log = LoggerFactory.getLogger(RedisDelayQueueUtil.class);
@Resource
private RedissonClient redissonClient;
/**
* 添加延迟队列
*
* @param value 队列值
* @param delay 延迟时间
* @param timeUnit 时间单位
* @param queueCode 队列键
* @param <T>
*/
public <T> boolean addDelayQueue(@NonNull T value, @NonNull long delay, @NonNull TimeUnit timeUnit, @NonNull String queueCode) {
if (StringUtils.isBlank(queueCode) || Objects.isNull(value)) {
return false;
}
try {
RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueCode);
RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
delayedQueue.offer(value, delay, timeUnit);
//delayedQueue.destroy();
log.info("(添加延时队列成功) 队列键:{},队列值:{},延迟时间:{}", queueCode, value, timeUnit.toSeconds(delay) + "秒");
} catch (Exception e) {
log.error("(添加延时队列失败) {}", e.getMessage());
throw new RuntimeException("(添加延时队列失败)");
}
return true;
}
/**
* 获取延迟队列
*
* @param queueCode
* @param <T>
*/
public <T> T getDelayQueue(@NonNull String queueCode) throws InterruptedException {
if (StringUtils.isBlank(queueCode)) {
return null;
}
RBlockingDeque<Map> blockingDeque = redissonClient.getBlockingDeque(queueCode);
RDelayedQueue<Map> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
T value = (T) blockingDeque.poll();
return value;
}
/**
* 删除指定队列中的消息
*
* @param o 指定删除的消息对象队列值(同队列需保证唯一性)
* @param queueCode 指定队列键
*/
public boolean removeDelayedQueue(@NonNull Object o, @NonNull String queueCode) {
if (StringUtils.isBlank(queueCode) || Objects.isNull(o)) {
return false;
}
RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueCode);
RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
boolean flag = delayedQueue.remove(o);
//delayedQueue.destroy();
return flag;
}
}
1.2 延迟队列执行器
package cn.creatoo.system.handler;
/**
* 延迟队列执行器
*/
public interface RedisDelayQueueHandle<T> {
void execute(T t);
}
1.3 实现队列执行器
实现队列执行器接口,在这里写延迟要做的业务逻辑
package cn.creatoo.system.handler.impl;
import cn.creatoo.common.core.domain.vo.WaterVo;
import cn.creatoo.system.api.RemoteFileService;
import cn.creatoo.system.handler.RedisDelayQueueHandle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component("exposeLinkCloudDelay")
public class ExposeLinkCloudDelay implements RedisDelayQueueHandle<Map> {
@Autowired
private RemoteFileService remoteFileService;
@Override
public void execute(Map map) {
long dataId = Long.parseLong(map.get("dataId").toString());
WaterVo waterVo = new WaterVo();
waterVo.setFileLink(map.get("fileLink").toString());
waterVo.setType(Integer.parseInt(map.get("type").toString()));
waterVo.setDataId(dataId);
remoteFileService.waterLink(waterVo);
}
}
1.4 延迟队列业务枚举类
package cn.creatoo.common.core.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
/**
* 延迟队列业务枚举类
* @author shang tf
* @data 2024/3/21 14:52
*/
@Getter
@NoArgsConstructor
@AllArgsConstructor
public enum FileRedisDelayQueueEnum {
EXPOSE_LINK_DELAY("EXPOSE_LINK_DELAY","资源链接处理","exposeLinkDelay"),
EXPOSE_LINK_CLOUD_DELAY("EXPOSE_LINK_CLOUD_DELAY","资源链接处理","exposeLinkCloudDelay"),
COMPRESSED_LINK_DELAY("COMPRESSED_LINK_DELAY","文件压缩处理","compressedLinkDelay"),
UPLOAD_TO_CLOUD_DELAY("UPLOAD_TO_CLOUD_DELAY","资源上传消费端","uploadToCloudDelay"),
GET_HASHCODE_DELAY("GET_HASHCODE_DELAY","资源hash值获取","getHashcodeDelay"),
UPLOAD_FILE_TO_CABINET("UPLOAD_FILE_CABINET","异步添加文件到数据柜","uploadFileCabinet");
/**
* 延迟队列 Redis Key
*/
private String code;
/**
* 中文描述
*/
private String name;
/**
* 延迟队列具体业务实现的 Bean
* 可通过 Spring 的上下文获取
*/
private String beanId;
}
1.5 启动延迟队列监测扫描
package cn.creatoo.system.handler.impl;
import cn.creatoo.common.core.enums.FileRedisDelayQueueEnum;
import cn.creatoo.common.redis.queue.RedisDelayQueueUtil;
import cn.creatoo.system.handler.RedisDelayQueueHandle;
import com.alibaba.fastjson2.JSON;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author shang tf
* @data 2024/3/14 10:45
* 启动延迟队列监测扫描
* 文件处理的延迟队列线程池
*/
@Slf4j
@Component
public class FileRedisDelayQueueRunner implements CommandLineRunner {
@Autowired
private RedisDelayQueueUtil redisDelayQueueUtil;
@Autowired
private ApplicationContext context;
@Autowired
private ThreadPoolTaskExecutor ptask;
@Value("${file-thread-pool.core-pool-size:1}")
private int corePoolSize;
@Value("${file-thread-pool.maximum-pool-size:1}")
private int maximumPoolSize;
private ThreadPoolExecutor executorService;
/**
* 程序加载配置文件后,延迟创建线程池
*/
@PostConstruct
public void init() {
executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1000), new ThreadFactoryBuilder().setNameFormat("delay-queue-%d").build());
}
@Override
public void run(String... args) {
ptask.execute(() -> {
while (true) {
try {
FileRedisDelayQueueEnum[] queueEnums = FileRedisDelayQueueEnum.values();
for (FileRedisDelayQueueEnum queueEnum : queueEnums) {
Object value = redisDelayQueueUtil.getDelayQueue(queueEnum.getCode());
if (value != null) {
System.out.println("----------------value:" + JSON.toJSONString(value));
RedisDelayQueueHandle<Object> redisDelayQueueHandle = (RedisDelayQueueHandle<Object>) context.getBean(queueEnum.getBeanId());
executorService.execute(() -> {
redisDelayQueueHandle.execute(value);
});
}
}
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
log.error("(FileRedission延迟队列监测异常中断) {}", e.getMessage());
}
}
});
log.info("(FileRedission延迟队列监测启动成功)");
}
}
1.6 使用延迟队列
使用时在需要延时的地方。
通过注入RedisDelayQueueUtil
,使用addDelayQueue
方法进行添加延迟任务。
Map<String, String> map = new HashMap<>();
map.put("dataId", examineVo.getId().toString());
map.put("fileLink", resourceLink);
map.put("type", resourceType.toString());
map.put("remark", "资源链接处理");
// 5秒后执行exposeLinkCloudDelay中的方法
redisDelayQueueUtil.addDelayQueue(map, 5, TimeUnit.SECONDS, FileRedisDelayQueueEnum.EXPOSE_LINK_CLOUD_DELAY.getCode());
2. 使用redisson完成不延时队列的功能
2.1 分布式队列工具类
package cn.creatoo.common.redis.queue;
import cn.creatoo.common.core.utils.StringUtils;
import org.redisson.api.RBoundedBlockingQueue;
import org.redisson.api.RQueue;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Objects;
/**
* 分布式队列工具类
*/
@Component
@ConditionalOnBean({RedissonClient.class})
public class RedisBlockQueueUtil {
private static final Logger log = LoggerFactory.getLogger(RedisBlockQueueUtil.class);
@Resource
private RedissonClient redissonClient;
//
public <T> boolean addQueue(@NonNull T value, @NonNull String queueCode) {
if (StringUtils.isBlank(queueCode) || Objects.isNull(value)) {
return false;
}
try {
RBoundedBlockingQueue<T> queue = redissonClient.getBoundedBlockingQueue(queueCode);
queue.trySetCapacity(10000);
queue.put(value);
} catch (Exception e) {
throw new RuntimeException("(添加redisson队列失败)");
}
return true;
}
/**
* 获取队列
* @param queueCode
* @param <T>
*/
public <T> T getQueuePeek(@NonNull String queueCode) throws InterruptedException {
if (StringUtils.isBlank(queueCode)) {
return null;
}
RQueue<T> queue = redissonClient.getBoundedBlockingQueue(queueCode);
T obj = (T) queue.peek();
return obj;
}
public <T> T getQueueTake(@NonNull String queueCode) throws InterruptedException {
if (StringUtils.isBlank(queueCode)) {
return null;
}
RBoundedBlockingQueue<T> queue = redissonClient.getBoundedBlockingQueue(queueCode);
T obj = (T) queue.take();
return obj;
}
}
2.2 队列业务枚举
package cn.creatoo.common.core.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
/**
* 队列业务枚举
*/
@Getter
@NoArgsConstructor
@AllArgsConstructor
public enum RedisQueueEnum {
FLOW_RECORD("redissionQueue:FLOW_RECORD", "流量流水"),
USER_LOGIN_RECORD("redissionQueue:USER_LOGIN_RECORD", "用户登录流水"),
USER_REGISTER_RECORD("redissionQueue:USER_REGISTER_RECORD", "用户注册流水"),
SMS_SEND_RECORD("redissionQueue:SMS_SEND_RECORD", "短信流水");
/**
* 队列 Redis Key
*/
private String code;
/**
* 中文描述
*/
private String name;
}
2.3 启动队列监测扫描
package cn.creatoo.system.handler.impl;
import cn.creatoo.common.core.enums.RedisQueueEnum;
import cn.creatoo.common.core.utils.StringUtils;
import cn.creatoo.common.mongodb.model.FlowStatistics;
import cn.creatoo.common.mongodb.model.MessageSendRecord;
import cn.creatoo.common.mongodb.model.UserLogin;
import cn.creatoo.common.mongodb.model.UserRegister;
import cn.creatoo.common.redis.queue.RedisBlockQueueUtil;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* description: 启动队列监测扫描
*/
@Slf4j
@Component
public class RedisQueueRunner implements CommandLineRunner {
@Autowired
private RedisBlockQueueUtil redisBlockQueueUtil;
//@Autowired
//private IBdStatcountService bdStatcountService;
@Autowired
private ThreadPoolTaskExecutor ptask;
@Resource
private MongoTemplate mongoTemplate;
//@Autowired
//private BdAdminHomeService bdAdminHomeService;
@Value("${prodHost.mall}")
private String mallHost;
ThreadPoolExecutor executorService = new ThreadPoolExecutor(4, 8, 30, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1000),new ThreadFactoryBuilder().setNameFormat("queue-%d").build());
@Override
public void run(String... args) throws Exception {
ptask.execute(() -> {
while (true){
try {
RedisQueueEnum[] queueEnums = RedisQueueEnum.values();
for (RedisQueueEnum queueEnum : queueEnums) {
Object value = redisBlockQueueUtil.getQueuePeek(queueEnum.getCode());
if (value != null) {
executorService.execute(() -> {
try {
//System.out.println(value.toString());
if(queueEnum.getCode().equals(RedisQueueEnum.FLOW_RECORD.getCode())){
FlowStatistics flowStatistics = redisBlockQueueUtil.getQueueTake(queueEnum.getCode());
/* if(flowStatistics!=null && StringUtils.isNotBlank(flowStatistics.getUrl())){
mongoTemplate.insert(flowStatistics, "pv_" + new SimpleDateFormat("yyyy").format(new Date()));
// 添加首页统计缓存
bdAdminHomeService.addDetailCache(flowStatistics);
if(StringUtils.isNotBlank(flowStatistics.getUrl())){
bdStatcountService.browseByUrl(flowStatistics.getUrl());
}
}*/
} else if (queueEnum.getCode().equals(RedisQueueEnum.USER_LOGIN_RECORD.getCode())) {
UserLogin userLogin = redisBlockQueueUtil.getQueueTake(queueEnum.getCode());
mongoTemplate.insert(userLogin, "user_login_" + new SimpleDateFormat("yyyy").format(new Date()));
} else if (queueEnum.getCode().equals(RedisQueueEnum.USER_REGISTER_RECORD.getCode())) {
UserRegister userRegister = redisBlockQueueUtil.getQueueTake(queueEnum.getCode());
mongoTemplate.insert(userRegister, "user_register");
} else if (queueEnum.getCode().equals(RedisQueueEnum.SMS_SEND_RECORD.getCode())) {
MessageSendRecord sendRecord = redisBlockQueueUtil.getQueueTake(queueEnum.getCode());
mongoTemplate.insert(sendRecord, "sms_send_" + new SimpleDateFormat("yyyy").format(new Date()));
}
} catch (InterruptedException e) {
log.error("(Redission队列监测异常中断) {}", e.getMessage());
}
});
}
}
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
log.error("(Redission队列监测异常中断) {}", e.getMessage());
}
}
});
log.info("(Redission队列监测启动成功)");
}
}
2.4 使用
这个是直接执行,没有延迟的功能
redisBlockQueueUtil.addQueue(userRegister, RedisQueueEnum.USER_REGISTER_RECORD.getCode());
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
已为社区贡献5条内容
所有评论(0)