//! FiberLB load balancer server binary use std::sync::Arc; use chainfire_client::Client as ChainFireClient; use clap::Parser; use fiberlb_api::{ backend_service_server::BackendServiceServer, certificate_service_server::CertificateServiceServer, health_check_service_server::HealthCheckServiceServer, l7_policy_service_server::L7PolicyServiceServer, l7_rule_service_server::L7RuleServiceServer, listener_service_server::ListenerServiceServer, load_balancer_service_server::LoadBalancerServiceServer, pool_service_server::PoolServiceServer, }; use fiberlb_server::{ config::MetadataBackend, create_bgp_client, spawn_health_checker, BackendServiceImpl, CertificateServiceImpl, DataPlane, HealthCheckServiceImpl, KernelVipAddressOwner, L7DataPlane, L7PolicyServiceImpl, L7RuleServiceImpl, LbMetadataStore, ListenerServiceImpl, LoadBalancerServiceImpl, PoolServiceImpl, ServerConfig, VipAddressOwner, VipManager, }; use iam_service_auth::AuthService; use metrics_exporter_prometheus::PrometheusBuilder; use std::net::SocketAddr; use std::path::PathBuf; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tonic::transport::{Certificate, Identity, Server, ServerTlsConfig}; use tonic::{Request, Status}; use tonic_health::server::health_reporter; use tracing_subscriber::EnvFilter; /// FiberLB load balancer server #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] struct Args { /// Configuration file path #[arg(short, long, default_value = "fiberlb.toml")] config: PathBuf, /// gRPC management API address (overrides config) #[arg(long)] grpc_addr: Option, /// ChainFire endpoint for cluster coordination #[arg(long, env = "FIBERLB_CHAINFIRE_ENDPOINT")] chainfire_endpoint: Option, /// FlareDB endpoint for metadata and tenant data storage #[arg(long, env = "FIBERLB_FLAREDB_ENDPOINT")] flaredb_endpoint: Option, /// Metadata backend (flaredb, postgres, sqlite) #[arg(long, env = "FIBERLB_METADATA_BACKEND")] metadata_backend: Option, /// SQL database URL for metadata (required for postgres/sqlite backend) #[arg(long, env = "FIBERLB_METADATA_DATABASE_URL")] metadata_database_url: Option, /// Run in single-node mode (required when metadata backend is SQLite) #[arg(long, env = "FIBERLB_SINGLE_NODE")] single_node: bool, /// Log level (overrides config) #[arg(short, long)] log_level: Option, /// Metrics port for Prometheus scraping #[arg(long, default_value = "9098")] metrics_port: u16, } #[tokio::main] async fn main() -> Result<(), Box> { let args = Args::parse(); // Load configuration from file or use defaults let mut config = if args.config.exists() { let contents = tokio::fs::read_to_string(&args.config).await?; toml::from_str(&contents)? } else { tracing::info!( "Config file not found: {}, using defaults", args.config.display() ); ServerConfig::default() }; // Apply command line overrides if let Some(grpc_addr_str) = args.grpc_addr { config.grpc_addr = grpc_addr_str.parse()?; } if let Some(log_level) = args.log_level { config.log_level = log_level; } if let Some(chainfire_endpoint) = args.chainfire_endpoint { config.chainfire_endpoint = Some(chainfire_endpoint); } if let Some(flaredb_endpoint) = args.flaredb_endpoint { config.flaredb_endpoint = Some(flaredb_endpoint); } if let Some(metadata_backend) = args.metadata_backend { config.metadata_backend = parse_metadata_backend(&metadata_backend)?; } if let Some(metadata_database_url) = args.metadata_database_url { config.metadata_database_url = Some(metadata_database_url); } if args.single_node { config.single_node = true; } // Initialize tracing tracing_subscriber::fmt() .with_env_filter( EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(&config.log_level)), ) .init(); tracing::info!("Starting FiberLB server"); tracing::info!(" gRPC: {}", config.grpc_addr); // Initialize Prometheus metrics exporter let metrics_addr = format!("0.0.0.0:{}", args.metrics_port); let builder = PrometheusBuilder::new(); builder .with_http_listener(metrics_addr.parse::()?) .install() .expect("Failed to install Prometheus metrics exporter"); tracing::info!( "Prometheus metrics available at http://{}/metrics", metrics_addr ); metrics::describe_gauge!( "fiberlb_bgp_configured_peers", "Number of BGP peers configured for the native FiberLB speaker" ); metrics::describe_gauge!( "fiberlb_bgp_connected_peers", "Number of BGP peer sessions currently established" ); metrics::describe_gauge!( "fiberlb_bgp_desired_routes", "Number of VIP routes FiberLB currently wants to advertise" ); metrics::describe_gauge!( "fiberlb_bgp_peer_session_up", "Per-peer BGP session state (1=established, 0=down)" ); metrics::describe_gauge!( "fiberlb_bgp_peer_bfd_up", "Per-peer BFD session state for FiberLB native BGP peers (1=up, 0=down)" ); metrics::describe_counter!( "fiberlb_bgp_session_established_total", "Total number of BGP peer sessions established" ); metrics::describe_counter!( "fiberlb_bgp_session_ends_total", "Total number of BGP peer session terminations by peer and result" ); metrics::describe_gauge!( "fiberlb_vip_drain_active", "Whether FiberLB node drain mode is active (1=drain, 0=normal)" ); if let Some(endpoint) = &config.chainfire_endpoint { tracing::info!(" Cluster coordination: ChainFire @ {}", endpoint); let endpoint = endpoint.clone(); let addr = config.grpc_addr.to_string(); tokio::spawn(async move { if let Err(error) = register_chainfire_membership(&endpoint, "fiberlb", addr).await { tracing::warn!(error = %error, "ChainFire membership registration failed"); } }); } // Create metadata store from explicitly selected backend. let metadata = match config.metadata_backend { MetadataBackend::FlareDb => { if let Some(endpoint) = config.flaredb_endpoint.as_deref() { tracing::info!(" Metadata backend: FlareDB @ {}", endpoint); } else { tracing::info!(" Metadata backend: FlareDB"); } Arc::new( LbMetadataStore::new_flaredb_with_pd( config.flaredb_endpoint.clone(), config.chainfire_endpoint.clone(), ) .await .map_err(|e| format!("Failed to initialize FlareDB metadata store: {}", e))?, ) } MetadataBackend::Postgres | MetadataBackend::Sqlite => { let database_url = config .metadata_database_url .as_deref() .ok_or_else(|| { format!( "metadata_database_url is required when metadata_backend={} (env: FIBERLB_METADATA_DATABASE_URL)", metadata_backend_name(config.metadata_backend) ) })?; ensure_sql_backend_matches_url(config.metadata_backend, database_url)?; tracing::info!( " Metadata backend: {} @ {}", metadata_backend_name(config.metadata_backend), database_url ); Arc::new( LbMetadataStore::new_sql(database_url, config.single_node) .await .map_err(|e| format!("Failed to initialize SQL metadata store: {}", e))?, ) } }; // Initialize IAM authentication service tracing::info!( "Connecting to IAM server at {}", config.auth.iam_server_addr ); let auth_service = AuthService::new(&config.auth.iam_server_addr) .await .map_err(|e| format!("Failed to connect to IAM server: {}", e))?; let auth_service = Arc::new(auth_service); let dataplane = Arc::new(DataPlane::new(metadata.clone())); let l7_dataplane = Arc::new(L7DataPlane::new(metadata.clone())); // Dedicated runtime for auth interceptors to avoid blocking the main async runtime let auth_runtime = Arc::new(tokio::runtime::Runtime::new()?); let make_interceptor = |auth: Arc| { let rt = auth_runtime.clone(); move |mut req: Request<()>| -> Result, Status> { let auth = auth.clone(); tokio::task::block_in_place(|| { rt.block_on(async move { let tenant_context = auth.authenticate_request(&req).await?; req.extensions_mut().insert(tenant_context); Ok(req) }) }) } }; // Create gRPC services with metadata store let lb_service = LoadBalancerServiceImpl::new(metadata.clone(), auth_service.clone()); let pool_service = PoolServiceImpl::new(metadata.clone(), auth_service.clone()); let backend_service = BackendServiceImpl::new(metadata.clone(), auth_service.clone()); let listener_service = ListenerServiceImpl::new( metadata.clone(), auth_service.clone(), dataplane.clone(), l7_dataplane.clone(), ); let health_check_service = HealthCheckServiceImpl::new(metadata.clone(), auth_service.clone()); let l7_policy_service = L7PolicyServiceImpl::new(metadata.clone(), auth_service.clone()); let l7_rule_service = L7RuleServiceImpl::new(metadata.clone(), auth_service.clone()); let certificate_service = CertificateServiceImpl::new(metadata.clone(), auth_service.clone()); restore_runtime_listeners(metadata.clone(), dataplane.clone(), l7_dataplane.clone()).await?; let (_health_task, health_shutdown_tx) = spawn_health_checker( metadata.clone(), Duration::from_secs(config.health.interval_secs.max(1)), Duration::from_secs(config.health.timeout_secs.max(1)), ); let vip_manager = if config.bgp.enabled { let next_hop = config.bgp.next_hop_addr().map_err(|error| { format!( "failed to parse FiberLB BGP next hop '{}': {}", config .bgp .next_hop .as_deref() .unwrap_or(&config.bgp.router_id), error ) })?; let bgp = create_bgp_client(config.bgp.clone()).await?; let vip_owner: Option> = if config.vip_ownership.enabled { Some(Arc::new(KernelVipAddressOwner::new( config.vip_ownership.interface.clone(), ))) } else { None }; let manager = Arc::new(VipManager::new( bgp, metadata.clone(), next_hop, vip_owner, config.vip_advertisement.drain_file.clone(), Duration::from_secs(config.vip_advertisement.drain_hold_time_secs), )); let _vip_task = manager.clone().spawn(Duration::from_secs( config.vip_advertisement.interval_secs.max(1), )); Some(manager) } else { None }; // Setup health service let (mut health_reporter, health_service) = health_reporter(); health_reporter .set_serving::>() .await; health_reporter .set_serving::>() .await; health_reporter .set_serving::>() .await; health_reporter .set_serving::>() .await; health_reporter .set_serving::>() .await; health_reporter .set_serving::>() .await; health_reporter .set_serving::>() .await; health_reporter .set_serving::>() .await; // Parse address let grpc_addr: SocketAddr = config.grpc_addr; // Configure TLS if enabled let mut server = Server::builder(); if let Some(tls_config) = &config.tls { tracing::info!("TLS enabled, loading certificates..."); let cert = tokio::fs::read(&tls_config.cert_file).await?; let key = tokio::fs::read(&tls_config.key_file).await?; let server_identity = Identity::from_pem(cert, key); let tls = if tls_config.require_client_cert { tracing::info!("mTLS enabled"); let ca_cert = tokio::fs::read( tls_config .ca_file .as_ref() .ok_or("ca_file required for mTLS")?, ) .await?; let ca = Certificate::from_pem(ca_cert); ServerTlsConfig::new() .identity(server_identity) .client_ca_root(ca) } else { ServerTlsConfig::new().identity(server_identity) }; server = server.tls_config(tls)?; } // Start gRPC server tracing::info!("gRPC server listening on {}", grpc_addr); let server_result = server .add_service(health_service) .add_service(tonic::codegen::InterceptedService::new( LoadBalancerServiceServer::new(lb_service), make_interceptor(auth_service.clone()), )) .add_service(tonic::codegen::InterceptedService::new( PoolServiceServer::new(pool_service), make_interceptor(auth_service.clone()), )) .add_service(tonic::codegen::InterceptedService::new( BackendServiceServer::new(backend_service), make_interceptor(auth_service.clone()), )) .add_service(tonic::codegen::InterceptedService::new( ListenerServiceServer::new(listener_service), make_interceptor(auth_service.clone()), )) .add_service(tonic::codegen::InterceptedService::new( HealthCheckServiceServer::new(health_check_service), make_interceptor(auth_service.clone()), )) .add_service(tonic::codegen::InterceptedService::new( L7PolicyServiceServer::new(l7_policy_service), make_interceptor(auth_service.clone()), )) .add_service(tonic::codegen::InterceptedService::new( L7RuleServiceServer::new(l7_rule_service), make_interceptor(auth_service.clone()), )) .add_service(tonic::codegen::InterceptedService::new( CertificateServiceServer::new(certificate_service), make_interceptor(auth_service.clone()), )) .serve_with_shutdown(grpc_addr, async { if let Err(error) = wait_for_shutdown_signal().await { tracing::warn!(error = %error, "FiberLB shutdown signal handler failed"); } }) .await; let _ = health_shutdown_tx.send(true); if let Some(vip_manager) = vip_manager { if let Err(error) = vip_manager.shutdown().await { tracing::warn!(error = %error, "FiberLB VIP manager shutdown failed"); } } server_result?; Ok(()) } async fn wait_for_shutdown_signal() -> Result<(), Box> { #[cfg(unix)] { use tokio::signal::unix::{signal, SignalKind}; let mut terminate = signal(SignalKind::terminate())?; tokio::select! { _ = tokio::signal::ctrl_c() => {} _ = terminate.recv() => {} } } #[cfg(not(unix))] { tokio::signal::ctrl_c().await?; } tracing::info!("FiberLB shutdown signal received"); Ok(()) } fn parse_metadata_backend(value: &str) -> Result> { match value.trim().to_ascii_lowercase().as_str() { "flaredb" => Ok(MetadataBackend::FlareDb), "postgres" => Ok(MetadataBackend::Postgres), "sqlite" => Ok(MetadataBackend::Sqlite), other => Err(format!( "invalid metadata backend '{}'; expected one of: flaredb, postgres, sqlite", other ) .into()), } } fn metadata_backend_name(backend: MetadataBackend) -> &'static str { match backend { MetadataBackend::FlareDb => "flaredb", MetadataBackend::Postgres => "postgres", MetadataBackend::Sqlite => "sqlite", } } fn ensure_sql_backend_matches_url( backend: MetadataBackend, database_url: &str, ) -> Result<(), Box> { let normalized = database_url.trim().to_ascii_lowercase(); match backend { MetadataBackend::Postgres => { if normalized.starts_with("postgres://") || normalized.starts_with("postgresql://") { Ok(()) } else { Err("metadata_backend=postgres requires postgres:// or postgresql:// URL".into()) } } MetadataBackend::Sqlite => { if normalized.starts_with("sqlite:") { Ok(()) } else { Err("metadata_backend=sqlite requires sqlite: URL".into()) } } MetadataBackend::FlareDb => Ok(()), } } async fn register_chainfire_membership( endpoint: &str, service: &str, addr: String, ) -> Result<(), Box> { let node_id = std::env::var("HOSTNAME").unwrap_or_else(|_| format!("{}-{}", service, std::process::id())); let ts = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_secs(); let key = format!("/cluster/{}/members/{}", service, node_id); let value = format!(r#"{{"addr":"{}","ts":{}}}"#, addr, ts); let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(120); let mut attempt = 0usize; let last_error = loop { attempt += 1; let current_error = match ChainFireClient::connect(endpoint).await { Ok(mut client) => match client.put_str(&key, &value).await { Ok(_) => return Ok(()), Err(error) => format!("put failed: {}", error), }, Err(error) => format!("connect failed: {}", error), }; if tokio::time::Instant::now() >= deadline { break current_error; } tracing::warn!( attempt, endpoint, service, error = %current_error, "retrying ChainFire membership registration" ); tokio::time::sleep(std::time::Duration::from_secs(2)).await; }; Err(std::io::Error::other(format!( "failed to register ChainFire membership for {} via {} after {} attempts: {}", service, endpoint, attempt, last_error )) .into()) } async fn restore_runtime_listeners( metadata: Arc, dataplane: Arc, l7_dataplane: Arc, ) -> Result<(), Box> { let lbs = metadata.list_all_lbs().await?; for lb in lbs { for listener in metadata.list_listeners(&lb.id).await? { if !listener.enabled { continue; } let result = if listener.is_l7() { l7_dataplane .start_listener(listener.id) .await .map_err(|e| e.to_string()) } else { dataplane .start_listener(listener.id) .await .map_err(|e| e.to_string()) }; if let Err(err) = result { tracing::warn!( listener_id = %listener.id, lb_id = %lb.id, "Failed to restore listener runtime: {}", err ); } } } Ok(()) }