在电商中经常会有防超卖的需求,本质上是对一条数据的多线程并发情况下的数据安全性进行控制。

譬如一个商品goods,库存是100,在多线程都去读取修改的情况下,会产生数据错乱。

不加锁的情况

我们来看一个简单的例子,有个goods表,里面有个int型字段amount。我们用多线程来频繁修改amount的值,看看结果。

 @Transactional(rollbackFor = Exception.class)
    public void mult(Long goodsId) {
        PtGoods ptGoods = ptGoodsManager.find(goodsId);
        System.out.println(ptGoods.getAmount());

        ptGoods.setAmount(ptGoods.getAmount() + 1);
        ptGoodsManager.update(ptGoods);
    }

这是操作修改amount字段的类GoodsService的代码,就是每次调用给amount的值加1.

下面是测试类,用100个线程来模拟同时操作该商品的数量:

for (int i = 0; i < 100; i++) {
            new Thread(() -> {
                goodsService.mult(1L);
            }
            ).start();

        }

运行后,我们去查看amount的最终的值,发现是10(每次调用都不一样)。正常情况下,被100次调用后应该是100才对,说明在并发下,出现了数据错误,原因大家都懂不细说。

解决方案

我们为了不让商品出现超卖,必然需要对上面的情况进行处理,方式也有很多种,基本都是锁。

来简单探讨一下几种情况下的处理。

1 最简单的是单体应用,也就是你的程序只部署了一份,不是分布式负载均衡部署的,也没有其他程序来修改你的数据。那么在天真的同学,可能会想到在修改数据的地方加个synchronized。锁住整个方法,线程自然老实排队通过该方法。

这样会有效吗?答案当然是否定的,可以看到,方法中大量重复读取了修改前的数据库值,即便线程是一个一个执行的,但是db依然返回了修改前的值。原因也很简单,jpa在执行save方法后,方法已经走完,就会进入下一个线程的逻辑,但此时,数据库并没有更新成功。jpa操作db,也是通过线程池连接的,执行了save,修改了表的值,save走完了,db未必瞬时更新,这是要时间的。与此同时,另一个查询请求早已到来,可能因为jpa的二级缓存、或者就是查询时,db的值还未修改成功。无论哪种都会导致读取到脏数据。

2 既然线程锁不能起到有效的作用,大家可能意识到问题点在于数据库的读写不一致。这就诞生了另外一套方案,即数据库上的锁——悲观锁和乐观锁。

悲观锁可以做到我读取时,就把该条数据锁住,其他人不允许读写,我做完我的事务后,别人才可以读写。

乐观锁可以做到,在我写入时,会再次查询最新的值,之后对比一下我读取时的版本,倘若最新的版本和我读取的不一致,那我就不写入,并抛异常。

jpa做悲观锁很简单,在Repository的方法上,加上Lock注解,PESSIMISTIC_READ,PESSIMISTIC_WRITE比较常用。具体的不细谈,因为我在做微服务时,不赞同用任何数据库级别的方式,尤其是悲观锁,使用不当易造成表锁,会严重影响其他业务的读写。

乐观锁也很简单,只需在model类里加上@Version注解的一个字段即可。譬如我在Goods类上加个

@Version
    private Long version;即可实现乐观锁。倘若该表的读多写少,发生冲突的概率不高,那么避免并发读写错误可以选择比较简单的乐观锁。后面会来看一下乐观锁的应用。

3 分布式锁,就是借助于redis或者zookeeper来完成的在分布式环境下的锁,能够使用于分布式环境和单实例环境,而无需对数据库做任何要求,无需关心使用的是mysql还是MongoDB等任何数据库。也就是完全基于应用层面的对并发读写进行的控制,也是比较推荐的实现方式。

乐观锁

乐观锁就是在修改时,带上version版本号。这样如果试图修改已被别人修改过的数据时,会抛出异常。在一定程度上,也可以作为防超卖的一种处理方法。我们来看一下。

