diff --git a/deployer/crates/deployer-ctl/src/power.rs b/deployer/crates/deployer-ctl/src/power.rs index 6f86ad7..156b0d9 100644 --- a/deployer/crates/deployer-ctl/src/power.rs +++ b/deployer/crates/deployer-ctl/src/power.rs @@ -161,7 +161,9 @@ impl RedfishTarget { match action { PowerAction::Cycle => Ok(PowerState::Cycling), - PowerAction::On | PowerAction::Off | PowerAction::Refresh => self.refresh(&client).await, + PowerAction::On | PowerAction::Off | PowerAction::Refresh => { + self.refresh(&client).await + } } } @@ -295,7 +297,12 @@ pub async fn request_reinstall( #[cfg(test)] mod tests { use super::*; - use axum::{extract::State, http::StatusCode, routing::{get, post}, Json, Router}; + use axum::{ + extract::State, + http::StatusCode, + routing::{get, post}, + Json, Router, + }; use serde_json::Value; use std::sync::{Arc, Mutex}; use tokio::net::TcpListener; @@ -303,7 +310,10 @@ mod tests { #[test] fn parse_redfish_short_reference_defaults_to_https() { let parsed = RedfishTarget::parse("redfish://lab-bmc/node01").unwrap(); - assert_eq!(parsed.resource_url.as_str(), "https://lab-bmc/redfish/v1/Systems/node01"); + assert_eq!( + parsed.resource_url.as_str(), + "https://lab-bmc/redfish/v1/Systems/node01" + ); } #[test] @@ -361,8 +371,14 @@ mod tests { addr )) .unwrap(); - assert_eq!(target.perform(PowerAction::Refresh).await.unwrap(), PowerState::On); - assert_eq!(target.perform(PowerAction::Off).await.unwrap(), PowerState::On); + assert_eq!( + target.perform(PowerAction::Refresh).await.unwrap(), + PowerState::On + ); + assert_eq!( + target.perform(PowerAction::Off).await.unwrap(), + PowerState::On + ); let payloads = state.seen_payloads.lock().unwrap().clone(); assert_eq!(payloads, vec![r#"{"ResetType":"ForceOff"}"#.to_string()]); diff --git a/deployer/crates/deployer-ctl/src/remote.rs b/deployer/crates/deployer-ctl/src/remote.rs index 3ff12f7..7aa398e 100644 --- a/deployer/crates/deployer-ctl/src/remote.rs +++ b/deployer/crates/deployer-ctl/src/remote.rs @@ -31,5 +31,3 @@ pub async fn run_deployer_command(endpoint: &str, action: &str) -> Result<()> { Ok(()) } - - diff --git a/fiberlb/crates/fiberlb-server/src/config.rs b/fiberlb/crates/fiberlb-server/src/config.rs index b825d5c..2c43699 100644 --- a/fiberlb/crates/fiberlb-server/src/config.rs +++ b/fiberlb/crates/fiberlb-server/src/config.rs @@ -177,6 +177,10 @@ pub struct VipOwnershipConfig { /// Interface used for local VIP ownership. #[serde(default = "default_vip_ownership_interface")] pub interface: String, + + /// Optional explicit `ip` command path used for local VIP ownership. + #[serde(default)] + pub ip_command: Option, } fn default_vip_ownership_interface() -> String { @@ -188,6 +192,7 @@ impl Default for VipOwnershipConfig { Self { enabled: false, interface: default_vip_ownership_interface(), + ip_command: None, } } } diff --git a/fiberlb/crates/fiberlb-server/src/main.rs b/fiberlb/crates/fiberlb-server/src/main.rs index 294e8e7..b486c3e 100644 --- a/fiberlb/crates/fiberlb-server/src/main.rs +++ b/fiberlb/crates/fiberlb-server/src/main.rs @@ -41,26 +41,6 @@ struct Args { #[arg(long)] grpc_addr: Option, - /// ChainFire endpoint for cluster coordination - #[arg(long, env = "FIBERLB_CHAINFIRE_ENDPOINT")] - chainfire_endpoint: Option, - - /// FlareDB endpoint for metadata and tenant data storage - #[arg(long, env = "FIBERLB_FLAREDB_ENDPOINT")] - flaredb_endpoint: Option, - - /// Metadata backend (flaredb, postgres, sqlite) - #[arg(long, env = "FIBERLB_METADATA_BACKEND")] - metadata_backend: Option, - - /// SQL database URL for metadata (required for postgres/sqlite backend) - #[arg(long, env = "FIBERLB_METADATA_DATABASE_URL")] - metadata_database_url: Option, - - /// Run in single-node mode (required when metadata backend is SQLite) - #[arg(long, env = "FIBERLB_SINGLE_NODE")] - single_node: bool, - /// Log level (overrides config) #[arg(short, long)] log_level: Option, @@ -93,21 +73,6 @@ async fn main() -> Result<(), Box> { if let Some(log_level) = args.log_level { config.log_level = log_level; } - if let Some(chainfire_endpoint) = args.chainfire_endpoint { - config.chainfire_endpoint = Some(chainfire_endpoint); - } - if let Some(flaredb_endpoint) = args.flaredb_endpoint { - config.flaredb_endpoint = Some(flaredb_endpoint); - } - if let Some(metadata_backend) = args.metadata_backend { - config.metadata_backend = parse_metadata_backend(&metadata_backend)?; - } - if let Some(metadata_database_url) = args.metadata_database_url { - config.metadata_database_url = Some(metadata_database_url); - } - if args.single_node { - config.single_node = true; - } // Initialize tracing tracing_subscriber::fmt() @@ -194,15 +159,12 @@ async fn main() -> Result<(), Box> { ) } MetadataBackend::Postgres | MetadataBackend::Sqlite => { - let database_url = config - .metadata_database_url - .as_deref() - .ok_or_else(|| { - format!( - "metadata_database_url is required when metadata_backend={} (env: FIBERLB_METADATA_DATABASE_URL)", - metadata_backend_name(config.metadata_backend) - ) - })?; + let database_url = config.metadata_database_url.as_deref().ok_or_else(|| { + format!( + "metadata_database_url is required when metadata_backend={}", + metadata_backend_name(config.metadata_backend) + ) + })?; ensure_sql_backend_matches_url(config.metadata_backend, database_url)?; tracing::info!( " Metadata backend: {} @ {}", @@ -282,8 +244,9 @@ async fn main() -> Result<(), Box> { })?; let bgp = create_bgp_client(config.bgp.clone()).await?; let vip_owner: Option> = if config.vip_ownership.enabled { - Some(Arc::new(KernelVipAddressOwner::new( + Some(Arc::new(KernelVipAddressOwner::with_ip_command( config.vip_ownership.interface.clone(), + config.vip_ownership.ip_command.clone(), ))) } else { None @@ -439,19 +402,6 @@ async fn wait_for_shutdown_signal() -> Result<(), Box> { Ok(()) } -fn parse_metadata_backend(value: &str) -> Result> { - match value.trim().to_ascii_lowercase().as_str() { - "flaredb" => Ok(MetadataBackend::FlareDb), - "postgres" => Ok(MetadataBackend::Postgres), - "sqlite" => Ok(MetadataBackend::Sqlite), - other => Err(format!( - "invalid metadata backend '{}'; expected one of: flaredb, postgres, sqlite", - other - ) - .into()), - } -} - fn metadata_backend_name(backend: MetadataBackend) -> &'static str { match backend { MetadataBackend::FlareDb => "flaredb", diff --git a/fiberlb/crates/fiberlb-server/src/metadata.rs b/fiberlb/crates/fiberlb-server/src/metadata.rs index 6d2f4c2..b0738ef 100644 --- a/fiberlb/crates/fiberlb-server/src/metadata.rs +++ b/fiberlb/crates/fiberlb-server/src/metadata.rs @@ -61,12 +61,12 @@ impl LbMetadataStore { endpoint: Option, pd_endpoint: Option, ) -> Result { - let endpoint = endpoint.unwrap_or_else(|| { - std::env::var("FIBERLB_FLAREDB_ENDPOINT") - .unwrap_or_else(|_| "127.0.0.1:2479".to_string()) - }); + let endpoint = endpoint.unwrap_or_else(|| "127.0.0.1:2479".to_string()); + Self::connect_flaredb(endpoint, pd_endpoint).await + } + + async fn connect_flaredb(endpoint: String, pd_endpoint: Option) -> Result { let pd_endpoint = pd_endpoint - .or_else(|| std::env::var("FIBERLB_CHAINFIRE_ENDPOINT").ok()) .map(|value| normalize_transport_addr(&value)) .unwrap_or_else(|| endpoint.clone()); diff --git a/fiberlb/crates/fiberlb-server/src/vip_owner.rs b/fiberlb/crates/fiberlb-server/src/vip_owner.rs index c5b7a14..c111095 100644 --- a/fiberlb/crates/fiberlb-server/src/vip_owner.rs +++ b/fiberlb/crates/fiberlb-server/src/vip_owner.rs @@ -41,9 +41,17 @@ pub struct KernelVipAddressOwner { impl KernelVipAddressOwner { /// Create a kernel-backed VIP owner for the given interface. pub fn new(interface: impl Into) -> Self { + Self::with_ip_command(interface, None::) + } + + /// Create a kernel-backed VIP owner with an optional explicit `ip` command path. + pub fn with_ip_command( + interface: impl Into, + ip_command: Option>, + ) -> Self { Self { interface: interface.into(), - ip_command: resolve_ip_command(), + ip_command: resolve_ip_command(ip_command.map(Into::into)), } } @@ -123,7 +131,14 @@ impl VipAddressOwner for KernelVipAddressOwner { } } -fn resolve_ip_command() -> String { +fn resolve_ip_command(configured: Option) -> String { + if let Some(path) = configured { + let trimmed = path.trim(); + if !trimmed.is_empty() { + return trimmed.to_string(); + } + } + if let Ok(path) = std::env::var("FIBERLB_IP_COMMAND") { let trimmed = path.trim(); if !trimmed.is_empty() { @@ -159,3 +174,37 @@ fn render_command_output(output: &std::process::Output) -> String { format!("exit status {}", output.status) } + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::{Mutex, OnceLock}; + + fn env_lock() -> &'static Mutex<()> { + static LOCK: OnceLock> = OnceLock::new(); + LOCK.get_or_init(|| Mutex::new(())) + } + + #[test] + fn configured_ip_command_wins_over_env() { + let _guard = env_lock().lock().unwrap(); + std::env::set_var("FIBERLB_IP_COMMAND", "/env/ip"); + + assert_eq!( + resolve_ip_command(Some(" /configured/ip ".to_string())), + "/configured/ip" + ); + + std::env::remove_var("FIBERLB_IP_COMMAND"); + } + + #[test] + fn env_ip_command_is_used_when_config_is_absent() { + let _guard = env_lock().lock().unwrap(); + std::env::set_var("FIBERLB_IP_COMMAND", " /env/ip "); + + assert_eq!(resolve_ip_command(None), "/env/ip"); + + std::env::remove_var("FIBERLB_IP_COMMAND"); + } +} diff --git a/flashdns/crates/flashdns-server/src/main.rs b/flashdns/crates/flashdns-server/src/main.rs index 5cd317a..71fe4ad 100644 --- a/flashdns/crates/flashdns-server/src/main.rs +++ b/flashdns/crates/flashdns-server/src/main.rs @@ -1,27 +1,24 @@ //! FlashDNS authoritative DNS server binary +use anyhow::Result; +use chainfire_client::Client as ChainFireClient; +use clap::Parser; use flashdns_api::{RecordServiceServer, ZoneServiceServer}; use flashdns_server::{ config::{MetadataBackend, ServerConfig}, dns::DnsHandler, metadata::DnsMetadataStore, - RecordServiceImpl, - ZoneServiceImpl, + RecordServiceImpl, ZoneServiceImpl, }; -use chainfire_client::Client as ChainFireClient; use iam_service_auth::AuthService; use metrics_exporter_prometheus::PrometheusBuilder; +use std::path::PathBuf; use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; use tonic::transport::{Certificate, Identity, Server, ServerTlsConfig}; use tonic::{Request, Status}; use tonic_health::server::health_reporter; use tracing_subscriber::EnvFilter; -use anyhow::Result; -use clap::Parser; -use std::path::PathBuf; -use std::time::{SystemTime, UNIX_EPOCH}; - -use config::{Config as Cfg, Environment, File, FileFormat}; /// Command-line arguments for FlashDNS server. #[derive(Parser, Debug)] @@ -39,26 +36,6 @@ struct CliArgs { #[arg(long)] dns_addr: Option, - /// ChainFire endpoint for cluster coordination (overrides config) - #[arg(long, env = "FLASHDNS_CHAINFIRE_ENDPOINT")] - chainfire_endpoint: Option, - - /// FlareDB endpoint for metadata and tenant data storage (overrides config) - #[arg(long, env = "FLASHDNS_FLAREDB_ENDPOINT")] - flaredb_endpoint: Option, - - /// Metadata backend (flaredb, postgres, sqlite) - #[arg(long, env = "FLASHDNS_METADATA_BACKEND")] - metadata_backend: Option, - - /// SQL database URL for metadata (required for postgres/sqlite backend) - #[arg(long, env = "FLASHDNS_METADATA_DATABASE_URL")] - metadata_database_url: Option, - - /// Run in single-node mode (required when metadata backend is SQLite) - #[arg(long, env = "FLASHDNS_SINGLE_NODE")] - single_node: bool, - /// Log level (overrides config) #[arg(short, long)] log_level: Option, @@ -72,54 +49,22 @@ struct CliArgs { async fn main() -> Result<(), Box> { let cli_args = CliArgs::parse(); - // Load configuration using config-rs - let mut settings = Cfg::builder() - // Layer 1: Application defaults. Serialize ServerConfig::default() into TOML. - .add_source(File::from_str( - toml::to_string(&ServerConfig::default())?.as_str(), - FileFormat::Toml, - )) - // Layer 2: Environment variables (e.g., FLASHDNS_GRPC_ADDR, FLASHDNS_LOG_LEVEL) - .add_source( - Environment::with_prefix("FLASHDNS") - .separator("__") // Use double underscore for nested fields - ); - - // Layer 3: Configuration file (if specified) - if cli_args.config.exists() { + let mut config = if cli_args.config.exists() { tracing::info!("Loading config from file: {}", cli_args.config.display()); - settings = settings.add_source(File::from(cli_args.config.as_path())); + let contents = tokio::fs::read_to_string(&cli_args.config).await?; + toml::from_str(&contents)? } else { - tracing::info!("Config file not found, using defaults and environment variables."); - } + tracing::info!("Config file not found, using defaults."); + ServerConfig::default() + }; - let mut config: ServerConfig = settings - .build()? - .try_deserialize() - .map_err(|e| anyhow::anyhow!("Failed to load configuration: {}", e))?; - - // Apply command line overrides (Layer 4: highest precedence) + // Apply command line overrides if let Some(grpc_addr_str) = cli_args.grpc_addr { config.grpc_addr = grpc_addr_str.parse()?; } if let Some(dns_addr_str) = cli_args.dns_addr { config.dns_addr = dns_addr_str.parse()?; } - if let Some(chainfire_endpoint) = cli_args.chainfire_endpoint { - config.chainfire_endpoint = Some(chainfire_endpoint); - } - if let Some(flaredb_endpoint) = cli_args.flaredb_endpoint { - config.flaredb_endpoint = Some(flaredb_endpoint); - } - if let Some(metadata_backend) = cli_args.metadata_backend { - config.metadata_backend = parse_metadata_backend(&metadata_backend)?; - } - if let Some(metadata_database_url) = cli_args.metadata_database_url { - config.metadata_database_url = Some(metadata_database_url); - } - if cli_args.single_node { - config.single_node = true; - } if let Some(log_level) = cli_args.log_level { config.log_level = log_level; } @@ -172,20 +117,19 @@ async fn main() -> Result<(), Box> { config.flaredb_endpoint.clone(), config.chainfire_endpoint.clone(), ) - .await - .map_err(|e| anyhow::anyhow!("Failed to initialize FlareDB metadata store: {}", e))?, + .await + .map_err(|e| { + anyhow::anyhow!("Failed to initialize FlareDB metadata store: {}", e) + })?, ) } MetadataBackend::Postgres | MetadataBackend::Sqlite => { - let database_url = config - .metadata_database_url - .as_deref() - .ok_or_else(|| { - anyhow::anyhow!( - "metadata_database_url is required when metadata_backend={} (env: FLASHDNS_METADATA_DATABASE_URL)", - metadata_backend_name(config.metadata_backend) - ) - })?; + let database_url = config.metadata_database_url.as_deref().ok_or_else(|| { + anyhow::anyhow!( + "metadata_database_url is required when metadata_backend={}", + metadata_backend_name(config.metadata_backend) + ) + })?; ensure_sql_backend_matches_url(config.metadata_backend, database_url)?; tracing::info!( " Metadata backend: {} @ {}", @@ -195,13 +139,18 @@ async fn main() -> Result<(), Box> { Arc::new( DnsMetadataStore::new_sql(database_url, config.single_node) .await - .map_err(|e| anyhow::anyhow!("Failed to initialize SQL metadata store: {}", e))?, + .map_err(|e| { + anyhow::anyhow!("Failed to initialize SQL metadata store: {}", e) + })?, ) } }; // Initialize IAM authentication service - tracing::info!("Connecting to IAM server at {}", config.auth.iam_server_addr); + tracing::info!( + "Connecting to IAM server at {}", + config.auth.iam_server_addr + ); let auth_service = AuthService::new(&config.auth.iam_server_addr) .await .map_err(|e| anyhow::anyhow!("Failed to connect to IAM server: {}", e))?; @@ -300,18 +249,6 @@ async fn main() -> Result<(), Box> { Ok(()) } -fn parse_metadata_backend(value: &str) -> Result { - match value.trim().to_ascii_lowercase().as_str() { - "flaredb" => Ok(MetadataBackend::FlareDb), - "postgres" => Ok(MetadataBackend::Postgres), - "sqlite" => Ok(MetadataBackend::Sqlite), - other => Err(anyhow::anyhow!( - "invalid metadata backend '{}'; expected one of: flaredb, postgres, sqlite", - other - )), - } -} - fn metadata_backend_name(backend: MetadataBackend) -> &'static str { match backend { MetadataBackend::FlareDb => "flaredb", @@ -345,11 +282,7 @@ fn ensure_sql_backend_matches_url(backend: MetadataBackend, database_url: &str) } } -async fn register_chainfire_membership( - endpoint: &str, - service: &str, - addr: String, -) -> Result<()> { +async fn register_chainfire_membership(endpoint: &str, service: &str, addr: String) -> Result<()> { let node_id = std::env::var("HOSTNAME").unwrap_or_else(|_| format!("{}-{}", service, std::process::id())); let ts = SystemTime::now() diff --git a/flashdns/crates/flashdns-server/src/metadata.rs b/flashdns/crates/flashdns-server/src/metadata.rs index d96405e..04ac534 100644 --- a/flashdns/crates/flashdns-server/src/metadata.rs +++ b/flashdns/crates/flashdns-server/src/metadata.rs @@ -57,12 +57,8 @@ impl DnsMetadataStore { endpoint: Option, pd_endpoint: Option, ) -> Result { - let endpoint = endpoint.unwrap_or_else(|| { - std::env::var("FLASHDNS_FLAREDB_ENDPOINT") - .unwrap_or_else(|_| "127.0.0.1:2479".to_string()) - }); + let endpoint = endpoint.unwrap_or_else(|| "127.0.0.1:2479".to_string()); let pd_endpoint = pd_endpoint - .or_else(|| std::env::var("FLASHDNS_CHAINFIRE_ENDPOINT").ok()) .map(|value| normalize_transport_addr(&value)) .unwrap_or_else(|| endpoint.clone()); diff --git a/iam/crates/iam-server/src/config.rs b/iam/crates/iam-server/src/config.rs index 392e2a0..93987bb 100644 --- a/iam/crates/iam-server/src/config.rs +++ b/iam/crates/iam-server/src/config.rs @@ -27,6 +27,14 @@ pub struct ServerConfig { /// Logging configuration #[serde(default)] pub logging: LoggingConfig, + + /// Admin API policy configuration + #[serde(default)] + pub admin: AdminConfig, + + /// Development-only safety valves + #[serde(default)] + pub dev: DevConfig, } impl ServerConfig { @@ -102,6 +110,29 @@ impl ServerConfig { } } + if let Ok(value) = std::env::var("IAM_ALLOW_UNAUTHENTICATED_ADMIN") + .or_else(|_| std::env::var("PHOTON_IAM_ALLOW_UNAUTHENTICATED_ADMIN")) + { + let value = value.trim().to_ascii_lowercase(); + config.admin.allow_unauthenticated = + matches!(value.as_str(), "1" | "true" | "yes" | "on"); + } + + if let Ok(value) = std::env::var("IAM_ALLOW_RANDOM_SIGNING_KEY") + .or_else(|_| std::env::var("PHOTON_IAM_ALLOW_RANDOM_SIGNING_KEY")) + { + let value = value.trim().to_ascii_lowercase(); + config.dev.allow_random_signing_key = + matches!(value.as_str(), "1" | "true" | "yes" | "on"); + } + + if let Ok(value) = std::env::var("IAM_ALLOW_MEMORY_BACKEND") + .or_else(|_| std::env::var("PHOTON_IAM_ALLOW_MEMORY_BACKEND")) + { + let value = value.trim().to_ascii_lowercase(); + config.dev.allow_memory_backend = matches!(value.as_str(), "1" | "true" | "yes" | "on"); + } + Ok(config) } @@ -132,6 +163,8 @@ impl ServerConfig { }, }, logging: LoggingConfig::default(), + admin: AdminConfig::default(), + dev: DevConfig::default(), } } } @@ -226,6 +259,26 @@ pub struct ClusterConfig { pub chainfire_endpoint: Option, } +/// Admin API policy configuration +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct AdminConfig { + /// Allow admin APIs to run without an explicit admin token (dev only) + #[serde(default)] + pub allow_unauthenticated: bool, +} + +/// Development-only safety valves +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct DevConfig { + /// Allow generating a random signing key when none is configured + #[serde(default)] + pub allow_random_signing_key: bool, + + /// Allow the in-memory backend + #[serde(default)] + pub allow_memory_backend: bool, +} + /// Backend type #[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] diff --git a/iam/crates/iam-server/src/main.rs b/iam/crates/iam-server/src/main.rs index 761c20f..86d71ee 100644 --- a/iam/crates/iam-server/src/main.rs +++ b/iam/crates/iam-server/src/main.rs @@ -64,17 +64,18 @@ fn load_admin_token() -> Option { .filter(|value| !value.is_empty()) } -fn allow_unauthenticated_admin() -> bool { - std::env::var("IAM_ALLOW_UNAUTHENTICATED_ADMIN") - .or_else(|_| std::env::var("PHOTON_IAM_ALLOW_UNAUTHENTICATED_ADMIN")) - .ok() - .map(|value| { - matches!( - value.trim().to_ascii_lowercase().as_str(), - "1" | "true" | "yes" | "y" | "on" - ) - }) - .unwrap_or(false) +fn allow_unauthenticated_admin(config: &ServerConfig) -> bool { + config.admin.allow_unauthenticated + || std::env::var("IAM_ALLOW_UNAUTHENTICATED_ADMIN") + .or_else(|_| std::env::var("PHOTON_IAM_ALLOW_UNAUTHENTICATED_ADMIN")) + .ok() + .map(|value| { + matches!( + value.trim().to_ascii_lowercase().as_str(), + "1" | "true" | "yes" | "y" | "on" + ) + }) + .unwrap_or(false) } fn admin_token_valid(metadata: &MetadataMap, token: &str) -> bool { @@ -334,19 +335,20 @@ async fn main() -> Result<(), Box> { // Create token service let signing_key = if config.authn.internal_token.signing_key.is_empty() { - let allow_random = std::env::var("IAM_ALLOW_RANDOM_SIGNING_KEY") - .or_else(|_| std::env::var("PHOTON_IAM_ALLOW_RANDOM_SIGNING_KEY")) - .ok() - .map(|value| { - matches!( - value.trim().to_lowercase().as_str(), - "1" | "true" | "yes" | "y" | "on" - ) - }) - .unwrap_or(false); + let allow_random = config.dev.allow_random_signing_key + || std::env::var("IAM_ALLOW_RANDOM_SIGNING_KEY") + .or_else(|_| std::env::var("PHOTON_IAM_ALLOW_RANDOM_SIGNING_KEY")) + .ok() + .map(|value| { + matches!( + value.trim().to_lowercase().as_str(), + "1" | "true" | "yes" | "y" | "on" + ) + }) + .unwrap_or(false); if !allow_random { - return Err("No signing key configured. Set IAM_ALLOW_RANDOM_SIGNING_KEY=true for dev or configure authn.internal_token.signing_key.".into()); + return Err("No signing key configured. Set dev.allow_random_signing_key=true (or IAM_ALLOW_RANDOM_SIGNING_KEY=true for legacy dev mode) or configure authn.internal_token.signing_key.".into()); } warn!("No signing key configured, generating random key (dev-only)"); @@ -369,9 +371,9 @@ async fn main() -> Result<(), Box> { let token_service = Arc::new(InternalTokenService::new(token_config)); let admin_token = load_admin_token(); - if admin_token.is_none() && !allow_unauthenticated_admin() { + if admin_token.is_none() && !allow_unauthenticated_admin(&config) { return Err( - "IAM admin token not configured. Set IAM_ADMIN_TOKEN or explicitly allow dev mode with IAM_ALLOW_UNAUTHENTICATED_ADMIN=true." + "IAM admin token not configured. Set IAM_ADMIN_TOKEN or explicitly allow dev mode with admin.allow_unauthenticated=true." .into(), ); } @@ -550,19 +552,20 @@ async fn create_backend( ) -> Result> { match config.store.backend { BackendKind::Memory => { - let allow_memory = std::env::var("IAM_ALLOW_MEMORY_BACKEND") - .or_else(|_| std::env::var("PHOTON_IAM_ALLOW_MEMORY_BACKEND")) - .ok() - .map(|value| { - matches!( - value.trim().to_ascii_lowercase().as_str(), - "1" | "true" | "yes" | "on" - ) - }) - .unwrap_or(false); + let allow_memory = config.dev.allow_memory_backend + || std::env::var("IAM_ALLOW_MEMORY_BACKEND") + .or_else(|_| std::env::var("PHOTON_IAM_ALLOW_MEMORY_BACKEND")) + .ok() + .map(|value| { + matches!( + value.trim().to_ascii_lowercase().as_str(), + "1" | "true" | "yes" | "on" + ) + }) + .unwrap_or(false); if !allow_memory { return Err( - "In-memory IAM backend is disabled. Use FlareDB backend, or set IAM_ALLOW_MEMORY_BACKEND=true for tests/dev only." + "In-memory IAM backend is disabled. Use FlareDB backend, or set dev.allow_memory_backend=true for tests/dev only." .into(), ); } diff --git a/k8shost/crates/k8shost-server/src/config.rs b/k8shost/crates/k8shost-server/src/config.rs index d2cbdd6..9f47b58 100644 --- a/k8shost/crates/k8shost-server/src/config.rs +++ b/k8shost/crates/k8shost-server/src/config.rs @@ -91,6 +91,18 @@ impl Default for PrismNetConfig { } } +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct CreditServiceConfig { + #[serde(default)] + pub server_addr: Option, +} + +impl Default for CreditServiceConfig { + fn default() -> Self { + Self { server_addr: None } + } +} + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct ChainFireConfig { pub endpoint: Option, @@ -110,6 +122,8 @@ pub struct Config { pub flaredb: FlareDbConfig, pub chainfire: ChainFireConfig, pub iam: IamConfig, + #[serde(default)] + pub creditservice: CreditServiceConfig, pub fiberlb: FiberLbConfig, pub flashdns: FlashDnsConfig, pub prismnet: PrismNetConfig, diff --git a/k8shost/crates/k8shost-server/src/main.rs b/k8shost/crates/k8shost-server/src/main.rs index 0a1bfad..f3ecbb5 100644 --- a/k8shost/crates/k8shost-server/src/main.rs +++ b/k8shost/crates/k8shost-server/src/main.rs @@ -38,8 +38,8 @@ use tracing_subscriber::EnvFilter; #[command(about = "Kubernetes API server for PlasmaCloud's k8shost component")] struct Args { /// Configuration file path - #[arg(short, long)] - config: Option, + #[arg(short, long, default_value = "k8shost.toml")] + config: PathBuf, /// Listen address for gRPC server (e.g., "[::]:6443") #[arg(long)] @@ -91,17 +91,19 @@ async fn main() -> Result<(), Box> { let args = Args::parse(); // Load configuration - let mut settings = ::config::Config::builder() - .add_source(::config::File::from_str( - toml::to_string(&Config::default())?.as_str(), - ::config::FileFormat::Toml, - )) - .add_source(::config::Environment::with_prefix("K8SHOST").separator("_")); + let mut settings = ::config::Config::builder().add_source(::config::File::from_str( + toml::to_string(&Config::default())?.as_str(), + ::config::FileFormat::Toml, + )); - // Add config file if specified - if let Some(config_path) = &args.config { - info!("Loading config from file: {}", config_path.display()); - settings = settings.add_source(::config::File::from(config_path.as_path())); + if args.config.exists() { + info!("Loading config from file: {}", args.config.display()); + settings = settings.add_source(::config::File::from(args.config.as_path())); + } else { + info!( + "Config file not found: {}, using defaults", + args.config.display() + ); } let loaded_config: Config = settings @@ -136,6 +138,9 @@ async fn main() -> Result<(), Box> { .iam_server_addr .unwrap_or(loaded_config.iam.server_addr), }, + creditservice: config::CreditServiceConfig { + server_addr: loaded_config.creditservice.server_addr, + }, fiberlb: config::FiberLbConfig { server_addr: args .fiberlb_server_addr @@ -275,8 +280,14 @@ async fn main() -> Result<(), Box> { let ipam_client = Arc::new(IpamClient::new(config.prismnet.server_addr.clone())); // Create service implementations with storage + let creditservice_endpoint = config.creditservice.server_addr.as_deref(); let pod_service = Arc::new( - PodServiceImpl::new_with_credit_service(storage.clone(), auth_service.clone()).await, + PodServiceImpl::new_with_credit_service( + storage.clone(), + auth_service.clone(), + creditservice_endpoint, + ) + .await, ); let service_service = Arc::new(ServiceServiceImpl::new( storage.clone(), @@ -290,7 +301,10 @@ async fn main() -> Result<(), Box> { )); // Start scheduler in background with CreditService integration - let scheduler = Arc::new(scheduler::Scheduler::new_with_credit_service(storage.clone()).await); + let scheduler = Arc::new( + scheduler::Scheduler::new_with_credit_service(storage.clone(), creditservice_endpoint) + .await, + ); tokio::spawn(async move { scheduler.run().await; }); diff --git a/k8shost/crates/k8shost-server/src/scheduler.rs b/k8shost/crates/k8shost-server/src/scheduler.rs index 7976990..63bb3f9 100644 --- a/k8shost/crates/k8shost-server/src/scheduler.rs +++ b/k8shost/crates/k8shost-server/src/scheduler.rs @@ -33,27 +33,31 @@ impl Scheduler { } /// Create a new scheduler with CreditService quota enforcement - pub async fn new_with_credit_service(storage: Arc) -> Self { + pub async fn new_with_credit_service(storage: Arc, endpoint: Option<&str>) -> Self { // Initialize CreditService client if endpoint is configured - let credit_service = match std::env::var("CREDITSERVICE_ENDPOINT") { - Ok(endpoint) => match CreditServiceClient::connect(&endpoint).await { - Ok(client) => { - info!( - "Scheduler: CreditService quota enforcement enabled: {}", - endpoint - ); - Some(Arc::new(RwLock::new(client))) - } - Err(e) => { - warn!( + let credit_service = match endpoint { + Some(endpoint) if !endpoint.trim().is_empty() => { + match CreditServiceClient::connect(endpoint).await { + Ok(client) => { + info!( + "Scheduler: CreditService quota enforcement enabled: {}", + endpoint + ); + Some(Arc::new(RwLock::new(client))) + } + Err(e) => { + warn!( "Scheduler: Failed to connect to CreditService (quota enforcement disabled): {}", e ); - None + None + } } - }, - Err(_) => { - info!("Scheduler: CREDITSERVICE_ENDPOINT not set, quota enforcement disabled"); + } + _ => { + info!( + "Scheduler: CreditService endpoint not configured, quota enforcement disabled" + ); None } }; diff --git a/k8shost/crates/k8shost-server/src/services/pod.rs b/k8shost/crates/k8shost-server/src/services/pod.rs index ca518b8..366b513 100644 --- a/k8shost/crates/k8shost-server/src/services/pod.rs +++ b/k8shost/crates/k8shost-server/src/services/pod.rs @@ -45,24 +45,30 @@ impl PodServiceImpl { } } - pub async fn new_with_credit_service(storage: Arc, auth: Arc) -> Self { + pub async fn new_with_credit_service( + storage: Arc, + auth: Arc, + endpoint: Option<&str>, + ) -> Self { // Initialize CreditService client if endpoint is configured - let credit_service = match std::env::var("CREDITSERVICE_ENDPOINT") { - Ok(endpoint) => match CreditServiceClient::connect(&endpoint).await { - Ok(client) => { - tracing::info!("CreditService admission control enabled: {}", endpoint); - Some(Arc::new(RwLock::new(client))) + let credit_service = match endpoint { + Some(endpoint) if !endpoint.trim().is_empty() => { + match CreditServiceClient::connect(endpoint).await { + Ok(client) => { + tracing::info!("CreditService admission control enabled: {}", endpoint); + Some(Arc::new(RwLock::new(client))) + } + Err(e) => { + tracing::warn!( + "Failed to connect to CreditService (admission control disabled): {}", + e + ); + None + } } - Err(e) => { - tracing::warn!( - "Failed to connect to CreditService (admission control disabled): {}", - e - ); - None - } - }, - Err(_) => { - tracing::info!("CREDITSERVICE_ENDPOINT not set, admission control disabled"); + } + _ => { + tracing::info!("CreditService endpoint not configured, admission control disabled"); None } }; diff --git a/lightningstor/crates/lightningstor-server/src/config.rs b/lightningstor/crates/lightningstor-server/src/config.rs index e9b19ba..690c89f 100644 --- a/lightningstor/crates/lightningstor-server/src/config.rs +++ b/lightningstor/crates/lightningstor-server/src/config.rs @@ -50,6 +50,75 @@ pub enum ObjectStorageBackend { Distributed, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct S3AuthConfig { + #[serde(default = "default_s3_auth_enabled")] + pub enabled: bool, + + #[serde(default = "default_s3_aws_region")] + pub aws_region: String, + + #[serde(default = "default_s3_iam_cache_ttl_secs")] + pub iam_cache_ttl_secs: u64, + + #[serde(default = "default_s3_default_org_id")] + pub default_org_id: Option, + + #[serde(default = "default_s3_default_project_id")] + pub default_project_id: Option, + + #[serde(default = "default_s3_max_auth_body_bytes")] + pub max_auth_body_bytes: usize, +} + +impl Default for S3AuthConfig { + fn default() -> Self { + Self { + enabled: default_s3_auth_enabled(), + aws_region: default_s3_aws_region(), + iam_cache_ttl_secs: default_s3_iam_cache_ttl_secs(), + default_org_id: default_s3_default_org_id(), + default_project_id: default_s3_default_project_id(), + max_auth_body_bytes: default_s3_max_auth_body_bytes(), + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct S3PerformanceConfig { + #[serde(default = "default_s3_streaming_put_threshold_bytes")] + pub streaming_put_threshold_bytes: usize, + + #[serde(default = "default_s3_inline_put_max_bytes")] + pub inline_put_max_bytes: usize, + + #[serde(default = "default_s3_multipart_put_concurrency")] + pub multipart_put_concurrency: usize, + + #[serde(default = "default_s3_multipart_fetch_concurrency")] + pub multipart_fetch_concurrency: usize, +} + +impl Default for S3PerformanceConfig { + fn default() -> Self { + Self { + streaming_put_threshold_bytes: default_s3_streaming_put_threshold_bytes(), + inline_put_max_bytes: default_s3_inline_put_max_bytes(), + multipart_put_concurrency: default_s3_multipart_put_concurrency(), + multipart_fetch_concurrency: default_s3_multipart_fetch_concurrency(), + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct S3Config { + #[serde(default)] + pub auth: S3AuthConfig, + + #[serde(default)] + pub performance: S3PerformanceConfig, +} + /// Server configuration #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ServerConfig { @@ -100,6 +169,10 @@ pub struct ServerConfig { /// Authentication configuration #[serde(default)] pub auth: AuthConfig, + + /// S3 API runtime settings + #[serde(default)] + pub s3: S3Config, } /// Authentication configuration @@ -114,6 +187,46 @@ fn default_iam_server_addr() -> String { "127.0.0.1:50051".to_string() } +fn default_s3_auth_enabled() -> bool { + true +} + +fn default_s3_aws_region() -> String { + "us-east-1".to_string() +} + +fn default_s3_iam_cache_ttl_secs() -> u64 { + 30 +} + +fn default_s3_default_org_id() -> Option { + Some("default".to_string()) +} + +fn default_s3_default_project_id() -> Option { + Some("default".to_string()) +} + +fn default_s3_max_auth_body_bytes() -> usize { + 1024 * 1024 * 1024 +} + +fn default_s3_streaming_put_threshold_bytes() -> usize { + 16 * 1024 * 1024 +} + +fn default_s3_inline_put_max_bytes() -> usize { + 128 * 1024 * 1024 +} + +fn default_s3_multipart_put_concurrency() -> usize { + 4 +} + +fn default_s3_multipart_fetch_concurrency() -> usize { + 4 +} + impl Default for AuthConfig { fn default() -> Self { Self { @@ -139,6 +252,7 @@ impl Default for ServerConfig { sync_on_write: false, tls: None, auth: AuthConfig::default(), + s3: S3Config::default(), } } } diff --git a/lightningstor/crates/lightningstor-server/src/main.rs b/lightningstor/crates/lightningstor-server/src/main.rs index f55d8d5..1961001 100644 --- a/lightningstor/crates/lightningstor-server/src/main.rs +++ b/lightningstor/crates/lightningstor-server/src/main.rs @@ -5,7 +5,7 @@ use clap::Parser; use iam_service_auth::AuthService; use lightningstor_api::{BucketServiceServer, ObjectServiceServer}; use lightningstor_distributed::{ - DistributedConfig, ErasureCodedBackend, RedundancyMode, ReplicatedBackend, RepairQueue, + DistributedConfig, ErasureCodedBackend, RedundancyMode, RepairQueue, ReplicatedBackend, StaticNodeRegistry, }; use lightningstor_server::{ @@ -57,26 +57,6 @@ struct Args { #[arg(short, long)] log_level: Option, - /// ChainFire endpoint for cluster coordination (overrides config) - #[arg(long, env = "LIGHTNINGSTOR_CHAINFIRE_ENDPOINT")] - chainfire_endpoint: Option, - - /// FlareDB endpoint for metadata and tenant data storage (overrides config) - #[arg(long, env = "LIGHTNINGSTOR_FLAREDB_ENDPOINT")] - flaredb_endpoint: Option, - - /// Metadata backend (flaredb, postgres, sqlite) - #[arg(long, env = "LIGHTNINGSTOR_METADATA_BACKEND")] - metadata_backend: Option, - - /// SQL database URL for metadata (required for postgres/sqlite backend) - #[arg(long, env = "LIGHTNINGSTOR_METADATA_DATABASE_URL")] - metadata_database_url: Option, - - /// Run in single-node mode (required when metadata backend is SQLite) - #[arg(long, env = "LIGHTNINGSTOR_SINGLE_NODE")] - single_node: bool, - /// Data directory for object storage (overrides config) #[arg(long)] data_dir: Option, @@ -112,21 +92,6 @@ async fn main() -> Result<(), Box> { if let Some(log_level) = args.log_level { config.log_level = log_level; } - if let Some(chainfire_endpoint) = args.chainfire_endpoint { - config.chainfire_endpoint = Some(chainfire_endpoint); - } - if let Some(flaredb_endpoint) = args.flaredb_endpoint { - config.flaredb_endpoint = Some(flaredb_endpoint); - } - if let Some(metadata_backend) = args.metadata_backend { - config.metadata_backend = parse_metadata_backend(&metadata_backend)?; - } - if let Some(metadata_database_url) = args.metadata_database_url { - config.metadata_database_url = Some(metadata_database_url); - } - if args.single_node { - config.single_node = true; - } if let Some(data_dir) = args.data_dir { config.data_dir = data_dir; } @@ -182,20 +147,17 @@ async fn main() -> Result<(), Box> { config.flaredb_endpoint.clone(), config.chainfire_endpoint.clone(), ) - .await - .map_err(|e| format!("Failed to initialize FlareDB metadata store: {}", e))?, + .await + .map_err(|e| format!("Failed to initialize FlareDB metadata store: {}", e))?, ) } MetadataBackend::Postgres | MetadataBackend::Sqlite => { - let database_url = config - .metadata_database_url - .as_deref() - .ok_or_else(|| { - format!( - "metadata_database_url is required when metadata_backend={} (env: LIGHTNINGSTOR_METADATA_DATABASE_URL)", - metadata_backend_name(config.metadata_backend) - ) - })?; + let database_url = config.metadata_database_url.as_deref().ok_or_else(|| { + format!( + "metadata_database_url is required when metadata_backend={}", + metadata_backend_name(config.metadata_backend) + ) + })?; ensure_sql_backend_matches_url(config.metadata_backend, database_url)?; tracing::info!( "Metadata backend: {} @ {}", @@ -263,10 +225,11 @@ async fn main() -> Result<(), Box> { let s3_addr: SocketAddr = config.s3_addr; // Start S3 HTTP server with shared state - let s3_router = s3::create_router_with_auth( + let s3_router = s3::create_router_with_auth_config( storage.clone(), metadata.clone(), Some(config.auth.iam_server_addr.clone()), + config.s3.clone(), ); let s3_server = tokio::spawn(async move { tracing::info!("S3 HTTP server listening on {}", s3_addr); @@ -341,19 +304,6 @@ async fn main() -> Result<(), Box> { Ok(()) } -fn parse_metadata_backend(value: &str) -> Result> { - match value.trim().to_ascii_lowercase().as_str() { - "flaredb" => Ok(MetadataBackend::FlareDb), - "postgres" => Ok(MetadataBackend::Postgres), - "sqlite" => Ok(MetadataBackend::Sqlite), - other => Err(format!( - "invalid metadata backend '{}'; expected one of: flaredb, postgres, sqlite", - other - ) - .into()), - } -} - fn metadata_backend_name(backend: MetadataBackend) -> &'static str { match backend { MetadataBackend::FlareDb => "flaredb", @@ -442,7 +392,9 @@ async fn create_storage_backend( ObjectStorageBackend::LocalFs => { tracing::info!("Object storage backend: local_fs"); Ok(StorageRuntime { - backend: Arc::new(LocalFsBackend::new(&config.data_dir, config.sync_on_write).await?), + backend: Arc::new( + LocalFsBackend::new(&config.data_dir, config.sync_on_write).await?, + ), repair_worker: None, }) } diff --git a/lightningstor/crates/lightningstor-server/src/metadata.rs b/lightningstor/crates/lightningstor-server/src/metadata.rs index e44d993..3356b59 100644 --- a/lightningstor/crates/lightningstor-server/src/metadata.rs +++ b/lightningstor/crates/lightningstor-server/src/metadata.rs @@ -55,26 +55,25 @@ impl MetadataStore { endpoint: Option, pd_endpoint: Option, ) -> Result { - let endpoint = endpoint.unwrap_or_else(|| { - std::env::var("LIGHTNINGSTOR_FLAREDB_ENDPOINT") - .unwrap_or_else(|_| "127.0.0.1:2479".to_string()) - }); + let endpoint = endpoint.unwrap_or_else(|| "127.0.0.1:2479".to_string()); let pd_endpoint = pd_endpoint - .or_else(|| std::env::var("LIGHTNINGSTOR_CHAINFIRE_ENDPOINT").ok()) .map(|value| normalize_transport_addr(&value)) .unwrap_or_else(|| endpoint.clone()); let mut clients = Vec::with_capacity(FLAREDB_CLIENT_POOL_SIZE); for _ in 0..FLAREDB_CLIENT_POOL_SIZE { - let client = - RdbClient::connect_with_pd_namespace(endpoint.clone(), pd_endpoint.clone(), "lightningstor") - .await - .map_err(|e| { - lightningstor_types::Error::StorageError(format!( - "Failed to connect to FlareDB: {}", - e - )) - })?; + let client = RdbClient::connect_with_pd_namespace( + endpoint.clone(), + pd_endpoint.clone(), + "lightningstor", + ) + .await + .map_err(|e| { + lightningstor_types::Error::StorageError(format!( + "Failed to connect to FlareDB: {}", + e + )) + })?; clients.push(Arc::new(Mutex::new(client))); } @@ -321,7 +320,11 @@ impl MetadataStore { Ok((results, next)) } - async fn flaredb_put(clients: &[Arc>], key: &[u8], value: &[u8]) -> Result<()> { + async fn flaredb_put( + clients: &[Arc>], + key: &[u8], + value: &[u8], + ) -> Result<()> { let client = Self::flaredb_client_for_key(clients, key); let raw_result = { let mut c = client.lock().await; @@ -443,7 +446,8 @@ impl MetadataStore { let client = Self::flaredb_scan_client(clients); let (mut items, next) = match { let mut c = client.lock().await; - c.raw_scan(start_key.clone(), end_key.clone(), fetch_limit).await + c.raw_scan(start_key.clone(), end_key.clone(), fetch_limit) + .await } { Ok((keys, values, next)) => { let items = keys @@ -697,12 +701,13 @@ impl MetadataStore { .await } StorageBackend::Sql(sql) => { - let prefix_end = String::from_utf8(Self::prefix_end(prefix.as_bytes())).map_err(|e| { - lightningstor_types::Error::StorageError(format!( - "Failed to encode prefix end: {}", - e - )) - })?; + let prefix_end = + String::from_utf8(Self::prefix_end(prefix.as_bytes())).map_err(|e| { + lightningstor_types::Error::StorageError(format!( + "Failed to encode prefix end: {}", + e + )) + })?; let fetch_limit = (limit.saturating_add(1)) as i64; match sql { SqlStorageBackend::Postgres(pool) => { @@ -908,7 +913,10 @@ impl MetadataStore { } fn multipart_bucket_prefix(bucket_id: &BucketId, prefix: &str) -> String { - format!("/lightningstor/multipart/by-bucket/{}/{}", bucket_id, prefix) + format!( + "/lightningstor/multipart/by-bucket/{}/{}", + bucket_id, prefix + ) } fn multipart_object_key(object_id: &ObjectId) -> String { @@ -955,7 +963,8 @@ impl MetadataStore { } pub async fn delete_replicated_repair_task(&self, task_id: &str) -> Result<()> { - self.delete_key(&Self::replicated_repair_task_key(task_id)).await + self.delete_key(&Self::replicated_repair_task_key(task_id)) + .await } /// Save bucket metadata @@ -1246,7 +1255,8 @@ impl MetadataStore { )) .await?; } - self.delete_key(&Self::multipart_upload_key(upload_id)).await + self.delete_key(&Self::multipart_upload_key(upload_id)) + .await } pub async fn list_multipart_uploads( @@ -1331,7 +1341,8 @@ impl MetadataStore { } pub async fn delete_object_multipart_upload(&self, object_id: &ObjectId) -> Result<()> { - self.delete_key(&Self::multipart_object_key(object_id)).await + self.delete_key(&Self::multipart_object_key(object_id)) + .await } } @@ -1380,13 +1391,11 @@ mod tests { store.delete_bucket(&bucket).await.unwrap(); assert!(!store.bucket_cache.contains_key(&cache_key)); assert!(!store.bucket_cache.contains_key(&cache_id_key)); - assert!( - store - .load_bucket("org-a", "project-a", "bench-bucket") - .await - .unwrap() - .is_none() - ); + assert!(store + .load_bucket("org-a", "project-a", "bench-bucket") + .await + .unwrap() + .is_none()); } #[tokio::test] @@ -1426,13 +1435,11 @@ mod tests { .await .unwrap(); assert!(!store.object_cache.contains_key(&cache_key)); - assert!( - store - .load_object(&bucket.id, object.key.as_str(), None) - .await - .unwrap() - .is_none() - ); + assert!(store + .load_object(&bucket.id, object.key.as_str(), None) + .await + .unwrap() + .is_none()); } #[tokio::test] @@ -1496,8 +1503,10 @@ mod tests { ); store.save_bucket(&bucket).await.unwrap(); - let upload_a = MultipartUpload::new(bucket.id.to_string(), ObjectKey::new("a/one.bin").unwrap()); - let upload_b = MultipartUpload::new(bucket.id.to_string(), ObjectKey::new("a/two.bin").unwrap()); + let upload_a = + MultipartUpload::new(bucket.id.to_string(), ObjectKey::new("a/one.bin").unwrap()); + let upload_b = + MultipartUpload::new(bucket.id.to_string(), ObjectKey::new("a/two.bin").unwrap()); let other_bucket = Bucket::new( BucketName::new("other-bucket").unwrap(), "org-a", @@ -1505,8 +1514,10 @@ mod tests { "default", ); store.save_bucket(&other_bucket).await.unwrap(); - let upload_other = - MultipartUpload::new(other_bucket.id.to_string(), ObjectKey::new("a/three.bin").unwrap()); + let upload_other = MultipartUpload::new( + other_bucket.id.to_string(), + ObjectKey::new("a/three.bin").unwrap(), + ); store.save_multipart_upload(&upload_a).await.unwrap(); store.save_multipart_upload(&upload_b).await.unwrap(); @@ -1543,10 +1554,7 @@ mod tests { assert_eq!(tasks[0].attempt_count, 1); assert_eq!(tasks[0].last_error.as_deref(), Some("transient failure")); - store - .delete_replicated_repair_task(&task.id) - .await - .unwrap(); + store.delete_replicated_repair_task(&task.id).await.unwrap(); assert!(store .list_replicated_repair_tasks(10) .await diff --git a/lightningstor/crates/lightningstor-server/src/s3/auth.rs b/lightningstor/crates/lightningstor-server/src/s3/auth.rs index 4552ebc..7c90087 100644 --- a/lightningstor/crates/lightningstor-server/src/s3/auth.rs +++ b/lightningstor/crates/lightningstor-server/src/s3/auth.rs @@ -3,6 +3,7 @@ //! Implements simplified SigV4 authentication compatible with AWS S3 SDKs and aws-cli. //! Integrates with IAM for access key validation. +use crate::config::S3AuthConfig; use crate::tenant::TenantContext; use axum::{ body::{Body, Bytes}, @@ -23,8 +24,6 @@ use tracing::{debug, warn}; use url::form_urlencoded; type HmacSha256 = Hmac; -const DEFAULT_MAX_AUTH_BODY_BYTES: usize = 1024 * 1024 * 1024; - #[derive(Clone, Debug)] pub(crate) struct VerifiedBodyBytes(pub Bytes); @@ -49,6 +48,8 @@ pub struct AuthState { aws_region: String, /// AWS service name for SigV4 (e.g., s3) aws_service: String, + /// Maximum request body size to buffer during auth verification + max_auth_body_bytes: usize, } pub struct IamClient { @@ -60,6 +61,8 @@ pub struct IamClient { enum IamClientMode { Env { credentials: std::collections::HashMap, + default_org_id: Option, + default_project_id: Option, }, Grpc { endpoint: String, @@ -83,11 +86,11 @@ struct CachedCredential { impl IamClient { /// Create a new IAM client. If an endpoint is supplied, use the IAM gRPC API. pub fn new(iam_endpoint: Option) -> Self { - let cache_ttl = std::env::var("LIGHTNINGSTOR_S3_IAM_CACHE_TTL_SECS") - .ok() - .and_then(|value| value.parse::().ok()) - .map(StdDuration::from_secs) - .unwrap_or_else(|| StdDuration::from_secs(30)); + Self::new_with_config(iam_endpoint, &S3AuthConfig::default()) + } + + pub fn new_with_config(iam_endpoint: Option, config: &S3AuthConfig) -> Self { + let cache_ttl = StdDuration::from_secs(config.iam_cache_ttl_secs); if let Some(endpoint) = iam_endpoint .map(|value| normalize_iam_endpoint(&value)) @@ -106,6 +109,8 @@ impl IamClient { Self { mode: IamClientMode::Env { credentials: Self::load_env_credentials(), + default_org_id: config.default_org_id.clone(), + default_project_id: config.default_project_id.clone(), }, credential_cache: Arc::new(RwLock::new(HashMap::new())), cache_ttl, @@ -160,32 +165,32 @@ impl IamClient { #[cfg(test)] fn env_credentials(&self) -> Option<&std::collections::HashMap> { match &self.mode { - IamClientMode::Env { credentials } => Some(credentials), + IamClientMode::Env { credentials, .. } => Some(credentials), IamClientMode::Grpc { .. } => None, } } - fn env_default_tenant() -> (Option, Option) { - let org_id = std::env::var("S3_TENANT_ORG_ID") - .ok() - .or_else(|| std::env::var("S3_ORG_ID").ok()) - .or_else(|| Some("default".to_string())); - let project_id = std::env::var("S3_TENANT_PROJECT_ID") - .ok() - .or_else(|| std::env::var("S3_PROJECT_ID").ok()) - .or_else(|| Some("default".to_string())); - (org_id, project_id) + fn env_default_tenant( + default_org_id: Option, + default_project_id: Option, + ) -> (Option, Option) { + (default_org_id, default_project_id) } /// Validate access key and resolve the credential context. pub async fn get_credential(&self, access_key_id: &str) -> Result { match &self.mode { - IamClientMode::Env { credentials } => { + IamClientMode::Env { + credentials, + default_org_id, + default_project_id, + } => { let secret_key = credentials .get(access_key_id) .cloned() .ok_or_else(|| "Access key ID not found".to_string())?; - let (org_id, project_id) = Self::env_default_tenant(); + let (org_id, project_id) = + Self::env_default_tenant(default_org_id.clone(), default_project_id.clone()); Ok(ResolvedCredential { secret_key, principal_id: access_key_id.to_string(), @@ -318,16 +323,21 @@ fn iam_admin_token() -> Option { impl AuthState { /// Create new auth state with IAM integration pub fn new(iam_endpoint: Option) -> Self { - let iam_client = Some(Arc::new(RwLock::new(IamClient::new(iam_endpoint)))); + Self::new_with_config(iam_endpoint, &S3AuthConfig::default()) + } + + pub fn new_with_config(iam_endpoint: Option, config: &S3AuthConfig) -> Self { + let iam_client = Some(Arc::new(RwLock::new(IamClient::new_with_config( + iam_endpoint, + config, + )))); Self { iam_client, - enabled: std::env::var("S3_AUTH_ENABLED") - .unwrap_or_else(|_| "true".to_string()) - .parse() - .unwrap_or(true), - aws_region: std::env::var("AWS_REGION").unwrap_or_else(|_| "us-east-1".to_string()), // Default S3 region + enabled: config.enabled, + aws_region: config.aws_region.clone(), aws_service: "s3".to_string(), + max_auth_body_bytes: config.max_auth_body_bytes, } } @@ -338,6 +348,7 @@ impl AuthState { enabled: false, aws_region: "us-east-1".to_string(), aws_service: "s3".to_string(), + max_auth_body_bytes: S3AuthConfig::default().max_auth_body_bytes, } } } @@ -438,12 +449,8 @@ pub async fn sigv4_auth_middleware( let should_buffer_body = should_buffer_auth_body(payload_hash_header.as_deref()); let body_bytes = if should_buffer_body { - let max_body_bytes = std::env::var("S3_MAX_AUTH_BODY_BYTES") - .ok() - .and_then(|value| value.parse::().ok()) - .unwrap_or(DEFAULT_MAX_AUTH_BODY_BYTES); let (parts, body) = request.into_parts(); - let body_bytes = match axum::body::to_bytes(body, max_body_bytes).await { + let body_bytes = match axum::body::to_bytes(body, auth_state.max_auth_body_bytes).await { Ok(b) => b, Err(e) => { return error_response( diff --git a/lightningstor/crates/lightningstor-server/src/s3/mod.rs b/lightningstor/crates/lightningstor-server/src/s3/mod.rs index 4018331..10c9bed 100644 --- a/lightningstor/crates/lightningstor-server/src/s3/mod.rs +++ b/lightningstor/crates/lightningstor-server/src/s3/mod.rs @@ -6,5 +6,8 @@ mod auth; mod router; mod xml; -pub use auth::{AuthState, sigv4_auth_middleware}; -pub use router::{create_router, create_router_with_auth, create_router_with_state}; +pub use auth::{sigv4_auth_middleware, AuthState}; +pub use router::{ + create_router, create_router_with_auth, create_router_with_auth_config, + create_router_with_state, +}; diff --git a/lightningstor/crates/lightningstor-server/src/s3/router.rs b/lightningstor/crates/lightningstor-server/src/s3/router.rs index 5001d5f..cff2270 100644 --- a/lightningstor/crates/lightningstor-server/src/s3/router.rs +++ b/lightningstor/crates/lightningstor-server/src/s3/router.rs @@ -1,5 +1,6 @@ //! S3 API router using Axum +use crate::config::{S3Config, S3PerformanceConfig}; use axum::{ body::{Body, Bytes}, extract::{Request, State}, @@ -14,8 +15,8 @@ use futures::{stream, stream::FuturesUnordered, StreamExt}; use http_body_util::BodyExt; use md5::{Digest, Md5}; use serde::Deserialize; -use std::io; use sha2::Sha256; +use std::io; use std::sync::Arc; use tokio::task::JoinHandle; @@ -38,17 +39,24 @@ use super::xml::{ pub struct S3State { pub storage: Arc, pub metadata: Arc, + pub performance: S3PerformanceConfig, } -// Keep streamed single-PUT parts aligned with the distributed backend's -// large-object chunking so GET does not degrade into many small serial reads. -const DEFAULT_STREAMING_PUT_THRESHOLD_BYTES: usize = 16 * 1024 * 1024; -const DEFAULT_INLINE_PUT_MAX_BYTES: usize = 128 * 1024 * 1024; -const DEFAULT_MULTIPART_PUT_CONCURRENCY: usize = 4; - impl S3State { pub fn new(storage: Arc, metadata: Arc) -> Self { - Self { storage, metadata } + Self::new_with_config(storage, metadata, S3PerformanceConfig::default()) + } + + pub fn new_with_config( + storage: Arc, + metadata: Arc, + performance: S3PerformanceConfig, + ) -> Self { + Self { + storage, + metadata, + performance, + } } } @@ -57,7 +65,12 @@ pub fn create_router_with_state( storage: Arc, metadata: Arc, ) -> Router { - create_router_with_auth_state(storage, metadata, Arc::new(AuthState::new(None))) + create_router_with_auth_state( + storage, + metadata, + Arc::new(AuthState::new(None)), + S3PerformanceConfig::default(), + ) } /// Create the S3-compatible HTTP router with auth and storage backends @@ -66,15 +79,30 @@ pub fn create_router_with_auth( metadata: Arc, iam_endpoint: Option, ) -> Router { - create_router_with_auth_state(storage, metadata, Arc::new(AuthState::new(iam_endpoint))) + create_router_with_auth_config(storage, metadata, iam_endpoint, S3Config::default()) +} + +pub fn create_router_with_auth_config( + storage: Arc, + metadata: Arc, + iam_endpoint: Option, + config: S3Config, +) -> Router { + create_router_with_auth_state( + storage, + metadata, + Arc::new(AuthState::new_with_config(iam_endpoint, &config.auth)), + config.performance, + ) } fn create_router_with_auth_state( storage: Arc, metadata: Arc, auth_state: Arc, + performance: S3PerformanceConfig, ) -> Router { - let state = Arc::new(S3State::new(storage, metadata)); + let state = Arc::new(S3State::new_with_config(storage, metadata, performance)); Router::new() // Catch-all route for ALL operations (including root /) @@ -126,28 +154,16 @@ fn request_tenant(extensions: &axum::http::Extensions) -> TenantContext { }) } -fn streaming_put_threshold_bytes() -> usize { - std::env::var("LIGHTNINGSTOR_S3_STREAMING_PUT_THRESHOLD_BYTES") - .ok() - .and_then(|value| value.parse::().ok()) - .filter(|value| *value > 0) - .unwrap_or(DEFAULT_STREAMING_PUT_THRESHOLD_BYTES) +fn streaming_put_threshold_bytes(state: &Arc) -> usize { + state.performance.streaming_put_threshold_bytes } -fn inline_put_max_bytes() -> usize { - std::env::var("LIGHTNINGSTOR_S3_INLINE_PUT_MAX_BYTES") - .ok() - .and_then(|value| value.parse::().ok()) - .filter(|value| *value > 0) - .unwrap_or(DEFAULT_INLINE_PUT_MAX_BYTES) +fn inline_put_max_bytes(state: &Arc) -> usize { + state.performance.inline_put_max_bytes } -fn multipart_put_concurrency() -> usize { - std::env::var("LIGHTNINGSTOR_S3_MULTIPART_PUT_CONCURRENCY") - .ok() - .and_then(|value| value.parse::().ok()) - .filter(|value| *value > 0) - .unwrap_or(DEFAULT_MULTIPART_PUT_CONCURRENCY) +fn multipart_put_concurrency(state: &Arc) -> usize { + state.performance.multipart_put_concurrency } fn request_content_length(headers: &HeaderMap) -> Option { @@ -751,13 +767,13 @@ async fn put_object( (body_len, etag, None, Some(body_bytes)) } None => { - let prepared = if let Some(content_length) = - content_length.filter(|content_length| *content_length <= inline_put_max_bytes()) + let prepared = if let Some(content_length) = content_length + .filter(|content_length| *content_length <= inline_put_max_bytes(&state)) { read_inline_put_body( body, verified_payload_hash.as_deref(), - inline_put_max_bytes(), + inline_put_max_bytes(&state), Some(content_length), ) .await @@ -948,7 +964,7 @@ async fn stream_put_body( ) -> Result> { let verify_payload_hash = expected_payload_hash .filter(|expected_payload_hash| *expected_payload_hash != "UNSIGNED-PAYLOAD"); - let threshold = streaming_put_threshold_bytes(); + let threshold = streaming_put_threshold_bytes(state); let mut buffered = BytesMut::with_capacity(threshold); let mut full_md5 = Some(Md5::new()); let mut full_sha256 = verify_payload_hash.map(|_| Sha256::new()); @@ -958,7 +974,7 @@ async fn stream_put_body( let mut scheduled_part_numbers = Vec::new(); let mut completed_parts = Vec::new(); let mut in_flight_uploads = FuturesUnordered::new(); - let max_in_flight_uploads = multipart_put_concurrency(); + let max_in_flight_uploads = multipart_put_concurrency(state); while let Some(frame) = body.frame().await { let frame = match frame { @@ -1282,7 +1298,10 @@ fn multipart_object_body(state: Arc, object: &Object, upload: Multipart ))); } - return Ok(Some((bytes.slice(0..body_end), (storage, upload, idx, offset)))); + return Ok(Some(( + bytes.slice(0..body_end), + (storage, upload, idx, offset), + ))); } Ok(None) @@ -1374,7 +1393,11 @@ async fn get_object( ); } - let multipart_upload = match state.metadata.load_object_multipart_upload(&object.id).await { + let multipart_upload = match state + .metadata + .load_object_multipart_upload(&object.id) + .await + { Ok(upload) => upload, Err(e) => { return error_response( @@ -1385,7 +1408,10 @@ async fn get_object( } }; let (body, content_length) = if let Some(upload) = multipart_upload { - (multipart_object_body(Arc::clone(&state), &object, upload), object.size as usize) + ( + multipart_object_body(Arc::clone(&state), &object, upload), + object.size as usize, + ) } else { let data = match state.storage.get_object(&object.id).await { Ok(data) => data, @@ -1653,6 +1679,7 @@ mod tests { storage, metadata.clone(), Arc::new(AuthState::disabled()), + S3PerformanceConfig::default(), ); std::mem::forget(tempdir); (router, metadata) @@ -1982,7 +2009,8 @@ mod tests { .unwrap(); assert_eq!(response.status(), StatusCode::OK); - let body = vec![b'x'; DEFAULT_STREAMING_PUT_THRESHOLD_BYTES + 1024]; + let threshold = S3PerformanceConfig::default().streaming_put_threshold_bytes; + let body = vec![b'x'; threshold + 1024]; let response = router .clone() .oneshot( @@ -2197,10 +2225,12 @@ mod tests { async fn large_put_streams_multipart_parts_with_parallel_uploads() { let storage = Arc::new(DelayedMultipartStorage::new(Duration::from_millis(25))); let metadata = Arc::new(MetadataStore::new_in_memory()); + let performance = S3PerformanceConfig::default(); let router = create_router_with_auth_state( storage.clone(), metadata, Arc::new(AuthState::disabled()), + performance.clone(), ); let response = router @@ -2216,7 +2246,7 @@ mod tests { .unwrap(); assert_eq!(response.status(), StatusCode::OK); - let body = vec![b'x'; (DEFAULT_STREAMING_PUT_THRESHOLD_BYTES * 2) + 4096]; + let body = vec![b'x'; (performance.streaming_put_threshold_bytes * 2) + 4096]; let response = router .clone() .oneshot( @@ -2250,10 +2280,12 @@ mod tests { async fn moderate_put_with_content_length_stays_inline() { let storage = Arc::new(DelayedMultipartStorage::new(Duration::from_millis(25))); let metadata = Arc::new(MetadataStore::new_in_memory()); + let performance = S3PerformanceConfig::default(); let router = create_router_with_auth_state( storage.clone(), metadata.clone(), Arc::new(AuthState::disabled()), + performance.clone(), ); let response = router @@ -2269,7 +2301,7 @@ mod tests { .unwrap(); assert_eq!(response.status(), StatusCode::OK); - let body = vec![b'y'; DEFAULT_STREAMING_PUT_THRESHOLD_BYTES + 4096]; + let body = vec![b'y'; performance.streaming_put_threshold_bytes + 4096]; let response = router .clone() .oneshot( diff --git a/lightningstor/crates/lightningstor-server/src/s3/xml.rs b/lightningstor/crates/lightningstor-server/src/s3/xml.rs index 0521496..0eeef8d 100644 --- a/lightningstor/crates/lightningstor-server/src/s3/xml.rs +++ b/lightningstor/crates/lightningstor-server/src/s3/xml.rs @@ -170,5 +170,8 @@ pub struct CompleteMultipartUploadResult { /// Convert to XML with declaration pub fn to_xml(value: &T) -> Result { let xml = xml_to_string(value)?; - Ok(format!("\n{}", xml)) + Ok(format!( + "\n{}", + xml + )) } diff --git a/nightlight/crates/nightlight-server/src/config.rs b/nightlight/crates/nightlight-server/src/config.rs index 6c996e7..ed946f7 100644 --- a/nightlight/crates/nightlight-server/src/config.rs +++ b/nightlight/crates/nightlight-server/src/config.rs @@ -6,6 +6,7 @@ use anyhow::Result; use serde::{Deserialize, Serialize}; use std::fs; +use std::path::Path; /// Main server configuration #[derive(Debug, Clone, Serialize, Deserialize)] @@ -123,7 +124,7 @@ pub struct TlsConfig { impl Config { /// Load configuration from a YAML file - pub fn from_file(path: &str) -> Result { + pub fn from_file(path: impl AsRef) -> Result { let content = fs::read_to_string(path)?; let config = serde_yaml::from_str(&content)?; Ok(config) @@ -136,27 +137,6 @@ impl Config { fs::write(path, content)?; Ok(()) } - - /// Apply environment variable overrides - /// - /// This allows NixOS service module to override configuration via environment variables. - /// Environment variables take precedence over configuration file values. - pub fn apply_env_overrides(&mut self) { - if let Ok(val) = std::env::var("NIGHTLIGHT_HTTP_ADDR") { - self.server.http_addr = val; - } - if let Ok(val) = std::env::var("NIGHTLIGHT_GRPC_ADDR") { - self.server.grpc_addr = val; - } - if let Ok(val) = std::env::var("NIGHTLIGHT_DATA_DIR") { - self.storage.data_dir = val; - } - if let Ok(val) = std::env::var("NIGHTLIGHT_RETENTION_DAYS") { - if let Ok(days) = val.parse() { - self.storage.retention_days = days; - } - } - } } impl Default for Config { diff --git a/nightlight/crates/nightlight-server/src/grpc.rs b/nightlight/crates/nightlight-server/src/grpc.rs index 5cee297..c4b2aa0 100644 --- a/nightlight/crates/nightlight-server/src/grpc.rs +++ b/nightlight/crates/nightlight-server/src/grpc.rs @@ -183,7 +183,11 @@ impl Admin for AdminServiceImpl { _request: Request, ) -> Result, Status> { let storage_result = self.storage.stats().await; - let status = if storage_result.is_ok() { "ok" } else { "degraded" }; + let status = if storage_result.is_ok() { + "ok" + } else { + "degraded" + }; let storage_message = match &storage_result { Ok(_) => "storage ready".to_string(), Err(error) => error.to_string(), @@ -253,7 +257,9 @@ impl Admin for AdminServiceImpl { version: env!("CARGO_PKG_VERSION").to_string(), commit: option_env!("GIT_COMMIT").unwrap_or("unknown").to_string(), build_time: option_env!("BUILD_TIME").unwrap_or("unknown").to_string(), - rust_version: option_env!("RUSTC_VERSION").unwrap_or("unknown").to_string(), + rust_version: option_env!("RUSTC_VERSION") + .unwrap_or("unknown") + .to_string(), target: format!("{}-{}", std::env::consts::ARCH, std::env::consts::OS), })) } @@ -330,9 +336,7 @@ mod tests { use super::*; use crate::ingestion::IngestionService; use crate::storage::Storage; - use nightlight_api::nightlight::{ - InstantQueryRequest, LabelValuesRequest, SeriesQueryRequest, - }; + use nightlight_api::nightlight::{InstantQueryRequest, LabelValuesRequest, SeriesQueryRequest}; use nightlight_api::prometheus::{Label, Sample, TimeSeries, WriteRequest}; #[tokio::test] @@ -380,7 +384,10 @@ mod tests { data.result[0].metric.get("__name__").map(String::as_str), Some("grpc_metric") ); - assert_eq!(data.result[0].value.as_ref().map(|value| value.value), Some(12.5)); + assert_eq!( + data.result[0].value.as_ref().map(|value| value.value), + Some(12.5) + ); } #[tokio::test] @@ -452,7 +459,10 @@ mod tests { .unwrap() .into_inner(); assert_eq!(label_values.status, "success"); - assert_eq!(label_values.data, vec!["api".to_string(), "worker".to_string()]); + assert_eq!( + label_values.data, + vec!["api".to_string(), "worker".to_string()] + ); } #[tokio::test] @@ -477,26 +487,33 @@ mod tests { .unwrap(); let query = QueryService::from_storage(storage.queryable()); - query.execute_instant_query("admin_metric", 2_000).await.unwrap(); + query + .execute_instant_query("admin_metric", 2_000) + .await + .unwrap(); - let admin = AdminServiceImpl::new( - Arc::clone(&storage), - ingestion.metrics(), - query.metrics(), - ); + let admin = + AdminServiceImpl::new(Arc::clone(&storage), ingestion.metrics(), query.metrics()); let stats = admin .stats(Request::new(StatsRequest {})) .await .unwrap() .into_inner(); - assert_eq!(stats.storage.as_ref().map(|value| value.total_samples), Some(1)); assert_eq!( - stats.ingestion + stats.storage.as_ref().map(|value| value.total_samples), + Some(1) + ); + assert_eq!( + stats + .ingestion .as_ref() .map(|value| value.samples_ingested_total), Some(1) ); - assert_eq!(stats.query.as_ref().map(|value| value.queries_total), Some(1)); + assert_eq!( + stats.query.as_ref().map(|value| value.queries_total), + Some(1) + ); } } diff --git a/nightlight/crates/nightlight-server/src/ingestion.rs b/nightlight/crates/nightlight-server/src/ingestion.rs index bf9f428..485cd15 100644 --- a/nightlight/crates/nightlight-server/src/ingestion.rs +++ b/nightlight/crates/nightlight-server/src/ingestion.rs @@ -15,8 +15,8 @@ use nightlight_api::prometheus::{Label, WriteRequest}; use nightlight_types::Error; use prost::Message; use snap::raw::Decoder as SnappyDecoder; -use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; use std::time::Instant; use tracing::{debug, error, info, warn}; @@ -113,9 +113,8 @@ impl IngestionService { } // Store series with samples in shared storage - let series_id = nightlight_types::SeriesId( - compute_series_fingerprint(&internal_labels) - ); + let series_id = + nightlight_types::SeriesId(compute_series_fingerprint(&internal_labels)); let time_series = nightlight_types::TimeSeries { id: series_id, @@ -178,11 +177,11 @@ impl IngestionMetrics { } /// Axum handler for /api/v1/write endpoint -async fn handle_remote_write( - State(service): State, - body: Bytes, -) -> Response { - service.metrics.requests_total.fetch_add(1, Ordering::Relaxed); +async fn handle_remote_write(State(service): State, body: Bytes) -> Response { + service + .metrics + .requests_total + .fetch_add(1, Ordering::Relaxed); debug!("Received remote_write request, size: {} bytes", body.len()); @@ -225,17 +224,26 @@ async fn handle_remote_write( } Err(Error::Storage(msg)) if msg.contains("buffer full") => { warn!("Write buffer full, returning 429"); - service.metrics.requests_failed.fetch_add(1, Ordering::Relaxed); + service + .metrics + .requests_failed + .fetch_add(1, Ordering::Relaxed); IngestionError::Backpressure.into_response() } Err(Error::InvalidLabel(msg)) => { warn!("Invalid labels: {}", msg); - service.metrics.requests_failed.fetch_add(1, Ordering::Relaxed); + service + .metrics + .requests_failed + .fetch_add(1, Ordering::Relaxed); IngestionError::InvalidLabels.into_response() } Err(e) => { error!("Failed to process write request: {}", e); - service.metrics.requests_failed.fetch_add(1, Ordering::Relaxed); + service + .metrics + .requests_failed + .fetch_add(1, Ordering::Relaxed); IngestionError::StorageError.into_response() } } @@ -285,7 +293,11 @@ fn validate_labels(labels: Vec