专栏文章
专栏文章
并发算法系列
1. 并发算法 #01:CAS 与 ABA 问题 2. 并发算法 #02:AQS(AbstractQueuedSynchronizer) 3. 并发算法 #03:MVCC(多版本并发控制) 4. 并发算法 #04:无锁队列(Michael-Scott Queue) 5. 并发算法 #05:Disruptor(无锁环形缓冲) 6. 并发算法 #06:RCU(Read-Copy-Update)

并发算法 #05:Disruptor(无锁环形缓冲)

发布于 2026-06-04 02:42 👁 13 次阅读
#算法#java#engineering#lock-free#disruptor#ring-buffer#high-performance#concurrent

Disruptor 是 LMAX 开发的无锁环形缓冲队列,通过预分配内存、CAS Sequence、缓存行填充解决高并发下 ArrayBlockingQueue 的锁竞争与 GC 压力问题,被 Log4j2、Netty、Storm 等系统用于实现每秒千万级消息吞吐。

相关文章CAS 与 ABA 问题 · 无锁队列(Michael-Scott Queue) · RCU(Read-Copy-Update)


目录

章节 说明
问题背景 为什么需要 Disruptor
核心数据结构 RingBuffer、Sequence、Sequencer
位掩码寻址 避免取模运算
单生产者 Publish 伪代码 CAS + 写入 + 发布
消费者 Consume 伪代码 waitFor + 读取 + 更新
WaitStrategy 策略 BusySpin / Yielding / Blocking
缓存行填充 避免 False Sharing
多生产者模式 MultiProducerSequencer
为什么快 综合分析
异常与边界场景 背压、异常处理、溢出
执行追踪 具体数值步骤演示
参考资料

问题背景

传统队列的痛点

ArrayBlockingQueue 生产-消费模型:
  生产者 → [lock] → put() → [unlock]
  消费者 → [lock] → take() → [unlock]

Disruptor 的目标

在单线程情况下实现接近内存带宽极限的吞吐量(实测 ~25M events/s),多线程时仍保持无锁。


核心数据结构

RingBuffer<E>:
  entries    : Object[]          // 预分配的环形数组,大小必须是 2^n
  indexMask  : long              // = entries.length - 1,用于位掩码寻址
  sequencer  : Sequencer         // 管理 Sequence 的分配与发布

SingleProducerSequencer:
  cursor     : Sequence          // 生产者当前发布位置(AtomicLong,缓存行填充)
  gatingSequences : Sequence[]   // 所有消费者的 Sequence 引用,用于背压检测
  nextValue  : long              // 生产者本地缓存的下一个可用 sequence(非 volatile)
  cachedGatingSequence : long    // 缓存的最慢消费者位置(减少 volatile 读)

MultiProducerSequencer:
  cursor          : Sequence     // 已声明的最高 sequence(AtomicLong)
  availableBuffer : int[]        // 大小 = ringBuffer.size,标记每个槽位是否已发布
                                 // availableBuffer[index] = sequence >> log2(bufferSize)
  gatingSequences : Sequence[]   // 同上

Sequence:
  value : volatile long          // 当前位置,用 @Contended 或手动填充独占缓存行

EventHandler<E>:
  onEvent(event: E, sequence: long, endOfBatch: boolean) : void

SequenceBarrier:
  dependentSequence : Sequence   // 依赖的上游 Sequence(链式消费者)
  waitStrategy      : WaitStrategy
  cursorSequence    : Sequence   // RingBuffer 的 cursor 引用

图示

disruptor ring buffer


位掩码寻址

问题:环形数组需要将 sequence 映射到数组下标,取模(%)运算慢。

解决:当数组大小是 2 的幂次时,取模等价于位与运算:

bufferSize = 2^n  →  indexMask = bufferSize - 1

index = sequence & indexMask
      = sequence % bufferSize   // 等价,但位运算快 ~3-5 倍

例:bufferSize = 8,indexMask = 7 = 0b0111
  sequence=0  → 0 & 7 = 0
  sequence=7  → 7 & 7 = 7
  sequence=8  → 8 & 7 = 0   // 回绕到起点
  sequence=15 → 15 & 7 = 7
  sequence=16 → 16 & 7 = 0  // 再次回绕

