摘要: 原创出处 http://www.iocoder.cn/Elastic-Job/job-storage/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文基于 Elastic-Job V2.1.5 版本分享


🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 Elastic-Job-Lite 作业数据存储

涉及到主要类的类图如下( 打开大图 ):

你行好事会因为得到赞赏而愉悦
同理,开源项目贡献者会因为 Star 而更加有动力
为 Elastic-Job 点赞!传送门

2. JobNodePath

JobNodePath,作业节点路径类。作业节点是在普通的节点前加上作业名称的前缀

在 Zookeeper 看一个作业的数据存储:

[zk: localhost:2181(CONNECTED) 65] ls /elastic-job-example-lite-java/javaSimpleJob
[leader, servers, config, instances, sharding]
  • elastic-job-example-lite-java:作业节点集群名,使用 ZookeeperConfiguration.namespace 属性配置。
  • javaSimpleJob:作业名字,使用 JobCoreConfiguration.jobName 属性配置。
  • config / servers / instances / sharding / leader:不同服务的数据存储节点路径。

JobNodePath,注释很易懂,点击链接查看。这里我们梳理下 JobNodePath 和其它节点路径类的关系:

Zookeeper 路径JobNodePath 静态属性JobNodePath 方法节点路径类
configCONFIG_NODE#getConfigNodePath()ConfigurationNode
serversSERVERS_NODE#getServerNodePath()ServerNode
instancesINSTANCES_NODE#getInstancesNodePath()InstanceNode
shardingSHARDING_NODE#getShardingNodePath()ShardingNode
leader/#getFullPath(node)LeaderNode
leader/failover/#getFullPath(node)FailoverNode
guarantee/#getFullPath(node)GuaranteeNode

3. JobNodeStorage

JobNodeStorage,作业节点数据访问类。

Elastic-Job-Lite 使用注册中心存储作业节点数据,JobNodeStorage 对注册中心提供的方法做下简单的封装提供调用。举个例子:

// JobNodeStorage.java
private final CoordinatorRegistryCenter regCenter;
private final JobNodePath jobNodePath;
/**
* 判断作业节点是否存在.
*
* @param node 作业节点名称
* @return 作业节点是否存在
*/
public boolean isJobNodeExisted(final String node) {
return regCenter.isExisted(jobNodePath.getFullPath(node));
}
// JobNodePath.java
/**
* 获取节点全路径.
*
* @param node 节点名称
* @return 节点全路径
*/
public String getFullPath(final String node) {
return String.format("/%s/%s", jobName, node);
}
  • 传递的参数 node 只是简单的作业节点名称,通过调用 JobNodePath#getFullPath(...) 方法获取节点全路径。
  • 其它方法类似,有兴趣的同学点击链接查看。

4. ConfigurationNode

ConfigurationNode,配置节点路径。

在 Zookeeper 看一个作业的配置节点数据存储:

[zk: localhost:2181(CONNECTED) 67] get /elastic-job-example-lite-java/javaSimpleJob/config
{"jobName":"javaSimpleJob","jobClass":"com.dangdang.ddframe.job.example.job.simple.JavaSimpleJob","jobType":"SIMPLE","cron":"0/5 * * * * ?","shardingTotalCount":3,"shardingItemParameters":"0\u003dBeijing,1\u003dShanghai,2\u003dGuangzhou","jobParameter":"","failover":true,"misfire":true,"description":"","jobProperties":{"job_exception_handler":"com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler","executor_service_handler":"com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler"},"monitorExecution":true,"maxTimeDiffSeconds":-1,"monitorPort":-1,"jobShardingStrategyClass":"","reconcileIntervalMinutes":10,"disabled":false,"overwrite":true}
  • /config持久节点,存储Lite作业配置( LiteJobConfiguration ) JSON化字符串。

ConfigurationNode 代码如下:

public final class ConfigurationNode {
static final String ROOT = "config";
}

ConfigurationNode 如何读取、存储,在《Elastic-Job-Lite 源码分析 —— 作业配置》的「3.」作业配置服务已经详细解析。

5. ServerNode

ServerNode,服务器节点路径。

在 Zookeeper 看一个作业的服务器节点数据存储:

