diff --git a/deployer/crates/node-agent/src/agent.rs b/deployer/crates/node-agent/src/agent.rs
index 1438857..7a251df 100644
--- a/deployer/crates/node-agent/src/agent.rs
+++ b/deployer/crates/node-agent/src/agent.rs
@@ -15,7 +15,7 @@ use deployer_types::{
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
 use tokio::process::Command;
-use tokio::sync::mpsc;
+use tokio::sync::{mpsc, oneshot};
 use tokio::time::MissedTickBehavior;
 use tracing::{info, warn};
 
@@ -120,18 +120,41 @@ struct LocalInstanceSpec {
     container: Option,
 }
 
-async fn spawn_watch_task(watcher: ChainfireWatcher, tx: mpsc::Sender<WatchChange>) {
+async fn spawn_watch_task(
+    watcher: ChainfireWatcher,
+    tx: mpsc::Sender<WatchChange>,
+    ready: oneshot::Sender<()>,
+) {
+    let mut ready = Some(ready);
     if let Err(error) = watcher
-        .watch(move |change| {
-            tx.try_send(change)
-                .map_err(|send_error| anyhow::anyhow!("failed to enqueue watch change: {}", send_error))
-        })
+        .watch_with_ready(
+            move || {
+                if let Some(ready) = ready.take() {
+                    let _ = ready.send(());
+                }
+                Ok(())
+            },
+            move |change| {
+                tx.try_send(change)
+                    .map_err(|send_error| anyhow::anyhow!("failed to enqueue watch change: {}", send_error))
+            },
+        )
         .await
     {
         warn!(error = %error, "node-agent watch task exited");
     }
 }
 
+async fn wait_for_watchers_ready(receivers: Vec<oneshot::Receiver<()>>) {
+    for receiver in receivers {
+        match tokio::time::timeout(std::time::Duration::from_secs(10), receiver).await {
+            Ok(Ok(())) => {}
+            Ok(Err(_)) => warn!("node-agent watch task ended before signaling readiness"),
+            Err(_) => warn!("timed out waiting for node-agent watch readiness"),
+        }
+    }
+}
+
 fn canonical_json_signature(bytes: &[u8], drop_keys: &[&str]) -> Option<Vec<u8>> {
     let mut value: Value = serde_json::from_slice(bytes).ok()?;
     strip_observed_fields(&mut value, drop_keys);
@@ -207,7 +230,9 @@ impl Agent {
         self.tick().await?;
 
         let (watch_tx, mut watch_rx) = mpsc::channel::<WatchChange>(64);
-        self.spawn_watchers(watch_tx);
+        let ready_receivers = self.spawn_watchers(watch_tx);
+        wait_for_watchers_ready(ready_receivers).await;
+        self.tick().await?;
 
         let mut interval = tokio::time::interval(self.interval);
         interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
         interval.tick().await;
@@ -326,18 +351,26 @@ impl Agent {
         );
     }
 
-    fn spawn_watchers(&self, tx: mpsc::Sender<WatchChange>) {
+    fn spawn_watchers(&self, tx: mpsc::Sender<WatchChange>) -> Vec<oneshot::Receiver<()>> {
+        let mut ready_receivers = Vec::new();
+
         let node_watcher = ChainfireWatcher::key(
             self.endpoint.clone(),
             key_node(&self.cluster_namespace, &self.cluster_id, &self.node_id),
         );
-        tokio::spawn(spawn_watch_task(node_watcher, tx.clone()));
+        let (node_ready_tx, node_ready_rx) = oneshot::channel();
+        tokio::spawn(spawn_watch_task(node_watcher, tx.clone(), node_ready_tx));
+        ready_receivers.push(node_ready_rx);
 
         let instances_watcher = ChainfireWatcher::prefix(
             self.endpoint.clone(),
             instances_prefix(&self.cluster_namespace, &self.cluster_id),
         );
-        tokio::spawn(spawn_watch_task(instances_watcher, tx));
+        let (instances_ready_tx, instances_ready_rx) = oneshot::channel();
+        tokio::spawn(spawn_watch_task(instances_watcher, tx, instances_ready_tx));
+        ready_receivers.push(instances_ready_rx);
+
+        ready_receivers
     }
 
     fn apply_watch_change(&mut self, change: &WatchChange) -> bool {
@@ -907,12 +940,12 @@ impl Agent {
 
     fn startup_grace_secs(&self, inst: &ServiceInstanceSpec, spec: &HealthCheckSpec) -> u64 {
         spec.startup_grace_secs.unwrap_or_else(|| {
+            let interval = spec.interval_secs.unwrap_or(self.interval.as_secs()).max(1);
+            let timeout = spec.timeout_secs.unwrap_or(5).max(1);
             if inst.container.is_some() {
-                let interval = spec.interval_secs.unwrap_or(self.interval.as_secs()).max(1);
-                let timeout = spec.timeout_secs.unwrap_or(5).max(1);
                 std::cmp::max(60, std::cmp::max(interval * 6, timeout * 6))
             } else {
-                0
+                std::cmp::max(5, std::cmp::max(interval * 3, timeout * 3))
             }
         })
     }
@@ -1207,6 +1240,22 @@ mod tests {
         assert!(agent.health_check_due(&instance, &health_check));
     }
 
+    #[test]
+    fn test_process_instances_get_default_startup_grace() {
+        let agent = test_agent();
+        let mut instance = test_instance();
+        instance.container = None;
+        let health_check = HealthCheckSpec {
+            check_type: "http".to_string(),
+            path: Some("/health".to_string()),
+            interval_secs: Some(1),
+            timeout_secs: Some(2),
+            startup_grace_secs: None,
+        };
+
+        assert_eq!(agent.startup_grace_secs(&instance, &health_check), 6);
+    }
+
     #[test]
     fn test_apply_instance_health_fields_replaces_nulls_and_preserves_observed_at() {
         let started_at = DateTime::parse_from_rfc3339("2026-03-31T03:00:00Z")
diff --git a/deployer/crates/node-agent/src/watcher.rs b/deployer/crates/node-agent/src/watcher.rs
index 4be0430..22d8d6a 100644
--- a/deployer/crates/node-agent/src/watcher.rs
+++ b/deployer/crates/node-agent/src/watcher.rs
@@ -45,9 +45,10 @@ impl ChainfireWatcher {
         }
     }
 
-    pub async fn watch<F>(&self, mut callback: F) -> Result<()>
+    pub async fn watch_with_ready<F, G>(&self, mut on_connected: G, mut callback: F) -> Result<()>
     where
         F: FnMut(WatchChange) -> Result<()>,
+        G: FnMut() -> Result<()>,
     {
         loop {
             match Client::connect(self.endpoint.clone()).await {
@@ -65,6 +66,7 @@ impl ChainfireWatcher {
                     watch_id = handle.id(),
                     "connected ChainFire watch"
                 );
+                on_connected()?;
 
                 while let Some(event) = handle.recv().await {
                     if let Err(error) = callback(WatchChange {
diff --git a/deployer/scripts/verify-deployer-bootstrap-e2e.sh b/deployer/scripts/verify-deployer-bootstrap-e2e.sh
index fa1076b..0e851d0 100755
--- a/deployer/scripts/verify-deployer-bootstrap-e2e.sh
+++ b/deployer/scripts/verify-deployer-bootstrap-e2e.sh
@@ -123,6 +123,7 @@ api_port="$(free_port)"
 http_port="$(free_port)"
 raft_port="$(free_port)"
 gossip_port="$(free_port)"
+metrics_port="$(free_port)"
 deployer_port="$(free_port)"
 bootstrap_token="bootstrap-secret"
 printf 'bundle-bytes' >"$tmp_dir/flake-bundle.tar.gz"
@@ -154,6 +155,7 @@ EOF
 echo "Starting ChainFire on 127.0.0.1:${api_port}"
 run_chainfire_server_bin \
   --config "$tmp_dir/chainfire.toml" \
+  --metrics-port "$metrics_port" \
   >"$tmp_dir/chainfire.log" 2>&1 &
 cf_pid="$!"
diff --git a/deployer/scripts/verify-fleet-scheduler-e2e.sh b/deployer/scripts/verify-fleet-scheduler-e2e.sh
index 1c2d742..a899514 100755
--- a/deployer/scripts/verify-fleet-scheduler-e2e.sh
+++ b/deployer/scripts/verify-fleet-scheduler-e2e.sh
@@ -118,6 +118,7 @@ api_port="$(free_port)"
 http_port="$(free_port)"
 raft_port="$(free_port)"
 gossip_port="$(free_port)"
+metrics_port="$(free_port)"
 cat >"$tmp_dir/chainfire.toml" <<EOF
<"$tmp_dir/chainfire.log" 2>&1 &
 cf_pid="$!"
@@ -485,8 +487,6 @@ run_deployer_ctl apply --config "$tmp_dir/cluster.yaml"
 
 echo "Starting watch-driven controllers"
 run_scheduler_bg
-run_node_agent_bg node01
-run_node_agent_bg node02
 
 echo "Waiting for worker to remain blocked until api becomes healthy"
 wait_for_service_state worker blocked 0 - 120
@@ -549,6 +549,10 @@ if status.get("phase") != "blocked":
 print("service inspect reports blocked dependency state")
 PY
 
+echo "Starting node agents after dependency block is confirmed"
+run_node_agent_bg node01
+run_node_agent_bg node02
+
 echo "Reconciling processes and health for api"
 wait_for_service_state api healthy 2 healthy 120
 
diff --git a/deployer/scripts/verify-host-lifecycle-e2e.sh b/deployer/scripts/verify-host-lifecycle-e2e.sh
index c6addf5..34b6f10 100644
--- a/deployer/scripts/verify-host-lifecycle-e2e.sh
+++ b/deployer/scripts/verify-host-lifecycle-e2e.sh
@@ -105,6 +105,7 @@ api_port="$(free_port)"
 http_port="$(free_port)"
 raft_port="$(free_port)"
 gossip_port="$(free_port)"
+metrics_port="$(free_port)"
 redfish_port="$(free_port)"
 cat >"$tmp_dir/chainfire.toml" <<EOF
<"$tmp_dir/chainfire.log" 2>&1 &
+run_chainfire_server_bin --config "$tmp_dir/chainfire.toml" --metrics-port "$metrics_port" >"$tmp_dir/chainfire.log" 2>&1 &
 cf_pid="$!"
 wait_for_port "127.0.0.1" "$api_port" 120
 wait_for_port "127.0.0.1" "$http_port" 120