Stabilize node-agent startup convergence

This commit is contained in:
centra 2026-04-02 07:28:00 +09:00
parent 82a4c6a941
commit 260fb4c576
Signed by: centra
GPG key ID: 0C09689D20B25ACA
5 changed files with 75 additions and 17 deletions

View file

@@ -15,7 +15,7 @@ use deployer_types::{
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::process::Command;
use tokio::sync::mpsc;
use tokio::sync::{mpsc, oneshot};
use tokio::time::MissedTickBehavior;
use tracing::{info, warn};
@@ -120,18 +120,41 @@ struct LocalInstanceSpec {
container: Option<ContainerSpec>,
}
// NOTE(review): unified-diff view — the single-line signature below is the
// pre-change version; the multi-line signature after it is its replacement,
// which adds a one-shot readiness channel.
async fn spawn_watch_task(watcher: ChainfireWatcher, tx: mpsc::Sender<WatchChange>) {
// Runs one ChainFire watch loop, forwarding every change onto `tx` and
// firing `ready` exactly once after the watch stream first connects.
async fn spawn_watch_task(
watcher: ChainfireWatcher,
tx: mpsc::Sender<WatchChange>,
// One-shot readiness signal; consumed on the first successful connect.
ready: oneshot::Sender<()>,
) {
// Wrapped in Option so the FnMut connect hook can send at most once even
// if the watcher reconnects.
let mut ready = Some(ready);
if let Err(error) = watcher
// Pre-change call (diff residue): no readiness hook existed.
.watch(move |change| {
tx.try_send(change)
.map_err(|send_error| anyhow::anyhow!("failed to enqueue watch change: {}", send_error))
})
// Post-change call: `on_connected` runs once the watch stream is
// established, before any events are delivered.
.watch_with_ready(
move || {
if let Some(ready) = ready.take() {
// Receiver may already be dropped; readiness is best-effort.
let _ = ready.send(());
}
Ok(())
},
move |change| {
// try_send keeps the watch loop non-blocking; a full queue is
// surfaced as an error rather than awaited.
tx.try_send(change)
.map_err(|send_error| anyhow::anyhow!("failed to enqueue watch change: {}", send_error))
},
)
.await
{
warn!(error = %error, "node-agent watch task exited");
}
}
/// Awaits each watcher's readiness signal in turn, capping every wait at ten
/// seconds. A closed channel or a timeout is logged as a warning rather than
/// treated as fatal, so agent startup proceeds regardless.
async fn wait_for_watchers_ready(receivers: Vec<oneshot::Receiver<()>>) {
    let deadline = std::time::Duration::from_secs(10);
    for rx in receivers {
        match tokio::time::timeout(deadline, rx).await {
            Err(_) => warn!("timed out waiting for node-agent watch readiness"),
            Ok(Err(_)) => warn!("node-agent watch task ended before signaling readiness"),
            Ok(Ok(())) => {}
        }
    }
}
fn canonical_json_signature(bytes: &[u8], drop_keys: &[&str]) -> Option<Vec<u8>> {
let mut value: Value = serde_json::from_slice(bytes).ok()?;
strip_observed_fields(&mut value, drop_keys);
@@ -207,7 +230,9 @@ impl Agent {
self.tick().await?;
let (watch_tx, mut watch_rx) = mpsc::channel::<WatchChange>(64);
self.spawn_watchers(watch_tx);
let ready_receivers = self.spawn_watchers(watch_tx);
wait_for_watchers_ready(ready_receivers).await;
self.tick().await?;
let mut interval = tokio::time::interval(self.interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
interval.tick().await;
@@ -326,18 +351,26 @@
);
}
// NOTE(review): diff view — the first signature line is the pre-change
// version; the line after it is the replacement that also returns one
// readiness receiver per spawned watcher.
fn spawn_watchers(&self, tx: mpsc::Sender<WatchChange>) {
// Spawns the node-key watcher and the cluster instances-prefix watcher as
// detached tasks. Each task signals its oneshot once its watch stream has
// connected (see spawn_watch_task); the receivers are returned so the
// caller can wait for both watchers to be live before reconciling.
fn spawn_watchers(&self, tx: mpsc::Sender<WatchChange>) -> Vec<oneshot::Receiver<()>> {
let mut ready_receivers = Vec::new();
// Watches this node's own key for changes addressed to it.
let node_watcher = ChainfireWatcher::key(
self.endpoint.clone(),
key_node(&self.cluster_namespace, &self.cluster_id, &self.node_id),
);
// Pre-change spawn (diff residue): no readiness channel.
tokio::spawn(spawn_watch_task(node_watcher, tx.clone()));
let (node_ready_tx, node_ready_rx) = oneshot::channel();
tokio::spawn(spawn_watch_task(node_watcher, tx.clone(), node_ready_tx));
ready_receivers.push(node_ready_rx);
// Watches the whole instances prefix for this cluster.
let instances_watcher = ChainfireWatcher::prefix(
self.endpoint.clone(),
instances_prefix(&self.cluster_namespace, &self.cluster_id),
);
// Pre-change spawn (diff residue): no readiness channel.
tokio::spawn(spawn_watch_task(instances_watcher, tx));
let (instances_ready_tx, instances_ready_rx) = oneshot::channel();
tokio::spawn(spawn_watch_task(instances_watcher, tx, instances_ready_tx));
ready_receivers.push(instances_ready_rx);
ready_receivers
}
fn apply_watch_change(&mut self, change: &WatchChange) -> bool {
@@ -907,12 +940,12 @@ impl Agent {
// Computes the startup grace period (seconds) before failing health checks
// count against an instance. An explicit `startup_grace_secs` wins;
// otherwise the grace is derived from the check interval/timeout, with a
// higher floor for container instances (60s) than for plain processes (5s).
// NOTE(review): diff view — the second interval/timeout pair inside the
// container branch and the bare `0` in the else branch appear to be the
// pre-change lines retained by the diff; the post-change process branch
// returns max(5, max(interval*3, timeout*3)) — confirm against the repo.
fn startup_grace_secs(&self, inst: &ServiceInstanceSpec, spec: &HealthCheckSpec) -> u64 {
spec.startup_grace_secs.unwrap_or_else(|| {
// interval falls back to the agent tick interval; both are clamped >= 1.
let interval = spec.interval_secs.unwrap_or(self.interval.as_secs()).max(1);
let timeout = spec.timeout_secs.unwrap_or(5).max(1);
if inst.container.is_some() {
let interval = spec.interval_secs.unwrap_or(self.interval.as_secs()).max(1);
let timeout = spec.timeout_secs.unwrap_or(5).max(1);
// Containers get a generous floor: at least 60s, or 6x the check
// interval/timeout, whichever is largest.
std::cmp::max(60, std::cmp::max(interval * 6, timeout * 6))
} else {
0
// Plain processes: at least 5s, or 3x the interval/timeout.
std::cmp::max(5, std::cmp::max(interval * 3, timeout * 3))
}
})
}
@@ -1207,6 +1240,22 @@ mod tests {
assert!(agent.health_check_due(&instance, &health_check));
}
#[test]
fn test_process_instances_get_default_startup_grace() {
    // A non-container (process) instance with no explicit grace should get
    // the derived default: max(5, max(interval*3, timeout*3)) = max(5, max(3, 6)) = 6.
    let agent = test_agent();
    let mut inst = test_instance();
    inst.container = None;
    let spec = HealthCheckSpec {
        check_type: String::from("http"),
        path: Some(String::from("/health")),
        interval_secs: Some(1),
        timeout_secs: Some(2),
        startup_grace_secs: None,
    };
    assert_eq!(agent.startup_grace_secs(&inst, &spec), 6);
}
#[test]
fn test_apply_instance_health_fields_replaces_nulls_and_preserves_observed_at() {
let started_at = DateTime::parse_from_rfc3339("2026-03-31T03:00:00Z")

View file

@@ -45,9 +45,10 @@ impl ChainfireWatcher {
}
}
pub async fn watch<F>(&self, mut callback: F) -> Result<()>
pub async fn watch_with_ready<F, G>(&self, mut on_connected: G, mut callback: F) -> Result<()>
where
F: FnMut(WatchChange) -> Result<()>,
G: FnMut() -> Result<()>,
{
loop {
match Client::connect(self.endpoint.clone()).await {
@@ -65,6 +66,7 @@ impl ChainfireWatcher {
watch_id = handle.id(),
"connected ChainFire watch"
);
on_connected()?;
while let Some(event) = handle.recv().await {
if let Err(error) = callback(WatchChange {

View file

@@ -123,6 +123,7 @@ api_port="$(free_port)"
http_port="$(free_port)"
raft_port="$(free_port)"
gossip_port="$(free_port)"
metrics_port="$(free_port)"
deployer_port="$(free_port)"
bootstrap_token="bootstrap-secret"
printf 'bundle-bytes' >"$tmp_dir/flake-bundle.tar.gz"
@@ -154,6 +155,7 @@ EOF
echo "Starting ChainFire on 127.0.0.1:${api_port}"
run_chainfire_server_bin \
--config "$tmp_dir/chainfire.toml" \
--metrics-port "$metrics_port" \
>"$tmp_dir/chainfire.log" 2>&1 &
cf_pid="$!"

View file

@@ -118,6 +118,7 @@ api_port="$(free_port)"
http_port="$(free_port)"
raft_port="$(free_port)"
gossip_port="$(free_port)"
metrics_port="$(free_port)"
cat >"$tmp_dir/chainfire.toml" <<EOF
[node]
@@ -148,6 +149,7 @@ mkdir -p "$tmp_dir/pids"
echo "Starting ChainFire on 127.0.0.1:${api_port}"
run_chainfire_server_bin \
--config "$tmp_dir/chainfire.toml" \
--metrics-port "$metrics_port" \
>"$tmp_dir/chainfire.log" 2>&1 &
cf_pid="$!"
@@ -485,8 +487,6 @@ run_deployer_ctl apply --config "$tmp_dir/cluster.yaml"
echo "Starting watch-driven controllers"
run_scheduler_bg
run_node_agent_bg node01
run_node_agent_bg node02
echo "Waiting for worker to remain blocked until api becomes healthy"
wait_for_service_state worker blocked 0 - 120
@@ -549,6 +549,10 @@ if status.get("phase") != "blocked":
print("service inspect reports blocked dependency state")
PY
echo "Starting node agents after dependency block is confirmed"
run_node_agent_bg node01
run_node_agent_bg node02
echo "Reconciling processes and health for api"
wait_for_service_state api healthy 2 healthy 120

View file

@@ -105,6 +105,7 @@ api_port="$(free_port)"
http_port="$(free_port)"
raft_port="$(free_port)"
gossip_port="$(free_port)"
metrics_port="$(free_port)"
redfish_port="$(free_port)"
cat >"$tmp_dir/chainfire.toml" <<EOF
@@ -170,7 +171,7 @@ server.serve_forever()
PY
echo "Starting ChainFire on 127.0.0.1:${api_port}"
run_chainfire_server_bin --config "$tmp_dir/chainfire.toml" >"$tmp_dir/chainfire.log" 2>&1 &
run_chainfire_server_bin --config "$tmp_dir/chainfire.toml" --metrics-port "$metrics_port" >"$tmp_dir/chainfire.log" 2>&1 &
cf_pid="$!"
wait_for_port "127.0.0.1" "$api_port" 120
wait_for_port "127.0.0.1" "$http_port" 120