专栏文章
专栏文章
调度与限流系列
1. 调度与限流 #01:时间轮(Hashed Timing Wheel) 2. 调度与限流 #02:层级时间轮 3. 调度与限流 #03:最小堆定时器 4. 调度与限流 #04:令牌桶 5. 调度与限流 #05:漏桶 6. 调度与限流 #06:滑动窗口计数器 7. 调度与限流 #07:滑动窗口日志

调度与限流 #03:最小堆定时器

发布于 2026-06-04 04:06 👁 7 次阅读
#算法#data-structure#scheduler#min-heap#jdk#priority-queue

最小堆定时器以 PriorityQueue 为核心,堆顶始终是最近到期任务,插入 O(log N)、取消 O(log N);JDK ScheduledThreadPoolExecutor 的 DelayedWorkQueue 以及几乎所有操作系统内核早期定时器均基于此结构。

相关文章时间轮(Hashed Timing Wheel) · 层级时间轮

min heap timer

目录

章节 说明
问题背景 为什么需要最小堆定时器,适用场景与局限
核心数据结构 PriorityQueue、TimerTask 字段精确定义
schedule() 伪代码 插入任务 O(log N) 的完整算法
cancel() 伪代码 惰性删除 vs 立即删除的权衡
tick() 伪代码 单线程 vs 多线程执行模型
执行追踪 插入 A(5s)、B(2s)、C(8s) 的堆化与触发演示
JDK ScheduledThreadPoolExecutor DelayedWorkQueue 实现、leader-follower 模式
异常与边界场景 时钟回拨、任务抛出异常、并发竞争、堆满
与时间轮对比 复杂度、精度、适用场景全维度对比
参考资料 论文、JDK 源码、书籍

问题背景

什么场景需要最小堆定时器?

最小堆定时器是最自然、最通用的定时任务调度结构:

典型使用方 场景
java.util.Timer JDK 最早的单线程定时器(已过时)
ScheduledThreadPoolExecutor JDK 现代线程池,DelayedWorkQueue 内部即最小堆
HashedWheelTimer(Netty)出现之前 所有 NIO 框架的超时管理
操作系统早期内核定时器 Linux 2.4 之前用最小堆管理内核定时器

与时间轮的核心矛盾

时间轮用空间换时间,把任务分散到固定数量的槽;最小堆用时间换精度,每次维护全局有序。

两者适用场景天然互补:

任务数 × log N  vs  O(1)
  少任务大跨度    vs  海量任务固定精度

核心数据结构

// 定时器任务
struct TimerTask {
    long      deadline       // 绝对到期时间戳(单调时钟纳秒,不受时钟回拨影响)
    Runnable  callback       // 任务逻辑
    boolean   cancelled      // 惰性取消标志(volatile,保证可见性)
    int       heapIndex      // 在堆数组中的下标(支持 O(log N) cancel)
                             // 不持有该字段则 cancel 只能 O(1) 标记,无法立即从堆中删除
    long      period         // 周期任务间隔(ms),0 表示一次性任务
}

// 最小堆定时器
struct MinHeapTimer {
    TimerTask[]  heap         // 动态数组,1-indexed(heap[1] 为堆顶)
    int          size         // 当前堆元素数量
    long         tickResolution  // 后台线程唤醒精度(ms),通常 1ms~100ms
    Thread       timerThread  // 单一后台线程,负责 tick 推进
    Executor     taskExecutor // 任务回调的执行线程池(与 timerThread 分离)
}

// 堆序不变式
// 对所有 i ∈ [1, size/2]:
//   heap[i].deadline <= heap[2*i].deadline        (父 ≤ 左子)
//   heap[i].deadline <= heap[2*i+1].deadline      (父 ≤ 右子)
// 即:堆顶 heap[1] 是 deadline 最小(最快到期)的任务

关键设计决策:


schedule() 伪代码

// 向定时器插入一个延迟 delayMs 后触发的任务
// 时间复杂度:O(log N),N = 当前堆中任务数
function schedule(callback, delayMs):
    task = new TimerTask()
    task.deadline  = nanoTime() + delayMs * 1_000_000   // 转为纳秒
    task.callback  = callback
    task.cancelled = false
    task.period    = 0    // 一次性任务

    heap.add(task)                          // 追加到数组末尾
    task.heapIndex = size                   // 记录当前下标
    siftUp(size)                            // 上浮维护堆序
    size++

    if task.heapIndex == 1:                 // 新任务成了堆顶(更早到期)
        timerThread.wakeUp()               // 唤醒后台线程重新计算等待时间

    return task    // 返回句柄,供 cancel() 使用

