Skip to content

CQUT-OpenProject/JSJWL_AI-Stream-Proxy

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

1 Commit
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

AI Stream Proxy

AI-Stream-Proxy 是一个用于验证 AI 流式响应断线恢复策略的 Rust 原型。项目使用模拟服务端生成带序号的流式事件,并在服务端与客户端之间加入代理层进行故障注入、事件缓存、重放和 ACK 处理。

真实 AI 流式接口通常通过 HTTP Streaming 或 SSE 返回增量内容。本文关注的问题是:客户端已经观察到较高序号事件时,是否可以直接用最大已见序号作为续传游标。项目通过 baseline、responses_like 和 optimized 三种模式,对比最大已见序号与连续提交序号在断线、乱序和重复事件场景下的恢复效果。

架构

 ┌──────────────┐    TCP/JSON Lines    ┌──────────────┐    TCP/JSON Lines    ┌──────────────┐
 │  AI Server   │ ◄──────────────────► │    Proxy     │ ◄──────────────────► │   Client     │
 │    (模拟)     │     127.0.0.1:9000   │  (故障注入)   │     127.0.0.1:8000   │  (CLI/Demo)  │
 └──────────────┘                      └──────────────┘                      └──────────────┘

Server 从文本文件分块模拟 AI 流式输出,Proxy 在下游路径注入延迟、抖动、缓冲、断线、空闲超时、重复、拆包/合包和乱序等网络故障,Client 消费流式内容并记录指标。

Quick Start

cargo run --bin demo
# 浏览器打开 http://127.0.0.1:7000 查看双栏对比演示

手动运行各组件:

# 终端 1:启动模拟 AI 服务端
cargo run --bin server

# 终端 2:启动代理(故障注入)
cargo run --bin proxy -- --config config/normal.json

# 终端 3:启动客户端
cargo run --bin client -- --mode optimized

模式

模式 序号机制 ACK 续传依据 行为说明
baseline 无事件级恢复序号 连接中断后终止本次流式传输
responses_like sequence_number last_seen_seq 顺序断线可恢复;乱序缺口场景中可能跳过未提交事件
optimized seq + committed_seq committed_seq 从连续提交前缀之后保守重放,重复事件由客户端去重

三种模式共享 StartDone 等会话消息。数据事件按模式区分:baseline 与 optimized 使用 Delta,responses_like 使用 ResponsesLikeDelta,并额外模拟 ResponseCreated

故障场景

config/ 目录下的 JSON 文件定义故障注入参数:

配置文件 关键参数 模拟场景
normal.json 全部归零 无故障基线
buffering.json buffer_ms=500, buffer_bytes=4096 延迟缓冲发送
disconnect.json delay_ms=50, jitter_ms=30, disconnect_after_seq=8 顺序传输中的连接断开
idle_timeout.json idle_timeout_ms=2000 服务端暂停触发代理空闲超时
split_merge.json duplicate_rate=0.1, split_packet=true, merge_window_ms=50 拆包、合包和重复事件
out_of_order_disconnect.json reorder_window_size=2, disconnect_after_seq=4 乱序到达后断线

关键机制

  • 连续提交 ACK:optimized 模式下,客户端在处理数据事件后发送 Ack,其中 ack_seq 表示刚到达的事件序号,committed_seq 表示已经连续提交到本地状态的最高序号。ACK 经 Proxy 转发后产生 AckReceipt,客户端据此统计控制路径上的 AckReceipt 等待。该指标不等同于底层物理网络 RTT。

  • ReplayStore:Proxy 在内存中缓存可重放的数据事件和完成事件。客户端重连时发送 Resume(after_seq, committed_seq)。responses_like 使用 last_seen_seq 作为 after_seq;optimized 使用 committed_seq,代理端再结合已收到 ACK 中的提交进度选择保守重放起点,避免跳过尚未连续提交的缺口。

  • 确定性故障注入:故障注入使用 SHA-256(seed + seq + purpose) 派生确定性随机值。相同配置下,抖动位置和重复事件决策保持一致,便于复现实验。

  • JSON Lines 编解码:消息以 JSON Lines 传输,LineCodec 按换行符切分 TCP 字节流,可处理单次读取中的多条消息、跨读取边界的拆分消息和尾随空白。

