Add dependency-aware fleet scheduling

This commit is contained in:
centra 2026-04-01 22:22:18 +09:00
parent b07bcb3772
commit 7450832bff
Signed by: centra
GPG key ID: 0C09689D20B25ACA
4 changed files with 879 additions and 19 deletions

View file

@ -319,6 +319,14 @@ fn default_dns_ttl() -> u32 {
30
}
/// Serde default for `ServiceDependencySpec::condition`: require healthy instances.
fn default_service_dependency_condition() -> ServiceDependencyCondition {
    ServiceDependencyCondition::Healthy
}
/// Serde default for `ServiceDependencySpec::min_ready`: one ready instance.
fn default_service_dependency_min_ready() -> u32 {
    1
}
/// Process specification executed by node-agent for a scheduled instance.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct ProcessSpec {
@ -542,6 +550,37 @@ pub struct ServicePublicationSpec {
pub load_balancer: Option<LoadBalancerPublicationSpec>,
}
/// Readiness condition required from another service before this service can reconcile.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum ServiceDependencyCondition {
    /// Require a minimum number of healthy dependency instances.
    /// The threshold is taken from `ServiceDependencySpec::min_ready`.
    #[default]
    Healthy,
    /// Require the dependency to publish at least one DNS or load-balancer artifact.
    Published,
}
/// Dependency edge between scheduler-managed services.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ServiceDependencySpec {
    /// Name of the service this one depends on.
    pub service: String,
    /// Readiness condition the dependency must satisfy (defaults to `Healthy`).
    #[serde(default = "default_service_dependency_condition")]
    pub condition: ServiceDependencyCondition,
    /// Minimum healthy instances required for the `Healthy` condition (defaults to 1).
    #[serde(default = "default_service_dependency_min_ready")]
    pub min_ready: u32,
}
impl Default for ServiceDependencySpec {
fn default() -> Self {
Self {
service: String::new(),
condition: default_service_dependency_condition(),
min_ready: default_service_dependency_min_ready(),
}
}
}
/// Cluster node record stored under photoncloud/clusters/{cluster_id}/nodes/{node_id}.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ClusterNodeRecord {
@ -849,6 +888,8 @@ pub struct ServiceSpec {
#[serde(default)]
pub mesh_mode: Option<String>,
#[serde(default)]
pub depends_on: Vec<ServiceDependencySpec>,
#[serde(default)]
pub schedule: Option<ServiceScheduleSpec>,
#[serde(default)]
pub publish: Option<ServicePublicationSpec>,
@ -915,6 +956,30 @@ pub struct ServicePublicationState {
pub observed_at: Option<DateTime<Utc>>,
}
/// Scheduler-observed readiness and dependency state for a logical service.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct ServiceStatusRecord {
    /// Logical service name this status describes.
    pub service: String,
    /// Coarse lifecycle phase computed by the scheduler
    /// (e.g. "unmanaged", "blocked", "unschedulable", "idle", "healthy",
    /// "degraded", "pending").
    #[serde(default)]
    pub phase: String,
    /// Instance count the schedule currently asks for.
    #[serde(default)]
    pub desired_instances: u32,
    /// Instance records currently stored for the service.
    #[serde(default)]
    pub scheduled_instances: u32,
    /// Instances considered available (healthy state and fresh heartbeat).
    #[serde(default)]
    pub healthy_instances: u32,
    /// True when the service has a usable publication artifact.
    #[serde(default)]
    pub publish_ready: bool,
    /// True when every `depends_on` edge is satisfied.
    #[serde(default)]
    pub dependencies_ready: bool,
    /// Human-readable reasons the service is blocked, if any.
    #[serde(default)]
    pub blockers: Vec<String>,
    /// Optional human-readable explanation of the phase.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub message: Option<String>,
    /// When the scheduler produced this status.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub observed_at: Option<DateTime<Utc>>,
}
/// mTLS policy definition.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct MtlsPolicySpec {

View file

