如果你还不知道redis的基本命令与基本使用方法,请看

写在前面

redis辣么多数据结构,这么多命令,具体一点,都可以应用在什么场景呢?用来解决什么具体的问题?

分布式锁

redis是网络单线程的,它只有一个线程负责接受请求,这个特性即降低了redis本身的开发成本,也提高了redis的可用性。

分布式环境下,数据一致性问题一直是一个比较重要的话题,分布式与单机情况下最大的不同在于其不是多线程而是多进程。

多线程由于可以共享堆内存,因此可以简单的采取内存作为标记存储位置,例如cas,java的synchronize。而进程之间可能不在同一台物理机上,因此需要将标记存储在一个所有进程都能看到的地方。

常见的场景,秒杀场景中的库存超卖问题、多机定时任务的并发执行问题等。

库存超卖问题

假如订单服务部署了多个实例。

现在做一个商品秒杀活动,商品一共只有2个,同时购买的用户则可能有几千上万。

理想状态下第一个和第二个用户能购买成功,其他用户提示购买失败,

实际可能出现的情况是,多个用户都同时查到商品还没卖完,第一个用户买到,更新库存之前,第二个用户又下了订单,导致出错。

下面用java代码做一个演示:

java实例都可以被正常运行在jdk1.8+,使用jedis连接redis实例

importredis.clients.jedis.Jedis;importredis.clients.jedis.JedisPool;importredis.clients.jedis.JedisPoolConfig;/*** JedisPool连接

*@authortaifeng zhang

**/

public classJedisPoolConnect {public staticJedisPool jedispool;/*** 连接并返回jedis实例

**/

public staticJedis connectJedis () {if (jedispool == null) {

JedisPoolConfig jedisPoolConfig= newJedisPoolConfig();

jedisPoolConfig.setMinIdle(1);

jedisPoolConfig.setMaxIdle(10);

jedisPoolConfig.setTestOnBorrow(true);

jedispool= new JedisPool(jedisPoolConfig, "127.0.0.1", 6379);

}returnjedispool.getResource();

}

}import redis.clients.jedis.*;importredis.clients.jedis.Jedis;/*** 一个简单的超卖演示程序

**/

public classMarketWrong {public static String GOODS_LEN_KEY = "jedis:market:demo";private final Integer DECR_THREAD_LEN = 16;public voidsuperMarket () {//开线程去减库存

int i =DECR_THREAD_LEN;while (i > 0) {new Thread(() ->{boolean hasGoods = true;while (hasGoods) { //当库存大于0的时候

int goodsLen =getGoodsLen();if (goodsLen > 0) {

decrGoodsLen();//一般进来之后就直接减去库存了

System.out.println("现在库存为" +getGoodsLen());try{

Thread.sleep(100); //模拟中间处理流程

} catch(Exception e) {

System.out.println("执行减库存错误" + e.getMessage() + e.getLocalizedMessage() +e.getStackTrace());

}finally{//最后逻辑

}

}else{

System.out.println("======卖完啦=======");

hasGoods= false;

}

}

}).start();

i--;

}

}public voidsetGoodsLen (Integer len) {

Jedis jedis=JedisPoolConnect.connectJedis();try{

jedis.set(GOODS_LEN_KEY, String.valueOf(len));

}finally{

jedis.close();

}

}privateInteger getGoodsLen () {

Jedis jedis=JedisPoolConnect.connectJedis();try{

String val=jedis.get(GOODS_LEN_KEY);if (val != null) {returnInteger.parseInt(val);

}

}finally{

jedis.close();

}return 0;

}private voiddecrGoodsLen () {

Jedis jedis=JedisPoolConnect.connectJedis();try{//库存减1

jedis.decr(GOODS_LEN_KEY);

}finally{

jedis.close();

}

}

}

用junit测试上面的代码:

importorg.junit.Test;importorg.springframework.boot.test.context.SpringBootTest;

@SpringBootTestpublic classMarketWrongTests {/*** 测试超卖小程序*/@Testpublic void superMarket () throwsException {

MarketWrong marketWrong= newMarketWrong();//这次就卖500件吧

marketWrong.setGoodsLen(500);

marketWrong.superMarket();

Thread.sleep(60000); //卖一分钟

}

}

运行输出,每次库存都会变为负数,开了16个线程同时买东西:

