当前位置: 首页> 房产> 建筑 > 亚洲永久免费云服务器_旅游网站建设步骤_网络建站平台_工具大全

亚洲永久免费云服务器_旅游网站建设步骤_网络建站平台_工具大全

时间:2025/7/12 12:18:54来源:https://blog.csdn.net/weixin_65935065/article/details/146406965 浏览次数:0次
亚洲永久免费云服务器_旅游网站建设步骤_网络建站平台_工具大全

今天我们讲讲分布式定时任务调度—ElasticJob。

一、概述

1、什么是分布式任务调度

我们可以思考⼀下下⾯业务场景的解决⽅案:

  • 某电商平台需要每天上午10点,下午3点,晚上8点发放⼀批优惠券

  • 某银⾏系统需要在信⽤卡到期还款⽇的前三天进⾏短信提醒

  • 某财务系统需要在每天凌晨0:10分结算前⼀天的财务数据,统计汇总

以上场景就是任务调度所需要解决的问题

任务调度是为了⾃动完成特定任务,在约定的特定时刻去执⾏任务的过程

2、为什么需要分布式调度

我们在之前使⽤过Spring中提供的定时任务注解@Scheduled,在业务类中⽅法中贴上这个注解然后在启动类上贴上 @EnableScheduling 注解

那为什么又需要分布式调度?

感觉Spring给我们提供的这个注解可以完成任务调度的功能,好像已经完美解决问题了,为什么还需要分布式呢?

主要有如下这⼏点原因:

1.单机处理极限:原本1分钟内需要处理1万个订单,但是现在需要1分钟内处理10万个订单;原来⼀个统计需要1⼩时,现在业务⽅需要10分钟就统计出来。你也许会说,你也可以多线程、单机多进程处理。的确,多线程并⾏处理可以提⾼单位时间的处理效率,但是单机能⼒毕竟有限(主要是CPU、内存和磁盘),始终会有单机处理不过来的情况。

2.⾼可⽤:单机版的定式任务调度只能在⼀台机器上运⾏,如果程序或者系统出现异常就会导致功能不可⽤。虽然可以在单机程序实现的⾜够稳定,但始终有机会遇到⾮程序引起的故障,⽽这个对于⼀个系统的核⼼功能来说是不可接受的。

3.防⽌重复执⾏: 在单机模式下,定时任务是没什么问题的。但当我们部署了多台服务,同时⼜每台服务⼜有定时任务时,可能会出现任务重复执行

这个时候就需要分布式的任务调度来实现了。

3、Elastic-Job介绍

Elastic-Job是⼀个分布式调度的解决⽅案,由当当⽹开源,它由两个相互独⽴的⼦项⽬Elastic-job-Lite和

Elastic-Job-Cloud组成,使⽤Elastic-Job可以快速实现分布式任务调度。

Elastic-Job的地址: ElasticJob - Distributed scheduled job solution

功能列表:

  • 分布式调度协调

在分布式环境中,任务能够按照指定的调度策略执⾏,并且能够避免同⼀任务多实例重复执⾏。

  • 丰富的调度策略:

基于成熟的定时任务作业框架Quartz cron表达式执⾏定时任务。

  • 弹性拓容缩容

当集群中增加⼀个实例,它应当能够被选举被执⾏任务;当集群减少⼀个实例时,他所执⾏的任务能被转移到别的示例中执⾏。

  • 失效转移

某示例在任务执⾏失败后,会被转移到其他实例执⾏。

  • 错过执⾏任务重触发

若因某种原因导致作业错过执⾏,⾃动记录错误执⾏的作业,并在下次次作业完成后⾃动触发。

  • ⽀持并⾏调度

⽀持任务分⽚,任务分⽚是指将⼀个任务分成多个⼩任务在多个实例同时执⾏。

  • 作业分⽚⼀致性

当任务被分⽚后,保证同⼀分⽚在分布式环境中仅⼀个执⾏实例。

  • ⽀持作业⽣命周期操作

可以动态对任务进⾏开启及停⽌操作。

  • 丰富的作业类型

⽀持Simple、DataFlow、Script三种作业类型

系统运行架构图

二、Zookeeper下载

建议:zookeeper3.4.6以上版本,JDK1.7以上,maven在3.0.4以上

我这里将Zookeeper安装到了Linux系统上,以我自己安装为例,可自行选择。

1.上传,将zookeeper-3.4.11.tar.gz上传到/usr/local/src/soft/zookeeper目录下

2.解压文件到指定目录

tar -zxvf /usr/local/src/soft/zookeeper-3.4.11.tar.gz -C /usr/local/src/soft/zookeeper

3.拷贝配置文件

