盘点 Seata : Client 端 AT 事务发起流程
首先分享之前的所有文章 ,欢迎点赞收藏转发三连下次一定 >>>> ????????????文章合集 : ????https://juejin.cn/post/6941642435189538824Github :????https://github.com/black-antCASE 备份 :????https://gitee.com/antblack/case一 .前言之
首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜
文章合集 : 🎁 https://juejin.cn/post/6941642435189538824
Github : 👉 https://github.com/black-ant
CASE 备份 : 👉 https://gitee.com/antblack/case
一 .前言
之前分别介绍了 Seata 的启动和配置 , 这一篇来看一下 Client 端的启动过程 :
Seata 的事务发起其实可以分为2个阶段 , 第一阶段是 Client 发起 Request , 第二阶段是 Server 端对 Request 进行处理 , 以及创建 TX 对象等 , 这一篇先来看一下 Seata Client 端发起流程 , 下面来详细看一下 :
1.1 Seata AT 模式的主流程梳理
Seata 中存在三个角色 :
// TC (Transaction Coordinator) - 事务协调者
维护全局和分支事务的状态,驱动全局事务提交或回滚。
// TM (Transaction Manager) - 事务管理器
定义全局事务的范围:开始全局事务、提交或回滚全局事务。
// RM (Resource Manager) - 资源管理器
管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
1.2 主要流程梳理
不说废话 , 流程图就在这 , 能看懂 , 就可以不看下文了(PS : 还不是很详细 , 后面再完整的补充) :
以下流程主要参考官方文档 , 只做个总结 , 整个流程分为2个阶段 :
针对语句 : update product set name = 'GTS' where name = 'TXC';
一阶段
- 解析 SQL:得到 SQL 的类型(UPDATE),表(product),条件(where name = ‘TXC’)等相关的信息。
- 查询前镜像 : 根据解析得到的条件信息,生成查询语句,定位数据。
- 执行业务 SQL:更新这条记录的 name 为 ‘GTS’。
- 查询后镜像:根据前镜像的结果,通过 主键 定位数据。
- 插入回滚日志:把前后镜像数据以及业务 SQL 相关的信息组成一条回滚日志记录,插入到 UNDO_LOG 表中。
- 提交前,向 TC 注册分支:申请 product 表中,主键值等于 1 的记录的 全局锁 。
- 本地事务提交:业务数据的更新和前面步骤中生成的 UNDO LOG 一并提交。
- 将本地事务提交的结果上报给 TC。
二阶段 : 提交
- 收到 TC 的分支提交请求,把请求放入一个异步任务的队列中,马上返回提交成功的结果给 TC。
- 异步任务阶段的分支提交请求将异步和批量地删除相应 UNDO LOG 记录。
1.3 GlobalLock 介绍
声明事务只在单个本地RM中执行,但是事务需要确保要更新(或选择更新)的记录不在全局事务的中间阶段
public @interface GlobalLock {
/**
* 自定义全局锁重试间隔(单位:ms)
*/
int lockRetryInternal() default 0;
/**
* 自定义的全局锁重试次数
*/
int lockRetryTimes() default -1;
}
1.4 GlobalTransactional 介绍
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD,ElementType.TYPE})
@Inherited
public @interface GlobalTransactional {
/**
* 全局事务超时mills(毫秒)
*/
int timeoutMills() default DefaultValues.DEFAULT_GLOBAL_TRANSACTION_TIMEOUT;
/**
* 全局事务实例的给定名称
*/
String name() default "";
/**
* 回滚到指定class
*/
Class<? extends Throwable>[] rollbackFor() default {};
/**
* 回滚的 Class 类名
*/
String[] rollbackForClassName() default {};
/**
* 不回滚的指定类
*/
Class<? extends Throwable>[] noRollbackFor() default {};
/**
* 不回滚的指定类名
*/
String[] noRollbackForClassName() default {};
/**
* 全局事务的传播
*/
Propagation propagation() default Propagation.REQUIRED;
/**
* 自定义全局锁重试间隔(单位:ms)
*/
int lockRetryInternal() default 0;
/**
* 自定义的全局锁重试时间
*/
int lockRetryTimes() default -1;
}
二 . Client 事务的发起
事务的发起是有 TM 开始 , 然后由 RM 完成事务分支 , 看一下主要的流程 :
整个流程这一次只说前面2步 :
- Interceptor 对操作进行拦截
- TransactionalTemplate 处理事务整体流程
看一下类流程 :
// TM 流程一 :注解处理阶段 , 此阶段方法上标注注解 , 将创建一个新的事务
GlobalTransactionalInterceptor # invoke : 拦截器拦截被代理的方法
GlobalTransactionalInterceptor # handleGlobalTransaction : 注解处理详情
TransactionalTemplate # execute : 事务处理主流程
TransactionalTemplate # beginTransaction : 开启事务主流程
DefaultGlobalTransaction # begin : 调用 API 流程
DefaultTransactionManager # begin : 调用 API 实际流程
// App A RM 流程一 : 此方法中创建 Proxy 的 Statement 类
PreparedStatementHandler # instantiateStatement
PreparedStatementProxy # PreparedStatementProxy() : 构造器构建 StatementProxy
// App A RM 流程二 : 代理执行方案
ExecuteTemplate # execute
BaseTransactionalExecutor # execute(Object... args)
AbstractDMLBaseExecutor # doExecute(Object... args)
AbstractDMLBaseExecutor # executeAutoCommitTrue(Object[] args)
AbstractDMLBaseExecutor # executeAutoCommitFalse(Object[] args)
ConnectionProxy # doCommit()
ConnectionProxy # processGlobalTransactionCommit()
// App A RM 流程三 : 调用实际Connect执行
DruidPooledConnection # commit() : 因为使用的 Druid 连接池 , 此处为 DruidPooledConnection
?- 此处将会提交 commit
2.1 注解的入口
// 注解的扫描方式 :
@GlobalTransactional(timeoutMills = 300000, name = "dubbo-gts-seata-example")
// Step 1 : 核心扫描类 GlobalTransactionScanner
// 先看一下该类的继承体系
C01- GlobalTransactionScanner
E- AbstractAutoProxyCreator : Aop 代理 -> PS:C01_001
I- ConfigurationChangeListener : -> PS:C01_002
I- InitializingBean : 意味着会初始化运行
I- ApplicationContextAware : ApplicationContext 运行时被通知
I- DisposableBean : 销毁 Bean 时会处理
PS:C01_001 AbstractAutoProxyCreator 作用
简单点说 , 就是为这个类做了代理 . 使用AOP代理包装每个合格bean的BeanPostProcessor实现,在调用bean本身之前将委托给指定的拦截器。
GlobalTransactionScanner 详情
详见 Client 配置 >>>> Client端配置流程
2.2 注解的拦截
通常方法上标注注解来实现某种功能 , 其最终原理都是代理 , 不论是Java 原生得 Proxy 类创建的 , 还是 Aop 实现的 , 其核心是一致的.
Step 1 : 代理的方式
此处使用的代理是 CglibAopProxy
Step 2 : 调用的流程
第一个调用的对象为 GlobalTransactionalInterceptor , 由其 invoke 方法完成了相关的代理操作
PS : 我原本以为框架会优先扫描所有的标注了 @GlobalTransactional 的方法 , 并且通过相关的Manager 进行管理 , 但是单独看这一处的代码 , 采用的是代理后直接获取的方式
GlobalTransactionalInterceptor # invoke 主流程
// 先来纵观一下 invoke 方法主流程
C50- GlobalTransactionalInterceptor
I- MethodInterceptor
M50_01- invoke(final MethodInvocation methodInvocation)
- 通过 MethodInvocation 解析目标 class
- 通过 ClassUtils 解析目标 Method
- 通过 findBridgedMethod 找到原始方法 -> PS:M50_01_01
- 获得原始方法的 GlobalTransactional 注解
- 获得原始方法的 GlobalLock 注解
- 如果GlobalTransactional 注解存在 , 则执行 globalTransactionalAnnotation
- 如果 GlobalLock 存在且 GlobalTransactional 不存在 , 则执行 handleGlobalLock
?- 注意 , 这里优先执行了 globalTransactionalAnnotation
GlobalTransactionalInterceptor 其他主要方法
// PS : 此处随便来看一下其他的方法
M- initDefaultGlobalTransactionTimeout
M- 支持配置 Configuration 变动加载
public void onChangeEvent(ConfigurationChangeEvent event) {
if (ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION.equals(event.getDataId())) {
disable = Boolean.parseBoolean(event.getNewValue().trim());
} else if (ConfigurationKeys.CLIENT_DEGRADE_CHECK.equals(event.getDataId())) {
degradeCheck = Boolean.parseBoolean(event.getNewValue());
if (!degradeCheck) {
degradeNum = 0;
}
}
}
M- handleGlobalLock
M- handleGlobalTransaction
// 注意 , 该方法中就会调用 transactionalTemplate 完成主要逻辑
Object handleGlobalTransaction(final MethodInvocation methodInvocation,
final GlobalTransactional globalTrxAnno) throws Throwable {
boolean succeed = true;
try {
// 核心 , 调用 Template , 同时构建了一个 TransactionalExecutor 传入
// TransactionalExecutor 为 template 中具体执行的方法 , 即 executor 的对象
return transactionalTemplate.execute(new TransactionalExecutor() {
@Override
public Object execute() throws Throwable {
// 此处是对应的方法代理 , 用于执行具体的对象 -> PRO22001
return methodInvocation.proceed();
}
public String name() {
String name = globalTrxAnno.name();
if (!StringUtils.isNullOrEmpty(name)) {
return name;
}
return formatMethod(methodInvocation.getMethod());
}
// 该方法用于tempalte 中调用 事务信息
@Override
public TransactionInfo getTransactionInfo() {
// reset the value of timeout
int timeout = globalTrxAnno.timeoutMills();
if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
timeout = defaultGlobalTransactionTimeout;
}
// 构建了当前的事务信息
TransactionInfo transactionInfo = new TransactionInfo();
transactionInfo.setTimeOut(timeout);
transactionInfo.setName(name());
transactionInfo.setPropagation(globalTrxAnno.propagation());
transactionInfo.setLockRetryInternal(globalTrxAnno.lockRetryInternal());
transactionInfo.setLockRetryTimes(globalTrxAnno.lockRetryTimes());
Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
// 构建了回退的方法以及无需回退的方法
for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
rollbackRules.add(new RollbackRule(rbRule));
}
for (String rbRule : globalTrxAnno.rollbackForClassName()) {
rollbackRules.add(new RollbackRule(rbRule));
}
for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
transactionInfo.setRollbackRules(rollbackRules);
return transactionInfo;
}
});
} catch (TransactionalExecutor.ExecutionException e) {
TransactionalExecutor.Code code = e.getCode();
// 通过 Failure 类型调用failureHandler不同的处理方法
switch (code) {
case RollbackDone:
throw e.getOriginalException();
case BeginFailure:
succeed = false;
failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case CommitFailure:
succeed = false;
failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case RollbackFailure:
failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
throw e.getOriginalException();
case RollbackRetrying:
failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
throw e.getOriginalException();
default:
throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
}
} finally {
if (degradeCheck) {
EVENT_BUS.post(new DegradeCheckEvent(succeed));
}
}
}
[PRO22001] : methodInvocation 详情
[PRO22002] : TransactionInfo 详情
总结 :
从此处可以看到 , 在GlobalTransactionalInterceptor 拦截器中 , 首先调用了 template 方法 , 同时把 executor 需要执行的具体方法传入 , 而 method 通过 invoke 代理调用
另外 , 处理异常是在 拦截器 中执行 ,而不是 Template 的模板类型
三 . Client 事务的逻辑处理
Client 的事务处理为 TransactionalTemplate 模板方法来完成 , 来看一下 TransactionalTemplate 的功能
上一步当拦截器拦截完成后 , 就会来到 template 处理环节 , 该环节最核心的类为 : TransactionalTemplate , 这一步是模板方法设计模式 , 这一步中会调用 TransactionalExecutor 执行最终的逻辑
C51- TransactionalTemplate
M51_01- execute
// 注释已经很清楚了, 就喜欢这种源码 , 清清楚楚 , 直译一下就知道流程了
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. 获取 transactionInfo
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
// 1.1 获取当前 transaction, 如果不为空,tx role 为 'GlobalTransactionRole.Participant'.
GlobalTransaction tx = GlobalTransactionContext.getCurrent();
// 1.2 事务传播处理 -> PS:M51_01_01
Propagation propagation = txInfo.getPropagation();
SuspendedResourcesHolder suspendedResourcesHolder = null;
try {
switch (propagation) {
case NOT_SUPPORTED:
// 如果事务存在,则暂停它.
if (existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend();
}
// 在没有事务的情况下执行并返回.
return business.execute();
case REQUIRES_NEW:
// 如果事务存在,暂停它,然后开始新的事务.
if (existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend();
tx = GlobalTransactionContext.createNew();
}
// 继续并使用新事务执行
break;
case SUPPORTS:
// 如果事务不存在,则不执行事务.
if (notExistingTransaction(tx)) {
return business.execute();
}
// 继续并使用新事务执行
break;
case REQUIRED:
// 如果当前事务存在,则使用当前事务执行,否则继续并使用新事务执行
break;
case NEVER:
// 如果事务存在,抛出异常.
if (existingTransaction(tx)) {
throw new TransactionException(
String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
, tx.getXid()));
} else {
// 在没有事务的情况下执行并返回.
return business.execute();
}
case MANDATORY:
// 如果事务不存在,抛出异常.
if (notExistingTransaction(tx)) {
throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
}
// 继续并执行当前事务.
break;
default:
throw new TransactionException("Not Supported Propagation:" + propagation);
}
// 1.3 如果当前 GlobalTransaction 为 null , 使用 role 'GlobalTransactionRole.Launcher' 创建一个新的 Transaction.
if (tx == null) {
tx = GlobalTransactionContext.createNew();
}
// 设置当前tx配置为holder
GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
try {
// 2. 如果当前 tx(GlobalTransaction) role 为 'GlobalTransactionRole.Launcher', 发送beginTransaction的请求到TC,
// 否则不做任何事. 不过 , hooks 始终会被触发.
beginTransaction(txInfo, tx);
Object rs;
try {
// 执行业务
rs = business.execute();
} catch (Throwable ex) {
// 3. 回滚所需的业务异常.
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
// 4. 一切正常后 , 提交事务.
commitTransaction(tx);
return rs;
} finally {
//5. 清空事件
resumeGlobalLockConfig(previousConfig);
triggerAfterCompletion();
cleanUp();
}
} finally {
// 如果事务挂起,则恢复它.
if (suspendedResourcesHolder != null) {
tx.resume(suspendedResourcesHolder);
}
}
}
// PS:M51_01_01 Propagation 是什么对象 , 有什么作用 ?
可以看到 , 这个中已经把整个事务处理的主流程完成了
接上一小结 , 当执行 business.execute()
后出现异常 , 会在 execute 中处理 , 而 Template 中 , 主要是对事务的管理和回退等操作 , 这也是使用模板方法设计模式的核心 , 只涉及统一的逻辑
总结
这一篇我们看到了 Seata 逻辑的发起流程 ,可以看到 ,通过以下几个步骤发起了事务的处理 :
- beginTransaction(txInfo, tx)
- business.execute()
- completeTransactionAfterThrowing(txInfo, tx, ex)
- commitTransaction(tx)
后面一篇我们来看一下后面这几个步骤做了什么>>> 👉
附录 . 补充点
读隔离和写隔离可以参考官方文档 👉 Seata 隔离策略
4.1 读隔离详情
Seata(AT 模式)的默认全局隔离级别是 读未提交(Read Uncommitted) , 其通过 SELECT FOR UPDATE 实现
流程详情
- SELECT FOR UPDATE 语句的执行会申请 全局锁
- 如果 全局锁 被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试 (意味着会一直等待正在的全局提交完成)
这个过程中,查询是被 block 住的,直到 全局锁 拿到,即读取的相关数据是 已提交 的,才返回
PS : 这里产生了疑问 , 如果插入比较多的情况 , 会导致全局锁一直被占用或者频繁占用 , 所以这里还是要做读写分离
4.2 写隔离详情
写隔离是保证事务统一的关键 , 其中涉及到全局锁和本地锁的特性 , 为了保证准确 , 使用如下模式 :
1,2 分别代表2个事务 :
一阶段提交流程
- 开启本地事务 , 拿到本地锁 ,再拿到 全局锁 , 提交本地事务 , 释放本地锁 (PS :此时始终持有全局锁)
- 其他事务提交 , 开启本地事务 , 拿到本地锁 ,等待全局锁 (PS :此时上一个事务持有全局锁)
二阶段成功提交流程
- 全局处理完成 , 释放全局锁
- 其他事务 拿到全局锁 , 按照原流程处理
二阶段回退流程 (如果出现异常导致 1 事务回退)
1 . 发生异常 , 尝试回退 , 等待本地锁 (PS : 因为本地锁被其他事务持有)
2 . 始终无法拿到全局锁 , 本地事务回退 , 释放本地锁
1 . 拿到 本地锁 ,完成回退
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)