分布式任务调度

场景

  1. 报表统计

    需要每天或者定时调度产生的报表。

  2. 日/月结单

    金融/支付领域按时结单。

  3. 爬虫

    定时爬取内容。

  4. 数据归档

    数据量较大时,定时进行整理数据并存储。

任务调度框架类型

非分布式任务调度框架

只适用于单机应用,代表框架为Spring自带的@Scheduled。

使用@Scheduled

引入依赖的框架:

1
2
3
4
5
6
7
8
9
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>

启动类上添加注解启动定时任务:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
@SpringBootApplication
// 开启定时任务
@EnableScheduling
public class DistributedJobApplication {

    public static void main(String[] args) {
        SpringApplication.run(DistributedJobApplication.class, args);
    }

}

使用:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
@Slf4j
@Component
public class MyJob {


    // fixedDelay每三秒执行一次,从前次任务的结束到下次任务的开始间隔为三秒
    // fixedRate 时间间隔是前次任务的开始和下次任务的开始
    // initialDelay设置启动后5秒再开始执行,默认是启动时就开始执行
    // @Scheduled(fixedDelay = 3000,initialDelay = 5000)
    // cron:cron表达式,支持定时调用
    @Scheduled(cron = "0,5 * * * * ?")
    public void process(){
        log.info("process.....start");
    }
}

多线程使用@Scheduled

默认的@Scheduled使用方式是单线程的,也就是说,如果有两个定时任务,会出现两个定时任务抢占调度机会的情况,一个调度任务不能及时被调度的情况发生。这种情况下就需要使用多线程方式。

使用:

在启动类中添加设置定时任务线程池的配置:

1
2
3
4
5
6
7
@Bean
public TaskScheduler taskScheduler(){
    ThreadPoolTaskScheduler taskExecutor  = new ThreadPoolTaskScheduler();
    // 设置定时任务线程池大小
    taskExecutor.setPoolSize(10);
    return taskExecutor;
}

添加两个定时任务:

1
2
3
4
5
6
7
8
9
@Scheduled(cron = "0,5 * * * * ?")
public void process1(){
	log.info("process1.....start");
}

@Scheduled(cron = "0,5 * * * * ?")
public void process2(){
	log.info("process2.....start");
}

启动后可以在控制台中看到类似下面的输出:

1
2
3
4
2021-01-13 12:45:00.016  INFO 10504 --- [taskScheduler-2] tech.punklu.distributedjob.Job.MyJob     : process1.....start
2021-01-13 12:45:00.016  INFO 10504 --- [taskScheduler-1] tech.punklu.distributedjob.Job.MyJob     : process2.....start
2021-01-13 12:45:05.011  INFO 10504 --- [taskScheduler-2] tech.punklu.distributedjob.Job.MyJob     : process1.....start
2021-01-13 12:45:05.011  INFO 10504 --- [taskScheduler-1] tech.punklu.distributedjob.Job.MyJob     : process2.....start

可以看到,此时已经有了两个线程taskScheduler-1taskScheduler-2来分别处理两个定时任务。

异步调度@Scheduled

使用Spring Boot自带的@Async注解开启异步任务,在定时任务中调用即可实现忽略定时任务执行时间,根据设置好的时间间隔定时调度。

分布式任务调度框架

  1. QuartZ

    只是简单的提供了分布式任务调度,没有可视化界面,接口不好用。

  2. ElasticJob

    基于QuartZ开发,使用的中间件较多,复杂度较高。

  3. XXL-JOB

    美团点评开源的分布式任务调度框架。使用较为简单。

  4. SchedulerX

    为阿里云商用软件,不开源需要花钱购买。

  5. PowerJob

使用QuartZ

引入依赖:

1
2
3
4
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

配置运行规则:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import org.quartz.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import tech.punklu.distributedjob.Job.MyQuartzJob;

@Configuration
public class MyQuartzJobConfig {

    @Bean
    public JobDetail jobDetail(){
        JobDetail detail = JobBuilder.newJob(MyQuartzJob.class)
                .withIdentity("job1","group1")
                .storeDurably()
                .build();
        return detail;
    }

    @Bean
    public Trigger trigger(){
        Trigger trigger = TriggerBuilder.newTrigger()
                .forJob(jobDetail())
                .withIdentity("trigger1","group1")
                .startNow()
                .withSchedule(CronScheduleBuilder.cronSchedule("0,5 * * * * ?"))
                .build();
        return trigger;
    }
}

设置定时任务:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import lombok.extern.slf4j.Slf4j;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean;

