定时任务演进和schedulerx

在很多业务场景下,都需要定时任务,比如”定时推送消息”、”定时计算报表”、”定时清理数据”等等。通常业务都是集群化的,定时任务通常不能每个机器单机执行,需要有个分布式协调器感知到集群,然后选中1台机器执行,所以出现了很多定时任务平台帮助处理定时任务。

定时任务有个核心的问题:如何知道时间到了?

原始模型

只需要每次插入任务,都按待执行时间从小到大排序,然后用个死循环轮训,不断从队头获取小于等于当前时间的任务即可。只要轮询得够快,时间粒度足够小,比如秒,那就相当于能感知到”时间到了”的瞬间了。
raw_schedule_model.png
但我们知道,定时任务的基本需求最少有2个:

  • 添加任务:插入
  • 获取当前时间到了的任务:查询

原始模型,插入后得排序,O(LogN),然后查询扫描的时间是O(N),很慢。

java.util.Timer

Java.util.TImer稍微做了优化,用数组实现的最小堆来代替单链表,这样插入O(LogN),查询O(1):
timer.png
不过任务量大时还是很慢。

java.util.concurrent.ScheduledExecutorService

多线程发展后,有了concurrent包,这个包提供了ScheduledExecutorService来做定时任务,做的最大的优化就是增加了线程池,多线程并发地执行任务。
scheduled_executor_service.png
不过线程数量是受限制的,还是有瓶颈,当数量大了之后,线程切换保存现场的时间开销反而很大,切来切去,还不如单线程。

简单时间轮

换个思路,来优化数据结构。现在的问题是插入O(LogN),只有查询是常数的,如果能把插入也搞到O(1)就好了。
time_wheel.png
假设我们把任务按照小时划分,用单链表分别存放在0~23个时间刻度上,轮询器不再扫描N个任务,而是扫描常数时间刻度,查询是O(1),插入时不需要排序,也是O(1)。
但是小时可能不够细,通常定时任务都需要做到秒级,我们可以继续细分下去,246060=86400个刻度。

看似很完美了,但只是理论空架构。落到实际应用上,可以想象,大部分定时任务都会有聚集性,比如:

  • 0点,”生而为人我们抱歉”的时间,又饿又疲惫,通常是买买买活动开启的好时机
  • 凌晨3点,人们都睡觉了,服务器压力小,通常可以做清理、备份的工作
  • 中午12点,人们在吃饭玩手机了,这个时候推送效果会很好
  • ……

real_time_wheel.png
这样,由于数据不均衡,大部分的轮转都是空转,比较浪费资源。不过这个设计很符合日常生活的直觉。

分桶时间轮:Netty的io.netty.util.HashedWheelTimer

我们要做的优化,就是让简单时间轮的数据均匀分布在每个时间刻度上。由于时间刻度无论如何都有明显的聚集性,我们不得不把时间刻度转换为”分桶”,按照某种规则能够均匀地放到各个”分桶”就可以。那当然是哈希了。
hashed_wheel_timer.png
假设我们把分桶设置为60个,现在是00:00:00,时间轮开始启动,按秒级进行轮询,那么:

  • 00:00:00任务会落入0分桶
  • 00:00:01任务会落入1分桶
  • ……
  • 00:01:00的任务,已经转了一圈了,应该落入0分桶,但是如果直接插入链表,会在00:00:00时刻被取出来执行的

所以增加了一个”剩余圈数”的概念,来区分0分桶中的任务,到底是”00:00:00”取出来执行,还是”00:01:00”取出来执行。

我们现在还是按照”秒”来分桶的,桶恰好是60个,还是会有大量的任务集中在00:00:00的第0个分桶,继续抽象一下,每个桶可以代表任意粒度的轮询时间,桶的数量也任意,这样任务就会被均匀分散了,本质上就是哈希+单链表解决冲突,就如Netty为了“心跳”和“超时”设计的HashedWheelTimer中的2个重要参数:

  • tickDuration:每个分桶代表的轮询器轮询时间,默认100ms
  • ticksPerWheel:一个轮上有多少个分桶,默认512个分桶

这样解决了数据不均匀的问题,但是还不够有效率,因为单链表上并不是所有的任务都会得到执行,大量任务还是执行的“剩余圈数-1”这样的无意义操作,和“空转”并没有本质区别,所以如果能让链表上所有的任务都是应该执行的任务就好了。

