Disruptor 是 LMAX 开发的无锁环形缓冲队列,通过预分配内存、CAS Sequence、缓存行填充解决高并发下 ArrayBlockingQueue 的锁竞争与 GC 压力问题,被 Log4j2、Netty、Storm 等系统用于实现每秒千万级消息吞吐。
相关文章:CAS 与 ABA 问题 · 无锁队列(Michael-Scott Queue) · RCU(Read-Copy-Update)
目录
| 章节 | 说明 |
|---|---|
| 问题背景 | 为什么需要 Disruptor |
| 核心数据结构 | RingBuffer、Sequence、Sequencer |
| 位掩码寻址 | 避免取模运算 |
| 单生产者 Publish 伪代码 | CAS + 写入 + 发布 |
| 消费者 Consume 伪代码 | waitFor + 读取 + 更新 |
| WaitStrategy 策略 | BusySpin / Yielding / Blocking |
| 缓存行填充 | 避免 False Sharing |
| 多生产者模式 | MultiProducerSequencer |
| 为什么快 | 综合分析 |
| 异常与边界场景 | 背压、异常处理、溢出 |
| 执行追踪 | 具体数值步骤演示 |
| 参考资料 | — |
问题背景
传统队列的痛点
ArrayBlockingQueue 生产-消费模型:
生产者 → [lock] → put() → [unlock]
消费者 → [lock] → take() → [unlock]
- 锁竞争:高并发下 ReentrantLock 导致大量线程阻塞、上下文切换
- GC 压力:每次 put 新建对象,高吞吐下 GC 停顿严重
- False Sharing:head/tail 指针在同一缓存行,CPU 核间缓存失效
Disruptor 的目标
在单线程情况下实现接近内存带宽极限的吞吐量(实测 ~25M events/s),多线程时仍保持无锁。
核心数据结构
RingBuffer<E>:
entries : Object[] // 预分配的环形数组,大小必须是 2^n
indexMask : long // = entries.length - 1,用于位掩码寻址
sequencer : Sequencer // 管理 Sequence 的分配与发布
SingleProducerSequencer:
cursor : Sequence // 生产者当前发布位置(AtomicLong,缓存行填充)
gatingSequences : Sequence[] // 所有消费者的 Sequence 引用,用于背压检测
nextValue : long // 生产者本地缓存的下一个可用 sequence(非 volatile)
cachedGatingSequence : long // 缓存的最慢消费者位置(减少 volatile 读)
MultiProducerSequencer:
cursor : Sequence // 已声明的最高 sequence(AtomicLong)
availableBuffer : int[] // 大小 = ringBuffer.size,标记每个槽位是否已发布
// availableBuffer[index] = sequence >> log2(bufferSize)
gatingSequences : Sequence[] // 同上
Sequence:
value : volatile long // 当前位置,用 @Contended 或手动填充独占缓存行
EventHandler<E>:
onEvent(event: E, sequence: long, endOfBatch: boolean) : void
SequenceBarrier:
dependentSequence : Sequence // 依赖的上游 Sequence(链式消费者)
waitStrategy : WaitStrategy
cursorSequence : Sequence // RingBuffer 的 cursor 引用
图示
位掩码寻址
问题:环形数组需要将 sequence 映射到数组下标,取模(%)运算慢。
解决:当数组大小是 2 的幂次时,取模等价于位与运算:
bufferSize = 2^n → indexMask = bufferSize - 1
index = sequence & indexMask
= sequence % bufferSize // 等价,但位运算快 ~3-5 倍
例:bufferSize = 8,indexMask = 7 = 0b0111
sequence=0 → 0 & 7 = 0
sequence=7 → 7 & 7 = 7
sequence=8 → 8 & 7 = 0 // 回绕到起点
sequence=15 → 15 & 7 = 7
sequence=16 → 16 & 7 = 0 // 再次回绕
强制 2 的幂次:
function roundUpToPowerOf2(n):
result = 1
while result < n:
result = result << 1
return result
单生产者 Publish 伪代码
// 单生产者版本(SingleProducerSequencer)
// 无 CAS,利用单生产者保证只有一个线程写 nextValue
function publish(sequencer, ringBuffer, eventTranslator):
// --- 步骤 1:申请下一个 sequence ---
nextSequence = sequencer.nextValue + 1 // 本地变量,无需 volatile
wrapPoint = nextSequence - ringBuffer.size // 若生产者超前消费者超过一圈则需等待
// 检查是否超过最慢消费者(背压检查)
if wrapPoint > sequencer.cachedGatingSequence:
// 缓存失效,重新读取所有消费者的最小 sequence
minGatingSequence = min(seq.value for seq in sequencer.gatingSequences)
sequencer.cachedGatingSequence = minGatingSequence
// 若仍然超前,自旋等待(配合 WaitStrategy)
while wrapPoint > sequencer.cachedGatingSequence:
LockSupport.parkNanos(1) // 或 Thread.yield(),取决于 WaitStrategy
sequencer.cachedGatingSequence = min(seq.value for seq in sequencer.gatingSequences)
sequencer.nextValue = nextSequence // 更新本地缓存
// --- 步骤 2:写入 RingBuffer ---
index = nextSequence & ringBuffer.indexMask
event = ringBuffer.entries[index] // 取预分配对象
eventTranslator.translateTo(event, nextSequence) // 填充数据(用户回调)
// --- 步骤 3:发布(让消费者可见)---
// 写 cursor 必须是 volatile store(happens-before 保证)
UNSAFE.putOrderedLong(sequencer.cursor, nextSequence)
// 等价于:sequencer.cursor.lazySet(nextSequence)
// 使用 lazySet 而非 volatile write,避免 StoreLoad 屏障,性能更好
// 消费者通过轮询 cursor >= waitSequence 来感知新事件
// 若需要立即可见(严格 happens-before):
// sequencer.cursor.set(nextSequence) // volatile write with full fence
批量发布(减少 cursor 更新次数):
function publishBatch(sequencer, ringBuffer, translators[], batchSize):
hi = sequencer.next(batchSize) // 一次申请多个 sequence
lo = hi - batchSize + 1
for seq = lo to hi:
index = seq & ringBuffer.indexMask
event = ringBuffer.entries[index]
translators[seq - lo].translateTo(event, seq)
sequencer.cursor.lazySet(hi) // 一次发布整批
消费者 Consume 伪代码
// EventProcessor(消费者线程主循环)
function run(processor):
sequence = processor.sequence // 消费者自己的 Sequence
barrier = processor.sequenceBarrier // 包含 WaitStrategy 和 cursor 引用
handler = processor.eventHandler
nextSequence = sequence.value + 1 // 下一个要消费的 sequence
availableSequence = -1
loop:
try:
// --- 步骤 1:等待可消费的 sequence ---
availableSequence = barrier.waitFor(nextSequence)
// waitFor 内部:
// BusySpin: while cursor.value < waitSeq: {} // 自旋
// Yielding: while cursor.value < waitSeq: Thread.yield()
// Blocking: lock.wait() until signaled by producer
// --- 步骤 2:批量处理到 availableSequence ---
// availableSequence 可能 > nextSequence,实现批量消费
while nextSequence <= availableSequence:
index = nextSequence & ringBuffer.indexMask
event = ringBuffer.entries[index]
isEndOfBatch = (nextSequence == availableSequence)
handler.onEvent(event, nextSequence, isEndOfBatch)
nextSequence = nextSequence + 1
// --- 步骤 3:更新消费者 Sequence(通知生产者可以复用槽位)---
sequence.lazySet(availableSequence)
catch AlertException:
// 收到停止信号,退出循环
if processor.running == false:
break
catch Throwable as ex:
processor.exceptionHandler.handleEventException(ex, nextSequence, event)
sequence.lazySet(nextSequence) // 即使异常也要推进,避免死锁
nextSequence = nextSequence + 1
SequenceBarrier.waitFor 内部(以 BlockingWaitStrategy 为例):
function waitFor(sequence, cursor, dependentSequence, barrier):
if cursor.value < sequence:
lock.lock()
try:
while cursor.value < sequence:
barrier.checkAlert() // 检查是否有停止信号
processorNotifyCondition.await() // 释放锁,挂起等待
finally:
lock.unlock()
// 等待所有依赖的上游消费者也处理完(链式消费者场景)
availableSequence = dependentSequence.value
while availableSequence < sequence:
barrier.checkAlert()
availableSequence = dependentSequence.value
return availableSequence
WaitStrategy 策略
| 策略 | 实现 | 延迟 | CPU 占用 | 适用场景 |
|---|---|---|---|---|
| BusySpinWaitStrategy | while cursor < seq {} |
最低 (<1μs) | 最高(100%) | 绑定 CPU 核、超低延迟 |
| YieldingWaitStrategy | Thread.yield() 自旋 100 次后 yield |
低 (1-10μs) | 中高 | 延迟敏感,可共享 CPU |
| SleepingWaitStrategy | yield 100 次 → park(1ns) | 中 (50-100μs) | 低 | 平衡延迟与 CPU |
| BlockingWaitStrategy | lock.wait() |
高 (>100μs) | 最低 | 吞吐优先,延迟不敏感 |
| LiteBlockingWaitStrategy | 无锁 CAS + park | 中低 | 低 | 减少锁竞争的 Blocking |
// BusySpinWaitStrategy 伪代码
function waitFor(sequence, cursor, dependentSequence, barrier):
availableSequence = -1
loop:
availableSequence = dependentSequence.value
if availableSequence >= sequence:
return availableSequence
barrier.checkAlert()
// 无 yield/sleep,纯自旋
// YieldingWaitStrategy 伪代码
function waitFor(sequence, cursor, dependentSequence, barrier):
counter = 100 // 自旋次数阈值
loop:
availableSequence = dependentSequence.value
if availableSequence >= sequence:
return availableSequence
barrier.checkAlert()
if counter == 0:
Thread.yield()
else:
counter = counter - 1
缓存行填充
False Sharing 问题:
// 不填充时,多个 Sequence 可能在同一缓存行(64字节)
// CPU 0 写 sequence0 → 使 CPU 1 的 sequence1 缓存失效 → CPU 1 需重新加载
// 即使 CPU 1 只关心 sequence1,也因为同缓存行而受影响
缓存行填充伪代码:
// 目标:让每个 Sequence 独占一个 CPU 缓存行(64 字节)
// value 是 long(8字节),填充 56 字节让 value 独占一行
class Sequence:
// 前填充:7 个 long = 56 字节
p1, p2, p3, p4, p5, p6, p7 : long = 7, 7, 7, 7, 7, 7, 7
// 核心字段(volatile,8 字节)
value : volatile long = -1
// 后填充:7 个 long = 56 字节(防止 value 与下一对象的字段共缓存行)
p8, p9, p10, p11, p12, p13, p14 : long = 7, 7, 7, 7, 7, 7, 7
// 内存布局(每行代表 8 字节):
// [p1][p2][p3][p4][p5][p6][p7][value] ← 前 8 个 long = 64 字节 = 1 缓存行
// [p8][p9][p10][p11][p12][p13][p14][?] ← 后填充,防止下一对象侵入
Java 实现方式:
// 方式 1:手动填充(Disruptor 原版)
abstract class RhsPadding extends Value {
protected long p6, p7, p8, p9, p10, p11, p12, p13;
}
// 方式 2:@Contended 注解(JDK 8+,需 -XX:-RestrictContended)
@sun.misc.Contended
class Sequence {
volatile long value;
}
多生产者模式
问题:单生产者用本地变量 nextValue 就够了,多生产者需要原子性地分配 sequence。
// MultiProducerSequencer
// 核心思路:cursor 记录已声明的最高 sequence,
// availableBuffer 标记每个槽位是否真正写完
function next(n): // 申请 n 个连续 sequence
current = cursor.value
nextValue = current + n
wrapPoint = nextValue - bufferSize
// 背压:等待最慢消费者腾出空间
cachedGating = cachedGatingSequence.value
if wrapPoint > cachedGating or cachedGating > current:
minGating = min(seq.value for seq in gatingSequences)
cachedGatingSequence.set(minGating)
while wrapPoint > cachedGatingSequence.value:
LockSupport.parkNanos(1)
// CAS 申请 sequence 范围
while not cursor.compareAndSet(current, nextValue):
current = cursor.value // CAS 失败,重读
nextValue = current + n
wrapPoint = nextValue - bufferSize
// 重新检查背压...
return nextValue // 返回申请到的最高 sequence,lo = nextValue - n + 1
function publish(sequence):
// 标记该槽位已写完
index = sequence & indexMask
flag = calculateAvailabilityFlag(sequence) // = sequence >> log2(bufferSize)
availableBuffer[index] = flag
// 注意:不直接更新 cursor(cursor 只在 next() 中更新)
function isAvailable(sequence):
index = sequence & indexMask
flag = sequence >> log2(bufferSize)
return availableBuffer[index] == flag
// 消费者 waitFor 中调用 getHighestPublishedSequence:
function getHighestPublishedSequence(lowerBound, availableSequence):
for seq = lowerBound to availableSequence:
if not isAvailable(seq):
return seq - 1 // 找到第一个未发布的槽,返回前一个
return availableSequence
availableBuffer flag 的意义:
bufferSize = 8,log2(8) = 3
sequence=8 → index=0,flag = 8 >> 3 = 1
sequence=16 → index=0,flag = 16 >> 3 = 2
// 通过 flag 区分同一槽位的不同"圈次",防止生产者超前误判
为什么快
| 原因 | 传统队列 | Disruptor | 收益 |
|---|---|---|---|
| 锁 | ReentrantLock 每次 put/take | 无锁(CAS Sequence 或单生产者无 CAS) | 消除线程阻塞与上下文切换 |
| 内存分配 | 每个消息 new 对象 | 预分配 entries[],循环复用 | 无 GC 停顿 |
| 缓存行 | head/tail 共缓存行 | Sequence 独占缓存行 | 消除 False Sharing |
| 批量消费 | 每次取一个 | waitFor 返回可用范围,批量处理 | 均摊等待开销 |
| CPU 预测 | 随机访问链表 | 顺序访问数组,空间局部性好 | CPU 预取缓存命中率高 |
异常与边界场景
场景 1:生产者速度超过消费者(背压)
// 触发条件:wrapPoint > minGatingSequence
// wrapPoint = nextSequence - bufferSize
// 即:生产者想写的位置,消费者还没消费完
bufferSize = 8
cursor = 8(已发布到 sequence 8)
消费者 Sequence = 1(只消费到 1)
nextSequence = 9 → wrapPoint = 9 - 8 = 1
wrapPoint(1) > consumerSeq(1)? → 等于,不等待(边界:wrapPoint > 而非 >=)
nextSequence = 10 → wrapPoint = 2 > 1 → 触发等待
处理:
Single: 自旋检查 cachedGatingSequence,直到消费者推进
Multi: CAS 失败后重试,同时检查背压
用户可感知:publish() 调用被阻塞(背压自然传导)
场景 2:消费者 EventHandler 抛出异常
// 默认 ExceptionHandler:FatalExceptionHandler
// 行为:打印堆栈,重新抛出,停止 EventProcessor
// 自定义 ExceptionHandler:
class MyExceptionHandler implements ExceptionHandler<MyEvent>:
function handleEventException(ex, sequence, event):
log.error(f"处理 sequence={sequence} 时异常:{ex}")
// 不重新抛出 → 消费者继续运行
// 关键:sequence.lazySet(sequence) 必须执行(在 run() 的 catch 块中)
// 否则消费者 Sequence 卡住,生产者因背压永远阻塞
场景 3:Sequence 值溢出
// Sequence 是 long,范围 0 到 9,223,372,036,854,775,807(约 9.2 * 10^18)
// 每秒 25,000,000 个事件 → 溢出时间 = 9.2*10^18 / 25*10^6 / 3600 / 24 / 365
// ≈ 11,676 年
// 结论:实际使用中不需要处理溢出,long 足够大
// 初始值设为 -1(RingBuffer.INITIAL_CURSOR_VALUE)
// 第一个 sequence = 0,消费者初始化为 -1
// sequence.value >= 0 时表示有效事件
场景 4:多消费者 Diamond 拓扑(消费者依赖)
// Producer → [ConsumerA, ConsumerB] → ConsumerC
// ConsumerC 必须等 A 和 B 都处理完同一 sequence 才能处理
SequenceBarrier for C = ringBuffer.newBarrier(sequenceA, sequenceB)
// waitFor 中:dependentSequence = min(sequenceA, sequenceB)
// C 只能消费到 min(A.sequence, B.sequence)
场景 5:RingBuffer 大小不是 2 的幂次
// 构造时强制检查
if Integer.bitCount(bufferSize) != 1:
throw IllegalArgumentException("bufferSize 必须是 2 的幂次")
// 因为位掩码 indexMask = bufferSize - 1 只在 2^n 时等价于取模
// bufferSize=6, indexMask=5(0101): sequence=6 → 6&5=4 ≠ 6%6=0 // 错误!
执行追踪
场景:bufferSize=4,1 个生产者,1 个消费者,发布 3 个事件
初始状态:
entries = [E0, E1, E2, E3](预分配,大小=4,indexMask=3)
cursor = -1(AtomicLong)
consumerSeq = -1
--- Publish 第 1 个事件(data="Hello")---
Step 1: nextSequence = nextValue + 1 = -1 + 1 = 0
wrapPoint = 0 - 4 = -4
-4 > cachedGatingSeq(-1)? → false,无需等待
nextValue = 0
Step 2: index = 0 & 3 = 0
entries[0] = E0(预分配对象)
E0.data = "Hello"(translateTo 回调)
Step 3: cursor.lazySet(0)
cursor = 0
--- Consume 第 1 个事件 ---
Step 1: nextConsumerSeq = consumerSeq + 1 = 0
waitFor(0):cursor.value(0) >= 0 → 立即返回 availableSeq=0
Step 2: index = 0 & 3 = 0
event = entries[0],data="Hello"
handler.onEvent(E0, seq=0, endOfBatch=true)
Step 3: consumerSeq.lazySet(0),consumerSeq = 0
--- Publish 第 2 个事件(data="World")---
Step 1: nextSequence = 0 + 1 = 1
wrapPoint = 1 - 4 = -3
-3 > consumerSeq(0)? → false
nextValue = 1
Step 2: index = 1 & 3 = 1
E1.data = "World"
Step 3: cursor.lazySet(1),cursor = 1
--- Publish 第 3 个事件(data="!"),此时消费者未推进 ---
Step 1: nextSequence = 2,wrapPoint = -2 > 0? → false,继续
--- Publish 第 5 个事件(bufferSize=4,测试回绕)---
假设已发布 sequence 0-3,consumerSeq 仍在 -1(消费者极慢)
nextSequence = 4,wrapPoint = 4 - 4 = 0
cachedGatingSeq = consumerSeq = -1
0 > -1 → true,触发背压等待!
生产者自旋:
loop: cachedGatingSeq = min(consumerSeq) = -1
0 > -1 → 继续等待
(消费者处理完 seq=0,consumerSeq=0)
0 > 0? → false,退出等待
nextValue = 4,index = 4 & 3 = 0(回绕!复用 entries[0])
E0.data = "NewData"(覆盖旧数据)
cursor.lazySet(4)
最终 RingBuffer 状态:
entries[0] = "NewData"(sequence 4 写入,覆盖了 "Hello")
entries[1] = "World"
entries[2] = ...
entries[3] = ...
cursor = 4,consumerSeq = 0
参考资料
- LMAX Disruptor 白皮书 — 原始论文,详述设计决策
- Disruptor GitHub — 源码,重点看
SingleProducerSequencer和MultiProducerSequencer - Martin Thompson: Mechanical Sympathy — 缓存行、False Sharing 深度解析
- 并发编程网:Disruptor 系列 — 中文详解
- Log4j2 Async Appender — Disruptor 在日志框架中的应用
评论 (0)
发表评论