[zk: localhost:2181(CONNECTED) 72] ls /elastic-job-example-lite-java/javaSimpleJob/servers
[192.168.16.164, 169.254.93.156, 192.168.252.57, 192.168.16.137, 192.168.3.2, 192.168.43.31]
[zk: localhost:2181(CONNECTED) 73] get /elastic-job-example-lite-java/javaSimpleJob/servers/192.168.16.164
  • /servers/ 目录下以 IP 为数据节点路径存储每个服务器节点。如果相同IP服务器有多个服务器节点,只存储一个 IP 数据节点。
  • /servers/${IP}持久节点,不存储任何信息,只是空串( "");

ServerNode 代码如下:

public final class ServerNode {
/**
* 服务器信息根节点.
*/
public static final String ROOT = "servers";
private static final String SERVERS = ROOT + "/%s";
}

ServerNode 如何存储,在《Elastic-Job-Lite 源码分析 —— 作业初始化》的「3.2.4」注册作业启动信息已经详细解析。

6. InstanceNode

InstanceNode,运行实例节点路径。

在 Zookeeper 看一个作业的运行实例节点数据存储:

[zk: localhost:2181(CONNECTED) 81] ls /elastic-job-example-lite-java/javaSimpleJob/instances
[192.168.16.137@-@56010]
[zk: localhost:2181(CONNECTED) 82] get /elastic-job-example-lite-java/javaSimpleJob/instances
  • /instances 目录下以作业实例主键( JOB_INSTANCE_ID ) 为数据节点路径存储每个运行实例节点。
  • /instances/${JOB_INSTANCE_ID}临时节点,不存储任何信息,只是空串( "");
  • JOB_INSTANCE_ID 生成方式:

    // JobInstance.java
    jobInstanceId = IpUtils.getIp()
    + DELIMITER
    + ManagementFactory.getRuntimeMXBean().getName().split("@")[0]; // PID

InstanceNode 代码如下:

public final class InstanceNode {
/**
* 运行实例信息根节点.
*/
public static final String ROOT = "instances";
private static final String INSTANCES = ROOT + "/%s";
/**
* 获取当前运行实例节点路径
*
* @return 当前运行实例节点路径
*/
String getLocalInstanceNode() {
return String.format(INSTANCES, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
}
}

InstanceNode 如何存储,在《Elastic-Job-Lite 源码分析 —— 作业初始化》的「3.2.4」注册作业启动信息已经详细解析。

7. ShardingNode

ShardingNode,分片节点路径。

在 Zookeeper 看一个作业的分片节点数据存储:

[zk: localhost:2181(CONNECTED) 1] ls /elastic-job-example-lite-java/javaSimpleJob/sharding
[0, 1, 2]
[zk: localhost:2181(CONNECTED) 2] ls /elastic-job-example-lite-java/javaSimpleJob/sharding/0
[running, instance, misfire]
[zk: localhost:2181(CONNECTED) 3] get /elastic-job-example-lite-java/javaSimpleJob/sharding/0/instance
192.168.16.137@-@56010

ShardingNode,代码如下:

public final class ShardingNode {
/**
* 执行状态根节点.
*/
public static final String ROOT = "sharding";
static final String INSTANCE_APPENDIX = "instance";
public static final String INSTANCE = ROOT + "/%s/" + INSTANCE_APPENDIX;
static final String RUNNING_APPENDIX = "running";
static final String RUNNING = ROOT + "/%s/" + RUNNING_APPENDIX;
static final String MISFIRE = ROOT + "/%s/misfire";
static final String DISABLED = ROOT + "/%s/disabled";
static final String LEADER_ROOT = LeaderNode.ROOT + "/" + ROOT;
static final String NECESSARY = LEADER_ROOT + "/necessary";
static final String PROCESSING = LEADER_ROOT + "/processing";
}
  • LEADER_ROOT / NECESSARY / PROCESSING 放在「4.7」LeaderNode 解析。

8. LeaderNode

LeaderNode,主节点路径。

leader 目录下一共有三个存储子节点:

