摘要: 原创出处 http://www.iocoder.cn/Elastic-Job/job-config/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文基于 Elastic-Job V2.1.5 版本分享


🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 Elastic-Job-Lite 作业配置

涉及到主要类的类图如下( 打开大图 ):

  • 黄色的类在 elastic-job-common-core 项目里,为 Elastic-Job-Lite、Elastic-Job-Cloud 公用作业配置类。

另外建议你已经( 非必须 ):

你行好事会因为得到赞赏而愉悦
同理,开源项目贡献者会因为 Star 而更加有动力
为 Elastic-Job 点赞!传送门

2. 作业配置

一个作业( ElasticJob )的调度,需要配置独有的一个作业调度器( JobScheduler ),两者是 1 : 1 的关系。这点大家要注意下,当然下文看代码也会看到。

作业调度器的创建可以配置四个参数:

  1. 注册中心( CoordinatorRegistryCenter ):用于协调分布式服务。必填
  2. Lite作业配置( LiteJobConfiguration ):必填
  3. 作业事件总线( JobEventBus ):对作业事件异步监听。选填
  4. 作业监听器( ElasticJobListener ):对作业执行前,执行后进行同步监听。选填

2.1 注册中心配置

Elastic-Job 抽象了注册中心接口( RegistryCenter ),并提供了默认基于 Zookeeper 的注册中心实现( ZookeeperRegistryCenter )

ZookeeperRegistryCenter 对应配置类为 ZookeeperConfiguration。该类注释很完整,可以点击链接直接查看源码,这里我们重点说下 namespace 属性。如果你有多个不同 Elastic-Job集群 时,使用相同 Zookeeper,可以配置不同的 namespace 进行隔离。

注册中心的初始化,我们会在《Elastic-Job-Lite 源码解析 —— 注册中心》详细分享。

2.2 Lite作业配置

LiteJobConfiguration 继承自接口 JobRootConfiguration,作为 Elastic-Job-Lite 里的作业( LiteJob )配置。Elastic-Job-Cloud 的作业( CloudJob )对应另外的配置类,也实现了该接口。

public final class LiteJobConfiguration implements JobRootConfiguration {
private final JobTypeConfiguration typeConfig;
private final boolean monitorExecution;
private final int maxTimeDiffSeconds;
private final int monitorPort;
private final String jobShardingStrategyClass;
private final int reconcileIntervalMinutes;
private final boolean disabled;
private final boolean overwrite;
// .... 省略部分get方法
public static class Builder {
// .... 省略部分属性
public final LiteJobConfiguration build() {
return new LiteJobConfiguration(jobConfig, monitorExecution, maxTimeDiffSeconds, monitorPort, jobShardingStrategyClass, reconcileIntervalMinutes, disabled, overwrite);
}
}
}
  • typeConfig:作业类型配置。必填
  • monitorExecution:监控作业运行时状态。默认为 false。选填。在《Elastic-Job-Lite 源码解析 —— 作业执行》详细分享。

    每次作业执行时间和间隔时间均非常短的情况,建议不监控作业运行时状态以提升效率。因为是瞬时状态,所以无必要监控。请用户自行增加数据堆积监控。并且不能保证数据重复选取,应在作业中实现幂等性。
    每次作业执行时间和间隔时间均较长的情况,建议监控作业运行时状态,可保证数据不会重复选取。

  • monitorPort:作业监控端口。默认为 -1,不开启作业监控端口。选填。在《Elastic-Job-Lite 源码解析 —— 作业监控服务》详细分享。

    建议配置作业监控端口, 方便开发者dump作业信息。
    使用方法: echo “dump” | nc 127.0.0.1 9888

  • maxTimeDiffSeconds:设置最大容忍的本机与注册中心的时间误差秒数。默认为 -1,不检查时间误差。选填。

  • jobShardingStrategyClass:作业分片策略实现类全路径。默认为使用分配侧路。选填。在《Elastic-Job-Lite 源码解析 —— 作业分片策略》详细分享。
  • reconcileIntervalMinutes:修复作业服务器不一致状态服务调度间隔时间,配置为小于1的任意值表示不执行修复。默认为 10。在《Elastic-Job-Lite 源码解析 —— 自诊断修复 》详细分享。

  • disabled:作业是否禁用执行。默认为 false。选填。

  • overwrite:设置使用本地作业配置覆盖注册中心的作业配置。默认为 false。选填。建议使用运维平台( console )配置作业配置,统一管理。
  • Builder 类:使用该类配置 LiteJobConfiguration 属性,调用 #build() 方法最终生成作业配置。参见:《JAVA设计模式 — 生成器模式(Builder)》

2.2.1 作业类型配置

作业类型配置接口( JobTypeConfiguration ) 有三种配置实现,针对三种作业类型:

配置实现作业说明
SimpleJobConfigurationSimpleJob简单作业。例如:订单过期作业
DataflowJobConfigurationDataflowJob数据流作业。TODO:笔者暂时未了解流式处理数据,不误人子弟
ScriptJobConfigurationScriptJob脚本作业。例如:调用 shell 脚本备份数据库作业

三种配置类属性对比如:

属性SimpleJobDataflowJobScriptJob说明
coreConfig作业核心配置
jobTypeJobType.SIMPLEJobType.DATAFLOWJobType.SCRIPT作业类型
jobClass√ (默认:ScriptJob.class)作业实现类全路径
streamingProcess是否流式处理数据
scriptCommandLine脚本型作业执行命令行

作业类型配置不仅仅适用于 Elastic-Job-Lite,也适用于 Elastic-Job-Cloud。

2.2.2 作业核心配置

作业核心配置( JobCoreConfiguration ),我们可以看到在每种作业类型配置都有该属性( coreConfig )。

public final class JobCoreConfiguration {
private final String jobName;
private final String cron;
private final int shardingTotalCount;
private final String shardingItemParameters;
private final String jobParameter;
private final boolean failover;
private final boolean misfire;
private final String description;
private final JobProperties jobProperties;
public static class Builder {
// .... 省略部分属性
public final JobCoreConfiguration build() {
Preconditions.checkArgument(!Strings.isNullOrEmpty(jobName), "jobName can not be empty.");
Preconditions.checkArgument(!Strings.isNullOrEmpty(cron), "cron can not be empty.");
Preconditions.checkArgument(shardingTotalCount > 0, "shardingTotalCount should larger than zero.");
return new JobCoreConfiguration(jobName, cron, shardingTotalCount, shardingItemParameters, jobParameter, failover, misfire, description, jobProperties);
}
}
}
  • jobName:作业名称。必填。
  • cron:cron表达式,用于控制作业触发时间。必填。
  • shardingTotalCount:作业分片总数。如果一个作业启动超过作业分片总数的节点,只有 shardingTotalCount 会执行作业。必填。《Elastic-Job-Lite 源码解析 —— 作业分片策略 》详细分享。
  • shardingItemParameters:分片序列号和参数。选填。

    分片序列号和参数用等号分隔,多个键值对用逗号分隔
    分片序列号从0开始,不可大于或等于作业分片总数
    如:
    0=a,1=b,2=c

  • jobParameter:作业自定义参数。选填。

    作业自定义参数,可通过传递该参数为作业调度的业务方法传参,用于实现带参数的作业
    例:每次获取的数据量、作业实例从数据库读取的主键等

  • failover:是否开启作业执行失效转移。开启表示如果作业在一次作业执行中途宕机,允许将该次未完成的作业在另一作业节点上补偿执行。默认为 false。选填。在《Elastic-Job-Lite 源码解析 —— 作业失效转移 》详细分享。

  • misfire:是否开启错过作业重新执行。默认为 true。选填。在《Elastic-Job-Lite 源码解析 —— 作业执行 》详细分享。
  • description:作业描述。选填。
  • jobProperties:作业属性配置。选填。在《Elastic-Job-Lite 源码解析 —— 作业执行 》详细分享。

    public final class JobProperties {
    private EnumMap<JobPropertiesEnum, String> map = new EnumMap<>(JobPropertiesEnum.class);
    public enum JobPropertiesEnum {
    /**
    * 作业异常处理器.
    */
    JOB_EXCEPTION_HANDLER("job_exception_handler", JobExceptionHandler.class, DefaultJobExceptionHandler.class.getCanonicalName()),
    /**
    * 线程池服务处理器.
    */
    EXECUTOR_SERVICE_HANDLER("executor_service_handler", ExecutorServiceHandler.class, DefaultExecutorServiceHandler.class.getCanonicalName());
    private final String key;
    private final Class<?> classType;
    private final String defaultValue;
    }
    }
    • JOB_EXCEPTION_HANDLER:用于扩展异常处理类。
    • EXECUTOR_SERVICE_HANDLER:用于扩展作业处理线程池类。
    • 通过这个属性,我们可以自定义每个作业的异常处理和线程池服务。

2.3 作业事件配置

通过作业事件配置( JobEventConfiguration ),实现对作业事件的异步监听、处理。在《Elastic-Job-Lite 源码解析 —— 作业事件追踪》详细分享。

2.4 作业监听器

通过配置作业监听器( ElasticJobListener ),实现对作业执行的同步监听、处理。在《Elastic-Job-Lite 源码解析 —— 作业监听器》详细分享。

3. 作业配置服务

多个 Elastic-Job-Lite 使用相同注册中心和相同 namespace 组成集群,实现高可用。集群中,使用作业配置服务( ConfigurationService ) 共享作业配置。

