层级时间轮(Hierarchical Timing Wheel)通过多层表盘结构将大延迟任务分级存储,彻底消除单层时间轮 round 计数带来的 O(N) 扫描退化问题;Kafka、Netty、Linux 内核定时器均采用此结构。
相关文章:时间轮(Hashed Timing Wheel) · 最小堆定时器
目录
| 章节 | 说明 |
|---|---|
| 问题背景 | 单层时间轮 round 计数的 O(N) 退化 |
| 核心思想 | 时钟表盘类比,多层级联进位 |
| 数据结构 | TimingWheel 字段定义与层级关系 |
| schedule 伪代码 | 任务插入:从低层到高层递归插入 |
| cascade 伪代码 | 任务降级:高层到期槽重新 schedule |
| 执行追踪 | 三层轮演示 schedule(A, 1h30m20s) |
| Kafka 实现要点 | DelayQueue 推进 + 链表节点复用 |
| 异常与边界场景 | cascade 爆炸、多层同时 tick、cancel 竞争 |
| 与单层时间轮对比 | cascade 代价 vs round 遍历代价 |
| 参考资料 | 论文、源码、书籍 |
问题背景
单层时间轮的 round 计数退化
单层时间轮用一个固定大小(wheelSize = W)的环形数组表示时间槽,每个槽存储一个任务链表。为了支持比 tickDuration × W 更大的延迟,引入 round 字段:
任务插入位置:slot = (currentTick + delay / tickDuration) % W
任务 round: round = delay / tickDuration / W
tick 时扫描 slot[currentTick % W] 下所有任务:
for task in slot[currentTick % W]:
if task.round == 0:
执行 task
else:
task.round -= 1 // 未到期,等下一圈
退化根因:当 wheelSize 固定(如 512)、延迟跨度很大时,大量任务堆积在同一个槽(round 不同),每 tick 必须遍历槽内全部任务比较 round 值:
最坏情况:N 个任务全部映射到同一 slot
→ 每 tick 扫描代价 O(N)
→ 高并发定时场景(Kafka 百万级任务)完全不可接受
层级时间轮用空间换时间:不同延迟的任务存储在不同层,每层槽内任务的到期 tick 完全相同,无需 round 比较,slot 扫描退化为 O(1)(每个槽平均任务数恒定)。
核心思想
类比机械钟表:
秒针(第1层):60 格,每格 = 1 秒,走完一圈 = 60 秒
分针(第2层):60 格,每格 = 60 秒,走完一圈 = 3600 秒
时针(第3层):24 格,每格 = 3600 秒,走完一圈 = 86400 秒
进位规则:秒针转完一圈 → 分针前进 1 格(cascade)
分针转完一圈 → 时针前进 1 格(cascade)
任务存放规则:
delay ≤ 60s → 直接放第1层对应槽
60s < delay ≤ 3600s → 放第2层对应槽
3600s < delay ≤ 86400s → 放第3层对应槽
cascade(降级):当高层某槽到期时,将该槽的所有任务取出,重新 schedule 到低层(此时剩余延迟已缩小,能落入低层范围)。
数据结构
// 单层时间轮
struct TimingWheel {
tickDuration int64 // 该层最小精度(纳秒)
wheelSize int // 槽数量
interval int64 // 该层覆盖范围 = tickDuration × wheelSize
currentTime int64 // 当前时间(对齐到 tickDuration 的整数倍)
buckets []Bucket // 环形槽数组,长度 = wheelSize
overflowWheel *TimingWheel // 上层时间轮(懒创建)
}
// 槽(桶)
struct Bucket {
expiration int64 // 该槽到期的绝对时间戳(纳秒)
tasks *TaskList // 双向链表,头尾指针
}
// 任务节点(链表节点,复用避免 GC)
struct TimerTask {
expiration int64 // 绝对到期时间戳(纳秒)
task func() // 实际回调
bucket *Bucket // 当前所在槽(用于 O(1) cancel)
prev, next *TimerTask // 双向链表指针
cancelled bool // 取消标志
}
// 层级关系约束
// 第 k 层:tickDuration_k = tickDuration_{k-1} × wheelSize_{k-1}
// 第 k 层覆盖:[tickDuration_k, tickDuration_k × wheelSize_k)
//
// 三层示例(Kafka 默认):
// L1: tickDuration=1ms, wheelSize=20, interval=20ms
// L2: tickDuration=20ms, wheelSize=20, interval=400ms
// L3: tickDuration=400ms, wheelSize=20, interval=8000ms
//
// 演示用三层(秒/分/时):
// L1: tickDuration=1s, wheelSize=60, interval=60s
// L2: tickDuration=60s, wheelSize=60, interval=3600s
// L3: tickDuration=3600s, wheelSize=24, interval=86400s
schedule 伪代码
// 将任务插入合适的层
// wheel: 当前层(从最低层开始调用)
// task: 待插入任务(task.expiration 已设置为绝对到期时间)
function schedule(wheel, task):
delay = task.expiration - wheel.currentTime
if delay < wheel.tickDuration:
// delay 已经 < 1 tick,说明任务已经到期或即将到期
// 直接提交执行(防止插入后错过 tick)
task.task()
return
if delay < wheel.interval:
// delay 在当前层的覆盖范围内,插入当前层
ticks = delay / wheel.tickDuration
slotIndex = (wheel.currentTime / wheel.tickDuration + ticks) % wheel.wheelSize
bucket = wheel.buckets[slotIndex]
bucket.add(task)
task.bucket = bucket
// 更新槽的 expiration(用于 DelayQueue 推进,见 Kafka 实现)
if bucket.expiration != slotIndex * wheel.tickDuration:
bucket.expiration = wheel.currentTime - (wheel.currentTime % wheel.tickDuration)
+ ticks * wheel.tickDuration
delayQueue.offer(bucket, bucket.expiration)
return
// delay 超出当前层范围,尝试上层(懒创建)
if wheel.overflowWheel == null:
wheel.overflowWheel = new TimingWheel(
tickDuration = wheel.interval, // 上层 tick = 本层 interval
wheelSize = wheel.wheelSize, // 槽数不变(或可配置)
currentTime = wheel.currentTime
)
schedule(wheel.overflowWheel, task) // 递归插入上层
关键不变式:
- 插入时
task.expiration是绝对时间戳,不是相对 delay slotIndex计算基于currentTime,保证进位后 cascade 能正确重算- 懒创建
overflowWheel:只有实际需要时才分配内存
cascade 伪代码
// 推进时间轮到 timeMs(由外部驱动调用)
function advanceClock(wheel, timeMs):
if timeMs >= wheel.currentTime + wheel.tickDuration:
// 推进 currentTime(对齐到 tickDuration 的整数倍)
wheel.currentTime = timeMs - (timeMs % wheel.tickDuration)
// 若有上层,同步推进上层时间
if wheel.overflowWheel != null:
advanceClock(wheel.overflowWheel, timeMs)
// 触发到期任务(由 DelayQueue 或固定 tick 驱动)
function tick(wheel):
slotIndex = (wheel.currentTime / wheel.tickDuration) % wheel.wheelSize
bucket = wheel.buckets[slotIndex]
if bucket.expiration <= wheel.currentTime:
bucket.expiration = -1 // 重置,防止重复触发
tasks = bucket.flush() // 取出槽内所有任务,清空槽
for task in tasks:
if task.cancelled:
continue // 跳过已取消任务
schedule(lowestWheel, task) // 重新 schedule 到最低层
// (cascade:高层到低层)
// 完整驱动循环(DelayQueue 版本,见 Kafka)
function runLoop(lowestWheel, delayQueue):
loop:
// 阻塞等待最近到期的 bucket
bucket = delayQueue.poll(timeout)
if bucket == null:
continue
// 推进各层 currentTime
advanceClock(lowestWheel, bucket.expiration)
// 触发该 bucket(可能是任何层的 bucket)
tasks = bucket.flush()
for task in tasks:
if not task.cancelled:
schedule(lowestWheel, task) // 高层任务 cascade 到低层
// 低层任务直接执行
cascade 递归深度:等于层数(通常 3 层),每次 cascade 代价 = 槽内任务数 × schedule 代价 O(1),整体 O(M),M 为同时到期任务数。
执行追踪
场景:三层时间轮,初始 currentTime = 0。
配置:
L1: tickDuration=1s, wheelSize=60, interval=60s (秒级)
L2: tickDuration=60s, wheelSize=60, interval=3600s (分钟级)
L3: tickDuration=3600s, wheelSize=24, interval=86400s (小时级)
任务:A,expiration = 0 + 1h30m20s = 5420s
Step 1:schedule(L1, A)
delay = 5420 - 0 = 5420s
L1.interval = 60s
5420 >= 60 → 超出 L1 范围,递归上层
Step 2:schedule(L2, A)
delay = 5420s
L2.interval = 3600s
5420 >= 3600 → 超出 L2 范围,递归上层
Step 3:schedule(L3, A)
delay = 5420s
L3.tickDuration = 3600s
L3.interval = 86400s
5420 < 86400 → 落入 L3 范围
ticks = 5420 / 3600 = 1 (取整,即第1格,对应 1~2h 这段)
slotIndex = (0 / 3600 + 1) % 24 = 1
→ 任务 A 插入 L3.buckets[1]
L3.buckets[1].expiration = 1 × 3600 = 3600s
Step 4:t=3600s,L3.buckets[1] 到期,触发第一次 cascade
advanceClock 推进:L1.currentTime=3600, L2.currentTime=3600, L3.currentTime=3600
取出 L3.buckets[1] 的任务:{A, expiration=5420}
重新 schedule(L1, A):
delay = 5420 - 3600 = 1820s
L1.interval=60 → 1820 >= 60 → 递归 L2
schedule(L2, A):
delay = 1820s
L2.tickDuration = 60s
L2.interval = 3600s
1820 < 3600 → 落入 L2 范围
ticks = 1820 / 60 = 30
slotIndex = (3600 / 60 + 30) % 60 = (60 + 30) % 60 = 30
→ 任务 A 插入 L2.buckets[30]
L2.buckets[30].expiration = 3600 + 30×60 = 5400s
Step 5:t=5400s,L2.buckets[30] 到期,触发第二次 cascade
取出 L2.buckets[30] 的任务:{A, expiration=5420}
重新 schedule(L1, A):
delay = 5420 - 5400 = 20s
L1.interval = 60s
20 < 60 → 落入 L1 范围
ticks = 20 / 1 = 20
slotIndex = (5400 / 1 + 20) % 60 = 5420 % 60 = 20
→ 任务 A 插入 L1.buckets[20]
L1.buckets[20].expiration = 5420s
Step 6:t=5420s,L1.buckets[20] 到期,任务 A 执行
bucket.flush() → {A}
A.cancelled = false → 执行 A.task()
cascade 链路总结:L3[1] → L2[30] → L1[20] → 执行,共触发 2 次 cascade。
Kafka 实现要点
DelayQueue 辅助推进(避免空转)
朴素实现:每隔 1 tick 唤醒线程扫描当前层——若大多数槽为空,大量 CPU 浪费。
Kafka 方案:用 java.util.concurrent.DelayQueue 存储所有非空 bucket,按到期时间排序:
// 只有当某个 bucket 第一次被写入任务时,才加入 DelayQueue
if bucket 从空变为非空:
delayQueue.offer(bucket, bucket.expiration)
// 驱动线程只在有 bucket 到期时才唤醒
bucket = delayQueue.take() // 阻塞,直到最近到期 bucket
advanceClock(lowestWheel, bucket.expiration)
// 触发 bucket.flush() + cascade
效果:零任务时驱动线程完全阻塞,不消耗 CPU;有任务时精确唤醒,无空转。
代价:每次 bucket 入队/出队 O(log N),N = DelayQueue 中 bucket 数(通常远小于任务数)。
链表节点复用避免 GC
Kafka TimerTask 本身就是链表节点(侵入式链表),不额外分配节点对象:
// TimerTask 同时作为链表节点
class TimerTask {
var expiration: Long
var timerTaskList: TimerTaskList // 指向所在 bucket
var prev: TimerTask
var next: TimerTask
def run(): Unit // 业务逻辑
}
// cancel 时 O(1) 摘除(有 prev/next 直接操作)
def cancel(task: TimerTask):
task.timerTaskList.remove(task) // O(1) 链表摘除
task.cancelled = true
通过 TimerTask.timerTaskList 反向持有 bucket 引用,cancel 时无需遍历,O(1) 完成。
异常与边界场景
场景 1:cascade 时任务量爆炸
触发条件:大量长延迟任务(如 10 万个)恰好设置为同一小时整点到期,cascade 时全部从 L3 同一槽涌出,需要重新 schedule 到 L2。
算法处理:
for task in bucket.flush(): // 顺序处理,无并发爆炸风险
schedule(lowestWheel, task) // 每次 O(1),总代价 O(N)
cascade 本身是 O(N) 线性扫描,没有额外放大。但若单次 cascade 耗时过长,会延迟同一线程的其他 tick 处理。
Kafka 缓解方案:驱动线程只做 cascade(重新 schedule),任务实际执行提交到独立线程池,cascade 延迟不影响任务执行延迟。
场景 2:多层 tick 同时触发(低层转完一圈推高层)
触发条件:t=60s 时,L1 走完一圈,同时 L2.buckets[1] 到期。
算法处理:
advanceClock(lowestWheel, 60000ms)
→ L1.currentTime = 60s
→ L2.currentTime = 60s (递归推进)
→ L3.currentTime = 60s
DelayQueue 按 expiration 排序,L1 的某些 bucket 和 L2 的 bucket 会按顺序依次出队
→ 不会"同时"触发,而是串行按到期时间处理
→ 若 L1[0] 和 L2[1] 的 expiration 相同,处理顺序由 DelayQueue 的队内顺序决定(无影响,逻辑正确)
关键保证:advanceClock 在处理任何 bucket 前先推进所有层的 currentTime,确保 cascade 重新 schedule 时计算的 delay 是正确的。
场景 3:任务在 cascade 过程中被 cancel
触发条件:L3 槽到期,cascade 正在将任务重新 schedule 到 L2,此时另一线程调用 task.cancel()。
算法处理:
// cascade 路径
tasks = bucket.flush() // 已取出任务列表(快照)
for task in tasks:
if task.cancelled: // 检查取消标志
continue // 直接跳过,不插入低层
schedule(lowestWheel, task)
// cancel 路径(另一线程)
task.cancelled = true // CAS 原子设置
if task.bucket != null:
task.bucket.remove(task) // O(1) 摘除(若仍在某槽)
竞态窗口:若 cancel 发生在 bucket.flush() 之后、schedule 之前,任务会被插入低层;但低层 tick 时会再次检查 task.cancelled,仍不会执行。需保证 task.cancelled 可见性(Java volatile 或 CAS)。
与单层时间轮对比
| 维度 | 单层时间轮 + round | 层级时间轮 |
|---|---|---|
| 插入代价 | O(1) | O(1)(最坏 O(层数),通常 3 层) |
| tick 扫描代价 | O(N)(槽内所有任务比较 round) | O(1)(槽内任务 expiration 完全相同) |
| cascade 代价 | 无(round 直接递减) | O(M),M = 到期槽内任务数 |
| 支持延迟范围 | tickDuration × wheelSize(固定) | 理论无限(动态加层) |
| 内存占用 | 1 × wheelSize 个槽 | L 层 × wheelSize 个槽(通常 3~5 层) |
| 空转开销 | 每 tick 固定扫描(即使无任务) | DelayQueue 版本零空转 |
| cancel 代价 | O(1)(有 bucket 反向指针) | O(1)(同上) |
| 典型用途 | 简单超时检测、RTT 计时器 | Kafka、Netty、Linux 内核 |
结论:当任务延迟跨度大、任务量多时,层级时间轮的 O(1) slot 扫描优势碾压 round 方案;cascade 是额外代价,但仅在任务降级时触发,分摊到整个生命周期代价很低。
参考资料
- Varghese & Lauck (1987):Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facility — 层级时间轮原始论文,SOSP '87
- Kafka TimingWheel 源码:
clients/src/main/java/org/apache/kafka/common/utils/Timer.java,core/src/main/scala/kafka/utils/timer/TimingWheel.scala(Apache Kafka GitHub) - Netty HashedWheelTimer:
io.netty.util.HashedWheelTimer,单层时间轮实现,可对比研究 - Linux 内核定时器:
kernel/time/timer.c,采用层级时间轮,5 层(Linux 2.6+) - 《Systems Performance》 — Brendan Gregg,第 6 章 CPU 调度与定时器分析
评论 (0)
发表评论