// 上浮操作(sift-up)
// 将 heap[i] 向上移动,直到满足堆序
function siftUp(i):
    while i > 1:
        parent = i / 2
        if heap[parent].deadline <= heap[i].deadline:
            break       // 父节点已 ≤ 当前节点,堆序满足,停止
        swap(heap[i], heap[parent])
        heap[i].heapIndex      = i
        heap[parent].heapIndex = parent
        i = parent

插入步骤拆解:

  1. 创建 TimerTask,设置 deadline = now + delay,O(1)
  2. 数组末尾追加,记录 heapIndex,O(1)
  3. siftUp:最多上浮 log₂(N) 层,每层一次比较 + 最多一次 swap,O(log N)
  4. 若新任务成为堆顶(deadline 最小),唤醒后台线程重设等待时间,O(1)

cancel() 伪代码

方案一:惰性删除(lazy cancel)

// O(1) 标记取消,不立即从堆中删除
function cancelLazy(task):
    task.cancelled = true   // CAS 原子设置(防止重复 cancel 与并发 fire 竞争)

// tick() 中检查
if task.cancelled:
    heap.poll()    // 弹出但不执行
    continue

优点:实现最简单,cancel 路径完全无锁
缺点:堆中可能积压大量"僵尸任务",浪费内存;堆操作仍需比较这些节点

方案二:立即删除(eager cancel,需要 heapIndex)

// O(log N) 立即从堆中移除
// 前提:TimerTask 持有 heapIndex 字段
function cancelEager(task):
    if task.heapIndex < 1 or task.heapIndex > size:
        return   // 任务已执行或已删除

    i = task.heapIndex

    // 用堆末尾元素覆盖被删元素,然后缩小堆
    heap[i] = heap[size]
    heap[i].heapIndex = i
    size--

    // 新填入的元素可能需要上浮或下沉(只会走其中一条路)
    siftUp(i)
    siftDown(i)

    task.heapIndex = -1    // 标记为已移除

// 下沉操作(sift-down)
function siftDown(i):
    while true:
        smallest = i
        left     = 2 * i
        right    = 2 * i + 1

        if left <= size and heap[left].deadline < heap[smallest].deadline:
            smallest = left
        if right <= size and heap[right].deadline < heap[smallest].deadline:
            smallest = right

        if smallest == i:
            break    // 已满足堆序
        swap(heap[i], heap[smallest])
        heap[i].heapIndex       = i
        heap[smallest].heapIndex = smallest
        i = smallest

两种方案对比:

惰性删除 立即删除
cancel 复杂度 O(1) O(log N)
内存占用 可能积压已取消任务 立即释放
额外字段 heapIndex(4 bytes)
适用场景 cancel 极少 cancel 频繁(如大量超时任务)
JDK 实现 java.util.Timer(早期) ScheduledThreadPoolExecutor(DelayedWorkQueue)

tick() 伪代码

单线程执行模型

// timerThread 的主循环(单线程,负责时钟推进)
function tickLoop():
    while running:
        now = nanoTime()

        // 触发所有已到期任务(deadline <= now)
        while size > 0 and heap[1].deadline <= now:
            task = heap[1]
            removeTop()          // 弹出堆顶,O(log N)
            if not task.cancelled:
                taskExecutor.submit(task.callback)   // 异步提交,不阻塞 timerThread

        // 计算下次唤醒时间
        if size > 0:
            sleepNs = heap[1].deadline - nanoTime()
            if sleepNs > 0:
                timerThread.sleep(sleepNs)    // 精确等到堆顶到期
        else:
            timerThread.wait()               // 无任务,等待 schedule() 唤醒

// 弹出堆顶(sift-down)
function removeTop():
    heap[1] = heap[size]
    heap[1].heapIndex = 1
    size--
    siftDown(1)

多线程执行模型注意事项

// 多线程 schedule/cancel 需加锁(或使用无锁结构)
lock.acquire()
try:
    schedule(callback, delay)    // 修改 heap 数组必须串行
