专栏文章
专栏文章
存储算法系列
1. 存储算法 #01:LSM Tree 2. 存储算法 #02:Bloom Filter 3. 存储算法 #03:B+ Tree 4. 存储算法 #04:SSTable 5. 存储算法 #05:Copy-on-Write(COW) 6. 存储算法 #06:Roaring Bitmap 7. 存储算法 #07:WAL(Write-Ahead Log) 8. 存储算法 #08:Bitcask 9. 存储算法 #09:Redo 与 Undo Log 10. 存储算法 #10:Varint 与 ZigZag 编码 11. 存储算法 #11:Delta 编码 12. 存储算法 #12:LZ4 与 Snappy 与 Zstd

存储算法 #12:LZ4 与 Snappy 与 Zstd

发布于 2026-06-04 04:34 👁 6 次阅读

LZ4/Snappy/Zstd 均是 LZ77 变体,LZ4 以极速解压(>4 GB/s)著称,Zstd 提供高压缩率(3-4x)兼顾速度,Snappy 是 Google 内部的稳定平衡选项;Kafka、RocksDB、ClickHouse、Parquet 均内置支持。

相关文章Varint 与 ZigZag 编码 · Delta 编码

lz4 snappy zstd

目录

章节 说明
问题背景 为何需要快速通用压缩算法
LZ77 公共基础 滑动窗口原理与 encode/decode 伪代码
LZ4 核心设计 Hash table 快速匹配、token 格式、无熵编码
Snappy 核心设计 Google 内部场景优化、跨平台稳定性
Zstd 核心设计 FSE 熵编码、可训练字典、多压缩级别
执行追踪 LZ4 压缩 "abcabcabc" 完整演示
压缩率 vs 速度基准 典型数据集的量化对比
各系统的选择 Kafka/RocksDB/ClickHouse/Parquet 配置
异常与边界场景 不可压缩数据、并发压缩、流式压缩
参考资料 论文与官方文档

问题背景

存储系统和消息队列面临两个相互冲突的需求:

关键矛盾:高压缩率(bzip2 ~5x)与高速度(需要 GB/s 级)之间如何取舍?

三种算法给出了不同的答案:

算法 定位 核心取舍
LZ4 极速 压缩率换速度,解压 >4 GB/s
Snappy 平衡 Google 内部场景稳定可靠
Zstd 高压缩率 FSE 熵编码,速度仍远快于 gzip

LZ77 公共基础

LZ4、Snappy、Zstd 均是 LZ77 的变体,理解 LZ77 是理解三者的前提。

2.1 核心概念

LZ77 维护两个缓冲区:

HistoryBuffer (历史缓冲区 / 滑动窗口)
  [已编码数据,最近 W 字节]
                                  ┌──────────────────┐
  ... a b c a b c │ a b c x y z  │ 前向缓冲区
                  └──────────────┘
                  cursor (当前位置)

匹配:在历史窗口中找到与当前前向缓冲区最长一致的子串,记录:

若匹配长度 >= MIN_MATCH(通常 3),输出反向引用 token (offset, length),否则输出字面量字节。

2.2 核心数据结构

LZ77Token {
    type    : enum { LITERAL, MATCH }

    // LITERAL 分支
    byte    : uint8          // 字面量字节值

    // MATCH 分支
    offset  : uint16         // 反向偏移量(>= 1,指向历史窗口)
    length  : uint16         // 匹配长度(>= MIN_MATCH = 3)
}

LZ77Stream {
    tokens  : LZ77Token[]   // 字面量与匹配混合序列
    window  : byte[]        // 滑动窗口(历史缓冲区,大小 W)
}

2.3 LZ77 编码伪代码(简化)