//省略了几万行

现在库存为8

现在库存为8

现在库存为4

现在库存为4

现在库存为4

现在库存为4

现在库存为3

现在库存为-5现在库存为-5现在库存为-5现在库存为-5现在库存为-5现在库存为-5现在库存为-5现在库存为-5

======卖完啦=======

======卖完啦=======

======卖完啦=======

上面的代码示例中,库存数据是共享资源(存到redis了,相当于数据库),面对高并发情形,需要保证对资源的访问次序。在单机环境Java提供基于内存的锁来处理并发问题,但是这些API在分布式场景中就无能为力了。也就是说单纯的内存锁并不能提供这种多机器并发服务的能力。分布式系统中,由于分布式系统的分布性,即多线程和多进程并且分布在不同机器中,synchronized和lock这两种锁将失去原有锁的效果,需要我们自己实现分布式锁。

也就是说库存的递减必须是顺序的

常见的锁方案如下:

基于数据库实现分布式锁 基于缓存,实现分布式锁,如redis 基于Zookeeper实现分布式锁

下面实现一个redis的锁,剖析一把redis是如何实现分布式锁的:

importredis.clients.jedis.Jedis;importredis.clients.jedis.JedisCluster;importredis.clients.jedis.JedisPool;importredis.clients.jedis.JedisPoolConfig;importredis.clients.jedis.params.SetParams;importjava.util.ArrayList;importjava.util.Arrays;/*** redis锁实现

*@authortaifeng zhang

**/

public classRedisLock {private static String REDIS_LOCK_KEY = "redis:lock:key";/***设置lockkey

**/

public static voidsetRedisLockKey(String redisLockKey) {

REDIS_LOCK_KEY=redisLockKey;

}/*** 尝试获取锁

*@paramov 可以指定一个锁标识,锁的唯一值,区分每个锁的所有者身份

*@paramtimeout 获取锁的超时时间

**/

public boolean tryLock (String ov, inttimeout) {

Jedis jedis=JedisPoolConnect.connectJedis();try{//set nx ex

SetParams setParams = newSetParams();

setParams.nx();

setParams.ex(timeout);

Object val= jedis.set(REDIS_LOCK_KEY, ov, setParams); //set [key] nx ex [timeout]

return val != null;

}finally{

jedis.close();

}

}/*** 使用lua脚本释放锁

*@paramov 释放之前先确定解锁人的身份,所以要用到lua的原子特性

**/

public booleantryUnlock (String ov) {

Jedis jedis=JedisPoolConnect.connectJedis();try{

String DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL= "if (redis.call('get', KEYS[1]) == ARGV[1]) then return redis.call('del', KEYS[1]) else return 0 end";

String sha1=jedis.scriptLoad(DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL);

String[] keys={REDIS_LOCK_KEY};

String[] args={ov};

Integer val= Integer.parseInt(jedis.evalsha(sha1,new ArrayList<>(Arrays.asList(keys)),new ArrayList<>(Arrays.asList(args))).toString());return val > 0;

}finally{

jedis.close();

}

}

}

实现原则有几点: 1、原子相关操作步骤必须全部包括在锁内 2、每个锁都有一个唯一的value,标识加锁人的身份。 3、加超时时间防止死锁 (超时时间要合理)

加锁代码解析

/*** 尝试获取锁

*@paramov 可以指定一个锁标识,锁的唯一值,区分每个锁的所有者身份

*@paramtimeout 获取锁的超时时间

**/

public boolean tryLock (String ov, inttimeout) {

Jedis jedis=JedisPoolConnect.connectJedis();try{//set nx ex

SetParams setParams = newSetParams();

setParams.nx();

setParams.ex(timeout);

Object val= jedis.set(REDIS_LOCK_KEY, ov, setParams); //用 set [key] nx ex [timeout] 命令模拟加锁

return val != null;

}finally{

jedis.close();

}

}

加锁的代码很简单,其实就是利用redis命令 set [key] nx ex [timeout] 的特性,已有值的时候返回值为nil,如果执行这个命令的结果是null,那就可以认为资源已经被上锁

同时,set也将REDIS_LOCK_KEY设置为一个唯一值,在解锁的时候或者锁重入的时候判断身份使用。

解锁代码解析

/*** 使用lua脚本释放锁

*@paramov 释放之前先确定解锁人的身份,所以要用到lua的原子特性

**/

