专栏文章
专栏文章
并发算法系列
1. 并发算法 #01:CAS 与 ABA 问题 2. 并发算法 #02:AQS(AbstractQueuedSynchronizer) 3. 并发算法 #03:MVCC(多版本并发控制) 4. 并发算法 #04:无锁队列(Michael-Scott Queue) 5. 并发算法 #05:Disruptor(无锁环形缓冲) 6. 并发算法 #06:RCU(Read-Copy-Update)

并发算法 #02:AQS(AbstractQueuedSynchronizer)

发布于 2026-06-02 03:03 👁 11 次阅读
#算法#并发#java#lock#engineering#aqs#synchronizer

AQS(AbstractQueuedSynchronizer)是 Java 并发包的核心基础框架,通过一个 volatile int state 字段和 CLH 变体双向队列,为 ReentrantLock、Semaphore、CountDownLatch 等同步器提供统一的线程排队、阻塞、唤醒机制。

相关文章CAS 与 ABA 问题 · MVCC(多版本并发控制) · 无锁队列(Michael-Scott Queue)


目录

章节 说明
问题背景 为什么需要 AQS
整体架构 state 字段 + CLH 队列
CLH Node 结构 waitStatus / prev / next / thread
acquire 完整伪代码 tryAcquire → addWaiter → acquireQueued
release 完整伪代码 tryRelease → unparkSuccessor
ReentrantLock 实现细节 公平锁 vs 非公平锁
Condition 机制 await → ConditionQueue,signal → AQS 队列
执行追踪 3 线程竞争 ReentrantLock 完整演示
异常与边界场景 CANCELLED 清理、中断、spurious wakeup
参考资料

问题背景

为什么需要 AQS

在 Java 5 之前,线程同步只有 synchronized 关键字,存在以下问题:

AQS 将"线程排队、阻塞、唤醒"这一通用机制提炼为抽象框架,同步器开发者只需实现 tryAcquire/tryRelease(或共享模式的 tryAcquireShared/tryReleaseShared),无需关心线程管理细节。

使用 AQS 的系统

同步器 state 语义 模式
ReentrantLock 重入次数(0=未锁,N=重入N次) 独占
Semaphore 剩余许可数 共享
CountDownLatch 剩余计数 共享
ReentrantReadWriteLock 高16位=读锁数,低16位=写锁重入数 混合
ThreadPoolExecutor.Worker 运行状态 独占

整体架构

aqs structure

┌─────────────────────────────────────────────────────┐
│              AbstractQueuedSynchronizer              │
│                                                     │
│  volatile int state          // 同步状态            │
│  Node head                   // CLH 队列头(哨兵)  │
│  Node tail                   // CLH 队列尾          │
│  Thread exclusiveOwnerThread // 当前持锁线程        │
└─────────────────────────────────────────────────────┘

state 字段语义(由子类定义):

ReentrantLock:
  state == 0          → 锁空闲
  state == N (N > 0)  → 当前线程已重入 N 次

Semaphore:
  state == N (N > 0)  → 还有 N 个许可
  state == 0          → 无许可,新线程需排队

CountDownLatch:
  state == N          → 还需 N 次 countDown
  state == 0          → 全部完成,等待的线程全部唤醒

volatile 保证的可见性:


CLH Node 结构

CLH(Craig, Landin, Hagersten)队列是一种基于链表的自旋锁队列,AQS 在其基础上改造为双向链表 + park/unpark 替代自旋。

Node 字段定义

static final class Node {
    // waitStatus 取值常量
    static final int CANCELLED =  1;  // 节点已取消(超时或中断)
    static final int SIGNAL    = -1;  // 后继节点需要被唤醒
    static final int CONDITION = -2;  // 节点在条件队列中等待
    static final int PROPAGATE = -3;  // 共享模式:传播唤醒

    volatile int waitStatus;  // 节点状态,默认 0
    volatile Node prev;       // 前驱节点
    volatile Node next;       // 后继节点
    volatile Thread thread;   // 持有此节点的线程
    Node nextWaiter;          // 条件队列中的下一个节点(或共享模式标记)
}

waitStatus 语义详解

