Streaming with Backpressure (repe::stream)
When you need to push multi-GB blobs (capture files, log streams, paginated query results) from a server to a peer over notify messages, the bare notify primitive does not give you flow control or reconnect-resume. The repe::stream module adds three pieces:
- ACK-driven window credit. The producer waits before sending if the receiver is more than window_bytes behind on ACKs; an ACK releases the window and unparks the producer. Defaults to 64 MiB.
- Idle watchdog. A transfer that has gone silent for longer than idle_timeout (default 60 s) is force-cancelled.
- Replay ring + reconnect. Each emitted body lands in a byte-bounded ring (default 64 MiB). On a brief disconnect the producer parks; an inbound resume handler validates a last_received_offset against the ring and swaps in the new peer, after which the producer replays the ring tail and continues.
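The window-credit rule in the first bullet can be illustrated with a mutex and a condition variable from the standard library. This is a minimal sketch, not the repe::stream implementation: the CreditWindow type and its fields are hypothetical, and it assumes the producer blocks while in-flight bytes plus the next chunk would exceed the window, which is one plausible reading of the rule.

```rust
use std::sync::{Condvar, Mutex};
use std::time::Instant;

/// Hypothetical credit window for illustration; NOT the repe::stream type.
pub struct CreditWindow {
    window_bytes: u64,
    state: Mutex<Offsets>,
    acked_changed: Condvar,
}

struct Offsets {
    sent: u64,  // highest offset handed to the transport
    acked: u64, // highest offset the receiver has acknowledged
}

impl CreditWindow {
    pub fn new(window_bytes: u64) -> Self {
        Self {
            window_bytes,
            state: Mutex::new(Offsets { sent: 0, acked: 0 }),
            acked_changed: Condvar::new(),
        }
    }

    /// Park until `len` more bytes fit inside the window or `deadline` passes.
    /// Returns false on timeout. Assumption: a chunk larger than the whole
    /// window never gets credit in this sketch.
    pub fn wait_for_credit(&self, len: u64, deadline: Instant) -> bool {
        let mut st = self.state.lock().unwrap();
        loop {
            let in_flight = st.sent - st.acked;
            if in_flight + len <= self.window_bytes {
                return true;
            }
            let now = Instant::now();
            if now >= deadline {
                return false;
            }
            // Sleep until an ACK arrives or the remaining time runs out.
            let (guard, _) = self
                .acked_changed
                .wait_timeout(st, deadline - now)
                .unwrap();
            st = guard;
        }
    }

    /// Record that bytes up to `new_sent` have been sent.
    pub fn record_sent(&self, new_sent: u64) {
        let mut st = self.state.lock().unwrap();
        st.sent = st.sent.max(new_sent);
    }

    /// Record an ACK and unpark any waiting producer.
    pub fn record_ack(&self, offset: u64) {
        let mut st = self.state.lock().unwrap();
        if offset > st.acked {
            // Never let the ACK offset run ahead of what was sent.
            st.acked = offset.min(st.sent);
            self.acked_changed.notify_all();
        }
    }
}
```

With a 10-byte window, an 8-byte chunk is admitted immediately, a second 8-byte chunk times out until the first is ACKed, and record_ack(8) then releases it.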
The wire shape (transfer_begin, chunk body fields, ACK / cancel / resume bodies) is up to the embedder. This module deals only in offsets, ACKs, and opaque body bytes. See TransferControl, TransferRegistry<K>, and spawn_watchdog for the full surface.
Before reaching for this module: if your payload is a serialized + compressed in-memory value of unknown compressed length (BEVE + zstd), the Streaming Serialized Values proposal is a lightweight client-pull transfer that bounds memory on both ends (file or in-memory value output) using request/response as the flow control, with no credit window or replay ring. repe::stream earns its full machinery specifically when you need pipelined push throughput with backpressure or resume across a dropped connection, which client-pull defers.
Sketch
use std::sync::Arc;
use std::time::{Duration, Instant};
use repe::{NotifyBody, PeerSendError, ReconnectOutcome,
           TransferControl, TransferRegistry, spawn_watchdog};
#[derive(Hash, Eq, PartialEq, Copy, Clone)]
struct TransferId(u64);
let registry: Arc<TransferRegistry<TransferId>> = Arc::new(TransferRegistry::new());
spawn_watchdog(Arc::clone(&registry), Duration::from_secs(60));
let peer = make_peer(); // your embedder-side PeerHandle factory
let control = TransferControl::new(64 * 1024 * 1024);
control.set_peer(peer);
registry.register(TransferId(1), Arc::clone(&control));
let chunk_offset: u64 = 0;
let chunk_len: u64 = 4 * 1024 * 1024;
let body: Vec<u8> = body_for_chunk(chunk_offset);
control.wait_for_credit(chunk_len, Instant::now() + Duration::from_secs(30))?;
// push_replay records the chunk in the replay ring *before* the send, so a
// Disconnected error can't leave the ring missing it. `chunk_len` is the
// logical (ACK-domain) length; `body` is the wire payload, which may be longer.
control.push_replay(chunk_offset, chunk_len, false, body.clone());
let p = control.peer().expect("peer installed");
match p.send_notify("/file_chunk", NotifyBody::Beve(body)) {
    Ok(()) => control.record_sent(chunk_offset + chunk_len),
    Err(PeerSendError::Disconnected) => match control.wait_for_reconnect(Duration::from_secs(30)) {
        ReconnectOutcome::ResumeReady(resume) => {
            // request_resume already swapped the new peer into the slot and
            // consumed the staged request. Re-emit the ring tail from the
            // receiver's last-received offset, then continue producing.
            let p = control.peer().expect("peer installed by request_resume");
            for chunk in control.replay_chunks_from(resume.resume_at_offset) {
                let _ = p.send_notify("/file_chunk", NotifyBody::Beve(chunk.body_bytes.to_vec()));
            }
        }
        ReconnectOutcome::Cancelled(_) | ReconnectOutcome::Timeout => { /* abort */ }
    },
    Err(_) => { /* application policy */ }
}
// Inbound handlers (run in your existing dispatch path):
// ack: registry.get(id).map(|c| c.record_ack(file_index, offset));
// cancel: registry.get(id).map(|c| c.cancel("client cancelled"));
// resume: registry.get(id).and_then(|c|
// c.request_resume(new_peer, file_index, offset).ok());
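The replay step above depends on a byte-bounded ring that can say whether a resume offset is still servable. The following is a rough, standalone sketch of that idea using only the standard library. ReplayRing, RingChunk, and their methods are hypothetical names for illustration and may differ from how repe::stream stores or validates chunks.

```rust
use std::collections::VecDeque;

/// One retained chunk: logical start offset and length, plus the opaque body.
pub struct RingChunk {
    pub offset: u64,
    pub len: u64,
    pub body: Vec<u8>,
}

/// Hypothetical byte-bounded replay ring; NOT the repe::stream implementation.
pub struct ReplayRing {
    cap_bytes: usize,
    used_bytes: usize,
    chunks: VecDeque<RingChunk>,
}

impl ReplayRing {
    pub fn new(cap_bytes: usize) -> Self {
        Self { cap_bytes, used_bytes: 0, chunks: VecDeque::new() }
    }

    /// Append a chunk, then evict the oldest chunks until the byte budget holds.
    pub fn push(&mut self, offset: u64, len: u64, body: Vec<u8>) {
        self.used_bytes += body.len();
        self.chunks.push_back(RingChunk { offset, len, body });
        // Assumption: the newest chunk is always kept, even if it alone
        // exceeds the budget.
        while self.used_bytes > self.cap_bytes && self.chunks.len() > 1 {
            if let Some(old) = self.chunks.pop_front() {
                self.used_bytes -= old.body.len();
            }
        }
    }

    /// A resume offset is servable if it falls between the start of the
    /// oldest retained chunk and the end of the newest one.
    pub fn can_resume_from(&self, last_received_offset: u64) -> bool {
        match (self.chunks.front(), self.chunks.back()) {
            (Some(first), Some(last)) => {
                last_received_offset >= first.offset
                    && last_received_offset <= last.offset + last.len
            }
            // An empty ring has nothing to validate against.
            _ => false,
        }
    }

    /// Chunks that end past `last_received_offset`, oldest first. A chunk that
    /// straddles the offset is returned whole, so the receiver would need to
    /// tolerate the overlap.
    pub fn chunks_from(&self, last_received_offset: u64) -> impl Iterator<Item = &RingChunk> {
        self.chunks
            .iter()
            .filter(move |c| c.offset + c.len > last_received_offset)
    }
}
```

If the receiver's offset has already been evicted from the ring, can_resume_from returns false and the transfer cannot be resumed from the ring alone; what happens next is a policy decision for the embedder.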
Defaults
DEFAULT_WINDOW_BYTES, DEFAULT_BACKPRESSURE_TIMEOUT, DEFAULT_IDLE_TIMEOUT, DEFAULT_REPLAY_RING_BYTES, and DEFAULT_RECONNECT_TIMEOUT are tuned for LAN-class links pushing multi-GB files. Lower the window on slow links; raise the reconnect timeout for clients with long roaming windows.
Watchdog Lifecycle
spawn_watchdog holds a Weak<TransferRegistry<K>>, not an Arc. Dropping the embedder's last strong reference terminates the watchdog thread on its next tick (clamped to [1 s, 5 s]). For a process-wide singleton this means the thread lives for the process; embedders that build short-lived registries (per-test, per-tenant) get clean teardown for free.
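The Weak-based teardown can be sketched in a few lines of standard-library Rust. This is an illustration of the pattern only: Registry, sweep, and spawn_weak_watchdog are hypothetical stand-ins, and the tick is passed in directly rather than clamped as described above.

```rust
use std::sync::{Arc, Weak};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical stand-in for a registry; only its lifetime matters here.
pub struct Registry;

impl Registry {
    fn sweep(&self) {
        // A real watchdog would compare each transfer's last-activity
        // timestamp against the idle timeout here.
    }
}

/// Spawn a thread that holds only a Weak reference to the registry.
pub fn spawn_weak_watchdog(registry: &Arc<Registry>, tick: Duration) -> JoinHandle<()> {
    let weak: Weak<Registry> = Arc::downgrade(registry);
    thread::spawn(move || loop {
        thread::sleep(tick);
        match weak.upgrade() {
            // The registry is still alive: do one sweep, then release the
            // temporary strong reference at the end of this arm.
            Some(reg) => reg.sweep(),
            // The last strong reference is gone: exit the thread.
            None => break,
        }
    })
}
```

Because the thread never holds a strong reference between ticks, dropping the owner's Arc is enough to stop it; no shutdown flag or channel is needed.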