@ -6,7 +6,8 @@ use chainfire_client::Client;
use chrono::Utc;
use clap::Parser;
use deployer_types::{
ClusterNodeRecord, PlacementPolicy, ServiceInstanceSpec, ServiceScheduleMode, ServiceSpec,
ClusterNodeRecord, PlacementPolicy, ServiceDependencyCondition, ServiceInstanceSpec,
ServicePublicationState, ServiceScheduleMode, ServiceSpec, ServiceStatusRecord,
};
use publish::{PublicationConfig, PublicationReconciler};
use serde_json::Value;
@ -90,6 +91,12 @@ struct ReconcilePlan {
deferred_deletes: usize,
}
/// Aggregated result of evaluating a service's `depends_on` edges.
#[derive(Debug, Default)]
struct DependencySummary {
    /// True exactly when `blockers` is empty.
    dependencies_ready: bool,
    /// One human-readable reason per failed dependency check.
    blockers: Vec<String>,
}
impl Scheduler {
fn new(cli: Cli) -> Self {
let cluster_namespace = cli.cluster_namespace;
@ -135,6 +142,18 @@ impl Scheduler {
let mut client = Client::connect(self.endpoint.clone()).await?;
let nodes = self.load_cluster_nodes(&mut client).await?;
let services = self.load_services(&mut client).await?;
let instances_before = self.load_instances_by_service(&mut client).await?;
let publications_before = publish::load_publication_states(
&mut client,
&self.cluster_namespace,
&self.cluster_id,
)
.await?;
let dependency_cycles = dependency_cycle_services(&services);
let services_by_name = services
.iter()
.map(|service| (service.name.as_str(), service))
.collect::<HashMap<_, _>>();
debug!(
nodes = nodes.len(),
@ -146,6 +165,22 @@ impl Scheduler {
if service.schedule.is_none() {
continue;
}
let dependencies = dependency_summary(
service,
&services_by_name,
&instances_before,
&publications_before,
&dependency_cycles,
self.heartbeat_timeout_secs,
);
if !dependencies.dependencies_ready {
warn!(
service = %service.name,
blockers = ?dependencies.blockers,
"skipping service reconciliation until dependencies are ready"
);
continue;
}
self.reconcile_service(&mut client, &nodes, service).await?;
}
@ -159,6 +194,25 @@ impl Scheduler {
)
.await?;
let instances_after = self.load_instances_by_service(&mut client).await?;
let publications_after = publish::load_publication_states(
&mut client,
&self.cluster_namespace,
&self.cluster_id,
)
.await?;
self.persist_service_statuses(
&mut client,
&nodes,
&services,
&instances_after,
&publications_after,
&dependency_cycles,
)
.await?;
self.cleanup_stale_service_statuses(&mut client, &services)
.await?;
Ok(())
}
@ -200,6 +254,112 @@ impl Scheduler {
Ok(services)
}
/// Loads every service instance record stored for this cluster and groups
/// them by service name.
///
/// Records that fail to decode as `ServiceInstanceSpec` are logged and
/// skipped rather than aborting the reconcile pass. Each service's list is
/// sorted by `instance_id` so downstream consumers see a deterministic order.
async fn load_instances_by_service(
    &self,
    client: &mut Client,
) -> Result<HashMap<String, Vec<ServiceInstanceSpec>>> {
    // All instance records live under .../clusters/{cluster_id}/instances/.
    let prefix = format!(
        "{}/clusters/{}/instances/",
        self.cluster_namespace, self.cluster_id
    );
    let kvs = client.get_prefix(prefix.as_bytes()).await?;
    let mut instances = HashMap::<String, Vec<ServiceInstanceSpec>>::new();
    for (_key, value) in kvs {
        match serde_json::from_slice::<ServiceInstanceSpec>(&value) {
            Ok(instance) => {
                instances
                    .entry(instance.service.clone())
                    .or_default()
                    .push(instance);
            }
            // Best-effort: one corrupt record must not block scheduling.
            Err(error) => warn!(error = %error, "failed to decode service instance"),
        }
    }
    for service_instances in instances.values_mut() {
        service_instances.sort_by(|lhs, rhs| lhs.instance_id.cmp(&rhs.instance_id));
    }
    Ok(instances)
}
/// Computes and stores a `ServiceStatusRecord` for every declared service.
///
/// In dry-run mode the would-be status is only logged; otherwise it is
/// written under the cluster's service-status prefix, overwriting any
/// previous record for the same service.
async fn persist_service_statuses(
    &self,
    client: &mut Client,
    nodes: &[ClusterNodeRecord],
    services: &[ServiceSpec],
    instances_by_service: &HashMap<String, Vec<ServiceInstanceSpec>>,
    publications: &HashMap<String, ServicePublicationState>,
    dependency_cycles: &HashSet<String>,
) -> Result<()> {
    // Name -> spec lookup used when evaluating dependency edges.
    let services_by_name = services
        .iter()
        .map(|service| (service.name.as_str(), service))
        .collect::<HashMap<_, _>>();
    for service in services {
        let status = build_service_status(
            service,
            nodes,
            &services_by_name,
            instances_by_service,
            publications,
            dependency_cycles,
            self.heartbeat_timeout_secs,
        );
        let key = service_status_key(&self.cluster_namespace, &self.cluster_id, &service.name);
        if self.dry_run {
            info!(
                service = %service.name,
                phase = %status.phase,
                healthy_instances = status.healthy_instances,
                desired_instances = status.desired_instances,
                blockers = ?status.blockers,
                "would update service status"
            );
        } else {
            client.put(&key, serde_json::to_vec(&status)?).await?;
        }
    }
    Ok(())
}
/// Deletes status records for services that are no longer declared.
///
/// Every key under the status prefix whose trailing segment is not a
/// currently declared service name is removed (or only logged in dry-run
/// mode). Keys with an empty name or extra path segments are left alone.
async fn cleanup_stale_service_statuses(
    &self,
    client: &mut Client,
    services: &[ServiceSpec],
) -> Result<()> {
    // Names we still want status records for.
    let desired = services
        .iter()
        .map(|service| service.name.as_str())
        .collect::<HashSet<_>>();
    let prefix = service_status_prefix(&self.cluster_namespace, &self.cluster_id);
    let existing = client.get_prefix(prefix.as_bytes()).await?;
    for (key, _) in existing {
        let key_str = String::from_utf8_lossy(&key);
        let Some(service_name) = key_str.strip_prefix(&prefix) else {
            continue;
        };
        // Skip malformed keys (empty / nested) and statuses still in use.
        if service_name.is_empty()
            || service_name.contains('/')
            || desired.contains(service_name)
        {
            continue;
        }
        if self.dry_run {
            info!(service = %service_name, "would delete stale service status");
        } else if client.delete(&key).await? {
            info!(service = %service_name, "deleted stale service status");
        }
    }
    Ok(())
}
async fn reconcile_service(
&self,
client: &mut Client,
@ -222,7 +382,8 @@ impl Scheduler {
let existing_instances = decode_managed_instances(&existing);
let desired_instances =
build_desired_instances(service, &eligible_nodes, &existing_instances)?;
let target_instances = schedule_target_count(schedule.mode, eligible_nodes.len(), schedule.replicas);
let target_instances =
schedule_target_count(schedule.mode, eligible_nodes.len(), schedule.replicas);
if desired_instances.len() < target_instances {
warn!(
service = %service.name,
@ -341,6 +502,284 @@ impl Scheduler {
}
}
/// Storage key prefix for all per-service status records of one cluster.
fn service_status_prefix(cluster_namespace: &str, cluster_id: &str) -> String {
    format!("{cluster_namespace}/clusters/{cluster_id}/service-statuses/")
}
/// Full storage key (as bytes) for a single service's status record.
fn service_status_key(cluster_namespace: &str, cluster_id: &str, service: &str) -> Vec<u8> {
    let mut key = service_status_prefix(cluster_namespace, cluster_id);
    key.push_str(service);
    key.into_bytes()
}
/// Returns the names of all services that participate in a dependency cycle.
///
/// Edges pointing at undeclared services are ignored: they can never close
/// a cycle. A service that merely depends on a cycle member (without being
/// part of the cycle itself) is not included.
fn dependency_cycle_services(services: &[ServiceSpec]) -> HashSet<String> {
    // Names of declared services; only edges between declared services count.
    let mut service_names = HashSet::new();
    for service in services {
        service_names.insert(service.name.clone());
    }
    // Adjacency list restricted to declared services.
    let mut dependencies: HashMap<String, Vec<String>> = HashMap::new();
    for service in services {
        let mut edges = Vec::new();
        for dependency in &service.depends_on {
            if service_names.contains(&dependency.service) {
                edges.push(dependency.service.clone());
            }
        }
        dependencies.insert(service.name.clone(), edges);
    }
    // Depth-first search from every service; back-edges mark cycle members.
    let mut permanent = HashSet::new();
    let mut visiting = Vec::<String>::new();
    let mut cycles = HashSet::new();
    for service in services {
        visit_dependency_cycles(
            &service.name,
            &dependencies,
            &mut permanent,
            &mut visiting,
            &mut cycles,
        );
    }
    cycles
}
/// Depth-first step of cycle detection over the dependency graph.
///
/// `visiting` is the active DFS stack; finding `service` already on it means
/// a back-edge, and every node from that stack position onward belongs to
/// the cycle. `permanent` holds fully explored nodes so each node's subtree
/// is traversed at most once.
fn visit_dependency_cycles(
    service: &str,
    dependencies: &HashMap<String, Vec<String>>,
    permanent: &mut HashSet<String>,
    visiting: &mut Vec<String>,
    cycles: &mut HashSet<String>,
) {
    // Fully explored already: nothing new on this path.
    if permanent.contains(service) {
        return;
    }
    // Back-edge into the active stack: everything from the hit onward is cyclic.
    if let Some(start) = visiting.iter().position(|name| name == service) {
        for name in &visiting[start..] {
            cycles.insert(name.clone());
        }
        return;
    }
    visiting.push(service.to_owned());
    if let Some(edges) = dependencies.get(service) {
        for edge in edges {
            visit_dependency_cycles(edge, dependencies, permanent, visiting, cycles);
        }
    }
    visiting.pop();
    permanent.insert(service.to_owned());
}
/// Evaluates every `depends_on` edge of `service` and collects blockers.
///
/// Checks performed:
/// - the service itself being part of a dependency cycle;
/// - a dependency pointing at the service itself;
/// - a dependency on a service that is not declared;
/// - a dependency that is part of a dependency cycle;
/// - `Healthy`: the dependency must have at least `max(min_ready, 1)`
///   available instances (healthy state and a fresh heartbeat);
/// - `Published`: the dependency must have a ready publication state
///   (`min_ready` is not consulted for this condition).
///
/// `dependencies_ready` is true exactly when no blockers were recorded.
fn dependency_summary(
    service: &ServiceSpec,
    services_by_name: &HashMap<&str, &ServiceSpec>,
    instances_by_service: &HashMap<String, Vec<ServiceInstanceSpec>>,
    publications: &HashMap<String, ServicePublicationState>,
    dependency_cycles: &HashSet<String>,
    heartbeat_timeout_secs: u64,
) -> DependencySummary {
    let mut blockers = Vec::new();
    // A service inside a cycle can never have its dependencies satisfied.
    if dependency_cycles.contains(&service.name) {
        blockers.push("cyclic dependency graph".to_string());
    }
    for dependency in &service.depends_on {
        if dependency.service == service.name {
            blockers.push(format!(
                "dependency {} points to itself",
                dependency.service
            ));
            continue;
        }
        if !services_by_name.contains_key(dependency.service.as_str()) {
            blockers.push(format!("dependency {} is not defined", dependency.service));
            continue;
        }
        if dependency_cycles.contains(&dependency.service) {
            blockers.push(format!(
                "dependency {} is part of a dependency cycle",
                dependency.service
            ));
            continue;
        }
        match dependency.condition {
            ServiceDependencyCondition::Healthy => {
                // Count instances that are both healthy and heartbeating.
                let ready = instances_by_service
                    .get(&dependency.service)
                    .map(|instances| {
                        instances
                            .iter()
                            .filter(|instance| {
                                instance_is_available(instance, heartbeat_timeout_secs)
                            })
                            .count() as u32
                    })
                    .unwrap_or(0);
                // min_ready of 0 is treated as 1: "healthy" always needs at
                // least one available instance.
                let min_ready = dependency.min_ready.max(1);
                if ready < min_ready {
                    blockers.push(format!(
                        "dependency {} has {ready}/{min_ready} healthy instance(s)",
                        dependency.service
                    ));
                }
            }
            ServiceDependencyCondition::Published => {
                let ready = publications
                    .get(&dependency.service)
                    .map(publication_ready)
                    .unwrap_or(false);
                if !ready {
                    blockers.push(format!(
                        "dependency {} is not published",
                        dependency.service
                    ));
                }
            }
        }
    }
    DependencySummary {
        dependencies_ready: blockers.is_empty(),
        blockers,
    }
}
/// True once a publication state carries at least one usable artifact:
/// a DNS record with non-empty values, or a load balancer with an assigned VIP.
fn publication_ready(state: &ServicePublicationState) -> bool {
    let dns_ready = matches!(state.dns.as_ref(), Some(dns) if !dns.values.is_empty());
    let vip_ready = matches!(
        state.load_balancer.as_ref(),
        Some(load_balancer) if load_balancer.vip_address.is_some()
    );
    dns_ready || vip_ready
}
/// Number of instances the schedule currently asks for.
///
/// Unscheduled services want zero; replicated services want their configured
/// replica count; daemon services want one instance per eligible node.
fn desired_instance_count(
    service: &ServiceSpec,
    nodes: &[ClusterNodeRecord],
    heartbeat_timeout_secs: u64,
) -> u32 {
    match service.schedule.as_ref() {
        None => 0,
        Some(schedule) => match schedule.mode {
            ServiceScheduleMode::Replicated => schedule.replicas,
            ServiceScheduleMode::Daemon => {
                eligible_nodes(nodes, &schedule.placement, heartbeat_timeout_secs).len() as u32
            }
        },
    }
}
/// Builds the status record the scheduler persists for one service.
///
/// Phase decision (first match wins):
/// - `unmanaged`: no `schedule` block, the scheduler does not own it;
/// - `blocked`: the dependency summary reported blockers;
/// - `unschedulable`: no node matches the placement policy;
/// - `idle`: zero desired and zero scheduled instances;
/// - `healthy`: healthy instances meet or exceed the desired count;
/// - `degraded`: some instances exist but fewer healthy than desired;
/// - `pending`: instances are expected but none scheduled yet.
fn build_service_status(
    service: &ServiceSpec,
    nodes: &[ClusterNodeRecord],
    services_by_name: &HashMap<&str, &ServiceSpec>,
    instances_by_service: &HashMap<String, Vec<ServiceInstanceSpec>>,
    publications: &HashMap<String, ServicePublicationState>,
    dependency_cycles: &HashSet<String>,
    heartbeat_timeout_secs: u64,
) -> ServiceStatusRecord {
    let instances = instances_by_service
        .get(&service.name)
        .map(Vec::as_slice)
        .unwrap_or(&[]);
    // Instances with healthy state and a fresh heartbeat.
    let healthy_instances = instances
        .iter()
        .filter(|instance| instance_is_available(instance, heartbeat_timeout_secs))
        .count() as u32;
    let scheduled_instances = instances.len() as u32;
    let desired_instances = desired_instance_count(service, nodes, heartbeat_timeout_secs);
    let publish_ready = publications
        .get(&service.name)
        .map(publication_ready)
        .unwrap_or(false);
    let dependencies = dependency_summary(
        service,
        services_by_name,
        instances_by_service,
        publications,
        dependency_cycles,
        heartbeat_timeout_secs,
    );
    // Nodes that satisfy the placement policy right now (0 if unscheduled).
    let eligible_node_count = service
        .schedule
        .as_ref()
        .map(|schedule| eligible_nodes(nodes, &schedule.placement, heartbeat_timeout_secs).len())
        .unwrap_or(0);
    let (phase, message) = if service.schedule.is_none() {
        (
            "unmanaged".to_string(),
            Some("service is not managed by fleet-scheduler".to_string()),
        )
    } else if !dependencies.dependencies_ready {
        (
            "blocked".to_string(),
            Some(dependencies.blockers.join("; ")),
        )
    } else if eligible_node_count == 0 {
        (
            "unschedulable".to_string(),
            Some("no eligible nodes match the service placement policy".to_string()),
        )
    } else if desired_instances == 0 && scheduled_instances == 0 {
        (
            "idle".to_string(),
            Some("service has no desired instances".to_string()),
        )
    } else if desired_instances > 0 && healthy_instances >= desired_instances {
        (
            "healthy".to_string(),
            Some(format!(
                "healthy instances satisfy desired count ({healthy_instances}/{desired_instances})"
            )),
        )
    } else if scheduled_instances > 0 || healthy_instances > 0 {
        (
            "degraded".to_string(),
            Some(format!(
                "healthy={healthy_instances} scheduled={scheduled_instances} desired={desired_instances}"
            )),
        )
    } else {
        (
            "pending".to_string(),
            Some(format!(
                "waiting for instances to reach desired count ({healthy_instances}/{desired_instances})"
            )),
        )
    };
    ServiceStatusRecord {
        service: service.name.clone(),
        phase,
        desired_instances,
        scheduled_instances,
        healthy_instances,
        publish_ready,
        dependencies_ready: dependencies.dependencies_ready,
        blockers: dependencies.blockers,
        message,
        observed_at: Some(Utc::now()),
    }
}
fn eligible_nodes<'a>(
nodes: &'a [ClusterNodeRecord],
placement: &PlacementPolicy,
@ -853,7 +1292,10 @@ fn instance_is_available(instance: &ServiceInstanceSpec, heartbeat_timeout_secs:
&& instance_has_fresh_heartbeat(instance, heartbeat_timeout_secs)
}
fn instance_has_fresh_heartbeat(instance: &ServiceInstanceSpec, heartbeat_timeout_secs: u64) -> bool {
fn instance_has_fresh_heartbeat(
instance: &ServiceInstanceSpec,
heartbeat_timeout_secs: u64,
) -> bool {
if heartbeat_timeout_secs == 0 {
return true;
}
@ -961,8 +1403,9 @@ mod tests {
use super::*;
use chrono::Duration as ChronoDuration;
use deployer_types::{
ClusterNodeRecord, HealthCheckSpec, PlacementPolicy, ProcessSpec, RolloutStrategySpec,
ServicePorts, ServiceScheduleMode, ServiceScheduleSpec,
ClusterNodeRecord, HealthCheckSpec, PlacementPolicy, ProcessSpec,
PublishedLoadBalancerState, RolloutStrategySpec, ServiceDependencyCondition,
ServiceDependencySpec, ServicePorts, ServiceScheduleMode, ServiceScheduleSpec,
};
fn active_node(node_id: &str, roles: &[&str], labels: &[(&str, &str)]) -> ClusterNodeRecord {
@ -1003,6 +1446,7 @@ mod tests {
protocol: Some("http".to_string()),
mtls_required: None,
mesh_mode: None,
depends_on: Vec::new(),
schedule: Some(ServiceScheduleSpec {
mode: ServiceScheduleMode::Replicated,
replicas: 2,
@ -1036,6 +1480,31 @@ mod tests {
}
}
fn service_named(name: &str) -> ServiceSpec {
let mut service = scheduled_service();
service.name = name.to_string();
service
}
/// Test fixture: a scheduler-managed instance of `service` on `node_id`
/// reporting a healthy state with a fresh heartbeat.
fn healthy_instance(service: &str, node_id: &str) -> ServiceInstanceSpec {
    ServiceInstanceSpec {
        instance_id: format!("{service}-{node_id}"),
        service: service.to_string(),
        node_id: node_id.to_string(),
        ip: "10.0.0.10".to_string(),
        port: 8080,
        mesh_port: Some(18080),
        version: None,
        health_check: None,
        process: None,
        container: None,
        managed_by: Some(MANAGED_BY.to_string()),
        state: Some("healthy".to_string()),
        // Current timestamp keeps instance_is_available treating it as live.
        last_heartbeat: Some(Utc::now()),
        observed_at: None,
    }
}
#[test]
fn test_node_eligibility_matches_roles_and_labels() {
let node = active_node("node01", &["worker"], &[("tier", "general")]);
@ -1083,6 +1552,126 @@ mod tests {
assert_eq!(desired[0].process.as_ref().unwrap().command, "/usr/bin/api");
}
#[test]
fn test_dependency_cycles_detect_all_services_in_cycle() {
    // api -> worker -> edge -> api forms a three-node cycle; every member
    // must be reported, not only the node where the back-edge is discovered.
    let mut api = service_named("api");
    api.depends_on = vec![ServiceDependencySpec {
        service: "worker".to_string(),
        condition: ServiceDependencyCondition::Healthy,
        min_ready: 1,
    }];
    let mut worker = service_named("worker");
    worker.depends_on = vec![ServiceDependencySpec {
        service: "edge".to_string(),
        condition: ServiceDependencyCondition::Healthy,
        min_ready: 1,
    }];
    let mut edge = service_named("edge");
    edge.depends_on = vec![ServiceDependencySpec {
        service: "api".to_string(),
        condition: ServiceDependencyCondition::Healthy,
        min_ready: 1,
    }];
    let cycles = dependency_cycle_services(&[api, worker, edge]);
    assert_eq!(
        cycles,
        HashSet::from(["api".to_string(), "worker".to_string(), "edge".to_string(),])
    );
}
#[test]
fn test_dependency_summary_blocks_until_dependency_is_healthy() {
    // worker requires two healthy api instances; only one of the two api
    // instances is healthy (the other is "starting"), so the summary must
    // block and report the 1/2 shortfall.
    let api = service_named("api");
    let mut worker = service_named("worker");
    worker.depends_on = vec![ServiceDependencySpec {
        service: "api".to_string(),
        condition: ServiceDependencyCondition::Healthy,
        min_ready: 2,
    }];
    let services = vec![api.clone(), worker.clone()];
    let services_by_name = services
        .iter()
        .map(|service| (service.name.as_str(), service))
        .collect::<HashMap<_, _>>();
    let instances_by_service = HashMap::from([(
        "api".to_string(),
        vec![
            healthy_instance("api", "node01"),
            // Not yet healthy: must not count toward min_ready.
            ServiceInstanceSpec {
                state: Some("starting".to_string()),
                ..healthy_instance("api", "node02")
            },
        ],
    )]);
    let summary = dependency_summary(
        &worker,
        &services_by_name,
        &instances_by_service,
        &HashMap::new(),
        &HashSet::new(),
        300,
    );
    assert!(!summary.dependencies_ready);
    assert_eq!(
        summary.blockers,
        vec!["dependency api has 1/2 healthy instance(s)".to_string()]
    );
}
#[test]
fn test_build_service_status_recognizes_published_dependency() {
    // worker depends on api being *published*; a publication with a VIP
    // satisfies the edge even with zero api instances. With no worker
    // instances yet the phase must be "pending", not "blocked".
    let api = service_named("api");
    let mut worker = service_named("worker");
    worker.depends_on = vec![ServiceDependencySpec {
        service: "api".to_string(),
        condition: ServiceDependencyCondition::Published,
        min_ready: 1,
    }];
    let services = vec![api.clone(), worker.clone()];
    let services_by_name = services
        .iter()
        .map(|service| (service.name.as_str(), service))
        .collect::<HashMap<_, _>>();
    let publications = HashMap::from([(
        "api".to_string(),
        ServicePublicationState {
            service: "api".to_string(),
            org_id: "default-org".to_string(),
            project_id: "default-project".to_string(),
            // An assigned VIP makes the publication "ready".
            load_balancer: Some(PublishedLoadBalancerState {
                id: "lb-1".to_string(),
                pool_id: "pool-1".to_string(),
                listener_id: "listener-1".to_string(),
                vip_address: Some("203.0.113.10".to_string()),
            }),
            dns: None,
            observed_at: Some(Utc::now()),
        },
    )]);
    let nodes = vec![active_node("node01", &["worker"], &[("tier", "general")])];
    let status = build_service_status(
        &worker,
        &nodes,
        &services_by_name,
        &HashMap::new(),
        &publications,
        &HashSet::new(),
        300,
    );
    assert_eq!(status.phase, "pending");
    assert!(status.dependencies_ready);
    assert!(status.blockers.is_empty());
}
#[test]
fn test_build_desired_instances_honors_max_instances_per_node() {
let nodes = vec![active_node("node01", &["worker"], &[("tier", "general")])];

View file

@ -1129,7 +1129,7 @@ fn publication_key(cluster_namespace: &str, cluster_id: &str, service: &str) ->
.into_bytes()
}
async fn load_publication_states(
pub(crate) async fn load_publication_states(
client: &mut Client,
cluster_namespace: &str,
cluster_id: &str,
@ -1194,6 +1194,7 @@ mod tests {
protocol: Some("http".to_string()),
mtls_required: None,
mesh_mode: None,
depends_on: Vec::new(),
schedule: Some(ServiceScheduleSpec::default()),
publish: Some(ServicePublicationSpec {
org_id: Some("default-org".to_string()),

View file

@ -212,6 +212,41 @@ services:
path: /
interval_secs: 1
timeout_secs: 2
- name: worker
ports:
http: 18081
protocol: http
depends_on:
- service: api
condition: healthy
min_ready: 2
schedule:
replicas: 2
placement:
roles:
- worker
pools:
- general
node_classes:
- worker-linux
match_labels:
tier: general
spread_by_label: failure_domain
max_instances_per_node: 1
instance_port: 18081
process:
command: python3
args:
- -m
- http.server
- ${INSTANCE_PORT}
- --bind
- ${INSTANCE_IP}
health_check:
type: http
path: /
interval_secs: 1
timeout_secs: 2
EOF
cat >"$tmp_dir/cluster-scaled.yaml" <<'EOF'
@ -283,6 +318,41 @@ services:
path: /
interval_secs: 1
timeout_secs: 2
- name: worker
ports:
http: 18081
protocol: http
depends_on:
- service: api
condition: healthy
min_ready: 1
schedule:
replicas: 1
placement:
roles:
- worker
pools:
- general
node_classes:
- worker-linux
match_labels:
tier: general
spread_by_label: failure_domain
max_instances_per_node: 1
instance_port: 18081
process:
command: python3
args:
- -m
- http.server
- ${INSTANCE_PORT}
- --bind
- ${INSTANCE_IP}
health_check:
type: http
path: /
interval_secs: 1
timeout_secs: 2
EOF
endpoint="http://127.0.0.1:${api_port}"
@ -326,7 +396,59 @@ run_node_agent_once node02
echo "Scheduling managed instances"
run_scheduler_once
echo "Reconciling processes and health"
echo "Validating dependency block before api is healthy"
# The worker service depends on api (min_ready=2); before api is healthy
# the scheduler must not have created any worker instances.
run_deployer_ctl dump --prefix "photoncloud/clusters/test-cluster/instances/worker/" >"$tmp_dir/worker-blocked.dump"
python3 - "$tmp_dir/worker-blocked.dump" <<'PY'
import sys
# Path to the key/value dump produced by deployer-ctl above.
path = sys.argv[1]
# Every non-blank line in the dump is one stored worker instance record.
lines = [line.strip() for line in open(path, "r", encoding="utf-8") if line.strip()]
if lines:
    raise SystemExit(f"expected no worker instances before api is healthy, found {len(lines)}")
print("worker instances correctly blocked before dependency becomes healthy")
PY
# The scheduler must also have persisted a status record explaining why the
# worker service is blocked.
run_deployer_ctl dump --prefix "photoncloud/clusters/test-cluster/service-statuses/worker" >"$tmp_dir/worker-status-blocked.dump"
python3 - "$tmp_dir/worker-status-blocked.dump" <<'PY'
import json
import sys
path = sys.argv[1]
statuses = []
with open(path, "r", encoding="utf-8") as handle:
    for line in handle:
        line = line.strip()
        if not line:
            continue
        # Dump lines look like "<key> value=<json>"; keep only the JSON payload.
        marker = " value="
        if marker not in line:
            continue
        statuses.append(json.loads(line.split(marker, 1)[1]))
if len(statuses) != 1:
    raise SystemExit(f"expected exactly one worker service status, found {len(statuses)}")
status = statuses[0]
# Dependency on api (min_ready=2) is unmet: phase must be "blocked" and the
# first blocker must name the unhealthy dependency.
if status.get("phase") != "blocked":
    raise SystemExit(f"expected worker phase=blocked, found {status.get('phase')}")
blockers = status.get("blockers") or []
if not blockers or "dependency api has 0/2 healthy instance(s)" not in blockers[0]:
    raise SystemExit(f"unexpected blockers: {blockers}")
print("worker service status reports dependency block")
PY
echo "Reconciling processes and health for api"
for _ in 1 2 3; do
run_node_agent_once node01
run_node_agent_once node02
sleep 1
done
echo "Re-running scheduler after api became healthy"
run_scheduler_once
echo "Reconciling processes and health for dependent worker service"
for _ in 1 2 3; do
run_node_agent_once node01
run_node_agent_once node02
@ -337,7 +459,12 @@ echo "Validating HTTP endpoints"
python3 - <<'PY'
import urllib.request
for address in ("http://127.0.0.2:18080/", "http://127.0.0.3:18080/"):
for address in (
"http://127.0.0.2:18080/",
"http://127.0.0.3:18080/",
"http://127.0.0.2:18081/",
"http://127.0.0.3:18081/",
):
with urllib.request.urlopen(address, timeout=5) as response:
body = response.read().decode("utf-8")
if response.status != 200:
@ -381,13 +508,56 @@ if states != ["healthy", "healthy"]:
print("Observed two healthy scheduled instances across node01 and node02")
PY
# After api became healthy and the scheduler re-ran, both worker replicas
# must be scheduled (one per node) and report healthy.
run_deployer_ctl dump --prefix "photoncloud/clusters/test-cluster/instances/worker/" >"$tmp_dir/worker-instances.dump"
python3 - "$tmp_dir/worker-instances.dump" <<'PY'
import json
import sys
path = sys.argv[1]
instances = []
with open(path, "r", encoding="utf-8") as handle:
    for line in handle:
        line = line.strip()
        if not line:
            continue
        # Dump lines look like "<key> value=<json>"; keep only the JSON payload.
        marker = " value="
        if marker not in line:
            continue
        value = line.split(marker, 1)[1]
        instances.append(json.loads(value))
if len(instances) != 2:
    raise SystemExit(f"expected 2 worker instances, found {len(instances)}")
# Spread-by-label placement should put one replica on each node.
node_ids = sorted(instance["node_id"] for instance in instances)
states = sorted(instance.get("state") for instance in instances)
if node_ids != ["node01", "node02"]:
    raise SystemExit(f"unexpected worker placement: {node_ids}")
if states != ["healthy", "healthy"]:
    raise SystemExit(f"unexpected worker states: {states}")
print("Observed two healthy dependent worker instances across node01 and node02")
PY
echo "Applying scaled declaration"
run_deployer_ctl apply --config "$tmp_dir/cluster-scaled.yaml" --prune
echo "Re-running scheduler after scale-down"
run_scheduler_once
echo "Reconciling processes and health after scale-down"
echo "Reconciling api after scale-down"
for _ in 1 2 3; do
run_node_agent_once node01
run_node_agent_once node02
sleep 1
done
echo "Re-running scheduler after scaled api became healthy"
run_scheduler_once
echo "Reconciling dependent worker service after scale-down"
for _ in 1 2 3; do
run_node_agent_once node01
run_node_agent_once node02
@ -426,6 +596,37 @@ if instance.get("state") != "healthy":
print("Observed one healthy scheduled instance on node01 after scale-down")
PY
# After the scaled declaration (replicas: 1) was applied, exactly one worker
# instance should remain, on node01, and it should still be healthy.
run_deployer_ctl dump --prefix "photoncloud/clusters/test-cluster/instances/worker/" >"$tmp_dir/worker-instances-scaled.dump"
python3 - "$tmp_dir/worker-instances-scaled.dump" <<'PY'
import json
import sys
path = sys.argv[1]
instances = []
with open(path, "r", encoding="utf-8") as handle:
    for line in handle:
        line = line.strip()
        if not line:
            continue
        # Dump lines look like "<key> value=<json>"; keep only the JSON payload.
        marker = " value="
        if marker not in line:
            continue
        value = line.split(marker, 1)[1]
        instances.append(json.loads(value))
if len(instances) != 1:
    raise SystemExit(f"expected 1 worker instance after scale-down, found {len(instances)}")
instance = instances[0]
if instance["node_id"] != "node01":
    raise SystemExit(f"expected remaining worker instance on node01, found {instance['node_id']}")
if instance.get("state") != "healthy":
    raise SystemExit(f"expected remaining worker instance to be healthy, found {instance.get('state')}")
print("Observed one healthy dependent worker instance on node01 after scale-down")
PY
echo "Validating endpoint convergence after scale-down"
python3 - <<'PY'
import socket
@ -434,15 +635,19 @@ import urllib.request
with urllib.request.urlopen("http://127.0.0.2:18080/", timeout=5) as response:
if response.status != 200:
raise SystemExit(f"node01 endpoint returned {response.status}")
with urllib.request.urlopen("http://127.0.0.2:18081/", timeout=5) as response:
if response.status != 200:
raise SystemExit(f"node01 worker endpoint returned {response.status}")
for port, label in ((18080, "api"), (18081, "worker")):
sock = socket.socket()
sock.settimeout(1.5)
try:
sock.connect(("127.0.0.3", 18080))
sock.connect(("127.0.0.3", port))
except OSError:
pass
else:
raise SystemExit("node02 endpoint still accepts connections after scale-down")
raise SystemExit(f"node02 {label} endpoint still accepts connections after scale-down")
finally:
sock.close()