LZ77_encode(input: byte[], W: int = 65535, MIN_MATCH: int = 3) -> LZ77Token[]:
    tokens = []
    cursor = 0                          // 当前处理位置
    n = input.length

    while cursor < n:
        // 在历史窗口 [cursor-W, cursor) 中寻找最长匹配
        best_offset = 0
        best_length = 0

        window_start = max(0, cursor - W)
        for pos = window_start to cursor - 1:
            // 从 pos 与 cursor 同时向前比较
            match_len = 0
            while cursor + match_len < n
                  and input[pos + match_len] == input[cursor + match_len]:
                match_len = match_len + 1

            if match_len > best_length:
                best_length = match_len
                best_offset = cursor - pos  // 反向距离

        if best_length >= MIN_MATCH:
            // 输出反向引用 token
            tokens.append(Token(MATCH, offset=best_offset, length=best_length))
            cursor = cursor + best_length   // 跳过已匹配的字节
        else:
            // 输出字面量
            tokens.append(Token(LITERAL, byte=input[cursor]))
            cursor = cursor + 1

    return tokens

2.4 LZ77 解码伪代码

LZ77_decode(tokens: LZ77Token[]) -> byte[]:
    output = []

    for token in tokens:
        if token.type == LITERAL:
            output.append(token.byte)
        else:  // MATCH
            start = output.length - token.offset
            for i = 0 to token.length - 1:
                output.append(output[start + i])
                // 注意:start+i 可能追上 output 末尾(overlap copy,合法)

    return output

LZ4 核心设计

3.1 设计目标

LZ4 由 Yann Collet 于 2011 年发布,核心目标:解压速度超过内存带宽,使解压不成为瓶颈。

3.2 Hash Table 快速匹配

LZ77 朴素实现的匹配是 O(W) 的扫描,LZ4 用 hash table 加速:

HashTable {
    table : uint32[HASH_SIZE]   // position -> last seen position of this hash
    // HASH_SIZE = 2^16 (64KB 模式) 或 2^12 (16KB 快速模式)
}

对当前位置 cursor 取 4 字节,计算 hash:

hash4(cursor: int, input: byte[]) -> uint16:
    val = read_uint32_le(input, cursor)
    return (val * 0x9E3779B9) >> (32 - LOG2_HASH_SIZE)
    // Fibonacci hashing,均匀散布 4 字节值

匹配流程:

LZ4_find_match(cursor, input, hash_table):
    h = hash4(cursor, input)
    candidate = hash_table.table[h]         // 候选位置
    hash_table.table[h] = cursor            // 更新为当前位置

    if candidate != INVALID
       and cursor - candidate < MAX_DISTANCE (65535)
       and read_uint32_le(input, candidate) == read_uint32_le(input, cursor):
        // 4 字节匹配成功,继续向后扩展
        length = 4
        while cursor + length < n
              and input[candidate + length] == input[cursor + length]:
            length = length + 1
        return (offset = cursor - candidate, length = length)
    else:
        return NO_MATCH

关键特性:hash 冲突时直接丢弃旧位置(最多损失压缩率,不影响正确性)。

3.3 Token 格式

LZ4 压缩块由连续的 sequence 组成,每个 sequence 的结构:

Sequence {
    token     : uint8           // 高 4 bit = literal_len, 低 4 bit = match_len_extra
    extra_lit : byte[]          // 若 literal_len == 15,追加额外字节直到遇到 < 255
    literals  : byte[]          // literal_len 个字面量字节
    offset    : uint16_le       // 匹配偏移量(小端 2 字节)
    extra_mat : byte[]          // 若 match_len_extra == 15,追加额外字节
}

长度编码规则(以字面量长度为例):

encode_length(len: int, base: int) -> (token_nibble: uint4, extra: byte[]):
    // base = 字面量时为 0,匹配时为 MIN_MATCH(4)
    adjusted = len - base
    if adjusted < 15:
        return (token_nibble=adjusted, extra=[])
    else:
        extra = []
        adjusted = adjusted - 15
        while adjusted >= 255:
            extra.append(255)
            adjusted = adjusted - 255
        extra.append(adjusted)
        return (token_nibble=15, extra=extra)