强制 2 的幂次

function roundUpToPowerOf2(n):
  result = 1
  while result < n:
    result = result << 1
  return result

单生产者 Publish 伪代码

// 单生产者版本(SingleProducerSequencer)
// 无 CAS,利用单生产者保证只有一个线程写 nextValue

function publish(sequencer, ringBuffer, eventTranslator):

  // --- 步骤 1:申请下一个 sequence ---
  nextSequence = sequencer.nextValue + 1           // 本地变量,无需 volatile
  wrapPoint    = nextSequence - ringBuffer.size    // 若生产者超前消费者超过一圈则需等待

  // 检查是否超过最慢消费者(背压检查)
  if wrapPoint > sequencer.cachedGatingSequence:
    // 缓存失效,重新读取所有消费者的最小 sequence
    minGatingSequence = min(seq.value for seq in sequencer.gatingSequences)
    sequencer.cachedGatingSequence = minGatingSequence

    // 若仍然超前,自旋等待(配合 WaitStrategy)
    while wrapPoint > sequencer.cachedGatingSequence:
      LockSupport.parkNanos(1)  // 或 Thread.yield(),取决于 WaitStrategy
      sequencer.cachedGatingSequence = min(seq.value for seq in sequencer.gatingSequences)

  sequencer.nextValue = nextSequence   // 更新本地缓存

  // --- 步骤 2:写入 RingBuffer ---
  index = nextSequence & ringBuffer.indexMask
  event = ringBuffer.entries[index]    // 取预分配对象
  eventTranslator.translateTo(event, nextSequence)  // 填充数据(用户回调)

  // --- 步骤 3:发布(让消费者可见)---
  // 写 cursor 必须是 volatile store(happens-before 保证)
  UNSAFE.putOrderedLong(sequencer.cursor, nextSequence)
  // 等价于:sequencer.cursor.lazySet(nextSequence)
  // 使用 lazySet 而非 volatile write,避免 StoreLoad 屏障,性能更好
  // 消费者通过轮询 cursor >= waitSequence 来感知新事件

  // 若需要立即可见(严格 happens-before):
  // sequencer.cursor.set(nextSequence)  // volatile write with full fence

批量发布(减少 cursor 更新次数):

function publishBatch(sequencer, ringBuffer, translators[], batchSize):
  hi = sequencer.next(batchSize)   // 一次申请多个 sequence
  lo = hi - batchSize + 1

  for seq = lo to hi:
    index = seq & ringBuffer.indexMask
    event = ringBuffer.entries[index]
    translators[seq - lo].translateTo(event, seq)

  sequencer.cursor.lazySet(hi)     // 一次发布整批

消费者 Consume 伪代码

// EventProcessor(消费者线程主循环)

function run(processor):
  sequence        = processor.sequence        // 消费者自己的 Sequence
  barrier         = processor.sequenceBarrier // 包含 WaitStrategy 和 cursor 引用
  handler         = processor.eventHandler
  nextSequence    = sequence.value + 1        // 下一个要消费的 sequence
  availableSequence = -1

  loop:
    try:
      // --- 步骤 1:等待可消费的 sequence ---
      availableSequence = barrier.waitFor(nextSequence)
      // waitFor 内部:
      //   BusySpin:  while cursor.value < waitSeq: {}  // 自旋
      //   Yielding:  while cursor.value < waitSeq: Thread.yield()
      //   Blocking:  lock.wait() until signaled by producer

      // --- 步骤 2:批量处理到 availableSequence ---
      // availableSequence 可能 > nextSequence,实现批量消费
      while nextSequence <= availableSequence:
        index = nextSequence & ringBuffer.indexMask
        event = ringBuffer.entries[index]
        isEndOfBatch = (nextSequence == availableSequence)

        handler.onEvent(event, nextSequence, isEndOfBatch)
        nextSequence = nextSequence + 1

      // --- 步骤 3:更新消费者 Sequence(通知生产者可以复用槽位)---
      sequence.lazySet(availableSequence)

    catch AlertException:
      // 收到停止信号,退出循环
      if processor.running == false:
        break

    catch Throwable as ex:
      processor.exceptionHandler.handleEventException(ex, nextSequence, event)
      sequence.lazySet(nextSequence)   // 即使异常也要推进,避免死锁
      nextSequence = nextSequence + 1

