定时任务演进和schedulerx
在很多业务场景下,都需要定时任务,比如”定时推送消息”、”定时计算报表”、”定时清理数据”等等。通常业务都是集群化的,定时任务通常不能每个机器单机执行,需要有个分布式协调器感知到集群,然后选中1台机器执行,所以出现了很多定时任务平台帮助处理定时任务。
定时任务有个核心的问题:如何知道时间到了?
原始模型
只需要每次插入任务,都按待执行时间从小到大排序,然后用个死循环轮训,不断从队头获取小于等于当前时间的任务即可。只要轮询得够快,时间粒度足够小,比如秒,那就相当于能感知到”时间到了”的瞬间了。
但我们知道,定时任务的基本需求最少有2个:
- 添加任务:插入
- 获取当前时间到了的任务:查询
原始模型,插入后得排序,O(LogN),然后查询扫描的时间是O(N),很慢。
java.util.Timer
Java.util.TImer稍微做了优化,用数组实现的最小堆来代替单链表,这样插入O(LogN),查询O(1):
不过任务量大时还是很慢。
java.util.concurrent.ScheduledExecutorService
多线程发展后,有了concurrent包,这个包提供了ScheduledExecutorService来做定时任务,做的最大的优化就是增加了线程池,多线程并发地执行任务。
不过线程数量是受限制的,还是有瓶颈,当数量大了之后,线程切换保存现场的时间开销反而很大,切来切去,还不如单线程。
简单时间轮
换个思路,来优化数据结构。现在的问题是插入O(LogN),只有查询是常数的,如果能把插入也搞到O(1)就好了。
假设我们把任务按照小时划分,用单链表分别存放在0~23个时间刻度上,轮询器不再扫描N个任务,而是扫描常数时间刻度,查询是O(1),插入时不需要排序,也是O(1)。
但是小时可能不够细,通常定时任务都需要做到秒级,我们可以继续细分下去,246060=86400个刻度。
看似很完美了,但只是理论空架构。落到实际应用上,可以想象,大部分定时任务都会有聚集性,比如:
- 0点,”生而为人我们抱歉”的时间,又饿又疲惫,通常是买买买活动开启的好时机
- 凌晨3点,人们都睡觉了,服务器压力小,通常可以做清理、备份的工作
- 中午12点,人们在吃饭玩手机了,这个时候推送效果会很好
- ……
这样,由于数据不均衡,大部分的轮转都是空转,比较浪费资源。不过这个设计很符合日常生活的直觉。
分桶时间轮:Netty的io.netty.util.HashedWheelTimer
我们要做的优化,就是让简单时间轮的数据均匀分布在每个时间刻度上。由于时间刻度无论如何都有明显的聚集性,我们不得不把时间刻度转换为”分桶”,按照某种规则能够均匀地放到各个”分桶”就可以。那当然是哈希了。
假设我们把分桶设置为60个,现在是00:00:00,时间轮开始启动,按秒级进行轮询,那么:
- 00:00:00任务会落入0分桶
- 00:00:01任务会落入1分桶
- ……
- 00:01:00的任务,已经转了一圈了,应该落入0分桶,但是如果直接插入链表,会在00:00:00时刻被取出来执行的
所以增加了一个”剩余圈数”的概念,来区分0分桶中的任务,到底是”00:00:00”取出来执行,还是”00:01:00”取出来执行。
我们现在还是按照”秒”来分桶的,桶恰好是60个,还是会有大量的任务集中在00:00:00的第0个分桶,继续抽象一下,每个桶可以代表任意粒度的轮询时间,桶的数量也任意,这样任务就会被均匀分散了,本质上就是哈希+单链表解决冲突,就如Netty为了“心跳”和“超时”设计的HashedWheelTimer中的2个重要参数:
- tickDuration:每个分桶代表的轮询器轮询时间,默认100ms
- ticksPerWheel:一个轮上有多少个分桶,默认512个分桶
这样解决了数据不均匀的问题,但是还不够有效率,因为单链表上并不是所有的任务都会得到执行,大量任务还是执行的“剩余圈数-1”这样的无意义操作,和“空转”并没有本质区别,所以如果能让链表上所有的任务都是应该执行的任务就好了。
同时,当任务量较大时,我们需要让分桶数量多一些,让单链表短一些,执行得更快;当任务量较小时,我们需要让分桶的数量少一些,空桶少一些,执行得更快。但业务都是有周期的,不可能在一开始就设置完美的分桶数量。轮询时间同理,如果设大了,任务会聚集;如果设过小,CPU受不了,很难合理设置。所以Netty的时间轮有一些局限性,需要经验参数。
分层时间轮:Kafka的kafka.utils.timer.TimingWheel
为了避免分桶时间轮的空转,以及能够动态调整分桶参数,kafka设计了分层时间轮TimingWheel。
插入数据和动态创建分桶
为了能让分桶能够根据任务数量“动态扩缩”,对于需要“转几圈”才执行的任务,kafka提出了“溢出”的概念,认为它们可以溢出到更高层的时间轮。
- 假设有层级1的桶,当任务在0~59秒的时候,可以完美放入层级1的桶
- 这时来了一个“1分30秒”后才执行的任务,那么就会动态创建一个层级2的时间轮,把它放到2层第1个桶(管理1分~2分这个时间区间的任务)
- 同理如果是“1小时1分30秒”后执行,那么会继续往上动态创建层级3的时间轮,放到第3层的第1个桶(管理1小时~2小时时间区间的任务)
这样,继续往上,我们还可以扩展周、月、年等等时间层级,无限扩展,实现了动态扩容。当然实际上,kafka每个层级允许20个桶,扫描时间也是毫秒级别的不是秒级别的,避免任务都集中在一起,并且创建了之后没有缩小的动作,避免了并发冲突。
对应的代码在TimeWheel.scala的add函数里:
1 | def add(timerTaskEntry: TimerTaskEntry): Boolean = { |
这里可以看到,用了java.util.concurrent.DelayQueue,本质上还是O(LogN)的最小堆,但这里插入的元素是bucket,也就说并不是任务量级,仅仅只是桶数量的量级。每一层才20个桶,而且可以指数性地容纳任务,假设有100万个任务,也只需要20^5共5层时间轮,20*5=100个桶,基本可以看做O(1)了。
时间轮推进和定时任务取出
Timer.scala的推进函数:
1 | def advanceClock(timeoutMs: Long): Boolean = { |
时间轮推进在TimingWheel里面:
1 | def advanceClock(timeMs: Long): Unit = { |
实际上做的事情就如同kafka注释中所述,做一个轮转:
1 | 推进前 |
bucket.flush(reinsert),这个flush是TimerTaskList中自定义函数,传入的是一个方法。做的事情是:遍历链表,拿出并移除所有元素,对每个元素都执行一遍传入的f函数,最后重置数据结构。
1 | def flush(f: (TimerTaskEntry)=>Unit): Unit = { |
传入的这个reinsert是在Timer.scala里面的,其实就是做了addTimerTaskEntry:
1 | private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => addTimerTaskEntry(timerTaskEntry) |
也就是说,对于“到时间了”的任务,都拿出来,重新插入时间轮,比如:本来在“小时层”的时间轮任务,到1小时了,就拿出来放到“分钟级”时间轮;“分钟级”放到“秒级”;“秒级”到时间了,则直接取出执行。
这里的实现感觉非常优雅,并没有去想“把高层级直接插回低层级”,而是用一个“重试”的方式实现了目的,简单又稳定。
还有个值得借鉴的地方是,用了delayQueue。在任务量少的时候,本来应该“轮询所有桶”,产生Netty时间轮的“空转”副作用,但如果用delayQueue把“待执行的桶”都放进去,完全就避免了轮询,只需要等待队列中有数据就可以了,这个思路真的厉害,相当于把“轮询”改为了“回调”。
kafka的时间轮已经是单机时间轮最好的设计了,对于常见的IoT千万级别设备,大量设备都会设置“0点定时开机/关机”,这种业务场景,也完全可以利用这个时间轮来执行定时任务。
分布式定时:Quartz
定时任务执行,单机是有瓶颈的,于是Quartz提出了分布式定时任务。Quartz将定时任务进行了领域划分,核心有3个模型:
- Scheduler:调度器。对应前面的“轮询器”。核心调度器是QuartzScheduler,Scheduler接口只是一个接口壳子,可以操作QuartzScheduler的一些启停之类。
- Trigger:触发器。对应“delayTime”,在什么时间去执行
- Job/JobDetail:具体的任务。分为执行接口和元信息。
这是一个非常好的划分,我们可以对不同模型进行深入扩展: - Scheduler:支持各种调度方式,比如远程调度(RemoteScheduler)其他机器上的QuartzScheduler
- Trigger:支持各种格式。比如“cron”、“周期”等等,甚至可以“多个触发器逻辑组合”共同触发
- Job/JobDetail:支持各种持久化方式,避免机器挂掉导致任务丢失。
最后,通过数据库“抢锁”方式,来决定那台机器执行任务。我们重点还是关注下定时是如何实现的,这里我们的研究对象就应该是Trigger了。轮询的线程是QuartzSchedulerThread:
1 |
|
接下来看如何获取需要执行的Trigger,以内存存储RAMJobStore来看:
1 | protected TreeSet<TriggerWrapper> timeTriggers = new TreeSet<TriggerWrapper>(new TriggerWrapperComparator()); |
这里本质上就是“简单时间轮”,按照时间窗口不断地取任务。只不过相比于“简单时间轮”的单链表,这里用了TreeSet来保存trigger,底层是红黑树,比较器是TriggerWrapperComaprator:
1 | public static int compare(Date nextFiretime1, int priority1, TriggerKey key1, Date nextFiretime2, int priority2, TriggerKey key2) |
也就是根据下一次执行时间从小到大来排序,如果时间相同,则根据优先级从大到小排序。
所以,Quartz实际上的很低效的,不过它的优势在于是分布式的,任务会均衡在不同机器上执行。虽然这里有问题,但实际使用中发现,最大的瓶颈并不是这里,而是“mysql数据库抢锁”,任务多了之后非常慢,而且受限于mysql的性能瓶颈。
Elastic-job和Xxl-job在Quartz的基础上,增加了分片功能,将大任务拆分成各个小任务,执行完后再汇总,它们都是在不断地优化“任务如何执行”功能,而没有优化“如何知道时间到了”。
分布式定时:schedulerx
schedulerx做的主要工作,是利用“Quartz”实现自定义的定时任务,比如接入自定义告警、监控、失败重试。
schedulerx对“抢锁”做了优化,不再用数据库竞争锁,而是用zookeeper来竞争,后期发现zookeeper性能也不行,于是转用tair来做锁。如果你在阿里内网,可以去看JobManager的acquireTriggerLock部分代码。
还有一个比较好的优化,就是定时任务不再是在“定时任务服务机器群”里执行,而是在“业务机器群”里执行,用netty做了一个CS框架,这块代码在remoting里面。好处有很多:
- “定时任务机器群”可以独立于业务,专心做“定时任务触发”的事情,不会被超时业务拖累
- 相当于客户端有了一个agent,可以做一些边缘计算,比如schedulerx引以为豪的“网格计算”等
schedulerx在内网开源,但是很多同学把代码、设计都发布到外网去了,还有在阿里的同学开发出来的PowerJob,抄了很多schedulerx的思路,这些行为不太好评价,因为我并不是schedulerx开发者,不过个人认为是不妥的。
分布式定时:schedulerx 2.0
schedulerx2.0不再使用quartz作为底层基础,而是自研了一套基于“时间轮”的分布式定时,同时优化了多租户相关的设计。和核心开发者交流了一下,提了一些有关集群通信和时间轮的建议。
总结
分层时间轮看起来似乎是“如何知道时间到了”的终极方案,性能已经很够用了,再优化也没有太大意义,除非以后有一些新的分布式技术出现导致不得不出一个“分布式分层时间轮”这种脏东西。
所以现在大部分定时任务,都不再优化这块了,转去优化“如何执行任务”,比如加入轻量级的大数据map-reduce功能,这样对一些有数据处理需求的小项目很友好,可能是未来的一个趋势吧。
定时任务演进和schedulerx