From fbcbb4e5dc69428071c6edff0ff188ffd08c6b44 Mon Sep 17 00:00:00 2001 From: centra Date: Fri, 20 Mar 2026 17:43:26 +0900 Subject: [PATCH] Add bootstrap flake bundle delivery and Nix desired-system generation --- deployer/crates/deployer-ctl/src/chainfire.rs | 375 +++++++++++------- .../deployer-server/src/bootstrap_assets.rs | 133 +++++++ .../crates/deployer-server/src/cloud_init.rs | 19 +- deployer/crates/deployer-server/src/config.rs | 11 + deployer/crates/deployer-server/src/lib.rs | 7 +- .../crates/deployer-server/src/phone_home.rs | 37 +- .../crates/deployer-server/src/storage.rs | 11 +- deployer/crates/deployer-types/src/lib.rs | 8 +- .../scripts/verify-deployer-bootstrap-e2e.sh | 53 ++- flake.nix | 65 ++- nix/iso/plasmacloud-iso.nix | 45 ++- nix/modules/cluster-config-lib.nix | 68 ++++ nix/modules/deployer.nix | 9 + nix/nodes/vm-cluster/cluster.nix | 12 + nix/test-cluster/node06.nix | 1 + 15 files changed, 666 insertions(+), 188 deletions(-) create mode 100644 deployer/crates/deployer-server/src/bootstrap_assets.rs diff --git a/deployer/crates/deployer-ctl/src/chainfire.rs b/deployer/crates/deployer-ctl/src/chainfire.rs index d494ec6..042f004 100644 --- a/deployer/crates/deployer-ctl/src/chainfire.rs +++ b/deployer/crates/deployer-ctl/src/chainfire.rs @@ -164,18 +164,25 @@ fn node_config_from_spec(node: &NodeSpec) -> NodeConfig { } fn desired_system_from_spec(node: &NodeSpec) -> Option { - Some(DesiredSystemSpec { - node_id: node.node_id.clone(), - nixos_configuration: node + let mut desired = node.desired_system.clone().unwrap_or_default(); + desired.node_id = node.node_id.clone(); + if desired.nixos_configuration.is_none() { + desired.nixos_configuration = node .install_plan .as_ref() - .and_then(|plan| plan.nixos_configuration.clone()), - flake_ref: None, - switch_action: Some("switch".to_string()), - health_check_command: Vec::new(), - rollback_on_failure: Some(true), - }) - .filter(|desired| desired.nixos_configuration.is_some()) + .and_then(|plan| 
plan.nixos_configuration.clone()); + } + if desired.switch_action.is_none() { + desired.switch_action = Some("switch".to_string()); + } + if desired.rollback_on_failure.is_none() { + desired.rollback_on_failure = Some(true); + } + if desired.nixos_configuration.is_some() { + Some(desired) + } else { + None + } } fn resolve_nodes(spec: &ClusterStateSpec) -> Result> { @@ -196,12 +203,12 @@ fn resolve_nodes(spec: &ClusterStateSpec) -> Result> { let mut resolved = node.clone(); let pool_spec = match resolved.pool.as_deref() { - Some(pool_name) => Some( - pools - .get(pool_name) - .copied() - .with_context(|| format!("node {} references unknown pool {}", node.node_id, pool_name))?, - ), + Some(pool_name) => Some(pools.get(pool_name).copied().with_context(|| { + format!( + "node {} references unknown pool {}", + node.node_id, pool_name + ) + })?), None => None, }; @@ -557,94 +564,102 @@ pub async fn apply_cluster_state( with_chainfire_endpoint_failover(&endpoints, "apply cluster state", |endpoint| { let endpoint = endpoint.to_string(); async move { - let spec: ClusterStateSpec = read_config_file(config_path).await?; - let resolved_nodes = resolve_nodes(&spec)?; - let cluster_id = cli_cluster_id.unwrap_or(&spec.cluster.cluster_id); + let spec: ClusterStateSpec = read_config_file(config_path).await?; + let resolved_nodes = resolve_nodes(&spec)?; + let cluster_id = cli_cluster_id.unwrap_or(&spec.cluster.cluster_id); - info!(cluster_id, "applying cluster state to Chainfire at {}", endpoint); - let mut client = Client::connect(endpoint.to_string()).await?; - - // MVP としては bootstrap と同じく upsert のみ行う。 - // 将来的に、既存一覧を取得して差分削除 (prune) を実装できる構造にしておく。 - let meta_key = key_cluster_meta(cluster_namespace, cluster_id); - let meta_value = serde_json::to_vec(&spec.cluster)?; - client.put(&meta_key, &meta_value).await?; - - for node in &resolved_nodes { - let key = key_node(cluster_namespace, cluster_id, &node.node_id); - let merged = merge_existing_node_observed_fields(&mut client, 
&key, node).await?; - let value = serde_json::to_vec(&merged)?; - client.put(&key, &value).await?; - - if let Some(desired_system) = desired_system_from_spec(node) { - client - .put( - &key_desired_system(cluster_namespace, cluster_id, &node.node_id), - &serde_json::to_vec(&desired_system)?, - ) - .await?; - } - - if let Some(machine_id) = node.machine_id.as_deref() { - let config = node_config_from_spec(node); - client - .put( - &deployer_node_config_key(deployer_namespace, machine_id), - serde_json::to_vec(&config)?, - ) - .await?; - client - .put( - &deployer_node_mapping_key(deployer_namespace, machine_id), - node.node_id.as_bytes(), - ) - .await?; - } - } - - for node_class in &spec.node_classes { - let key = key_node_class(cluster_namespace, cluster_id, &node_class.name); - let value = serde_json::to_vec(node_class)?; - client.put(&key, &value).await?; - } - for pool in &spec.pools { - let key = key_pool(cluster_namespace, cluster_id, &pool.name); - let value = serde_json::to_vec(pool)?; - client.put(&key, &value).await?; - } - for rule in &spec.enrollment_rules { - let key = key_enrollment_rule(cluster_namespace, cluster_id, &rule.name); - let value = serde_json::to_vec(rule)?; - client.put(&key, &value).await?; - } - for svc in &spec.services { - let key = key_service(cluster_namespace, cluster_id, &svc.name); - let value = serde_json::to_vec(svc)?; - client.put(&key, &value).await?; - } - for inst in &spec.instances { - let key = key_instance(cluster_namespace, cluster_id, &inst.service, &inst.instance_id); - let value = serde_json::to_vec(inst)?; - client.put(&key, &value).await?; - } - for policy in &spec.mtls_policies { - let key = key_mtls_policy(cluster_namespace, cluster_id, &policy.policy_id); - let value = serde_json::to_vec(policy)?; - client.put(&key, &value).await?; - } - - if prune { - prune_cluster_state( - &mut client, - cluster_namespace, - deployer_namespace, + info!( cluster_id, - &spec, - ) - .await?; - } + "applying cluster state to 
Chainfire at {}", endpoint + ); + let mut client = Client::connect(endpoint.to_string()).await?; - Ok(()) + // MVP としては bootstrap と同じく upsert のみ行う。 + // 将来的に、既存一覧を取得して差分削除 (prune) を実装できる構造にしておく。 + let meta_key = key_cluster_meta(cluster_namespace, cluster_id); + let meta_value = serde_json::to_vec(&spec.cluster)?; + client.put(&meta_key, &meta_value).await?; + + for node in &resolved_nodes { + let key = key_node(cluster_namespace, cluster_id, &node.node_id); + let merged = merge_existing_node_observed_fields(&mut client, &key, node).await?; + let value = serde_json::to_vec(&merged)?; + client.put(&key, &value).await?; + + if let Some(desired_system) = desired_system_from_spec(node) { + client + .put( + &key_desired_system(cluster_namespace, cluster_id, &node.node_id), + &serde_json::to_vec(&desired_system)?, + ) + .await?; + } + + if let Some(machine_id) = node.machine_id.as_deref() { + let config = node_config_from_spec(node); + client + .put( + &deployer_node_config_key(deployer_namespace, machine_id), + serde_json::to_vec(&config)?, + ) + .await?; + client + .put( + &deployer_node_mapping_key(deployer_namespace, machine_id), + node.node_id.as_bytes(), + ) + .await?; + } + } + + for node_class in &spec.node_classes { + let key = key_node_class(cluster_namespace, cluster_id, &node_class.name); + let value = serde_json::to_vec(node_class)?; + client.put(&key, &value).await?; + } + for pool in &spec.pools { + let key = key_pool(cluster_namespace, cluster_id, &pool.name); + let value = serde_json::to_vec(pool)?; + client.put(&key, &value).await?; + } + for rule in &spec.enrollment_rules { + let key = key_enrollment_rule(cluster_namespace, cluster_id, &rule.name); + let value = serde_json::to_vec(rule)?; + client.put(&key, &value).await?; + } + for svc in &spec.services { + let key = key_service(cluster_namespace, cluster_id, &svc.name); + let value = serde_json::to_vec(svc)?; + client.put(&key, &value).await?; + } + for inst in &spec.instances { + let key = 
key_instance( + cluster_namespace, + cluster_id, + &inst.service, + &inst.instance_id, + ); + let value = serde_json::to_vec(inst)?; + client.put(&key, &value).await?; + } + for policy in &spec.mtls_policies { + let key = key_mtls_policy(cluster_namespace, cluster_id, &policy.policy_id); + let value = serde_json::to_vec(policy)?; + client.put(&key, &value).await?; + } + + if prune { + prune_cluster_state( + &mut client, + cluster_namespace, + deployer_namespace, + cluster_id, + &spec, + ) + .await?; + } + + Ok(()) } }) .await @@ -656,35 +671,36 @@ pub async fn dump_prefix(endpoint: &str, prefix: &str, json_output: bool) -> Res with_chainfire_endpoint_failover(&endpoints, "dump Chainfire prefix", |endpoint| { let endpoint = endpoint.to_string(); async move { - let mut client = Client::connect(endpoint.to_string()).await?; - let start = prefix.as_bytes(); + let mut client = Client::connect(endpoint.to_string()).await?; + let start = prefix.as_bytes(); - info!("dumping keys with prefix {:?}", prefix); - let (kvs, _next) = client.scan_prefix(start, 0).await?; - if kvs.is_empty() { - warn!("no keys found under prefix {:?}", prefix); - } - - for (key, value, rev) in kvs { - let k = String::from_utf8_lossy(&key); - if json_output { - let value = serde_json::from_slice::(&value) - .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&value).into_owned())); - println!( - "{}", - serde_json::to_string(&json!({ - "revision": rev, - "key": k.as_ref(), - "value": value, - }))? 
- ); - } else { - let v = String::from_utf8_lossy(&value); - println!("rev={} key={} value={}", rev, k, v); + info!("dumping keys with prefix {:?}", prefix); + let (kvs, _next) = client.scan_prefix(start, 0).await?; + if kvs.is_empty() { + warn!("no keys found under prefix {:?}", prefix); } - } - Ok(()) + for (key, value, rev) in kvs { + let k = String::from_utf8_lossy(&key); + if json_output { + let value = serde_json::from_slice::(&value).unwrap_or_else(|_| { + Value::String(String::from_utf8_lossy(&value).into_owned()) + }); + println!( + "{}", + serde_json::to_string(&json!({ + "revision": rev, + "key": k.as_ref(), + "value": value, + }))? + ); + } else { + let v = String::from_utf8_lossy(&value); + println!("rev={} key={} value={}", rev, k, v); + } + } + + Ok(()) } }) .await @@ -698,42 +714,80 @@ async fn prune_cluster_state( spec: &ClusterStateSpec, ) -> Result<()> { let mut desired_keys = HashSet::new(); - desired_keys.insert(String::from_utf8_lossy(&key_cluster_meta(cluster_namespace, cluster_id)).to_string()); + desired_keys.insert( + String::from_utf8_lossy(&key_cluster_meta(cluster_namespace, cluster_id)).to_string(), + ); let resolved_nodes = resolve_nodes(spec)?; for node in &resolved_nodes { - desired_keys.insert(String::from_utf8_lossy(&key_node(cluster_namespace, cluster_id, &node.node_id)).to_string()); + desired_keys.insert( + String::from_utf8_lossy(&key_node(cluster_namespace, cluster_id, &node.node_id)) + .to_string(), + ); if desired_system_from_spec(node).is_some() { desired_keys.insert( - String::from_utf8_lossy(&key_desired_system(cluster_namespace, cluster_id, &node.node_id)) - .to_string(), + String::from_utf8_lossy(&key_desired_system( + cluster_namespace, + cluster_id, + &node.node_id, + )) + .to_string(), ); } } for node_class in &spec.node_classes { desired_keys.insert( - String::from_utf8_lossy(&key_node_class(cluster_namespace, cluster_id, &node_class.name)) - .to_string(), + String::from_utf8_lossy(&key_node_class( + 
cluster_namespace, + cluster_id, + &node_class.name, + )) + .to_string(), ); } for pool in &spec.pools { - desired_keys.insert(String::from_utf8_lossy(&key_pool(cluster_namespace, cluster_id, &pool.name)).to_string()); - } - for rule in &spec.enrollment_rules { desired_keys.insert( - String::from_utf8_lossy(&key_enrollment_rule(cluster_namespace, cluster_id, &rule.name)) + String::from_utf8_lossy(&key_pool(cluster_namespace, cluster_id, &pool.name)) .to_string(), ); } + for rule in &spec.enrollment_rules { + desired_keys.insert( + String::from_utf8_lossy(&key_enrollment_rule( + cluster_namespace, + cluster_id, + &rule.name, + )) + .to_string(), + ); + } for svc in &spec.services { - desired_keys.insert(String::from_utf8_lossy(&key_service(cluster_namespace, cluster_id, &svc.name)).to_string()); + desired_keys.insert( + String::from_utf8_lossy(&key_service(cluster_namespace, cluster_id, &svc.name)) + .to_string(), + ); } for inst in &spec.instances { - desired_keys.insert(String::from_utf8_lossy(&key_instance(cluster_namespace, cluster_id, &inst.service, &inst.instance_id)).to_string()); + desired_keys.insert( + String::from_utf8_lossy(&key_instance( + cluster_namespace, + cluster_id, + &inst.service, + &inst.instance_id, + )) + .to_string(), + ); } for policy in &spec.mtls_policies { - desired_keys.insert(String::from_utf8_lossy(&key_mtls_policy(cluster_namespace, cluster_id, &policy.policy_id)).to_string()); + desired_keys.insert( + String::from_utf8_lossy(&key_mtls_policy( + cluster_namespace, + cluster_id, + &policy.policy_id, + )) + .to_string(), + ); } let prefix = cluster_prefix(cluster_namespace, cluster_id); @@ -839,6 +893,7 @@ mod tests { failure_domain: Some("rack-a".to_string()), nix_profile: None, install_plan: None, + desired_system: None, state: Some(match NodeState::Pending { NodeState::Pending => "pending".to_string(), _ => unreachable!(), @@ -879,7 +934,10 @@ mod tests { assert_eq!(node.node_class.as_deref(), Some("worker-linux")); 
assert_eq!(node.nix_profile.as_deref(), Some("profiles/worker-linux")); - let install_plan = node.install_plan.as_ref().expect("install plan should inherit"); + let install_plan = node + .install_plan + .as_ref() + .expect("install plan should inherit"); assert_eq!( install_plan.nixos_configuration.as_deref(), Some("worker-golden") @@ -890,11 +948,15 @@ mod tests { assert_eq!(node.labels.get("env").map(String::as_str), Some("dev")); assert_eq!(node.labels.get("pool").map(String::as_str), Some("general")); assert_eq!( - node.labels.get("nodeclass.photoncloud.io/name").map(String::as_str), + node.labels + .get("nodeclass.photoncloud.io/name") + .map(String::as_str), Some("worker-linux") ); assert_eq!( - node.labels.get("topology.kubernetes.io/zone").map(String::as_str), + node.labels + .get("topology.kubernetes.io/zone") + .map(String::as_str), Some("rack-a") ); } @@ -906,11 +968,36 @@ mod tests { let desired = desired_system_from_spec(&resolved[0]).expect("desired system should exist"); assert_eq!(desired.node_id, "node01"); - assert_eq!(desired.nixos_configuration.as_deref(), Some("worker-golden")); + assert_eq!( + desired.nixos_configuration.as_deref(), + Some("worker-golden") + ); assert_eq!(desired.switch_action.as_deref(), Some("switch")); assert_eq!(desired.rollback_on_failure, Some(true)); } + #[test] + fn test_desired_system_keeps_explicit_node_overrides() { + let mut spec = test_spec(); + spec.nodes[0].desired_system = Some(DesiredSystemSpec { + node_id: String::new(), + nixos_configuration: Some("node01-next".to_string()), + flake_ref: Some("github:centra/cloud".to_string()), + switch_action: Some("boot".to_string()), + health_check_command: vec!["true".to_string()], + rollback_on_failure: Some(false), + }); + + let resolved = resolve_nodes(&spec).unwrap(); + let desired = desired_system_from_spec(&resolved[0]).expect("desired system should exist"); + assert_eq!(desired.node_id, "node01"); + assert_eq!(desired.nixos_configuration.as_deref(), 
Some("node01-next")); + assert_eq!(desired.flake_ref.as_deref(), Some("github:centra/cloud")); + assert_eq!(desired.switch_action.as_deref(), Some("boot")); + assert_eq!(desired.health_check_command, vec!["true".to_string()]); + assert_eq!(desired.rollback_on_failure, Some(false)); + } + #[test] fn test_is_prunable_key_keeps_observed_system() { let prefix = cluster_prefix("photoncloud", "test-cluster"); diff --git a/deployer/crates/deployer-server/src/bootstrap_assets.rs b/deployer/crates/deployer-server/src/bootstrap_assets.rs new file mode 100644 index 0000000..1b6dc01 --- /dev/null +++ b/deployer/crates/deployer-server/src/bootstrap_assets.rs @@ -0,0 +1,133 @@ +use std::sync::Arc; + +use axum::{ + body::Body, + extract::State, + http::{header, HeaderMap, HeaderValue, StatusCode}, + response::IntoResponse, +}; +use tokio::fs; + +use crate::{auth::require_bootstrap_auth, state::AppState}; + +/// GET /api/v1/bootstrap/flake-bundle +pub async fn flake_bundle( + State(state): State>, + headers: HeaderMap, +) -> Result { + require_bootstrap_auth(&state, &headers)?; + + let Some(path) = state.config.bootstrap_flake_bundle_path.as_ref() else { + return Err(( + StatusCode::SERVICE_UNAVAILABLE, + "bootstrap flake bundle not configured".to_string(), + )); + }; + + let bytes = fs::read(path).await.map_err(|error| { + let status = if error.kind() == std::io::ErrorKind::NotFound { + StatusCode::NOT_FOUND + } else { + StatusCode::INTERNAL_SERVER_ERROR + }; + ( + status, + format!( + "failed to read bootstrap flake bundle {}: {}", + path.display(), + error + ), + ) + })?; + + let headers = [ + ( + header::CONTENT_TYPE, + HeaderValue::from_static("application/gzip"), + ), + ( + header::CONTENT_DISPOSITION, + HeaderValue::from_static("attachment; filename=\"plasmacloud-flake-bundle.tar.gz\""), + ), + ]; + + Ok((headers, Body::from(bytes))) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{build_router, config::Config}; + use axum::{body::to_bytes, http::Request}; + 
use std::{ + fs, + time::{SystemTime, UNIX_EPOCH}, + }; + use tower::ServiceExt; + + fn temp_path(name: &str) -> std::path::PathBuf { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos(); + std::env::temp_dir().join(format!("{}-{}-{}", name, std::process::id(), nanos)) + } + + #[tokio::test] + async fn flake_bundle_route_serves_configured_bundle() { + let bundle_path = temp_path("deployer-flake-bundle"); + fs::write(&bundle_path, b"bundle-bytes").unwrap(); + + let mut config = Config::default(); + config.bootstrap_token = Some("test-token".to_string()); + config.bootstrap_flake_bundle_path = Some(bundle_path.clone()); + let state = Arc::new(AppState::with_config(config)); + let app = build_router(state); + + let response = app + .oneshot( + Request::builder() + .uri("/api/v1/bootstrap/flake-bundle") + .header("x-deployer-token", "test-token") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + assert_eq!( + response + .headers() + .get(header::CONTENT_TYPE) + .and_then(|value| value.to_str().ok()), + Some("application/gzip") + ); + + let body = to_bytes(response.into_body(), usize::MAX).await.unwrap(); + assert_eq!(body.as_ref(), b"bundle-bytes"); + + let _ = fs::remove_file(bundle_path); + } + + #[tokio::test] + async fn flake_bundle_route_requires_configured_bundle() { + let mut config = Config::default(); + config.bootstrap_token = Some("test-token".to_string()); + let state = Arc::new(AppState::with_config(config)); + let app = build_router(state); + + let response = app + .oneshot( + Request::builder() + .uri("/api/v1/bootstrap/flake-bundle") + .header("x-deployer-token", "test-token") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE); + } +} diff --git a/deployer/crates/deployer-server/src/cloud_init.rs b/deployer/crates/deployer-server/src/cloud_init.rs index 73ff40c..6d14721 100644 
--- a/deployer/crates/deployer-server/src/cloud_init.rs +++ b/deployer/crates/deployer-server/src/cloud_init.rs @@ -7,9 +7,7 @@ use deployer_types::NodeConfig; use std::sync::Arc; use crate::{ - auth::require_bootstrap_auth, - phone_home::lookup_node_config, - state::AppState, + auth::require_bootstrap_auth, phone_home::lookup_node_config, state::AppState, validation::validate_identifier, }; @@ -23,7 +21,10 @@ pub async fn meta_data( validate_identifier(&machine_id, "machine_id")?; let Some((node_id, config)) = lookup_node_config(&state, &machine_id).await else { - return Err((StatusCode::NOT_FOUND, "machine-id not registered".to_string())); + return Err(( + StatusCode::NOT_FOUND, + "machine-id not registered".to_string(), + )); }; let body = format!( @@ -43,12 +44,18 @@ pub async fn user_data( validate_identifier(&machine_id, "machine_id")?; let Some((node_id, config)) = lookup_node_config(&state, &machine_id).await else { - return Err((StatusCode::NOT_FOUND, "machine-id not registered".to_string())); + return Err(( + StatusCode::NOT_FOUND, + "machine-id not registered".to_string(), + )); }; let body = render_user_data(&node_id, &config) .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; - Ok(([(axum::http::header::CONTENT_TYPE, "text/cloud-config")], body)) + Ok(( + [(axum::http::header::CONTENT_TYPE, "text/cloud-config")], + body, + )) } fn render_yaml_list(items: &[String], indent: usize) -> String { diff --git a/deployer/crates/deployer-server/src/config.rs b/deployer/crates/deployer-server/src/config.rs index 72cc18c..a79ddc4 100644 --- a/deployer/crates/deployer-server/src/config.rs +++ b/deployer/crates/deployer-server/src/config.rs @@ -30,6 +30,10 @@ pub struct Config { #[serde(default = "default_local_state_path")] pub local_state_path: Option, + /// Optional tar.gz bundle containing the PhotonCloud flake source tree for bootstrap installs + #[serde(default)] + pub bootstrap_flake_bundle_path: Option, + /// Shared bootstrap token required 
for phone-home/admin APIs #[serde(default)] pub bootstrap_token: Option, @@ -80,6 +84,7 @@ impl Default for Config { cluster_namespace: default_cluster_namespace(), heartbeat_timeout_secs: default_heartbeat_timeout(), local_state_path: default_local_state_path(), + bootstrap_flake_bundle_path: None, bootstrap_token: None, admin_token: None, allow_admin_fallback: default_allow_admin_fallback(), @@ -224,6 +229,7 @@ mod tests { config.local_state_path, Some(PathBuf::from("/var/lib/deployer/state")) ); + assert!(config.bootstrap_flake_bundle_path.is_none()); assert!(config.bootstrap_token.is_none()); assert!(config.admin_token.is_none()); assert!(!config.allow_admin_fallback); @@ -253,6 +259,7 @@ mod tests { bind_addr = "127.0.0.1:18080" cluster_id = "cluster-a" allow_unauthenticated = true + bootstrap_flake_bundle_path = "/tmp/plasmacloud-flake-bundle.tar.gz" [chainfire] endpoints = ["http://10.0.0.1:2379"] @@ -264,6 +271,10 @@ mod tests { let config = load_config(&path).unwrap(); assert_eq!(config.bind_addr.to_string(), "127.0.0.1:18080"); assert_eq!(config.cluster_id.as_deref(), Some("cluster-a")); + assert_eq!( + config.bootstrap_flake_bundle_path, + Some(PathBuf::from("/tmp/plasmacloud-flake-bundle.tar.gz")) + ); assert!(config.allow_unauthenticated); assert_eq!(config.chainfire.namespace, "bootstrap"); assert_eq!(config.chainfire.endpoints, vec!["http://10.0.0.1:2379"]); diff --git a/deployer/crates/deployer-server/src/lib.rs b/deployer/crates/deployer-server/src/lib.rs index 6b5cfa2..a93a58b 100644 --- a/deployer/crates/deployer-server/src/lib.rs +++ b/deployer/crates/deployer-server/src/lib.rs @@ -1,7 +1,8 @@ pub mod admin; pub mod auth; -pub mod cluster; +pub mod bootstrap_assets; pub mod cloud_init; +pub mod cluster; pub mod config; pub mod local_storage; pub mod phone_home; @@ -34,6 +35,10 @@ pub fn build_router(state: Arc) -> Router { "/api/v1/cloud-init/:machine_id/user-data", get(cloud_init::user_data), ) + .route( + "/api/v1/bootstrap/flake-bundle", + 
get(bootstrap_assets::flake_bundle), + ) // Admin API (node management) .route("/api/v1/admin/nodes", post(admin::pre_register)) .route("/api/v1/admin/nodes", get(admin::list_nodes)) diff --git a/deployer/crates/deployer-server/src/phone_home.rs b/deployer/crates/deployer-server/src/phone_home.rs index 9957726..0fd11e8 100644 --- a/deployer/crates/deployer-server/src/phone_home.rs +++ b/deployer/crates/deployer-server/src/phone_home.rs @@ -440,12 +440,15 @@ async fn resolve_enrollment_config( format!("failed to load node classes: {}", e), ) })?; - let pools = storage.list_pools(cluster_namespace, cluster_id).await.map_err(|e| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - format!("failed to load pools: {}", e), - ) - })?; + let pools = storage + .list_pools(cluster_namespace, cluster_id) + .await + .map_err(|e| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("failed to load pools: {}", e), + ) + })?; drop(storage); rules.sort_by(|lhs, rhs| { @@ -483,7 +486,11 @@ fn enrollment_rule_matches(rule: &EnrollmentRuleSpec, request: &PhoneHomeRequest let Some(ip) = request.ip.as_deref() else { return false; }; - if !rule.match_ip_prefixes.iter().any(|prefix| ip.starts_with(prefix)) { + if !rule + .match_ip_prefixes + .iter() + .any(|prefix| ip.starts_with(prefix)) + { return false; } } @@ -528,16 +535,16 @@ fn build_node_config_from_rule( .node_class .clone() .or_else(|| pool_spec.and_then(|pool| pool.node_class.clone())); - let node_class_spec = node_class - .as_deref() - .and_then(|name| node_classes.iter().find(|node_class| node_class.name == name)); + let node_class_spec = node_class.as_deref().and_then(|name| { + node_classes + .iter() + .find(|node_class| node_class.name == name) + }); let role = rule .role .clone() - .or_else(|| { - node_class_spec.and_then(|node_class| node_class.roles.first().cloned()) - }) + .or_else(|| node_class_spec.and_then(|node_class| node_class.roles.first().cloned())) .unwrap_or_else(|| "worker".to_string()); let mut labels = 
std::collections::HashMap::new(); @@ -1011,7 +1018,9 @@ mod tests { assert_eq!(config.pool.as_deref(), Some("gpu")); assert_eq!(config.node_class.as_deref(), Some("gpu-worker")); assert_eq!(config.nix_profile.as_deref(), Some("profiles/gpu-worker")); - let install_plan = config.install_plan.expect("install_plan should inherit from class"); + let install_plan = config + .install_plan + .expect("install_plan should inherit from class"); assert_eq!( install_plan.nixos_configuration.as_deref(), Some("gpu-worker") diff --git a/deployer/crates/deployer-server/src/storage.rs b/deployer/crates/deployer-server/src/storage.rs index dff9562..2253880 100644 --- a/deployer/crates/deployer-server/src/storage.rs +++ b/deployer/crates/deployer-server/src/storage.rs @@ -80,7 +80,10 @@ impl NodeStorage { } fn cluster_node_classes_prefix(&self, cluster_namespace: &str, cluster_id: &str) -> String { - format!("{}/clusters/{}/node-classes/", cluster_namespace, cluster_id) + format!( + "{}/clusters/{}/node-classes/", + cluster_namespace, cluster_id + ) } fn cluster_pools_prefix(&self, cluster_namespace: &str, cluster_id: &str) -> String { @@ -276,8 +279,10 @@ impl NodeStorage { cluster_namespace: &str, cluster_id: &str, ) -> Result, StorageError> { - self.list_cluster_objects(self.cluster_enrollment_rules_prefix(cluster_namespace, cluster_id)) - .await + self.list_cluster_objects( + self.cluster_enrollment_rules_prefix(cluster_namespace, cluster_id), + ) + .await } /// Get node info by node_id diff --git a/deployer/crates/deployer-types/src/lib.rs b/deployer/crates/deployer-types/src/lib.rs index 967c3f9..f05965e 100644 --- a/deployer/crates/deployer-types/src/lib.rs +++ b/deployer/crates/deployer-types/src/lib.rs @@ -446,6 +446,7 @@ pub struct ObservedSystemState { /// Desired NixOS system state for a specific node. 
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)] pub struct DesiredSystemSpec { + #[serde(default)] pub node_id: String, #[serde(default, skip_serializing_if = "Option::is_none")] pub nixos_configuration: Option, @@ -490,6 +491,8 @@ pub struct NodeSpec { #[serde(default)] pub install_plan: Option, #[serde(default)] + pub desired_system: Option, + #[serde(default)] pub state: Option, #[serde(default)] pub last_heartbeat: Option>, @@ -954,10 +957,7 @@ mod tests { nixos_configuration: Some("node01".to_string()), flake_ref: Some("/opt/plasmacloud-src".to_string()), switch_action: Some("switch".to_string()), - health_check_command: vec![ - "systemctl".to_string(), - "is-system-running".to_string(), - ], + health_check_command: vec!["systemctl".to_string(), "is-system-running".to_string()], rollback_on_failure: Some(true), }; diff --git a/deployer/scripts/verify-deployer-bootstrap-e2e.sh b/deployer/scripts/verify-deployer-bootstrap-e2e.sh index f8ed149..78dfcb3 100755 --- a/deployer/scripts/verify-deployer-bootstrap-e2e.sh +++ b/deployer/scripts/verify-deployer-bootstrap-e2e.sh @@ -101,6 +101,7 @@ raft_port="$(free_port)" gossip_port="$(free_port)" deployer_port="$(free_port)" bootstrap_token="bootstrap-secret" +printf 'bundle-bytes' >"$tmp_dir/flake-bundle.tar.gz" cat >"$tmp_dir/chainfire.toml" <"$tmp_dir/desired-system.dump" +python3 - "$tmp_dir/desired-system.dump" <<'PY' +import json +import sys + +path = sys.argv[1] +with open(path, "r", encoding="utf-8") as handle: + lines = [line.strip() for line in handle if " value=" in line] + +if len(lines) != 1: + raise SystemExit(f"unexpected desired-system dump: {lines}") + +payload = json.loads(lines[0].split(" value=", 1)[1]) +assert payload["node_id"] == "node-seeded" +assert payload["nixos_configuration"] == "node01" +assert payload["flake_ref"] == "github:centra/cloud" +assert payload["health_check_command"] == ["systemctl", "is-system-running", "--wait"] +assert payload["rollback_on_failure"] is 
True +print("desired-system state validated") +PY + echo "Deployer bootstrap E2E verification passed" diff --git a/flake.nix b/flake.nix index b19d01c..43b886b 100644 --- a/flake.nix +++ b/flake.nix @@ -102,6 +102,43 @@ || builtins.elem topLevel includedTopLevels; }; + flakeBundleSrc = pkgs.lib.cleanSourceWith { + src = ./.; + filter = path: type: + let + rel = pkgs.lib.removePrefix ((toString ./. ) + "/") (toString path); + topLevel = builtins.head (pkgs.lib.splitString "/" rel); + includedTopLevels = [ + "apigateway" + "baremetal" + "chainfire" + "coronafs" + "crates" + "creditservice" + "deployer" + "fiberlb" + "flashdns" + "flaredb" + "iam" + "k8shost" + "lightningstor" + "mtls-agent" + "nightlight" + "nix" + "nix-nos" + "plasmavmc" + "prismnet" + ]; + isTargetDir = builtins.match "(.*/)?target(/.*)?" rel != null; + in + !isTargetDir + && ( + rel == "" + || builtins.elem rel [ "flake.nix" "flake.lock" ] + || builtins.elem topLevel includedTopLevels + ); + }; + # Helper function to build a Rust workspace package # Parameters: # name: package name (e.g., "chainfire-server") @@ -397,6 +434,20 @@ description = "Node-local NixOS reconciliation agent for PhotonCloud hosts"; }; + plasmacloudFlakeBundle = pkgs.runCommand "plasmacloud-flake-bundle.tar.gz" { + nativeBuildInputs = [ pkgs.gnutar pkgs.gzip ]; + } '' + tar \ + --sort=name \ + --mtime='@1' \ + --owner=0 \ + --group=0 \ + --numeric-owner \ + -C ${flakeBundleSrc} \ + -cf - . 
\ + | gzip -n > "$out" + ''; + # -------------------------------------------------------------------- # Fleet Scheduler: Non-Kubernetes service scheduler for bare-metal nodes # -------------------------------------------------------------------- @@ -410,6 +461,8 @@ vmClusterDeployerState = self.nixosConfigurations.node01.config.system.build.plasmacloudDeployerClusterState; + vmClusterFlakeBundle = self.packages.${system}.plasmacloudFlakeBundle; + # -------------------------------------------------------------------- # Default package: Build all servers # -------------------------------------------------------------------- @@ -583,7 +636,7 @@ nix-nos.nixosModules.default ./nix/nodes/vm-cluster/node01/configuration.nix self.nixosModules.default - { + ({ pkgs, ... }: { services.deployer = { enable = true; bindAddr = "0.0.0.0:8088"; @@ -594,6 +647,7 @@ allowUnauthenticated = false; bootstrapToken = "vm-cluster-bootstrap-token"; adminToken = "vm-cluster-admin-token"; + bootstrapFlakeBundle = pkgs.plasmacloudFlakeBundle; seedClusterState = true; }; @@ -604,11 +658,9 @@ nodeId = "node01"; flakeRoot = self.outPath; intervalSecs = 30; - healthCheckCommand = [ "systemctl" "is-system-running" "--wait" ]; - rollbackOnFailure = true; apply = true; }; - } + }) { nixpkgs.overlays = [ self.overlays.default ]; } ]; }; @@ -628,8 +680,6 @@ nodeId = "node02"; flakeRoot = self.outPath; intervalSecs = 30; - healthCheckCommand = [ "systemctl" "is-system-running" "--wait" ]; - rollbackOnFailure = true; apply = true; }; } @@ -652,8 +702,6 @@ nodeId = "node03"; flakeRoot = self.outPath; intervalSecs = 30; - healthCheckCommand = [ "systemctl" "is-system-running" "--wait" ]; - rollbackOnFailure = true; apply = true; }; } @@ -684,6 +732,7 @@ k8shost-server = self.packages.${final.system}.k8shost-server; deployer-server = self.packages.${final.system}.deployer-server; deployer-ctl = self.packages.${final.system}.deployer-ctl; + plasmacloudFlakeBundle = 
self.packages.${final.system}.plasmacloudFlakeBundle; nix-agent = self.packages.${final.system}.nix-agent; node-agent = self.packages.${final.system}.node-agent; fleet-scheduler = self.packages.${final.system}.fleet-scheduler; diff --git a/nix/iso/plasmacloud-iso.nix b/nix/iso/plasmacloud-iso.nix index 84bbe1e..81df962 100644 --- a/nix/iso/plasmacloud-iso.nix +++ b/nix/iso/plasmacloud-iso.nix @@ -186,6 +186,8 @@ NODE_IP=$(${pkgs.jq}/bin/jq -r '.ip // empty' /etc/plasmacloud/node-config.json) NIXOS_CONFIGURATION=$(${pkgs.jq}/bin/jq -r '.install_plan.nixos_configuration // .hostname // empty' /etc/plasmacloud/node-config.json) DISKO_PATH=$(${pkgs.jq}/bin/jq -r '.install_plan.disko_config_path // empty' /etc/plasmacloud/node-config.json) + DEPLOYER_URL="''${DEPLOYER_URL:-http://192.168.100.1:8080}" + SRC_ROOT="/opt/plasmacloud-src" if [ -z "$NODE_ID" ] || [ -z "$NODE_IP" ]; then echo "ERROR: node-config.json missing hostname/ip" @@ -197,9 +199,38 @@ exit 1 fi + TOKEN_FILE="/etc/plasmacloud/bootstrap-token" + DEPLOYER_TOKEN="" + if [ -s "$TOKEN_FILE" ]; then + DEPLOYER_TOKEN=$(cat "$TOKEN_FILE") + elif [ -n "''${DEPLOYER_BOOTSTRAP_TOKEN:-}" ]; then + DEPLOYER_TOKEN="''${DEPLOYER_BOOTSTRAP_TOKEN}" + fi + + CURL_ARGS=(-sfL --connect-timeout 5 --max-time 120) + if [ -n "$DEPLOYER_TOKEN" ]; then + CURL_ARGS+=(-H "X-Deployer-Token: $DEPLOYER_TOKEN") + fi + if [ -n "''${DEPLOYER_CA_CERT:-}" ] && [ -f "''${DEPLOYER_CA_CERT}" ]; then + CURL_ARGS+=(--cacert "''${DEPLOYER_CA_CERT}") + fi + + BUNDLE_PATH="/run/plasmacloud/flake-bundle.tar.gz" + mkdir -p /run/plasmacloud + if ${pkgs.curl}/bin/curl "''${CURL_ARGS[@]}" \ + "$DEPLOYER_URL/api/v1/bootstrap/flake-bundle" \ + -o "$BUNDLE_PATH"; then + echo "Downloaded bootstrap flake bundle from deployer" + rm -rf "$SRC_ROOT" + mkdir -p "$SRC_ROOT" + ${pkgs.gzip}/bin/gzip -dc "$BUNDLE_PATH" | ${pkgs.gnutar}/bin/tar -xf - -C "$SRC_ROOT" + else + echo "No deployer flake bundle available; using embedded source tree" + fi + if [ -z 
"$DISKO_PATH" ]; then CANDIDATE_DISKO="nix/nodes/vm-cluster/$NODE_ID/disko.nix" - if [ -f "/opt/plasmacloud-src/$CANDIDATE_DISKO" ]; then + if [ -f "$SRC_ROOT/$CANDIDATE_DISKO" ]; then DISKO_PATH="$CANDIDATE_DISKO" fi fi @@ -209,8 +240,8 @@ exit 1 fi - if [ ! -f "/opt/plasmacloud-src/$DISKO_PATH" ]; then - echo "ERROR: Disko config not found: /opt/plasmacloud-src/$DISKO_PATH" + if [ ! -f "$SRC_ROOT/$DISKO_PATH" ]; then + echo "ERROR: Disko config not found: $SRC_ROOT/$DISKO_PATH" exit 1 fi @@ -238,14 +269,14 @@ fi echo "Validating NixOS configuration output..." - nix eval --raw "/opt/plasmacloud-src#nixosConfigurations.$NIXOS_CONFIGURATION.config.system.build.toplevel.drvPath" >/dev/null + nix eval --raw "$SRC_ROOT#nixosConfigurations.$NIXOS_CONFIGURATION.config.system.build.toplevel.drvPath" >/dev/null echo "Running disko to partition $DISK..." export NIX_CONFIG="experimental-features = nix-command flakes" - nix run github:nix-community/disko -- --mode disko "/opt/plasmacloud-src/$DISKO_PATH" + nix run github:nix-community/disko -- --mode disko "$SRC_ROOT/$DISKO_PATH" echo "Running nixos-install..." - nixos-install --flake "/opt/plasmacloud-src#$NIXOS_CONFIGURATION" --no-root-passwd + nixos-install --flake "$SRC_ROOT#$NIXOS_CONFIGURATION" --no-root-passwd sync echo "✓ Install complete; rebooting..." 
@@ -255,7 +286,7 @@ # Packages for bootstrap + install environment.systemPackages = with pkgs; [ - curl jq vim htop gawk gnugrep util-linux parted dosfstools e2fsprogs + curl jq vim htop gawk gnugrep util-linux parted dosfstools e2fsprogs gnutar gzip ]; # SSH with key-based auth for non-interactive access diff --git a/nix/modules/cluster-config-lib.nix b/nix/modules/cluster-config-lib.nix index 6022f09..38d75b0 100644 --- a/nix/modules/cluster-config-lib.nix +++ b/nix/modules/cluster-config-lib.nix @@ -19,9 +19,44 @@ let }; }; + mkDesiredSystemType = types: types.submodule { + options = { + nixosConfiguration = mkOption { + type = types.nullOr types.str; + default = null; + description = "Name of the nixosConfigurations output to activate"; + }; + + flakeRef = mkOption { + type = types.nullOr types.str; + default = null; + description = "Explicit flake reference used by nix-agent"; + }; + + switchAction = mkOption { + type = types.nullOr types.str; + default = null; + description = "switch-to-configuration action for nix-agent"; + }; + + healthCheckCommand = mkOption { + type = types.listOf types.str; + default = [ ]; + description = "Command vector executed after activation to validate node health"; + }; + + rollbackOnFailure = mkOption { + type = types.nullOr types.bool; + default = null; + description = "Whether nix-agent should roll back when the health check fails"; + }; + }; + }; + mkNodeType = types: let installPlanType = mkInstallPlanType types; + desiredSystemType = mkDesiredSystemType types; in types.submodule { options = { role = mkOption { @@ -101,6 +136,12 @@ let description = "Explicit NixOS installation targets for bare-metal bootstrap"; }; + desiredSystem = mkOption { + type = types.nullOr desiredSystemType; + default = null; + description = "Desired NixOS reconciliation state exported for nix-agent"; + }; + state = mkOption { type = types.nullOr (types.enum [ "pending" "provisioning" "active" "failed" "draining" ]); default = null; @@ -277,6 
+318,29 @@ let in if plan == null || rendered == { } then null else rendered; + mkDesiredSystem = nodeName: desiredSystem: + let + rendered = + optionalAttrs (desiredSystem != null && desiredSystem.nixosConfiguration != null) { + nixos_configuration = desiredSystem.nixosConfiguration; + } + // optionalAttrs (desiredSystem != null && desiredSystem.flakeRef != null) { + flake_ref = desiredSystem.flakeRef; + } + // optionalAttrs (desiredSystem != null && desiredSystem.switchAction != null) { + switch_action = desiredSystem.switchAction; + } + // optionalAttrs (desiredSystem != null && desiredSystem.healthCheckCommand != [ ]) { + health_check_command = desiredSystem.healthCheckCommand; + } + // optionalAttrs (desiredSystem != null && desiredSystem.rollbackOnFailure != null) { + rollback_on_failure = desiredSystem.rollbackOnFailure; + }; + in + if desiredSystem == null || rendered == { } then null else { + node_id = nodeName; + } // rendered; + mkDeployerNodeSpec = nodeName: node: { node_id = nodeName; @@ -303,6 +367,9 @@ let // optionalAttrs (mkInstallPlan node.installPlan != null) { install_plan = mkInstallPlan node.installPlan; } + // optionalAttrs (mkDesiredSystem nodeName node.desiredSystem != null) { + desired_system = mkDesiredSystem nodeName node.desiredSystem; + } // optionalAttrs (node.state != null) { state = node.state; }; @@ -455,6 +522,7 @@ in { inherit mkInstallPlanType + mkDesiredSystemType mkNodeType mkNodeClassType mkNodePoolType diff --git a/nix/modules/deployer.nix b/nix/modules/deployer.nix index b66f21b..07af739 100644 --- a/nix/modules/deployer.nix +++ b/nix/modules/deployer.nix @@ -22,6 +22,9 @@ let // lib.optionalAttrs (cfg.clusterId != null) { cluster_id = cfg.clusterId; } + // lib.optionalAttrs (cfg.bootstrapFlakeBundle != null) { + bootstrap_flake_bundle_path = toString cfg.bootstrapFlakeBundle; + } // lib.optionalAttrs (cfg.tlsCaCertPath != null) { tls_ca_cert_path = cfg.tlsCaCertPath; } @@ -77,6 +80,12 @@ in description = "Local storage 
path for deployer bootstrap state"; }; + bootstrapFlakeBundle = lib.mkOption { + type = lib.types.nullOr lib.types.path; + default = null; + description = "Optional tar.gz bundle served to bootstrap installers as the canonical PlasmaCloud flake source"; + }; + requireChainfire = lib.mkOption { type = lib.types.bool; default = false; diff --git a/nix/nodes/vm-cluster/cluster.nix b/nix/nodes/vm-cluster/cluster.nix index 8eafc83..d784cff 100644 --- a/nix/nodes/vm-cluster/cluster.nix +++ b/nix/nodes/vm-cluster/cluster.nix @@ -22,6 +22,10 @@ nixosConfiguration = "node01"; diskoConfigPath = "nix/nodes/vm-cluster/node01/disko.nix"; }; + desiredSystem = { + healthCheckCommand = [ "systemctl" "is-system-running" "--wait" ]; + rollbackOnFailure = true; + }; raftPort = 2380; apiPort = 2379; }; @@ -42,6 +46,10 @@ nixosConfiguration = "node02"; diskoConfigPath = "nix/nodes/vm-cluster/node02/disko.nix"; }; + desiredSystem = { + healthCheckCommand = [ "systemctl" "is-system-running" "--wait" ]; + rollbackOnFailure = true; + }; raftPort = 2380; apiPort = 2379; }; @@ -62,6 +70,10 @@ nixosConfiguration = "node03"; diskoConfigPath = "nix/nodes/vm-cluster/node03/disko.nix"; }; + desiredSystem = { + healthCheckCommand = [ "systemctl" "is-system-running" "--wait" ]; + rollbackOnFailure = true; + }; raftPort = 2380; apiPort = 2379; }; diff --git a/nix/test-cluster/node06.nix b/nix/test-cluster/node06.nix index 021c1a8..776397c 100644 --- a/nix/test-cluster/node06.nix +++ b/nix/test-cluster/node06.nix @@ -81,6 +81,7 @@ requireChainfire = true; bootstrapToken = "test-bootstrap-token"; adminToken = "test-admin-token"; + bootstrapFlakeBundle = pkgs.plasmacloudFlakeBundle; seedClusterState = true; };