一个简单的(基于redisson的)分布式同步工具类封装

背景说明

有些分布式同步逻辑不需要作用于整个方法,只需要作用于指定的业务逻辑代码块即可,类似于synchronized代码块。于是有了下面这个简单的封装类。

准备工作

提示:此同步工具类中的redis分布式锁直接采用redisson实现。

第一步:引入redisson依赖

<!-- Redisson client library; provides the org.redisson RedissonClient / RLock types imported by the classes in this article -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.17.0</version>
</dependency>

第二步:配置RedissonClient

提示:这里的配置以单机(单节点)redis为例,集群、哨兵等其它部署模式的配置方式详见redisson官方文档

import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that exposes a {@link RedissonClient} bean connected to a
 * single Redis server.
 * <p>
 * The server location is read from the {@code spring.redis.host} and
 * {@code spring.redis.port} properties.
 * </p>
 *
 * @author JustryDeng
 * @since 2022/3/25 10:33
 */
@Slf4j
@Configuration
public class RedissonConfig {

    /** Redis host, injected from {@code spring.redis.host}. */
    @Value("${spring.redis.host}")
    private String host;

    /** Redis port, injected from {@code spring.redis.port}. */
    @Value("${spring.redis.port}")
    private String port;

    /**
     * Builds the Redisson client in single-server mode.
     *
     * @return a client pointing at {@code redis://<host>:<port>}
     */
    @Bean
    public RedissonClient redissonClient() {
        final String endpoint = host + ":" + port;
        log.info("redis address -> {}", endpoint);

        final Config redissonConfig = new Config();
        redissonConfig.useSingleServer().setAddress("redis://" + endpoint);
        return Redisson.create(redissonConfig);
    }
}

工具类

工具类接口

import lombok.Getter;

import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Support for running a block of business logic under a redis lock.
 * <p>
 * Each method takes the business logic as a functional object, so that only that
 * block (rather than a whole method) is synchronized.
 * </p>
 *
 * @author JustryDeng
 * @since 2022/4/19 9:36
 */
public interface RedisLockSupport {
    
    /**
     * Executes synchronized logic.
     * <p>
     * The logic should be guaranteed globally synchronized by a redis lock.
     * </p>
     *
     * @param function
     *            the business logic block
     * @param param
     *            the argument passed to {@code function}
     *
     * @throws NotAcquiredRedisLockException if the redis lock could not be acquired
     * @return the result produced by the business logic
     */
    <P, R> R exec(Function<P, R> function, P param) throws NotAcquiredRedisLockException;
    
    /**
     * Executes synchronized logic.
     * <p>
     * The logic should be guaranteed globally synchronized by a redis lock.
     * </p>
     *
     * @param function
     *            the business logic block
     * @return the result produced by the business logic
     *
     * @throws NotAcquiredRedisLockException if the redis lock could not be acquired
     */
    <R> R exec(NoArgFunction<R> function) throws NotAcquiredRedisLockException;
    
    /**
     * Executes synchronized logic.
     * <p>
     * The logic should be guaranteed globally synchronized by a redis lock.
     * </p>
     *
     * @param consumer
     *            the business logic block
     * @param param
     *            the argument passed to {@code consumer}
     *
     * @throws NotAcquiredRedisLockException if the redis lock could not be acquired
     */
    <P> void voidExec(Consumer<P> consumer, P param) throws NotAcquiredRedisLockException;
    
    /**
     * Executes synchronized logic.
     * <p>
     * The logic should be guaranteed globally synchronized by a redis lock.
     * </p>
     *
     * @param consumer
     *            the business logic block
     * @throws NotAcquiredRedisLockException if the redis lock could not be acquired
     */
    void voidExec(NoArgConsumer consumer) throws NotAcquiredRedisLockException;
    
    /**
     * Signals that the redis lock could not be acquired.
     * <p>
     * Unchecked; carries the lock key and the wait settings, which are also
     * formatted into the exception message.
     * </p>
     *
     * @author JustryDeng
     * @since 2022/4/19 10:44
     */
    @Getter
    class NotAcquiredRedisLockException extends RuntimeException{
    
        /** Lock key. */
        private final String lockKey;
    
        /** Maximum time to wait for the lock to be acquired. */
        private final long waitTime;
    
        /** Time unit of {@code waitTime}. */
        private final TimeUnit timeUnit;
    
        /**
         * Creates the exception with a message of the form
         * {@code lockKey=..., waitTime=..., timeUnit=...}.
         *
         * @param lockKey
         *            lock key
         * @param waitTime
         *            maximum time to wait for the lock to be acquired
         * @param timeUnit
         *            time unit of {@code waitTime}
         */
        public NotAcquiredRedisLockException(String lockKey, long waitTime, TimeUnit timeUnit) {
            super(String.format("lockKey=%s, waitTime=%d, timeUnit=%s", lockKey, waitTime, timeUnit));
            this.lockKey = lockKey;
            this.waitTime = waitTime;
            this.timeUnit = timeUnit;
        }
    }
    
}