常量 含义 谁设置 何时设置
0 (默认) 节点刚入队,状态未知 初始化
-1 SIGNAL 当前节点的后继需要被唤醒 后继节点入队时设置前驱 shouldParkAfterFailedAcquire
-2 CONDITION 节点在 ConditionObject 队列中 await() 线程调用 condition.await()
-3 PROPAGATE 共享模式下唤醒需继续传播 releaseShared Semaphore/CountDownLatch
1 CANCELLED 节点已取消,需从队列移除 线程自身 超时/中断后

队列示意图

head (哨兵)          Node-A            Node-B            tail
┌──────────┐    ┌──────────────┐  ┌──────────────┐  ┌──────────────┐
│thread=null│←prev│thread=T1    │←prev│thread=T2    │←prev│thread=T3    │
│ws=0      │next→│ws=SIGNAL(-1)│next→│ws=SIGNAL(-1)│next→│ws=0         │
└──────────┘    └──────────────┘  └──────────────┘  └──────────────┘

acquire 完整伪代码

4.1 顶层入口

function acquire(arg):
    // 第一次尝试快速获取(不入队)
    if tryAcquire(arg) == true:
        return  // 获取成功,直接返回

    // 获取失败,将当前线程包装为 Node 加入等待队列
    node = addWaiter(EXCLUSIVE)

    // 在队列中自旋等待,直到获取成功
    interrupted = acquireQueued(node, arg)

    // 如果等待期间被中断,重置中断标志
    if interrupted:
        Thread.currentThread().interrupt()

4.2 addWaiter:将线程包装为 Node 入队

function addWaiter(mode):
    node = new Node(Thread.currentThread(), mode)
    // 快速路径:尝试直接 CAS 追加到队尾
    pred = tail
    if pred != null:
        node.prev = pred
        if CAS(tail, pred, node) == success:
            pred.next = node
            return node

    // 快速路径失败,进入完整的 enq 自旋
    enq(node)
    return node

function enq(node):
    loop forever:
        t = tail
        if t == null:
            // 队列未初始化,创建哨兵 head
            sentinel = new Node()
            if CAS(head, null, sentinel) == success:
                tail = sentinel
            // 继续循环(下次循环 t != null,走 else 分支)
        else:
            node.prev = t
            if CAS(tail, t, node) == success:
                t.next = node
                return t  // 返回前驱
            // CAS 失败说明有竞争,继续循环

4.3 acquireQueued:在队列中自旋等待

function acquireQueued(node, arg):
    failed = true
    interrupted = false
    try:
        loop forever:
            p = node.predecessor()  // 获取前驱节点

            // 关键条件:只有前驱是 head,才有资格尝试获取锁
            if p == head AND tryAcquire(arg) == true:
                // 获取成功,将自己设为新的 head(自己成为哨兵)
                setHead(node)       // node.thread = null; node.prev = null
                p.next = null       // 断开旧 head 的 next,帮助 GC
                failed = false
                return interrupted

            // 判断是否应该 park(检查前驱 waitStatus)
            if shouldParkAfterFailedAcquire(p, node):
                // 真正 park,当前线程阻塞
                interrupted = parkAndCheckInterrupt() OR interrupted

    finally:
        if failed:
            cancelAcquire(node)  // 异常退出时取消节点

function shouldParkAfterFailedAcquire(pred, node):
    ws = pred.waitStatus
    if ws == SIGNAL:
        return true  // 前驱已标记会唤醒我,可以安全 park

    if ws > 0:  // CANCELLED
        // 跳过所有已取消的前驱,找到第一个有效前驱
        loop:
            pred = pred.prev
            node.prev = pred
            if pred.waitStatus <= 0:
                break
        pred.next = node
        return false  // 重新检查

    // ws == 0 或 PROPAGATE:将前驱设为 SIGNAL,表示它需要唤醒后继
    CAS(pred.waitStatus, ws, SIGNAL)
    return false  // 重新检查(不立即 park,再给一次获取机会)

function parkAndCheckInterrupt():
    LockSupport.park(this)  // 线程在此阻塞,等待 unpark 或中断
    return Thread.interrupted()  // 返回并清除中断标志

4.4 setHead:成功获取后的收尾

function setHead(node):
    head = node
    node.thread = null   // 哨兵不持有线程引用
    node.prev = null     // 断开与旧 head 的连接
    // 注意:不修改 node.waitStatus,由 release 时处理

release 完整伪代码

5.1 顶层入口

function release(arg):
    if tryRelease(arg) == true:
        h = head
        // head 不为 null 且 waitStatus != 0,说明有线程在等待
        if h != null AND h.waitStatus != 0:
            unparkSuccessor(h)
        return true
    return false

5.2 unparkSuccessor:唤醒后继线程

function unparkSuccessor(node):
    ws = node.waitStatus
    if ws < 0:
        // 将 head 的 waitStatus 置回 0(不再需要 SIGNAL 标记)
        CAS(node.waitStatus, ws, 0)

    s = node.next  // 尝试直接唤醒下一个节点

    // 如果下一个节点为 null 或已取消,从 tail 向前找最近的有效节点
    if s == null OR s.waitStatus > 0:
        s = null
        t = tail
        // 从尾部向前扫描,找到最靠近 head 的非 CANCELLED 节点
        while t != null AND t != node:
            if t.waitStatus <= 0:
                s = t
            t = t.prev

    if s != null:
        LockSupport.unpark(s.thread)  // 唤醒目标线程

为什么从 tail 向前找,而不是从 head 向前找?

因为 addWaiter 中设置 next 指针不是原子的:

node.prev = pred          // ① 先设置 prev(volatile 写)
CAS(tail, pred, node)     // ② 再 CAS 更新 tail
pred.next = node          // ③ 最后设置 next(非原子)

步骤 ③ 尚未完成时,如果此时 release 顺着 next 遍历,可能看到 node.next == null,误认为没有后继。但 prev 在 CAS 之前已经设置,所以从 tail 向前遍历通过 prev 指针是安全的。


ReentrantLock 实现细节

6.1 非公平锁 tryAcquire

function NonfairSync.tryAcquire(acquires):
    return nonfairTryAcquire(acquires)

function nonfairTryAcquire(acquires):
    current = Thread.currentThread()
    c = getState()  // 读取 volatile state

    if c == 0:
        // 锁空闲:不检查队列,直接 CAS 抢锁(可能插队)
        if CAS(state, 0, acquires) == success:
            setExclusiveOwnerThread(current)
            return true

    else if current == getExclusiveOwnerThread():
        // 重入:当前线程已持锁,增加重入次数
        nextc = c + acquires
        if nextc < 0:  // 整数溢出检查
            throw Error("Maximum lock count exceeded")
        setState(nextc)  // 直接写,无需 CAS(当前线程独占)
        return true

    return false

6.2 公平锁 tryAcquire

function FairSync.tryAcquire(acquires):
    current = Thread.currentThread()
    c = getState()

    if c == 0:
        // 关键区别:检查队列中是否有前驱等待者
        if NOT hasQueuedPredecessors() AND CAS(state, 0, acquires) == success:
            setExclusiveOwnerThread(current)
            return true

    else if current == getExclusiveOwnerThread():
        nextc = c + acquires
        if nextc < 0:
            throw Error("Maximum lock count exceeded")
        setState(nextc)
        return true

    return false

6.3 hasQueuedPredecessors() 的作用

function hasQueuedPredecessors():
    t = tail
    h = head
    s = null

    // 队列非空(h != t),且:
    // head 的 next 为 null(节点正在入队中),或者 head.next 的线程不是当前线程
    return h != t AND
           ((s = h.next) == null OR s.thread != Thread.currentThread())
场景 返回值 结果
队列为空(h == t) false 可以抢锁
队列非空且我是第一个等待者(h.next.thread == me) false 可以抢锁
队列非空且我不是第一个等待者 true 不能抢锁,需排队

核心意义:公平锁保证"先到先得"——如果队列中已有其他线程在等待,新来的线程不能插队,必须排到队尾。

6.4 tryRelease

function ReentrantLock.Sync.tryRelease(releases):
    c = getState() - releases
    if Thread.currentThread() != getExclusiveOwnerThread():
        throw IllegalMonitorStateException("Not the lock owner")

    free = false
    if c == 0:
        // 重入计数归零,真正释放锁
        free = true
        setExclusiveOwnerThread(null)

    setState(c)  // 写 volatile,即使 free=false 也更新重入计数
    return free  // 只有 c==0 时才返回 true,触发 unparkSuccessor

Condition 机制

7.1 ConditionObject 数据结构

