diff --git a/deployer/crates/deployer-types/src/lib.rs b/deployer/crates/deployer-types/src/lib.rs index 3fafaa8..20cbdcf 100644 --- a/deployer/crates/deployer-types/src/lib.rs +++ b/deployer/crates/deployer-types/src/lib.rs @@ -319,6 +319,14 @@ fn default_dns_ttl() -> u32 { 30 } +fn default_service_dependency_condition() -> ServiceDependencyCondition { + ServiceDependencyCondition::Healthy +} + +fn default_service_dependency_min_ready() -> u32 { + 1 +} + /// Process specification executed by node-agent for a scheduled instance. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)] pub struct ProcessSpec { @@ -542,6 +550,37 @@ pub struct ServicePublicationSpec { pub load_balancer: Option, } +/// Readiness condition required from another service before this service can reconcile. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)] +#[serde(rename_all = "snake_case")] +pub enum ServiceDependencyCondition { + /// Require a minimum number of healthy dependency instances. + #[default] + Healthy, + /// Require the dependency to publish at least one DNS or load-balancer artifact. + Published, +} + +/// Dependency edge between scheduler-managed services. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct ServiceDependencySpec { + pub service: String, + #[serde(default = "default_service_dependency_condition")] + pub condition: ServiceDependencyCondition, + #[serde(default = "default_service_dependency_min_ready")] + pub min_ready: u32, +} + +impl Default for ServiceDependencySpec { + fn default() -> Self { + Self { + service: String::new(), + condition: default_service_dependency_condition(), + min_ready: default_service_dependency_min_ready(), + } + } +} + /// Cluster node record stored under photoncloud/clusters/{cluster_id}/nodes/{node_id}. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct ClusterNodeRecord { @@ -849,6 +888,8 @@ pub struct ServiceSpec { #[serde(default)] pub mesh_mode: Option, #[serde(default)] + pub depends_on: Vec, + #[serde(default)] pub schedule: Option, #[serde(default)] pub publish: Option, @@ -915,6 +956,30 @@ pub struct ServicePublicationState { pub observed_at: Option>, } +/// Scheduler-observed readiness and dependency state for a logical service. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)] +pub struct ServiceStatusRecord { + pub service: String, + #[serde(default)] + pub phase: String, + #[serde(default)] + pub desired_instances: u32, + #[serde(default)] + pub scheduled_instances: u32, + #[serde(default)] + pub healthy_instances: u32, + #[serde(default)] + pub publish_ready: bool, + #[serde(default)] + pub dependencies_ready: bool, + #[serde(default)] + pub blockers: Vec, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub message: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub observed_at: Option>, +} + /// mTLS policy definition. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct MtlsPolicySpec { diff --git a/deployer/crates/fleet-scheduler/src/main.rs b/deployer/crates/fleet-scheduler/src/main.rs index c7d9b74..10e920b 100644 --- a/deployer/crates/fleet-scheduler/src/main.rs +++ b/deployer/crates/fleet-scheduler/src/main.rs @@ -6,7 +6,8 @@ use chainfire_client::Client; use chrono::Utc; use clap::Parser; use deployer_types::{ - ClusterNodeRecord, PlacementPolicy, ServiceInstanceSpec, ServiceScheduleMode, ServiceSpec, + ClusterNodeRecord, PlacementPolicy, ServiceDependencyCondition, ServiceInstanceSpec, + ServicePublicationState, ServiceScheduleMode, ServiceSpec, ServiceStatusRecord, }; use publish::{PublicationConfig, PublicationReconciler}; use serde_json::Value; @@ -90,6 +91,12 @@ struct ReconcilePlan { deferred_deletes: usize, } +#[derive(Debug, Default)] +struct DependencySummary { + dependencies_ready: bool, + blockers: Vec, +} + impl Scheduler { fn new(cli: Cli) -> Self { let cluster_namespace = cli.cluster_namespace; @@ -135,6 +142,18 @@ impl Scheduler { let mut client = Client::connect(self.endpoint.clone()).await?; let nodes = self.load_cluster_nodes(&mut client).await?; let services = self.load_services(&mut client).await?; + let instances_before = self.load_instances_by_service(&mut client).await?; + let publications_before = publish::load_publication_states( + &mut client, + &self.cluster_namespace, + &self.cluster_id, + ) + .await?; + let dependency_cycles = dependency_cycle_services(&services); + let services_by_name = services + .iter() + .map(|service| (service.name.as_str(), service)) + .collect::>(); debug!( nodes = nodes.len(), @@ -146,6 +165,22 @@ impl Scheduler { if service.schedule.is_none() { continue; } + let dependencies = dependency_summary( + service, + &services_by_name, + &instances_before, + &publications_before, + &dependency_cycles, + self.heartbeat_timeout_secs, + ); + if !dependencies.dependencies_ready { + warn!( + service = %service.name, + blockers = ?dependencies.blockers, + "skipping service reconciliation until dependencies are ready" + ); + continue; + } self.reconcile_service(&mut client, &nodes, service).await?; } @@ -159,6 +194,25 @@ impl Scheduler { ) .await?; + let instances_after = self.load_instances_by_service(&mut client).await?; + let publications_after = publish::load_publication_states( + &mut client, + &self.cluster_namespace, + &self.cluster_id, + ) + .await?; + self.persist_service_statuses( + &mut client, + &nodes, + &services, + &instances_after, + &publications_after, + &dependency_cycles, + ) + .await?; + self.cleanup_stale_service_statuses(&mut client, &services) + .await?; + Ok(()) } @@ -200,6 +254,112 @@ impl Scheduler { Ok(services) } + async fn load_instances_by_service( + &self, + client: &mut Client, + ) -> Result>> { + let prefix = format!( + "{}/clusters/{}/instances/", + self.cluster_namespace, self.cluster_id + ); + let kvs = client.get_prefix(prefix.as_bytes()).await?; + let mut instances = HashMap::>::new(); + + for (_key, value) in kvs { + match serde_json::from_slice::(&value) { + Ok(instance) => { + instances + .entry(instance.service.clone()) + .or_default() + .push(instance); + } + Err(error) => warn!(error = %error, "failed to decode service instance"), + } + } + + for service_instances in instances.values_mut() { + service_instances.sort_by(|lhs, rhs| lhs.instance_id.cmp(&rhs.instance_id)); + } + + Ok(instances) + } + + async fn persist_service_statuses( + &self, + client: &mut Client, + nodes: &[ClusterNodeRecord], + services: &[ServiceSpec], + instances_by_service: &HashMap>, + publications: &HashMap, + dependency_cycles: &HashSet, + ) -> Result<()> { + let services_by_name = services + .iter() + .map(|service| (service.name.as_str(), service)) + .collect::>(); + + for service in services { + let status = build_service_status( + service, + nodes, + &services_by_name, + instances_by_service, + publications, + dependency_cycles, + self.heartbeat_timeout_secs, + ); + let key = service_status_key(&self.cluster_namespace, &self.cluster_id, &service.name); + + if self.dry_run { + info!( + service = %service.name, + phase = %status.phase, + healthy_instances = status.healthy_instances, + desired_instances = status.desired_instances, + blockers = ?status.blockers, + "would update service status" + ); + } else { + client.put(&key, serde_json::to_vec(&status)?).await?; + } + } + + Ok(()) + } + + async fn cleanup_stale_service_statuses( + &self, + client: &mut Client, + services: &[ServiceSpec], + ) -> Result<()> { + let desired = services + .iter() + .map(|service| service.name.as_str()) + .collect::>(); + let prefix = service_status_prefix(&self.cluster_namespace, &self.cluster_id); + let existing = client.get_prefix(prefix.as_bytes()).await?; + + for (key, _) in existing { + let key_str = String::from_utf8_lossy(&key); + let Some(service_name) = key_str.strip_prefix(&prefix) else { + continue; + }; + if service_name.is_empty() + || service_name.contains('/') + || desired.contains(service_name) + { + continue; + } + if self.dry_run { + info!(service = %service_name, "would delete stale service status"); + } else if client.delete(&key).await? { + info!(service = %service_name, "deleted stale service status"); + } + } + + Ok(()) + } + async fn reconcile_service( &self, client: &mut Client, @@ -222,7 +382,8 @@ impl Scheduler { let existing_instances = decode_managed_instances(&existing); let desired_instances = build_desired_instances(service, &eligible_nodes, &existing_instances)?; - let target_instances = schedule_target_count(schedule.mode, eligible_nodes.len(), schedule.replicas); + let target_instances = + schedule_target_count(schedule.mode, eligible_nodes.len(), schedule.replicas); if desired_instances.len() < target_instances { warn!( service = %service.name, @@ -341,6 +502,284 @@ impl Scheduler { } } +fn service_status_prefix(cluster_namespace: &str, cluster_id: &str) -> String { + format!( + "{}/clusters/{}/service-statuses/", + cluster_namespace, cluster_id + ) +} + +fn service_status_key(cluster_namespace: &str, cluster_id: &str, service: &str) -> Vec { + format!( + "{}{}", + service_status_prefix(cluster_namespace, cluster_id), + service + ) + .into_bytes() +} + +fn dependency_cycle_services(services: &[ServiceSpec]) -> HashSet { + let service_names = services + .iter() + .map(|service| service.name.clone()) + .collect::>(); + let dependencies = services + .iter() + .map(|service| { + let deps = service + .depends_on + .iter() + .filter(|dependency| service_names.contains(&dependency.service)) + .map(|dependency| dependency.service.clone()) + .collect::>(); + (service.name.clone(), deps) + }) + .collect::>(); + + let mut permanent = HashSet::new(); + let mut visiting = Vec::::new(); + let mut cycles = HashSet::new(); + + for service in services { + visit_dependency_cycles( + &service.name, + &dependencies, + &mut permanent, + &mut visiting, + &mut cycles, + ); + } + + cycles +} + +fn visit_dependency_cycles( + service: &str, + dependencies: &HashMap>, + permanent: &mut HashSet, + visiting: &mut Vec, + cycles: &mut HashSet, +) { + if permanent.contains(service) { + return; + } + + if let Some(position) = visiting.iter().position(|current| current == service) { + cycles.extend(visiting[position..].iter().cloned()); + return; + } + + visiting.push(service.to_string()); + if let Some(depends_on) = dependencies.get(service) { + for dependency in depends_on { + visit_dependency_cycles(dependency, dependencies, permanent, visiting, cycles); + } + } + visiting.pop(); + permanent.insert(service.to_string()); +} + +fn dependency_summary( + service: &ServiceSpec, + services_by_name: &HashMap<&str, &ServiceSpec>, + instances_by_service: &HashMap>, + publications: &HashMap, + dependency_cycles: &HashSet, + heartbeat_timeout_secs: u64, +) -> DependencySummary { + let mut blockers = Vec::new(); + + if dependency_cycles.contains(&service.name) { + blockers.push("cyclic dependency graph".to_string()); + } + + for dependency in &service.depends_on { + if dependency.service == service.name { + blockers.push(format!( + "dependency {} points to itself", + dependency.service + )); + continue; + } + + if !services_by_name.contains_key(dependency.service.as_str()) { + blockers.push(format!("dependency {} is not defined", dependency.service)); + continue; + } + + if dependency_cycles.contains(&dependency.service) { + blockers.push(format!( + "dependency {} is part of a dependency cycle", + dependency.service + )); + continue; + } + + match dependency.condition { + ServiceDependencyCondition::Healthy => { + let ready = instances_by_service + .get(&dependency.service) + .map(|instances| { + instances + .iter() + .filter(|instance| { + instance_is_available(instance, heartbeat_timeout_secs) + }) + .count() as u32 + }) + .unwrap_or(0); + let min_ready = dependency.min_ready.max(1); + if ready < min_ready { + blockers.push(format!( + "dependency {} has {ready}/{min_ready} healthy instance(s)", + dependency.service + )); + } + } + ServiceDependencyCondition::Published => { + let ready = publications + .get(&dependency.service) + .map(publication_ready) + .unwrap_or(false); + if !ready { + blockers.push(format!( + "dependency {} is not published", + dependency.service + )); + } + } + } + } + + DependencySummary { + dependencies_ready: blockers.is_empty(), + blockers, + } +} + +fn publication_ready(state: &ServicePublicationState) -> bool { + state + .dns + .as_ref() + .map(|dns| !dns.values.is_empty()) + .unwrap_or(false) + || state + .load_balancer + .as_ref() + .and_then(|load_balancer| load_balancer.vip_address.as_ref()) + .is_some() +} + +fn desired_instance_count( + service: &ServiceSpec, + nodes: &[ClusterNodeRecord], + heartbeat_timeout_secs: u64, +) -> u32 { + let Some(schedule) = service.schedule.as_ref() else { + return 0; + }; + match schedule.mode { + ServiceScheduleMode::Replicated => schedule.replicas, + ServiceScheduleMode::Daemon => { + eligible_nodes(nodes, &schedule.placement, heartbeat_timeout_secs).len() as u32 + } + } +} + +fn build_service_status( + service: &ServiceSpec, + nodes: &[ClusterNodeRecord], + services_by_name: &HashMap<&str, &ServiceSpec>, + instances_by_service: &HashMap>, + publications: &HashMap, + dependency_cycles: &HashSet, + heartbeat_timeout_secs: u64, +) -> ServiceStatusRecord { + let instances = instances_by_service + .get(&service.name) + .map(Vec::as_slice) + .unwrap_or(&[]); + let healthy_instances = instances + .iter() + .filter(|instance| instance_is_available(instance, heartbeat_timeout_secs)) + .count() as u32; + let scheduled_instances = instances.len() as u32; + let desired_instances = desired_instance_count(service, nodes, heartbeat_timeout_secs); + let publish_ready = publications + .get(&service.name) + .map(publication_ready) + .unwrap_or(false); + let dependencies = dependency_summary( + service, + services_by_name, + instances_by_service, + publications, + dependency_cycles, + heartbeat_timeout_secs, + ); + let eligible_node_count = service + .schedule + .as_ref() + .map(|schedule| eligible_nodes(nodes, &schedule.placement, heartbeat_timeout_secs).len()) + .unwrap_or(0); + + let (phase, message) = if service.schedule.is_none() { + ( + "unmanaged".to_string(), + Some("service is not managed by fleet-scheduler".to_string()), + ) + } else if !dependencies.dependencies_ready { + ( + "blocked".to_string(), + Some(dependencies.blockers.join("; ")), + ) + } else if eligible_node_count == 0 { + ( + "unschedulable".to_string(), + Some("no eligible nodes match the service placement policy".to_string()), + ) + } else if desired_instances == 0 && scheduled_instances == 0 { + ( + "idle".to_string(), + Some("service has no desired instances".to_string()), + ) + } else if desired_instances > 0 && healthy_instances >= desired_instances { + ( + "healthy".to_string(), + Some(format!( + "healthy instances satisfy desired count ({healthy_instances}/{desired_instances})" + )), + ) + } else if scheduled_instances > 0 || healthy_instances > 0 { + ( + "degraded".to_string(), + Some(format!( + "healthy={healthy_instances} scheduled={scheduled_instances} desired={desired_instances}" + )), + ) + } else { + ( + "pending".to_string(), + Some(format!( + "waiting for instances to reach desired count ({healthy_instances}/{desired_instances})" + )), + ) + }; + + ServiceStatusRecord { + service: service.name.clone(), + phase, + desired_instances, + scheduled_instances, + healthy_instances, + publish_ready, + dependencies_ready: dependencies.dependencies_ready, + blockers: dependencies.blockers, + message, + observed_at: Some(Utc::now()), + } +} + fn eligible_nodes<'a>( nodes: &'a [ClusterNodeRecord], placement: &PlacementPolicy, @@ -853,7 +1292,10 @@ fn instance_is_available(instance: &ServiceInstanceSpec, heartbeat_timeout_secs: && instance_has_fresh_heartbeat(instance, heartbeat_timeout_secs) } -fn instance_has_fresh_heartbeat(instance: &ServiceInstanceSpec, heartbeat_timeout_secs: u64) -> bool { +fn instance_has_fresh_heartbeat( + instance: &ServiceInstanceSpec, + heartbeat_timeout_secs: u64, +) -> bool { if heartbeat_timeout_secs == 0 { return true; } @@ -961,8 +1403,9 @@ mod tests { use super::*; use chrono::Duration as ChronoDuration; use deployer_types::{ - ClusterNodeRecord, HealthCheckSpec, PlacementPolicy, ProcessSpec, RolloutStrategySpec, - ServicePorts, ServiceScheduleMode, ServiceScheduleSpec, + ClusterNodeRecord, HealthCheckSpec, PlacementPolicy, ProcessSpec, + PublishedLoadBalancerState, RolloutStrategySpec, ServiceDependencyCondition, + ServiceDependencySpec, ServicePorts, ServiceScheduleMode, ServiceScheduleSpec, }; fn active_node(node_id: &str, roles: &[&str], labels: &[(&str, &str)]) -> ClusterNodeRecord { @@ -1003,6 +1446,7 @@ mod tests { protocol: Some("http".to_string()), mtls_required: None, mesh_mode: None, + depends_on: Vec::new(), schedule: Some(ServiceScheduleSpec { mode: ServiceScheduleMode::Replicated, replicas: 2, @@ -1036,6 +1480,31 @@ mod tests { } } + fn service_named(name: &str) -> ServiceSpec { + let mut service = scheduled_service(); + service.name = name.to_string(); + service + } + + fn healthy_instance(service: &str, node_id: &str) -> ServiceInstanceSpec { + ServiceInstanceSpec { + instance_id: format!("{service}-{node_id}"), + service: service.to_string(), + node_id: node_id.to_string(), + ip: "10.0.0.10".to_string(), + port: 8080, + mesh_port: Some(18080), + version: None, + health_check: None, + process: None, + container: None, + managed_by: Some(MANAGED_BY.to_string()), + state: Some("healthy".to_string()), + last_heartbeat: Some(Utc::now()), + observed_at: None, + } + } + #[test] fn test_node_eligibility_matches_roles_and_labels() { let node = active_node("node01", &["worker"], &[("tier", "general")]); @@ -1083,6 +1552,126 @@ mod tests { assert_eq!(desired[0].process.as_ref().unwrap().command, "/usr/bin/api"); } + #[test] + fn test_dependency_cycles_detect_all_services_in_cycle() { + let mut api = service_named("api"); + api.depends_on = vec![ServiceDependencySpec { + service: "worker".to_string(), + condition: ServiceDependencyCondition::Healthy, + min_ready: 1, + }]; + + let mut worker = service_named("worker"); + worker.depends_on = vec![ServiceDependencySpec { + service: "edge".to_string(), + condition: ServiceDependencyCondition::Healthy, + min_ready: 1, + }]; + + let mut edge = service_named("edge"); + edge.depends_on = vec![ServiceDependencySpec { + service: "api".to_string(), + condition: ServiceDependencyCondition::Healthy, + min_ready: 1, + }]; + + let cycles = dependency_cycle_services(&[api, worker, edge]); + assert_eq!( + cycles, + HashSet::from(["api".to_string(), "worker".to_string(), "edge".to_string(),]) + ); + } + + #[test] + fn test_dependency_summary_blocks_until_dependency_is_healthy() { + let api = service_named("api"); + let mut worker = service_named("worker"); + worker.depends_on = vec![ServiceDependencySpec { + service: "api".to_string(), + condition: ServiceDependencyCondition::Healthy, + min_ready: 2, + }]; + + let services = vec![api.clone(), worker.clone()]; + let services_by_name = services + .iter() + .map(|service| (service.name.as_str(), service)) + .collect::>(); + let instances_by_service = HashMap::from([( + "api".to_string(), + vec![ + healthy_instance("api", "node01"), + ServiceInstanceSpec { + state: Some("starting".to_string()), + ..healthy_instance("api", "node02") + }, + ], + )]); + + let summary = dependency_summary( + &worker, + &services_by_name, + &instances_by_service, + &HashMap::new(), + &HashSet::new(), + 300, + ); + + assert!(!summary.dependencies_ready); + assert_eq!( + summary.blockers, + vec!["dependency api has 1/2 healthy instance(s)".to_string()] + ); + } + + #[test] + fn test_build_service_status_recognizes_published_dependency() { + let api = service_named("api"); + let mut worker = service_named("worker"); + worker.depends_on = vec![ServiceDependencySpec { + service: "api".to_string(), + condition: ServiceDependencyCondition::Published, + min_ready: 1, + }]; + + let services = vec![api.clone(), worker.clone()]; + let services_by_name = services + .iter() + .map(|service| (service.name.as_str(), service)) + .collect::>(); + let publications = HashMap::from([( + "api".to_string(), + ServicePublicationState { + service: "api".to_string(), + org_id: "default-org".to_string(), + project_id: "default-project".to_string(), + load_balancer: Some(PublishedLoadBalancerState { + id: "lb-1".to_string(), + pool_id: "pool-1".to_string(), + listener_id: "listener-1".to_string(), + vip_address: Some("203.0.113.10".to_string()), + }), + dns: None, + observed_at: Some(Utc::now()), + }, + )]); + let nodes = vec![active_node("node01", &["worker"], &[("tier", "general")])]; + + let status = build_service_status( + &worker, + &nodes, + &services_by_name, + &HashMap::new(), + &publications, + &HashSet::new(), + 300, + ); + + assert_eq!(status.phase, "pending"); + assert!(status.dependencies_ready); + assert!(status.blockers.is_empty()); + } + #[test] fn test_build_desired_instances_honors_max_instances_per_node() { let nodes = vec![active_node("node01", &["worker"], &[("tier", "general")])]; diff --git a/deployer/crates/fleet-scheduler/src/publish.rs b/deployer/crates/fleet-scheduler/src/publish.rs index bdec3e5..58c461f 100644 --- a/deployer/crates/fleet-scheduler/src/publish.rs +++ b/deployer/crates/fleet-scheduler/src/publish.rs @@ -1129,7 +1129,7 @@ fn publication_key(cluster_namespace: &str, cluster_id: &str, service: &str) -> .into_bytes() } -async fn load_publication_states( +pub(crate) async fn load_publication_states( client: &mut Client, cluster_namespace: &str, cluster_id: &str, @@ -1194,6 +1194,7 @@ mod tests { protocol: Some("http".to_string()), mtls_required: None, mesh_mode: None, + depends_on: Vec::new(), schedule: Some(ServiceScheduleSpec::default()), publish: Some(ServicePublicationSpec { org_id: Some("default-org".to_string()), diff --git a/deployer/scripts/verify-fleet-scheduler-e2e.sh b/deployer/scripts/verify-fleet-scheduler-e2e.sh index aa57301..ac7ebf8 100755 --- a/deployer/scripts/verify-fleet-scheduler-e2e.sh +++ b/deployer/scripts/verify-fleet-scheduler-e2e.sh @@ -212,6 +212,41 @@ services: path: / interval_secs: 1 timeout_secs: 2 + - name: worker + ports: + http: 18081 + protocol: http + depends_on: + - service: api + condition: healthy + min_ready: 2 + schedule: + replicas: 2 + placement: + roles: + - worker + pools: + - general + node_classes: + - worker-linux + match_labels: + tier: general + spread_by_label: failure_domain + max_instances_per_node: 1 + instance_port: 18081 + process: + command: python3 + args: + - -m + - http.server + - ${INSTANCE_PORT} + - --bind + - ${INSTANCE_IP} + health_check: + type: http + path: / + interval_secs: 1 + timeout_secs: 2 EOF cat >"$tmp_dir/cluster-scaled.yaml" <<'EOF' @@ -283,6 +318,41 @@ services: path: / interval_secs: 1 timeout_secs: 2 + - name: worker + ports: + http: 18081 + protocol: http + depends_on: + - service: api + condition: healthy + min_ready: 1 + schedule: + replicas: 1 + placement: + roles: + - worker + pools: + - general + node_classes: + - worker-linux + match_labels: + tier: general + spread_by_label: failure_domain + max_instances_per_node: 1 + instance_port: 18081 + process: + command: python3 + args: + - -m + - http.server + - ${INSTANCE_PORT} + - --bind + - ${INSTANCE_IP} + health_check: + type: http + path: / + interval_secs: 1 + timeout_secs: 2 EOF endpoint="http://127.0.0.1:${api_port}" @@ -326,7 +396,59 @@ run_node_agent_once node02 echo "Scheduling managed instances" run_scheduler_once -echo "Reconciling processes and health" +echo "Validating dependency block before api is healthy" +run_deployer_ctl dump --prefix "photoncloud/clusters/test-cluster/instances/worker/" >"$tmp_dir/worker-blocked.dump" +python3 - "$tmp_dir/worker-blocked.dump" <<'PY' +import sys + +path = sys.argv[1] +lines = [line.strip() for line in open(path, "r", encoding="utf-8") if line.strip()] +if lines: + raise SystemExit(f"expected no worker instances before api is healthy, found {len(lines)}") +print("worker instances correctly blocked before dependency becomes healthy") +PY + +run_deployer_ctl dump --prefix "photoncloud/clusters/test-cluster/service-statuses/worker" >"$tmp_dir/worker-status-blocked.dump" +python3 - "$tmp_dir/worker-status-blocked.dump" <<'PY' +import json +import sys + +path = sys.argv[1] +statuses = [] +with open(path, "r", encoding="utf-8") as handle: + for line in handle: + line = line.strip() + if not line: + continue + marker = " value=" + if marker not in line: + continue + statuses.append(json.loads(line.split(marker, 1)[1])) + +if len(statuses) != 1: + raise SystemExit(f"expected exactly one worker service status, found {len(statuses)}") + +status = statuses[0] +if status.get("phase") != "blocked": + raise SystemExit(f"expected worker phase=blocked, found {status.get('phase')}") +blockers = status.get("blockers") or [] +if not blockers or "dependency api has 0/2 healthy instance(s)" not in blockers[0]: + raise SystemExit(f"unexpected blockers: {blockers}") + +print("worker service status reports dependency block") +PY + +echo "Reconciling processes and health for api" +for _ in 1 2 3; do + run_node_agent_once node01 + run_node_agent_once node02 + sleep 1 +done + +echo "Re-running scheduler after api became healthy" +run_scheduler_once + +echo "Reconciling processes and health for dependent worker service" for _ in 1 2 3; do run_node_agent_once node01 run_node_agent_once node02 @@ -337,7 +459,12 @@ echo "Validating HTTP endpoints" python3 - <<'PY' import urllib.request -for address in ("http://127.0.0.2:18080/", "http://127.0.0.3:18080/"): +for address in ( + "http://127.0.0.2:18080/", + "http://127.0.0.3:18080/", + "http://127.0.0.2:18081/", + "http://127.0.0.3:18081/", +): with urllib.request.urlopen(address, timeout=5) as response: body = response.read().decode("utf-8") if response.status != 200: @@ -381,13 +508,56 @@ if states != ["healthy", "healthy"]: print("Observed two healthy scheduled instances across node01 and node02") PY +run_deployer_ctl dump --prefix "photoncloud/clusters/test-cluster/instances/worker/" >"$tmp_dir/worker-instances.dump" +python3 - "$tmp_dir/worker-instances.dump" <<'PY' +import json +import sys + +path = sys.argv[1] +instances = [] + +with open(path, "r", encoding="utf-8") as handle: + for line in handle: + line = line.strip() + if not line: + continue + marker = " value=" + if marker not in line: + continue + value = line.split(marker, 1)[1] + instances.append(json.loads(value)) + +if len(instances) != 2: + raise SystemExit(f"expected 2 worker instances, found {len(instances)}") + +node_ids = sorted(instance["node_id"] for instance in instances) +states = sorted(instance.get("state") for instance in instances) + +if node_ids != ["node01", "node02"]: + raise SystemExit(f"unexpected worker placement: {node_ids}") +if states != ["healthy", "healthy"]: + raise SystemExit(f"unexpected worker states: {states}") + +print("Observed two healthy dependent worker instances across node01 and node02") +PY + echo "Applying scaled declaration" run_deployer_ctl apply --config "$tmp_dir/cluster-scaled.yaml" --prune echo "Re-running scheduler after scale-down" run_scheduler_once -echo "Reconciling processes and health after scale-down" +echo "Reconciling api after scale-down" +for _ in 1 2 3; do + run_node_agent_once node01 + run_node_agent_once node02 + sleep 1 +done + +echo "Re-running scheduler after scaled api became healthy" +run_scheduler_once + +echo "Reconciling dependent worker service after scale-down" for _ in 1 2 3; do run_node_agent_once node01 run_node_agent_once node02 @@ -426,6 +596,37 @@ if instance.get("state") != "healthy": print("Observed one healthy scheduled instance on node01 after scale-down") PY +run_deployer_ctl dump --prefix "photoncloud/clusters/test-cluster/instances/worker/" >"$tmp_dir/worker-instances-scaled.dump" +python3 - "$tmp_dir/worker-instances-scaled.dump" <<'PY' +import json +import sys + +path = sys.argv[1] +instances = [] + +with open(path, "r", encoding="utf-8") as handle: + for line in handle: + line = line.strip() + if not line: + continue + marker = " value=" + if marker not in line: + continue + value = line.split(marker, 1)[1] + instances.append(json.loads(value)) + +if len(instances) != 1: + raise SystemExit(f"expected 1 worker instance after scale-down, found {len(instances)}") + +instance = instances[0] +if instance["node_id"] != "node01": + raise SystemExit(f"expected remaining worker instance on node01, found {instance['node_id']}") +if instance.get("state") != "healthy": + raise SystemExit(f"expected remaining worker instance to be healthy, found {instance.get('state')}") + +print("Observed one healthy dependent worker instance on node01 after scale-down") +PY + echo "Validating endpoint convergence after scale-down" python3 - <<'PY' import socket @@ -434,17 +635,21 @@ import urllib.request with urllib.request.urlopen("http://127.0.0.2:18080/", timeout=5) as response: if response.status != 200: raise SystemExit(f"node01 endpoint returned {response.status}") +with urllib.request.urlopen("http://127.0.0.2:18081/", timeout=5) as response: + if response.status != 200: + raise SystemExit(f"node01 worker endpoint returned {response.status}") -sock = socket.socket() -sock.settimeout(1.5) -try: - sock.connect(("127.0.0.3", 18080)) -except OSError: - pass -else: - raise SystemExit("node02 endpoint still accepts connections after scale-down") -finally: - sock.close() +for port, label in ((18080, "api"), (18081, "worker")): + sock = socket.socket() + sock.settimeout(1.5) + try: + sock.connect(("127.0.0.3", port)) + except OSError: + pass + else: + raise SystemExit(f"node02 {label} endpoint still accepts connections after scale-down") + finally: + sock.close() print("Endpoint convergence validated") PY