cp /usr/local/src/soft/zookeeper/zookeeper-3.4.11/conf/zoo_sample.cfg /usr/local/src/soft/zookeeper/zookeeper-3.4.11/conf/zoo.cfg

4.启动

/usr/local/src/soft/zookeeper/apache-zookeeper-3.5.6-bin/bin/zkServer.sh start

5.检查进程是否开启

这个命令不一定有效

jps

可以试试这个命令,查看状态

/usr/local/src/soft/zookeeper/apache-zookeeper-3.5.6-bin/bin/zkServer.sh status

三、任务分片参数

1、分片的概念

作业分片是指任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的应用实例分别执行某一个或者几个分布项。

例如:Elastic-job快速入门中文件备份的案例,现有两台服务器,每台服务器分别跑一个应用实例。为了快速执行作业,那么可以讲任务分成4片,每个应用实例都执行两片。作业遍历数据逻辑应为:实例1查找text和image类型文件执行备份,实例2査找radio和vedio类型文件执行备份。如果由于服务器拓容应用实例数量增加为4,则作业谝历数据的逻辑应为: 4个实例分别处理text,image,radio,video类型的文件。

可以看到,通过对任务的合理分片化,从而达到任务并行处理的效果.

  • 当只有一台机器时,给定时任务分片四个,在机器中启动四个线程,分别处理四个分片的内容

  • 当只有两台机器时,分片由两台机器进行分配,A负责索引为0,1的内容,B负责索引为2,3的内容

  • 三台机器,如图

  • 四台机器,平均分摊

集群之后,可以分摊CPU的处理压力,提高数据处理的速度,那到底几台机器好呢?

分片数建议是机器个数的倍数

在秒杀项目中,我们将秒杀商品的场次分成了10、12、14三个场次。在这里我们就根据场次将其分成三片

2、分片分配机制

ElasticJob的分片分配机制

  • Zookeeper 协调:ElasticJob 通过 Zookeeper 协调任务实例的分片分配。每个任务实例启动时,会向 Zookeeper 注册自己,并获取分配给自己的分片信息。

  • 动态分片分配:如果任务实例的数量发生变化(比如新增或减少实例),Zookeeper 会重新分配分片,确保每个分片都有任务实例处理。

  • 分片参数传递:分片参数通过 ShardingContext 传递给任务实例,任务实例根据分片参数执行对应的逻辑。

ElasticJob 通过 Zookeeper 协调,将分片分配给不同的任务实例。每个任务实例启动时,会从 Zookeeper 获取分配给自己的分片信息(分片编号和分片参数)。

三、项目集成

1、依赖添加

<dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-spring</artifactId><version>2.1.5</version>
</dependency>
<!--zookeeper客户端-->
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>2.10.0</version>
</dependency>

2、分布式调度配置

1)注册中心配置

获取zk的地址和任务名称,将任务注册到zk注册中心中

@Configuration
public class RegistryCenterConfig {@Bean(initMethod = "init")public CoordinatorRegistryCenter createRegistryCenter(@Value("${elasticjob.zookeeper-url}") String zookeeperUrl, @Value("${elasticjob.group-name}") String groupName) {//zk的配置ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(zookeeperUrl,groupName);//设置zk超时时间zookeeperConfiguration.setSessionTimeoutMilliseconds(100);//创建注册中心CoordinatorRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);return zookeeperRegistryCenter;}
}
2)分布式调度参数配置
#分布式定时任务配置
elasticjob:zookeeper-url: 192.168.88.130:2181group-name: shop-job-groupjobCron:#3分钟执行一次seckillProduct: 0 0/3 * * * ?
3)定时任务配置

需要使用定时任务的服务可能不止一个,不同的定时任务,表达式和分片参数、个数都不一样

将不同的定时任务创建不同的LiteJobConfiguration对象,指定不同的参数类型,将其创建为一个Bean,交给spring容器管理

@Configuration
public class BusinessJobConfig {
@Bean(initMethod = "init")public SpringJobScheduler initSPJob(CoordinatorRegistryCenter registryCenter, SeckillProductJob seckillProductJob){LiteJobConfiguration jobConfiguration = ElasticJobUtil.createJobConfiguration(seckillProductJob.getClass(),seckillProductJob.getCron(),//任务类的cron表达式seckillProductJob.getShardingTotalCount(), //分片个数seckillProductJob.getShardingParameters(), //分片参数seckillProductJob.isDataflowType());//不是dataflow类型SpringJobScheduler springJobScheduler = new SpringJobScheduler(seckillProductJob, registryCenter,jobConfiguration );return springJobScheduler;}
}    
4)分布式调度工具类

主要是用于创建LiteJobConfiguration对象,为不同的定时任务定义不同的配置类型