  • election:主节点选举。
  • sharding:作业分片项分配。
  • failover:作业失效转移。

主节点选举

在 Zookeeper 看一个作业的 leader/election 节点数据存储:

[zk: localhost:2181(CONNECTED) 1] ls /elastic-job-example-lite-java/javaSimpleJob/leader/election
[latch, instance]
[zk: localhost:2181(CONNECTED) 2] get /elastic-job-example-lite-java/javaSimpleJob/leader/election/instance
192.168.16.137@-@1910

作业分片项分配

在 Zookeeper 看一个作业的 leader/sharding 节点数据存储:

[zk: localhost:2181(CONNECTED) 1] ls /elastic-job-example-lite-java/javaSimpleJob/leader/sharding
[necessary, processing]
[zk: localhost:2181(CONNECTED) 2] 个get /elastic-job-example-lite-java/javaSimpleJob/leader/sharding
[zk: localhost:2181(CONNECTED) 3] 个get /elastic-job-example-lite-java/javaSimpleJob/leader/processing
  • /leader/sharding/necessary永久节点,当相同作业有新的作业节点加入或者移除时,存储空串( "" ),标记需要进行作业分片项重新分配;当重新分配完成后,移除该数据节点。
  • /leader/sharding/processing临时节点,当开始重新分配作业分片项时,存储空串( "" ),标记正在进行重新分配;当重新分配完成后,移除该数据节点。
  • 当且仅当作业节点为主节点时,才可以执行作业分片项分配,《Elastic-Job-Lite 源码分析 —— 作业分片》详细解析。

作业失效转移

作业失效转移数据节点在 FailoverNode,放在「9」FailoverNode 解析。

这里大家可能会和我一样比较疑惑,为什么 /leader/failover 放在 /leader 目录下,而不独立成为一个根目录?经过确认,作业失效转移 设计到分布式锁,统一存储在 /leader 目录下。


LeaderNode,代码如下:

public final class LeaderNode {
/**
* 主节点根路径.
*/
public static final String ROOT = "leader";
static final String ELECTION_ROOT = ROOT + "/election";
static final String INSTANCE = ELECTION_ROOT + "/instance";
static final String LATCH = ELECTION_ROOT + "/latch";
}

9. FailoverNode

FailoverNode,失效转移节点路径。

在 Zookeeper 看一个作业的失效转移节点数据存储:

[zk: localhost:2181(CONNECTED) 2] ls /elastic-job-example-lite-java/javaSimpleJob/leader/failover
[latch, items]
[zk: localhost:2181(CONNECTED) 4] ls /elastic-job-example-lite-java/javaSimpleJob/leader/failover/items
[0]
  • /leader/failover/latch 作业失效转移分布式锁,和 /leader/failover/latch 是一致的。
  • /leader/items/${ITEM_ID}永久节点,当某台作业节点 CRASH 时,其分配的作业分片项标记需要进行失效转移,存储其分配的作业分片项的 /leader/items/${ITEM_ID} 为空串( "" );当失效转移标记,移除 /leader/items/${ITEM_ID},存储 /sharding/${ITEM_ID}/failover 为空串( "" ),临时节点,需要进行失效转移执行。《Elastic-Job-Lite 源码分析 —— 作业失效转移》详细解析。

FailoverNode 代码如下:

public final class FailoverNode {
static final String FAILOVER = "failover";
static final String LEADER_ROOT = LeaderNode.ROOT + "/" + FAILOVER;
static final String ITEMS_ROOT = LEADER_ROOT + "/items";
static final String ITEMS = ITEMS_ROOT + "/%s";
static final String LATCH = LEADER_ROOT + "/latch";
private static final String EXECUTION_FAILOVER = ShardingNode.ROOT + "/%s/" + FAILOVER;
static String getItemsNode(final int item) {
return String.format(ITEMS, item);
}
static String getExecutionFailoverNode(final int item) {
return String.format(EXECUTION_FAILOVER, item);
}
}

10. GuaranteeNode

GuaranteeNode,保证分布式任务全部开始和结束状态节点路径。在《Elastic-Job-Lite 源码分析 —— 作业监听器》详细解析。

666. 彩蛋

旁白君:芋道君,你又水更了!
芋道君:屁屁屁,劳资怼死你!如下是作业数据存储整理,哼哼哈兮!

道友,赶紧上车,分享一波朋友圈!

文章目录
  1. 1. 1. 概述
  2. 2. 2. JobNodePath
  3. 3. 3. JobNodeStorage
  4. 4. 4. ConfigurationNode
  5. 5. 5. ServerNode
  6. 6. 6. InstanceNode
  7. 7. 7. ShardingNode
  8. 8. 8. LeaderNode
  9. 9. 9. FailoverNode
  10. 10. 10. GuaranteeNode
  11. 11. 666. 彩蛋