use clap::Parser;
use flaredb_proto::kvrpc::kv_cas_server::KvCasServer;
use flaredb_proto::kvrpc::kv_raw_server::KvRawServer;
use flaredb_proto::raft_server::raft_service_server::RaftServiceServer;
use flaredb_storage::rocks_engine::RocksEngine;
use flaredb_types::RegionMeta;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::{sleep, Duration};
use tonic::transport::Server;
use tonic_health::server::health_reporter;
use tracing::info;
use tracing_subscriber::EnvFilter;

mod config;
mod heartbeat;
mod merkle;
mod pd_client;
mod raft_service;
mod service;
mod store;

use pd_client::{PdClient, PdEvent};

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
    #[arg(long, default_value = "1")]
    store_id: u64,
    #[arg(long, default_value = "127.0.0.1:50051")]
    addr: String,
    #[arg(long, default_value = "data")]
    data_dir: String,
    #[arg(long, default_value = "127.0.0.1:2379")]
    pd_addr: String,
    /// Peers in format id=host:port (repeatable)
    #[arg(long = "peer")]
    peers: Vec<String>,
    /// Namespace modes in format namespace=strong|eventual (repeatable)
    #[arg(long = "namespace-mode")]
    namespace_modes: Vec<String>,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Initialize tracing
    tracing_subscriber::fmt()
        .with_env_filter(
            EnvFilter::from_default_env().add_directive("flaredb_server=info".parse()?),
        )
        .init();

    let args = Args::parse();
    let addr: std::net::SocketAddr = args.addr.parse()?;
    info!("flaredb-server listening on {}", addr);

    // Build namespace consistency config
    let namespace_map = config::parse_namespace_modes(&args.namespace_modes).unwrap_or_else(|e| {
        eprintln!("Failed to parse namespace modes: {}", e);
        std::process::exit(1);
    });
    let server_config = Arc::new(config::ServerConfig::new(
        config::ConsistencyMode::Strong,
        namespace_map,
    ));

    // Parse peer addresses for cluster membership
    let mut voters = vec![args.store_id];
    let mut peer_addrs: HashMap<u64, String> = HashMap::new();
    // Add self address
    peer_addrs.insert(args.store_id, args.addr.clone());
    for p in &args.peers {
        if let Some((id_str, peer_addr)) = p.split_once('=') {
            if let Ok(id) = id_str.parse::<u64>() {
                if id != args.store_id {
                    voters.push(id);
                    peer_addrs.insert(id, peer_addr.to_string());
                }
            }
        }
    }
    let peer_addrs = Arc::new(peer_addrs);

    let engine = Arc::new(RocksEngine::new(&args.data_dir)?);
    let store = Arc::new(store::Store::new(
        args.store_id,
        engine.clone(),
        server_config.clone(),
        peer_addrs.clone(),
    ));
    let service =
        service::KvServiceImpl::new(engine.clone(), server_config.clone(), store.clone());
    let raft_service = raft_service::RaftServiceImpl::new(store.clone(), args.store_id);

    println!("Connecting to ChainFire PD at {}...", args.pd_addr);
    match PdClient::connect(args.pd_addr.clone()).await {
        Ok(mut pd_client) => {
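            // PD-connected path: register this store, watch for metadata
            // changes, seed and bootstrap regions, then heartbeat from a
            // background task. If the PD is unreachable at startup, we fall
            // back to standalone mode in the Err arm below.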
            println!(
                "Connected to ChainFire. Cluster ID: {}",
                pd_client.cluster_id()
            );

            // Register this store with the PD
            if let Err(e) = pd_client
                .register_store(args.store_id, args.addr.clone())
                .await
            {
                eprintln!("Failed to register store: {}", e);
            } else {
                println!("Store {} registered with PD", args.store_id);
            }

            // Start watching for metadata changes from ChainFire
            let mut event_rx = pd_client.subscribe();
            if let Err(e) = pd_client.start_watch().await {
                eprintln!("Failed to start PD watch: {}", e);
            } else {
                println!("Started watching PD for metadata changes");
                // Spawn a background task to handle PD events
                let store_clone_for_events = store.clone();
                tokio::spawn(async move {
                    while let Ok(event) = event_rx.recv().await {
                        match event {
                            PdEvent::RegionUpdated(region) => {
                                println!("Region {} updated via watch", region.id);
                                // Apply the updated metadata immediately; an empty
                                // peer list falls back to this store as the sole voter.
                                let metas = vec![(
                                    RegionMeta {
                                        id: region.id,
                                        start_key: region.start_key,
                                        end_key: region.end_key,
                                    },
                                    if region.peers.is_empty() {
                                        vec![store_clone_for_events.store_id()]
                                    } else {
                                        region.peers
                                    },
                                )];
                                if let Err(e) =
                                    store_clone_for_events.refresh_regions(metas).await
                                {
                                    eprintln!("Failed to refresh region from event: {}", e);
                                }
                            }
                            PdEvent::RegionRemoved(id) => {
                                println!("Region {} removed via watch", id);
                            }
                            PdEvent::StoreUpdated(store_info) => {
                                println!("Store {} updated via watch", store_info.id);
                            }
                            PdEvent::StoreRemoved(id) => {
                                println!("Store {} removed via watch", id);
                            }
                        }
                    }
                });
            }

            // Initialize default region if this is the first node
            if let Err(e) = pd_client.init_default_region(voters.clone()).await {
                eprintln!("Failed to init default region: {}", e);
            }

            // Fetch initial region metadata from PD (from cache)
            let regions = pd_client.list_regions().await;
            let mut region_metas = Vec::new();
            for r in regions {
                let region_voters = if r.peers.is_empty() {
                    voters.clone()
                } else {
                    r.peers.clone()
                };
                region_metas.push((
                    RegionMeta {
                        id: r.id,
                        start_key: r.start_key,
                        end_key: r.end_key,
                    },
                    region_voters,
                ));
            }
            if region_metas.is_empty() {
                region_metas.push((
                    RegionMeta {
                        id: 1,
                        start_key: Vec::new(),
                        end_key: Vec::new(),
                    },
                    voters.clone(),
                ));
            }
            if let Err(e) = store.bootstrap_regions(region_metas).await {
                eprintln!("failed to bootstrap regions: {}", e);
            }

            // Background task: heartbeat and refresh regions from PD
            let store_clone = store.clone();
            let pd_addr_clone = args.pd_addr.clone();
            let store_id = args.store_id;
            let server_addr = args.addr.clone();
            tokio::spawn(async move {
                let client = Arc::new(Mutex::new(
                    PdClient::connect(pd_addr_clone.clone()).await.ok(),
                ));
                loop {
                    sleep(Duration::from_secs(10)).await;
                    let mut guard = client.lock().await;
                    if let Some(ref mut c) = *guard {
                        // Send heartbeat
                        let heartbeat_ok =
                            match c.heartbeat(store_id, server_addr.clone()).await {
                                Ok(_) => true,
                                Err(e) => {
                                    eprintln!("Heartbeat failed: {}", e);
                                    false
                                }
                            };
                        // If the heartbeat failed, drop the client and reconnect
                        // on the next cycle
                        if !heartbeat_ok {
                            *guard = None;
                            continue;
                        }
                        // Report leader status for regions we lead
                        for region_id in [1u64] {
                            // TODO: get actual regions
                            if let Some(node) = store_clone.get_raft_node(region_id).await {
                                if node.is_leader().await {
                                    if let Err(e) = c.report_leader(region_id, store_id).await {
                                        eprintln!("Report leader failed: {}", e);
                                    }
                                }
                            }
                        }
                        // Refresh regions from PD (from cache, updated via watch)
                        let regions = c.list_regions().await;
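                        // Map each PD region to (RegionMeta, voter list); as in
                        // the watch handler above, an empty peer list falls back
                        // to this store as the sole voter.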
                        let metas: Vec<_> = regions
                            .into_iter()
                            .map(|r| {
                                let region_voters = if r.peers.is_empty() {
                                    vec![store_clone.store_id()]
                                } else {
                                    r.peers.clone()
                                };
                                (
                                    RegionMeta {
                                        id: r.id,
                                        start_key: r.start_key,
                                        end_key: r.end_key,
                                    },
                                    region_voters,
                                )
                            })
                            .collect();
                        if !metas.is_empty() {
                            if let Err(e) = store_clone.refresh_regions(metas).await {
                                eprintln!("refresh regions failed: {}", e);
                            }
                        }
                    } else {
                        // Try to reconnect
                        if let Ok(new_client) = PdClient::connect(pd_addr_clone.clone()).await {
                            println!("Reconnected to PD");
                            *guard = Some(new_client);
                        }
                    }
                }
            });
        }
        Err(e) => {
            eprintln!("Failed to connect to ChainFire PD: {:?}", e);
            eprintln!("Starting in standalone mode with default region...");
            let _ = store
                .bootstrap_regions(vec![(
                    RegionMeta {
                        id: 1,
                        start_key: Vec::new(),
                        end_key: Vec::new(),
                    },
                    voters.clone(),
                )])
                .await;
        }
    }

    // Health check service for K8s liveness/readiness probes
    let (mut health_reporter, health_service) = health_reporter();
    health_reporter
        .set_serving::<KvRawServer<service::KvServiceImpl>>()
        .await;
    health_reporter
        .set_serving::<KvCasServer<service::KvServiceImpl>>()
        .await;

    info!("FlareDB server starting with health checks enabled");
    Server::builder()
        .add_service(health_service)
        .add_service(KvRawServer::new(service.clone()))
        .add_service(KvCasServer::new(service))
        .add_service(RaftServiceServer::new(raft_service))
        .serve(addr)
        .await?;

    Ok(())
}
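// Example invocation for a three-node cluster, following the flags defined on
// `Args` above. This is a sketch: the binary name, peer IDs/ports, and
// namespace names are illustrative, not taken from this repo.
//
//   flaredb-server \
//       --store-id 1 --addr 127.0.0.1:50051 --data-dir data1 \
//       --pd-addr 127.0.0.1:2379 \
//       --peer 2=127.0.0.1:50052 --peer 3=127.0.0.1:50053 \
//       --namespace-mode cache=eventual --namespace-mode ledger=strong
//
// Namespaces not listed presumably fall back to the server-wide default
// passed to ServerConfig::new in main(), i.e. ConsistencyMode::Strong.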