From 82a4c6a9419aa00aa2cca52e499011b87ca26fb9 Mon Sep 17 00:00:00 2001
From: centra
Date: Thu, 2 Apr 2026 07:09:03 +0900
Subject: [PATCH] Make deployer controllers watch-driven
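
Fleet-scheduler, node-agent, and the plasmacloud-reconciler host
controller previously rediscovered state by polling ChainFire on a
fixed interval. Switch them to ChainFire watch streams:

- Each crate gets a ChainfireWatcher that watches a key or prefix and
  reconnects with a fixed backoff when the stream or connection drops.
- Watch events are reduced to canonical JSON signatures with volatile
  fields (last_heartbeat, observed_at, agent-written state) stripped,
  so heartbeat churn no longer triggers reconciles.
- Controllers record their own pending puts/deletes and swallow the
  matching watch echoes instead of reconciling on their own writes.
- The interval tick remains as a safety-net reconcile
  (MissedTickBehavior::Delay), and --once behaves as before.

On the ChainFire side, WatchHandle now keeps the bidirectional request
stream alive for the watch's lifetime, and the server wires
StateMachine events into the WatchRegistry dispatcher.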
---
chainfire/chainfire-client/src/watch.rs | 8 +-
chainfire/crates/chainfire-server/src/node.rs | 10 +-
deployer/crates/fleet-scheduler/src/main.rs | 482 +++++++++++-
.../crates/fleet-scheduler/src/watcher.rs | 122 +++
deployer/crates/node-agent/src/agent.rs | 291 ++++++-
deployer/crates/node-agent/src/main.rs | 5 +-
deployer/crates/node-agent/src/watcher.rs | 127 +--
.../plasmacloud-reconciler/src/hosts.rs | 741 ++++++++++++++++--
.../crates/plasmacloud-reconciler/src/main.rs | 1 +
.../plasmacloud-reconciler/src/watcher.rs | 122 +++
.../scripts/verify-fleet-scheduler-e2e.sh | 183 +++--
deployer/scripts/verify-host-lifecycle-e2e.sh | 84 +-
12 files changed, 1981 insertions(+), 195 deletions(-)
create mode 100644 deployer/crates/fleet-scheduler/src/watcher.rs
create mode 100644 deployer/crates/plasmacloud-reconciler/src/watcher.rs
diff --git a/chainfire/chainfire-client/src/watch.rs b/chainfire/chainfire-client/src/watch.rs
index efc2b26..d1a338f 100644
--- a/chainfire/chainfire-client/src/watch.rs
+++ b/chainfire/chainfire-client/src/watch.rs
@@ -35,6 +35,8 @@ pub struct WatchHandle {
watch_id: i64,
/// Event receiver
    rx: mpsc::Receiver<WatchEvent>,
+ /// Keep the bidi request stream alive for the lifetime of the watch.
+    _req_tx: mpsc::Sender<WatchRequest>,
}
impl WatchHandle {
@@ -109,7 +111,11 @@ impl WatchHandle {
}
});
- Ok(Self { watch_id, rx })
+ Ok(Self {
+ watch_id,
+ rx,
+ _req_tx: req_tx,
+ })
}
/// Get the watch ID
diff --git a/chainfire/crates/chainfire-server/src/node.rs b/chainfire/crates/chainfire-server/src/node.rs
index 4226821..9d89eaf 100644
--- a/chainfire/crates/chainfire-server/src/node.rs
+++ b/chainfire/crates/chainfire-server/src/node.rs
@@ -11,9 +11,9 @@ use chainfire_raft::network::RaftRpcClient;
use chainfire_storage::{RocksStore, LogStorage, StateMachine};
use chainfire_types::node::NodeRole;
use chainfire_types::RaftRole;
-use chainfire_watch::WatchRegistry;
+use chainfire_watch::{stream::WatchEventHandler, WatchRegistry};
use std::sync::Arc;
-use tokio::sync::broadcast;
+use tokio::sync::{broadcast, mpsc};
use tracing::info;
/// Node instance managing all components
@@ -49,7 +49,11 @@ impl Node {
// Create LogStorage and StateMachine from store
let log_storage = Arc::new(LogStorage::new(store.clone()));
- let state_machine = Arc::new(StateMachine::new(store.clone())?);
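+        // Feed state-machine mutations into the watch registry via an unbounded channel and a dispatcher task.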
+ let mut state_machine = StateMachine::new(store.clone())?;
+ let (watch_tx, watch_rx) = mpsc::unbounded_channel();
+ state_machine.set_watch_sender(watch_tx);
+ let state_machine = Arc::new(state_machine);
+ WatchEventHandler::new(Arc::clone(&watch_registry)).spawn_dispatcher(watch_rx);
// Create gRPC Raft client and register peer addresses
let rpc_client = Arc::new(GrpcRaftClient::new());
diff --git a/deployer/crates/fleet-scheduler/src/main.rs b/deployer/crates/fleet-scheduler/src/main.rs
index 618fea2..39f4c27 100644
--- a/deployer/crates/fleet-scheduler/src/main.rs
+++ b/deployer/crates/fleet-scheduler/src/main.rs
@@ -1,5 +1,6 @@
mod auth;
mod publish;
+mod watcher;
use anyhow::{Context, Result};
use chainfire_client::Client;
@@ -15,9 +16,11 @@ use publish::{PublicationConfig, PublicationReconciler};
use serde_json::Value;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::time::Duration;
-use tokio::time::sleep;
+use tokio::sync::{mpsc, oneshot};
+use tokio::time::MissedTickBehavior;
use tracing::{debug, info, warn};
use tracing_subscriber::EnvFilter;
+use watcher::{ChainfireWatcher, WatchChange};
#[cfg(test)]
use chrono::Utc;
@@ -26,6 +29,22 @@ use deployer_types::cluster_node_is_eligible;
const MANAGED_BY: &str = "fleet-scheduler";
+fn cluster_prefix(cluster_namespace: &str, cluster_id: &str) -> String {
+ format!("{}/clusters/{}/", cluster_namespace, cluster_id)
+}
+
+fn nodes_prefix(cluster_namespace: &str, cluster_id: &str) -> Vec<u8> {
+ format!("{}nodes/", cluster_prefix(cluster_namespace, cluster_id)).into_bytes()
+}
+
+fn services_prefix(cluster_namespace: &str, cluster_id: &str) -> Vec<u8> {
+ format!("{}services/", cluster_prefix(cluster_namespace, cluster_id)).into_bytes()
+}
+
+fn instances_prefix(cluster_namespace: &str, cluster_id: &str) -> Vec<u8> {
+ format!("{}instances/", cluster_prefix(cluster_namespace, cluster_id)).into_bytes()
+}
+
#[derive(Debug, Parser)]
#[command(author, version, about = "PhotonCloud non-Kubernetes fleet scheduler")]
struct Cli {
@@ -81,6 +100,11 @@ struct Scheduler {
dry_run: bool,
publication: PublicationReconciler,
once: bool,
+    watched_node_signatures: HashMap<String, Vec<u8>>,
+    watched_service_signatures: HashMap<String, Vec<u8>>,
+    watched_instance_signatures: HashMap<String, Vec<u8>>,
+    pending_instance_writes: HashMap<String, Vec<u8>>,
+    pending_instance_deletes: HashSet<String>,
}
#[derive(Debug)]
@@ -100,6 +124,85 @@ struct ReconcilePlan {
type DependencySummary = ServiceDependencySummary;
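+/// Drives one ChainFire watcher: signals `ready` once the first watch connects, then forwards every change into the scheduler's queue.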
+async fn spawn_watch_task(
+ watcher: ChainfireWatcher,
+    tx: mpsc::Sender<WatchChange>,
+ ready: oneshot::Sender<()>,
+) {
+ let mut ready = Some(ready);
+ if let Err(error) = watcher
+ .watch_with_ready(
+ move || {
+ if let Some(ready) = ready.take() {
+ let _ = ready.send(());
+ }
+ Ok(())
+ },
+ move |change| {
+ tx.try_send(change).map_err(|send_error| {
+ anyhow::anyhow!("failed to enqueue watch change: {send_error}")
+ })
+ },
+ )
+ .await
+ {
+ warn!(error = %error, "fleet scheduler watch task exited");
+ }
+}
+
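+/// Waits up to 10s for each watch task to signal its initial connection.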
+async fn wait_for_watchers_ready(receivers: Vec<oneshot::Receiver<()>>) {
+ for receiver in receivers {
+ match tokio::time::timeout(Duration::from_secs(10), receiver).await {
+ Ok(Ok(())) => {}
+ Ok(Err(_)) => warn!("fleet scheduler watch task ended before signaling readiness"),
+ Err(_) => warn!("timed out waiting for fleet scheduler watch readiness"),
+ }
+ }
+}
+
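+/// Builds a change signature: parse JSON, recursively drop the volatile keys, serialize with sorted object keys.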
+fn canonical_json_signature(bytes: &[u8], drop_keys: &[&str]) -> Option<Vec<u8>> {
+ let mut value: Value = serde_json::from_slice(bytes).ok()?;
+ strip_observed_fields(&mut value, drop_keys);
+ serde_json::to_vec(&sort_json_value(value)).ok()
+}
+
+fn strip_observed_fields(value: &mut Value, drop_keys: &[&str]) {
+ match value {
+ Value::Object(map) => {
+ for key in drop_keys {
+ map.remove(*key);
+ }
+ for nested in map.values_mut() {
+ strip_observed_fields(nested, drop_keys);
+ }
+ }
+ Value::Array(items) => {
+ for item in items {
+ strip_observed_fields(item, drop_keys);
+ }
+ }
+ _ => {}
+ }
+}
+
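+/// Recursively sorts object keys so byte-equal signatures imply semantically equal JSON.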
+fn sort_json_value(value: Value) -> Value {
+ match value {
+ Value::Array(items) => Value::Array(items.into_iter().map(sort_json_value).collect()),
+ Value::Object(map) => {
+            let mut keys = map.keys().cloned().collect::<Vec<_>>();
+ keys.sort();
+ let mut normalized = serde_json::Map::new();
+ for key in keys {
+ if let Some(value) = map.get(&key) {
+ normalized.insert(key, sort_json_value(value.clone()));
+ }
+ }
+ Value::Object(normalized)
+ }
+ other => other,
+ }
+}
+
impl Scheduler {
fn new(cli: Cli) -> Self {
let cluster_namespace = cli.cluster_namespace;
@@ -125,23 +228,58 @@ impl Scheduler {
dry_run: cli.dry_run,
}),
once: cli.once,
+ watched_node_signatures: HashMap::new(),
+ watched_service_signatures: HashMap::new(),
+ watched_instance_signatures: HashMap::new(),
+ pending_instance_writes: HashMap::new(),
+ pending_instance_deletes: HashSet::new(),
}
}
- async fn run_loop(&self) -> Result<()> {
+ async fn run_loop(&mut self) -> Result<()> {
if self.once {
return self.reconcile_once().await;
}
+ self.reconcile_once().await?;
+
+        let (watch_tx, mut watch_rx) = mpsc::channel::<WatchChange>(128);
+ let ready_receivers = self.spawn_watchers(watch_tx);
+ wait_for_watchers_ready(ready_receivers).await;
+ self.reconcile_once().await?;
+
+ let mut interval = tokio::time::interval(self.interval);
+ interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
+ interval.tick().await;
+
loop {
- if let Err(error) = self.reconcile_once().await {
- warn!(error = %error, "fleet scheduler reconciliation failed");
+ tokio::select! {
+ _ = interval.tick() => {
+ if let Err(error) = self.reconcile_once().await {
+ warn!(error = %error, "fleet scheduler reconciliation failed");
+ }
+ }
+ maybe_change = watch_rx.recv() => {
+ let Some(change) = maybe_change else {
+ continue;
+ };
+
+ let mut should_reconcile = self.apply_watch_change(&change);
+ while let Ok(extra) = watch_rx.try_recv() {
+ should_reconcile |= self.apply_watch_change(&extra);
+ }
+
+ if should_reconcile {
+ if let Err(error) = self.reconcile_once().await {
+ warn!(error = %error, "fleet scheduler watch-triggered reconciliation failed");
+ }
+ }
+ }
}
- sleep(self.interval).await;
}
}
- async fn reconcile_once(&self) -> Result<()> {
+ async fn reconcile_once(&mut self) -> Result<()> {
let mut client = Client::connect(self.endpoint.clone()).await?;
let nodes = self.load_cluster_nodes(&mut client).await?;
let services = self.load_services(&mut client).await?;
@@ -219,6 +357,152 @@ impl Scheduler {
Ok(())
}
+    fn spawn_watchers(&self, tx: mpsc::Sender<WatchChange>) -> Vec<oneshot::Receiver<()>> {
+ let mut ready_receivers = Vec::new();
+
+ let node_watcher = ChainfireWatcher::prefix(
+ self.endpoint.clone(),
+ nodes_prefix(&self.cluster_namespace, &self.cluster_id),
+ );
+ let (node_ready_tx, node_ready_rx) = oneshot::channel();
+ tokio::spawn(spawn_watch_task(node_watcher, tx.clone(), node_ready_tx));
+ ready_receivers.push(node_ready_rx);
+
+ let service_watcher = ChainfireWatcher::prefix(
+ self.endpoint.clone(),
+ services_prefix(&self.cluster_namespace, &self.cluster_id),
+ );
+ let (service_ready_tx, service_ready_rx) = oneshot::channel();
+ tokio::spawn(spawn_watch_task(
+ service_watcher,
+ tx.clone(),
+ service_ready_tx,
+ ));
+ ready_receivers.push(service_ready_rx);
+
+ let instance_watcher = ChainfireWatcher::prefix(
+ self.endpoint.clone(),
+ instances_prefix(&self.cluster_namespace, &self.cluster_id),
+ );
+ let (instance_ready_tx, instance_ready_rx) = oneshot::channel();
+ tokio::spawn(spawn_watch_task(instance_watcher, tx, instance_ready_tx));
+ ready_receivers.push(instance_ready_rx);
+
+ ready_receivers
+ }
+
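+    /// Routes a watch change to the per-resource handler by key prefix; returns true when a reconcile should run.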
+ fn apply_watch_change(&mut self, change: &WatchChange) -> bool {
+ let key = String::from_utf8_lossy(&change.key).to_string();
+ let nodes_prefix = String::from_utf8_lossy(&nodes_prefix(&self.cluster_namespace, &self.cluster_id))
+ .to_string();
+ let services_prefix =
+ String::from_utf8_lossy(&services_prefix(&self.cluster_namespace, &self.cluster_id))
+ .to_string();
+ let instances_prefix =
+ String::from_utf8_lossy(&instances_prefix(&self.cluster_namespace, &self.cluster_id))
+ .to_string();
+
+ if let Some(suffix) = key.strip_prefix(&nodes_prefix) {
+ if !suffix.is_empty() && !suffix.contains('/') {
+ return self.apply_node_watch_change(&key, change);
+ }
+ return false;
+ }
+
+ if let Some(suffix) = key.strip_prefix(&services_prefix) {
+ if !suffix.is_empty() && !suffix.contains('/') {
+ return self.apply_service_watch_change(&key, change);
+ }
+ return false;
+ }
+
+ if key.starts_with(&instances_prefix) {
+ return self.apply_instance_watch_change(&key, change);
+ }
+
+ false
+ }
+
+ fn apply_node_watch_change(&mut self, key: &str, change: &WatchChange) -> bool {
+ if matches!(change.event_type, chainfire_client::EventType::Delete) {
+ self.watched_node_signatures.remove(key);
+ return true;
+ }
+
+ let Some(signature) = canonical_json_signature(&change.value, &["last_heartbeat"]) else {
+ return true;
+ };
+ if self.watched_node_signatures.get(key) == Some(&signature) {
+ return false;
+ }
+ self.watched_node_signatures
+ .insert(key.to_string(), signature);
+ true
+ }
+
+ fn apply_service_watch_change(&mut self, key: &str, change: &WatchChange) -> bool {
+ if matches!(change.event_type, chainfire_client::EventType::Delete) {
+ self.watched_service_signatures.remove(key);
+ return true;
+ }
+
+ let Some(signature) = canonical_json_signature(&change.value, &[]) else {
+ return true;
+ };
+ if self.watched_service_signatures.get(key) == Some(&signature) {
+ return false;
+ }
+ self.watched_service_signatures
+ .insert(key.to_string(), signature);
+ true
+ }
+
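+    /// Swallows echoes of this scheduler's own instance puts/deletes; anything else reconciles only when the signature (sans heartbeat/observed_at) changes.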
+ fn apply_instance_watch_change(&mut self, key: &str, change: &WatchChange) -> bool {
+ if let Some(expected) = self.pending_instance_writes.get(key) {
+ if matches!(change.event_type, chainfire_client::EventType::Put) && change.value == *expected
+ {
+ self.pending_instance_writes.remove(key);
+ self.pending_instance_deletes.remove(key);
+ if let Some(signature) =
+ canonical_json_signature(&change.value, &["last_heartbeat", "observed_at"])
+ {
+ self.watched_instance_signatures
+ .insert(key.to_string(), signature);
+ } else {
+ self.watched_instance_signatures.remove(key);
+ }
+ return false;
+ }
+ self.pending_instance_writes.remove(key);
+ }
+
+ if self.pending_instance_deletes.remove(key)
+ && matches!(change.event_type, chainfire_client::EventType::Delete)
+ {
+ self.watched_instance_signatures.remove(key);
+ return false;
+ }
+
+ if matches!(change.event_type, chainfire_client::EventType::Delete) {
+ self.watched_instance_signatures.remove(key);
+ return true;
+ }
+
+ let Some(signature) = canonical_json_signature(&change.value, &["last_heartbeat", "observed_at"])
+ else {
+ self.watched_instance_signatures.remove(key);
+ return true;
+ };
+
+ if self.watched_instance_signatures.get(key) == Some(&signature) {
+ return false;
+ }
+
+ self.watched_instance_signatures
+ .insert(key.to_string(), signature);
+ true
+ }
+
    async fn load_cluster_nodes(&self, client: &mut Client) -> Result<Vec<ClusterNodeRecord>> {
let prefix = format!(
"{}/clusters/{}/nodes/",
@@ -364,7 +648,7 @@ impl Scheduler {
}
async fn reconcile_service(
- &self,
+ &mut self,
client: &mut Client,
nodes: &[ClusterNodeRecord],
service: &ServiceSpec,
@@ -420,9 +704,15 @@ impl Scheduler {
"would upsert managed instance"
);
} else {
- client
- .put(&key, serde_json::to_vec(&upsert.desired_value)?)
- .await?;
+ let key_str = key.clone();
+ let payload = serde_json::to_vec(&upsert.desired_value)?;
+ self.pending_instance_deletes.remove(&key_str);
+ self.pending_instance_writes
+ .insert(key_str.clone(), payload.clone());
+ if let Err(error) = client.put(&key, &payload).await {
+ self.pending_instance_writes.remove(&key_str);
+ return Err(error.into());
+ }
info!(
service = %service.name,
instance_id = %upsert.instance.instance_id,
@@ -456,12 +746,19 @@ impl Scheduler {
instance_id = %instance_id,
"would delete stale managed instance"
);
- } else if client.delete(&key).await? {
- info!(
- service = %service.name,
- instance_id = %instance_id,
- "deleted stale managed instance"
- );
+ } else {
+ let key_str = key.clone();
+ self.pending_instance_writes.remove(&key_str);
+ self.pending_instance_deletes.insert(key_str.clone());
+ if client.delete(&key).await? {
+ info!(
+ service = %service.name,
+ instance_id = %instance_id,
+ "deleted stale managed instance"
+ );
+ } else {
+ self.pending_instance_deletes.remove(&key_str);
+ }
}
}
@@ -1095,7 +1392,7 @@ async fn main() -> Result<()> {
.with_env_filter(EnvFilter::from_default_env().add_directive("info".parse()?))
.init();
- let scheduler = Scheduler::new(Cli::parse());
+ let mut scheduler = Scheduler::new(Cli::parse());
scheduler.run_loop().await
}
@@ -1103,6 +1400,7 @@ async fn main() -> Result<()> {
mod tests {
use super::*;
use chrono::Duration as ChronoDuration;
+ use chainfire_client::EventType;
use deployer_types::{
ClusterNodeRecord, HealthCheckSpec, PlacementPolicy, ProcessSpec,
PublishedLoadBalancerState, RolloutStrategySpec, ServiceDependencyCondition,
@@ -1206,6 +1504,25 @@ mod tests {
}
}
+ fn test_scheduler() -> Scheduler {
+ Scheduler::new(Cli {
+ chainfire_endpoint: "http://127.0.0.1:7000".to_string(),
+ cluster_namespace: "photoncloud".to_string(),
+ cluster_id: "test-cluster".to_string(),
+ interval_secs: 1,
+ heartbeat_timeout_secs: 300,
+ dry_run: true,
+ iam_endpoint: None,
+ fiberlb_endpoint: None,
+ flashdns_endpoint: None,
+ publish_address: None,
+ default_org_id: "default-org".to_string(),
+ default_project_id: "default-project".to_string(),
+ controller_principal_id: MANAGED_BY.to_string(),
+ once: false,
+ })
+ }
+
#[test]
fn test_node_eligibility_matches_roles_and_labels() {
let node = active_node("node01", &["worker"], &[("tier", "general")]);
@@ -1836,4 +2153,135 @@ mod tests {
assert_eq!(instance_id, "api-node01-3");
}
+
+ #[test]
+ fn test_scheduler_watch_ignores_node_heartbeat_only_changes() {
+ let mut scheduler = test_scheduler();
+ let node_key = format!(
+ "{}nodes/node01",
+ cluster_prefix(&scheduler.cluster_namespace, &scheduler.cluster_id)
+ );
+
+ let first = WatchChange {
+ event_type: EventType::Put,
+ key: node_key.as_bytes().to_vec(),
+ value: serde_json::to_vec(&serde_json::json!({
+ "node_id": "node01",
+ "hostname": "node01",
+ "ip": "10.0.0.11",
+ "roles": ["worker"],
+ "labels": {"tier": "general"},
+ "state": "active",
+ "last_heartbeat": "2026-04-01T00:00:00Z"
+ }))
+ .unwrap(),
+ revision: 1,
+ };
+ assert!(scheduler.apply_watch_change(&first));
+
+ let heartbeat_only = WatchChange {
+ revision: 2,
+ value: serde_json::to_vec(&serde_json::json!({
+ "node_id": "node01",
+ "hostname": "node01",
+ "ip": "10.0.0.11",
+ "roles": ["worker"],
+ "labels": {"tier": "general"},
+ "state": "active",
+ "last_heartbeat": "2026-04-01T00:00:05Z"
+ }))
+ .unwrap(),
+ ..first.clone()
+ };
+ assert!(!scheduler.apply_watch_change(&heartbeat_only));
+
+ let drain = WatchChange {
+ revision: 3,
+ value: serde_json::to_vec(&serde_json::json!({
+ "node_id": "node01",
+ "hostname": "node01",
+ "ip": "10.0.0.11",
+ "roles": ["worker"],
+ "labels": {"tier": "general"},
+ "state": "draining",
+ "last_heartbeat": "2026-04-01T00:00:10Z"
+ }))
+ .unwrap(),
+ ..first
+ };
+ assert!(scheduler.apply_watch_change(&drain));
+ }
+
+ #[test]
+ fn test_scheduler_watch_suppresses_self_written_instance_update_but_reacts_to_health_change() {
+ let mut scheduler = test_scheduler();
+ let key = instance_key(
+ &scheduler.cluster_namespace,
+ &scheduler.cluster_id,
+ "api",
+ "api-node01",
+ );
+ let initial = serde_json::to_vec(&serde_json::json!({
+ "instance_id": "api-node01",
+ "service": "api",
+ "node_id": "node01",
+ "ip": "10.0.0.11",
+ "port": 18080,
+ "managed_by": "fleet-scheduler",
+ "state": "starting",
+ "last_heartbeat": "2026-04-01T00:00:00Z",
+ "observed_at": "2026-04-01T00:00:00Z"
+ }))
+ .unwrap();
+
+ let first = WatchChange {
+ event_type: EventType::Put,
+ key: key.as_bytes().to_vec(),
+ value: initial.clone(),
+ revision: 1,
+ };
+ assert!(scheduler.apply_watch_change(&first));
+
+ let self_write = serde_json::to_vec(&serde_json::json!({
+ "instance_id": "api-node01",
+ "service": "api",
+ "node_id": "node01",
+ "ip": "10.0.0.11",
+ "port": 18081,
+ "managed_by": "fleet-scheduler",
+ "state": "starting",
+ "last_heartbeat": "2026-04-01T00:00:00Z",
+ "observed_at": "2026-04-01T00:00:00Z"
+ }))
+ .unwrap();
+ scheduler
+ .pending_instance_writes
+ .insert(key.clone(), self_write.clone());
+
+ assert!(!scheduler.apply_watch_change(&WatchChange {
+ event_type: EventType::Put,
+ key: key.as_bytes().to_vec(),
+ value: self_write,
+ revision: 2,
+ }));
+
+ let healthy = serde_json::to_vec(&serde_json::json!({
+ "instance_id": "api-node01",
+ "service": "api",
+ "node_id": "node01",
+ "ip": "10.0.0.11",
+ "port": 18081,
+ "managed_by": "fleet-scheduler",
+ "state": "healthy",
+ "last_heartbeat": "2026-04-01T00:00:05Z",
+ "observed_at": "2026-04-01T00:00:05Z"
+ }))
+ .unwrap();
+ assert!(scheduler.apply_watch_change(&WatchChange {
+ event_type: EventType::Put,
+ key: key.as_bytes().to_vec(),
+ value: healthy,
+ revision: 3,
+ }));
+ }
}
diff --git a/deployer/crates/fleet-scheduler/src/watcher.rs b/deployer/crates/fleet-scheduler/src/watcher.rs
new file mode 100644
index 0000000..e1f1dd9
--- /dev/null
+++ b/deployer/crates/fleet-scheduler/src/watcher.rs
@@ -0,0 +1,122 @@
+use std::time::Duration;
+
+use anyhow::Result;
+use chainfire_client::{Client, EventType};
+use tracing::{info, warn};
+
+#[allow(dead_code)]
+#[derive(Debug, Clone, Copy)]
+pub enum WatchScope {
+ Key,
+ Prefix,
+}
+
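+/// A single key event delivered by a ChainFire watch stream.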
+#[derive(Debug, Clone)]
+pub struct WatchChange {
+ pub event_type: EventType,
+    pub key: Vec<u8>,
+    pub value: Vec<u8>,
+ #[allow(dead_code)]
+ pub revision: u64,
+}
+
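+/// Reconnecting wrapper around ChainFire watches: watches one key or prefix and retries with a fixed backoff when the stream or connection drops.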
+pub struct ChainfireWatcher {
+ endpoint: String,
+ scope: WatchScope,
+    target: Vec<u8>,
+ reconnect_backoff: Duration,
+}
+
+impl ChainfireWatcher {
+ #[allow(dead_code)]
+    pub fn key(endpoint: String, key: Vec<u8>) -> Self {
+ Self {
+ endpoint,
+ scope: WatchScope::Key,
+ target: key,
+ reconnect_backoff: Duration::from_secs(1),
+ }
+ }
+
+    pub fn prefix(endpoint: String, prefix: Vec<u8>) -> Self {
+ Self {
+ endpoint,
+ scope: WatchScope::Prefix,
+ target: prefix,
+ reconnect_backoff: Duration::from_secs(1),
+ }
+ }
+
+ #[allow(dead_code)]
+ pub async fn watch(&self, mut callback: F) -> Result<()>
+ where
+ F: FnMut(WatchChange) -> Result<()>,
+ {
+ self.watch_with_ready(|| Ok(()), move |change| callback(change))
+ .await
+ }
+
+ pub async fn watch_with_ready(&self, mut on_connected: G, mut callback: F) -> Result<()>
+ where
+ F: FnMut(WatchChange) -> Result<()>,
+ G: FnMut() -> Result<()>,
+ {
+ loop {
+ match Client::connect(self.endpoint.clone()).await {
+ Ok(mut client) => {
+ let watch_result = match self.scope {
+ WatchScope::Key => client.watch(&self.target).await,
+ WatchScope::Prefix => client.watch_prefix(&self.target).await,
+ };
+
+ match watch_result {
+ Ok(mut handle) => {
+ info!(
+ scope = ?self.scope,
+ target = %String::from_utf8_lossy(&self.target),
+ watch_id = handle.id(),
+ "connected ChainFire watch"
+ );
+ on_connected()?;
+
+ while let Some(event) = handle.recv().await {
+ if let Err(error) = callback(WatchChange {
+ event_type: event.event_type,
+ key: event.key,
+ value: event.value,
+ revision: event.revision,
+ }) {
+ warn!(error = %error, "watch callback failed");
+ }
+ }
+
+ warn!(
+ scope = ?self.scope,
+ target = %String::from_utf8_lossy(&self.target),
+ "ChainFire watch stream ended; reconnecting"
+ );
+ }
+ Err(error) => {
+ warn!(
+ error = %error,
+ scope = ?self.scope,
+ target = %String::from_utf8_lossy(&self.target),
+ "failed to create ChainFire watch"
+ );
+ }
+ }
+ }
+ Err(error) => {
+ warn!(
+ error = %error,
+ scope = ?self.scope,
+ target = %String::from_utf8_lossy(&self.target),
+ "failed to connect ChainFire watch"
+ );
+ }
+ }
+
+ tokio::time::sleep(self.reconnect_backoff).await;
+ }
+ }
+}
diff --git a/deployer/crates/node-agent/src/agent.rs b/deployer/crates/node-agent/src/agent.rs
index 770d0bd..1438857 100644
--- a/deployer/crates/node-agent/src/agent.rs
+++ b/deployer/crates/node-agent/src/agent.rs
@@ -15,10 +15,12 @@ use deployer_types::{
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::process::Command;
-use tokio::time::sleep;
+use tokio::sync::mpsc;
+use tokio::time::MissedTickBehavior;
use tracing::{info, warn};
use crate::process::{render_container_process_spec, ProcessManager};
+use crate::watcher::{ChainfireWatcher, WatchChange};
fn cluster_prefix(cluster_namespace: &str, cluster_id: &str) -> String {
format!("{}/clusters/{}/", cluster_namespace, cluster_id)
@@ -48,6 +50,10 @@ fn key_instance(
.into_bytes()
}
+fn instances_prefix(cluster_namespace: &str, cluster_id: &str) -> Vec<u8> {
+ format!("{}instances/", cluster_prefix(cluster_namespace, cluster_id)).into_bytes()
+}
+
fn service_status_prefix(cluster_namespace: &str, cluster_id: &str) -> String {
format!(
"{}/clusters/{}/service-statuses/",
@@ -92,6 +98,8 @@ pub struct Agent {
allow_local_instance_upsert: bool,
process_manager: ProcessManager,
next_health_checks: HashMap>,
+    watched_node_signature: Option<Vec<u8>>,
+    watched_instance_signatures: HashMap<String, Vec<u8>>,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
@@ -112,6 +120,61 @@ struct LocalInstanceSpec {
container: Option,
}
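+/// Drives one ChainFire watcher for the agent, forwarding every change into its queue.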
+async fn spawn_watch_task(watcher: ChainfireWatcher, tx: mpsc::Sender<WatchChange>) {
+ if let Err(error) = watcher
+ .watch(move |change| {
+ tx.try_send(change)
+ .map_err(|send_error| anyhow::anyhow!("failed to enqueue watch change: {}", send_error))
+ })
+ .await
+ {
+ warn!(error = %error, "node-agent watch task exited");
+ }
+}
+
+fn canonical_json_signature(bytes: &[u8], drop_keys: &[&str]) -> Option<Vec<u8>> {
+ let mut value: Value = serde_json::from_slice(bytes).ok()?;
+ strip_observed_fields(&mut value, drop_keys);
+ serde_json::to_vec(&sort_json_value(value)).ok()
+}
+
+fn strip_observed_fields(value: &mut Value, drop_keys: &[&str]) {
+ match value {
+ Value::Object(map) => {
+ for key in drop_keys {
+ map.remove(*key);
+ }
+ for nested in map.values_mut() {
+ strip_observed_fields(nested, drop_keys);
+ }
+ }
+ Value::Array(items) => {
+ for item in items {
+ strip_observed_fields(item, drop_keys);
+ }
+ }
+ _ => {}
+ }
+}
+
+fn sort_json_value(value: Value) -> Value {
+ match value {
+ Value::Array(items) => Value::Array(items.into_iter().map(sort_json_value).collect()),
+ Value::Object(map) => {
+            let mut keys = map.keys().cloned().collect::<Vec<_>>();
+ keys.sort();
+ let mut normalized = serde_json::Map::new();
+ for key in keys {
+ if let Some(value) = map.get(&key) {
+ normalized.insert(key, sort_json_value(value.clone()));
+ }
+ }
+ Value::Object(normalized)
+ }
+ other => other,
+ }
+}
+
impl Agent {
pub fn new(
endpoint: String,
@@ -135,15 +198,44 @@ impl Agent {
allow_local_instance_upsert,
process_manager: ProcessManager::new(pid_dir),
next_health_checks: HashMap::new(),
+ watched_node_signature: None,
+ watched_instance_signatures: HashMap::new(),
}
}
pub async fn run_loop(&mut self) -> Result<()> {
+ self.tick().await?;
+
+        let (watch_tx, mut watch_rx) = mpsc::channel::<WatchChange>(64);
+ self.spawn_watchers(watch_tx);
+ let mut interval = tokio::time::interval(self.interval);
+ interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
+ interval.tick().await;
+
loop {
- if let Err(e) = self.tick().await {
- warn!(error = %e, "node-agent tick failed");
+ tokio::select! {
+ _ = interval.tick() => {
+ if let Err(error) = self.tick().await {
+ warn!(error = %error, "node-agent tick failed");
+ }
+ }
+ maybe_change = watch_rx.recv() => {
+ let Some(change) = maybe_change else {
+ continue;
+ };
+
+ let mut should_reconcile = self.apply_watch_change(&change);
+ while let Ok(extra) = watch_rx.try_recv() {
+ should_reconcile |= self.apply_watch_change(&extra);
+ }
+
+ if should_reconcile {
+ if let Err(error) = self.tick().await {
+ warn!(error = %error, "node-agent watch-triggered tick failed");
+ }
+ }
+ }
}
- sleep(self.interval).await;
}
}
@@ -234,6 +326,76 @@ impl Agent {
);
}
+    fn spawn_watchers(&self, tx: mpsc::Sender<WatchChange>) {
+ let node_watcher = ChainfireWatcher::key(
+ self.endpoint.clone(),
+ key_node(&self.cluster_namespace, &self.cluster_id, &self.node_id),
+ );
+ tokio::spawn(spawn_watch_task(node_watcher, tx.clone()));
+
+ let instances_watcher = ChainfireWatcher::prefix(
+ self.endpoint.clone(),
+ instances_prefix(&self.cluster_namespace, &self.cluster_id),
+ );
+ tokio::spawn(spawn_watch_task(instances_watcher, tx));
+ }
+
+ fn apply_watch_change(&mut self, change: &WatchChange) -> bool {
+ let key = String::from_utf8_lossy(&change.key).to_string();
+ let node_key =
+ String::from_utf8_lossy(&key_node(&self.cluster_namespace, &self.cluster_id, &self.node_id))
+ .to_string();
+ let instances_prefix =
+ String::from_utf8_lossy(&instances_prefix(&self.cluster_namespace, &self.cluster_id))
+ .to_string();
+
+ if key == node_key {
+ return self.apply_node_watch_change(change);
+ }
+
+ if key.starts_with(&instances_prefix) {
+ return self.apply_instance_watch_change(&key, change);
+ }
+
+ false
+ }
+
+ fn apply_node_watch_change(&mut self, change: &WatchChange) -> bool {
+ if matches!(change.event_type, chainfire_client::EventType::Delete) {
+ self.watched_node_signature = None;
+ return true;
+ }
+ let Some(signature) = canonical_json_signature(&change.value, &["last_heartbeat"]) else {
+ return true;
+ };
+ if self.watched_node_signature.as_ref() == Some(&signature) {
+ return false;
+ }
+ self.watched_node_signature = Some(signature);
+ true
+ }
+
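+    /// The agent writes state/last_heartbeat/observed_at itself, so instance changes only count when the remaining desired spec differs.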
+ fn apply_instance_watch_change(&mut self, key: &str, change: &WatchChange) -> bool {
+ if matches!(change.event_type, chainfire_client::EventType::Delete) {
+ self.watched_instance_signatures.remove(key);
+ return true;
+ }
+ let Some(signature) =
+ canonical_json_signature(&change.value, &["state", "last_heartbeat", "observed_at"])
+ else {
+ self.watched_instance_signatures.remove(key);
+ return true;
+ };
+
+ if self.watched_instance_signatures.get(key) == Some(&signature) {
+ return false;
+ }
+
+ self.watched_instance_signatures
+ .insert(key.to_string(), signature);
+ true
+ }
+
fn is_local_instance(&self, inst: &LocalInstanceSpec) -> bool {
matches!(inst.node_id.as_deref(), Some(node_id) if node_id == self.node_id)
}
@@ -1099,4 +1261,125 @@ mod tests {
Some("healthy")
);
}
+
+ #[test]
+ fn watch_signature_ignores_node_heartbeat_only_changes() {
+ let mut agent = test_agent();
+ let node_key = key_node(&agent.cluster_namespace, &agent.cluster_id, &agent.node_id);
+
+ let first = WatchChange {
+ event_type: chainfire_client::EventType::Put,
+ key: node_key.clone(),
+ value: serde_json::to_vec(&serde_json::json!({
+ "node_id": "node01",
+ "hostname": "node01",
+ "ip": "10.0.0.11",
+ "roles": ["worker"],
+ "labels": {"tier": "general"},
+ "state": "active",
+ "last_heartbeat": "2026-04-01T00:00:00Z"
+ }))
+ .unwrap(),
+ revision: 1,
+ };
+ assert!(agent.apply_watch_change(&first));
+
+ let heartbeat_only = WatchChange {
+ revision: 2,
+ value: serde_json::to_vec(&serde_json::json!({
+ "node_id": "node01",
+ "hostname": "node01",
+ "ip": "10.0.0.11",
+ "roles": ["worker"],
+ "labels": {"tier": "general"},
+ "state": "active",
+ "last_heartbeat": "2026-04-01T00:00:05Z"
+ }))
+ .unwrap(),
+ ..first.clone()
+ };
+ assert!(!agent.apply_watch_change(&heartbeat_only));
+
+ let drain = WatchChange {
+ revision: 3,
+ value: serde_json::to_vec(&serde_json::json!({
+ "node_id": "node01",
+ "hostname": "node01",
+ "ip": "10.0.0.11",
+ "roles": ["worker"],
+ "labels": {"tier": "general"},
+ "state": "draining",
+ "last_heartbeat": "2026-04-01T00:00:10Z"
+ }))
+ .unwrap(),
+ ..first
+ };
+ assert!(agent.apply_watch_change(&drain));
+ }
+
+ #[test]
+ fn watch_signature_ignores_instance_observed_fields_only_changes() {
+ let mut agent = test_agent();
+ let instance_key = key_instance(
+ &agent.cluster_namespace,
+ &agent.cluster_id,
+ "api",
+ "api-node01",
+ );
+
+ let first = WatchChange {
+ event_type: chainfire_client::EventType::Put,
+ key: instance_key.clone(),
+ value: serde_json::to_vec(&serde_json::json!({
+ "instance_id": "api-node01",
+ "service": "api",
+ "node_id": "node01",
+ "ip": "127.0.0.2",
+ "port": 18080,
+ "managed_by": "fleet-scheduler",
+ "state": null,
+ "last_heartbeat": null,
+ "observed_at": null
+ }))
+ .unwrap(),
+ revision: 1,
+ };
+ assert!(agent.apply_watch_change(&first));
+
+ let observed_only = WatchChange {
+ revision: 2,
+ value: serde_json::to_vec(&serde_json::json!({
+ "instance_id": "api-node01",
+ "service": "api",
+ "node_id": "node01",
+ "ip": "127.0.0.2",
+ "port": 18080,
+ "managed_by": "fleet-scheduler",
+ "state": "healthy",
+ "last_heartbeat": "2026-04-01T00:00:05Z",
+ "observed_at": "2026-04-01T00:00:05Z"
+ }))
+ .unwrap(),
+ ..first.clone()
+ };
+ assert!(!agent.apply_watch_change(&observed_only));
+
+ let desired_change = WatchChange {
+ revision: 3,
+ value: serde_json::to_vec(&serde_json::json!({
+ "instance_id": "api-node01",
+ "service": "api",
+ "node_id": "node01",
+ "ip": "127.0.0.2",
+ "port": 18081,
+ "managed_by": "fleet-scheduler",
+ "state": "healthy",
+ "last_heartbeat": "2026-04-01T00:00:10Z",
+ "observed_at": "2026-04-01T00:00:10Z"
+ }))
+ .unwrap(),
+ ..first
+ };
+ assert!(agent.apply_watch_change(&desired_change));
+ }
}
diff --git a/deployer/crates/node-agent/src/main.rs b/deployer/crates/node-agent/src/main.rs
index d8c8d8f..6f42856 100644
--- a/deployer/crates/node-agent/src/main.rs
+++ b/deployer/crates/node-agent/src/main.rs
@@ -7,11 +7,12 @@ use tracing_subscriber::EnvFilter;
mod agent;
mod process;
+mod watcher;
/// PhotonCloud NodeAgent
///
/// - Compares local state with `photoncloud/clusters/{cluster_id}/nodes/{node_id}` and
-///   `.../instances/*` on Chainfire by polling (watch support planned for later).
+///   `.../instances/*` on Chainfire via watch, plus a periodic heartbeat / safety reconcile.
/// - Starts/stops processes only when `--apply` is given (default is dry-run).
#[derive(Parser, Debug)]
#[command(author, version, about)]
@@ -32,7 +33,7 @@ struct Cli {
#[arg(long)]
node_id: String,
-    /// Polling interval (seconds)
+    /// Heartbeat / safety-reconcile interval (seconds)
#[arg(long, default_value_t = 15)]
interval_secs: u64,
diff --git a/deployer/crates/node-agent/src/watcher.rs b/deployer/crates/node-agent/src/watcher.rs
index 2bcf603..4be0430 100644
--- a/deployer/crates/node-agent/src/watcher.rs
+++ b/deployer/crates/node-agent/src/watcher.rs
@@ -1,74 +1,109 @@
use std::time::Duration;
use anyhow::Result;
-use chainfire_client::Client;
-use tokio::time::sleep;
+use chainfire_client::{Client, EventType};
use tracing::{info, warn};
+#[derive(Debug, Clone, Copy)]
+pub enum WatchScope {
+ Key,
+ Prefix,
+}
+
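+/// A single key event delivered by a ChainFire watch stream.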
+#[derive(Debug, Clone)]
+pub struct WatchChange {
+ pub event_type: EventType,
+    pub key: Vec<u8>,
+    pub value: Vec<u8>,
+ #[allow(dead_code)]
+ pub revision: u64,
+}
+
pub struct ChainfireWatcher {
endpoint: String,
- prefix: String,
- interval: Duration,
+ scope: WatchScope,
+    target: Vec<u8>,
+ reconnect_backoff: Duration,
}
impl ChainfireWatcher {
- pub fn new(endpoint: String, prefix: String, interval_secs: u64) -> Self {
+    pub fn key(endpoint: String, key: Vec<u8>) -> Self {
Self {
endpoint,
- prefix,
- interval: Duration::from_secs(interval_secs),
+ scope: WatchScope::Key,
+ target: key,
+ reconnect_backoff: Duration::from_secs(1),
+ }
+ }
+
+    pub fn prefix(endpoint: String, prefix: Vec<u8>) -> Self {
+ Self {
+ endpoint,
+ scope: WatchScope::Prefix,
+ target: prefix,
+ reconnect_backoff: Duration::from_secs(1),
}
}
pub async fn watch(&self, mut callback: F) -> Result<()>
where
-        F: FnMut(Vec<(Vec<u8>, Vec<u8>)>) -> Result<()>,
+ F: FnMut(WatchChange) -> Result<()>,
{
- let mut last_revision = 0u64;
-
loop {
- match self.fetch_updates(last_revision).await {
- Ok((kvs, max_rev)) => {
- if !kvs.is_empty() {
- info!(
- prefix = %self.prefix,
- count = kvs.len(),
- "detected changes in Chainfire"
- );
- if let Err(e) = callback(kvs) {
- warn!(error = %e, "callback failed");
- } else if max_rev > last_revision {
- last_revision = max_rev;
+ match Client::connect(self.endpoint.clone()).await {
+ Ok(mut client) => {
+ let watch_result = match self.scope {
+ WatchScope::Key => client.watch(&self.target).await,
+ WatchScope::Prefix => client.watch_prefix(&self.target).await,
+ };
+
+ match watch_result {
+ Ok(mut handle) => {
+ info!(
+ scope = ?self.scope,
+ target = %String::from_utf8_lossy(&self.target),
+ watch_id = handle.id(),
+ "connected ChainFire watch"
+ );
+
+ while let Some(event) = handle.recv().await {
+ if let Err(error) = callback(WatchChange {
+ event_type: event.event_type,
+ key: event.key,
+ value: event.value,
+ revision: event.revision,
+ }) {
+ warn!(error = %error, "watch callback failed");
+ }
+ }
+
+ warn!(
+ scope = ?self.scope,
+ target = %String::from_utf8_lossy(&self.target),
+ "ChainFire watch stream ended; reconnecting"
+ );
+ }
+ Err(error) => {
+ warn!(
+ error = %error,
+ scope = ?self.scope,
+ target = %String::from_utf8_lossy(&self.target),
+ "failed to create ChainFire watch"
+ );
}
- } else if max_rev > last_revision {
- last_revision = max_rev;
}
}
- Err(e) => {
- warn!(error = %e, "failed to fetch updates from Chainfire");
+ Err(error) => {
+ warn!(
+ error = %error,
+ scope = ?self.scope,
+ target = %String::from_utf8_lossy(&self.target),
+ "failed to connect ChainFire watch"
+ );
}
}
- sleep(self.interval).await;
+ tokio::time::sleep(self.reconnect_backoff).await;
}
}
-
-    async fn fetch_updates(&self, last_revision: u64) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, u64)> {
- let mut client = Client::connect(self.endpoint.clone()).await?;
- let (kvs, _) = client.scan_prefix(self.prefix.as_bytes(), 0).await?;
-
-        // Naive implementation: return all KV pairs (revision filtering not implemented)
- let mut max_rev = last_revision;
- let mut result = Vec::new();
- for (k, v, rev) in kvs {
- if rev > last_revision {
- result.push((k, v));
- if rev > max_rev {
- max_rev = rev;
- }
- }
- }
-
- Ok((result, max_rev))
- }
}
diff --git a/deployer/crates/plasmacloud-reconciler/src/hosts.rs b/deployer/crates/plasmacloud-reconciler/src/hosts.rs
index a17803e..313befe 100644
--- a/deployer/crates/plasmacloud-reconciler/src/hosts.rs
+++ b/deployer/crates/plasmacloud-reconciler/src/hosts.rs
@@ -7,15 +7,35 @@ use deployer_types::{
HostDeploymentSpec, HostDeploymentStatus, InstallState, ObservedSystemState,
ServiceInstanceSpec,
};
+use serde_json::Value;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::time::Duration;
-use tokio::time::sleep;
+use tokio::sync::{mpsc, oneshot};
+use tokio::time::MissedTickBehavior;
use tracing::{info, warn};
+use crate::watcher::{ChainfireWatcher, WatchChange};
+
fn cluster_prefix(cluster_namespace: &str, cluster_id: &str) -> String {
format!("{}/clusters/{}/", cluster_namespace, cluster_id)
}
+fn nodes_prefix(cluster_namespace: &str, cluster_id: &str) -> Vec<u8> {
+ format!("{}nodes/", cluster_prefix(cluster_namespace, cluster_id)).into_bytes()
+}
+
+fn instances_prefix(cluster_namespace: &str, cluster_id: &str) -> Vec<u8> {
+ format!("{}instances/", cluster_prefix(cluster_namespace, cluster_id)).into_bytes()
+}
+
+fn host_deployments_prefix(cluster_namespace: &str, cluster_id: &str) -> Vec<u8> {
+ format!(
+ "{}deployments/hosts/",
+ cluster_prefix(cluster_namespace, cluster_id)
+ )
+ .into_bytes()
+}
+
fn key_node(cluster_namespace: &str, cluster_id: &str, node_id: &str) -> Vec<u8> {
format!(
"{}nodes/{}",
@@ -72,17 +92,8 @@ pub struct HostsCommand {
}
pub async fn run(command: HostsCommand) -> Result<()> {
- let controller = HostDeploymentController::new(command);
- if controller.once {
- controller.reconcile_once().await
- } else {
- loop {
- if let Err(error) = controller.reconcile_once().await {
- warn!(error = %error, "host deployment reconciliation failed");
- }
- sleep(controller.interval).await;
- }
- }
+ let mut controller = HostDeploymentController::new(command);
+ controller.run_loop().await
}
struct HostDeploymentController {
@@ -93,6 +104,102 @@ struct HostDeploymentController {
heartbeat_timeout_secs: u64,
dry_run: bool,
once: bool,
+    watched_node_signatures: HashMap<String, Vec<u8>>,
+    watched_desired_signatures: HashMap<String, Vec<u8>>,
+    watched_observed_signatures: HashMap<String, Vec<u8>>,
+    watched_instance_signatures: HashMap<String, Vec<u8>>,
+    watched_deployment_spec_signatures: HashMap<String, Vec<u8>>,
+    watched_deployment_status_signatures: HashMap<String, Vec<u8>>,
+    pending_kv_writes: HashMap<String, Vec<u8>>,
+    pending_kv_deletes: HashSet<String>,
+}
+
+async fn spawn_watch_task(
+ watcher: ChainfireWatcher,
+    tx: mpsc::Sender<WatchChange>,
+ ready: oneshot::Sender<()>,
+) {
+ let mut ready = Some(ready);
+ if let Err(error) = watcher
+ .watch_with_ready(
+ move || {
+ if let Some(ready) = ready.take() {
+ let _ = ready.send(());
+ }
+ Ok(())
+ },
+ move |change| {
+ tx.try_send(change).map_err(|send_error| {
+ anyhow::anyhow!("failed to enqueue watch change: {send_error}")
+ })
+ },
+ )
+ .await
+ {
+ warn!(error = %error, "host deployment watch task exited");
+ }
+}
+
+async fn wait_for_watchers_ready(receivers: Vec<oneshot::Receiver<()>>) {
+ for receiver in receivers {
+ match tokio::time::timeout(Duration::from_secs(10), receiver).await {
+ Ok(Ok(())) => {}
+ Ok(Err(_)) => warn!("host deployment watch task ended before signaling readiness"),
+ Err(_) => warn!("timed out waiting for host deployment watch readiness"),
+ }
+ }
+}
+
+fn canonical_json_signature(bytes: &[u8], drop_keys: &[&str]) -> Option<Vec<u8>> {
+ let mut value: Value = serde_json::from_slice(bytes).ok()?;
+ strip_observed_fields(&mut value, drop_keys);
+ serde_json::to_vec(&sort_json_value(value)).ok()
+}
+
+fn strip_observed_fields(value: &mut Value, drop_keys: &[&str]) {
+ match value {
+ Value::Object(map) => {
+ for key in drop_keys {
+ map.remove(*key);
+ }
+ for nested in map.values_mut() {
+ strip_observed_fields(nested, drop_keys);
+ }
+ }
+ Value::Array(items) => {
+ for item in items {
+ strip_observed_fields(item, drop_keys);
+ }
+ }
+ _ => {}
+ }
+}
+
+fn sort_json_value(value: Value) -> Value {
+ match value {
+ Value::Array(items) => Value::Array(items.into_iter().map(sort_json_value).collect()),
+ Value::Object(map) => {
+            let mut keys = map.keys().cloned().collect::<Vec<_>>();
+ keys.sort();
+ let mut normalized = serde_json::Map::new();
+ for key in keys {
+ if let Some(value) = map.get(&key) {
+ normalized.insert(key, sort_json_value(value.clone()));
+ }
+ }
+ Value::Object(normalized)
+ }
+ other => other,
+ }
+}
+
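+/// Reduces a status document to the operator-controlled bits (pause / abort); all other status churn is written by this controller.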
+fn host_deployment_status_signature(bytes: &[u8]) -> Option<Vec<u8>> {
+ let value: Value = serde_json::from_slice(bytes).ok()?;
+ serde_json::to_vec(&serde_json::json!({
+ "paused_by_operator": value.get("paused_by_operator").and_then(Value::as_bool).unwrap_or(false),
+ "aborted": value.get("phase").and_then(Value::as_str) == Some("aborted"),
+ }))
+ .ok()
}
impl HostDeploymentController {
@@ -105,10 +212,60 @@ impl HostDeploymentController {
heartbeat_timeout_secs: command.heartbeat_timeout_secs,
dry_run: command.dry_run,
once: command.once,
+ watched_node_signatures: HashMap::new(),
+ watched_desired_signatures: HashMap::new(),
+ watched_observed_signatures: HashMap::new(),
+ watched_instance_signatures: HashMap::new(),
+ watched_deployment_spec_signatures: HashMap::new(),
+ watched_deployment_status_signatures: HashMap::new(),
+ pending_kv_writes: HashMap::new(),
+ pending_kv_deletes: HashSet::new(),
}
}
- async fn reconcile_once(&self) -> Result<()> {
+ async fn run_loop(&mut self) -> Result<()> {
+ if self.once {
+ return self.reconcile_once().await;
+ }
+
+ self.reconcile_once().await?;
+
+        let (watch_tx, mut watch_rx) = mpsc::channel::<WatchChange>(128);
+ let ready_receivers = self.spawn_watchers(watch_tx);
+ wait_for_watchers_ready(ready_receivers).await;
+ self.reconcile_once().await?;
+ let mut interval = tokio::time::interval(self.interval);
+ interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
+ interval.tick().await;
+
+ loop {
+ tokio::select! {
+ _ = interval.tick() => {
+ if let Err(error) = self.reconcile_once().await {
+ warn!(error = %error, "host deployment reconciliation failed");
+ }
+ }
+ maybe_change = watch_rx.recv() => {
+ let Some(change) = maybe_change else {
+ continue;
+ };
+
+ let mut should_reconcile = self.apply_watch_change(&change);
+ while let Ok(extra) = watch_rx.try_recv() {
+ should_reconcile |= self.apply_watch_change(&extra);
+ }
+
+ if should_reconcile {
+ if let Err(error) = self.reconcile_once().await {
+ warn!(error = %error, "host deployment watch-triggered reconciliation failed");
+ }
+ }
+ }
+ }
+ }
+ }
+
+ async fn reconcile_once(&mut self) -> Result<()> {
let mut client = Client::connect(self.endpoint.clone()).await?;
let nodes = self.load_nodes(&mut client).await?;
let desired_systems = self.load_desired_systems(&mut client).await?;
@@ -149,52 +306,309 @@ impl HostDeploymentController {
}
for desired in &plan.desired_upserts {
- client
- .put(
- &key_desired_system(
- &self.cluster_namespace,
- &self.cluster_id,
- &desired.node_id,
- ),
- &serde_json::to_vec(desired)?,
- )
- .await?;
+ let key = key_desired_system(&self.cluster_namespace, &self.cluster_id, &desired.node_id);
+ let payload = serde_json::to_vec(desired)?;
+ let key_str = self.record_pending_write(&key, payload.clone());
+ if let Err(error) = client.put(&key, &payload).await {
+ self.pending_kv_writes.remove(&key_str);
+ return Err(error.into());
+ }
}
for node_id in &plan.desired_deletes {
- client
- .delete(&key_desired_system(
- &self.cluster_namespace,
- &self.cluster_id,
- node_id,
- ))
- .await?;
+ let key = key_desired_system(&self.cluster_namespace, &self.cluster_id, node_id);
+ let key_str = self.record_pending_delete(&key);
+ if !client.delete(&key).await? {
+ self.pending_kv_deletes.remove(&key_str);
+ }
}
for node in plan.node_updates.values() {
- client
- .put(
- &key_node(&self.cluster_namespace, &self.cluster_id, &node.node_id),
- &serde_json::to_vec(node)?,
- )
- .await?;
+ let key = key_node(&self.cluster_namespace, &self.cluster_id, &node.node_id);
+ let payload = serde_json::to_vec(node)?;
+ let key_str = self.record_pending_write(&key, payload.clone());
+ if let Err(error) = client.put(&key, &payload).await {
+ self.pending_kv_writes.remove(&key_str);
+ return Err(error.into());
+ }
}
- client
- .put(
- &key_host_deployment_status(
- &self.cluster_namespace,
- &self.cluster_id,
- &deployment.name,
- ),
- &serde_json::to_vec(&plan.status)?,
- )
- .await?;
+ let key = key_host_deployment_status(
+ &self.cluster_namespace,
+ &self.cluster_id,
+ &deployment.name,
+ );
+ let payload = serde_json::to_vec(&plan.status)?;
+ let key_str = self.record_pending_write(&key, payload.clone());
+ if let Err(error) = client.put(&key, &payload).await {
+ self.pending_kv_writes.remove(&key_str);
+ return Err(error.into());
+ }
}
Ok(())
}
+    fn spawn_watchers(&self, tx: mpsc::Sender<WatchChange>) -> Vec<oneshot::Receiver<()>> {
+ let mut ready_receivers = Vec::new();
+
+ let node_watcher = ChainfireWatcher::prefix(
+ self.endpoint.clone(),
+ nodes_prefix(&self.cluster_namespace, &self.cluster_id),
+ );
+ let (node_ready_tx, node_ready_rx) = oneshot::channel();
+ tokio::spawn(spawn_watch_task(node_watcher, tx.clone(), node_ready_tx));
+ ready_receivers.push(node_ready_rx);
+
+ let instance_watcher = ChainfireWatcher::prefix(
+ self.endpoint.clone(),
+ instances_prefix(&self.cluster_namespace, &self.cluster_id),
+ );
+ let (instance_ready_tx, instance_ready_rx) = oneshot::channel();
+ tokio::spawn(spawn_watch_task(
+ instance_watcher,
+ tx.clone(),
+ instance_ready_tx,
+ ));
+ ready_receivers.push(instance_ready_rx);
+
+ let deployment_watcher = ChainfireWatcher::prefix(
+ self.endpoint.clone(),
+ host_deployments_prefix(&self.cluster_namespace, &self.cluster_id),
+ );
+ let (deployment_ready_tx, deployment_ready_rx) = oneshot::channel();
+ tokio::spawn(spawn_watch_task(deployment_watcher, tx, deployment_ready_tx));
+ ready_receivers.push(deployment_ready_rx);
+
+ ready_receivers
+ }
+
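+    /// Remembers a payload this controller is about to write so the watch echo can be ignored.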
+    fn record_pending_write(&mut self, key: &[u8], payload: Vec<u8>) -> String {
+ let key_str = String::from_utf8_lossy(key).to_string();
+ self.pending_kv_deletes.remove(&key_str);
+ self.pending_kv_writes.insert(key_str.clone(), payload);
+ key_str
+ }
+
+ fn record_pending_delete(&mut self, key: &[u8]) -> String {
+ let key_str = String::from_utf8_lossy(key).to_string();
+ self.pending_kv_writes.remove(&key_str);
+ self.pending_kv_deletes.insert(key_str.clone());
+ key_str
+ }
+
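+    /// Pops the pending write for `key`, returning the payload when the change is the exact echo of our own put.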
+    fn consume_pending_write(&mut self, key: &str, change: &WatchChange) -> Option<Vec<u8>> {
+ let expected = self.pending_kv_writes.remove(key)?;
+ if matches!(change.event_type, chainfire_client::EventType::Put) && change.value == expected {
+ self.pending_kv_deletes.remove(key);
+ return Some(expected);
+ }
+ None
+ }
+
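+    /// Pops any pending delete for `key`; true when the change is the echo of our own delete.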
+ fn consume_pending_delete(&mut self, key: &str, change: &WatchChange) -> bool {
+ self.pending_kv_deletes.remove(key)
+ && matches!(change.event_type, chainfire_client::EventType::Delete)
+ }
+
+ fn apply_watch_change(&mut self, change: &WatchChange) -> bool {
+ let key = String::from_utf8_lossy(&change.key).to_string();
+ let nodes_prefix = String::from_utf8_lossy(&nodes_prefix(&self.cluster_namespace, &self.cluster_id))
+ .to_string();
+ let instances_prefix =
+ String::from_utf8_lossy(&instances_prefix(&self.cluster_namespace, &self.cluster_id))
+ .to_string();
+ let deployments_prefix = String::from_utf8_lossy(&host_deployments_prefix(
+ &self.cluster_namespace,
+ &self.cluster_id,
+ ))
+ .to_string();
+
+ if let Some(suffix) = key.strip_prefix(&nodes_prefix) {
+ if suffix.ends_with("/desired-system") {
+ return self.apply_desired_system_watch_change(&key, change);
+ }
+ if suffix.ends_with("/observed-system") {
+ return self.apply_observed_system_watch_change(&key, change);
+ }
+ if !suffix.is_empty() && !suffix.contains('/') {
+ return self.apply_node_record_watch_change(&key, change);
+ }
+ return false;
+ }
+
+ if key.starts_with(&instances_prefix) {
+ return self.apply_instance_watch_change(&key, change);
+ }
+
+ if let Some(suffix) = key.strip_prefix(&deployments_prefix) {
+ if suffix.ends_with("/spec") {
+ return self.apply_deployment_spec_watch_change(&key, change);
+ }
+ if suffix.ends_with("/status") {
+ return self.apply_deployment_status_watch_change(&key, change);
+ }
+ }
+
+ false
+ }
+
+ fn apply_node_record_watch_change(&mut self, key: &str, change: &WatchChange) -> bool {
+ if self.consume_pending_delete(key, change) {
+ self.watched_node_signatures.remove(key);
+ return false;
+ }
+ if let Some(payload) = self.consume_pending_write(key, change) {
+ if let Some(signature) = canonical_json_signature(&payload, &["last_heartbeat"]) {
+ self.watched_node_signatures
+ .insert(key.to_string(), signature);
+ } else {
+ self.watched_node_signatures.remove(key);
+ }
+ return false;
+ }
+ if matches!(change.event_type, chainfire_client::EventType::Delete) {
+ self.watched_node_signatures.remove(key);
+ return true;
+ }
+
+ let Some(signature) = canonical_json_signature(&change.value, &["last_heartbeat"]) else {
+ return true;
+ };
+ if self.watched_node_signatures.get(key) == Some(&signature) {
+ return false;
+ }
+ self.watched_node_signatures
+ .insert(key.to_string(), signature);
+ true
+ }
+
+ fn apply_desired_system_watch_change(&mut self, key: &str, change: &WatchChange) -> bool {
+ if self.consume_pending_delete(key, change) {
+ self.watched_desired_signatures.remove(key);
+ return false;
+ }
+ if let Some(payload) = self.consume_pending_write(key, change) {
+ if let Some(signature) = canonical_json_signature(&payload, &[]) {
+ self.watched_desired_signatures
+ .insert(key.to_string(), signature);
+ } else {
+ self.watched_desired_signatures.remove(key);
+ }
+ return false;
+ }
+ if matches!(change.event_type, chainfire_client::EventType::Delete) {
+ self.watched_desired_signatures.remove(key);
+ return true;
+ }
+
+ let Some(signature) = canonical_json_signature(&change.value, &[]) else {
+ return true;
+ };
+ if self.watched_desired_signatures.get(key) == Some(&signature) {
+ return false;
+ }
+ self.watched_desired_signatures
+ .insert(key.to_string(), signature);
+ true
+ }
+
+ fn apply_observed_system_watch_change(&mut self, key: &str, change: &WatchChange) -> bool {
+ if matches!(change.event_type, chainfire_client::EventType::Delete) {
+ self.watched_observed_signatures.remove(key);
+ return true;
+ }
+
+ let Some(signature) = canonical_json_signature(
+ &change.value,
+ &[
+ "flake_root",
+ "configured_system",
+ "current_system",
+ "booted_system",
+ "rollback_system",
+ "switch_action",
+ "reboot_required",
+ "last_attempt",
+ "last_success",
+ "last_error",
+ ],
+ ) else {
+ return true;
+ };
+ if self.watched_observed_signatures.get(key) == Some(&signature) {
+ return false;
+ }
+ self.watched_observed_signatures
+ .insert(key.to_string(), signature);
+ true
+ }
+
+ fn apply_instance_watch_change(&mut self, key: &str, change: &WatchChange) -> bool {
+ if matches!(change.event_type, chainfire_client::EventType::Delete) {
+ self.watched_instance_signatures.remove(key);
+ return true;
+ }
+
+ let Some(signature) = canonical_json_signature(&change.value, &["last_heartbeat", "observed_at"])
+ else {
+ return true;
+ };
+ if self.watched_instance_signatures.get(key) == Some(&signature) {
+ return false;
+ }
+ self.watched_instance_signatures
+ .insert(key.to_string(), signature);
+ true
+ }
+
+ fn apply_deployment_spec_watch_change(&mut self, key: &str, change: &WatchChange) -> bool {
+ if matches!(change.event_type, chainfire_client::EventType::Delete) {
+ self.watched_deployment_spec_signatures.remove(key);
+ return true;
+ }
+
+ let Some(signature) = canonical_json_signature(&change.value, &[]) else {
+ return true;
+ };
+ if self.watched_deployment_spec_signatures.get(key) == Some(&signature) {
+ return false;
+ }
+ self.watched_deployment_spec_signatures
+ .insert(key.to_string(), signature);
+ true
+ }
+
+ fn apply_deployment_status_watch_change(&mut self, key: &str, change: &WatchChange) -> bool {
+ if self.consume_pending_delete(key, change) {
+ self.watched_deployment_status_signatures.remove(key);
+ return false;
+ }
+ if let Some(payload) = self.consume_pending_write(key, change) {
+ if let Some(signature) = host_deployment_status_signature(&payload) {
+ self.watched_deployment_status_signatures
+ .insert(key.to_string(), signature);
+ } else {
+ self.watched_deployment_status_signatures.remove(key);
+ }
+ return false;
+ }
+ if matches!(change.event_type, chainfire_client::EventType::Delete) {
+ self.watched_deployment_status_signatures.remove(key);
+ return true;
+ }
+
+ let Some(signature) = host_deployment_status_signature(&change.value) else {
+ return true;
+ };
+ if self.watched_deployment_status_signatures.get(key) == Some(&signature) {
+ return false;
+ }
+ self.watched_deployment_status_signatures
+ .insert(key.to_string(), signature);
+ true
+ }
+
    async fn load_nodes(&self, client: &mut Client) -> Result<Vec<ClusterNodeRecord>> {
let prefix = format!(
"{}nodes/",
@@ -385,6 +799,9 @@ fn plan_host_deployment(
let operator_paused = existing_status
.map(|status| status.paused_by_operator)
.unwrap_or(false);
+ let aborted = existing_status
+ .and_then(|status| status.phase.as_deref())
+ == Some("aborted");
let spec_paused = deployment.paused.unwrap_or(false);
let mut desired_deletes = desired_systems
.iter()
@@ -395,6 +812,44 @@ fn plan_host_deployment(
.map(|(node_id, _)| node_id.clone())
        .collect::<Vec<_>>();
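+    // An aborted deployment withdraws every desired-system it owns, un-drains its nodes, and keeps the aborted status sticky.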
+ if aborted {
+ desired_deletes = desired_systems
+ .iter()
+ .filter(|(_, desired)| {
+ desired.deployment_id.as_deref() == Some(deployment.name.as_str())
+ })
+ .map(|(node_id, _)| node_id.clone())
+ .collect();
+
+ for node in &selector_matches {
+ if node.state.as_deref() == Some("draining") {
+ let mut updated = (*node).clone();
+ updated.state = Some("active".to_string());
+ node_updates.insert(updated.node_id.clone(), updated);
+ }
+ }
+
+ let mut status = existing_status.cloned().unwrap_or_default();
+ status.name = deployment.name.clone();
+ status.paused = true;
+ status.paused_by_operator = true;
+ status.phase = Some("aborted".to_string());
+ if status.message.is_none() {
+ status.message = Some("aborted by operator".to_string());
+ }
+ status.updated_at = Some(now);
+
+ desired_deletes.sort();
+ desired_deletes.dedup();
+
+ return HostDeploymentPlan {
+ status,
+ desired_upserts: Vec::new(),
+ desired_deletes,
+ node_updates,
+ };
+ }
+
for node in &selector_matches {
let desired = desired_systems.get(&node.node_id);
let observed = observed_systems.get(&node.node_id);
fn dedup_sorted(mut values: Vec<String>) -> Vec<String> {
#[cfg(test)]
mod tests {
use super::*;
+ use chainfire_client::EventType;
fn test_node(node_id: &str, failure_domain: &str) -> ClusterNodeRecord {
ClusterNodeRecord {
@@ -760,6 +1216,18 @@ mod tests {
}
}
+ fn test_controller() -> HostDeploymentController {
+ HostDeploymentController::new(HostsCommand {
+ endpoint: "http://127.0.0.1:7000".to_string(),
+ cluster_namespace: "photoncloud".to_string(),
+ cluster_id: "test-cluster".to_string(),
+ interval_secs: 1,
+ heartbeat_timeout_secs: 300,
+ dry_run: true,
+ once: false,
+ })
+ }
+
#[test]
fn plan_rollout_starts_one_node_per_batch() {
let deployment = test_deployment();
@@ -859,4 +1327,185 @@ mod tests {
);
assert_eq!(plan.status.in_progress_nodes, vec!["node01".to_string()]);
}
+
+ #[test]
+ fn plan_rollout_keeps_aborted_state_and_clears_desired_system() {
+ let deployment = test_deployment();
+ let mut draining = test_node("node01", "rack-a");
+ draining.state = Some("draining".to_string());
+ let nodes = vec![draining];
+ let desired = HashMap::from([(
+ "node01".to_string(),
+ DesiredSystemSpec {
+ node_id: "node01".to_string(),
+ deployment_id: Some("worker-rollout".to_string()),
+ nixos_configuration: Some("worker-golden".to_string()),
+ target_system: Some("/nix/store/worker-golden".to_string()),
+ flake_ref: None,
+ switch_action: Some("switch".to_string()),
+ health_check_command: Vec::new(),
+ rollback_on_failure: Some(true),
+ drain_before_apply: Some(false),
+ },
+ )]);
+ let status = HostDeploymentStatus {
+ name: "worker-rollout".to_string(),
+ phase: Some("aborted".to_string()),
+ paused: true,
+ paused_by_operator: true,
+ message: Some("aborted by operator".to_string()),
+ ..HostDeploymentStatus::default()
+ };
+
+ let plan = plan_host_deployment(
+ &deployment,
+ Some(&status),
+ &nodes,
+ &desired,
+ &HashMap::new(),
+ &[],
+ 300,
+ );
+
+ assert!(plan.desired_upserts.is_empty());
+ assert_eq!(plan.desired_deletes, vec!["node01".to_string()]);
+ assert_eq!(plan.status.phase.as_deref(), Some("aborted"));
+ assert!(plan.status.paused);
+ assert_eq!(
+ plan.node_updates
+ .get("node01")
+ .and_then(|node| node.state.as_deref()),
+ Some("active")
+ );
+ }
+
+ #[test]
+ fn host_watch_ignores_node_heartbeat_only_changes() {
+ let mut controller = test_controller();
+ let node_key = key_node(
+ &controller.cluster_namespace,
+ &controller.cluster_id,
+ "node01",
+ );
+
+ let first = WatchChange {
+ event_type: EventType::Put,
+ key: node_key.clone(),
+ value: serde_json::to_vec(&serde_json::json!({
+ "node_id": "node01",
+ "hostname": "node01",
+ "ip": "10.0.0.11",
+ "roles": ["worker"],
+ "labels": {"tier": "general"},
+ "state": "active",
+ "last_heartbeat": "2026-04-01T00:00:00Z"
+ }))
+ .unwrap(),
+ revision: 1,
+ };
+ assert!(controller.apply_watch_change(&first));
+
+ let heartbeat_only = WatchChange {
+ revision: 2,
+ value: serde_json::to_vec(&serde_json::json!({
+ "node_id": "node01",
+ "hostname": "node01",
+ "ip": "10.0.0.11",
+ "roles": ["worker"],
+ "labels": {"tier": "general"},
+ "state": "active",
+ "last_heartbeat": "2026-04-01T00:00:05Z"
+ }))
+ .unwrap(),
+ ..first.clone()
+ };
+ assert!(!controller.apply_watch_change(&heartbeat_only));
+
+ let drain = WatchChange {
+ revision: 3,
+ value: serde_json::to_vec(&serde_json::json!({
+ "node_id": "node01",
+ "hostname": "node01",
+ "ip": "10.0.0.11",
+ "roles": ["worker"],
+ "labels": {"tier": "general"},
+ "state": "draining",
+ "last_heartbeat": "2026-04-01T00:00:10Z"
+ }))
+ .unwrap(),
+ ..first
+ };
+ assert!(controller.apply_watch_change(&drain));
+ }
+
+ #[test]
+ fn host_watch_reacts_only_to_operator_pause_or_abort_status_changes() {
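+ // After the first observation, routine status progress (running ->
+ // completed) is the controller's own write; only an operator pause or
+ // abort should force another planning pass.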
+ let mut controller = test_controller();
+ let key = key_host_deployment_status(
+ &controller.cluster_namespace,
+ &controller.cluster_id,
+ "worker-rollout",
+ );
+
+ let running = WatchChange {
+ event_type: EventType::Put,
+ key: key.clone(),
+ value: serde_json::to_vec(&serde_json::json!({
+ "name": "worker-rollout",
+ "phase": "running",
+ "paused": false,
+ "paused_by_operator": false,
+ "selected_nodes": ["node01"],
+ "in_progress_nodes": ["node01"],
+ "updated_at": "2026-04-01T00:00:00Z"
+ }))
+ .unwrap(),
+ revision: 1,
+ };
+ assert!(controller.apply_watch_change(&running));
+
+ let completed = WatchChange {
+ revision: 2,
+ value: serde_json::to_vec(&serde_json::json!({
+ "name": "worker-rollout",
+ "phase": "completed",
+ "paused": false,
+ "paused_by_operator": false,
+ "selected_nodes": ["node01"],
+ "completed_nodes": ["node01"],
+ "updated_at": "2026-04-01T00:00:10Z"
+ }))
+ .unwrap(),
+ ..running.clone()
+ };
+ assert!(!controller.apply_watch_change(&completed));
+
+ let paused = WatchChange {
+ revision: 3,
+ value: serde_json::to_vec(&serde_json::json!({
+ "name": "worker-rollout",
+ "phase": "paused",
+ "paused": true,
+ "paused_by_operator": true,
+ "updated_at": "2026-04-01T00:00:20Z"
+ }))
+ .unwrap(),
+ ..running.clone()
+ };
+ assert!(controller.apply_watch_change(&paused));
+
+ let aborted = WatchChange {
+ revision: 4,
+ value: serde_json::to_vec(&serde_json::json!({
+ "name": "worker-rollout",
+ "phase": "aborted",
+ "paused": true,
+ "paused_by_operator": true,
+ "updated_at": "2026-04-01T00:00:30Z"
+ }))
+ .unwrap(),
+ ..running
+ };
+ assert!(controller.apply_watch_change(&aborted));
+ }
}
diff --git a/deployer/crates/plasmacloud-reconciler/src/main.rs b/deployer/crates/plasmacloud-reconciler/src/main.rs
index d900380..cdbfde4 100644
--- a/deployer/crates/plasmacloud-reconciler/src/main.rs
+++ b/deployer/crates/plasmacloud-reconciler/src/main.rs
@@ -40,6 +40,7 @@ use flashdns_api::proto::{
};
mod hosts;
+mod watcher;
#[derive(Parser)]
#[command(author, version, about)]
diff --git a/deployer/crates/plasmacloud-reconciler/src/watcher.rs b/deployer/crates/plasmacloud-reconciler/src/watcher.rs
new file mode 100644
index 0000000..e1f1dd9
--- /dev/null
+++ b/deployer/crates/plasmacloud-reconciler/src/watcher.rs
@@ -0,0 +1,122 @@
+use std::time::Duration;
+
+use anyhow::Result;
+use chainfire_client::{Client, EventType};
+use tracing::{info, warn};
+
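+/// Selects whether the watch targets one exact key or every key under a prefix.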
+#[allow(dead_code)]
+#[derive(Debug, Clone, Copy)]
+pub enum WatchScope {
+ Key,
+ Prefix,
+}
+
+#[derive(Debug, Clone)]
+pub struct WatchChange {
+ pub event_type: EventType,
+ pub key: Vec<u8>,
+ pub value: Vec<u8>,
+ #[allow(dead_code)]
+ pub revision: u64,
+}
+
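+/// Reconnecting wrapper around a ChainFire watch: any connect, watch, or
+/// stream failure falls back to a fixed backoff and retries forever.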
+pub struct ChainfireWatcher {
+ endpoint: String,
+ scope: WatchScope,
+ target: Vec<u8>,
+ reconnect_backoff: Duration,
+}
+
+impl ChainfireWatcher {
+ #[allow(dead_code)]
+ pub fn key(endpoint: String, key: Vec<u8>) -> Self {
+ Self {
+ endpoint,
+ scope: WatchScope::Key,
+ target: key,
+ reconnect_backoff: Duration::from_secs(1),
+ }
+ }
+
+ pub fn prefix(endpoint: String, prefix: Vec<u8>) -> Self {
+ Self {
+ endpoint,
+ scope: WatchScope::Prefix,
+ target: prefix,
+ reconnect_backoff: Duration::from_secs(1),
+ }
+ }
+
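+ /// Watch indefinitely, invoking `callback` for every change event.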
+ #[allow(dead_code)]
+ pub async fn watch<F>(&self, mut callback: F) -> Result<()>
+ where
+ F: FnMut(WatchChange) -> Result<()>,
+ {
+ self.watch_with_ready(|| Ok(()), move |change| callback(change))
+ .await
+ }
+
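+ /// Like `watch`, but also invokes `on_connected` each time a watch stream
+ /// is (re)established, so the caller can resync state missed while offline.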
+ pub async fn watch_with_ready<F, G>(&self, mut on_connected: G, mut callback: F) -> Result<()>
+ where
+ F: FnMut(WatchChange) -> Result<()>,
+ G: FnMut() -> Result<()>,
+ {
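+ // Every failure path below falls through to the backoff sleep at the
+ // bottom of the loop, so transient errors never end the watcher.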
+ loop {
+ match Client::connect(self.endpoint.clone()).await {
+ Ok(mut client) => {
+ let watch_result = match self.scope {
+ WatchScope::Key => client.watch(&self.target).await,
+ WatchScope::Prefix => client.watch_prefix(&self.target).await,
+ };
+
+ match watch_result {
+ Ok(mut handle) => {
+ info!(
+ scope = ?self.scope,
+ target = %String::from_utf8_lossy(&self.target),
+ watch_id = handle.id(),
+ "connected ChainFire watch"
+ );
+ on_connected()?;
+
+ while let Some(event) = handle.recv().await {
+ if let Err(error) = callback(WatchChange {
+ event_type: event.event_type,
+ key: event.key,
+ value: event.value,
+ revision: event.revision,
+ }) {
+ warn!(error = %error, "watch callback failed");
+ }
+ }
+
+ warn!(
+ scope = ?self.scope,
+ target = %String::from_utf8_lossy(&self.target),
+ "ChainFire watch stream ended; reconnecting"
+ );
+ }
+ Err(error) => {
+ warn!(
+ error = %error,
+ scope = ?self.scope,
+ target = %String::from_utf8_lossy(&self.target),
+ "failed to create ChainFire watch"
+ );
+ }
+ }
+ }
+ Err(error) => {
+ warn!(
+ error = %error,
+ scope = ?self.scope,
+ target = %String::from_utf8_lossy(&self.target),
+ "failed to connect ChainFire watch"
+ );
+ }
+ }
+
+ tokio::time::sleep(self.reconnect_backoff).await;
+ }
+ }
+}
diff --git a/deployer/scripts/verify-fleet-scheduler-e2e.sh b/deployer/scripts/verify-fleet-scheduler-e2e.sh
index 1c2ea80..1c2d742 100755
--- a/deployer/scripts/verify-fleet-scheduler-e2e.sh
+++ b/deployer/scripts/verify-fleet-scheduler-e2e.sh
@@ -41,10 +41,16 @@ run_fleet_scheduler_bin() {
tmp_dir="$(mktemp -d)"
cf_pid=""
+controller_pids=()
cleanup() {
set +e
+ for pid in "${controller_pids[@]}"; do
+ kill "$pid" 2>/dev/null || true
+ wait "$pid" 2>/dev/null || true
+ done
+
if [[ -d "$tmp_dir/pids" ]]; then
while IFS= read -r -d '' pid_file; do
[[ -f "$pid_file" ]] || continue
@@ -57,7 +63,11 @@ cleanup() {
wait "$cf_pid" 2>/dev/null || true
fi
- rm -rf "$tmp_dir"
+ if [[ "${PHOTONCLOUD_KEEP_TMP:-}" == "1" ]]; then
+ echo "Keeping temporary directory: $tmp_dir" >&2
+ else
+ rm -rf "$tmp_dir"
+ fi
}
trap cleanup EXIT
@@ -364,7 +374,7 @@ run_deployer_ctl() {
"$@"
}
-run_node_agent_once() {
+run_node_agent_bg() {
local node_id="$1"
local pid_dir="$tmp_dir/pids/$node_id"
mkdir -p "$pid_dir"
@@ -375,26 +385,111 @@ run_node_agent_once() {
--pid-dir "$pid_dir" \
--interval-secs 1 \
--apply \
- --once
+ >"$tmp_dir/node-agent-${node_id}.log" 2>&1 &
+ controller_pids+=("$!")
}
-run_scheduler_once() {
+run_scheduler_bg() {
run_fleet_scheduler_bin \
--chainfire-endpoint "$endpoint" \
--cluster-id test-cluster \
- --interval-secs 1 \
- --once
+ --interval-secs 300 \
+ >"$tmp_dir/fleet-scheduler.log" 2>&1 &
+ controller_pids+=("$!")
+}
+
+wait_for_service_state() {
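+ # Poll `service inspect` until the service reports the expected phase and
+ # instance count; pass "-" as expected_state to skip per-instance checks.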
+ local service_name="$1"
+ local expected_phase="$2"
+ local expected_instances="$3"
+ local expected_state="${4:--}"
+ local timeout_secs="${5:-120}"
+ local inspect_path="$tmp_dir/${service_name}-wait.json"
+ local deadline=$((SECONDS + timeout_secs))
+
+ while (( SECONDS < deadline )); do
+ if run_deployer_ctl service inspect --name "$service_name" --include-instances >"$inspect_path" 2>/dev/null; then
+ if python3 - "$inspect_path" "$expected_phase" "$expected_instances" "$expected_state" <<'PY'
+import json
+import sys
+
+payload = json.load(open(sys.argv[1], "r", encoding="utf-8"))
+expected_phase = sys.argv[2]
+expected_instances = int(sys.argv[3])
+expected_state = sys.argv[4]
+
+status = payload.get("status") or {}
+instances = payload.get("instances") or []
+
+if status.get("phase") != expected_phase:
+ raise SystemExit(1)
+if len(instances) != expected_instances:
+ raise SystemExit(1)
+if expected_state != "-":
+ actual_states = sorted(instance.get("state") for instance in instances)
+ if actual_states != [expected_state] * expected_instances:
+ raise SystemExit(1)
+PY
+ then
+ return 0
+ fi
+ fi
+ sleep 1
+ done
+
+ echo "timed out waiting for ${service_name} phase=${expected_phase} instances=${expected_instances}" >&2
+ [[ -f "$inspect_path" ]] && cat "$inspect_path" >&2
+ return 1
+}
+
+wait_for_endpoint_convergence() {
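+ # Poll until node01 answers HTTP 200 on both service ports and node02
+ # refuses connections, i.e. the scale-down has fully converged.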
+ local timeout_secs="${1:-60}"
+ local deadline=$((SECONDS + timeout_secs))
+
+ while (( SECONDS < deadline )); do
+ if python3 - <<'PY'
+import socket
+import urllib.request
+
+with urllib.request.urlopen("http://127.0.0.2:18080/", timeout=5) as response:
+ if response.status != 200:
+ raise SystemExit(f"node01 endpoint returned {response.status}")
+with urllib.request.urlopen("http://127.0.0.2:18081/", timeout=5) as response:
+ if response.status != 200:
+ raise SystemExit(f"node01 worker endpoint returned {response.status}")
+
+for port, label in ((18080, "api"), (18081, "worker")):
+ sock = socket.socket()
+ sock.settimeout(1.5)
+ try:
+ sock.connect(("127.0.0.3", port))
+ except OSError:
+ pass
+ else:
+ raise SystemExit(f"node02 {label} endpoint still accepts connections after scale-down")
+ finally:
+ sock.close()
+PY
+ then
+ return 0
+ fi
+ sleep 1
+ done
+
+ echo "timed out waiting for endpoint convergence after scale-down" >&2
+ return 1
}
echo "Applying cluster declaration"
run_deployer_ctl apply --config "$tmp_dir/cluster.yaml"
-echo "Activating nodes through node-agent"
-run_node_agent_once node01
-run_node_agent_once node02
+echo "Starting watch-driven controllers"
+run_scheduler_bg
+run_node_agent_bg node01
+run_node_agent_bg node02
-echo "Scheduling managed instances"
-run_scheduler_once
+echo "Waiting for worker to remain blocked until api becomes healthy"
+wait_for_service_state worker blocked 0 - 120
echo "Validating dependency block before api is healthy"
run_deployer_ctl dump --prefix "photoncloud/clusters/test-cluster/instances/worker/" >"$tmp_dir/worker-blocked.dump"
@@ -455,11 +550,7 @@ print("service inspect reports blocked dependency state")
PY
echo "Reconciling processes and health for api"
-for _ in 1 2 3; do
- run_node_agent_once node01
- run_node_agent_once node02
- sleep 1
-done
+wait_for_service_state api healthy 2 healthy 120
run_deployer_ctl service inspect --name api --include-instances >"$tmp_dir/api-inspect-healthy.json"
python3 - "$tmp_dir/api-inspect-healthy.json" <<'PY'
@@ -478,15 +569,8 @@ if len(instances) != 2:
print("api service inspect refreshed to healthy from node-agent updates")
PY
-echo "Re-running scheduler after api became healthy"
-run_scheduler_once
-
-echo "Reconciling processes and health for dependent worker service"
-for _ in 1 2 3; do
- run_node_agent_once node01
- run_node_agent_once node02
- sleep 1
-done
+echo "Waiting for dependent worker service to converge automatically"
+wait_for_service_state worker healthy 2 healthy 120
echo "Validating HTTP endpoints"
python3 - <<'PY'
@@ -594,25 +678,9 @@ PY
echo "Applying scaled declaration"
run_deployer_ctl apply --config "$tmp_dir/cluster-scaled.yaml" --prune
-echo "Re-running scheduler after scale-down"
-run_scheduler_once
-
-echo "Reconciling api after scale-down"
-for _ in 1 2 3; do
- run_node_agent_once node01
- run_node_agent_once node02
- sleep 1
-done
-
-echo "Re-running scheduler after scaled api became healthy"
-run_scheduler_once
-
-echo "Reconciling dependent worker service after scale-down"
-for _ in 1 2 3; do
- run_node_agent_once node01
- run_node_agent_once node02
- sleep 1
-done
+echo "Waiting for scaled services to converge automatically"
+wait_for_service_state api healthy 1 healthy 120
+wait_for_service_state worker healthy 1 healthy 120
echo "Inspecting scaled instance state in ChainFire"
run_deployer_ctl dump --prefix "photoncloud/clusters/test-cluster/instances/api/" >"$tmp_dir/instances-scaled.dump"
@@ -695,30 +763,7 @@ print("service inspect reports scaled healthy worker state without waiting for s
PY
echo "Validating endpoint convergence after scale-down"
-python3 - <<'PY'
-import socket
-import urllib.request
-
-with urllib.request.urlopen("http://127.0.0.2:18080/", timeout=5) as response:
- if response.status != 200:
- raise SystemExit(f"node01 endpoint returned {response.status}")
-with urllib.request.urlopen("http://127.0.0.2:18081/", timeout=5) as response:
- if response.status != 200:
- raise SystemExit(f"node01 worker endpoint returned {response.status}")
-
-for port, label in ((18080, "api"), (18081, "worker")):
- sock = socket.socket()
- sock.settimeout(1.5)
- try:
- sock.connect(("127.0.0.3", port))
- except OSError:
- pass
- else:
- raise SystemExit(f"node02 {label} endpoint still accepts connections after scale-down")
- finally:
- sock.close()
-
-print("Endpoint convergence validated")
-PY
+wait_for_endpoint_convergence 60
+echo "Endpoint convergence validated"
echo "Fleet scheduler E2E verification passed"
diff --git a/deployer/scripts/verify-host-lifecycle-e2e.sh b/deployer/scripts/verify-host-lifecycle-e2e.sh
index e9d6990..c6addf5 100644
--- a/deployer/scripts/verify-host-lifecycle-e2e.sh
+++ b/deployer/scripts/verify-host-lifecycle-e2e.sh
@@ -34,9 +34,14 @@ run_plasmacloud_reconciler_bin() {
tmp_dir="$(mktemp -d)"
cf_pid=""
redfish_pid=""
+controller_pids=()
cleanup() {
set +e
+ for pid in "${controller_pids[@]}"; do
+ kill "$pid" 2>/dev/null || true
+ wait "$pid" 2>/dev/null || true
+ done
if [[ -n "$redfish_pid" ]]; then
kill "$redfish_pid" 2>/dev/null || true
wait "$redfish_pid" 2>/dev/null || true
@@ -45,7 +50,11 @@ cleanup() {
kill "$cf_pid" 2>/dev/null || true
wait "$cf_pid" 2>/dev/null || true
fi
- rm -rf "$tmp_dir"
+ if [[ "${PHOTONCLOUD_KEEP_TMP:-}" == "1" ]]; then
+ echo "Keeping temporary directory: $tmp_dir" >&2
+ else
+ rm -rf "$tmp_dir"
+ fi
}
trap cleanup EXIT
@@ -248,21 +257,80 @@ run_deployer_ctl() {
"$@"
}
-run_hosts_once() {
+run_hosts_bg() {
run_plasmacloud_reconciler_bin \
hosts \
--endpoint "$chainfire_endpoint" \
--cluster-namespace photoncloud \
--cluster-id test-cluster \
--heartbeat-timeout-secs 300 \
- --once
+ --interval-secs 300 \
+ >"$tmp_dir/hosts-controller.log" 2>&1 &
+ controller_pids+=("$!")
+}
+
+wait_for_deployment_state() {
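+ # Poll `deployment inspect` until phase, paused, and the node lists all
+ # match; "-" skips a list check, "" requires an empty list, and "a,b"
+ # requires an exact ordered match.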
+ local expected_phase="$1"
+ local expected_paused="$2"
+ local expected_completed="$3"
+ local expected_in_progress="$4"
+ local expected_failed="$5"
+ local timeout_secs="${6:-120}"
+ local inspect_path="$tmp_dir/deployment-wait.json"
+ local deadline=$((SECONDS + timeout_secs))
+
+ while (( SECONDS < deadline )); do
+ if run_deployer_ctl deployment inspect --name worker-rollout --format json >"$inspect_path" 2>/dev/null; then
+ if python3 - "$inspect_path" "$expected_phase" "$expected_paused" "$expected_completed" "$expected_in_progress" "$expected_failed" <<'PY'
+import json
+import sys
+
+payload = json.load(open(sys.argv[1], "r", encoding="utf-8"))
+status = payload.get("status") or {}
+
+def parse_csv(value):
+ if value == "-":
+ return None
+ if not value:
+ return []
+ return value.split(",")
+
+expected_phase = sys.argv[2]
+expected_paused = sys.argv[3].lower() == "true"
+expected_completed = parse_csv(sys.argv[4])
+expected_in_progress = parse_csv(sys.argv[5])
+expected_failed = parse_csv(sys.argv[6])
+
+if status.get("phase") != expected_phase:
+ raise SystemExit(1)
+if bool(status.get("paused")) != expected_paused:
+ raise SystemExit(1)
+if expected_completed is not None and status.get("completed_nodes", []) != expected_completed:
+ raise SystemExit(1)
+if expected_in_progress is not None and status.get("in_progress_nodes", []) != expected_in_progress:
+ raise SystemExit(1)
+if expected_failed is not None and status.get("failed_nodes", []) != expected_failed:
+ raise SystemExit(1)
+PY
+ then
+ return 0
+ fi
+ fi
+ sleep 1
+ done
+
+ echo "timed out waiting for deployment state ${expected_phase}" >&2
+ [[ -f "$inspect_path" ]] && cat "$inspect_path" >&2
+ return 1
}
echo "Applying host lifecycle cluster config"
run_deployer_ctl apply --config "$tmp_dir/cluster.yaml" --prune
-echo "Running host rollout controller"
-run_hosts_once
+echo "Starting watch-driven host rollout controller"
+run_hosts_bg
+
+wait_for_deployment_state running false "" "node01" "" 120
run_deployer_ctl deployment inspect --name worker-rollout --format json >"$tmp_dir/deployment-1.json"
python3 - "$tmp_dir/deployment-1.json" <<'PY'
@@ -325,7 +393,7 @@ run_deployer_ctl node set-observed \
--node-id node01 \
--status active \
--nixos-configuration worker-next >/dev/null
-run_hosts_once
+wait_for_deployment_state running false "node01" "node02" "" 120
run_deployer_ctl deployment inspect --name worker-rollout --format json >"$tmp_dir/deployment-2.json"
python3 - "$tmp_dir/deployment-2.json" <<'PY'
@@ -344,7 +412,7 @@ run_deployer_ctl node set-observed \
--node-id node02 \
--status rolled-back \
--nixos-configuration worker-next >/dev/null
-run_hosts_once
+wait_for_deployment_state paused true "node01" "" "node02" 120
run_deployer_ctl deployment inspect --name worker-rollout --format json >"$tmp_dir/deployment-3.json"
python3 - "$tmp_dir/deployment-3.json" <<'PY'
@@ -407,6 +475,8 @@ assert payload["paused"] is True, payload
print("abort command validated")
PY
+wait_for_deployment_state aborted true - - - 120
+
run_deployer_ctl dump --prefix "photoncloud/clusters/test-cluster/nodes/" >"$tmp_dir/nodes-2.dump"
python3 - "$tmp_dir/nodes-2.dump" <<'PY'
import json