diff --git a/deployer/crates/deployer-types/src/lib.rs b/deployer/crates/deployer-types/src/lib.rs
index 20cbdcf..744ef5e 100644
--- a/deployer/crates/deployer-types/src/lib.rs
+++ b/deployer/crates/deployer-types/src/lib.rs
@@ -1,6 +1,6 @@
 use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};

 /// Node lifecycle state
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
@@ -980,6 +980,391 @@ pub struct ServiceStatusRecord {
     pub observed_at: Option<DateTime<Utc>>,
 }

+/// Summarized readiness of a service's declared dependencies.
+#[derive(Debug, Clone, PartialEq, Eq, Default)]
+pub struct ServiceDependencySummary {
+    pub dependencies_ready: bool,
+    pub blockers: Vec<String>,
+}
+
+pub fn cluster_node_pool(node: &ClusterNodeRecord) -> Option<&str> {
+    node.pool
+        .as_deref()
+        .or_else(|| node.labels.get("pool").map(String::as_str))
+        .or_else(|| {
+            node.labels
+                .get("pool.photoncloud.io/name")
+                .map(String::as_str)
+        })
+}
+
+pub fn cluster_node_class(node: &ClusterNodeRecord) -> Option<&str> {
+    node.node_class
+        .as_deref()
+        .or_else(|| node.labels.get("node_class").map(String::as_str))
+        .or_else(|| {
+            node.labels
+                .get("nodeclass.photoncloud.io/name")
+                .map(String::as_str)
+        })
+}
+
+pub fn cluster_node_is_eligible(
+    node: &ClusterNodeRecord,
+    placement: &PlacementPolicy,
+    heartbeat_timeout_secs: u64,
+) -> bool {
+    if node.state.as_deref() != Some("active") {
+        return false;
+    }
+
+    if heartbeat_timeout_secs > 0 {
+        let Some(last) = node.last_heartbeat else {
+            return false;
+        };
+        let age = Utc::now().signed_duration_since(last).num_seconds();
+        if age > heartbeat_timeout_secs as i64 {
+            return false;
+        }
+    }
+
+    if !placement.roles.is_empty()
+        && !node
+            .roles
+            .iter()
+            .any(|role| placement.roles.iter().any(|expected| expected == role))
+    {
+        return false;
+    }
+
+    if !placement.pools.is_empty()
+        && !cluster_node_pool(node)
+            .map(|pool| placement.pools.iter().any(|expected| expected == pool))
+            .unwrap_or(false)
+    {
+        return false;
+    }
+
+    if !placement.node_classes.is_empty()
+        && !cluster_node_class(node)
+            .map(|node_class| {
+                placement
+                    .node_classes
+                    .iter()
+                    .any(|expected| expected == node_class)
+            })
+            .unwrap_or(false)
+    {
+        return false;
+    }
+
+    placement
+        .match_labels
+        .iter()
+        .all(|(key, value)| node.labels.get(key) == Some(value))
+}
+
+pub fn eligible_cluster_nodes<'a>(
+    nodes: &'a [ClusterNodeRecord],
+    placement: &PlacementPolicy,
+    heartbeat_timeout_secs: u64,
+) -> Vec<&'a ClusterNodeRecord> {
+    nodes
+        .iter()
+        .filter(|node| cluster_node_is_eligible(node, placement, heartbeat_timeout_secs))
+        .collect()
+}
+
+pub fn service_instance_has_fresh_heartbeat(
+    instance: &ServiceInstanceSpec,
+    heartbeat_timeout_secs: u64,
+) -> bool {
+    if heartbeat_timeout_secs == 0 {
+        return true;
+    }
+
+    let Some(last_heartbeat) = instance.last_heartbeat.or(instance.observed_at) else {
+        return false;
+    };
+
+    Utc::now()
+        .signed_duration_since(last_heartbeat)
+        .num_seconds()
+        <= heartbeat_timeout_secs as i64
+}
+
+pub fn service_instance_is_available(
+    instance: &ServiceInstanceSpec,
+    heartbeat_timeout_secs: u64,
+) -> bool {
+    matches!(instance.state.as_deref(), Some("healthy"))
+        && service_instance_has_fresh_heartbeat(instance, heartbeat_timeout_secs)
+}
+
+pub fn service_publication_is_ready(state: &ServicePublicationState) -> bool {
+    state
+        .dns
+        .as_ref()
+        .map(|dns| !dns.values.is_empty())
+        .unwrap_or(false)
+        || state
+            .load_balancer
+            .as_ref()
+            .and_then(|load_balancer| load_balancer.vip_address.as_ref())
+            .is_some()
+}
+
+pub fn compute_service_dependency_cycles(services: &[ServiceSpec]) -> HashSet<String> {
+    let service_names = services
+        .iter()
+        .map(|service| service.name.clone())
+        .collect::<HashSet<_>>();
+    let dependencies = services
+        .iter()
+        .map(|service| {
+            let deps = service
+                .depends_on
+                .iter()
+                .filter(|dependency| service_names.contains(&dependency.service))
+                .map(|dependency| dependency.service.clone())
+                .collect::<Vec<_>>();
+            (service.name.clone(), deps)
+        })
+        .collect::<HashMap<_, _>>();
+
+    let mut permanent = HashSet::new();
+    let mut visiting = Vec::<String>::new();
+    let mut cycles = HashSet::new();
+
+    for service in services {
+        visit_service_dependency_cycles(
+            &service.name,
+            &dependencies,
+            &mut permanent,
+            &mut visiting,
+            &mut cycles,
+        );
+    }
+
+    cycles
+}
+
+fn visit_service_dependency_cycles(
+    service: &str,
+    dependencies: &HashMap<String, Vec<String>>,
+    permanent: &mut HashSet<String>,
+    visiting: &mut Vec<String>,
+    cycles: &mut HashSet<String>,
+) {
+    if permanent.contains(service) {
+        return;
+    }
+
+    if let Some(position) = visiting.iter().position(|current| current == service) {
+        cycles.extend(visiting[position..].iter().cloned());
+        return;
+    }
+
+    visiting.push(service.to_string());
+    if let Some(depends_on) = dependencies.get(service) {
+        for dependency in depends_on {
+            visit_service_dependency_cycles(dependency, dependencies, permanent, visiting, cycles);
+        }
+    }
+    visiting.pop();
+    permanent.insert(service.to_string());
+}
+
+pub fn summarize_service_dependencies(
+    service: &ServiceSpec,
+    services_by_name: &HashMap<&str, &ServiceSpec>,
+    instances_by_service: &HashMap<String, Vec<ServiceInstanceSpec>>,
+    publications: &HashMap<String, ServicePublicationState>,
+    dependency_cycles: &HashSet<String>,
+    heartbeat_timeout_secs: u64,
+) -> ServiceDependencySummary {
+    let mut blockers = Vec::new();
+
+    if dependency_cycles.contains(&service.name) {
+        blockers.push("cyclic dependency graph".to_string());
+    }
+
+    for dependency in &service.depends_on {
+        if dependency.service == service.name {
+            blockers.push(format!(
+                "dependency {} points to itself",
+                dependency.service
+            ));
+            continue;
+        }
+
+        if !services_by_name.contains_key(dependency.service.as_str()) {
+            blockers.push(format!("dependency {} is not defined", dependency.service));
+            continue;
+        }
+
+        if dependency_cycles.contains(&dependency.service) {
+            blockers.push(format!(
+                "dependency {} is part of a dependency cycle",
+                dependency.service
+            ));
+            continue;
+        }
+
+        match dependency.condition {
+            ServiceDependencyCondition::Healthy => {
+                let ready = instances_by_service
+                    .get(&dependency.service)
+                    .map(|instances| {
+                        instances
+                            .iter()
+                            .filter(|instance| {
+                                service_instance_is_available(instance, heartbeat_timeout_secs)
+                            })
+                            .count() as u32
+                    })
+                    .unwrap_or(0);
+                let min_ready = dependency.min_ready.max(1);
+                if ready < min_ready {
+                    blockers.push(format!(
+                        "dependency {} has {ready}/{min_ready} healthy instance(s)",
+                        dependency.service
+                    ));
+                }
+            }
+            ServiceDependencyCondition::Published => {
+                let ready = publications
+                    .get(&dependency.service)
+                    .map(service_publication_is_ready)
+                    .unwrap_or(false);
+                if !ready {
+                    blockers.push(format!(
+                        "dependency {} is not published",
+                        dependency.service
+                    ));
+                }
+            }
+        }
+    }
+
+    ServiceDependencySummary {
+        dependencies_ready: blockers.is_empty(),
+        blockers,
+    }
+}
+
+pub fn desired_service_instance_count(
+    service: &ServiceSpec,
+    nodes: &[ClusterNodeRecord],
+    heartbeat_timeout_secs: u64,
+) -> u32 {
+    let Some(schedule) = service.schedule.as_ref() else {
+        return 0;
+    };
+    match schedule.mode {
+        ServiceScheduleMode::Replicated => schedule.replicas,
+        ServiceScheduleMode::Daemon => {
+            eligible_cluster_nodes(nodes, &schedule.placement, heartbeat_timeout_secs).len() as u32
+        }
+    }
+}
+
+pub fn build_service_status_record(
+    service: &ServiceSpec,
+    nodes: &[ClusterNodeRecord],
+    services_by_name: &HashMap<&str, &ServiceSpec>,
+    instances_by_service: &HashMap<String, Vec<ServiceInstanceSpec>>,
+    publications: &HashMap<String, ServicePublicationState>,
+    dependency_cycles: &HashSet<String>,
+    heartbeat_timeout_secs: u64,
+) -> ServiceStatusRecord {
+    let instances = instances_by_service
+        .get(&service.name)
+        .map(Vec::as_slice)
+        .unwrap_or(&[]);
+    let healthy_instances = instances
+        .iter()
+        .filter(|instance| service_instance_is_available(instance, heartbeat_timeout_secs))
+        .count() as u32;
+    let scheduled_instances = instances.len() as u32;
+    let desired_instances = desired_service_instance_count(service, nodes, heartbeat_timeout_secs);
+    let publish_ready = publications
+        .get(&service.name)
+        .map(service_publication_is_ready)
+        .unwrap_or(false);
+    let dependencies = summarize_service_dependencies(
+        service,
+        services_by_name,
+        instances_by_service,
+        publications,
+        dependency_cycles,
+        heartbeat_timeout_secs,
+    );
+    let eligible_node_count = service
+        .schedule
+        .as_ref()
+        .map(|schedule| {
+            eligible_cluster_nodes(nodes, &schedule.placement, heartbeat_timeout_secs).len()
+        })
+        .unwrap_or(0);
+
+    let (phase, message) = if service.schedule.is_none() {
+        (
+            "unmanaged".to_string(),
+            Some("service is not managed by fleet-scheduler".to_string()),
+        )
+    } else if !dependencies.dependencies_ready {
+        (
+            "blocked".to_string(),
+            Some(dependencies.blockers.join("; ")),
+        )
+    } else if eligible_node_count == 0 {
+        (
+            "unschedulable".to_string(),
+            Some("no eligible nodes match the service placement policy".to_string()),
+        )
+    } else if desired_instances == 0 && scheduled_instances == 0 {
+        (
+            "idle".to_string(),
+            Some("service has no desired instances".to_string()),
+        )
+    } else if desired_instances > 0 && healthy_instances >= desired_instances {
+        (
+            "healthy".to_string(),
+            Some(format!(
+                "healthy instances satisfy desired count ({healthy_instances}/{desired_instances})"
+            )),
+        )
+    } else if scheduled_instances > 0 || healthy_instances > 0 {
+        (
+            "degraded".to_string(),
+            Some(format!(
+                "healthy={healthy_instances} scheduled={scheduled_instances} desired={desired_instances}"
+            )),
+        )
+    } else {
+        (
+            "pending".to_string(),
+            Some(format!(
+                "waiting for instances to reach desired count ({healthy_instances}/{desired_instances})"
+            )),
+        )
+    };
+
+    ServiceStatusRecord {
+        service: service.name.clone(),
+        phase,
+        desired_instances,
+        scheduled_instances,
+        healthy_instances,
+        publish_ready,
+        dependencies_ready: dependencies.dependencies_ready,
+        blockers: dependencies.blockers,
+        message,
+        observed_at: Some(Utc::now()),
+    }
+}
+
 /// mTLS policy definition.
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub struct MtlsPolicySpec {
@@ -1253,6 +1638,207 @@ mod tests {
         assert_eq!(decoded.dns.unwrap().fqdn, "api.test.cluster.local");
     }

+    fn active_cluster_node(node_id: &str) -> ClusterNodeRecord {
+        ClusterNodeRecord {
+            node_id: node_id.to_string(),
+            machine_id: None,
+            ip: format!("10.0.0.{}", if node_id == "node01" { 11 } else { 12 }),
+            hostname: node_id.to_string(),
+            roles: vec!["worker".to_string()],
+            labels: HashMap::from([("tier".to_string(), "general".to_string())]),
+            pool: Some("general".to_string()),
+            node_class: Some("worker-linux".to_string()),
+            failure_domain: None,
+            nix_profile: None,
+            install_plan: None,
+            hardware_facts: None,
+            state: Some("active".to_string()),
+            commission_state: None,
+            install_state: None,
+            commissioned_at: None,
+            last_inventory_hash: None,
+            power_state: None,
+            bmc_ref: None,
+            last_heartbeat: Some(Utc::now()),
+        }
+    }
+
+    fn scheduled_service_named(name: &str) -> ServiceSpec {
+        ServiceSpec {
+            name: name.to_string(),
+            ports: Some(ServicePorts {
+                http: Some(8080),
+                grpc: None,
+            }),
+            protocol: None,
+            mtls_required: None,
+            mesh_mode: None,
+            depends_on: Vec::new(),
+            schedule: Some(ServiceScheduleSpec {
+                mode: ServiceScheduleMode::Replicated,
+                replicas: 2,
+                placement: PlacementPolicy {
+                    roles: vec!["worker".to_string()],
+                    pools: Vec::new(),
+                    node_classes: Vec::new(),
+                    match_labels: HashMap::new(),
+                    spread_by_label: None,
+                    max_instances_per_node: 1,
+                },
+                rollout: RolloutStrategySpec::default(),
+                instance_port: Some(8080),
+                mesh_port: None,
+                process: None,
+                container: None,
+                health_check: None,
+            }),
+            publish: None,
+        }
+    }
+
+    fn healthy_service_instance(service: &str, node_id: &str) -> ServiceInstanceSpec {
+        ServiceInstanceSpec {
+            instance_id: format!("{service}-{node_id}"),
+            service: service.to_string(),
+            node_id: node_id.to_string(),
+            ip: if node_id == "node01" {
+                "10.0.0.11".to_string()
+            } else {
+                "10.0.0.12".to_string()
+            },
+            port: 8080,
+            mesh_port: None,
+            version: None,
+            health_check: None,
+            process: None,
+            container: None,
+            managed_by: Some("fleet-scheduler".to_string()),
+            state: Some("healthy".to_string()),
+            last_heartbeat: Some(Utc::now()),
+            observed_at: Some(Utc::now()),
+        }
+    }
+
+    #[test]
+    fn test_compute_service_dependency_cycles_detects_full_cycle() {
+        let mut api = scheduled_service_named("api");
+        api.depends_on = vec![ServiceDependencySpec {
+            service: "worker".to_string(),
+            condition: ServiceDependencyCondition::Healthy,
+            min_ready: 1,
+        }];
+
+        let mut worker = scheduled_service_named("worker");
+        worker.depends_on = vec![ServiceDependencySpec {
+            service: "edge".to_string(),
+            condition: ServiceDependencyCondition::Healthy,
+            min_ready: 1,
+        }];
+
+        let mut edge = scheduled_service_named("edge");
+        edge.depends_on = vec![ServiceDependencySpec {
+            service: "api".to_string(),
+            condition: ServiceDependencyCondition::Healthy,
+            min_ready: 1,
+        }];
+
+        let cycles = compute_service_dependency_cycles(&[api, worker, edge]);
+        assert_eq!(
+            cycles,
+            HashSet::from(["api".to_string(), "worker".to_string(), "edge".to_string()])
+        );
+    }
+
+    #[test]
+    fn test_summarize_service_dependencies_blocks_until_dependency_is_healthy() {
+        let api = scheduled_service_named("api");
+        let mut worker = scheduled_service_named("worker");
+        worker.depends_on = vec![ServiceDependencySpec {
+            service: "api".to_string(),
+            condition: ServiceDependencyCondition::Healthy,
+            min_ready: 2,
+        }];
+
+        let services = vec![api.clone(), worker.clone()];
+        let services_by_name = services
+            .iter()
+            .map(|service| (service.name.as_str(), service))
+            .collect::<HashMap<_, _>>();
+        let instances_by_service = HashMap::from([(
+            "api".to_string(),
+            vec![
+                healthy_service_instance("api", "node01"),
+                ServiceInstanceSpec {
+                    state: Some("starting".to_string()),
+                    ..healthy_service_instance("api", "node02")
+                },
+            ],
+        )]);
+
+        let summary = summarize_service_dependencies(
+            &worker,
+            &services_by_name,
+            &instances_by_service,
+            &HashMap::new(),
+            &HashSet::new(),
+            300,
+        );
+
+        assert!(!summary.dependencies_ready);
+        assert_eq!(
+            summary.blockers,
+            vec!["dependency api has 1/2 healthy instance(s)".to_string()]
+        );
+    }
+
+    #[test]
+    fn test_build_service_status_record_recognizes_published_dependency() {
+        let api = scheduled_service_named("api");
+        let mut worker = scheduled_service_named("worker");
+        worker.depends_on = vec![ServiceDependencySpec {
+            service: "api".to_string(),
+            condition: ServiceDependencyCondition::Published,
+            min_ready: 1,
+        }];
+
+        let services = vec![api.clone(), worker.clone()];
+        let services_by_name = services
+            .iter()
+            .map(|service| (service.name.as_str(), service))
+            .collect::<HashMap<_, _>>();
+        let publications = HashMap::from([(
+            "api".to_string(),
+            ServicePublicationState {
+                service: "api".to_string(),
+                org_id: "default-org".to_string(),
+                project_id: "default-project".to_string(),
+                load_balancer: Some(PublishedLoadBalancerState {
+                    id: "lb-1".to_string(),
+                    pool_id: "pool-1".to_string(),
+                    listener_id: "listener-1".to_string(),
+                    vip_address: Some("203.0.113.10".to_string()),
+                }),
+                dns: None,
+                observed_at: Some(Utc::now()),
+            },
+        )]);
+        let nodes = vec![active_cluster_node("node01")];
+
+        let status = build_service_status_record(
+            &worker,
+            &nodes,
+            &services_by_name,
+            &HashMap::new(),
+            &publications,
+            &HashSet::new(),
+            300,
+        );
+
+        assert_eq!(status.phase, "pending");
+        assert!(status.dependencies_ready);
+        assert!(status.blockers.is_empty());
+    }
+
     #[test]
     fn test_observed_system_state_roundtrip() {
         let observed = ObservedSystemState {
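Aside, not part of the patch: every caller above threads heartbeat_timeout_secs into the same freshness gate, so its two edge cases are worth seeing concretely. A minimal sketch assuming only what the diff shows — the exported helpers and the ServiceInstanceSpec field layout used by the test helpers; the timestamps are illustrative.

use chrono::{Duration, Utc};
use deployer_types::{service_instance_is_available, ServiceInstanceSpec};

fn main() {
    // A "healthy" instance whose last heartbeat is ten minutes old.
    let instance = ServiceInstanceSpec {
        instance_id: "api-node01".to_string(),
        service: "api".to_string(),
        node_id: "node01".to_string(),
        ip: "10.0.0.11".to_string(),
        port: 8080,
        mesh_port: None,
        version: None,
        health_check: None,
        process: None,
        container: None,
        managed_by: Some("fleet-scheduler".to_string()),
        state: Some("healthy".to_string()),
        last_heartbeat: Some(Utc::now() - Duration::seconds(600)),
        observed_at: None,
    };
    // A 300s timeout rejects the 600s-old heartbeat despite state == "healthy".
    assert!(!service_instance_is_available(&instance, 300));
    // A timeout of 0 disables the freshness check entirely.
    assert!(service_instance_is_available(&instance, 0));
}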
diff --git a/deployer/crates/fleet-scheduler/src/main.rs b/deployer/crates/fleet-scheduler/src/main.rs
index 10e920b..618fea2 100644
--- a/deployer/crates/fleet-scheduler/src/main.rs
+++ b/deployer/crates/fleet-scheduler/src/main.rs
@@ -3,11 +3,13 @@ mod publish;

 use anyhow::{Context, Result};
 use chainfire_client::Client;
-use chrono::Utc;
 use clap::Parser;
 use deployer_types::{
-    ClusterNodeRecord, PlacementPolicy, ServiceDependencyCondition, ServiceInstanceSpec,
-    ServicePublicationState, ServiceScheduleMode, ServiceSpec, ServiceStatusRecord,
+    build_service_status_record, cluster_node_class, cluster_node_pool,
+    compute_service_dependency_cycles, eligible_cluster_nodes, service_instance_is_available,
+    summarize_service_dependencies, ClusterNodeRecord, PlacementPolicy, ServiceDependencySummary,
+    ServiceInstanceSpec, ServicePublicationState, ServiceScheduleMode, ServiceSpec,
+    ServiceStatusRecord,
 };
 use publish::{PublicationConfig, PublicationReconciler};
 use serde_json::Value;
@@ -17,6 +19,11 @@ use tokio::time::sleep;
 use tracing::{debug, info, warn};
 use tracing_subscriber::EnvFilter;

+#[cfg(test)]
+use chrono::Utc;
+#[cfg(test)]
+use deployer_types::cluster_node_is_eligible;
+
 const MANAGED_BY: &str = "fleet-scheduler";

 #[derive(Debug, Parser)]
 struct Cli {
@@ -91,11 +98,7 @@ struct ReconcilePlan {
     deferred_deletes: usize,
 }

-#[derive(Debug, Default)]
-struct DependencySummary {
-    dependencies_ready: bool,
-    blockers: Vec<String>,
-}
+type DependencySummary = ServiceDependencySummary;

 impl Scheduler {
     fn new(cli: Cli) -> Self {
@@ -519,64 +522,7 @@ fn service_status_key(cluster_namespace: &str, cluster_id: &str, service: &str)
 }

 fn dependency_cycle_services(services: &[ServiceSpec]) -> HashSet<String> {
-    let service_names = services
-        .iter()
-        .map(|service| service.name.clone())
-        .collect::<HashSet<_>>();
-    let dependencies = services
-        .iter()
-        .map(|service| {
-            let deps = service
-                .depends_on
-                .iter()
-                .filter(|dependency| service_names.contains(&dependency.service))
-                .map(|dependency| dependency.service.clone())
-                .collect::<Vec<_>>();
-            (service.name.clone(), deps)
-        })
-        .collect::<HashMap<_, _>>();
-
-    let mut permanent = HashSet::new();
-    let mut visiting = Vec::<String>::new();
-    let mut cycles = HashSet::new();
-
-    for service in services {
-        visit_dependency_cycles(
-            &service.name,
-            &dependencies,
-            &mut permanent,
-            &mut visiting,
-            &mut cycles,
-        );
-    }
-
-    cycles
-}
-
-fn visit_dependency_cycles(
-    service: &str,
-    dependencies: &HashMap<String, Vec<String>>,
-    permanent: &mut HashSet<String>,
-    visiting: &mut Vec<String>,
-    cycles: &mut HashSet<String>,
-) {
-    if permanent.contains(service) {
-        return;
-    }
-
-    if let Some(position) = visiting.iter().position(|current| current == service) {
-        cycles.extend(visiting[position..].iter().cloned());
-        return;
-    }
-
-    visiting.push(service.to_string());
-    if let Some(depends_on) = dependencies.get(service) {
-        for dependency in depends_on {
-            visit_dependency_cycles(dependency, dependencies, permanent, visiting, cycles);
-        }
-    }
-    visiting.pop();
-    permanent.insert(service.to_string());
+    compute_service_dependency_cycles(services)
 }

 fn dependency_summary(
@@ -587,103 +533,14 @@ fn dependency_summary(
     dependency_cycles: &HashSet<String>,
     heartbeat_timeout_secs: u64,
 ) -> DependencySummary {
-    let mut blockers = Vec::new();
-
-    if dependency_cycles.contains(&service.name) {
-        blockers.push("cyclic dependency graph".to_string());
-    }
-
-    for dependency in &service.depends_on {
-        if dependency.service == service.name {
-            blockers.push(format!(
-                "dependency {} points to itself",
-                dependency.service
-            ));
-            continue;
-        }
-
-        if !services_by_name.contains_key(dependency.service.as_str()) {
-            blockers.push(format!("dependency {} is not defined", dependency.service));
-            continue;
-        }
-
-        if dependency_cycles.contains(&dependency.service) {
-            blockers.push(format!(
-                "dependency {} is part of a dependency cycle",
-                dependency.service
-            ));
-            continue;
-        }
-
-        match dependency.condition {
-            ServiceDependencyCondition::Healthy => {
-                let ready = instances_by_service
-                    .get(&dependency.service)
-                    .map(|instances| {
-                        instances
-                            .iter()
-                            .filter(|instance| {
-                                instance_is_available(instance, heartbeat_timeout_secs)
-                            })
-                            .count() as u32
-                    })
-                    .unwrap_or(0);
-                let min_ready = dependency.min_ready.max(1);
-                if ready < min_ready {
-                    blockers.push(format!(
-                        "dependency {} has {ready}/{min_ready} healthy instance(s)",
-                        dependency.service
-                    ));
-                }
-            }
-            ServiceDependencyCondition::Published => {
-                let ready = publications
-                    .get(&dependency.service)
-                    .map(publication_ready)
-                    .unwrap_or(false);
-                if !ready {
-                    blockers.push(format!(
-                        "dependency {} is not published",
-                        dependency.service
-                    ));
-                }
-            }
-        }
-    }
-
-    DependencySummary {
-        dependencies_ready: blockers.is_empty(),
-        blockers,
-    }
-}
-
-fn publication_ready(state: &ServicePublicationState) -> bool {
-    state
-        .dns
-        .as_ref()
-        .map(|dns| !dns.values.is_empty())
-        .unwrap_or(false)
-        || state
-            .load_balancer
-            .as_ref()
-            .and_then(|load_balancer| load_balancer.vip_address.as_ref())
-            .is_some()
-}
-
-fn desired_instance_count(
-    service: &ServiceSpec,
-    nodes: &[ClusterNodeRecord],
-    heartbeat_timeout_secs: u64,
-) -> u32 {
-    let Some(schedule) = service.schedule.as_ref() else {
-        return 0;
-    };
-    match schedule.mode {
-        ServiceScheduleMode::Replicated => schedule.replicas,
-        ServiceScheduleMode::Daemon => {
-            eligible_nodes(nodes, &schedule.placement, heartbeat_timeout_secs).len() as u32
-        }
-    }
+    summarize_service_dependencies(
+        service,
+        services_by_name,
+        instances_by_service,
+        publications,
+        dependency_cycles,
+        heartbeat_timeout_secs,
+    )
 }

 fn build_service_status(
@@ -695,89 +552,15 @@ fn build_service_status(
     dependency_cycles: &HashSet<String>,
     heartbeat_timeout_secs: u64,
 ) -> ServiceStatusRecord {
-    let instances = instances_by_service
-        .get(&service.name)
-        .map(Vec::as_slice)
-        .unwrap_or(&[]);
-    let healthy_instances = instances
-        .iter()
-        .filter(|instance| instance_is_available(instance, heartbeat_timeout_secs))
-        .count() as u32;
-    let scheduled_instances = instances.len() as u32;
-    let desired_instances = desired_instance_count(service, nodes, heartbeat_timeout_secs);
-    let publish_ready = publications
-        .get(&service.name)
-        .map(publication_ready)
-        .unwrap_or(false);
-    let dependencies = dependency_summary(
+    build_service_status_record(
         service,
+        nodes,
         services_by_name,
         instances_by_service,
         publications,
         dependency_cycles,
         heartbeat_timeout_secs,
-    );
-    let eligible_node_count = service
-        .schedule
-        .as_ref()
-        .map(|schedule| eligible_nodes(nodes, &schedule.placement, heartbeat_timeout_secs).len())
-        .unwrap_or(0);
-
-    let (phase, message) = if service.schedule.is_none() {
-        (
-            "unmanaged".to_string(),
-            Some("service is not managed by fleet-scheduler".to_string()),
-        )
-    } else if !dependencies.dependencies_ready {
-        (
-            "blocked".to_string(),
-            Some(dependencies.blockers.join("; ")),
-        )
-    } else if eligible_node_count == 0 {
-        (
-            "unschedulable".to_string(),
-            Some("no eligible nodes match the service placement policy".to_string()),
-        )
-    } else if desired_instances == 0 && scheduled_instances == 0 {
-        (
-            "idle".to_string(),
-            Some("service has no desired instances".to_string()),
-        )
-    } else if desired_instances > 0 && healthy_instances >= desired_instances {
-        (
-            "healthy".to_string(),
-            Some(format!(
-                "healthy instances satisfy desired count ({healthy_instances}/{desired_instances})"
-            )),
-        )
-    } else if scheduled_instances > 0 || healthy_instances > 0 {
-        (
-            "degraded".to_string(),
-            Some(format!(
-                "healthy={healthy_instances} scheduled={scheduled_instances} desired={desired_instances}"
-            )),
-        )
-    } else {
-        (
-            "pending".to_string(),
-            Some(format!(
-                "waiting for instances to reach desired count ({healthy_instances}/{desired_instances})"
-            )),
-        )
-    };
-
-    ServiceStatusRecord {
-        service: service.name.clone(),
-        phase,
-        desired_instances,
-        scheduled_instances,
-        healthy_instances,
-        publish_ready,
-        dependencies_ready: dependencies.dependencies_ready,
-        blockers: dependencies.blockers,
-        message,
-        observed_at: Some(Utc::now()),
-    }
+    )
 }

 fn eligible_nodes<'a>(
@@ -785,65 +568,16 @@ fn eligible_nodes<'a>(
     placement: &PlacementPolicy,
     heartbeat_timeout_secs: u64,
 ) -> Vec<&'a ClusterNodeRecord> {
-    nodes
-        .iter()
-        .filter(|node| node_is_eligible(node, placement, heartbeat_timeout_secs))
-        .collect()
+    eligible_cluster_nodes(nodes, placement, heartbeat_timeout_secs)
 }

+#[cfg(test)]
 fn node_is_eligible(
     node: &ClusterNodeRecord,
     placement: &PlacementPolicy,
     heartbeat_timeout_secs: u64,
 ) -> bool {
-    if node.state.as_deref() != Some("active") {
-        return false;
-    }
-
-    if heartbeat_timeout_secs > 0 {
-        let Some(last) = node.last_heartbeat else {
-            return false;
-        };
-        let age = Utc::now().signed_duration_since(last).num_seconds();
-        if age > heartbeat_timeout_secs as i64 {
-            return false;
-        }
-    }
-
-    if !placement.roles.is_empty()
-        && !node
-            .roles
-            .iter()
-            .any(|role| placement.roles.iter().any(|expected| expected == role))
-    {
-        return false;
-    }
-
-    if !placement.pools.is_empty()
-        && !node_pool(node)
-            .map(|pool| placement.pools.iter().any(|expected| expected == pool))
-            .unwrap_or(false)
-    {
-        return false;
-    }
-
-    if !placement.node_classes.is_empty()
-        && !node_class(node)
-            .map(|node_class| {
-                placement
-                    .node_classes
-                    .iter()
-                    .any(|expected| expected == node_class)
-            })
-            .unwrap_or(false)
-    {
-        return false;
-    }
-
-    placement
-        .match_labels
-        .iter()
-        .all(|(key, value)| node.labels.get(key) == Some(value))
+    cluster_node_is_eligible(node, placement, heartbeat_timeout_secs)
 }

 fn build_desired_instances(
@@ -1104,25 +838,11 @@ fn spread_value(node: &ClusterNodeRecord, spread_by_label: Option<&str>) -> String {
 }

 fn node_pool(node: &ClusterNodeRecord) -> Option<&str> {
-    node.pool
-        .as_deref()
-        .or_else(|| node.labels.get("pool").map(String::as_str))
-        .or_else(|| {
-            node.labels
-                .get("pool.photoncloud.io/name")
-                .map(String::as_str)
-        })
+    cluster_node_pool(node)
 }

 fn node_class(node: &ClusterNodeRecord) -> Option<&str> {
-    node.node_class
-        .as_deref()
-        .or_else(|| node.labels.get("node_class").map(String::as_str))
-        .or_else(|| {
-            node.labels
-                .get("nodeclass.photoncloud.io/name")
-                .map(String::as_str)
-        })
+    cluster_node_class(node)
 }

 fn resolve_instance_port(service: &ServiceSpec) -> Option<u16> {
@@ -1288,26 +1008,7 @@ fn plan_managed_reconciliation(
 }

 fn instance_is_available(instance: &ServiceInstanceSpec, heartbeat_timeout_secs: u64) -> bool {
-    matches!(instance.state.as_deref(), Some("healthy"))
-        && instance_has_fresh_heartbeat(instance, heartbeat_timeout_secs)
-}
-
-fn instance_has_fresh_heartbeat(
-    instance: &ServiceInstanceSpec,
-    heartbeat_timeout_secs: u64,
-) -> bool {
-    if heartbeat_timeout_secs == 0 {
-        return true;
-    }
-
-    let Some(last_heartbeat) = instance.last_heartbeat.or(instance.observed_at) else {
-        return false;
-    };
-
-    Utc::now()
-        .signed_duration_since(last_heartbeat)
-        .num_seconds()
-        <= heartbeat_timeout_secs as i64
+    service_instance_is_available(instance, heartbeat_timeout_secs)
 }

 fn instance_is_reusable(instance: &ServiceInstanceSpec) -> bool {
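Aside, not part of the patch: node_pool and node_class in fleet-scheduler now delegate to the shared fallback helpers, which is what keeps pool-based placement working for nodes that never set the explicit pool field. A sketch under the same assumptions as the lib.rs test helpers above (field layout, exported functions); the node values are made up.

use std::collections::HashMap;

use chrono::Utc;
use deployer_types::{cluster_node_is_eligible, ClusterNodeRecord, PlacementPolicy};

fn main() {
    let node = ClusterNodeRecord {
        node_id: "node03".to_string(),
        machine_id: None,
        ip: "10.0.0.13".to_string(),
        hostname: "node03".to_string(),
        roles: vec!["worker".to_string()],
        // No explicit pool field; only the namespaced label is present.
        labels: HashMap::from([(
            "pool.photoncloud.io/name".to_string(),
            "general".to_string(),
        )]),
        pool: None,
        node_class: None,
        failure_domain: None,
        nix_profile: None,
        install_plan: None,
        hardware_facts: None,
        state: Some("active".to_string()),
        commission_state: None,
        install_state: None,
        commissioned_at: None,
        last_inventory_hash: None,
        power_state: None,
        bmc_ref: None,
        last_heartbeat: Some(Utc::now()),
    };
    let placement = PlacementPolicy {
        roles: Vec::new(),
        pools: vec!["general".to_string()],
        node_classes: Vec::new(),
        match_labels: HashMap::new(),
        spread_by_label: None,
        max_instances_per_node: 1,
    };
    // Active, fresh heartbeat, and the pool resolved via the label fallback.
    assert!(cluster_node_is_eligible(&node, &placement, 300));
}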
diff --git a/deployer/crates/node-agent/src/agent.rs b/deployer/crates/node-agent/src/agent.rs
index 13f8319..770d0bd 100644
--- a/deployer/crates/node-agent/src/agent.rs
+++ b/deployer/crates/node-agent/src/agent.rs
@@ -7,7 +7,11 @@ use std::time::Duration;
 use anyhow::{Context, Result};
 use chainfire_client::{Client, ClientError};
 use chrono::{DateTime, Utc};
-use deployer_types::{ContainerSpec, HealthCheckSpec, ProcessSpec, ServiceInstanceSpec};
+use deployer_types::{
+    build_service_status_record, compute_service_dependency_cycles, ClusterNodeRecord,
+    ContainerSpec, HealthCheckSpec, ProcessSpec, ServiceInstanceSpec, ServicePublicationState,
+    ServiceSpec,
+};
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
 use tokio::process::Command;
@@ -44,6 +48,22 @@ fn key_instance(
     .into_bytes()
 }

+fn service_status_prefix(cluster_namespace: &str, cluster_id: &str) -> String {
+    format!(
+        "{}/clusters/{}/service-statuses/",
+        cluster_namespace, cluster_id
+    )
+}
+
+fn key_service_status(cluster_namespace: &str, cluster_id: &str, service: &str) -> Vec<u8> {
+    format!(
+        "{}{}",
+        service_status_prefix(cluster_namespace, cluster_id),
+        service
+    )
+    .into_bytes()
+}
+
 #[derive(Debug, Deserialize, Serialize)]
 pub struct NodeState {
     pub node_id: String,
@@ -67,6 +87,7 @@ pub struct Agent {
     cluster_id: String,
     node_id: String,
     interval: Duration,
+    heartbeat_timeout_secs: u64,
     apply: bool,
     allow_local_instance_upsert: bool,
     process_manager: ProcessManager,
@@ -98,6 +119,7 @@ impl Agent {
         cluster_id: String,
         node_id: String,
         interval: Duration,
+        heartbeat_timeout_secs: u64,
        apply: bool,
         allow_local_instance_upsert: bool,
         pid_dir: PathBuf,
@@ -108,6 +130,7 @@ impl Agent {
             cluster_id,
             node_id,
             interval,
+            heartbeat_timeout_secs,
             apply,
             allow_local_instance_upsert,
             process_manager: ProcessManager::new(pid_dir),
@@ -193,6 +216,10 @@ impl Agent {
             info!("apply=false; skipping process reconciliation and health checks");
         }

+        if let Err(e) = self.persist_service_statuses(&mut client).await {
+            warn!(error = %e, "failed to persist aggregated service statuses");
+        }
+
         self.log_node_only(&node);

         Ok(())
@@ -457,6 +484,156 @@ impl Agent {
         Ok(())
     }

+    async fn load_cluster_nodes(&self, client: &mut Client) -> Result<Vec<ClusterNodeRecord>> {
+        let prefix = format!(
+            "{}nodes/",
+            cluster_prefix(&self.cluster_namespace, &self.cluster_id)
+        );
+        let kvs = client.get_prefix(prefix.as_bytes()).await?;
+        let mut nodes = Vec::with_capacity(kvs.len());
+
+        for (_key, value) in kvs {
+            match serde_json::from_slice::<ClusterNodeRecord>(&value) {
+                Ok(node) => nodes.push(node),
+                Err(error) => warn!(error = %error, "failed to decode cluster node"),
+            }
+        }
+
+        nodes.sort_by(|lhs, rhs| lhs.node_id.cmp(&rhs.node_id));
+        Ok(nodes)
+    }
+
+    async fn load_services(&self, client: &mut Client) -> Result<Vec<ServiceSpec>> {
+        let prefix = format!(
+            "{}services/",
+            cluster_prefix(&self.cluster_namespace, &self.cluster_id)
+        );
+        let kvs = client.get_prefix(prefix.as_bytes()).await?;
+        let mut services = Vec::with_capacity(kvs.len());
+
+        for (_key, value) in kvs {
+            match serde_json::from_slice::<ServiceSpec>(&value) {
+                Ok(service) => services.push(service),
+                Err(error) => warn!(error = %error, "failed to decode service spec"),
+            }
+        }
+
+        services.sort_by(|lhs, rhs| lhs.name.cmp(&rhs.name));
+        Ok(services)
+    }
+
+    async fn load_instances_by_service(
+        &self,
+        client: &mut Client,
+    ) -> Result<HashMap<String, Vec<ServiceInstanceSpec>>> {
+        let prefix = format!(
+            "{}instances/",
+            cluster_prefix(&self.cluster_namespace, &self.cluster_id)
+        );
+        let kvs = client.get_prefix(prefix.as_bytes()).await?;
+        let mut instances = HashMap::<String, Vec<ServiceInstanceSpec>>::new();
+
+        for (_key, value) in kvs {
+            match serde_json::from_slice::<ServiceInstanceSpec>(&value) {
+                Ok(instance) => {
+                    instances
+                        .entry(instance.service.clone())
+                        .or_default()
+                        .push(instance);
+                }
+                Err(error) => warn!(error = %error, "failed to decode service instance"),
+            }
+        }
+
+        for service_instances in instances.values_mut() {
+            service_instances.sort_by(|lhs, rhs| lhs.instance_id.cmp(&rhs.instance_id));
+        }
+
+        Ok(instances)
+    }
+
+    async fn load_publications(
+        &self,
+        client: &mut Client,
+    ) -> Result<HashMap<String, ServicePublicationState>> {
+        let prefix = format!(
+            "{}publications/",
+            cluster_prefix(&self.cluster_namespace, &self.cluster_id)
+        );
+        let kvs = client.get_prefix(prefix.as_bytes()).await?;
+        let mut publications = HashMap::with_capacity(kvs.len());
+
+        for (_key, value) in kvs {
+            match serde_json::from_slice::<ServicePublicationState>(&value) {
+                Ok(publication) => {
+                    publications.insert(publication.service.clone(), publication);
+                }
+                Err(error) => warn!(error = %error, "failed to decode service publication"),
+            }
+        }
+
+        Ok(publications)
+    }
+
+    async fn persist_service_statuses(&self, client: &mut Client) -> Result<()> {
+        let nodes = self.load_cluster_nodes(client).await?;
+        let services = self.load_services(client).await?;
+        let instances_by_service = self.load_instances_by_service(client).await?;
+        let publications = self.load_publications(client).await?;
+        let dependency_cycles = compute_service_dependency_cycles(&services);
+        let services_by_name = services
+            .iter()
+            .map(|service| (service.name.as_str(), service))
+            .collect::<HashMap<_, _>>();
+
+        for service in &services {
+            let status = build_service_status_record(
+                service,
+                &nodes,
+                &services_by_name,
+                &instances_by_service,
+                &publications,
+                &dependency_cycles,
+                self.heartbeat_timeout_secs,
+            );
+            let key = key_service_status(&self.cluster_namespace, &self.cluster_id, &service.name);
+            client.put(&key, serde_json::to_vec(&status)?).await?;
+        }
+
+        self.cleanup_stale_service_statuses(client, &services)
+            .await?;
+        Ok(())
+    }
+
+    async fn cleanup_stale_service_statuses(
+        &self,
+        client: &mut Client,
+        services: &[ServiceSpec],
+    ) -> Result<()> {
+        let desired = services
+            .iter()
+            .map(|service| service.name.as_str())
+            .collect::<HashSet<_>>();
+        let prefix = service_status_prefix(&self.cluster_namespace, &self.cluster_id);
+        let existing = client.get_prefix(prefix.as_bytes()).await?;
+
+        for (key, _) in existing {
+            let key_str = String::from_utf8_lossy(&key);
+            let Some(service_name) = key_str.strip_prefix(&prefix) else {
+                continue;
+            };
+            if service_name.is_empty()
+                || service_name.contains('/')
+                || desired.contains(service_name)
+            {
+                continue;
+            }
+            let _ = client.delete(&key).await?;
+        }
+
+        Ok(())
+    }
+
     /// Start/stop processes based on the desired state
     async fn reconcile_processes(&mut self, client: &mut Client) -> Result<()> {
         let prefix = format!(
@@ -771,6 +948,7 @@ mod tests {
             "test-cluster".to_string(),
             "node01".to_string(),
             Duration::from_secs(1),
+            300,
             false,
             false,
             PathBuf::from("/tmp/photoncloud-node-agent-tests"),
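Aside, not part of the patch: the agent writes one record per service under a flat service-statuses prefix, and cleanup_stale_service_statuses deletes only direct children that are no longer desired. This sketch reproduces service_status_prefix from the diff so it stands alone; the photoncloud namespace and cluster id are made-up values.

fn service_status_prefix(cluster_namespace: &str, cluster_id: &str) -> String {
    format!(
        "{}/clusters/{}/service-statuses/",
        cluster_namespace, cluster_id
    )
}

fn main() {
    let prefix = service_status_prefix("photoncloud", "test-cluster");
    assert_eq!(
        format!("{prefix}api"),
        "photoncloud/clusters/test-cluster/service-statuses/api"
    );
    // The cleanup pass skips the bare prefix and any nested key, so only
    // direct children (plain service names) are candidates for deletion.
    for suffix in ["", "api/extra"] {
        let skipped = suffix.is_empty() || suffix.contains('/');
        assert!(skipped);
    }
}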
diff --git a/deployer/crates/node-agent/src/main.rs b/deployer/crates/node-agent/src/main.rs
index 3fe36f0..d8c8d8f 100644
--- a/deployer/crates/node-agent/src/main.rs
+++ b/deployer/crates/node-agent/src/main.rs
@@ -36,6 +36,10 @@ struct Cli {
     #[arg(long, default_value_t = 15)]
     interval_secs: u64,

+    /// Heartbeat tolerance in seconds used when aggregating service statuses
+    #[arg(long, default_value_t = 300)]
+    heartbeat_timeout_secs: u64,
+
     /// PID file output directory
     #[arg(long, default_value = "/var/run/photoncloud")]
     pid_dir: String,
@@ -73,6 +77,7 @@ async fn main() -> Result<()> {
         cli.cluster_id,
         cli.node_id,
         Duration::from_secs(cli.interval_secs),
+        cli.heartbeat_timeout_secs,
         cli.apply,
         cli.allow_local_instance_upsert,
         std::path::PathBuf::from(cli.pid_dir),
diff --git a/deployer/scripts/verify-fleet-scheduler-e2e.sh b/deployer/scripts/verify-fleet-scheduler-e2e.sh
index ecb99ed..1c2ea80 100755
--- a/deployer/scripts/verify-fleet-scheduler-e2e.sh
+++ b/deployer/scripts/verify-fleet-scheduler-e2e.sh
@@ -461,6 +461,23 @@ for _ in 1 2 3; do
   sleep 1
 done

+run_deployer_ctl service inspect --name api --include-instances >"$tmp_dir/api-inspect-healthy.json"
+python3 - "$tmp_dir/api-inspect-healthy.json" <<'PY'
+import json
+import sys
+
+payload = json.load(open(sys.argv[1], "r", encoding="utf-8"))
+status = payload.get("status") or {}
+instances = payload.get("instances") or []
+
+if status.get("phase") != "healthy":
+    raise SystemExit(f"expected api inspect phase=healthy before scheduler rerun, found {status.get('phase')}")
+if len(instances) != 2:
+    raise SystemExit(f"expected 2 api instances from service inspect, found {len(instances)}")
+
+print("api service inspect refreshed to healthy from node-agent updates")
+PY
+
 echo "Re-running scheduler after api became healthy"
 run_scheduler_once

@@ -557,9 +574,6 @@ if states != ["healthy", "healthy"]:
 print("Observed two healthy dependent worker instances across node01 and node02")
 PY

-echo "Refreshing aggregated service status after worker became healthy"
-run_scheduler_once
-
 run_deployer_ctl service inspect --name worker --include-instances >"$tmp_dir/worker-inspect-healthy.json"
 python3 - "$tmp_dir/worker-inspect-healthy.json" <<'PY'
 import json
@@ -663,6 +677,23 @@ if instance.get("state") != "healthy":
 print("Observed one healthy dependent worker instance on node01 after scale-down")
 PY

+run_deployer_ctl service inspect --name worker --include-instances >"$tmp_dir/worker-inspect-scaled.json"
+python3 - "$tmp_dir/worker-inspect-scaled.json" <<'PY'
+import json
+import sys
+
+payload = json.load(open(sys.argv[1], "r", encoding="utf-8"))
+status = payload.get("status") or {}
+instances = payload.get("instances") or []
+
+if status.get("phase") != "healthy":
+    raise SystemExit(f"expected scaled worker inspect phase=healthy, found {status.get('phase')}")
+if len(instances) != 1:
+    raise SystemExit(f"expected 1 scaled worker instance from service inspect, found {len(instances)}")
+
+print("service inspect reports scaled healthy worker state without waiting for scheduler status refresh")
+PY
+
 echo "Validating endpoint convergence after scale-down"
 python3 - <<'PY'
 import socket
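Aside, not part of the patch: the publish_ready field the inspect output carries reduces to service_publication_is_ready, which treats a publication as ready once DNS has at least one value or the load balancer reports a VIP. This sketch assumes ServicePublicationState and PublishedLoadBalancerState are exported with the field layout the lib.rs tests use; dns: None sidesteps the DNS record type, whose full shape does not appear in this diff.

use chrono::Utc;
use deployer_types::{
    service_publication_is_ready, PublishedLoadBalancerState, ServicePublicationState,
};

fn main() {
    let mut publication = ServicePublicationState {
        service: "api".to_string(),
        org_id: "default-org".to_string(),
        project_id: "default-project".to_string(),
        load_balancer: Some(PublishedLoadBalancerState {
            id: "lb-1".to_string(),
            pool_id: "pool-1".to_string(),
            listener_id: "listener-1".to_string(),
            vip_address: None, // VIP not observed yet
        }),
        dns: None,
        observed_at: Some(Utc::now()),
    };
    // No DNS values and no VIP yet: the publication is not ready.
    assert!(!service_publication_is_ready(&publication));

    // Once the VIP is observed, readiness flips without any DNS records.
    publication.load_balancer.as_mut().unwrap().vip_address =
        Some("203.0.113.10".to_string());
    assert!(service_publication_is_ready(&publication));
}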