public booleantryUnlock (String ov) {

Jedis jedis=JedisPoolConnect.connectJedis();try{

String DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL= "if (redis.call('get', KEYS[1]) == ARGV[1]) then return redis.call('del', KEYS[1]) else return 0 end";

String sha1=jedis.scriptLoad(DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL);

String[] keys={REDIS_LOCK_KEY};

String[] args={ov};

Integer val= Integer.parseInt(jedis.evalsha(sha1,new ArrayList<>(Arrays.asList(keys)),new ArrayList<>(Arrays.asList(args))).toString());return val > 0;

}finally{

jedis.close();

}

}

解锁代码的精髓是这句lua脚本:

if (redis.call('get', KEYS[1]) == ARGV[1]) thenreturn redis.call('del', KEYS[1])else return 0

从redis读取key的值,如果它等于传入的唯一key,则可以释放锁,否则返回0

为什么要检查唯一key再释放锁呢?主要是为了这么一个场景:

A用户来获取了锁

B用户来获取锁,锁已经被a拿走了,等待锁

A用户可能因为突然发生网络延迟,超过了超时时间,这时候锁因为超时自动释放了。

B用户获取了锁

A用户这时候网络恢复了。。。这时候A用户要释放锁,如果释放成功就会导致连锁反应,b用户被解锁,b又可能去解锁c

所以每次加锁解锁都需要验证获取锁的用户身份,一般存放在key的value里面,在释放锁之前先检查,也就是 check and set

锁的重入

上面谈到,我们记录了每个锁的用户身份,那是不是同一个用户一次操作需要两次锁,是可以重用的呢?

答案是ok的

我们可以在trylock中加一个lua脚本用来先check 再 set,如果判断check与用户符合,则直接返回true就可以了。

public boolean tryLock (String ov, inttimeout) {

Jedis jedis=JedisPoolConnect.connectJedis();try{//加上锁的重入特性

String DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL = "if (redis.call('get', KEYS[1]) == ARGV[1]) then return 1 else return 0 end"; //如果当前锁的值等于ov的话,认为来获取锁的还是同一个人

String sha1 =jedis.scriptLoad(DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL);

String[] keys={REDIS_LOCK_KEY};

String[] args={ov};

Integer val= Integer.parseInt(jedis.evalsha(sha1,new ArrayList<>(Arrays.asList(keys)),new ArrayList<>(Arrays.asList(args))).toString());if (val > 0) { //判定成功后,锁就重入了,即无需第二次获取锁

return true;

}//set nx ex

SetParams setParams = newSetParams();

setParams.nx();

setParams.ex(timeout);

Object val= jedis.set(REDIS_LOCK_KEY, ov, setParams); //set [key] nx ex [timeout]

return val != null;

}finally{

jedis.close();

}

}

最后我们看看关于超卖问题,我们将代码加上锁 注意两个todo的地方。

import redis.clients.jedis.*;importredis.clients.jedis.Jedis;public classMarketWrong {public static String GOODS_LEN_KEY = "jedis:market:demo";private final Integer DECR_THREAD_LEN = 16;

RedisLock redisLock= newRedisLock();public voidsuperMarket () {//开线程去减库存

int i =DECR_THREAD_LEN;while (i > 0) {int whilekey =i;new Thread(() ->{intn;int j = 0;boolean hasGoods = true;while (hasGoods) { //当库存大于0的时候

String ov = whilekey + "-" +j;//todo 加锁

while (!redisLock.tryLock(ov, 20)) { //如果获取不到锁就等待

}int goodsLen =getGoodsLen();if (goodsLen > 0) {

decrGoodsLen();//一般进来之后就直接减去库存了

System.out.println("现在库存为" +getGoodsLen());

redisLock.tryUnlock(ov);//todo 解除锁

try{

Thread.sleep(100); //模拟中间处理流程

} catch(Exception e) {

System.out.println("执行减库存错误" + e.getMessage() + e.getLocalizedMessage() +e.getStackTrace());

}finally{//最后逻辑

}

}else{

System.out.println("======卖完啦=======");

hasGoods= false;

}

j++; //需要这个用来生成ov,相当于模拟每一个买家的id

}

}).start();

i--;

}

}/*** 一个简单的超卖演示程序

**/

public voidsetGoodsLen (Integer len) {

Jedis jedis=JedisPoolConnect.connectJedis();try{

jedis.set(GOODS_LEN_KEY, String.valueOf(len));

}finally{

jedis.close();

}

}privateInteger getGoodsLen () {

Jedis jedis=JedisPoolConnect.connectJedis();try{

String val=jedis.get(GOODS_LEN_KEY);if (val != null) {returnInteger.parseInt(val);

}

}finally{

jedis.close();

}return 0;

}private voiddecrGoodsLen () {

Jedis jedis=JedisPoolConnect.connectJedis();try{//库存减1

jedis.decr(GOODS_LEN_KEY);

}finally{

jedis.close();

}

}

}

