From de60f087d679807436fab3c4cd0a944d3e31b27f Mon Sep 17 00:00:00 2001 From: centra Date: Fri, 20 Mar 2026 16:56:59 +0900 Subject: [PATCH] Add nix-agent and close NixOS reconcile loop --- deployer/Cargo.lock | 16 + deployer/Cargo.toml | 1 + deployer/crates/deployer-types/src/lib.rs | 46 +++ deployer/crates/nix-agent/Cargo.toml | 21 ++ deployer/crates/nix-agent/src/main.rs | 352 ++++++++++++++++++++++ flake.nix | 59 ++++ nix/modules/default.nix | 1 + nix/modules/nix-agent.nix | 90 ++++++ nix/test-cluster/flake.nix | 1 + 9 files changed, 587 insertions(+) create mode 100644 deployer/crates/nix-agent/Cargo.toml create mode 100644 deployer/crates/nix-agent/src/main.rs create mode 100644 nix/modules/nix-agent.nix diff --git a/deployer/Cargo.lock b/deployer/Cargo.lock index cdce325..0ff219b 100644 --- a/deployer/Cargo.lock +++ b/deployer/Cargo.lock @@ -1668,6 +1668,22 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" +[[package]] +name = "nix-agent" +version = "0.1.0" +dependencies = [ + "anyhow", + "chainfire-client", + "chrono", + "clap", + "deployer-types", + "serde", + "serde_json", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "node-agent" version = "0.1.0" diff --git a/deployer/Cargo.toml b/deployer/Cargo.toml index ad154c4..a9dac2f 100644 --- a/deployer/Cargo.toml +++ b/deployer/Cargo.toml @@ -4,6 +4,7 @@ members = [ "crates/deployer-types", "crates/deployer-server", "crates/node-agent", + "crates/nix-agent", "crates/cert-authority", "crates/deployer-ctl", "crates/plasmacloud-reconciler", diff --git a/deployer/crates/deployer-types/src/lib.rs b/deployer/crates/deployer-types/src/lib.rs index 7745202..e1412a4 100644 --- a/deployer/crates/deployer-types/src/lib.rs +++ b/deployer/crates/deployer-types/src/lib.rs @@ -419,6 +419,30 @@ pub struct ClusterNodeRecord { pub last_heartbeat: Option<DateTime<Utc>>, } +/// Node-local NixOS 
reconciliation status reported by nix-agent. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)] +pub struct ObservedSystemState { + pub node_id: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub nixos_configuration: Option<String>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub flake_root: Option<String>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub target_system: Option<String>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub current_system: Option<String>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub booted_system: Option<String>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub status: Option<String>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub last_attempt: Option<DateTime<Utc>>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub last_success: Option<DateTime<Utc>>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub last_error: Option<String>, +} + +/// Cluster metadata (PhotonCloud scope). 
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct ClusterSpec { @@ -884,4 +908,26 @@ mod tests { ); assert_eq!(decoded.dns.unwrap().fqdn, "api.test.cluster.local"); } + + #[test] + fn test_observed_system_state_roundtrip() { + let observed = ObservedSystemState { + node_id: "node01".to_string(), + nixos_configuration: Some("node01".to_string()), + flake_root: Some("/opt/plasmacloud-src".to_string()), + target_system: Some("/nix/store/system-node01".to_string()), + current_system: Some("/nix/store/system-old".to_string()), + booted_system: Some("/nix/store/system-old".to_string()), + status: Some("pending".to_string()), + last_attempt: None, + last_success: None, + last_error: Some("waiting for apply".to_string()), + }; + + let json = serde_json::to_string(&observed).unwrap(); + let decoded: ObservedSystemState = serde_json::from_str(&json).unwrap(); + assert_eq!(decoded.node_id, "node01"); + assert_eq!(decoded.nixos_configuration.as_deref(), Some("node01")); + assert_eq!(decoded.status.as_deref(), Some("pending")); + } } diff --git a/deployer/crates/nix-agent/Cargo.toml b/deployer/crates/nix-agent/Cargo.toml new file mode 100644 index 0000000..b36212c --- /dev/null +++ b/deployer/crates/nix-agent/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "nix-agent" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +authors.workspace = true +license.workspace = true +repository.workspace = true + +[dependencies] +anyhow.workspace = true +tokio.workspace = true +tracing.workspace = true +tracing-subscriber.workspace = true +clap.workspace = true +serde.workspace = true +serde_json.workspace = true +chrono.workspace = true + +chainfire-client.workspace = true +deployer-types.workspace = true diff --git a/deployer/crates/nix-agent/src/main.rs b/deployer/crates/nix-agent/src/main.rs new file mode 100644 index 0000000..dbd373d --- /dev/null +++ b/deployer/crates/nix-agent/src/main.rs @@ -0,0 +1,352 @@ +use std::fs; +use 
std::path::Path; +use std::process::Stdio; +use std::time::Duration; + +use anyhow::{anyhow, Context, Result}; +use chainfire_client::Client; +use chrono::Utc; +use clap::Parser; +use deployer_types::{ClusterNodeRecord, ObservedSystemState}; +use tokio::process::Command; +use tokio::time::sleep; +use tracing::{info, warn}; +use tracing_subscriber::EnvFilter; + +fn cluster_prefix(cluster_namespace: &str, cluster_id: &str) -> String { + format!("{}/clusters/{}/", cluster_namespace, cluster_id) +} + +fn key_node(cluster_namespace: &str, cluster_id: &str, node_id: &str) -> Vec<u8> { + format!( + "{}nodes/{}", + cluster_prefix(cluster_namespace, cluster_id), + node_id + ) + .into_bytes() +} + +fn key_observed_system(cluster_namespace: &str, cluster_id: &str, node_id: &str) -> Vec<u8> { + format!( + "{}nodes/{}/observed-system", + cluster_prefix(cluster_namespace, cluster_id), + node_id + ) + .into_bytes() +} + +#[derive(Parser, Debug)] +#[command(author, version, about)] +struct Cli { + #[arg(long, default_value = "http://127.0.0.1:7000")] + chainfire_endpoint: String, + + #[arg(long, default_value = "photoncloud")] + cluster_namespace: String, + + #[arg(long)] + cluster_id: String, + + #[arg(long)] + node_id: String, + + #[arg(long, default_value = "/etc/nixos")] + flake_root: String, + + #[arg(long, default_value_t = 30)] + interval_secs: u64, + + #[arg(long, default_value = "switch")] + switch_action: String, + + #[arg(long, default_value_t = false)] + apply: bool, + + #[arg(long, default_value_t = false)] + once: bool, +} + +struct Agent { + endpoint: String, + cluster_namespace: String, + cluster_id: String, + node_id: String, + flake_root: String, + interval: Duration, + switch_action: String, + apply: bool, +} + +impl Agent { + fn new(cli: Cli) -> Self { + Self { + endpoint: cli.chainfire_endpoint, + cluster_namespace: cli.cluster_namespace, + cluster_id: cli.cluster_id, + node_id: cli.node_id, + flake_root: cli.flake_root, + interval: 
Duration::from_secs(cli.interval_secs), + switch_action: cli.switch_action, + apply: cli.apply, + } + } + + async fn run_loop(&self) -> Result<()> { + loop { + if let Err(error) = self.tick().await { + warn!(error = %error, "nix-agent tick failed"); + } + sleep(self.interval).await; + } + } + + async fn tick(&self) -> Result<()> { + let mut client = Client::connect(self.endpoint.clone()).await?; + let node_key = key_node(&self.cluster_namespace, &self.cluster_id, &self.node_id); + let node_raw = client.get_with_revision(&node_key).await?; + let Some((node_bytes, _revision)) = node_raw else { + warn!( + cluster_id = %self.cluster_id, + node_id = %self.node_id, + "node definition not found; skipping nix reconciliation" + ); + return Ok(()); + }; + + let node: ClusterNodeRecord = + serde_json::from_slice(&node_bytes).context("failed to parse node record")?; + + let mut observed = self.base_observed_state(&node); + let reconcile_result = self.reconcile_node(&node, &mut observed).await; + if let Err(error) = reconcile_result { + observed.status = Some("failed".to_string()); + observed.last_error = Some(error.to_string()); + } + + client + .put( + &key_observed_system(&self.cluster_namespace, &self.cluster_id, &self.node_id), + &serde_json::to_vec(&observed)?, + ) + .await?; + + Ok(()) + } + + fn base_observed_state(&self, node: &ClusterNodeRecord) -> ObservedSystemState { + ObservedSystemState { + node_id: node.node_id.clone(), + nixos_configuration: desired_configuration(node), + flake_root: Some(self.flake_root.clone()), + current_system: read_symlink_target("/run/current-system"), + booted_system: read_symlink_target("/run/booted-system"), + ..ObservedSystemState::default() + } + } + + async fn reconcile_node( + &self, + node: &ClusterNodeRecord, + observed: &mut ObservedSystemState, + ) -> Result<()> { + match node.state.as_deref() { + Some("failed") | Some("draining") => { + observed.status = Some("paused".to_string()); + return Ok(()); + } + _ => {} + } + + let 
Some(configuration) = desired_configuration(node) else { + observed.status = Some("idle".to_string()); + return Ok(()); + }; + + let target_system = self + .build_target_system(&configuration) + .await + .with_context(|| format!("failed to build target system for {}", configuration))?; + observed.target_system = Some(target_system.clone()); + + if observed.current_system.as_deref() == Some(target_system.as_str()) { + observed.status = Some("active".to_string()); + observed.last_success = Some(Utc::now()); + return Ok(()); + } + + if !self.apply { + observed.status = Some("pending".to_string()); + return Ok(()); + } + + observed.status = Some("reconciling".to_string()); + observed.last_attempt = Some(Utc::now()); + self.switch_to_target(&target_system).await?; + + observed.current_system = read_symlink_target("/run/current-system"); + observed.booted_system = read_symlink_target("/run/booted-system"); + + if observed.current_system.as_deref() == Some(target_system.as_str()) { + observed.status = Some("active".to_string()); + observed.last_success = Some(Utc::now()); + observed.last_error = None; + return Ok(()); + } + + Err(anyhow!( + "switch completed but /run/current-system does not match target {}", + target_system + )) + } + + async fn build_target_system(&self, configuration: &str) -> Result<String> { + let flake_attr = target_flake_attr(&self.flake_root, configuration); + let output = run_command( + "nix", + &["build", "--no-link", "--print-out-paths", flake_attr.as_str()], + ) + .await?; + let path = output + .lines() + .find(|line| !line.trim().is_empty()) + .map(str::trim) + .ok_or_else(|| anyhow!("nix build returned no output path"))?; + Ok(path.to_string()) + } + + async fn switch_to_target(&self, target_system: &str) -> Result<()> { + let switch_bin = Path::new(target_system).join("bin/switch-to-configuration"); + if !switch_bin.exists() { + return Err(anyhow!( + "target system {} does not contain switch-to-configuration", + target_system + )); + } + + 
run_command( + switch_bin + .to_str() + .ok_or_else(|| anyhow!("invalid switch path"))?, + &[self.switch_action.as_str()], + ) + .await?; + Ok(()) + } +} + +fn desired_configuration(node: &ClusterNodeRecord) -> Option<String> { + node.install_plan + .as_ref() + .and_then(|plan| plan.nixos_configuration.clone()) +} + +fn target_flake_attr(flake_root: &str, configuration: &str) -> String { + format!( + "{}#nixosConfigurations.{}.config.system.build.toplevel", + flake_root, configuration + ) +} + +fn read_symlink_target(path: &str) -> Option<String> { + fs::read_link(path) + .ok() + .map(|value| value.to_string_lossy().into_owned()) +} + +async fn run_command(program: &str, args: &[&str]) -> Result<String> { + let output = Command::new(program) + .args(args) + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .output() + .await + .with_context(|| format!("failed to execute {}", program))?; + + if output.status.success() { + Ok(String::from_utf8_lossy(&output.stdout).trim().to_string()) + } else { + let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string(); + let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string(); + Err(anyhow!( + "{} {:?} failed with status {}: stdout='{}' stderr='{}'", + program, + args, + output.status, + stdout, + stderr + )) + } +} + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env().add_directive("info".parse()?)) + .init(); + + let cli = Cli::parse(); + let once = cli.once; + let agent = Agent::new(cli); + + info!( + cluster_id = %agent.cluster_id, + node_id = %agent.node_id, + flake_root = %agent.flake_root, + apply = agent.apply, + "starting nix-agent" + ); + + if once { + agent.tick().await?; + } else { + agent.run_loop().await?; + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use deployer_types::InstallPlan; + + fn test_node() -> ClusterNodeRecord { + ClusterNodeRecord { + node_id: "node01".to_string(), + machine_id: 
None, + ip: "10.0.0.10".to_string(), + hostname: "node01".to_string(), + roles: vec!["control-plane".to_string()], + labels: Default::default(), + pool: Some("control".to_string()), + node_class: Some("control-plane".to_string()), + failure_domain: Some("rack-a".to_string()), + nix_profile: Some("profiles/control-plane".to_string()), + install_plan: Some(InstallPlan { + nixos_configuration: Some("node01".to_string()), + disko_config_path: Some("nix/nodes/vm-cluster/node01/disko.nix".to_string()), + }), + state: Some("active".to_string()), + last_heartbeat: None, + } + } + + #[test] + fn desired_configuration_prefers_install_plan() { + assert_eq!(desired_configuration(&test_node()).as_deref(), Some("node01")); + } + + #[test] + fn target_flake_attr_is_rendered_from_root_and_configuration() { + assert_eq!( + target_flake_attr("/opt/plasmacloud-src", "node01"), + "/opt/plasmacloud-src#nixosConfigurations.node01.config.system.build.toplevel" + ); + } + + #[test] + fn read_symlink_target_returns_none_for_missing_path() { + assert_eq!(read_symlink_target("/tmp/photoncloud-nix-agent-missing-link"), None); + } +} diff --git a/flake.nix b/flake.nix index 5038175..6b5a1c6 100644 --- a/flake.nix +++ b/flake.nix @@ -390,6 +390,13 @@ description = "Node-local runtime agent for PhotonCloud scheduled services"; }; + nix-agent = buildRustWorkspace { + name = "nix-agent"; + workspaceSubdir = "deployer"; + mainCrate = "nix-agent"; + description = "Node-local NixOS reconciliation agent for PhotonCloud hosts"; + }; + # -------------------------------------------------------------------- # Fleet Scheduler: Non-Kubernetes service scheduler for bare-metal nodes # -------------------------------------------------------------------- @@ -424,6 +431,7 @@ self.packages.${system}.k8shost-server self.packages.${system}.deployer-server self.packages.${system}.deployer-ctl + self.packages.${system}.nix-agent self.packages.${system}.node-agent self.packages.${system}.fleet-scheduler 
self.packages.${system}.vmClusterDeployerState @@ -495,6 +503,10 @@ drv = self.packages.${system}.deployer-ctl; }; + nix-agent = flake-utils.lib.mkApp { + drv = self.packages.${system}.nix-agent; + }; + node-agent = flake-utils.lib.mkApp { drv = self.packages.${system}.node-agent; }; @@ -571,6 +583,30 @@ nix-nos.nixosModules.default ./nix/nodes/vm-cluster/node01/configuration.nix self.nixosModules.default + { + services.deployer = { + enable = true; + bindAddr = "0.0.0.0:8088"; + chainfireEndpoints = [ "http://192.168.100.11:2379" ]; + clusterId = "plasmacloud-vm-cluster"; + requireChainfire = true; + allowUnknownNodes = false; + allowUnauthenticated = false; + bootstrapToken = "vm-cluster-bootstrap-token"; + adminToken = "vm-cluster-admin-token"; + seedClusterState = true; + }; + + services.nix-agent = { + enable = true; + chainfireEndpoint = "http://192.168.100.11:2379"; + clusterId = "plasmacloud-vm-cluster"; + nodeId = "node01"; + flakeRoot = self.outPath; + intervalSecs = 30; + apply = true; + }; + } { nixpkgs.overlays = [ self.overlays.default ]; } ]; }; @@ -582,6 +618,17 @@ nix-nos.nixosModules.default ./nix/nodes/vm-cluster/node02/configuration.nix self.nixosModules.default + { + services.nix-agent = { + enable = true; + chainfireEndpoint = "http://192.168.100.11:2379"; + clusterId = "plasmacloud-vm-cluster"; + nodeId = "node02"; + flakeRoot = self.outPath; + intervalSecs = 30; + apply = true; + }; + } { nixpkgs.overlays = [ self.overlays.default ]; } ]; }; @@ -593,6 +640,17 @@ nix-nos.nixosModules.default ./nix/nodes/vm-cluster/node03/configuration.nix self.nixosModules.default + { + services.nix-agent = { + enable = true; + chainfireEndpoint = "http://192.168.100.11:2379"; + clusterId = "plasmacloud-vm-cluster"; + nodeId = "node03"; + flakeRoot = self.outPath; + intervalSecs = 30; + apply = true; + }; + } { nixpkgs.overlays = [ self.overlays.default ]; } ]; }; @@ -620,6 +678,7 @@ k8shost-server = self.packages.${final.system}.k8shost-server; 
deployer-server = self.packages.${final.system}.deployer-server; deployer-ctl = self.packages.${final.system}.deployer-ctl; + nix-agent = self.packages.${final.system}.nix-agent; node-agent = self.packages.${final.system}.node-agent; fleet-scheduler = self.packages.${final.system}.fleet-scheduler; }; diff --git a/nix/modules/default.nix b/nix/modules/default.nix index 5b1dc56..e17342e 100644 --- a/nix/modules/default.nix +++ b/nix/modules/default.nix @@ -14,6 +14,7 @@ ./k8shost.nix ./nightlight.nix ./deployer.nix + ./nix-agent.nix ./node-agent.nix ./fleet-scheduler.nix ./observability.nix diff --git a/nix/modules/nix-agent.nix b/nix/modules/nix-agent.nix new file mode 100644 index 0000000..a062cbd --- /dev/null +++ b/nix/modules/nix-agent.nix @@ -0,0 +1,90 @@ +{ config, lib, pkgs, ... }: + +let + cfg = config.services.nix-agent; +in +{ + options.services.nix-agent = { + enable = lib.mkEnableOption "PhotonCloud nix-agent service"; + + chainfireEndpoint = lib.mkOption { + type = lib.types.str; + default = "http://127.0.0.1:7000"; + description = "ChainFire endpoint consumed by nix-agent"; + }; + + clusterNamespace = lib.mkOption { + type = lib.types.str; + default = "photoncloud"; + description = "Cluster namespace prefix"; + }; + + clusterId = lib.mkOption { + type = lib.types.str; + description = "Cluster ID reconciled by nix-agent"; + }; + + nodeId = lib.mkOption { + type = lib.types.str; + default = config.networking.hostName; + description = "Node ID represented by this agent"; + }; + + flakeRoot = lib.mkOption { + type = lib.types.str; + default = "/etc/nixos"; + description = "Flake root used to build target nixosConfigurations"; + }; + + intervalSecs = lib.mkOption { + type = lib.types.int; + default = 30; + description = "Polling interval in seconds"; + }; + + switchAction = lib.mkOption { + type = lib.types.enum [ "switch" "test" "boot" "dry-activate" ]; + default = "switch"; + description = "switch-to-configuration action executed after building the target 
system"; + }; + + apply = lib.mkOption { + type = lib.types.bool; + default = true; + description = "Apply desired NixOS system state on the node"; + }; + + package = lib.mkOption { + type = lib.types.package; + default = pkgs.nix-agent or (throw "nix-agent package not found"); + description = "Package to use for nix-agent"; + }; + }; + + config = lib.mkIf cfg.enable { + systemd.services.nix-agent = { + description = "PhotonCloud Nix Agent"; + wantedBy = [ "multi-user.target" ]; + after = [ "network-online.target" ]; + wants = [ "network-online.target" ]; + path = [ config.system.path ]; + + serviceConfig = { + Type = "simple"; + Restart = "on-failure"; + RestartSec = "5s"; + ExecStart = '' + ${cfg.package}/bin/nix-agent \ + --chainfire-endpoint ${lib.escapeShellArg cfg.chainfireEndpoint} \ + --cluster-namespace ${lib.escapeShellArg cfg.clusterNamespace} \ + --cluster-id ${lib.escapeShellArg cfg.clusterId} \ + --node-id ${lib.escapeShellArg cfg.nodeId} \ + --flake-root ${lib.escapeShellArg cfg.flakeRoot} \ + --interval-secs ${toString cfg.intervalSecs} \ + --switch-action ${lib.escapeShellArg cfg.switchAction} \ + ${lib.optionalString cfg.apply "--apply"} + ''; + }; + }; + }; +} diff --git a/nix/test-cluster/flake.nix b/nix/test-cluster/flake.nix index 411f7b3..7de63b8 100644 --- a/nix/test-cluster/flake.nix +++ b/nix/test-cluster/flake.nix @@ -31,6 +31,7 @@ k8shost-server = disableChecks prev.k8shost-server; deployer-server = disableChecks prev.deployer-server; deployer-ctl = disableChecks prev.deployer-ctl; + nix-agent = disableChecks prev.nix-agent; node-agent = disableChecks prev.node-agent; fleet-scheduler = disableChecks prev.fleet-scheduler; };