Fleet API¶
repe includes three fleet APIs for multi-node control:
Fleet(sync TCP request/response)AsyncFleet(tokio TCP request/response)UniUdpFleet(sync UDP fire-and-forget; requiresfleet-udpfeature)
TCP Fleet Types¶
NodeConfig- per-node config (name,host,port,tags,timeout)RetryPolicy- retry behavior (max_attempts,delay)FleetOptions- fleet-level defaults (default_timeout,retry_policy)RemoteResult<T>- per-node call result (value,error,elapsed)HealthStatus- health-check status (healthy,latency,error)
Sync Fleet (Fleet)¶
use repe::{Fleet, FleetOptions, NodeConfig, RetryPolicy};
use serde_json::json;
use std::time::Duration;
let fleet = Fleet::with_options(
vec![
NodeConfig::new("127.0.0.1", 8081)?.with_name("node-1")?.with_tags(["compute"]),
NodeConfig::new("127.0.0.1", 8082)?.with_name("node-2")?.with_tags(["compute", "primary"]),
],
FleetOptions {
default_timeout: Duration::from_secs(2),
retry_policy: RetryPolicy { max_attempts: 3, delay: Duration::from_millis(100) },
},
)?;
let _ = fleet.connect_all();
let single = fleet.call_json("node-1", "/compute", Some(&json!({"value": 10})))?;
let many = fleet.broadcast_json("/status", None, &[] as &[&str]);
let primary = fleet.broadcast_json("/status", None, &["primary"]);
let health = fleet.health_check("/status");
let total = fleet.map_reduce_json("/compute", Some(&json!({"value": 10})), &["compute"], |results| {
results
.into_iter()
.filter_map(|r| r.value.and_then(|v| v["result"].as_i64()))
.sum::<i64>()
});
let _ = fleet.disconnect_all();
Sync Fleet API¶
- Construction:
Fleet::new(configs)Fleet::with_options(configs, options)Fleet::default()- Node access:
len,is_empty,keys,node,nodes,connected_nodes,filter_nodes- Dynamic node management:
add_node,remove_node- Connection management:
connect_all,disconnect_all,reconnect_disconnectedis_connected_all,is_connected(name)- Invocation:
call_json(name, method, params)call_message(name, method)broadcast_json(method, params, tags)map_reduce_json(method, params, tags, reduce_fn)- Health:
health_check(endpoint)
Async Fleet (AsyncFleet)¶
AsyncFleet mirrors Fleet but all operations are async.
use repe::{AsyncFleet, FleetOptions, NodeConfig, RetryPolicy};
use serde_json::json;
use std::time::Duration;
let fleet = AsyncFleet::with_options(
vec![NodeConfig::new("127.0.0.1", 8081)?.with_name("node-1")?],
FleetOptions {
default_timeout: Duration::from_secs(2),
retry_policy: RetryPolicy { max_attempts: 3, delay: Duration::from_millis(100) },
},
)?;
let _ = fleet.connect_all().await;
let result = fleet.call_json("node-1", "/status", Some(&json!({}))).await?;
assert!(result.succeeded());
UDP Fleet (UniUdpFleet)¶
UniUdpFleet is for fire-and-forget fanout. It reports send success/failure, not delivery confirmation.
UDP Types¶
UniUdpNodeConfig- endpoint config (name,host,port,tags,redundancy,chunk_size,fec_group_size,parity_shards)SendResult- per-node send result (message_id,error,elapsed)
UDP API¶
- Construction:
UniUdpFleet::new(configs)UniUdpFleet::default()- Node access:
len,is_empty,keys,node,nodes,filter_nodes- Node management:
add_node,remove_node,close- Sends:
send_notify(method, params, tags)send_request(method, params, tags)notify_all(method, params)send_notify_to(node_name, method, params)
use repe::{UniUdpFleet, UniUdpNodeConfig};
use serde_json::json;
let fleet = UniUdpFleet::new(vec![
UniUdpNodeConfig::new("127.0.0.1", 5001)?.with_name("edge-a")?.with_tags(["edge"]),
UniUdpNodeConfig::new("127.0.0.1", 5002)?.with_name("edge-b")?.with_tags(["edge"]),
])?;
let results = fleet.send_notify("/heartbeat", Some(&json!({"source": "controller"})), &["edge"]);
for (name, result) in results {
println!("{name}: sent={} msg_id={}", result.succeeded(), result.message_id);
}
Notes¶
- Tag filters match nodes that contain all specified tags. An empty tag list matches all nodes.
- Fleet methods snapshot target nodes before fanout, so in-flight broadcasts are isolated from concurrent add/remove operations.
- Retry behavior is applied to TCP fleet calls (
call_json,call_message,broadcast_json) for transport/I/O failures only (not well-formed server error responses). UniUdpClientis backed by theuniudpcrate. UDP fanout now uses UniUDP framing/reliability features (chunking, redundancy, and optional RS FEC).- Default RS profile is
data_shards=4,parity_shards=2(withredundancy=1).