//! Cluster management use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use parking_lot::RwLock; use tokio::sync::broadcast; use chainfire_gossip::{GossipAgent, MembershipChange}; use chainfire_types::node::NodeInfo; use crate::config::ClusterConfig; use crate::error::{ClusterError, Result}; use crate::events::EventDispatcher; use crate::kvs::{Kv, KvHandle}; /// Current state of the cluster #[derive(Debug, Clone)] #[derive(Default)] pub struct ClusterState { /// Whether this node is the leader pub is_leader: bool, /// Current leader's node ID pub leader_id: Option, /// Current term (Raft) pub term: u64, /// All known cluster members pub members: Vec, /// Whether the cluster is ready (initial leader elected) pub ready: bool, } /// Main cluster instance /// /// This is the primary interface for interacting with a Chainfire cluster. /// It manages Raft consensus, gossip membership, and the distributed KV store. pub struct Cluster { /// Node configuration config: ClusterConfig, /// Current cluster state state: Arc>, /// KV store kv: Arc, /// Gossip agent for cluster membership gossip_agent: Option, /// Event dispatcher event_dispatcher: Arc, /// Shutdown flag shutdown: AtomicBool, /// Shutdown signal sender shutdown_tx: broadcast::Sender<()>, } impl Cluster { /// Create a new cluster instance pub(crate) fn new( config: ClusterConfig, gossip_agent: Option, event_dispatcher: EventDispatcher, ) -> Self { let (shutdown_tx, _) = broadcast::channel(1); Self { config, state: Arc::new(RwLock::new(ClusterState::default())), kv: Arc::new(Kv::new()), gossip_agent, event_dispatcher: Arc::new(event_dispatcher), shutdown: AtomicBool::new(false), shutdown_tx, } } /// Get this node's ID pub fn node_id(&self) -> u64 { self.config.node_id } /// Get this node's name pub fn node_name(&self) -> &str { &self.config.node_name } /// Get a handle for interacting with the cluster /// /// Handles are lightweight and can be cloned freely. pub fn handle(&self) -> ClusterHandle { ClusterHandle { node_id: self.config.node_id, state: self.state.clone(), kv: self.kv.clone(), shutdown_tx: self.shutdown_tx.clone(), } } /// Get the KV store interface pub fn kv(&self) -> &Arc { &self.kv } /// Get current cluster state pub fn state(&self) -> ClusterState { self.state.read().clone() } /// Check if this node is the leader pub fn is_leader(&self) -> bool { self.state.read().is_leader } /// Get current leader ID pub fn leader(&self) -> Option { self.state.read().leader_id } /// Get all cluster members pub fn members(&self) -> Vec { self.state.read().members.clone() } /// Check if the cluster is ready pub fn is_ready(&self) -> bool { self.state.read().ready } /// Join an existing cluster /// /// Connects to seed nodes and joins the cluster via gossip. pub async fn join(&mut self, seed_addrs: &[std::net::SocketAddr]) -> Result<()> { if seed_addrs.is_empty() { return Err(ClusterError::Config("No seed addresses provided".into())); } let gossip_agent = self.gossip_agent.as_mut().ok_or_else(|| { ClusterError::Config("Gossip agent not initialized".into()) })?; // Announce to all seed nodes to discover the cluster for &addr in seed_addrs { tracing::info!(%addr, "Announcing to seed node"); gossip_agent .announce(addr) .map_err(|e| ClusterError::Gossip(e.to_string()))?; } tracing::info!(seeds = seed_addrs.len(), "Joined cluster via gossip"); Ok(()) } /// Leave the cluster gracefully pub async fn leave(&self) -> Result<()> { // TODO: Implement graceful leave self.shutdown(); Ok(()) } /// Add a new node to the cluster (leader only) pub async fn add_node(&self, _node: NodeInfo, _as_learner: bool) -> Result<()> { if !self.is_leader() { return Err(ClusterError::NotLeader { leader_id: self.leader(), }); } // TODO: Implement node addition via Raft Ok(()) } /// Remove a node from the cluster (leader only) pub async fn remove_node(&self, _node_id: u64) -> Result<()> { if !self.is_leader() { return Err(ClusterError::NotLeader { leader_id: self.leader(), }); } // TODO: Implement node removal via Raft Ok(()) } /// Promote a learner to voter (leader only) pub async fn promote_learner(&self, _node_id: u64) -> Result<()> { if !self.is_leader() { return Err(ClusterError::NotLeader { leader_id: self.leader(), }); } // TODO: Implement learner promotion via Raft Ok(()) } /// Run the cluster (blocks until shutdown) pub async fn run(self) -> Result<()> { self.run_until_shutdown(std::future::pending()).await } /// Run with graceful shutdown signal pub async fn run_until_shutdown(mut self, shutdown_signal: F) -> Result<()> where F: std::future::Future, { let mut shutdown_rx = self.shutdown_tx.subscribe(); // Start gossip agent if present let gossip_task = if let Some(mut gossip_agent) = self.gossip_agent.take() { let state = self.state.clone(); let shutdown_rx_gossip = self.shutdown_tx.subscribe(); // Spawn task to handle gossip membership changes Some(tokio::spawn(async move { // Run the gossip agent with shutdown signal if let Err(e) = gossip_agent.run_until_shutdown(shutdown_rx_gossip).await { tracing::error!(error = %e, "Gossip agent error"); } })) } else { None }; tokio::select! { _ = shutdown_signal => { tracing::info!("Received shutdown signal"); } _ = shutdown_rx.recv() => { tracing::info!("Received internal shutdown"); } } // Wait for gossip task to finish if let Some(task) = gossip_task { let _ = task.await; } Ok(()) } /// Trigger shutdown pub fn shutdown(&self) { self.shutdown.store(true, Ordering::SeqCst); let _ = self.shutdown_tx.send(()); } /// Check if shutdown was requested pub fn is_shutting_down(&self) -> bool { self.shutdown.load(Ordering::SeqCst) } /// Get the event dispatcher pub(crate) fn event_dispatcher(&self) -> &Arc { &self.event_dispatcher } } /// Lightweight handle for cluster operations /// /// This handle can be cloned and passed around cheaply. It provides /// access to cluster state and the KV store without owning the cluster. #[derive(Clone)] pub struct ClusterHandle { node_id: u64, state: Arc>, kv: Arc, shutdown_tx: broadcast::Sender<()>, } impl ClusterHandle { /// Get this node's ID pub fn node_id(&self) -> u64 { self.node_id } /// Get a KV handle pub fn kv(&self) -> KvHandle { KvHandle::new(self.kv.clone()) } /// Check if this node is the leader pub fn is_leader(&self) -> bool { self.state.read().is_leader } /// Get current leader ID pub fn leader(&self) -> Option { self.state.read().leader_id } /// Get all cluster members pub fn members(&self) -> Vec { self.state.read().members.clone() } /// Get current cluster state pub fn state(&self) -> ClusterState { self.state.read().clone() } /// Trigger cluster shutdown pub fn shutdown(&self) { let _ = self.shutdown_tx.send(()); } }