您的当前位置:首页正文

elastic-joblite编程实战经验

2022-07-01 来源:小奈知识网
elastic-joblite编程实战经验

(继续贴⼀篇之前写的经验案例)

elastic-job lite 编程实战经验

其实这是⼀次失败的项⽬,虽然最后还是做出来了,但是付出了很⼤代价。并且需要较深⼊的踩坑改造elastic-job,导致代码的可读性,可维护性也很差。

事实证明 elastic-job lite 并不适合⽤于做 需要长时间运⾏(可能⼏⼩时或⼏天不停⽌)的作业调度。

⼀、 elastic-job 简介

Elastic-Job是当当推出的⼀个开源分布式调度解决⽅案,由两个相互独⽴的⼦项⽬Elastic-Job-Lite和Elastic-Job-Cloud组成。Elastic-Job-Lite定位为轻量级⽆中⼼化解决⽅案。详见官⽹介绍,传送门

本⽂从编程踩坑和⼤量测试中提炼,讲解⽬的在于帮助有基础的开发者理解elastic-job真实的运⾏逻辑,解答编程中官⽅⽂档未提及的疑惑,避免重复踩坑。基础的概念和⾼深的主从节点选举失效转移不是本⽂要讲的内容,请移步官⽅⽂档,本⽂更专注于怎么⽤对。⼆、 elastic-job 与 zookeeper

elastic-job 基于zk实现分布式协调,重要的作业信息都被存储在了zk上,如图:algosmElasticJobs 节点是⾃定义的命名空间的名称。nameLib-295是作业名

servers 通过⼦节点记录作业在哪⼏台机器上正在运⾏,⼀个机器对应⼀个⼦节点,运⾏结束后,servers⼦节点会被删除。instances 记录了job实例与机器的关系config 存储作业的配置信息

sharding 展开 是作业的分⽚信息,及每个分⽚运⾏在哪个机器上,与运⾏状态。

每个作业对应命名空间algosmElasticJobs下的⼀个⼦节点(节点名称为作业名)

三、 elastic-job 作业、分⽚与线程

作业运⾏起来后,服务器、实例、线程、分⽚的关系图:(2台服务器,每台运⾏2个作业,每个作业4个分⽚)

1)作业 job : 实现作业逻辑的class类,需重写execute(ShardingContext shardingContext) ⽅法。

elastic-job创建⼀个作业时,会在当前服务器上拉起⼀个作业class的实例,并需要为该实例指定唯⼀的作业名称。(如下图所⽰作业名称各不相同)

同⼀个class可以创建多个实例,从⽽⽣成多个作业(各个作业的名称不同,作业⾃定义配置也可设置的不⼀样)。如下图 nameLib 开头的是由ClassA ⽣成, makelabel是由ClassB⽣成。

2)分⽚ sharding:

创建作业时,需要设置这个作业有多少个分⽚来执⾏,如果作业A在机器1上获得了两个分⽚,那么这两个分⽚实际上是两个线程(注意是⼀台机器上获得两个分⽚,每台机器上装⼀个elastic-job 服务的情况下),这两个线程共⽤的是同⼀个class实例,也就是说这两个分⽚ 会共享 此class实例的成员变量。分⽚1修改了实例的成员变量,就会被分⽚2读到,从⽽影响分⽚2的作业逻辑。

如果想要为每个分⽚设置独享的变量,从⽽不受到其他分⽚影响,那么就需要⽤到线程变量。 ⽅法是,在该class中定义线程变量,⽤法如下:

/**

* 与线程相关的变量, key 线程号 */

private Map threadParam = new ConcurrentHashMap<>();

//初始化线程变量private void initParam(ShardingContext shardingContext){

//elasticJob 单实例多线程,每次拉起需要清理线程上次残留的状态 JobBaseParam jobBaseParam = new JobBaseParam();

jobBaseParam.setShardingItem(shardingContext.getShardingItem()); jobBaseParam.setCompletedActive(false); jobBaseParam.setOver(true);

jobBaseParam.setReceiveOver(false);

threadParam.put(Thread.currentThread().getId(),jobBaseParam); jobName = shardingContext.getJobName();

}

//使⽤线程变量threadParam.get(Thread.currentThread().getId()).setCompletedActive(true);threadParam.get(masterThreadId).getReceiveOver()

线程变量使⽤时,需⼩⼼:在哪个线程⾥threadParam.put的变量,就需要在哪个线程⾥threadParam.get,例如在主线程⾥put变量,然后在⼦线

程⾥get变量,就会get不到,产⽣逻辑错误。

关于作业Class中的静态变量,该静态变量将会被由改class new出来的所有作业分⽚读到,作⽤域范围最⼤。

jobClass不同变量作⽤域: 变量类型作⽤域

静态变量成员变量线程变量所有作业实例、所有分当前作业实例的所有分当前分⽚⽚⽚