@Slf4j
public class MyQuartzJob extends QuartzJobBean {

    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        log.info("MyQuartzJob...");
    }
}

使用XXL-Job

搭建xxl-job-admin

创建XXL-Job需要的数据库表结构:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#
# XXL-JOB v2.2.0
# Copyright (c) 2015-present, xuxueli.

CREATE database if NOT EXISTS `xxl_job` default character set utf8mb4 collate utf8mb4_unicode_ci;
use `xxl_job`;

SET NAMES utf8mb4;

CREATE TABLE `xxl_job_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `job_group` int(11) NOT NULL COMMENT '执行器主键ID',
  `job_cron` varchar(128) NOT NULL COMMENT '任务执行CRON',
  `job_desc` varchar(255) NOT NULL,
  `add_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  `author` varchar(64) DEFAULT NULL COMMENT '作者',
  `alarm_email` varchar(255) DEFAULT NULL COMMENT '报警邮件',
  `executor_route_strategy` varchar(50) DEFAULT NULL COMMENT '执行器路由策略',
  `executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler',
  `executor_param` varchar(512) DEFAULT NULL COMMENT '执行器任务参数',
  `executor_block_strategy` varchar(50) DEFAULT NULL COMMENT '阻塞处理策略',
  `executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒',
  `executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '失败重试次数',
  `glue_type` varchar(50) NOT NULL COMMENT 'GLUE类型',
  `glue_source` mediumtext COMMENT 'GLUE源代码',
  `glue_remark` varchar(128) DEFAULT NULL COMMENT 'GLUE备注',
  `glue_updatetime` datetime DEFAULT NULL COMMENT 'GLUE更新时间',
  `child_jobid` varchar(255) DEFAULT NULL COMMENT '子任务ID,多个逗号分隔',
  `trigger_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '调度状态:0-停止,1-运行',
  `trigger_last_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '上次调度时间',
  `trigger_next_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '下次调度时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `xxl_job_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `job_group` int(11) NOT NULL COMMENT '执行器主键ID',
  `job_id` int(11) NOT NULL COMMENT '任务,主键ID',
  `executor_address` varchar(255) DEFAULT NULL COMMENT '执行器地址,本次执行的地址',
  `executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler',
  `executor_param` varchar(512) DEFAULT NULL COMMENT '执行器任务参数',
  `executor_sharding_param` varchar(20) DEFAULT NULL COMMENT '执行器任务分片参数,格式如 1/2',
  `executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '失败重试次数',
  `trigger_time` datetime DEFAULT NULL COMMENT '调度-时间',
  `trigger_code` int(11) NOT NULL COMMENT '调度-结果',
  `trigger_msg` text COMMENT '调度-日志',
  `handle_time` datetime DEFAULT NULL COMMENT '执行-时间',
  `handle_code` int(11) NOT NULL COMMENT '执行-状态',
  `handle_msg` text COMMENT '执行-日志',
  `alarm_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '告警状态:0-默认、1-无需告警、2-告警成功、3-告警失败',
  PRIMARY KEY (`id`),
  KEY `I_trigger_time` (`trigger_time`),
  KEY `I_handle_code` (`handle_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `xxl_job_log_report` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `trigger_day` datetime DEFAULT NULL COMMENT '调度-时间',
  `running_count` int(11) NOT NULL DEFAULT '0' COMMENT '运行中-日志数量',
  `suc_count` int(11) NOT NULL DEFAULT '0' COMMENT '执行成功-日志数量',
  `fail_count` int(11) NOT NULL DEFAULT '0' COMMENT '执行失败-日志数量',
  PRIMARY KEY (`id`),
  UNIQUE KEY `i_trigger_day` (`trigger_day`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `xxl_job_logglue` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `job_id` int(11) NOT NULL COMMENT '任务,主键ID',
  `glue_type` varchar(50) DEFAULT NULL COMMENT 'GLUE类型',
  `glue_source` mediumtext COMMENT 'GLUE源代码',
  `glue_remark` varchar(128) NOT NULL COMMENT 'GLUE备注',
  `add_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `xxl_job_registry` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `registry_group` varchar(50) NOT NULL,
  `registry_key` varchar(255) NOT NULL,
  `registry_value` varchar(255) NOT NULL,
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `i_g_k_v` (`registry_group`,`registry_key`,`registry_value`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `xxl_job_group` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `app_name` varchar(64) NOT NULL COMMENT '执行器AppName',
  `title` varchar(12) NOT NULL COMMENT '执行器名称',
  `address_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '执行器地址类型:0=自动注册、1=手动录入',
  `address_list` varchar(512) DEFAULT NULL COMMENT '执行器地址列表,多地址逗号分隔',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `xxl_job_user` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `username` varchar(50) NOT NULL COMMENT '账号',
  `password` varchar(50) NOT NULL COMMENT '密码',
  `role` tinyint(4) NOT NULL COMMENT '角色:0-普通用户、1-管理员',
  `permission` varchar(255) DEFAULT NULL COMMENT '权限:执行器ID列表,多个逗号分割',
  PRIMARY KEY (`id`),
  UNIQUE KEY `i_username` (`username`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `xxl_job_lock` (
  `lock_name` varchar(50) NOT NULL COMMENT '锁名称',
  PRIMARY KEY (`lock_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;


INSERT INTO `xxl_job_group`(`id`, `app_name`, `title`, `address_type`, `address_list`) VALUES (1, 'xxl-job-executor-sample', '示例执行器', 0, NULL);
INSERT INTO `xxl_job_info`(`id`, `job_group`, `job_cron`, `job_desc`, `add_time`, `update_time`, `author`, `alarm_email`, `executor_route_strategy`, `executor_handler`, `executor_param`, `executor_block_strategy`, `executor_timeout`, `executor_fail_retry_count`, `glue_type`, `glue_source`, `glue_remark`, `glue_updatetime`, `child_jobid`) VALUES (1, 1, '0 0 0 * * ? *', '测试任务1', '2018-11-03 22:21:31', '2018-11-03 22:21:31', 'XXL', '', 'FIRST', 'demoJobHandler', '', 'SERIAL_EXECUTION', 0, 0, 'BEAN', '', 'GLUE代码初始化', '2018-11-03 22:21:31', '');
INSERT INTO `xxl_job_user`(`id`, `username`, `password`, `role`, `permission`) VALUES (1, 'admin', 'e10adc3949ba59abbe56e057f20f883e', 1, NULL);
INSERT INTO `xxl_job_lock` ( `lock_name`) VALUES ( 'schedule_lock');

commit;

使用Docker搭建xxl-job-admin任务调度中心:

1
docker run -e PARAMS="--spring.datasource.url=jdbc:mysql://192.168.9.223:3306/xxl_job?Unicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai --spring.datasource.username=root --spring.datasource.password=123456 --xxl.admin.login=false"  -p 8080:8080 --name xxl-job-admin -d  xuxueli/xxl-job-admin:2.2.0

容器启动成功后,访问http://127.0.0.1:8080/xxl-job-admin,可以登录到xxl-job-admin任务调度中心控制台。默认用户名admin,默认密码123456

可在任务调度中心控制台里看到:

  1. 运行报表
  2. 任务管理
  3. 调度日志
  4. 执行器管理
  5. 用户管理

等信息。

使用xxl-job-admin

引入依赖:

1
2
3
4
5
<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>2.2.0</version>
</dependency>

执行器配置信息:

1
2
3
4
5
### 调度中心部署跟地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin

### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
xxl.job.executor.appname=xxl-job-executor-sample

其余的使用默认配置项即可,可选配置项如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
### 调度中心部署跟地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
### 执行器通讯TOKEN [选填]:非空时启用;
xxl.job.accessToken=
### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
xxl.job.executor.appname=xxl-job-executor-sample
### 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
xxl.job.executor.address=
### 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
xxl.job.executor.ip=
### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
xxl.job.executor.port=9999
### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;
xxl.job.executor.logretentiondays=30

配置执行器:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Component
@Slf4j
public class XxlJobConfig {

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        log.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        // 使用默认配置项
        /*xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);*/
        return xxlJobSpringExecutor;
    }
}

启动项目后,可在xxl-job-admin的执行器管理里看到对应的OnLine机器地址里出现了注册节点地址为http://192.168.9.223:9999/的机器(即刚启动成功的Java执行器应用)。

机器注册到任务调度中心之后,就可以创建调度任务并执行了。

xxl-job-admin的任务管理里新增任务,配置项如下:

  1. 任务描述:

  2. 路由策略:

  3. Cron:

    指定此任务要遵循的Cron表达式。

  4. 运行模式

    1. BEAN

      BEAN模式运行在执行器里。

    2. GLUE

      包括:

      1. Java
      2. Shell
      3. Python
      4. PHP
      5. Nodejs
      6. PowerShell

      GLUE模式运行在任务调度中心。

  5. JobHandler

    **后续需要在执行器中指定,才可实现执行器定时执行该任务。**这里使用myXxlJobHandler

  6. 隔离处理策略

  7. 负责人

    任务创建人

然后在程序中开发执行器:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
@Component
@Slf4j
public class MyXxlJob {


    // 注解中的值即为任务调度中心中的JobHandler的值
    @XxlJob("myXxlJobHandler")
    public ReturnT<String> execute(String param) {
        log.info("myXxlJobHandler execute...");
        // 将日志打印到Xxl-job中的执行日志中,方便在管理中心直接查看
        XxlJobLogger.log("myXxlJobHandler execute...");
        return ReturnT.SUCCESS;
    }
}

启动项目后,可以在控制台中看到如下的日志:

1
>>>>>>>>>>> xxl-job register jobhandler success, name:myXxlJobHandler, jobHandler:com.xxl.job.core.handler.impl.MethodJobHandler

然后,可以在任务调度中心里的任务管理里,选择创建的任务,在操作里选择执行一次或启动,启动后,程序会按照对应的Cron表达式的内容定时调度执行任务。并可选择查询日志查看任务执行的结果。

最重要的一点,可以在任务管理中心里直接修改任务的Cron表达式,修改定时计划,执行器程序不需重新启动即可直接生效。

路由策略

任务调度中心里可配置的任务路由策略有:

  1. 第一个(默认)
  2. 最后一个
  3. 轮询
  4. 随机
  5. 一致性HASH
  6. 最不经常使用
  7. 最近最久未使用
  8. 故障转移
  9. 忙碌转移
  10. 分片广播

当只有一个任务调度中心,而有多个执行器的时候,任务调度中心要选择一个或多个执行器来执行任务,也就是对应的路由策略。

在idea里项目中心里使用Edit Configurations选项复制现有的任务调度项目,修改项目名为DistributedJobApplication(2),端口为8083。同时启动两个项目。登录任务调度中心查看执行器OnLine机器地址,可以发现其中已经注册了两个执行器。地址的端口分别为两个执行器程序连接时使用的端口。开启定时任务执行,使用默认路由策略(第一个),会发现此时只有第一个注册的执行器会定时调用任务。同理,如果路由策略为最后一个,则正好反过来。如果是轮询策略,则为注册的多个执行器轮流执行。

由官方文档可知,相关配置项的功能如下:

路由策略:当执行器集群部署时,提供丰富的路由策略,包括;

  1. FIRST(第一个):固定选择第一个机器;
  2. LAST(最后一个):固定选择最后一个机器;
  3. ROUND(轮询):;
  4. RANDOM(随机):随机选择在线的机器;
  5. CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
  6. LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;
  7. LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;
  8. FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
  9. BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
  10. SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;

其中故障转移策略,会优先选择第一个注册的执行器进行调度,如果第一个注册的执行器出现故障无法调用,会转而调用第二个执行器进行调度,直到第一个执行器恢复正常,会再次调度第一个。

轮询策略的特点是流量均摊,故障转移把所有流量都压到一个执行器上。

分片广播策略

有时可能定时任务需要处理的数据量很大,单台服务器无法快速完成,可以使用分片广播策略调度多台执行器同时运行,但分别处理一部分与其他执行器不同的数据。

分片广播策略用法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Component
@Slf4j
public class MyXxlJob {


    // 注解中的值即为任务调度中心中的JobHandler的值
    @XxlJob("myXxlJobHandler")
    public ReturnT<String> execute(String param) {


        ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
        // 从数据库查询用户数据,并分别调度处理
        // 两台服务器,假设分别处理一半的数据
        List<Integer> list = Arrays.asList(1,2,3,4);
        for (Integer i: list ) {
            // 取模判断当前执行器是否要处理这部分数据
            if (i % shardingVO.getTotal() == shardingVO.getIndex()){
                // 将日志打印到Xxl-job中的执行日志中,方便在管理中心直接查看
                log.info("myXxlJobHandler execute...user={},shardingVo={}",
                        i,new Gson().toJson(shardingVO));
            }

        }

        return ReturnT.SUCCESS;
    }
}

其中ShardingUtil.ShardingVO有两个属性:indextotal

  1. index:当前分片序号(从0开始),执行器集群列表中当前执行器的序号;
  2. total:总分片数,执行器集群的总机器数量;

再次启动两个项目后,可以看到,一台执行器只执行1和3的数据,另一台只处理2和4的数据。

同理,即使再增加执行器数量,也可以保证数据只在其中一台执行器执行一次。

阻塞处理策略

阻塞处理可选策略:

  1. 单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
  2. 丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
  3. 覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;