From faabcbfc2e60fd814bc6856d2c59a4e8472ba273 Mon Sep 17 00:00:00 2001
From: centra
Date: Wed, 1 Apr 2026 23:07:42 +0900
Subject: [PATCH] Refresh service status from node agent
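
Aggregated service-status records were previously refreshed only when
fleet-scheduler ran, so `service inspect` could lag behind actual
instance health. Move the aggregation helpers (node eligibility,
dependency summaries, status record construction) into deployer-types
and have the node agent rebuild and persist the records on every
reconcile tick, pruning records for services that no longer exist. The
e2e script now verifies that `service inspect` converges without an
extra scheduler run.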
---
deployer/crates/deployer-types/src/lib.rs | 588 +++++++++++++++++-
deployer/crates/fleet-scheduler/src/main.rs | 357 +----------
deployer/crates/node-agent/src/agent.rs | 180 +++++-
deployer/crates/node-agent/src/main.rs | 5 +
.../scripts/verify-fleet-scheduler-e2e.sh | 37 +-
5 files changed, 834 insertions(+), 333 deletions(-)
diff --git a/deployer/crates/deployer-types/src/lib.rs b/deployer/crates/deployer-types/src/lib.rs
index 20cbdcf..744ef5e 100644
--- a/deployer/crates/deployer-types/src/lib.rs
+++ b/deployer/crates/deployer-types/src/lib.rs
@@ -1,6 +1,6 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
/// Node lifecycle state
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
@@ -980,6 +980,391 @@ pub struct ServiceStatusRecord {
pub observed_at: Option<DateTime<Utc>>,
}
+/// Summarized readiness of a service's declared dependencies.
+#[derive(Debug, Clone, PartialEq, Eq, Default)]
+pub struct ServiceDependencySummary {
+ pub dependencies_ready: bool,
+ pub blockers: Vec<String>,
+}
+
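+/// Resolves a node's pool from the explicit field, falling back to the "pool" and "pool.photoncloud.io/name" labels.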
+pub fn cluster_node_pool(node: &ClusterNodeRecord) -> Option<&str> {
+ node.pool
+ .as_deref()
+ .or_else(|| node.labels.get("pool").map(String::as_str))
+ .or_else(|| {
+ node.labels
+ .get("pool.photoncloud.io/name")
+ .map(String::as_str)
+ })
+}
+
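+/// Resolves a node's class from the explicit field, falling back to the "node_class" and "nodeclass.photoncloud.io/name" labels.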
+pub fn cluster_node_class(node: &ClusterNodeRecord) -> Option<&str> {
+ node.node_class
+ .as_deref()
+ .or_else(|| node.labels.get("node_class").map(String::as_str))
+ .or_else(|| {
+ node.labels
+ .get("nodeclass.photoncloud.io/name")
+ .map(String::as_str)
+ })
+}
+
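+/// Returns true when the node is active, has a fresh heartbeat, and satisfies the placement policy's roles, pools, node classes, and label selectors.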
+pub fn cluster_node_is_eligible(
+ node: &ClusterNodeRecord,
+ placement: &PlacementPolicy,
+ heartbeat_timeout_secs: u64,
+) -> bool {
+ if node.state.as_deref() != Some("active") {
+ return false;
+ }
+
+ if heartbeat_timeout_secs > 0 {
+ let Some(last) = node.last_heartbeat else {
+ return false;
+ };
+ let age = Utc::now().signed_duration_since(last).num_seconds();
+ if age > heartbeat_timeout_secs as i64 {
+ return false;
+ }
+ }
+
+ if !placement.roles.is_empty()
+ && !node
+ .roles
+ .iter()
+ .any(|role| placement.roles.iter().any(|expected| expected == role))
+ {
+ return false;
+ }
+
+ if !placement.pools.is_empty()
+ && !cluster_node_pool(node)
+ .map(|pool| placement.pools.iter().any(|expected| expected == pool))
+ .unwrap_or(false)
+ {
+ return false;
+ }
+
+ if !placement.node_classes.is_empty()
+ && !cluster_node_class(node)
+ .map(|node_class| {
+ placement
+ .node_classes
+ .iter()
+ .any(|expected| expected == node_class)
+ })
+ .unwrap_or(false)
+ {
+ return false;
+ }
+
+ placement
+ .match_labels
+ .iter()
+ .all(|(key, value)| node.labels.get(key) == Some(value))
+}
+
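+/// Filters the node list down to the nodes eligible under the given placement policy.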
+pub fn eligible_cluster_nodes<'a>(
+ nodes: &'a [ClusterNodeRecord],
+ placement: &PlacementPolicy,
+ heartbeat_timeout_secs: u64,
+) -> Vec<&'a ClusterNodeRecord> {
+ nodes
+ .iter()
+ .filter(|node| cluster_node_is_eligible(node, placement, heartbeat_timeout_secs))
+ .collect()
+}
+
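+/// Returns true when the instance heartbeat (or last observation) is within the timeout window; a timeout of 0 disables the check.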
+pub fn service_instance_has_fresh_heartbeat(
+ instance: &ServiceInstanceSpec,
+ heartbeat_timeout_secs: u64,
+) -> bool {
+ if heartbeat_timeout_secs == 0 {
+ return true;
+ }
+
+ let Some(last_heartbeat) = instance.last_heartbeat.or(instance.observed_at) else {
+ return false;
+ };
+
+ Utc::now()
+ .signed_duration_since(last_heartbeat)
+ .num_seconds()
+ <= heartbeat_timeout_secs as i64
+}
+
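+/// An instance is available when it reports the "healthy" state and its heartbeat is fresh.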
+pub fn service_instance_is_available(
+ instance: &ServiceInstanceSpec,
+ heartbeat_timeout_secs: u64,
+) -> bool {
+ matches!(instance.state.as_deref(), Some("healthy"))
+ && service_instance_has_fresh_heartbeat(instance, heartbeat_timeout_secs)
+}
+
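+/// A publication is ready once it carries DNS values or a load-balancer VIP address.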
+pub fn service_publication_is_ready(state: &ServicePublicationState) -> bool {
+ state
+ .dns
+ .as_ref()
+ .map(|dns| !dns.values.is_empty())
+ .unwrap_or(false)
+ || state
+ .load_balancer
+ .as_ref()
+ .and_then(|load_balancer| load_balancer.vip_address.as_ref())
+ .is_some()
+}
+
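+/// Returns the names of all services that participate in a dependency cycle, via a depth-first walk of the declared dependency graph.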
+pub fn compute_service_dependency_cycles(services: &[ServiceSpec]) -> HashSet<String> {
+ let service_names = services
+ .iter()
+ .map(|service| service.name.clone())
+ .collect::<HashSet<_>>();
+ let dependencies = services
+ .iter()
+ .map(|service| {
+ let deps = service
+ .depends_on
+ .iter()
+ .filter(|dependency| service_names.contains(&dependency.service))
+ .map(|dependency| dependency.service.clone())
+ .collect::<Vec<_>>();
+ (service.name.clone(), deps)
+ })
+ .collect::<HashMap<_, _>>();
+
+ let mut permanent = HashSet::new();
+ let mut visiting = Vec::<String>::new();
+ let mut cycles = HashSet::new();
+
+ for service in services {
+ visit_service_dependency_cycles(
+ &service.name,
+ &dependencies,
+ &mut permanent,
+ &mut visiting,
+ &mut cycles,
+ );
+ }
+
+ cycles
+}
+
+fn visit_service_dependency_cycles(
+ service: &str,
+ dependencies: &HashMap<String, Vec<String>>,
+ permanent: &mut HashSet<String>,
+ visiting: &mut Vec<String>,
+ cycles: &mut HashSet<String>,
+) {
+ if permanent.contains(service) {
+ return;
+ }
+
+ if let Some(position) = visiting.iter().position(|current| current == service) {
+ cycles.extend(visiting[position..].iter().cloned());
+ return;
+ }
+
+ visiting.push(service.to_string());
+ if let Some(depends_on) = dependencies.get(service) {
+ for dependency in depends_on {
+ visit_service_dependency_cycles(dependency, dependencies, permanent, visiting, cycles);
+ }
+ }
+ visiting.pop();
+ permanent.insert(service.to_string());
+}
+
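+/// Evaluates a service's declared dependencies and collects blockers for self-references, undefined or cyclic dependencies, and unmet healthy/published conditions.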
+pub fn summarize_service_dependencies(
+ service: &ServiceSpec,
+ services_by_name: &HashMap<&str, &ServiceSpec>,
+ instances_by_service: &HashMap<String, Vec<ServiceInstanceSpec>>,
+ publications: &HashMap<String, ServicePublicationState>,
+ dependency_cycles: &HashSet<String>,
+ heartbeat_timeout_secs: u64,
+) -> ServiceDependencySummary {
+ let mut blockers = Vec::new();
+
+ if dependency_cycles.contains(&service.name) {
+ blockers.push("cyclic dependency graph".to_string());
+ }
+
+ for dependency in &service.depends_on {
+ if dependency.service == service.name {
+ blockers.push(format!(
+ "dependency {} points to itself",
+ dependency.service
+ ));
+ continue;
+ }
+
+ if !services_by_name.contains_key(dependency.service.as_str()) {
+ blockers.push(format!("dependency {} is not defined", dependency.service));
+ continue;
+ }
+
+ if dependency_cycles.contains(&dependency.service) {
+ blockers.push(format!(
+ "dependency {} is part of a dependency cycle",
+ dependency.service
+ ));
+ continue;
+ }
+
+ match dependency.condition {
+ ServiceDependencyCondition::Healthy => {
+ let ready = instances_by_service
+ .get(&dependency.service)
+ .map(|instances| {
+ instances
+ .iter()
+ .filter(|instance| {
+ service_instance_is_available(instance, heartbeat_timeout_secs)
+ })
+ .count() as u32
+ })
+ .unwrap_or(0);
+ let min_ready = dependency.min_ready.max(1);
+ if ready < min_ready {
+ blockers.push(format!(
+ "dependency {} has {ready}/{min_ready} healthy instance(s)",
+ dependency.service
+ ));
+ }
+ }
+ ServiceDependencyCondition::Published => {
+ let ready = publications
+ .get(&dependency.service)
+ .map(service_publication_is_ready)
+ .unwrap_or(false);
+ if !ready {
+ blockers.push(format!(
+ "dependency {} is not published",
+ dependency.service
+ ));
+ }
+ }
+ }
+ }
+
+ ServiceDependencySummary {
+ dependencies_ready: blockers.is_empty(),
+ blockers,
+ }
+}
+
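+/// Desired instances: the configured replica count for replicated services, one per eligible node for daemons, and zero for unscheduled services.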
+pub fn desired_service_instance_count(
+ service: &ServiceSpec,
+ nodes: &[ClusterNodeRecord],
+ heartbeat_timeout_secs: u64,
+) -> u32 {
+ let Some(schedule) = service.schedule.as_ref() else {
+ return 0;
+ };
+ match schedule.mode {
+ ServiceScheduleMode::Replicated => schedule.replicas,
+ ServiceScheduleMode::Daemon => {
+ eligible_cluster_nodes(nodes, &schedule.placement, heartbeat_timeout_secs).len() as u32
+ }
+ }
+}
+
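+/// Builds the aggregated status record for a service, deriving the phase (unmanaged, blocked, unschedulable, idle, healthy, degraded, pending) from instance counts, dependencies, and node eligibility.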
+pub fn build_service_status_record(
+ service: &ServiceSpec,
+ nodes: &[ClusterNodeRecord],
+ services_by_name: &HashMap<&str, &ServiceSpec>,
+ instances_by_service: &HashMap<String, Vec<ServiceInstanceSpec>>,
+ publications: &HashMap<String, ServicePublicationState>,
+ dependency_cycles: &HashSet<String>,
+ heartbeat_timeout_secs: u64,
+) -> ServiceStatusRecord {
+ let instances = instances_by_service
+ .get(&service.name)
+ .map(Vec::as_slice)
+ .unwrap_or(&[]);
+ let healthy_instances = instances
+ .iter()
+ .filter(|instance| service_instance_is_available(instance, heartbeat_timeout_secs))
+ .count() as u32;
+ let scheduled_instances = instances.len() as u32;
+ let desired_instances = desired_service_instance_count(service, nodes, heartbeat_timeout_secs);
+ let publish_ready = publications
+ .get(&service.name)
+ .map(service_publication_is_ready)
+ .unwrap_or(false);
+ let dependencies = summarize_service_dependencies(
+ service,
+ services_by_name,
+ instances_by_service,
+ publications,
+ dependency_cycles,
+ heartbeat_timeout_secs,
+ );
+ let eligible_node_count = service
+ .schedule
+ .as_ref()
+ .map(|schedule| {
+ eligible_cluster_nodes(nodes, &schedule.placement, heartbeat_timeout_secs).len()
+ })
+ .unwrap_or(0);
+
+ let (phase, message) = if service.schedule.is_none() {
+ (
+ "unmanaged".to_string(),
+ Some("service is not managed by fleet-scheduler".to_string()),
+ )
+ } else if !dependencies.dependencies_ready {
+ (
+ "blocked".to_string(),
+ Some(dependencies.blockers.join("; ")),
+ )
+ } else if eligible_node_count == 0 {
+ (
+ "unschedulable".to_string(),
+ Some("no eligible nodes match the service placement policy".to_string()),
+ )
+ } else if desired_instances == 0 && scheduled_instances == 0 {
+ (
+ "idle".to_string(),
+ Some("service has no desired instances".to_string()),
+ )
+ } else if desired_instances > 0 && healthy_instances >= desired_instances {
+ (
+ "healthy".to_string(),
+ Some(format!(
+ "healthy instances satisfy desired count ({healthy_instances}/{desired_instances})"
+ )),
+ )
+ } else if scheduled_instances > 0 || healthy_instances > 0 {
+ (
+ "degraded".to_string(),
+ Some(format!(
+ "healthy={healthy_instances} scheduled={scheduled_instances} desired={desired_instances}"
+ )),
+ )
+ } else {
+ (
+ "pending".to_string(),
+ Some(format!(
+ "waiting for instances to reach desired count ({healthy_instances}/{desired_instances})"
+ )),
+ )
+ };
+
+ ServiceStatusRecord {
+ service: service.name.clone(),
+ phase,
+ desired_instances,
+ scheduled_instances,
+ healthy_instances,
+ publish_ready,
+ dependencies_ready: dependencies.dependencies_ready,
+ blockers: dependencies.blockers,
+ message,
+ observed_at: Some(Utc::now()),
+ }
+}
+
/// mTLS policy definition.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct MtlsPolicySpec {
@@ -1253,6 +1638,207 @@ mod tests {
assert_eq!(decoded.dns.unwrap().fqdn, "api.test.cluster.local");
}
+ fn active_cluster_node(node_id: &str) -> ClusterNodeRecord {
+ ClusterNodeRecord {
+ node_id: node_id.to_string(),
+ machine_id: None,
+ ip: format!("10.0.0.{}", if node_id == "node01" { 11 } else { 12 }),
+ hostname: node_id.to_string(),
+ roles: vec!["worker".to_string()],
+ labels: HashMap::from([("tier".to_string(), "general".to_string())]),
+ pool: Some("general".to_string()),
+ node_class: Some("worker-linux".to_string()),
+ failure_domain: None,
+ nix_profile: None,
+ install_plan: None,
+ hardware_facts: None,
+ state: Some("active".to_string()),
+ commission_state: None,
+ install_state: None,
+ commissioned_at: None,
+ last_inventory_hash: None,
+ power_state: None,
+ bmc_ref: None,
+ last_heartbeat: Some(Utc::now()),
+ }
+ }
+
+ fn scheduled_service_named(name: &str) -> ServiceSpec {
+ ServiceSpec {
+ name: name.to_string(),
+ ports: Some(ServicePorts {
+ http: Some(8080),
+ grpc: None,
+ }),
+ protocol: None,
+ mtls_required: None,
+ mesh_mode: None,
+ depends_on: Vec::new(),
+ schedule: Some(ServiceScheduleSpec {
+ mode: ServiceScheduleMode::Replicated,
+ replicas: 2,
+ placement: PlacementPolicy {
+ roles: vec!["worker".to_string()],
+ pools: Vec::new(),
+ node_classes: Vec::new(),
+ match_labels: HashMap::new(),
+ spread_by_label: None,
+ max_instances_per_node: 1,
+ },
+ rollout: RolloutStrategySpec::default(),
+ instance_port: Some(8080),
+ mesh_port: None,
+ process: None,
+ container: None,
+ health_check: None,
+ }),
+ publish: None,
+ }
+ }
+
+ fn healthy_service_instance(service: &str, node_id: &str) -> ServiceInstanceSpec {
+ ServiceInstanceSpec {
+ instance_id: format!("{service}-{node_id}"),
+ service: service.to_string(),
+ node_id: node_id.to_string(),
+ ip: if node_id == "node01" {
+ "10.0.0.11".to_string()
+ } else {
+ "10.0.0.12".to_string()
+ },
+ port: 8080,
+ mesh_port: None,
+ version: None,
+ health_check: None,
+ process: None,
+ container: None,
+ managed_by: Some("fleet-scheduler".to_string()),
+ state: Some("healthy".to_string()),
+ last_heartbeat: Some(Utc::now()),
+ observed_at: Some(Utc::now()),
+ }
+ }
+
+ #[test]
+ fn test_compute_service_dependency_cycles_detects_full_cycle() {
+ let mut api = scheduled_service_named("api");
+ api.depends_on = vec![ServiceDependencySpec {
+ service: "worker".to_string(),
+ condition: ServiceDependencyCondition::Healthy,
+ min_ready: 1,
+ }];
+
+ let mut worker = scheduled_service_named("worker");
+ worker.depends_on = vec![ServiceDependencySpec {
+ service: "edge".to_string(),
+ condition: ServiceDependencyCondition::Healthy,
+ min_ready: 1,
+ }];
+
+ let mut edge = scheduled_service_named("edge");
+ edge.depends_on = vec![ServiceDependencySpec {
+ service: "api".to_string(),
+ condition: ServiceDependencyCondition::Healthy,
+ min_ready: 1,
+ }];
+
+ let cycles = compute_service_dependency_cycles(&[api, worker, edge]);
+ assert_eq!(
+ cycles,
+ HashSet::from(["api".to_string(), "worker".to_string(), "edge".to_string()])
+ );
+ }
+
+ #[test]
+ fn test_summarize_service_dependencies_blocks_until_dependency_is_healthy() {
+ let api = scheduled_service_named("api");
+ let mut worker = scheduled_service_named("worker");
+ worker.depends_on = vec![ServiceDependencySpec {
+ service: "api".to_string(),
+ condition: ServiceDependencyCondition::Healthy,
+ min_ready: 2,
+ }];
+
+ let services = vec![api.clone(), worker.clone()];
+ let services_by_name = services
+ .iter()
+ .map(|service| (service.name.as_str(), service))
+ .collect::<HashMap<_, _>>();
+ let instances_by_service = HashMap::from([(
+ "api".to_string(),
+ vec![
+ healthy_service_instance("api", "node01"),
+ ServiceInstanceSpec {
+ state: Some("starting".to_string()),
+ ..healthy_service_instance("api", "node02")
+ },
+ ],
+ )]);
+
+ let summary = summarize_service_dependencies(
+ &worker,
+ &services_by_name,
+ &instances_by_service,
+ &HashMap::new(),
+ &HashSet::new(),
+ 300,
+ );
+
+ assert!(!summary.dependencies_ready);
+ assert_eq!(
+ summary.blockers,
+ vec!["dependency api has 1/2 healthy instance(s)".to_string()]
+ );
+ }
+
+ #[test]
+ fn test_build_service_status_record_recognizes_published_dependency() {
+ let api = scheduled_service_named("api");
+ let mut worker = scheduled_service_named("worker");
+ worker.depends_on = vec![ServiceDependencySpec {
+ service: "api".to_string(),
+ condition: ServiceDependencyCondition::Published,
+ min_ready: 1,
+ }];
+
+ let services = vec![api.clone(), worker.clone()];
+ let services_by_name = services
+ .iter()
+ .map(|service| (service.name.as_str(), service))
+ .collect::<HashMap<_, _>>();
+ let publications = HashMap::from([(
+ "api".to_string(),
+ ServicePublicationState {
+ service: "api".to_string(),
+ org_id: "default-org".to_string(),
+ project_id: "default-project".to_string(),
+ load_balancer: Some(PublishedLoadBalancerState {
+ id: "lb-1".to_string(),
+ pool_id: "pool-1".to_string(),
+ listener_id: "listener-1".to_string(),
+ vip_address: Some("203.0.113.10".to_string()),
+ }),
+ dns: None,
+ observed_at: Some(Utc::now()),
+ },
+ )]);
+ let nodes = vec![active_cluster_node("node01")];
+
+ let status = build_service_status_record(
+ &worker,
+ &nodes,
+ &services_by_name,
+ &HashMap::new(),
+ &publications,
+ &HashSet::new(),
+ 300,
+ );
+
+ assert_eq!(status.phase, "pending");
+ assert!(status.dependencies_ready);
+ assert!(status.blockers.is_empty());
+ }
+
#[test]
fn test_observed_system_state_roundtrip() {
let observed = ObservedSystemState {
diff --git a/deployer/crates/fleet-scheduler/src/main.rs b/deployer/crates/fleet-scheduler/src/main.rs
index 10e920b..618fea2 100644
--- a/deployer/crates/fleet-scheduler/src/main.rs
+++ b/deployer/crates/fleet-scheduler/src/main.rs
@@ -3,11 +3,13 @@ mod publish;
use anyhow::{Context, Result};
use chainfire_client::Client;
-use chrono::Utc;
use clap::Parser;
use deployer_types::{
- ClusterNodeRecord, PlacementPolicy, ServiceDependencyCondition, ServiceInstanceSpec,
- ServicePublicationState, ServiceScheduleMode, ServiceSpec, ServiceStatusRecord,
+ build_service_status_record, cluster_node_class, cluster_node_pool,
+ compute_service_dependency_cycles, eligible_cluster_nodes, service_instance_is_available,
+ summarize_service_dependencies, ClusterNodeRecord, PlacementPolicy, ServiceDependencySummary,
+ ServiceInstanceSpec, ServicePublicationState, ServiceScheduleMode, ServiceSpec,
+ ServiceStatusRecord,
};
use publish::{PublicationConfig, PublicationReconciler};
use serde_json::Value;
@@ -17,6 +19,11 @@ use tokio::time::sleep;
use tracing::{debug, info, warn};
use tracing_subscriber::EnvFilter;
+#[cfg(test)]
+use chrono::Utc;
+#[cfg(test)]
+use deployer_types::cluster_node_is_eligible;
+
const MANAGED_BY: &str = "fleet-scheduler";
#[derive(Debug, Parser)]
@@ -91,11 +98,7 @@ struct ReconcilePlan {
deferred_deletes: usize,
}
-#[derive(Debug, Default)]
-struct DependencySummary {
- dependencies_ready: bool,
- blockers: Vec<String>,
-}
+type DependencySummary = ServiceDependencySummary;
impl Scheduler {
fn new(cli: Cli) -> Self {
@@ -519,64 +522,7 @@ fn service_status_key(cluster_namespace: &str, cluster_id: &str, service: &str)
}
fn dependency_cycle_services(services: &[ServiceSpec]) -> HashSet<String> {
- let service_names = services
- .iter()
- .map(|service| service.name.clone())
- .collect::<HashSet<_>>();
- let dependencies = services
- .iter()
- .map(|service| {
- let deps = service
- .depends_on
- .iter()
- .filter(|dependency| service_names.contains(&dependency.service))
- .map(|dependency| dependency.service.clone())
- .collect::<Vec<_>>();
- (service.name.clone(), deps)
- })
- .collect::<HashMap<_, _>>();
-
- let mut permanent = HashSet::new();
- let mut visiting = Vec::<String>::new();
- let mut cycles = HashSet::new();
-
- for service in services {
- visit_dependency_cycles(
- &service.name,
- &dependencies,
- &mut permanent,
- &mut visiting,
- &mut cycles,
- );
- }
-
- cycles
-}
-
-fn visit_dependency_cycles(
- service: &str,
- dependencies: &HashMap<String, Vec<String>>,
- permanent: &mut HashSet<String>,
- visiting: &mut Vec<String>,
- cycles: &mut HashSet<String>,
-) {
- if permanent.contains(service) {
- return;
- }
-
- if let Some(position) = visiting.iter().position(|current| current == service) {
- cycles.extend(visiting[position..].iter().cloned());
- return;
- }
-
- visiting.push(service.to_string());
- if let Some(depends_on) = dependencies.get(service) {
- for dependency in depends_on {
- visit_dependency_cycles(dependency, dependencies, permanent, visiting, cycles);
- }
- }
- visiting.pop();
- permanent.insert(service.to_string());
+ compute_service_dependency_cycles(services)
}
fn dependency_summary(
@@ -587,103 +533,14 @@ fn dependency_summary(
dependency_cycles: &HashSet<String>,
heartbeat_timeout_secs: u64,
) -> DependencySummary {
- let mut blockers = Vec::new();
-
- if dependency_cycles.contains(&service.name) {
- blockers.push("cyclic dependency graph".to_string());
- }
-
- for dependency in &service.depends_on {
- if dependency.service == service.name {
- blockers.push(format!(
- "dependency {} points to itself",
- dependency.service
- ));
- continue;
- }
-
- if !services_by_name.contains_key(dependency.service.as_str()) {
- blockers.push(format!("dependency {} is not defined", dependency.service));
- continue;
- }
-
- if dependency_cycles.contains(&dependency.service) {
- blockers.push(format!(
- "dependency {} is part of a dependency cycle",
- dependency.service
- ));
- continue;
- }
-
- match dependency.condition {
- ServiceDependencyCondition::Healthy => {
- let ready = instances_by_service
- .get(&dependency.service)
- .map(|instances| {
- instances
- .iter()
- .filter(|instance| {
- instance_is_available(instance, heartbeat_timeout_secs)
- })
- .count() as u32
- })
- .unwrap_or(0);
- let min_ready = dependency.min_ready.max(1);
- if ready < min_ready {
- blockers.push(format!(
- "dependency {} has {ready}/{min_ready} healthy instance(s)",
- dependency.service
- ));
- }
- }
- ServiceDependencyCondition::Published => {
- let ready = publications
- .get(&dependency.service)
- .map(publication_ready)
- .unwrap_or(false);
- if !ready {
- blockers.push(format!(
- "dependency {} is not published",
- dependency.service
- ));
- }
- }
- }
- }
-
- DependencySummary {
- dependencies_ready: blockers.is_empty(),
- blockers,
- }
-}
-
-fn publication_ready(state: &ServicePublicationState) -> bool {
- state
- .dns
- .as_ref()
- .map(|dns| !dns.values.is_empty())
- .unwrap_or(false)
- || state
- .load_balancer
- .as_ref()
- .and_then(|load_balancer| load_balancer.vip_address.as_ref())
- .is_some()
-}
-
-fn desired_instance_count(
- service: &ServiceSpec,
- nodes: &[ClusterNodeRecord],
- heartbeat_timeout_secs: u64,
-) -> u32 {
- let Some(schedule) = service.schedule.as_ref() else {
- return 0;
- };
- match schedule.mode {
- ServiceScheduleMode::Replicated => schedule.replicas,
- ServiceScheduleMode::Daemon => {
- eligible_nodes(nodes, &schedule.placement, heartbeat_timeout_secs).len() as u32
- }
- }
+ summarize_service_dependencies(
+ service,
+ services_by_name,
+ instances_by_service,
+ publications,
+ dependency_cycles,
+ heartbeat_timeout_secs,
+ )
}
fn build_service_status(
@@ -695,89 +552,15 @@ fn build_service_status(
dependency_cycles: &HashSet<String>,
heartbeat_timeout_secs: u64,
) -> ServiceStatusRecord {
- let instances = instances_by_service
- .get(&service.name)
- .map(Vec::as_slice)
- .unwrap_or(&[]);
- let healthy_instances = instances
- .iter()
- .filter(|instance| instance_is_available(instance, heartbeat_timeout_secs))
- .count() as u32;
- let scheduled_instances = instances.len() as u32;
- let desired_instances = desired_instance_count(service, nodes, heartbeat_timeout_secs);
- let publish_ready = publications
- .get(&service.name)
- .map(publication_ready)
- .unwrap_or(false);
- let dependencies = dependency_summary(
+ build_service_status_record(
service,
+ nodes,
services_by_name,
instances_by_service,
publications,
dependency_cycles,
heartbeat_timeout_secs,
- );
- let eligible_node_count = service
- .schedule
- .as_ref()
- .map(|schedule| eligible_nodes(nodes, &schedule.placement, heartbeat_timeout_secs).len())
- .unwrap_or(0);
-
- let (phase, message) = if service.schedule.is_none() {
- (
- "unmanaged".to_string(),
- Some("service is not managed by fleet-scheduler".to_string()),
- )
- } else if !dependencies.dependencies_ready {
- (
- "blocked".to_string(),
- Some(dependencies.blockers.join("; ")),
- )
- } else if eligible_node_count == 0 {
- (
- "unschedulable".to_string(),
- Some("no eligible nodes match the service placement policy".to_string()),
- )
- } else if desired_instances == 0 && scheduled_instances == 0 {
- (
- "idle".to_string(),
- Some("service has no desired instances".to_string()),
- )
- } else if desired_instances > 0 && healthy_instances >= desired_instances {
- (
- "healthy".to_string(),
- Some(format!(
- "healthy instances satisfy desired count ({healthy_instances}/{desired_instances})"
- )),
- )
- } else if scheduled_instances > 0 || healthy_instances > 0 {
- (
- "degraded".to_string(),
- Some(format!(
- "healthy={healthy_instances} scheduled={scheduled_instances} desired={desired_instances}"
- )),
- )
- } else {
- (
- "pending".to_string(),
- Some(format!(
- "waiting for instances to reach desired count ({healthy_instances}/{desired_instances})"
- )),
- )
- };
-
- ServiceStatusRecord {
- service: service.name.clone(),
- phase,
- desired_instances,
- scheduled_instances,
- healthy_instances,
- publish_ready,
- dependencies_ready: dependencies.dependencies_ready,
- blockers: dependencies.blockers,
- message,
- observed_at: Some(Utc::now()),
- }
+ )
}
fn eligible_nodes<'a>(
@@ -785,65 +568,16 @@ fn eligible_nodes<'a>(
placement: &PlacementPolicy,
heartbeat_timeout_secs: u64,
) -> Vec<&'a ClusterNodeRecord> {
- nodes
- .iter()
- .filter(|node| node_is_eligible(node, placement, heartbeat_timeout_secs))
- .collect()
+ eligible_cluster_nodes(nodes, placement, heartbeat_timeout_secs)
}
+#[cfg(test)]
fn node_is_eligible(
node: &ClusterNodeRecord,
placement: &PlacementPolicy,
heartbeat_timeout_secs: u64,
) -> bool {
- if node.state.as_deref() != Some("active") {
- return false;
- }
-
- if heartbeat_timeout_secs > 0 {
- let Some(last) = node.last_heartbeat else {
- return false;
- };
- let age = Utc::now().signed_duration_since(last).num_seconds();
- if age > heartbeat_timeout_secs as i64 {
- return false;
- }
- }
-
- if !placement.roles.is_empty()
- && !node
- .roles
- .iter()
- .any(|role| placement.roles.iter().any(|expected| expected == role))
- {
- return false;
- }
-
- if !placement.pools.is_empty()
- && !node_pool(node)
- .map(|pool| placement.pools.iter().any(|expected| expected == pool))
- .unwrap_or(false)
- {
- return false;
- }
-
- if !placement.node_classes.is_empty()
- && !node_class(node)
- .map(|node_class| {
- placement
- .node_classes
- .iter()
- .any(|expected| expected == node_class)
- })
- .unwrap_or(false)
- {
- return false;
- }
-
- placement
- .match_labels
- .iter()
- .all(|(key, value)| node.labels.get(key) == Some(value))
+ cluster_node_is_eligible(node, placement, heartbeat_timeout_secs)
}
fn build_desired_instances(
@@ -1104,25 +838,11 @@ fn spread_value(node: &ClusterNodeRecord, spread_by_label: Option<&str>) -> String {
}
fn node_pool(node: &ClusterNodeRecord) -> Option<&str> {
- node.pool
- .as_deref()
- .or_else(|| node.labels.get("pool").map(String::as_str))
- .or_else(|| {
- node.labels
- .get("pool.photoncloud.io/name")
- .map(String::as_str)
- })
+ cluster_node_pool(node)
}
fn node_class(node: &ClusterNodeRecord) -> Option<&str> {
- node.node_class
- .as_deref()
- .or_else(|| node.labels.get("node_class").map(String::as_str))
- .or_else(|| {
- node.labels
- .get("nodeclass.photoncloud.io/name")
- .map(String::as_str)
- })
+ cluster_node_class(node)
}
fn resolve_instance_port(service: &ServiceSpec) -> Option<u16> {
@@ -1288,26 +1008,7 @@ fn plan_managed_reconciliation(
}
fn instance_is_available(instance: &ServiceInstanceSpec, heartbeat_timeout_secs: u64) -> bool {
- matches!(instance.state.as_deref(), Some("healthy"))
- && instance_has_fresh_heartbeat(instance, heartbeat_timeout_secs)
-}
-
-fn instance_has_fresh_heartbeat(
- instance: &ServiceInstanceSpec,
- heartbeat_timeout_secs: u64,
-) -> bool {
- if heartbeat_timeout_secs == 0 {
- return true;
- }
-
- let Some(last_heartbeat) = instance.last_heartbeat.or(instance.observed_at) else {
- return false;
- };
-
- Utc::now()
- .signed_duration_since(last_heartbeat)
- .num_seconds()
- <= heartbeat_timeout_secs as i64
+ service_instance_is_available(instance, heartbeat_timeout_secs)
}
fn instance_is_reusable(instance: &ServiceInstanceSpec) -> bool {
diff --git a/deployer/crates/node-agent/src/agent.rs b/deployer/crates/node-agent/src/agent.rs
index 13f8319..770d0bd 100644
--- a/deployer/crates/node-agent/src/agent.rs
+++ b/deployer/crates/node-agent/src/agent.rs
@@ -7,7 +7,11 @@ use std::time::Duration;
use anyhow::{Context, Result};
use chainfire_client::{Client, ClientError};
use chrono::{DateTime, Utc};
-use deployer_types::{ContainerSpec, HealthCheckSpec, ProcessSpec, ServiceInstanceSpec};
+use deployer_types::{
+ build_service_status_record, compute_service_dependency_cycles, ClusterNodeRecord,
+ ContainerSpec, HealthCheckSpec, ProcessSpec, ServiceInstanceSpec, ServicePublicationState,
+ ServiceSpec,
+};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::process::Command;
@@ -44,6 +48,22 @@ fn key_instance(
.into_bytes()
}
+fn service_status_prefix(cluster_namespace: &str, cluster_id: &str) -> String {
+ format!(
+ "{}/clusters/{}/service-statuses/",
+ cluster_namespace, cluster_id
+ )
+}
+
+fn key_service_status(cluster_namespace: &str, cluster_id: &str, service: &str) -> Vec<u8> {
+ format!(
+ "{}{}",
+ service_status_prefix(cluster_namespace, cluster_id),
+ service
+ )
+ .into_bytes()
+}
+
#[derive(Debug, Deserialize, Serialize)]
pub struct NodeState {
pub node_id: String,
@@ -67,6 +87,7 @@ pub struct Agent {
cluster_id: String,
node_id: String,
interval: Duration,
+ heartbeat_timeout_secs: u64,
apply: bool,
allow_local_instance_upsert: bool,
process_manager: ProcessManager,
@@ -98,6 +119,7 @@ impl Agent {
cluster_id: String,
node_id: String,
interval: Duration,
+ heartbeat_timeout_secs: u64,
apply: bool,
allow_local_instance_upsert: bool,
pid_dir: PathBuf,
@@ -108,6 +130,7 @@ impl Agent {
cluster_id,
node_id,
interval,
+ heartbeat_timeout_secs,
apply,
allow_local_instance_upsert,
process_manager: ProcessManager::new(pid_dir),
@@ -193,6 +216,10 @@ impl Agent {
info!("apply=false; skipping process reconciliation and health checks");
}
+ if let Err(e) = self.persist_service_statuses(&mut client).await {
+ warn!(error = %e, "failed to persist aggregated service statuses");
+ }
+
self.log_node_only(&node);
Ok(())
@@ -457,6 +484,156 @@ impl Agent {
Ok(())
}
+ async fn load_cluster_nodes(&self, client: &mut Client) -> Result<Vec<ClusterNodeRecord>> {
+ let prefix = format!(
+ "{}nodes/",
+ cluster_prefix(&self.cluster_namespace, &self.cluster_id)
+ );
+ let kvs = client.get_prefix(prefix.as_bytes()).await?;
+ let mut nodes = Vec::with_capacity(kvs.len());
+
+ for (_key, value) in kvs {
+ match serde_json::from_slice::<ClusterNodeRecord>(&value) {
+ Ok(node) => nodes.push(node),
+ Err(error) => warn!(error = %error, "failed to decode cluster node"),
+ }
+ }
+
+ nodes.sort_by(|lhs, rhs| lhs.node_id.cmp(&rhs.node_id));
+ Ok(nodes)
+ }
+
+ async fn load_services(&self, client: &mut Client) -> Result<Vec<ServiceSpec>> {
+ let prefix = format!(
+ "{}services/",
+ cluster_prefix(&self.cluster_namespace, &self.cluster_id)
+ );
+ let kvs = client.get_prefix(prefix.as_bytes()).await?;
+ let mut services = Vec::with_capacity(kvs.len());
+
+ for (_key, value) in kvs {
+ match serde_json::from_slice::<ServiceSpec>(&value) {
+ Ok(service) => services.push(service),
+ Err(error) => warn!(error = %error, "failed to decode service spec"),
+ }
+ }
+
+ services.sort_by(|lhs, rhs| lhs.name.cmp(&rhs.name));
+ Ok(services)
+ }
+
+ async fn load_instances_by_service(
+ &self,
+ client: &mut Client,
+ ) -> Result<HashMap<String, Vec<ServiceInstanceSpec>>> {
+ let prefix = format!(
+ "{}instances/",
+ cluster_prefix(&self.cluster_namespace, &self.cluster_id)
+ );
+ let kvs = client.get_prefix(prefix.as_bytes()).await?;
+ let mut instances = HashMap::<String, Vec<ServiceInstanceSpec>>::new();
+
+ for (_key, value) in kvs {
+ match serde_json::from_slice::<ServiceInstanceSpec>(&value) {
+ Ok(instance) => {
+ instances
+ .entry(instance.service.clone())
+ .or_default()
+ .push(instance);
+ }
+ Err(error) => warn!(error = %error, "failed to decode service instance"),
+ }
+ }
+
+ for service_instances in instances.values_mut() {
+ service_instances.sort_by(|lhs, rhs| lhs.instance_id.cmp(&rhs.instance_id));
+ }
+
+ Ok(instances)
+ }
+
+ async fn load_publications(
+ &self,
+ client: &mut Client,
+ ) -> Result<HashMap<String, ServicePublicationState>> {
+ let prefix = format!(
+ "{}publications/",
+ cluster_prefix(&self.cluster_namespace, &self.cluster_id)
+ );
+ let kvs = client.get_prefix(prefix.as_bytes()).await?;
+ let mut publications = HashMap::with_capacity(kvs.len());
+
+ for (_key, value) in kvs {
+ match serde_json::from_slice::<ServicePublicationState>(&value) {
+ Ok(publication) => {
+ publications.insert(publication.service.clone(), publication);
+ }
+ Err(error) => warn!(error = %error, "failed to decode service publication"),
+ }
+ }
+
+ Ok(publications)
+ }
+
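+ /// Rebuilds aggregated service status records from current cluster state, persists them, then prunes records for removed services.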
+ async fn persist_service_statuses(&self, client: &mut Client) -> Result<()> {
+ let nodes = self.load_cluster_nodes(client).await?;
+ let services = self.load_services(client).await?;
+ let instances_by_service = self.load_instances_by_service(client).await?;
+ let publications = self.load_publications(client).await?;
+ let dependency_cycles = compute_service_dependency_cycles(&services);
+ let services_by_name = services
+ .iter()
+ .map(|service| (service.name.as_str(), service))
+ .collect::<HashMap<_, _>>();
+
+ for service in &services {
+ let status = build_service_status_record(
+ service,
+ &nodes,
+ &services_by_name,
+ &instances_by_service,
+ &publications,
+ &dependency_cycles,
+ self.heartbeat_timeout_secs,
+ );
+ let key = key_service_status(&self.cluster_namespace, &self.cluster_id, &service.name);
+ client.put(&key, serde_json::to_vec(&status)?).await?;
+ }
+
+ self.cleanup_stale_service_statuses(client, &services)
+ .await?;
+ Ok(())
+ }
+
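+ /// Deletes status records whose service no longer exists in the desired set.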
+ async fn cleanup_stale_service_statuses(
+ &self,
+ client: &mut Client,
+ services: &[ServiceSpec],
+ ) -> Result<()> {
+ let desired = services
+ .iter()
+ .map(|service| service.name.as_str())
+ .collect::<HashSet<_>>();
+ let prefix = service_status_prefix(&self.cluster_namespace, &self.cluster_id);
+ let existing = client.get_prefix(prefix.as_bytes()).await?;
+
+ for (key, _) in existing {
+ let key_str = String::from_utf8_lossy(&key);
+ let Some(service_name) = key_str.strip_prefix(&prefix) else {
+ continue;
+ };
+ if service_name.is_empty()
+ || service_name.contains('/')
+ || desired.contains(service_name)
+ {
+ continue;
+ }
+ let _ = client.delete(&key).await?;
+ }
+
+ Ok(())
+ }
+
/// Starts and stops processes based on the Desired State
async fn reconcile_processes(&mut self, client: &mut Client) -> Result<()> {
let prefix = format!(
@@ -771,6 +948,7 @@ mod tests {
"test-cluster".to_string(),
"node01".to_string(),
Duration::from_secs(1),
+ 300,
false,
false,
PathBuf::from("/tmp/photoncloud-node-agent-tests"),
diff --git a/deployer/crates/node-agent/src/main.rs b/deployer/crates/node-agent/src/main.rs
index 3fe36f0..d8c8d8f 100644
--- a/deployer/crates/node-agent/src/main.rs
+++ b/deployer/crates/node-agent/src/main.rs
@@ -36,6 +36,10 @@ struct Cli {
#[arg(long, default_value_t = 15)]
interval_secs: u64,
+ /// Heartbeat timeout in seconds used when aggregating service statuses
+ #[arg(long, default_value_t = 300)]
+ heartbeat_timeout_secs: u64,
+
/// Output directory for PID files
#[arg(long, default_value = "/var/run/photoncloud")]
pid_dir: String,
@@ -73,6 +77,7 @@ async fn main() -> Result<()> {
cli.cluster_id,
cli.node_id,
Duration::from_secs(cli.interval_secs),
+ cli.heartbeat_timeout_secs,
cli.apply,
cli.allow_local_instance_upsert,
std::path::PathBuf::from(cli.pid_dir),
diff --git a/deployer/scripts/verify-fleet-scheduler-e2e.sh b/deployer/scripts/verify-fleet-scheduler-e2e.sh
index ecb99ed..1c2ea80 100755
--- a/deployer/scripts/verify-fleet-scheduler-e2e.sh
+++ b/deployer/scripts/verify-fleet-scheduler-e2e.sh
@@ -461,6 +461,23 @@ for _ in 1 2 3; do
sleep 1
done
+run_deployer_ctl service inspect --name api --include-instances >"$tmp_dir/api-inspect-healthy.json"
+python3 - "$tmp_dir/api-inspect-healthy.json" <<'PY'
+import json
+import sys
+
+payload = json.load(open(sys.argv[1], "r", encoding="utf-8"))
+status = payload.get("status") or {}
+instances = payload.get("instances") or []
+
+if status.get("phase") != "healthy":
+ raise SystemExit(f"expected api inspect phase=healthy before scheduler rerun, found {status.get('phase')}")
+if len(instances) != 2:
+ raise SystemExit(f"expected 2 api instances from service inspect, found {len(instances)}")
+
+print("api service inspect refreshed to healthy from node-agent updates")
+PY
+
echo "Re-running scheduler after api became healthy"
run_scheduler_once
@@ -557,9 +574,6 @@ if states != ["healthy", "healthy"]:
print("Observed two healthy dependent worker instances across node01 and node02")
PY
-echo "Refreshing aggregated service status after worker became healthy"
-run_scheduler_once
-
run_deployer_ctl service inspect --name worker --include-instances >"$tmp_dir/worker-inspect-healthy.json"
python3 - "$tmp_dir/worker-inspect-healthy.json" <<'PY'
import json
@@ -663,6 +677,23 @@ if instance.get("state") != "healthy":
print("Observed one healthy dependent worker instance on node01 after scale-down")
PY
+run_deployer_ctl service inspect --name worker --include-instances >"$tmp_dir/worker-inspect-scaled.json"
+python3 - "$tmp_dir/worker-inspect-scaled.json" <<'PY'
+import json
+import sys
+
+payload = json.load(open(sys.argv[1], "r", encoding="utf-8"))
+status = payload.get("status") or {}
+instances = payload.get("instances") or []
+
+if status.get("phase") != "healthy":
+ raise SystemExit(f"expected scaled worker inspect phase=healthy, found {status.get('phase')}")
+if len(instances) != 1:
+ raise SystemExit(f"expected 1 scaled worker instance from service inspect, found {len(instances)}")
+
+print("service inspect reports scaled healthy worker state without waiting for scheduler status refresh")
+PY
+
echo "Validating endpoint convergence after scale-down"
python3 - <<'PY'
import socket