如果想要长久保留分⽚要⽤的变量,每次分⽚拉起时⾃动从上⼀次状态继续,可以将与分⽚相关的变量存储到zk上,作业对应的分⽚节点下⾯,

类似:

algosm是⾃定义的前缀标识,以与elastic-job管理的节点区分。注意分⽚下加⾃定义节点,是不会影响elastic-job运⾏的,也不会被elastic-job 清除,是可⾏的⽅案。

四、 elastic-job 分⽚与失效转移

想要作业A失效转移⽣效,前提是每台服务器上都要在运⾏着作业A。

分⽚序号从0开始,当服务器1同时获得两个分⽚,分⽚1执⾏完毕,分⽚2未结束的情况下,分⽚1不会被再次触发,⼀直要等到分⽚2结束。经典模式,正常运⾏时,会随机分⽚,导致作业分⽚在不同机器上切换。

作业的两个分⽚在同⼀台服务器上时,分⽚1与分⽚2⽤的是同⼀实例,不同线程,若有状态残留在实例的成员变量中,需要⼩⼼,建议分⽚每次运⾏都要初始化⼀次状态。

五、 elastic-job 作业重启恢复

elastic-job 如果发⽣重启,是不会⾃动将作业拉起的,虽然其作业配置存储到了zk上,需要⾃⾏实现重启,拉起作业功能。

实现要点,是要判断作业是否为异常结束,⾮正常销毁的作业,会在servers节点下残留⼦节点,如果servers的⼦节点不为空,说明是异常停⽌,需要被拉起

config节点中存储了作业的配置信息实战代码如下:

(代码中jobPool是⾃⾏实现的⼀个作业池⽤来管理作业的实例,jobPoolLock是⾃⾏实现的细粒度锁)

@Override

public Boolean loadPreJob(){ try {

String rootPath = \"/\"+algosmJobConfig.getRegCenterNamespace(); if(!zkClient.exists(rootPath)){

logger.info(\"初次运⾏,未发现{}\ return true; }

List zkJobList = zkClient.getChildren(rootPath); if(!CollectionUtils.isEmpty(zkJobList)){ for(String jobName : zkJobList){ try{

String jobPath = rootPath+\"/\"+jobName;

//只要servers 不是空的,就说明作业⾮正常终⽌,需要将作业拉起来 if(!CollectionUtils.isEmpty(zkClient.getChildren(jobPath+\"/servers\"))){ //说明是⾮正常结束的job,需要拉起 String jobConfig = zkClient.readData(jobPath+\"/config\ if(StringUtils.isEmpty(jobConfig)){

logger.warn(\"job=[{}] config为空\ }else{ //取出参数 JsonNode jobNode = jsonTool.readTree(jobConfig); String jobParam = jobNode.get(\"jobParameter\").textValue(); //创建任务 String[] jobInfo = jobName.split(\"-\");

if(jobInfo.length != 2 || AlgosmJobType.trans(jobInfo[0])==null){ logger.warn(\"jobName={} 命名⾮法\ }else{

jobPoolLock.lock(jobName); try{

logger.info(\"初始化-加载job {}\ if(jobPool.containsKey(jobName)){

logger.warn(\"job={}已存在,跳过!\ }else{

JobEntityConfig jobEntityConfig = jobControl.createJob(jobName,AlgosmJobType.trans(jobInfo[0]),jobParam); JobScheduler jobScheduler = new SpringJobScheduler(jobEntityConfig.getJobEntity(),

zookeeperRegistryCenter, jobEntityConfig.getJobConfiguration(), new AlgosmElasticJobListener()); jobScheduler.init();

jobPool.put(jobName,jobEntityConfig.getJobEntity()); logger.info(\"加载job={}成功! param={}\ }

}finally {

jobPoolLock.unlock(jobName); } } }

}

}catch (Exception e){

logger.error(\"加载作业失败!jobName={}\ continue; }

} }

}catch (Exception e){

logger.error(\"初始化加载作业失败!\ return false; }

return true;}

六、 elastic-job 分布式作业控制与任务状态统计

如何控制所有机器上作业的启停,如何获取当前作业的运⾏状态,考虑到作业是运⾏在多台机器上的,所以挂了⼀台,作业并不算停⽌。作业运

⾏也并⾮所有机器都在跑就算运⾏,作业跑在不同机器上,每个机器上⼜可能不⽌⼀个分⽚所有分⽚的任务统计数据叠加,才算是作业准确的统计数据。

这块是需要⾃⾏实现的,elastic-job是不⽀持的。笔者已实现该部分功能,限于篇幅与时间限制,等下篇再讲述。(卖个关⼦,从下图分⽚中的⾃定义节点命名可看出⼀⼆)。

因篇幅问题不能全部显示,请点此查看更多更全内容