ShardingSphere集成springBoot实现分布式事务
XA事务是典型的强一致性事务,也就是完全遵循事务的ACID设计原则。这里的BASE来自基本可用(BasicallyAvailable)、软状态(SoftState)和最终一致性(EventualConsistency)这三个概念实现柔性事务如阿里巴巴提供一些优秀的框架Seata,ShardingSphere内部也集成了对Seata的支持,可以根据需要集成其他分布式事务类开源框架,并基于微内核架构嵌
1.ShardingSphere 中的分布式事务
在 ShardingSphere 中,除本地事务之外,还提供针对分布式事务的两种实现方案,分别是 XA 事务和柔性事务 具体可见官网:分布式事务 :: ShardingSphere
XA 事务:XA 事务提供基于两阶段提交协议的实现机制。所谓两阶段提交,顾名思义分成两个阶段,一个是准备阶段,一个是执行阶段。在准备阶段中,协调者发起一个提议,分别询问各参与者是否接受。在执行阶段,协调者根据参与者的反馈,提交或终止事务。如果参与者全部同意则提交,只要有一个参与者不同意就终止,业界在实现 XA 事务时也存在一些主流工具库,包括 Atomikos、Narayana 和 Bitronix。ShardingSphere 对这三种工具库都进行了集成,并默认使用 Atomikos 来完成两阶段提交
BASE 事务:XA 事务是典型的强一致性事务,也就是完全遵循事务的 ACID 设计原则。与 XA 事务这种“刚性”不同,柔性事务则遵循 BASE 设计理论,追求的是最终一致性。这里的 BASE 来自基本可用(Basically Available)、软状态(Soft State)和最终一致性(Eventual Consistency)这三个概念 实现柔性事务如阿里巴巴提供一些优秀的框架 Seata,ShardingSphere 内部也集成了对 Seata 的支持,可以根据需要集成其他分布式事务类开源框架,并基于微内核架构嵌入到 ShardingSphere 运行时环境中
2.项目搭建
2.1 pom文件中的依赖:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.1</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.shardingsphere</groupId> <artifactId>demo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>shardingsphere</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- mysql --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <!-- druid --> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.2.6</version> </dependency> <!-- mybatis-plus --> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.4.0</version> </dependency> <!-- sharding-jdbc --> <dependency> <groupId>org.apache.shardingsphere</groupId> <artifactId>sharding-jdbc-spring-boot-starter</artifactId> <version>4.1.1</version> </dependency> <dependency> <groupId>org.apache.shardingsphere</groupId> <artifactId>sharding-transaction-xa-core</artifactId> <version>4.1.1</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.5.8</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
2.2 配置spring-boot的事务管理器
package com.shardingsphere.config; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.annotation.EnableTransactionManagement; import javax.sql.DataSource; @Configuration @EnableTransactionManagement public class TransactionConfig { /** * 关联 datasource 到 spring 的 PlatformTransactionManager(没有直接使用 jdbc 原生事务) */ @Bean public PlatformTransactionManager txManager(@Qualifier("shardingDataSource") final DataSource dataSource) { return new DataSourceTransactionManager(dataSource); } /** * 通过 jdbcTemplate 简化原生 sharding-jdbc SQL 的使用 */ @Bean public JdbcTemplate jdbcTemplate(@Qualifier("shardingDataSource")final DataSource dataSource) { return new JdbcTemplate(dataSource); } }
2.3 创建MessageMapper层
package com.shardingsphere.Mapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.shardingsphere.entity.Message; import org.apache.ibatis.annotations.Mapper; import org.springframework.stereotype.Repository; /** * @Author 何志鹏 * @Date 2022/7/18 10:59 * @Version 1.0 */ @Mapper @Repository public interface MessageMapper extends BaseMapper<Message> { }
2.4 创建 MessageService层
package com.shardingsphere.service; import com.baomidou.mybatisplus.extension.service.IService; import com.shardingsphere.entity.Message; /** * @Author 何志鹏 * @Date 2022/7/18 18:10 * @Version 1.0 */ public interface MessageService extends IService<Message> { int add(); }
2.5 创建MessageServiceImpl层
package com.shardingsphere.service.impl; import cn.hutool.core.lang.Snowflake; import cn.hutool.core.util.IdUtil; import com.baomidou.mybatisplus.core.mapper.Mapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.shardingsphere.Mapper.MessageMapper; import com.shardingsphere.Mapper.UserMapper; import com.shardingsphere.entity.Message; import com.shardingsphere.entity.User; import com.shardingsphere.service.MessageService; import org.apache.shardingsphere.transaction.annotation.ShardingTransactionType; import org.apache.shardingsphere.transaction.core.TransactionType; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.Date; /** * @Author 何志鹏 * @Date 2022/7/18 18:12 * @Version 1.0 */ @Service public class MessageServiceImpl extends ServiceImpl<MessageMapper, Message> implements MessageService { @Autowired private UserMapper userMapper; @Autowired private MessageMapper messageMapper; @Transactional(rollbackFor = Exception.class) @ShardingTransactionType(TransactionType.XA) @Override public int add() { Snowflake snowflake = IdUtil.createSnowflake(1, 1); Message message = new Message(); int randomNum = (int) (Math.random() * 9000 + 1000); message.setMsgId(snowflake.nextId()); message.setContactId(snowflake.nextId()); message.setUserId(new Long(randomNum)); message.setUserTag(Boolean.TRUE); message.setRecallTime(new Date()); message.setContent("测试测试"); message.setPushMsg("55555555"); message.setCreationTime(new Date()); messageMapper.insert(message); User user = new User(); user.setName("何志鹏555"); int randomNum2 = (int) (Math.random() * 9000 + 1000); user.setAge(new Long(randomNum2)); user.setSex("2"); user.setEducation("1"); int insert = userMapper.insert(user); if(insert>0){ //抛出异常 事务回滚 int a = 10 / 0; return 1; } return 0; } }
2.6 创建MessageController层
package com.shardingsphere.controller; import com.shardingsphere.service.MessageService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @Author 何志鹏 * @Date 2022/7/19 15:23 * @Version 1.0 */ @RestController @RequestMapping("/message") public class MessageController { @Autowired private MessageService messageService; @GetMapping("/add") public int add(){ return messageService.add(); } }
2.7 用postman调用message/add测试
以上如此便会回滚事务 注意分布式事务管理器的特有配置 XA事务管理器参数配置(可选)
ShardingSphere默认的XA事务管理器为Atomikos,在项目的logs目录中会生成xa_tx.log
, 这是XA崩溃恢复时所需的日志,请勿删除 也可以通过在项目的classpath中添加jta.properties
来定制化Atomikos配置项。具体的配置规则请参考Atomikos的官方文档 如下:
最后附上代码: https://gitee.com/hezhipeng_ek/shardingsphere.git
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)