Summary: originally published at https://zhuanlan.zhihu.com/p/54815876 by 「yuanxiang」. Reposting is welcome; please keep this summary. Thanks!


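AsyncWorker currently implements only branchCommit, which asynchronously deletes the undo log records after a branch transaction has committed. The branchRollback interface is not implemented yet.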

private void doBranchCommits() {
    if (ASYNC_COMMIT_BUFFER.size() == 0) {
        return;
    }
    // Drain the buffer, grouping the pending commits by resource ID.
    Map<String, List<Phase2Context>> mappedContexts = new HashMap<>();
    Iterator<Phase2Context> iterator = ASYNC_COMMIT_BUFFER.iterator();
    while (iterator.hasNext()) {
        Phase2Context commitContext = iterator.next();
        List<Phase2Context> contextsGroupedByResourceId = mappedContexts.get(commitContext.resourceId);
        if (contextsGroupedByResourceId == null) {
            contextsGroupedByResourceId = new ArrayList<>();
            mappedContexts.put(commitContext.resourceId, contextsGroupedByResourceId);
        }
        contextsGroupedByResourceId.add(commitContext);

        iterator.remove();
    }

    // One plain (non-proxied) connection per resource deletes that resource's undo logs.
    for (String resourceId : mappedContexts.keySet()) {
        Connection conn = null;
        try {
            try {
                DataSourceProxy dataSourceProxy = DataSourceManager.get().get(resourceId);
                conn = dataSourceProxy.getPlainConnection();
            } catch (SQLException sqle) {
                LOGGER.warn("Failed to get connection for async committing on " + resourceId, sqle);
                continue;
            }

            List<Phase2Context> contextsGroupedByResourceId = mappedContexts.get(resourceId);
            for (Phase2Context commitContext : contextsGroupedByResourceId) {
                try {
                    UndoLogManager.deleteUndoLog(commitContext.xid, commitContext.branchId, conn);
                } catch (Exception ex) {
                    LOGGER.warn("Failed to delete undo log [" + commitContext.branchId + "/" + commitContext.xid + "]", ex);
                }
            }
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException closeEx) {
                    LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx);
                }
            }
        }
    }
}
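The grouping step above can be sketched in isolation. This is only a minimal illustration, not the AsyncWorker code: `CommitTask` is a hypothetical stand-in for Phase2Context, the queue type is an assumption, and the undo log deletion itself is left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for Phase2Context: only the fields the grouping needs.
class CommitTask {
    final String xid;
    final long branchId;
    final String resourceId;

    CommitTask(String xid, long branchId, String resourceId) {
        this.xid = xid;
        this.branchId = branchId;
        this.resourceId = resourceId;
    }
}

class AsyncCommitGroupingSketch {

    // Drains the buffer and groups the drained tasks by resource ID,
    // so that a single connection per resource can serve the whole group.
    static Map<String, List<CommitTask>> drainAndGroup(ConcurrentLinkedQueue<CommitTask> buffer) {
        Map<String, List<CommitTask>> grouped = new LinkedHashMap<>();
        CommitTask task;
        while ((task = buffer.poll()) != null) {
            grouped.computeIfAbsent(task.resourceId, k -> new ArrayList<>()).add(task);
        }
        return grouped;
    }

    public static void main(String[] args) {
        ConcurrentLinkedQueue<CommitTask> buffer = new ConcurrentLinkedQueue<>();
        buffer.add(new CommitTask("xid-1", 1L, "db-a"));
        buffer.add(new CommitTask("xid-1", 2L, "db-b"));
        buffer.add(new CommitTask("xid-2", 3L, "db-a"));

        drainAndGroup(buffer).forEach((resourceId, tasks) ->
            System.out.println(resourceId + ": " + tasks.size() + " undo log(s) to delete"));
    }
}
```

Using `computeIfAbsent` replaces the explicit null check and `put`, and polling the queue removes each element as it is read, which plays the role of `iterator.remove()` in the listing.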

When committing, ConnectionProxy.commit calls branchRegister. This happens inside the register() method shown below:

public void commit() throws SQLException {
    if (context.inGlobalTransaction()) {
        // Register the branch with the server before committing locally.
        try {
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e);
        }

        try {
            // The undo log is written in the same local transaction as the business data.
            if (context.hasUndoLog()) {
                UndoLogManager.flushUndoLogs(this);
            }
            targetConnection.commit();
        } catch (Throwable ex) {
            report(false);
            if (ex instanceof SQLException) {
                throw (SQLException) ex;
            } else {
                throw new SQLException(ex);
            }
        }
        report(true);
        context.reset();
    } else {
        targetConnection.commit();
    }
}

DataSourceManager is the RM (resource manager) implementation.

branchRegister registers the branch transaction with the server through the RPC client.

On the server side, DefaultCoordinator handles the branch registration:

@Override
protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response, RpcContext rpcContext) throws TransactionException {
    response.setTransactionId(request.getTransactionId());
    response.setBranchId(core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(),
        XID.generateXID(request.getTransactionId()), request.getLockKey()));
}

DefaultCore.branchRegister

public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String lockKeys) throws TransactionException {
    GlobalSession globalSession = assertGlobalSession(XID.getTransactionId(xid), GlobalStatus.Begin);

    BranchSession branchSession = new BranchSession();
    branchSession.setTransactionId(XID.getTransactionId(xid));
    branchSession.setBranchId(UUIDGenerator.generateUUID());
    branchSession.setApplicationId(globalSession.getApplicationId());
    branchSession.setTxServiceGroup(globalSession.getTransactionServiceGroup());
    branchSession.setBranchType(branchType);
    branchSession.setResourceId(resourceId);
    branchSession.setLockKey(lockKeys);
    branchSession.setClientId(clientId);

    if (!branchSession.lock()) {
        throw new TransactionException(LockKeyConflict);
    }
    try {
        globalSession.addBranch(branchSession);
    } catch (RuntimeException ex) {
        throw new TransactionException(FailedToAddBranch);
    }
    return branchSession.getBranchId();
}

The branch is thereby registered under the global transaction.
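The `branchSession.lock()` call is what can fail with LockKeyConflict. The lock manager itself is not shown in this article, so the following is only a sketch of the idea under stated assumptions: `RowLockTableSketch` is a hypothetical class, row keys are plain strings (the real lock key format is not covered here), and locks are kept in memory.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory lock table: row key -> ID of the global transaction holding it.
class RowLockTableSketch {
    private final Map<String, Long> owners = new ConcurrentHashMap<>();

    // Tries to take every row key for the given transaction. On a conflict it
    // releases whatever this call acquired and returns false (all or nothing).
    boolean tryLock(long transactionId, List<String> rowKeys) {
        List<String> acquired = new ArrayList<>();
        for (String key : rowKeys) {
            Long previous = owners.putIfAbsent(key, transactionId);
            if (previous == null) {
                acquired.add(key);
            } else if (previous != transactionId) {
                for (String taken : acquired) {
                    owners.remove(taken, transactionId);
                }
                return false;
            }
            // previous == transactionId: already held by the same transaction, nothing to do.
        }
        return true;
    }

    // Releases every row key held by the given transaction.
    void unlockAll(long transactionId) {
        owners.values().removeIf(owner -> owner == transactionId);
    }

    public static void main(String[] args) {
        RowLockTableSketch locks = new RowLockTableSketch();
        System.out.println(locks.tryLock(1L, Arrays.asList("account:1", "account:2")));
        System.out.println(locks.tryLock(2L, Arrays.asList("account:2")));
        locks.unlockAll(1L);
        System.out.println(locks.tryLock(2L, Arrays.asList("account:2")));
    }
}
```

In this sketch a second global transaction touching the same row is rejected until the first one releases its locks, which corresponds to the point where the listing throws TransactionException(LockKeyConflict).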

The commit process:

Taking executeUpdate as an example of the phase-one commit, the entry point is:

public int executeUpdate() throws SQLException {
    return ExecuteTemplate.execute(this, new StatementCallback<Integer, PreparedStatement>() {
        @Override
        public Integer execute(PreparedStatement statement, Object... args) throws SQLException {
            return statement.executeUpdate();
        }
    });
}

ExecuteTemplate.execute creates a different executor depending on the type of SQL statement:

public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer,
                                                 StatementProxy<S> statementProxy,
                                                 StatementCallback<T, S> statementCallback,
                                                 Object... args) throws SQLException {

    if (!RootContext.inGlobalTransaction()) {
        // Just work as original statement
        return statementCallback.execute(statementProxy.getTargetStatement(), args);
    }

    if (sqlRecognizer == null) {
        sqlRecognizer = SQLVisitorFactory.get(
            statementProxy.getTargetSQL(),
            statementProxy.getConnectionProxy().getDbType());
    }
    Executor<T> executor = null;
    switch (sqlRecognizer.getSQLType()) {
        case INSERT:
            executor = new InsertExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
            break;
        case UPDATE:
            executor = new UpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
            break;
        case DELETE:
            executor = new DeleteExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
            break;
        case SELECT_FOR_UPDATE:
            executor = new SelectForUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
            break;
        default:
            executor = new PlainExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
            break;
    }
    T rs = null;
    try {
        rs = executor.execute(args);
    } catch (Throwable ex) {
        if (ex instanceof SQLException) {
            throw (SQLException) ex;
        } else {
            // Turn everything into SQLException
            // (the quoted source creates the exception without throwing it, which swallows the error)
            throw new SQLException(ex);
        }
    }
    return rs;
}
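The switch above depends on the statement type reported by the SQL recognizer. How the recognizer parses SQL is not covered in this article; the sketch below only illustrates what "statement type" means, using a naive keyword check. `SqlKind` and `SqlKindSketch` are hypothetical names, and a real implementation would rely on a proper SQL parser rather than string matching.

```java
import java.util.Locale;

// Hypothetical mirror of the statement types used in the switch above.
enum SqlKind { INSERT, UPDATE, DELETE, SELECT_FOR_UPDATE, OTHER }

class SqlKindSketch {

    // Naive classification by leading keyword; anything unrecognized is OTHER,
    // which corresponds to the plain (pass-through) executor.
    static SqlKind classify(String sql) {
        String s = sql.trim().toUpperCase(Locale.ROOT).replaceAll("\\s+", " ");
        if (s.endsWith(";")) {
            s = s.substring(0, s.length() - 1).trim();
        }
        if (s.startsWith("INSERT ")) {
            return SqlKind.INSERT;
        }
        if (s.startsWith("UPDATE ")) {
            return SqlKind.UPDATE;
        }
        if (s.startsWith("DELETE ")) {
            return SqlKind.DELETE;
        }
        if (s.startsWith("SELECT ") && s.endsWith(" FOR UPDATE")) {
            return SqlKind.SELECT_FOR_UPDATE;
        }
        return SqlKind.OTHER;
    }

    public static void main(String[] args) {
        System.out.println(classify("update account set balance = 90 where id = 1"));
        System.out.println(classify("select * from account where id = 1 for update"));
        System.out.println(classify("select 1"));
    }
}
```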

Taking UpdateExecutor as an example, UpdateExecutor.execute goes through the template class and ends up calling the following method:

public T doExecute(Object... args) throws Throwable {
    AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    if (connectionProxy.getAutoCommit()) {
        return executeAutoCommitTrue(args);
    } else {
        return executeAutoCommitFalse(args);
    }
}

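The implementation captures images of the data before and after the change, inserts them into the undo log, and commits. The undo log data is used only on rollback, to generate the SQL statements that undo the change.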

protected T executeAutoCommitFalse(Object[] args) throws Throwable {
    TableRecords beforeImage = beforeImage();
    T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
    TableRecords afterImage = afterImage(beforeImage);
    statementProxy.getConnectionProxy().prepareUndoLog(sqlRecognizer.getSQLType(), sqlRecognizer.getTableName(), beforeImage, afterImage);
    return result;
}
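To make the role of the before image concrete, here is a minimal sketch of turning one row's before image into a compensating UPDATE. It is an illustration under assumptions, not the undo executor from the source: `UndoImageSketch` is a hypothetical class, a row is modeled as a plain column-to-value map rather than TableRecords, and a single-column primary key is assumed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class UndoImageSketch {

    // Builds a parameterized UPDATE that restores every non-key column of one row.
    static String buildUndoUpdate(String table, String pkColumn, Map<String, Object> beforeImage) {
        List<String> assignments = new ArrayList<>();
        for (String column : beforeImage.keySet()) {
            if (!column.equals(pkColumn)) {
                assignments.add(column + " = ?");
            }
        }
        return "UPDATE " + table + " SET " + String.join(", ", assignments)
            + " WHERE " + pkColumn + " = ?";
    }

    // Returns the parameter values in placeholder order: restored columns first, key last.
    static List<Object> undoParameters(String pkColumn, Map<String, Object> beforeImage) {
        List<Object> params = new ArrayList<>();
        for (Map.Entry<String, Object> entry : beforeImage.entrySet()) {
            if (!entry.getKey().equals(pkColumn)) {
                params.add(entry.getValue());
            }
        }
        params.add(beforeImage.get(pkColumn));
        return params;
    }

    public static void main(String[] args) {
        // Row state captured before the business UPDATE ran.
        Map<String, Object> before = new LinkedHashMap<>();
        before.put("id", 1);
        before.put("name", "alice");
        before.put("balance", 100);

        // prints: UPDATE account SET name = ?, balance = ? WHERE id = ?
        System.out.println(buildUndoUpdate("account", "id", before));
        // prints: [alice, 100, 1]
        System.out.println(undoParameters("id", before));
    }
}
```

The after image is not needed to build the statement; it would typically be compared against the current row before rolling back, to detect changes made outside the global transaction.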

Rollback was covered in the previous article, so it is not repeated here.

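Because an AT transaction is already committed locally in phase one, the phase-two commit only has AsyncWorker delete the undo log asynchronously. The real commit is completed in phase one.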

public BranchStatus branchCommit(String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
    return asyncWorker.branchCommit(xid, branchId, resourceId, applicationData);
}
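The body of asyncWorker.branchCommit is not listed in this article. The sketch below shows one way such a method could behave, given that the data is already committed: queue the cleanup work and report success immediately. All names here (`PhaseTwoCommitSketch`, `Status`, the buffer capacity) are hypothetical, and the behavior on a full buffer is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

class PhaseTwoCommitSketch {
    enum Status { PHASE_TWO_COMMITTED }

    // Bounded buffer of "xid/branchId" keys whose undo logs still need deleting.
    private final LinkedBlockingQueue<String> pendingUndoLogKeys;

    PhaseTwoCommitSketch(int capacity) {
        this.pendingUndoLogKeys = new LinkedBlockingQueue<>(capacity);
    }

    // Does no database work on the caller's thread: it only queues the cleanup.
    Status branchCommit(String xid, long branchId) {
        boolean queued = pendingUndoLogKeys.offer(xid + "/" + branchId);
        if (!queued) {
            // Assumption: a full buffer only delays cleanup, because the
            // business data was already committed in phase one.
            System.err.println("Buffer full, undo log cleanup deferred for " + xid + "/" + branchId);
        }
        return Status.PHASE_TWO_COMMITTED;
    }

    // Intended to be called periodically by a background timer.
    List<String> drain() {
        List<String> batch = new ArrayList<>();
        pendingUndoLogKeys.drainTo(batch);
        return batch;
    }

    public static void main(String[] args) {
        PhaseTwoCommitSketch worker = new PhaseTwoCommitSketch(1000);
        System.out.println(worker.branchCommit("xid-1", 1L));
        System.out.println(worker.drain());
    }
}
```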

TableMetaCache:

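This class holds the cached TableMeta. One small caveat: if the table metadata changes and the application is not restarted, the cached entry only expires after the default 15 minutes. This should be rare. A table metadata change normally comes with a code change that requires redeploying the affected application anyway; otherwise, the compatibility of the database change with the existing business code has to be considered.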
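The expiry behavior described above can be sketched with a small time-based cache. This is a hand-rolled illustration, not the TableMetaCache implementation: `ExpiringCacheSketch`, the injectable clock, and the loader function are all hypothetical, and only the 15-minute figure comes from the text.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

class ExpiringCacheSketch<K, V> {

    private static final class Entry<V> {
        final V value;
        final long loadedAtMillis;

        Entry(V value, long loadedAtMillis) {
            this.value = value;
            this.loadedAtMillis = loadedAtMillis;
        }
    }

    private final Map<K, Entry<V>> entries = new ConcurrentHashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock;
    private final Function<K, V> loader;

    ExpiringCacheSketch(long ttlMillis, LongSupplier clock, Function<K, V> loader) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
        this.loader = loader;
    }

    // Returns the cached value, reloading it once the entry is older than the TTL.
    // Until then, a changed table definition is not noticed.
    V get(K key) {
        long now = clock.getAsLong();
        Entry<V> entry = entries.compute(key, (k, old) ->
            (old == null || now - old.loadedAtMillis >= ttlMillis)
                ? new Entry<>(loader.apply(k), now)
                : old);
        return entry.value;
    }

    // Forces a reload on the next get, for example after a known schema change.
    void invalidate(K key) {
        entries.remove(key);
    }

    public static void main(String[] args) {
        long fifteenMinutes = 15L * 60 * 1000;
        ExpiringCacheSketch<String, String> cache = new ExpiringCacheSketch<>(
            fifteenMinutes, System::currentTimeMillis, table -> "columns of " + table);
        System.out.println(cache.get("account"));
    }
}
```

An explicit `invalidate` hook like the one here is one possible way to shorten the stale window without a restart; whether the real class offers anything similar is not stated in the article.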
