Elastic-Job

Author Avatar
丁起男 08月 24,2021
  • 在其它设备中阅读本文章

Elastic-Job

整体架构

  • app:应用程序,内部包含任务执行业务逻辑和elastic-job-lite组件,其中执行任务需要实现ElasticJob接口完成与Elastic-Job-Lite组件的集成,并进行任务的相关配置。应用程序可启动多个实例,也就出现了多个任务执行实例

  • elastic-job-lite:elastic-job-lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务,此组件负责任务的调度,并产生任务调度纪录

    无中心化,是指没有调度中心这一概念,每个运行在集群中的作业服务都是对等的,各个作业节点是自治的、平等的、节点之间通过注册中心进行分布式协调

  • registry:以zookeeper作为elastic-job的注册中心组件,存储了执行任务的相关信息。同时,elastic-job利用该组件进行执行任务实例的选举

  • console:elastic-job提供了运维平台,它通过读取zookeeper数据展现任务执行状态,或更新zookeeper数据修改全局配置。通过elastic-job-lite组件产生的数据来查看任务执行历史纪录

应用程序在启动时,在其内嵌的elastic-job-lite组件会向zookeeper注册该实例的信息,并触发选举(此时可能已经启动了该应用程序的其他实例),从众多实例中选举出一个leader,让其执行任务。当到达任务执行时间时,elastic-job-lite组件会调用由应用程序实现的任务业务逻辑,任务执行后会产生任务执行纪录。当应用程序的某一个实例宕机时,zookeeper组件会感知到并重新触发leader选举

zookeeper的功能

  • 对执行任务信息的存储(任务名称、任务参与实例、任务执行策略)
  • 实现选举机制,在任务执行实例数量变化时,会触发选举机制来决定让哪个实例去执行该任务

zookeeper存储信息

  • instances:同一个elastic-job的部署实例。一台机器上可以启动多个job实例,也就是jar包。instances的命名规则是:[IP+@-@+PID]
  • leader:任务实例的主节点信息,通过zk的主节点选举
    • election:主节点选举
      • instance:当前主节点的实例id
      • latch:一个永久节点用于选举的时候实现分布式锁
    • sharding:分配
      • necessary:释放需要重新分片的标记
    • failover:失效转移
  • sharding:任务的分片信息,子节点是分片项序号
  • config:配置信息
  • servers:机器列表

zookeeper选举过程

  1. 任意一个实例启动时首先创建一个/server的持久节点
  2. 多个实例同时创建/server/leader临时子节点
  3. /server/leader子节点只能创建一个,后创建的会失败。创建成功的实例被选为leader节点,用来执行任务
  4. 所有任务实例监听/server/leader的变化,一旦节点被删除,就重新进行选举,抢占式的创建/server/leader节点,谁创建成功谁就是leader

原生开发

  1. 导入依赖

            <dependency>
                <groupId>com.dangdang</groupId>
                <artifactId>elastic-job-lite-core</artifactId>
                <version>2.1.5</version>
            </dependency>
    
  2. 设置zookeeper

    		//注册中心配置
            ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("localhost:2181", "job_namespace");
            //zk超时时间
            zookeeperConfiguration.setSessionTimeoutMilliseconds(100);
            //创建注册中心
            ZookeeperRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
            zookeeperRegistryCenter.init();
    
  3. 编写任务

    public class MyJob implements SimpleJob {
        //任务逻辑
        @Override
        public void execute(ShardingContext shardingContext) {
            //......
        }
    }
    
  4. 任务配置

    		//创建JobCoreConfiguration 参数1任务名称 2cron表达式 3分片
            JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration
                    .newBuilder("job-name", "0/3 * * * * ?", 1)
                    .build();
            //创建SimpleJobConfiguration 参数1JobCoreConfiguration 2任务类的全限定类名
            SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, MyJob.class.getCanonicalName());
            //启动任务
            new JobScheduler(zookeeperRegistryCenter, LiteJobConfiguration
                    .newBuilder(simpleJobConfiguration)
                    .overwrite(true).build())
                    .init();
    

