//! PrismNET network management server binary use anyhow::anyhow; use chainfire_client::Client as ChainFireClient; use clap::Parser; use iam_service_auth::AuthService; use metrics_exporter_prometheus::PrometheusBuilder; use prismnet_api::{ ipam_service_server::IpamServiceServer, port_service_server::PortServiceServer, security_group_service_server::SecurityGroupServiceServer, subnet_service_server::SubnetServiceServer, vpc_service_server::VpcServiceServer, }; use prismnet_server::{ config::MetadataBackend, IpamServiceImpl, NetworkMetadataStore, OvnClient, PortServiceImpl, SecurityGroupServiceImpl, ServerConfig, SubnetServiceImpl, VpcServiceImpl, }; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use tonic::transport::{Certificate, Identity, Server, ServerTlsConfig}; use tonic::{Request, Status}; use tonic_health::server::health_reporter; use tracing_subscriber::EnvFilter; /// PrismNET network management server #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] struct Args { /// Configuration file path #[arg(short, long, default_value = "prismnet.toml")] config: PathBuf, /// gRPC API address (overrides config) #[arg(long)] grpc_addr: Option, /// ChainFire endpoint for cluster coordination (optional) #[arg(long)] chainfire_endpoint: Option, /// FlareDB endpoint for metadata and tenant data storage #[arg(long)] flaredb_endpoint: Option, /// Metadata backend (flaredb, postgres, sqlite) #[arg(long)] metadata_backend: Option, /// SQL database URL for metadata (required for postgres/sqlite backend) #[arg(long)] metadata_database_url: Option, /// Run in single-node mode (required when metadata backend is SQLite) #[arg(long)] single_node: bool, /// Log level (overrides config) #[arg(short, long)] log_level: Option, /// Metrics port for Prometheus scraping #[arg(long, default_value = "9096")] metrics_port: u16, } #[tokio::main] async fn main() -> Result<(), Box> { let args = Args::parse(); // Load configuration from file or use defaults let mut config = if args.config.exists() { let contents = tokio::fs::read_to_string(&args.config).await?; toml::from_str(&contents)? } else { tracing::info!( "Config file not found: {}, using defaults", args.config.display() ); ServerConfig::default() }; // Apply command line overrides if let Some(grpc_addr_str) = args.grpc_addr { config.grpc_addr = grpc_addr_str.parse()?; } if let Some(log_level) = args.log_level { config.log_level = log_level; } if let Some(chainfire_endpoint) = args.chainfire_endpoint { config.chainfire_endpoint = Some(chainfire_endpoint); } if let Some(flaredb_endpoint) = args.flaredb_endpoint { config.flaredb_endpoint = Some(flaredb_endpoint); } if let Some(metadata_backend) = args.metadata_backend { config.metadata_backend = parse_metadata_backend(&metadata_backend)?; } if let Some(metadata_database_url) = args.metadata_database_url { config.metadata_database_url = Some(metadata_database_url); } if args.single_node { config.single_node = true; } if config.chainfire_endpoint.is_none() { if let Ok(chainfire_endpoint) = std::env::var("PRISMNET_CHAINFIRE_ENDPOINT") { let trimmed = chainfire_endpoint.trim(); if !trimmed.is_empty() { config.chainfire_endpoint = Some(trimmed.to_string()); } } } if config.flaredb_endpoint.is_none() { if let Ok(flaredb_endpoint) = std::env::var("PRISMNET_FLAREDB_ENDPOINT") { let trimmed = flaredb_endpoint.trim(); if !trimmed.is_empty() { config.flaredb_endpoint = Some(trimmed.to_string()); } } } if let Ok(metadata_backend) = std::env::var("PRISMNET_METADATA_BACKEND") { let trimmed = metadata_backend.trim(); if !trimmed.is_empty() { config.metadata_backend = parse_metadata_backend(trimmed)?; } } if config.metadata_database_url.is_none() { if let Ok(metadata_database_url) = std::env::var("PRISMNET_METADATA_DATABASE_URL") { let trimmed = metadata_database_url.trim(); if !trimmed.is_empty() { config.metadata_database_url = Some(trimmed.to_string()); } } } if !config.single_node { if let Ok(single_node) = std::env::var("PRISMNET_SINGLE_NODE") { let parsed = single_node.trim().to_ascii_lowercase(); config.single_node = matches!(parsed.as_str(), "1" | "true" | "yes" | "on"); } } // Initialize tracing tracing_subscriber::fmt() .with_env_filter( EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(&config.log_level)), ) .init(); tracing::info!("Starting PrismNET server"); tracing::info!(" gRPC: {}", config.grpc_addr); // Initialize Prometheus metrics exporter let metrics_addr = format!("0.0.0.0:{}", args.metrics_port); let builder = PrometheusBuilder::new(); builder .with_http_listener(metrics_addr.parse::()?) .install() .expect("Failed to install Prometheus metrics exporter"); tracing::info!( "Prometheus metrics available at http://{}/metrics", metrics_addr ); if let Some(endpoint) = &config.chainfire_endpoint { tracing::info!(" Cluster coordination: ChainFire @ {}", endpoint); let endpoint = endpoint.clone(); let addr = config.grpc_addr.to_string(); tokio::spawn(async move { if let Err(error) = register_chainfire_membership(&endpoint, "prismnet", addr).await { tracing::warn!(error = %error, "Failed to register ChainFire membership"); } }); } // Create metadata store from explicitly selected backend. let metadata = match config.metadata_backend { MetadataBackend::FlareDb => { if let Some(endpoint) = &config.flaredb_endpoint { tracing::info!(" Metadata backend: FlareDB @ {}", endpoint); } else { tracing::info!(" Metadata backend: FlareDB"); } Arc::new( NetworkMetadataStore::new_flaredb_with_pd( config.flaredb_endpoint.clone(), config.chainfire_endpoint.clone(), ) .await .map_err(|e| anyhow!("Failed to init FlareDB metadata store: {}", e))?, ) } MetadataBackend::Postgres | MetadataBackend::Sqlite => { let database_url = config .metadata_database_url .as_deref() .ok_or_else(|| { anyhow!( "metadata_database_url is required when metadata_backend={} (env: PRISMNET_METADATA_DATABASE_URL)", metadata_backend_name(config.metadata_backend) ) })?; ensure_sql_backend_matches_url(config.metadata_backend, database_url)?; tracing::info!( " Metadata backend: {} @ {}", metadata_backend_name(config.metadata_backend), database_url ); Arc::new( NetworkMetadataStore::new_sql(database_url, config.single_node) .await .map_err(|e| anyhow!("Failed to init SQL metadata store: {}", e))?, ) } }; // Initialize OVN client (default: mock) let ovn = Arc::new(OvnClient::from_env().map_err(|e| anyhow!("Failed to init OVN client: {}", e))?); // Initialize IAM authentication service tracing::info!( "Connecting to IAM server at {}", config.auth.iam_server_addr ); let auth_service = AuthService::new(&config.auth.iam_server_addr) .await .map_err(|e| anyhow!("Failed to connect to IAM server: {}", e))?; let auth_service = Arc::new(auth_service); // Dedicated runtime for auth interceptors to avoid blocking the main async runtime let auth_runtime = Arc::new(tokio::runtime::Runtime::new()?); let make_interceptor = |auth: Arc| { let rt = auth_runtime.clone(); move |mut req: Request<()>| -> Result, Status> { let auth = auth.clone(); tokio::task::block_in_place(|| { rt.block_on(async move { let tenant_context = auth.authenticate_request(&req).await?; req.extensions_mut().insert(tenant_context); Ok(req) }) }) } }; // Create gRPC services let vpc_service = Arc::new(VpcServiceImpl::new( metadata.clone(), ovn.clone(), auth_service.clone(), )); let subnet_service = Arc::new(SubnetServiceImpl::new( metadata.clone(), auth_service.clone(), )); let port_service = Arc::new(PortServiceImpl::new( metadata.clone(), ovn.clone(), auth_service.clone(), )); let sg_service = Arc::new(SecurityGroupServiceImpl::new( metadata.clone(), ovn.clone(), auth_service.clone(), )); let ipam_service = Arc::new(IpamServiceImpl::new(metadata.clone(), auth_service.clone())); // Setup health service let (mut health_reporter, health_service) = health_reporter(); health_reporter .set_serving::>() .await; health_reporter .set_serving::>() .await; health_reporter .set_serving::>() .await; health_reporter .set_serving::>() .await; health_reporter .set_serving::>() .await; // Parse address let grpc_addr: SocketAddr = config.grpc_addr; // Configure TLS if enabled let mut server = Server::builder(); if let Some(tls_config) = &config.tls { tracing::info!("TLS enabled, loading certificates..."); let cert = tokio::fs::read(&tls_config.cert_file).await?; let key = tokio::fs::read(&tls_config.key_file).await?; let server_identity = Identity::from_pem(cert, key); let tls = if tls_config.require_client_cert { tracing::info!("mTLS enabled"); let ca_cert = tokio::fs::read( tls_config .ca_file .as_ref() .ok_or("ca_file required for mTLS")?, ) .await?; let ca = Certificate::from_pem(ca_cert); ServerTlsConfig::new() .identity(server_identity) .client_ca_root(ca) } else { ServerTlsConfig::new().identity(server_identity) }; server = server.tls_config(tls)?; } // Start gRPC server tracing::info!("gRPC server listening on {}", grpc_addr); let grpc_server = server .add_service(health_service) .add_service(tonic::codegen::InterceptedService::new( VpcServiceServer::new(vpc_service.as_ref().clone()), make_interceptor(auth_service.clone()), )) .add_service(tonic::codegen::InterceptedService::new( SubnetServiceServer::new(subnet_service.as_ref().clone()), make_interceptor(auth_service.clone()), )) .add_service(tonic::codegen::InterceptedService::new( PortServiceServer::new(port_service.as_ref().clone()), make_interceptor(auth_service.clone()), )) .add_service(tonic::codegen::InterceptedService::new( SecurityGroupServiceServer::new(sg_service.as_ref().clone()), make_interceptor(auth_service.clone()), )) .add_service(tonic::codegen::InterceptedService::new( IpamServiceServer::new(ipam_service.as_ref().clone()), make_interceptor(auth_service.clone()), )) .serve(grpc_addr); // HTTP REST API server let http_addr = config.http_addr; let rest_state = prismnet_server::rest::RestApiState { vpc_service: vpc_service.clone(), subnet_service: subnet_service.clone(), auth_service: auth_service.clone(), }; let rest_app = prismnet_server::rest::build_router(rest_state); let http_listener = tokio::net::TcpListener::bind(&http_addr).await?; tracing::info!("PrismNET HTTP REST API server starting on {}", http_addr); let http_server = async move { axum::serve(http_listener, rest_app) .await .map_err(|e| anyhow!("HTTP server error: {}", e)) }; // Run both servers concurrently tokio::select! { result = grpc_server => { result?; } result = http_server => { result?; } } Ok(()) } fn parse_metadata_backend(value: &str) -> Result> { match value.trim().to_ascii_lowercase().as_str() { "flaredb" => Ok(MetadataBackend::FlareDb), "postgres" => Ok(MetadataBackend::Postgres), "sqlite" => Ok(MetadataBackend::Sqlite), other => Err(format!( "invalid metadata backend '{}'; expected one of: flaredb, postgres, sqlite", other ) .into()), } } fn metadata_backend_name(backend: MetadataBackend) -> &'static str { match backend { MetadataBackend::FlareDb => "flaredb", MetadataBackend::Postgres => "postgres", MetadataBackend::Sqlite => "sqlite", } } fn ensure_sql_backend_matches_url( backend: MetadataBackend, database_url: &str, ) -> Result<(), Box> { let normalized = database_url.trim().to_ascii_lowercase(); match backend { MetadataBackend::Postgres => { if normalized.starts_with("postgres://") || normalized.starts_with("postgresql://") { Ok(()) } else { Err("metadata_backend=postgres requires postgres:// or postgresql:// URL".into()) } } MetadataBackend::Sqlite => { if normalized.starts_with("sqlite:") { Ok(()) } else { Err("metadata_backend=sqlite requires sqlite: URL".into()) } } MetadataBackend::FlareDb => Ok(()), } } async fn register_chainfire_membership( endpoint: &str, service: &str, addr: String, ) -> anyhow::Result<()> { let node_id = std::env::var("HOSTNAME").unwrap_or_else(|_| format!("{}-{}", service, std::process::id())); let ts = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_secs(); let key = format!("/cluster/{}/members/{}", service, node_id); let value = format!(r#"{{"addr":"{}","ts":{}}}"#, addr, ts); let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(120); let mut attempt = 0usize; let mut last_error = String::new(); loop { attempt += 1; match ChainFireClient::connect(endpoint).await { Ok(mut client) => match client.put_str(&key, &value).await { Ok(_) => return Ok(()), Err(error) => last_error = format!("put failed: {}", error), }, Err(error) => last_error = format!("connect failed: {}", error), } if tokio::time::Instant::now() >= deadline { break; } tracing::warn!( attempt, endpoint, service, error = %last_error, "retrying ChainFire membership registration" ); tokio::time::sleep(std::time::Duration::from_secs(2)).await; } anyhow::bail!( "failed to register ChainFire membership for {} via {} after {} attempts: {}", service, endpoint, attempt, last_error ) }