3.4 完整 LZ4 编码伪代码

LZ4_compress(input: byte[]) -> byte[]:
    n = input.length
    output = []
    hash_table = HashTable(size=HASH_SIZE, fill=INVALID)

    cursor = 0
    lit_start = 0               // 当前字面量段的起始位置

    while cursor < n - MFLIMIT: // MFLIMIT=12,末尾保留字面量
        (offset, match_len) = LZ4_find_match(cursor, input, hash_table)

        if offset == NO_MATCH:
            cursor = cursor + 1
            continue

        // 编码字面量段 + 匹配段
        lit_len   = cursor - lit_start
        match_adj = match_len - MIN_MATCH (4)

        // token 字节
        token_nibble_lit = min(lit_len, 15)
        token_nibble_mat = min(match_adj, 15)
        output.append((token_nibble_lit << 4) | token_nibble_mat)

        // 超长字面量长度
        output.append_extra_length(lit_len - token_nibble_lit)

        // 字面量数据
        output.append_bytes(input, lit_start, lit_len)

        // 偏移量(小端 2 字节)
        output.append_uint16_le(offset)

        // 超长匹配长度
        output.append_extra_length(match_adj - token_nibble_mat)

        cursor = cursor + match_len
        lit_start = cursor

    // 末尾剩余字面量(无 offset/match)
    lit_len = n - lit_start
    token = min(lit_len, 15) << 4
    output.append(token)
    output.append_extra_length(lit_len - min(lit_len,15))
    output.append_bytes(input, lit_start, lit_len)

    return output

3.5 LZ4 解码伪代码

LZ4_decompress(input: byte[], original_size: int) -> byte[]:
    output = byte[original_size]
    ip = 0      // 输入指针
    op = 0      // 输出指针

    while ip < input.length:
        token = input[ip]; ip = ip + 1

        // 读取字面量长度
        lit_len = (token >> 4) & 0x0F
        if lit_len == 15:
            while True:
                extra = input[ip]; ip = ip + 1
                lit_len = lit_len + extra
                if extra != 255: break

        // 复制字面量
        copy(output, op, input, ip, lit_len)
        op = op + lit_len; ip = ip + lit_len

        if ip >= input.length: break     // 末尾无 match

        // 读取偏移量
        offset = read_uint16_le(input, ip); ip = ip + 2

        // 读取匹配长度
        match_len = (token & 0x0F) + MIN_MATCH (4)
        if (token & 0x0F) == 15:
            while True:
                extra = input[ip]; ip = ip + 1
                match_len = match_len + extra
                if extra != 255: break

        // 复制匹配(允许 overlap)
        match_start = op - offset
        for i = 0 to match_len - 1:
            output[op + i] = output[match_start + i]
        op = op + match_len

    return output

Snappy 核心设计

4.1 设计背景

Snappy 由 Google(Sanjay Ghemawat 等)于 2011 年开源,原名 Zippy,内部使用超过 10 年。设计约束:

4.2 核心数据结构

SnappyStream {
    preamble : varint32      // 原始数据长度(解压器预分配缓冲区用)
    elements  : SnappyElement[]
}

SnappyElement = Literal | CopyShort | CopyLong

Literal {
    tag     : uint8           // 低 2 bit = 00,高 6 bit 编码长度
    data    : byte[]
}

CopyShort {                   // 1~64 字节匹配,offset < 2KB
    tag     : uint8           // 低 2 bit = 01
                              // bit[2..4] = length - 4,bit[5..7] = offset 高 3 bit
    offset_lo : uint8         // offset 低 8 bit
}

CopyLong {                    // 1~64 字节匹配,offset 任意
    tag     : uint8           // 低 2 bit = 10 (2 字节 offset) 或 11 (4 字节 offset)
    length  : uint8           // 匹配长度
    offset  : uint16/uint32
}

4.3 Snappy 编码伪代码

