最近写了一个分布式事务,欢迎来吐槽。

项目地址:https://github.com/zjwwf/dsc-transaction

一、mq消息分布式事务实现流程图

 

注:步骤1执行本地事务、步骤2发生mq消息以及3、写入事务表消息在事务发起者中执行,要保证三个步骤在同一个本地事务中,1.0.0版本使用rocketmq的事务消息实现,保证三个步骤在同一个本地事务中执行,保证一致性。

二、mq消息分布式事务使用方法

1、maven引入(此分布式事务项目中使用的是spring5,若和其他项目spring本版本冲突,修改源码spring版本,或者使用maven去除此分布式事务项目中的spring依赖)

<dependency>
    <groupId>com.zhuo</groupId>
    <artifactId>transaction-spring</artifactId>
    <version>1.0.0</version>
</dependency>

2、事务发起者配置

①:spring bean配置

@Bean
public SpringBeanFactory springBeanFactory(){
    return new SpringBeanFactory();
}
@Bean(initMethod = "init")
public TcInitiatorInterceptor tcInitiatorInterceptor(){
    TcInitiatorInterceptor tcInitiatorInterceptor = new TcInitiatorInterceptor();
    return tcInitiatorInterceptor;
}
@Bean(initMethod = "init")
public AbstractTransactionProducer transactionProducer(){
//127.0.0.1:9876 为rocketmq地址
    AbstractTransactionProducer transactionProducer = new RocketMqAbstractTransactionProducer("127.0.0.1:9876");
//127.0.0.1:9876 为zookeeper地址
    ((RocketMqAbstractTransactionProducer) transactionProducer).setZkServer("127.0.0.1:2181");
    return transactionProducer;
}
@Bean
public TcInitiatorAspect tcInitiatorAspect(TcInitiatorInterceptor tcInitiatorInterceptor, AbstractTransactionProducer transactionProducer){
    TcInitiatorAspect tcInitiatorAspect = new TcInitiatorAspect();
    tcInitiatorAspect.setTcInitiatorInterceptor(tcInitiatorInterceptor);
    tcInitiatorAspect.setTransactionProducer(transactionProducer);
    return tcInitiatorAspect;
}
//配置事务消息存储组件,目前支持mysql,后期会支持redis等,若没有配置事务消息存储组件则没有记录事务成功和异常的执行记录
@Bean(initMethod = "init")
public TransactionRepository transactionRepository(){
    MysqlDataSource dataSource = new MysqlDataSource();
    dataSource.setUrl("jdbc:mysql://localhost:3306/tc_example?autoReconnect=true&serverTimezone=UTC");
    dataSource.setPassword("123456");
    dataSource.setUser("root");
    TransactionRepository transactionRepository = new JdbcTransactionRepository(dataSource);
    return transactionRepository;
}
//配置事务消息存储组件,redis支持(mysql和redis任选其一即可,也可以不进行配置事务消息存储组件配置)
@Bean(initMethod = "init")
public JedisClient jedisClient(){
    JedisClient jedisClient = new JedisClient();
    jedisClient.setHost("127.0.0.1");
    return jedisClient;
}
@Bean(initMethod = "init")
public TransactionRepository transactionRepository(JedisClient jedisClient){
    TransactionRepository transactionRepository = new RedisTransactionRepository(jedisClient);
    return transactionRepository;
}
//配置事务发生异常时候,执行补偿事件的定时器任务,默认一分钟执行一次,每次处理10条事务消息,可以不配置,自己写定时任务
@Bean(initMethod = "init")
public TransactionMsgJobConfigure transactionMsgJobConfigure(){
    TransactionMsgJobConfigure transactionMsgJobConfigure = new TransactionMsgJobConfigure("0 */1 * * * ?");
transactionMsgJobConfigure.setBatchSize(10);
    return transactionMsgJobConfigure;
}

注:配置事务消息存储组件可以配置redis存储或者mysql存储。也可以不进行配置事务消息存储组件配置,不配置则不进行事务消息的存储

补偿事件定时任务可以不配置,不配置则不进行执行补偿事件,可自己写定时任务处理

②:事务方法中添加注解以及参数方法和事务补偿方法