加上锁之后再测试,超卖问题已解决,注意现在的输出是线性递增的,因为开线程的模拟方式就是并发处理,每次16个线程几乎是同时进行的,所以在没有锁的时候,并发读取的goodslen很有可能都是16个线程一样的。

所以redis的这个锁的实现也叫: 分布式互斥锁

现在库存为8

现在库存为7

现在库存为6

现在库存为5

现在库存为4

现在库存为3

现在库存为2

现在库存为1

现在库存为0======卖完啦=======

======卖完啦=======

======卖完啦=======

redis实现的分布式互斥锁并不完美,但在大多数应用场景下够用了,另外还可以使用zookeeper甚至mysql来实现。

分布式定时任务问题

分布式场景下,还有另外一个问题--定时任务并发问题,当我们的应用采用分布式部署的时候,就必然会有各种定时任务被部署到不同的机器实例上,如果两台机器同时运行同一个定时任务的话,任务就执行了两次。

这个问题可能更复杂一点,仅仅是加一个锁有可能会坏事儿,因为定时任务的多机分布会产生几个需要解决的问题:

多台机器的时间一致性问题

如果多台机器的时区不一致,那锁基本上无从谈起了。 或者时区一致,但可能服务器时间相差几秒钟,那么也有可能导致锁丢失。

锁未释放问题(服务器宕机怎么办)

那么如果serverA在加锁的过程中,出现宕机怎么办,是否会一直处于加锁状态

命名空间问题

每个定时任务应该有不同的锁命名,防止出现同名锁。

还是让我们看一个java代码的例子 注意,redis连接和锁代码有复用上面一节的

importorg.springframework.scheduling.annotation.EnableScheduling;importorg.springframework.scheduling.annotation.Scheduled;importorg.springframework.stereotype.Component;importredis.clients.jedis.Jedis;

@Component

@EnableSchedulingpublic classScheduleDemo {private String sourceKey = "redis:schedule:test:key";private void sendEmail (String serviceKey) throwsInterruptedException {

Jedis jedis=JedisPoolConnect.connectJedis();try{

Integer sendPatch= 0; //从redis读取来模拟发送的批次

Object val =jedis.get(sourceKey);if (val != null) {

sendPatch=Integer.parseInt(val.toString());

}

Thread.sleep(2000);

System.out.println("批次[" + sendPatch +"]====发送邮件====" +serviceKey);

jedis.incr(sourceKey);//批次加1

} finally{

jedis.close();

}

}//模拟service

@Scheduled(cron = "0 27 09 * * ?") //【cron改为后面的时间】

public void serviceA () throwsInterruptedException {this.sendEmail("service");

}

}

将这段代码打开两个实例运行【ps,你可以在idea中右上角直接配两个config就可以了】

687474703a2f2f696d616765732e636e626c6f67732e636f6d2f636e626c6f67735f636f6d2f7a74666a732f3834313530382f6f5f2545342542432538312545342542382539412545352542452541452545342542462541312545362538382541412545352539422542455f39346638326432382d613662382d343439362d396433612d3732663030623932653836302e706e67

看运行结果:

687474703a2f2f696d616765732e636e626c6f67732e636f6d2f636e626c6f67735f636f6d2f7a74666a732f3834313530382f6f5f2545342542432538312545342542382539412545352542452541452545342542462541312545362538382541412545352539422542455f62663130343138622d323038622d346231392d626664642d6530303162363662353137642e706e67 687474703a2f2f696d616765732e636e626c6f67732e636f6d2f636e626c6f67735f636f6d2f7a74666a732f3834313530382f6f5f2545342542432538312545342542382539412545352542452541452545342542462541312545362538382541412545352539422542455f66396330303535312d353430332d343133372d393632312d3336383931363637643433642e706e67