public final class ConfigurationService {
/**
* 时间服务
*/
private final TimeService timeService;
/**
* 作业节点数据访问类
*/
private final JobNodeStorage jobNodeStorage;
public ConfigurationService(final CoordinatorRegistryCenter regCenter, final String jobName) {
jobNodeStorage = new JobNodeStorage(regCenter, jobName);
timeService = new TimeService();
}
}
  • JobNodeStorage,封装注册中心,提供存储服务。在《Elastic-Job-Lite 源码解析 —— 作业数据存储》详细分享。
  • TimeService,时间服务,提供当前时间查询。

    public final class TimeService {
    /**
    * 获取当前时间的毫秒数.
    *
    * @return 当前时间的毫秒数
    */
    public long getCurrentMillis() {
    return System.currentTimeMillis();
    }
    }

3.1 读取作业配置

/**
* 读取作业配置.
*
* @param fromCache 是否从缓存中读取
* @return 作业配置
*/
public LiteJobConfiguration load(final boolean fromCache) {
String result;
if (fromCache) { // 缓存
result = jobNodeStorage.getJobNodeData(ConfigurationNode.ROOT);
if (null == result) {
result = jobNodeStorage.getJobNodeDataDirectly(ConfigurationNode.ROOT);
}
} else {
result = jobNodeStorage.getJobNodeDataDirectly(ConfigurationNode.ROOT);
}
return LiteJobConfigurationGsonFactory.fromJson(result);
}

3.2 持久化作业配置

/**
* 持久化分布式作业配置信息.
*
* @param liteJobConfig 作业配置
*/
public void persist(final LiteJobConfiguration liteJobConfig) {
checkConflictJob(liteJobConfig);
if (!jobNodeStorage.isJobNodeExisted(ConfigurationNode.ROOT) || liteJobConfig.isOverwrite()) {
jobNodeStorage.replaceJobNode(ConfigurationNode.ROOT, LiteJobConfigurationGsonFactory.toJson(liteJobConfig));
}
}
  • 调用 #checkConflictJob(...) 方法校验注册中心存储的作业配置的作业实现类全路径( jobClass )和当前的是否相同,如果不同,则认为是冲突,不允许存储:

    private void checkConflictJob(final LiteJobConfiguration liteJobConfig) {
    Optional<LiteJobConfiguration> liteJobConfigFromZk = find();
    if (liteJobConfigFromZk.isPresent()
    && !liteJobConfigFromZk.get().getTypeConfig().getJobClass().equals(liteJobConfig.getTypeConfig().getJobClass())) { // jobClass 是否相同
    throw new JobConfigurationException("Job conflict with register center. The job '%s' in register center's class is '%s', your job class is '%s'",
    liteJobConfig.getJobName(), liteJobConfigFromZk.get().getTypeConfig().getJobClass(), liteJobConfig.getTypeConfig().getJobClass());
    }
    }
  • 当注册中心未存储该作业配置 或者 当前作业配置允许替换注册中心作业配置( overwrite = true )时,持久化作业配置。

3.3 校验本机时间是否合法

/**
* 检查本机与注册中心的时间误差秒数是否在允许范围.
*
* @throws JobExecutionEnvironmentException 本机与注册中心的时间误差秒数不在允许范围所抛出的异常
*/
public void checkMaxTimeDiffSecondsTolerable() throws JobExecutionEnvironmentException {
int maxTimeDiffSeconds = load(true).getMaxTimeDiffSeconds();
if (-1 == maxTimeDiffSeconds) {
return;
}
long timeDiff = Math.abs(timeService.getCurrentMillis() - jobNodeStorage.getRegistryCenterTime());
if (timeDiff > maxTimeDiffSeconds * 1000L) {
throw new JobExecutionEnvironmentException(
"Time different between job server and register center exceed '%s' seconds, max time different is '%s' seconds.", timeDiff / 1000, maxTimeDiffSeconds);
}
}
  • Elastic-Job-Lite 作业触发是依赖本机时间,相同集群使用注册中心时间为基准,校验本机与注册中心的时间误差是否在允许范围内( LiteJobConfiguration.maxTimeDiffSeconds )。

666. 彩蛋

Elastic-Job-Lite 源码解析系列第一篇文章,希望大家多多支持,预计全部更新完会有 15+ 篇。Elastic-Job-Cloud 源码系列后续也会更新。

道友,分享一波微信朋友圈支持支持支持,可好?

文章目录
  1. 1. 1. 概述
  2. 2. 2. 作业配置
    1. 2.1. 2.1 注册中心配置
    2. 2.2. 2.2 Lite作业配置
      1. 2.2.1. 2.2.1 作业类型配置
      2. 2.2.2. 2.2.2 作业核心配置
    3. 2.3. 2.3 作业事件配置
    4. 2.4. 2.4 作业监听器
  3. 3. 3. 作业配置服务
    1. 3.1. 3.1 读取作业配置
    2. 3.2. 3.2 持久化作业配置
    3. 3.3. 3.3 校验本机时间是否合法
  4. 4. 666. 彩蛋