我们在Goods的entity类上,加上这个字段。

@Version
    private Long version;
@Transactional
    public synchronized void mult(Long goodsId) {
        PtGoods ptGoods = ptGoodsManager.find(goodsId);
        logger.info("----amount:" + ptGoods.getAmount());

        ptGoods.setAmount(ptGoods.getAmount() + 1);
        ptGoodsManager.update(ptGoods);

    }

测试一下:

for (int i = 0; i < 100; i++) {
            new Thread(() -> {
                goodsService.mult(1L);
            }
            ).start();

        }

可以发现,抛出了很多异常,这就是乐观锁的异常。可想而知,当高并发购买同一个商品时,会出现大量的购买失败,而不会出现超卖的情况,因为他限制了并发的访问修改。

这样其实显而易见,也是大有问题的,只适应于读多写少的情况,否则大量的失败也是有损用户体验,明明有货,却不卖出。

那怎么解决呢,有人提出了重试策略。托若出现了上面的异常,就去重试执行该方法,直到成功。我们来看看能不能有用。

定义个自定义注解

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * @author wuweifeng wrote on 2019/5/8.
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RetryOnFailure {
}

定义个切面,注意把order设置的小一点,让该切面优先于Transactional注解,这样才能优先被捕捉乐观锁异常。

@Aspect
@Component
@Order(1)
public class RetryAspect {
    public static final int MAX_RETRY_TIMES = 10;//max retry times
    private Logger log = LoggerFactory.getLogger(getClass());


    @Around("@annotation(retryOnFailure)")
    public Object doConcurrentOperation(ProceedingJoinPoint pjp, RetryOnFailure retryOnFailure) throws Throwable {
        int attempts = 0;

        do {
            attempts++;
            try {
                pjp.proceed();
            } catch (Exception e) {
                if (e instanceof ObjectOptimisticLockingFailureException ||
                        e instanceof StaleObjectStateException) {
                    log.info("retrying....times:{}", attempts);
                    if (attempts > MAX_RETRY_TIMES) {
                        log.info("retry excceed the max times..");
                        throw e;
                    }
                }

            }
        } while (attempts < MAX_RETRY_TIMES);
        return null;
    }
}

从结果上看,进行了大量的重试,最终原本该加到100的amount字段,值已经到200多了。出现了严重的并发修改值错误。原因我也没细想,因为根本没打算采用这种方式。首先重试策略肯定是比较差的方式,因为非常不可控。其次,靠借助于数据库的锁来抛异常然后做处理,也是不太好的方式,应当尽量将问题控制在DB以外,由业务代码来控制。

分布式锁

通过上面的一系列操作,我们可以看到,通过单体应用自身的代码是控制不住这种情况的。此时就需要借助于一个第三方框架,能够提供无论是多线程或者分布式环境下的具备原子性的组件。比较常用的就是redis和zookeeper了。

以redis为例,这里选用功能比较丰富强大的redis客户端redisson来完成,https://github.com/redisson/redisson

redisson具体的可以去上GitHub看他文档,我这里直接来讲怎么用了。

pom里加入依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.10.6</version>
        </dependency>

redisson支持单点、集群等模式,这里选择单点的。application.yml配置好redis的连接:

spring:  
    redis:
        host: ${REDIS_HOST:127.0.0.1}
        port: ${REDIS_PORT:6379}
        password: ${REDIS_PASSWORD:}

配置redisson的客户端bean

@Configuration
public class RedisConfig {
    @Value("${spring.redis.host}")
    private String host;

    @Bean(name = {"redisTemplate", "stringRedisTemplate"})
    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) {
        StringRedisTemplate redisTemplate = new StringRedisTemplate();
        redisTemplate.setConnectionFactory(factory);
        return redisTemplate;
    }

    @Bean
    public Redisson redisson() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://" + host + ":6379");
        return (Redisson) Redisson.create(config);
    }

}

