
Streaming with Backpressure (repe::stream)

When you need to push multi-GB blobs (capture files, log streams, paginated query results) from a server to a peer over notify messages, the bare notify primitive does not give you flow control or reconnect-resume. The repe::stream module adds three pieces:

  • ACK-driven window credit. Before each send the producer waits in wait_for_credit while the receiver is more than window_bytes behind on ACKs; each incoming ACK returns credit to the window and unparks a waiting producer. window_bytes defaults to 64 MiB.
  • Idle watchdog. A transfer that has gone silent for longer than idle_timeout (default 60 s) is force-cancelled.
  • Replay ring + reconnect. Each emitted body lands in a byte-bounded ring (default 64 MiB). On a brief disconnect the producer parks; an inbound resume handler validates a last_received_offset against the ring and swaps in the new peer, after which the producer replays the ring tail and continues.
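The credit rule in the first bullet can be pictured as a mutex-plus-condvar gate. The following is a standalone, std-only model, not repe's implementation. CreditWindow, CreditTimeout, and the single-offset record_ack are hypothetical stand-ins. The model assumes a single file, so the file_index that repe's record_ack also takes is left out. It also assumes the gate opens when the in-flight bytes (sent minus acknowledged) plus the next chunk fit within window_bytes.

```rust
use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant};

/// Hypothetical model of an ACK-driven credit window (not repe's type).
pub struct CreditWindow {
    window_bytes: u64,
    /// (end offset of everything sent, end offset of everything acknowledged)
    state: Mutex<(u64, u64)>,
    acked: Condvar,
}

#[derive(Debug, PartialEq)]
pub struct CreditTimeout;

impl CreditWindow {
    pub fn new(window_bytes: u64) -> Self {
        CreditWindow { window_bytes, state: Mutex::new((0, 0)), acked: Condvar::new() }
    }

    /// Block until `len` more bytes fit in the window, or fail at `deadline`.
    /// A chunk is always admitted when nothing is in flight, so a chunk larger
    /// than the window cannot deadlock the producer.
    pub fn wait_for_credit(&self, len: u64, deadline: Instant) -> Result<(), CreditTimeout> {
        let mut st = self.state.lock().unwrap();
        loop {
            let in_flight = st.0 - st.1;
            if in_flight == 0 || in_flight + len <= self.window_bytes {
                return Ok(());
            }
            let now = Instant::now();
            if now >= deadline {
                return Err(CreditTimeout);
            }
            // Park until an ACK arrives or the deadline passes, then re-check.
            st = self.acked.wait_timeout(st, deadline - now).unwrap().0;
        }
    }

    /// Producer side: everything below `end_offset` has been sent.
    pub fn record_sent(&self, end_offset: u64) {
        let mut st = self.state.lock().unwrap();
        st.0 = st.0.max(end_offset);
    }

    /// Receiver side: everything below `offset` arrived. Wakes parked producers.
    pub fn record_ack(&self, offset: u64) {
        let mut st = self.state.lock().unwrap();
        st.1 = st.1.max(offset).min(st.0); // never ACK past what was sent
        self.acked.notify_all();
    }
}

fn main() {
    use std::sync::Arc;
    use std::thread;

    let w = Arc::new(CreditWindow::new(8));
    w.wait_for_credit(8, Instant::now() + Duration::from_secs(1)).unwrap();
    w.record_sent(8);
    // Window full: another 4-byte chunk times out while no ACK arrives.
    assert_eq!(w.wait_for_credit(4, Instant::now() + Duration::from_millis(20)), Err(CreditTimeout));
    // An ACK from another thread returns 4 bytes of credit and unparks the producer.
    let w2 = Arc::clone(&w);
    let acker = thread::spawn(move || {
        thread::sleep(Duration::from_millis(20));
        w2.record_ack(4);
    });
    assert!(w.wait_for_credit(4, Instant::now() + Duration::from_secs(1)).is_ok());
    acker.join().unwrap();
}
```

Admitting a chunk whenever nothing is in flight is a design choice of this sketch. Without it, a chunk larger than window_bytes could never be sent.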

The wire shape (transfer_begin, chunk body fields, ACK / cancel / resume bodies) is up to the embedder. This module deals only in offsets, ACKs, and opaque body bytes. See TransferControl, TransferRegistry<K>, and spawn_watchdog for the full surface.
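Because the module deals only in offsets and opaque bytes, a replay ring reduces to a byte-bounded queue of chunks keyed by offset. Below is a minimal std-only sketch with these assumptions:

- Eviction is oldest-first.
- The byte bound applies to the wire body.
- A resume is valid only when last_received_offset is a chunk start still held in the ring, or the end of the stream.

ReplayRing, Chunk, and ResumeError are hypothetical names, not repe's API.

```rust
use std::collections::VecDeque;

/// One emitted chunk: logical offset/length (ACK domain) plus the wire body.
#[derive(Clone, Debug)]
pub struct Chunk {
    pub offset: u64,
    pub len: u64,
    pub body: Vec<u8>,
}

#[derive(Debug, PartialEq)]
pub enum ResumeError {
    /// The receiver is further behind than the ring reaches back.
    Evicted,
    /// The offset is not a chunk boundary held in the ring.
    NotABoundary,
}

/// Hypothetical byte-bounded replay ring (not repe's type).
pub struct ReplayRing {
    cap_bytes: usize,
    held_bytes: usize,
    chunks: VecDeque<Chunk>,
}

impl ReplayRing {
    pub fn new(cap_bytes: usize) -> Self {
        ReplayRing { cap_bytes, held_bytes: 0, chunks: VecDeque::new() }
    }

    /// Record a chunk, evicting the oldest ones until the body bytes fit.
    /// The newest chunk is always kept, even if it alone exceeds the cap.
    pub fn push(&mut self, chunk: Chunk) {
        self.held_bytes += chunk.body.len();
        self.chunks.push_back(chunk);
        while self.held_bytes > self.cap_bytes && self.chunks.len() > 1 {
            let old = self.chunks.pop_front().unwrap();
            self.held_bytes -= old.body.len();
        }
    }

    /// Chunks to re-send after the receiver reports `last_received_offset`.
    pub fn replay_from(&self, last_received_offset: u64) -> Result<Vec<&Chunk>, ResumeError> {
        let end = self.chunks.back().map_or(0, |c| c.offset + c.len);
        if last_received_offset == end {
            return Ok(Vec::new()); // receiver is fully caught up
        }
        let start = self.chunks.front().map_or(end, |c| c.offset);
        if last_received_offset < start {
            return Err(ResumeError::Evicted);
        }
        match self.chunks.iter().position(|c| c.offset == last_received_offset) {
            Some(i) => Ok(self.chunks.iter().skip(i).collect()),
            None => Err(ResumeError::NotABoundary),
        }
    }
}

fn main() {
    let mut ring = ReplayRing::new(8);
    for i in 0..4u64 {
        ring.push(Chunk { offset: i * 4, len: 4, body: vec![i as u8; 4] });
    }
    // 16 body bytes pushed into an 8-byte ring: chunks at offsets 0 and 4 were evicted.
    let tail: Vec<u64> = ring.replay_from(8).unwrap().iter().map(|c| c.offset).collect();
    assert_eq!(tail, vec![8, 12]);
    assert_eq!(ring.replay_from(4).err(), Some(ResumeError::Evicted));
}
```

In this model, a receiver that has fallen behind the ring's oldest chunk cannot be resumed. The only remaining options are restarting the transfer or cancelling it.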

Before reaching for this module: if your payload is a serialized and compressed in-memory value whose compressed length is unknown up front (BEVE + zstd), consider the Streaming Serialized Values proposal instead. It is a lightweight client-pull transfer that uses request/response as its flow control, with no credit window or replay ring, and it bounds memory on both ends whether the output is a file or an in-memory value. repe::stream earns its full machinery specifically when you need pipelined push throughput with backpressure, or resume across a dropped connection; the client-pull proposal defers both.
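For contrast, request/response flow control can be modelled in a few lines. The client asks for the next range, and the server answers with at most max_chunk bytes. Nothing more is produced until the client asks again, so each side buffers at most one chunk. This is a generic sketch of the client-pull idea, not the Streaming Serialized Values proposal's actual messages. pull_to_end and the closure standing in for the server are hypothetical.

```rust
use std::io::{self, Write};

/// Hypothetical client-pull loop. `fetch(offset, max)` stands in for one
/// request/response round trip and returns at most `max` bytes starting at
/// `offset`; an empty reply means the value is exhausted.
pub fn pull_to_end<F, W>(mut fetch: F, max_chunk: usize, out: &mut W) -> io::Result<u64>
where
    F: FnMut(u64, usize) -> io::Result<Vec<u8>>,
    W: Write,
{
    let mut offset = 0u64;
    loop {
        let chunk = fetch(offset, max_chunk)?; // one round trip
        if chunk.is_empty() {
            return Ok(offset); // total bytes received
        }
        out.write_all(&chunk)?;
        offset += chunk.len() as u64;
        // Only one chunk is buffered at a time; the next request is the flow control.
    }
}

fn main() -> io::Result<()> {
    // Stand-in for a serialized + compressed value whose length the client never needs up front.
    let blob: Vec<u8> = (0..10u8).collect();
    let mut received = Vec::new();
    let total = pull_to_end(
        |off, max| {
            let start = (off as usize).min(blob.len());
            let end = (start + max).min(blob.len());
            Ok(blob[start..end].to_vec())
        },
        4,
        &mut received,
    )?;
    assert_eq!(total, 10);
    assert_eq!(received, blob);
    Ok(())
}
```

The cost is one round trip per chunk with no pipelining. That is the throughput repe::stream's credit window is designed to recover.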

Sketch

```rust
use std::sync::Arc;
use std::time::{Duration, Instant};
use repe::{NotifyBody, PeerSendError, ReconnectOutcome,
    TransferControl, TransferRegistry, spawn_watchdog};

#[derive(Hash, Eq, PartialEq, Copy, Clone)]
struct TransferId(u64);

let registry: Arc<TransferRegistry<TransferId>> = Arc::new(TransferRegistry::new());
// The watchdog keeps only a Weak reference to the registry (see Watchdog Lifecycle).
spawn_watchdog(Arc::clone(&registry), Duration::from_secs(60));

let peer = make_peer();           // your embedder-side PeerHandle factory
let control = TransferControl::new(64 * 1024 * 1024);
control.set_peer(peer);
registry.register(TransferId(1), Arc::clone(&control));

let chunk_offset: u64 = 0;
let chunk_len: u64 = 4 * 1024 * 1024;
let body: Vec<u8> = body_for_chunk(chunk_offset); // your embedder-side encoder

// Block until the receiver's ACKs leave room for this chunk, or give up after 30 s.
control.wait_for_credit(chunk_len, Instant::now() + Duration::from_secs(30))?;
// push_replay records the chunk in the replay ring *before* the send, so a
// Disconnected error can't leave the ring missing it. `chunk_len` is the
// logical (ACK-domain) length; `body` is the wire payload, which may be longer.
control.push_replay(chunk_offset, chunk_len, false, body.clone());
let p = control.peer().expect("peer installed");
match p.send_notify("/file_chunk", NotifyBody::Beve(body)) {
    Ok(()) => control.record_sent(chunk_offset + chunk_len),
    Err(PeerSendError::Disconnected) => match control.wait_for_reconnect(Duration::from_secs(30)) {
        ReconnectOutcome::ResumeReady(resume) => {
            // request_resume already swapped the new peer into the slot and
            // consumed the staged request. Re-emit the ring tail from the
            // receiver's last-received offset, then continue producing.
            let p = control.peer().expect("peer installed by request_resume");
            for chunk in control.replay_chunks_from(resume.resume_at_offset) {
                // Send errors are ignored here for brevity; production code should
                // treat a second disconnect during replay like the one above.
                let _ = p.send_notify("/file_chunk", NotifyBody::Beve(chunk.body_bytes.to_vec()));
            }
        }
        ReconnectOutcome::Cancelled(_) | ReconnectOutcome::Timeout => { /* abort the transfer */ }
    },
    Err(_) => { /* application policy */ }
}

// Inbound handlers (run in your existing dispatch path). `id`, `file_index`,
// `offset`, and `new_peer` come from the decoded ACK / cancel / resume bodies:
//   ack:    if let Some(c) = registry.get(id) { c.record_ack(file_index, offset); }
//   cancel: if let Some(c) = registry.get(id) { c.cancel("client cancelled"); }
//   resume: let resumed = registry.get(id)
//               .and_then(|c| c.request_resume(new_peer, file_index, offset).ok());
```
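Every inbound handler above starts with a registry lookup by transfer id, which is why TransferId derives Hash and Eq. Here is one way to model that shape with std alone. It assumes the registry hands out Arc clones, so a handler never holds the map lock while it calls into a transfer. Registry and Control are hypothetical stand-ins, not repe's TransferRegistry<K> or TransferControl.

```rust
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical registry keyed by transfer id.
pub struct Registry<K, V> {
    map: Mutex<HashMap<K, Arc<V>>>,
}

impl<K: Hash + Eq, V> Registry<K, V> {
    pub fn new() -> Self {
        Registry { map: Mutex::new(HashMap::new()) }
    }
    pub fn register(&self, id: K, v: Arc<V>) {
        self.map.lock().unwrap().insert(id, v);
    }
    /// Clone the Arc out so the lock is released before the caller uses it.
    pub fn get(&self, id: &K) -> Option<Arc<V>> {
        self.map.lock().unwrap().get(id).cloned()
    }
    pub fn remove(&self, id: &K) -> Option<Arc<V>> {
        self.map.lock().unwrap().remove(id)
    }
}

/// Hypothetical per-transfer state reduced to a cancel flag.
#[derive(Default)]
pub struct Control {
    cancelled: AtomicBool,
}

impl Control {
    pub fn cancel(&self) {
        self.cancelled.store(true, Ordering::SeqCst);
    }
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::SeqCst)
    }
}

#[derive(Hash, Eq, PartialEq, Copy, Clone, Debug)]
struct TransferId(u64);

fn main() {
    let registry: Registry<TransferId, Control> = Registry::new();
    registry.register(TransferId(1), Arc::new(Control::default()));
    // Inbound cancel handler: unknown ids are simply ignored.
    if let Some(c) = registry.get(&TransferId(1)) {
        c.cancel();
    }
    assert!(registry.get(&TransferId(1)).unwrap().is_cancelled());
    assert!(registry.get(&TransferId(2)).is_none());
}
```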

Defaults

DEFAULT_WINDOW_BYTES, DEFAULT_BACKPRESSURE_TIMEOUT, DEFAULT_IDLE_TIMEOUT, DEFAULT_REPLAY_RING_BYTES, and DEFAULT_RECONNECT_TIMEOUT are tuned for LAN-class links pushing multi-GB files. Lower the window on slow links; raise the reconnect timeout for clients with long roaming windows.
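One rough way to reason about the window is the standard bandwidth-delay product. With at most window_bytes unacknowledged, throughput cannot exceed window_bytes / RTT, and a full window takes window_bytes / link_rate to drain. The helpers below only do that arithmetic. The link figures are illustrative, not measurements, and the sketch does not reproduce how repe's defaults were chosen.

```rust
use std::time::Duration;

/// Upper bound on throughput (bytes/s) when at most `window_bytes` may be
/// unacknowledged and each ACK takes one round trip to come back.
fn window_limited_throughput(window_bytes: u64, rtt: Duration) -> f64 {
    window_bytes as f64 / rtt.as_secs_f64()
}

/// Time for a full window to drain over a link of `link_bytes_per_sec`.
fn drain_time(window_bytes: u64, link_bytes_per_sec: u64) -> Duration {
    Duration::from_secs_f64(window_bytes as f64 / link_bytes_per_sec as f64)
}

/// Bandwidth-delay product: the smallest window that keeps the link busy.
fn bdp_bytes(link_bytes_per_sec: u64, rtt: Duration) -> u64 {
    (link_bytes_per_sec as f64 * rtt.as_secs_f64()).ceil() as u64
}

fn main() {
    const MIB: u64 = 1024 * 1024;
    let window = 64 * MIB;
    // LAN-like link with 1 ms RTT: a 64 MiB window is far from the bottleneck.
    println!("ceiling at 1 ms RTT: {:.0} B/s", window_limited_throughput(window, Duration::from_millis(1)));
    // Slow link, 1 MiB/s with 100 ms RTT: roughly 100 KiB in flight keeps it busy,
    // while a full 64 MiB window would take 64 s to drain.
    println!("BDP: {} bytes", bdp_bytes(MIB, Duration::from_millis(100)));
    println!("drain: {:?}", drain_time(window, MIB));
}
```

Read this way, window bytes beyond the BDP on a slow link add queueing and memory without adding throughput. That is one rationale for lowering the window there.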

Watchdog Lifecycle

spawn_watchdog holds a Weak<TransferRegistry<K>>, not an Arc. Dropping the embedder's last strong reference therefore terminates the watchdog thread on its next tick (the tick interval is clamped to [1 s, 5 s]). For a process-wide singleton registry the thread lives as long as the process; embedders that build short-lived registries (per-test, per-tenant) get clean teardown for free.
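The Weak-based shutdown can be modelled with std alone. The watchdog thread upgrades its Weak on every tick and returns as soon as the upgrade fails. Only two things here come from the text above: the Weak-not-Arc ownership and the [1 s, 5 s] clamp. The rest are assumptions for illustration:

- Registry as a map from transfer id to last-activity time.
- Dropping entries as the stand-in for force-cancel.
- The / 4 divisor in tick_for.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, Weak};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

/// Hypothetical registry: transfer id -> time of last activity.
pub struct Registry {
    last_seen: Mutex<HashMap<u64, Instant>>,
}

/// Tick derived from the idle timeout, clamped to [1 s, 5 s].
/// The divisor of 4 is an assumption, not repe's formula.
pub fn tick_for(idle_timeout: Duration) -> Duration {
    (idle_timeout / 4).clamp(Duration::from_secs(1), Duration::from_secs(5))
}

/// Spawn a watchdog that holds only a Weak reference. Each tick it tries to
/// upgrade; once the last Arc is gone the upgrade fails and the thread exits.
pub fn spawn_watchdog(registry: &Arc<Registry>, idle_timeout: Duration, tick: Duration) -> JoinHandle<()> {
    let weak: Weak<Registry> = Arc::downgrade(registry);
    thread::spawn(move || loop {
        thread::sleep(tick);
        let Some(reg) = weak.upgrade() else { return };
        let now = Instant::now();
        // Stand-in for force-cancel: drop transfers idle longer than the timeout.
        reg.last_seen.lock().unwrap().retain(|_, seen| now.duration_since(*seen) <= idle_timeout);
        // `reg`, the temporary strong reference, goes out of scope here, before the next sleep.
    })
}

fn main() {
    let registry = Arc::new(Registry { last_seen: Mutex::new(HashMap::new()) });
    registry.last_seen.lock().unwrap().insert(1, Instant::now());
    // Short tick for the demo; real use would pass tick_for(idle_timeout).
    let handle = spawn_watchdog(&registry, Duration::from_millis(20), Duration::from_millis(10));
    thread::sleep(Duration::from_millis(100));
    assert!(registry.last_seen.lock().unwrap().is_empty()); // idle transfer reaped
    drop(registry); // last strong reference gone
    handle.join().unwrap(); // watchdog returns on its next tick
    assert_eq!(tick_for(Duration::from_secs(60)), Duration::from_secs(5));
}
```

Because the thread holds a strong reference only for the duration of one scan, it never keeps the registry alive between ticks.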