Snappy_compress(input: byte[]) -> byte[]:
    n = input.length
    output = []
    output.append_varint32(n)            // preamble

    hash_table = new uint16[1 << 14]     // 16K 表,每项存 last position
    fill(hash_table, INVALID)

    ip = 0
    ip_end = n - 5
    next_emit = 0                        // 下一个字面量段起点

    if n >= 15:
        ip = ip + 1
        ip_hash = hash4(input, ip)

        while True:
            // 快速跳跃寻找匹配
            skip = 32
            ip_next = ip

            while True:
                ip = ip_next
                ip_next = ip + (skip >> 5)
                skip = skip + 1
                if ip_next > ip_end: goto emit_remainder

                h = hash4(input, ip)
                candidate = hash_table[h]
                hash_table[h] = ip

                if read_uint32(input, candidate) == read_uint32(input, ip):
                    break

            // 找到匹配,扩展
            lit_len = ip - next_emit
            match_len = 4
            while ip + match_len < n
                  and input[candidate + match_len] == input[ip + match_len]:
                match_len = match_len + 1
            offset = ip - candidate

            emit_literal(output, input, next_emit, lit_len)
            emit_copy(output, offset, match_len)

            ip = ip + match_len
            next_emit = ip
            if ip >= ip_end: goto emit_remainder

            // 更新 hash(当前和前一位置)
            hash_table[hash4(input, ip - 1)] = ip - 1
            h = hash4(input, ip)
            candidate = hash_table[h]
            hash_table[h] = ip
            if read_uint32(input, candidate) != read_uint32(input, ip):
                ip = ip + 1

    emit_remainder:
        if next_emit < n:
            emit_literal(output, input, next_emit, n - next_emit)

    return output

4.4 与 LZ4 的关键差异

特性 LZ4 Snappy
Hash table 大小 4K~64K(可配置) 固定 16K
最大 offset 65535 32767
匹配长度编码 变长追加字节 固定 3 种 tag
压缩率 略高于 Snappy 略低,更稳定
解压速度 >4 GB/s ~1-2 GB/s
熵编码

Zstd 核心设计

5.1 设计目标

Zstd(Zstandard)由 Facebook 的 Yann Collet 于 2016 年发布。目标:

5.2 核心架构:LZ77 + FSE

Zstd 的压缩流水线:

输入数据
    │
    ▼
[LZ77 匹配引擎]  →  序列流 (literal_len, offset, match_len)
    │
    ▼
[Huffman 编码字面量]  →  压缩字面量块
    │
[FSE 编码序列参数]   →  压缩序列块(offset/match_len 用 FSE)
    │
    ▼
输出帧

FSE(有限状态熵编码) 是 ANS(Asymmetric Numeral Systems)的一种实现,比 Huffman 更接近熵下界,且支持批量编解码。

5.3 核心数据结构

ZstdFrame {
    magic      : uint32 = 0xFD2FB528
    fhd        : FrameHeaderDescriptor
    header     : FrameHeader
    blocks     : ZstdBlock[]
    checksum   : uint32 (optional, xxHash64)
}

ZstdBlock {
    block_header : uint24         // last_block(1) + block_type(2) + block_size(21)
    data         : byte[]
}

// 压缩块内部结构
CompressedBlock {
    literals_section : LiteralsSection
    sequences_section: SequencesSection
}

LiteralsSection {
    header        : LiteralsHeader  // 编码类型:Raw/RLE/Huffman/TreelessHuffman
    huffman_tree  : HuffmanTree (optional)
    data          : byte[]          // Huffman 压缩的字面量
}

SequencesSection {
    nb_sequences  : uint8/uint16    // 序列数量
    symbol_comp   : uint8           // LLtype(2) + Offtype(2) + MLtype(2)
    fse_tables    : FSETables       // LL/ML/OF 的 FSE 解码表
    bitstream     : byte[]          // 逆序 bitstream,包含所有序列参数
}

5.4 Zstd 序列编码伪代码(概念级)