finally:
    lock.release()

// tick() 中弹出堆顶也需持锁
lock.acquire()
try:
    task = heap[1]
    removeTop()
finally:
    lock.release()
task.callback.run()   // 实际执行在锁外,避免持锁时阻塞

关键原则:


执行追踪

配置:空堆初始状态,当前时间 t=0

操作序列

t=0ms: schedule(A, delay=5s)   → A.deadline = 5000ms
t=0ms: schedule(B, delay=2s)   → B.deadline = 2000ms
t=0ms: schedule(C, delay=8s)   → C.deadline = 8000ms

步骤一:插入 A(deadline=5000)

heap 初始为空,直接追加:
  heap[1] = A(5000)
  size = 1

siftUp(1):i=1,无父节点,退出

堆状态:
         A(5000)
         [1]

步骤二:插入 B(deadline=2000)

追加到末尾:
  heap[2] = B(2000)
  size = 2

siftUp(2):
  i=2, parent=1
  heap[1].deadline=5000 > heap[2].deadline=2000
  → swap(heap[2], heap[1]):B 上浮到堆顶
  → heap[1]=B(2000), heap[1].heapIndex=1
  → heap[2]=A(5000), heap[2].heapIndex=2
  i=1,退出循环

堆状态:
         B(2000)
        /
       A(5000)
       [1]   [2]

B 成为新堆顶(比 A 更早到期)→ 唤醒 timerThread 重设 sleep 为 2000ms

步骤三:插入 C(deadline=8000)

追加到末尾:
  heap[3] = C(8000)
  size = 3

siftUp(3):
  i=3, parent=1
  heap[1].deadline=2000 ≤ heap[3].deadline=8000
  → 满足堆序,退出

堆状态(数组下标):
         B(2000)      ← heap[1],堆顶
        /       \
    A(5000)   C(8000)  ← heap[2], heap[3]

步骤四:t=2000ms,tickLoop 触发

now = 2000ms
heap[1].deadline = 2000 ≤ 2000 → 触发

removeTop():
  heap[1] = heap[3] = C(8000),size=2
  siftDown(1):
    i=1, left=2(A,5000), right=3(越界 size=2)
    smallest = min(C=8000, A=5000) → A 更小
    swap(heap[1], heap[2])
    heap[1]=A(5000), heap[2]=C(8000)
    i=2, left=4(越界)→ 退出

taskExecutor.submit(B.callback)   ← B 被异步执行

堆状态:
         A(5000)      ← heap[1],新堆顶
        /
    C(8000)           ← heap[2]

timerThread 重设 sleep = 5000 - 2000 = 3000ms

步骤五:t=5000ms,触发 A;t=8000ms,触发 C

t=5000ms:
  heap[1]=A(5000),deadline ≤ now → removeTop()
  taskExecutor.submit(A.callback)
  堆:heap[1]=C(8000),size=1

t=8000ms:
  heap[1]=C(8000),deadline ≤ now → removeTop()
  taskExecutor.submit(C.callback)
  堆:size=0,timerThread 进入 wait() 状态

JDK ScheduledThreadPoolExecutor

DelayedWorkQueue:最小堆的工业级实现

ScheduledThreadPoolExecutor 内部使用 DelayedWorkQueue,本质是支持 cancel 的最小堆:

// ScheduledFutureTask 关键字段(简化)
class ScheduledFutureTask<V> implements RunnableScheduledFuture<V> {
    long    time;          // 绝对到期时间(纳秒,System.nanoTime())
    long    period;        // 0=一次性, >0=fixedRate, <0=fixedDelay
    int     heapIndex;     // 在 DelayedWorkQueue 数组中的下标(支持 O(log N) cancel)
    long    sequenceNumber // 相同 time 时的排序稳定性保证(FIFO)
}

// DelayedWorkQueue 关键操作
class DelayedWorkQueue {
    RunnableScheduledFuture<?>[] queue;   // 最小堆数组(初始容量 16,动态扩容)
    int size;
    ReentrantLock lock;
    Condition available;   // 用于 leader-follower 等待