class ConditionObject:
    Node firstWaiter  // 条件队列头
    Node lastWaiter   // 条件队列尾
    // 注意:条件队列是单向链表(使用 Node.nextWaiter)
    // AQS 主队列是双向链表(使用 Node.prev/next)

7.2 await:从锁 → 条件队列

function ConditionObject.await():
    if Thread.interrupted():
        throw InterruptedException

    node = addConditionWaiter()   // 创建 CONDITION 节点加入条件队列
    savedState = fullyRelease(node)  // 完全释放锁(包括重入次数),返回释放前的 state 值

    interruptMode = 0
    while NOT isOnSyncQueue(node):  // 节点还在条件队列(未被 signal 转移)
        LockSupport.park(this)       // 阻塞

        if checkInterruptWhileWaiting(node) != 0:
            interruptMode = ...
            break

    // 被 signal 转移到 AQS 队列后,重新竞争锁
    acquireQueued(node, savedState)

    if interruptMode != 0:
        reportInterruptAfterWait(interruptMode)

function addConditionWaiter():
    t = lastWaiter
    // 清理条件队列末尾的已取消节点
    if t != null AND t.waitStatus != CONDITION:
        unlinkCancelledWaiters()
        t = lastWaiter
    node = new Node(Thread.currentThread(), CONDITION)  // waitStatus = -2
    if t == null:
        firstWaiter = node
    else:
        t.nextWaiter = node  // 单向链表追加
    lastWaiter = node
    return node

7.3 signal:条件队列 → AQS 队列

function ConditionObject.signal():
    if NOT isHeldExclusively():
        throw IllegalMonitorStateException

    first = firstWaiter
    if first != null:
        doSignal(first)

function doSignal(first):
    loop:
        // 从条件队列头部摘除节点
        firstWaiter = first.nextWaiter
        first.nextWaiter = null
        if firstWaiter == null:
            lastWaiter = null

        // 将节点转移到 AQS 队列(transferForSignal)
        if transferForSignal(first):
            break
        first = firstWaiter
        if first == null:
            break

function transferForSignal(node):
    // 将 waitStatus 从 CONDITION 改为 0
    if NOT CAS(node.waitStatus, CONDITION, 0):
        return false  // 节点已取消

    // 加入 AQS 等待队列(enq 返回前驱节点)
    p = enq(node)

    // 设置前驱的 waitStatus 为 SIGNAL,确保它能唤醒 node
    ws = p.waitStatus
    if ws > 0 OR NOT CAS(p.waitStatus, ws, SIGNAL):
        // 前驱已取消或 CAS 失败,直接唤醒 node(它会在 acquireQueued 中重新处理)
        LockSupport.unpark(node.thread)

    return true

7.4 await/signal 完整流程图

线程 T1 持锁,调用 await():
  T1: 创建 CONDITION Node → 加入条件队列
  T1: fullyRelease() → state=0,唤醒 AQS 队列中的 T2
  T1: park(),等待在条件队列

线程 T2 获得锁,调用 signal():
  T2: 从条件队列取出 T1 的 Node
  T2: enq(T1_node) → 将 T1_node 加入 AQS 等待队列
  T2: 设置 T1_node 前驱的 waitStatus = SIGNAL

线程 T2 释放锁:
  T2: unparkSuccessor() → unpark T1
  T1: 醒来,在 acquireQueued 中重新竞争锁
  T1: 获得锁后,从 await() 返回

执行追踪

场景:3 个线程竞争 ReentrantLock(非公平)

初始状态: state=0, head=null, tail=null


步骤 1:T1 调用 lock.lock()

acquire(1):
  tryAcquire(1):
    state == 0 → CAS(state, 0, 1) → 成功
    setExclusiveOwnerThread(T1)
    return true
结果: state=1, owner=T1, 队列为空

步骤 2:T2 调用 lock.lock()

acquire(1):
  tryAcquire(1):
    state == 1 (非0) → T2 != T1 → return false

  addWaiter(EXCLUSIVE):
    tail == null → enq(Node-T2):
      CAS(head, null, Sentinel)  → head = Sentinel
      tail = Sentinel
      node.prev = Sentinel
      CAS(tail, Sentinel, Node-T2) → tail = Node-T2
      Sentinel.next = Node-T2
    return Node-T2

  acquireQueued(Node-T2, 1):
    循环1:
      p = Node-T2.prev = Sentinel = head → tryAcquire(1):
        state == 1 (非0) → return false
      shouldParkAfterFailedAcquire(Sentinel, Node-T2):
        ws = Sentinel.waitStatus = 0 → CAS(Sentinel.ws, 0, SIGNAL=-1)
        return false  // 重新循环
    循环2:
      p = head → tryAcquire(1): 失败
      shouldPark...: ws = SIGNAL → return true
      parkAndCheckInterrupt() → T2 阻塞

状态: state=1, owner=T1
      Sentinel(ws=-1) ←→ Node-T2(ws=0,thread=T2)
      head=Sentinel, tail=Node-T2

步骤 3:T3 调用 lock.lock()

acquire(1):
  tryAcquire(1): state==1, T3!=T1 → false

  addWaiter(EXCLUSIVE):
    tail = Node-T2 (非null)
    Node-T3.prev = Node-T2
    CAS(tail, Node-T2, Node-T3) → 成功
    Node-T2.next = Node-T3
    return Node-T3

  acquireQueued(Node-T3, 1):
    循环1:
      p = Node-T2 ≠ head → 不尝试 tryAcquire
      shouldPark(Node-T2, Node-T3):
        ws = Node-T2.waitStatus = 0 → CAS(ws, 0, SIGNAL)
        return false
    循环2:
      p = Node-T2 ≠ head → 跳过
      shouldPark: ws = SIGNAL → return true
      T3 阻塞

状态: Sentinel(ws=-1) ←→ Node-T2(ws=-1,thread=T2) ←→ Node-T3(ws=0,thread=T3)

步骤 4:T1 调用 lock.unlock()

release(1):
  tryRelease(1):
    c = state - 1 = 0
    setExclusiveOwnerThread(null)
    setState(0)
    return true  // c==0

  h = head = Sentinel, h.waitStatus = -1 ≠ 0
  unparkSuccessor(Sentinel):
    CAS(Sentinel.ws, -1, 0)
    s = Sentinel.next = Node-T2  // ws=0, 非CANCELLED
    LockSupport.unpark(T2)

步骤 5:T2 被唤醒,重新竞争

T2 从 parkAndCheckInterrupt() 返回
acquireQueued 循环:
  p = Node-T2.prev = Sentinel = head
  tryAcquire(1):
    state == 0 → CAS(state, 0, 1) → 成功
    setExclusiveOwnerThread(T2)
    return true
  setHead(Node-T2):  // Node-T2 成为新的哨兵
    head = Node-T2
    Node-T2.thread = null
    Node-T2.prev = null
  Sentinel.next = null  // GC

状态: state=1, owner=T2
      Node-T2(ws=-1,thread=null) ←→ Node-T3(ws=0,thread=T3)
      head=Node-T2, tail=Node-T3

步骤 6:T2 释放锁,T3 同理被唤醒获锁(过程同步骤 4-5)


异常与边界场景

场景 1:CANCELLED 节点清理

触发条件: lockInterruptibly() 超时,或 tryLock(timeout) 等待超时

cancelAcquire(node):
    node.thread = null

    // 跳过 CANCELLED 前驱,找到有效前驱 pred
    pred = node.prev
    while pred.waitStatus > 0:  // CANCELLED
        pred = pred.prev
    node.prev = pred  // 更新 prev 跳过中间 CANCELLED 节点

    predNext = pred.next

    // 将自身标记为 CANCELLED(waitStatus = 1)
    node.waitStatus = CANCELLED  // 普通写,不需要 CAS(只有自己改自己)

    // 如果自己是 tail,CAS 移除自己
    if node == tail AND CAS(tail, node, pred):
        CAS(pred.next, predNext, null)

    // 如果前驱是 head,直接唤醒后继(让后继重新入队竞争)
    else if pred == head:
        unparkSuccessor(node)

    // 普通情况:将前驱的 waitStatus 设为 SIGNAL,并链接后继
    else:
        ws = pred.waitStatus
        if (ws == SIGNAL OR CAS(pred.waitStatus, ws, SIGNAL)) AND pred.thread != null:
            next = node.next
            if next != null AND next.waitStatus <= 0:
                CAS(pred.next, predNext, next)  // 跳过自己
        else:
            unparkSuccessor(node)

