// photoncloud-monorepo/chainfire/crates/chainfire-raft/tests/proptest_sim.rs
//
// 274 lines
// 9 KiB
// Rust

//! Property-based tests for `chainfire-raft` using an in-process simulated cluster.
//!
//! These tests aim to catch timing/partition edge cases with high reproducibility.
#![cfg(all(test, feature = "custom-raft"))]
use std::sync::Arc;
use std::time::Duration;
use proptest::prelude::*;
use tokio::sync::mpsc;
use tokio::time;
use chainfire_raft::core::{RaftConfig, RaftCore};
use chainfire_raft::network::test_client::{RpcMessage, SimulatedNetwork};
use chainfire_raft::storage::{EntryPayload, LogEntry, LogStorage, StateMachine};
use chainfire_types::command::RaftCommand;
/// One step of a randomized cluster scenario.
///
/// Node ids carried by the variants are produced by `node_id()` and therefore lie in `1..=3`.
#[derive(Debug, Clone)]
enum Op {
    /// Advance simulated time by this many milliseconds.
    Tick(u64),
    /// Passed to `SimulatedNetwork::disconnect` for the two given node ids.
    Disconnect(u64, u64),
    /// Passed to `SimulatedNetwork::reconnect` for the two given node ids.
    Reconnect(u64, u64),
    /// Install a delay of the third field (milliseconds) on the link between the two node ids,
    /// in both directions.
    Delay(u64, u64, u64),
    /// Clear the link behavior between the two node ids, in both directions.
    ClearLink(u64, u64),
    /// Issue a client `Put` on the node (first field) with a one-byte key (second field)
    /// and a one-byte value (third field).
    Write(u64, u8, u8),
}
/// Strategy producing the id of one of the three cluster nodes (`1..=3`).
fn node_id() -> impl Strategy<Value = u64> {
    let (lowest, highest) = (1_u64, 3_u64);
    lowest..=highest
}
/// Strategy producing an ordered pair of node ids whose two members differ.
///
/// Pairs with equal members are rejected by a filter rather than being remapped.
fn distinct_pair() -> impl Strategy<Value = (u64, u64)> {
    let any_pair = (node_id(), node_id());
    any_pair.prop_filter("distinct nodes", |&(first, second)| first != second)
}
fn op_strategy() -> impl Strategy<Value = Op> {
prop_oneof![
// Advance simulated time by up to 300ms.
(0_u64..=300).prop_map(Op::Tick),
distinct_pair().prop_map(|(a, b)| Op::Disconnect(a, b)),
distinct_pair().prop_map(|(a, b)| Op::Reconnect(a, b)),
(distinct_pair(), 0_u64..=50).prop_map(|((a, b), d)| Op::Delay(a, b, d)),
distinct_pair().prop_map(|(a, b)| Op::ClearLink(a, b)),
// Client writes: pick node + small key/value.
(node_id(), any::<u8>(), any::<u8>()).prop_map(|(n, k, v)| Op::Write(n, k, v)),
]
}
/// Strategy producing a whole scenario: a sequence of 0 to 39 operations.
fn ops_strategy() -> impl Strategy<Value = Vec<Op>> {
    let scenario_len = 0_usize..40;
    prop::collection::vec(op_strategy(), scenario_len)
}
/// Advance Tokio's clock by `total_ms` milliseconds in slices of at most 10ms.
///
/// After every slice the current task yields once. Small slices avoid many timers firing
/// "simultaneously" and starving message handling. A `total_ms` of 0 neither advances the
/// clock nor yields.
async fn advance_ms(total_ms: u64) {
    const STEP_MS: u64 = 10;

    // Full-size slices first...
    for _ in 0..(total_ms / STEP_MS) {
        time::advance(Duration::from_millis(STEP_MS)).await;
        tokio::task::yield_now().await;
    }

    // ...then whatever is left over (strictly less than one step).
    let tail_ms = total_ms % STEP_MS;
    if tail_ms > 0 {
        time::advance(Duration::from_millis(tail_ms)).await;
        tokio::task::yield_now().await;
    }
}
/// Build a three-node cluster (ids 1, 2 and 3) on top of a fresh `SimulatedNetwork`.
///
/// Each node gets in-memory log storage and an in-memory state machine, and is initialized
/// before the next one is created. Afterwards every node is registered with the network and a
/// task is spawned that forwards incoming `RpcMessage`s to the node's RPC entry points.
///
/// Returns the nodes in id order together with the shared network handle.
///
/// # Panics
/// Panics if `initialize()` fails for any node.
async fn create_3node_cluster() -> (Vec<Arc<RaftCore>>, Arc<SimulatedNetwork>) {
    let network = Arc::new(SimulatedNetwork::new());
    let all_ids = 1..=3_u64;

    let mut nodes = Vec::new();
    for node_id in all_ids.clone() {
        // Everyone except the node itself.
        let peers: Vec<u64> = all_ids.clone().filter(|&other| other != node_id).collect();
        let storage = Arc::new(LogStorage::new_in_memory());
        let state_machine = Arc::new(StateMachine::new_in_memory());
        let config = RaftConfig {
            election_timeout_min: 150,
            election_timeout_max: 300,
            heartbeat_interval: 50,
            // Deterministic per-node seed for reproducibility.
            deterministic_seed: Some(node_id),
        };
        let rpc_client: Arc<dyn chainfire_raft::network::RaftRpcClient> =
            Arc::new(network.client(node_id));

        let node = Arc::new(RaftCore::new(
            node_id,
            peers,
            storage,
            state_machine,
            rpc_client,
            config,
        ));
        node.initialize().await.unwrap();
        nodes.push(node);
    }

    // Wire up RPC handlers: one inbox channel and one dispatcher task per node.
    for node in &nodes {
        let (inbox_tx, mut inbox_rx) = mpsc::unbounded_channel::<RpcMessage>();
        network.register(node.node_id(), inbox_tx).await;

        let handler: Arc<RaftCore> = Arc::clone(node);
        tokio::spawn(async move {
            // Runs until `recv()` returns `None`.
            while let Some(message) = inbox_rx.recv().await {
                match message {
                    RpcMessage::Vote(request, reply_tx) => {
                        handler.request_vote_rpc(request, reply_tx).await;
                    }
                    RpcMessage::AppendEntries(request, reply_tx) => {
                        handler.append_entries_rpc(request, reply_tx).await;
                    }
                }
            }
        });
    }

    (nodes, network)
}
/// Serialize a log entry payload to bytes so payloads can be compared for equality
/// regardless of which enum variant they are.
///
/// # Panics
/// Panics if `bincode` cannot serialize the payload. Mapping a failure to an empty byte
/// vector would make every unserializable payload compare equal and could hide a real
/// log mismatch, so the failure is surfaced instead.
fn payload_fingerprint(payload: &EntryPayload<Vec<u8>>) -> Vec<u8> {
    bincode::serialize(payload).expect("payload must be bincode-serializable for fingerprinting")
}
/// Assert Raft safety invariants over the current state of every node.
///
/// Checked properties:
/// * per node: `last_applied <= commit_index <= last_log_index`;
/// * pairwise Log Matching: if two logs contain an entry with the same index and term, the
///   logs are identical for all entries up through that index.
///
/// # Panics
/// Panics (failing the test) when an invariant is violated or when a node's storage cannot
/// be read.
async fn assert_raft_invariants(nodes: &[Arc<RaftCore>]) {
    // Per-node monotonic invariants.
    for node in nodes {
        // Sample `last_applied` BEFORE `commit_index`. Node tasks may make progress at each
        // `.await`; reading the commit index first could pair a stale commit value with a
        // newer `last_applied` and report a violation that never existed.
        // NOTE(review): this relies on `commit_index` never decreasing — confirm in RaftCore.
        let last_applied = node.last_applied().await;
        let commit = node.commit_index().await;
        let st = node.storage().get_log_state().expect("log state");
        let last_log_index = st.last_log_id.map(|id| id.index).unwrap_or(0);
        assert!(
            last_applied <= commit,
            "node {}: last_applied={} > commit_index={}",
            node.node_id(),
            last_applied,
            commit
        );
        assert!(
            commit <= last_log_index,
            "node {}: commit_index={} > last_log_index={}",
            node.node_id(),
            commit,
            last_log_index
        );
    }

    // Snapshot every node's log as index -> (term, payload fingerprint).
    // There is no `.await` from here to the end of the function, so this task does not
    // yield between reading the logs and comparing them.
    let node_logs: Vec<std::collections::BTreeMap<u64, (u64, Vec<u8>)>> = nodes
        .iter()
        .map(|node| {
            let st = node.storage().get_log_state().expect("log state");
            let last = st.last_log_id.map(|id| id.index).unwrap_or(0);
            let entries: Vec<LogEntry<Vec<u8>>> = if last == 0 {
                vec![]
            } else {
                node.storage()
                    .get_log_entries(1..=last)
                    .expect("log entries")
            };
            entries
                .into_iter()
                .map(|e| (e.log_id.index, (e.log_id.term, payload_fingerprint(&e.payload))))
                .collect()
        })
        .collect();

    // Log Matching Property:
    // If two logs contain an entry with the same index and term, then the logs are identical
    // for all entries up through that index.
    for (a, la) in node_logs.iter().enumerate() {
        for (b, lb) in node_logs.iter().enumerate().skip(a + 1) {
            for (idx, (term_a, payload_a)) in la {
                match lb.get(idx) {
                    Some((term_b, payload_b)) if term_a == term_b => {
                        // Same index and term: the entry itself must match...
                        assert_eq!(
                            payload_a, payload_b,
                            "log mismatch at idx={} term={} (nodes {} vs {})",
                            idx,
                            term_a,
                            nodes[a].node_id(),
                            nodes[b].node_id()
                        );
                        // ...and so must every entry before it.
                        for j in 1..=*idx {
                            assert_eq!(
                                la.get(&j),
                                lb.get(&j),
                                "log matching violated at idx={} (prefix {} differs) nodes {} vs {}",
                                idx,
                                j,
                                nodes[a].node_id(),
                                nodes[b].node_id()
                            );
                        }
                    }
                    // Index missing on the other node, or terms differ: nothing to check.
                    _ => {}
                }
            }
        }
    }
}
proptest! {
    #![proptest_config(ProptestConfig {
        cases: 32,
        .. ProptestConfig::default()
    })]

    // Runs a random scenario against a fresh 3-node cluster on a single-threaded runtime with
    // a paused clock, then checks the Raft invariants once the scenario has been played.
    #[test]
    fn prop_raft_log_matching_holds(ops in ops_strategy()) {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
            .unwrap();

        runtime.block_on(async move {
            use chainfire_raft::network::test_client::LinkBehavior;

            // Time only moves when the scenario says so (or when Tokio auto-advances).
            tokio::time::pause();
            let (nodes, network) = create_3node_cluster().await;

            // Start event loops.
            let node_tasks: Vec<_> = nodes
                .iter()
                .map(|node| {
                    let runner = Arc::clone(node);
                    tokio::spawn(async move {
                        let _ = runner.run().await;
                    })
                })
                .collect();
            tokio::task::yield_now().await;

            // Drive a randomized sequence of operations.
            for op in ops {
                match op {
                    Op::Tick(ms) => advance_ms(ms).await,
                    Op::Disconnect(a, b) => network.disconnect(a, b).await,
                    Op::Reconnect(a, b) => network.reconnect(a, b).await,
                    Op::Delay(a, b, d) => {
                        // Same delay in both directions.
                        let lag = Duration::from_millis(d);
                        network.set_link(a, b, LinkBehavior::Delay(lag)).await;
                        network.set_link(b, a, LinkBehavior::Delay(lag)).await;
                    }
                    Op::ClearLink(a, b) => {
                        network.clear_link(a, b).await;
                        network.clear_link(b, a).await;
                    }
                    Op::Write(n, k, v) => {
                        let target = nodes
                            .iter()
                            .find(|candidate| candidate.node_id() == n)
                            .unwrap();
                        let command = RaftCommand::Put {
                            key: vec![k],
                            value: vec![v],
                            lease_id: None,
                            prev_kv: false,
                        };
                        // The outcome of the write is deliberately ignored.
                        let _ = target.client_write(command).await;
                    }
                }
            }

            // Let the system settle a bit.
            advance_ms(500).await;
            assert_raft_invariants(&nodes).await;

            // Best-effort cleanup.
            node_tasks.into_iter().for_each(|task| task.abort());
        });
    }
}