同时,当任务量较大时,我们需要让分桶数量多一些,让单链表短一些,执行得更快;当任务量较小时,我们需要让分桶的数量少一些,空桶少一些,执行得更快。但业务都是有周期的,不可能在一开始就设置完美的分桶数量。轮询时间同理,如果设大了,任务会聚集;如果设过小,CPU受不了,很难合理设置。所以Netty的时间轮有一些局限性,需要经验参数。

分层时间轮:Kafka的kafka.utils.timer.TimingWheel

为了避免分桶时间轮的空转,以及能够动态调整分桶参数,kafka设计了分层时间轮TimingWheel。

插入数据和动态创建分桶

为了能让分桶能够根据任务数量“动态扩缩”,对于需要“转几圈”才执行的任务,kafka提出了“溢出”的概念,认为它们可以溢出到更高层的时间轮。
timing_wheel.png

  • 假设有层级1的桶,当任务在0~59秒的时候,可以完美放入层级1的桶
  • 这时来了一个“1分30秒”后才执行的任务,那么就会动态创建一个层级2的时间轮,把它放到2层第1个桶(管理1分~2分这个时间区间的任务)
  • 同理如果是“1小时1分30秒”后执行,那么会继续往上动态创建层级3的时间轮,放到第3层的第1个桶(管理1小时~2小时时间区间的任务)

这样,继续往上,我们还可以扩展周、月、年等等时间层级,无限扩展,实现了动态扩容。当然实际上,kafka每个层级允许20个桶,扫描时间也是毫秒级别的不是秒级别的,避免任务都集中在一起,并且创建了之后没有缩小的动作,避免了并发冲突。
对应的代码在TimeWheel.scala的add函数里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def add(timerTaskEntry: TimerTaskEntry): Boolean = {
// 获取过期时间
val expiration = timerTaskEntry.expirationMs

if (timerTaskEntry.cancelled) {
// 任务被取消,插入失败
false
} else if (expiration < currentTime + tickMs) {
// 任务过期,插入失败
false
} else if (expiration < currentTime + interval) {
// 任务在当前层级时间区间范围内

// 找到桶虚拟index
val virtualId = expiration / tickMs
// 找到buckets数组中对应的桶
val bucket = buckets((virtualId % wheelSize.toLong).toInt)
// 把任务加入桶
bucket.add(timerTaskEntry)

// 设置桶的过期时间
if (bucket.setExpiration(virtualId % tickMs)) {
// 设置用的是expiration.getAndSet(expirationMs) != expirationMs
// 也就是说只有在expirationMs发生变化的时候才会走到这里
// expirationMs发生变化只可能在“时间轮前进”导致“过期的桶被循环复用”
// 也就是说这个桶的所有任务到时间了,该被执行了,加入到delayQueue里面
queue.offer(bucket)
}
true
} else {
// 时间轮不够用了,向上溢出,如果没有高层时间轮则创建
if (overflowWheel == null) addOverflowWheel()
// 递归向上,直到把任务放入符合它时间区间的那层时间轮
overflowWheel.add(timerTaskEntry)
}
}

这里可以看到,用了java.util.concurrent.DelayQueue,本质上还是O(LogN)的最小堆,但这里插入的元素是bucket,也就说并不是任务量级,仅仅只是桶数量的量级。每一层才20个桶,而且可以指数性地容纳任务,假设有100万个任务,也只需要20^5共5层时间轮,20*5=100个桶,基本可以看做O(1)了。

时间轮推进和定时任务取出

Timer.scala的推进函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def advanceClock(timeoutMs: Long): Boolean = {
// 从delayQueue拿出过期的桶
var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
if (bucket != null) {
……
while (bucket != null) {
// 时间轮推进
timingWheel.advanceClock(bucket.getExpiration())
// 对每个元素执行reinsert插入
bucket.flush(reinsert)
// 循环继续取过期桶
bucket = delayQueue.poll()
}
}
……
}

时间轮推进在TimingWheel里面:

1
2
3
4
5
6
7
8
9
def advanceClock(timeMs: Long): Unit = {
// 已经过期了
if (timeMs >= currentTime + tickMs) {
// 修改时间
currentTime = timeMs - (timeMs % tickMs)
// 递归推进高层时间轮
if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
}
}

