背景:

需求: 业务中需要批量处理任务,且需要每个任务间隔一段时间。最好在不同服务器同时运行不影响每个任务间隔。
部署环境: 没有mq队列,有redis。
秉着尽量不多增加系统复杂度的情况,使用redis来实现队列功能。

首先看一下代码:

1.1.核心代码


import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * @program: wys-service
 * @description:
 * @author: wuyuanshn
 * @create: 2023-06-07 14:20
 **/
@Slf4j
@Service
public class ListRedisQueue {
    @Resource
    private RedisServiceUtils redisServiceUtils;


    //队列名 OBJECT可以替换为自己的项目名
    public static final String LOCK_KEY = "OBJECT_LIST_QUEUE_%s";

    @Resource
    private RedisTemplate redisTemplate;

    public void produce(String key, String message) {
        redisTemplate.opsForList().rightPush(key, message);

    }

    public void produce(String key, Object... vs) {
        redisTemplate.opsForList().rightPush(key, vs);
    }

    public void produce(String key, List<String> messageList) {
        redisTemplate.opsForList().rightPushAll(key, messageList);
    }

    public void consume(String key) {
        String lockKey = String.format(LOCK_KEY, key);
        try {
            boolean lock = redisServiceUtils.getLock(lockKey, 1800);
            if (!lock) {
                log.info("ListRedisQueue consume lock ");
                return;
            }

            while (true) {
                String msg = (String) redisTemplate.opsForList().leftPop(key);
                if (StringUtils.isBlank(msg)) {
                    log.info("ListRedisQueue consume 获取 end");
                    //执行队列结束触发任务
                    executeEndByKey(key);
                    return;
                }
                log.info("ListRedisQueue consume 获取消息:{}", msg);
                //执行队列任务
                executeByKey(key, msg);

                //更新key过期时间
                redisTemplate.expire(lockKey, 15, TimeUnit.MINUTES);

                try {
                    Random random = new Random();
                    int i = random.nextInt(2000) + 1000;
                    log.info("ListRedisQueue consume sleep:{}", i);
                    Thread.sleep(i);
                } catch (Exception e) {
                    log.error("ListRedisQueue consume sleep error", e);
                }
            }
        } catch (Exception e) {
            log.error("ListRedisQueue consume error", e);
        } finally {
            redisServiceUtils.releaseLock(lockKey);
        }

    }

    /**
     * 执行队列任务
     *
     * @param key
     * @param value
     */
    public void executeByKey(String key, String value) {
        ListQueueNameEnum structureEnum = ListQueueNameEnum.getStructureEnum(key);
        if (structureEnum == null) {
            return;
        }
        switch (structureEnum) {
            case URL_BASE:
                if (StringUtils.isNotBlank(value)) {
                    try {
                        //业务
                    } catch (Exception e) {
                        log.error("ListRedisQueue executeByKey ERROR ", e);
                    }
                }
                return;
            default:
        }
    }

    /**
     * 执行队列结束触发任务
     *
     * @param key
     */
    public void executeEndByKey(String key) {
        ListQueueNameEnum structureEnum = ListQueueNameEnum.getStructureEnum(key);
        if (structureEnum == null) {
            return;
        }
        switch (structureEnum) {
            case URL_BASE:
				// 结束业务
                return;
            default:

        }


    }


}



代码讲解:

  • 每个任务(相当于mq消息)接收到后需要如何处理,就在此方法中编辑 executeByKey 。
  • 任务(mq消息)处理完毕时,如果需要做相关逻辑操作,如通知某程序等,在executeEndByKey方法编写。
  • 睡眠时间 random.nextInt(2000) + 1000; 可以根据自己的业务时间情况进行修改。
  • redis锁 代码没有粘贴,使用自己项目中的锁即可。如果项目中没有 网上也有很多。 RedisServiceUtils

1.2.可以根据不同业务类型创建不同枚举(相当于mq名称)



import org.apache.commons.lang.StringUtils;

/**
 * @program: wys-service
 * @description: redis队列名称列表
 * @author: wuyuanshn
 * @create: 2023-06-07 17:15
 **/
public enum ListQueueNameEnum {
    TX_URL_BASE("URL_BASE", "信息"),
    ;

    private String code;
    private String value;

    ListQueueNameEnum(String code, String value) {
        this.code = code;
        this.value = value;
    }

    public String getValue() {
        return value;
    }

    public String getCode() {
        return code;
    }

    public void setCode(String code) {
        this.code = code;
    }

    public void setValue(String value) {
        this.value = value;
    }

    /**
     * 根据统计编号获取对应枚举实例
     *
     * @param value
     * @return
     */
    public static ListQueueNameEnum getStructureEnum(String value) {
        if (StringUtils.isBlank(value)) {
            return null;
        }
        for (ListQueueNameEnum result : ListQueueNameEnum.values()) {
            if (result.name().equals(value)) {
                return result;
            }
        }
        return null;
    }

}


1.3.测试接口

测试只需要调用consume接口 即可。


    @Resource
    private ListRedisQueue listRedisQueue;

 @PostMapping(value = "/produce")
    public void produce() {
        for (int i = 0; i < 1; i++) {
            listRedisQueue.produce(ListQueueNameEnum.URL_BASE.getCode(), "xxxx", "1233", "345566");
        }
    }

/** 测试只需要调用consume 此方法即可。
*/
    @PostMapping(value = "/consume")
    public void consume() {
      produce(); 
      log.info("生产消息完毕");
        listRedisQueue.consume(ListQueueNameEnum.URL_BASE.getCode());
    }

总结

是使用redis代替mq功能,终究还是比较复杂,虽然实现了多项目间公用一个队列的需求。但是并不完美。如果可以还是尽量用合适的服务如mq等来实现业务。

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