From 7450832bffdf5f065ca30301bc6d1bb99f0287dd Mon Sep 17 00:00:00 2001
From: centra
Date: Wed, 1 Apr 2026 22:22:18 +0900
Subject: [PATCH] Add dependency-aware fleet scheduling
---
deployer/crates/deployer-types/src/lib.rs | 65 ++
deployer/crates/fleet-scheduler/src/main.rs | 599 +++++++++++++++++-
.../crates/fleet-scheduler/src/publish.rs | 3 +-
.../scripts/verify-fleet-scheduler-e2e.sh | 231 ++++++-
4 files changed, 879 insertions(+), 19 deletions(-)
diff --git a/deployer/crates/deployer-types/src/lib.rs b/deployer/crates/deployer-types/src/lib.rs
index 3fafaa8..20cbdcf 100644
--- a/deployer/crates/deployer-types/src/lib.rs
+++ b/deployer/crates/deployer-types/src/lib.rs
@@ -319,6 +319,14 @@ fn default_dns_ttl() -> u32 {
30
}
+fn default_service_dependency_condition() -> ServiceDependencyCondition {
+ ServiceDependencyCondition::Healthy
+}
+
+fn default_service_dependency_min_ready() -> u32 {
+ 1
+}
+
/// Process specification executed by node-agent for a scheduled instance.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct ProcessSpec {
@@ -542,6 +550,37 @@ pub struct ServicePublicationSpec {
pub load_balancer: Option,
}
+/// Readiness condition required from another service before this service can reconcile.
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
+#[serde(rename_all = "snake_case")]
+pub enum ServiceDependencyCondition {
+ /// Require a minimum number of healthy dependency instances.
+ #[default]
+ Healthy,
+ /// Require the dependency to publish at least one DNS or load-balancer artifact.
+ Published,
+}
+
+/// Dependency edge between scheduler-managed services.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub struct ServiceDependencySpec {
+ pub service: String,
+ #[serde(default = "default_service_dependency_condition")]
+ pub condition: ServiceDependencyCondition,
+ #[serde(default = "default_service_dependency_min_ready")]
+ pub min_ready: u32,
+}
+
+impl Default for ServiceDependencySpec {
+ fn default() -> Self {
+ Self {
+ service: String::new(),
+ condition: default_service_dependency_condition(),
+ min_ready: default_service_dependency_min_ready(),
+ }
+ }
+}
+
/// Cluster node record stored under photoncloud/clusters/{cluster_id}/nodes/{node_id}.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ClusterNodeRecord {
@@ -849,6 +888,8 @@ pub struct ServiceSpec {
#[serde(default)]
pub mesh_mode: Option,
#[serde(default)]
+ pub depends_on: Vec<ServiceDependencySpec>,
+ #[serde(default)]
pub schedule: Option,
#[serde(default)]
pub publish: Option,
@@ -915,6 +956,30 @@ pub struct ServicePublicationState {
pub observed_at: Option<DateTime<Utc>>,
}
+/// Scheduler-observed readiness and dependency state for a logical service.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
+pub struct ServiceStatusRecord {
+ pub service: String,
+ #[serde(default)]
+ pub phase: String,
+ #[serde(default)]
+ pub desired_instances: u32,
+ #[serde(default)]
+ pub scheduled_instances: u32,
+ #[serde(default)]
+ pub healthy_instances: u32,
+ #[serde(default)]
+ pub publish_ready: bool,
+ #[serde(default)]
+ pub dependencies_ready: bool,
+ #[serde(default)]
+ pub blockers: Vec<String>,
+ #[serde(default, skip_serializing_if = "Option::is_none")]
+ pub message: Option<String>,
+ #[serde(default, skip_serializing_if = "Option::is_none")]
+ pub observed_at: Option<DateTime<Utc>>,
+}
+
/// mTLS policy definition.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct MtlsPolicySpec {
diff --git a/deployer/crates/fleet-scheduler/src/main.rs b/deployer/crates/fleet-scheduler/src/main.rs
index c7d9b74..10e920b 100644
--- a/deployer/crates/fleet-scheduler/src/main.rs
+++ b/deployer/crates/fleet-scheduler/src/main.rs
@@ -6,7 +6,8 @@ use chainfire_client::Client;
use chrono::Utc;
use clap::Parser;
use deployer_types::{
- ClusterNodeRecord, PlacementPolicy, ServiceInstanceSpec, ServiceScheduleMode, ServiceSpec,
+ ClusterNodeRecord, PlacementPolicy, ServiceDependencyCondition, ServiceInstanceSpec,
+ ServicePublicationState, ServiceScheduleMode, ServiceSpec, ServiceStatusRecord,
};
use publish::{PublicationConfig, PublicationReconciler};
use serde_json::Value;
@@ -90,6 +91,12 @@ struct ReconcilePlan {
deferred_deletes: usize,
}
+#[derive(Debug, Default)]
+struct DependencySummary {
+ dependencies_ready: bool,
+ blockers: Vec<String>,
+}
+
impl Scheduler {
fn new(cli: Cli) -> Self {
let cluster_namespace = cli.cluster_namespace;
@@ -135,6 +142,18 @@ impl Scheduler {
let mut client = Client::connect(self.endpoint.clone()).await?;
let nodes = self.load_cluster_nodes(&mut client).await?;
let services = self.load_services(&mut client).await?;
+ let instances_before = self.load_instances_by_service(&mut client).await?;
+ let publications_before = publish::load_publication_states(
+ &mut client,
+ &self.cluster_namespace,
+ &self.cluster_id,
+ )
+ .await?;
+ let dependency_cycles = dependency_cycle_services(&services);
+ let services_by_name = services
+ .iter()
+ .map(|service| (service.name.as_str(), service))
+ .collect::<HashMap<_, _>>();
debug!(
nodes = nodes.len(),
@@ -146,6 +165,22 @@ impl Scheduler {
if service.schedule.is_none() {
continue;
}
+ let dependencies = dependency_summary(
+ service,
+ &services_by_name,
+ &instances_before,
+ &publications_before,
+ &dependency_cycles,
+ self.heartbeat_timeout_secs,
+ );
+ if !dependencies.dependencies_ready {
+ warn!(
+ service = %service.name,
+ blockers = ?dependencies.blockers,
+ "skipping service reconciliation until dependencies are ready"
+ );
+ continue;
+ }
self.reconcile_service(&mut client, &nodes, service).await?;
}
@@ -159,6 +194,25 @@ impl Scheduler {
)
.await?;
+ let instances_after = self.load_instances_by_service(&mut client).await?;
+ let publications_after = publish::load_publication_states(
+ &mut client,
+ &self.cluster_namespace,
+ &self.cluster_id,
+ )
+ .await?;
+ self.persist_service_statuses(
+ &mut client,
+ &nodes,
+ &services,
+ &instances_after,
+ &publications_after,
+ &dependency_cycles,
+ )
+ .await?;
+ self.cleanup_stale_service_statuses(&mut client, &services)
+ .await?;
+
Ok(())
}
@@ -200,6 +254,112 @@ impl Scheduler {
Ok(services)
}
+ async fn load_instances_by_service(
+ &self,
+ client: &mut Client,
+ ) -> Result<HashMap<String, Vec<ServiceInstanceSpec>>> {
+ let prefix = format!(
+ "{}/clusters/{}/instances/",
+ self.cluster_namespace, self.cluster_id
+ );
+ let kvs = client.get_prefix(prefix.as_bytes()).await?;
+ let mut instances = HashMap::<String, Vec<ServiceInstanceSpec>>::new();
+
+ for (_key, value) in kvs {
+ match serde_json::from_slice::<ServiceInstanceSpec>(&value) {
+ Ok(instance) => {
+ instances
+ .entry(instance.service.clone())
+ .or_default()
+ .push(instance);
+ }
+ Err(error) => warn!(error = %error, "failed to decode service instance"),
+ }
+ }
+
+ for service_instances in instances.values_mut() {
+ service_instances.sort_by(|lhs, rhs| lhs.instance_id.cmp(&rhs.instance_id));
+ }
+
+ Ok(instances)
+ }
+
+ async fn persist_service_statuses(
+ &self,
+ client: &mut Client,
+ nodes: &[ClusterNodeRecord],
+ services: &[ServiceSpec],
+ instances_by_service: &HashMap<String, Vec<ServiceInstanceSpec>>,
+ publications: &HashMap<String, ServicePublicationState>,
+ dependency_cycles: &HashSet<String>,
+ ) -> Result<()> {
+ let services_by_name = services
+ .iter()
+ .map(|service| (service.name.as_str(), service))
+ .collect::<HashMap<_, _>>();
+
+ for service in services {
+ let status = build_service_status(
+ service,
+ nodes,
+ &services_by_name,
+ instances_by_service,
+ publications,
+ dependency_cycles,
+ self.heartbeat_timeout_secs,
+ );
+ let key = service_status_key(&self.cluster_namespace, &self.cluster_id, &service.name);
+
+ if self.dry_run {
+ info!(
+ service = %service.name,
+ phase = %status.phase,
+ healthy_instances = status.healthy_instances,
+ desired_instances = status.desired_instances,
+ blockers = ?status.blockers,
+ "would update service status"
+ );
+ } else {
+ client.put(&key, serde_json::to_vec(&status)?).await?;
+ }
+ }
+
+ Ok(())
+ }
+
+ async fn cleanup_stale_service_statuses(
+ &self,
+ client: &mut Client,
+ services: &[ServiceSpec],
+ ) -> Result<()> {
+ let desired = services
+ .iter()
+ .map(|service| service.name.as_str())
+ .collect::<HashSet<_>>();
+ let prefix = service_status_prefix(&self.cluster_namespace, &self.cluster_id);
+ let existing = client.get_prefix(prefix.as_bytes()).await?;
+
+ for (key, _) in existing {
+ let key_str = String::from_utf8_lossy(&key);
+ let Some(service_name) = key_str.strip_prefix(&prefix) else {
+ continue;
+ };
+ if service_name.is_empty()
+ || service_name.contains('/')
+ || desired.contains(service_name)
+ {
+ continue;
+ }
+ if self.dry_run {
+ info!(service = %service_name, "would delete stale service status");
+ } else if client.delete(&key).await? {
+ info!(service = %service_name, "deleted stale service status");
+ }
+ }
+
+ Ok(())
+ }
+
async fn reconcile_service(
&self,
client: &mut Client,
@@ -222,7 +382,8 @@ impl Scheduler {
let existing_instances = decode_managed_instances(&existing);
let desired_instances =
build_desired_instances(service, &eligible_nodes, &existing_instances)?;
- let target_instances = schedule_target_count(schedule.mode, eligible_nodes.len(), schedule.replicas);
+ let target_instances =
+ schedule_target_count(schedule.mode, eligible_nodes.len(), schedule.replicas);
if desired_instances.len() < target_instances {
warn!(
service = %service.name,
@@ -341,6 +502,284 @@ impl Scheduler {
}
}
+fn service_status_prefix(cluster_namespace: &str, cluster_id: &str) -> String {
+ format!(
+ "{}/clusters/{}/service-statuses/",
+ cluster_namespace, cluster_id
+ )
+}
+
+fn service_status_key(cluster_namespace: &str, cluster_id: &str, service: &str) -> Vec<u8> {
+ format!(
+ "{}{}",
+ service_status_prefix(cluster_namespace, cluster_id),
+ service
+ )
+ .into_bytes()
+}
+
+fn dependency_cycle_services(services: &[ServiceSpec]) -> HashSet<String> {
+ let service_names = services
+ .iter()
+ .map(|service| service.name.clone())
+ .collect::<HashSet<_>>();
+ let dependencies = services
+ .iter()
+ .map(|service| {
+ let deps = service
+ .depends_on
+ .iter()
+ .filter(|dependency| service_names.contains(&dependency.service))
+ .map(|dependency| dependency.service.clone())
+ .collect::<Vec<_>>();
+ (service.name.clone(), deps)
+ })
+ .collect::<HashMap<_, _>>();
+
+ let mut permanent = HashSet::new();
+ let mut visiting = Vec::<String>::new();
+ let mut cycles = HashSet::new();
+
+ for service in services {
+ visit_dependency_cycles(
+ &service.name,
+ &dependencies,
+ &mut permanent,
+ &mut visiting,
+ &mut cycles,
+ );
+ }
+
+ cycles
+}
+
+fn visit_dependency_cycles(
+ service: &str,
+ dependencies: &HashMap<String, Vec<String>>,
+ permanent: &mut HashSet<String>,
+ visiting: &mut Vec<String>,
+ cycles: &mut HashSet<String>,
+) {
+ if permanent.contains(service) {
+ return;
+ }
+
+ if let Some(position) = visiting.iter().position(|current| current == service) {
+ cycles.extend(visiting[position..].iter().cloned());
+ return;
+ }
+
+ visiting.push(service.to_string());
+ if let Some(depends_on) = dependencies.get(service) {
+ for dependency in depends_on {
+ visit_dependency_cycles(dependency, dependencies, permanent, visiting, cycles);
+ }
+ }
+ visiting.pop();
+ permanent.insert(service.to_string());
+}
+
+fn dependency_summary(
+ service: &ServiceSpec,
+ services_by_name: &HashMap<&str, &ServiceSpec>,
+ instances_by_service: &HashMap<String, Vec<ServiceInstanceSpec>>,
+ publications: &HashMap<String, ServicePublicationState>,
+ dependency_cycles: &HashSet<String>,
+ heartbeat_timeout_secs: u64,
+) -> DependencySummary {
+ let mut blockers = Vec::new();
+
+ if dependency_cycles.contains(&service.name) {
+ blockers.push("cyclic dependency graph".to_string());
+ }
+
+ for dependency in &service.depends_on {
+ if dependency.service == service.name {
+ blockers.push(format!(
+ "dependency {} points to itself",
+ dependency.service
+ ));
+ continue;
+ }
+
+ if !services_by_name.contains_key(dependency.service.as_str()) {
+ blockers.push(format!("dependency {} is not defined", dependency.service));
+ continue;
+ }
+
+ if dependency_cycles.contains(&dependency.service) {
+ blockers.push(format!(
+ "dependency {} is part of a dependency cycle",
+ dependency.service
+ ));
+ continue;
+ }
+
+ match dependency.condition {
+ ServiceDependencyCondition::Healthy => {
+ let ready = instances_by_service
+ .get(&dependency.service)
+ .map(|instances| {
+ instances
+ .iter()
+ .filter(|instance| {
+ instance_is_available(instance, heartbeat_timeout_secs)
+ })
+ .count() as u32
+ })
+ .unwrap_or(0);
+ let min_ready = dependency.min_ready.max(1);
+ if ready < min_ready {
+ blockers.push(format!(
+ "dependency {} has {ready}/{min_ready} healthy instance(s)",
+ dependency.service
+ ));
+ }
+ }
+ ServiceDependencyCondition::Published => {
+ let ready = publications
+ .get(&dependency.service)
+ .map(publication_ready)
+ .unwrap_or(false);
+ if !ready {
+ blockers.push(format!(
+ "dependency {} is not published",
+ dependency.service
+ ));
+ }
+ }
+ }
+ }
+
+ DependencySummary {
+ dependencies_ready: blockers.is_empty(),
+ blockers,
+ }
+}
+
+fn publication_ready(state: &ServicePublicationState) -> bool {
+ state
+ .dns
+ .as_ref()
+ .map(|dns| !dns.values.is_empty())
+ .unwrap_or(false)
+ || state
+ .load_balancer
+ .as_ref()
+ .and_then(|load_balancer| load_balancer.vip_address.as_ref())
+ .is_some()
+}
+
+fn desired_instance_count(
+ service: &ServiceSpec,
+ nodes: &[ClusterNodeRecord],
+ heartbeat_timeout_secs: u64,
+) -> u32 {
+ let Some(schedule) = service.schedule.as_ref() else {
+ return 0;
+ };
+ match schedule.mode {
+ ServiceScheduleMode::Replicated => schedule.replicas,
+ ServiceScheduleMode::Daemon => {
+ eligible_nodes(nodes, &schedule.placement, heartbeat_timeout_secs).len() as u32
+ }
+ }
+}
+
+fn build_service_status(
+ service: &ServiceSpec,
+ nodes: &[ClusterNodeRecord],
+ services_by_name: &HashMap<&str, &ServiceSpec>,
+ instances_by_service: &HashMap<String, Vec<ServiceInstanceSpec>>,
+ publications: &HashMap<String, ServicePublicationState>,
+ dependency_cycles: &HashSet<String>,
+ heartbeat_timeout_secs: u64,
+) -> ServiceStatusRecord {
+ let instances = instances_by_service
+ .get(&service.name)
+ .map(Vec::as_slice)
+ .unwrap_or(&[]);
+ let healthy_instances = instances
+ .iter()
+ .filter(|instance| instance_is_available(instance, heartbeat_timeout_secs))
+ .count() as u32;
+ let scheduled_instances = instances.len() as u32;
+ let desired_instances = desired_instance_count(service, nodes, heartbeat_timeout_secs);
+ let publish_ready = publications
+ .get(&service.name)
+ .map(publication_ready)
+ .unwrap_or(false);
+ let dependencies = dependency_summary(
+ service,
+ services_by_name,
+ instances_by_service,
+ publications,
+ dependency_cycles,
+ heartbeat_timeout_secs,
+ );
+ let eligible_node_count = service
+ .schedule
+ .as_ref()
+ .map(|schedule| eligible_nodes(nodes, &schedule.placement, heartbeat_timeout_secs).len())
+ .unwrap_or(0);
+
+ let (phase, message) = if service.schedule.is_none() {
+ (
+ "unmanaged".to_string(),
+ Some("service is not managed by fleet-scheduler".to_string()),
+ )
+ } else if !dependencies.dependencies_ready {
+ (
+ "blocked".to_string(),
+ Some(dependencies.blockers.join("; ")),
+ )
+ } else if eligible_node_count == 0 {
+ (
+ "unschedulable".to_string(),
+ Some("no eligible nodes match the service placement policy".to_string()),
+ )
+ } else if desired_instances == 0 && scheduled_instances == 0 {
+ (
+ "idle".to_string(),
+ Some("service has no desired instances".to_string()),
+ )
+ } else if desired_instances > 0 && healthy_instances >= desired_instances {
+ (
+ "healthy".to_string(),
+ Some(format!(
+ "healthy instances satisfy desired count ({healthy_instances}/{desired_instances})"
+ )),
+ )
+ } else if scheduled_instances > 0 || healthy_instances > 0 {
+ (
+ "degraded".to_string(),
+ Some(format!(
+ "healthy={healthy_instances} scheduled={scheduled_instances} desired={desired_instances}"
+ )),
+ )
+ } else {
+ (
+ "pending".to_string(),
+ Some(format!(
+ "waiting for instances to reach desired count ({healthy_instances}/{desired_instances})"
+ )),
+ )
+ };
+
+ ServiceStatusRecord {
+ service: service.name.clone(),
+ phase,
+ desired_instances,
+ scheduled_instances,
+ healthy_instances,
+ publish_ready,
+ dependencies_ready: dependencies.dependencies_ready,
+ blockers: dependencies.blockers,
+ message,
+ observed_at: Some(Utc::now()),
+ }
+}
+
fn eligible_nodes<'a>(
nodes: &'a [ClusterNodeRecord],
placement: &PlacementPolicy,
@@ -853,7 +1292,10 @@ fn instance_is_available(instance: &ServiceInstanceSpec, heartbeat_timeout_secs:
&& instance_has_fresh_heartbeat(instance, heartbeat_timeout_secs)
}
-fn instance_has_fresh_heartbeat(instance: &ServiceInstanceSpec, heartbeat_timeout_secs: u64) -> bool {
+fn instance_has_fresh_heartbeat(
+ instance: &ServiceInstanceSpec,
+ heartbeat_timeout_secs: u64,
+) -> bool {
if heartbeat_timeout_secs == 0 {
return true;
}
@@ -961,8 +1403,9 @@ mod tests {
use super::*;
use chrono::Duration as ChronoDuration;
use deployer_types::{
- ClusterNodeRecord, HealthCheckSpec, PlacementPolicy, ProcessSpec, RolloutStrategySpec,
- ServicePorts, ServiceScheduleMode, ServiceScheduleSpec,
+ ClusterNodeRecord, HealthCheckSpec, PlacementPolicy, ProcessSpec,
+ PublishedLoadBalancerState, RolloutStrategySpec, ServiceDependencyCondition,
+ ServiceDependencySpec, ServicePorts, ServiceScheduleMode, ServiceScheduleSpec,
};
fn active_node(node_id: &str, roles: &[&str], labels: &[(&str, &str)]) -> ClusterNodeRecord {
@@ -1003,6 +1446,7 @@ mod tests {
protocol: Some("http".to_string()),
mtls_required: None,
mesh_mode: None,
+ depends_on: Vec::new(),
schedule: Some(ServiceScheduleSpec {
mode: ServiceScheduleMode::Replicated,
replicas: 2,
@@ -1036,6 +1480,31 @@ mod tests {
}
}
+ fn service_named(name: &str) -> ServiceSpec {
+ let mut service = scheduled_service();
+ service.name = name.to_string();
+ service
+ }
+
+ fn healthy_instance(service: &str, node_id: &str) -> ServiceInstanceSpec {
+ ServiceInstanceSpec {
+ instance_id: format!("{service}-{node_id}"),
+ service: service.to_string(),
+ node_id: node_id.to_string(),
+ ip: "10.0.0.10".to_string(),
+ port: 8080,
+ mesh_port: Some(18080),
+ version: None,
+ health_check: None,
+ process: None,
+ container: None,
+ managed_by: Some(MANAGED_BY.to_string()),
+ state: Some("healthy".to_string()),
+ last_heartbeat: Some(Utc::now()),
+ observed_at: None,
+ }
+ }
+
#[test]
fn test_node_eligibility_matches_roles_and_labels() {
let node = active_node("node01", &["worker"], &[("tier", "general")]);
@@ -1083,6 +1552,126 @@ mod tests {
assert_eq!(desired[0].process.as_ref().unwrap().command, "/usr/bin/api");
}
+ #[test]
+ fn test_dependency_cycles_detect_all_services_in_cycle() {
+ let mut api = service_named("api");
+ api.depends_on = vec![ServiceDependencySpec {
+ service: "worker".to_string(),
+ condition: ServiceDependencyCondition::Healthy,
+ min_ready: 1,
+ }];
+
+ let mut worker = service_named("worker");
+ worker.depends_on = vec![ServiceDependencySpec {
+ service: "edge".to_string(),
+ condition: ServiceDependencyCondition::Healthy,
+ min_ready: 1,
+ }];
+
+ let mut edge = service_named("edge");
+ edge.depends_on = vec![ServiceDependencySpec {
+ service: "api".to_string(),
+ condition: ServiceDependencyCondition::Healthy,
+ min_ready: 1,
+ }];
+
+ let cycles = dependency_cycle_services(&[api, worker, edge]);
+ assert_eq!(
+ cycles,
+ HashSet::from(["api".to_string(), "worker".to_string(), "edge".to_string(),])
+ );
+ }
+
+ #[test]
+ fn test_dependency_summary_blocks_until_dependency_is_healthy() {
+ let api = service_named("api");
+ let mut worker = service_named("worker");
+ worker.depends_on = vec![ServiceDependencySpec {
+ service: "api".to_string(),
+ condition: ServiceDependencyCondition::Healthy,
+ min_ready: 2,
+ }];
+
+ let services = vec![api.clone(), worker.clone()];
+ let services_by_name = services
+ .iter()
+ .map(|service| (service.name.as_str(), service))
+ .collect::<HashMap<_, _>>();
+ let instances_by_service = HashMap::from([(
+ "api".to_string(),
+ vec![
+ healthy_instance("api", "node01"),
+ ServiceInstanceSpec {
+ state: Some("starting".to_string()),
+ ..healthy_instance("api", "node02")
+ },
+ ],
+ )]);
+
+ let summary = dependency_summary(
+ &worker,
+ &services_by_name,
+ &instances_by_service,
+ &HashMap::new(),
+ &HashSet::new(),
+ 300,
+ );
+
+ assert!(!summary.dependencies_ready);
+ assert_eq!(
+ summary.blockers,
+ vec!["dependency api has 1/2 healthy instance(s)".to_string()]
+ );
+ }
+
+ #[test]
+ fn test_build_service_status_recognizes_published_dependency() {
+ let api = service_named("api");
+ let mut worker = service_named("worker");
+ worker.depends_on = vec![ServiceDependencySpec {
+ service: "api".to_string(),
+ condition: ServiceDependencyCondition::Published,
+ min_ready: 1,
+ }];
+
+ let services = vec![api.clone(), worker.clone()];
+ let services_by_name = services
+ .iter()
+ .map(|service| (service.name.as_str(), service))
+ .collect::<HashMap<_, _>>();
+ let publications = HashMap::from([(
+ "api".to_string(),
+ ServicePublicationState {
+ service: "api".to_string(),
+ org_id: "default-org".to_string(),
+ project_id: "default-project".to_string(),
+ load_balancer: Some(PublishedLoadBalancerState {
+ id: "lb-1".to_string(),
+ pool_id: "pool-1".to_string(),
+ listener_id: "listener-1".to_string(),
+ vip_address: Some("203.0.113.10".to_string()),
+ }),
+ dns: None,
+ observed_at: Some(Utc::now()),
+ },
+ )]);
+ let nodes = vec![active_node("node01", &["worker"], &[("tier", "general")])];
+
+ let status = build_service_status(
+ &worker,
+ &nodes,
+ &services_by_name,
+ &HashMap::new(),
+ &publications,
+ &HashSet::new(),
+ 300,
+ );
+
+ assert_eq!(status.phase, "pending");
+ assert!(status.dependencies_ready);
+ assert!(status.blockers.is_empty());
+ }
+
#[test]
fn test_build_desired_instances_honors_max_instances_per_node() {
let nodes = vec![active_node("node01", &["worker"], &[("tier", "general")])];
diff --git a/deployer/crates/fleet-scheduler/src/publish.rs b/deployer/crates/fleet-scheduler/src/publish.rs
index bdec3e5..58c461f 100644
--- a/deployer/crates/fleet-scheduler/src/publish.rs
+++ b/deployer/crates/fleet-scheduler/src/publish.rs
@@ -1129,7 +1129,7 @@ fn publication_key(cluster_namespace: &str, cluster_id: &str, service: &str) ->
.into_bytes()
}
-async fn load_publication_states(
+pub(crate) async fn load_publication_states(
client: &mut Client,
cluster_namespace: &str,
cluster_id: &str,
@@ -1194,6 +1194,7 @@ mod tests {
protocol: Some("http".to_string()),
mtls_required: None,
mesh_mode: None,
+ depends_on: Vec::new(),
schedule: Some(ServiceScheduleSpec::default()),
publish: Some(ServicePublicationSpec {
org_id: Some("default-org".to_string()),
diff --git a/deployer/scripts/verify-fleet-scheduler-e2e.sh b/deployer/scripts/verify-fleet-scheduler-e2e.sh
index aa57301..ac7ebf8 100755
--- a/deployer/scripts/verify-fleet-scheduler-e2e.sh
+++ b/deployer/scripts/verify-fleet-scheduler-e2e.sh
@@ -212,6 +212,41 @@ services:
path: /
interval_secs: 1
timeout_secs: 2
+ - name: worker
+ ports:
+ http: 18081
+ protocol: http
+ depends_on:
+ - service: api
+ condition: healthy
+ min_ready: 2
+ schedule:
+ replicas: 2
+ placement:
+ roles:
+ - worker
+ pools:
+ - general
+ node_classes:
+ - worker-linux
+ match_labels:
+ tier: general
+ spread_by_label: failure_domain
+ max_instances_per_node: 1
+ instance_port: 18081
+ process:
+ command: python3
+ args:
+ - -m
+ - http.server
+ - ${INSTANCE_PORT}
+ - --bind
+ - ${INSTANCE_IP}
+ health_check:
+ type: http
+ path: /
+ interval_secs: 1
+ timeout_secs: 2
EOF
cat >"$tmp_dir/cluster-scaled.yaml" <<'EOF'
@@ -283,6 +318,41 @@ services:
path: /
interval_secs: 1
timeout_secs: 2
+ - name: worker
+ ports:
+ http: 18081
+ protocol: http
+ depends_on:
+ - service: api
+ condition: healthy
+ min_ready: 1
+ schedule:
+ replicas: 1
+ placement:
+ roles:
+ - worker
+ pools:
+ - general
+ node_classes:
+ - worker-linux
+ match_labels:
+ tier: general
+ spread_by_label: failure_domain
+ max_instances_per_node: 1
+ instance_port: 18081
+ process:
+ command: python3
+ args:
+ - -m
+ - http.server
+ - ${INSTANCE_PORT}
+ - --bind
+ - ${INSTANCE_IP}
+ health_check:
+ type: http
+ path: /
+ interval_secs: 1
+ timeout_secs: 2
EOF
endpoint="http://127.0.0.1:${api_port}"
@@ -326,7 +396,59 @@ run_node_agent_once node02
echo "Scheduling managed instances"
run_scheduler_once
-echo "Reconciling processes and health"
+echo "Validating dependency block before api is healthy"
+run_deployer_ctl dump --prefix "photoncloud/clusters/test-cluster/instances/worker/" >"$tmp_dir/worker-blocked.dump"
+python3 - "$tmp_dir/worker-blocked.dump" <<'PY'
+import sys
+
+path = sys.argv[1]
+lines = [line.strip() for line in open(path, "r", encoding="utf-8") if line.strip()]
+if lines:
+ raise SystemExit(f"expected no worker instances before api is healthy, found {len(lines)}")
+print("worker instances correctly blocked before dependency becomes healthy")
+PY
+
+run_deployer_ctl dump --prefix "photoncloud/clusters/test-cluster/service-statuses/worker" >"$tmp_dir/worker-status-blocked.dump"
+python3 - "$tmp_dir/worker-status-blocked.dump" <<'PY'
+import json
+import sys
+
+path = sys.argv[1]
+statuses = []
+with open(path, "r", encoding="utf-8") as handle:
+ for line in handle:
+ line = line.strip()
+ if not line:
+ continue
+ marker = " value="
+ if marker not in line:
+ continue
+ statuses.append(json.loads(line.split(marker, 1)[1]))
+
+if len(statuses) != 1:
+ raise SystemExit(f"expected exactly one worker service status, found {len(statuses)}")
+
+status = statuses[0]
+if status.get("phase") != "blocked":
+ raise SystemExit(f"expected worker phase=blocked, found {status.get('phase')}")
+blockers = status.get("blockers") or []
+if not blockers or "dependency api has 0/2 healthy instance(s)" not in blockers[0]:
+ raise SystemExit(f"unexpected blockers: {blockers}")
+
+print("worker service status reports dependency block")
+PY
+
+echo "Reconciling processes and health for api"
+for _ in 1 2 3; do
+ run_node_agent_once node01
+ run_node_agent_once node02
+ sleep 1
+done
+
+echo "Re-running scheduler after api became healthy"
+run_scheduler_once
+
+echo "Reconciling processes and health for dependent worker service"
for _ in 1 2 3; do
run_node_agent_once node01
run_node_agent_once node02
@@ -337,7 +459,12 @@ echo "Validating HTTP endpoints"
python3 - <<'PY'
import urllib.request
-for address in ("http://127.0.0.2:18080/", "http://127.0.0.3:18080/"):
+for address in (
+ "http://127.0.0.2:18080/",
+ "http://127.0.0.3:18080/",
+ "http://127.0.0.2:18081/",
+ "http://127.0.0.3:18081/",
+):
with urllib.request.urlopen(address, timeout=5) as response:
body = response.read().decode("utf-8")
if response.status != 200:
@@ -381,13 +508,56 @@ if states != ["healthy", "healthy"]:
print("Observed two healthy scheduled instances across node01 and node02")
PY
+run_deployer_ctl dump --prefix "photoncloud/clusters/test-cluster/instances/worker/" >"$tmp_dir/worker-instances.dump"
+python3 - "$tmp_dir/worker-instances.dump" <<'PY'
+import json
+import sys
+
+path = sys.argv[1]
+instances = []
+
+with open(path, "r", encoding="utf-8") as handle:
+ for line in handle:
+ line = line.strip()
+ if not line:
+ continue
+ marker = " value="
+ if marker not in line:
+ continue
+ value = line.split(marker, 1)[1]
+ instances.append(json.loads(value))
+
+if len(instances) != 2:
+ raise SystemExit(f"expected 2 worker instances, found {len(instances)}")
+
+node_ids = sorted(instance["node_id"] for instance in instances)
+states = sorted(instance.get("state") for instance in instances)
+
+if node_ids != ["node01", "node02"]:
+ raise SystemExit(f"unexpected worker placement: {node_ids}")
+if states != ["healthy", "healthy"]:
+ raise SystemExit(f"unexpected worker states: {states}")
+
+print("Observed two healthy dependent worker instances across node01 and node02")
+PY
+
echo "Applying scaled declaration"
run_deployer_ctl apply --config "$tmp_dir/cluster-scaled.yaml" --prune
echo "Re-running scheduler after scale-down"
run_scheduler_once
-echo "Reconciling processes and health after scale-down"
+echo "Reconciling api after scale-down"
+for _ in 1 2 3; do
+ run_node_agent_once node01
+ run_node_agent_once node02
+ sleep 1
+done
+
+echo "Re-running scheduler after scaled api became healthy"
+run_scheduler_once
+
+echo "Reconciling dependent worker service after scale-down"
for _ in 1 2 3; do
run_node_agent_once node01
run_node_agent_once node02
@@ -426,6 +596,37 @@ if instance.get("state") != "healthy":
print("Observed one healthy scheduled instance on node01 after scale-down")
PY
+run_deployer_ctl dump --prefix "photoncloud/clusters/test-cluster/instances/worker/" >"$tmp_dir/worker-instances-scaled.dump"
+python3 - "$tmp_dir/worker-instances-scaled.dump" <<'PY'
+import json
+import sys
+
+path = sys.argv[1]
+instances = []
+
+with open(path, "r", encoding="utf-8") as handle:
+ for line in handle:
+ line = line.strip()
+ if not line:
+ continue
+ marker = " value="
+ if marker not in line:
+ continue
+ value = line.split(marker, 1)[1]
+ instances.append(json.loads(value))
+
+if len(instances) != 1:
+ raise SystemExit(f"expected 1 worker instance after scale-down, found {len(instances)}")
+
+instance = instances[0]
+if instance["node_id"] != "node01":
+ raise SystemExit(f"expected remaining worker instance on node01, found {instance['node_id']}")
+if instance.get("state") != "healthy":
+ raise SystemExit(f"expected remaining worker instance to be healthy, found {instance.get('state')}")
+
+print("Observed one healthy dependent worker instance on node01 after scale-down")
+PY
+
echo "Validating endpoint convergence after scale-down"
python3 - <<'PY'
import socket
@@ -434,17 +635,21 @@ import urllib.request
with urllib.request.urlopen("http://127.0.0.2:18080/", timeout=5) as response:
if response.status != 200:
raise SystemExit(f"node01 endpoint returned {response.status}")
+with urllib.request.urlopen("http://127.0.0.2:18081/", timeout=5) as response:
+ if response.status != 200:
+ raise SystemExit(f"node01 worker endpoint returned {response.status}")
-sock = socket.socket()
-sock.settimeout(1.5)
-try:
- sock.connect(("127.0.0.3", 18080))
-except OSError:
- pass
-else:
- raise SystemExit("node02 endpoint still accepts connections after scale-down")
-finally:
- sock.close()
+for port, label in ((18080, "api"), (18081, "worker")):
+ sock = socket.socket()
+ sock.settimeout(1.5)
+ try:
+ sock.connect(("127.0.0.3", port))
+ except OSError:
+ pass
+ else:
+ raise SystemExit(f"node02 {label} endpoint still accepts connections after scale-down")
+ finally:
+ sock.close()
print("Endpoint convergence validated")
PY