    // cancel 时从堆中移除(利用 heapIndex 直接定位,O(log N))
    boolean remove(Object x) {
        int i = ((ScheduledFutureTask)x).heapIndex;
        if (i < 0) return false;
        setIndex(queue[i], -1);        // 标记已移除
        int s = --size;
        RunnableScheduledFuture<?> replacement = queue[s];
        queue[s] = null;
        if (s != i) {
            siftDown(i, replacement);  // 先尝试下沉
            if (queue[i] == replacement)
                siftUp(i, replacement); // 下沉无效则上浮
        }
        return true;
    }
}

leader-follower 等待模式

问题:多个工作线程同时调用 take()(从堆顶取任务),如果都 sleep 等到堆顶到期,会引发惊群效应(大量线程同时唤醒,只有一个能拿到任务)。

解决方案:leader-follower 模式

// DelayedWorkQueue.take() 简化逻辑
function take():
    lock.acquire()
    loop:
        first = heap[1]    // 堆顶任务

        if first == null:
            available.await()    // 无任务,无限等待,成为 follower

        delay = first.deadline - nanoTime()

        if delay <= 0:
            return poll()        // 任务已到期,直接取走

        if leader != null:
            // 已有 leader 在等待堆顶,当前线程作为 follower 无限等待
            available.await()
        else:
            // 当前线程成为 leader,精确 sleep 到堆顶到期
            thisThread = Thread.currentThread()
            leader = thisThread
            try:
                available.awaitNanos(delay)    // 精确等待
            finally:
                if leader == thisThread:
                    leader = null

    // leader 醒来后,通知 follower(follower 再竞争成为新 leader)
    if leader == null and heap[1] != null:
        available.signal()
    lock.release()

效果:任意时刻只有 1 个线程(leader)在计时等待,其余线程(follower)无限等待信号,消除惊群效应。

周期任务的重调度

// ScheduledFutureTask.run() 简化
void run() {
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)
        super.run();             // 一次性任务,直接执行
    else if (super.runAndReset()) {
        setNextRunTime();        // 重设下次触发时间
        reExecutePeriodic(outerTask);   // 重新插入堆
    }
}

void setNextRunTime() {
    long p = period;
    if (p > 0)    time += p;             // fixedRate:基于上次触发时间累加
    else          time = nanoTime() - p; // fixedDelay:基于上次完成时间累加(p<0)
}

异常与边界场景

场景 1:系统时钟回拨

现象:NTP 同步导致 System.currentTimeMillis() 向前跳跃(如从 t=10s 突然回到 t=9s),基于此计算的 deadline 可能导致任务被提前触发或错过触发。

算法处理

// 错误做法(受时钟回拨影响)
task.deadline = System.currentTimeMillis() + delayMs   // ❌

// 正确做法(单调时钟)
task.deadline = System.nanoTime() + delayMs * 1_000_000  // ✅
// System.nanoTime() 保证单调递增,不受 NTP 调整影响
// JDK ScheduledThreadPoolExecutor 使用 System.nanoTime()

即使 nanoTime 本身在 JVM 重启后可能不连续,但在同一 JVM 生命周期内单调性有保证。

场景 2:任务执行抛出异常

现象callback.run() 抛出 RuntimeException,是否影响后续任务触发?

算法处理

// tickLoop 中提交给 taskExecutor(线程池)
taskExecutor.submit(task.callback)  // 异步,不阻塞 timerThread

// 线程池内部(ThreadPoolExecutor 的 runWorker)
try:
    task.run()
catch Throwable t:
    // 异常被线程池捕获,不传播到 timerThread
    // afterExecute(task, t) 回调可用于监控

结论:任务异常不影响后续任务,因为 timerThread 与 taskExecutor 完全分离。

但注意 ScheduledThreadPoolExecutor 中的周期任务:若周期任务抛出未检异常,runAndReset() 返回 false,任务不会被重新调度——相当于静默停止。需在任务内部 try-catch 所有异常。

场景 3:并发 cancel 与 fire 竞争

现象:timerThread 正在执行 removeTop() 准备提交任务,同时另一线程调用 cancelEager(task)

竞态分析

线程 A(timerThread):
  task = heap[1]           // 取出堆顶引用
  removeTop()              // 从堆中删除
  // [竞态窗口]
  if not task.cancelled:   // 检查取消标志
      taskExecutor.submit(task.callback)

线程 B(业务线程):
  task.cancelled = true    // CAS 设置(volatile write)
  cancelEager(task)        // 尝试从堆中删除(此时任务可能已被 removeTop 移出)

