分布式任务调度
在开发项目中,分布式任务调度是必不可少的,如 电商项目中的 订单定时任务,商户的每日清算 等。
Linux crontab轻量定时任务
1 2 3 4 5
| service crond start // 必须保证定时任务是开启的状态 crontab -e // 编辑定时任务 * * * * * echo "timer">/root/test.out
|
crontab表达式规则
- 在Linux的crontab表达式只能精确到分钟,是没有秒的概念,但是在Java中是能精确到秒的。
crontab弊端
- 一般情况下是单机运行,如果单机挂了,定时任务也会跟着挂掉
- 如果服务器集群的话,定时任务不方便统一管理,需要单独对每台服务器进行定时任务管理
xxl-job实现分布式任务调度
为什么要用分布式任务调度系统?
- 如果在单机上进行定时任务的话,单机挂了,定时任务也会挂掉,如果采用分布式任务调度系统,则master机器挂掉了,系统会让其他机器进行任务调度
- 如:执行商户结算任务时,执行的节点宕机了,调度框架会安排其他节点 继续执行任务
单机定时任务的缺点
- 无法执行单次测试
- 当设置好定时任务后,推送到生产环境后,想立即执行一次是做不到的,必须等到定时器时间 到了之后才会触发该任务。
- 虽然说通过通过其他接口去调用该定时任务方法也可以触发,但是这会增加一些和业务逻辑无关的代码
- 更改执行时间麻烦
- 打包之后 如果想修改时间的话,就必须重新修改代码打包推送生产环境
- 关于异常和重试
- 一旦出现了异常的话并没有一个 补救措施,也没有 重试机制和通知机制。
- 无法暂停任务
- 定时任务执行到一半的时候,发现报错了 或者有其他异常想立即停止是做不到的,虽然可以通过开关状态来判断 该定时任务是否开启,但是这会增加一些和业务逻辑无关的代码
- 无法监控任务
- 无法对定时任务进行监控,查看任务执行情况
- 无法分片执行
- 如果需要对大量的数据进行分析或者对大量的用户进行推送的话,单机执行起来效率很低,无法进行分布式任务调度,让其他机器协同执行
- 存在重复执行的可能性
- 如果使用单机定时任务的话,集群中部署多台节点 可能存在 定时任务被多次执行的情况,并且无法保证每个节点的时间都一致,可能存在执行频率的问题。
目前实现分布式任务调度系统的框架有很多,国内用的较多的是是elasticjob和xxljob。elasticjob依赖于zookeeper进行分布式任务调度,而xxljob依赖于数据库,相对而言 xxljob 更加适用于轻量化项目开发。
xxl-job配置
控制后台服务端
下载xxj-job admin端源码
下面我们提供了2个地址下载,一个是 本地实验的版本,一个是 gitee仓库,根据需要下载。
gitee仓库:https://gitee.com/xuxueli0323/xxl-job?utm_source=alading&utm_campaign=repo
私人地址 2.4.1版本:https://files.javaxing.com/Java%08Demo/xxl-job-master.zip
导入SQL文件
SQL文件路径:xxl-job-master/doc/db/tables_xxl_job.sql
导入成功后看到有一个xxl-job的数据库,里面已经为我们创建好了一些表。
**配置 控制端 数据源 **
下载源码后导入到idea,配置 application.properties 数据源信息
email 邮件告警配置
启动服务
进入xxl-job 后台 http://localhost:8080/xxl-job-admin/
- 服务端口:server.port=8080
- 服务路径:server.servlet.context-path=/xxl-job-admin
创建执行器(集成到springBoot)
项目代码:https://files.javaxing.com/Java%08Demo/xxl-job-springboot-client.zip
流程:
- 创建一个maven工程,引入springBoot依赖
- 引入mysql、mybatis-plus、druid 依赖,完成数据源配置
- 引入 xxj-job依赖和配置文件
创建springBoot工程和数据源
引入依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>2.6.2</version> </dependency> <dependency> <groupId>com.xuxueli</groupId> <artifactId>xxl-job-core</artifactId> <version>2.4.1-SNAPSHOT</version> </dependency>
<dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.3.2</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>1.2.3</version> </dependency>
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.29</version> </dependency>
</dependencies>
|
application.yaml
1 2 3 4 5 6 7 8 9 10 11 12
| spring: datasource: type: com.alibaba.druid.pool.DruidDataSource druid: driver-class-name: com.mysql.cj.jdbc.Driver username: root password: 123123 url: jdbc:mysql://10.211.55.12:3306/user?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8&allowMultiQueries=true&allowPublicKeyRetrieval=true
mybatis-plus: mapper-locations: classpath:/mapper/*Mapper.xml
|
设置mysql数据源,社会之mybatis-plus扫描mapper路径
配置xxl-job 执行器 config
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| @Configuration public class XxlJobConfig { private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
@Value("${xxl.job.admin.addresses}") private String adminAddresses;
@Value("${xxl.job.accessToken}") private String accessToken;
@Value("${xxl.job.executor.appname}") private String appname;
@Value("${xxl.job.executor.address}") private String address;
@Value("${xxl.job.executor.ip}") private String ip;
@Value("${xxl.job.executor.port}") private int port;
@Value("${xxl.job.executor.logpath}") private String logPath;
@Value("${xxl.job.executor.logretentiondays}") private int logRetentionDays;
@Bean public XxlJobSpringExecutor xxlJobExecutor() { logger.info(">>>>>>>>>>> xxl-job config init."); XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppname(appname); xxlJobSpringExecutor.setAddress(address); xxlJobSpringExecutor.setIp(ip); xxlJobSpringExecutor.setPort(port); xxlJobSpringExecutor.setAccessToken(accessToken); xxlJobSpringExecutor.setLogPath(logPath); xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor; } }
|
创建任务Handler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| @Component public class SampleXxlJob { private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);
@Autowired private SysUserService sysUserService;
@XxlJob("demoJobHandler") public void demoJobHandler() throws Exception { XxlJobHelper.log("一切执行正常"); }
@XxlJob("shardingJobHandler") public void shardingJobHandler() throws Exception {
int shardIndex = XxlJobHelper.getShardIndex(); int shardTotal = XxlJobHelper.getShardTotal();
XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
List<SysUser> userList = sysUserService.findAllByMod(shardTotal, shardIndex); XxlJobHelper.log("当前分片分配的数据 {}", userList.toString());
} }
|
我们一共创建了2个任务,一个简单任务,一个分片广播任务(让多个执行器节点 协同执行)。
启动执行器客户端
控制面板创建执行器
创建执行器的时候,需要写入appName 这个名字 是执行器客户端的appName,可以理解为 集群名称。
执行器的appName 就是 client 客户端的appName
Log retention 日志保存时间,单位是天
执行器创建成功之后,会在一定的时间内发现并维护节点。
xxl-job任务调度
简单任务
路由策略(只显示常用的):
- 第一个:设备列表 第一个节点
- 最后一个:设备列表 最后一个节点
- 轮询:轮询设备列表节点
- 随机:设备列表随机执行
- 分片广播:让多个执行器节点 协同执行 同一个任务
任务管理列表
启动任务
调度日志
调度设备信息
我们可以通过调度日志 查看调度设备信息,可以清晰的看到 执行的节点,和调度策略。
详细的执行日志
分片任务
执行器 编写 shardingJobHandler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
@XxlJob("shardingJobHandler") public void shardingJobHandler() throws Exception {
int shardIndex = XxlJobHelper.getShardIndex(); int shardTotal = XxlJobHelper.getShardTotal();
XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
List<SysUser> userList = sysUserService.findAllByMod(shardTotal, shardIndex); XxlJobHelper.log("当前分片分配的数据 {}", userList.toString()); }
|
执行任务时,会先获取出 总分片数量,以及当前分片index,MySQL中查询数据时 会进行取模,公示如下:
MOD(id, shardTotal) = shardIndex ; 取模MOD(唯一标识 , 分片总数) = 分片index
select * from sys_user WHERE MOD(user_id,2) = 1
取模结果:
后台新增任务
查看新增后的分片任务
执行分片任务
xxj-job执行分片任务时,会广播所有的执行器的节点,每个节点都会收到通知并执行任务。在执行任务的时候,为了避免重复执行,会对要执行的数据唯一标识 进行取模,如 用户表,每个分片通过唯一标识取模 拿到的数据都不一样。
MOD(id, shardTotal) = shardIndex ; 取模MOD(唯一标识 , 分片总数) = 分片index
`select * from sys_user WHERE MOD(user_id,3) = 0
select * from sys_user WHERE MOD(user_id,3) = 1
select * from sys_user WHERE MOD(user_id,3) = 2