//! FlareDB storage server binary: serves the KV, Raft, and SQL gRPC services,
//! an HTTP REST API, and a Prometheus metrics endpoint, and coordinates
//! cluster metadata with the ChainFire placement driver (PD).

use clap::Parser;
use flaredb_proto::kvrpc::kv_cas_server::KvCasServer;
use flaredb_proto::kvrpc::kv_raw_server::KvRawServer;
use flaredb_proto::raft_server::raft_service_server::RaftServiceServer;
use flaredb_proto::sqlrpc::sql_service_server::SqlServiceServer;
use flaredb_server::config::{self, Config, NamespaceManager};
use flaredb_storage::rocks_engine::RocksEngine;
use flaredb_types::RegionMeta;
use metrics_exporter_prometheus::PrometheusBuilder;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::{sleep, Duration};
use tonic::transport::{Certificate, Identity, Server, ServerTlsConfig};
use tonic_health::server::health_reporter;
use tracing::{info, warn};
use tracing_subscriber::EnvFilter;

mod heartbeat;
mod merkle;
mod pd_client;
mod raft_service;
mod rest;
mod service;
mod sql_service;
mod store;

use pd_client::{PdClient, PdEvent};

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
    /// Configuration file path
    #[arg(short, long)]
    config: Option<PathBuf>,

    /// Store ID (overrides config)
    #[arg(long)]
    store_id: Option<u64>,

    /// Listen address for gRPC API (overrides config)
    #[arg(long)]
    addr: Option<String>,

    /// Data directory for RocksDB (overrides config)
    #[arg(long)]
    data_dir: Option<PathBuf>,

    /// ChainFire PD address (overrides config)
    #[arg(long)]
    pd_addr: Option<String>,

    /// Initial cluster peers in id=host:port format (overrides config)
    #[arg(long = "peer")]
    peers: Vec<String>,

    /// Namespace modes in format namespace=strong|eventual (overrides config)
    #[arg(long = "namespace-mode")]
    namespace_modes: Vec<String>,

    /// Default mode for implicitly created namespaces (strong|eventual)
    #[arg(long = "default-namespace-mode")]
    default_namespace_mode: Option<String>,

    /// Log level (e.g., "info", "debug", "trace")
    #[arg(long)]
    log_level: Option<String>,

    /// Metrics port for Prometheus scraping
    #[arg(long, default_value = "9092")]
    metrics_port: u16,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args = Args::parse();

    // Load configuration: environment variables first, then an optional file.
    let mut settings = ::config::Config::builder()
        .add_source(::config::Environment::with_prefix("FLAREDB").separator("_"));

    if let Some(config_path) = &args.config {
        // Tracing is not initialized yet (the log level comes from the
        // config), so report to stdout directly.
        println!("Loading config from file: {}", config_path.display());
        settings = settings.add_source(::config::File::from(config_path.as_path()));
    }

    let loaded_config: Config = settings
        .build()?
        .try_deserialize()
        .map_err(|e| anyhow::anyhow!("Failed to load configuration: {}", e))?;

    // Apply CLI overrides on top of the loaded configuration.
    let config = Config {
        store_id: args.store_id.unwrap_or(loaded_config.store_id),
        addr: args
            .addr
            .and_then(|s| s.parse().ok())
            .unwrap_or(loaded_config.addr),
        http_addr: loaded_config.http_addr,
        data_dir: args.data_dir.unwrap_or(loaded_config.data_dir),
        pd_addr: args.pd_addr.unwrap_or(loaded_config.pd_addr),
        peers: if args.peers.is_empty() {
            loaded_config.peers
        } else {
            let mut peers_map = HashMap::new();
            for p in args.peers {
                match p.split_once('=') {
                    Some((id_str, addr_str)) => {
                        match (id_str.parse::<u64>(), addr_str.parse()) {
                            (Ok(id), Ok(addr)) => {
                                peers_map.insert(id, addr);
                            }
                            _ => eprintln!("Ignoring malformed --peer flag: {}", p),
                        }
                    }
                    None => eprintln!(
                        "Ignoring malformed --peer flag (expected id=host:port): {}",
                        p
                    ),
                }
            }
            peers_map
        },
        namespace_modes: if args.namespace_modes.is_empty() {
            loaded_config.namespace_modes
        } else {
            config::parse_namespace_modes(&args.namespace_modes)?
        },
        default_namespace_mode: if let Some(mode_str) = args.default_namespace_mode {
            config::parse_mode(&mode_str)?
        } else {
            loaded_config.default_namespace_mode
        },
        log_level: args.log_level.unwrap_or(loaded_config.log_level),
        tls: loaded_config.tls,
    };

    // Initialize tracing now that the configured log level is known.
    init_logging(&config.log_level);

    // Prometheus metrics exporter for scraping.
    let metrics_addr = format!("0.0.0.0:{}", args.metrics_port);
    PrometheusBuilder::new()
        .with_http_listener(metrics_addr.parse::<SocketAddr>()?)
        .install()
        .expect("Failed to install Prometheus metrics exporter");
    info!(
        "Prometheus metrics available at http://{}/metrics",
        metrics_addr
    );

    info!("FlareDB server listening on {}", config.addr);
    let addr = config.addr;
    let server_config = Arc::new(config);
    let namespace_manager = Arc::new(NamespaceManager::from_config(&server_config));

    // Cluster membership: this store plus every configured peer.
    let mut voters = vec![server_config.store_id];
    let mut peer_addrs: HashMap<u64, String> = HashMap::new();
    peer_addrs.insert(server_config.store_id, server_config.addr.to_string());
    for (id, addr) in server_config.peers.clone() {
        if id != server_config.store_id {
            voters.push(id);
            peer_addrs.insert(id, addr.to_string());
        }
    }
    let peer_addrs = Arc::new(peer_addrs);

    let engine = Arc::new(RocksEngine::new(server_config.data_dir.to_str().unwrap())?);
    let store = Arc::new(store::Store::new(
        server_config.store_id,
        engine.clone(),
        server_config.clone(),
        namespace_manager.clone(),
        peer_addrs.clone(),
    ));
    let service =
        service::KvServiceImpl::new(engine.clone(), namespace_manager.clone(), store.clone());
    let raft_service = raft_service::RaftServiceImpl::new(store.clone(), server_config.store_id);

    info!("Connecting to ChainFire PD at {}...", server_config.pd_addr);
    match PdClient::connect(server_config.pd_addr.to_string()).await {
        Ok(mut pd_client) => {
            info!(
                "Connected to ChainFire. Cluster ID: {}",
                pd_client.cluster_id()
            );

            // Register this store with the PD.
            if let Err(e) = pd_client
                .register_store(server_config.store_id, server_config.addr.to_string())
                .await
            {
                warn!("Failed to register store: {}", e);
            } else {
                info!("Store {} registered with PD", server_config.store_id);
            }

            // Watch ChainFire for metadata changes.
            let mut event_rx = pd_client.subscribe();
            if let Err(e) = pd_client.start_watch().await {
                warn!("Failed to start PD watch: {}", e);
            } else {
                info!("Started watching PD for metadata changes");
                // Apply PD events on a background task.
                let store_clone_for_events = store.clone();
                let server_config_for_events = server_config.clone();
                tokio::spawn(async move {
                    while let Ok(event) = event_rx.recv().await {
                        match event {
                            PdEvent::RegionUpdated(region) => {
                                info!("Region {} updated via watch", region.id);
                                let metas = vec![(
                                    RegionMeta {
                                        id: region.id,
                                        start_key: region.start_key,
                                        end_key: region.end_key,
                                    },
                                    if region.peers.is_empty() {
                                        vec![server_config_for_events.store_id]
                                    } else {
                                        region.peers
                                    },
                                )];
                                if let Err(e) =
                                    store_clone_for_events.refresh_regions(metas).await
                                {
                                    warn!("Failed to refresh region from event: {}", e);
                                }
                            }
                            PdEvent::RegionRemoved(id) => {
                                info!("Region {} removed via watch", id);
                            }
                            PdEvent::StoreUpdated(store_info) => {
                                info!("Store {} updated via watch", store_info.id);
                            }
                            PdEvent::StoreRemoved(id) => {
                                info!("Store {} removed via watch", id);
                            }
                        }
                    }
                });
            }

            // Initialize the default region if this is the first node.
            if let Err(e) = pd_client.init_default_region(voters.clone()).await {
                warn!("Failed to init default region: {}", e);
            }

            // Fetch initial region metadata from the PD cache.
            let regions = pd_client.list_regions().await;
            let mut region_metas = Vec::new();
            for r in regions {
                let region_voters = if r.peers.is_empty() {
                    voters.clone()
                } else {
                    r.peers.clone()
                };
                region_metas.push((
                    RegionMeta {
                        id: r.id,
                        start_key: r.start_key,
                        end_key: r.end_key,
                    },
                    region_voters,
                ));
            }
            // Fall back to a single region covering the whole keyspace.
            if region_metas.is_empty() {
                region_metas.push((
                    RegionMeta {
                        id: 1,
                        start_key: Vec::new(),
                        end_key: Vec::new(),
                    },
                    voters.clone(),
                ));
            }
            if let Err(e) = store.bootstrap_regions(region_metas.clone()).await {
                warn!("Failed to bootstrap regions: {}", e);
            }

            // Background task: heartbeat the PD, report leaderships, and
            // refresh region metadata.
            let store_clone = store.clone();
            let pd_addr_string = server_config.pd_addr.to_string();
            let store_id = server_config.store_id;
            let server_addr_string = server_config.addr.to_string();
            tokio::spawn(async move {
                let client = Arc::new(Mutex::new(
                    PdClient::connect(pd_addr_string.clone()).await.ok(),
                ));
                loop {
                    sleep(Duration::from_secs(10)).await;
                    let mut guard = client.lock().await;
                    if let Some(ref mut c) = *guard {
                        // Send a heartbeat.
                        let heartbeat_ok =
                            match c.heartbeat(store_id, server_addr_string.clone()).await {
                                Ok(_) => true,
                                Err(e) => {
                                    warn!("Heartbeat failed: {}", e);
                                    false
                                }
                            };
                        // If the heartbeat failed, drop the client and
                        // reconnect on the next cycle.
                        if !heartbeat_ok {
                            *guard = None;
                            continue;
                        }

                        // Report leader status for regions this store leads.
                        // TODO: enumerate actual regions instead of assuming region 1.
                        for region_id in [1u64] {
                            if let Some(node) = store_clone.get_raft_node(region_id).await {
                                if node.is_leader().await {
                                    if let Err(e) = c.report_leader(region_id, store_id).await {
                                        warn!("Report leader failed: {}", e);
                                    }
                                }
                            }
                        }

                        // Refresh regions from the PD cache (kept current by the watch).
                        let regions = c.list_regions().await;
                        let metas: Vec<_> = regions
                            .into_iter()
                            .map(|r| {
                                let region_voters = if r.peers.is_empty() {
                                    voters.clone()
                                } else {
                                    r.peers.clone()
                                };
                                (
                                    RegionMeta {
                                        id: r.id,
                                        start_key: r.start_key,
                                        end_key: r.end_key,
                                    },
                                    region_voters,
                                )
                            })
                            .collect();
                        if !metas.is_empty() {
                            if let Err(e) = store_clone.refresh_regions(metas).await {
                                warn!("Refresh regions failed: {}", e);
                            }
                        }
                    } else {
                        // No live client: try to reconnect.
                        if let Ok(new_client) = PdClient::connect(pd_addr_string.clone()).await {
                            info!("Reconnected to PD");
                            *guard = Some(new_client);
                        }
                    }
                }
            });
        }
        Err(e) => {
            warn!("Failed to connect to ChainFire PD: {:?}", e);
            info!("Starting in standalone mode with default region...");
            if let Err(e) = store
                .bootstrap_regions(vec![(
                    RegionMeta {
                        id: 1,
                        start_key: Vec::new(),
                        end_key: Vec::new(),
                    },
                    voters.clone(),
                )])
                .await
            {
                warn!("Failed to bootstrap default region: {}", e);
            }
        }
    }

    // Health check service for K8s liveness/readiness probes. Mark the KV
    // services as serving; other services can be registered the same way.
    let (mut health_reporter, health_service) = health_reporter();
    health_reporter
        .set_serving::<KvRawServer<service::KvServiceImpl>>()
        .await;
    health_reporter
        .set_serving::<KvCasServer<service::KvServiceImpl>>()
        .await;
    info!("FlareDB server starting with health checks enabled");

    // Create SQL service.
    let sql_service = sql_service::SqlServiceImpl::new(server_config.addr.to_string());

    // Configure TLS if enabled.
    let mut server = Server::builder();
    if let Some(tls_config) = &server_config.tls {
        info!("TLS enabled, loading certificates...");
        let cert = tokio::fs::read(&tls_config.cert_file)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to read cert file: {}", e))?;
        let key = tokio::fs::read(&tls_config.key_file)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to read key file: {}", e))?;
        let server_identity = Identity::from_pem(cert, key);

        let tls = if tls_config.require_client_cert {
            info!("mTLS enabled, requiring client certificates");
            let ca_cert = tokio::fs::read(tls_config.ca_file.as_ref().ok_or_else(|| {
                anyhow::anyhow!("ca_file required when require_client_cert=true")
            })?)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to read CA file: {}", e))?;
            let ca = Certificate::from_pem(ca_cert);
            ServerTlsConfig::new()
                .identity(server_identity)
                .client_ca_root(ca)
        } else {
            info!("TLS-only mode, client certificates not required");
            ServerTlsConfig::new().identity(server_identity)
        };
        server = server
            .tls_config(tls)
            .map_err(|e| anyhow::anyhow!("Failed to configure TLS: {}", e))?;
        info!("TLS configuration applied successfully");
    } else {
        info!("TLS disabled, running in plain-text mode");
    }

    // gRPC server with all services registered.
    let grpc_server = server
        .add_service(health_service)
        .add_service(KvRawServer::new(service.clone()))
        .add_service(KvCasServer::new(service))
        .add_service(RaftServiceServer::new(raft_service))
        .add_service(SqlServiceServer::new(sql_service))
        .serve(addr);

    // HTTP REST API server.
    let http_addr = server_config.http_addr;
    let rest_state = rest::RestApiState {
        server_addr: server_config.addr.to_string(),
    };
    let rest_app = rest::build_router(rest_state);
    let http_listener = tokio::net::TcpListener::bind(&http_addr).await?;
    info!(http_addr = %http_addr, "HTTP REST API server starting");
    let http_server = async move {
        axum::serve(http_listener, rest_app)
            .await
            .map_err(|e| anyhow::anyhow!("HTTP server error: {}", e))
    };

    // Run both servers concurrently; return when either exits.
    tokio::select! {
        result = grpc_server => { result?; }
        result = http_server => { result?; }
    }

    Ok(())
}

fn init_logging(level: &str) {
    tracing_subscriber::fmt()
        .with_env_filter(
            EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(level)),
        )
        .init();
}
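
// ---------------------------------------------------------------------------
// For reference, a minimal sketch of the file accepted by `--config`. This is
// an illustration, not a schema: the `config` crate infers the format from
// the file extension, and the exact field types (notably `peers`,
// `namespace_modes`, and `tls`) are defined in `flaredb_server::config`, so
// the value shapes and ports shown here are assumptions.
//
//   # flaredb.toml (hypothetical values)
//   store_id = 1
//   addr = "0.0.0.0:20160"
//   http_addr = "0.0.0.0:8080"
//   data_dir = "/var/lib/flaredb"
//   pd_addr = "http://pd.example.com:2379"
//   log_level = "info"
// ---------------------------------------------------------------------------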