实际上做的事情就如同kafka注释中所述,做一个轮转:

1
2
3
4
5
推进前
* level buckets
* 1 [c, c] [c+1, c+1] [c+2, c+2]
推进后
* 1 [c+1, c+1] [c+2, c+2] [c+3, c+3]

bucket.flush(reinsert),这个flush是TimerTaskList中自定义函数,传入的是一个方法。做的事情是:遍历链表,拿出并移除所有元素,对每个元素都执行一遍传入的f函数,最后重置数据结构。

1
2
3
4
5
6
7
8
9
10
11
def flush(f: (TimerTaskEntry)=>Unit): Unit = {
synchronized {
var head = root.next
while(head ne root) {
remove(head)
f(head)
head = root.next
}
expiration.set(-1L)
}
}

传入的这个reinsert是在Timer.scala里面的,其实就是做了addTimerTaskEntry:

1
2
3
4
5
6
7
8
9
10
private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => addTimerTaskEntry(timerTaskEntry)

private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
// 重新插入时间轮,插入成功说明还没到底层时间轮,插入失败说明到时间或取消了
if (!timingWheel.add(timerTaskEntry)) {
// 已经到时间了,执行任务;已经取消了,忽略任务
if (!timerTaskEntry.cancelled)
taskExecutor.submit(timerTaskEntry.timerTask)
}
}

也就是说,对于“到时间了”的任务,都拿出来,重新插入时间轮,比如:本来在“小时层”的时间轮任务,到1小时了,就拿出来放到“分钟级”时间轮;“分钟级”放到“秒级”;“秒级”到时间了,则直接取出执行。
这里的实现感觉非常优雅,并没有去想“把高层级直接插回低层级”,而是用一个“重试”的方式实现了目的,简单又稳定。

还有个值得借鉴的地方是,用了delayQueue。在任务量少的时候,本来应该“轮询所有桶”,产生Netty时间轮的“空转”副作用,但如果用delayQueue把“待执行的桶”都放进去,完全就避免了轮询,只需要等待队列中有数据就可以了,这个思路真的厉害,相当于把“轮询”改为了“回调”。

kafka的时间轮已经是单机时间轮最好的设计了,对于常见的IoT千万级别设备,大量设备都会设置“0点定时开机/关机”,这种业务场景,也完全可以利用这个时间轮来执行定时任务。

分布式定时:Quartz

定时任务执行,单机是有瓶颈的,于是Quartz提出了分布式定时任务。Quartz将定时任务进行了领域划分,核心有3个模型:

  • Scheduler:调度器。对应前面的“轮询器”。核心调度器是QuartzScheduler,Scheduler接口只是一个接口壳子,可以操作QuartzScheduler的一些启停之类。
  • Trigger:触发器。对应“delayTime”,在什么时间去执行
  • Job/JobDetail:具体的任务。分为执行接口和元信息。
    这是一个非常好的划分,我们可以对不同模型进行深入扩展:
  • Scheduler:支持各种调度方式,比如远程调度(RemoteScheduler)其他机器上的QuartzScheduler
  • Trigger:支持各种格式。比如“cron”、“周期”等等,甚至可以“多个触发器逻辑组合”共同触发
  • Job/JobDetail:支持各种持久化方式,避免机器挂掉导致任务丢失。

