//! Node orchestration //! //! This module manages the lifecycle of all components in a Chainfire node. use crate::config::ServerConfig; use anyhow::Result; use chainfire_api::GrpcRaftClient; use chainfire_gossip::{GossipAgent, GossipId}; use chainfire_raft::core::{RaftCore, RaftConfig}; use chainfire_raft::network::RaftRpcClient; use chainfire_storage::{RocksStore, LogStorage, StateMachine}; use chainfire_types::node::NodeRole; use chainfire_types::RaftRole; use chainfire_watch::WatchRegistry; use std::sync::Arc; use tokio::sync::broadcast; use tracing::info; /// Node instance managing all components pub struct Node { /// Server configuration config: ServerConfig, /// Raft core (None if role is RaftRole::None) raft: Option>, /// gRPC Raft client (None if role is RaftRole::None) rpc_client: Option>, /// Watch registry watch_registry: Arc, /// Gossip agent (runs on all nodes) gossip: Option, /// Shutdown signal shutdown_tx: broadcast::Sender<()>, } impl Node { /// Create a new node pub async fn new(config: ServerConfig) -> Result { // Ensure data directory exists std::fs::create_dir_all(&config.storage.data_dir)?; // Create watch registry let watch_registry = Arc::new(WatchRegistry::new()); // Create Raft core only if role participates in Raft let (raft, rpc_client) = if config.raft.role.participates_in_raft() { // Create RocksDB store let store = RocksStore::new(&config.storage.data_dir)?; info!(data_dir = ?config.storage.data_dir, "Opened storage"); // Create LogStorage and StateMachine from store let log_storage = Arc::new(LogStorage::new(store.clone())); let state_machine = Arc::new(StateMachine::new(store.clone())?); // Create gRPC Raft client and register peer addresses let rpc_client = Arc::new(GrpcRaftClient::new()); for member in &config.cluster.initial_members { rpc_client.add_node(member.id, member.raft_addr.clone()).await; info!(node_id = member.id, addr = %member.raft_addr, "Registered peer"); } // Extract peer node IDs (excluding self) let peers: Vec = config.cluster.initial_members .iter() .map(|m| m.id) .filter(|&id| id != config.node.id) .collect(); // Create RaftCore with default config let raft_core = Arc::new(RaftCore::new( config.node.id, peers, log_storage, state_machine, Arc::clone(&rpc_client) as Arc, RaftConfig::default(), )); // Initialize Raft (load persistent state) raft_core.initialize().await?; info!( node_id = config.node.id, raft_role = %config.raft.role, "Created Raft core" ); // Spawn the Raft event loop let raft_clone = Arc::clone(&raft_core); tokio::spawn(async move { if let Err(e) = raft_clone.run().await { tracing::error!(error = ?e, "Raft event loop failed"); } }); info!(node_id = config.node.id, "Raft event loop started"); (Some(raft_core), Some(rpc_client)) } else { info!( node_id = config.node.id, raft_role = %config.raft.role, "Skipping Raft core (role=none)" ); (None, None) }; // Gossip runs on ALL nodes regardless of Raft role let gossip_role = match config.node.role.as_str() { "control_plane" => NodeRole::ControlPlane, _ => NodeRole::Worker, }; let gossip_id = GossipId::new(config.node.id, config.network.gossip_addr, gossip_role); let gossip = Some( GossipAgent::new(gossip_id, chainfire_gossip::agent::default_config()) .await?, ); info!( addr = %config.network.gossip_addr, gossip_role = ?gossip_role, "Created gossip agent" ); let (shutdown_tx, _) = broadcast::channel(1); Ok(Self { config, raft, rpc_client, watch_registry, gossip, shutdown_tx, }) } /// Get the Raft core (None if role is RaftRole::None) pub fn raft(&self) -> Option<&Arc> { self.raft.as_ref() } /// Check if this node has Raft enabled pub fn has_raft(&self) -> bool { self.raft.is_some() } /// Get the Raft role configuration pub fn raft_role(&self) -> RaftRole { self.config.raft.role } /// Get the watch registry pub fn watch_registry(&self) -> &Arc { &self.watch_registry } /// Get the gRPC Raft client (None if role is RaftRole::None) pub fn rpc_client(&self) -> Option<&Arc> { self.rpc_client.as_ref() } /// Get the cluster ID pub fn cluster_id(&self) -> u64 { self.config.cluster.id } /// Initialize the cluster if bootstrapping /// /// This handles different behaviors based on RaftRole: /// - Voter with bootstrap=true: Raft is ready (already initialized in new()) /// - Learner: Wait to be added by the leader /// - None: No Raft, nothing to do /// /// NOTE: Custom RaftCore handles multi-node initialization via the peers parameter /// in the constructor. All nodes start with the same peer list and will elect a leader. pub async fn maybe_bootstrap(&self) -> Result<()> { let Some(raft) = &self.raft else { info!("No Raft core to bootstrap (role=none)"); return Ok(()); }; match self.config.raft.role { RaftRole::Voter if self.config.cluster.bootstrap => { info!( node_id = self.config.node.id, peers = ?self.config.cluster.initial_members.iter().map(|m| m.id).collect::>(), "Raft core ready for leader election" ); // Raft core is already initialized and running from new() // It will participate in leader election automatically } RaftRole::Learner => { info!( node_id = self.config.node.id, "Learner node ready, waiting to be added to cluster" ); // Learners don't participate in elections } RaftRole::Voter if !self.config.cluster.bootstrap => { info!( node_id = self.config.node.id, "Non-bootstrap voter ready for leader election" ); // Non-bootstrap voters are also ready to participate } _ => { info!( node_id = self.config.node.id, raft_role = %self.config.raft.role, bootstrap = self.config.cluster.bootstrap, "Raft core initialized" ); } } Ok(()) } /// Get shutdown receiver pub fn shutdown_receiver(&self) -> broadcast::Receiver<()> { self.shutdown_tx.subscribe() } /// Trigger shutdown pub fn shutdown(&self) { let _ = self.shutdown_tx.send(()); } }