至于使用redisson的功能也很少,其实就是对并发访问的方法加个锁即可,方法执行完后释放锁。这样下一个请求才能进入到该方法。

我们创建一个redis锁的注解

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * @author wuweifeng wrote on 2019/5/8.
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RedissonLock {
    /**
     * 要锁哪个参数
     */
    int lockIndex() default -1;

    /**
     * 锁多久后自动释放(单位:秒)
     */
    int leaseTime() default 10;
}

切面类:

import com.tianyalei.giftmall.global.annotation.RedissonLock;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

/**
 * 分布式锁
 * @author wuweifeng wrote on 2019/5/8.
 */
@Aspect
@Component
@Order(1) //该order必须设置,很关键
public class RedissonLockAspect {
    private Logger log = LoggerFactory.getLogger(getClass());
    @Resource
    private Redisson redisson;

    @Around("@annotation(redissonLock)")
    public Object around(ProceedingJoinPoint joinPoint, RedissonLock redissonLock) throws Throwable {
        Object obj = null;

        //方法内的所有参数
        Object[] params = joinPoint.getArgs();

        int lockIndex = redissonLock.lockIndex();
        //取得方法名
        String key = joinPoint.getSignature().getDeclaringTypeName() + "." + joinPoint
                .getSignature().getName();
        //-1代表锁整个方法,而非具体锁哪条数据
        if (lockIndex != -1) {
            key += params[lockIndex];
        }

        //多久会自动释放,默认10秒
        int leaseTime = redissonLock.leaseTime();
        int waitTime = 5;

        RLock rLock = redisson.getLock(key);
        boolean res = rLock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS);
        if (res) {
            log.info("取到锁");
            obj = joinPoint.proceed();
            rLock.unlock();
            log.info("释放锁");
        } else {
            log.info("----------nono----------");
            throw new RuntimeException("没有获得锁");
        }

        return obj;
    }
}

这里解释一下,防超卖,其实是对某一个商品在被修改时进行加锁,而这个时候其他的商品是不受影响的。所以不能去锁整个方法,而应该是锁某个商品。所以我设置了一个lockIndex的参数,来指明你要锁的是方法的哪个属性,这里就是锁goodsId,如果不写,则是锁整个方法。

在切面里里面RLock.tryLock,则是最多等待5秒,托若还没取到锁就走失败,取到了则进入方法走逻辑。第二个参数是自动释放锁的时间,以避免自己刚取到锁,就挂掉了,导致锁无法释放。

测试类:

package com.tianyalei.giftmall;

import com.tianyalei.giftmall.core.goods.GoodsService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

@RunWith(SpringRunner.class)
@SpringBootTest
public class GiftmallApplicationTests {
    @Resource
    private GoodsService goodsService;

    private CyclicBarrier cyclicBarrier = new CyclicBarrier(100);
    private CyclicBarrier cyclicBarrier1 = new CyclicBarrier(100);

    @Test
    public void contextLoads() {
        for (int i = 0; i < 100; i++) {
            new Thread(() -> {
                try {
                    cyclicBarrier.await();

                    goodsService.multi(1L);
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
            ).start();
            new Thread(() -> {
                try {
                    cyclicBarrier1.await();

                    goodsService.multi(2L);
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
            ).start();
        }

        try {
            Thread.sleep(6000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

这里用100并发,同时操作2个商品。

可以看到,这两个商品在各自更新各自的,互不影响。最终在5秒后,有的超时了。调大等待时间,则能保证每个都是100.

通过这种方式,即完成了分布式锁,简单也便捷。当然这里只是举例,在实际项目中,倘若要做防止超卖,以追求最大性能的话,也可以考虑使用redis来存储amount,借助于redis的increase来做数量的增减,能迅速的给出客户端是否抢到了商品的判断,之后再通过消息队列去生成订单之类的耗时操作。

 

 

 

 

 

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