分布式任务调度

在开发项目中,分布式任务调度是必不可少的,如 电商项目中的 订单定时任务,商户的每日清算 等。

Linux crontab轻量定时任务

1
2
3
4
5
service crond start  // 必须保证定时任务是开启的状态
crontab -e // 编辑定时任务
* * * * * echo "timer">/root/test.out


crontab表达式规则

  • 在Linux的crontab表达式只能精确到分钟,是没有秒的概念,但是在Java中是能精确到秒的。
image-20230219112744531

crontab弊端

  • 一般情况下是单机运行,如果单机挂了,定时任务也会跟着挂掉
  • 如果服务器集群的话,定时任务不方便统一管理,需要单独对每台服务器进行定时任务管理

xxl-job实现分布式任务调度

为什么要用分布式任务调度系统?

  • 如果在单机上进行定时任务的话,单机挂了,定时任务也会挂掉,如果采用分布式任务调度系统,则master机器挂掉了,系统会让其他机器进行任务调度
    • 如:执行商户结算任务时,执行的节点宕机了,调度框架会安排其他节点 继续执行任务

单机定时任务的缺点

  1. 无法执行单次测试
    1. 当设置好定时任务后,推送到生产环境后,想立即执行一次是做不到的,必须等到定时器时间 到了之后才会触发该任务。
      1. 虽然说通过通过其他接口去调用该定时任务方法也可以触发,但是这会增加一些和业务逻辑无关的代码
  2. 更改执行时间麻烦
    1. 打包之后 如果想修改时间的话,就必须重新修改代码打包推送生产环境
  3. 关于异常和重试
    1. 一旦出现了异常的话并没有一个 补救措施,也没有 重试机制和通知机制
  4. 无法暂停任务
    1. 定时任务执行到一半的时候,发现报错了 或者有其他异常想立即停止是做不到的,虽然可以通过开关状态来判断 该定时任务是否开启,但是这会增加一些和业务逻辑无关的代码
  5. 无法监控任务
    1. 无法对定时任务进行监控,查看任务执行情况
  6. 无法分片执行
    1. 如果需要对大量的数据进行分析或者对大量的用户进行推送的话,单机执行起来效率很低,无法进行分布式任务调度,让其他机器协同执行
  7. 存在重复执行的可能性
    1. 如果使用单机定时任务的话,集群中部署多台节点 可能存在 定时任务被多次执行的情况,并且无法保证每个节点的时间都一致,可能存在执行频率的问题。

目前实现分布式任务调度系统的框架有很多,国内用的较多的是是elasticjob和xxljob。elasticjob依赖于zookeeper进行分布式任务调度,而xxljob依赖于数据库,相对而言 xxljob 更加适用于轻量化项目开发。

xxl-job配置

控制后台服务端

下载xxj-job admin端源码

下面我们提供了2个地址下载,一个是 本地实验的版本,一个是 gitee仓库,根据需要下载。

gitee仓库:https://gitee.com/xuxueli0323/xxl-job?utm_source=alading&utm_campaign=repo

私人地址 2.4.1版本:https://files.javaxing.com/Java%08Demo/xxl-job-master.zip

导入SQL文件

SQL文件路径:xxl-job-master/doc/db/tables_xxl_job.sql

image-20240201153216680

导入成功后看到有一个xxl-job的数据库,里面已经为我们创建好了一些表。

image-20240201153307195

**配置 控制端 数据源 **

下载源码后导入到idea,配置 application.properties 数据源信息

image-20240201154018823

email 邮件告警配置

image-20240201154333249

启动服务

image-20240201162620617

进入xxl-job 后台 http://localhost:8080/xxl-job-admin/

  • 服务端口:server.port=8080
  • 服务路径:server.servlet.context-path=/xxl-job-admin

image-20240201162721892

创建执行器(集成到springBoot)

项目代码:https://files.javaxing.com/Java%08Demo/xxl-job-springboot-client.zip

流程:

  1. 创建一个maven工程,引入springBoot依赖
  2. 引入mysql、mybatis-plus、druid 依赖,完成数据源配置
  3. 引入 xxj-job依赖和配置文件

创建springBoot工程和数据源

image-20240201155531353

引入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.6.2</version>
</dependency>
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.4.1-SNAPSHOT</version>
</dependency>


<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.3.2</version>
</dependency>
<!--Druid连接池-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.3</version>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.29</version>
</dependency>

</dependencies>

application.yaml