SequenceBarrier.waitFor 内部(以 BlockingWaitStrategy 为例)

function waitFor(sequence, cursor, dependentSequence, barrier):
  if cursor.value < sequence:
    lock.lock()
    try:
      while cursor.value < sequence:
        barrier.checkAlert()       // 检查是否有停止信号
        processorNotifyCondition.await()   // 释放锁,挂起等待
    finally:
      lock.unlock()

  // 等待所有依赖的上游消费者也处理完(链式消费者场景)
  availableSequence = dependentSequence.value
  while availableSequence < sequence:
    barrier.checkAlert()
    availableSequence = dependentSequence.value

  return availableSequence

WaitStrategy 策略

策略 实现 延迟 CPU 占用 适用场景
BusySpinWaitStrategy while cursor < seq {} 最低 (<1μs) 最高(100%) 绑定 CPU 核、超低延迟
YieldingWaitStrategy Thread.yield() 自旋 100 次后 yield 低 (1-10μs) 中高 延迟敏感,可共享 CPU
SleepingWaitStrategy yield 100 次 → park(1ns) 中 (50-100μs) 平衡延迟与 CPU
BlockingWaitStrategy lock.wait() 高 (>100μs) 最低 吞吐优先,延迟不敏感
LiteBlockingWaitStrategy 无锁 CAS + park 中低 减少锁竞争的 Blocking
// BusySpinWaitStrategy 伪代码
function waitFor(sequence, cursor, dependentSequence, barrier):
  availableSequence = -1
  loop:
    availableSequence = dependentSequence.value
    if availableSequence >= sequence:
      return availableSequence
    barrier.checkAlert()
    // 无 yield/sleep,纯自旋

// YieldingWaitStrategy 伪代码
function waitFor(sequence, cursor, dependentSequence, barrier):
  counter = 100    // 自旋次数阈值
  loop:
    availableSequence = dependentSequence.value
    if availableSequence >= sequence:
      return availableSequence
    barrier.checkAlert()
    if counter == 0:
      Thread.yield()
    else:
      counter = counter - 1

缓存行填充

False Sharing 问题

// 不填充时,多个 Sequence 可能在同一缓存行(64字节)
// CPU 0 写 sequence0 → 使 CPU 1 的 sequence1 缓存失效 → CPU 1 需重新加载
// 即使 CPU 1 只关心 sequence1,也因为同缓存行而受影响

缓存行填充伪代码

// 目标:让每个 Sequence 独占一个 CPU 缓存行(64 字节)
// value 是 long(8字节),填充 56 字节让 value 独占一行

class Sequence:
  // 前填充:7 个 long = 56 字节
  p1, p2, p3, p4, p5, p6, p7 : long = 7, 7, 7, 7, 7, 7, 7

  // 核心字段(volatile,8 字节)
  value : volatile long = -1

  // 后填充:7 个 long = 56 字节(防止 value 与下一对象的字段共缓存行)
  p8, p9, p10, p11, p12, p13, p14 : long = 7, 7, 7, 7, 7, 7, 7

// 内存布局(每行代表 8 字节):
// [p1][p2][p3][p4][p5][p6][p7][value]  ← 前 8 个 long = 64 字节 = 1 缓存行
// [p8][p9][p10][p11][p12][p13][p14][?] ← 后填充,防止下一对象侵入

Java 实现方式

// 方式 1:手动填充(Disruptor 原版)
abstract class RhsPadding extends Value {
    protected long p6, p7, p8, p9, p10, p11, p12, p13;
}

// 方式 2:@Contended 注解(JDK 8+,需 -XX:-RestrictContended)
@sun.misc.Contended
class Sequence {
    volatile long value;
}

多生产者模式

问题:单生产者用本地变量 nextValue 就够了,多生产者需要原子性地分配 sequence。