Zstd_encode_sequences(sequences: Sequence[]) -> byte[]:
    // sequences 中每项为 (literal_len, offset_code, match_len)
    // offset_code 经过特殊变换:offset_code = log2(offset) + extra bits

    // 构建 FSE 表(基于序列中各参数的频率分布)
    ll_freqs = count_frequencies([s.literal_len for s in sequences])
    ml_freqs = count_frequencies([s.match_len   for s in sequences])
    of_freqs = count_frequencies([s.offset_code for s in sequences])

    ll_table = FSE_build_table(ll_freqs, LL_MAX_LOG)
    ml_table = FSE_build_table(ml_freqs, ML_MAX_LOG)
    of_table = FSE_build_table(of_freqs, OF_MAX_LOG)

    // 逆序 ANS 编码(从最后一个序列开始,倒序写入 bitstream)
    state_ll = FSE_init_state(ll_table)
    state_ml = FSE_init_state(ml_table)
    state_of = FSE_init_state(of_table)

    bitstream = BitWriter()
    for seq in reversed(sequences):
        FSE_encode_symbol(state_ll, seq.literal_len, ll_table, bitstream)
        FSE_encode_symbol(state_ml, seq.match_len,   ml_table, bitstream)
        FSE_encode_symbol(state_of, seq.offset_code, of_table, bitstream)

    bitstream.flush_states(state_ll, state_ml, state_of)
    return bitstream.to_bytes()

5.5 可训练字典

Zstd 支持从代表性样本集中训练字典(Dictionary),字典本质上是预加载的历史窗口

// 训练(离线)
dictionary = zstd_train_dict(samples: byte[][], dict_size: int = 112 * 1024)
    // 内部使用 Cover 算法选取高覆盖率的字节序列
    // 输出一个 dict_size 字节的字典文件

// 压缩(使用字典)
Zstd_compress_with_dict(input: byte[], dict: byte[], level: int) -> byte[]:
    ctx = Zstd_create_context()
    Zstd_load_dict(ctx, dict)
    // 压缩时历史窗口初始化为字典内容
    return Zstd_compress(ctx, input, level)

// 解压(使用字典)
Zstd_decompress_with_dict(compressed: byte[], dict: byte[]) -> byte[]:
    ctx = Zstd_create_context()
    Zstd_load_dict(ctx, dict)
    return Zstd_decompress(ctx, compressed)

字典效果:对 1KB 的 JSON 消息,无字典压缩率 ~1.2x,有字典压缩率可达 ~4x。

5.6 压缩级别参数

级别 压缩速度 解压速度 压缩率 对比
1 ~500 MB/s ~1.5 GB/s ~2.8x 接近 LZ4
3(默认) ~250 MB/s ~1.2 GB/s ~3.2x 替代 zlib 级别 6
9 ~60 MB/s ~1.1 GB/s ~3.6x
19 ~5 MB/s ~1.0 GB/s ~4.2x 接近 gzip -9
22 ~1 MB/s ~0.9 GB/s ~4.5x 接近 bzip2

执行追踪

以 LZ4 压缩字符串 "abcabcabc" 为例(ASCII 字节)。

6.1 输入

索引:    0  1  2  3  4  5  6  7  8
字节:    a  b  c  a  b  c  a  b  c
十六进制: 61 62 63 61 62 63 61 62 63

6.2 Hash Table 初始化

hash_table = [INVALID] × HASH_SIZE
cursor = 0, lit_start = 0

6.3 逐步追踪

cursor=0('a')

hash4(input, 0) → hash("abca") = H1
hash_table[H1] = INVALID → 无匹配
hash_table[H1] = 0
cursor = 1(无匹配,继续)

cursor=1('b')

hash4(input, 1) → hash("bcab") = H2
hash_table[H2] = INVALID → 无匹配
hash_table[H2] = 1
cursor = 2

cursor=2('c')

