专栏文章
专栏文章
分布式算法系列
1. 分布式算法 #01:Paxos 共识算法 2. 分布式算法 #02:Raft 共识算法 3. 分布式算法 #03:Gossip 协议与 SWIM 故障检测 4. 分布式算法 #04:向量时钟与因果一致性 5. 分布式算法 #05:CRDT(无冲突复制数据类型) 6. 分布式算法 #06:Quorum 与 NWR 模型 7. 分布式算法 #07:Merkle Tree 与反熵同步 8. 分布式算法 #08:Byzantine 容错(BFT/PBFT) 9. 分布式算法 #09:Hybrid Logical Clock(HLC)

分布式算法 #05:CRDT(无冲突复制数据类型)

发布于 2026-05-29 03:57 👁 9 次阅读
#算法#分布式#consistency#crdt#data-structure

CRDT(Conflict-free Replicated Data Type)是一类可以在多个副本上独立更新、并在任意时刻自动合并且不产生冲突的数据结构。本文覆盖 G-Counter、PN-Counter、LWW-Register、OR-Set 等核心类型的原理与伪代码,并深入分析网络分区、并发写、消息乱序等异常场景下的行为保证。

相关文章向量时钟与因果一致性 · Quorum 与 NWR 模型 · Merkle Tree 与反熵同步


目录

章节 说明
为什么需要 CRDT 传统分布式数据结构的冲突问题
CRDT 的数学基础 半格(Join-Semilattice)与单调性
State-based vs Op-based 两类 CRDT 的对比
核心数据类型 G-Counter / PN-Counter / LWW-Register / OR-Set
异常场景分析 网络分区 / 并发写 / 消息丢失 / 副本落后
CRDT 的局限 不能用 CRDT 解决的问题
实际应用 Redis / Riak / Figma

为什么需要 CRDT

传统方案:乐观并发控制 + 冲突合并
  问题1:需要开发者手写合并逻辑,容易出错
  问题2:某些操作(如集合删除)合并语义不直观
  问题3:网络分区期间无法写入(等待协调)

CRDT 的保证:
  1. 任意副本可以独立接受写入(高可用,AP 系统)
  2. 副本间同步时自动合并,无冲突
  3. 最终所有副本收敛到相同状态(强最终一致性)
  4. 不需要中心协调者

CRDT 的数学基础

CRDT 的合并操作必须满足**半格(Join-Semilattice)**的三个性质:

设合并操作为 merge(a, b) = a ⊔ b(取上确界)

1. 幂等性(Idempotent):  a ⊔ a = a
   → 重复收到同一条消息,结果不变(消息去重无需额外处理)

2. 交换性(Commutative):  a ⊔ b = b ⊔ a
   → 消息到达顺序不影响最终结果(乱序消息自动处理)

3. 结合性(Associative):  (a ⊔ b) ⊔ c = a ⊔ (b ⊔ c)
   → 批量合并顺序不影响结果

满足这三条 → 无论网络如何分区、消息如何乱序,最终所有副本状态相同

State-based vs Op-based

特性 State-based CRDT(CvRDT) Op-based CRDT(CmRDT)
传播内容 完整状态(或增量状态) 操作(op)
网络要求 不可靠传输即可(幂等合并) 需要可靠有序传输
带宽 较大(传状态) 较小(传操作)
实现复杂度 简单 复杂(需维护因果序)
典型代表 G-Counter、OR-Set RGA(协作编辑文本)

核心数据类型

crdt types

G-Counter(仅增计数器)

数据结构:
  每个副本 i 维护一个向量 P[1..N]
  P[i] = 副本 i 的本地增量

本地 increment():
  P[self] += 1

查询 value():
  return sum(P[1..N])

合并 merge(local, remote):
  for i in 1..N:
    local.P[i] = max(local.P[i], remote.P[i])

正确性直觉

3 个副本,初始 P=[0,0,0]

副本1 increment × 3:P1=[3,0,0]
副本2 increment × 2:P2=[0,2,0]
(网络分区,互不通信)

合并:
  P_merged[1] = max(3,0) = 3
  P_merged[2] = max(0,2) = 2
  P_merged[3] = max(0,0) = 0
  value = 3+2+0 = 5 ✅(正确反映总增量)

多次合并(幂等):
  merge(merge(P1,P2), P1) = merge([3,2,0],[3,0,0]) = [3,2,0] ✅

PN-Counter(可增减计数器)

数据结构:
  P[1..N]  // 增量向量
  N[1..N]  // 减量向量

本地 increment():P[self] += 1
本地 decrement():N[self] += 1

查询 value():
  return sum(P) - sum(N)

合并 merge(local, remote):
  for i in 1..N:
    local.P[i] = max(local.P[i], remote.P[i])
    local.N[i] = max(local.N[i], remote.N[i])

异常:计数器为负的处理

问题:sum(P) - sum(N) 可能为负(如果减的次数超过增的次数)
     这在业务上可能不合法(如库存不能为负)

解决方案:
  1. 业务层约束(写入前检查):
     if value() > 0: decrement()
     else: 拒绝操作
     
  问题:分布式环境下并发 decrement() 仍可能越界!
  
  2. 改用中心化节点处理 decrement(牺牲部分 AP 特性)
  3. 使用 Bounded Counter CRDT(添加配额机制)

LWW-Register(Last Write Wins 寄存器)

数据结构:
  value:     当前值
  timestamp: 写入时间戳(或 Lamport 时钟)

本地 write(v):
  value = v
  timestamp = now()  // 使用单调递增时钟

查询 read():
  return value

合并 merge(local, remote):
  if remote.timestamp > local.timestamp:
    local.value = remote.value
    local.timestamp = remote.timestamp
  // 时间戳相同时:可用 node_id 打破平局(牺牲部分写入)

异常:时钟偏差导致数据丢失

场景:
  副本1 在物理时间 T=100 写入 value="A"
  副本2 在物理时间 T=80(时钟慢)写入 value="B"

合并:
  副本1.timestamp=100 > 副本2.timestamp=80
  最终 value="A" ← 但业务上 "B" 是后写的!

缓解方案:
  1. 使用 Hybrid Logical Clock(HLC)替代纯物理时钟(见笔记09)
  2. 接受数据丢失风险(缓存、日志等场景可接受)
  3. 用 MV-Register(保留所有并发值)替代 LWW(见下)

MV-Register(Multi-Value Register,保留冲突版本)

数据结构:
  values: Map<VectorClock, value>  // 可能有多个并发值

本地 write(v):
  新 VC = 递增本地 VC
  values = {newVC: v}  // 清除被新 VC 支配的旧版本

查询 read():
  return values.values()  // 返回所有并发版本(由业务合并)

合并 merge(local, remote):
  result = {}
  for (vc, v) in remote.values:
    // 只保留不被 local 任何 VC 支配的版本
    if not any(local_vc >= vc for local_vc in local.values.keys()):
      result[vc] = v
  for (vc, v) in local.values:
    if not any(remote_vc >= vc for remote_vc in remote.values.keys()):
      result[vc] = v
  local.values = result

OR-Set(Observed-Remove Set,可添加可删除集合)

问题背景:简单的 Add/Remove 集合在 CRDT 中很难处理
  Add-Wins Set:并发 add 和 remove,add 优先 → 可能违背删除意图
  Remove-Wins Set:remove 优先 → 可能意外删除刚加入的元素

OR-Set 的方案:为每个添加操作打唯一标签(unique tag)

数据结构:
  entries: Set<(element, unique_tag)>

本地 add(e):
  tag = generate_unique_id()  // UUID
  entries.add((e, tag))

本地 remove(e):
  // 删除所有当前观察到的 (e, *) 条目
  observed_tags = {(e, t) for (e, t) in entries}
  entries -= observed_tags
  // 并发 add 会生成新 tag,不受这次删除影响 ✅

查询 contains(e):
  return any(e == element for (element, tag) in entries)

合并 merge(local, remote):
  local.entries = local.entries ∪ remote.entries
  // 取并集即可!
  // 删除操作通过"移除 tag"而非"添加 tombstone"来表达

OR-Set 解决并发冲突的直觉

时序:
  初始:{("apple", t1)}

  副本1 删除 apple:entries = {}(删除 t1)
  副本2 同时添加 apple:entries = {("apple",t1), ("apple",t2)}(新 tag t2)

  合并:
    副本1 合并副本2:
      entries = {} ∪ {("apple",t1),("apple",t2)}
             = {("apple",t1),("apple",t2)}
      contains("apple")? → Yes(t2 仍在)✅

  语义:并发的 add 优先于并发的 remove
  因为 remove 只删除 "观察到" 的 tag,新的 add 产生新 tag 不受影响

异常场景分析

异常 1:消息重复传递

问题:可靠传输层未去重,同一 Gossip 消息被接收多次

CvRDT(State-based)天然处理:
  merge(state, state) = state(幂等性保证)
  重复合并不改变结果 ✅

CmRDT(Op-based)需要去重:
  操作 op 带唯一 ID
  维护已应用 op 的集合 applied
  on receive op:
    if op.id in applied: return
    applied.add(op.id)
    apply(op)

异常 2:副本长时间离线后恢复

场景:移动端离线 8 小时,产生大量本地写操作

State-based CRDT:
  恢复后同步完整状态(或增量)
  merge() 一次性处理所有离线期间的差异 ✅
  可能传输数据量大

Op-based CRDT:
  需要重放 8 小时内的所有操作(按因果序)
  需要对端保存 8 小时的操作日志 ← 存储压力大

优化:Delta CRDT(增量 State-based)
  只传输上次同步后的增量(delta),不传完整状态
  兼顾 State-based 的幂等合并和 Op-based 的带宽效率

异常 3:OR-Set 中的删除后重新添加

场景:
  T1:用户 A 添加元素 "x"(tag=t1)
  T2:用户 B 删除 "x"(删除 t1)
  T3:用户 A 重新添加 "x"(tag=t2,因果上在 T2 之后)

正确行为:T3 的添加应该覆盖 T2 的删除

分析:
  B 删除时,entries 中只有 t1,删除后 entries={}
  A 重新添加产生新 tag t2,entries={(x,t2)}
  合并后:{} ∪ {(x,t2)} = {(x,t2)}
  contains("x") = true ✅(重新添加成功)

如果 T3 早于 T2 到达(消息乱序):
  entries={(x,t1),(x,t2)}
  B 的删除只删 t1(观察到的),t2 不受影响
  合并后 entries={(x,t2)}
  contains("x") = true ✅

异常 4:tombstone 积累(G-Set / 2P-Set)

某些 CRDT(如 2P-Set)使用 tombstone 标记删除:
  tombs: Set  // 已删除元素的集合
  contains(e) = e in adds AND e not in tombs

问题:tombs 只增不减,长期运行后大量 tombstone 堆积

优化方案:
  1. 定期 GC:当所有副本都同步了某个 tombstone 后,可安全删除
     需要额外的协调轮次(反而增加复杂度)
  2. 改用 OR-Set:无 tombstone,删除通过移除 tag 实现
  3. 接受 tombstone(Riak 的做法):运营层定期清理

CRDT 的局限

1. 无法实现强一致性约束
   如:银行转账(总金额守恒)、唯一性约束(用户名不重复)
   这些需要协调,CRDT 无法解决

2. 语义限制
   OR-Set 的"并发 add 优先于 remove"在某些场景不符合业务期望
   需要仔细设计 CRDT 语义

3. 读放大
   MV-Register 可能返回多个并发值,需要业务层合并
   OR-Set 的 entries 集合可能很大(含大量 tag)

4. 因果历史无限增长(Op-based)
   需要定期做 GC 或 snapshot

5. 计算复杂度
   复杂 CRDT(如 RGA 文本编辑)合并操作 O(N²)

实际应用

系统 CRDT 类型 用途
Redis G-Counter(HyperLogLog近似) 分布式计数
Riak OR-Set, LWW-Register, MV-Register Key-Value 存储的冲突处理
Figma Op-based CRDT 实时协作设计
Automerge / Yjs RGA(Op-based) 协作文档编辑(类 Google Docs)
Apache Cassandra LWW(时间戳) 简化版,非严格 CRDT
SoundCloud Roshi LWW-Set 时间线数据存储

参考资料

  • Shapiro, M. et al. (2011). A Comprehensive Study of Convergent and Commutative Replicated Data Types
  • 《Designing Data-Intensive Applications》Ch.9 — Collaboration and Conflict Resolution
  • Automerge: https://github.com/automerge/automerge
  • Yjs: https://github.com/yjs/yjs
← 返回列表

评论 (0)

暂无评论,来留下第一条吧。

发表评论