AQS(AbstractQueuedSynchronizer)是 Java 并发包的核心基础框架,通过一个 volatile int state 字段和 CLH 变体双向队列,为 ReentrantLock、Semaphore、CountDownLatch 等同步器提供统一的线程排队、阻塞、唤醒机制。
相关文章:CAS 与 ABA 问题 · MVCC(多版本并发控制) · 无锁队列(Michael-Scott Queue)
目录
| 章节 | 说明 |
|---|---|
| 问题背景 | 为什么需要 AQS |
| 整体架构 | state 字段 + CLH 队列 |
| CLH Node 结构 | waitStatus / prev / next / thread |
| acquire 完整伪代码 | tryAcquire → addWaiter → acquireQueued |
| release 完整伪代码 | tryRelease → unparkSuccessor |
| ReentrantLock 实现细节 | 公平锁 vs 非公平锁 |
| Condition 机制 | await → ConditionQueue,signal → AQS 队列 |
| 执行追踪 | 3 线程竞争 ReentrantLock 完整演示 |
| 异常与边界场景 | CANCELLED 清理、中断、spurious wakeup |
| 参考资料 |
问题背景
为什么需要 AQS
在 Java 5 之前,线程同步只有 synchronized 关键字,存在以下问题:
- 无法中断等待:线程在
synchronized中等待锁时,无法响应中断 - 无法设置超时:无法指定等待时间,只能无限阻塞
- 无法实现非公平/公平策略切换:
synchronized固定为非公平 - 无法实现条件等待的精细控制:只有一个等待队列(
wait/notifyAll) - 重复造轮子:每个同步原语(信号量、闭锁、读写锁)都需要重新实现线程排队逻辑
AQS 将"线程排队、阻塞、唤醒"这一通用机制提炼为抽象框架,同步器开发者只需实现 tryAcquire/tryRelease(或共享模式的 tryAcquireShared/tryReleaseShared),无需关心线程管理细节。
使用 AQS 的系统
| 同步器 | state 语义 | 模式 |
|---|---|---|
ReentrantLock |
重入次数(0=未锁,N=重入N次) | 独占 |
Semaphore |
剩余许可数 | 共享 |
CountDownLatch |
剩余计数 | 共享 |
ReentrantReadWriteLock |
高16位=读锁数,低16位=写锁重入数 | 混合 |
ThreadPoolExecutor.Worker |
运行状态 | 独占 |
整体架构
┌─────────────────────────────────────────────────────┐
│ AbstractQueuedSynchronizer │
│ │
│ volatile int state // 同步状态 │
│ Node head // CLH 队列头(哨兵) │
│ Node tail // CLH 队列尾 │
│ Thread exclusiveOwnerThread // 当前持锁线程 │
└─────────────────────────────────────────────────────┘
state 字段语义(由子类定义):
ReentrantLock:
state == 0 → 锁空闲
state == N (N > 0) → 当前线程已重入 N 次
Semaphore:
state == N (N > 0) → 还有 N 个许可
state == 0 → 无许可,新线程需排队
CountDownLatch:
state == N → 还需 N 次 countDown
state == 0 → 全部完成,等待的线程全部唤醒
volatile 保证的可见性:
state的每次 CAS 写入都立即对所有线程可见- head/tail 指针变更通过 volatile 保证可见性
- park/unpark 内部通过
UNSAFE.park实现,JVM 保证内存屏障
CLH Node 结构
CLH(Craig, Landin, Hagersten)队列是一种基于链表的自旋锁队列,AQS 在其基础上改造为双向链表 + park/unpark 替代自旋。
Node 字段定义
static final class Node {
// waitStatus 取值常量
static final int CANCELLED = 1; // 节点已取消(超时或中断)
static final int SIGNAL = -1; // 后继节点需要被唤醒
static final int CONDITION = -2; // 节点在条件队列中等待
static final int PROPAGATE = -3; // 共享模式:传播唤醒
volatile int waitStatus; // 节点状态,默认 0
volatile Node prev; // 前驱节点
volatile Node next; // 后继节点
volatile Thread thread; // 持有此节点的线程
Node nextWaiter; // 条件队列中的下一个节点(或共享模式标记)
}
waitStatus 语义详解
| 值 | 常量 | 含义 | 谁设置 | 何时设置 |
|---|---|---|---|---|
| 0 | (默认) | 节点刚入队,状态未知 | — | 初始化 |
| -1 | SIGNAL | 当前节点的后继需要被唤醒 | 后继节点入队时设置前驱 | shouldParkAfterFailedAcquire |
| -2 | CONDITION | 节点在 ConditionObject 队列中 | await() |
线程调用 condition.await() |
| -3 | PROPAGATE | 共享模式下唤醒需继续传播 | releaseShared |
Semaphore/CountDownLatch |
| 1 | CANCELLED | 节点已取消,需从队列移除 | 线程自身 | 超时/中断后 |
队列示意图
head (哨兵) Node-A Node-B tail
┌──────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│thread=null│←prev│thread=T1 │←prev│thread=T2 │←prev│thread=T3 │
│ws=0 │next→│ws=SIGNAL(-1)│next→│ws=SIGNAL(-1)│next→│ws=0 │
└──────────┘ └──────────────┘ └──────────────┘ └──────────────┘
- head 是哨兵节点(dummy node),不持有真实线程(
thread=null) - 每个节点的
waitStatus描述的是它的后继是否需要被唤醒(SIGNAL = -1 表示"我释放锁后需要唤醒后继") - prev 指针是 volatile,保证前驱链的可见性(用于 CANCELLED 节点清理时跳过)
acquire 完整伪代码
4.1 顶层入口
function acquire(arg):
// 第一次尝试快速获取(不入队)
if tryAcquire(arg) == true:
return // 获取成功,直接返回
// 获取失败,将当前线程包装为 Node 加入等待队列
node = addWaiter(EXCLUSIVE)
// 在队列中自旋等待,直到获取成功
interrupted = acquireQueued(node, arg)
// 如果等待期间被中断,重置中断标志
if interrupted:
Thread.currentThread().interrupt()
4.2 addWaiter:将线程包装为 Node 入队
function addWaiter(mode):
node = new Node(Thread.currentThread(), mode)
// 快速路径:尝试直接 CAS 追加到队尾
pred = tail
if pred != null:
node.prev = pred
if CAS(tail, pred, node) == success:
pred.next = node
return node
// 快速路径失败,进入完整的 enq 自旋
enq(node)
return node
function enq(node):
loop forever:
t = tail
if t == null:
// 队列未初始化,创建哨兵 head
sentinel = new Node()
if CAS(head, null, sentinel) == success:
tail = sentinel
// 继续循环(下次循环 t != null,走 else 分支)
else:
node.prev = t
if CAS(tail, t, node) == success:
t.next = node
return t // 返回前驱
// CAS 失败说明有竞争,继续循环
4.3 acquireQueued:在队列中自旋等待
function acquireQueued(node, arg):
failed = true
interrupted = false
try:
loop forever:
p = node.predecessor() // 获取前驱节点
// 关键条件:只有前驱是 head,才有资格尝试获取锁
if p == head AND tryAcquire(arg) == true:
// 获取成功,将自己设为新的 head(自己成为哨兵)
setHead(node) // node.thread = null; node.prev = null
p.next = null // 断开旧 head 的 next,帮助 GC
failed = false
return interrupted
// 判断是否应该 park(检查前驱 waitStatus)
if shouldParkAfterFailedAcquire(p, node):
// 真正 park,当前线程阻塞
interrupted = parkAndCheckInterrupt() OR interrupted
finally:
if failed:
cancelAcquire(node) // 异常退出时取消节点
function shouldParkAfterFailedAcquire(pred, node):
ws = pred.waitStatus
if ws == SIGNAL:
return true // 前驱已标记会唤醒我,可以安全 park
if ws > 0: // CANCELLED
// 跳过所有已取消的前驱,找到第一个有效前驱
loop:
pred = pred.prev
node.prev = pred
if pred.waitStatus <= 0:
break
pred.next = node
return false // 重新检查
// ws == 0 或 PROPAGATE:将前驱设为 SIGNAL,表示它需要唤醒后继
CAS(pred.waitStatus, ws, SIGNAL)
return false // 重新检查(不立即 park,再给一次获取机会)
function parkAndCheckInterrupt():
LockSupport.park(this) // 线程在此阻塞,等待 unpark 或中断
return Thread.interrupted() // 返回并清除中断标志
4.4 setHead:成功获取后的收尾
function setHead(node):
head = node
node.thread = null // 哨兵不持有线程引用
node.prev = null // 断开与旧 head 的连接
// 注意:不修改 node.waitStatus,由 release 时处理
release 完整伪代码
5.1 顶层入口
function release(arg):
if tryRelease(arg) == true:
h = head
// head 不为 null 且 waitStatus != 0,说明有线程在等待
if h != null AND h.waitStatus != 0:
unparkSuccessor(h)
return true
return false
5.2 unparkSuccessor:唤醒后继线程
function unparkSuccessor(node):
ws = node.waitStatus
if ws < 0:
// 将 head 的 waitStatus 置回 0(不再需要 SIGNAL 标记)
CAS(node.waitStatus, ws, 0)
s = node.next // 尝试直接唤醒下一个节点
// 如果下一个节点为 null 或已取消,从 tail 向前找最近的有效节点
if s == null OR s.waitStatus > 0:
s = null
t = tail
// 从尾部向前扫描,找到最靠近 head 的非 CANCELLED 节点
while t != null AND t != node:
if t.waitStatus <= 0:
s = t
t = t.prev
if s != null:
LockSupport.unpark(s.thread) // 唤醒目标线程
为什么从 tail 向前找,而不是从 head 向前找?
因为 addWaiter 中设置 next 指针不是原子的:
node.prev = pred // ① 先设置 prev(volatile 写)
CAS(tail, pred, node) // ② 再 CAS 更新 tail
pred.next = node // ③ 最后设置 next(非原子)
步骤 ③ 尚未完成时,如果此时 release 顺着 next 遍历,可能看到 node.next == null,误认为没有后继。但 prev 在 CAS 之前已经设置,所以从 tail 向前遍历通过 prev 指针是安全的。
ReentrantLock 实现细节
6.1 非公平锁 tryAcquire
function NonfairSync.tryAcquire(acquires):
return nonfairTryAcquire(acquires)
function nonfairTryAcquire(acquires):
current = Thread.currentThread()
c = getState() // 读取 volatile state
if c == 0:
// 锁空闲:不检查队列,直接 CAS 抢锁(可能插队)
if CAS(state, 0, acquires) == success:
setExclusiveOwnerThread(current)
return true
else if current == getExclusiveOwnerThread():
// 重入:当前线程已持锁,增加重入次数
nextc = c + acquires
if nextc < 0: // 整数溢出检查
throw Error("Maximum lock count exceeded")
setState(nextc) // 直接写,无需 CAS(当前线程独占)
return true
return false
6.2 公平锁 tryAcquire
function FairSync.tryAcquire(acquires):
current = Thread.currentThread()
c = getState()
if c == 0:
// 关键区别:检查队列中是否有前驱等待者
if NOT hasQueuedPredecessors() AND CAS(state, 0, acquires) == success:
setExclusiveOwnerThread(current)
return true
else if current == getExclusiveOwnerThread():
nextc = c + acquires
if nextc < 0:
throw Error("Maximum lock count exceeded")
setState(nextc)
return true
return false
6.3 hasQueuedPredecessors() 的作用
function hasQueuedPredecessors():
t = tail
h = head
s = null
// 队列非空(h != t),且:
// head 的 next 为 null(节点正在入队中),或者 head.next 的线程不是当前线程
return h != t AND
((s = h.next) == null OR s.thread != Thread.currentThread())
| 场景 | 返回值 | 结果 |
|---|---|---|
| 队列为空(h == t) | false | 可以抢锁 |
| 队列非空且我是第一个等待者(h.next.thread == me) | false | 可以抢锁 |
| 队列非空且我不是第一个等待者 | true | 不能抢锁,需排队 |
核心意义:公平锁保证"先到先得"——如果队列中已有其他线程在等待,新来的线程不能插队,必须排到队尾。
6.4 tryRelease
function ReentrantLock.Sync.tryRelease(releases):
c = getState() - releases
if Thread.currentThread() != getExclusiveOwnerThread():
throw IllegalMonitorStateException("Not the lock owner")
free = false
if c == 0:
// 重入计数归零,真正释放锁
free = true
setExclusiveOwnerThread(null)
setState(c) // 写 volatile,即使 free=false 也更新重入计数
return free // 只有 c==0 时才返回 true,触发 unparkSuccessor
Condition 机制
7.1 ConditionObject 数据结构
class ConditionObject:
Node firstWaiter // 条件队列头
Node lastWaiter // 条件队列尾
// 注意:条件队列是单向链表(使用 Node.nextWaiter)
// AQS 主队列是双向链表(使用 Node.prev/next)
7.2 await:从锁 → 条件队列
function ConditionObject.await():
if Thread.interrupted():
throw InterruptedException
node = addConditionWaiter() // 创建 CONDITION 节点加入条件队列
savedState = fullyRelease(node) // 完全释放锁(包括重入次数),返回释放前的 state 值
interruptMode = 0
while NOT isOnSyncQueue(node): // 节点还在条件队列(未被 signal 转移)
LockSupport.park(this) // 阻塞
if checkInterruptWhileWaiting(node) != 0:
interruptMode = ...
break
// 被 signal 转移到 AQS 队列后,重新竞争锁
acquireQueued(node, savedState)
if interruptMode != 0:
reportInterruptAfterWait(interruptMode)
function addConditionWaiter():
t = lastWaiter
// 清理条件队列末尾的已取消节点
if t != null AND t.waitStatus != CONDITION:
unlinkCancelledWaiters()
t = lastWaiter
node = new Node(Thread.currentThread(), CONDITION) // waitStatus = -2
if t == null:
firstWaiter = node
else:
t.nextWaiter = node // 单向链表追加
lastWaiter = node
return node
7.3 signal:条件队列 → AQS 队列
function ConditionObject.signal():
if NOT isHeldExclusively():
throw IllegalMonitorStateException
first = firstWaiter
if first != null:
doSignal(first)
function doSignal(first):
loop:
// 从条件队列头部摘除节点
firstWaiter = first.nextWaiter
first.nextWaiter = null
if firstWaiter == null:
lastWaiter = null
// 将节点转移到 AQS 队列(transferForSignal)
if transferForSignal(first):
break
first = firstWaiter
if first == null:
break
function transferForSignal(node):
// 将 waitStatus 从 CONDITION 改为 0
if NOT CAS(node.waitStatus, CONDITION, 0):
return false // 节点已取消
// 加入 AQS 等待队列(enq 返回前驱节点)
p = enq(node)
// 设置前驱的 waitStatus 为 SIGNAL,确保它能唤醒 node
ws = p.waitStatus
if ws > 0 OR NOT CAS(p.waitStatus, ws, SIGNAL):
// 前驱已取消或 CAS 失败,直接唤醒 node(它会在 acquireQueued 中重新处理)
LockSupport.unpark(node.thread)
return true
7.4 await/signal 完整流程图
线程 T1 持锁,调用 await():
T1: 创建 CONDITION Node → 加入条件队列
T1: fullyRelease() → state=0,唤醒 AQS 队列中的 T2
T1: park(),等待在条件队列
线程 T2 获得锁,调用 signal():
T2: 从条件队列取出 T1 的 Node
T2: enq(T1_node) → 将 T1_node 加入 AQS 等待队列
T2: 设置 T1_node 前驱的 waitStatus = SIGNAL
线程 T2 释放锁:
T2: unparkSuccessor() → unpark T1
T1: 醒来,在 acquireQueued 中重新竞争锁
T1: 获得锁后,从 await() 返回
执行追踪
场景:3 个线程竞争 ReentrantLock(非公平)
初始状态: state=0, head=null, tail=null
步骤 1:T1 调用 lock.lock()
acquire(1):
tryAcquire(1):
state == 0 → CAS(state, 0, 1) → 成功
setExclusiveOwnerThread(T1)
return true
结果: state=1, owner=T1, 队列为空
步骤 2:T2 调用 lock.lock()
acquire(1):
tryAcquire(1):
state == 1 (非0) → T2 != T1 → return false
addWaiter(EXCLUSIVE):
tail == null → enq(Node-T2):
CAS(head, null, Sentinel) → head = Sentinel
tail = Sentinel
node.prev = Sentinel
CAS(tail, Sentinel, Node-T2) → tail = Node-T2
Sentinel.next = Node-T2
return Node-T2
acquireQueued(Node-T2, 1):
循环1:
p = Node-T2.prev = Sentinel = head → tryAcquire(1):
state == 1 (非0) → return false
shouldParkAfterFailedAcquire(Sentinel, Node-T2):
ws = Sentinel.waitStatus = 0 → CAS(Sentinel.ws, 0, SIGNAL=-1)
return false // 重新循环
循环2:
p = head → tryAcquire(1): 失败
shouldPark...: ws = SIGNAL → return true
parkAndCheckInterrupt() → T2 阻塞
状态: state=1, owner=T1
Sentinel(ws=-1) ←→ Node-T2(ws=0,thread=T2)
head=Sentinel, tail=Node-T2
步骤 3:T3 调用 lock.lock()
acquire(1):
tryAcquire(1): state==1, T3!=T1 → false
addWaiter(EXCLUSIVE):
tail = Node-T2 (非null)
Node-T3.prev = Node-T2
CAS(tail, Node-T2, Node-T3) → 成功
Node-T2.next = Node-T3
return Node-T3
acquireQueued(Node-T3, 1):
循环1:
p = Node-T2 ≠ head → 不尝试 tryAcquire
shouldPark(Node-T2, Node-T3):
ws = Node-T2.waitStatus = 0 → CAS(ws, 0, SIGNAL)
return false
循环2:
p = Node-T2 ≠ head → 跳过
shouldPark: ws = SIGNAL → return true
T3 阻塞
状态: Sentinel(ws=-1) ←→ Node-T2(ws=-1,thread=T2) ←→ Node-T3(ws=0,thread=T3)
步骤 4:T1 调用 lock.unlock()
release(1):
tryRelease(1):
c = state - 1 = 0
setExclusiveOwnerThread(null)
setState(0)
return true // c==0
h = head = Sentinel, h.waitStatus = -1 ≠ 0
unparkSuccessor(Sentinel):
CAS(Sentinel.ws, -1, 0)
s = Sentinel.next = Node-T2 // ws=0, 非CANCELLED
LockSupport.unpark(T2)
步骤 5:T2 被唤醒,重新竞争
T2 从 parkAndCheckInterrupt() 返回
acquireQueued 循环:
p = Node-T2.prev = Sentinel = head
tryAcquire(1):
state == 0 → CAS(state, 0, 1) → 成功
setExclusiveOwnerThread(T2)
return true
setHead(Node-T2): // Node-T2 成为新的哨兵
head = Node-T2
Node-T2.thread = null
Node-T2.prev = null
Sentinel.next = null // GC
状态: state=1, owner=T2
Node-T2(ws=-1,thread=null) ←→ Node-T3(ws=0,thread=T3)
head=Node-T2, tail=Node-T3
步骤 6:T2 释放锁,T3 同理被唤醒获锁(过程同步骤 4-5)
异常与边界场景
场景 1:CANCELLED 节点清理
触发条件: lockInterruptibly() 超时,或 tryLock(timeout) 等待超时
cancelAcquire(node):
node.thread = null
// 跳过 CANCELLED 前驱,找到有效前驱 pred
pred = node.prev
while pred.waitStatus > 0: // CANCELLED
pred = pred.prev
node.prev = pred // 更新 prev 跳过中间 CANCELLED 节点
predNext = pred.next
// 将自身标记为 CANCELLED(waitStatus = 1)
node.waitStatus = CANCELLED // 普通写,不需要 CAS(只有自己改自己)
// 如果自己是 tail,CAS 移除自己
if node == tail AND CAS(tail, node, pred):
CAS(pred.next, predNext, null)
// 如果前驱是 head,直接唤醒后继(让后继重新入队竞争)
else if pred == head:
unparkSuccessor(node)
// 普通情况:将前驱的 waitStatus 设为 SIGNAL,并链接后继
else:
ws = pred.waitStatus
if (ws == SIGNAL OR CAS(pred.waitStatus, ws, SIGNAL)) AND pred.thread != null:
next = node.next
if next != null AND next.waitStatus <= 0:
CAS(pred.next, predNext, next) // 跳过自己
else:
unparkSuccessor(node)
视觉示意:
CANCELLED 节点清理前:
Sentinel ←→ Node-A(SIGNAL) ←→ Node-B(CANCELLED) ←→ Node-C(0)
清理后(Node-B 取消):
Sentinel ←→ Node-A(SIGNAL) ←→ Node-C(0)
(Node-A.next = Node-C, Node-C.prev = Node-A)
场景 2:中断处理(lockInterruptibly vs lock)
// lock():不响应中断,只记录中断标志
acquireQueued(node, 1):
loop:
...
interrupted = parkAndCheckInterrupt() || interrupted
// 即使被中断,也继续等待获取锁
return interrupted // 最终由调用者重置中断标志
// lockInterruptibly():立即响应中断
acquireInterruptibly(1):
if Thread.interrupted():
throw InterruptedException
if NOT tryAcquire(1):
doAcquireInterruptibly(1) // park 中被中断,立即 cancelAcquire 并抛出异常
区别总结:
lock():中断会唤醒 park,但不会退出,继续自旋等待锁,最后恢复中断标志lockInterruptibly():中断直接取消等待,抛出InterruptedException
场景 3:Spurious Wakeup(虚假唤醒)
LockSupport.park 可能被虚假唤醒(无 unpark,无中断,但线程醒来):
// acquireQueued 的循环结构正好处理虚假唤醒:
loop forever:
p = node.predecessor()
if p == head AND tryAcquire(arg): // 虚假唤醒 → tryAcquire 大概率失败
...
if shouldParkAfterFailedAcquire(p, node): // 重新检查是否应该 park
parkAndCheckInterrupt() // 再次 park
// 循环保证:即使虚假唤醒,也会再次尝试获取锁或重新 park
虚假唤醒不会导致 AQS 错误,循环结构是天然防御。
场景 4:公平锁中的竞争边缘
// 时间线:
T1: 持锁,准备 unlock()
T2: 在队列中等待,是 head.next
T3: 刚调用 lock(),tryAcquire 中检查 hasQueuedPredecessors()
T1 执行 setState(0)(volatile 写,立即可见)
T3 读 state==0,检查 hasQueuedPredecessors():
h != t(队列非空)
h.next = Node-T2,Node-T2.thread = T2 ≠ T3
→ return true → T3 不抢锁,走 addWaiter
T1 执行 unparkSuccessor → unpark T2
T2 获得锁
// 公平性保证:T3 虽然此时看到锁空闲,但必须排队等待 T2
场景 5:重入计数溢出
tryAcquire(1):
nextc = c + acquires // c 已经是 Integer.MAX_VALUE
if nextc < 0: // 整数溢出变为负数
throw new Error("Maximum lock count exceeded")
实际意义:单个线程最多重入 Integer.MAX_VALUE(约 21 亿)次,理论上不会触发,但 JDK 有此防御。
场景 6:Node-T2 在 signal 和 cancel 并发时的竞争
// T2 在条件队列中,此时 T3 调用 signal(T2),同时 T2 被中断
signal 路径: CAS(T2.waitStatus, CONDITION, 0) → 成功
→ enq(T2) → T2 进入 AQS 队列
cancel/中断路径: CAS(T2.waitStatus, CONDITION, CANCELLED) → 失败(已被 signal 改为 0)
→ T2 已经在 AQS 队列中,走 cancelAcquire
// 结论:signal 和 cancel 互斥,通过 CAS 保证只有一方成功
// 如果 signal 先成功,中断发生在 acquire 阶段,抛出 InterruptedException
// 如果 cancel 先成功(中断在 signal 之前),节点被清理出条件队列
参考资料
- JDK 源码 -
java.util.concurrent.locks.AbstractQueuedSynchronizer(OpenJDK 17) - CLH 锁原论文 - Craig, Landin, Hagersten, "Building FIFO and Priority-Queuing Spin Locks from Atomic Swap" (1993)
- Doug Lea 设计文档 - "The java.util.concurrent Synchronizer Framework" (2004) - JSR 166 专家组文档
- 《Java 并发编程的艺术》 - 方腾飞、魏鹏、程晓明,第5章 Java 中的锁
- 《Java 并发编程实战》 - Brian Goetz 等著,第13章 显式锁
评论 (0)
发表评论