LZ4/Snappy/Zstd 均是 LZ77 变体,LZ4 以极速解压(>4 GB/s)著称,Zstd 提供高压缩率(3-4x)兼顾速度,Snappy 是 Google 内部的稳定平衡选项;Kafka、RocksDB、ClickHouse、Parquet 均内置支持。
相关文章:Varint 与 ZigZag 编码 · Delta 编码
目录
| 章节 | 说明 |
|---|---|
| 问题背景 | 为何需要快速通用压缩算法 |
| LZ77 公共基础 | 滑动窗口原理与 encode/decode 伪代码 |
| LZ4 核心设计 | Hash table 快速匹配、token 格式、无熵编码 |
| Snappy 核心设计 | Google 内部场景优化、跨平台稳定性 |
| Zstd 核心设计 | FSE 熵编码、可训练字典、多压缩级别 |
| 执行追踪 | LZ4 压缩 "abcabcabc" 完整演示 |
| 压缩率 vs 速度基准 | 典型数据集的量化对比 |
| 各系统的选择 | Kafka/RocksDB/ClickHouse/Parquet 配置 |
| 异常与边界场景 | 不可压缩数据、并发压缩、流式压缩 |
| 参考资料 | 论文与官方文档 |
问题背景
存储系统和消息队列面临两个相互冲突的需求:
- 磁盘/网络 I/O 是瓶颈:写入 1 GB 数据比在内存中处理慢 10~100 倍,压缩可减少 I/O 量
- CPU 是宝贵资源:通用算法(gzip/bzip2)压缩速度仅 50~200 MB/s,无法跟上磁盘吞吐
关键矛盾:高压缩率(bzip2 ~5x)与高速度(需要 GB/s 级)之间如何取舍?
三种算法给出了不同的答案:
| 算法 | 定位 | 核心取舍 |
|---|---|---|
| LZ4 | 极速 | 压缩率换速度,解压 >4 GB/s |
| Snappy | 平衡 | Google 内部场景稳定可靠 |
| Zstd | 高压缩率 | FSE 熵编码,速度仍远快于 gzip |
LZ77 公共基础
LZ4、Snappy、Zstd 均是 LZ77 的变体,理解 LZ77 是理解三者的前提。
2.1 核心概念
LZ77 维护两个缓冲区:
HistoryBuffer (历史缓冲区 / 滑动窗口)
[已编码数据,最近 W 字节]
┌──────────────────┐
... a b c a b c │ a b c x y z │ 前向缓冲区
└──────────────┘
cursor (当前位置)
匹配:在历史窗口中找到与当前前向缓冲区最长一致的子串,记录:
offset:匹配位置距当前光标的反向距离length:匹配长度
若匹配长度 >= MIN_MATCH(通常 3),输出反向引用 token (offset, length),否则输出字面量字节。
2.2 核心数据结构
LZ77Token {
type : enum { LITERAL, MATCH }
// LITERAL 分支
byte : uint8 // 字面量字节值
// MATCH 分支
offset : uint16 // 反向偏移量(>= 1,指向历史窗口)
length : uint16 // 匹配长度(>= MIN_MATCH = 3)
}
LZ77Stream {
tokens : LZ77Token[] // 字面量与匹配混合序列
window : byte[] // 滑动窗口(历史缓冲区,大小 W)
}
2.3 LZ77 编码伪代码(简化)
LZ77_encode(input: byte[], W: int = 65535, MIN_MATCH: int = 3) -> LZ77Token[]:
tokens = []
cursor = 0 // 当前处理位置
n = input.length
while cursor < n:
// 在历史窗口 [cursor-W, cursor) 中寻找最长匹配
best_offset = 0
best_length = 0
window_start = max(0, cursor - W)
for pos = window_start to cursor - 1:
// 从 pos 与 cursor 同时向前比较
match_len = 0
while cursor + match_len < n
and input[pos + match_len] == input[cursor + match_len]:
match_len = match_len + 1
if match_len > best_length:
best_length = match_len
best_offset = cursor - pos // 反向距离
if best_length >= MIN_MATCH:
// 输出反向引用 token
tokens.append(Token(MATCH, offset=best_offset, length=best_length))
cursor = cursor + best_length // 跳过已匹配的字节
else:
// 输出字面量
tokens.append(Token(LITERAL, byte=input[cursor]))
cursor = cursor + 1
return tokens
2.4 LZ77 解码伪代码
LZ77_decode(tokens: LZ77Token[]) -> byte[]:
output = []
for token in tokens:
if token.type == LITERAL:
output.append(token.byte)
else: // MATCH
start = output.length - token.offset
for i = 0 to token.length - 1:
output.append(output[start + i])
// 注意:start+i 可能追上 output 末尾(overlap copy,合法)
return output
LZ4 核心设计
3.1 设计目标
LZ4 由 Yann Collet 于 2011 年发布,核心目标:解压速度超过内存带宽,使解压不成为瓶颈。
- 不做熵编码(无 Huffman,无 FSE)
- 使用 hash table 代替完整字符串匹配
- token 格式极简,解码分支极少
3.2 Hash Table 快速匹配
LZ77 朴素实现的匹配是 O(W) 的扫描,LZ4 用 hash table 加速:
HashTable {
table : uint32[HASH_SIZE] // position -> last seen position of this hash
// HASH_SIZE = 2^16 (64KB 模式) 或 2^12 (16KB 快速模式)
}
对当前位置 cursor 取 4 字节,计算 hash:
hash4(cursor: int, input: byte[]) -> uint16:
val = read_uint32_le(input, cursor)
return (val * 0x9E3779B9) >> (32 - LOG2_HASH_SIZE)
// Fibonacci hashing,均匀散布 4 字节值
匹配流程:
LZ4_find_match(cursor, input, hash_table):
h = hash4(cursor, input)
candidate = hash_table.table[h] // 候选位置
hash_table.table[h] = cursor // 更新为当前位置
if candidate != INVALID
and cursor - candidate < MAX_DISTANCE (65535)
and read_uint32_le(input, candidate) == read_uint32_le(input, cursor):
// 4 字节匹配成功,继续向后扩展
length = 4
while cursor + length < n
and input[candidate + length] == input[cursor + length]:
length = length + 1
return (offset = cursor - candidate, length = length)
else:
return NO_MATCH
关键特性:hash 冲突时直接丢弃旧位置(最多损失压缩率,不影响正确性)。
3.3 Token 格式
LZ4 压缩块由连续的 sequence 组成,每个 sequence 的结构:
Sequence {
token : uint8 // 高 4 bit = literal_len, 低 4 bit = match_len_extra
extra_lit : byte[] // 若 literal_len == 15,追加额外字节直到遇到 < 255
literals : byte[] // literal_len 个字面量字节
offset : uint16_le // 匹配偏移量(小端 2 字节)
extra_mat : byte[] // 若 match_len_extra == 15,追加额外字节
}
长度编码规则(以字面量长度为例):
encode_length(len: int, base: int) -> (token_nibble: uint4, extra: byte[]):
// base = 字面量时为 0,匹配时为 MIN_MATCH(4)
adjusted = len - base
if adjusted < 15:
return (token_nibble=adjusted, extra=[])
else:
extra = []
adjusted = adjusted - 15
while adjusted >= 255:
extra.append(255)
adjusted = adjusted - 255
extra.append(adjusted)
return (token_nibble=15, extra=extra)
3.4 完整 LZ4 编码伪代码
LZ4_compress(input: byte[]) -> byte[]:
n = input.length
output = []
hash_table = HashTable(size=HASH_SIZE, fill=INVALID)
cursor = 0
lit_start = 0 // 当前字面量段的起始位置
while cursor < n - MFLIMIT: // MFLIMIT=12,末尾保留字面量
(offset, match_len) = LZ4_find_match(cursor, input, hash_table)
if offset == NO_MATCH:
cursor = cursor + 1
continue
// 编码字面量段 + 匹配段
lit_len = cursor - lit_start
match_adj = match_len - MIN_MATCH (4)
// token 字节
token_nibble_lit = min(lit_len, 15)
token_nibble_mat = min(match_adj, 15)
output.append((token_nibble_lit << 4) | token_nibble_mat)
// 超长字面量长度
output.append_extra_length(lit_len - token_nibble_lit)
// 字面量数据
output.append_bytes(input, lit_start, lit_len)
// 偏移量(小端 2 字节)
output.append_uint16_le(offset)
// 超长匹配长度
output.append_extra_length(match_adj - token_nibble_mat)
cursor = cursor + match_len
lit_start = cursor
// 末尾剩余字面量(无 offset/match)
lit_len = n - lit_start
token = min(lit_len, 15) << 4
output.append(token)
output.append_extra_length(lit_len - min(lit_len,15))
output.append_bytes(input, lit_start, lit_len)
return output
3.5 LZ4 解码伪代码
LZ4_decompress(input: byte[], original_size: int) -> byte[]:
output = byte[original_size]
ip = 0 // 输入指针
op = 0 // 输出指针
while ip < input.length:
token = input[ip]; ip = ip + 1
// 读取字面量长度
lit_len = (token >> 4) & 0x0F
if lit_len == 15:
while True:
extra = input[ip]; ip = ip + 1
lit_len = lit_len + extra
if extra != 255: break
// 复制字面量
copy(output, op, input, ip, lit_len)
op = op + lit_len; ip = ip + lit_len
if ip >= input.length: break // 末尾无 match
// 读取偏移量
offset = read_uint16_le(input, ip); ip = ip + 2
// 读取匹配长度
match_len = (token & 0x0F) + MIN_MATCH (4)
if (token & 0x0F) == 15:
while True:
extra = input[ip]; ip = ip + 1
match_len = match_len + extra
if extra != 255: break
// 复制匹配(允许 overlap)
match_start = op - offset
for i = 0 to match_len - 1:
output[op + i] = output[match_start + i]
op = op + match_len
return output
Snappy 核心设计
4.1 设计背景
Snappy 由 Google(Sanjay Ghemawat 等)于 2011 年开源,原名 Zippy,内部使用超过 10 年。设计约束:
- Google 数据中心大量处理 protobuf 序列化数据(高度重复的字段名、枚举值)
- 需在单核达到 250 MB/s 压缩、500 MB/s 解压
- 跨平台一致性优先(ARM/x86 结果相同)
4.2 核心数据结构
SnappyStream {
preamble : varint32 // 原始数据长度(解压器预分配缓冲区用)
elements : SnappyElement[]
}
SnappyElement = Literal | CopyShort | CopyLong
Literal {
tag : uint8 // 低 2 bit = 00,高 6 bit 编码长度
data : byte[]
}
CopyShort { // 1~64 字节匹配,offset < 2KB
tag : uint8 // 低 2 bit = 01
// bit[2..4] = length - 4,bit[5..7] = offset 高 3 bit
offset_lo : uint8 // offset 低 8 bit
}
CopyLong { // 1~64 字节匹配,offset 任意
tag : uint8 // 低 2 bit = 10 (2 字节 offset) 或 11 (4 字节 offset)
length : uint8 // 匹配长度
offset : uint16/uint32
}
4.3 Snappy 编码伪代码
Snappy_compress(input: byte[]) -> byte[]:
n = input.length
output = []
output.append_varint32(n) // preamble
hash_table = new uint16[1 << 14] // 16K 表,每项存 last position
fill(hash_table, INVALID)
ip = 0
ip_end = n - 5
next_emit = 0 // 下一个字面量段起点
if n >= 15:
ip = ip + 1
ip_hash = hash4(input, ip)
while True:
// 快速跳跃寻找匹配
skip = 32
ip_next = ip
while True:
ip = ip_next
ip_next = ip + (skip >> 5)
skip = skip + 1
if ip_next > ip_end: goto emit_remainder
h = hash4(input, ip)
candidate = hash_table[h]
hash_table[h] = ip
if read_uint32(input, candidate) == read_uint32(input, ip):
break
// 找到匹配,扩展
lit_len = ip - next_emit
match_len = 4
while ip + match_len < n
and input[candidate + match_len] == input[ip + match_len]:
match_len = match_len + 1
offset = ip - candidate
emit_literal(output, input, next_emit, lit_len)
emit_copy(output, offset, match_len)
ip = ip + match_len
next_emit = ip
if ip >= ip_end: goto emit_remainder
// 更新 hash(当前和前一位置)
hash_table[hash4(input, ip - 1)] = ip - 1
h = hash4(input, ip)
candidate = hash_table[h]
hash_table[h] = ip
if read_uint32(input, candidate) != read_uint32(input, ip):
ip = ip + 1
emit_remainder:
if next_emit < n:
emit_literal(output, input, next_emit, n - next_emit)
return output
4.4 与 LZ4 的关键差异
| 特性 | LZ4 | Snappy |
|---|---|---|
| Hash table 大小 | 4K~64K(可配置) | 固定 16K |
| 最大 offset | 65535 | 32767 |
| 匹配长度编码 | 变长追加字节 | 固定 3 种 tag |
| 压缩率 | 略高于 Snappy | 略低,更稳定 |
| 解压速度 | >4 GB/s | ~1-2 GB/s |
| 熵编码 | 无 | 无 |
Zstd 核心设计
5.1 设计目标
Zstd(Zstandard)由 Facebook 的 Yann Collet 于 2016 年发布。目标:
- 替代 zlib(gzip),在相同压缩率下速度提升 5~10 倍
- 支持可训练字典,对小数据块(JSON API 响应、protobuf 消息)效果极佳
- 提供 1~22 的压缩级别,级别 1 速度接近 LZ4,级别 22 接近 bzip2
5.2 核心架构:LZ77 + FSE
Zstd 的压缩流水线:
输入数据
│
▼
[LZ77 匹配引擎] → 序列流 (literal_len, offset, match_len)
│
▼
[Huffman 编码字面量] → 压缩字面量块
│
[FSE 编码序列参数] → 压缩序列块(offset/match_len 用 FSE)
│
▼
输出帧
FSE(有限状态熵编码) 是 ANS(Asymmetric Numeral Systems)的一种实现,比 Huffman 更接近熵下界,且支持批量编解码。
5.3 核心数据结构
ZstdFrame {
magic : uint32 = 0xFD2FB528
fhd : FrameHeaderDescriptor
header : FrameHeader
blocks : ZstdBlock[]
checksum : uint32 (optional, xxHash64)
}
ZstdBlock {
block_header : uint24 // last_block(1) + block_type(2) + block_size(21)
data : byte[]
}
// 压缩块内部结构
CompressedBlock {
literals_section : LiteralsSection
sequences_section: SequencesSection
}
LiteralsSection {
header : LiteralsHeader // 编码类型:Raw/RLE/Huffman/TreelessHuffman
huffman_tree : HuffmanTree (optional)
data : byte[] // Huffman 压缩的字面量
}
SequencesSection {
nb_sequences : uint8/uint16 // 序列数量
symbol_comp : uint8 // LLtype(2) + Offtype(2) + MLtype(2)
fse_tables : FSETables // LL/ML/OF 的 FSE 解码表
bitstream : byte[] // 逆序 bitstream,包含所有序列参数
}
5.4 Zstd 序列编码伪代码(概念级)
Zstd_encode_sequences(sequences: Sequence[]) -> byte[]:
// sequences 中每项为 (literal_len, offset_code, match_len)
// offset_code 经过特殊变换:offset_code = log2(offset) + extra bits
// 构建 FSE 表(基于序列中各参数的频率分布)
ll_freqs = count_frequencies([s.literal_len for s in sequences])
ml_freqs = count_frequencies([s.match_len for s in sequences])
of_freqs = count_frequencies([s.offset_code for s in sequences])
ll_table = FSE_build_table(ll_freqs, LL_MAX_LOG)
ml_table = FSE_build_table(ml_freqs, ML_MAX_LOG)
of_table = FSE_build_table(of_freqs, OF_MAX_LOG)
// 逆序 ANS 编码(从最后一个序列开始,倒序写入 bitstream)
state_ll = FSE_init_state(ll_table)
state_ml = FSE_init_state(ml_table)
state_of = FSE_init_state(of_table)
bitstream = BitWriter()
for seq in reversed(sequences):
FSE_encode_symbol(state_ll, seq.literal_len, ll_table, bitstream)
FSE_encode_symbol(state_ml, seq.match_len, ml_table, bitstream)
FSE_encode_symbol(state_of, seq.offset_code, of_table, bitstream)
bitstream.flush_states(state_ll, state_ml, state_of)
return bitstream.to_bytes()
5.5 可训练字典
Zstd 支持从代表性样本集中训练字典(Dictionary),字典本质上是预加载的历史窗口:
// 训练(离线)
dictionary = zstd_train_dict(samples: byte[][], dict_size: int = 112 * 1024)
// 内部使用 Cover 算法选取高覆盖率的字节序列
// 输出一个 dict_size 字节的字典文件
// 压缩(使用字典)
Zstd_compress_with_dict(input: byte[], dict: byte[], level: int) -> byte[]:
ctx = Zstd_create_context()
Zstd_load_dict(ctx, dict)
// 压缩时历史窗口初始化为字典内容
return Zstd_compress(ctx, input, level)
// 解压(使用字典)
Zstd_decompress_with_dict(compressed: byte[], dict: byte[]) -> byte[]:
ctx = Zstd_create_context()
Zstd_load_dict(ctx, dict)
return Zstd_decompress(ctx, compressed)
字典效果:对 1KB 的 JSON 消息,无字典压缩率 ~1.2x,有字典压缩率可达 ~4x。
5.6 压缩级别参数
| 级别 | 压缩速度 | 解压速度 | 压缩率 | 对比 |
|---|---|---|---|---|
| 1 | ~500 MB/s | ~1.5 GB/s | ~2.8x | 接近 LZ4 |
| 3(默认) | ~250 MB/s | ~1.2 GB/s | ~3.2x | 替代 zlib 级别 6 |
| 9 | ~60 MB/s | ~1.1 GB/s | ~3.6x | — |
| 19 | ~5 MB/s | ~1.0 GB/s | ~4.2x | 接近 gzip -9 |
| 22 | ~1 MB/s | ~0.9 GB/s | ~4.5x | 接近 bzip2 |
执行追踪
以 LZ4 压缩字符串 "abcabcabc" 为例(ASCII 字节)。
6.1 输入
索引: 0 1 2 3 4 5 6 7 8
字节: a b c a b c a b c
十六进制: 61 62 63 61 62 63 61 62 63
6.2 Hash Table 初始化
hash_table = [INVALID] × HASH_SIZE
cursor = 0, lit_start = 0
6.3 逐步追踪
cursor=0('a'):
hash4(input, 0) → hash("abca") = H1
hash_table[H1] = INVALID → 无匹配
hash_table[H1] = 0
cursor = 1(无匹配,继续)
cursor=1('b'):
hash4(input, 1) → hash("bcab") = H2
hash_table[H2] = INVALID → 无匹配
hash_table[H2] = 1
cursor = 2
cursor=2('c'):
hash4(input, 2) → hash("cabc") = H3
hash_table[H3] = INVALID → 无匹配
hash_table[H3] = 2
cursor = 3
cursor=3('a'):
hash4(input, 3) → hash("abca") = H1(与 cursor=0 相同)
candidate = hash_table[H1] = 0
hash_table[H1] = 3
验证:read_uint32(input, 0) = "abca" == read_uint32(input, 3) = "abca" ✓
扩展匹配长度:
match_len = 4("abca" 已匹配)
index 4 vs 1: 'b' == 'b' → match_len = 5
index 5 vs 2: 'c' == 'c' → match_len = 6
index 6 vs 3: 'a' == 'a' → match_len = 7
index 7 vs 4: 'b' == 'b' → match_len = 8 ← 注意:candidate+8=8 超出原始串
index 8 vs 5: 'c' == 'c' → match_len = 9 ← 整个剩余串都匹配(overlap copy)
match_len 受限于 n - cursor = 9 - 3 = 6(末尾 MFLIMIT 限制)
取 match_len = 6
offset = cursor - candidate = 3 - 0 = 3
生成 Sequence:
lit_start = 0, cursor = 3
literal_len = 3 - 0 = 3 → 字面量: [a, b, c]
match_len = 6
match_adj = 6 - 4 = 2 → token 低 4 bit = 2
token 字节:
高 4 bit = min(3, 15) = 3 (字面量长度)
低 4 bit = min(2, 15) = 2 (匹配长度 - 4)
token = 0x32
offset = 0x0003 (小端: 03 00)
输出字节序列:
[0x32] token
[0x61 0x62 0x63] 字面量 "abc"
[0x03 0x00] offset = 3
(无 extra_mat,match_len_adj=2 < 15)
末尾无更多匹配(cursor=9 >= n-MFLIMIT),输出末尾字面量段(空)
完整压缩输出(7 字节 vs 原始 9 字节):
原始: 61 62 63 61 62 63 61 62 63 (9 字节)
压缩: 32 61 62 63 03 00 00 (7 字节,压缩率约 1.29x)
^token ^literals ^offset ^末尾token
解压验证:
读 token=0x32: lit_len=3, match_len_adj=2
读 3 字节字面量: output = [a, b, c] op=3
读 offset=3: match_start = 3 - 3 = 0
match_len = 2 + 4 = 6
overlap copy: output[0..5] → output[3..8]
i=0: output[3] = output[0] = 'a'
i=1: output[4] = output[1] = 'b'
i=2: output[5] = output[2] = 'c'
i=3: output[6] = output[3] = 'a'
i=4: output[7] = output[4] = 'b'
i=5: output[8] = output[5] = 'c'
output = [a, b, c, a, b, c, a, b, c] ✓
压缩率 vs 速度基准
以 Silesia corpus(典型混合数据集,211 MB)和 Calgary corpus 的测试结果为参考:
| 算法 | 压缩率(混合文本) | 压缩速度 | 解压速度 | 内存占用 |
|---|---|---|---|---|
| gzip -6 | ~2.8x | ~30 MB/s | ~350 MB/s | 低 |
| LZ4 默认 | ~2.0x | ~700 MB/s | >4 GB/s | 极低 |
| Snappy | ~1.9x | ~300 MB/s | ~1.5 GB/s | 低 |
| Zstd -1 | ~2.8x | ~500 MB/s | ~1.5 GB/s | 中 |
| Zstd -3 | ~3.2x | ~250 MB/s | ~1.2 GB/s | 中 |
| Zstd -9 | ~3.6x | ~60 MB/s | ~1.1 GB/s | 高 |
| bzip2 | ~3.7x | ~14 MB/s | ~40 MB/s | 中 |
不同数据类型的表现:
| 数据类型 | LZ4 压缩率 | Snappy 压缩率 | Zstd-3 压缩率 |
|---|---|---|---|
| 英文文本 | 2.2x | 2.0x | 3.5x |
| JSON 日志 | 2.5x | 2.3x | 4.0x |
| Protobuf | 1.8x | 1.7x | 3.2x |
| 二进制/随机 | ~1.0x | ~1.0x | ~1.05x |
| CSV 数值 | 1.6x | 1.5x | 2.8x |
各系统的选择
8.1 Kafka
Kafka 在 Producer 端支持 compression.type 配置,对整个消息批次压缩:
# Producer 配置
compression.type=lz4 # 选项:none/gzip/snappy/lz4/zstd
batch.size=65536 # 批次越大,压缩率越高
linger.ms=5 # 等待更多消息填充批次
选择建议:
- 高吞吐优先 → LZ4(解压不成为 Broker CPU 瓶颈)
- 网络带宽受限 → Zstd(Kafka 2.1+ 支持,压缩率最高)
- 兼容性优先 → Snappy(最早支持,分布式跨版本稳定)
8.2 RocksDB
RocksDB 对不同 Level 可配置不同压缩算法:
// 按 Level 配置(典型生产配置)
options.compression_per_level = {
CompressionType::kNoCompression, // Level 0: 频繁 compaction,不压缩
CompressionType::kNoCompression, // Level 1
CompressionType::kLZ4Compression, // Level 2+: 读多写少,LZ4 解压快
CompressionType::kLZ4Compression,
CompressionType::kZSTD, // Level 5+: 冷数据,高压缩率
CompressionType::kZSTD,
CompressionType::kZSTD,
};
options.bottommost_compression = CompressionType::kZSTD;
options.zstd_max_train_bytes = 100 * 1024; // Zstd 字典训练
默认值:Snappy(历史遗留,多数场景 LZ4 更优)。
8.3 ClickHouse
ClickHouse 默认使用 LZ4 压缩列数据,可按列配置:
CREATE TABLE events (
event_time DateTime CODEC(Delta, LZ4), -- Delta 预处理 + LZ4
user_id UInt64 CODEC(LZ4),
event_data String CODEC(ZSTD(3)), -- 字符串用 Zstd 压缩率更高
raw_bytes FixedString(64) CODEC(LZ4HC(9)) -- LZ4 高压缩变体
) ENGINE = MergeTree ORDER BY event_time;
LZ4 作为默认的原因:ClickHouse 查询时需要大量解压列数据,LZ4 解压速度使 CPU 不成为瓶颈。
8.4 Parquet
Parquet 在 RowGroup(行组)级别压缩,不同列可用不同编解码器:
import pyarrow.parquet as pq
import pyarrow as pa
# 写入时配置压缩
pq.write_table(
table,
"output.parquet",
compression="snappy", # 全局: snappy/gzip/brotli/zstd/lz4/none
use_dictionary=True,
column_encoding={
"timestamp": "DELTA_BINARY_PACKED", # 时间戳用 Delta
"user_id": "DELTA_BINARY_PACKED",
}
)
各格式生态的默认值:
- Spark/Hive 历史默认:Snappy
- 现代推荐:Zstd(压缩率更高,Spark 3.0+ 默认改为 Zstd)
- 读密集场景:LZ4(解压最快)
异常与边界场景
场景 1:不可压缩数据(随机字节)
情况:输入为 AES 加密后的随机数据或真随机字节流,LZ77 无法找到任何有效匹配。
处理方式:
LZ4 的处理策略:
// LZ4 block 头部包含 compressed_size 和 original_size
// 若压缩后 >= 原始大小,可选择存储原始数据
if compressed_size >= original_size:
output_block_header.block_type = UNCOMPRESSED
output_data = original_data // 原样存储,不压缩
量化影响:压缩 1 MB 随机数据:
- 输出约 1.000005 MB(略大,因为需要存储块头和长度描述)
- LZ4 frame 格式保证解压端能识别未压缩块,透明处理
实践建议:对加密数据不要开启压缩(先压缩后加密,或跳过压缩层)。
场景 2:并发压缩(多线程安全)
情况:多个线程同时压缩不同数据,是否存在共享状态竞争?
处理方式:LZ4、Snappy、Zstd 均设计为无全局共享状态:
// LZ4 多线程示例(伪代码)
thread_pool.submit_all([
() -> LZ4_compress(data1), // 线程 1:独立 hash_table(栈上分配)
() -> LZ4_compress(data2), // 线程 2:独立 hash_table
() -> LZ4_compress(data3), // 线程 3:独立 hash_table
])
// 三者完全独立,无锁,无竞争
// Zstd 流式上下文(复用 context 时需外部同步)
ctx1 = ZSTD_createCCtx() // 每个线程一个 context
ctx2 = ZSTD_createCCtx()
// ctx1 和 ctx2 可并发使用,不需要锁
注意:若复用同一个 ZSTD_CCtx(Zstd 压缩上下文)在多线程中调用,会有竞争。应为每个线程分配独立 context,或使用 Zstd 的 ZSTD_compressStream2 线程安全 API(内部管理线程池)。
场景 3:流式压缩(大文件分块处理)
情况:输入文件为 10 GB,无法一次性载入内存,需要流式处理。
处理方式:三种算法均提供 streaming API:
// LZ4 流式压缩伪代码
stream = LZ4_createStream()
ring_buffer = byte[RING_BUFFER_SIZE] // 循环缓冲区作为历史窗口
for chunk in read_chunks(input_file, CHUNK_SIZE=64*1024):
// 写入环形缓冲区(保留历史供跨块引用)
pos = write_to_ring(ring_buffer, chunk)
compressed = LZ4_compress_continue(stream, ring_buffer[pos], len(chunk))
write(output_file, compressed)
LZ4_freeStream(stream)
跨块引用:LZ4 流模式中,当前块可引用前块已压缩的内容(通过保留历史缓冲区),比独立块压缩率更高。
Zstd 流式压缩:
// Zstd 流式压缩(使用 frame 格式)
cctx = ZSTD_createCCtx()
ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, 3)
while chunk = read(input, CHUNK_SIZE):
in_buf = {src=chunk, size=len(chunk), pos=0}
out_buf = {dst=output_buf, size=OUT_SIZE, pos=0}
mode = END_OF_STREAM if is_last_chunk else CONTINUE
ZSTD_compressStream2(cctx, out_buf, in_buf, mode)
write(output_file, out_buf.dst, out_buf.pos)
场景 4:极短数据(< 16 字节)
情况:压缩单个 UUID 或短字符串(如 "hello")。
处理方式:
- LZ4:无法找到任何匹配,全部输出为字面量,压缩后略大(加了 frame header 和 block header)
- Zstd:frame header 开销固定约 12 字节,短数据压缩后可能比原始大 2-3 倍
- 最佳实践:设置压缩阈值,小于 64 字节的数据直接存储,不压缩
if len(data) < MIN_COMPRESS_THRESHOLD (64):
return data // 不压缩
else:
return compress(data)
场景 5:offset 引用超出解压缓冲区边界
情况:损坏的压缩数据中,offset 值大于当前已解压的字节数。
处理方式(安全实现):
LZ4_decompress_safe(input, output, input_size, max_output_size):
// 每次 match copy 前检查边界
match_start = op - offset
if match_start < output_start:
// offset 超出已解压区域,数据损坏
return LZ4_ERROR_MALFORMED_INPUT
if op + match_len > output_end:
// 输出缓冲区不够,防止越界写
return LZ4_ERROR_OUTPUT_OVERFLOW
// 安全时才 copy
copy(output, op, output, match_start, match_len)
LZ4 提供 LZ4_decompress_safe(安全)和 LZ4_decompress_fast(无边界检查,仅用于可信数据)两个版本。
参考资料
- LZ4 官方仓库:Yann Collet. lz4 - Extremely Fast Compression algorithm. GitHub
- Snappy 官方仓库:Google. Snappy - A fast compressor/decompressor. GitHub;Snappy Format Description. format_description.txt
- Zstd 官方仓库:Facebook. Zstandard - Fast real-time compression algorithm. GitHub
- Zstd 论文:Yann Collet. "Finite State Entropy." 2013. 博客
- LZ77 原论文:Ziv J., Lempel A. "A Universal Algorithm for Sequential Data Compression." IEEE Transactions on Information Theory, 1977.
- lzbench 基准:inikep. lzbench - in-memory benchmark of open-source LZ77/LZMA/BWTS/FSE compression algorithms. GitHub
- ClickHouse 压缩文档:ClickHouse Column Compression Codecs. 官方文档
- RocksDB 压缩文档:RocksDB Compression. Wiki
- Kafka 压缩文档:Apache Kafka Producer Compression. 官方文档
- 《数据密集型应用系统设计》(DDIA)第 3 章:列式存储与压缩。
评论 (0)
发表评论