  • 指定定时任务类

  • 任务类的cron表达式

  • 分片个数

  • 分片参数

  • 是否为dataflow类型

public class ElasticJobUtil {public static LiteJobConfiguration createJobConfiguration(final Class<? extends SimpleJob> jobClass,final String cron,final int shardingTotalCount,
final String shardingItemParameters,boolean dataflowType) {// 定义作业核心配置JobCoreConfiguration.Builder jobCoreConfigurationBuilder = JobCoreConfiguration.newBuilder(jobClass.getSimpleName(), cron, shardingTotalCount);if(!StringUtils.isEmpty(shardingItemParameters)){jobCoreConfigurationBuilder.shardingItemParameters(shardingItemParameters);}JobTypeConfiguration jobConfig = null;if(dataflowType){jobConfig = new DataflowJobConfiguration(jobCoreConfigurationBuilder.build(),jobClass.getCanonicalName(),true);}else {// 定义SIMPLE类型配置jobConfig = new SimpleJobConfiguration(jobCoreConfigurationBuilder.build(), jobClass.getCanonicalName());}// 定义Lite作业根配置LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(jobConfig).overwrite(true).build();return simpleJobRootConfig;}public static LiteJobConfiguration createDefaultSimpleJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron) {// 创建默认的SIMPLE类型作业配置return createJobConfiguration(jobClass,cron,1,null,false);}public static LiteJobConfiguration createDefaultDataFlowJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron) {// 创建默认的DataFlow类型作业配置return createJobConfiguration(jobClass,cron,1,null,true);}
}
5)任务分片参数配置
  1. 删除之前的数据

  2. 查询当天的数据同步Redis,定时任务每天执行一次

  3. 给定时任务分片处理,分三片

给定时任务做分片处理:0=10,1=12,2=14(第10场秒杀任务,第12场.........)

jobSharding:seckillProduct:shardingParameters: 0=10,1=12,2=14shardingTotalCount: 3dataflowType: false

3、定时任务类

初始化秒杀商品定时任务

@Data
@RefreshScope
@Component
public class SeckillProductJob implements SimpleJob {//表达式@Value("${elasticjob.jobCron.seckillProduct}")private String cron;//分片参数@Value("${jobSharding.seckillProduct.shardingParameters}")private String shardingParameters;//分片个数@Value("${jobSharding.seckillProduct.shardingTotalCount}")private int shardingTotalCount;@Value("${jobSharding.seckillProduct.dataflowType}")private boolean dataflowType;@Resourceprivate SeckillProductFeignApi seckillProductFeignApi;@Autowiredprivate StringRedisTemplate redisTemplate;@Overridepublic void execute(ShardingContext shardingContext) {String time = shardingContext.getShardingParameter();//远程调用商品服务获取秒杀列表集合Result<List<SeckillProductVo>> result = seckillProductFeignApi.queryByTime(Integer.valueOf(time));if (result==null||result.hasError()) {//通知管理员return;}List<SeckillProductVo> seckillProductVoList = result.getData();//获取秒杀商品key-seckillProductHash:10String key = JobRedisKey.SECKILL_PRODUCT_HASH.getRealKey(time);//删除之前的缓存redisTemplate.delete(key);HashMap<String, String> seckillProductMap = new HashMap<>();//存储集合数据到Redis中for (SeckillProductVo vo : seckillProductVoList) {seckillProductMap.put(vo.getId().toString(), JSON.toJSONString(vo));}redisTemplate.opsForHash().putAll(key, seckillProductMap);System.out.println("分布式商品秒杀任务执行...............");}
}

秒杀列表缓存成功

4、任务分片处理逻辑

分片参数设置为 0=10,1=12,2=14,表示分片0处理时间参数为10的任务,分片1处理时间参数为12的任务,分片2处理时间参数为14的任务。ElasticJob 会根据分片参数将任务分片,并将每个分片分配给不同的任务实例执行。

  • 分片总数(ShardingTotalCount):表示任务的总分片数。比如你有3场秒杀活动,可以将分片总数设置为3,每个分片处理一场秒杀活动。

  • 分片参数(ShardingParameters):可以为每个分片指定参数,比如分片0处理第一场秒杀,分片1处理第二场秒杀,分片2处理第三场秒杀。

ElasticJob 通过 Zookeeper 协调,将分片分配给不同的任务实例。每个任务实例启动时,会从 Zookeeper 获取分配给自己的分片信息(分片编号和分片参数)。

今天的分享结束,感兴趣的兄弟请点赞、收藏,关注我不迷路!下期再见!

关键字:亚洲永久免费云服务器_旅游网站建设步骤_网络建站平台_工具大全

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: