专栏文章
专栏文章
调度与限流系列
1. 调度与限流 #01:时间轮(Hashed Timing Wheel) 2. 调度与限流 #02:层级时间轮 3. 调度与限流 #03:最小堆定时器 4. 调度与限流 #04:令牌桶 5. 调度与限流 #05:漏桶 6. 调度与限流 #06:滑动窗口计数器 7. 调度与限流 #07:滑动窗口日志

调度与限流 #02:层级时间轮

发布于 2026-06-04 04:00 👁 9 次阅读
#算法#engineering#timing-wheel#scheduler#netty#Kafka

层级时间轮(Hierarchical Timing Wheel)通过多层表盘结构将大延迟任务分级存储,彻底消除单层时间轮 round 计数带来的 O(N) 扫描退化问题;Kafka、Netty、Linux 内核定时器均采用此结构。

相关文章时间轮(Hashed Timing Wheel) · 最小堆定时器


目录

章节 说明
问题背景 单层时间轮 round 计数的 O(N) 退化
核心思想 时钟表盘类比,多层级联进位
数据结构 TimingWheel 字段定义与层级关系
schedule 伪代码 任务插入:从低层到高层递归插入
cascade 伪代码 任务降级:高层到期槽重新 schedule
执行追踪 三层轮演示 schedule(A, 1h30m20s)
Kafka 实现要点 DelayQueue 推进 + 链表节点复用
异常与边界场景 cascade 爆炸、多层同时 tick、cancel 竞争
与单层时间轮对比 cascade 代价 vs round 遍历代价
参考资料 论文、源码、书籍

问题背景

单层时间轮的 round 计数退化

单层时间轮用一个固定大小(wheelSize = W)的环形数组表示时间槽,每个槽存储一个任务链表。为了支持比 tickDuration × W 更大的延迟,引入 round 字段:

任务插入位置:slot = (currentTick + delay / tickDuration) % W
任务 round:  round = delay / tickDuration / W

tick 时扫描 slot[currentTick % W] 下所有任务:
  for task in slot[currentTick % W]:
    if task.round == 0:
      执行 task
    else:
      task.round -= 1          // 未到期,等下一圈

退化根因:当 wheelSize 固定(如 512)、延迟跨度很大时,大量任务堆积在同一个槽(round 不同),每 tick 必须遍历槽内全部任务比较 round 值:

最坏情况:N 个任务全部映射到同一 slot
  → 每 tick 扫描代价 O(N)
  → 高并发定时场景(Kafka 百万级任务)完全不可接受

层级时间轮用空间换时间:不同延迟的任务存储在不同层,每层槽内任务的到期 tick 完全相同,无需 round 比较,slot 扫描退化为 O(1)(每个槽平均任务数恒定)。


核心思想

hierarchical timing wheel

类比机械钟表:

秒针(第1层):60 格,每格 = 1 秒,走完一圈 = 60 秒
分针(第2层):60 格,每格 = 60 秒,走完一圈 = 3600 秒
时针(第3层):24 格,每格 = 3600 秒,走完一圈 = 86400 秒

进位规则:秒针转完一圈 → 分针前进 1 格(cascade)
          分针转完一圈 → 时针前进 1 格(cascade)

任务存放规则:
  delay ≤ 60s     → 直接放第1层对应槽
  60s < delay ≤ 3600s → 放第2层对应槽
  3600s < delay ≤ 86400s → 放第3层对应槽

cascade(降级):当高层某槽到期时,将该槽的所有任务取出,重新 schedule 到低层(此时剩余延迟已缩小,能落入低层范围)。


数据结构

// 单层时间轮
struct TimingWheel {
    tickDuration  int64        // 该层最小精度(纳秒)
    wheelSize     int          // 槽数量
    interval      int64        // 该层覆盖范围 = tickDuration × wheelSize
    currentTime   int64        // 当前时间(对齐到 tickDuration 的整数倍)
    buckets       []Bucket     // 环形槽数组,长度 = wheelSize
    overflowWheel *TimingWheel // 上层时间轮(懒创建)
}

// 槽(桶)
struct Bucket {
    expiration int64           // 该槽到期的绝对时间戳(纳秒)
    tasks      *TaskList       // 双向链表,头尾指针
}

// 任务节点(链表节点,复用避免 GC)
struct TimerTask {
    expiration int64           // 绝对到期时间戳(纳秒)
    task       func()          // 实际回调
    bucket     *Bucket         // 当前所在槽(用于 O(1) cancel)
    prev, next *TimerTask      // 双向链表指针
    cancelled  bool            // 取消标志
}

// 层级关系约束
// 第 k 层:tickDuration_k = tickDuration_{k-1} × wheelSize_{k-1}
// 第 k 层覆盖:[tickDuration_k, tickDuration_k × wheelSize_k)
//
// 三层示例(Kafka 默认):
// L1: tickDuration=1ms,   wheelSize=20,  interval=20ms
// L2: tickDuration=20ms,  wheelSize=20,  interval=400ms
// L3: tickDuration=400ms, wheelSize=20,  interval=8000ms
//
// 演示用三层(秒/分/时):
// L1: tickDuration=1s,    wheelSize=60,  interval=60s
// L2: tickDuration=60s,   wheelSize=60,  interval=3600s
// L3: tickDuration=3600s, wheelSize=24,  interval=86400s

schedule 伪代码

// 将任务插入合适的层
// wheel: 当前层(从最低层开始调用)
// task:  待插入任务(task.expiration 已设置为绝对到期时间)
function schedule(wheel, task):
    delay = task.expiration - wheel.currentTime

    if delay < wheel.tickDuration:
        // delay 已经 < 1 tick,说明任务已经到期或即将到期
        // 直接提交执行(防止插入后错过 tick)
        task.task()
        return

    if delay < wheel.interval:
        // delay 在当前层的覆盖范围内,插入当前层
        ticks = delay / wheel.tickDuration
        slotIndex = (wheel.currentTime / wheel.tickDuration + ticks) % wheel.wheelSize
        bucket = wheel.buckets[slotIndex]
        bucket.add(task)
        task.bucket = bucket
        // 更新槽的 expiration(用于 DelayQueue 推进,见 Kafka 实现)
        if bucket.expiration != slotIndex * wheel.tickDuration:
            bucket.expiration = wheel.currentTime - (wheel.currentTime % wheel.tickDuration)
                                 + ticks * wheel.tickDuration
            delayQueue.offer(bucket, bucket.expiration)
        return

    // delay 超出当前层范围,尝试上层(懒创建)
    if wheel.overflowWheel == null:
        wheel.overflowWheel = new TimingWheel(
            tickDuration  = wheel.interval,      // 上层 tick = 本层 interval
            wheelSize     = wheel.wheelSize,      // 槽数不变(或可配置)
            currentTime   = wheel.currentTime
        )
    schedule(wheel.overflowWheel, task)          // 递归插入上层

关键不变式:


cascade 伪代码

// 推进时间轮到 timeMs(由外部驱动调用)
function advanceClock(wheel, timeMs):
    if timeMs >= wheel.currentTime + wheel.tickDuration:
        // 推进 currentTime(对齐到 tickDuration 的整数倍)
        wheel.currentTime = timeMs - (timeMs % wheel.tickDuration)

        // 若有上层,同步推进上层时间
        if wheel.overflowWheel != null:
            advanceClock(wheel.overflowWheel, timeMs)

// 触发到期任务(由 DelayQueue 或固定 tick 驱动)
function tick(wheel):
    slotIndex = (wheel.currentTime / wheel.tickDuration) % wheel.wheelSize
    bucket = wheel.buckets[slotIndex]

    if bucket.expiration <= wheel.currentTime:
        bucket.expiration = -1              // 重置,防止重复触发
        tasks = bucket.flush()              // 取出槽内所有任务,清空槽
        for task in tasks:
            if task.cancelled:
                continue                    // 跳过已取消任务
            schedule(lowestWheel, task)     // 重新 schedule 到最低层
                                            // (cascade:高层到低层)

// 完整驱动循环(DelayQueue 版本,见 Kafka)
function runLoop(lowestWheel, delayQueue):
    loop:
        // 阻塞等待最近到期的 bucket
        bucket = delayQueue.poll(timeout)
        if bucket == null:
            continue

        // 推进各层 currentTime
        advanceClock(lowestWheel, bucket.expiration)

        // 触发该 bucket(可能是任何层的 bucket)
        tasks = bucket.flush()
        for task in tasks:
            if not task.cancelled:
                schedule(lowestWheel, task)  // 高层任务 cascade 到低层
                                             // 低层任务直接执行

cascade 递归深度:等于层数(通常 3 层),每次 cascade 代价 = 槽内任务数 × schedule 代价 O(1),整体 O(M),M 为同时到期任务数。


执行追踪

场景:三层时间轮,初始 currentTime = 0。

配置:
  L1: tickDuration=1s,    wheelSize=60,  interval=60s    (秒级)
  L2: tickDuration=60s,   wheelSize=60,  interval=3600s  (分钟级)
  L3: tickDuration=3600s, wheelSize=24,  interval=86400s (小时级)

任务:A,expiration = 0 + 1h30m20s = 5420s

Step 1:schedule(L1, A)

delay = 5420 - 0 = 5420s
L1.interval = 60s
5420 >= 60 → 超出 L1 范围,递归上层

Step 2:schedule(L2, A)

delay = 5420s
L2.interval = 3600s
5420 >= 3600 → 超出 L2 范围,递归上层

Step 3:schedule(L3, A)

delay = 5420s
L3.tickDuration = 3600s
L3.interval = 86400s
5420 < 86400 → 落入 L3 范围

ticks = 5420 / 3600 = 1  (取整,即第1格,对应 1~2h 这段)
slotIndex = (0 / 3600 + 1) % 24 = 1

→ 任务 A 插入 L3.buckets[1]
  L3.buckets[1].expiration = 1 × 3600 = 3600s

Step 4:t=3600s,L3.buckets[1] 到期,触发第一次 cascade

advanceClock 推进:L1.currentTime=3600, L2.currentTime=3600, L3.currentTime=3600
取出 L3.buckets[1] 的任务:{A, expiration=5420}

重新 schedule(L1, A):
  delay = 5420 - 3600 = 1820s
  L1.interval=60 → 1820 >= 60 → 递归 L2

schedule(L2, A):
  delay = 1820s
  L2.tickDuration = 60s
  L2.interval = 3600s
  1820 < 3600 → 落入 L2 范围

  ticks = 1820 / 60 = 30
  slotIndex = (3600 / 60 + 30) % 60 = (60 + 30) % 60 = 30

  → 任务 A 插入 L2.buckets[30]
    L2.buckets[30].expiration = 3600 + 30×60 = 5400s

Step 5:t=5400s,L2.buckets[30] 到期,触发第二次 cascade

取出 L2.buckets[30] 的任务:{A, expiration=5420}

重新 schedule(L1, A):
  delay = 5420 - 5400 = 20s
  L1.interval = 60s
  20 < 60 → 落入 L1 范围

  ticks = 20 / 1 = 20
  slotIndex = (5400 / 1 + 20) % 60 = 5420 % 60 = 20

  → 任务 A 插入 L1.buckets[20]
    L1.buckets[20].expiration = 5420s

Step 6:t=5420s,L1.buckets[20] 到期,任务 A 执行

bucket.flush() → {A}
A.cancelled = false → 执行 A.task()

cascade 链路总结:L3[1] → L2[30] → L1[20] → 执行,共触发 2 次 cascade。


Kafka 实现要点

DelayQueue 辅助推进(避免空转)

朴素实现:每隔 1 tick 唤醒线程扫描当前层——若大多数槽为空,大量 CPU 浪费。

Kafka 方案:用 java.util.concurrent.DelayQueue 存储所有非空 bucket,按到期时间排序:

// 只有当某个 bucket 第一次被写入任务时,才加入 DelayQueue
if bucket 从空变为非空:
    delayQueue.offer(bucket, bucket.expiration)

// 驱动线程只在有 bucket 到期时才唤醒
bucket = delayQueue.take()                // 阻塞,直到最近到期 bucket
advanceClock(lowestWheel, bucket.expiration)
// 触发 bucket.flush() + cascade

效果:零任务时驱动线程完全阻塞,不消耗 CPU;有任务时精确唤醒,无空转。

代价:每次 bucket 入队/出队 O(log N),N = DelayQueue 中 bucket 数(通常远小于任务数)。

链表节点复用避免 GC

Kafka TimerTask 本身就是链表节点(侵入式链表),不额外分配节点对象:

// TimerTask 同时作为链表节点
class TimerTask {
    var expiration: Long
    var timerTaskList: TimerTaskList  // 指向所在 bucket
    var prev: TimerTask
    var next: TimerTask
    def run(): Unit                   // 业务逻辑
}

// cancel 时 O(1) 摘除(有 prev/next 直接操作)
def cancel(task: TimerTask):
    task.timerTaskList.remove(task)  // O(1) 链表摘除
    task.cancelled = true

通过 TimerTask.timerTaskList 反向持有 bucket 引用,cancel 时无需遍历,O(1) 完成。


异常与边界场景

场景 1:cascade 时任务量爆炸

触发条件:大量长延迟任务(如 10 万个)恰好设置为同一小时整点到期,cascade 时全部从 L3 同一槽涌出,需要重新 schedule 到 L2。

算法处理

for task in bucket.flush():     // 顺序处理,无并发爆炸风险
    schedule(lowestWheel, task) // 每次 O(1),总代价 O(N)

cascade 本身是 O(N) 线性扫描,没有额外放大。但若单次 cascade 耗时过长,会延迟同一线程的其他 tick 处理。

Kafka 缓解方案:驱动线程只做 cascade(重新 schedule),任务实际执行提交到独立线程池,cascade 延迟不影响任务执行延迟。

场景 2:多层 tick 同时触发(低层转完一圈推高层)

触发条件:t=60s 时,L1 走完一圈,同时 L2.buckets[1] 到期。

算法处理

advanceClock(lowestWheel, 60000ms)
  → L1.currentTime = 60s
  → L2.currentTime = 60s  (递归推进)
  → L3.currentTime = 60s

DelayQueue 按 expiration 排序,L1 的某些 bucket 和 L2 的 bucket 会按顺序依次出队
→ 不会"同时"触发,而是串行按到期时间处理
→ 若 L1[0] 和 L2[1] 的 expiration 相同,处理顺序由 DelayQueue 的队内顺序决定(无影响,逻辑正确)

关键保证advanceClock 在处理任何 bucket 前先推进所有层的 currentTime,确保 cascade 重新 schedule 时计算的 delay 是正确的。

场景 3:任务在 cascade 过程中被 cancel

触发条件:L3 槽到期,cascade 正在将任务重新 schedule 到 L2,此时另一线程调用 task.cancel()

算法处理

// cascade 路径
tasks = bucket.flush()          // 已取出任务列表(快照)
for task in tasks:
    if task.cancelled:          // 检查取消标志
        continue                // 直接跳过,不插入低层
    schedule(lowestWheel, task)

// cancel 路径(另一线程)
task.cancelled = true           // CAS 原子设置
if task.bucket != null:
    task.bucket.remove(task)    // O(1) 摘除(若仍在某槽)

竞态窗口:若 cancel 发生在 bucket.flush() 之后、schedule 之前,任务会被插入低层;但低层 tick 时会再次检查 task.cancelled,仍不会执行。需保证 task.cancelled 可见性(Java volatile 或 CAS)。


与单层时间轮对比

维度 单层时间轮 + round 层级时间轮
插入代价 O(1) O(1)(最坏 O(层数),通常 3 层)
tick 扫描代价 O(N)(槽内所有任务比较 round) O(1)(槽内任务 expiration 完全相同)
cascade 代价 无(round 直接递减) O(M),M = 到期槽内任务数
支持延迟范围 tickDuration × wheelSize(固定) 理论无限(动态加层)
内存占用 1 × wheelSize 个槽 L 层 × wheelSize 个槽(通常 3~5 层)
空转开销 每 tick 固定扫描(即使无任务) DelayQueue 版本零空转
cancel 代价 O(1)(有 bucket 反向指针) O(1)(同上)
典型用途 简单超时检测、RTT 计时器 Kafka、Netty、Linux 内核

结论:当任务延迟跨度大、任务量多时,层级时间轮的 O(1) slot 扫描优势碾压 round 方案;cascade 是额外代价,但仅在任务降级时触发,分摊到整个生命周期代价很低。


参考资料

  1. Varghese & Lauck (1987)Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facility — 层级时间轮原始论文,SOSP '87
  2. Kafka TimingWheel 源码clients/src/main/java/org/apache/kafka/common/utils/Timer.javacore/src/main/scala/kafka/utils/timer/TimingWheel.scala(Apache Kafka GitHub)
  3. Netty HashedWheelTimerio.netty.util.HashedWheelTimer,单层时间轮实现,可对比研究
  4. Linux 内核定时器kernel/time/timer.c,采用层级时间轮,5 层(Linux 2.6+)
  5. 《Systems Performance》 — Brendan Gregg,第 6 章 CPU 调度与定时器分析
← 返回列表

评论 (0)

暂无评论,来留下第一条吧。

发表评论