关键:task.cancelled 必须声明为 volatile,保证跨线程可见性。cancelEager 中检查 heapIndex == -1 可安全忽略已移出的任务。

竞态结果有两种:

  1. cancel 先于 fire:task.cancelled=true → timerThread 跳过提交,任务不执行(正确)
  2. fire 先于 cancel:removeTop() 已完成,cancelEager 发现 heapIndex=-1 静默返回;若 task.callback 已提交线程池,cancel 无法阻止执行(best-effort 语义,与 Future.cancel(false) 一致)

场景 4:堆满时的处理

现象:持续调用 schedule() 而任务执行速度不及调度速度,heap 数组超出内存或达到设定上限。

算法处理

// 方案一:动态扩容(JDK PriorityQueue 默认)
if size == heap.length:
    // 类似 ArrayList:容量 < 64 时翻倍,≥ 64 时增加 50%
    heap = Arrays.copyOf(heap, newCapacity)
// 扩容代价 O(N),分摊到每次插入仍是 O(log N)
// 风险:OOM — 无边界增长

// 方案二:固定上限 + 拒绝策略(Netty HashedWheelTimer 的 pendingTimeouts 对标)
if size >= maxPendingTasks:
    throw new RejectedExecutionException("timer queue overflow, size=" + size)

// 方案三:惰性清理(减少堆中"僵尸"任务占用的空间)
// 在 tick() 中顺带移除 cancelled 任务,降低峰值堆大小
while size > 0 and heap[1].cancelled:
    removeTop()   // O(log N)

与时间轮对比

维度 最小堆定时器 时间轮(HashedWheelTimer) 层级时间轮(Kafka)
插入复杂度 O(log N) O(1) O(1)
取消复杂度 O(log N)(有 heapIndex)/ O(1)(惰性) O(1)(标记取消) O(1)(有 bucket 反向指针)
tick 触发复杂度 O(1)(弹堆顶)+ O(log N)(重建堆序) O(槽内任务数)(遍历链表) O(M)(M=到期槽内任务数)
精度 动态精确(每任务独立 deadline) 固定(tickDuration 为精度下限) 固定(最低层 tickDuration)
内存 O(N) 堆数组,无额外槽位 O(wheelSize) 固定槽 + O(N) 任务 O(层数 × wheelSize) + O(N)
超长延迟支持 天然支持(deadline 任意大) 需 round 计数或多层 天然支持(多层)
多线程安全 需全局锁(或 CAS 无锁堆) MPSC 无锁队列 DelayQueue + 锁
适用任务规模 < 10 万(log N 可接受) > 100 万(O(1) 关键) > 100 万,延迟跨度大
代表实现 JDK ScheduledThreadPoolExecutor Netty HashedWheelTimer Kafka TimingWheel
最佳场景 任务数少、delay 跨度极大、精度要求高 海量任务、固定超时、高并发 海量任务、delay 跨度大

选型结论

任务数 < 10 万 且 delay 跨度极大(ms ~ 天混用)
    → 最小堆(ScheduledThreadPoolExecutor)

任务数 > 10 万 且 delay 相对固定(如统一超时 30s)
    → 单层时间轮(HashedWheelTimer)

任务数 > 10 万 且 delay 跨度大(ms ~ 小时混用)
    → 层级时间轮(Kafka TimingWheel)

参考资料

  1. JDK 源码java.util.concurrent.ScheduledThreadPoolExecutorDelayedWorkQueueScheduledFutureTask(OpenJDK GitHub)
  2. Varghese & Lauck (1987)Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facility — 时间轮原始论文,SOSP '87(对比最小堆的背景章节)
  3. 《Java 并发编程实战》 — Brian Goetz 等,第 6 章任务执行,ScheduledExecutorService 使用规范
  4. Doug LeaA Java Fork/Join Framework — 线程池设计思想,leader-follower 模式出处
  5. Linux Kernelinclude/linux/timer.h,Linux 2.4 早期定时器基于最小堆,2.6 后改为层级时间轮
  6. 《算法导论》第 4 版 — CLRS,第 6 章 Heapsort,堆的 siftUp / siftDown 标准实现
← 返回列表

评论 (0)

暂无评论,来留下第一条吧。

发表评论