1
2
3
4
5
6
7
8
9
10
11
12
spring:
datasource:
type: com.alibaba.druid.pool.DruidDataSource
druid:
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: 123123
url: jdbc:mysql://10.211.55.12:3306/user?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8&allowMultiQueries=true&allowPublicKeyRetrieval=true


mybatis-plus:
mapper-locations: classpath:/mapper/*Mapper.xml

设置mysql数据源,社会之mybatis-plus扫描mapper路径

配置xxl-job 执行器 config

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

@Value("${xxl.job.admin.addresses}")
private String adminAddresses;

@Value("${xxl.job.accessToken}")
private String accessToken;

@Value("${xxl.job.executor.appname}")
private String appname;

@Value("${xxl.job.executor.address}")
private String address;

@Value("${xxl.job.executor.ip}")
private String ip;

@Value("${xxl.job.executor.port}")
private int port;

@Value("${xxl.job.executor.logpath}")
private String logPath;

@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;


@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

return xxlJobSpringExecutor;
}
}

创建任务Handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Component
public class SampleXxlJob {
private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);

@Autowired
private SysUserService sysUserService;

/**
* 1、简单任务示例(Bean模式)
*/
@XxlJob("demoJobHandler")
public void demoJobHandler() throws Exception {
XxlJobHelper.log("一切执行正常");
}


/**
* 2、分片广播任务
*/
@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {

// 分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();

XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);

List<SysUser> userList = sysUserService.findAllByMod(shardTotal, shardIndex);
XxlJobHelper.log("当前分片分配的数据 {}", userList.toString());

// 业务逻辑
// for (int i = 0; i < shardTotal; i++) {
// if (i == shardIndex) {
// XxlJobHelper.log("第 {} 片, 命中分片开始处理", i);
// } else {
// XxlJobHelper.log("第 {} 片, 忽略", i);
// }
// }

}
}

我们一共创建了2个任务,一个简单任务,一个分片广播任务(让多个执行器节点 协同执行)。

启动执行器客户端

image-20240201161756501

控制面板创建执行器

image-20240201161540440

创建执行器的时候,需要写入appName 这个名字 是执行器客户端的appName,可以理解为 集群名称。

执行器的appName 就是 client 客户端的appName

image-20240201162107952

Log retention 日志保存时间,单位是天

执行器创建成功之后,会在一定的时间内发现并维护节点。

image-20240201161639923

xxl-job任务调度

简单任务

image-20240201164014553

路由策略(只显示常用的):

  • 第一个:设备列表 第一个节点
  • 最后一个:设备列表 最后一个节点
  • 轮询:轮询设备列表节点
  • 随机:设备列表随机执行
  • 分片广播:让多个执行器节点 协同执行 同一个任务

任务管理列表

image-20240201164022365

启动任务

image-20240201164046644

调度日志

image-20240201164143174

调度设备信息

我们可以通过调度日志 查看调度设备信息,可以清晰的看到 执行的节点,和调度策略。

image-20240201164253799

详细的执行日志

image-20240201164207042

分片任务

执行器 编写 shardingJobHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 2、分片广播任务
*/
@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {

// 分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();

XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);

List<SysUser> userList = sysUserService.findAllByMod(shardTotal, shardIndex);
XxlJobHelper.log("当前分片分配的数据 {}", userList.toString());
}

执行任务时,会先获取出 总分片数量,以及当前分片index,MySQL中查询数据时 会进行取模,公示如下:

MOD(id, shardTotal) = shardIndex ; 取模MOD(唯一标识 , 分片总数) = 分片index

select * from sys_user WHERE MOD(user_id,2) = 1

取模结果:

image-20240201171822805

后台新增任务

image-20240201170147260

查看新增后的分片任务

image-20240201170234992

执行分片任务

image-20240201172143039image-20240201172126259image-20240201172114500

xxj-job执行分片任务时,会广播所有的执行器的节点,每个节点都会收到通知并执行任务。在执行任务的时候,为了避免重复执行,会对要执行的数据唯一标识 进行取模,如 用户表,每个分片通过唯一标识取模 拿到的数据都不一样。

image-20240201172937403

MOD(id, shardTotal) = shardIndex ; 取模MOD(唯一标识 , 分片总数) = 分片index

`select * from sys_user WHERE MOD(user_id,3) = 0

image-20240201173746065

select * from sys_user WHERE MOD(user_id,3) = 1

image-20240201173238884

image-20240201173817141

select * from sys_user WHERE MOD(user_id,3) = 2

image-20240201173250928

image-20240201173830147