From 0745216107fb38d0b7ae51cf91228d2e4485a847 Mon Sep 17 00:00:00 2001
From: centra
Date: Thu, 2 Apr 2026 07:57:25 +0900
Subject: [PATCH] harden plasmavmc image ingestion and internal execution paths
---
deployer/crates/deployer-ctl/src/power.rs | 26 +-
deployer/crates/deployer-ctl/src/remote.rs | 2 -
fiberlb/crates/fiberlb-server/src/config.rs | 5 +
fiberlb/crates/fiberlb-server/src/main.rs | 66 +-
fiberlb/crates/fiberlb-server/src/metadata.rs | 10 +-
.../crates/fiberlb-server/src/vip_owner.rs | 53 +-
flashdns/crates/flashdns-server/src/main.rs | 129 +--
.../crates/flashdns-server/src/metadata.rs | 6 +-
iam/crates/iam-server/src/config.rs | 53 ++
iam/crates/iam-server/src/main.rs | 73 +-
k8shost/crates/k8shost-server/src/config.rs | 14 +
k8shost/crates/k8shost-server/src/main.rs | 42 +-
.../crates/k8shost-server/src/scheduler.rs | 36 +-
.../crates/k8shost-server/src/services/pod.rs | 38 +-
.../crates/lightningstor-server/src/config.rs | 114 +++
.../crates/lightningstor-server/src/main.rs | 76 +-
.../lightningstor-server/src/metadata.rs | 104 +--
.../lightningstor-server/src/s3/auth.rs | 69 +-
.../crates/lightningstor-server/src/s3/mod.rs | 7 +-
.../lightningstor-server/src/s3/router.rs | 112 ++-
.../crates/lightningstor-server/src/s3/xml.rs | 5 +-
.../crates/nightlight-server/src/config.rs | 24 +-
.../crates/nightlight-server/src/grpc.rs | 49 +-
.../crates/nightlight-server/src/ingestion.rs | 53 +-
.../crates/nightlight-server/src/main.rs | 31 +-
nix/modules/chainfire.nix | 3 +-
nix/modules/fiberlb.nix | 37 +-
nix/modules/flashdns.nix | 2 +-
nix/modules/iam.nix | 36 +-
nix/modules/k8shost.nix | 80 +-
nix/modules/lightningstor.nix | 88 +-
nix/modules/nightlight.nix | 28 +-
nix/modules/plasmavmc.nix | 134 +--
nix/modules/prismnet.nix | 37 +-
nix/templates/plasmacloud-single-node.nix | 8 +-
nix/test-cluster/node01.nix | 13 +-
nix/test-cluster/node02.nix | 7 +-
nix/test-cluster/node03.nix | 7 +-
nix/test-cluster/storage-node01.nix | 13 +-
nix/test-cluster/storage-node02.nix | 7 +-
nix/test-cluster/storage-node03.nix | 7 +-
...fiberlb-native-bgp-ecmp-drain-vm-smoke.nix | 10 +-
.../fiberlb-native-bgp-interop-vm-smoke.nix | 5 +-
.../fiberlb-native-bgp-multipath-vm-smoke.nix | 5 +-
nix/tests/fiberlb-native-bgp-vm-smoke.nix | 5 +-
plasmavmc/Cargo.lock | 14 +
plasmavmc/crates/plasmavmc-kvm/Cargo.toml | 1 +
plasmavmc/crates/plasmavmc-kvm/src/env.rs | 10 +-
plasmavmc/crates/plasmavmc-kvm/src/lib.rs | 321 +++++--
plasmavmc/crates/plasmavmc-kvm/src/qmp.rs | 4 +-
plasmavmc/crates/plasmavmc-server/Cargo.toml | 1 +
.../plasmavmc-server/src/artifact_store.rs | 782 +++++++++++++-----
.../crates/plasmavmc-server/src/config.rs | 331 +++++++-
plasmavmc/crates/plasmavmc-server/src/main.rs | 78 +-
.../crates/plasmavmc-server/src/storage.rs | 293 ++++++-
.../crates/plasmavmc-server/src/vm_service.rs | 653 +++++++++++++--
.../plasmavmc-server/src/volume_manager.rs | 423 +++++++---
.../crates/plasmavmc-server/src/watcher.rs | 7 +-
.../crates/plasmavmc-types/src/config.rs | 13 +-
plasmavmc/proto/plasmavmc.proto | 48 ++
prismnet/crates/prismnet-server/src/config.rs | 39 +
prismnet/crates/prismnet-server/src/main.rs | 115 +--
.../crates/prismnet-server/src/metadata.rs | 18 +-
.../crates/prismnet-server/src/ovn/client.rs | 85 +-
64 files changed, 3531 insertions(+), 1434 deletions(-)
diff --git a/deployer/crates/deployer-ctl/src/power.rs b/deployer/crates/deployer-ctl/src/power.rs
index 6f86ad7..156b0d9 100644
--- a/deployer/crates/deployer-ctl/src/power.rs
+++ b/deployer/crates/deployer-ctl/src/power.rs
@@ -161,7 +161,9 @@ impl RedfishTarget {
match action {
PowerAction::Cycle => Ok(PowerState::Cycling),
- PowerAction::On | PowerAction::Off | PowerAction::Refresh => self.refresh(&client).await,
+ PowerAction::On | PowerAction::Off | PowerAction::Refresh => {
+ self.refresh(&client).await
+ }
}
}
@@ -295,7 +297,12 @@ pub async fn request_reinstall(
#[cfg(test)]
mod tests {
use super::*;
- use axum::{extract::State, http::StatusCode, routing::{get, post}, Json, Router};
+ use axum::{
+ extract::State,
+ http::StatusCode,
+ routing::{get, post},
+ Json, Router,
+ };
use serde_json::Value;
use std::sync::{Arc, Mutex};
use tokio::net::TcpListener;
@@ -303,7 +310,10 @@ mod tests {
#[test]
fn parse_redfish_short_reference_defaults_to_https() {
let parsed = RedfishTarget::parse("redfish://lab-bmc/node01").unwrap();
- assert_eq!(parsed.resource_url.as_str(), "https://lab-bmc/redfish/v1/Systems/node01");
+ assert_eq!(
+ parsed.resource_url.as_str(),
+ "https://lab-bmc/redfish/v1/Systems/node01"
+ );
}
#[test]
@@ -361,8 +371,14 @@ mod tests {
addr
))
.unwrap();
- assert_eq!(target.perform(PowerAction::Refresh).await.unwrap(), PowerState::On);
- assert_eq!(target.perform(PowerAction::Off).await.unwrap(), PowerState::On);
+ assert_eq!(
+ target.perform(PowerAction::Refresh).await.unwrap(),
+ PowerState::On
+ );
+ assert_eq!(
+ target.perform(PowerAction::Off).await.unwrap(),
+ PowerState::On
+ );
let payloads = state.seen_payloads.lock().unwrap().clone();
assert_eq!(payloads, vec![r#"{"ResetType":"ForceOff"}"#.to_string()]);
diff --git a/deployer/crates/deployer-ctl/src/remote.rs b/deployer/crates/deployer-ctl/src/remote.rs
index 3ff12f7..7aa398e 100644
--- a/deployer/crates/deployer-ctl/src/remote.rs
+++ b/deployer/crates/deployer-ctl/src/remote.rs
@@ -31,5 +31,3 @@ pub async fn run_deployer_command(endpoint: &str, action: &str) -> Result<()> {
Ok(())
}
-
-
diff --git a/fiberlb/crates/fiberlb-server/src/config.rs b/fiberlb/crates/fiberlb-server/src/config.rs
index b825d5c..2c43699 100644
--- a/fiberlb/crates/fiberlb-server/src/config.rs
+++ b/fiberlb/crates/fiberlb-server/src/config.rs
@@ -177,6 +177,10 @@ pub struct VipOwnershipConfig {
/// Interface used for local VIP ownership.
#[serde(default = "default_vip_ownership_interface")]
pub interface: String,
+
+ /// Optional explicit `ip` command path used for local VIP ownership.
+ #[serde(default)]
+ pub ip_command: Option<String>,
}
fn default_vip_ownership_interface() -> String {
@@ -188,6 +192,7 @@ impl Default for VipOwnershipConfig {
Self {
enabled: false,
interface: default_vip_ownership_interface(),
+ ip_command: None,
}
}
}
diff --git a/fiberlb/crates/fiberlb-server/src/main.rs b/fiberlb/crates/fiberlb-server/src/main.rs
index 294e8e7..b486c3e 100644
--- a/fiberlb/crates/fiberlb-server/src/main.rs
+++ b/fiberlb/crates/fiberlb-server/src/main.rs
@@ -41,26 +41,6 @@ struct Args {
#[arg(long)]
grpc_addr: Option<String>,
- /// ChainFire endpoint for cluster coordination
- #[arg(long, env = "FIBERLB_CHAINFIRE_ENDPOINT")]
- chainfire_endpoint: Option<String>,
-
- /// FlareDB endpoint for metadata and tenant data storage
- #[arg(long, env = "FIBERLB_FLAREDB_ENDPOINT")]
- flaredb_endpoint: Option<String>,
-
- /// Metadata backend (flaredb, postgres, sqlite)
- #[arg(long, env = "FIBERLB_METADATA_BACKEND")]
- metadata_backend: Option<String>,
-
- /// SQL database URL for metadata (required for postgres/sqlite backend)
- #[arg(long, env = "FIBERLB_METADATA_DATABASE_URL")]
- metadata_database_url: Option<String>,
-
- /// Run in single-node mode (required when metadata backend is SQLite)
- #[arg(long, env = "FIBERLB_SINGLE_NODE")]
- single_node: bool,
-
/// Log level (overrides config)
#[arg(short, long)]
log_level: Option<String>,
@@ -93,21 +73,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
if let Some(log_level) = args.log_level {
config.log_level = log_level;
}
- if let Some(chainfire_endpoint) = args.chainfire_endpoint {
- config.chainfire_endpoint = Some(chainfire_endpoint);
- }
- if let Some(flaredb_endpoint) = args.flaredb_endpoint {
- config.flaredb_endpoint = Some(flaredb_endpoint);
- }
- if let Some(metadata_backend) = args.metadata_backend {
- config.metadata_backend = parse_metadata_backend(&metadata_backend)?;
- }
- if let Some(metadata_database_url) = args.metadata_database_url {
- config.metadata_database_url = Some(metadata_database_url);
- }
- if args.single_node {
- config.single_node = true;
- }
// Initialize tracing
tracing_subscriber::fmt()
@@ -194,15 +159,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
)
}
MetadataBackend::Postgres | MetadataBackend::Sqlite => {
- let database_url = config
- .metadata_database_url
- .as_deref()
- .ok_or_else(|| {
- format!(
- "metadata_database_url is required when metadata_backend={} (env: FIBERLB_METADATA_DATABASE_URL)",
- metadata_backend_name(config.metadata_backend)
- )
- })?;
+ let database_url = config.metadata_database_url.as_deref().ok_or_else(|| {
+ format!(
+ "metadata_database_url is required when metadata_backend={}",
+ metadata_backend_name(config.metadata_backend)
+ )
+ })?;
ensure_sql_backend_matches_url(config.metadata_backend, database_url)?;
tracing::info!(
" Metadata backend: {} @ {}",
@@ -282,8 +244,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
})?;
let bgp = create_bgp_client(config.bgp.clone()).await?;
let vip_owner: Option<Arc<dyn VipAddressOwner>> = if config.vip_ownership.enabled {
- Some(Arc::new(KernelVipAddressOwner::new(
+ Some(Arc::new(KernelVipAddressOwner::with_ip_command(
config.vip_ownership.interface.clone(),
+ config.vip_ownership.ip_command.clone(),
)))
} else {
None
@@ -439,19 +402,6 @@ async fn wait_for_shutdown_signal() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
-fn parse_metadata_backend(value: &str) -> Result<MetadataBackend, Box<dyn std::error::Error>> {
- match value.trim().to_ascii_lowercase().as_str() {
- "flaredb" => Ok(MetadataBackend::FlareDb),
- "postgres" => Ok(MetadataBackend::Postgres),
- "sqlite" => Ok(MetadataBackend::Sqlite),
- other => Err(format!(
- "invalid metadata backend '{}'; expected one of: flaredb, postgres, sqlite",
- other
- )
- .into()),
- }
-}
-
fn metadata_backend_name(backend: MetadataBackend) -> &'static str {
match backend {
MetadataBackend::FlareDb => "flaredb",
diff --git a/fiberlb/crates/fiberlb-server/src/metadata.rs b/fiberlb/crates/fiberlb-server/src/metadata.rs
index 6d2f4c2..b0738ef 100644
--- a/fiberlb/crates/fiberlb-server/src/metadata.rs
+++ b/fiberlb/crates/fiberlb-server/src/metadata.rs
@@ -61,12 +61,12 @@ impl LbMetadataStore {
endpoint: Option<String>,
pd_endpoint: Option<String>,
) -> Result<Self> {
- let endpoint = endpoint.unwrap_or_else(|| {
- std::env::var("FIBERLB_FLAREDB_ENDPOINT")
- .unwrap_or_else(|_| "127.0.0.1:2479".to_string())
- });
+ let endpoint = endpoint.unwrap_or_else(|| "127.0.0.1:2479".to_string());
+ Self::connect_flaredb(endpoint, pd_endpoint).await
+ }
+
+ async fn connect_flaredb(endpoint: String, pd_endpoint: Option<String>) -> Result<Self> {
let pd_endpoint = pd_endpoint
- .or_else(|| std::env::var("FIBERLB_CHAINFIRE_ENDPOINT").ok())
.map(|value| normalize_transport_addr(&value))
.unwrap_or_else(|| endpoint.clone());
diff --git a/fiberlb/crates/fiberlb-server/src/vip_owner.rs b/fiberlb/crates/fiberlb-server/src/vip_owner.rs
index c5b7a14..c111095 100644
--- a/fiberlb/crates/fiberlb-server/src/vip_owner.rs
+++ b/fiberlb/crates/fiberlb-server/src/vip_owner.rs
@@ -41,9 +41,17 @@ pub struct KernelVipAddressOwner {
impl KernelVipAddressOwner {
/// Create a kernel-backed VIP owner for the given interface.
pub fn new(interface: impl Into<String>) -> Self {
+ Self::with_ip_command(interface, None::<String>)
+ }
+
+ /// Create a kernel-backed VIP owner with an optional explicit `ip` command path.
+ pub fn with_ip_command(
+ interface: impl Into<String>,
+ ip_command: Option<impl Into<String>>,
+ ) -> Self {
Self {
interface: interface.into(),
- ip_command: resolve_ip_command(),
+ ip_command: resolve_ip_command(ip_command.map(Into::into)),
}
}
@@ -123,7 +131,14 @@ impl VipAddressOwner for KernelVipAddressOwner {
}
}
-fn resolve_ip_command() -> String {
+fn resolve_ip_command(configured: Option<String>) -> String {
+ if let Some(path) = configured {
+ let trimmed = path.trim();
+ if !trimmed.is_empty() {
+ return trimmed.to_string();
+ }
+ }
+
if let Ok(path) = std::env::var("FIBERLB_IP_COMMAND") {
let trimmed = path.trim();
if !trimmed.is_empty() {
@@ -159,3 +174,37 @@ fn render_command_output(output: &std::process::Output) -> String {
format!("exit status {}", output.status)
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::sync::{Mutex, OnceLock};
+
+ fn env_lock() -> &'static Mutex<()> {
+ static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
+ LOCK.get_or_init(|| Mutex::new(()))
+ }
+
+ #[test]
+ fn configured_ip_command_wins_over_env() {
+ let _guard = env_lock().lock().unwrap();
+ std::env::set_var("FIBERLB_IP_COMMAND", "/env/ip");
+
+ assert_eq!(
+ resolve_ip_command(Some(" /configured/ip ".to_string())),
+ "/configured/ip"
+ );
+
+ std::env::remove_var("FIBERLB_IP_COMMAND");
+ }
+
+ #[test]
+ fn env_ip_command_is_used_when_config_is_absent() {
+ let _guard = env_lock().lock().unwrap();
+ std::env::set_var("FIBERLB_IP_COMMAND", " /env/ip ");
+
+ assert_eq!(resolve_ip_command(None), "/env/ip");
+
+ std::env::remove_var("FIBERLB_IP_COMMAND");
+ }
+}
diff --git a/flashdns/crates/flashdns-server/src/main.rs b/flashdns/crates/flashdns-server/src/main.rs
index 5cd317a..71fe4ad 100644
--- a/flashdns/crates/flashdns-server/src/main.rs
+++ b/flashdns/crates/flashdns-server/src/main.rs
@@ -1,27 +1,24 @@
//! FlashDNS authoritative DNS server binary
+use anyhow::Result;
+use chainfire_client::Client as ChainFireClient;
+use clap::Parser;
use flashdns_api::{RecordServiceServer, ZoneServiceServer};
use flashdns_server::{
config::{MetadataBackend, ServerConfig},
dns::DnsHandler,
metadata::DnsMetadataStore,
- RecordServiceImpl,
- ZoneServiceImpl,
+ RecordServiceImpl, ZoneServiceImpl,
};
-use chainfire_client::Client as ChainFireClient;
use iam_service_auth::AuthService;
use metrics_exporter_prometheus::PrometheusBuilder;
+use std::path::PathBuf;
use std::sync::Arc;
+use std::time::{SystemTime, UNIX_EPOCH};
use tonic::transport::{Certificate, Identity, Server, ServerTlsConfig};
use tonic::{Request, Status};
use tonic_health::server::health_reporter;
use tracing_subscriber::EnvFilter;
-use anyhow::Result;
-use clap::Parser;
-use std::path::PathBuf;
-use std::time::{SystemTime, UNIX_EPOCH};
-
-use config::{Config as Cfg, Environment, File, FileFormat};
/// Command-line arguments for FlashDNS server.
#[derive(Parser, Debug)]
@@ -39,26 +36,6 @@ struct CliArgs {
#[arg(long)]
dns_addr: Option<String>,
- /// ChainFire endpoint for cluster coordination (overrides config)
- #[arg(long, env = "FLASHDNS_CHAINFIRE_ENDPOINT")]
- chainfire_endpoint: Option<String>,
-
- /// FlareDB endpoint for metadata and tenant data storage (overrides config)
- #[arg(long, env = "FLASHDNS_FLAREDB_ENDPOINT")]
- flaredb_endpoint: Option<String>,
-
- /// Metadata backend (flaredb, postgres, sqlite)
- #[arg(long, env = "FLASHDNS_METADATA_BACKEND")]
- metadata_backend: Option<String>,
-
- /// SQL database URL for metadata (required for postgres/sqlite backend)
- #[arg(long, env = "FLASHDNS_METADATA_DATABASE_URL")]
- metadata_database_url: Option<String>,
-
- /// Run in single-node mode (required when metadata backend is SQLite)
- #[arg(long, env = "FLASHDNS_SINGLE_NODE")]
- single_node: bool,
-
/// Log level (overrides config)
#[arg(short, long)]
log_level: Option<String>,
@@ -72,54 +49,22 @@ struct CliArgs {
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let cli_args = CliArgs::parse();
- // Load configuration using config-rs
- let mut settings = Cfg::builder()
- // Layer 1: Application defaults. Serialize ServerConfig::default() into TOML.
- .add_source(File::from_str(
- toml::to_string(&ServerConfig::default())?.as_str(),
- FileFormat::Toml,
- ))
- // Layer 2: Environment variables (e.g., FLASHDNS_GRPC_ADDR, FLASHDNS_LOG_LEVEL)
- .add_source(
- Environment::with_prefix("FLASHDNS")
- .separator("__") // Use double underscore for nested fields
- );
-
- // Layer 3: Configuration file (if specified)
- if cli_args.config.exists() {
+ let mut config = if cli_args.config.exists() {
tracing::info!("Loading config from file: {}", cli_args.config.display());
- settings = settings.add_source(File::from(cli_args.config.as_path()));
+ let contents = tokio::fs::read_to_string(&cli_args.config).await?;
+ toml::from_str(&contents)?
} else {
- tracing::info!("Config file not found, using defaults and environment variables.");
- }
+ tracing::info!("Config file not found, using defaults.");
+ ServerConfig::default()
+ };
- let mut config: ServerConfig = settings
- .build()?
- .try_deserialize()
- .map_err(|e| anyhow::anyhow!("Failed to load configuration: {}", e))?;
-
- // Apply command line overrides (Layer 4: highest precedence)
+ // Apply command line overrides
if let Some(grpc_addr_str) = cli_args.grpc_addr {
config.grpc_addr = grpc_addr_str.parse()?;
}
if let Some(dns_addr_str) = cli_args.dns_addr {
config.dns_addr = dns_addr_str.parse()?;
}
- if let Some(chainfire_endpoint) = cli_args.chainfire_endpoint {
- config.chainfire_endpoint = Some(chainfire_endpoint);
- }
- if let Some(flaredb_endpoint) = cli_args.flaredb_endpoint {
- config.flaredb_endpoint = Some(flaredb_endpoint);
- }
- if let Some(metadata_backend) = cli_args.metadata_backend {
- config.metadata_backend = parse_metadata_backend(&metadata_backend)?;
- }
- if let Some(metadata_database_url) = cli_args.metadata_database_url {
- config.metadata_database_url = Some(metadata_database_url);
- }
- if cli_args.single_node {
- config.single_node = true;
- }
if let Some(log_level) = cli_args.log_level {
config.log_level = log_level;
}
@@ -172,20 +117,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
config.flaredb_endpoint.clone(),
config.chainfire_endpoint.clone(),
)
- .await
- .map_err(|e| anyhow::anyhow!("Failed to initialize FlareDB metadata store: {}", e))?,
+ .await
+ .map_err(|e| {
+ anyhow::anyhow!("Failed to initialize FlareDB metadata store: {}", e)
+ })?,
)
}
MetadataBackend::Postgres | MetadataBackend::Sqlite => {
- let database_url = config
- .metadata_database_url
- .as_deref()
- .ok_or_else(|| {
- anyhow::anyhow!(
- "metadata_database_url is required when metadata_backend={} (env: FLASHDNS_METADATA_DATABASE_URL)",
- metadata_backend_name(config.metadata_backend)
- )
- })?;
+ let database_url = config.metadata_database_url.as_deref().ok_or_else(|| {
+ anyhow::anyhow!(
+ "metadata_database_url is required when metadata_backend={}",
+ metadata_backend_name(config.metadata_backend)
+ )
+ })?;
ensure_sql_backend_matches_url(config.metadata_backend, database_url)?;
tracing::info!(
" Metadata backend: {} @ {}",
@@ -195,13 +139,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Arc::new(
DnsMetadataStore::new_sql(database_url, config.single_node)
.await
- .map_err(|e| anyhow::anyhow!("Failed to initialize SQL metadata store: {}", e))?,
+ .map_err(|e| {
+ anyhow::anyhow!("Failed to initialize SQL metadata store: {}", e)
+ })?,
)
}
};
// Initialize IAM authentication service
- tracing::info!("Connecting to IAM server at {}", config.auth.iam_server_addr);
+ tracing::info!(
+ "Connecting to IAM server at {}",
+ config.auth.iam_server_addr
+ );
let auth_service = AuthService::new(&config.auth.iam_server_addr)
.await
.map_err(|e| anyhow::anyhow!("Failed to connect to IAM server: {}", e))?;
@@ -300,18 +249,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
-fn parse_metadata_backend(value: &str) -> Result<MetadataBackend> {
- match value.trim().to_ascii_lowercase().as_str() {
- "flaredb" => Ok(MetadataBackend::FlareDb),
- "postgres" => Ok(MetadataBackend::Postgres),
- "sqlite" => Ok(MetadataBackend::Sqlite),
- other => Err(anyhow::anyhow!(
- "invalid metadata backend '{}'; expected one of: flaredb, postgres, sqlite",
- other
- )),
- }
-}
-
fn metadata_backend_name(backend: MetadataBackend) -> &'static str {
match backend {
MetadataBackend::FlareDb => "flaredb",
@@ -345,11 +282,7 @@ fn ensure_sql_backend_matches_url(backend: MetadataBackend, database_url: &str)
}
}
-async fn register_chainfire_membership(
- endpoint: &str,
- service: &str,
- addr: String,
-) -> Result<()> {
+async fn register_chainfire_membership(endpoint: &str, service: &str, addr: String) -> Result<()> {
let node_id =
std::env::var("HOSTNAME").unwrap_or_else(|_| format!("{}-{}", service, std::process::id()));
let ts = SystemTime::now()
diff --git a/flashdns/crates/flashdns-server/src/metadata.rs b/flashdns/crates/flashdns-server/src/metadata.rs
index d96405e..04ac534 100644
--- a/flashdns/crates/flashdns-server/src/metadata.rs
+++ b/flashdns/crates/flashdns-server/src/metadata.rs
@@ -57,12 +57,8 @@ impl DnsMetadataStore {
endpoint: Option,
pd_endpoint: Option,
) -> Result {
- let endpoint = endpoint.unwrap_or_else(|| {
- std::env::var("FLASHDNS_FLAREDB_ENDPOINT")
- .unwrap_or_else(|_| "127.0.0.1:2479".to_string())
- });
+ let endpoint = endpoint.unwrap_or_else(|| "127.0.0.1:2479".to_string());
let pd_endpoint = pd_endpoint
- .or_else(|| std::env::var("FLASHDNS_CHAINFIRE_ENDPOINT").ok())
.map(|value| normalize_transport_addr(&value))
.unwrap_or_else(|| endpoint.clone());
diff --git a/iam/crates/iam-server/src/config.rs b/iam/crates/iam-server/src/config.rs
index 392e2a0..93987bb 100644
--- a/iam/crates/iam-server/src/config.rs
+++ b/iam/crates/iam-server/src/config.rs
@@ -27,6 +27,14 @@ pub struct ServerConfig {
/// Logging configuration
#[serde(default)]
pub logging: LoggingConfig,
+
+ /// Admin API policy configuration
+ #[serde(default)]
+ pub admin: AdminConfig,
+
+ /// Development-only safety valves
+ #[serde(default)]
+ pub dev: DevConfig,
}
impl ServerConfig {
@@ -102,6 +110,29 @@ impl ServerConfig {
}
}
+ if let Ok(value) = std::env::var("IAM_ALLOW_UNAUTHENTICATED_ADMIN")
+ .or_else(|_| std::env::var("PHOTON_IAM_ALLOW_UNAUTHENTICATED_ADMIN"))
+ {
+ let value = value.trim().to_ascii_lowercase();
+ config.admin.allow_unauthenticated =
+ matches!(value.as_str(), "1" | "true" | "yes" | "on");
+ }
+
+ if let Ok(value) = std::env::var("IAM_ALLOW_RANDOM_SIGNING_KEY")
+ .or_else(|_| std::env::var("PHOTON_IAM_ALLOW_RANDOM_SIGNING_KEY"))
+ {
+ let value = value.trim().to_ascii_lowercase();
+ config.dev.allow_random_signing_key =
+ matches!(value.as_str(), "1" | "true" | "yes" | "on");
+ }
+
+ if let Ok(value) = std::env::var("IAM_ALLOW_MEMORY_BACKEND")
+ .or_else(|_| std::env::var("PHOTON_IAM_ALLOW_MEMORY_BACKEND"))
+ {
+ let value = value.trim().to_ascii_lowercase();
+ config.dev.allow_memory_backend = matches!(value.as_str(), "1" | "true" | "yes" | "on");
+ }
+
Ok(config)
}
@@ -132,6 +163,8 @@ impl ServerConfig {
},
},
logging: LoggingConfig::default(),
+ admin: AdminConfig::default(),
+ dev: DevConfig::default(),
}
}
}
@@ -226,6 +259,26 @@ pub struct ClusterConfig {
pub chainfire_endpoint: Option<String>,
}
+/// Admin API policy configuration
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct AdminConfig {
+ /// Allow admin APIs to run without an explicit admin token (dev only)
+ #[serde(default)]
+ pub allow_unauthenticated: bool,
+}
+
+/// Development-only safety valves
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct DevConfig {
+ /// Allow generating a random signing key when none is configured
+ #[serde(default)]
+ pub allow_random_signing_key: bool,
+
+ /// Allow the in-memory backend
+ #[serde(default)]
+ pub allow_memory_backend: bool,
+}
+
/// Backend type
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
diff --git a/iam/crates/iam-server/src/main.rs b/iam/crates/iam-server/src/main.rs
index 761c20f..86d71ee 100644
--- a/iam/crates/iam-server/src/main.rs
+++ b/iam/crates/iam-server/src/main.rs
@@ -64,17 +64,18 @@ fn load_admin_token() -> Option<String> {
.filter(|value| !value.is_empty())
}
-fn allow_unauthenticated_admin() -> bool {
- std::env::var("IAM_ALLOW_UNAUTHENTICATED_ADMIN")
- .or_else(|_| std::env::var("PHOTON_IAM_ALLOW_UNAUTHENTICATED_ADMIN"))
- .ok()
- .map(|value| {
- matches!(
- value.trim().to_ascii_lowercase().as_str(),
- "1" | "true" | "yes" | "y" | "on"
- )
- })
- .unwrap_or(false)
+fn allow_unauthenticated_admin(config: &ServerConfig) -> bool {
+ config.admin.allow_unauthenticated
+ || std::env::var("IAM_ALLOW_UNAUTHENTICATED_ADMIN")
+ .or_else(|_| std::env::var("PHOTON_IAM_ALLOW_UNAUTHENTICATED_ADMIN"))
+ .ok()
+ .map(|value| {
+ matches!(
+ value.trim().to_ascii_lowercase().as_str(),
+ "1" | "true" | "yes" | "y" | "on"
+ )
+ })
+ .unwrap_or(false)
}
fn admin_token_valid(metadata: &MetadataMap, token: &str) -> bool {
@@ -334,19 +335,20 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create token service
let signing_key = if config.authn.internal_token.signing_key.is_empty() {
- let allow_random = std::env::var("IAM_ALLOW_RANDOM_SIGNING_KEY")
- .or_else(|_| std::env::var("PHOTON_IAM_ALLOW_RANDOM_SIGNING_KEY"))
- .ok()
- .map(|value| {
- matches!(
- value.trim().to_lowercase().as_str(),
- "1" | "true" | "yes" | "y" | "on"
- )
- })
- .unwrap_or(false);
+ let allow_random = config.dev.allow_random_signing_key
+ || std::env::var("IAM_ALLOW_RANDOM_SIGNING_KEY")
+ .or_else(|_| std::env::var("PHOTON_IAM_ALLOW_RANDOM_SIGNING_KEY"))
+ .ok()
+ .map(|value| {
+ matches!(
+ value.trim().to_lowercase().as_str(),
+ "1" | "true" | "yes" | "y" | "on"
+ )
+ })
+ .unwrap_or(false);
if !allow_random {
- return Err("No signing key configured. Set IAM_ALLOW_RANDOM_SIGNING_KEY=true for dev or configure authn.internal_token.signing_key.".into());
+ return Err("No signing key configured. Set dev.allow_random_signing_key=true (or IAM_ALLOW_RANDOM_SIGNING_KEY=true for legacy dev mode) or configure authn.internal_token.signing_key.".into());
}
warn!("No signing key configured, generating random key (dev-only)");
@@ -369,9 +371,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let token_service = Arc::new(InternalTokenService::new(token_config));
let admin_token = load_admin_token();
- if admin_token.is_none() && !allow_unauthenticated_admin() {
+ if admin_token.is_none() && !allow_unauthenticated_admin(&config) {
return Err(
- "IAM admin token not configured. Set IAM_ADMIN_TOKEN or explicitly allow dev mode with IAM_ALLOW_UNAUTHENTICATED_ADMIN=true."
+ "IAM admin token not configured. Set IAM_ADMIN_TOKEN or explicitly allow dev mode with admin.allow_unauthenticated=true."
.into(),
);
}
@@ -550,19 +552,20 @@ async fn create_backend(
) -> Result> {
match config.store.backend {
BackendKind::Memory => {
- let allow_memory = std::env::var("IAM_ALLOW_MEMORY_BACKEND")
- .or_else(|_| std::env::var("PHOTON_IAM_ALLOW_MEMORY_BACKEND"))
- .ok()
- .map(|value| {
- matches!(
- value.trim().to_ascii_lowercase().as_str(),
- "1" | "true" | "yes" | "on"
- )
- })
- .unwrap_or(false);
+ let allow_memory = config.dev.allow_memory_backend
+ || std::env::var("IAM_ALLOW_MEMORY_BACKEND")
+ .or_else(|_| std::env::var("PHOTON_IAM_ALLOW_MEMORY_BACKEND"))
+ .ok()
+ .map(|value| {
+ matches!(
+ value.trim().to_ascii_lowercase().as_str(),
+ "1" | "true" | "yes" | "on"
+ )
+ })
+ .unwrap_or(false);
if !allow_memory {
return Err(
- "In-memory IAM backend is disabled. Use FlareDB backend, or set IAM_ALLOW_MEMORY_BACKEND=true for tests/dev only."
+ "In-memory IAM backend is disabled. Use FlareDB backend, or set dev.allow_memory_backend=true for tests/dev only."
.into(),
);
}
diff --git a/k8shost/crates/k8shost-server/src/config.rs b/k8shost/crates/k8shost-server/src/config.rs
index d2cbdd6..9f47b58 100644
--- a/k8shost/crates/k8shost-server/src/config.rs
+++ b/k8shost/crates/k8shost-server/src/config.rs
@@ -91,6 +91,18 @@ impl Default for PrismNetConfig {
}
}
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+pub struct CreditServiceConfig {
+ #[serde(default)]
+ pub server_addr: Option<String>,
+}
+
+impl Default for CreditServiceConfig {
+ fn default() -> Self {
+ Self { server_addr: None }
+ }
+}
+
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ChainFireConfig {
pub endpoint: Option<String>,
@@ -110,6 +122,8 @@ pub struct Config {
pub flaredb: FlareDbConfig,
pub chainfire: ChainFireConfig,
pub iam: IamConfig,
+ #[serde(default)]
+ pub creditservice: CreditServiceConfig,
pub fiberlb: FiberLbConfig,
pub flashdns: FlashDnsConfig,
pub prismnet: PrismNetConfig,
diff --git a/k8shost/crates/k8shost-server/src/main.rs b/k8shost/crates/k8shost-server/src/main.rs
index 0a1bfad..f3ecbb5 100644
--- a/k8shost/crates/k8shost-server/src/main.rs
+++ b/k8shost/crates/k8shost-server/src/main.rs
@@ -38,8 +38,8 @@ use tracing_subscriber::EnvFilter;
#[command(about = "Kubernetes API server for PlasmaCloud's k8shost component")]
struct Args {
/// Configuration file path
- #[arg(short, long)]
- config: Option<PathBuf>,
+ #[arg(short, long, default_value = "k8shost.toml")]
+ config: PathBuf,
/// Listen address for gRPC server (e.g., "[::]:6443")
#[arg(long)]
@@ -91,17 +91,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();
// Load configuration
- let mut settings = ::config::Config::builder()
- .add_source(::config::File::from_str(
- toml::to_string(&Config::default())?.as_str(),
- ::config::FileFormat::Toml,
- ))
- .add_source(::config::Environment::with_prefix("K8SHOST").separator("_"));
+ let mut settings = ::config::Config::builder().add_source(::config::File::from_str(
+ toml::to_string(&Config::default())?.as_str(),
+ ::config::FileFormat::Toml,
+ ));
- // Add config file if specified
- if let Some(config_path) = &args.config {
- info!("Loading config from file: {}", config_path.display());
- settings = settings.add_source(::config::File::from(config_path.as_path()));
+ if args.config.exists() {
+ info!("Loading config from file: {}", args.config.display());
+ settings = settings.add_source(::config::File::from(args.config.as_path()));
+ } else {
+ info!(
+ "Config file not found: {}, using defaults",
+ args.config.display()
+ );
}
let loaded_config: Config = settings
@@ -136,6 +138,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.iam_server_addr
.unwrap_or(loaded_config.iam.server_addr),
},
+ creditservice: config::CreditServiceConfig {
+ server_addr: loaded_config.creditservice.server_addr,
+ },
fiberlb: config::FiberLbConfig {
server_addr: args
.fiberlb_server_addr
@@ -275,8 +280,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let ipam_client = Arc::new(IpamClient::new(config.prismnet.server_addr.clone()));
// Create service implementations with storage
+ let creditservice_endpoint = config.creditservice.server_addr.as_deref();
let pod_service = Arc::new(
- PodServiceImpl::new_with_credit_service(storage.clone(), auth_service.clone()).await,
+ PodServiceImpl::new_with_credit_service(
+ storage.clone(),
+ auth_service.clone(),
+ creditservice_endpoint,
+ )
+ .await,
);
let service_service = Arc::new(ServiceServiceImpl::new(
storage.clone(),
@@ -290,7 +301,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
));
// Start scheduler in background with CreditService integration
- let scheduler = Arc::new(scheduler::Scheduler::new_with_credit_service(storage.clone()).await);
+ let scheduler = Arc::new(
+ scheduler::Scheduler::new_with_credit_service(storage.clone(), creditservice_endpoint)
+ .await,
+ );
tokio::spawn(async move {
scheduler.run().await;
});
diff --git a/k8shost/crates/k8shost-server/src/scheduler.rs b/k8shost/crates/k8shost-server/src/scheduler.rs
index 7976990..63bb3f9 100644
--- a/k8shost/crates/k8shost-server/src/scheduler.rs
+++ b/k8shost/crates/k8shost-server/src/scheduler.rs
@@ -33,27 +33,31 @@ impl Scheduler {
}
/// Create a new scheduler with CreditService quota enforcement
- pub async fn new_with_credit_service(storage: Arc) -> Self {
+ pub async fn new_with_credit_service(storage: Arc, endpoint: Option<&str>) -> Self {
// Initialize CreditService client if endpoint is configured
- let credit_service = match std::env::var("CREDITSERVICE_ENDPOINT") {
- Ok(endpoint) => match CreditServiceClient::connect(&endpoint).await {
- Ok(client) => {
- info!(
- "Scheduler: CreditService quota enforcement enabled: {}",
- endpoint
- );
- Some(Arc::new(RwLock::new(client)))
- }
- Err(e) => {
- warn!(
+ let credit_service = match endpoint {
+ Some(endpoint) if !endpoint.trim().is_empty() => {
+ match CreditServiceClient::connect(endpoint).await {
+ Ok(client) => {
+ info!(
+ "Scheduler: CreditService quota enforcement enabled: {}",
+ endpoint
+ );
+ Some(Arc::new(RwLock::new(client)))
+ }
+ Err(e) => {
+ warn!(
"Scheduler: Failed to connect to CreditService (quota enforcement disabled): {}",
e
);
- None
+ None
+ }
}
- },
- Err(_) => {
- info!("Scheduler: CREDITSERVICE_ENDPOINT not set, quota enforcement disabled");
+ }
+ _ => {
+ info!(
+ "Scheduler: CreditService endpoint not configured, quota enforcement disabled"
+ );
None
}
};
diff --git a/k8shost/crates/k8shost-server/src/services/pod.rs b/k8shost/crates/k8shost-server/src/services/pod.rs
index ca518b8..366b513 100644
--- a/k8shost/crates/k8shost-server/src/services/pod.rs
+++ b/k8shost/crates/k8shost-server/src/services/pod.rs
@@ -45,24 +45,30 @@ impl PodServiceImpl {
}
}
- pub async fn new_with_credit_service(storage: Arc, auth: Arc) -> Self {
+ pub async fn new_with_credit_service(
+ storage: Arc,
+ auth: Arc,
+ endpoint: Option<&str>,
+ ) -> Self {
// Initialize CreditService client if endpoint is configured
- let credit_service = match std::env::var("CREDITSERVICE_ENDPOINT") {
- Ok(endpoint) => match CreditServiceClient::connect(&endpoint).await {
- Ok(client) => {
- tracing::info!("CreditService admission control enabled: {}", endpoint);
- Some(Arc::new(RwLock::new(client)))
+ let credit_service = match endpoint {
+ Some(endpoint) if !endpoint.trim().is_empty() => {
+ match CreditServiceClient::connect(endpoint).await {
+ Ok(client) => {
+ tracing::info!("CreditService admission control enabled: {}", endpoint);
+ Some(Arc::new(RwLock::new(client)))
+ }
+ Err(e) => {
+ tracing::warn!(
+ "Failed to connect to CreditService (admission control disabled): {}",
+ e
+ );
+ None
+ }
}
- Err(e) => {
- tracing::warn!(
- "Failed to connect to CreditService (admission control disabled): {}",
- e
- );
- None
- }
- },
- Err(_) => {
- tracing::info!("CREDITSERVICE_ENDPOINT not set, admission control disabled");
+ }
+ _ => {
+ tracing::info!("CreditService endpoint not configured, admission control disabled");
None
}
};
diff --git a/lightningstor/crates/lightningstor-server/src/config.rs b/lightningstor/crates/lightningstor-server/src/config.rs
index e9b19ba..690c89f 100644
--- a/lightningstor/crates/lightningstor-server/src/config.rs
+++ b/lightningstor/crates/lightningstor-server/src/config.rs
@@ -50,6 +50,75 @@ pub enum ObjectStorageBackend {
Distributed,
}
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct S3AuthConfig {
+ #[serde(default = "default_s3_auth_enabled")]
+ pub enabled: bool,
+
+ #[serde(default = "default_s3_aws_region")]
+ pub aws_region: String,
+
+ #[serde(default = "default_s3_iam_cache_ttl_secs")]
+ pub iam_cache_ttl_secs: u64,
+
+ #[serde(default = "default_s3_default_org_id")]
+ pub default_org_id: Option<String>,
+
+ #[serde(default = "default_s3_default_project_id")]
+ pub default_project_id: Option<String>,
+
+ #[serde(default = "default_s3_max_auth_body_bytes")]
+ pub max_auth_body_bytes: usize,
+}
+
+impl Default for S3AuthConfig {
+ fn default() -> Self {
+ Self {
+ enabled: default_s3_auth_enabled(),
+ aws_region: default_s3_aws_region(),
+ iam_cache_ttl_secs: default_s3_iam_cache_ttl_secs(),
+ default_org_id: default_s3_default_org_id(),
+ default_project_id: default_s3_default_project_id(),
+ max_auth_body_bytes: default_s3_max_auth_body_bytes(),
+ }
+ }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct S3PerformanceConfig {
+ #[serde(default = "default_s3_streaming_put_threshold_bytes")]
+ pub streaming_put_threshold_bytes: usize,
+
+ #[serde(default = "default_s3_inline_put_max_bytes")]
+ pub inline_put_max_bytes: usize,
+
+ #[serde(default = "default_s3_multipart_put_concurrency")]
+ pub multipart_put_concurrency: usize,
+
+ #[serde(default = "default_s3_multipart_fetch_concurrency")]
+ pub multipart_fetch_concurrency: usize,
+}
+
+impl Default for S3PerformanceConfig {
+ fn default() -> Self {
+ Self {
+ streaming_put_threshold_bytes: default_s3_streaming_put_threshold_bytes(),
+ inline_put_max_bytes: default_s3_inline_put_max_bytes(),
+ multipart_put_concurrency: default_s3_multipart_put_concurrency(),
+ multipart_fetch_concurrency: default_s3_multipart_fetch_concurrency(),
+ }
+ }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct S3Config {
+ #[serde(default)]
+ pub auth: S3AuthConfig,
+
+ #[serde(default)]
+ pub performance: S3PerformanceConfig,
+}
+
/// Server configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
@@ -100,6 +169,10 @@ pub struct ServerConfig {
/// Authentication configuration
#[serde(default)]
pub auth: AuthConfig,
+
+ /// S3 API runtime settings
+ #[serde(default)]
+ pub s3: S3Config,
}
/// Authentication configuration
@@ -114,6 +187,46 @@ fn default_iam_server_addr() -> String {
"127.0.0.1:50051".to_string()
}
+fn default_s3_auth_enabled() -> bool {
+ true
+}
+
+fn default_s3_aws_region() -> String {
+ "us-east-1".to_string()
+}
+
+fn default_s3_iam_cache_ttl_secs() -> u64 {
+ 30
+}
+
+fn default_s3_default_org_id() -> Option<String> {
+ Some("default".to_string())
+}
+
+fn default_s3_default_project_id() -> Option<String> {
+ Some("default".to_string())
+}
+
+fn default_s3_max_auth_body_bytes() -> usize {
+ 1024 * 1024 * 1024
+}
+
+fn default_s3_streaming_put_threshold_bytes() -> usize {
+ 16 * 1024 * 1024
+}
+
+fn default_s3_inline_put_max_bytes() -> usize {
+ 128 * 1024 * 1024
+}
+
+fn default_s3_multipart_put_concurrency() -> usize {
+ 4
+}
+
+fn default_s3_multipart_fetch_concurrency() -> usize {
+ 4
+}
+
impl Default for AuthConfig {
fn default() -> Self {
Self {
@@ -139,6 +252,7 @@ impl Default for ServerConfig {
sync_on_write: false,
tls: None,
auth: AuthConfig::default(),
+ s3: S3Config::default(),
}
}
}
diff --git a/lightningstor/crates/lightningstor-server/src/main.rs b/lightningstor/crates/lightningstor-server/src/main.rs
index f55d8d5..1961001 100644
--- a/lightningstor/crates/lightningstor-server/src/main.rs
+++ b/lightningstor/crates/lightningstor-server/src/main.rs
@@ -5,7 +5,7 @@ use clap::Parser;
use iam_service_auth::AuthService;
use lightningstor_api::{BucketServiceServer, ObjectServiceServer};
use lightningstor_distributed::{
- DistributedConfig, ErasureCodedBackend, RedundancyMode, ReplicatedBackend, RepairQueue,
+ DistributedConfig, ErasureCodedBackend, RedundancyMode, RepairQueue, ReplicatedBackend,
StaticNodeRegistry,
};
use lightningstor_server::{
@@ -57,26 +57,6 @@ struct Args {
#[arg(short, long)]
log_level: Option<String>,
- /// ChainFire endpoint for cluster coordination (overrides config)
- #[arg(long, env = "LIGHTNINGSTOR_CHAINFIRE_ENDPOINT")]
- chainfire_endpoint: Option<String>,
-
- /// FlareDB endpoint for metadata and tenant data storage (overrides config)
- #[arg(long, env = "LIGHTNINGSTOR_FLAREDB_ENDPOINT")]
- flaredb_endpoint: Option<String>,
-
- /// Metadata backend (flaredb, postgres, sqlite)
- #[arg(long, env = "LIGHTNINGSTOR_METADATA_BACKEND")]
- metadata_backend: Option<String>,
-
- /// SQL database URL for metadata (required for postgres/sqlite backend)
- #[arg(long, env = "LIGHTNINGSTOR_METADATA_DATABASE_URL")]
- metadata_database_url: Option<String>,
-
- /// Run in single-node mode (required when metadata backend is SQLite)
- #[arg(long, env = "LIGHTNINGSTOR_SINGLE_NODE")]
- single_node: bool,
-
/// Data directory for object storage (overrides config)
#[arg(long)]
data_dir: Option,
@@ -112,21 +92,6 @@ async fn main() -> Result<(), Box> {
if let Some(log_level) = args.log_level {
config.log_level = log_level;
}
- if let Some(chainfire_endpoint) = args.chainfire_endpoint {
- config.chainfire_endpoint = Some(chainfire_endpoint);
- }
- if let Some(flaredb_endpoint) = args.flaredb_endpoint {
- config.flaredb_endpoint = Some(flaredb_endpoint);
- }
- if let Some(metadata_backend) = args.metadata_backend {
- config.metadata_backend = parse_metadata_backend(&metadata_backend)?;
- }
- if let Some(metadata_database_url) = args.metadata_database_url {
- config.metadata_database_url = Some(metadata_database_url);
- }
- if args.single_node {
- config.single_node = true;
- }
if let Some(data_dir) = args.data_dir {
config.data_dir = data_dir;
}
@@ -182,20 +147,17 @@ async fn main() -> Result<(), Box> {
config.flaredb_endpoint.clone(),
config.chainfire_endpoint.clone(),
)
- .await
- .map_err(|e| format!("Failed to initialize FlareDB metadata store: {}", e))?,
+ .await
+ .map_err(|e| format!("Failed to initialize FlareDB metadata store: {}", e))?,
)
}
MetadataBackend::Postgres | MetadataBackend::Sqlite => {
- let database_url = config
- .metadata_database_url
- .as_deref()
- .ok_or_else(|| {
- format!(
- "metadata_database_url is required when metadata_backend={} (env: LIGHTNINGSTOR_METADATA_DATABASE_URL)",
- metadata_backend_name(config.metadata_backend)
- )
- })?;
+ let database_url = config.metadata_database_url.as_deref().ok_or_else(|| {
+ format!(
+ "metadata_database_url is required when metadata_backend={}",
+ metadata_backend_name(config.metadata_backend)
+ )
+ })?;
ensure_sql_backend_matches_url(config.metadata_backend, database_url)?;
tracing::info!(
"Metadata backend: {} @ {}",
@@ -263,10 +225,11 @@ async fn main() -> Result<(), Box> {
let s3_addr: SocketAddr = config.s3_addr;
// Start S3 HTTP server with shared state
- let s3_router = s3::create_router_with_auth(
+ let s3_router = s3::create_router_with_auth_config(
storage.clone(),
metadata.clone(),
Some(config.auth.iam_server_addr.clone()),
+ config.s3.clone(),
);
let s3_server = tokio::spawn(async move {
tracing::info!("S3 HTTP server listening on {}", s3_addr);
@@ -341,19 +304,6 @@ async fn main() -> Result<(), Box> {
Ok(())
}
-fn parse_metadata_backend(value: &str) -> Result<MetadataBackend, Box<dyn std::error::Error>> {
- match value.trim().to_ascii_lowercase().as_str() {
- "flaredb" => Ok(MetadataBackend::FlareDb),
- "postgres" => Ok(MetadataBackend::Postgres),
- "sqlite" => Ok(MetadataBackend::Sqlite),
- other => Err(format!(
- "invalid metadata backend '{}'; expected one of: flaredb, postgres, sqlite",
- other
- )
- .into()),
- }
-}
-
fn metadata_backend_name(backend: MetadataBackend) -> &'static str {
match backend {
MetadataBackend::FlareDb => "flaredb",
@@ -442,7 +392,9 @@ async fn create_storage_backend(
ObjectStorageBackend::LocalFs => {
tracing::info!("Object storage backend: local_fs");
Ok(StorageRuntime {
- backend: Arc::new(LocalFsBackend::new(&config.data_dir, config.sync_on_write).await?),
+ backend: Arc::new(
+ LocalFsBackend::new(&config.data_dir, config.sync_on_write).await?,
+ ),
repair_worker: None,
})
}
diff --git a/lightningstor/crates/lightningstor-server/src/metadata.rs b/lightningstor/crates/lightningstor-server/src/metadata.rs
index e44d993..3356b59 100644
--- a/lightningstor/crates/lightningstor-server/src/metadata.rs
+++ b/lightningstor/crates/lightningstor-server/src/metadata.rs
@@ -55,26 +55,25 @@ impl MetadataStore {
endpoint: Option<String>,
pd_endpoint: Option<String>,
) -> Result<Self> {
- let endpoint = endpoint.unwrap_or_else(|| {
- std::env::var("LIGHTNINGSTOR_FLAREDB_ENDPOINT")
- .unwrap_or_else(|_| "127.0.0.1:2479".to_string())
- });
+ let endpoint = endpoint.unwrap_or_else(|| "127.0.0.1:2479".to_string());
let pd_endpoint = pd_endpoint
- .or_else(|| std::env::var("LIGHTNINGSTOR_CHAINFIRE_ENDPOINT").ok())
.map(|value| normalize_transport_addr(&value))
.unwrap_or_else(|| endpoint.clone());
let mut clients = Vec::with_capacity(FLAREDB_CLIENT_POOL_SIZE);
for _ in 0..FLAREDB_CLIENT_POOL_SIZE {
- let client =
- RdbClient::connect_with_pd_namespace(endpoint.clone(), pd_endpoint.clone(), "lightningstor")
- .await
- .map_err(|e| {
- lightningstor_types::Error::StorageError(format!(
- "Failed to connect to FlareDB: {}",
- e
- ))
- })?;
+ let client = RdbClient::connect_with_pd_namespace(
+ endpoint.clone(),
+ pd_endpoint.clone(),
+ "lightningstor",
+ )
+ .await
+ .map_err(|e| {
+ lightningstor_types::Error::StorageError(format!(
+ "Failed to connect to FlareDB: {}",
+ e
+ ))
+ })?;
clients.push(Arc::new(Mutex::new(client)));
}
@@ -321,7 +320,11 @@ impl MetadataStore {
Ok((results, next))
}
- async fn flaredb_put(clients: &[Arc<Mutex<RdbClient>>], key: &[u8], value: &[u8]) -> Result<()> {
+ async fn flaredb_put(
+ clients: &[Arc<Mutex<RdbClient>>],
+ key: &[u8],
+ value: &[u8],
+ ) -> Result<()> {
let client = Self::flaredb_client_for_key(clients, key);
let raw_result = {
let mut c = client.lock().await;
@@ -443,7 +446,8 @@ impl MetadataStore {
let client = Self::flaredb_scan_client(clients);
let (mut items, next) = match {
let mut c = client.lock().await;
- c.raw_scan(start_key.clone(), end_key.clone(), fetch_limit).await
+ c.raw_scan(start_key.clone(), end_key.clone(), fetch_limit)
+ .await
} {
Ok((keys, values, next)) => {
let items = keys
@@ -697,12 +701,13 @@ impl MetadataStore {
.await
}
StorageBackend::Sql(sql) => {
- let prefix_end = String::from_utf8(Self::prefix_end(prefix.as_bytes())).map_err(|e| {
- lightningstor_types::Error::StorageError(format!(
- "Failed to encode prefix end: {}",
- e
- ))
- })?;
+ let prefix_end =
+ String::from_utf8(Self::prefix_end(prefix.as_bytes())).map_err(|e| {
+ lightningstor_types::Error::StorageError(format!(
+ "Failed to encode prefix end: {}",
+ e
+ ))
+ })?;
let fetch_limit = (limit.saturating_add(1)) as i64;
match sql {
SqlStorageBackend::Postgres(pool) => {
@@ -908,7 +913,10 @@ impl MetadataStore {
}
fn multipart_bucket_prefix(bucket_id: &BucketId, prefix: &str) -> String {
- format!("/lightningstor/multipart/by-bucket/{}/{}", bucket_id, prefix)
+ format!(
+ "/lightningstor/multipart/by-bucket/{}/{}",
+ bucket_id, prefix
+ )
}
fn multipart_object_key(object_id: &ObjectId) -> String {
@@ -955,7 +963,8 @@ impl MetadataStore {
}
pub async fn delete_replicated_repair_task(&self, task_id: &str) -> Result<()> {
- self.delete_key(&Self::replicated_repair_task_key(task_id)).await
+ self.delete_key(&Self::replicated_repair_task_key(task_id))
+ .await
}
/// Save bucket metadata
@@ -1246,7 +1255,8 @@ impl MetadataStore {
))
.await?;
}
- self.delete_key(&Self::multipart_upload_key(upload_id)).await
+ self.delete_key(&Self::multipart_upload_key(upload_id))
+ .await
}
pub async fn list_multipart_uploads(
@@ -1331,7 +1341,8 @@ impl MetadataStore {
}
pub async fn delete_object_multipart_upload(&self, object_id: &ObjectId) -> Result<()> {
- self.delete_key(&Self::multipart_object_key(object_id)).await
+ self.delete_key(&Self::multipart_object_key(object_id))
+ .await
}
}
@@ -1380,13 +1391,11 @@ mod tests {
store.delete_bucket(&bucket).await.unwrap();
assert!(!store.bucket_cache.contains_key(&cache_key));
assert!(!store.bucket_cache.contains_key(&cache_id_key));
- assert!(
- store
- .load_bucket("org-a", "project-a", "bench-bucket")
- .await
- .unwrap()
- .is_none()
- );
+ assert!(store
+ .load_bucket("org-a", "project-a", "bench-bucket")
+ .await
+ .unwrap()
+ .is_none());
}
#[tokio::test]
@@ -1426,13 +1435,11 @@ mod tests {
.await
.unwrap();
assert!(!store.object_cache.contains_key(&cache_key));
- assert!(
- store
- .load_object(&bucket.id, object.key.as_str(), None)
- .await
- .unwrap()
- .is_none()
- );
+ assert!(store
+ .load_object(&bucket.id, object.key.as_str(), None)
+ .await
+ .unwrap()
+ .is_none());
}
#[tokio::test]
@@ -1496,8 +1503,10 @@ mod tests {
);
store.save_bucket(&bucket).await.unwrap();
- let upload_a = MultipartUpload::new(bucket.id.to_string(), ObjectKey::new("a/one.bin").unwrap());
- let upload_b = MultipartUpload::new(bucket.id.to_string(), ObjectKey::new("a/two.bin").unwrap());
+ let upload_a =
+ MultipartUpload::new(bucket.id.to_string(), ObjectKey::new("a/one.bin").unwrap());
+ let upload_b =
+ MultipartUpload::new(bucket.id.to_string(), ObjectKey::new("a/two.bin").unwrap());
let other_bucket = Bucket::new(
BucketName::new("other-bucket").unwrap(),
"org-a",
@@ -1505,8 +1514,10 @@ mod tests {
"default",
);
store.save_bucket(&other_bucket).await.unwrap();
- let upload_other =
- MultipartUpload::new(other_bucket.id.to_string(), ObjectKey::new("a/three.bin").unwrap());
+ let upload_other = MultipartUpload::new(
+ other_bucket.id.to_string(),
+ ObjectKey::new("a/three.bin").unwrap(),
+ );
store.save_multipart_upload(&upload_a).await.unwrap();
store.save_multipart_upload(&upload_b).await.unwrap();
@@ -1543,10 +1554,7 @@ mod tests {
assert_eq!(tasks[0].attempt_count, 1);
assert_eq!(tasks[0].last_error.as_deref(), Some("transient failure"));
- store
- .delete_replicated_repair_task(&task.id)
- .await
- .unwrap();
+ store.delete_replicated_repair_task(&task.id).await.unwrap();
assert!(store
.list_replicated_repair_tasks(10)
.await
diff --git a/lightningstor/crates/lightningstor-server/src/s3/auth.rs b/lightningstor/crates/lightningstor-server/src/s3/auth.rs
index 4552ebc..7c90087 100644
--- a/lightningstor/crates/lightningstor-server/src/s3/auth.rs
+++ b/lightningstor/crates/lightningstor-server/src/s3/auth.rs
@@ -3,6 +3,7 @@
//! Implements simplified SigV4 authentication compatible with AWS S3 SDKs and aws-cli.
//! Integrates with IAM for access key validation.
+use crate::config::S3AuthConfig;
use crate::tenant::TenantContext;
use axum::{
body::{Body, Bytes},
@@ -23,8 +24,6 @@ use tracing::{debug, warn};
use url::form_urlencoded;
type HmacSha256 = Hmac<Sha256>;
-const DEFAULT_MAX_AUTH_BODY_BYTES: usize = 1024 * 1024 * 1024;
-
#[derive(Clone, Debug)]
pub(crate) struct VerifiedBodyBytes(pub Bytes);
@@ -49,6 +48,8 @@ pub struct AuthState {
aws_region: String,
/// AWS service name for SigV4 (e.g., s3)
aws_service: String,
+ /// Maximum request body size to buffer during auth verification
+ max_auth_body_bytes: usize,
}
pub struct IamClient {
@@ -60,6 +61,8 @@ pub struct IamClient {
enum IamClientMode {
Env {
credentials: std::collections::HashMap<String, String>,
+ default_org_id: Option<String>,
+ default_project_id: Option<String>,
},
Grpc {
endpoint: String,
@@ -83,11 +86,11 @@ struct CachedCredential {
impl IamClient {
/// Create a new IAM client. If an endpoint is supplied, use the IAM gRPC API.
pub fn new(iam_endpoint: Option<String>) -> Self {
- let cache_ttl = std::env::var("LIGHTNINGSTOR_S3_IAM_CACHE_TTL_SECS")
- .ok()
- .and_then(|value| value.parse::<u64>().ok())
- .map(StdDuration::from_secs)
- .unwrap_or_else(|| StdDuration::from_secs(30));
+ Self::new_with_config(iam_endpoint, &S3AuthConfig::default())
+ }
+
+ pub fn new_with_config(iam_endpoint: Option<String>, config: &S3AuthConfig) -> Self {
+ let cache_ttl = StdDuration::from_secs(config.iam_cache_ttl_secs);
if let Some(endpoint) = iam_endpoint
.map(|value| normalize_iam_endpoint(&value))
@@ -106,6 +109,8 @@ impl IamClient {
Self {
mode: IamClientMode::Env {
credentials: Self::load_env_credentials(),
+ default_org_id: config.default_org_id.clone(),
+ default_project_id: config.default_project_id.clone(),
},
credential_cache: Arc::new(RwLock::new(HashMap::new())),
cache_ttl,
@@ -160,32 +165,32 @@ impl IamClient {
#[cfg(test)]
fn env_credentials(&self) -> Option<&std::collections::HashMap<String, String>> {
match &self.mode {
- IamClientMode::Env { credentials } => Some(credentials),
+ IamClientMode::Env { credentials, .. } => Some(credentials),
IamClientMode::Grpc { .. } => None,
}
}
- fn env_default_tenant() -> (Option<String>, Option<String>) {
- let org_id = std::env::var("S3_TENANT_ORG_ID")
- .ok()
- .or_else(|| std::env::var("S3_ORG_ID").ok())
- .or_else(|| Some("default".to_string()));
- let project_id = std::env::var("S3_TENANT_PROJECT_ID")
- .ok()
- .or_else(|| std::env::var("S3_PROJECT_ID").ok())
- .or_else(|| Some("default".to_string()));
- (org_id, project_id)
+ fn env_default_tenant(
+ default_org_id: Option<String>,
+ default_project_id: Option<String>,
+ ) -> (Option<String>, Option<String>) {
+ (default_org_id, default_project_id)
}
/// Validate access key and resolve the credential context.
pub async fn get_credential(&self, access_key_id: &str) -> Result<ResolvedCredential, String> {
match &self.mode {
- IamClientMode::Env { credentials } => {
+ IamClientMode::Env {
+ credentials,
+ default_org_id,
+ default_project_id,
+ } => {
let secret_key = credentials
.get(access_key_id)
.cloned()
.ok_or_else(|| "Access key ID not found".to_string())?;
- let (org_id, project_id) = Self::env_default_tenant();
+ let (org_id, project_id) =
+ Self::env_default_tenant(default_org_id.clone(), default_project_id.clone());
Ok(ResolvedCredential {
secret_key,
principal_id: access_key_id.to_string(),
@@ -318,16 +323,21 @@ fn iam_admin_token() -> Option {
impl AuthState {
/// Create new auth state with IAM integration
pub fn new(iam_endpoint: Option<String>) -> Self {
- let iam_client = Some(Arc::new(RwLock::new(IamClient::new(iam_endpoint))));
+ Self::new_with_config(iam_endpoint, &S3AuthConfig::default())
+ }
+
+ pub fn new_with_config(iam_endpoint: Option<String>, config: &S3AuthConfig) -> Self {
+ let iam_client = Some(Arc::new(RwLock::new(IamClient::new_with_config(
+ iam_endpoint,
+ config,
+ ))));
Self {
iam_client,
- enabled: std::env::var("S3_AUTH_ENABLED")
- .unwrap_or_else(|_| "true".to_string())
- .parse()
- .unwrap_or(true),
- aws_region: std::env::var("AWS_REGION").unwrap_or_else(|_| "us-east-1".to_string()), // Default S3 region
+ enabled: config.enabled,
+ aws_region: config.aws_region.clone(),
aws_service: "s3".to_string(),
+ max_auth_body_bytes: config.max_auth_body_bytes,
}
}
@@ -338,6 +348,7 @@ impl AuthState {
enabled: false,
aws_region: "us-east-1".to_string(),
aws_service: "s3".to_string(),
+ max_auth_body_bytes: S3AuthConfig::default().max_auth_body_bytes,
}
}
}
@@ -438,12 +449,8 @@ pub async fn sigv4_auth_middleware(
let should_buffer_body = should_buffer_auth_body(payload_hash_header.as_deref());
let body_bytes = if should_buffer_body {
- let max_body_bytes = std::env::var("S3_MAX_AUTH_BODY_BYTES")
- .ok()
- .and_then(|value| value.parse::<usize>().ok())
- .unwrap_or(DEFAULT_MAX_AUTH_BODY_BYTES);
let (parts, body) = request.into_parts();
- let body_bytes = match axum::body::to_bytes(body, max_body_bytes).await {
+ let body_bytes = match axum::body::to_bytes(body, auth_state.max_auth_body_bytes).await {
Ok(b) => b,
Err(e) => {
return error_response(
diff --git a/lightningstor/crates/lightningstor-server/src/s3/mod.rs b/lightningstor/crates/lightningstor-server/src/s3/mod.rs
index 4018331..10c9bed 100644
--- a/lightningstor/crates/lightningstor-server/src/s3/mod.rs
+++ b/lightningstor/crates/lightningstor-server/src/s3/mod.rs
@@ -6,5 +6,8 @@ mod auth;
mod router;
mod xml;
-pub use auth::{AuthState, sigv4_auth_middleware};
-pub use router::{create_router, create_router_with_auth, create_router_with_state};
+pub use auth::{sigv4_auth_middleware, AuthState};
+pub use router::{
+ create_router, create_router_with_auth, create_router_with_auth_config,
+ create_router_with_state,
+};
diff --git a/lightningstor/crates/lightningstor-server/src/s3/router.rs b/lightningstor/crates/lightningstor-server/src/s3/router.rs
index 5001d5f..cff2270 100644
--- a/lightningstor/crates/lightningstor-server/src/s3/router.rs
+++ b/lightningstor/crates/lightningstor-server/src/s3/router.rs
@@ -1,5 +1,6 @@
//! S3 API router using Axum
+use crate::config::{S3Config, S3PerformanceConfig};
use axum::{
body::{Body, Bytes},
extract::{Request, State},
@@ -14,8 +15,8 @@ use futures::{stream, stream::FuturesUnordered, StreamExt};
use http_body_util::BodyExt;
use md5::{Digest, Md5};
use serde::Deserialize;
-use std::io;
use sha2::Sha256;
+use std::io;
use std::sync::Arc;
use tokio::task::JoinHandle;
@@ -38,17 +39,24 @@ use super::xml::{
pub struct S3State {
pub storage: Arc,
pub metadata: Arc,
+ pub performance: S3PerformanceConfig,
}
-// Keep streamed single-PUT parts aligned with the distributed backend's
-// large-object chunking so GET does not degrade into many small serial reads.
-const DEFAULT_STREAMING_PUT_THRESHOLD_BYTES: usize = 16 * 1024 * 1024;
-const DEFAULT_INLINE_PUT_MAX_BYTES: usize = 128 * 1024 * 1024;
-const DEFAULT_MULTIPART_PUT_CONCURRENCY: usize = 4;
-
impl S3State {
pub fn new(storage: Arc, metadata: Arc) -> Self {
- Self { storage, metadata }
+ Self::new_with_config(storage, metadata, S3PerformanceConfig::default())
+ }
+
+ pub fn new_with_config(
+ storage: Arc,
+ metadata: Arc,
+ performance: S3PerformanceConfig,
+ ) -> Self {
+ Self {
+ storage,
+ metadata,
+ performance,
+ }
}
}
@@ -57,7 +65,12 @@ pub fn create_router_with_state(
storage: Arc,
metadata: Arc,
) -> Router {
- create_router_with_auth_state(storage, metadata, Arc::new(AuthState::new(None)))
+ create_router_with_auth_state(
+ storage,
+ metadata,
+ Arc::new(AuthState::new(None)),
+ S3PerformanceConfig::default(),
+ )
}
/// Create the S3-compatible HTTP router with auth and storage backends
@@ -66,15 +79,30 @@ pub fn create_router_with_auth(
metadata: Arc,
iam_endpoint: Option,
) -> Router {
- create_router_with_auth_state(storage, metadata, Arc::new(AuthState::new(iam_endpoint)))
+ create_router_with_auth_config(storage, metadata, iam_endpoint, S3Config::default())
+}
+
+pub fn create_router_with_auth_config(
+ storage: Arc,
+ metadata: Arc,
+ iam_endpoint: Option,
+ config: S3Config,
+) -> Router {
+ create_router_with_auth_state(
+ storage,
+ metadata,
+ Arc::new(AuthState::new_with_config(iam_endpoint, &config.auth)),
+ config.performance,
+ )
}
fn create_router_with_auth_state(
storage: Arc,
metadata: Arc,
auth_state: Arc,
+ performance: S3PerformanceConfig,
) -> Router {
- let state = Arc::new(S3State::new(storage, metadata));
+ let state = Arc::new(S3State::new_with_config(storage, metadata, performance));
Router::new()
// Catch-all route for ALL operations (including root /)
@@ -126,28 +154,16 @@ fn request_tenant(extensions: &axum::http::Extensions) -> TenantContext {
})
}
-fn streaming_put_threshold_bytes() -> usize {
- std::env::var("LIGHTNINGSTOR_S3_STREAMING_PUT_THRESHOLD_BYTES")
- .ok()
- .and_then(|value| value.parse::<usize>().ok())
- .filter(|value| *value > 0)
- .unwrap_or(DEFAULT_STREAMING_PUT_THRESHOLD_BYTES)
+fn streaming_put_threshold_bytes(state: &Arc<S3State>) -> usize {
+ state.performance.streaming_put_threshold_bytes
}
-fn inline_put_max_bytes() -> usize {
- std::env::var("LIGHTNINGSTOR_S3_INLINE_PUT_MAX_BYTES")
- .ok()
- .and_then(|value| value.parse::<usize>().ok())
- .filter(|value| *value > 0)
- .unwrap_or(DEFAULT_INLINE_PUT_MAX_BYTES)
+fn inline_put_max_bytes(state: &Arc<S3State>) -> usize {
+ state.performance.inline_put_max_bytes
}
-fn multipart_put_concurrency() -> usize {
- std::env::var("LIGHTNINGSTOR_S3_MULTIPART_PUT_CONCURRENCY")
- .ok()
- .and_then(|value| value.parse::<usize>().ok())
- .filter(|value| *value > 0)
- .unwrap_or(DEFAULT_MULTIPART_PUT_CONCURRENCY)
+fn multipart_put_concurrency(state: &Arc<S3State>) -> usize {
+ state.performance.multipart_put_concurrency
}
fn request_content_length(headers: &HeaderMap) -> Option {
@@ -751,13 +767,13 @@ async fn put_object(
(body_len, etag, None, Some(body_bytes))
}
None => {
- let prepared = if let Some(content_length) =
- content_length.filter(|content_length| *content_length <= inline_put_max_bytes())
+ let prepared = if let Some(content_length) = content_length
+ .filter(|content_length| *content_length <= inline_put_max_bytes(&state))
{
read_inline_put_body(
body,
verified_payload_hash.as_deref(),
- inline_put_max_bytes(),
+ inline_put_max_bytes(&state),
Some(content_length),
)
.await
@@ -948,7 +964,7 @@ async fn stream_put_body(
) -> Result> {
let verify_payload_hash = expected_payload_hash
.filter(|expected_payload_hash| *expected_payload_hash != "UNSIGNED-PAYLOAD");
- let threshold = streaming_put_threshold_bytes();
+ let threshold = streaming_put_threshold_bytes(state);
let mut buffered = BytesMut::with_capacity(threshold);
let mut full_md5 = Some(Md5::new());
let mut full_sha256 = verify_payload_hash.map(|_| Sha256::new());
@@ -958,7 +974,7 @@ async fn stream_put_body(
let mut scheduled_part_numbers = Vec::new();
let mut completed_parts = Vec::new();
let mut in_flight_uploads = FuturesUnordered::new();
- let max_in_flight_uploads = multipart_put_concurrency();
+ let max_in_flight_uploads = multipart_put_concurrency(state);
while let Some(frame) = body.frame().await {
let frame = match frame {
@@ -1282,7 +1298,10 @@ fn multipart_object_body(state: Arc, object: &Object, upload: Multipart
)));
}
- return Ok(Some((bytes.slice(0..body_end), (storage, upload, idx, offset))));
+ return Ok(Some((
+ bytes.slice(0..body_end),
+ (storage, upload, idx, offset),
+ )));
}
Ok(None)
@@ -1374,7 +1393,11 @@ async fn get_object(
);
}
- let multipart_upload = match state.metadata.load_object_multipart_upload(&object.id).await {
+ let multipart_upload = match state
+ .metadata
+ .load_object_multipart_upload(&object.id)
+ .await
+ {
Ok(upload) => upload,
Err(e) => {
return error_response(
@@ -1385,7 +1408,10 @@ async fn get_object(
}
};
let (body, content_length) = if let Some(upload) = multipart_upload {
- (multipart_object_body(Arc::clone(&state), &object, upload), object.size as usize)
+ (
+ multipart_object_body(Arc::clone(&state), &object, upload),
+ object.size as usize,
+ )
} else {
let data = match state.storage.get_object(&object.id).await {
Ok(data) => data,
@@ -1653,6 +1679,7 @@ mod tests {
storage,
metadata.clone(),
Arc::new(AuthState::disabled()),
+ S3PerformanceConfig::default(),
);
std::mem::forget(tempdir);
(router, metadata)
@@ -1982,7 +2009,8 @@ mod tests {
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
- let body = vec![b'x'; DEFAULT_STREAMING_PUT_THRESHOLD_BYTES + 1024];
+ let threshold = S3PerformanceConfig::default().streaming_put_threshold_bytes;
+ let body = vec![b'x'; threshold + 1024];
let response = router
.clone()
.oneshot(
@@ -2197,10 +2225,12 @@ mod tests {
async fn large_put_streams_multipart_parts_with_parallel_uploads() {
let storage = Arc::new(DelayedMultipartStorage::new(Duration::from_millis(25)));
let metadata = Arc::new(MetadataStore::new_in_memory());
+ let performance = S3PerformanceConfig::default();
let router = create_router_with_auth_state(
storage.clone(),
metadata,
Arc::new(AuthState::disabled()),
+ performance.clone(),
);
let response = router
@@ -2216,7 +2246,7 @@ mod tests {
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
- let body = vec![b'x'; (DEFAULT_STREAMING_PUT_THRESHOLD_BYTES * 2) + 4096];
+ let body = vec![b'x'; (performance.streaming_put_threshold_bytes * 2) + 4096];
let response = router
.clone()
.oneshot(
@@ -2250,10 +2280,12 @@ mod tests {
async fn moderate_put_with_content_length_stays_inline() {
let storage = Arc::new(DelayedMultipartStorage::new(Duration::from_millis(25)));
let metadata = Arc::new(MetadataStore::new_in_memory());
+ let performance = S3PerformanceConfig::default();
let router = create_router_with_auth_state(
storage.clone(),
metadata.clone(),
Arc::new(AuthState::disabled()),
+ performance.clone(),
);
let response = router
@@ -2269,7 +2301,7 @@ mod tests {
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
- let body = vec![b'y'; DEFAULT_STREAMING_PUT_THRESHOLD_BYTES + 4096];
+ let body = vec![b'y'; performance.streaming_put_threshold_bytes + 4096];
let response = router
.clone()
.oneshot(
diff --git a/lightningstor/crates/lightningstor-server/src/s3/xml.rs b/lightningstor/crates/lightningstor-server/src/s3/xml.rs
index 0521496..0eeef8d 100644
--- a/lightningstor/crates/lightningstor-server/src/s3/xml.rs
+++ b/lightningstor/crates/lightningstor-server/src/s3/xml.rs
@@ -170,5 +170,8 @@ pub struct CompleteMultipartUploadResult {
/// Convert to XML with declaration
pub fn to_xml(value: &T) -> Result {
let xml = xml_to_string(value)?;
- Ok(format!("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n{}", xml))
+ Ok(format!(
+ "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n{}",
+ xml
+ ))
}
diff --git a/nightlight/crates/nightlight-server/src/config.rs b/nightlight/crates/nightlight-server/src/config.rs
index 6c996e7..ed946f7 100644
--- a/nightlight/crates/nightlight-server/src/config.rs
+++ b/nightlight/crates/nightlight-server/src/config.rs
@@ -6,6 +6,7 @@
use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::fs;
+use std::path::Path;
/// Main server configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -123,7 +124,7 @@ pub struct TlsConfig {
impl Config {
/// Load configuration from a YAML file
- pub fn from_file(path: &str) -> Result<Self> {
+ pub fn from_file(path: impl AsRef<Path>) -> Result<Self> {
let content = fs::read_to_string(path)?;
let config = serde_yaml::from_str(&content)?;
Ok(config)
@@ -136,27 +137,6 @@ impl Config {
fs::write(path, content)?;
Ok(())
}
-
- /// Apply environment variable overrides
- ///
- /// This allows NixOS service module to override configuration via environment variables.
- /// Environment variables take precedence over configuration file values.
- pub fn apply_env_overrides(&mut self) {
- if let Ok(val) = std::env::var("NIGHTLIGHT_HTTP_ADDR") {
- self.server.http_addr = val;
- }
- if let Ok(val) = std::env::var("NIGHTLIGHT_GRPC_ADDR") {
- self.server.grpc_addr = val;
- }
- if let Ok(val) = std::env::var("NIGHTLIGHT_DATA_DIR") {
- self.storage.data_dir = val;
- }
- if let Ok(val) = std::env::var("NIGHTLIGHT_RETENTION_DAYS") {
- if let Ok(days) = val.parse() {
- self.storage.retention_days = days;
- }
- }
- }
}
impl Default for Config {
diff --git a/nightlight/crates/nightlight-server/src/grpc.rs b/nightlight/crates/nightlight-server/src/grpc.rs
index 5cee297..c4b2aa0 100644
--- a/nightlight/crates/nightlight-server/src/grpc.rs
+++ b/nightlight/crates/nightlight-server/src/grpc.rs
@@ -183,7 +183,11 @@ impl Admin for AdminServiceImpl {
_request: Request,
) -> Result, Status> {
let storage_result = self.storage.stats().await;
- let status = if storage_result.is_ok() { "ok" } else { "degraded" };
+ let status = if storage_result.is_ok() {
+ "ok"
+ } else {
+ "degraded"
+ };
let storage_message = match &storage_result {
Ok(_) => "storage ready".to_string(),
Err(error) => error.to_string(),
@@ -253,7 +257,9 @@ impl Admin for AdminServiceImpl {
version: env!("CARGO_PKG_VERSION").to_string(),
commit: option_env!("GIT_COMMIT").unwrap_or("unknown").to_string(),
build_time: option_env!("BUILD_TIME").unwrap_or("unknown").to_string(),
- rust_version: option_env!("RUSTC_VERSION").unwrap_or("unknown").to_string(),
+ rust_version: option_env!("RUSTC_VERSION")
+ .unwrap_or("unknown")
+ .to_string(),
target: format!("{}-{}", std::env::consts::ARCH, std::env::consts::OS),
}))
}
@@ -330,9 +336,7 @@ mod tests {
use super::*;
use crate::ingestion::IngestionService;
use crate::storage::Storage;
- use nightlight_api::nightlight::{
- InstantQueryRequest, LabelValuesRequest, SeriesQueryRequest,
- };
+ use nightlight_api::nightlight::{InstantQueryRequest, LabelValuesRequest, SeriesQueryRequest};
use nightlight_api::prometheus::{Label, Sample, TimeSeries, WriteRequest};
#[tokio::test]
@@ -380,7 +384,10 @@ mod tests {
data.result[0].metric.get("__name__").map(String::as_str),
Some("grpc_metric")
);
- assert_eq!(data.result[0].value.as_ref().map(|value| value.value), Some(12.5));
+ assert_eq!(
+ data.result[0].value.as_ref().map(|value| value.value),
+ Some(12.5)
+ );
}
#[tokio::test]
@@ -452,7 +459,10 @@ mod tests {
.unwrap()
.into_inner();
assert_eq!(label_values.status, "success");
- assert_eq!(label_values.data, vec!["api".to_string(), "worker".to_string()]);
+ assert_eq!(
+ label_values.data,
+ vec!["api".to_string(), "worker".to_string()]
+ );
}
#[tokio::test]
@@ -477,26 +487,33 @@ mod tests {
.unwrap();
let query = QueryService::from_storage(storage.queryable());
- query.execute_instant_query("admin_metric", 2_000).await.unwrap();
+ query
+ .execute_instant_query("admin_metric", 2_000)
+ .await
+ .unwrap();
- let admin = AdminServiceImpl::new(
- Arc::clone(&storage),
- ingestion.metrics(),
- query.metrics(),
- );
+ let admin =
+ AdminServiceImpl::new(Arc::clone(&storage), ingestion.metrics(), query.metrics());
let stats = admin
.stats(Request::new(StatsRequest {}))
.await
.unwrap()
.into_inner();
- assert_eq!(stats.storage.as_ref().map(|value| value.total_samples), Some(1));
assert_eq!(
- stats.ingestion
+ stats.storage.as_ref().map(|value| value.total_samples),
+ Some(1)
+ );
+ assert_eq!(
+ stats
+ .ingestion
.as_ref()
.map(|value| value.samples_ingested_total),
Some(1)
);
- assert_eq!(stats.query.as_ref().map(|value| value.queries_total), Some(1));
+ assert_eq!(
+ stats.query.as_ref().map(|value| value.queries_total),
+ Some(1)
+ );
}
}
diff --git a/nightlight/crates/nightlight-server/src/ingestion.rs b/nightlight/crates/nightlight-server/src/ingestion.rs
index bf9f428..485cd15 100644
--- a/nightlight/crates/nightlight-server/src/ingestion.rs
+++ b/nightlight/crates/nightlight-server/src/ingestion.rs
@@ -15,8 +15,8 @@ use nightlight_api::prometheus::{Label, WriteRequest};
use nightlight_types::Error;
use prost::Message;
use snap::raw::Decoder as SnappyDecoder;
-use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::Arc;
use std::time::Instant;
use tracing::{debug, error, info, warn};
@@ -113,9 +113,8 @@ impl IngestionService {
}
// Store series with samples in shared storage
- let series_id = nightlight_types::SeriesId(
- compute_series_fingerprint(&internal_labels)
- );
+ let series_id =
+ nightlight_types::SeriesId(compute_series_fingerprint(&internal_labels));
let time_series = nightlight_types::TimeSeries {
id: series_id,
@@ -178,11 +177,11 @@ impl IngestionMetrics {
}
/// Axum handler for /api/v1/write endpoint
-async fn handle_remote_write(
- State(service): State,
- body: Bytes,
-) -> Response {
- service.metrics.requests_total.fetch_add(1, Ordering::Relaxed);
+async fn handle_remote_write(State(service): State, body: Bytes) -> Response {
+ service
+ .metrics
+ .requests_total
+ .fetch_add(1, Ordering::Relaxed);
debug!("Received remote_write request, size: {} bytes", body.len());
@@ -225,17 +224,26 @@ async fn handle_remote_write(
}
Err(Error::Storage(msg)) if msg.contains("buffer full") => {
warn!("Write buffer full, returning 429");
- service.metrics.requests_failed.fetch_add(1, Ordering::Relaxed);
+ service
+ .metrics
+ .requests_failed
+ .fetch_add(1, Ordering::Relaxed);
IngestionError::Backpressure.into_response()
}
Err(Error::InvalidLabel(msg)) => {
warn!("Invalid labels: {}", msg);
- service.metrics.requests_failed.fetch_add(1, Ordering::Relaxed);
+ service
+ .metrics
+ .requests_failed
+ .fetch_add(1, Ordering::Relaxed);
IngestionError::InvalidLabels.into_response()
}
Err(e) => {
error!("Failed to process write request: {}", e);
- service.metrics.requests_failed.fetch_add(1, Ordering::Relaxed);
+ service
+ .metrics
+ .requests_failed
+ .fetch_add(1, Ordering::Relaxed);
IngestionError::StorageError.into_response()
}
}
@@ -285,7 +293,11 @@ fn validate_labels(labels: Vec