邮件1被同时发送了两次,这是不可接受的。

ok,有的同学现在就想到了,加个锁就完事了

我们将发送代码加上一个锁解决这个问题:在sendmail里加一个redis分布式锁

private void sendEmail (String serviceKey) throwsInterruptedException {if (!redisLock.tryLock(serviceKey, 30)) {return; //todo 获取不到锁就取消,同一个定时任务只需要执行一次

}

Jedis jedis=JedisPoolConnect.connectJedis();try{

Integer sendPatch= 0; //从redis读取来模拟发送的批次

Object val =jedis.get(sourceKey);if (val != null) {

sendPatch=Integer.parseInt(val.toString());

}

Thread.sleep(2000);

System.out.println("批次[" + sendPatch +"]====发送邮件====" +serviceKey);

jedis.incr(sourceKey);//批次加1

redisLock.tryUnlock(serviceKey);//todo 解锁

}finally{

jedis.close();

}

}

如果获取不到锁,那么取消这个任务的执行,看起来很完美对不对?

实际上没有解决的问题还有很多。

多个定时任务的多个并发执行sendmail,key如何保证唯一?

可以使用实例的ip+端口做唯一key,这样能够保证多个实例的唯一性

两台服务器时间差超过30s怎么办?

通过中间媒介来确定时间。或者在服务器中杜绝这个问题

最重要的问题还是在于,两台服务器的时间有可能有细微差别,他们本身就有可能不是并发的

这一点在分布式定时任务领域里很重要。

仅仅是加了一个同步锁是远远不够的

解决方案可以是根据业务的不同来设置不同的锁超时时间,例如某个业务定时任务,每天只可以执行一次,那么将超时时间设置为1个小时最保险,如果某个定时任务每分钟执行,执行操作时间大约20s,那你可以将超时时间设置成30s。

另一个解决方案是设置一个统一的、中心级别的定时任务,任务负责派发消息,通过消息队列的方式来做定时,这里就不细表,这种方式比较适合异构、或者跨网络、跨机房级别的分布式。

可以对redis锁做一次小小的改版升级,使用aop加注解来完成锁的配置:

我们定义一个方法级别的aop注解

importjava.lang.annotation.ElementType;importjava.lang.annotation.Retention;importjava.lang.annotation.RetentionPolicy;importjava.lang.annotation.Target;/*** redis lock

*@authortaifeng zhang

**/@Retention(RetentionPolicy.RUNTIME)

@Target(ElementType.METHOD)public @interfaceRedisLockAop {

String key();/*** 两种类型可选

* wait = 等待锁

* return = 取消执行

**/String type()default "wait";int timeout() default 30;

}

然后通过aop,去为加了注解的方法做锁操作

importcom.halfway.halfway.redis.RedisLock;importorg.aspectj.lang.ProceedingJoinPoint;importorg.aspectj.lang.annotation.After;importorg.aspectj.lang.annotation.Around;importorg.aspectj.lang.annotation.Aspect;importorg.springframework.context.annotation.Configuration;importorg.springframework.stereotype.Component;/*** redislock aop实现

*@authorby taifeng zhang

**/@Component

@Aspectpublic classRedisLockAopAspect {private RedisLock redisLock = newRedisLock();

@Around("@within(com.halfway.halfway.redis.lockAop.RedisLockAop) && @annotation(lock)")public Object excuteAop (ProceedingJoinPoint pjp, RedisLockAop lock) throwsThrowable {if ("wait".equals(lock.type())) {while (!redisLock.tryLock(lock.key(), lock.timeout())) {} //todo 等待锁

} else if ("return".equals(lock.type())) {if (!redisLock.tryLock(lock.key(), lock.timeout())) {return null; //todo 取消执行

}

}else{throw new NullPointerException("type只可以是wait或者return");

}

Object val=pjp.proceed();

redisLock.tryUnlock(lock.key());returnval;

}

}

这个方式的好处是锁与代码解耦,无需关注锁的内部实现变化

@Scheduled(cron = "0/30 * * * * ?")

@RedisLockAop(key= "serviceIp:port", type="return", timeout=15)public void serviceA () throwsInterruptedException {this.sendEmail("service");

}

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