整合springboot

  1. 导入依赖

            <dependency>
                <groupId>com.dangdang</groupId>
                <artifactId>elastic-job-lite-spring</artifactId>
                <version>2.1.5</version>
            </dependency>
    
  2. 配置zookeeper

        @Bean(initMethod = "init")//调用init实现初始化
        ZookeeperRegistryCenter zookeeperRegistryCenter(){
            //注册中心配置
            ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("localhost:2181", "job_namespace");
            //zk超时时间
            zookeeperConfiguration.setSessionTimeoutMilliseconds(100);
            //创建注册中心
            return new ZookeeperRegistryCenter(zookeeperConfiguration);
        }
    
  3. 配置任务

        @Autowired
        private MyJob myJob;
    
        @Autowired
        private ZookeeperRegistryCenter zookeeperRegistryCenter;
    
        @Bean(initMethod = "init")
        SpringJobScheduler initSimpleElasticJob(){
            //创建SpringJobScheduler
            return new SpringJobScheduler(myJob,
                    zookeeperRegistryCenter,
                    createJobConfiguration(myJob.getClass(), "0/5 * * * * ?", 1, null));
        }
    
        /**
         * 配置任务想象信息
         * @param jobClass 任务执行类
         * @param cron 执行策略
         * @param shardingTotalCount 分片数量
         * @param shardingItemParameters 分片个性化参数
         * @return
         */
        private LiteJobConfiguration createJobConfiguration(final Class<? extends SimpleJob> jobClass,
                                                            final String cron,
                                                            final int shardingTotalCount,
                                                            final String shardingItemParameters){
            //定义核心配置
            JobCoreConfiguration.Builder jobCoreConfigurationBuilder = JobCoreConfiguration
                    .newBuilder(jobClass.getName(), cron, shardingTotalCount);
            //设置shardingItemParameters
            if (shardingItemParameters != null){
                jobCoreConfigurationBuilder.shardingItemParameters(shardingItemParameters);
            }
            JobCoreConfiguration jobCoreConfiguration = jobCoreConfigurationBuilder.build();
            //创建SimpleJobConfiguration
            SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, jobClass.getCanonicalName());
            //创建liteJobConfiguration
            LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration
                    .newBuilder(simpleJobConfiguration)
                    .overwrite(true)
                    .build();
            return liteJobConfiguration;
        }
    

作业分片

作业分片是指任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的应用实例分别执行某一个或几个分片项

自定义分片

  1. 设置分片参数
    @Bean(initMethod = "init")
    SpringJobScheduler initSimpleElasticJob(){
        //创建SpringJobScheduler
        return new SpringJobScheduler(myJob,
                zookeeperRegistryCenter,
                createJobConfiguration(myJob.getClass(), "0/5 * * * * ?", 3, "0=a,1=b,2=c"));//数量要和分片数量相同,下标从0开始
    }
  1. 在任务中获取分片参数:shardingContext.getShardingParameter();

    例如当前是分片0时,获取的参数就是a

事件追踪

  1. 配置文件中添加事务追踪配置

        @Autowired
        private DataSource dataSource;
    
        @Bean(initMethod = "init")
        SpringJobScheduler initSimpleElasticJob(){
            //事件追踪配置
            JobEventRdbConfiguration jobEventRdbConfiguration = new JobEventRdbConfiguration(dataSource);
            //创建SpringJobScheduler
            return new SpringJobScheduler(myJob,
                    zookeeperRegistryCenter,
                    createJobConfiguration(myJob.getClass(), "0/5 * * * * ?", 3, "0=a,1=b,2=c"),jobEventRdbConfiguration);
        }
    
  2. 启动项目,此时数据库中会多两张表

    • job_execution_log:纪录每次作业的执行历史
    • job_status_trace_log:纪录作业状态变更痕迹