视觉示意:

CANCELLED 节点清理前:
Sentinel ←→ Node-A(SIGNAL) ←→ Node-B(CANCELLED) ←→ Node-C(0)

清理后(Node-B 取消):
Sentinel ←→ Node-A(SIGNAL) ←→ Node-C(0)
(Node-A.next = Node-C, Node-C.prev = Node-A)

场景 2:中断处理(lockInterruptibly vs lock)

// lock():不响应中断,只记录中断标志
acquireQueued(node, 1):
    loop:
        ...
        interrupted = parkAndCheckInterrupt() || interrupted
        // 即使被中断,也继续等待获取锁
    return interrupted  // 最终由调用者重置中断标志

// lockInterruptibly():立即响应中断
acquireInterruptibly(1):
    if Thread.interrupted():
        throw InterruptedException
    if NOT tryAcquire(1):
        doAcquireInterruptibly(1)  // park 中被中断,立即 cancelAcquire 并抛出异常

区别总结:

场景 3:Spurious Wakeup(虚假唤醒)

LockSupport.park 可能被虚假唤醒(无 unpark,无中断,但线程醒来):

// acquireQueued 的循环结构正好处理虚假唤醒:
loop forever:
    p = node.predecessor()
    if p == head AND tryAcquire(arg):  // 虚假唤醒 → tryAcquire 大概率失败
        ...
    if shouldParkAfterFailedAcquire(p, node):  // 重新检查是否应该 park
        parkAndCheckInterrupt()  // 再次 park
    // 循环保证:即使虚假唤醒,也会再次尝试获取锁或重新 park

虚假唤醒不会导致 AQS 错误,循环结构是天然防御

场景 4:公平锁中的竞争边缘

// 时间线:
T1: 持锁,准备 unlock()
T2: 在队列中等待,是 head.next
T3: 刚调用 lock(),tryAcquire 中检查 hasQueuedPredecessors()

T1 执行 setState(0)(volatile 写,立即可见)
T3 读 state==0,检查 hasQueuedPredecessors():
  h != t(队列非空)
  h.next = Node-T2,Node-T2.thread = T2 ≠ T3
  → return true → T3 不抢锁,走 addWaiter

T1 执行 unparkSuccessor → unpark T2
T2 获得锁

// 公平性保证:T3 虽然此时看到锁空闲,但必须排队等待 T2

场景 5:重入计数溢出

tryAcquire(1):
    nextc = c + acquires  // c 已经是 Integer.MAX_VALUE
    if nextc < 0:  // 整数溢出变为负数
        throw new Error("Maximum lock count exceeded")

实际意义:单个线程最多重入 Integer.MAX_VALUE(约 21 亿)次,理论上不会触发,但 JDK 有此防御。

场景 6:Node-T2 在 signal 和 cancel 并发时的竞争

// T2 在条件队列中,此时 T3 调用 signal(T2),同时 T2 被中断

signal 路径: CAS(T2.waitStatus, CONDITION, 0) → 成功
  → enq(T2) → T2 进入 AQS 队列

cancel/中断路径: CAS(T2.waitStatus, CONDITION, CANCELLED) → 失败(已被 signal 改为 0)
  → T2 已经在 AQS 队列中,走 cancelAcquire

// 结论:signal 和 cancel 互斥,通过 CAS 保证只有一方成功
// 如果 signal 先成功,中断发生在 acquire 阶段,抛出 InterruptedException
// 如果 cancel 先成功(中断在 signal 之前),节点被清理出条件队列

参考资料

  1. JDK 源码 - java.util.concurrent.locks.AbstractQueuedSynchronizer(OpenJDK 17)
  2. CLH 锁原论文 - Craig, Landin, Hagersten, "Building FIFO and Priority-Queuing Spin Locks from Atomic Swap" (1993)
  3. Doug Lea 设计文档 - "The java.util.concurrent Synchronizer Framework" (2004) - JSR 166 专家组文档
  4. 《Java 并发编程的艺术》 - 方腾飞、魏鹏、程晓明,第5章 Java 中的锁
  5. 《Java 并发编程实战》 - Brian Goetz 等著,第13章 显式锁
← 返回列表

评论 (0)

暂无评论,来留下第一条吧。

发表评论