//! gRPC server
//!
//! This module provides the main server implementation that hosts all gRPC services.
//! Supports two modes:
//! - Full server mode (voter/learner): Runs Raft consensus and all services
//! - Agent mode (role=none): Runs gossip only, proxies requests to control-plane

use crate::config::ServerConfig;
use crate::node::Node;
use crate::rest::{build_router, RestApiState};
use anyhow::Result;
use chainfire_api::internal_proto::raft_service_server::RaftServiceServer;
use chainfire_api::proto::{
    cluster_server::ClusterServer, kv_server::KvServer, watch_server::WatchServer, Member,
};
use chainfire_api::{ClusterServiceImpl, KvServiceImpl, RaftServiceImpl, WatchServiceImpl};
use chainfire_types::RaftRole;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::signal;
use tonic::transport::{Certificate, Identity, Server as TonicServer, ServerTlsConfig};
use tonic_health::server::health_reporter;
use tracing::info;

/// Main server instance
pub struct Server {
    node: Arc<Node>,
    config: ServerConfig,
}

impl Server {
    /// Create a new server
    pub async fn new(config: ServerConfig) -> Result<Self> {
        let node = Arc::new(Node::new(config.clone()).await?);
        Ok(Self { node, config })
    }

    /// Apply TLS configuration to a server builder
    async fn apply_tls_config(
        &self,
        builder: TonicServer,
    ) -> Result<TonicServer> {
        if let Some(tls_config) = &self.config.network.tls {
            info!("TLS enabled, loading certificates...");
            let cert = tokio::fs::read(&tls_config.cert_file).await?;
            let key = tokio::fs::read(&tls_config.key_file).await?;
            let server_identity = Identity::from_pem(cert, key);

            let tls = if tls_config.require_client_cert {
                info!("mTLS enabled, requiring client certificates");
                let ca_cert = tokio::fs::read(
                    tls_config
                        .ca_file
                        .as_ref()
                        .ok_or_else(|| {
                            anyhow::anyhow!("ca_file required when require_client_cert=true")
                        })?,
                )
                .await?;
                let ca = Certificate::from_pem(ca_cert);
                ServerTlsConfig::new()
                    .identity(server_identity)
                    .client_ca_root(ca)
            } else {
                info!("TLS-only mode, client certificates not required");
                ServerTlsConfig::new().identity(server_identity)
            };

            Ok(builder.tls_config(tls)?)
} else { info!("TLS disabled, running in plain-text mode"); Ok(builder) } } /// Run the server in the appropriate mode based on Raft role pub async fn run(self) -> Result<()> { match self.node.raft_role() { RaftRole::None => self.run_agent_mode().await, _ => self.run_full_mode().await, } } /// Run in full server mode (voter/learner with Raft consensus) async fn run_full_mode(self) -> Result<()> { let raft = self .node .raft() .expect("raft core should exist in full mode") .clone(); // Bootstrap cluster if needed self.node.maybe_bootstrap().await?; // Create gRPC services for client API let kv_service = KvServiceImpl::new(Arc::clone(&raft), self.node.cluster_id()); let watch_service = WatchServiceImpl::new( Arc::clone(self.node.watch_registry()), self.node.cluster_id(), raft.node_id(), ); let rpc_client = self .node .rpc_client() .expect("rpc_client should exist in full mode") .clone(); let cluster_service = ClusterServiceImpl::new( Arc::clone(&raft), rpc_client, self.node.cluster_id(), configured_members(&self.config), ); // Internal Raft service for inter-node communication let raft_service = RaftServiceImpl::new(Arc::clone(&raft)); // Health check service for K8s liveness/readiness probes let (mut health_reporter, health_service) = health_reporter(); health_reporter .set_serving::>() .await; health_reporter .set_serving::>() .await; health_reporter .set_serving::>() .await; info!( api_addr = %self.config.network.api_addr, http_addr = %self.config.network.http_addr, raft_addr = %self.config.network.raft_addr, "Starting gRPC and HTTP servers" ); // Shutdown signal channel let (shutdown_tx, _) = tokio::sync::broadcast::channel::<()>(1); let mut shutdown_rx1 = shutdown_tx.subscribe(); let mut shutdown_rx2 = shutdown_tx.subscribe(); let mut shutdown_rx3 = shutdown_tx.subscribe(); // Client API server (KV, Watch, Cluster, Health) let api_addr = self.config.network.api_addr; let api_builder = self .apply_tls_config(TonicServer::builder()) .await? .add_service(health_service) .add_service(KvServer::new(kv_service)) .add_service(WatchServer::new(watch_service)) .add_service(ClusterServer::new(cluster_service)); let api_server = api_builder.serve_with_shutdown(api_addr, async move { let _ = shutdown_rx1.recv().await; }); // Internal Raft server (peer-to-peer communication) let raft_addr = self.config.network.raft_addr; let raft_builder = self .apply_tls_config(TonicServer::builder()) .await? 
            .add_service(RaftServiceServer::new(raft_service));
        let raft_server = raft_builder.serve_with_shutdown(raft_addr, async move {
            let _ = shutdown_rx2.recv().await;
        });

        // HTTP REST API server
        let http_addr = self.config.network.http_addr;
        let http_port = self.config.network.http_addr.port();
        let peer_http_addrs = Arc::new(
            self.config
                .cluster
                .initial_members
                .iter()
                .filter_map(|member| {
                    http_endpoint_from_raft_addr(&member.raft_addr, http_port)
                        .map(|http_addr| (member.id, http_addr))
                })
                .collect::<HashMap<_, _>>(),
        );
        let rest_state = RestApiState {
            raft: Arc::clone(&raft),
            cluster_id: self.node.cluster_id(),
            rpc_client: self.node.rpc_client().cloned(),
            http_client: reqwest::Client::new(),
            peer_http_addrs,
        };
        let rest_app = build_router(rest_state);
        let http_listener = tokio::net::TcpListener::bind(&http_addr).await?;
        let http_server = async move {
            axum::serve(http_listener, rest_app)
                .with_graceful_shutdown(async move {
                    let _ = shutdown_rx3.recv().await;
                })
                .await
        };

        info!(api_addr = %api_addr, "Client API server (gRPC) starting");
        info!(http_addr = %http_addr, "HTTP REST API server starting");
        info!(raft_addr = %raft_addr, "Raft server starting");

        // Run all three servers concurrently
        tokio::select! {
            result = api_server => {
                if let Err(e) = result {
                    tracing::error!(error = %e, "API server error");
                }
            }
            result = raft_server => {
                if let Err(e) = result {
                    tracing::error!(error = %e, "Raft server error");
                }
            }
            result = http_server => {
                if let Err(e) = result {
                    tracing::error!(error = %e, "HTTP server error");
                }
            }
            _ = signal::ctrl_c() => {
                info!("Received shutdown signal");
                let _ = shutdown_tx.send(());
            }
        }

        info!("Server stopped");
        Ok(())
    }

    /// Run in agent mode (role=none, gossip only, no Raft)
    ///
    /// Agent mode runs a lightweight server that:
    /// - Participates in gossip protocol for cluster discovery
    /// - Can subscribe to watch events (if connected to control-plane)
    /// - Does not run Raft consensus
    /// - Suitable for worker nodes that only need cluster membership
    async fn run_agent_mode(self) -> Result<()> {
        info!(
            node_id = self.config.node.id,
            api_addr = %self.config.network.api_addr,
            "Starting agent mode (no Raft)"
        );

        // Get control-plane Raft addresses from initial_members.
        // These can be used to derive API addresses or discover them via gossip.
        let control_plane_addrs: Vec<&str> = self
            .config
            .cluster
            .initial_members
            .iter()
            .map(|m| m.raft_addr.as_str())
            .collect();
        if !control_plane_addrs.is_empty() {
            info!(
                control_plane_nodes = ?control_plane_addrs,
                "Agent mode: control-plane Raft endpoints (use gossip for API discovery)"
            );
        }

        // Health check service for K8s liveness/readiness probes
        let (mut health_reporter, health_service) = health_reporter();
        // In agent mode, we report the agent service as serving (gossip is running)
        health_reporter
            .set_service_status("chainfire.Agent", tonic_health::ServingStatus::Serving)
            .await;

        // Shutdown signal channel
        let (shutdown_tx, _) = tokio::sync::broadcast::channel::<()>(1);
        let mut shutdown_rx = shutdown_tx.subscribe();

        // Run health check server for K8s probes
        let api_addr = self.config.network.api_addr;
        let health_builder = self
            .apply_tls_config(TonicServer::builder())
            .await?
            .add_service(health_service);
        let health_server = health_builder.serve_with_shutdown(api_addr, async move {
            let _ = shutdown_rx.recv().await;
        });

        info!(api_addr = %api_addr, "Agent health server starting");
        info!("Agent running. Press Ctrl+C to stop.");

        tokio::select! {
            result = health_server => {
                if let Err(e) = result {
                    tracing::error!(error = %e, "Agent health server error");
                }
            }
            _ = signal::ctrl_c() => {
                info!("Received shutdown signal");
                let _ = shutdown_tx.send(());
            }
        }

        self.node.shutdown();
        info!("Agent stopped");
        Ok(())
    }
}

/// Derive an HTTP endpoint ("http://host:port") from a Raft address, replacing
/// the Raft port with the given HTTP port.
fn http_endpoint_from_raft_addr(raft_addr: &str, http_port: u16) -> Option<String> {
    if let Ok(addr) = raft_addr.parse::<SocketAddr>() {
        return Some(format!("http://{}:{}", addr.ip(), http_port));
    }
    let (host, _) = raft_addr.rsplit_once(':')?;
    Some(format!("http://{}:{}", host, http_port))
}

/// Derive a gRPC client endpoint from a Raft address, replacing the Raft port
/// with the given API port.
fn grpc_endpoint_from_raft_addr(raft_addr: &str, api_port: u16) -> Option<String> {
    if let Ok(addr) = raft_addr.parse::<SocketAddr>() {
        return Some(format!("http://{}:{}", addr.ip(), api_port));
    }
    let (host, _) = raft_addr.rsplit_once(':')?;
    Some(format!("http://{}:{}", host, api_port))
}

/// Prefix a peer address with "http://" if it has no scheme.
fn normalize_peer_url(raft_addr: &str) -> String {
    if raft_addr.contains("://") {
        raft_addr.to_string()
    } else {
        format!("http://{raft_addr}")
    }
}

/// Build the static member list advertised by the cluster service from the
/// configured initial members.
fn configured_members(config: &ServerConfig) -> Vec<Member> {
    let api_port = config.network.api_addr.port();
    config
        .cluster
        .initial_members
        .iter()
        .map(|member| Member {
            id: member.id,
            name: format!("node-{}", member.id),
            peer_urls: vec![normalize_peer_url(&member.raft_addr)],
            client_urls: grpc_endpoint_from_raft_addr(&member.raft_addr, api_port)
                .into_iter()
                .collect(),
            is_learner: false,
        })
        .collect()
}
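
// Illustrative tests (a minimal sketch, not part of the original module): they
// exercise the address-derivation helpers above on an IP:port address and a
// hostname:port address, and check that normalize_peer_url only prepends a
// scheme when one is missing. The concrete addresses and ports below are made
// up for illustration.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn http_endpoint_from_ip_raft_addr() {
        // SocketAddr parse succeeds; the Raft port is replaced by the HTTP port.
        assert_eq!(
            http_endpoint_from_raft_addr("10.0.0.1:2380", 8080).as_deref(),
            Some("http://10.0.0.1:8080")
        );
    }

    #[test]
    fn http_endpoint_from_hostname_raft_addr() {
        // Hostname addresses fall back to splitting on the last ':'.
        assert_eq!(
            http_endpoint_from_raft_addr("node-1.cluster.local:2380", 8080).as_deref(),
            Some("http://node-1.cluster.local:8080")
        );
    }

    #[test]
    fn grpc_endpoint_uses_api_port() {
        assert_eq!(
            grpc_endpoint_from_raft_addr("10.0.0.1:2380", 2379).as_deref(),
            Some("http://10.0.0.1:2379")
        );
    }

    #[test]
    fn normalize_peer_url_adds_scheme_only_when_missing() {
        assert_eq!(normalize_peer_url("10.0.0.1:2380"), "http://10.0.0.1:2380");
        assert_eq!(
            normalize_peer_url("https://10.0.0.1:2380"),
            "https://10.0.0.1:2380"
        );
    }
}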