// MultiProducerSequencer
// 核心思路:cursor 记录已声明的最高 sequence,
// availableBuffer 标记每个槽位是否真正写完

function next(n):  // 申请 n 个连续 sequence
  current = cursor.value
  nextValue = current + n
  wrapPoint = nextValue - bufferSize

  // 背压:等待最慢消费者腾出空间
  cachedGating = cachedGatingSequence.value
  if wrapPoint > cachedGating or cachedGating > current:
    minGating = min(seq.value for seq in gatingSequences)
    cachedGatingSequence.set(minGating)
    while wrapPoint > cachedGatingSequence.value:
      LockSupport.parkNanos(1)

  // CAS 申请 sequence 范围
  while not cursor.compareAndSet(current, nextValue):
    current = cursor.value          // CAS 失败,重读
    nextValue = current + n
    wrapPoint = nextValue - bufferSize
    // 重新检查背压...

  return nextValue  // 返回申请到的最高 sequence,lo = nextValue - n + 1

function publish(sequence):
  // 标记该槽位已写完
  index = sequence & indexMask
  flag  = calculateAvailabilityFlag(sequence)  // = sequence >> log2(bufferSize)
  availableBuffer[index] = flag
  // 注意:不直接更新 cursor(cursor 只在 next() 中更新)

function isAvailable(sequence):
  index = sequence & indexMask
  flag  = sequence >> log2(bufferSize)
  return availableBuffer[index] == flag

// 消费者 waitFor 中调用 getHighestPublishedSequence:
function getHighestPublishedSequence(lowerBound, availableSequence):
  for seq = lowerBound to availableSequence:
    if not isAvailable(seq):
      return seq - 1    // 找到第一个未发布的槽,返回前一个
  return availableSequence

availableBuffer flag 的意义

bufferSize = 8,log2(8) = 3
sequence=8  → index=0,flag = 8 >> 3 = 1
sequence=16 → index=0,flag = 16 >> 3 = 2
// 通过 flag 区分同一槽位的不同"圈次",防止生产者超前误判

为什么快

原因 传统队列 Disruptor 收益
ReentrantLock 每次 put/take 无锁(CAS Sequence 或单生产者无 CAS) 消除线程阻塞与上下文切换
内存分配 每个消息 new 对象 预分配 entries[],循环复用 无 GC 停顿
缓存行 head/tail 共缓存行 Sequence 独占缓存行 消除 False Sharing
批量消费 每次取一个 waitFor 返回可用范围,批量处理 均摊等待开销
CPU 预测 随机访问链表 顺序访问数组,空间局部性好 CPU 预取缓存命中率高

异常与边界场景

场景 1:生产者速度超过消费者(背压)

// 触发条件:wrapPoint > minGatingSequence
// wrapPoint = nextSequence - bufferSize
// 即:生产者想写的位置,消费者还没消费完

bufferSize = 8
cursor = 8(已发布到 sequence 8)
消费者 Sequence = 1(只消费到 1)
nextSequence = 9 → wrapPoint = 9 - 8 = 1
wrapPoint(1) > consumerSeq(1)?  → 等于,不等待(边界:wrapPoint > 而非 >=)
nextSequence = 10 → wrapPoint = 2 > 1 → 触发等待

处理:
  Single: 自旋检查 cachedGatingSequence,直到消费者推进
  Multi:  CAS 失败后重试,同时检查背压
  用户可感知:publish() 调用被阻塞(背压自然传导)

场景 2:消费者 EventHandler 抛出异常

// 默认 ExceptionHandler:FatalExceptionHandler
// 行为:打印堆栈,重新抛出,停止 EventProcessor

// 自定义 ExceptionHandler:
class MyExceptionHandler implements ExceptionHandler<MyEvent>:
  function handleEventException(ex, sequence, event):
    log.error(f"处理 sequence={sequence} 时异常:{ex}")
    // 不重新抛出 → 消费者继续运行
    // 关键:sequence.lazySet(sequence) 必须执行(在 run() 的 catch 块中)
    // 否则消费者 Sequence 卡住,生产者因背压永远阻塞

场景 3:Sequence 值溢出

