扫码关注公众号:芋道源码

发送: 百事可乐
获取永久解锁本站全部文章的链接

《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 https://zhuanlan.zhihu.com/p/54659540 「yuanxiang」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

先简单走一下主流程(仅做参考,如有错误后续更正)

先从GlobalTransactional看起,这个标签是全局事务发起的标志,它是通过GlobalTransactionScanner方法在初始化对象之前自动扫描GlobalTransactional标签,创建相应方法所在类的代理类,intecepter是GlobalTransactionalInterceptor

public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
final GlobalTransactional anno = getAnnotation(methodInvocation.getMethod());
if (anno != null) {
try {
return transactionalTemplate.execute(new TransactionalExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}

@Override
public int timeout() {
return anno.timeoutMills();
}

@Override
public String name() {
if (anno.name() != null) {
return anno.name();
}
return formatMethod(methodInvocation.getMethod());
}
});
} catch (TransactionalExecutor.ExecutionException e) {
TransactionalExecutor.Code code = e.getCode();
switch (code) {
case RollbackDone:
throw e.getOriginalException();
case BeginFailure:
failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case CommitFailure:
failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case RollbackFailure:
failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());
throw e.getCause();
default:
throw new ShouldNeverHappenException("Unknown TransactionalExecutor.Code: " + code);

}
}

}
return methodInvocation.proceed();
}

具体的事务调用模板在transactionalTemplate中:

public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {

// 1. get or create a transaction
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

// 2. begin transaction
try {
tx.begin(business.timeout(), business.name());

} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailure);

}

Object rs = null;
try {

// Do Your Business
rs = business.execute();

} catch (Throwable ex) {

// 3. any business exception, rollback.
try {
tx.rollback();

// 3.1 Successfully rolled back
throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);

} catch (TransactionException txe) {
// 3.2 Failed to rollback
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.RollbackFailure, ex);

}

}

// 4. everything is fine, commit.
try {
tx.commit();

} catch (TransactionException txe) {
// 4.1 Failed to commit
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.CommitFailure);

}
return rs;
}

这里的tx.begin, tx.commit等都是rpc请求,会通知tm方当前要发生的动作,有tm来管理事务的状态,然后再通知rm来具体执行相应的动作

具体看下tx.commit过程:

public void commit() throws TransactionException {
check();
RootContext.unbind();
if (role == GlobalTransactionRole.Participant) {
// Participant has no responsibility of committing
return;
}
status = transactionManager.commit(xid);

}

会移除保存在threadlocal里的全局事务id, 对于分支事务不需要告诉tm事务动作,只有事务发起者才会通知tm.

DefaultTransactionManager里的commit方法:

public GlobalStatus commit(String xid) throws TransactionException {
long txId = XID.getTransactionId(xid);
GlobalCommitRequest globalCommit = new GlobalCommitRequest();
globalCommit.setTransactionId(txId);
GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
return response.getGlobalStatus();
}

begin,commit,rollback方法都会调用syncCall而且request都是一样的参数

目前只支持AT模式

GlobalTransactionScanner

private void initClient() {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Initializing Global Transaction Clients ... ");
}
if (StringUtils.isEmpty(applicationId) || StringUtils.isEmpty(txServiceGroup)) {
throw new IllegalArgumentException(
"applicationId: " + applicationId + ", txServiceGroup: " + txServiceGroup);
}
TMClient.init(applicationId, txServiceGroup);
if (LOGGER.isInfoEnabled()) {
LOGGER.info(
"Transaction Manager Client is initialized. applicationId[" + applicationId + "] txServiceGroup["
+ txServiceGroup + "]");
}
if ((AT_MODE & mode) > 0) {
RMClientAT.init(applicationId, txServiceGroup);
if (LOGGER.isInfoEnabled()) {
LOGGER.info(
"Resource Manager for AT Client is initialized. applicationId[" + applicationId
+ "] txServiceGroup["
+ txServiceGroup + "]");
}
}
if ((MT_MODE & mode) > 0) {
throw new NotSupportYetException();
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global Transaction Clients are initialized. ");
}
}

dubbo通过TransactionPropagationFilter传输xid给分支服务:

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
String xid = RootContext.getXID();
String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("xid in RootContext[" + xid + "] xid in RpcContext[" + rpcXid + "]");
}
boolean bind = false;
if (xid != null) {
//消费方
RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
} else {
//服务提供者方
if (rpcXid != null) {
RootContext.bind(rpcXid);
bind = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind[" + rpcXid + "] to RootContext");
}
}
}
try {
return invoker.invoke(invocation);

} finally {
if (bind) {
String unbindXid = RootContext.unbind();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("unbind[" + unbindXid + "] from RootContext");
}
if (!rpcXid.equalsIgnoreCase(unbindXid)) {
LOGGER.warn("xid in change during RPC from " + rpcXid + " to " + unbindXid);
if (unbindXid != null) {
RootContext.bind(unbindXid);
LOGGER.warn("bind [" + unbindXid + "] back to RootContext");
}
}
}
}
}
文章目录