
Summary: originally published at https://zhuanlan.zhihu.com/p/54660611 by "yuanxiang". Reposting is welcome; please keep this summary. Thanks!



Server startup:

    public static void main(String[] args) throws IOException {
        RpcServer rpcServer = new RpcServer(WORKING_THREADS);

        if (args.length > 0) {
            int port = Integer.parseInt(args[0]);
            rpcServer.setListenPort(port);
        }
        String dataDir = null;
        if (args.length > 1) {
            dataDir = args[1];
        }
        SessionHolder.init(dataDir);
        // Initialize the coordinator. If the TC/TM/RM relationship is unfamiliar,
        // see the introduction in the project's Git repository.
        DefaultCoordinator coordinator = new DefaultCoordinator(rpcServer);
        coordinator.init();
        // Note: this registers a second DefaultCoordinator, not the `coordinator` initialized above.
        rpcServer.setHandler(new DefaultCoordinator(rpcServer));
        // Initialize the global transaction ID generator
        UUIDGenerator.init(1);

        XID.setIpAddress(NetUtil.getLocalIp());
        XID.setPort(rpcServer.getListenPort());

        rpcServer.init();

        System.exit(0);
    }

RpcServer is a simple RPC server built on Netty.

SessionHolder:

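SessionHolder is mainly responsible for storing transaction information. Its four session managers, ROOT_SESSION_MANAGER, ASYNC_COMMITTING_SESSION_MANAGER, RETRY_COMMITTING_SESSION_MANAGER, and RETRY_ROLLBACKING_SESSION_MANAGER, each persist their sessions to a corresponding local file.

Internally, DefaultSessionManager uses a queue: a single thread reads sessions from the queue and writes them to local storage. This avoids contention between writers, but the caller still blocks until the write completes, so the write is not truly asynchronous.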
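
The queue-plus-single-writer pattern described above can be sketched as follows. This is a minimal illustration under stated assumptions, not Fescar's DefaultSessionManager: the class SingleWriterStore, its WriteRequest type, and the in-memory list standing in for the local file are all hypothetical. Callers enqueue a record and then wait on a future until the writer thread has stored it, which mirrors the "no contention, but still blocking" behavior.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical single-writer store; a sketch, not Fescar's actual class. */
class SingleWriterStore {
    /** A queued record plus the future its caller waits on. */
    private static final class WriteRequest {
        final String record;
        final CompletableFuture<Boolean> done = new CompletableFuture<>();

        WriteRequest(String record) {
            this.record = record;
        }
    }

    private final BlockingQueue<WriteRequest> queue = new LinkedBlockingQueue<>();
    // Stand-in for the local file (assumption); only the writer thread appends to it.
    private final List<String> persisted = Collections.synchronizedList(new ArrayList<>());

    SingleWriterStore() {
        Thread writer = new Thread(this::drain, "session-writer");
        writer.setDaemon(true);
        writer.start();
    }

    /** Writer loop: the only place where records are appended. */
    private void drain() {
        try {
            while (true) {
                WriteRequest req = queue.take();
                persisted.add(req.record);
                req.done.complete(true);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Enqueues the record, then blocks until the writer thread has stored it. */
    boolean write(String record) {
        WriteRequest req = new WriteRequest(record);
        queue.add(req);
        return req.done.join();
    }

    /** Returns a copy of everything stored so far. */
    List<String> snapshot() {
        return new ArrayList<>(persisted);
    }

    public static void main(String[] args) {
        SingleWriterStore store = new SingleWriterStore();
        store.write("global-session-1");
        store.write("branch-session-1");
        System.out.println(store.snapshot());
    }
}
```

Because every append happens on one thread, the store itself needs no lock around the write path; the cost is that each write call waits for its turn in the queue.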

DefaultCoordinator:

The init method starts four scheduled tasks that query sessions and process them. Taking rollback as an example:

    retryRollbacking.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                handleRetryRollbacking();
            } catch (Exception e) {
                LOGGER.info("Exception retry rollbacking ... ", e);
            }

        }
    }, 0, 5, TimeUnit.MILLISECONDS);

The task fetches every session waiting for rollback from SessionHolder's RETRY_ROLLBACKING_SESSION_MANAGER and rolls it back. The call chain is DefaultCore.doGlobalRollback -> DefaultCoordinator.branchRollback.
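
A rough sketch of this retry loop, using the standard ScheduledExecutorService, is shown below. The class RetryRollbackSketch, its pending queue, and the tryRollback method (which here fails once and then succeeds) are hypothetical stand-ins for illustration only; they are not Fescar APIs. The sketch also shows why the task body is wrapped in try/catch: with scheduleAtFixedRate, an exception escaping the task suppresses all later executions.

```java
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical retry loop; a sketch, not Fescar's DefaultCoordinator. */
class RetryRollbackSketch {
    // Stand-in for the sessions held by RETRY_ROLLBACKING_SESSION_MANAGER (assumption).
    private final Queue<String> pending = new ConcurrentLinkedQueue<>();
    private final Map<String, Integer> attempts = new ConcurrentHashMap<>();
    private final Set<String> rolledBack = ConcurrentHashMap.newKeySet();

    void add(String xid) {
        pending.add(xid);
    }

    /** Hypothetical rollback: fails on the first attempt, succeeds from the second. */
    private boolean tryRollback(String xid) {
        return attempts.merge(xid, 1, Integer::sum) >= 2;
    }

    /** One pass over the pending sessions; failures are re-queued for the next pass. */
    void handleRetryRollbacking() {
        int count = pending.size();
        for (int i = 0; i < count; i++) {
            String xid = pending.poll();
            if (xid == null) {
                break;
            }
            if (tryRollback(xid)) {
                rolledBack.add(xid);
            } else {
                pending.add(xid);
            }
        }
    }

    boolean isRolledBack(String xid) {
        return rolledBack.contains(xid);
    }

    int attemptsFor(String xid) {
        return attempts.getOrDefault(xid, 0);
    }

    public static void main(String[] args) throws InterruptedException {
        RetryRollbackSketch sketch = new RetryRollbackSketch();
        sketch.add("xid-1");
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(() -> {
            try {
                sketch.handleRetryRollbacking();
            } catch (Exception e) {
                // Log and continue: an escaping exception would cancel all later runs.
                e.printStackTrace();
            }
        }, 0, 5, TimeUnit.MILLISECONDS);
        Thread.sleep(200);
        timer.shutdownNow();
        System.out.println("rolled back: " + sketch.isRolledBack("xid-1"));
    }
}
```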

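RMHandlerAT is the executor of branch commit and rollback on the resource manager side. It is registered with rmRpcClient.setClientMessageListener(new RmMessageListener(new RMHandlerAT()));. The concrete commit and rollback implementations are reached from here and are not covered in detail.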

    protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException {
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        LOGGER.info("AT Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
        BranchStatus status = dataSourceManager.branchCommit(xid, branchId, resourceId, applicationData);
        response.setBranchStatus(status);
        LOGGER.info("AT Branch commit result: " + status);

    }

    @Override
    protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response) throws TransactionException {
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        LOGGER.info("AT Branch rolling back: " + xid + " " + branchId + " " + resourceId);
        BranchStatus status = dataSourceManager.branchRollback(xid, branchId, resourceId, applicationData);
        response.setBranchStatus(status);
        LOGGER.info("AT Branch rollback result: " + status);

    }

The global lock mechanism in branchRegister:

    public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String lockKeys) throws TransactionException {
        GlobalSession globalSession = assertGlobalSession(XID.getTransactionId(xid), GlobalStatus.Begin);

        BranchSession branchSession = new BranchSession();
        branchSession.setTransactionId(XID.getTransactionId(xid));
        branchSession.setBranchId(UUIDGenerator.generateUUID());
        branchSession.setApplicationId(globalSession.getApplicationId());
        branchSession.setTxServiceGroup(globalSession.getTransactionServiceGroup());
        branchSession.setBranchType(branchType);
        branchSession.setResourceId(resourceId);
        branchSession.setLockKey(lockKeys);
        branchSession.setClientId(clientId);
        // Lock check
        if (!branchSession.lock()) {
            throw new TransactionException(LockKeyConflict);
        }
        try {
            globalSession.addBranch(branchSession);
        } catch (RuntimeException ex) {
            throw new TransactionException(FailedToAddBranch);

        }
        return branchSession.getBranchId();
    }

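The lock key mechanism is a lock maintained by the server, not a database-level lock, so it only applies to transactions started through Fescar. For example, suppose transaction A contains branches B, C, and D and needs to modify TableA. Any later transaction that wants to modify the same data in TableA must pass the lock key check, and it cannot proceed until transaction A releases the lock. If a business operation modifies TableA without starting a Fescar transaction, it does not go through the lock key check at all, which can happen in practice.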
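
The behavior described above can be illustrated with a deliberately simplified lock table. This is only a sketch under stated assumptions: the class GlobalLockSketch and its flat map keyed by a "table:pk" string are hypothetical and much simpler than the server's real structure. It shows that a second transaction is rejected while the first holds the key, and that the check only protects callers that actually call acquire.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical server-side lock table; a sketch, not Fescar's lock manager. */
class GlobalLockSketch {
    // rowKey (assumed format "table:pk") -> id of the global transaction holding it
    private final ConcurrentMap<String, Long> locks = new ConcurrentHashMap<>();

    /** Returns true if the row is free or already held by the same transaction. */
    boolean acquire(String rowKey, long transactionId) {
        Long holder = locks.putIfAbsent(rowKey, transactionId);
        return holder == null || holder == transactionId;
    }

    /** Releases every row held by the given transaction. */
    void releaseAll(long transactionId) {
        locks.values().removeIf(holder -> holder == transactionId);
    }

    public static void main(String[] args) {
        GlobalLockSketch lockTable = new GlobalLockSketch();
        System.out.println(lockTable.acquire("TableA:1", 1L)); // transaction 1 takes the row
        System.out.println(lockTable.acquire("TableA:1", 2L)); // transaction 2 conflicts
        lockTable.releaseAll(1L);
        System.out.println(lockTable.acquire("TableA:1", 2L)); // free again after release
        // Code that writes to TableA without ever calling acquire() is never checked.
    }
}
```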

    public boolean acquireLock(BranchSession branchSession) throws TransactionException {
        String resourceId = branchSession.getResourceId();
        long transactionId = branchSession.getTransactionId();
        ConcurrentHashMap<String, ConcurrentHashMap<Integer, Map<String, Long>>> dbLockMap = LOCK_MAP.get(resourceId);
        if (dbLockMap == null) {
            LOCK_MAP.putIfAbsent(resourceId, new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Map<String, Long>>>());
            dbLockMap = LOCK_MAP.get(resourceId);
        }
        ConcurrentHashMap<Map<String, Long>, Set<String>> bucketHolder = branchSession.getLockHolder();
        String[] tableGroupedLockKeys = branchSession.getLockKey().split(";");
        for (String tableGroupedLockKey : tableGroupedLockKeys) {
            int idx = tableGroupedLockKey.indexOf(":");
            if (idx < 0) {
                branchSession.unlock();
                throw new ShouldNeverHappenException("Wrong format of LOCK KEYS: " + branchSession.getLockKey());
            }
            String tableName = tableGroupedLockKey.substring(0, idx);
            String mergedPKs = tableGroupedLockKey.substring(idx + 1);
            ConcurrentHashMap<Integer, Map<String, Long>> tableLockMap = dbLockMap.get(tableName);
            if (tableLockMap == null) {
                dbLockMap.putIfAbsent(tableName, new ConcurrentHashMap<Integer, Map<String, Long>>());
                tableLockMap = dbLockMap.get(tableName);
            }
            String[] pks = mergedPKs.split(",");
            // Check whether each primary key is already locked in tableLockMap
            for (String pk : pks) {
                int bucketId = pk.hashCode() % BUCKET_PER_TABLE;
                Map<String, Long> bucketLockMap = tableLockMap.get(bucketId);
                if (bucketLockMap == null) {
                    tableLockMap.putIfAbsent(bucketId, new HashMap<String, Long>());
                    bucketLockMap = tableLockMap.get(bucketId);
                }
                synchronized (bucketLockMap) {
                    Long lockingTransactionId = bucketLockMap.get(pk);
                    if (lockingTransactionId == null) {
                        // No existing lock
                        bucketLockMap.put(pk, transactionId);
                        Set<String> keysInHolder = bucketHolder.get(bucketLockMap);
                        if (keysInHolder == null) {
                            bucketHolder.putIfAbsent(bucketLockMap, new ConcurrentSet<String>());
                            keysInHolder = bucketHolder.get(bucketLockMap);
                        }
                        keysInHolder.add(pk);

                    } else if (lockingTransactionId.longValue() == transactionId) {
                        // Locked by me
                        continue;
                    } else {
                        LOGGER.info("Global lock on [" + tableName + ":" + pk + "] is holding by " + lockingTransactionId);
                        branchSession.unlock(); // Release all acquired locks.
                        return false;
                    }
                }
            }
        }
        return true;
    }
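
Reading the method above, the lock key string appears to have the form table:pk1,pk2;table2:pk3, with each primary key hashed into a bucket. The sketch below mirrors just that parsing and bucketing step so the format is easier to see. The class LockKeySketch and the table names are hypothetical, and the value 128 for BUCKET_PER_TABLE is an assumption, since the constant's value is not shown in the excerpt.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper mirroring the parsing step of acquireLock; a sketch only. */
class LockKeySketch {
    // Assumed value; the excerpt does not show the real constant.
    static final int BUCKET_PER_TABLE = 128;

    /** Splits "table:pk1,pk2;table2:pk3" into table name -> list of primary keys. */
    static Map<String, List<String>> parse(String lockKey) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String group : lockKey.split(";")) {
            int idx = group.indexOf(':');
            if (idx < 0) {
                throw new IllegalArgumentException("Wrong format of lock keys: " + lockKey);
            }
            String tableName = group.substring(0, idx);
            List<String> pks = Arrays.asList(group.substring(idx + 1).split(","));
            result.put(tableName, pks);
        }
        return result;
    }

    /** Same expression as the source; the result is negative when hashCode() is negative. */
    static int bucketId(String pk) {
        return pk.hashCode() % BUCKET_PER_TABLE;
    }

    public static void main(String[] args) {
        Map<String, List<String>> parsed = parse("order:1,2;stock:7");
        for (Map.Entry<String, List<String>> entry : parsed.entrySet()) {
            for (String pk : entry.getValue()) {
                System.out.println(entry.getKey() + ":" + pk + " -> bucket " + bucketId(pk));
            }
        }
    }
}
```

Splitting each table's keys across buckets means two transactions only contend on the same synchronized block when their primary keys hash to the same bucket, rather than on one lock per table.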