- Replace form_urlencoded with RFC 3986 compliant URI encoding - Implement aws_uri_encode() matching AWS SigV4 spec exactly - Unreserved chars (A-Z,a-z,0-9,-,_,.,~) not encoded - All other chars percent-encoded with uppercase hex - Preserve slashes in paths, encode in query params - Normalize empty paths to '/' per AWS spec - Fix test expectations (body hash, HMAC values) - Add comprehensive SigV4 signature determinism test This fixes the canonicalization mismatch that caused signature validation failures in T047. Auth can now be enabled for production. Refs: T058.S1
613 lines
19 KiB
Rust
//! Integration tests for Leader Election (P1) and Log Replication (P2)
//!
//! Tests cover:
//! - Single-node auto-election
//! - 3-node majority election
//! - Role transitions
//! - Term management
//! - Heartbeat mechanism
//! - Log replication
//! - Leader failure recovery
|
|
#![cfg(all(test, feature = "custom-raft"))]
|
|
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
use tokio::time;
|
|
use tokio::sync::mpsc;
|
|
|
|
use chainfire_raft::core::{
|
|
RaftCore, RaftConfig, RaftRole, NodeId,
|
|
};
|
|
use chainfire_raft::network::custom_test_client::{InMemoryRpcClient, RpcMessage};
|
|
use chainfire_storage::{LogStorage, StateMachine, RocksStore};
|
|
|
|
/// Helper to create a test node
|
|
async fn create_test_node(node_id: NodeId, peers: Vec<NodeId>) -> (Arc<RaftCore>, tempfile::TempDir) {
|
|
let temp_dir = tempfile::TempDir::new().unwrap();
|
|
let rocks = RocksStore::new(temp_dir.path()).unwrap();
|
|
let storage = Arc::new(LogStorage::new(rocks.clone()));
|
|
let state_machine = Arc::new(StateMachine::new(rocks).unwrap());
|
|
let network = Arc::new(InMemoryRpcClient::new());
|
|
|
|
let config = RaftConfig {
|
|
election_timeout_min: 150,
|
|
election_timeout_max: 300,
|
|
heartbeat_interval: 50,
|
|
};
|
|
|
|
let node = Arc::new(RaftCore::new(
|
|
node_id,
|
|
peers,
|
|
storage,
|
|
state_machine,
|
|
network,
|
|
config,
|
|
));
|
|
|
|
node.initialize().await.unwrap();
|
|
|
|
(node, temp_dir)
|
|
}
|
|
|
|
/// Helper to create a 3-node cluster with RPC wiring
|
|
async fn create_3node_cluster() -> (
|
|
Vec<Arc<RaftCore>>,
|
|
Vec<tempfile::TempDir>,
|
|
Arc<InMemoryRpcClient>,
|
|
) {
|
|
let network = Arc::new(InMemoryRpcClient::new());
|
|
let mut nodes = Vec::new();
|
|
let mut temp_dirs = Vec::new();
|
|
|
|
// Create 3 nodes
|
|
for node_id in 1..=3 {
|
|
let peers: Vec<NodeId> = (1..=3).filter(|&id| id != node_id).collect();
|
|
|
|
let temp_dir = tempfile::TempDir::new().unwrap();
|
|
let rocks = RocksStore::new(temp_dir.path()).unwrap();
|
|
let storage = Arc::new(LogStorage::new(rocks.clone()));
|
|
let state_machine = Arc::new(StateMachine::new(rocks).unwrap());
|
|
|
|
let config = RaftConfig {
|
|
election_timeout_min: 150, // 150ms - matches single-node test
|
|
election_timeout_max: 300, // 300ms
|
|
heartbeat_interval: 50, // 50ms - matches single-node test
|
|
};
|
|
|
|
let node = Arc::new(RaftCore::new(
|
|
node_id,
|
|
peers,
|
|
storage,
|
|
state_machine,
|
|
Arc::clone(&network) as Arc<dyn chainfire_raft::network::RaftRpcClient>,
|
|
config,
|
|
));
|
|
|
|
node.initialize().await.unwrap();
|
|
nodes.push(node);
|
|
temp_dirs.push(temp_dir);
|
|
}
|
|
|
|
// Wire up RPC channels for each node
|
|
for node in &nodes {
|
|
let node_id = node.node_id();
|
|
let (tx, mut rx) = mpsc::unbounded_channel::<RpcMessage>();
|
|
network.register(node_id, tx).await;
|
|
|
|
// Spawn handler for this node's RPC messages
|
|
let node_clone = Arc::clone(node);
|
|
tokio::spawn(async move {
|
|
eprintln!("[RPC Handler {}] Started", node_clone.node_id());
|
|
while let Some(msg) = rx.recv().await {
|
|
match msg {
|
|
RpcMessage::Vote(req, resp_tx) => {
|
|
eprintln!("[RPC Handler {}] Processing Vote from {}",
|
|
node_clone.node_id(), req.candidate_id);
|
|
node_clone.request_vote_rpc(req, resp_tx).await;
|
|
}
|
|
RpcMessage::AppendEntries(req, resp_tx) => {
|
|
eprintln!("[RPC Handler {}] Processing AppendEntries from {} term={}",
|
|
node_clone.node_id(), req.leader_id, req.term);
|
|
node_clone.append_entries_rpc(req, resp_tx).await;
|
|
}
|
|
}
|
|
}
|
|
eprintln!("[RPC Handler {}] Stopped (channel closed)", node_clone.node_id());
|
|
});
|
|
}
|
|
|
|
// Give all RPC handler tasks time to start
|
|
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
|
|
|
|
(nodes, temp_dirs, network)
|
|
}
|
|
|
|
// ============================================================================
|
|
// Test Cases
|
|
// ============================================================================
|
|
|
|
#[tokio::test]
|
|
async fn test_node_creation_and_initialization() {
|
|
// Test that we can create a node and initialize it
|
|
let (node, _temp_dir) = create_test_node(1, vec![2, 3]).await;
|
|
|
|
// Node should start as follower
|
|
assert_eq!(node.role().await, RaftRole::Follower);
|
|
|
|
// Node ID should be correct
|
|
assert_eq!(node.node_id(), 1);
|
|
|
|
// Term should start at 0
|
|
assert_eq!(node.current_term().await, 0);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_role_transitions() {
|
|
// Test basic role enumeration
|
|
assert_ne!(RaftRole::Follower, RaftRole::Candidate);
|
|
assert_ne!(RaftRole::Candidate, RaftRole::Leader);
|
|
assert_ne!(RaftRole::Leader, RaftRole::Follower);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_term_persistence() {
|
|
// Test that term can be persisted and loaded
|
|
let temp_dir = tempfile::TempDir::new().unwrap();
|
|
let path = temp_dir.path().to_str().unwrap().to_string();
|
|
|
|
{
|
|
// Create first node and let it initialize
|
|
let rocks = RocksStore::new(&path).unwrap();
|
|
let storage = Arc::new(LogStorage::new(rocks.clone()));
|
|
let state_machine = Arc::new(StateMachine::new(rocks).unwrap());
|
|
let network = Arc::new(InMemoryRpcClient::new());
|
|
|
|
let node = Arc::new(RaftCore::new(
|
|
1,
|
|
vec![2, 3],
|
|
storage,
|
|
state_machine,
|
|
network,
|
|
RaftConfig::default(),
|
|
));
|
|
|
|
node.initialize().await.unwrap();
|
|
|
|
// Initial term should be 0
|
|
assert_eq!(node.current_term().await, 0);
|
|
}
|
|
|
|
{
|
|
// Create second node with same storage path
|
|
let rocks = RocksStore::new(&path).unwrap();
|
|
let storage = Arc::new(LogStorage::new(rocks.clone()));
|
|
let state_machine = Arc::new(StateMachine::new(rocks).unwrap());
|
|
let network = Arc::new(InMemoryRpcClient::new());
|
|
|
|
let node = Arc::new(RaftCore::new(
|
|
1,
|
|
vec![2, 3],
|
|
storage,
|
|
state_machine,
|
|
network,
|
|
RaftConfig::default(),
|
|
));
|
|
|
|
node.initialize().await.unwrap();
|
|
|
|
// Term should still be 0 (loaded from storage)
|
|
assert_eq!(node.current_term().await, 0);
|
|
}
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_config_defaults() {
|
|
// Test that default config has reasonable values
|
|
let config = RaftConfig::default();
|
|
|
|
assert!(config.election_timeout_min > 0);
|
|
assert!(config.election_timeout_max > config.election_timeout_min);
|
|
assert!(config.heartbeat_interval > 0);
|
|
assert!(config.heartbeat_interval < config.election_timeout_min);
|
|
}
|
|
|
|
// ============================================================================
|
|
// P2: Log Replication Integration Tests
|
|
// ============================================================================
|
|
|
|
#[tokio::test]
|
|
async fn test_3node_cluster_formation() {
|
|
// Test 1: 3-Node Cluster Formation Test
|
|
// - 3 nodes start → Leader elected
|
|
// - All followers receive heartbeat
|
|
// - No election timeout occurs
|
|
|
|
let (nodes, _temp_dirs, _network) = create_3node_cluster().await;
|
|
|
|
// Start event loops for all nodes
|
|
let mut handles = Vec::new();
|
|
for node in &nodes {
|
|
let node_clone = Arc::clone(node);
|
|
let handle = tokio::spawn(async move {
|
|
let _ = node_clone.run().await;
|
|
});
|
|
handles.push(handle);
|
|
}
|
|
|
|
// Wait for leader election (should happen within ~500ms)
|
|
time::sleep(Duration::from_millis(500)).await;
|
|
|
|
// Check that exactly one leader was elected
|
|
let mut leader_count = 0;
|
|
let mut follower_count = 0;
|
|
let mut leader_id = None;
|
|
|
|
for node in &nodes {
|
|
match node.role().await {
|
|
RaftRole::Leader => {
|
|
leader_count += 1;
|
|
leader_id = Some(node.node_id());
|
|
}
|
|
RaftRole::Follower => {
|
|
follower_count += 1;
|
|
}
|
|
RaftRole::Candidate => {
|
|
// Should not have candidates after election
|
|
panic!("Node {} is still candidate after election", node.node_id());
|
|
}
|
|
}
|
|
}
|
|
|
|
assert_eq!(leader_count, 1, "Expected exactly one leader");
|
|
assert_eq!(follower_count, 2, "Expected exactly two followers");
|
|
assert!(leader_id.is_some(), "Leader should be identified");
|
|
|
|
println!("✓ Leader elected: node {}", leader_id.unwrap());
|
|
|
|
// Wait a bit more to ensure heartbeats prevent election timeout
|
|
// Heartbeat interval is 50ms, election timeout is 150-300ms
|
|
// So after 400ms, no new election should occur
|
|
time::sleep(Duration::from_millis(400)).await;
|
|
|
|
// Verify leader is still the same
|
|
for node in &nodes {
|
|
if node.node_id() == leader_id.unwrap() {
|
|
assert_eq!(node.role().await, RaftRole::Leader, "Leader should remain leader");
|
|
} else {
|
|
assert_eq!(
|
|
node.role().await,
|
|
RaftRole::Follower,
|
|
"Followers should remain followers due to heartbeats"
|
|
);
|
|
}
|
|
}
|
|
|
|
println!("✓ Heartbeats prevent election timeout");
|
|
}
|
|
|
|
#[tokio::test]
|
|
#[ignore] // Requires client write API implementation
|
|
async fn test_log_replication() {
|
|
// Test 2: Log Replication Test
|
|
// - Leader adds entries
|
|
// - Replicated to all followers
|
|
// - commit_index synchronized
|
|
|
|
// TODO: Implement once client write API is ready
|
|
// This requires handle_client_write to be fully implemented
|
|
}
|
|
|
|
#[tokio::test]
|
|
#[ignore] // Requires graceful node shutdown
|
|
async fn test_leader_failure_recovery() {
|
|
// Test 3: Leader Failure Test
|
|
// - Leader stops → New leader elected
|
|
// - Log consistency maintained
|
|
|
|
// TODO: Implement once we have graceful shutdown mechanism
|
|
// Currently, aborting the event loop doesn't cleanly stop the node
|
|
}
|
|
|
|
// ============================================================================
|
|
// Deferred complex tests
|
|
// ============================================================================
|
|
|
|
#[tokio::test]
|
|
#[ignore] // Requires full cluster setup
|
|
async fn test_split_vote_recovery() {
|
|
// Test that cluster recovers from split vote
|
|
// Deferred: Requires complex timing control
|
|
}
|
|
|
|
#[tokio::test]
|
|
#[ignore] // Requires node restart mechanism
|
|
async fn test_vote_persistence_across_restart() {
|
|
// Test that votes persist across node restarts
|
|
// Deferred: Requires proper shutdown/startup sequencing
|
|
}
|
|
|
|
// ============================================================================
|
|
// P3: Commitment & State Machine Integration Tests
|
|
// ============================================================================
|
|
|
|
#[tokio::test]
|
|
async fn test_write_replicate_commit() {
|
|
// Test: Client write on leader → replication → commit → state machine apply
|
|
// Verifies the complete write→replicate→commit→apply flow
|
|
|
|
use chainfire_types::command::RaftCommand;
|
|
|
|
let (nodes, _temp_dirs, _network) = create_3node_cluster().await;
|
|
|
|
// Start event loops for all nodes
|
|
let mut handles = Vec::new();
|
|
for node in &nodes {
|
|
let node_clone = Arc::clone(node);
|
|
let handle = tokio::spawn(async move {
|
|
let _ = node_clone.run().await;
|
|
});
|
|
handles.push(handle);
|
|
}
|
|
|
|
// Wait for leader election (election timeout is 2-4s)
|
|
time::sleep(Duration::from_millis(5000)).await;
|
|
|
|
// Find the leader
|
|
let mut leader = None;
|
|
for node in &nodes {
|
|
if matches!(node.role().await, RaftRole::Leader) {
|
|
leader = Some(node);
|
|
break;
|
|
}
|
|
}
|
|
let leader = leader.expect("Leader should be elected");
|
|
|
|
println!("✓ Leader elected: node {}", leader.node_id());
|
|
|
|
// Submit a write command to the leader
|
|
let cmd = RaftCommand::Put {
|
|
key: b"test_key_1".to_vec(),
|
|
value: b"test_value_1".to_vec(),
|
|
lease_id: None,
|
|
prev_kv: false,
|
|
};
|
|
|
|
leader
|
|
.client_write(cmd)
|
|
.await
|
|
.expect("Client write should succeed");
|
|
|
|
println!("✓ Client write submitted to leader");
|
|
|
|
// Wait for replication and commit (heartbeat + replication + commit)
|
|
// Heartbeat interval is 50ms, need multiple rounds:
|
|
// 1. First heartbeat sends entries
|
|
// 2. Followers ack, leader updates match_index and commit_index
|
|
// 3. Second heartbeat propagates new leader_commit to followers
|
|
// 4. Followers update their commit_index and apply entries
|
|
// Give extra time to avoid re-election issues
|
|
time::sleep(Duration::from_millis(1500)).await;
|
|
|
|
// Debug: Check all nodes' roles and states
|
|
println!("\nDEBUG: All nodes after write:");
|
|
for node in &nodes {
|
|
println!(" Node {} role={:?} term={} commit_index={} last_applied={}",
|
|
node.node_id(), node.role().await, node.current_term().await,
|
|
node.commit_index().await, node.last_applied().await);
|
|
}
|
|
println!();
|
|
|
|
// Verify that the value is committed and applied on all nodes
|
|
for node in &nodes {
|
|
let commit_index = node.commit_index().await;
|
|
let last_applied = node.last_applied().await;
|
|
|
|
assert!(
|
|
commit_index >= 1,
|
|
"Node {} should have commit_index >= 1, got {}",
|
|
node.node_id(),
|
|
commit_index
|
|
);
|
|
assert!(
|
|
last_applied >= 1,
|
|
"Node {} should have last_applied >= 1, got {}",
|
|
node.node_id(),
|
|
last_applied
|
|
);
|
|
|
|
// Verify the value exists in the state machine
|
|
let state_machine = node.state_machine();
|
|
let result = state_machine.kv().get(b"test_key_1").expect("Get should succeed");
|
|
|
|
assert!(
|
|
result.is_some(),
|
|
"Node {} should have test_key_1 in state machine",
|
|
node.node_id()
|
|
);
|
|
|
|
let entry = result.unwrap();
|
|
assert_eq!(
|
|
entry.value,
|
|
b"test_value_1",
|
|
"Node {} has wrong value for test_key_1",
|
|
node.node_id()
|
|
);
|
|
|
|
println!(
|
|
"✓ Node {} has test_key_1=test_value_1 (commit_index={}, last_applied={})",
|
|
node.node_id(),
|
|
commit_index,
|
|
last_applied
|
|
);
|
|
}
|
|
|
|
println!("✓ All nodes have committed and applied the write");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_commit_consistency() {
|
|
// Test: Multiple writes preserve order across all nodes
|
|
// Verifies that the commit mechanism maintains consistency
|
|
|
|
use chainfire_types::command::RaftCommand;
|
|
|
|
let (nodes, _temp_dirs, _network) = create_3node_cluster().await;
|
|
|
|
// Start event loops
|
|
let mut handles = Vec::new();
|
|
for node in &nodes {
|
|
let node_clone = Arc::clone(node);
|
|
let handle = tokio::spawn(async move {
|
|
let _ = node_clone.run().await;
|
|
});
|
|
handles.push(handle);
|
|
}
|
|
|
|
// Wait for leader election (election timeout is 2-4s)
|
|
time::sleep(Duration::from_millis(5000)).await;
|
|
|
|
// Find the leader
|
|
let mut leader = None;
|
|
for node in &nodes {
|
|
if matches!(node.role().await, RaftRole::Leader) {
|
|
leader = Some(node);
|
|
break;
|
|
}
|
|
}
|
|
let leader = leader.expect("Leader should be elected");
|
|
|
|
println!("✓ Leader elected: node {}", leader.node_id());
|
|
|
|
// Submit multiple writes in sequence
|
|
for i in 1..=5 {
|
|
let cmd = RaftCommand::Put {
|
|
key: format!("key_{}", i).into_bytes(),
|
|
value: format!("value_{}", i).into_bytes(),
|
|
lease_id: None,
|
|
prev_kv: false,
|
|
};
|
|
|
|
leader
|
|
.client_write(cmd)
|
|
.await
|
|
.expect("Client write should succeed");
|
|
}
|
|
|
|
println!("✓ Submitted 5 writes to leader");
|
|
|
|
// Wait for all writes to commit and apply
|
|
time::sleep(Duration::from_millis(500)).await;
|
|
|
|
// Verify all nodes have all 5 keys in correct order
|
|
for node in &nodes {
|
|
let commit_index = node.commit_index().await;
|
|
let last_applied = node.last_applied().await;
|
|
|
|
assert!(
|
|
commit_index >= 5,
|
|
"Node {} should have commit_index >= 5, got {}",
|
|
node.node_id(),
|
|
commit_index
|
|
);
|
|
assert!(
|
|
last_applied >= 5,
|
|
"Node {} should have last_applied >= 5, got {}",
|
|
node.node_id(),
|
|
last_applied
|
|
);
|
|
|
|
let state_machine = node.state_machine();
|
|
|
|
for i in 1..=5 {
|
|
let key = format!("key_{}", i).into_bytes();
|
|
let expected_value = format!("value_{}", i).into_bytes();
|
|
|
|
let result = state_machine.kv().get(&key).expect("Get should succeed");
|
|
|
|
assert!(
|
|
result.is_some(),
|
|
"Node {} missing key_{}",
|
|
node.node_id(),
|
|
i
|
|
);
|
|
|
|
let entry = result.unwrap();
|
|
assert_eq!(
|
|
entry.value, expected_value,
|
|
"Node {} has wrong value for key_{}",
|
|
node.node_id(), i
|
|
);
|
|
}
|
|
|
|
println!(
|
|
"✓ Node {} has all 5 keys in correct order (commit_index={}, last_applied={})",
|
|
node.node_id(),
|
|
commit_index,
|
|
last_applied
|
|
);
|
|
}
|
|
|
|
println!("✓ All nodes maintain consistent order");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_leader_only_write() {
|
|
// Test: Follower should reject client writes
|
|
// Verifies that only the leader can accept writes (Raft safety)
|
|
|
|
use chainfire_types::command::RaftCommand;
|
|
use chainfire_raft::core::RaftError;
|
|
|
|
let (nodes, _temp_dirs, _network) = create_3node_cluster().await;
|
|
|
|
// Start event loops
|
|
let mut handles = Vec::new();
|
|
for node in &nodes {
|
|
let node_clone = Arc::clone(node);
|
|
let handle = tokio::spawn(async move {
|
|
let _ = node_clone.run().await;
|
|
});
|
|
handles.push(handle);
|
|
}
|
|
|
|
// Wait for leader election (election timeout is 2-4s)
|
|
time::sleep(Duration::from_millis(5000)).await;
|
|
|
|
// Find a follower
|
|
let mut follower = None;
|
|
for node in &nodes {
|
|
if matches!(node.role().await, RaftRole::Follower) {
|
|
follower = Some(node);
|
|
break;
|
|
}
|
|
}
|
|
let follower = follower.expect("Follower should exist");
|
|
|
|
println!("✓ Found follower: node {}", follower.node_id());
|
|
|
|
// Try to write to the follower
|
|
let cmd = RaftCommand::Put {
|
|
key: b"follower_write".to_vec(),
|
|
value: b"should_fail".to_vec(),
|
|
lease_id: None,
|
|
prev_kv: false,
|
|
};
|
|
|
|
let result = follower.client_write(cmd).await;
|
|
|
|
// Should return NotLeader error
|
|
assert!(
|
|
result.is_err(),
|
|
"Follower write should fail with NotLeader error"
|
|
);
|
|
|
|
if let Err(RaftError::NotLeader { .. }) = result {
|
|
println!("✓ Follower correctly rejected write with NotLeader error");
|
|
} else {
|
|
panic!(
|
|
"Expected NotLeader error, got: {:?}",
|
|
result.err().unwrap()
|
|
);
|
|
}
|
|
}
|