⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/Elastic-Job/election/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文基于 Elastic-Job V2.1.5 版本分享


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 Elastic-Job-Lite 主节点选举

建议前置阅读:

涉及到主要类的类图如下( 打开大图 ):

  • 粉色的类在 com.dangdang.ddframe.job.lite.internal.election 包下,实现了 Elastic-Job-Lite 主节点选举。

你行好事会因为得到赞赏而愉悦
同理,开源项目贡献者会因为 Star 而更加有动力
为 Elastic-Job 点赞!传送门

2. 为什么需要选举主节点

首先我们来看一段官方对 Elastic-Job-Lite 的介绍:

Elastic-Job-Lite 定位为轻量级无中心化解决方案,使用 jar 包的形式提供分布式任务的协调服务。

无中心化,意味着 Elastic-Job-Lite 不存在一个中心执行一些操作,例如:分配作业分片项。Elastic-Job-Lite 选举主节点,通过主节点进行作业分片项分配。目前,必须在主节点执行的操作有:分配作业分片项,调解分布式作业不一致状态。

另外,主节点的选举是以作业为维度。例如:有一个 Elastic-Job-Lite 集群有三个作业节点 ABC,存在两个作业 ab,可能 a 作业的主节点是 Cb 作业的主节点是 A

3. 选举主节点

调用 LeaderService#electLeader() 选举主节点。

大体流程如下( 打开大图 ):

实现代码如下:

// LeaderService.java
/**
* 选举主节点.
*/
public void electLeader() {
log.debug("Elect a new leader now.");
jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());
log.debug("Leader election completed.");
}

// JobNodeStorage.java
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
latch.start();
latch.await();
callback.execute();
} catch (final Exception ex) {
handleException(ex);
}
}

// LeaderElectionExecutionCallback.java
class LeaderElectionExecutionCallback implements LeaderExecutionCallback {

@Override
public void execute() {
if (!hasLeader()) { // 当前无主节点
jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
}
}
}

  • 使用 Curator LeaderLatch 分布式锁,保证同一时间有且仅有一个工作节点能够调用 LeaderElectionExecutionCallback#execute() 方法执行主节点设置。Curator LeaderLatch 在《Elastic-Job-Lite 源码分析 —— 注册中心》「3.1 在主节点执行操作」有详细解析。

  • LeaderElectionExecutionCallback#execute() 为什么要调用 #hasLeader() 呢?LeaderLatch 只保证同一时间有且仅有一个工作节点,在获得分布式锁的工作节点结束逻辑后,第二个工作节点会开始逻辑,如果不判断当前是否有主节点,原来的主节点会被覆盖。

    // LeaderService.java
    /**
    * 判断是否已经有主节点.
    *
    * @return 是否已经有主节点
    */
    public boolean hasLeader() {
    return jobNodeStorage.isJobNodeExisted(LeaderNode.INSTANCE);
    }

  • 选举成功后,Zookeeper 存储作业的主节点:/${JOB_NAME}/leader/electron/instance 为当前节点。该节点为临时节点。

    [zk: localhost:2181(CONNECTED) 7] get /elastic-job-example-lite-java/javaSimpleJob/leader/election/instance
    192.168.16.137@-@82496

选举主节点时机

第一种,注册作业启动信息时。

/**
* 注册作业启动信息.
*
* @param enabled 作业是否启用
*/
public void registerStartUpInfo(final boolean enabled) {
// .... 省略部分方法
// 选举 主节点
leaderService.electLeader();
// .... 省略部分方法
}

  • 新的作业启动时,即能保证选举出主节点。
    • 当该作业不存在主节点时,当前作业节点成为主节点。
    • 当该作业存在主节点,当前作业节主节点不变

第二种,节点数据发生变化时。

class LeaderElectionJobListener extends AbstractJobListener {

@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
if (!JobRegistry.getInstance().isShutdown(jobName) && (isActiveElection(path, data) || isPassiveElection(path, eventType))) {
leaderService.electLeader();
}
}
}

  • 符合重新选举主节点分成两种情况。

  • 主动选举 #isActiveElection(...)

    private boolean isActiveElection(final String path, final String data) {
    return !leaderService.hasLeader() // 不存在主节点
    && isLocalServerEnabled(path, data); // 开启作业
    }

    private boolean isLocalServerEnabled(final String path, final String data) {
    return serverNode.isLocalServerPath(path)
    && !ServerStatus.DISABLED.name().equals(data);
    }

    • 当作业被禁用( LiteJobConfiguration.disabled = true )时,作业是不存在主节点的。那有同学就有疑问了?LeaderService#electLeader() 没做这个限制呀,作业注册作业启动信息时也进行了选举。在「4. 删除主节点」小结,我们会解开这个答案。这里大家先记住这个结论。
    • 根据上面我们说的结论,这里就很好理解了,#isActiveElection() 方法判断了两个条件:( 1 ) 不存在主节点;( 2 ) 开启作业,不再禁用,因此需要进行主节点选举落。
    • 这里判断开启作业的方法 #isLocalServerEnabled(...) 有点特殊,它不是通过作业节点是否处于开启状态,而是该数据不是将作业节点更新成关闭状态。举个例子:作业节点处于禁用状态,使用运维平台设置作业节点开启,会进行主节点选举;作业节点处于开启状态,使用运维平台设置作业节点禁用,不会进行主节点选举。
  • 被动选举 #isPassiveElection(...)

    private boolean isPassiveElection(final String path, final Type eventType) {
    return isLeaderCrashed(path, eventType) // 主节点 Crashed
    && serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp()); // 当前节点正在运行中(未挂掉)
    }

    private boolean isLeaderCrashed(final String path, final Type eventType) {
    return leaderNode.isLeaderInstancePath(path) && Type.NODE_REMOVED == eventType;
    }

    • 当主节点因为各种情况( 「4. 删除主节点」会列举 )被删除,需要重新进行选举。对的,必须主节点被删除后才可以重新进行选举
    • #isPassiveElection(...) 方法判断了两个条件:( 1 ) 原主节点被删除;( 2 ) 当前节点正在运行中(未挂掉),可以参加主节点选举。
    • #isLeaderCrashed(...) 方法虽然命名带有 Crashed 英文,实际主作业节点正常退出也符合被动选举条件。