hash4(input, 2) → hash("cabc") = H3
hash_table[H3] = INVALID → 无匹配
hash_table[H3] = 2
cursor = 3

cursor=3('a')

hash4(input, 3) → hash("abca") = H1(与 cursor=0 相同)
candidate = hash_table[H1] = 0
hash_table[H1] = 3

验证:read_uint32(input, 0) = "abca" == read_uint32(input, 3) = "abca" ✓

扩展匹配长度:
  match_len = 4("abca" 已匹配)
  index 4 vs 1: 'b' == 'b' → match_len = 5
  index 5 vs 2: 'c' == 'c' → match_len = 6
  index 6 vs 3: 'a' == 'a' → match_len = 7
  index 7 vs 4: 'b' == 'b' → match_len = 8  ← 注意:candidate+8=8 超出原始串
  index 8 vs 5: 'c' == 'c' → match_len = 9  ← 整个剩余串都匹配(overlap copy)
  match_len 受限于 n - cursor = 9 - 3 = 6(末尾 MFLIMIT 限制)
  取 match_len = 6

offset = cursor - candidate = 3 - 0 = 3

生成 Sequence

lit_start = 0, cursor = 3
literal_len = 3 - 0 = 3   → 字面量: [a, b, c]
match_len   = 6
match_adj   = 6 - 4 = 2   → token 低 4 bit = 2

token 字节:
  高 4 bit = min(3, 15) = 3  (字面量长度)
  低 4 bit = min(2, 15) = 2  (匹配长度 - 4)
  token = 0x32

offset = 0x0003 (小端: 03 00)

输出字节序列

[0x32]           token
[0x61 0x62 0x63] 字面量 "abc"
[0x03 0x00]      offset = 3
(无 extra_mat,match_len_adj=2 < 15)

末尾无更多匹配(cursor=9 >= n-MFLIMIT),输出末尾字面量段(空)

完整压缩输出(7 字节 vs 原始 9 字节):

原始: 61 62 63 61 62 63 61 62 63  (9 字节)
压缩: 32 61 62 63 03 00 00        (7 字节,压缩率约 1.29x)
      ^token ^literals  ^offset ^末尾token

解压验证

读 token=0x32: lit_len=3, match_len_adj=2
读 3 字节字面量: output = [a, b, c]   op=3
读 offset=3: match_start = 3 - 3 = 0
match_len = 2 + 4 = 6
overlap copy: output[0..5] → output[3..8]
  i=0: output[3] = output[0] = 'a'
  i=1: output[4] = output[1] = 'b'
  i=2: output[5] = output[2] = 'c'
  i=3: output[6] = output[3] = 'a'
  i=4: output[7] = output[4] = 'b'
  i=5: output[8] = output[5] = 'c'
output = [a, b, c, a, b, c, a, b, c] ✓

压缩率 vs 速度基准

以 Silesia corpus(典型混合数据集,211 MB)和 Calgary corpus 的测试结果为参考:

算法 压缩率(混合文本) 压缩速度 解压速度 内存占用
gzip -6 ~2.8x ~30 MB/s ~350 MB/s
LZ4 默认 ~2.0x ~700 MB/s >4 GB/s 极低
Snappy ~1.9x ~300 MB/s ~1.5 GB/s
Zstd -1 ~2.8x ~500 MB/s ~1.5 GB/s
Zstd -3 ~3.2x ~250 MB/s ~1.2 GB/s
Zstd -9 ~3.6x ~60 MB/s ~1.1 GB/s
bzip2 ~3.7x ~14 MB/s ~40 MB/s

不同数据类型的表现

数据类型 LZ4 压缩率 Snappy 压缩率 Zstd-3 压缩率
英文文本 2.2x 2.0x 3.5x
JSON 日志 2.5x 2.3x 4.0x
Protobuf 1.8x 1.7x 3.2x
二进制/随机 ~1.0x ~1.0x ~1.05x
CSV 数值 1.6x 1.5x 2.8x

各系统的选择

