AI-Stream-Proxy 是一个用于验证 AI 流式响应断线恢复策略的 Rust 原型。项目使用模拟服务端生成带序号的流式事件,并在服务端与客户端之间加入代理层进行故障注入、事件缓存、重放和 ACK 处理。
真实 AI 流式接口通常通过 HTTP Streaming 或 SSE 返回增量内容。本文关注的问题是:客户端已经观察到较高序号事件时,是否可以直接用最大已见序号作为续传游标。项目通过 baseline、responses_like 和 optimized 三种模式,对比最大已见序号与连续提交序号在断线、乱序和重复事件场景下的恢复效果。
┌──────────────┐ TCP/JSON Lines ┌──────────────┐ TCP/JSON Lines ┌──────────────┐
│ AI Server │ ◄──────────────────► │ Proxy │ ◄──────────────────► │ Client │
│ (模拟) │ 127.0.0.1:9000 │ (故障注入) │ 127.0.0.1:8000 │ (CLI/Demo) │
└──────────────┘ └──────────────┘ └──────────────┘
Server 从文本文件分块模拟 AI 流式输出,Proxy 在下游路径注入延迟、抖动、缓冲、断线、空闲超时、重复、拆包/合包和乱序等网络故障,Client 消费流式内容并记录指标。
cargo run --bin demo
# 浏览器打开 http://127.0.0.1:7000 查看双栏对比演示手动运行各组件:
# 终端 1:启动模拟 AI 服务端
cargo run --bin server
# 终端 2:启动代理(故障注入)
cargo run --bin proxy -- --config config/normal.json
# 终端 3:启动客户端
cargo run --bin client -- --mode optimized| 模式 | 序号机制 | ACK | 续传依据 | 行为说明 |
|---|---|---|---|---|
| baseline | 无事件级恢复序号 | 无 | 无 | 连接中断后终止本次流式传输 |
| responses_like | sequence_number |
无 | last_seen_seq |
顺序断线可恢复;乱序缺口场景中可能跳过未提交事件 |
| optimized | seq + committed_seq |
有 | committed_seq |
从连续提交前缀之后保守重放,重复事件由客户端去重 |
三种模式共享 Start 和 Done 等会话消息。数据事件按模式区分:baseline 与 optimized 使用 Delta,responses_like 使用 ResponsesLikeDelta,并额外模拟 ResponseCreated。
config/ 目录下的 JSON 文件定义故障注入参数:
| 配置文件 | 关键参数 | 模拟场景 |
|---|---|---|
| normal.json | 全部归零 | 无故障基线 |
| buffering.json | buffer_ms=500, buffer_bytes=4096 | 延迟缓冲发送 |
| disconnect.json | delay_ms=50, jitter_ms=30, disconnect_after_seq=8 | 顺序传输中的连接断开 |
| idle_timeout.json | idle_timeout_ms=2000 | 服务端暂停触发代理空闲超时 |
| split_merge.json | duplicate_rate=0.1, split_packet=true, merge_window_ms=50 | 拆包、合包和重复事件 |
| out_of_order_disconnect.json | reorder_window_size=2, disconnect_after_seq=4 | 乱序到达后断线 |
-
连续提交 ACK:optimized 模式下,客户端在处理数据事件后发送
Ack,其中ack_seq表示刚到达的事件序号,committed_seq表示已经连续提交到本地状态的最高序号。ACK 经 Proxy 转发后产生AckReceipt,客户端据此统计控制路径上的 AckReceipt 等待。该指标不等同于底层物理网络 RTT。 -
ReplayStore:Proxy 在内存中缓存可重放的数据事件和完成事件。客户端重连时发送
Resume(after_seq, committed_seq)。responses_like 使用last_seen_seq作为after_seq;optimized 使用committed_seq,代理端再结合已收到 ACK 中的提交进度选择保守重放起点,避免跳过尚未连续提交的缺口。 -
确定性故障注入:故障注入使用
SHA-256(seed + seq + purpose)派生确定性随机值。相同配置下,抖动位置和重复事件决策保持一致,便于复现实验。 -
JSON Lines 编解码:消息以 JSON Lines 传输,
LineCodec按换行符切分 TCP 字节流,可处理单次读取中的多条消息、跨读取边界的拆分消息和尾随空白。
默认运行:
cargo run --bin benchmark会执行 6 个场景 × 3 种模式,每个组合运行 1 次,共生成 18 条记录。论文中的实验结果使用每个组合 10 次重复运行:
cargo run --bin benchmark -- --iterations 10输出文件为 results/benchmark.csv。当前仓库中的 CSV 包含 180 条记录,与论文表格对应。
测试结果:
| 场景 | baseline | responses_like | optimized |
|---|---|---|---|
| normal | final_match=true | final_match=true | final_match=true |
| buffering | final_match=true | final_match=true | final_match=true |
| disconnect | final_match=false | final_match=true, 1 reconnect | final_match=true, 1 reconnect |
| idle_timeout | final_match=false | final_match=true, 1 reconnect | final_match=true, 1 reconnect |
| split_merge | final_match=true, 16 dup | final_match=true, 16 dup | final_match=true, 16 dup |
| out_of_order_disconnect | final_match=false, lost=153 | final_match=false, lost=1 | final_match=true, 1 reconnect |
- baseline 在断线、空闲超时和乱序后断线场景中无法保持最终内容一致。
- 顺序断线场景中,responses_like 和 optimized 均能完成续传;optimized 额外产生 ACK 控制消息。
- 乱序后断线场景中,responses_like 以
last_seen_seq续传会跳过缺口事件;optimized 以committed_seq为恢复点,能够通过保守重放补齐缺口。 - split_merge 场景中三种模式最终内容均一致,但都记录到重复事件,说明去重逻辑参与了最终一致性维护。
avg_ack_receipt_wait_ms来自 AckReceipt 样本,只表示控制路径等待时间,不表示真实公网 RTT。
所有二进制文件使用 clap 参数解析,通过 --help 查看完整选项。
--listen <ADDR> 监听地址,默认 127.0.0.1:9000
--script <FILE> 流式输出文本,默认 data/long_answer.txt
--chunk-size <N> 每次发送字符数,默认 12
--interval-ms <N> 块间隔毫秒数,默认 80
--pause-after-seq <N> 指定序号后暂停,用于空闲超时测试
--pause-ms <N> 暂停时长毫秒
论文实验中 benchmark 会显式使用 --interval-ms 35,手动运行 server 时默认值为 80 ms。
--listen <ADDR> 客户端连接地址,默认 127.0.0.1:8000
--upstream <ADDR> 上游服务端地址,默认 127.0.0.1:9000
--config <FILE> 故障场景配置文件
--proxy <ADDR> 代理地址,默认 127.0.0.1:8000
--mode <MODE> 模式:baseline / responses_like / optimized
--task <TASK> 任务类型:chat / image
--prompt <TEXT> 提示词
--scenario <NAME> 场景名称,用于指标记录,默认 "manual"
--max-reconnects <N> 最大重连次数,默认 3
--timeout-ms <N> 超时毫秒数,默认 20000
--expected-last-seq <N> 期望最后序号,用于未完成传输的丢失统计
--stream-id <UUID> 指定流 ID,用于重连测试
--listen <ADDR> HTTP 监听地址,默认 127.0.0.1:7000
--out <FILE> 输出 CSV 路径,默认 results/benchmark.csv
--timeout-secs <N> 每用例超时秒数,默认 30
--iterations <N> 每个场景×模式组合的运行次数,默认 1
cargo build --release # 发布构建
cargo test # 运行单元测试
cargo run --bin benchmark # 运行完整基准测试