最后,通过数据库“抢锁”方式,来决定那台机器执行任务。我们重点还是关注下定时是如何实现的,这里我们的研究对象就应该是Trigger了。轮询的线程是QuartzSchedulerThread:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
public void run() {
while (!halted.get()) {
获取锁之类的操作……
if (availTheadCount > 0) {
// 待执行的Trigger
List<OperableTrigger> triggers;

// 获取需要执行的trigger
long now = System.currentTimeMillis();
triggers = qsRsrcs.getJobStore().acquireNextTriggers(now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
……
if (triggers != null && !triggers.isEmpty()) {
……
// 触发trigger执行
List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
if (res != null)
bndles = res;

分析执行结果……
}
}
……
} // end while (!halted)
……
}

接下来看如何获取需要执行的Trigger,以内存存储RAMJobStore来看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
protected TreeSet<TriggerWrapper> timeTriggers = new TreeSet<TriggerWrapper>(new TriggerWrapperComparator());

public List<TriggerWrapper> acquireNextTriggers(long noLaterThan, int maxCount, long timeWindow) {
synchronized(lock) {
……
while (true) {
// 取出头部的trigger
tw = timeTriggers.first();
if (tw == null)
break;
timeTriggers.remove(tw);

// 如果不想要执行了,忽略
if (tw.trigger.getNextFireTime() == null) {
continue;
}

// 如果是宕机期间错过的任务,重新加入
if (applyMisFire(tw)) {
if (tw.trigger.getNextFireTime() != null) {
timeTriggers.add(tw);
}
continue;
}

// 时间还没到,重新加入
if (tw.getTrigger().getNextFireTime().getTime() > batchEnd) {
timeTriggers.add(tw);
break;
}

OperableTrigger trig = (OperableTrigger) tw.trigger.clone();

// 没有任务符合条件,继续往后移动时间窗口
if (result.isEmpty()) {
batchEnd = Math.max(tw.trigger.getNextFireTime().getTime(), System.currentTimeMillis()) + timeWindow;
}
result.add(trig);

// 如果这个时间窗口任务太多,只取maxCount个执行
if (result.size() == maxCount)
break;
}
}
}

这里本质上就是“简单时间轮”,按照时间窗口不断地取任务。只不过相比于“简单时间轮”的单链表,这里用了TreeSet来保存trigger,底层是红黑树,比较器是TriggerWrapperComaprator:

1
2
public static int compare(Date nextFiretime1, int priority1, TriggerKey key1, Date nextFiretime2, int priority2, TriggerKey key2)
……

也就是根据下一次执行时间从小到大来排序,如果时间相同,则根据优先级从大到小排序。

所以,Quartz实际上的很低效的,不过它的优势在于是分布式的,任务会均衡在不同机器上执行。虽然这里有问题,但实际使用中发现,最大的瓶颈并不是这里,而是“mysql数据库抢锁”,任务多了之后非常慢,而且受限于mysql的性能瓶颈。

Elastic-job和Xxl-job在Quartz的基础上,增加了分片功能,将大任务拆分成各个小任务,执行完后再汇总,它们都是在不断地优化“任务如何执行”功能,而没有优化“如何知道时间到了”。

分布式定时:schedulerx

schedulerx做的主要工作,是利用“Quartz”实现自定义的定时任务,比如接入自定义告警、监控、失败重试。

schedulerx对“抢锁”做了优化,不再用数据库竞争锁,而是用zookeeper来竞争,后期发现zookeeper性能也不行,于是转用tair来做锁。如果你在阿里内网,可以去看JobManager的acquireTriggerLock部分代码。

还有一个比较好的优化,就是定时任务不再是在“定时任务服务机器群”里执行,而是在“业务机器群”里执行,用netty做了一个CS框架,这块代码在remoting里面。好处有很多:

  • “定时任务机器群”可以独立于业务,专心做“定时任务触发”的事情,不会被超时业务拖累
  • 相当于客户端有了一个agent,可以做一些边缘计算,比如schedulerx引以为豪的“网格计算”等

schedulerx在内网开源,但是很多同学把代码、设计都发布到外网去了,还有在阿里的同学开发出来的PowerJob,抄了很多schedulerx的思路,这些行为不太好评价,因为我并不是schedulerx开发者,不过个人认为是不妥的。

分布式定时:schedulerx 2.0

schedulerx2.0不再使用quartz作为底层基础,而是自研了一套基于“时间轮”的分布式定时,同时优化了多租户相关的设计。和核心开发者交流了一下,提了一些有关集群通信和时间轮的建议。

总结

分层时间轮看起来似乎是“如何知道时间到了”的终极方案,性能已经很够用了,再优化也没有太大意义,除非以后有一些新的分布式技术出现导致不得不出一个“分布式分层时间轮”这种脏东西。

所以现在大部分定时任务,都不再优化这块了,转去优化“如何执行任务”,比如加入轻量级的大数据map-reduce功能,这样对一些有数据处理需求的小项目很友好,可能是未来的一个趋势吧。

定时任务演进和schedulerx

https://www.bananaoven.com/posts/5612/

作者

香蕉微波炉

发布于

2023-02-04

更新于

2023-02-04

许可协议