8.1 Kafka

Kafka 在 Producer 端支持 compression.type 配置,对整个消息批次压缩:

# Producer 配置
compression.type=lz4     # 选项:none/gzip/snappy/lz4/zstd
batch.size=65536         # 批次越大,压缩率越高
linger.ms=5              # 等待更多消息填充批次

选择建议

8.2 RocksDB

RocksDB 对不同 Level 可配置不同压缩算法:

// 按 Level 配置(典型生产配置)
options.compression_per_level = {
    CompressionType::kNoCompression,    // Level 0: 频繁 compaction,不压缩
    CompressionType::kNoCompression,    // Level 1
    CompressionType::kLZ4Compression,  // Level 2+: 读多写少,LZ4 解压快
    CompressionType::kLZ4Compression,
    CompressionType::kZSTD,            // Level 5+: 冷数据,高压缩率
    CompressionType::kZSTD,
    CompressionType::kZSTD,
};
options.bottommost_compression = CompressionType::kZSTD;
options.zstd_max_train_bytes = 100 * 1024;  // Zstd 字典训练

默认值:Snappy(历史遗留,多数场景 LZ4 更优)。

8.3 ClickHouse

ClickHouse 默认使用 LZ4 压缩列数据,可按列配置:

CREATE TABLE events (
    event_time  DateTime    CODEC(Delta, LZ4),    -- Delta 预处理 + LZ4
    user_id     UInt64      CODEC(LZ4),
    event_data  String      CODEC(ZSTD(3)),       -- 字符串用 Zstd 压缩率更高
    raw_bytes   FixedString(64) CODEC(LZ4HC(9))  -- LZ4 高压缩变体
) ENGINE = MergeTree ORDER BY event_time;

LZ4 作为默认的原因:ClickHouse 查询时需要大量解压列数据,LZ4 解压速度使 CPU 不成为瓶颈。

8.4 Parquet

Parquet 在 RowGroup(行组)级别压缩,不同列可用不同编解码器:

import pyarrow.parquet as pq
import pyarrow as pa

# 写入时配置压缩
pq.write_table(
    table,
    "output.parquet",
    compression="snappy",         # 全局: snappy/gzip/brotli/zstd/lz4/none
    use_dictionary=True,
    column_encoding={
        "timestamp": "DELTA_BINARY_PACKED",  # 时间戳用 Delta
        "user_id":   "DELTA_BINARY_PACKED",
    }
)

各格式生态的默认值


异常与边界场景

场景 1:不可压缩数据(随机字节)

情况:输入为 AES 加密后的随机数据或真随机字节流,LZ77 无法找到任何有效匹配。

处理方式

LZ4 的处理策略:

// LZ4 block 头部包含 compressed_size 和 original_size
// 若压缩后 >= 原始大小,可选择存储原始数据
if compressed_size >= original_size:
    output_block_header.block_type = UNCOMPRESSED
    output_data = original_data    // 原样存储,不压缩

量化影响:压缩 1 MB 随机数据:

实践建议:对加密数据不要开启压缩(先压缩后加密,或跳过压缩层)。

场景 2:并发压缩(多线程安全)

情况:多个线程同时压缩不同数据,是否存在共享状态竞争?

处理方式:LZ4、Snappy、Zstd 均设计为无全局共享状态

// LZ4 多线程示例(伪代码)
thread_pool.submit_all([
    () -> LZ4_compress(data1),   // 线程 1:独立 hash_table(栈上分配)
    () -> LZ4_compress(data2),   // 线程 2:独立 hash_table
    () -> LZ4_compress(data3),   // 线程 3:独立 hash_table
])
// 三者完全独立,无锁,无竞争

// Zstd 流式上下文(复用 context 时需外部同步)
ctx1 = ZSTD_createCCtx()   // 每个线程一个 context
ctx2 = ZSTD_createCCtx()
// ctx1 和 ctx2 可并发使用,不需要锁