等待主节点选举完成

必须在主节点执行的操作,执行之前,需要判断当前节点是否为主节点。如果主节点已经选举好,可以直接进行判断。但是,不排除主节点还没选举到,因而需要阻塞等待到主节点选举完成后才能进行判断。

实现代码如下:

// LeaderService.java

/**
* 判断当前节点是否是主节点.
*
* 如果主节点正在选举中而导致取不到主节点, 则阻塞至主节点选举完成再返回.
*
* @return 当前节点是否是主节点
*/
public boolean isLeaderUntilBlock() {
// 不存在主节点 && 有可用的服务器节点
while (!hasLeader() && serverService.hasAvailableServers()) {
log.info("Leader is electing, waiting for {} ms", 100);
BlockUtils.waitingShortTime();
if (!JobRegistry.getInstance().isShutdown(jobName)
&& serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp())) { // 当前服务器节点可用
electLeader();
}
}
// 返回当前节点是否是主节点
return isLeader();
}

  • 调用 BlockUtils#waitingShortTime() 方法,选举不到主节点进行等待,避免不间断、无间隔的进行主节点选举。

4. 删除主节点

有主节点的选举,必然有主节点的删除,否则怎么进行重新选举

实现代码如下:

// LeaderService.java
/**
* 删除主节点供重新选举.
*/
public void removeLeader() {
jobNodeStorage.removeJobNodeIfExisted(LeaderNode.INSTANCE);
}

删除主节点时机

第一种,主节点进程正常关闭时。

public final class JobShutdownHookPlugin extends ShutdownHookPlugin {

@Override
public void shutdown() {
CoordinatorRegistryCenter regCenter = JobRegistry.getInstance().getRegCenter(jobName);
if (null == regCenter) {
return;
}
LeaderService leaderService = new LeaderService(regCenter, jobName);
if (leaderService.isLeader()) {
leaderService.removeLeader(); // 移除主节点
}
new InstanceService(regCenter, jobName).removeInstance();
}
}

  • 这个比较好理解,退出进程,若该进程为主节点,需要将自己移除。

第二种,主节点进程 CRASHED 时。

${JOB_NAME}/leader/electron/instance临时节点,主节点进程 CRASHED 后,超过最大会话时间,Zookeeper 自动进行删除,触发重新选举逻辑。

第三种,作业被禁用时。

class LeaderAbdicationJobListener extends AbstractJobListener {

@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
if (leaderService.isLeader() && isLocalServerDisabled(path, data)) {
leaderService.removeLeader();
}
}

private boolean isLocalServerDisabled(final String path, final String data) {
return serverNode.isLocalServerPath(path) && ServerStatus.DISABLED.name().equals(data);
}
}

  • 这里就解答上面我们遗留的疑问。被禁用的作业注册作业启动信息时即使进行了主节点选举,也会被该监听器处理,移除该选举的主节点。

第四种,主节点进程远程关闭。

// InstanceShutdownStatusJobListener.java
class InstanceShutdownStatusJobListener extends AbstractJobListener {

@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
if (!JobRegistry.getInstance().isShutdown(jobName)
&& !JobRegistry.getInstance().getJobScheduleController(jobName).isPaused() // 作业未暂停调度
&& isRemoveInstance(path, eventType) // 移除【运行实例】事件
&& !isReconnectedRegistryCenter()) { // 运行实例被移除
schedulerFacade.shutdownInstance();
}
}

private boolean isRemoveInstance(final String path, final Type eventType) {
return instanceNode.isLocalInstancePath(path) && Type.NODE_REMOVED == eventType;
}

private boolean isReconnectedRegistryCenter() {
return instanceService.isLocalJobInstanceExisted();
}
}

// SchedulerFacade.java
/**
* 终止作业调度.
*/
public void shutdownInstance() {
if (leaderService.isLeader()) {
leaderService.removeLeader(); // 移除主节点
}
monitorService.close();
if (reconcileService.isRunning()) {
reconcileService.stopAsync();
}
JobRegistry.getInstance().shutdown(jobName);
}

  • 远程关闭作业节点有两种方式:
    • zkClient 发起命令:rmr /${NAMESPACE}/${JOB_NAME}/instances/${JOB_INSTANCE_ID}
    • 运维平台发起 Shutdown 操作。Shutdown 操作实质上就是第一种。

666. 彩蛋

知识星球

旁白君:哎哟,这次竟然分享了点干货 😈
芋道君:嘿呀嘿呀,必须的啊,虽然有点焦头烂额啦。

道友,赶紧上车,分享一波朋友圈!

文章目录
  1. 1. 1. 概述
  2. 2. 2. 为什么需要选举主节点
  3. 3. 3. 选举主节点
  4. 4. 4. 删除主节点
  5. 5. 666. 彩蛋