Make deployer controllers watch-driven

This commit is contained in:
centra 2026-04-02 07:09:03 +09:00
parent faabcbfc2e
commit 82a4c6a941
Signed by: centra
GPG key ID: 0C09689D20B25ACA
12 changed files with 1981 additions and 195 deletions

View file

@ -35,6 +35,8 @@ pub struct WatchHandle {
watch_id: i64,
/// Event receiver
rx: mpsc::Receiver<WatchEvent>,
/// Keep the bidi request stream alive for the lifetime of the watch.
_req_tx: mpsc::Sender<WatchRequest>,
}
impl WatchHandle {
@ -109,7 +111,11 @@ impl WatchHandle {
}
});
Ok(Self { watch_id, rx })
Ok(Self {
watch_id,
rx,
_req_tx: req_tx,
})
}
/// Get the watch ID

View file

@ -11,9 +11,9 @@ use chainfire_raft::network::RaftRpcClient;
use chainfire_storage::{RocksStore, LogStorage, StateMachine};
use chainfire_types::node::NodeRole;
use chainfire_types::RaftRole;
use chainfire_watch::WatchRegistry;
use chainfire_watch::{stream::WatchEventHandler, WatchRegistry};
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::sync::{broadcast, mpsc};
use tracing::info;
/// Node instance managing all components
@ -49,7 +49,11 @@ impl Node {
// Create LogStorage and StateMachine from store
let log_storage = Arc::new(LogStorage::new(store.clone()));
let state_machine = Arc::new(StateMachine::new(store.clone())?);
let mut state_machine = StateMachine::new(store.clone())?;
let (watch_tx, watch_rx) = mpsc::unbounded_channel();
state_machine.set_watch_sender(watch_tx);
let state_machine = Arc::new(state_machine);
WatchEventHandler::new(Arc::clone(&watch_registry)).spawn_dispatcher(watch_rx);
// Create gRPC Raft client and register peer addresses
let rpc_client = Arc::new(GrpcRaftClient::new());

View file

@ -1,5 +1,6 @@
mod auth;
mod publish;
mod watcher;
use anyhow::{Context, Result};
use chainfire_client::Client;
@ -15,9 +16,11 @@ use publish::{PublicationConfig, PublicationReconciler};
use serde_json::Value;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::time::Duration;
use tokio::time::sleep;
use tokio::sync::{mpsc, oneshot};
use tokio::time::MissedTickBehavior;
use tracing::{debug, info, warn};
use tracing_subscriber::EnvFilter;
use watcher::{ChainfireWatcher, WatchChange};
#[cfg(test)]
use chrono::Utc;
@ -26,6 +29,22 @@ use deployer_types::cluster_node_is_eligible;
const MANAGED_BY: &str = "fleet-scheduler";
/// Base key prefix for every record belonging to one cluster:
/// `{namespace}/clusters/{id}/`.
fn cluster_prefix(cluster_namespace: &str, cluster_id: &str) -> String {
    let mut prefix = String::with_capacity(cluster_namespace.len() + cluster_id.len() + 11);
    prefix.push_str(cluster_namespace);
    prefix.push_str("/clusters/");
    prefix.push_str(cluster_id);
    prefix.push('/');
    prefix
}
/// Key prefix (raw bytes) under which cluster node records live.
fn nodes_prefix(cluster_namespace: &str, cluster_id: &str) -> Vec<u8> {
    format!("{cluster_namespace}/clusters/{cluster_id}/nodes/").into_bytes()
}
/// Key prefix (raw bytes) under which service specs live.
fn services_prefix(cluster_namespace: &str, cluster_id: &str) -> Vec<u8> {
    format!("{cluster_namespace}/clusters/{cluster_id}/services/").into_bytes()
}
/// Key prefix (raw bytes) under which service instance records live.
fn instances_prefix(cluster_namespace: &str, cluster_id: &str) -> Vec<u8> {
    format!("{cluster_namespace}/clusters/{cluster_id}/instances/").into_bytes()
}
#[derive(Debug, Parser)]
#[command(author, version, about = "PhotonCloud non-Kubernetes fleet scheduler")]
struct Cli {
@ -81,6 +100,11 @@ struct Scheduler {
dry_run: bool,
publication: PublicationReconciler,
once: bool,
watched_node_signatures: HashMap<String, Vec<u8>>,
watched_service_signatures: HashMap<String, Vec<u8>>,
watched_instance_signatures: HashMap<String, Vec<u8>>,
pending_instance_writes: HashMap<String, Vec<u8>>,
pending_instance_deletes: HashSet<String>,
}
#[derive(Debug)]
@ -100,6 +124,85 @@ struct ReconcilePlan {
type DependencySummary = ServiceDependencySummary;
/// Drive one `ChainfireWatcher` until it fails, signalling `ready` exactly
/// once when a watch stream is first established and forwarding every change
/// into `tx`. Exit of the underlying watch is logged, not propagated.
async fn spawn_watch_task(
    watcher: ChainfireWatcher,
    tx: mpsc::Sender<WatchChange>,
    ready: oneshot::Sender<()>,
) {
    // The connect hook can run once per reconnect; only the first
    // invocation consumes and fires the readiness signal.
    let mut ready_slot = Some(ready);
    let result = watcher
        .watch_with_ready(
            move || {
                if let Some(signal) = ready_slot.take() {
                    let _ = signal.send(());
                }
                Ok(())
            },
            move |change| match tx.try_send(change) {
                Ok(()) => Ok(()),
                Err(send_error) => Err(anyhow::anyhow!(
                    "failed to enqueue watch change: {send_error}"
                )),
            },
        )
        .await;
    if let Err(error) = result {
        warn!(error = %error, "fleet scheduler watch task exited");
    }
}
/// Block until every watcher has signalled readiness. A closed channel or a
/// 10s timeout is logged and tolerated so startup can proceed regardless.
async fn wait_for_watchers_ready(receivers: Vec<oneshot::Receiver<()>>) {
    for receiver in receivers {
        let outcome = tokio::time::timeout(Duration::from_secs(10), receiver).await;
        match outcome {
            Err(_) => warn!("timed out waiting for fleet scheduler watch readiness"),
            Ok(Err(_)) => warn!("fleet scheduler watch task ended before signaling readiness"),
            Ok(Ok(())) => {}
        }
    }
}
/// Canonical byte signature of a JSON document: parse, strip the volatile
/// `drop_keys` everywhere, re-serialize with sorted object keys. `None` when
/// the bytes are not valid JSON or serialization fails.
fn canonical_json_signature(bytes: &[u8], drop_keys: &[&str]) -> Option<Vec<u8>> {
    serde_json::from_slice::<Value>(bytes).ok().and_then(|mut value| {
        strip_observed_fields(&mut value, drop_keys);
        serde_json::to_vec(&sort_json_value(value)).ok()
    })
}
/// Recursively remove every occurrence of `drop_keys` from all objects
/// nested anywhere inside `value`.
fn strip_observed_fields(value: &mut Value, drop_keys: &[&str]) {
    if let Value::Object(map) = value {
        for key in drop_keys {
            map.remove(*key);
        }
        map.values_mut()
            .for_each(|nested| strip_observed_fields(nested, drop_keys));
    } else if let Value::Array(items) = value {
        items
            .iter_mut()
            .for_each(|item| strip_observed_fields(item, drop_keys));
    }
}
/// Return `value` with every JSON object's keys sorted lexicographically,
/// recursively, so serialization order is canonical regardless of the
/// original insertion order.
fn sort_json_value(value: Value) -> Value {
    match value {
        Value::Array(items) => Value::Array(items.into_iter().map(sort_json_value).collect()),
        Value::Object(map) => {
            // Consume the map instead of cloning every key and value, and
            // sort the (key, value) pairs once rather than re-looking each
            // key up with `get` after sorting a cloned key list.
            let mut entries: Vec<(String, Value)> = map.into_iter().collect();
            entries.sort_by(|a, b| a.0.cmp(&b.0));
            let mut normalized = serde_json::Map::new();
            for (key, nested) in entries {
                normalized.insert(key, sort_json_value(nested));
            }
            Value::Object(normalized)
        }
        other => other,
    }
}
impl Scheduler {
fn new(cli: Cli) -> Self {
let cluster_namespace = cli.cluster_namespace;
@ -125,23 +228,58 @@ impl Scheduler {
dry_run: cli.dry_run,
}),
once: cli.once,
watched_node_signatures: HashMap::new(),
watched_service_signatures: HashMap::new(),
watched_instance_signatures: HashMap::new(),
pending_instance_writes: HashMap::new(),
pending_instance_deletes: HashSet::new(),
}
}
async fn run_loop(&self) -> Result<()> {
async fn run_loop(&mut self) -> Result<()> {
if self.once {
return self.reconcile_once().await;
}
self.reconcile_once().await?;
let (watch_tx, mut watch_rx) = mpsc::channel::<WatchChange>(128);
let ready_receivers = self.spawn_watchers(watch_tx);
wait_for_watchers_ready(ready_receivers).await;
self.reconcile_once().await?;
let mut interval = tokio::time::interval(self.interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
interval.tick().await;
loop {
tokio::select! {
_ = interval.tick() => {
if let Err(error) = self.reconcile_once().await {
warn!(error = %error, "fleet scheduler reconciliation failed");
}
sleep(self.interval).await;
}
maybe_change = watch_rx.recv() => {
let Some(change) = maybe_change else {
continue;
};
let mut should_reconcile = self.apply_watch_change(&change);
while let Ok(extra) = watch_rx.try_recv() {
should_reconcile |= self.apply_watch_change(&extra);
}
if should_reconcile {
if let Err(error) = self.reconcile_once().await {
warn!(error = %error, "fleet scheduler watch-triggered reconciliation failed");
}
}
}
}
}
}
async fn reconcile_once(&self) -> Result<()> {
async fn reconcile_once(&mut self) -> Result<()> {
let mut client = Client::connect(self.endpoint.clone()).await?;
let nodes = self.load_cluster_nodes(&mut client).await?;
let services = self.load_services(&mut client).await?;
@ -219,6 +357,152 @@ impl Scheduler {
Ok(())
}
/// Spawn one background watch task per watched prefix (nodes, services,
/// instances) and return the readiness receivers in spawn order.
fn spawn_watchers(&self, tx: mpsc::Sender<WatchChange>) -> Vec<oneshot::Receiver<()>> {
    // Shared spawn pattern: prefix watcher + readiness channel + task.
    let spawn_for = |prefix: Vec<u8>, tx: mpsc::Sender<WatchChange>| {
        let watcher = ChainfireWatcher::prefix(self.endpoint.clone(), prefix);
        let (ready_tx, ready_rx) = oneshot::channel();
        tokio::spawn(spawn_watch_task(watcher, tx, ready_tx));
        ready_rx
    };
    vec![
        spawn_for(
            nodes_prefix(&self.cluster_namespace, &self.cluster_id),
            tx.clone(),
        ),
        spawn_for(
            services_prefix(&self.cluster_namespace, &self.cluster_id),
            tx.clone(),
        ),
        spawn_for(
            instances_prefix(&self.cluster_namespace, &self.cluster_id),
            tx,
        ),
    ]
}
/// Classify an incoming watch event by key and route it to the matching
/// per-kind signature tracker. Returns true when the change warrants a
/// reconcile pass.
fn apply_watch_change(&mut self, change: &WatchChange) -> bool {
    let key = String::from_utf8_lossy(&change.key).to_string();
    let node_prefix =
        String::from_utf8_lossy(&nodes_prefix(&self.cluster_namespace, &self.cluster_id))
            .to_string();
    let service_prefix =
        String::from_utf8_lossy(&services_prefix(&self.cluster_namespace, &self.cluster_id))
            .to_string();
    let instance_prefix =
        String::from_utf8_lossy(&instances_prefix(&self.cluster_namespace, &self.cluster_id))
            .to_string();
    // Node and service records must sit directly under their prefix; keys
    // with deeper path components are ignored.
    let is_direct_child = |suffix: &str| !suffix.is_empty() && !suffix.contains('/');
    if let Some(suffix) = key.strip_prefix(&node_prefix) {
        return is_direct_child(suffix) && self.apply_node_watch_change(&key, change);
    }
    if let Some(suffix) = key.strip_prefix(&service_prefix) {
        return is_direct_child(suffix) && self.apply_service_watch_change(&key, change);
    }
    if key.starts_with(&instance_prefix) {
        return self.apply_instance_watch_change(&key, change);
    }
    false
}
/// Track a node record change; heartbeat-only updates are filtered out by
/// dropping `last_heartbeat` before comparing canonical signatures.
fn apply_node_watch_change(&mut self, key: &str, change: &WatchChange) -> bool {
    if matches!(change.event_type, chainfire_client::EventType::Delete) {
        self.watched_node_signatures.remove(key);
        return true;
    }
    match canonical_json_signature(&change.value, &["last_heartbeat"]) {
        // Unparseable payloads always trigger a reconcile.
        None => true,
        Some(signature) => {
            let unchanged = self.watched_node_signatures.get(key) == Some(&signature);
            if !unchanged {
                self.watched_node_signatures
                    .insert(key.to_string(), signature);
            }
            !unchanged
        }
    }
}
/// Track a service spec change. No keys are dropped before signing: any
/// field change on a service spec is significant.
fn apply_service_watch_change(&mut self, key: &str, change: &WatchChange) -> bool {
    if matches!(change.event_type, chainfire_client::EventType::Delete) {
        self.watched_service_signatures.remove(key);
        return true;
    }
    let signature = match canonical_json_signature(&change.value, &[]) {
        Some(signature) => signature,
        // Unparseable payloads always trigger a reconcile.
        None => return true,
    };
    if self.watched_service_signatures.get(key) == Some(&signature) {
        false
    } else {
        self.watched_service_signatures
            .insert(key.to_string(), signature);
        true
    }
}
/// Track an instance record change, suppressing echoes of this scheduler's
/// own writes and deletes so self-inflicted events do not retrigger a
/// reconcile. Returns true when the change warrants a reconcile pass.
fn apply_instance_watch_change(&mut self, key: &str, change: &WatchChange) -> bool {
    // Echo of a write we issued ourselves: absorb it, cache its signature,
    // and do NOT reconcile. Any other event clears the pending entry so a
    // conflicting external write is handled below.
    if let Some(expected) = self.pending_instance_writes.get(key) {
        if matches!(change.event_type, chainfire_client::EventType::Put) && change.value == *expected
        {
            self.pending_instance_writes.remove(key);
            self.pending_instance_deletes.remove(key);
            if let Some(signature) =
                canonical_json_signature(&change.value, &["last_heartbeat", "observed_at"])
            {
                self.watched_instance_signatures
                    .insert(key.to_string(), signature);
            } else {
                self.watched_instance_signatures.remove(key);
            }
            return false;
        }
        self.pending_instance_writes.remove(key);
    }
    // NOTE: the pending-delete entry is removed unconditionally here (the
    // `remove` side effect runs even when the event is a Put); only a
    // matching Delete is swallowed as our own echo.
    if self.pending_instance_deletes.remove(key)
        && matches!(change.event_type, chainfire_client::EventType::Delete)
    {
        self.watched_instance_signatures.remove(key);
        return false;
    }
    // External delete: forget the signature and reconcile.
    if matches!(change.event_type, chainfire_client::EventType::Delete) {
        self.watched_instance_signatures.remove(key);
        return true;
    }
    // External put: compare canonical signatures ignoring the observation
    // timestamps; unparseable payloads always trigger a reconcile.
    let Some(signature) = canonical_json_signature(&change.value, &["last_heartbeat", "observed_at"])
    else {
        self.watched_instance_signatures.remove(key);
        return true;
    };
    if self.watched_instance_signatures.get(key) == Some(&signature) {
        return false;
    }
    self.watched_instance_signatures
        .insert(key.to_string(), signature);
    true
}
async fn load_cluster_nodes(&self, client: &mut Client) -> Result<Vec<ClusterNodeRecord>> {
let prefix = format!(
"{}/clusters/{}/nodes/",
@ -364,7 +648,7 @@ impl Scheduler {
}
async fn reconcile_service(
&self,
&mut self,
client: &mut Client,
nodes: &[ClusterNodeRecord],
service: &ServiceSpec,
@ -420,9 +704,15 @@ impl Scheduler {
"would upsert managed instance"
);
} else {
client
.put(&key, serde_json::to_vec(&upsert.desired_value)?)
.await?;
let key_str = key.clone();
let payload = serde_json::to_vec(&upsert.desired_value)?;
self.pending_instance_deletes.remove(&key_str);
self.pending_instance_writes
.insert(key_str.clone(), payload.clone());
if let Err(error) = client.put(&key, &payload).await {
self.pending_instance_writes.remove(&key_str);
return Err(error.into());
}
info!(
service = %service.name,
instance_id = %upsert.instance.instance_id,
@ -456,12 +746,19 @@ impl Scheduler {
instance_id = %instance_id,
"would delete stale managed instance"
);
} else if client.delete(&key).await? {
} else {
let key_str = key.clone();
self.pending_instance_writes.remove(&key_str);
self.pending_instance_deletes.insert(key_str.clone());
if client.delete(&key).await? {
info!(
service = %service.name,
instance_id = %instance_id,
"deleted stale managed instance"
);
} else {
self.pending_instance_deletes.remove(&key_str);
}
}
}
@ -1095,7 +1392,7 @@ async fn main() -> Result<()> {
.with_env_filter(EnvFilter::from_default_env().add_directive("info".parse()?))
.init();
let scheduler = Scheduler::new(Cli::parse());
let mut scheduler = Scheduler::new(Cli::parse());
scheduler.run_loop().await
}
@ -1103,6 +1400,7 @@ async fn main() -> Result<()> {
mod tests {
use super::*;
use chrono::Duration as ChronoDuration;
use chainfire_client::EventType;
use deployer_types::{
ClusterNodeRecord, HealthCheckSpec, PlacementPolicy, ProcessSpec,
PublishedLoadBalancerState, RolloutStrategySpec, ServiceDependencyCondition,
@ -1206,6 +1504,25 @@ mod tests {
}
}
/// Build a dry-run scheduler against a throwaway endpoint for unit tests.
fn test_scheduler() -> Scheduler {
    Scheduler::new(Cli {
        chainfire_endpoint: "http://127.0.0.1:7000".to_string(),
        cluster_namespace: "photoncloud".to_string(),
        cluster_id: "test-cluster".to_string(),
        interval_secs: 1,
        heartbeat_timeout_secs: 300,
        // dry_run keeps the tests from issuing real writes.
        dry_run: true,
        once: false,
        iam_endpoint: None,
        fiberlb_endpoint: None,
        flashdns_endpoint: None,
        publish_address: None,
        default_org_id: "default-org".to_string(),
        default_project_id: "default-project".to_string(),
        controller_principal_id: MANAGED_BY.to_string(),
    })
}
#[test]
fn test_node_eligibility_matches_roles_and_labels() {
let node = active_node("node01", &["worker"], &[("tier", "general")]);
@ -1836,4 +2153,135 @@ mod tests {
assert_eq!(instance_id, "api-node01-3");
}
#[test]
fn test_scheduler_watch_ignores_node_heartbeat_only_changes() {
    let mut scheduler = test_scheduler();
    let node_key = format!(
        "{}nodes/node01",
        cluster_prefix(&scheduler.cluster_namespace, &scheduler.cluster_id)
    );
    // First sighting of the node record must trigger a reconcile.
    let first = WatchChange {
        event_type: EventType::Put,
        key: node_key.as_bytes().to_vec(),
        value: serde_json::to_vec(&serde_json::json!({
            "node_id": "node01",
            "hostname": "node01",
            "ip": "10.0.0.11",
            "roles": ["worker"],
            "labels": {"tier": "general"},
            "state": "active",
            "last_heartbeat": "2026-04-01T00:00:00Z"
        }))
        .unwrap(),
        revision: 1,
    };
    assert!(scheduler.apply_watch_change(&first));
    // Same record with only `last_heartbeat` advanced: signature is
    // computed with that field stripped, so no reconcile.
    let heartbeat_only = WatchChange {
        revision: 2,
        value: serde_json::to_vec(&serde_json::json!({
            "node_id": "node01",
            "hostname": "node01",
            "ip": "10.0.0.11",
            "roles": ["worker"],
            "labels": {"tier": "general"},
            "state": "active",
            "last_heartbeat": "2026-04-01T00:00:05Z"
        }))
        .unwrap(),
        ..first.clone()
    };
    assert!(!scheduler.apply_watch_change(&heartbeat_only));
    // A real state change (active -> draining) must reconcile even though
    // the heartbeat also moved.
    let drain = WatchChange {
        revision: 3,
        value: serde_json::to_vec(&serde_json::json!({
            "node_id": "node01",
            "hostname": "node01",
            "ip": "10.0.0.11",
            "roles": ["worker"],
            "labels": {"tier": "general"},
            "state": "draining",
            "last_heartbeat": "2026-04-01T00:00:10Z"
        }))
        .unwrap(),
        ..first
    };
    assert!(scheduler.apply_watch_change(&drain));
}
#[test]
fn test_scheduler_watch_suppresses_self_written_instance_update_but_reacts_to_health_change() {
let mut scheduler = test_scheduler();
let key = instance_key(
&scheduler.cluster_namespace,
&scheduler.cluster_id,
"api",
"api-node01",
);
let initial = serde_json::to_vec(&serde_json::json!({
"instance_id": "api-node01",
"service": "api",
"node_id": "node01",
"ip": "10.0.0.11",
"port": 18080,
"managed_by": "fleet-scheduler",
"state": "starting",
"last_heartbeat": "2026-04-01T00:00:00Z",
"observed_at": "2026-04-01T00:00:00Z"
}))
.unwrap();
let first = WatchChange {
event_type: EventType::Put,
key: key.as_bytes().to_vec(),
value: initial.clone(),
revision: 1,
};
assert!(scheduler.apply_watch_change(&first));
let self_write = serde_json::to_vec(&serde_json::json!({
"instance_id": "api-node01",
"service": "api",
"node_id": "node01",
"ip": "10.0.0.11",
"port": 18081,
"managed_by": "fleet-scheduler",
"state": "starting",
"last_heartbeat": "2026-04-01T00:00:00Z",
"observed_at": "2026-04-01T00:00:00Z"
}))
.unwrap();
scheduler
.pending_instance_writes
.insert(key.clone(), self_write.clone());
assert!(!scheduler.apply_watch_change(&WatchChange {
event_type: EventType::Put,
key: key.as_bytes().to_vec(),
value: self_write,
revision: 2,
}));
let healthy = serde_json::to_vec(&serde_json::json!({
"instance_id": "api-node01",
"service": "api",
"node_id": "node01",
"ip": "10.0.0.11",
"port": 18081,
"managed_by": "fleet-scheduler",
"state": "healthy",
"last_heartbeat": "2026-04-01T00:00:05Z",
"observed_at": "2026-04-01T00:00:05Z"
}))
.unwrap();
assert!(scheduler.apply_watch_change(&WatchChange {
event_type: EventType::Put,
key: key.as_bytes().to_vec(),
value: healthy,
revision: 3,
}));
}
}

View file

@ -0,0 +1,122 @@
use std::time::Duration;
use anyhow::Result;
use chainfire_client::{Client, EventType};
use tracing::{info, warn};
/// Whether a watch targets a single exact key or an entire key prefix.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy)]
pub enum WatchScope {
    /// Watch exactly one key.
    Key,
    /// Watch every key under a prefix.
    Prefix,
}
/// A single change event delivered by a ChainFire watch stream.
#[derive(Debug, Clone)]
pub struct WatchChange {
    /// Kind of change (e.g. put/delete), as reported by the server.
    pub event_type: EventType,
    /// Raw key bytes of the changed record.
    pub key: Vec<u8>,
    /// Raw value bytes carried by the event.
    pub value: Vec<u8>,
    /// Server revision of the event; currently unused by consumers.
    #[allow(dead_code)]
    pub revision: u64,
}
/// Self-reconnecting wrapper around a ChainFire watch (key or prefix scope).
pub struct ChainfireWatcher {
    /// Endpoint passed to `Client::connect`.
    endpoint: String,
    /// Whether `target` is an exact key or a prefix.
    scope: WatchScope,
    /// Key or prefix bytes being watched.
    target: Vec<u8>,
    /// Fixed delay between reconnect attempts after any failure.
    reconnect_backoff: Duration,
}
impl ChainfireWatcher {
    /// Watcher for a single exact key, with a 1s reconnect backoff.
    #[allow(dead_code)]
    pub fn key(endpoint: String, key: Vec<u8>) -> Self {
        Self {
            endpoint,
            scope: WatchScope::Key,
            target: key,
            reconnect_backoff: Duration::from_secs(1),
        }
    }

    /// Watcher for every key under `prefix`, with a 1s reconnect backoff.
    pub fn prefix(endpoint: String, prefix: Vec<u8>) -> Self {
        Self {
            endpoint,
            scope: WatchScope::Prefix,
            target: prefix,
            reconnect_backoff: Duration::from_secs(1),
        }
    }

    /// Run the watch with no readiness hook; see [`Self::watch_with_ready`].
    #[allow(dead_code)]
    pub async fn watch<F>(&self, mut callback: F) -> Result<()>
    where
        F: FnMut(WatchChange) -> Result<()>,
    {
        self.watch_with_ready(|| Ok(()), move |change| callback(change))
            .await
    }

    /// Connect and stream watch events, reconnecting forever with a fixed
    /// backoff after any connect/stream failure.
    ///
    /// `on_connected` runs each time a watch stream is (re)established; if
    /// it returns an error the whole watch aborts — via `?`, this is the
    /// only way this function returns. Callback errors are logged and the
    /// stream continues.
    pub async fn watch_with_ready<F, G>(&self, mut on_connected: G, mut callback: F) -> Result<()>
    where
        F: FnMut(WatchChange) -> Result<()>,
        G: FnMut() -> Result<()>,
    {
        loop {
            match Client::connect(self.endpoint.clone()).await {
                Ok(mut client) => {
                    let watch_result = match self.scope {
                        WatchScope::Key => client.watch(&self.target).await,
                        WatchScope::Prefix => client.watch_prefix(&self.target).await,
                    };
                    match watch_result {
                        Ok(mut handle) => {
                            info!(
                                scope = ?self.scope,
                                target = %String::from_utf8_lossy(&self.target),
                                watch_id = handle.id(),
                                "connected ChainFire watch"
                            );
                            on_connected()?;
                            // Pump events until the server ends the stream.
                            while let Some(event) = handle.recv().await {
                                if let Err(error) = callback(WatchChange {
                                    event_type: event.event_type,
                                    key: event.key,
                                    value: event.value,
                                    revision: event.revision,
                                }) {
                                    warn!(error = %error, "watch callback failed");
                                }
                            }
                            warn!(
                                scope = ?self.scope,
                                target = %String::from_utf8_lossy(&self.target),
                                "ChainFire watch stream ended; reconnecting"
                            );
                        }
                        Err(error) => {
                            warn!(
                                error = %error,
                                scope = ?self.scope,
                                target = %String::from_utf8_lossy(&self.target),
                                "failed to create ChainFire watch"
                            );
                        }
                    }
                }
                Err(error) => {
                    warn!(
                        error = %error,
                        scope = ?self.scope,
                        target = %String::from_utf8_lossy(&self.target),
                        "failed to connect ChainFire watch"
                    );
                }
            }
            // All paths fall through to a backoff sleep before retrying.
            tokio::time::sleep(self.reconnect_backoff).await;
        }
    }
}

View file

@ -15,10 +15,12 @@ use deployer_types::{
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::process::Command;
use tokio::time::sleep;
use tokio::sync::mpsc;
use tokio::time::MissedTickBehavior;
use tracing::{info, warn};
use crate::process::{render_container_process_spec, ProcessManager};
use crate::watcher::{ChainfireWatcher, WatchChange};
fn cluster_prefix(cluster_namespace: &str, cluster_id: &str) -> String {
format!("{}/clusters/{}/", cluster_namespace, cluster_id)
@ -48,6 +50,10 @@ fn key_instance(
.into_bytes()
}
/// Key prefix (raw bytes) under which service instance records live.
fn instances_prefix(cluster_namespace: &str, cluster_id: &str) -> Vec<u8> {
    format!("{cluster_namespace}/clusters/{cluster_id}/instances/").into_bytes()
}
fn service_status_prefix(cluster_namespace: &str, cluster_id: &str) -> String {
format!(
"{}/clusters/{}/service-statuses/",
@ -92,6 +98,8 @@ pub struct Agent {
allow_local_instance_upsert: bool,
process_manager: ProcessManager,
next_health_checks: HashMap<String, DateTime<Utc>>,
watched_node_signature: Option<Vec<u8>>,
watched_instance_signatures: HashMap<String, Vec<u8>>,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
@ -112,6 +120,61 @@ struct LocalInstanceSpec {
container: Option<ContainerSpec>,
}
/// Forward every change from `watcher` into `tx` until the watch exits,
/// logging (not propagating) the terminal error.
async fn spawn_watch_task(watcher: ChainfireWatcher, tx: mpsc::Sender<WatchChange>) {
    let forward = move |change| {
        tx.try_send(change)
            .map_err(|send_error| anyhow::anyhow!("failed to enqueue watch change: {}", send_error))
    };
    if let Err(error) = watcher.watch(forward).await {
        warn!(error = %error, "node-agent watch task exited");
    }
}
/// Canonical byte signature of a JSON document: parse, strip the volatile
/// `drop_keys` everywhere, re-serialize with sorted object keys. `None` when
/// the bytes are not valid JSON or serialization fails.
fn canonical_json_signature(bytes: &[u8], drop_keys: &[&str]) -> Option<Vec<u8>> {
    let mut value = serde_json::from_slice::<Value>(bytes).ok()?;
    strip_observed_fields(&mut value, drop_keys);
    let canonical = sort_json_value(value);
    serde_json::to_vec(&canonical).ok()
}
/// Recursively remove every occurrence of `drop_keys` from all objects
/// nested anywhere inside `value`.
fn strip_observed_fields(value: &mut Value, drop_keys: &[&str]) {
    match value {
        Value::Object(map) => {
            drop_keys.iter().for_each(|key| {
                map.remove(*key);
            });
            for nested in map.values_mut() {
                strip_observed_fields(nested, drop_keys);
            }
        }
        Value::Array(items) => items
            .iter_mut()
            .for_each(|item| strip_observed_fields(item, drop_keys)),
        _ => {}
    }
}
/// Return `value` with every JSON object's keys sorted lexicographically,
/// recursively, so serialization order is canonical regardless of the
/// original insertion order.
fn sort_json_value(value: Value) -> Value {
    match value {
        Value::Array(items) => Value::Array(items.into_iter().map(sort_json_value).collect()),
        Value::Object(map) => {
            // Consume the map instead of cloning every key and value, and
            // sort the (key, value) pairs once rather than re-looking each
            // key up with `get` after sorting a cloned key list.
            let mut entries: Vec<(String, Value)> = map.into_iter().collect();
            entries.sort_by(|a, b| a.0.cmp(&b.0));
            let mut normalized = serde_json::Map::new();
            for (key, nested) in entries {
                normalized.insert(key, sort_json_value(nested));
            }
            Value::Object(normalized)
        }
        other => other,
    }
}
impl Agent {
pub fn new(
endpoint: String,
@ -135,15 +198,44 @@ impl Agent {
allow_local_instance_upsert,
process_manager: ProcessManager::new(pid_dir),
next_health_checks: HashMap::new(),
watched_node_signature: None,
watched_instance_signatures: HashMap::new(),
}
}
pub async fn run_loop(&mut self) -> Result<()> {
self.tick().await?;
let (watch_tx, mut watch_rx) = mpsc::channel::<WatchChange>(64);
self.spawn_watchers(watch_tx);
let mut interval = tokio::time::interval(self.interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
interval.tick().await;
loop {
if let Err(e) = self.tick().await {
warn!(error = %e, "node-agent tick failed");
tokio::select! {
_ = interval.tick() => {
if let Err(error) = self.tick().await {
warn!(error = %error, "node-agent tick failed");
}
}
maybe_change = watch_rx.recv() => {
let Some(change) = maybe_change else {
continue;
};
let mut should_reconcile = self.apply_watch_change(&change);
while let Ok(extra) = watch_rx.try_recv() {
should_reconcile |= self.apply_watch_change(&extra);
}
if should_reconcile {
if let Err(error) = self.tick().await {
warn!(error = %error, "node-agent watch-triggered tick failed");
}
}
}
}
sleep(self.interval).await;
}
}
@ -234,6 +326,76 @@ impl Agent {
);
}
/// Start background watches: one on this node's own record, one on the
/// cluster-wide instances prefix. Both feed the same change channel.
fn spawn_watchers(&self, tx: mpsc::Sender<WatchChange>) {
    tokio::spawn(spawn_watch_task(
        ChainfireWatcher::key(
            self.endpoint.clone(),
            key_node(&self.cluster_namespace, &self.cluster_id, &self.node_id),
        ),
        tx.clone(),
    ));
    tokio::spawn(spawn_watch_task(
        ChainfireWatcher::prefix(
            self.endpoint.clone(),
            instances_prefix(&self.cluster_namespace, &self.cluster_id),
        ),
        tx,
    ));
}
/// Route a watch event either to the own-node tracker or, for keys under
/// the instances prefix, to the instance tracker. Returns true when the
/// change warrants a tick.
fn apply_watch_change(&mut self, change: &WatchChange) -> bool {
    let key = String::from_utf8_lossy(&change.key).to_string();
    let own_node_key = String::from_utf8_lossy(&key_node(
        &self.cluster_namespace,
        &self.cluster_id,
        &self.node_id,
    ))
    .to_string();
    if key == own_node_key {
        return self.apply_node_watch_change(change);
    }
    let instance_prefix =
        String::from_utf8_lossy(&instances_prefix(&self.cluster_namespace, &self.cluster_id))
            .to_string();
    if key.starts_with(&instance_prefix) {
        return self.apply_instance_watch_change(&key, change);
    }
    false
}
/// Track changes to this node's own record; heartbeat-only updates are
/// filtered out by dropping `last_heartbeat` before signing.
fn apply_node_watch_change(&mut self, change: &WatchChange) -> bool {
    if matches!(change.event_type, chainfire_client::EventType::Delete) {
        self.watched_node_signature = None;
        return true;
    }
    let signature = match canonical_json_signature(&change.value, &["last_heartbeat"]) {
        Some(signature) => signature,
        // Unparseable payloads always trigger a tick.
        None => return true,
    };
    if self.watched_node_signature.as_ref() == Some(&signature) {
        false
    } else {
        self.watched_node_signature = Some(signature);
        true
    }
}
/// Track instance record changes, ignoring observation-only churn
/// (`state`, `last_heartbeat`, `observed_at`) when comparing signatures.
fn apply_instance_watch_change(&mut self, key: &str, change: &WatchChange) -> bool {
    if matches!(change.event_type, chainfire_client::EventType::Delete) {
        self.watched_instance_signatures.remove(key);
        return true;
    }
    match canonical_json_signature(&change.value, &["state", "last_heartbeat", "observed_at"]) {
        // Unparseable payloads always trigger a tick.
        None => {
            self.watched_instance_signatures.remove(key);
            true
        }
        Some(signature) => {
            if self.watched_instance_signatures.get(key) == Some(&signature) {
                return false;
            }
            self.watched_instance_signatures
                .insert(key.to_string(), signature);
            true
        }
    }
}
/// True when the instance spec is pinned to this agent's node.
fn is_local_instance(&self, inst: &LocalInstanceSpec) -> bool {
    inst.node_id.as_deref() == Some(self.node_id.as_str())
}
@ -1099,4 +1261,125 @@ mod tests {
Some("healthy")
);
}
#[test]
fn watch_signature_ignores_node_heartbeat_only_changes() {
    let mut agent = test_agent();
    let node_key = key_node(&agent.cluster_namespace, &agent.cluster_id, &agent.node_id);
    // First sighting of the node record must trigger a tick.
    let first = WatchChange {
        event_type: chainfire_client::EventType::Put,
        key: node_key.clone(),
        value: serde_json::to_vec(&serde_json::json!({
            "node_id": "node01",
            "hostname": "node01",
            "ip": "10.0.0.11",
            "roles": ["worker"],
            "labels": {"tier": "general"},
            "state": "active",
            "last_heartbeat": "2026-04-01T00:00:00Z"
        }))
        .unwrap(),
        revision: 1,
    };
    assert!(agent.apply_watch_change(&first));
    // Only `last_heartbeat` advanced: stripped before signing, so no tick.
    let heartbeat_only = WatchChange {
        revision: 2,
        value: serde_json::to_vec(&serde_json::json!({
            "node_id": "node01",
            "hostname": "node01",
            "ip": "10.0.0.11",
            "roles": ["worker"],
            "labels": {"tier": "general"},
            "state": "active",
            "last_heartbeat": "2026-04-01T00:00:05Z"
        }))
        .unwrap(),
        ..first.clone()
    };
    assert!(!agent.apply_watch_change(&heartbeat_only));
    // A real state change (active -> draining) must tick even though the
    // heartbeat also moved.
    let drain = WatchChange {
        revision: 3,
        value: serde_json::to_vec(&serde_json::json!({
            "node_id": "node01",
            "hostname": "node01",
            "ip": "10.0.0.11",
            "roles": ["worker"],
            "labels": {"tier": "general"},
            "state": "draining",
            "last_heartbeat": "2026-04-01T00:00:10Z"
        }))
        .unwrap(),
        ..first
    };
    assert!(agent.apply_watch_change(&drain));
}
#[test]
fn watch_signature_ignores_instance_observed_fields_only_changes() {
    let mut agent = test_agent();
    let instance_key = key_instance(
        &agent.cluster_namespace,
        &agent.cluster_id,
        "api",
        "api-node01",
    );
    // First sighting of the instance record must trigger a tick.
    let first = WatchChange {
        event_type: chainfire_client::EventType::Put,
        key: instance_key.clone(),
        value: serde_json::to_vec(&serde_json::json!({
            "instance_id": "api-node01",
            "service": "api",
            "node_id": "node01",
            "ip": "127.0.0.2",
            "port": 18080,
            "managed_by": "fleet-scheduler",
            "state": null,
            "last_heartbeat": null,
            "observed_at": null
        }))
        .unwrap(),
        revision: 1,
    };
    assert!(agent.apply_watch_change(&first));
    // Only observation fields (`state`, `last_heartbeat`, `observed_at`)
    // changed: all stripped before signing, so no tick.
    let observed_only = WatchChange {
        revision: 2,
        value: serde_json::to_vec(&serde_json::json!({
            "instance_id": "api-node01",
            "service": "api",
            "node_id": "node01",
            "ip": "127.0.0.2",
            "port": 18080,
            "managed_by": "fleet-scheduler",
            "state": "healthy",
            "last_heartbeat": "2026-04-01T00:00:05Z",
            "observed_at": "2026-04-01T00:00:05Z"
        }))
        .unwrap(),
        ..first.clone()
    };
    assert!(!agent.apply_watch_change(&observed_only));
    // A desired-state change (port 18080 -> 18081) must trigger a tick.
    let desired_change = WatchChange {
        revision: 3,
        value: serde_json::to_vec(&serde_json::json!({
            "instance_id": "api-node01",
            "service": "api",
            "node_id": "node01",
            "ip": "127.0.0.2",
            "port": 18081,
            "managed_by": "fleet-scheduler",
            "state": "healthy",
            "last_heartbeat": "2026-04-01T00:00:10Z",
            "observed_at": "2026-04-01T00:00:10Z"
        }))
        .unwrap(),
        ..first
    };
    assert!(agent.apply_watch_change(&desired_change));
}
}

View file

@ -7,11 +7,12 @@ use tracing_subscriber::EnvFilter;
mod agent;
mod process;
mod watcher;
/// PhotonCloud NodeAgent
///
/// - Chainfire 上の `photoncloud/clusters/{cluster_id}/nodes/{node_id}` と
/// `.../instances/*` をポーリング/将来的には watch してローカル状態と比較する
/// `.../instances/*` を watch しつつ、周期 heartbeat/safety reconcile も行う
/// - `--apply` が指定された場合のみプロセス起動/停止を行う(デフォルトは dry-run
#[derive(Parser, Debug)]
#[command(author, version, about)]
@ -32,7 +33,7 @@ struct Cli {
#[arg(long)]
node_id: String,
/// ポーリング間隔(秒)
/// heartbeat / safety reconcile 間隔(秒)
#[arg(long, default_value_t = 15)]
interval_secs: u64,

View file

@ -1,74 +1,109 @@
use std::time::Duration;
use anyhow::Result;
use chainfire_client::Client;
use tokio::time::sleep;
use chainfire_client::{Client, EventType};
use tracing::{info, warn};
/// Whether a watch targets a single exact key or an entire key prefix.
#[derive(Debug, Clone, Copy)]
pub enum WatchScope {
    /// Watch exactly one key.
    Key,
    /// Watch every key under a prefix.
    Prefix,
}
/// A single change event delivered by a ChainFire watch stream.
#[derive(Debug, Clone)]
pub struct WatchChange {
    /// Kind of change (e.g. put/delete), as reported by the server.
    pub event_type: EventType,
    /// Raw key bytes of the changed record.
    pub key: Vec<u8>,
    /// Raw value bytes carried by the event.
    pub value: Vec<u8>,
    /// Server revision of the event; currently unused by consumers.
    #[allow(dead_code)]
    pub revision: u64,
}
pub struct ChainfireWatcher {
endpoint: String,
prefix: String,
interval: Duration,
scope: WatchScope,
target: Vec<u8>,
reconnect_backoff: Duration,
}
impl ChainfireWatcher {
pub fn new(endpoint: String, prefix: String, interval_secs: u64) -> Self {
pub fn key(endpoint: String, key: Vec<u8>) -> Self {
Self {
endpoint,
prefix,
interval: Duration::from_secs(interval_secs),
scope: WatchScope::Key,
target: key,
reconnect_backoff: Duration::from_secs(1),
}
}
pub fn prefix(endpoint: String, prefix: Vec<u8>) -> Self {
Self {
endpoint,
scope: WatchScope::Prefix,
target: prefix,
reconnect_backoff: Duration::from_secs(1),
}
}
pub async fn watch<F>(&self, mut callback: F) -> Result<()>
where
F: FnMut(Vec<(Vec<u8>, Vec<u8>)>) -> Result<()>,
F: FnMut(WatchChange) -> Result<()>,
{
let mut last_revision = 0u64;
loop {
match self.fetch_updates(last_revision).await {
Ok((kvs, max_rev)) => {
if !kvs.is_empty() {
match Client::connect(self.endpoint.clone()).await {
Ok(mut client) => {
let watch_result = match self.scope {
WatchScope::Key => client.watch(&self.target).await,
WatchScope::Prefix => client.watch_prefix(&self.target).await,
};
match watch_result {
Ok(mut handle) => {
info!(
prefix = %self.prefix,
count = kvs.len(),
"detected changes in Chainfire"
scope = ?self.scope,
target = %String::from_utf8_lossy(&self.target),
watch_id = handle.id(),
"connected ChainFire watch"
);
if let Err(e) = callback(kvs) {
warn!(error = %e, "callback failed");
} else if max_rev > last_revision {
last_revision = max_rev;
}
} else if max_rev > last_revision {
last_revision = max_rev;
}
}
Err(e) => {
warn!(error = %e, "failed to fetch updates from Chainfire");
while let Some(event) = handle.recv().await {
if let Err(error) = callback(WatchChange {
event_type: event.event_type,
key: event.key,
value: event.value,
revision: event.revision,
}) {
warn!(error = %error, "watch callback failed");
}
}
sleep(self.interval).await;
warn!(
scope = ?self.scope,
target = %String::from_utf8_lossy(&self.target),
"ChainFire watch stream ended; reconnecting"
);
}
Err(error) => {
warn!(
error = %error,
scope = ?self.scope,
target = %String::from_utf8_lossy(&self.target),
"failed to create ChainFire watch"
);
}
}
}
Err(error) => {
warn!(
error = %error,
scope = ?self.scope,
target = %String::from_utf8_lossy(&self.target),
"failed to connect ChainFire watch"
);
}
}
async fn fetch_updates(&self, last_revision: u64) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, u64)> {
let mut client = Client::connect(self.endpoint.clone()).await?;
let (kvs, _) = client.scan_prefix(self.prefix.as_bytes(), 0).await?;
// 簡易実装: 全てのKVペアを返すrevisionフィルタリングは未実装
let mut max_rev = last_revision;
let mut result = Vec::new();
for (k, v, rev) in kvs {
if rev > last_revision {
result.push((k, v));
if rev > max_rev {
max_rev = rev;
tokio::time::sleep(self.reconnect_backoff).await;
}
}
}
Ok((result, max_rev))
}
}

View file

@ -7,15 +7,35 @@ use deployer_types::{
HostDeploymentSpec, HostDeploymentStatus, InstallState, ObservedSystemState,
ServiceInstanceSpec,
};
use serde_json::Value;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::time::Duration;
use tokio::time::sleep;
use tokio::sync::{mpsc, oneshot};
use tokio::time::MissedTickBehavior;
use tracing::{info, warn};
use crate::watcher::{ChainfireWatcher, WatchChange};
/// Root of a cluster's key space: `<namespace>/clusters/<id>/`.
fn cluster_prefix(cluster_namespace: &str, cluster_id: &str) -> String {
    [cluster_namespace, "/clusters/", cluster_id, "/"].concat()
}
/// Key prefix under which node records live for this cluster.
fn nodes_prefix(cluster_namespace: &str, cluster_id: &str) -> Vec<u8> {
    let mut prefix = cluster_prefix(cluster_namespace, cluster_id);
    prefix.push_str("nodes/");
    prefix.into_bytes()
}
/// Key prefix under which service-instance records live for this cluster.
fn instances_prefix(cluster_namespace: &str, cluster_id: &str) -> Vec<u8> {
    let mut prefix = cluster_prefix(cluster_namespace, cluster_id);
    prefix.push_str("instances/");
    prefix.into_bytes()
}
/// Key prefix under which host-deployment spec/status records live.
fn host_deployments_prefix(cluster_namespace: &str, cluster_id: &str) -> Vec<u8> {
    let mut prefix = cluster_prefix(cluster_namespace, cluster_id);
    prefix.push_str("deployments/hosts/");
    prefix.into_bytes()
}
fn key_node(cluster_namespace: &str, cluster_id: &str, node_id: &str) -> Vec<u8> {
format!(
"{}nodes/{}",
@ -72,17 +92,8 @@ pub struct HostsCommand {
}
pub async fn run(command: HostsCommand) -> Result<()> {
let controller = HostDeploymentController::new(command);
if controller.once {
controller.reconcile_once().await
} else {
loop {
if let Err(error) = controller.reconcile_once().await {
warn!(error = %error, "host deployment reconciliation failed");
}
sleep(controller.interval).await;
}
}
let mut controller = HostDeploymentController::new(command);
controller.run_loop().await
}
struct HostDeploymentController {
@ -93,6 +104,102 @@ struct HostDeploymentController {
heartbeat_timeout_secs: u64,
dry_run: bool,
once: bool,
watched_node_signatures: HashMap<String, Vec<u8>>,
watched_desired_signatures: HashMap<String, Vec<u8>>,
watched_observed_signatures: HashMap<String, Vec<u8>>,
watched_instance_signatures: HashMap<String, Vec<u8>>,
watched_deployment_spec_signatures: HashMap<String, Vec<u8>>,
watched_deployment_status_signatures: HashMap<String, Vec<u8>>,
pending_kv_writes: HashMap<String, Vec<u8>>,
pending_kv_deletes: HashSet<String>,
}
/// Run one watcher forever, forwarding every change into `tx`.
///
/// `ready` fires at most once, the first time the watch connects, so the
/// caller can wait until the watch is live before doing an initial resync.
async fn spawn_watch_task(
    watcher: ChainfireWatcher,
    tx: mpsc::Sender<WatchChange>,
    ready: oneshot::Sender<()>,
) {
    // The readiness signal is single-shot; take() guards against repeat
    // connections firing it again.
    let mut ready_signal = Some(ready);
    let result = watcher
        .watch_with_ready(
            move || {
                if let Some(signal) = ready_signal.take() {
                    let _ = signal.send(());
                }
                Ok(())
            },
            move |change| match tx.try_send(change) {
                Ok(()) => Ok(()),
                Err(send_error) => Err(anyhow::anyhow!(
                    "failed to enqueue watch change: {send_error}"
                )),
            },
        )
        .await;
    if let Err(error) = result {
        warn!(error = %error, "host deployment watch task exited");
    }
}
/// Block until every watcher has signaled readiness, bounded per watcher.
///
/// A dropped or late signal is logged and tolerated; the caller proceeds
/// either way and relies on the periodic resync for coverage.
async fn wait_for_watchers_ready(receivers: Vec<oneshot::Receiver<()>>) {
    const READY_TIMEOUT: Duration = Duration::from_secs(10);
    for receiver in receivers {
        let outcome = tokio::time::timeout(READY_TIMEOUT, receiver).await;
        match outcome {
            Ok(Ok(())) => {}
            Ok(Err(_)) => warn!("host deployment watch task ended before signaling readiness"),
            Err(_) => warn!("timed out waiting for host deployment watch readiness"),
        }
    }
}
/// Canonical byte signature of a JSON payload: parse, recursively drop the
/// given keys, sort all object keys, and re-serialize. Returns `None` when
/// the payload is not valid JSON.
fn canonical_json_signature(bytes: &[u8], drop_keys: &[&str]) -> Option<Vec<u8>> {
    let mut parsed: Value = serde_json::from_slice(bytes).ok()?;
    strip_observed_fields(&mut parsed, drop_keys);
    let normalized = sort_json_value(parsed);
    serde_json::to_vec(&normalized).ok()
}
/// Recursively remove every occurrence of `drop_keys` from a JSON tree,
/// at any nesting depth (objects and arrays alike).
fn strip_observed_fields(value: &mut Value, drop_keys: &[&str]) {
    match value {
        Value::Object(map) => {
            for &key in drop_keys {
                map.remove(key);
            }
            map.values_mut()
                .for_each(|nested| strip_observed_fields(nested, drop_keys));
        }
        Value::Array(items) => {
            items
                .iter_mut()
                .for_each(|item| strip_observed_fields(item, drop_keys));
        }
        _ => {}
    }
}
/// Return a structurally identical JSON value with every object's keys in
/// sorted order, so serialization is canonical regardless of insertion order.
///
/// Fix: the original collected the key list, then did `map.get(&key)` plus a
/// `value.clone()` per entry — one redundant lookup and one deep clone per
/// field. Since the map is owned here, consume it with `into_iter` and move
/// the values instead.
fn sort_json_value(value: Value) -> Value {
    match value {
        Value::Array(items) => Value::Array(items.into_iter().map(sort_json_value).collect()),
        Value::Object(map) => {
            let mut entries: Vec<(String, Value)> = map.into_iter().collect();
            entries.sort_by(|(a, _), (b, _)| a.cmp(b));
            let mut normalized = serde_json::Map::new();
            for (key, nested) in entries {
                normalized.insert(key, sort_json_value(nested));
            }
            Value::Object(normalized)
        }
        other => other,
    }
}
fn host_deployment_status_signature(bytes: &[u8]) -> Option<Vec<u8>> {
let value: Value = serde_json::from_slice(bytes).ok()?;
serde_json::to_vec(&serde_json::json!({
"paused_by_operator": value.get("paused_by_operator").and_then(Value::as_bool).unwrap_or(false),
"aborted": value.get("phase").and_then(Value::as_str) == Some("aborted"),
}))
.ok()
}
impl HostDeploymentController {
@ -105,10 +212,60 @@ impl HostDeploymentController {
heartbeat_timeout_secs: command.heartbeat_timeout_secs,
dry_run: command.dry_run,
once: command.once,
watched_node_signatures: HashMap::new(),
watched_desired_signatures: HashMap::new(),
watched_observed_signatures: HashMap::new(),
watched_instance_signatures: HashMap::new(),
watched_deployment_spec_signatures: HashMap::new(),
watched_deployment_status_signatures: HashMap::new(),
pending_kv_writes: HashMap::new(),
pending_kv_deletes: HashSet::new(),
}
}
async fn reconcile_once(&self) -> Result<()> {
/// Drive the controller: a single pass when `--once`, otherwise a
/// watch-driven loop with a periodic full resync as a safety net.
async fn run_loop(&mut self) -> Result<()> {
    if self.once {
        return self.reconcile_once().await;
    }
    // First pass before watchers exist so state is current at startup.
    self.reconcile_once().await?;
    let (watch_tx, mut watch_rx) = mpsc::channel::<WatchChange>(128);
    let ready_receivers = self.spawn_watchers(watch_tx);
    wait_for_watchers_ready(ready_receivers).await;
    // Second pass closes the window between the first pass and watch start.
    self.reconcile_once().await?;
    let mut interval = tokio::time::interval(self.interval);
    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
    // Consume tokio's immediate first tick so the periodic resync does not
    // fire right after the pass above.
    interval.tick().await;
    loop {
        tokio::select! {
            _ = interval.tick() => {
                if let Err(error) = self.reconcile_once().await {
                    warn!(error = %error, "host deployment reconciliation failed");
                }
            }
            maybe_change = watch_rx.recv() => {
                let Some(change) = maybe_change else {
                    // All watcher tasks have exited; keep looping and rely on
                    // the periodic resync.
                    continue;
                };
                // Drain everything already queued and coalesce into at most
                // one reconciliation.
                let mut should_reconcile = self.apply_watch_change(&change);
                while let Ok(extra) = watch_rx.try_recv() {
                    should_reconcile |= self.apply_watch_change(&extra);
                }
                if should_reconcile {
                    if let Err(error) = self.reconcile_once().await {
                        warn!(error = %error, "host deployment watch-triggered reconciliation failed");
                    }
                }
            }
        }
    }
}
async fn reconcile_once(&mut self) -> Result<()> {
let mut client = Client::connect(self.endpoint.clone()).await?;
let nodes = self.load_nodes(&mut client).await?;
let desired_systems = self.load_desired_systems(&mut client).await?;
@ -149,52 +306,309 @@ impl HostDeploymentController {
}
for desired in &plan.desired_upserts {
client
.put(
&key_desired_system(
&self.cluster_namespace,
&self.cluster_id,
&desired.node_id,
),
&serde_json::to_vec(desired)?,
)
.await?;
let key = key_desired_system(&self.cluster_namespace, &self.cluster_id, &desired.node_id);
let payload = serde_json::to_vec(desired)?;
let key_str = self.record_pending_write(&key, payload.clone());
if let Err(error) = client.put(&key, &payload).await {
self.pending_kv_writes.remove(&key_str);
return Err(error.into());
}
}
for node_id in &plan.desired_deletes {
client
.delete(&key_desired_system(
&self.cluster_namespace,
&self.cluster_id,
node_id,
))
.await?;
let key = key_desired_system(&self.cluster_namespace, &self.cluster_id, node_id);
let key_str = self.record_pending_delete(&key);
if !client.delete(&key).await? {
self.pending_kv_deletes.remove(&key_str);
}
}
for node in plan.node_updates.values() {
client
.put(
&key_node(&self.cluster_namespace, &self.cluster_id, &node.node_id),
&serde_json::to_vec(node)?,
)
.await?;
let key = key_node(&self.cluster_namespace, &self.cluster_id, &node.node_id);
let payload = serde_json::to_vec(node)?;
let key_str = self.record_pending_write(&key, payload.clone());
if let Err(error) = client.put(&key, &payload).await {
self.pending_kv_writes.remove(&key_str);
return Err(error.into());
}
}
client
.put(
&key_host_deployment_status(
let key = key_host_deployment_status(
&self.cluster_namespace,
&self.cluster_id,
&deployment.name,
),
&serde_json::to_vec(&plan.status)?,
)
.await?;
);
let payload = serde_json::to_vec(&plan.status)?;
let key_str = self.record_pending_write(&key, payload.clone());
if let Err(error) = client.put(&key, &payload).await {
self.pending_kv_writes.remove(&key_str);
return Err(error.into());
}
}
Ok(())
}
/// Spawn one prefix watcher per key family (nodes, instances, host
/// deployments), all feeding the shared change channel. Returns one
/// readiness receiver per watcher, in spawn order.
fn spawn_watchers(&self, tx: mpsc::Sender<WatchChange>) -> Vec<oneshot::Receiver<()>> {
    let prefixes = vec![
        nodes_prefix(&self.cluster_namespace, &self.cluster_id),
        instances_prefix(&self.cluster_namespace, &self.cluster_id),
        host_deployments_prefix(&self.cluster_namespace, &self.cluster_id),
    ];
    let mut ready_receivers = Vec::with_capacity(prefixes.len());
    for prefix in prefixes {
        let watcher = ChainfireWatcher::prefix(self.endpoint.clone(), prefix);
        let (ready_tx, ready_rx) = oneshot::channel();
        tokio::spawn(spawn_watch_task(watcher, tx.clone(), ready_tx));
        ready_receivers.push(ready_rx);
    }
    ready_receivers
}
/// Remember a write we are about to issue, so its watch echo can be
/// recognized and suppressed. A write supersedes any pending delete on the
/// same key. Returns the key as a string for later bookkeeping.
fn record_pending_write(&mut self, key: &[u8], payload: Vec<u8>) -> String {
    let key_text = String::from_utf8_lossy(key).into_owned();
    self.pending_kv_deletes.remove(&key_text);
    self.pending_kv_writes.insert(key_text.clone(), payload);
    key_text
}
/// Remember a delete we are about to issue, so its watch echo can be
/// recognized and suppressed. A delete supersedes any pending write on the
/// same key. Returns the key as a string for later bookkeeping.
fn record_pending_delete(&mut self, key: &[u8]) -> String {
    let key_text = String::from_utf8_lossy(key).into_owned();
    self.pending_kv_writes.remove(&key_text);
    self.pending_kv_deletes.insert(key_text.clone());
    key_text
}
/// If this change is the echo of a write we made, consume the pending entry
/// and return its payload. Any event on a pending key clears the entry —
/// a non-matching event means our write was superseded, and `None` tells the
/// caller to treat the change as external.
fn consume_pending_write(&mut self, key: &str, change: &WatchChange) -> Option<Vec<u8>> {
    let expected = self.pending_kv_writes.remove(key)?;
    let is_echo = matches!(change.event_type, chainfire_client::EventType::Put)
        && change.value == expected;
    if !is_echo {
        return None;
    }
    self.pending_kv_deletes.remove(key);
    Some(expected)
}
/// If this change is the echo of a delete we made, consume the pending entry
/// and return true. The entry is cleared on any event for the key (same
/// rationale as `consume_pending_write`); only a Delete event counts as an
/// echo.
fn consume_pending_delete(&mut self, key: &str, change: &WatchChange) -> bool {
    let was_pending = self.pending_kv_deletes.remove(key);
    was_pending && matches!(change.event_type, chainfire_client::EventType::Delete)
}
/// Route a watch event to the handler for its key family and report
/// whether it warrants a reconciliation. Unrecognized keys are ignored.
fn apply_watch_change(&mut self, change: &WatchChange) -> bool {
    let key = String::from_utf8_lossy(&change.key).to_string();
    // Avoid shadowing the prefix helper fns: bind the stringified prefixes
    // under distinct names.
    let node_keys =
        String::from_utf8_lossy(&nodes_prefix(&self.cluster_namespace, &self.cluster_id))
            .to_string();
    let instance_keys =
        String::from_utf8_lossy(&instances_prefix(&self.cluster_namespace, &self.cluster_id))
            .to_string();
    let deployment_keys = String::from_utf8_lossy(&host_deployments_prefix(
        &self.cluster_namespace,
        &self.cluster_id,
    ))
    .to_string();
    if let Some(rest) = key.strip_prefix(&node_keys) {
        return if rest.ends_with("/desired-system") {
            self.apply_desired_system_watch_change(&key, change)
        } else if rest.ends_with("/observed-system") {
            self.apply_observed_system_watch_change(&key, change)
        } else if !rest.is_empty() && !rest.contains('/') {
            // Bare `<prefix><node_id>` keys are node records.
            self.apply_node_record_watch_change(&key, change)
        } else {
            false
        };
    }
    if key.starts_with(&instance_keys) {
        return self.apply_instance_watch_change(&key, change);
    }
    if let Some(rest) = key.strip_prefix(&deployment_keys) {
        if rest.ends_with("/spec") {
            return self.apply_deployment_spec_watch_change(&key, change);
        }
        if rest.ends_with("/status") {
            return self.apply_deployment_status_watch_change(&key, change);
        }
    }
    false
}
/// Handle a node-record change; returns true when a reconcile is warranted.
/// Heartbeat-only updates (`last_heartbeat`) do not change the signature and
/// are ignored; echoes of our own writes only refresh the cache.
fn apply_node_record_watch_change(&mut self, key: &str, change: &WatchChange) -> bool {
    if self.consume_pending_delete(key, change) {
        self.watched_node_signatures.remove(key);
        return false;
    }
    if let Some(payload) = self.consume_pending_write(key, change) {
        match canonical_json_signature(&payload, &["last_heartbeat"]) {
            Some(signature) => {
                self.watched_node_signatures.insert(key.to_string(), signature);
            }
            None => {
                self.watched_node_signatures.remove(key);
            }
        }
        return false;
    }
    if matches!(change.event_type, chainfire_client::EventType::Delete) {
        self.watched_node_signatures.remove(key);
        return true;
    }
    match canonical_json_signature(&change.value, &["last_heartbeat"]) {
        // Unparsable payload: reconcile to be safe.
        None => true,
        Some(signature) => {
            if self.watched_node_signatures.get(key) == Some(&signature) {
                false
            } else {
                self.watched_node_signatures.insert(key.to_string(), signature);
                true
            }
        }
    }
}
/// Handle a desired-system change; returns true when a reconcile is
/// warranted. The full payload is significant (no dropped fields), but
/// echoes of our own writes/deletes only update the signature cache.
fn apply_desired_system_watch_change(&mut self, key: &str, change: &WatchChange) -> bool {
    if self.consume_pending_delete(key, change) {
        self.watched_desired_signatures.remove(key);
        return false;
    }
    if let Some(payload) = self.consume_pending_write(key, change) {
        match canonical_json_signature(&payload, &[]) {
            Some(signature) => {
                self.watched_desired_signatures.insert(key.to_string(), signature);
            }
            None => {
                self.watched_desired_signatures.remove(key);
            }
        }
        return false;
    }
    if matches!(change.event_type, chainfire_client::EventType::Delete) {
        self.watched_desired_signatures.remove(key);
        return true;
    }
    match canonical_json_signature(&change.value, &[]) {
        // Unparsable payload: reconcile to be safe.
        None => true,
        Some(signature) => {
            if self.watched_desired_signatures.get(key) == Some(&signature) {
                false
            } else {
                self.watched_desired_signatures.insert(key.to_string(), signature);
                true
            }
        }
    }
}
/// Handle an observed-system change; returns true when a reconcile is
/// warranted. Observed records are never written by this controller, so
/// there is no pending-echo handling.
fn apply_observed_system_watch_change(&mut self, key: &str, change: &WatchChange) -> bool {
    if matches!(change.event_type, chainfire_client::EventType::Delete) {
        self.watched_observed_signatures.remove(key);
        return true;
    }
    // These fields are excluded from the signature, so changes to them
    // alone never trigger a reconcile.
    const IGNORED_FIELDS: &[&str] = &[
        "flake_root",
        "configured_system",
        "current_system",
        "booted_system",
        "rollback_system",
        "switch_action",
        "reboot_required",
        "last_attempt",
        "last_success",
        "last_error",
    ];
    match canonical_json_signature(&change.value, IGNORED_FIELDS) {
        // Unparsable payload: reconcile to be safe.
        None => true,
        Some(signature) => {
            if self.watched_observed_signatures.get(key) == Some(&signature) {
                false
            } else {
                self.watched_observed_signatures.insert(key.to_string(), signature);
                true
            }
        }
    }
}
/// Handle a service-instance change; returns true when a reconcile is
/// warranted. `last_heartbeat` and `observed_at` are dropped from the
/// signature so periodic heartbeat refreshes are ignored.
fn apply_instance_watch_change(&mut self, key: &str, change: &WatchChange) -> bool {
    if matches!(change.event_type, chainfire_client::EventType::Delete) {
        self.watched_instance_signatures.remove(key);
        return true;
    }
    match canonical_json_signature(&change.value, &["last_heartbeat", "observed_at"]) {
        // Unparsable payload: reconcile to be safe.
        None => true,
        Some(signature) => {
            let already_known = self.watched_instance_signatures.get(key) == Some(&signature);
            if !already_known {
                self.watched_instance_signatures
                    .insert(key.to_string(), signature);
            }
            !already_known
        }
    }
}
/// Handle a host-deployment spec change; returns true when a reconcile is
/// warranted. Specs are written by operators, not this controller, so the
/// full payload is significant and there is no pending-echo handling.
fn apply_deployment_spec_watch_change(&mut self, key: &str, change: &WatchChange) -> bool {
    if matches!(change.event_type, chainfire_client::EventType::Delete) {
        self.watched_deployment_spec_signatures.remove(key);
        return true;
    }
    match canonical_json_signature(&change.value, &[]) {
        // Unparsable payload: reconcile to be safe.
        None => true,
        Some(signature) => {
            let already_known =
                self.watched_deployment_spec_signatures.get(key) == Some(&signature);
            if !already_known {
                self.watched_deployment_spec_signatures
                    .insert(key.to_string(), signature);
            }
            !already_known
        }
    }
}
/// Handle a host-deployment status change; returns true when a reconcile is
/// warranted. The signature covers only operator-controlled bits (pause and
/// abort — see `host_deployment_status_signature`), so status updates we
/// write ourselves or routine progress churn do not re-trigger us.
fn apply_deployment_status_watch_change(&mut self, key: &str, change: &WatchChange) -> bool {
    if self.consume_pending_delete(key, change) {
        self.watched_deployment_status_signatures.remove(key);
        return false;
    }
    if let Some(payload) = self.consume_pending_write(key, change) {
        match host_deployment_status_signature(&payload) {
            Some(signature) => {
                self.watched_deployment_status_signatures
                    .insert(key.to_string(), signature);
            }
            None => {
                self.watched_deployment_status_signatures.remove(key);
            }
        }
        return false;
    }
    if matches!(change.event_type, chainfire_client::EventType::Delete) {
        self.watched_deployment_status_signatures.remove(key);
        return true;
    }
    match host_deployment_status_signature(&change.value) {
        // Unparsable payload: reconcile to be safe.
        None => true,
        Some(signature) => {
            if self.watched_deployment_status_signatures.get(key) == Some(&signature) {
                false
            } else {
                self.watched_deployment_status_signatures
                    .insert(key.to_string(), signature);
                true
            }
        }
    }
}
async fn load_nodes(&self, client: &mut Client) -> Result<Vec<ClusterNodeRecord>> {
let prefix = format!(
"{}nodes/",
@ -385,6 +799,9 @@ fn plan_host_deployment(
let operator_paused = existing_status
.map(|status| status.paused_by_operator)
.unwrap_or(false);
let aborted = existing_status
.and_then(|status| status.phase.as_deref())
== Some("aborted");
let spec_paused = deployment.paused.unwrap_or(false);
let mut desired_deletes = desired_systems
.iter()
@ -395,6 +812,44 @@ fn plan_host_deployment(
.map(|(node_id, _)| node_id.clone())
.collect::<Vec<_>>();
if aborted {
desired_deletes = desired_systems
.iter()
.filter(|(_, desired)| {
desired.deployment_id.as_deref() == Some(deployment.name.as_str())
})
.map(|(node_id, _)| node_id.clone())
.collect();
for node in &selector_matches {
if node.state.as_deref() == Some("draining") {
let mut updated = (*node).clone();
updated.state = Some("active".to_string());
node_updates.insert(updated.node_id.clone(), updated);
}
}
let mut status = existing_status.cloned().unwrap_or_default();
status.name = deployment.name.clone();
status.paused = true;
status.paused_by_operator = true;
status.phase = Some("aborted".to_string());
if status.message.is_none() {
status.message = Some("aborted by operator".to_string());
}
status.updated_at = Some(now);
desired_deletes.sort();
desired_deletes.dedup();
return HostDeploymentPlan {
status,
desired_upserts: Vec::new(),
desired_deletes,
node_updates,
};
}
for node in &selector_matches {
let desired = desired_systems.get(&node.node_id);
let observed = observed_systems.get(&node.node_id);
@ -707,6 +1162,7 @@ fn dedup_sorted(mut values: Vec<String>) -> Vec<String> {
#[cfg(test)]
mod tests {
use super::*;
use chainfire_client::EventType;
fn test_node(node_id: &str, failure_domain: &str) -> ClusterNodeRecord {
ClusterNodeRecord {
@ -760,6 +1216,18 @@ mod tests {
}
}
/// Controller wired for unit tests: dry-run so reconciliation never writes,
/// and an endpoint that is never actually contacted by these tests.
fn test_controller() -> HostDeploymentController {
    HostDeploymentController::new(HostsCommand {
        endpoint: "http://127.0.0.1:7000".to_string(),
        cluster_namespace: "photoncloud".to_string(),
        cluster_id: "test-cluster".to_string(),
        interval_secs: 1,
        heartbeat_timeout_secs: 300,
        dry_run: true,
        once: false,
    })
}
#[test]
fn plan_rollout_starts_one_node_per_batch() {
let deployment = test_deployment();
@ -859,4 +1327,185 @@ mod tests {
);
assert_eq!(plan.status.in_progress_nodes, vec!["node01".to_string()]);
}
#[test]
fn plan_rollout_keeps_aborted_state_and_clears_desired_system() {
    // An aborted deployment must stay aborted, delete the desired-system
    // records it owns, and return draining nodes to active.
    let deployment = test_deployment();
    let mut draining = test_node("node01", "rack-a");
    draining.state = Some("draining".to_string());
    let nodes = vec![draining];
    // One desired-system record owned by this deployment.
    let desired = HashMap::from([(
        "node01".to_string(),
        DesiredSystemSpec {
            node_id: "node01".to_string(),
            deployment_id: Some("worker-rollout".to_string()),
            nixos_configuration: Some("worker-golden".to_string()),
            target_system: Some("/nix/store/worker-golden".to_string()),
            flake_ref: None,
            switch_action: Some("switch".to_string()),
            health_check_command: Vec::new(),
            rollback_on_failure: Some(true),
            drain_before_apply: Some(false),
        },
    )]);
    // Pre-existing status already marks the rollout as operator-aborted.
    let status = HostDeploymentStatus {
        name: "worker-rollout".to_string(),
        phase: Some("aborted".to_string()),
        paused: true,
        paused_by_operator: true,
        message: Some("aborted by operator".to_string()),
        ..HostDeploymentStatus::default()
    };
    let plan = plan_host_deployment(
        &deployment,
        Some(&status),
        &nodes,
        &desired,
        &HashMap::new(),
        &[],
        300,
    );
    // No new rollout work while aborted; the owned desired record is removed.
    assert!(plan.desired_upserts.is_empty());
    assert_eq!(plan.desired_deletes, vec!["node01".to_string()]);
    assert_eq!(plan.status.phase.as_deref(), Some("aborted"));
    assert!(plan.status.paused);
    // The draining node is switched back to active.
    assert_eq!(
        plan.node_updates
            .get("node01")
            .and_then(|node| node.state.as_deref()),
        Some("active")
    );
}
#[test]
fn host_watch_ignores_node_heartbeat_only_changes() {
    // Node-record events must trigger reconciliation only when something
    // other than `last_heartbeat` changes.
    let mut controller = test_controller();
    let node_key = key_node(
        &controller.cluster_namespace,
        &controller.cluster_id,
        "node01",
    );
    // First sighting of the node: always reconciles.
    let first = WatchChange {
        event_type: EventType::Put,
        key: node_key.clone(),
        value: serde_json::to_vec(&serde_json::json!({
            "node_id": "node01",
            "hostname": "node01",
            "ip": "10.0.0.11",
            "roles": ["worker"],
            "labels": {"tier": "general"},
            "state": "active",
            "last_heartbeat": "2026-04-01T00:00:00Z"
        }))
        .unwrap(),
        revision: 1,
    };
    assert!(controller.apply_watch_change(&first));
    // Same record, only the heartbeat timestamp moved: must be ignored.
    let heartbeat_only = WatchChange {
        revision: 2,
        value: serde_json::to_vec(&serde_json::json!({
            "node_id": "node01",
            "hostname": "node01",
            "ip": "10.0.0.11",
            "roles": ["worker"],
            "labels": {"tier": "general"},
            "state": "active",
            "last_heartbeat": "2026-04-01T00:00:05Z"
        }))
        .unwrap(),
        ..first.clone()
    };
    assert!(!controller.apply_watch_change(&heartbeat_only));
    // State flip to draining is a real change: must reconcile.
    let drain = WatchChange {
        revision: 3,
        value: serde_json::to_vec(&serde_json::json!({
            "node_id": "node01",
            "hostname": "node01",
            "ip": "10.0.0.11",
            "roles": ["worker"],
            "labels": {"tier": "general"},
            "state": "draining",
            "last_heartbeat": "2026-04-01T00:00:10Z"
        }))
        .unwrap(),
        ..first
    };
    assert!(controller.apply_watch_change(&drain));
}
#[test]
fn host_watch_reacts_only_to_operator_pause_or_abort_status_changes() {
    // Deployment-status events must trigger reconciliation only when the
    // operator-controlled bits (pause / abort) change, not on progress churn.
    let mut controller = test_controller();
    let key = key_host_deployment_status(
        &controller.cluster_namespace,
        &controller.cluster_id,
        "worker-rollout",
    );
    // First sighting: always reconciles.
    let running = WatchChange {
        event_type: EventType::Put,
        key: key.clone(),
        value: serde_json::to_vec(&serde_json::json!({
            "name": "worker-rollout",
            "phase": "running",
            "paused": false,
            "paused_by_operator": false,
            "selected_nodes": ["node01"],
            "in_progress_nodes": ["node01"],
            "updated_at": "2026-04-01T00:00:00Z"
        }))
        .unwrap(),
        revision: 1,
    };
    assert!(controller.apply_watch_change(&running));
    // Phase running -> completed is ordinary progress: must be ignored.
    let completed = WatchChange {
        revision: 2,
        value: serde_json::to_vec(&serde_json::json!({
            "name": "worker-rollout",
            "phase": "completed",
            "paused": false,
            "paused_by_operator": false,
            "selected_nodes": ["node01"],
            "completed_nodes": ["node01"],
            "updated_at": "2026-04-01T00:00:10Z"
        }))
        .unwrap(),
        ..running.clone()
    };
    assert!(!controller.apply_watch_change(&completed));
    // Operator pause flips paused_by_operator: must reconcile.
    let paused = WatchChange {
        revision: 3,
        value: serde_json::to_vec(&serde_json::json!({
            "name": "worker-rollout",
            "phase": "paused",
            "paused": true,
            "paused_by_operator": true,
            "updated_at": "2026-04-01T00:00:20Z"
        }))
        .unwrap(),
        ..running.clone()
    };
    assert!(controller.apply_watch_change(&paused));
    // Abort changes the signature again (aborted bit): must reconcile.
    let aborted = WatchChange {
        revision: 4,
        value: serde_json::to_vec(&serde_json::json!({
            "name": "worker-rollout",
            "phase": "aborted",
            "paused": true,
            "paused_by_operator": true,
            "updated_at": "2026-04-01T00:00:30Z"
        }))
        .unwrap(),
        ..running
    };
    assert!(controller.apply_watch_change(&aborted));
}
}

View file

@ -40,6 +40,7 @@ use flashdns_api::proto::{
};
mod hosts;
mod watcher;
#[derive(Parser)]
#[command(author, version, about)]

View file

@ -0,0 +1,122 @@
use std::time::Duration;
use anyhow::Result;
use chainfire_client::{Client, EventType};
use tracing::{info, warn};
/// Whether a watcher targets a single key or every key under a byte prefix.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy)]
pub enum WatchScope {
    /// Watch exactly one key.
    Key,
    /// Watch all keys sharing a prefix.
    Prefix,
}
/// A single key-space mutation delivered by a ChainFire watch stream.
#[derive(Debug, Clone)]
pub struct WatchChange {
    // Kind of mutation (put or delete), as reported by the client.
    pub event_type: EventType,
    // Affected key, raw bytes.
    pub key: Vec<u8>,
    // Payload carried by the event. NOTE(review): for delete events this is
    // whatever the stream supplied — presumably empty; confirm against the
    // chainfire_client event contract.
    pub value: Vec<u8>,
    // Store revision at which the event was emitted.
    #[allow(dead_code)]
    pub revision: u64,
}
/// Long-lived watch client that reconnects forever with a fixed backoff.
pub struct ChainfireWatcher {
    // ChainFire endpoint URL to connect to.
    endpoint: String,
    // Single key vs. prefix watch.
    scope: WatchScope,
    // The key or prefix bytes being watched.
    target: Vec<u8>,
    // Delay between reconnect attempts (1s in both constructors).
    reconnect_backoff: Duration,
}
impl ChainfireWatcher {
#[allow(dead_code)]
pub fn key(endpoint: String, key: Vec<u8>) -> Self {
Self {
endpoint,
scope: WatchScope::Key,
target: key,
reconnect_backoff: Duration::from_secs(1),
}
}
pub fn prefix(endpoint: String, prefix: Vec<u8>) -> Self {
Self {
endpoint,
scope: WatchScope::Prefix,
target: prefix,
reconnect_backoff: Duration::from_secs(1),
}
}
#[allow(dead_code)]
pub async fn watch<F>(&self, mut callback: F) -> Result<()>
where
F: FnMut(WatchChange) -> Result<()>,
{
self.watch_with_ready(|| Ok(()), move |change| callback(change))
.await
}
pub async fn watch_with_ready<F, G>(&self, mut on_connected: G, mut callback: F) -> Result<()>
where
F: FnMut(WatchChange) -> Result<()>,
G: FnMut() -> Result<()>,
{
loop {
match Client::connect(self.endpoint.clone()).await {
Ok(mut client) => {
let watch_result = match self.scope {
WatchScope::Key => client.watch(&self.target).await,
WatchScope::Prefix => client.watch_prefix(&self.target).await,
};
match watch_result {
Ok(mut handle) => {
info!(
scope = ?self.scope,
target = %String::from_utf8_lossy(&self.target),
watch_id = handle.id(),
"connected ChainFire watch"
);
on_connected()?;
while let Some(event) = handle.recv().await {
if let Err(error) = callback(WatchChange {
event_type: event.event_type,
key: event.key,
value: event.value,
revision: event.revision,
}) {
warn!(error = %error, "watch callback failed");
}
}
warn!(
scope = ?self.scope,
target = %String::from_utf8_lossy(&self.target),
"ChainFire watch stream ended; reconnecting"
);
}
Err(error) => {
warn!(
error = %error,
scope = ?self.scope,
target = %String::from_utf8_lossy(&self.target),
"failed to create ChainFire watch"
);
}
}
}
Err(error) => {
warn!(
error = %error,
scope = ?self.scope,
target = %String::from_utf8_lossy(&self.target),
"failed to connect ChainFire watch"
);
}
}
tokio::time::sleep(self.reconnect_backoff).await;
}
}
}

View file

@ -41,10 +41,16 @@ run_fleet_scheduler_bin() {
tmp_dir="$(mktemp -d)"
cf_pid=""
controller_pids=()
cleanup() {
set +e
for pid in "${controller_pids[@]}"; do
kill "$pid" 2>/dev/null || true
wait "$pid" 2>/dev/null || true
done
if [[ -d "$tmp_dir/pids" ]]; then
while IFS= read -r -d '' pid_file; do
[[ -f "$pid_file" ]] || continue
@ -57,7 +63,11 @@ cleanup() {
wait "$cf_pid" 2>/dev/null || true
fi
if [[ "${PHOTONCLOUD_KEEP_TMP:-}" == "1" ]]; then
echo "Keeping temporary directory: $tmp_dir" >&2
else
rm -rf "$tmp_dir"
fi
}
trap cleanup EXIT
@ -364,7 +374,7 @@ run_deployer_ctl() {
"$@"
}
run_node_agent_once() {
run_node_agent_bg() {
local node_id="$1"
local pid_dir="$tmp_dir/pids/$node_id"
mkdir -p "$pid_dir"
@ -375,26 +385,111 @@ run_node_agent_once() {
--pid-dir "$pid_dir" \
--interval-secs 1 \
--apply \
--once
>"$tmp_dir/node-agent-${node_id}.log" 2>&1 &
controller_pids+=("$!")
}
run_scheduler_once() {
run_scheduler_bg() {
run_fleet_scheduler_bin \
--chainfire-endpoint "$endpoint" \
--cluster-id test-cluster \
--interval-secs 1 \
--once
--interval-secs 300 \
>"$tmp_dir/fleet-scheduler.log" 2>&1 &
controller_pids+=("$!")
}
# Poll `service inspect` until the service matches the expectation or time out.
#   $1 service name
#   $2 expected phase
#   $3 expected instance count
#   $4 expected per-instance state, "-" to skip the check (default "-")
#   $5 timeout in seconds (default 120)
# On timeout, dumps the last inspect payload to stderr and returns 1.
wait_for_service_state() {
  local service_name="$1"
  local expected_phase="$2"
  local expected_instances="$3"
  local expected_state="${4:--}"
  local timeout_secs="${5:-120}"
  local inspect_path="$tmp_dir/${service_name}-wait.json"
  local deadline=$((SECONDS + timeout_secs))
  while (( SECONDS < deadline )); do
    # inspect can fail transiently while the service is still materializing.
    if run_deployer_ctl service inspect --name "$service_name" --include-instances >"$inspect_path" 2>/dev/null; then
      # Heredoc python exits 0 only when phase, count, and state all match.
      if python3 - "$inspect_path" "$expected_phase" "$expected_instances" "$expected_state" <<'PY'
import json
import sys
payload = json.load(open(sys.argv[1], "r", encoding="utf-8"))
expected_phase = sys.argv[2]
expected_instances = int(sys.argv[3])
expected_state = sys.argv[4]
status = payload.get("status") or {}
instances = payload.get("instances") or []
if status.get("phase") != expected_phase:
    raise SystemExit(1)
if len(instances) != expected_instances:
    raise SystemExit(1)
if expected_state != "-":
    actual_states = sorted(instance.get("state") for instance in instances)
    if actual_states != [expected_state] * expected_instances:
        raise SystemExit(1)
PY
      then
        return 0
      fi
    fi
    sleep 1
  done
  echo "timed out waiting for ${service_name} phase=${expected_phase} instances=${expected_instances}" >&2
  [[ -f "$inspect_path" ]] && cat "$inspect_path" >&2
  return 1
}
# Poll until node01 serves HTTP 200 on both service ports and node02 refuses
# connections on the same ports (i.e. its instances were torn down after
# scale-down).  $1 timeout in seconds (default 60). Returns 1 on timeout.
wait_for_endpoint_convergence() {
  local timeout_secs="${1:-60}"
  local deadline=$((SECONDS + timeout_secs))
  while (( SECONDS < deadline )); do
    # Heredoc python exits 0 only when all endpoint checks pass.
    if python3 - <<'PY'
import socket
import urllib.request
with urllib.request.urlopen("http://127.0.0.2:18080/", timeout=5) as response:
    if response.status != 200:
        raise SystemExit(f"node01 endpoint returned {response.status}")
with urllib.request.urlopen("http://127.0.0.2:18081/", timeout=5) as response:
    if response.status != 200:
        raise SystemExit(f"node01 worker endpoint returned {response.status}")
for port, label in ((18080, "api"), (18081, "worker")):
    sock = socket.socket()
    sock.settimeout(1.5)
    try:
        sock.connect(("127.0.0.3", port))
    except OSError:
        pass
    else:
        raise SystemExit(f"node02 {label} endpoint still accepts connections after scale-down")
    finally:
        sock.close()
PY
    then
      return 0
    fi
    sleep 1
  done
  echo "timed out waiting for endpoint convergence after scale-down" >&2
  return 1
}
echo "Applying cluster declaration"
run_deployer_ctl apply --config "$tmp_dir/cluster.yaml"
echo "Activating nodes through node-agent"
run_node_agent_once node01
run_node_agent_once node02
echo "Starting watch-driven controllers"
run_scheduler_bg
run_node_agent_bg node01
run_node_agent_bg node02
echo "Scheduling managed instances"
run_scheduler_once
echo "Waiting for worker to remain blocked until api becomes healthy"
wait_for_service_state worker blocked 0 - 120
echo "Validating dependency block before api is healthy"
run_deployer_ctl dump --prefix "photoncloud/clusters/test-cluster/instances/worker/" >"$tmp_dir/worker-blocked.dump"
@ -455,11 +550,7 @@ print("service inspect reports blocked dependency state")
PY
echo "Reconciling processes and health for api"
for _ in 1 2 3; do
run_node_agent_once node01
run_node_agent_once node02
sleep 1
done
wait_for_service_state api healthy 2 healthy 120
run_deployer_ctl service inspect --name api --include-instances >"$tmp_dir/api-inspect-healthy.json"
python3 - "$tmp_dir/api-inspect-healthy.json" <<'PY'
@ -478,15 +569,8 @@ if len(instances) != 2:
print("api service inspect refreshed to healthy from node-agent updates")
PY
echo "Re-running scheduler after api became healthy"
run_scheduler_once
echo "Reconciling processes and health for dependent worker service"
for _ in 1 2 3; do
run_node_agent_once node01
run_node_agent_once node02
sleep 1
done
echo "Waiting for dependent worker service to converge automatically"
wait_for_service_state worker healthy 2 healthy 120
echo "Validating HTTP endpoints"
python3 - <<'PY'
@ -594,25 +678,9 @@ PY
echo "Applying scaled declaration"
run_deployer_ctl apply --config "$tmp_dir/cluster-scaled.yaml" --prune
echo "Re-running scheduler after scale-down"
run_scheduler_once
echo "Reconciling api after scale-down"
for _ in 1 2 3; do
run_node_agent_once node01
run_node_agent_once node02
sleep 1
done
echo "Re-running scheduler after scaled api became healthy"
run_scheduler_once
echo "Reconciling dependent worker service after scale-down"
for _ in 1 2 3; do
run_node_agent_once node01
run_node_agent_once node02
sleep 1
done
echo "Waiting for scaled services to converge automatically"
wait_for_service_state api healthy 1 healthy 120
wait_for_service_state worker healthy 1 healthy 120
echo "Inspecting scaled instance state in ChainFire"
run_deployer_ctl dump --prefix "photoncloud/clusters/test-cluster/instances/api/" >"$tmp_dir/instances-scaled.dump"
@ -695,30 +763,7 @@ print("service inspect reports scaled healthy worker state without waiting for s
PY
echo "Validating endpoint convergence after scale-down"
python3 - <<'PY'
import socket
import urllib.request
with urllib.request.urlopen("http://127.0.0.2:18080/", timeout=5) as response:
if response.status != 200:
raise SystemExit(f"node01 endpoint returned {response.status}")
with urllib.request.urlopen("http://127.0.0.2:18081/", timeout=5) as response:
if response.status != 200:
raise SystemExit(f"node01 worker endpoint returned {response.status}")
for port, label in ((18080, "api"), (18081, "worker")):
sock = socket.socket()
sock.settimeout(1.5)
try:
sock.connect(("127.0.0.3", port))
except OSError:
pass
else:
raise SystemExit(f"node02 {label} endpoint still accepts connections after scale-down")
finally:
sock.close()
print("Endpoint convergence validated")
PY
wait_for_endpoint_convergence 60
echo "Endpoint convergence validated"
echo "Fleet scheduler E2E verification passed"

View file

@ -34,9 +34,14 @@ run_plasmacloud_reconciler_bin() {
tmp_dir="$(mktemp -d)"
cf_pid=""
redfish_pid=""
controller_pids=()
cleanup() {
set +e
for pid in "${controller_pids[@]}"; do
kill "$pid" 2>/dev/null || true
wait "$pid" 2>/dev/null || true
done
if [[ -n "$redfish_pid" ]]; then
kill "$redfish_pid" 2>/dev/null || true
wait "$redfish_pid" 2>/dev/null || true
@ -45,7 +50,11 @@ cleanup() {
kill "$cf_pid" 2>/dev/null || true
wait "$cf_pid" 2>/dev/null || true
fi
if [[ "${PHOTONCLOUD_KEEP_TMP:-}" == "1" ]]; then
echo "Keeping temporary directory: $tmp_dir" >&2
else
rm -rf "$tmp_dir"
fi
}
trap cleanup EXIT
@ -248,21 +257,80 @@ run_deployer_ctl() {
"$@"
}
run_hosts_once() {
run_hosts_bg() {
run_plasmacloud_reconciler_bin \
hosts \
--endpoint "$chainfire_endpoint" \
--cluster-namespace photoncloud \
--cluster-id test-cluster \
--heartbeat-timeout-secs 300 \
--once
--interval-secs 300 \
>"$tmp_dir/hosts-controller.log" 2>&1 &
controller_pids+=("$!")
}
wait_for_deployment_state() {
local expected_phase="$1"
local expected_paused="$2"
local expected_completed="$3"
local expected_in_progress="$4"
local expected_failed="$5"
local timeout_secs="${6:-120}"
local inspect_path="$tmp_dir/deployment-wait.json"
local deadline=$((SECONDS + timeout_secs))
while (( SECONDS < deadline )); do
if run_deployer_ctl deployment inspect --name worker-rollout --format json >"$inspect_path" 2>/dev/null; then
if python3 - "$inspect_path" "$expected_phase" "$expected_paused" "$expected_completed" "$expected_in_progress" "$expected_failed" <<'PY'
import json
import sys
payload = json.load(open(sys.argv[1], "r", encoding="utf-8"))
status = payload.get("status") or {}
def parse_csv(value):
if value == "-":
return None
if not value:
return []
return value.split(",")
expected_phase = sys.argv[2]
expected_paused = sys.argv[3].lower() == "true"
expected_completed = parse_csv(sys.argv[4])
expected_in_progress = parse_csv(sys.argv[5])
expected_failed = parse_csv(sys.argv[6])
if status.get("phase") != expected_phase:
raise SystemExit(1)
if bool(status.get("paused")) != expected_paused:
raise SystemExit(1)
if expected_completed is not None and status.get("completed_nodes", []) != expected_completed:
raise SystemExit(1)
if expected_in_progress is not None and status.get("in_progress_nodes", []) != expected_in_progress:
raise SystemExit(1)
if expected_failed is not None and status.get("failed_nodes", []) != expected_failed:
raise SystemExit(1)
PY
then
return 0
fi
fi
sleep 1
done
echo "timed out waiting for deployment state ${expected_phase}" >&2
[[ -f "$inspect_path" ]] && cat "$inspect_path" >&2
return 1
}
echo "Applying host lifecycle cluster config"
run_deployer_ctl apply --config "$tmp_dir/cluster.yaml" --prune
echo "Running host rollout controller"
run_hosts_once
echo "Starting watch-driven host rollout controller"
run_hosts_bg
wait_for_deployment_state running false "" "node01" "" 120
run_deployer_ctl deployment inspect --name worker-rollout --format json >"$tmp_dir/deployment-1.json"
python3 - "$tmp_dir/deployment-1.json" <<'PY'
@ -325,7 +393,7 @@ run_deployer_ctl node set-observed \
--node-id node01 \
--status active \
--nixos-configuration worker-next >/dev/null
run_hosts_once
wait_for_deployment_state running false "node01" "node02" "" 120
run_deployer_ctl deployment inspect --name worker-rollout --format json >"$tmp_dir/deployment-2.json"
python3 - "$tmp_dir/deployment-2.json" <<'PY'
@ -344,7 +412,7 @@ run_deployer_ctl node set-observed \
--node-id node02 \
--status rolled-back \
--nixos-configuration worker-next >/dev/null
run_hosts_once
wait_for_deployment_state paused true "node01" "" "node02" 120
run_deployer_ctl deployment inspect --name worker-rollout --format json >"$tmp_dir/deployment-3.json"
python3 - "$tmp_dir/deployment-3.json" <<'PY'
@ -407,6 +475,8 @@ assert payload["paused"] is True, payload
print("abort command validated")
PY
wait_for_deployment_state aborted true - - - 120
run_deployer_ctl dump --prefix "photoncloud/clusters/test-cluster/nodes/" >"$tmp_dir/nodes-2.dump"
python3 - "$tmp_dir/nodes-2.dump" <<'PY'
import json