注意:若复用同一个 ZSTD_CCtx(Zstd 压缩上下文)在多线程中调用,会有竞争。应为每个线程分配独立 context,或使用 Zstd 的 ZSTD_compressStream2 线程安全 API(内部管理线程池)。

场景 3:流式压缩(大文件分块处理)

情况:输入文件为 10 GB,无法一次性载入内存,需要流式处理。

处理方式:三种算法均提供 streaming API:

// LZ4 流式压缩伪代码
stream = LZ4_createStream()
ring_buffer = byte[RING_BUFFER_SIZE]  // 循环缓冲区作为历史窗口

for chunk in read_chunks(input_file, CHUNK_SIZE=64*1024):
    // 写入环形缓冲区(保留历史供跨块引用)
    pos = write_to_ring(ring_buffer, chunk)

    compressed = LZ4_compress_continue(stream, ring_buffer[pos], len(chunk))
    write(output_file, compressed)

LZ4_freeStream(stream)

跨块引用:LZ4 流模式中,当前块可引用前块已压缩的内容(通过保留历史缓冲区),比独立块压缩率更高。

Zstd 流式压缩

// Zstd 流式压缩(使用 frame 格式)
cctx = ZSTD_createCCtx()
ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, 3)

while chunk = read(input, CHUNK_SIZE):
    in_buf  = {src=chunk, size=len(chunk), pos=0}
    out_buf = {dst=output_buf, size=OUT_SIZE, pos=0}

    mode = END_OF_STREAM if is_last_chunk else CONTINUE
    ZSTD_compressStream2(cctx, out_buf, in_buf, mode)
    write(output_file, out_buf.dst, out_buf.pos)

场景 4:极短数据(< 16 字节)

情况:压缩单个 UUID 或短字符串(如 "hello")。

处理方式

if len(data) < MIN_COMPRESS_THRESHOLD (64):
    return data                    // 不压缩
else:
    return compress(data)

场景 5:offset 引用超出解压缓冲区边界

情况:损坏的压缩数据中,offset 值大于当前已解压的字节数。

处理方式(安全实现)

LZ4_decompress_safe(input, output, input_size, max_output_size):
    // 每次 match copy 前检查边界
    match_start = op - offset

    if match_start < output_start:
        // offset 超出已解压区域,数据损坏
        return LZ4_ERROR_MALFORMED_INPUT

    if op + match_len > output_end:
        // 输出缓冲区不够,防止越界写
        return LZ4_ERROR_OUTPUT_OVERFLOW

    // 安全时才 copy
    copy(output, op, output, match_start, match_len)

LZ4 提供 LZ4_decompress_safe(安全)和 LZ4_decompress_fast(无边界检查,仅用于可信数据)两个版本。


参考资料

  1. LZ4 官方仓库:Yann Collet. lz4 - Extremely Fast Compression algorithm. GitHub
  2. Snappy 官方仓库:Google. Snappy - A fast compressor/decompressor. GitHub;Snappy Format Description. format_description.txt
  3. Zstd 官方仓库:Facebook. Zstandard - Fast real-time compression algorithm. GitHub
  4. Zstd 论文:Yann Collet. "Finite State Entropy." 2013. 博客
  5. LZ77 原论文:Ziv J., Lempel A. "A Universal Algorithm for Sequential Data Compression." IEEE Transactions on Information Theory, 1977.
  6. lzbench 基准:inikep. lzbench - in-memory benchmark of open-source LZ77/LZMA/BWTS/FSE compression algorithms. GitHub
  7. ClickHouse 压缩文档:ClickHouse Column Compression Codecs. 官方文档
  8. RocksDB 压缩文档:RocksDB Compression. Wiki
  9. Kafka 压缩文档:Apache Kafka Producer Compression. 官方文档
  10. 《数据密集型应用系统设计》(DDIA)第 3 章:列式存储与压缩。
← 返回列表

评论 (0)

暂无评论,来留下第一条吧。

发表评论