//! LightningStor object storage server binary

use chainfire_client::Client as ChainFireClient;
use clap::Parser;
use iam_service_auth::AuthService;
use lightningstor_api::{BucketServiceServer, ObjectServiceServer};
use lightningstor_distributed::{
    DistributedConfig, ErasureCodedBackend, RedundancyMode, ReplicatedBackend, RepairQueue,
    StaticNodeRegistry,
};
use lightningstor_server::{
    config::{MetadataBackend, ObjectStorageBackend},
    metadata::MetadataStore,
    repair::{spawn_replicated_repair_worker, MetadataRepairQueue},
    s3, BucketServiceImpl, ObjectServiceImpl, ServerConfig,
};
use lightningstor_storage::{LocalFsBackend, StorageBackend};
use metrics_exporter_prometheus::PrometheusBuilder;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tonic::transport::{Certificate, Identity, Server, ServerTlsConfig};
use tonic::{Request, Status};
use tonic_health::server::health_reporter;
use tracing_subscriber::EnvFilter;

const MAX_OBJECT_GRPC_MESSAGE_SIZE: usize = 1024 * 1024 * 1024;
const OBJECT_GRPC_INITIAL_STREAM_WINDOW: u32 = 64 * 1024 * 1024;
const OBJECT_GRPC_INITIAL_CONNECTION_WINDOW: u32 = 512 * 1024 * 1024;
const OBJECT_GRPC_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(30);
const OBJECT_GRPC_KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(10);
const REPLICATED_REPAIR_SCAN_INTERVAL: Duration = Duration::from_secs(5);

struct StorageRuntime {
    backend: Arc<dyn StorageBackend>,
    // Handle to the background repair worker, when one is spawned.
    repair_worker: Option<tokio::task::JoinHandle<()>>,
}

/// LightningStor object storage server
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
    /// Configuration file path
    #[arg(short, long, default_value = "lightningstor.toml")]
    config: PathBuf,

    /// gRPC address to listen on (overrides config)
    #[arg(long)]
    grpc_addr: Option<String>,

    /// S3 HTTP API address to listen on (overrides config)
    #[arg(long)]
    s3_addr: Option<String>,

    /// Log level (overrides config)
    #[arg(short, long)]
    log_level: Option<String>,

    /// ChainFire endpoint for cluster coordination (overrides config)
    #[arg(long, env = "LIGHTNINGSTOR_CHAINFIRE_ENDPOINT")]
    chainfire_endpoint: Option<String>,

    /// FlareDB endpoint for metadata and tenant data storage (overrides config)
    #[arg(long, env = "LIGHTNINGSTOR_FLAREDB_ENDPOINT")]
    flaredb_endpoint: Option<String>,

    /// Metadata backend (flaredb, postgres, sqlite)
    #[arg(long, env = "LIGHTNINGSTOR_METADATA_BACKEND")]
    metadata_backend: Option<String>,

    /// SQL database URL for metadata (required for postgres/sqlite backend)
    #[arg(long, env = "LIGHTNINGSTOR_METADATA_DATABASE_URL")]
    metadata_database_url: Option<String>,

    /// Run in single-node mode (required when metadata backend is SQLite)
    #[arg(long, env = "LIGHTNINGSTOR_SINGLE_NODE")]
    single_node: bool,

    /// Data directory for object storage (overrides config)
    #[arg(long)]
    data_dir: Option<String>,

    /// Metrics port for Prometheus scraping
    #[arg(long, default_value = "9099")]
    metrics_port: u16,
}
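// A sketch of a minimal `lightningstor.toml`, assuming ServerConfig is
// deserialized with the field names used throughout this file; the exact
// TOML shape (and every value below) is illustrative, not authoritative.
// Optional `[tls]` and `[distributed]` tables are consumed further down.
//
//   grpc_addr = "0.0.0.0:7000"
//   s3_addr = "0.0.0.0:7001"
//   log_level = "info"
//   data_dir = "/var/lib/lightningstor"
//   metadata_backend = "sqlite"
//   metadata_database_url = "sqlite://lightningstor-meta.db"
//   single_node = true
//
//   [auth]
//   iam_server_addr = "http://127.0.0.1:7100"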
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args = Args::parse();

    // Load configuration from file or use defaults
    let mut config = if args.config.exists() {
        let contents = tokio::fs::read_to_string(&args.config).await?;
        toml::from_str(&contents)?
    } else {
        tracing::info!(
            "Config file not found: {}, using defaults",
            args.config.display()
        );
        ServerConfig::default()
    };

    // Apply command line overrides
    if let Some(grpc_addr_str) = args.grpc_addr {
        config.grpc_addr = grpc_addr_str.parse()?;
    }
    if let Some(s3_addr_str) = args.s3_addr {
        config.s3_addr = s3_addr_str.parse()?;
    }
    if let Some(log_level) = args.log_level {
        config.log_level = log_level;
    }
    if let Some(chainfire_endpoint) = args.chainfire_endpoint {
        config.chainfire_endpoint = Some(chainfire_endpoint);
    }
    if let Some(flaredb_endpoint) = args.flaredb_endpoint {
        config.flaredb_endpoint = Some(flaredb_endpoint);
    }
    if let Some(metadata_backend) = args.metadata_backend {
        config.metadata_backend = parse_metadata_backend(&metadata_backend)?;
    }
    if let Some(metadata_database_url) = args.metadata_database_url {
        config.metadata_database_url = Some(metadata_database_url);
    }
    if args.single_node {
        config.single_node = true;
    }
    if let Some(data_dir) = args.data_dir {
        config.data_dir = data_dir;
    }

    // Initialize tracing
    tracing_subscriber::fmt()
        .with_env_filter(
            EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(&config.log_level)),
        )
        .init();

    tracing::info!("Starting LightningStor server");
    tracing::info!("  gRPC: {}", config.grpc_addr);
    tracing::info!("  S3 HTTP: {}", config.s3_addr);
    tracing::info!("  Data dir: {}", config.data_dir);

    // Initialize Prometheus metrics exporter
    let metrics_addr = format!("0.0.0.0:{}", args.metrics_port);
    let builder = PrometheusBuilder::new();
    builder
        .with_http_listener(metrics_addr.parse::<SocketAddr>()?)
        .install()
        .expect("Failed to install Prometheus metrics exporter");
    tracing::info!(
        "Prometheus metrics available at http://{}/metrics",
        metrics_addr
    );

    if let Some(endpoint) = &config.chainfire_endpoint {
        tracing::info!("  Cluster coordination: ChainFire @ {}", endpoint);
        let endpoint = endpoint.clone();
        let addr = config.grpc_addr.to_string();
        tokio::spawn(async move {
            if let Err(error) =
                register_chainfire_membership(&endpoint, "lightningstor", addr).await
            {
                tracing::warn!(error = %error, "ChainFire membership registration failed");
            }
        });
    }

    // Create the metadata store from the explicitly selected backend.
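    // For reference, the metadata backend can be selected per invocation; flag
    // names follow clap's kebab-casing of the Args fields, while the binary
    // name and endpoints below are illustrative assumptions:
    //
    //   lightningstor --metadata-backend sqlite \
    //       --metadata-database-url sqlite://lightningstor-meta.db --single-node
    //
    //   LIGHTNINGSTOR_METADATA_BACKEND=postgres \
    //   LIGHTNINGSTOR_METADATA_DATABASE_URL=postgres://user:pass@db:5432/lightningstor \
    //       lightningstor
    //
    //   lightningstor --metadata-backend flaredb --flaredb-endpoint http://flaredb:4000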
    let metadata = match config.metadata_backend {
        MetadataBackend::FlareDb => {
            if let Some(flaredb_endpoint) = config.flaredb_endpoint.as_deref() {
                tracing::info!("Metadata backend: FlareDB @ {}", flaredb_endpoint);
            } else {
                tracing::info!("Metadata backend: FlareDB");
            }
            Arc::new(
                MetadataStore::new_flaredb_with_pd(
                    config.flaredb_endpoint.clone(),
                    config.chainfire_endpoint.clone(),
                )
                .await
                .map_err(|e| format!("Failed to initialize FlareDB metadata store: {}", e))?,
            )
        }
        MetadataBackend::Postgres | MetadataBackend::Sqlite => {
            let database_url = config
                .metadata_database_url
                .as_deref()
                .ok_or_else(|| {
                    format!(
                        "metadata_database_url is required when metadata_backend={} (env: LIGHTNINGSTOR_METADATA_DATABASE_URL)",
                        metadata_backend_name(config.metadata_backend)
                    )
                })?;
            ensure_sql_backend_matches_url(config.metadata_backend, database_url)?;
            tracing::info!(
                "Metadata backend: {} @ {}",
                metadata_backend_name(config.metadata_backend),
                database_url
            );
            Arc::new(
                MetadataStore::new_sql(database_url, config.single_node)
                    .await
                    .map_err(|e| format!("Failed to initialize SQL metadata store: {}", e))?,
            )
        }
    };

    let storage_runtime = create_storage_backend(&config, metadata.clone()).await?;
    let storage = storage_runtime.backend.clone();
    let _repair_worker = storage_runtime.repair_worker;

    // Initialize IAM authentication service
    tracing::info!(
        "Connecting to IAM server at {}",
        config.auth.iam_server_addr
    );
    let auth_service = AuthService::new(&config.auth.iam_server_addr)
        .await
        .map_err(|e| format!("Failed to connect to IAM server: {}", e))?;
    let auth_service = Arc::new(auth_service);

    // Dedicated runtime for auth interceptors to avoid blocking the main async runtime
    let auth_runtime = Arc::new(tokio::runtime::Runtime::new()?);
    let make_interceptor = |auth: Arc<AuthService>| {
        let rt = auth_runtime.clone();
        move |mut req: Request<()>| -> Result<Request<()>, Status> {
            let auth = auth.clone();
            tokio::task::block_in_place(|| {
                rt.block_on(async move {
                    let tenant_context = auth.authenticate_request(&req).await?;
                    req.extensions_mut().insert(tenant_context);
                    Ok(req)
                })
            })
        }
    };

    // Create services
    let object_service =
        ObjectServiceImpl::new(storage.clone(), metadata.clone(), auth_service.clone())
            .await
            .expect("Failed to create ObjectService");
    let bucket_service = BucketServiceImpl::new(metadata.clone(), auth_service.clone())
        .await
        .expect("Failed to create BucketService");

    // Setup health service
    let (mut health_reporter, health_service) = health_reporter();
    health_reporter
        .set_serving::<ObjectServiceServer<ObjectServiceImpl>>()
        .await;
    health_reporter
        .set_serving::<BucketServiceServer<BucketServiceImpl>>()
        .await;

    // Parse addresses
    let grpc_addr: SocketAddr = config.grpc_addr;
    let s3_addr: SocketAddr = config.s3_addr;

    // Start S3 HTTP server with shared state
    let s3_router = s3::create_router_with_auth(
        storage.clone(),
        metadata.clone(),
        Some(config.auth.iam_server_addr.clone()),
    );
    let s3_server = tokio::spawn(async move {
        tracing::info!("S3 HTTP server listening on {}", s3_addr);
        let listener = tokio::net::TcpListener::bind(s3_addr).await.unwrap();
        axum::serve(listener, s3_router).await.unwrap();
    });
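    // The block below expects an optional `tls` table in the config, roughly
    // like this sketch (field names taken from the usage that follows; the
    // exact struct layout and the paths are assumptions):
    //
    //   [tls]
    //   cert_file = "/etc/lightningstor/server.pem"
    //   key_file = "/etc/lightningstor/server.key"
    //   require_client_cert = true
    //   ca_file = "/etc/lightningstor/ca.pem"   # required when require_client_cert = true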
    // Configure TLS if enabled
    let mut server = Server::builder()
        .tcp_nodelay(true)
        .initial_stream_window_size(OBJECT_GRPC_INITIAL_STREAM_WINDOW)
        .initial_connection_window_size(OBJECT_GRPC_INITIAL_CONNECTION_WINDOW)
        .http2_keepalive_interval(Some(OBJECT_GRPC_KEEPALIVE_INTERVAL))
        .http2_keepalive_timeout(Some(OBJECT_GRPC_KEEPALIVE_TIMEOUT));

    if let Some(tls_config) = &config.tls {
        tracing::info!("TLS enabled, loading certificates...");
        let cert = tokio::fs::read(&tls_config.cert_file).await?;
        let key = tokio::fs::read(&tls_config.key_file).await?;
        let server_identity = Identity::from_pem(cert, key);

        let tls = if tls_config.require_client_cert {
            tracing::info!("mTLS enabled");
            let ca_cert = tokio::fs::read(
                tls_config
                    .ca_file
                    .as_ref()
                    .ok_or("ca_file required for mTLS")?,
            )
            .await?;
            let ca = Certificate::from_pem(ca_cert);
            ServerTlsConfig::new()
                .identity(server_identity)
                .client_ca_root(ca)
        } else {
            ServerTlsConfig::new().identity(server_identity)
        };
        server = server.tls_config(tls)?;
    }

    // Start gRPC server
    tracing::info!("gRPC server listening on {}", grpc_addr);
    let grpc_server = server
        .add_service(health_service)
        .add_service(tonic::codegen::InterceptedService::new(
            ObjectServiceServer::new(object_service)
                .max_decoding_message_size(MAX_OBJECT_GRPC_MESSAGE_SIZE)
                .max_encoding_message_size(MAX_OBJECT_GRPC_MESSAGE_SIZE),
            make_interceptor(auth_service.clone()),
        ))
        .add_service(tonic::codegen::InterceptedService::new(
            BucketServiceServer::new(bucket_service)
                .max_decoding_message_size(MAX_OBJECT_GRPC_MESSAGE_SIZE)
                .max_encoding_message_size(MAX_OBJECT_GRPC_MESSAGE_SIZE),
            make_interceptor(auth_service.clone()),
        ))
        .serve(grpc_addr);

    // Run both servers
    tokio::select! {
        result = grpc_server => {
            if let Err(e) = result {
                tracing::error!("gRPC server error: {}", e);
            }
        }
        _ = s3_server => {
            tracing::error!("S3 server unexpectedly terminated");
        }
    }

    Ok(())
}

fn parse_metadata_backend(value: &str) -> Result<MetadataBackend, Box<dyn std::error::Error>> {
    match value.trim().to_ascii_lowercase().as_str() {
        "flaredb" => Ok(MetadataBackend::FlareDb),
        "postgres" => Ok(MetadataBackend::Postgres),
        "sqlite" => Ok(MetadataBackend::Sqlite),
        other => Err(format!(
            "invalid metadata backend '{}'; expected one of: flaredb, postgres, sqlite",
            other
        )
        .into()),
    }
}

fn metadata_backend_name(backend: MetadataBackend) -> &'static str {
    match backend {
        MetadataBackend::FlareDb => "flaredb",
        MetadataBackend::Postgres => "postgres",
        MetadataBackend::Sqlite => "sqlite",
    }
}

fn ensure_sql_backend_matches_url(
    backend: MetadataBackend,
    database_url: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    let normalized = database_url.trim().to_ascii_lowercase();
    match backend {
        MetadataBackend::Postgres => {
            if normalized.starts_with("postgres://") || normalized.starts_with("postgresql://") {
                Ok(())
            } else {
                Err("metadata_backend=postgres requires postgres:// or postgresql:// URL".into())
            }
        }
        MetadataBackend::Sqlite => {
            if normalized.starts_with("sqlite:") {
                Ok(())
            } else {
                Err("metadata_backend=sqlite requires sqlite: URL".into())
            }
        }
        MetadataBackend::FlareDb => Ok(()),
    }
}
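/// Best-effort registration of this node in ChainFire's cluster membership.
/// Writes a JSON record of the advertised gRPC address under
/// `/cluster/{service}/members/{node_id}`, retrying every 2s for up to 120s
/// before giving up. The node id comes from $HOSTNAME, falling back to
/// `{service}-{pid}`.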
async fn register_chainfire_membership(
    endpoint: &str,
    service: &str,
    addr: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let node_id = std::env::var("HOSTNAME")
        .unwrap_or_else(|_| format!("{}-{}", service, std::process::id()));
    let ts = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    let key = format!("/cluster/{}/members/{}", service, node_id);
    let value = format!(r#"{{"addr":"{}","ts":{}}}"#, addr, ts);

    let deadline = tokio::time::Instant::now() + Duration::from_secs(120);
    let mut attempt = 0usize;
    let mut last_error = String::new();
    loop {
        attempt += 1;
        match ChainFireClient::connect(endpoint).await {
            Ok(mut client) => match client.put_str(&key, &value).await {
                Ok(_) => return Ok(()),
                Err(error) => last_error = format!("put failed: {}", error),
            },
            Err(error) => last_error = format!("connect failed: {}", error),
        }
        if tokio::time::Instant::now() >= deadline {
            break;
        }
        tracing::warn!(
            attempt,
            endpoint,
            service,
            error = %last_error,
            "retrying ChainFire membership registration"
        );
        tokio::time::sleep(Duration::from_secs(2)).await;
    }
    Err(std::io::Error::other(format!(
        "failed to register ChainFire membership for {} via {} after {} attempts: {}",
        service, endpoint, attempt, last_error
    ))
    .into())
}

async fn create_storage_backend(
    config: &ServerConfig,
    metadata: Arc<MetadataStore>,
) -> Result<StorageRuntime, Box<dyn std::error::Error>> {
    match config.object_storage_backend {
        ObjectStorageBackend::LocalFs => {
            tracing::info!("Object storage backend: local_fs");
            Ok(StorageRuntime {
                backend: Arc::new(
                    LocalFsBackend::new(&config.data_dir, config.sync_on_write).await?,
                ),
                repair_worker: None,
            })
        }
        ObjectStorageBackend::Distributed => {
            tracing::info!("Object storage backend: distributed");
            create_distributed_storage_backend(&config.distributed, metadata).await
        }
    }
}

async fn create_distributed_storage_backend(
    config: &DistributedConfig,
    metadata: Arc<MetadataStore>,
) -> Result<StorageRuntime, Box<dyn std::error::Error>> {
    let endpoints: Vec<String> = config
        .node_endpoints
        .iter()
        .map(|endpoint| endpoint.trim().to_string())
        .filter(|endpoint| !endpoint.is_empty())
        .collect();
    if endpoints.is_empty() {
        return Err(std::io::Error::other(
            "distributed object storage requires at least one node endpoint",
        )
        .into());
    }

    let min_nodes = config.redundancy.min_nodes();
    if endpoints.len() < min_nodes {
        return Err(std::io::Error::other(format!(
            "distributed object storage requires at least {} node endpoints for the configured redundancy mode, got {}",
            min_nodes,
            endpoints.len()
        ))
        .into());
    }

    if let Some(registry_endpoint) = config.registry_endpoint.as_deref() {
        tracing::warn!(
            registry_endpoint,
            "registry_endpoint is not implemented yet; using static node_endpoints only"
        );
    }

    tracing::info!(
        node_count = endpoints.len(),
        min_nodes,
        fault_tolerance = config.redundancy.fault_tolerance(),
        connection_timeout_ms = config.connection_timeout_ms,
        request_timeout_ms = config.request_timeout_ms,
        "Initializing LightningStor distributed object storage"
    );

    let registry = Arc::new(
        StaticNodeRegistry::new_with_timeouts(
            &endpoints,
            Duration::from_millis(config.connection_timeout_ms),
            Duration::from_millis(config.request_timeout_ms),
        )
        .await?,
    );

    match &config.redundancy {
        RedundancyMode::Replicated {
            replica_count,
            read_quorum,
            write_quorum,
        } => {
            tracing::info!(
                replica_count,
                read_quorum,
                write_quorum,
                "Using replicated LightningStor storage backend"
            );
            let repair_queue: Arc<dyn RepairQueue> =
                Arc::new(MetadataRepairQueue::new(metadata.clone()));
            let backend = Arc::new(
                ReplicatedBackend::new_with_repair_queue(
                    config.clone(),
                    registry,
                    Some(repair_queue),
                )
                .await?,
            );
            let repair_worker = Some(spawn_replicated_repair_worker(
                metadata,
                backend.clone(),
                REPLICATED_REPAIR_SCAN_INTERVAL,
            ));
            Ok(StorageRuntime {
                backend,
                repair_worker,
            })
        }
        RedundancyMode::ErasureCoded {
            data_shards,
            parity_shards,
        } => {
            tracing::info!(
                data_shards,
                parity_shards,
                "Using erasure-coded LightningStor storage backend"
            );
            Ok(StorageRuntime {
                backend: Arc::new(ErasureCodedBackend::new(config.clone(), registry).await?),
                repair_worker: None,
            })
        }
        RedundancyMode::None => Err(std::io::Error::other(
            "distributed object storage does not support redundancy.type=none; use object_storage_backend=local_fs instead",
        )
        .into()),
    }
}
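
// Sanity tests for the pure config helpers above; a minimal sketch that
// avoids assuming extra derives on MetadataBackend by comparing through
// metadata_backend_name() instead of the enum values themselves.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_metadata_backend_is_case_insensitive_and_trims() {
        assert_eq!(
            metadata_backend_name(parse_metadata_backend(" FlareDB ").unwrap()),
            "flaredb"
        );
        assert_eq!(
            metadata_backend_name(parse_metadata_backend("POSTGRES").unwrap()),
            "postgres"
        );
        assert!(parse_metadata_backend("mysql").is_err());
    }

    #[test]
    fn sql_backend_url_schemes_are_enforced() {
        let pg = parse_metadata_backend("postgres").unwrap();
        let lite = parse_metadata_backend("sqlite").unwrap();
        assert!(ensure_sql_backend_matches_url(pg, "postgresql://u@h/db").is_ok());
        assert!(ensure_sql_backend_matches_url(pg, "sqlite://meta.db").is_err());
        assert!(ensure_sql_backend_matches_url(lite, "sqlite://meta.db").is_ok());
        assert!(ensure_sql_backend_matches_url(lite, "postgres://u@h/db").is_err());
    }
}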