//! Custom Raft Consensus Implementation //! //! This module implements the Raft consensus algorithm from scratch, //! replacing OpenRaft for ChainFire's single Raft group use case. //! //! Architecture: //! - RaftCore: Main consensus state machine //! - RaftState: Follower/Candidate/Leader role management //! - RaftTimer: Election and heartbeat timeout management //! - Integration with existing chainfire-storage and network layers use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use tokio::sync::{mpsc, oneshot, RwLock, Mutex}; use tokio::time; use chainfire_storage::{LogStorage, StateMachine, LogEntry, EntryPayload, LogId}; use chainfire_types::command::RaftCommand; use crate::network::RaftRpcClient; use tracing::{debug, trace}; pub type NodeId = u64; pub type Term = u64; pub type LogIndex = u64; // ============================================================================ // Core Raft Types // ============================================================================ /// Node role in the Raft cluster #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum RaftRole { Follower, Candidate, Leader, } /// Persistent state (must be saved to stable storage before responding to RPCs) #[derive(Debug, Clone)] pub struct PersistentState { /// Latest term server has seen (initialized to 0, increases monotonically) pub current_term: Term, /// Candidate that received vote in current term (or None) pub voted_for: Option, } /// Volatile state on all servers #[derive(Debug, Clone)] pub struct VolatileState { /// Index of highest log entry known to be committed pub commit_index: LogIndex, /// Index of highest log entry applied to state machine pub last_applied: LogIndex, /// Current leader (None if unknown) pub current_leader: Option, } /// Volatile state on candidates (during election) #[derive(Debug, Clone)] pub struct CandidateState { /// Nodes that have granted votes (includes self) pub votes_received: std::collections::HashSet, } /// Volatile state on leaders (reinitialized after election) #[derive(Debug, Clone)] pub struct LeaderState { /// For each server, index of next log entry to send pub next_index: HashMap, /// For each server, index of highest log entry known to be replicated pub match_index: HashMap, } // ============================================================================ // RPC Request/Response Types // ============================================================================ /// RequestVote RPC request #[derive(Debug, Clone)] pub struct VoteRequest { /// Candidate's term pub term: Term, /// Candidate requesting vote pub candidate_id: NodeId, /// Index of candidate's last log entry pub last_log_index: LogIndex, /// Term of candidate's last log entry pub last_log_term: Term, } /// RequestVote RPC response #[derive(Debug, Clone)] pub struct VoteResponse { /// Current term, for candidate to update itself pub term: Term, /// True means candidate received vote pub vote_granted: bool, } /// AppendEntries RPC request (also used as heartbeat) #[derive(Debug, Clone)] pub struct AppendEntriesRequest { /// Leader's term pub term: Term, /// So follower can redirect clients pub leader_id: NodeId, /// Index of log entry immediately preceding new ones pub prev_log_index: LogIndex, /// Term of prev_log_index entry pub prev_log_term: Term, /// Log entries to store (empty for heartbeat) pub entries: Vec>, /// Leader's commit_index pub leader_commit: LogIndex, } /// AppendEntries RPC response #[derive(Debug, Clone)] pub struct AppendEntriesResponse { /// Current term, for leader to update itself pub term: Term, /// True if follower contained entry matching prev_log_index and prev_log_term pub success: bool, /// For fast log backtracking on conflict pub conflict_index: Option, /// For fast log backtracking on conflict pub conflict_term: Option, } // ============================================================================ // Internal Events // ============================================================================ /// Internal events for Raft state machine #[derive(Debug)] pub enum RaftEvent { /// Election timeout fired ElectionTimeout, /// Heartbeat timeout fired (leader only) HeartbeatTimeout, /// Client write request ClientWrite { command: RaftCommand, response_tx: oneshot::Sender>, }, /// RequestVote RPC received VoteRequest { req: VoteRequest, response_tx: oneshot::Sender, }, /// AppendEntries RPC received AppendEntries { req: AppendEntriesRequest, response_tx: oneshot::Sender, }, /// RequestVote RPC response received VoteResponse { from: NodeId, resp: VoteResponse, }, /// AppendEntries RPC response received AppendEntriesResponse { from: NodeId, resp: AppendEntriesResponse, }, } // ============================================================================ // Error Types // ============================================================================ #[derive(Debug, Clone)] pub enum RaftError { NotLeader { leader_id: Option }, StorageError(String), NetworkError(String), Timeout, } impl std::fmt::Display for RaftError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { RaftError::NotLeader { leader_id } => { write!(f, "Not leader, leader is: {:?}", leader_id) } RaftError::StorageError(msg) => write!(f, "Storage error: {}", msg), RaftError::NetworkError(msg) => write!(f, "Network error: {}", msg), RaftError::Timeout => write!(f, "Operation timed out"), } } } impl std::error::Error for RaftError {} // ============================================================================ // RaftCore: Main Consensus Engine // ============================================================================ pub struct RaftCore { /// This node's ID node_id: NodeId, /// Cluster members (excluding self) peers: Vec, /// Persistent state persistent: Arc>, /// Volatile state volatile: Arc>, /// Candidate state (None if not candidate) candidate_state: Arc>>, /// Leader state (None if not leader) leader_state: Arc>>, /// Current role role: Arc>, /// Storage backend storage: Arc, /// State machine state_machine: Arc, /// Network client network: Arc, /// Event channel event_tx: mpsc::UnboundedSender, event_rx: Arc>>, /// Election timer reset notifier election_timer_reset: Arc, /// Configuration config: RaftConfig, } #[derive(Debug, Clone)] pub struct RaftConfig { /// Election timeout range (ms) pub election_timeout_min: u64, pub election_timeout_max: u64, /// Heartbeat interval (ms) pub heartbeat_interval: u64, } impl Default for RaftConfig { fn default() -> Self { Self { election_timeout_min: 300, election_timeout_max: 600, heartbeat_interval: 150, } } } impl RaftCore { pub fn new( node_id: NodeId, peers: Vec, storage: Arc, state_machine: Arc, network: Arc, config: RaftConfig, ) -> Self { let (event_tx, event_rx) = mpsc::unbounded_channel(); Self { node_id, peers, persistent: Arc::new(RwLock::new(PersistentState { current_term: 0, voted_for: None, })), volatile: Arc::new(RwLock::new(VolatileState { commit_index: 0, last_applied: 0, current_leader: None, })), candidate_state: Arc::new(RwLock::new(None)), leader_state: Arc::new(RwLock::new(None)), role: Arc::new(RwLock::new(RaftRole::Follower)), storage, state_machine, network, event_tx, event_rx: Arc::new(Mutex::new(event_rx)), election_timer_reset: Arc::new(tokio::sync::Notify::new()), config, } } /// Initialize Raft node (load persistent state from storage) pub async fn initialize(&self) -> Result<(), RaftError> { // Load persistent state from storage match self.storage.read_vote() { Ok(Some(vote)) => { let mut persistent = self.persistent.write().await; persistent.current_term = vote.term; persistent.voted_for = vote.node_id; tracing::info!( term = vote.term, voted_for = ?vote.node_id, "Loaded persistent state from storage" ); } Ok(None) => { tracing::info!("No persistent state found, starting fresh"); } Err(e) => { return Err(RaftError::StorageError(format!("Failed to load vote: {}", e))); } } Ok(()) } /// Persist current term and vote to storage async fn persist_vote(&self) -> Result<(), RaftError> { let persistent = self.persistent.read().await; let vote = chainfire_storage::Vote { term: persistent.current_term, node_id: persistent.voted_for, committed: false, }; self.storage .save_vote(vote) .map_err(|e| RaftError::StorageError(format!("Failed to save vote: {}", e)))?; Ok(()) } /// Start the Raft event loop pub async fn run(&self) -> Result<(), RaftError> { eprintln!("[Node {}] EVENT LOOP STARTING", self.node_id); // Start election timer self.spawn_election_timer(); // Start heartbeat timer self.spawn_heartbeat_timer(); // Main event loop let mut event_rx = self.event_rx.lock().await; eprintln!("[Node {}] EVENT LOOP acquired event_rx, starting recv loop", self.node_id); loop { tokio::select! { Some(event) = event_rx.recv() => { let event_type = match &event { RaftEvent::ElectionTimeout => "ElectionTimeout", RaftEvent::HeartbeatTimeout => "HeartbeatTimeout", RaftEvent::VoteRequest { .. } => "VoteRequest", RaftEvent::VoteResponse { .. } => "VoteResponse", RaftEvent::AppendEntries { .. } => "AppendEntries", RaftEvent::AppendEntriesResponse { .. } => "AppendEntriesResponse", RaftEvent::ClientWrite { .. } => "ClientWrite", }; eprintln!("[Node {}] EVENT LOOP received: {}", self.node_id, event_type); if let Err(e) = self.handle_event(event).await { eprintln!("[Node {}] EVENT LOOP error: {:?}, continuing...", self.node_id, e); // Continue loop instead of exiting - event loop must stay alive } } else => { eprintln!("[Node {}] EVENT LOOP channel closed, exiting", self.node_id); break; } } } eprintln!("[Node {}] EVENT LOOP EXITED", self.node_id); Ok(()) } /// Handle a single event async fn handle_event(&self, event: RaftEvent) -> Result<(), RaftError> { match event { RaftEvent::ElectionTimeout => { self.handle_election_timeout().await?; } RaftEvent::HeartbeatTimeout => { self.handle_heartbeat_timeout().await?; } RaftEvent::ClientWrite { command, response_tx } => { let result = self.handle_client_write(command).await; let _ = response_tx.send(result); } RaftEvent::VoteRequest { req, response_tx } => { let resp = self.handle_vote_request(req).await?; let _ = response_tx.send(resp); } RaftEvent::AppendEntries { req, response_tx } => { eprintln!("[Node {}] EVENT LOOP processing AppendEntries from {} term={}", self.node_id, req.leader_id, req.term); let resp = self.handle_append_entries(req).await?; let _ = response_tx.send(resp); } RaftEvent::VoteResponse { from, resp } => { self.handle_vote_response(from, resp).await?; } RaftEvent::AppendEntriesResponse { from, resp } => { self.handle_append_entries_response(from, resp).await?; } } Ok(()) } // ======================================================================== // P1: Leader Election Implementation // ======================================================================== /// Handle election timeout - transition to candidate and start election async fn handle_election_timeout(&self) -> Result<(), RaftError> { let role = *self.role.read().await; eprintln!("[Node {}] handle_election_timeout: role={:?}", self.node_id, role); // Only followers and candidates start elections if role == RaftRole::Leader { eprintln!("[Node {}] Already leader, ignoring election timeout", self.node_id); return Ok(()); } // Transition to candidate *self.role.write().await = RaftRole::Candidate; eprintln!("[Node {}] Transitioned to Candidate", self.node_id); // Clear current leader (election in progress) self.volatile.write().await.current_leader = None; // Increment current term and vote for self let mut persistent = self.persistent.write().await; persistent.current_term += 1; persistent.voted_for = Some(self.node_id); let current_term = persistent.current_term; drop(persistent); eprintln!("[Node {}] Starting election for term {}", self.node_id, current_term); // Persist vote to storage before sending RPCs (Raft safety) self.persist_vote().await?; // Initialize candidate state with self-vote let mut votes = std::collections::HashSet::new(); votes.insert(self.node_id); *self.candidate_state.write().await = Some(CandidateState { votes_received: votes, }); // Check if already have majority (single-node case) let cluster_size = self.peers.len() + 1; let majority = cluster_size / 2 + 1; eprintln!("[Node {}] Cluster size={}, majority={}, peers={:?}", self.node_id, cluster_size, majority, self.peers); if 1 >= majority { // For single-node cluster, immediately become leader eprintln!("[Node {}] Single-node cluster, becoming leader immediately", self.node_id); self.become_leader().await?; return Ok(()); } // Get last log index and term let (last_log_index, last_log_term) = self.get_last_log_info().await?; // Send RequestVote RPCs to all peers let vote_request = VoteRequest { term: current_term, candidate_id: self.node_id, last_log_index, last_log_term, }; // Send vote requests in parallel for peer_id in &self.peers { let peer_id = *peer_id; let network = self.network.clone(); let req = vote_request.clone(); let event_tx = self.event_tx.clone(); tokio::spawn(async move { // TODO: Use actual network layer instead of mock let resp = network.vote(peer_id, req).await .unwrap_or(VoteResponse { term: current_term, vote_granted: false, }); // Send response back to main event loop let _ = event_tx.send(RaftEvent::VoteResponse { from: peer_id, resp }); }); } Ok(()) } /// Handle RequestVote RPC async fn handle_vote_request(&self, req: VoteRequest) -> Result { let mut persistent = self.persistent.write().await; // Reply false if term < currentTerm if req.term < persistent.current_term { return Ok(VoteResponse { term: persistent.current_term, vote_granted: false, }); } // If RPC request or response contains term T > currentTerm: // set currentTerm = T, convert to follower if req.term > persistent.current_term { persistent.current_term = req.term; persistent.voted_for = None; *self.role.write().await = RaftRole::Follower; drop(persistent); self.persist_vote().await?; persistent = self.persistent.write().await; } // Check if we can grant vote let can_vote = persistent.voted_for.is_none() || persistent.voted_for == Some(req.candidate_id); if !can_vote { return Ok(VoteResponse { term: persistent.current_term, vote_granted: false, }); } // Check if candidate's log is at least as up-to-date as receiver's log let (last_log_index, last_log_term) = self.get_last_log_info().await?; let log_ok = req.last_log_term > last_log_term || (req.last_log_term == last_log_term && req.last_log_index >= last_log_index); if log_ok { persistent.voted_for = Some(req.candidate_id); let term = persistent.current_term; drop(persistent); // Persist vote to storage before responding (Raft safety) self.persist_vote().await?; // Reset election timer since we granted a vote self.reset_election_timer(); Ok(VoteResponse { term, vote_granted: true, }) } else { Ok(VoteResponse { term: persistent.current_term, vote_granted: false, }) } } /// Handle VoteResponse from a peer async fn handle_vote_response(&self, from: NodeId, resp: VoteResponse) -> Result<(), RaftError> { let role = *self.role.read().await; let persistent = self.persistent.read().await; // Ignore if not candidate if role != RaftRole::Candidate { return Ok(()); } // If response term > current term, step down if resp.term > persistent.current_term { drop(persistent); self.step_down(resp.term).await?; return Ok(()); } // Ignore stale responses if resp.term < persistent.current_term { return Ok(()); } // Count votes if resp.vote_granted { let mut candidate_state_guard = self.candidate_state.write().await; if let Some(candidate_state) = candidate_state_guard.as_mut() { candidate_state.votes_received.insert(from); // Calculate majority (cluster size = peers + 1 for self) let cluster_size = self.peers.len() + 1; let majority = cluster_size / 2 + 1; let votes_count = candidate_state.votes_received.len(); // If received majority, become leader if votes_count >= majority { drop(candidate_state_guard); drop(persistent); self.become_leader().await?; } } } Ok(()) } /// Transition to leader async fn become_leader(&self) -> Result<(), RaftError> { *self.role.write().await = RaftRole::Leader; // Set self as current leader self.volatile.write().await.current_leader = Some(self.node_id); // Clear candidate state *self.candidate_state.write().await = None; // Initialize leader state let last_log_index = self.get_last_log_info().await?.0; let next_index = last_log_index + 1; let mut leader_state = LeaderState { next_index: HashMap::new(), match_index: HashMap::new(), }; for peer_id in &self.peers { leader_state.next_index.insert(*peer_id, next_index); leader_state.match_index.insert(*peer_id, 0); } *self.leader_state.write().await = Some(leader_state); // Start sending heartbeats immediately self.event_tx.send(RaftEvent::HeartbeatTimeout) .map_err(|e| RaftError::NetworkError(format!("Failed to send heartbeat: {}", e)))?; Ok(()) } /// Step down to follower async fn step_down(&self, new_term: Term) -> Result<(), RaftError> { let mut persistent = self.persistent.write().await; persistent.current_term = new_term; persistent.voted_for = None; drop(persistent); // Persist term and vote to storage self.persist_vote().await?; *self.role.write().await = RaftRole::Follower; *self.candidate_state.write().await = None; *self.leader_state.write().await = None; // Reset election timer when stepping down to follower self.reset_election_timer(); Ok(()) } // ======================================================================== // P2: Log Replication (Stub implementations) // ======================================================================== async fn handle_heartbeat_timeout(&self) -> Result<(), RaftError> { // Only leaders send heartbeats let role = *self.role.read().await; if role != RaftRole::Leader { return Ok(()); } let term = self.persistent.read().await.current_term; let (last_log_index, _) = self.get_last_log_info().await?; eprintln!("[Node {}] Sending heartbeat to peers: {:?} (term={})", self.node_id, self.peers, term); // Send AppendEntries (with entries if available) to all peers for peer_id in &self.peers { let peer_id = *peer_id; // Read commit_index fresh for each peer to ensure it's up-to-date let commit_index = self.volatile.read().await.commit_index; // Get prevLogIndex and prevLogTerm for this peer let leader_state = self.leader_state.read().await; let next_index = leader_state.as_ref() .and_then(|ls| ls.next_index.get(&peer_id).copied()) .unwrap_or(1); drop(leader_state); let prev_log_index = next_index.saturating_sub(1); let prev_log_term = if prev_log_index > 0 { // Read as Vec since that's how it's stored let entries: Vec>> = self.storage .get_log_entries(prev_log_index..=prev_log_index) .map_err(|e| RaftError::StorageError(format!("Failed to read log: {}", e)))?; if entries.is_empty() { 0 } else { entries[0].log_id.term } } else { 0 }; // Get entries to send (if any) let entries: Vec> = if next_index <= last_log_index { // Read entries from storage (stored as Vec) let stored_entries: Vec>> = self.storage .get_log_entries(next_index..=last_log_index) .map_err(|e| RaftError::StorageError(format!("Failed to read log entries: {}", e)))?; // Convert Vec back to RaftCommand stored_entries.into_iter().map(|entry| { let command = bincode::deserialize(&match &entry.payload { EntryPayload::Normal(data) => data, EntryPayload::Blank => return Ok(LogEntry { log_id: entry.log_id, payload: EntryPayload::Blank, }), EntryPayload::Membership(nodes) => return Ok(LogEntry { log_id: entry.log_id, payload: EntryPayload::Membership(nodes.clone()), }), }).map_err(|e| RaftError::StorageError(format!("Failed to deserialize command: {}", e)))?; Ok(LogEntry { log_id: entry.log_id, payload: EntryPayload::Normal(command), }) }).collect::, RaftError>>()? } else { // No entries to send, just heartbeat vec![] }; eprintln!("[Node {}] HEARTBEAT to {}: entries.len()={} next_index={} last_log_index={}", self.node_id, peer_id, entries.len(), next_index, last_log_index); let req = AppendEntriesRequest { term, leader_id: self.node_id, prev_log_index, prev_log_term, entries, leader_commit: commit_index, }; eprintln!("[Node {}] LEADER sending to {}: leader_commit={}", self.node_id, peer_id, commit_index); let network = Arc::clone(&self.network); let event_tx = self.event_tx.clone(); // Send in background, don't wait for response tokio::spawn(async move { if let Ok(resp) = network.append_entries(peer_id, req).await { let _ = event_tx.send(RaftEvent::AppendEntriesResponse { from: peer_id, resp, }); } }); } Ok(()) } async fn handle_append_entries(&self, req: AppendEntriesRequest) -> Result { let mut persistent = self.persistent.write().await; let current_term = persistent.current_term; // DIAGNOSTIC: Log all AppendEntries received eprintln!("[Node {}] Received AppendEntries from {} term={} (my term={})", self.node_id, req.leader_id, req.term, current_term); // If RPC request contains term T > currentTerm: set currentTerm = T, convert to follower if req.term > current_term { eprintln!("[Node {}] STEPPING DOWN: req.term={} > my term={}", self.node_id, req.term, current_term); persistent.current_term = req.term; persistent.voted_for = None; drop(persistent); self.persist_vote().await?; *self.role.write().await = RaftRole::Follower; *self.candidate_state.write().await = None; *self.leader_state.write().await = None; eprintln!("[Node {}] Stepped down to Follower (now term={})", self.node_id, req.term); } else { drop(persistent); } let persistent = self.persistent.read().await; let term = persistent.current_term; drop(persistent); // Reply false if term < currentTerm if req.term < term { return Ok(AppendEntriesResponse { term, success: false, conflict_index: None, conflict_term: None, }); } // Valid AppendEntries from current leader - reset election timer self.reset_election_timer(); // Update current leader self.volatile.write().await.current_leader = Some(req.leader_id); // P2: Log consistency check // Reply false if log doesn't contain an entry at prevLogIndex whose term matches prevLogTerm if req.prev_log_index > 0 { // Try to get the entry at prevLogIndex (stored as Vec) let prev_entries: Vec>> = self.storage .get_log_entries(req.prev_log_index..=req.prev_log_index) .map_err(|e| RaftError::StorageError(format!("Failed to read log: {}", e)))?; if prev_entries.is_empty() { // Follower doesn't have entry at prevLogIndex // Return conflict information for fast backtracking let last_index = self.get_last_log_info().await?.0; return Ok(AppendEntriesResponse { term, success: false, conflict_index: Some(last_index + 1), conflict_term: None, }); } let prev_entry = &prev_entries[0]; if prev_entry.log_id.term != req.prev_log_term { // Entry exists but term doesn't match // Find the first index of the conflicting term let conflict_term = prev_entry.log_id.term; // Search backwards to find first entry of this term let mut conflict_index = req.prev_log_index; for idx in (1..req.prev_log_index).rev() { let entries: Vec>> = self.storage .get_log_entries(idx..=idx) .map_err(|e| RaftError::StorageError(format!("Failed to read log: {}", e)))?; if !entries.is_empty() && entries[0].log_id.term != conflict_term { conflict_index = idx + 1; break; } } return Ok(AppendEntriesResponse { term, success: false, conflict_index: Some(conflict_index), conflict_term: Some(conflict_term), }); } } // P2: Log append/overwrite logic // If an existing entry conflicts with a new one (same index but different terms), // delete the existing entry and all that follow it if !req.entries.is_empty() { let first_new_index = req.entries[0].log_id.index; // Check if there's a conflict (stored as Vec) let existing: Vec>> = self.storage .get_log_entries(first_new_index..=first_new_index) .map_err(|e| RaftError::StorageError(format!("Failed to read log: {}", e)))?; if !existing.is_empty() && existing[0].log_id.term != req.entries[0].log_id.term { // Conflict detected - truncate from this index self.storage .truncate(first_new_index) .map_err(|e| RaftError::StorageError(format!("Failed to truncate log: {}", e)))?; } // Convert RaftCommand entries to Vec before storing let entries_to_store: Vec>> = req.entries.iter().map(|entry| { let payload = match &entry.payload { EntryPayload::Normal(cmd) => { let bytes = bincode::serialize(cmd) .map_err(|e| RaftError::StorageError(format!("Serialize failed: {}", e)))?; EntryPayload::Normal(bytes) } EntryPayload::Blank => EntryPayload::Blank, EntryPayload::Membership(nodes) => EntryPayload::Membership(nodes.clone()), }; Ok(LogEntry { log_id: entry.log_id, payload, }) }).collect::, RaftError>>()?; // Append converted entries self.storage .append(&entries_to_store) .map_err(|e| RaftError::StorageError(format!("Failed to append entries: {}", e)))?; let (last_log_index, _) = self.get_last_log_info().await?; eprintln!("[Node {}] FOLLOWER appended {} entries, last_index_now={}", self.node_id, req.entries.len(), last_log_index); } // P2: Update commit index // If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry) eprintln!("[Node {}] FOLLOWER commit check: req.leader_commit={} my_commit={}", self.node_id, req.leader_commit, self.volatile.read().await.commit_index); if req.leader_commit > 0 { let mut volatile = self.volatile.write().await; if req.leader_commit > volatile.commit_index { let last_new_index = if !req.entries.is_empty() { req.entries.last().unwrap().log_id.index } else { req.prev_log_index }; let new_commit = std::cmp::min(req.leader_commit, last_new_index); eprintln!("[Node {}] FOLLOWER updating commit: {} -> {}", self.node_id, volatile.commit_index, new_commit); volatile.commit_index = new_commit; debug!( commit_index = volatile.commit_index, leader_commit = req.leader_commit, "Updated commit index" ); // Drop the lock before calling apply drop(volatile); // Apply newly committed entries to state machine self.apply_committed_entries().await?; } } Ok(AppendEntriesResponse { term, success: true, conflict_index: None, conflict_term: None, }) } async fn handle_append_entries_response(&self, from: NodeId, resp: AppendEntriesResponse) -> Result<(), RaftError> { // Only leaders process AppendEntries responses let role = *self.role.read().await; if role != RaftRole::Leader { return Ok(()); } let current_term = self.persistent.read().await.current_term; // If response term > current term, step down if resp.term > current_term { self.step_down(resp.term).await?; return Ok(()); } // Ignore stale responses if resp.term < current_term { return Ok(()); } // Update next_index and match_index based on response let mut leader_state_guard = self.leader_state.write().await; if let Some(leader_state) = leader_state_guard.as_mut() { if resp.success { // Follower successfully replicated entries // Get the old next_index to calculate what we sent let old_next_index = leader_state.next_index.get(&from).copied().unwrap_or(1); // Get current last_log_index after getting old_next_index let (last_log_index, _) = self.get_last_log_info().await?; // We sent entries from old_next_index to last_log_index (at time of sending) // Since the response is success, the follower has all entries up to // the last index we sent let new_match_index = if old_next_index <= last_log_index { // We sent some entries, follower has up to last_log_index last_log_index } else { // Empty heartbeat, match_index stays at previous value old_next_index.saturating_sub(1) }; leader_state.match_index.insert(from, new_match_index); leader_state.next_index.insert(from, new_match_index + 1); eprintln!("[Node {}] RESP from {}: success={} match_index={} next_index={}", self.node_id, from, resp.success, new_match_index, new_match_index + 1); trace!( peer = from, match_index = new_match_index, next_index = new_match_index + 1, old_next_index = old_next_index, "Updated peer replication progress" ); } else { // Follower's log is inconsistent, decrement next_index if let Some(next_index) = leader_state.next_index.get_mut(&from) { if let Some(conflict_index) = resp.conflict_index { // Use conflict information for fast backtracking *next_index = conflict_index; } else { // Decrement next_index by 1 *next_index = next_index.saturating_sub(1).max(1); } debug!( peer = from, new_next_index = *next_index, conflict_index = ?resp.conflict_index, conflict_term = ?resp.conflict_term, "Follower log inconsistent, adjusted next_index" ); } } } drop(leader_state_guard); // Try to advance commit index after updating match_index if resp.success { self.advance_commit_index().await?; } Ok(()) } // ======================================================================== // P3: Commitment Logic // ======================================================================== /// Advance commit index based on majority replication async fn advance_commit_index(&self) -> Result<(), RaftError> { let leader_state = self.leader_state.read().await; if leader_state.is_none() { return Ok(()); // Not leader } let leader_state = leader_state.as_ref().unwrap(); // Collect all match_index values plus leader's own log let (last_log_index, _) = self.get_last_log_info().await?; let mut match_indices: Vec = leader_state .match_index .values() .copied() .collect(); // Add leader's own index match_indices.push(last_log_index); // Sort to find median (majority point) match_indices.sort_unstable(); // Majority index is at position N/2 (0-indexed median) let majority_index = match_indices.len() / 2; let new_commit_index = match_indices[majority_index]; eprintln!("[Node {}] COMMIT CHECK: match_indices={:?} majority_idx={} new_commit={}", self.node_id, match_indices, majority_index, new_commit_index); let current_term = self.persistent.read().await.current_term; let old_commit_index = self.volatile.read().await.commit_index; // Only commit if: // 1. new_commit_index > current commit_index // 2. The entry at new_commit_index is from current term (Raft safety) if new_commit_index > old_commit_index { // Check term of entry at new_commit_index (stored as Vec) let entries: Vec>> = self.storage .get_log_entries(new_commit_index..=new_commit_index) .map_err(|e| RaftError::StorageError(format!("Failed to read log for commit: {}", e)))?; if !entries.is_empty() && entries[0].log_id.term == current_term { // Safe to commit self.volatile.write().await.commit_index = new_commit_index; debug!( old_commit = old_commit_index, new_commit = new_commit_index, "Advanced commit index" ); // Apply newly committed entries self.apply_committed_entries().await?; } } Ok(()) } /// Apply committed entries to state machine async fn apply_committed_entries(&self) -> Result<(), RaftError> { let mut volatile = self.volatile.write().await; let commit_index = volatile.commit_index; let last_applied = volatile.last_applied; if commit_index <= last_applied { return Ok(()); // Nothing to apply } // Get entries to apply (stored as Vec) let stored_entries: Vec>> = self.storage .get_log_entries((last_applied + 1)..=commit_index) .map_err(|e| RaftError::StorageError(format!("Failed to read entries for apply: {}", e)))?; // Apply each entry to state machine for entry in &stored_entries { if let EntryPayload::Normal(data) = &entry.payload { // Deserialize the command let command: RaftCommand = bincode::deserialize(data) .map_err(|e| RaftError::StorageError(format!("Failed to deserialize for apply: {}", e)))?; self.state_machine .apply(command) .map_err(|e| RaftError::StorageError(format!("Failed to apply to state machine: {}", e)))?; debug!( index = entry.log_id.index, term = entry.log_id.term, "Applied entry to state machine" ); } } // Update last_applied volatile.last_applied = commit_index; debug!( last_applied = commit_index, entries_applied = stored_entries.len(), "Applied committed entries to state machine" ); Ok(()) } // ======================================================================== // P3: Client Requests // ======================================================================== async fn handle_client_write(&self, command: RaftCommand) -> Result<(), RaftError> { let role = *self.role.read().await; if role != RaftRole::Leader { return Err(RaftError::NotLeader { leader_id: None }); } // Get current term and last log index let term = self.persistent.read().await.current_term; eprintln!("[Node {}] handle_client_write: getting last_log_info...", self.node_id); let (last_log_index, _) = match self.get_last_log_info().await { Ok(info) => { eprintln!("[Node {}] handle_client_write: last_log_index={}", self.node_id, info.0); info } Err(e) => { eprintln!("[Node {}] handle_client_write: ERROR getting last_log_info: {:?}", self.node_id, e); return Err(e); } }; let new_index = last_log_index + 1; // Serialize command to Vec for storage let command_bytes = bincode::serialize(&command) .map_err(|e| RaftError::StorageError(format!("Failed to serialize command: {}", e)))?; // Create new log entry let log_id = LogId { term, index: new_index, }; let entry = LogEntry { log_id, payload: EntryPayload::Normal(command_bytes), }; // Append to leader's log eprintln!("[Node {}] handle_client_write: appending entry index={} term={}...", self.node_id, new_index, term); match self.storage.append(&[entry.clone()]) { Ok(()) => { eprintln!("[Node {}] handle_client_write: append SUCCESS index={}", self.node_id, new_index); } Err(e) => { eprintln!("[Node {}] handle_client_write: append FAILED: {:?}", self.node_id, e); return Err(RaftError::StorageError(format!("Failed to append entry: {}", e))); } } debug!( term = term, index = new_index, "Leader appended entry to log" ); // Trigger immediate replication to all followers // Send AppendEntries with the new entry to all peers self.event_tx .send(RaftEvent::HeartbeatTimeout) .map_err(|e| RaftError::NetworkError(format!("Failed to trigger replication: {}", e)))?; // Single-node cluster: immediately commit since we're the only voter if self.peers.is_empty() { self.advance_commit_index().await?; } // Note: In a production implementation, we would wait for majority // acknowledgment before returning success. For now, we return immediately // and let the async replication/commit process handle it via normal // heartbeat responses updating match_index. Ok(()) } // ======================================================================== // Helper Methods // ======================================================================== /// Get last log index and term async fn get_last_log_info(&self) -> Result<(LogIndex, Term), RaftError> { let log_state = self.storage .get_log_state() .map_err(|e| RaftError::StorageError(format!("Failed to get log state: {}", e)))?; if let Some(last_log_id) = log_state.last_log_id { Ok((last_log_id.index, last_log_id.term)) } else { Ok((0, 0)) } } /// Spawn election timer task fn spawn_election_timer(&self) { let event_tx = self.event_tx.clone(); let config = self.config.clone(); let reset_notify = Arc::clone(&self.election_timer_reset); tokio::spawn(async move { eprintln!("[ELECTION TIMER] Spawned"); loop { let timeout = rand::random::() % (config.election_timeout_max - config.election_timeout_min) + config.election_timeout_min; eprintln!("[ELECTION TIMER] Waiting {}ms", timeout); tokio::select! { _ = time::sleep(Duration::from_millis(timeout)) => { // Election timeout fired eprintln!("[ELECTION TIMER] Timeout fired, sending event"); if event_tx.send(RaftEvent::ElectionTimeout).is_err() { eprintln!("[ELECTION TIMER] Send failed, exiting"); break; } eprintln!("[ELECTION TIMER] Event sent successfully"); } _ = reset_notify.notified() => { // Timer was reset, restart the loop with new timeout eprintln!("[ELECTION TIMER] Reset notification received"); continue; } } } eprintln!("[ELECTION TIMER] Exited"); }); } /// Reset the election timer (called when receiving valid RPC or becoming leader) fn reset_election_timer(&self) { self.election_timer_reset.notify_one(); } /// Spawn heartbeat timer task (leader sends periodic heartbeats) fn spawn_heartbeat_timer(&self) { let event_tx = self.event_tx.clone(); let config = self.config.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_millis(config.heartbeat_interval)); // Skip the first tick (fires immediately) interval.tick().await; loop { interval.tick().await; if event_tx.send(RaftEvent::HeartbeatTimeout).is_err() { break; } } }); } // ======================================================================== // Public API for external access (testing, metrics, etc.) // ======================================================================== /// Get this node's ID pub fn node_id(&self) -> NodeId { self.node_id } /// Alias for node_id() for API compatibility pub fn id(&self) -> NodeId { self.node_id } /// Get current role pub async fn role(&self) -> RaftRole { *self.role.read().await } /// Get current term pub async fn current_term(&self) -> Term { self.persistent.read().await.current_term } /// Inject RequestVote RPC (for testing) pub async fn request_vote_rpc( &self, req: VoteRequest, resp_tx: oneshot::Sender, ) { let _ = self.event_tx.send(RaftEvent::VoteRequest { req, response_tx: resp_tx }); } /// Inject AppendEntries RPC (for testing) pub async fn append_entries_rpc( &self, req: AppendEntriesRequest, resp_tx: oneshot::Sender, ) { eprintln!("[Node {}] append_entries_rpc: from {} term={}", self.node_id, req.leader_id, req.term); let result = self.event_tx.send(RaftEvent::AppendEntries { req, response_tx: resp_tx }); if let Err(e) = result { eprintln!("[Node {}] ERROR: Failed to send AppendEntries event: channel closed", self.node_id); } } /// Get current leader pub async fn leader(&self) -> Option { self.volatile.read().await.current_leader } /// Submit a client write command (non-blocking, returns immediately after append) pub async fn client_write(&self, command: RaftCommand) -> Result<(), RaftError> { let (tx, rx) = oneshot::channel(); self.event_tx .send(RaftEvent::ClientWrite { command, response_tx: tx, }) .map_err(|e| RaftError::NetworkError(format!("Failed to send client write: {}", e)))?; rx.await .map_err(|e| RaftError::NetworkError(format!("Client write response lost: {}", e)))? } /// Submit a client write and wait for commit (blocking version) /// Returns RaftResponse after the command is committed and applied pub async fn write(&self, command: RaftCommand) -> Result { use chainfire_types::command::RaftResponse; // Get current commit index before write let initial_commit = self.volatile.read().await.commit_index; // Submit the write self.client_write(command).await?; // Wait for commit to advance (with timeout) let timeout = tokio::time::Duration::from_secs(5); let start = tokio::time::Instant::now(); loop { let current_commit = self.volatile.read().await.commit_index; if current_commit > initial_commit { // Entry committed, get current revision from state machine let revision = self.state_machine.current_revision(); return Ok(RaftResponse { revision, prev_kv: None, deleted: 0, succeeded: true, prev_kvs: vec![], lease_id: None, lease_ttl: None, txn_responses: vec![], }); } if start.elapsed() > timeout { return Err(RaftError::Timeout); } // Sleep briefly before checking again tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; } } /// Get current commit index pub async fn commit_index(&self) -> LogIndex { self.volatile.read().await.commit_index } /// Get current last_applied index pub async fn last_applied(&self) -> LogIndex { self.volatile.read().await.last_applied } /// Get state machine reference for testing/verification pub fn state_machine(&self) -> Arc { Arc::clone(&self.state_machine) } /// Get storage reference for snapshot operations pub fn storage(&self) -> Arc { Arc::clone(&self.storage) } /// Get current cluster membership as list of node IDs /// NOTE: Custom RaftCore uses static membership configured at startup pub async fn membership(&self) -> Vec { let mut members = vec![self.node_id]; members.extend(self.peers.iter().cloned()); members.sort(); members } } // ============================================================================ // Unit Tests // ============================================================================ #[cfg(test)] mod tests { use super::*; #[test] fn test_vote_request_creation() { let req = VoteRequest { term: 1, candidate_id: 1, last_log_index: 0, last_log_term: 0, }; assert_eq!(req.term, 1); assert_eq!(req.candidate_id, 1); } #[tokio::test] async fn test_raft_core_creation() { // TODO: Add proper unit tests with mock storage/network } }