基准测试结果

默认运行:

cargo run --bin benchmark

会执行 6 个场景 × 3 种模式,每个组合运行 1 次,共生成 18 条记录。论文中的实验结果使用每个组合 10 次重复运行:

cargo run --bin benchmark -- --iterations 10

输出文件为 results/benchmark.csv。当前仓库中的 CSV 包含 180 条记录,与论文表格对应。

测试结果:

场景 baseline responses_like optimized
normal final_match=true final_match=true final_match=true
buffering final_match=true final_match=true final_match=true
disconnect final_match=false final_match=true, 1 reconnect final_match=true, 1 reconnect
idle_timeout final_match=false final_match=true, 1 reconnect final_match=true, 1 reconnect
split_merge final_match=true, 16 dup final_match=true, 16 dup final_match=true, 16 dup
out_of_order_disconnect final_match=false, lost=153 final_match=false, lost=1 final_match=true, 1 reconnect
  • baseline 在断线、空闲超时和乱序后断线场景中无法保持最终内容一致。
  • 顺序断线场景中,responses_like 和 optimized 均能完成续传;optimized 额外产生 ACK 控制消息。
  • 乱序后断线场景中,responses_like 以 last_seen_seq 续传会跳过缺口事件;optimized 以 committed_seq 为恢复点,能够通过保守重放补齐缺口。
  • split_merge 场景中三种模式最终内容均一致,但都记录到重复事件,说明去重逻辑参与了最终一致性维护。
  • avg_ack_receipt_wait_ms 来自 AckReceipt 样本,只表示控制路径等待时间,不表示真实公网 RTT。

CLI 参考

所有二进制文件使用 clap 参数解析,通过 --help 查看完整选项。

server(模拟 AI 服务端)

--listen <ADDR>          监听地址,默认 127.0.0.1:9000
--script <FILE>          流式输出文本,默认 data/long_answer.txt
--chunk-size <N>         每次发送字符数,默认 12
--interval-ms <N>        块间隔毫秒数,默认 80
--pause-after-seq <N>    指定序号后暂停,用于空闲超时测试
--pause-ms <N>           暂停时长毫秒

论文实验中 benchmark 会显式使用 --interval-ms 35,手动运行 server 时默认值为 80 ms。

proxy(网络故障注入代理)

--listen <ADDR>          客户端连接地址,默认 127.0.0.1:8000
--upstream <ADDR>        上游服务端地址,默认 127.0.0.1:9000
--config <FILE>          故障场景配置文件

client(流式 CLI 客户端)

--proxy <ADDR>           代理地址,默认 127.0.0.1:8000
--mode <MODE>            模式:baseline / responses_like / optimized
--task <TASK>            任务类型:chat / image
--prompt <TEXT>          提示词
--scenario <NAME>        场景名称,用于指标记录,默认 "manual"
--max-reconnects <N>     最大重连次数,默认 3
--timeout-ms <N>         超时毫秒数,默认 20000
--expected-last-seq <N>  期望最后序号,用于未完成传输的丢失统计
--stream-id <UUID>       指定流 ID,用于重连测试

demo(浏览器演示 HTTP 服务)

--listen <ADDR>          HTTP 监听地址,默认 127.0.0.1:7000

benchmark(基准测试运行器)

--out <FILE>             输出 CSV 路径,默认 results/benchmark.csv
--timeout-secs <N>       每用例超时秒数,默认 30
--iterations <N>         每个场景×模式组合的运行次数,默认 1

构建与测试

cargo build --release     # 发布构建
cargo test                # 运行单元测试
cargo run --bin benchmark # 运行完整基准测试

About

《计算机网络》课程设计:面向 AI 流式响应提交恢复的ACK续传机制设计与初步评估

Resources

License

Stars

Watchers

Forks

Contributors