工具类接口的默认实现

import lombok.Getter;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;

import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * redis分布式锁默认实现
 * <p>
 *     使用示例见{@link com.ideaaedi.heywebuy.srv.RedisLockSupportTest}
 * </p>
 *
 * @author JustryDeng
 * @since 2022/4/19 10:08
 */
@Getter
public class DefaultRedisLockSupport implements RedisLockSupport {
    
    /** 默认的redisson客户端 */
    private static volatile RedissonClient defaultRedissonClient;
    
    /** redisson客户端(优先级高于defaultRedissonClient,当redissonClient不为null时,使用redissonClient) */
    protected RedissonClient redissonClient;
    
    /** 锁 key */
    protected final String lockKey;
    
    /** 等待获取锁的最大时长 */
    protected long waitTime = 1L;
    
    /** 释放锁的最大时长 */
    protected long leaseTime = 3L;
    
    /** WaitTime和LeaseTime的时间单位 */
    protected TimeUnit unit = TimeUnit.SECONDS;
    
    public DefaultRedisLockSupport(String lockKey) {
        this.lockKey = lockKey;
    }
    
    public DefaultRedisLockSupport(RedissonClient redissonClient, String lockKey) {
        this.redissonClient = redissonClient;
        this.lockKey = lockKey;
    }
    
    public DefaultRedisLockSupport(String lockKey, long waitTime, long leaseTime) {
        this.lockKey = lockKey;
        this.waitTime = waitTime;
        this.leaseTime = leaseTime;
    }
    
    public DefaultRedisLockSupport(RedissonClient redissonClient, String lockKey, long waitTime, long leaseTime) {
        this.redissonClient = redissonClient;
        this.lockKey = lockKey;
        this.waitTime = waitTime;
        this.leaseTime = leaseTime;
    }
    
    public DefaultRedisLockSupport(String lockKey, long waitTime, long leaseTime, TimeUnit unit) {
        this.lockKey = lockKey;
        this.waitTime = waitTime;
        this.leaseTime = leaseTime;
        this.unit = unit;
    }
    
    public DefaultRedisLockSupport(RedissonClient redissonClient, String lockKey, long waitTime, long leaseTime, TimeUnit unit) {
        this.redissonClient = redissonClient;
        this.lockKey = lockKey;
        this.waitTime = waitTime;
        this.leaseTime = leaseTime;
        this.unit = unit;
    }
    
    @Override
    public <P, R> R exec(Function<P, R> function, P param) throws NotAcquiredRedisLockException {
        RedissonClient client = redissonClient();
        RLock lock = client.getLock(lockKey);
        boolean obtainLock = false;
        try {
            obtainLock = lock.tryLock(waitTime, leaseTime, unit);
        } catch (InterruptedException e) {
            // ignore
        }
        if (obtainLock) {
            try {
                return function.apply(param);
            } finally {
                if (lock.isHeldByCurrentThread()) {
                    lock.unlock();
                }
            }
        }
        throw new NotAcquiredRedisLockException(lockKey, waitTime, unit);
    }
    
    @Override
    public <R> R exec(NoArgFunction<R> function) throws NotAcquiredRedisLockException {
        RedissonClient client = redissonClient();
        RLock lock = client.getLock(lockKey);
        boolean obtainLock = false;
        try {
            obtainLock = lock.tryLock(waitTime, leaseTime, unit);
        } catch (InterruptedException e) {
            // ignore
        }
        if (obtainLock) {
            try {
                return function.apply();
            } finally {
                if (lock.isHeldByCurrentThread()) {
                    lock.unlock();
                }
            }
        }
        throw new NotAcquiredRedisLockException(lockKey, waitTime, unit);
    }
    
    @Override
    public <P> void voidExec(Consumer<P> consumer, P param) throws NotAcquiredRedisLockException {
        RedissonClient client = redissonClient();
        RLock lock = client.getLock(lockKey);
        boolean obtainLock = false;
        try {
            obtainLock = lock.tryLock(waitTime, leaseTime, unit);
        } catch (InterruptedException e) {
            // ignore
        }
        if (obtainLock) {
            try {
                consumer.accept(param);
                return;
            } finally {
                if (lock.isHeldByCurrentThread()) {
                    lock.unlock();
                }
            }
        }
        throw new NotAcquiredRedisLockException(lockKey, waitTime, unit);
    }
    
    @Override
    public void voidExec(NoArgConsumer consumer) throws NotAcquiredRedisLockException {
        RedissonClient client = redissonClient();
        RLock lock = client.getLock(lockKey);
        boolean obtainLock = false;
        try {
            obtainLock = lock.tryLock(waitTime, leaseTime, unit);
        } catch (InterruptedException e) {
            // ignore
        }
        if (obtainLock) {
            try {
                consumer.accept();
                return;
            } finally {
                if (lock.isHeldByCurrentThread()) {
                    lock.unlock();
                }
            }
        }
        throw new NotAcquiredRedisLockException(lockKey, waitTime, unit);
    }
    