/**
 *  事务发起者本地事务方法
 *  paramMethod 构建事务参数者事务方法的参数以及服务名
 *  cancalMethod 事务发生异常时 补偿方法
 * @param user
 * @param schoolName
 */
@TcInitiator(paramMethod ="add1ParamMethod",cancalMethod = "add1CancalMethod" )
@Override
public void add1(User user, String schoolName) {
    //事务发起者本地事务方法    
userDao.add(user);
}

/**
 *  事务参与者参数方法,只能返回MqMsg对象
 *  参数和事务方法一致
 * @param user
 * @param schoolName
 * @return
 */
public MqMsg add1ParamMethod(User user, String schoolName){
    School school = new School();
    school.setSchoolName(schoolName);
    Grade grade = new Grade();
    grade.setGradeName("grade"+new Random().nextInt(100));
    MqMsg mqmsg = new MqMsg();
    List<MqMsg.MethodParam> list = new ArrayList<>();
    list.add(new MqMsg.MethodParam(ObjectMapperUtils.toJsonString(school),School.class));
    //第一个参数为为事务参数者方法的 全量+.+方法名
    mqmsg.put("com.zhuojing.service.impl.SchoolServiceImpl.add",list);
    List<MqMsg.MethodParam> list2 = new ArrayList<>();
    list2.add(new MqMsg.MethodParam(ObjectMapperUtils.toJsonString(grade),Grade.class));
    //第一个参数为为事务参数者方法的 全量+.+方法名
    mqmsg.put("com.zhuojing.service.impl.GradeServiceImpl.add",list2);
    //事务参与者服务名(事务参与者配置时候需要设一个事务参与者服务名)
    List<String> participantServiceList = new ArrayList<>();
    participantServiceList.add("service-02");
    participantServiceList.add("service-03");
    mqmsg.setParticipantService(participantServiceList);
    return mqmsg;
}

/**
 *  事务补偿方法,参数和事务方法一致,当注解@TcInitiator cancalMethod不配置的时候可以不需要这个补偿方法
 * @param user
 * @param schoolName
 */
public void add1CancalMethod(User user, String schoolName){
    System.out.println("add1CancalMethod");
}

3、事务参与者配置

①:spring bean配置

@Bean
public SpringBeanFactory springBeanFactory(){
    return new SpringBeanFactory();
}
@Bean(initMethod = "start")
public AbstractTransactionConsumer transactionConsumer(){
    AbstractTransactionConsumer transactionConsumer = new RocketMqAbstractTransactionConsumer();
//rocketmq地址
    ((RocketMqAbstractTransactionConsumer) transactionConsumer).setNamesrvAddr("127.0.0.1:9876");
//事务参与者服务名称
    ((RocketMqAbstractTransactionConsumer) transactionConsumer).setServiceName("service-02");
//zookeeper地址
    ((RocketMqAbstractTransactionConsumer) transactionConsumer).setZkServer("127.0.0.1:2181");
    return transactionConsumer;
}

@Bean
public TransactionRepository transactionRepository(){
    MysqlDataSource dataSource = new MysqlDataSource();
    dataSource.setUrl("jdbc:mysql://127.0.0.1:3306/tc_example?autoReconnect=true&serverTimezone=UTC");
    dataSource.setPassword("zj123456");
    dataSource.setUser("root");
    TransactionRepository transactionRepository = new JdbcTransactionRepository(dataSource);
    return transactionRepository;
}
//配置事务消息存储组件,redis支持(mysql和redis任选其一即可,也可以不进行配置事务消息存储组件配置)
@Bean(initMethod = "init")
public JedisClient jedisClient(){
    JedisClient jedisClient = new JedisClient();
    jedisClient.setHost("127.0.0.1");
    return jedisClient;
}
@Bean(initMethod = "init")
public TransactionRepository transactionRepository(JedisClient jedisClient){
    TransactionRepository transactionRepository = new RedisTransactionRepository(jedisClient);
    return transactionRepository;
}

注:配置事务消息存储组件可以配置redis存储或者mysql存储。也可以不进行配置事务消息存储组件配置,不配置则不进行事务消息的存储

②:事务方法中添加注解

