CRDT(Conflict-free Replicated Data Type)是一类可以在多个副本上独立更新、并在任意时刻自动合并且不产生冲突的数据结构。本文覆盖 G-Counter、PN-Counter、LWW-Register、OR-Set 等核心类型的原理与伪代码,并深入分析网络分区、并发写、消息乱序等异常场景下的行为保证。
相关文章:向量时钟与因果一致性 · Quorum 与 NWR 模型 · Merkle Tree 与反熵同步
目录
| 章节 | 说明 |
|---|---|
| 为什么需要 CRDT | 传统分布式数据结构的冲突问题 |
| CRDT 的数学基础 | 半格(Join-Semilattice)与单调性 |
| State-based vs Op-based | 两类 CRDT 的对比 |
| 核心数据类型 | G-Counter / PN-Counter / LWW-Register / OR-Set |
| 异常场景分析 | 网络分区 / 并发写 / 消息丢失 / 副本落后 |
| CRDT 的局限 | 不能用 CRDT 解决的问题 |
| 实际应用 | Redis / Riak / Figma |
为什么需要 CRDT
传统方案:乐观并发控制 + 冲突合并
问题1:需要开发者手写合并逻辑,容易出错
问题2:某些操作(如集合删除)合并语义不直观
问题3:网络分区期间无法写入(等待协调)
CRDT 的保证:
1. 任意副本可以独立接受写入(高可用,AP 系统)
2. 副本间同步时自动合并,无冲突
3. 最终所有副本收敛到相同状态(强最终一致性)
4. 不需要中心协调者
CRDT 的数学基础
CRDT 的合并操作必须满足**半格(Join-Semilattice)**的三个性质:
设合并操作为 merge(a, b) = a ⊔ b(取上确界)
1. 幂等性(Idempotent): a ⊔ a = a
→ 重复收到同一条消息,结果不变(消息去重无需额外处理)
2. 交换性(Commutative): a ⊔ b = b ⊔ a
→ 消息到达顺序不影响最终结果(乱序消息自动处理)
3. 结合性(Associative): (a ⊔ b) ⊔ c = a ⊔ (b ⊔ c)
→ 批量合并顺序不影响结果
满足这三条 → 无论网络如何分区、消息如何乱序,最终所有副本状态相同
State-based vs Op-based
| 特性 | State-based CRDT(CvRDT) | Op-based CRDT(CmRDT) |
|---|---|---|
| 传播内容 | 完整状态(或增量状态) | 操作(op) |
| 网络要求 | 不可靠传输即可(幂等合并) | 需要可靠有序传输 |
| 带宽 | 较大(传状态) | 较小(传操作) |
| 实现复杂度 | 简单 | 复杂(需维护因果序) |
| 典型代表 | G-Counter、OR-Set | RGA(协作编辑文本) |
核心数据类型
G-Counter(仅增计数器)
数据结构:
每个副本 i 维护一个向量 P[1..N]
P[i] = 副本 i 的本地增量
本地 increment():
P[self] += 1
查询 value():
return sum(P[1..N])
合并 merge(local, remote):
for i in 1..N:
local.P[i] = max(local.P[i], remote.P[i])
正确性直觉:
3 个副本,初始 P=[0,0,0]
副本1 increment × 3:P1=[3,0,0]
副本2 increment × 2:P2=[0,2,0]
(网络分区,互不通信)
合并:
P_merged[1] = max(3,0) = 3
P_merged[2] = max(0,2) = 2
P_merged[3] = max(0,0) = 0
value = 3+2+0 = 5 ✅(正确反映总增量)
多次合并(幂等):
merge(merge(P1,P2), P1) = merge([3,2,0],[3,0,0]) = [3,2,0] ✅
PN-Counter(可增减计数器)
数据结构:
P[1..N] // 增量向量
N[1..N] // 减量向量
本地 increment():P[self] += 1
本地 decrement():N[self] += 1
查询 value():
return sum(P) - sum(N)
合并 merge(local, remote):
for i in 1..N:
local.P[i] = max(local.P[i], remote.P[i])
local.N[i] = max(local.N[i], remote.N[i])
异常:计数器为负的处理:
问题:sum(P) - sum(N) 可能为负(如果减的次数超过增的次数)
这在业务上可能不合法(如库存不能为负)
解决方案:
1. 业务层约束(写入前检查):
if value() > 0: decrement()
else: 拒绝操作
问题:分布式环境下并发 decrement() 仍可能越界!
2. 改用中心化节点处理 decrement(牺牲部分 AP 特性)
3. 使用 Bounded Counter CRDT(添加配额机制)
LWW-Register(Last Write Wins 寄存器)
数据结构:
value: 当前值
timestamp: 写入时间戳(或 Lamport 时钟)
本地 write(v):
value = v
timestamp = now() // 使用单调递增时钟
查询 read():
return value
合并 merge(local, remote):
if remote.timestamp > local.timestamp:
local.value = remote.value
local.timestamp = remote.timestamp
// 时间戳相同时:可用 node_id 打破平局(牺牲部分写入)
异常:时钟偏差导致数据丢失:
场景:
副本1 在物理时间 T=100 写入 value="A"
副本2 在物理时间 T=80(时钟慢)写入 value="B"
合并:
副本1.timestamp=100 > 副本2.timestamp=80
最终 value="A" ← 但业务上 "B" 是后写的!
缓解方案:
1. 使用 Hybrid Logical Clock(HLC)替代纯物理时钟(见笔记09)
2. 接受数据丢失风险(缓存、日志等场景可接受)
3. 用 MV-Register(保留所有并发值)替代 LWW(见下)
MV-Register(Multi-Value Register,保留冲突版本)
数据结构:
values: Map<VectorClock, value> // 可能有多个并发值
本地 write(v):
新 VC = 递增本地 VC
values = {newVC: v} // 清除被新 VC 支配的旧版本
查询 read():
return values.values() // 返回所有并发版本(由业务合并)
合并 merge(local, remote):
result = {}
for (vc, v) in remote.values:
// 只保留不被 local 任何 VC 支配的版本
if not any(local_vc >= vc for local_vc in local.values.keys()):
result[vc] = v
for (vc, v) in local.values:
if not any(remote_vc >= vc for remote_vc in remote.values.keys()):
result[vc] = v
local.values = result
OR-Set(Observed-Remove Set,可添加可删除集合)
问题背景:简单的 Add/Remove 集合在 CRDT 中很难处理
Add-Wins Set:并发 add 和 remove,add 优先 → 可能违背删除意图
Remove-Wins Set:remove 优先 → 可能意外删除刚加入的元素
OR-Set 的方案:为每个添加操作打唯一标签(unique tag)
数据结构:
entries: Set<(element, unique_tag)>
本地 add(e):
tag = generate_unique_id() // UUID
entries.add((e, tag))
本地 remove(e):
// 删除所有当前观察到的 (e, *) 条目
observed_tags = {(e, t) for (e, t) in entries}
entries -= observed_tags
// 并发 add 会生成新 tag,不受这次删除影响 ✅
查询 contains(e):
return any(e == element for (element, tag) in entries)
合并 merge(local, remote):
local.entries = local.entries ∪ remote.entries
// 取并集即可!
// 删除操作通过"移除 tag"而非"添加 tombstone"来表达
OR-Set 解决并发冲突的直觉:
时序:
初始:{("apple", t1)}
副本1 删除 apple:entries = {}(删除 t1)
副本2 同时添加 apple:entries = {("apple",t1), ("apple",t2)}(新 tag t2)
合并:
副本1 合并副本2:
entries = {} ∪ {("apple",t1),("apple",t2)}
= {("apple",t1),("apple",t2)}
contains("apple")? → Yes(t2 仍在)✅
语义:并发的 add 优先于并发的 remove
因为 remove 只删除 "观察到" 的 tag,新的 add 产生新 tag 不受影响
异常场景分析
异常 1:消息重复传递
问题:可靠传输层未去重,同一 Gossip 消息被接收多次
CvRDT(State-based)天然处理:
merge(state, state) = state(幂等性保证)
重复合并不改变结果 ✅
CmRDT(Op-based)需要去重:
操作 op 带唯一 ID
维护已应用 op 的集合 applied
on receive op:
if op.id in applied: return
applied.add(op.id)
apply(op)
异常 2:副本长时间离线后恢复
场景:移动端离线 8 小时,产生大量本地写操作
State-based CRDT:
恢复后同步完整状态(或增量)
merge() 一次性处理所有离线期间的差异 ✅
可能传输数据量大
Op-based CRDT:
需要重放 8 小时内的所有操作(按因果序)
需要对端保存 8 小时的操作日志 ← 存储压力大
优化:Delta CRDT(增量 State-based)
只传输上次同步后的增量(delta),不传完整状态
兼顾 State-based 的幂等合并和 Op-based 的带宽效率
异常 3:OR-Set 中的删除后重新添加
场景:
T1:用户 A 添加元素 "x"(tag=t1)
T2:用户 B 删除 "x"(删除 t1)
T3:用户 A 重新添加 "x"(tag=t2,因果上在 T2 之后)
正确行为:T3 的添加应该覆盖 T2 的删除
分析:
B 删除时,entries 中只有 t1,删除后 entries={}
A 重新添加产生新 tag t2,entries={(x,t2)}
合并后:{} ∪ {(x,t2)} = {(x,t2)}
contains("x") = true ✅(重新添加成功)
如果 T3 早于 T2 到达(消息乱序):
entries={(x,t1),(x,t2)}
B 的删除只删 t1(观察到的),t2 不受影响
合并后 entries={(x,t2)}
contains("x") = true ✅
异常 4:tombstone 积累(G-Set / 2P-Set)
某些 CRDT(如 2P-Set)使用 tombstone 标记删除:
tombs: Set // 已删除元素的集合
contains(e) = e in adds AND e not in tombs
问题:tombs 只增不减,长期运行后大量 tombstone 堆积
优化方案:
1. 定期 GC:当所有副本都同步了某个 tombstone 后,可安全删除
需要额外的协调轮次(反而增加复杂度)
2. 改用 OR-Set:无 tombstone,删除通过移除 tag 实现
3. 接受 tombstone(Riak 的做法):运营层定期清理
CRDT 的局限
1. 无法实现强一致性约束
如:银行转账(总金额守恒)、唯一性约束(用户名不重复)
这些需要协调,CRDT 无法解决
2. 语义限制
OR-Set 的"并发 add 优先于 remove"在某些场景不符合业务期望
需要仔细设计 CRDT 语义
3. 读放大
MV-Register 可能返回多个并发值,需要业务层合并
OR-Set 的 entries 集合可能很大(含大量 tag)
4. 因果历史无限增长(Op-based)
需要定期做 GC 或 snapshot
5. 计算复杂度
复杂 CRDT(如 RGA 文本编辑)合并操作 O(N²)
实际应用
| 系统 | CRDT 类型 | 用途 |
|---|---|---|
| Redis | G-Counter(HyperLogLog近似) | 分布式计数 |
| Riak | OR-Set, LWW-Register, MV-Register | Key-Value 存储的冲突处理 |
| Figma | Op-based CRDT | 实时协作设计 |
| Automerge / Yjs | RGA(Op-based) | 协作文档编辑(类 Google Docs) |
| Apache Cassandra | LWW(时间戳) | 简化版,非严格 CRDT |
| SoundCloud Roshi | LWW-Set | 时间线数据存储 |
参考资料
- Shapiro, M. et al. (2011). A Comprehensive Study of Convergent and Commutative Replicated Data Types
- 《Designing Data-Intensive Applications》Ch.9 — Collaboration and Conflict Resolution
- Automerge: https://github.com/automerge/automerge
- Yjs: https://github.com/yjs/yjs
评论 (0)
发表评论