    /**
     * 获取RedissonClient实例
     *
     * @return  RedissonClient实例
     */
    protected RedissonClient redissonClient() {
        if (this.redissonClient != null) {
            return this.redissonClient;
        }
        if (DefaultRedisLockSupport.defaultRedissonClient != null) {
            return DefaultRedisLockSupport.defaultRedissonClient;
        }
        throw new IllegalStateException("There is not redissonClient available.");
    }
    
    /**
     * 初始化默认的Redisson客户端
     *
     * @param redissonClient
     *            Redisson客户端实例
     */
    public static void initDefaultRedissonClient(RedissonClient redissonClient) {
        if (DefaultRedisLockSupport.defaultRedissonClient != null && !DefaultRedisLockSupport.defaultRedissonClient.equals(redissonClient)) {
            throw new IllegalStateException("defaultRedissonClient already been initialized.");
        }
        synchronized (DefaultRedisLockSupport.class) {
            if (DefaultRedisLockSupport.defaultRedissonClient != null) {
                if (DefaultRedisLockSupport.defaultRedissonClient.equals(redissonClient)) {
                    return;
                }
                throw new IllegalStateException("defaultRedissonClient already been initialized.");
            }
            DefaultRedisLockSupport.defaultRedissonClient = redissonClient;
        }
    }
}

工具类接口涉及到的两个其它接口

  • NoArgConsumer

    /**
     * A Consumer that takes no argument and returns nothing.
     *
     * @author JustryDeng
     * @since 2022/4/19 11:17
     */
    @FunctionalInterface
    public interface NoArgConsumer {
        
        /**
         * Executes the logic.
         */
        void accept();
    }
    
  • NoArgFunction

    /**
     * A Function that takes no argument and produces a result.
     *
     * @param <R> type of the result
     *
     * @author JustryDeng
     * @since 2022/4/19 11:17
     */
    @FunctionalInterface
    public interface NoArgFunction<R> {
        
        /**
         * Executes the logic.
         *
         * @return the execution result
         */
        R apply();
    }
    

使用示例

import com.ideaaedi.heywebuy.srv.config.redisson.DefaultRedisLockSupport;
import com.ideaaedi.heywebuy.srv.config.redisson.NoArgConsumer;
import com.ideaaedi.heywebuy.srv.config.redisson.NoArgFunction;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.redisson.api.RedissonClient;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;
import java.util.Collections;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Usage example for {@link DefaultRedisLockSupport}: runs each of the four
 * exec / voidExec variants once under the same lock key and prints the outcome.
 */
@Slf4j
@SpringBootTest
public class RedisLockSupportTest {

    /** Lock key shared by every call in this example. */
    private static final String LOCK_KEY = "DefaultRedisLockSupport1";

    @Resource
    private RedissonClient redissonClient;

    @BeforeEach
    void initMethod() {
        // In a real project, register the default client once at application startup.
        DefaultRedisLockSupport.initDefaultRedissonClient(redissonClient);
    }

    @Test
    void test() {
        // No argument, no return value: NoArgConsumer
        NoArgConsumer printOnly = () -> {
            // business logic goes here
            System.out.println("[无参数 无返回值 NoArgConsumer]:\t\t\t\t\t" + 111111);
        };
        newLockSupport().voidExec(printOnly);

        // One argument, no return value: Consumer
        Consumer<Integer> printArgument = value -> {
            // business logic goes here
            System.out.println("[有参数 无返回值 Consumer]:\t\t\t\t\t" + value);
        };
        newLockSupport().voidExec(printArgument, 2222);

        // No argument, with return value: NoArgFunction
        NoArgFunction<Map<String, Object>> constantMap = () -> Collections.singletonMap("k1", "v1");
        Map<String, Object> noArgResult = newLockSupport().exec(constantMap);
        System.out.println("[无参数 有返回值 Function]:\t\t\t\t\t" + noArgResult);

        // One argument, with return value: Function
        Function<String, Map<String, Object>> wrapValue = value -> Collections.singletonMap("k2", value);
        Map<String, Object> withArgResult = newLockSupport().exec(wrapValue, "v2");
        System.out.println("[有参数 有返回值 Function]:\t\t\t\t\t" + withArgResult);
    }

    /**
     * Creates a fresh lock support on {@link #LOCK_KEY} with wait time 1 and
     * lease time 120 (seconds, the class default unit), backed by the default client.
     */
    private static DefaultRedisLockSupport newLockSupport() {
        return new DefaultRedisLockSupport(LOCK_KEY, 1, 120);
    }
}

相关资料

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