java redis实现消息队列功能
需求:业务中需要批量处理任务,且需要每个任务间隔一段时间。最好在不同服务器同时运行不影响每个任务间隔。部署环境:没有mq队列,有redis。
·
java redis实现消息队列功能
背景:
需求: 业务中需要批量处理任务,且需要每个任务间隔一段时间。最好在不同服务器同时运行不影响每个任务间隔。
部署环境: 没有mq队列,有redis。
秉着尽量不多增加系统复杂度的情况,使用redis来实现队列功能。
首先看一下代码:
1.1.核心代码
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* @program: wys-service
* @description:
* @author: wuyuanshn
* @create: 2023-06-07 14:20
**/
@Slf4j
@Service
public class ListRedisQueue {
@Resource
private RedisServiceUtils redisServiceUtils;
//队列名 OBJECT可以替换为自己的项目名
public static final String LOCK_KEY = "OBJECT_LIST_QUEUE_%s";
@Resource
private RedisTemplate redisTemplate;
public void produce(String key, String message) {
redisTemplate.opsForList().rightPush(key, message);
}
public void produce(String key, Object... vs) {
redisTemplate.opsForList().rightPush(key, vs);
}
public void produce(String key, List<String> messageList) {
redisTemplate.opsForList().rightPushAll(key, messageList);
}
public void consume(String key) {
String lockKey = String.format(LOCK_KEY, key);
try {
boolean lock = redisServiceUtils.getLock(lockKey, 1800);
if (!lock) {
log.info("ListRedisQueue consume lock ");
return;
}
while (true) {
String msg = (String) redisTemplate.opsForList().leftPop(key);
if (StringUtils.isBlank(msg)) {
log.info("ListRedisQueue consume 获取 end");
//执行队列结束触发任务
executeEndByKey(key);
return;
}
log.info("ListRedisQueue consume 获取消息:{}", msg);
//执行队列任务
executeByKey(key, msg);
//更新key过期时间
redisTemplate.expire(lockKey, 15, TimeUnit.MINUTES);
try {
Random random = new Random();
int i = random.nextInt(2000) + 1000;
log.info("ListRedisQueue consume sleep:{}", i);
Thread.sleep(i);
} catch (Exception e) {
log.error("ListRedisQueue consume sleep error", e);
}
}
} catch (Exception e) {
log.error("ListRedisQueue consume error", e);
} finally {
redisServiceUtils.releaseLock(lockKey);
}
}
/**
* 执行队列任务
*
* @param key
* @param value
*/
public void executeByKey(String key, String value) {
ListQueueNameEnum structureEnum = ListQueueNameEnum.getStructureEnum(key);
if (structureEnum == null) {
return;
}
switch (structureEnum) {
case URL_BASE:
if (StringUtils.isNotBlank(value)) {
try {
//业务
} catch (Exception e) {
log.error("ListRedisQueue executeByKey ERROR ", e);
}
}
return;
default:
}
}
/**
* 执行队列结束触发任务
*
* @param key
*/
public void executeEndByKey(String key) {
ListQueueNameEnum structureEnum = ListQueueNameEnum.getStructureEnum(key);
if (structureEnum == null) {
return;
}
switch (structureEnum) {
case URL_BASE:
// 结束业务
return;
default:
}
}
}
代码讲解:
- 每个任务(相当于mq消息)接收到后需要如何处理,就在此方法中编辑 executeByKey 。
- 任务(mq消息)处理完毕时,如果需要做相关逻辑操作,如通知某程序等,在executeEndByKey方法编写。
- 睡眠时间 random.nextInt(2000) + 1000; 可以根据自己的业务时间情况进行修改。
- redis锁 代码没有粘贴,使用自己项目中的锁即可。如果项目中没有 网上也有很多。 RedisServiceUtils
1.2.可以根据不同业务类型创建不同枚举(相当于mq名称)
import org.apache.commons.lang.StringUtils;
/**
* @program: wys-service
* @description: redis队列名称列表
* @author: wuyuanshn
* @create: 2023-06-07 17:15
**/
public enum ListQueueNameEnum {
TX_URL_BASE("URL_BASE", "信息"),
;
private String code;
private String value;
ListQueueNameEnum(String code, String value) {
this.code = code;
this.value = value;
}
public String getValue() {
return value;
}
public String getCode() {
return code;
}
public void setCode(String code) {
this.code = code;
}
public void setValue(String value) {
this.value = value;
}
/**
* 根据统计编号获取对应枚举实例
*
* @param value
* @return
*/
public static ListQueueNameEnum getStructureEnum(String value) {
if (StringUtils.isBlank(value)) {
return null;
}
for (ListQueueNameEnum result : ListQueueNameEnum.values()) {
if (result.name().equals(value)) {
return result;
}
}
return null;
}
}
1.3.测试接口
测试只需要调用consume接口 即可。
@Resource
private ListRedisQueue listRedisQueue;
@PostMapping(value = "/produce")
public void produce() {
for (int i = 0; i < 1; i++) {
listRedisQueue.produce(ListQueueNameEnum.URL_BASE.getCode(), "xxxx", "1233", "345566");
}
}
/** 测试只需要调用consume 此方法即可。
*/
@PostMapping(value = "/consume")
public void consume() {
produce();
log.info("生产消息完毕");
listRedisQueue.consume(ListQueueNameEnum.URL_BASE.getCode());
}
总结
是使用redis代替mq功能,终究还是比较复杂,虽然实现了多项目间公用一个队列的需求。但是并不完美。如果可以还是尽量用合适的服务如mq等来实现业务。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
已为社区贡献3条内容
所有评论(0)