//! Internal Raft RPC service implementation //! //! This service handles Raft protocol messages between nodes in the cluster. //! It bridges the gRPC layer with the OpenRaft implementation. use crate::internal_proto::{ raft_service_server::RaftService, AppendEntriesRequest, AppendEntriesResponse, InstallSnapshotRequest, InstallSnapshotResponse, VoteRequest, VoteResponse, }; use chainfire_raft::{Raft, TypeConfig}; use chainfire_types::NodeId; use openraft::BasicNode; use std::sync::Arc; use tonic::{Request, Response, Status, Streaming}; use tracing::{debug, trace, warn}; /// Internal Raft RPC service implementation /// /// This service handles Raft protocol messages between nodes. pub struct RaftServiceImpl { /// Reference to the Raft instance raft: Arc, } impl RaftServiceImpl { /// Create a new Raft service with a Raft instance pub fn new(raft: Arc) -> Self { Self { raft } } } #[tonic::async_trait] impl RaftService for RaftServiceImpl { async fn vote( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); trace!( term = req.term, candidate = req.candidate_id, "Vote request received" ); // Convert proto request to openraft request let vote_req = openraft::raft::VoteRequest { vote: openraft::Vote::new(req.term, req.candidate_id), last_log_id: if req.last_log_index > 0 { Some(openraft::LogId::new( openraft::CommittedLeaderId::new(req.last_log_term, 0), req.last_log_index, )) } else { None }, }; // Forward to Raft node let result = self.raft.vote(vote_req).await; match result { Ok(resp) => { trace!(term = resp.vote.leader_id().term, granted = resp.vote_granted, "Vote response"); Ok(Response::new(VoteResponse { term: resp.vote.leader_id().term, vote_granted: resp.vote_granted, last_log_index: resp.last_log_id.map(|id| id.index).unwrap_or(0), last_log_term: resp.last_log_id.map(|id| id.leader_id.term).unwrap_or(0), })) } Err(e) => { warn!(error = %e, "Vote request failed"); Err(Status::internal(e.to_string())) } } } async fn append_entries( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); trace!( term = req.term, leader = req.leader_id, entries = req.entries.len(), "AppendEntries request received" ); // Convert proto entries to openraft entries let entries: Vec> = req .entries .into_iter() .map(|e| { let payload = if e.data.is_empty() { openraft::EntryPayload::Blank } else { // Deserialize the command from the entry data match bincode::deserialize(&e.data) { Ok(cmd) => openraft::EntryPayload::Normal(cmd), Err(_) => openraft::EntryPayload::Blank, } }; openraft::Entry { log_id: openraft::LogId::new( openraft::CommittedLeaderId::new(e.term, 0), e.index, ), payload, } }) .collect(); let prev_log_id = if req.prev_log_index > 0 { Some(openraft::LogId::new( openraft::CommittedLeaderId::new(req.prev_log_term, 0), req.prev_log_index, )) } else { None }; let leader_commit = if req.leader_commit > 0 { Some(openraft::LogId::new( openraft::CommittedLeaderId::new(req.term, 0), req.leader_commit, )) } else { None }; let append_req = openraft::raft::AppendEntriesRequest { vote: openraft::Vote::new_committed(req.term, req.leader_id), prev_log_id, entries, leader_commit, }; let result = self.raft.append_entries(append_req).await; match result { Ok(resp) => { let (success, conflict_index, conflict_term) = match resp { openraft::raft::AppendEntriesResponse::Success => (true, 0, 0), openraft::raft::AppendEntriesResponse::PartialSuccess(log_id) => { // Partial success - some entries were accepted let index = log_id.map(|l| l.index).unwrap_or(0); (true, index, 0) } openraft::raft::AppendEntriesResponse::HigherVote(vote) => { (false, 0, vote.leader_id().term) } openraft::raft::AppendEntriesResponse::Conflict => (false, 0, 0), }; trace!(success, "AppendEntries response"); Ok(Response::new(AppendEntriesResponse { term: req.term, success, conflict_index, conflict_term, })) } Err(e) => { warn!(error = %e, "AppendEntries request failed"); Err(Status::internal(e.to_string())) } } } async fn install_snapshot( &self, request: Request>, ) -> Result, Status> { let mut stream = request.into_inner(); debug!("InstallSnapshot stream started"); // Collect all chunks let mut term = 0; let mut leader_id = 0; let mut last_log_index = 0; let mut last_log_term = 0; let mut data = Vec::new(); while let Some(chunk) = stream.message().await? { term = chunk.term; leader_id = chunk.leader_id; last_log_index = chunk.last_included_index; last_log_term = chunk.last_included_term; data.extend_from_slice(&chunk.data); if chunk.done { break; } } debug!(term, size = data.len(), "InstallSnapshot completed"); // Create snapshot metadata let last_log_id = if last_log_index > 0 { Some(openraft::LogId::new( openraft::CommittedLeaderId::new(last_log_term, 0), last_log_index, )) } else { None }; let meta = openraft::SnapshotMeta { last_log_id, last_membership: openraft::StoredMembership::new( None, openraft::Membership::::new(vec![], None), ), snapshot_id: format!("{}-{}", term, last_log_index), }; let snapshot_req = openraft::raft::InstallSnapshotRequest { vote: openraft::Vote::new_committed(term, leader_id), meta, offset: 0, data, done: true, }; let result = self.raft.install_snapshot(snapshot_req).await; match result { Ok(resp) => { debug!(term = resp.vote.leader_id().term, "InstallSnapshot response"); Ok(Response::new(InstallSnapshotResponse { term: resp.vote.leader_id().term, })) } Err(e) => { warn!(error = %e, "InstallSnapshot request failed"); Err(Status::internal(e.to_string())) } } } }