From 260fb4c5763c660d68d59696fedbe4857b19a97a Mon Sep 17 00:00:00 2001
From: centra
Date: Thu, 2 Apr 2026 07:28:00 +0900
Subject: [PATCH] Stabilize node-agent startup convergence
---
deployer/crates/node-agent/src/agent.rs | 75 +++++++++++++++----
deployer/crates/node-agent/src/watcher.rs | 4 +-
.../scripts/verify-deployer-bootstrap-e2e.sh | 2 +
.../scripts/verify-fleet-scheduler-e2e.sh | 8 +-
deployer/scripts/verify-host-lifecycle-e2e.sh | 3 +-
5 files changed, 75 insertions(+), 17 deletions(-)
diff --git a/deployer/crates/node-agent/src/agent.rs b/deployer/crates/node-agent/src/agent.rs
index 1438857..7a251df 100644
--- a/deployer/crates/node-agent/src/agent.rs
+++ b/deployer/crates/node-agent/src/agent.rs
@@ -15,7 +15,7 @@ use deployer_types::{
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::process::Command;
-use tokio::sync::mpsc;
+use tokio::sync::{mpsc, oneshot};
use tokio::time::MissedTickBehavior;
use tracing::{info, warn};
@@ -120,18 +120,41 @@ struct LocalInstanceSpec {
container: Option,
}
-async fn spawn_watch_task(watcher: ChainfireWatcher, tx: mpsc::Sender<WatchChange>) {
+async fn spawn_watch_task(
+ watcher: ChainfireWatcher,
+ tx: mpsc::Sender<WatchChange>,
+ ready: oneshot::Sender<()>,
+) {
+ let mut ready = Some(ready);
if let Err(error) = watcher
- .watch(move |change| {
- tx.try_send(change)
- .map_err(|send_error| anyhow::anyhow!("failed to enqueue watch change: {}", send_error))
- })
+ .watch_with_ready(
+ move || {
+ if let Some(ready) = ready.take() {
+ let _ = ready.send(());
+ }
+ Ok(())
+ },
+ move |change| {
+ tx.try_send(change)
+ .map_err(|send_error| anyhow::anyhow!("failed to enqueue watch change: {}", send_error))
+ },
+ )
.await
{
warn!(error = %error, "node-agent watch task exited");
}
}
+async fn wait_for_watchers_ready(receivers: Vec<oneshot::Receiver<()>>) {
+ for receiver in receivers {
+ match tokio::time::timeout(std::time::Duration::from_secs(10), receiver).await {
+ Ok(Ok(())) => {}
+ Ok(Err(_)) => warn!("node-agent watch task ended before signaling readiness"),
+ Err(_) => warn!("timed out waiting for node-agent watch readiness"),
+ }
+ }
+}
+
fn canonical_json_signature(bytes: &[u8], drop_keys: &[&str]) -> Option> {
let mut value: Value = serde_json::from_slice(bytes).ok()?;
strip_observed_fields(&mut value, drop_keys);
@@ -207,7 +230,9 @@ impl Agent {
self.tick().await?;
let (watch_tx, mut watch_rx) = mpsc::channel::<WatchChange>(64);
- self.spawn_watchers(watch_tx);
+ let ready_receivers = self.spawn_watchers(watch_tx);
+ wait_for_watchers_ready(ready_receivers).await;
+ self.tick().await?;
let mut interval = tokio::time::interval(self.interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
interval.tick().await;
@@ -326,18 +351,26 @@ impl Agent {
);
}
- fn spawn_watchers(&self, tx: mpsc::Sender<WatchChange>) {
+ fn spawn_watchers(&self, tx: mpsc::Sender<WatchChange>) -> Vec<oneshot::Receiver<()>> {
+ let mut ready_receivers = Vec::new();
+
let node_watcher = ChainfireWatcher::key(
self.endpoint.clone(),
key_node(&self.cluster_namespace, &self.cluster_id, &self.node_id),
);
- tokio::spawn(spawn_watch_task(node_watcher, tx.clone()));
+ let (node_ready_tx, node_ready_rx) = oneshot::channel();
+ tokio::spawn(spawn_watch_task(node_watcher, tx.clone(), node_ready_tx));
+ ready_receivers.push(node_ready_rx);
let instances_watcher = ChainfireWatcher::prefix(
self.endpoint.clone(),
instances_prefix(&self.cluster_namespace, &self.cluster_id),
);
- tokio::spawn(spawn_watch_task(instances_watcher, tx));
+ let (instances_ready_tx, instances_ready_rx) = oneshot::channel();
+ tokio::spawn(spawn_watch_task(instances_watcher, tx, instances_ready_tx));
+ ready_receivers.push(instances_ready_rx);
+
+ ready_receivers
}
fn apply_watch_change(&mut self, change: &WatchChange) -> bool {
@@ -907,12 +940,12 @@ impl Agent {
fn startup_grace_secs(&self, inst: &ServiceInstanceSpec, spec: &HealthCheckSpec) -> u64 {
spec.startup_grace_secs.unwrap_or_else(|| {
+ let interval = spec.interval_secs.unwrap_or(self.interval.as_secs()).max(1);
+ let timeout = spec.timeout_secs.unwrap_or(5).max(1);
if inst.container.is_some() {
- let interval = spec.interval_secs.unwrap_or(self.interval.as_secs()).max(1);
- let timeout = spec.timeout_secs.unwrap_or(5).max(1);
std::cmp::max(60, std::cmp::max(interval * 6, timeout * 6))
} else {
- 0
+ std::cmp::max(5, std::cmp::max(interval * 3, timeout * 3))
}
})
}
@@ -1207,6 +1240,22 @@ mod tests {
assert!(agent.health_check_due(&instance, &health_check));
}
+ #[test]
+ fn test_process_instances_get_default_startup_grace() {
+ let agent = test_agent();
+ let mut instance = test_instance();
+ instance.container = None;
+ let health_check = HealthCheckSpec {
+ check_type: "http".to_string(),
+ path: Some("/health".to_string()),
+ interval_secs: Some(1),
+ timeout_secs: Some(2),
+ startup_grace_secs: None,
+ };
+
+ assert_eq!(agent.startup_grace_secs(&instance, &health_check), 6);
+ }
+
#[test]
fn test_apply_instance_health_fields_replaces_nulls_and_preserves_observed_at() {
let started_at = DateTime::parse_from_rfc3339("2026-03-31T03:00:00Z")
diff --git a/deployer/crates/node-agent/src/watcher.rs b/deployer/crates/node-agent/src/watcher.rs
index 4be0430..22d8d6a 100644
--- a/deployer/crates/node-agent/src/watcher.rs
+++ b/deployer/crates/node-agent/src/watcher.rs
@@ -45,9 +45,10 @@ impl ChainfireWatcher {
}
}
- pub async fn watch<F>(&self, mut callback: F) -> Result<()>
+ pub async fn watch_with_ready<F, G>(&self, mut on_connected: G, mut callback: F) -> Result<()>
where
F: FnMut(WatchChange) -> Result<()>,
+ G: FnMut() -> Result<()>,
{
loop {
match Client::connect(self.endpoint.clone()).await {
@@ -65,6 +66,7 @@ impl ChainfireWatcher {
watch_id = handle.id(),
"connected ChainFire watch"
);
+ on_connected()?;
while let Some(event) = handle.recv().await {
if let Err(error) = callback(WatchChange {
diff --git a/deployer/scripts/verify-deployer-bootstrap-e2e.sh b/deployer/scripts/verify-deployer-bootstrap-e2e.sh
index fa1076b..0e851d0 100755
--- a/deployer/scripts/verify-deployer-bootstrap-e2e.sh
+++ b/deployer/scripts/verify-deployer-bootstrap-e2e.sh
@@ -123,6 +123,7 @@ api_port="$(free_port)"
http_port="$(free_port)"
raft_port="$(free_port)"
gossip_port="$(free_port)"
+metrics_port="$(free_port)"
deployer_port="$(free_port)"
bootstrap_token="bootstrap-secret"
printf 'bundle-bytes' >"$tmp_dir/flake-bundle.tar.gz"
@@ -154,6 +155,7 @@ EOF
echo "Starting ChainFire on 127.0.0.1:${api_port}"
run_chainfire_server_bin \
--config "$tmp_dir/chainfire.toml" \
+ --metrics-port "$metrics_port" \
>"$tmp_dir/chainfire.log" 2>&1 &
cf_pid="$!"
diff --git a/deployer/scripts/verify-fleet-scheduler-e2e.sh b/deployer/scripts/verify-fleet-scheduler-e2e.sh
index 1c2d742..a899514 100755
--- a/deployer/scripts/verify-fleet-scheduler-e2e.sh
+++ b/deployer/scripts/verify-fleet-scheduler-e2e.sh
@@ -118,6 +118,7 @@ api_port="$(free_port)"
http_port="$(free_port)"
raft_port="$(free_port)"
gossip_port="$(free_port)"
+metrics_port="$(free_port)"
cat >"$tmp_dir/chainfire.toml" <"$tmp_dir/chainfire.log" 2>&1 &
cf_pid="$!"
@@ -485,8 +487,6 @@ run_deployer_ctl apply --config "$tmp_dir/cluster.yaml"
echo "Starting watch-driven controllers"
run_scheduler_bg
-run_node_agent_bg node01
-run_node_agent_bg node02
echo "Waiting for worker to remain blocked until api becomes healthy"
wait_for_service_state worker blocked 0 - 120
@@ -549,6 +549,10 @@ if status.get("phase") != "blocked":
print("service inspect reports blocked dependency state")
PY
+echo "Starting node agents after dependency block is confirmed"
+run_node_agent_bg node01
+run_node_agent_bg node02
+
echo "Reconciling processes and health for api"
wait_for_service_state api healthy 2 healthy 120
diff --git a/deployer/scripts/verify-host-lifecycle-e2e.sh b/deployer/scripts/verify-host-lifecycle-e2e.sh
index c6addf5..34b6f10 100644
--- a/deployer/scripts/verify-host-lifecycle-e2e.sh
+++ b/deployer/scripts/verify-host-lifecycle-e2e.sh
@@ -105,6 +105,7 @@ api_port="$(free_port)"
http_port="$(free_port)"
raft_port="$(free_port)"
gossip_port="$(free_port)"
+metrics_port="$(free_port)"
redfish_port="$(free_port)"
cat >"$tmp_dir/chainfire.toml" <"$tmp_dir/chainfire.log" 2>&1 &
+run_chainfire_server_bin --config "$tmp_dir/chainfire.toml" --metrics-port "$metrics_port" >"$tmp_dir/chainfire.log" 2>&1 &
cf_pid="$!"
wait_for_port "127.0.0.1" "$api_port" 120
wait_for_port "127.0.0.1" "$http_port" 120