From edd24422678e4f1416f178584b98ecd0f0543929 Mon Sep 17 00:00:00 2001 From: centra Date: Fri, 20 Mar 2026 17:09:59 +0900 Subject: [PATCH] Add desired-system state and health-gated nix-agent rollback --- deployer/crates/deployer-ctl/src/chainfire.rs | 86 +++++- deployer/crates/deployer-types/src/lib.rs | 38 +++ deployer/crates/nix-agent/src/main.rs | 256 ++++++++++++++++-- flake.nix | 6 + nix/modules/nix-agent.nix | 20 +- 5 files changed, 373 insertions(+), 33 deletions(-) diff --git a/deployer/crates/deployer-ctl/src/chainfire.rs b/deployer/crates/deployer-ctl/src/chainfire.rs index ce99a9b..d494ec6 100644 --- a/deployer/crates/deployer-ctl/src/chainfire.rs +++ b/deployer/crates/deployer-ctl/src/chainfire.rs @@ -4,7 +4,7 @@ use std::path::Path; use anyhow::{Context, Result}; use chainfire_client::{Client, ClientError}; -use deployer_types::{ClusterStateSpec, InstallPlan, NodeConfig, NodeSpec}; +use deployer_types::{ClusterStateSpec, DesiredSystemSpec, InstallPlan, NodeConfig, NodeSpec}; use serde::de::DeserializeOwned; use serde_json::{json, Value}; use tokio::fs; @@ -40,6 +40,15 @@ fn key_node(cluster_namespace: &str, cluster_id: &str, node_id: &str) -> Vec .into_bytes() } +fn key_desired_system(cluster_namespace: &str, cluster_id: &str, node_id: &str) -> Vec { + format!( + "{}nodes/{}/desired-system", + cluster_prefix(cluster_namespace, cluster_id), + node_id + ) + .into_bytes() +} + fn key_node_class(cluster_namespace: &str, cluster_id: &str, node_class: &str) -> Vec { format!( "{}node-classes/{}", @@ -154,6 +163,21 @@ fn node_config_from_spec(node: &NodeSpec) -> NodeConfig { } } +fn desired_system_from_spec(node: &NodeSpec) -> Option { + Some(DesiredSystemSpec { + node_id: node.node_id.clone(), + nixos_configuration: node + .install_plan + .as_ref() + .and_then(|plan| plan.nixos_configuration.clone()), + flake_ref: None, + switch_action: Some("switch".to_string()), + health_check_command: Vec::new(), + rollback_on_failure: Some(true), + }) + .filter(|desired| desired.nixos_configuration.is_some()) +} + fn resolve_nodes(spec: &ClusterStateSpec) -> Result> { let node_classes = spec .node_classes @@ -441,6 +465,16 @@ pub async fn bootstrap_cluster( client.put(&key, &value).await?; info!(node_id = %node.node_id, "upserted node"); + if let Some(desired_system) = desired_system_from_spec(node) { + client + .put( + &key_desired_system(cluster_namespace, cluster_id, &node.node_id), + &serde_json::to_vec(&desired_system)?, + ) + .await?; + info!(node_id = %node.node_id, "upserted desired system"); + } + if let Some(machine_id) = node.machine_id.as_deref() { let config = node_config_from_spec(node); client @@ -542,6 +576,15 @@ pub async fn apply_cluster_state( let value = serde_json::to_vec(&merged)?; client.put(&key, &value).await?; + if let Some(desired_system) = desired_system_from_spec(node) { + client + .put( + &key_desired_system(cluster_namespace, cluster_id, &node.node_id), + &serde_json::to_vec(&desired_system)?, + ) + .await?; + } + if let Some(machine_id) = node.machine_id.as_deref() { let config = node_config_from_spec(node); client @@ -661,6 +704,12 @@ async fn prune_cluster_state( for node in &resolved_nodes { desired_keys.insert(String::from_utf8_lossy(&key_node(cluster_namespace, cluster_id, &node.node_id)).to_string()); + if desired_system_from_spec(node).is_some() { + desired_keys.insert( + String::from_utf8_lossy(&key_desired_system(cluster_namespace, cluster_id, &node.node_id)) + .to_string(), + ); + } } for node_class in &spec.node_classes { desired_keys.insert( @@ -849,14 +898,45 @@ mod tests { Some("rack-a") ); } + + #[test] + fn test_desired_system_is_derived_from_install_plan() { + let spec = test_spec(); + let resolved = resolve_nodes(&spec).unwrap(); + let desired = desired_system_from_spec(&resolved[0]).expect("desired system should exist"); + + assert_eq!(desired.node_id, "node01"); + assert_eq!(desired.nixos_configuration.as_deref(), Some("worker-golden")); + assert_eq!(desired.switch_action.as_deref(), Some("switch")); + assert_eq!(desired.rollback_on_failure, Some(true)); + } + + #[test] + fn test_is_prunable_key_keeps_observed_system() { + let prefix = cluster_prefix("photoncloud", "test-cluster"); + assert!(is_prunable_key(&format!("{}nodes/node01", prefix), &prefix)); + assert!(is_prunable_key( + &format!("{}nodes/node01/desired-system", prefix), + &prefix + )); + assert!(!is_prunable_key( + &format!("{}nodes/node01/observed-system", prefix), + &prefix + )); + } } fn is_prunable_key(key: &str, prefix: &str) -> bool { if key == format!("{}meta", prefix) { return true; } - key.starts_with(&format!("{}nodes/", prefix)) - || key.starts_with(&format!("{}node-classes/", prefix)) + + if let Some(node_suffix) = key.strip_prefix(&format!("{}nodes/", prefix)) { + return !node_suffix.is_empty() + && (!node_suffix.contains('/') || node_suffix.ends_with("/desired-system")); + } + + key.starts_with(&format!("{}node-classes/", prefix)) || key.starts_with(&format!("{}pools/", prefix)) || key.starts_with(&format!("{}enrollment-rules/", prefix)) || key.starts_with(&format!("{}services/", prefix)) diff --git a/deployer/crates/deployer-types/src/lib.rs b/deployer/crates/deployer-types/src/lib.rs index e1412a4..967c3f9 100644 --- a/deployer/crates/deployer-types/src/lib.rs +++ b/deployer/crates/deployer-types/src/lib.rs @@ -443,6 +443,22 @@ pub struct ObservedSystemState { pub last_error: Option, } +/// Desired NixOS system state for a specific node. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)] +pub struct DesiredSystemSpec { + pub node_id: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub nixos_configuration: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub flake_ref: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub switch_action: Option, + #[serde(default)] + pub health_check_command: Vec, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub rollback_on_failure: Option, +} + /// Cluster metadata (PhotonCloud scope). #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct ClusterSpec { @@ -930,4 +946,26 @@ mod tests { assert_eq!(decoded.nixos_configuration.as_deref(), Some("node01")); assert_eq!(decoded.status.as_deref(), Some("pending")); } + + #[test] + fn test_desired_system_spec_roundtrip() { + let desired = DesiredSystemSpec { + node_id: "node01".to_string(), + nixos_configuration: Some("node01".to_string()), + flake_ref: Some("/opt/plasmacloud-src".to_string()), + switch_action: Some("switch".to_string()), + health_check_command: vec![ + "systemctl".to_string(), + "is-system-running".to_string(), + ], + rollback_on_failure: Some(true), + }; + + let json = serde_json::to_string(&desired).unwrap(); + let decoded: DesiredSystemSpec = serde_json::from_str(&json).unwrap(); + assert_eq!(decoded.node_id, "node01"); + assert_eq!(decoded.nixos_configuration.as_deref(), Some("node01")); + assert_eq!(decoded.health_check_command.len(), 2); + assert_eq!(decoded.rollback_on_failure, Some(true)); + } } diff --git a/deployer/crates/nix-agent/src/main.rs b/deployer/crates/nix-agent/src/main.rs index dbd373d..729ffbb 100644 --- a/deployer/crates/nix-agent/src/main.rs +++ b/deployer/crates/nix-agent/src/main.rs @@ -7,7 +7,7 @@ use anyhow::{anyhow, Context, Result}; use chainfire_client::Client; use chrono::Utc; use clap::Parser; -use deployer_types::{ClusterNodeRecord, ObservedSystemState}; +use deployer_types::{ClusterNodeRecord, DesiredSystemSpec, ObservedSystemState}; use tokio::process::Command; use tokio::time::sleep; use tracing::{info, warn}; @@ -26,6 +26,15 @@ fn key_node(cluster_namespace: &str, cluster_id: &str, node_id: &str) -> Vec .into_bytes() } +fn key_desired_system(cluster_namespace: &str, cluster_id: &str, node_id: &str) -> Vec { + format!( + "{}nodes/{}/desired-system", + cluster_prefix(cluster_namespace, cluster_id), + node_id + ) + .into_bytes() +} + fn key_observed_system(cluster_namespace: &str, cluster_id: &str, node_id: &str) -> Vec { format!( "{}nodes/{}/observed-system", @@ -59,6 +68,12 @@ struct Cli { #[arg(long, default_value = "switch")] switch_action: String, + #[arg(long, allow_hyphen_values = true)] + health_check_command: Vec, + + #[arg(long, default_value_t = false)] + rollback_on_failure: bool, + #[arg(long, default_value_t = false)] apply: bool, @@ -74,9 +89,20 @@ struct Agent { flake_root: String, interval: Duration, switch_action: String, + health_check_command: Vec, + rollback_on_failure: bool, apply: bool, } +#[derive(Debug, Clone, PartialEq, Eq)] +struct ResolvedDesiredSystem { + nixos_configuration: String, + flake_ref: String, + switch_action: String, + health_check_command: Vec, + rollback_on_failure: bool, +} + impl Agent { fn new(cli: Cli) -> Self { Self { @@ -87,6 +113,8 @@ impl Agent { flake_root: cli.flake_root, interval: Duration::from_secs(cli.interval_secs), switch_action: cli.switch_action, + health_check_command: cli.health_check_command, + rollback_on_failure: cli.rollback_on_failure, apply: cli.apply, } } @@ -116,8 +144,21 @@ impl Agent { let node: ClusterNodeRecord = serde_json::from_slice(&node_bytes).context("failed to parse node record")?; + let desired = client + .get(key_desired_system( + &self.cluster_namespace, + &self.cluster_id, + &self.node_id, + )) + .await? + .map(|bytes| serde_json::from_slice::(&bytes)) + .transpose() + .context("failed to parse desired-system spec")?; + let mut observed = self.base_observed_state(&node); - let reconcile_result = self.reconcile_node(&node, &mut observed).await; + let reconcile_result = self + .reconcile_node(&node, desired.as_ref(), &mut observed) + .await; if let Err(error) = reconcile_result { observed.status = Some("failed".to_string()); observed.last_error = Some(error.to_string()); @@ -136,8 +177,6 @@ impl Agent { fn base_observed_state(&self, node: &ClusterNodeRecord) -> ObservedSystemState { ObservedSystemState { node_id: node.node_id.clone(), - nixos_configuration: desired_configuration(node), - flake_root: Some(self.flake_root.clone()), current_system: read_symlink_target("/run/current-system"), booted_system: read_symlink_target("/run/booted-system"), ..ObservedSystemState::default() @@ -147,6 +186,7 @@ impl Agent { async fn reconcile_node( &self, node: &ClusterNodeRecord, + desired: Option<&DesiredSystemSpec>, observed: &mut ObservedSystemState, ) -> Result<()> { match node.state.as_deref() { @@ -157,15 +197,31 @@ impl Agent { _ => {} } - let Some(configuration) = desired_configuration(node) else { + let Some(desired) = resolve_desired_system( + node, + desired, + &self.flake_root, + &self.switch_action, + &self.health_check_command, + self.rollback_on_failure, + ) else { observed.status = Some("idle".to_string()); return Ok(()); }; + observed.nixos_configuration = Some(desired.nixos_configuration.clone()); + observed.flake_root = Some(desired.flake_ref.clone()); + + let previous_system = observed.current_system.clone(); let target_system = self - .build_target_system(&configuration) + .build_target_system(&desired.flake_ref, &desired.nixos_configuration) .await - .with_context(|| format!("failed to build target system for {}", configuration))?; + .with_context(|| { + format!( + "failed to build target system for {}", + desired.nixos_configuration + ) + })?; observed.target_system = Some(target_system.clone()); if observed.current_system.as_deref() == Some(target_system.as_str()) { @@ -181,26 +237,31 @@ impl Agent { observed.status = Some("reconciling".to_string()); observed.last_attempt = Some(Utc::now()); - self.switch_to_target(&target_system).await?; + self.switch_to_target(&target_system, &desired.switch_action) + .await?; observed.current_system = read_symlink_target("/run/current-system"); observed.booted_system = read_symlink_target("/run/booted-system"); - if observed.current_system.as_deref() == Some(target_system.as_str()) { - observed.status = Some("active".to_string()); - observed.last_success = Some(Utc::now()); - observed.last_error = None; - return Ok(()); + if observed.current_system.as_deref() != Some(target_system.as_str()) { + return Err(anyhow!( + "switch completed but /run/current-system does not match target {}", + target_system + )); } - Err(anyhow!( - "switch completed but /run/current-system does not match target {}", - target_system - )) + self.run_health_check_and_maybe_rollback(&desired, previous_system.as_deref(), observed) + .await?; + + observed.status = Some("active".to_string()); + observed.last_success = Some(Utc::now()); + observed.last_error = None; + + Ok(()) } - async fn build_target_system(&self, configuration: &str) -> Result { - let flake_attr = target_flake_attr(&self.flake_root, configuration); + async fn build_target_system(&self, flake_ref: &str, configuration: &str) -> Result { + let flake_attr = target_flake_attr(flake_ref, configuration); let output = run_command( "nix", &["build", "--no-link", "--print-out-paths", flake_attr.as_str()], @@ -214,7 +275,7 @@ impl Agent { Ok(path.to_string()) } - async fn switch_to_target(&self, target_system: &str) -> Result<()> { + async fn switch_to_target(&self, target_system: &str, switch_action: &str) -> Result<()> { let switch_bin = Path::new(target_system).join("bin/switch-to-configuration"); if !switch_bin.exists() { return Err(anyhow!( @@ -227,17 +288,79 @@ impl Agent { switch_bin .to_str() .ok_or_else(|| anyhow!("invalid switch path"))?, - &[self.switch_action.as_str()], + &[switch_action], ) .await?; Ok(()) } + + async fn run_health_check_and_maybe_rollback( + &self, + desired: &ResolvedDesiredSystem, + previous_system: Option<&str>, + observed: &mut ObservedSystemState, + ) -> Result<()> { + if desired.health_check_command.is_empty() { + return Ok(()); + } + + if let Err(error) = run_vec_command(&desired.health_check_command).await { + let error_message = format!("health check failed after activation: {error}"); + if desired.rollback_on_failure { + self.rollback_to_previous(previous_system).await?; + observed.current_system = read_symlink_target("/run/current-system"); + observed.booted_system = read_symlink_target("/run/booted-system"); + observed.status = Some("rolled-back".to_string()); + observed.last_error = Some(error_message); + return Ok(()); + } + + return Err(anyhow!(error_message)); + } + + Ok(()) + } + + async fn rollback_to_previous(&self, previous_system: Option<&str>) -> Result<()> { + let previous_system = previous_system + .filter(|value| !value.is_empty()) + .ok_or_else(|| anyhow!("rollback requested but no previous system is known"))?; + self.switch_to_target(previous_system, "switch").await + } } -fn desired_configuration(node: &ClusterNodeRecord) -> Option { - node.install_plan - .as_ref() - .and_then(|plan| plan.nixos_configuration.clone()) +fn resolve_desired_system( + node: &ClusterNodeRecord, + desired: Option<&DesiredSystemSpec>, + local_flake_root: &str, + local_switch_action: &str, + local_health_check_command: &[String], + local_rollback_on_failure: bool, +) -> Option { + let nixos_configuration = desired + .and_then(|spec| spec.nixos_configuration.clone()) + .or_else(|| { + node.install_plan + .as_ref() + .and_then(|plan| plan.nixos_configuration.clone()) + })?; + + Some(ResolvedDesiredSystem { + nixos_configuration, + flake_ref: desired + .and_then(|spec| spec.flake_ref.clone()) + .unwrap_or_else(|| local_flake_root.to_string()), + switch_action: desired + .and_then(|spec| spec.switch_action.clone()) + .unwrap_or_else(|| local_switch_action.to_string()), + health_check_command: desired + .map(|spec| spec.health_check_command.clone()) + .filter(|command| !command.is_empty()) + .unwrap_or_else(|| local_health_check_command.to_vec()), + rollback_on_failure: desired + .and_then(|spec| spec.rollback_on_failure) + .unwrap_or(local_rollback_on_failure), + }) } fn target_flake_attr(flake_root: &str, configuration: &str) -> String { @@ -279,6 +402,14 @@ async fn run_command(program: &str, args: &[&str]) -> Result { } } +async fn run_vec_command(command: &[String]) -> Result { + let (program, args) = command + .split_first() + .ok_or_else(|| anyhow!("command vector is empty"))?; + let arg_refs = args.iter().map(String::as_str).collect::>(); + run_command(program, &arg_refs).await +} + #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::fmt() @@ -309,7 +440,7 @@ async fn main() -> Result<()> { #[cfg(test)] mod tests { use super::*; - use deployer_types::InstallPlan; + use deployer_types::{DesiredSystemSpec, InstallPlan}; fn test_node() -> ClusterNodeRecord { ClusterNodeRecord { @@ -333,8 +464,77 @@ mod tests { } #[test] - fn desired_configuration_prefers_install_plan() { - assert_eq!(desired_configuration(&test_node()).as_deref(), Some("node01")); + fn resolve_desired_system_falls_back_to_install_plan() { + let resolved = resolve_desired_system( + &test_node(), + None, + "/opt/plasmacloud-src", + "switch", + &[], + true, + ) + .expect("desired system should resolve"); + assert_eq!(resolved.nixos_configuration, "node01"); + assert_eq!(resolved.flake_ref, "/opt/plasmacloud-src"); + assert_eq!(resolved.switch_action, "switch"); + assert!(resolved.rollback_on_failure); + } + + #[test] + fn resolve_desired_system_prefers_chainfire_spec() { + let desired = DesiredSystemSpec { + node_id: "node01".to_string(), + nixos_configuration: Some("node01-next".to_string()), + flake_ref: Some("github:centra/cloud".to_string()), + switch_action: Some("boot".to_string()), + health_check_command: vec!["true".to_string()], + rollback_on_failure: Some(true), + }; + + let resolved = resolve_desired_system( + &test_node(), + Some(&desired), + "/opt/plasmacloud-src", + "switch", + &[], + false, + ) + .expect("desired system should resolve"); + assert_eq!(resolved.nixos_configuration, "node01-next"); + assert_eq!(resolved.flake_ref, "github:centra/cloud"); + assert_eq!(resolved.switch_action, "boot"); + assert_eq!(resolved.health_check_command, vec!["true".to_string()]); + assert!(resolved.rollback_on_failure); + } + + #[test] + fn resolve_desired_system_uses_local_health_check_defaults_when_spec_omits_them() { + let desired = DesiredSystemSpec { + node_id: "node01".to_string(), + nixos_configuration: Some("node01-next".to_string()), + flake_ref: None, + switch_action: None, + health_check_command: Vec::new(), + rollback_on_failure: None, + }; + + let resolved = resolve_desired_system( + &test_node(), + Some(&desired), + "/opt/plasmacloud-src", + "switch", + &["systemctl".to_string(), "is-system-running".to_string()], + true, + ) + .expect("desired system should resolve"); + + assert_eq!(resolved.flake_ref, "/opt/plasmacloud-src"); + assert_eq!(resolved.switch_action, "switch"); + assert_eq!( + resolved.health_check_command, + vec!["systemctl".to_string(), "is-system-running".to_string()] + ); + assert!(resolved.rollback_on_failure); } #[test] diff --git a/flake.nix b/flake.nix index 6b5a1c6..b19d01c 100644 --- a/flake.nix +++ b/flake.nix @@ -604,6 +604,8 @@ nodeId = "node01"; flakeRoot = self.outPath; intervalSecs = 30; + healthCheckCommand = [ "systemctl" "is-system-running" "--wait" ]; + rollbackOnFailure = true; apply = true; }; } @@ -626,6 +628,8 @@ nodeId = "node02"; flakeRoot = self.outPath; intervalSecs = 30; + healthCheckCommand = [ "systemctl" "is-system-running" "--wait" ]; + rollbackOnFailure = true; apply = true; }; } @@ -648,6 +652,8 @@ nodeId = "node03"; flakeRoot = self.outPath; intervalSecs = 30; + healthCheckCommand = [ "systemctl" "is-system-running" "--wait" ]; + rollbackOnFailure = true; apply = true; }; } diff --git a/nix/modules/nix-agent.nix b/nix/modules/nix-agent.nix index a062cbd..5decd88 100644 --- a/nix/modules/nix-agent.nix +++ b/nix/modules/nix-agent.nix @@ -2,6 +2,11 @@ let cfg = config.services.nix-agent; + extraArgs = + map (arg: "--health-check-command ${lib.escapeShellArg arg}") cfg.healthCheckCommand + ++ lib.optionals cfg.rollbackOnFailure [ "--rollback-on-failure" ] + ++ lib.optionals cfg.apply [ "--apply" ]; + renderedExtraArgs = lib.concatStringsSep " \\\n " extraArgs; in { options.services.nix-agent = { @@ -48,6 +53,18 @@ in description = "switch-to-configuration action executed after building the target system"; }; + healthCheckCommand = lib.mkOption { + type = lib.types.listOf lib.types.str; + default = [ ]; + description = "Command vector executed after activation to verify node health"; + }; + + rollbackOnFailure = lib.mkOption { + type = lib.types.bool; + default = true; + description = "Roll back to the previous system if the post-activation health check fails"; + }; + apply = lib.mkOption { type = lib.types.bool; default = true; @@ -81,8 +98,7 @@ in --node-id ${lib.escapeShellArg cfg.nodeId} \ --flake-root ${lib.escapeShellArg cfg.flakeRoot} \ --interval-secs ${toString cfg.intervalSecs} \ - --switch-action ${lib.escapeShellArg cfg.switchAction} \ - ${lib.optionalString cfg.apply "--apply"} + --switch-action ${lib.escapeShellArg cfg.switchAction}${lib.optionalString (renderedExtraArgs != "") " \\\n ${renderedExtraArgs}"} ''; }; };