《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 https://zhuanlan.zhihu.com/p/54659540 「yuanxiang」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

先简单走一下主流程(仅做参考,如有错误后续更正)

先从GlobalTransactional看起,这个标签是全局事务发起的标志,它是通过GlobalTransactionScanner方法在初始化对象之前自动扫描GlobalTransactional标签,创建相应方法所在类的代理类,intecepter是GlobalTransactionalInterceptor

public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
final GlobalTransactional anno = getAnnotation(methodInvocation.getMethod());
if (anno != null) {
try {
return transactionalTemplate.execute(new TransactionalExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}

@Override
public int timeout() {
return anno.timeoutMills();
}

@Override
public String name() {
if (anno.name() != null) {
return anno.name();
}
return formatMethod(methodInvocation.getMethod());
}
});
} catch (TransactionalExecutor.ExecutionException e) {
TransactionalExecutor.Code code = e.getCode();
switch (code) {
case RollbackDone:
throw e.getOriginalException();
case BeginFailure:
failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case CommitFailure:
failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case RollbackFailure:
failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());
throw e.getCause();
default:
throw new ShouldNeverHappenException("Unknown TransactionalExecutor.Code: " + code);

}
}

}
return methodInvocation.proceed();
}

具体的事务调用模板在transactionalTemplate中:

public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {

// 1. get or create a transaction
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

// 2. begin transaction
try {
tx.begin(business.timeout(), business.name());

} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailure);

}

Object rs = null;
try {

// Do Your Business
rs = business.execute();

} catch (Throwable ex) {

// 3. any business exception, rollback.
try {
tx.rollback();

// 3.1 Successfully rolled back
throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);

} catch (TransactionException txe) {
// 3.2 Failed to rollback
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.RollbackFailure, ex);

}

}

// 4. everything is fine, commit.
try {
tx.commit();

} catch (TransactionException txe) {
// 4.1 Failed to commit
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.CommitFailure);

}
return rs;
}

这里的tx.begin, tx.commit等都是rpc请求,会通知tm方当前要发生的动作,有tm来管理事务的状态,然后再通知rm来具体执行相应的动作

具体看下tx.commit过程:

public void commit() throws TransactionException {
check();
RootContext.unbind();
if (role == GlobalTransactionRole.Participant) {
// Participant has no responsibility of committing
return;
}
status = transactionManager.commit(xid);

}

会移除保存在threadlocal里的全局事务id, 对于分支事务不需要告诉tm事务动作,只有事务发起者才会通知tm.

DefaultTransactionManager里的commit方法:

public GlobalStatus commit(String xid) throws TransactionException {
long txId = XID.getTransactionId(xid);
GlobalCommitRequest globalCommit = new GlobalCommitRequest();
globalCommit.setTransactionId(txId);
GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
return response.getGlobalStatus();
}

begin,commit,rollback方法都会调用syncCall而且request都是一样的参数

目前只支持AT模式

GlobalTransactionScanner

private void initClient() {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Initializing Global Transaction Clients ... ");
}
if (StringUtils.isEmpty(applicationId) || StringUtils.isEmpty(txServiceGroup)) {
throw new IllegalArgumentException(
"applicationId: " + applicationId + ", txServiceGroup: " + txServiceGroup);
}
TMClient.init(applicationId, txServiceGroup);
if (LOGGER.isInfoEnabled()) {
LOGGER.info(
"Transaction Manager Client is initialized. applicationId[" + applicationId + "] txServiceGroup["
+ txServiceGroup + "]");
}
if ((AT_MODE & mode) > 0) {
RMClientAT.init(applicationId, txServiceGroup);
if (LOGGER.isInfoEnabled()) {
LOGGER.info(
"Resource Manager for AT Client is initialized. applicationId[" + applicationId
+ "] txServiceGroup["
+ txServiceGroup + "]");
}
}
if ((MT_MODE & mode) > 0) {
throw new NotSupportYetException();
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global Transaction Clients are initialized. ");
}
}

dubbo通过TransactionPropagationFilter传输xid给分支服务:

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
String xid = RootContext.getXID();
String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("xid in RootContext[" + xid + "] xid in RpcContext[" + rpcXid + "]");
}
boolean bind = false;
if (xid != null) {
//消费方
RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
} else {
//服务提供者方
if (rpcXid != null) {
RootContext.bind(rpcXid);
bind = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind[" + rpcXid + "] to RootContext");
}
}
}
try {
return invoker.invoke(invocation);

} finally {
if (bind) {
String unbindXid = RootContext.unbind();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("unbind[" + unbindXid + "] from RootContext");
}
if (!rpcXid.equalsIgnoreCase(unbindXid)) {
LOGGER.warn("xid in change during RPC from " + rpcXid + " to " + unbindXid);
if (unbindXid != null) {
RootContext.bind(unbindXid);
LOGGER.warn("bind [" + unbindXid + "] back to RootContext");
}
}
}
}
}
文章目录