// Sequence 是 long,范围 0 到 9,223,372,036,854,775,807(约 9.2 * 10^18)
// 每秒 25,000,000 个事件 → 溢出时间 = 9.2*10^18 / 25*10^6 / 3600 / 24 / 365
//                          ≈ 11,676 年
// 结论:实际使用中不需要处理溢出,long 足够大

// 初始值设为 -1(RingBuffer.INITIAL_CURSOR_VALUE)
// 第一个 sequence = 0,消费者初始化为 -1
// sequence.value >= 0 时表示有效事件

场景 4:多消费者 Diamond 拓扑(消费者依赖)

// Producer → [ConsumerA, ConsumerB] → ConsumerC
// ConsumerC 必须等 A 和 B 都处理完同一 sequence 才能处理

SequenceBarrier for C = ringBuffer.newBarrier(sequenceA, sequenceB)
// waitFor 中:dependentSequence = min(sequenceA, sequenceB)
// C 只能消费到 min(A.sequence, B.sequence)

场景 5:RingBuffer 大小不是 2 的幂次

// 构造时强制检查
if Integer.bitCount(bufferSize) != 1:
  throw IllegalArgumentException("bufferSize 必须是 2 的幂次")
// 因为位掩码 indexMask = bufferSize - 1 只在 2^n 时等价于取模
// bufferSize=6, indexMask=5(0101): sequence=6 → 6&5=4 ≠ 6%6=0  // 错误!

执行追踪

场景:bufferSize=4,1 个生产者,1 个消费者,发布 3 个事件

初始状态:
  entries   = [E0, E1, E2, E3](预分配,大小=4,indexMask=3)
  cursor    = -1(AtomicLong)
  consumerSeq = -1

--- Publish 第 1 个事件(data="Hello")---
Step 1: nextSequence = nextValue + 1 = -1 + 1 = 0
        wrapPoint = 0 - 4 = -4
        -4 > cachedGatingSeq(-1)? → false,无需等待
        nextValue = 0

Step 2: index = 0 & 3 = 0
        entries[0] = E0(预分配对象)
        E0.data = "Hello"(translateTo 回调)

Step 3: cursor.lazySet(0)
        cursor = 0

--- Consume 第 1 个事件 ---
Step 1: nextConsumerSeq = consumerSeq + 1 = 0
        waitFor(0):cursor.value(0) >= 0 → 立即返回 availableSeq=0

Step 2: index = 0 & 3 = 0
        event = entries[0],data="Hello"
        handler.onEvent(E0, seq=0, endOfBatch=true)

Step 3: consumerSeq.lazySet(0),consumerSeq = 0

--- Publish 第 2 个事件(data="World")---
Step 1: nextSequence = 0 + 1 = 1
        wrapPoint = 1 - 4 = -3
        -3 > consumerSeq(0)? → false
        nextValue = 1

Step 2: index = 1 & 3 = 1
        E1.data = "World"

Step 3: cursor.lazySet(1),cursor = 1

--- Publish 第 3 个事件(data="!"),此时消费者未推进 ---
Step 1: nextSequence = 2,wrapPoint = -2 > 0? → false,继续

--- Publish 第 5 个事件(bufferSize=4,测试回绕)---
假设已发布 sequence 0-3,consumerSeq 仍在 -1(消费者极慢)
nextSequence = 4,wrapPoint = 4 - 4 = 0
cachedGatingSeq = consumerSeq = -1
0 > -1 → true,触发背压等待!

生产者自旋:
  loop: cachedGatingSeq = min(consumerSeq) = -1
        0 > -1 → 继续等待
  (消费者处理完 seq=0,consumerSeq=0)
        0 > 0? → false,退出等待

nextValue = 4,index = 4 & 3 = 0(回绕!复用 entries[0])
E0.data = "NewData"(覆盖旧数据)
cursor.lazySet(4)

最终 RingBuffer 状态:
  entries[0] = "NewData"(sequence 4 写入,覆盖了 "Hello")
  entries[1] = "World"
  entries[2] = ...
  entries[3] = ...
  cursor = 4,consumerSeq = 0

参考资料

← 返回列表

评论 (0)

暂无评论,来留下第一条吧。

发表评论