@TcParticipant
    @Transactional
    @Override
    public Integer add(School school) {
        System.out.println("add method");
        List<School> list = schoolDao.getBySchoolName(school.getSchoolName());
        if(list != null && list.size() > 0){
            return list.get(0).getId();
        }
        schoolDao.add(school);
        return school.getId();
    }

四、TCC分布式事务实现流程图

 

五、使用方法

1、maven引入(此分布式事务项目中使用的是spring5,若和其他项目spring本版本冲突,修改源码spring版本,或者使用maven去除此分布式事务项目中的spring依赖)

<dependency>
    <groupId>com.zhuo</groupId>
    <artifactId>transaction-spring</artifactId>
    <version>3.0.0</version>
</dependency>

2、使用

①:spring bean配置

@Bean
public SpringBeanFactory springBeanFactory(){
    return new SpringBeanFactory();
}
@Bean
public TccTransactionAspect tccTransactionAspect(){
    TccTransactionAspect tccTransactionAspect = new TccTransactionAspect();
    return tccTransactionAspect;
}
//配置事务消息存储组件,目前支持mysql,后期会支持redis等,若没有配置事务消息存储组件则没有记录事务成功和异常的执行记录
@Bean(initMethod = "init")
public TransactionRepository transactionRepository(){
    MysqlDataSource dataSource = new MysqlDataSource();
    dataSource.setUrl("jdbc:mysql://localhost:3306/tc_example?autoReconnect=true&serverTimezone=UTC");
    dataSource.setPassword("123456");
    dataSource.setUser("root");
    TransactionRepository transactionRepository = new JdbcTransactionRepository(dataSource);
    return transactionRepository;
}
//配置事务消息存储组件,redis支持(mysql和redis任选其一即可,也可以不进行配置事务消息存储组件配置)
@Bean(initMethod = "init")
public JedisClient jedisClient(){
    JedisClient jedisClient = new JedisClient();
    jedisClient.setHost("127.0.0.1");
    return jedisClient;
}
@Bean(initMethod = "init")
public TransactionRepository transactionRepository(JedisClient jedisClient){
    TransactionRepository transactionRepository = new RedisTransactionRepository(jedisClient);
    return transactionRepository;
}

//配置事务发生异常时候,执行补偿事件的定时器任务,默认一分钟执行一次,每次处理10条事务消息,可以不配置,自己写定时任务
@Bean(initMethod = "init")
public TransactionMsgJobConfigure transactionMsgJobConfigure(){
    TransactionMsgJobConfigure transactionMsgJobConfigure = new TransactionMsgJobConfigure("0 */1 * * * ?");
transactionMsgJobConfigure.setBatchSize(10);
    return transactionMsgJobConfigure;
}

注:配置事务消息存储组件可以配置redis存储或者mysql存储。也可以不进行配置事务消息存储组件配置,不配置则不进行事务消息的存储

补偿事件定时任务可以不配置,不配置则不进行执行补偿事件,可自己写定时任务处理

②:事务方法中添加注解以及confirm方法和cancel方法

/**
 * 使用TccTransaction注解,相当于try方法
 * confirmMethod: confirmMethod方法
 * cancalMethod:cancalMethod 方法
 * async:默认true,true代表异步执行 cancalMethod方法,
 */
@Transactional
@TccTransaction(confirmMethod = "confirmRecord",cancalMethod = "cancelRecord",async = false)
@Override
public void add(User user, String schoolName) {
    Grade grade = new Grade();
    grade.setGradeName("grade"+new Random().nextInt(100));
    gradeService.add(grade);
    userDao.add(user);
}
/**
 * confirm方法,参数必须跟try方法一致
 * @param user
 * @param schoolName
 * @return
 */
public void confirmRecord(User user, String schoolName){
    System.out.println("confirmRecord method");
}

/**
 * cancel方法,参数必须跟try方法一致
 * @param user
 * @param schoolName
 * @return
 */
public void cancelRecord(User user, String schoolName){
    System.out.println("cancelRecord method");
}

注:配置本地事务消息存储组件可以配置redis存储或者mysql存储。也可以不进行配置事务消息存储组件配置,不配置则不进行事务消息的存储,若没有配置本地事务消息存储,则也不需要配置定时任务

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