最小堆定时器以 PriorityQueue
为核心,堆顶始终是最近到期任务,插入 O(log N)、取消 O(log N);JDK ScheduledThreadPoolExecutor 的 DelayedWorkQueue 以及几乎所有操作系统内核早期定时器均基于此结构。
相关文章:时间轮(Hashed Timing Wheel) · 层级时间轮
目录
| 章节 | 说明 |
|---|---|
| 问题背景 | 为什么需要最小堆定时器,适用场景与局限 |
| 核心数据结构 | PriorityQueue、TimerTask 字段精确定义 |
| schedule() 伪代码 | 插入任务 O(log N) 的完整算法 |
| cancel() 伪代码 | 惰性删除 vs 立即删除的权衡 |
| tick() 伪代码 | 单线程 vs 多线程执行模型 |
| 执行追踪 | 插入 A(5s)、B(2s)、C(8s) 的堆化与触发演示 |
| JDK ScheduledThreadPoolExecutor | DelayedWorkQueue 实现、leader-follower 模式 |
| 异常与边界场景 | 时钟回拨、任务抛出异常、并发竞争、堆满 |
| 与时间轮对比 | 复杂度、精度、适用场景全维度对比 |
| 参考资料 | 论文、JDK 源码、书籍 |
问题背景
什么场景需要最小堆定时器?
最小堆定时器是最自然、最通用的定时任务调度结构:
- 任务数量少(< 10 万级别),delay 跨度极大(毫秒到天级别混用)
- 精度要求高:每个任务有独立的 deadline,无需为精度牺牲内存或 CPU
- 实现简单:JDK 标准库
PriorityQueue即可驱动
| 典型使用方 | 场景 |
|---|---|
java.util.Timer |
JDK 最早的单线程定时器(已过时) |
ScheduledThreadPoolExecutor |
JDK 现代线程池,DelayedWorkQueue 内部即最小堆 |
HashedWheelTimer(Netty)出现之前 |
所有 NIO 框架的超时管理 |
| 操作系统早期内核定时器 | Linux 2.4 之前用最小堆管理内核定时器 |
与时间轮的核心矛盾
时间轮用空间换时间,把任务分散到固定数量的槽;最小堆用时间换精度,每次维护全局有序。
两者适用场景天然互补:
任务数 × log N vs O(1)
少任务大跨度 vs 海量任务固定精度
核心数据结构
// 定时器任务
struct TimerTask {
long deadline // 绝对到期时间戳(单调时钟纳秒,不受时钟回拨影响)
Runnable callback // 任务逻辑
boolean cancelled // 惰性取消标志(volatile,保证可见性)
int heapIndex // 在堆数组中的下标(支持 O(log N) cancel)
// 不持有该字段则 cancel 只能 O(1) 标记,无法立即从堆中删除
long period // 周期任务间隔(ms),0 表示一次性任务
}
// 最小堆定时器
struct MinHeapTimer {
TimerTask[] heap // 动态数组,1-indexed(heap[1] 为堆顶)
int size // 当前堆元素数量
long tickResolution // 后台线程唤醒精度(ms),通常 1ms~100ms
Thread timerThread // 单一后台线程,负责 tick 推进
Executor taskExecutor // 任务回调的执行线程池(与 timerThread 分离)
}
// 堆序不变式
// 对所有 i ∈ [1, size/2]:
// heap[i].deadline <= heap[2*i].deadline (父 ≤ 左子)
// heap[i].deadline <= heap[2*i+1].deadline (父 ≤ 右子)
// 即:堆顶 heap[1] 是 deadline 最小(最快到期)的任务
关键设计决策:
heapIndex字段:额外占 4 字节,换来 cancel 时 O(log N) 直接定位,无需 O(N) 遍历- 单调时钟:deadline 使用
System.nanoTime()(单调递增),不受 NTP 调整影响,防止时钟回拨导致任务漏触发 - timerThread 与 taskExecutor 分离:timerThread 只负责时钟推进,任务实际执行交给线程池,避免耗时任务阻塞时钟
schedule() 伪代码
// 向定时器插入一个延迟 delayMs 后触发的任务
// 时间复杂度:O(log N),N = 当前堆中任务数
function schedule(callback, delayMs):
task = new TimerTask()
task.deadline = nanoTime() + delayMs * 1_000_000 // 转为纳秒
task.callback = callback
task.cancelled = false
task.period = 0 // 一次性任务
heap.add(task) // 追加到数组末尾
task.heapIndex = size // 记录当前下标
siftUp(size) // 上浮维护堆序
size++
if task.heapIndex == 1: // 新任务成了堆顶(更早到期)
timerThread.wakeUp() // 唤醒后台线程重新计算等待时间
return task // 返回句柄,供 cancel() 使用
// 上浮操作(sift-up)
// 将 heap[i] 向上移动,直到满足堆序
function siftUp(i):
while i > 1:
parent = i / 2
if heap[parent].deadline <= heap[i].deadline:
break // 父节点已 ≤ 当前节点,堆序满足,停止
swap(heap[i], heap[parent])
heap[i].heapIndex = i
heap[parent].heapIndex = parent
i = parent
插入步骤拆解:
- 创建
TimerTask,设置deadline = now + delay,O(1) - 数组末尾追加,记录
heapIndex,O(1) siftUp:最多上浮 log₂(N) 层,每层一次比较 + 最多一次 swap,O(log N)- 若新任务成为堆顶(deadline 最小),唤醒后台线程重设等待时间,O(1)
cancel() 伪代码
方案一:惰性删除(lazy cancel)
// O(1) 标记取消,不立即从堆中删除
function cancelLazy(task):
task.cancelled = true // CAS 原子设置(防止重复 cancel 与并发 fire 竞争)
// tick() 中检查
if task.cancelled:
heap.poll() // 弹出但不执行
continue
优点:实现最简单,cancel 路径完全无锁
缺点:堆中可能积压大量"僵尸任务",浪费内存;堆操作仍需比较这些节点
方案二:立即删除(eager cancel,需要 heapIndex)
// O(log N) 立即从堆中移除
// 前提:TimerTask 持有 heapIndex 字段
function cancelEager(task):
if task.heapIndex < 1 or task.heapIndex > size:
return // 任务已执行或已删除
i = task.heapIndex
// 用堆末尾元素覆盖被删元素,然后缩小堆
heap[i] = heap[size]
heap[i].heapIndex = i
size--
// 新填入的元素可能需要上浮或下沉(只会走其中一条路)
siftUp(i)
siftDown(i)
task.heapIndex = -1 // 标记为已移除
// 下沉操作(sift-down)
function siftDown(i):
while true:
smallest = i
left = 2 * i
right = 2 * i + 1
if left <= size and heap[left].deadline < heap[smallest].deadline:
smallest = left
if right <= size and heap[right].deadline < heap[smallest].deadline:
smallest = right
if smallest == i:
break // 已满足堆序
swap(heap[i], heap[smallest])
heap[i].heapIndex = i
heap[smallest].heapIndex = smallest
i = smallest
两种方案对比:
| 惰性删除 | 立即删除 | |
|---|---|---|
| cancel 复杂度 | O(1) | O(log N) |
| 内存占用 | 可能积压已取消任务 | 立即释放 |
| 额外字段 | 无 | heapIndex(4 bytes) |
| 适用场景 | cancel 极少 | cancel 频繁(如大量超时任务) |
| JDK 实现 | java.util.Timer(早期) |
ScheduledThreadPoolExecutor(DelayedWorkQueue) |
tick() 伪代码
单线程执行模型
// timerThread 的主循环(单线程,负责时钟推进)
function tickLoop():
while running:
now = nanoTime()
// 触发所有已到期任务(deadline <= now)
while size > 0 and heap[1].deadline <= now:
task = heap[1]
removeTop() // 弹出堆顶,O(log N)
if not task.cancelled:
taskExecutor.submit(task.callback) // 异步提交,不阻塞 timerThread
// 计算下次唤醒时间
if size > 0:
sleepNs = heap[1].deadline - nanoTime()
if sleepNs > 0:
timerThread.sleep(sleepNs) // 精确等到堆顶到期
else:
timerThread.wait() // 无任务,等待 schedule() 唤醒
// 弹出堆顶(sift-down)
function removeTop():
heap[1] = heap[size]
heap[1].heapIndex = 1
size--
siftDown(1)
多线程执行模型注意事项
// 多线程 schedule/cancel 需加锁(或使用无锁结构)
lock.acquire()
try:
schedule(callback, delay) // 修改 heap 数组必须串行
finally:
lock.release()
// tick() 中弹出堆顶也需持锁
lock.acquire()
try:
task = heap[1]
removeTop()
finally:
lock.release()
task.callback.run() // 实际执行在锁外,避免持锁时阻塞
关键原则:
- timerThread 只推进时钟,不执行任务:任何耗时操作都必须提交给
taskExecutor - sleep 精度:
Thread.sleep(ns)精度受 OS 调度影响(Linux 约 ±0.5ms),实际误差通常远小于 tickDuration - 新任务提前唤醒:若新插入任务 deadline < 当前等待的 deadline,必须打断 sleep 重设等待时间
执行追踪
配置:空堆初始状态,当前时间 t=0
操作序列:
t=0ms: schedule(A, delay=5s) → A.deadline = 5000ms
t=0ms: schedule(B, delay=2s) → B.deadline = 2000ms
t=0ms: schedule(C, delay=8s) → C.deadline = 8000ms
步骤一:插入 A(deadline=5000)
heap 初始为空,直接追加:
heap[1] = A(5000)
size = 1
siftUp(1):i=1,无父节点,退出
堆状态:
A(5000)
[1]
步骤二:插入 B(deadline=2000)
追加到末尾:
heap[2] = B(2000)
size = 2
siftUp(2):
i=2, parent=1
heap[1].deadline=5000 > heap[2].deadline=2000
→ swap(heap[2], heap[1]):B 上浮到堆顶
→ heap[1]=B(2000), heap[1].heapIndex=1
→ heap[2]=A(5000), heap[2].heapIndex=2
i=1,退出循环
堆状态:
B(2000)
/
A(5000)
[1] [2]
B 成为新堆顶(比 A 更早到期)→ 唤醒 timerThread 重设 sleep 为 2000ms
步骤三:插入 C(deadline=8000)
追加到末尾:
heap[3] = C(8000)
size = 3
siftUp(3):
i=3, parent=1
heap[1].deadline=2000 ≤ heap[3].deadline=8000
→ 满足堆序,退出
堆状态(数组下标):
B(2000) ← heap[1],堆顶
/ \
A(5000) C(8000) ← heap[2], heap[3]
步骤四:t=2000ms,tickLoop 触发
now = 2000ms
heap[1].deadline = 2000 ≤ 2000 → 触发
removeTop():
heap[1] = heap[3] = C(8000),size=2
siftDown(1):
i=1, left=2(A,5000), right=3(越界 size=2)
smallest = min(C=8000, A=5000) → A 更小
swap(heap[1], heap[2])
heap[1]=A(5000), heap[2]=C(8000)
i=2, left=4(越界)→ 退出
taskExecutor.submit(B.callback) ← B 被异步执行
堆状态:
A(5000) ← heap[1],新堆顶
/
C(8000) ← heap[2]
timerThread 重设 sleep = 5000 - 2000 = 3000ms
步骤五:t=5000ms,触发 A;t=8000ms,触发 C
t=5000ms:
heap[1]=A(5000),deadline ≤ now → removeTop()
taskExecutor.submit(A.callback)
堆:heap[1]=C(8000),size=1
t=8000ms:
heap[1]=C(8000),deadline ≤ now → removeTop()
taskExecutor.submit(C.callback)
堆:size=0,timerThread 进入 wait() 状态
JDK ScheduledThreadPoolExecutor
DelayedWorkQueue:最小堆的工业级实现
ScheduledThreadPoolExecutor 内部使用 DelayedWorkQueue,本质是支持 cancel 的最小堆:
// ScheduledFutureTask 关键字段(简化)
class ScheduledFutureTask<V> implements RunnableScheduledFuture<V> {
long time; // 绝对到期时间(纳秒,System.nanoTime())
long period; // 0=一次性, >0=fixedRate, <0=fixedDelay
int heapIndex; // 在 DelayedWorkQueue 数组中的下标(支持 O(log N) cancel)
long sequenceNumber // 相同 time 时的排序稳定性保证(FIFO)
}
// DelayedWorkQueue 关键操作
class DelayedWorkQueue {
RunnableScheduledFuture<?>[] queue; // 最小堆数组(初始容量 16,动态扩容)
int size;
ReentrantLock lock;
Condition available; // 用于 leader-follower 等待
// cancel 时从堆中移除(利用 heapIndex 直接定位,O(log N))
boolean remove(Object x) {
int i = ((ScheduledFutureTask)x).heapIndex;
if (i < 0) return false;
setIndex(queue[i], -1); // 标记已移除
int s = --size;
RunnableScheduledFuture<?> replacement = queue[s];
queue[s] = null;
if (s != i) {
siftDown(i, replacement); // 先尝试下沉
if (queue[i] == replacement)
siftUp(i, replacement); // 下沉无效则上浮
}
return true;
}
}
leader-follower 等待模式
问题:多个工作线程同时调用 take()(从堆顶取任务),如果都 sleep 等到堆顶到期,会引发惊群效应(大量线程同时唤醒,只有一个能拿到任务)。
解决方案:leader-follower 模式
// DelayedWorkQueue.take() 简化逻辑
function take():
lock.acquire()
loop:
first = heap[1] // 堆顶任务
if first == null:
available.await() // 无任务,无限等待,成为 follower
delay = first.deadline - nanoTime()
if delay <= 0:
return poll() // 任务已到期,直接取走
if leader != null:
// 已有 leader 在等待堆顶,当前线程作为 follower 无限等待
available.await()
else:
// 当前线程成为 leader,精确 sleep 到堆顶到期
thisThread = Thread.currentThread()
leader = thisThread
try:
available.awaitNanos(delay) // 精确等待
finally:
if leader == thisThread:
leader = null
// leader 醒来后,通知 follower(follower 再竞争成为新 leader)
if leader == null and heap[1] != null:
available.signal()
lock.release()
效果:任意时刻只有 1 个线程(leader)在计时等待,其余线程(follower)无限等待信号,消除惊群效应。
周期任务的重调度
// ScheduledFutureTask.run() 简化
void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
super.run(); // 一次性任务,直接执行
else if (super.runAndReset()) {
setNextRunTime(); // 重设下次触发时间
reExecutePeriodic(outerTask); // 重新插入堆
}
}
void setNextRunTime() {
long p = period;
if (p > 0) time += p; // fixedRate:基于上次触发时间累加
else time = nanoTime() - p; // fixedDelay:基于上次完成时间累加(p<0)
}
异常与边界场景
场景 1:系统时钟回拨
现象:NTP 同步导致 System.currentTimeMillis() 向前跳跃(如从 t=10s 突然回到 t=9s),基于此计算的 deadline 可能导致任务被提前触发或错过触发。
算法处理:
// 错误做法(受时钟回拨影响)
task.deadline = System.currentTimeMillis() + delayMs // ❌
// 正确做法(单调时钟)
task.deadline = System.nanoTime() + delayMs * 1_000_000 // ✅
// System.nanoTime() 保证单调递增,不受 NTP 调整影响
// JDK ScheduledThreadPoolExecutor 使用 System.nanoTime()
即使 nanoTime 本身在 JVM 重启后可能不连续,但在同一 JVM 生命周期内单调性有保证。
场景 2:任务执行抛出异常
现象:callback.run() 抛出 RuntimeException,是否影响后续任务触发?
算法处理:
// tickLoop 中提交给 taskExecutor(线程池)
taskExecutor.submit(task.callback) // 异步,不阻塞 timerThread
// 线程池内部(ThreadPoolExecutor 的 runWorker)
try:
task.run()
catch Throwable t:
// 异常被线程池捕获,不传播到 timerThread
// afterExecute(task, t) 回调可用于监控
结论:任务异常不影响后续任务,因为 timerThread 与 taskExecutor 完全分离。
但注意 ScheduledThreadPoolExecutor 中的周期任务:若周期任务抛出未检异常,runAndReset() 返回 false,任务不会被重新调度——相当于静默停止。需在任务内部 try-catch 所有异常。
场景 3:并发 cancel 与 fire 竞争
现象:timerThread 正在执行 removeTop() 准备提交任务,同时另一线程调用 cancelEager(task)。
竞态分析:
线程 A(timerThread):
task = heap[1] // 取出堆顶引用
removeTop() // 从堆中删除
// [竞态窗口]
if not task.cancelled: // 检查取消标志
taskExecutor.submit(task.callback)
线程 B(业务线程):
task.cancelled = true // CAS 设置(volatile write)
cancelEager(task) // 尝试从堆中删除(此时任务可能已被 removeTop 移出)
关键:task.cancelled 必须声明为 volatile,保证跨线程可见性。cancelEager 中检查 heapIndex == -1 可安全忽略已移出的任务。
竞态结果有两种:
- cancel 先于 fire:
task.cancelled=true→ timerThread 跳过提交,任务不执行(正确) - fire 先于 cancel:
removeTop()已完成,cancelEager发现heapIndex=-1静默返回;若task.callback已提交线程池,cancel 无法阻止执行(best-effort 语义,与Future.cancel(false)一致)
场景 4:堆满时的处理
现象:持续调用 schedule() 而任务执行速度不及调度速度,heap 数组超出内存或达到设定上限。
算法处理:
// 方案一:动态扩容(JDK PriorityQueue 默认)
if size == heap.length:
// 类似 ArrayList:容量 < 64 时翻倍,≥ 64 时增加 50%
heap = Arrays.copyOf(heap, newCapacity)
// 扩容代价 O(N),分摊到每次插入仍是 O(log N)
// 风险:OOM — 无边界增长
// 方案二:固定上限 + 拒绝策略(Netty HashedWheelTimer 的 pendingTimeouts 对标)
if size >= maxPendingTasks:
throw new RejectedExecutionException("timer queue overflow, size=" + size)
// 方案三:惰性清理(减少堆中"僵尸"任务占用的空间)
// 在 tick() 中顺带移除 cancelled 任务,降低峰值堆大小
while size > 0 and heap[1].cancelled:
removeTop() // O(log N)
与时间轮对比
| 维度 | 最小堆定时器 | 时间轮(HashedWheelTimer) | 层级时间轮(Kafka) |
|---|---|---|---|
| 插入复杂度 | O(log N) | O(1) | O(1) |
| 取消复杂度 | O(log N)(有 heapIndex)/ O(1)(惰性) | O(1)(标记取消) | O(1)(有 bucket 反向指针) |
| tick 触发复杂度 | O(1)(弹堆顶)+ O(log N)(重建堆序) | O(槽内任务数)(遍历链表) | O(M)(M=到期槽内任务数) |
| 精度 | 动态精确(每任务独立 deadline) | 固定(tickDuration 为精度下限) | 固定(最低层 tickDuration) |
| 内存 | O(N) 堆数组,无额外槽位 | O(wheelSize) 固定槽 + O(N) 任务 | O(层数 × wheelSize) + O(N) |
| 超长延迟支持 | 天然支持(deadline 任意大) | 需 round 计数或多层 | 天然支持(多层) |
| 多线程安全 | 需全局锁(或 CAS 无锁堆) | MPSC 无锁队列 | DelayQueue + 锁 |
| 适用任务规模 | < 10 万(log N 可接受) | > 100 万(O(1) 关键) | > 100 万,延迟跨度大 |
| 代表实现 | JDK ScheduledThreadPoolExecutor |
Netty HashedWheelTimer |
Kafka TimingWheel |
| 最佳场景 | 任务数少、delay 跨度极大、精度要求高 | 海量任务、固定超时、高并发 | 海量任务、delay 跨度大 |
选型结论:
任务数 < 10 万 且 delay 跨度极大(ms ~ 天混用)
→ 最小堆(ScheduledThreadPoolExecutor)
任务数 > 10 万 且 delay 相对固定(如统一超时 30s)
→ 单层时间轮(HashedWheelTimer)
任务数 > 10 万 且 delay 跨度大(ms ~ 小时混用)
→ 层级时间轮(Kafka TimingWheel)
参考资料
- JDK 源码 —
java.util.concurrent.ScheduledThreadPoolExecutor,DelayedWorkQueue,ScheduledFutureTask(OpenJDK GitHub) - Varghese & Lauck (1987) — Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facility — 时间轮原始论文,SOSP '87(对比最小堆的背景章节)
- 《Java 并发编程实战》 — Brian Goetz 等,第 6 章任务执行,
ScheduledExecutorService使用规范 - Doug Lea — A Java Fork/Join Framework — 线程池设计思想,leader-follower 模式出处
- Linux Kernel —
include/linux/timer.h,Linux 2.4 早期定时器基于最小堆,2.6 后改为层级时间轮 - 《算法导论》第 4 版 — CLRS,第 6 章 Heapsort,堆的 siftUp / siftDown 标准实现
评论 (0)
发表评论