From 0a5c823134f8159390b08aa7c69121fd410362d8 Mon Sep 17 00:00:00 2001 From: centra Date: Wed, 1 Apr 2026 22:36:14 +0900 Subject: [PATCH] Add service inspection to deployer ctl --- deployer/crates/deployer-ctl/src/chainfire.rs | 125 +++++++++++++++++- deployer/crates/deployer-ctl/src/main.rs | 55 +++++++- .../scripts/verify-fleet-scheduler-e2e.sh | 36 +++++ 3 files changed, 209 insertions(+), 7 deletions(-) diff --git a/deployer/crates/deployer-ctl/src/chainfire.rs b/deployer/crates/deployer-ctl/src/chainfire.rs index fdc5878..19ea1f3 100644 --- a/deployer/crates/deployer-ctl/src/chainfire.rs +++ b/deployer/crates/deployer-ctl/src/chainfire.rs @@ -8,7 +8,7 @@ use chrono::Utc; use deployer_types::{ ClusterNodeRecord, ClusterStateSpec, CommissionState, DesiredSystemSpec, HostDeploymentSpec, HostDeploymentStatus, InstallPlan, InstallState, NodeConfig, NodeSpec, ObservedSystemState, - PowerState, + PowerState, ServiceInstanceSpec, ServicePublicationState, ServiceSpec, ServiceStatusRecord, }; use serde::de::DeserializeOwned; use serde_json::{json, Value}; @@ -140,6 +140,24 @@ fn key_service(cluster_namespace: &str, cluster_id: &str, svc: &str) -> Vec .into_bytes() } +fn key_service_status(cluster_namespace: &str, cluster_id: &str, service: &str) -> Vec { + format!( + "{}service-statuses/{}", + cluster_prefix(cluster_namespace, cluster_id), + service + ) + .into_bytes() +} + +fn key_publication(cluster_namespace: &str, cluster_id: &str, service: &str) -> Vec { + format!( + "{}publications/{}", + cluster_prefix(cluster_namespace, cluster_id), + service + ) + .into_bytes() +} + fn key_instance(cluster_namespace: &str, cluster_id: &str, svc: &str, inst: &str) -> Vec { format!( "{}instances/{}/{}", @@ -1077,6 +1095,111 @@ pub async fn inspect_host_deployment( .await } +pub async fn inspect_service( + endpoint: &str, + cluster_namespace: &str, + cluster_id: &str, + service_name: &str, + include_instances: bool, + json_output: bool, +) -> Result<()> { + let endpoints = chainfire_endpoints(endpoint); + with_chainfire_endpoint_failover(&endpoints, "inspect service", |endpoint| { + let endpoint = endpoint.to_string(); + let cluster_namespace = cluster_namespace.to_string(); + let cluster_id = cluster_id.to_string(); + let service_name = service_name.to_string(); + async move { + let mut client = Client::connect(endpoint).await?; + let spec = get_json_key::( + &mut client, + &key_service(&cluster_namespace, &cluster_id, &service_name), + ) + .await? + .with_context(|| format!("service {} not found", service_name))?; + let status = get_json_key::( + &mut client, + &key_service_status(&cluster_namespace, &cluster_id, &service_name), + ) + .await?; + let publication = get_json_key::( + &mut client, + &key_publication(&cluster_namespace, &cluster_id, &service_name), + ) + .await?; + + let instances = if include_instances { + let prefix = format!( + "{}instances/{}/", + cluster_prefix(&cluster_namespace, &cluster_id), + service_name + ); + let kvs = client.get_prefix(prefix.as_bytes()).await?; + let mut instances = Vec::new(); + for (_key, value) in kvs { + match serde_json::from_slice::(&value) { + Ok(instance) => instances.push(instance), + Err(error) => warn!( + error = %error, + service = %service_name, + "failed to decode service instance" + ), + } + } + instances.sort_by(|lhs, rhs| lhs.instance_id.cmp(&rhs.instance_id)); + Some(instances) + } else { + None + }; + + if json_output { + println!( + "{}", + serde_json::to_string_pretty(&json!({ + "spec": spec, + "status": status, + "publication": publication, + "instances": instances, + }))? + ); + } else { + println!("name={}", spec.name); + println!( + "schedule_mode={}", + spec.schedule + .as_ref() + .map(|schedule| serde_json::to_string(&schedule.mode).unwrap_or_default()) + .unwrap_or_else(|| "\"none\"".to_string()) + ); + println!("depends_on={}", spec.depends_on.len()); + if let Some(status) = status { + println!("phase={}", status.phase); + println!("healthy_instances={}", status.healthy_instances); + println!("scheduled_instances={}", status.scheduled_instances); + println!("desired_instances={}", status.desired_instances); + println!("dependencies_ready={}", status.dependencies_ready); + println!("publish_ready={}", status.publish_ready); + if !status.blockers.is_empty() { + println!("blockers={}", status.blockers.join(";")); + } + } + if let Some(publication) = publication { + println!( + "published={}", + publication.dns.is_some() || publication.load_balancer.is_some() + ); + } + if let Some(instances) = instances { + println!("instances={}", instances.len()); + } + } + + Ok(()) + } + }) + .await +} + pub async fn set_host_deployment_paused( endpoint: &str, cluster_namespace: &str, diff --git a/deployer/crates/deployer-ctl/src/main.rs b/deployer/crates/deployer-ctl/src/main.rs index cb4d841..0f870a4 100644 --- a/deployer/crates/deployer-ctl/src/main.rs +++ b/deployer/crates/deployer-ctl/src/main.rs @@ -95,6 +95,12 @@ enum Command { #[command(subcommand)] command: DeploymentCommand, }, + + /// Service spec/status/publication/instances を表示する + Service { + #[command(subcommand)] + command: ServiceCommand, + }, } #[derive(Subcommand, Debug)] @@ -211,6 +217,21 @@ enum DeploymentCommand { }, } +#[derive(Subcommand, Debug)] +enum ServiceCommand { + /// Service の spec/status/publication を表示する + Inspect { + #[arg(long)] + name: String, + + #[arg(long, default_value_t = false)] + include_instances: bool, + + #[arg(long, value_enum, default_value_t = DumpFormat::Json)] + format: DumpFormat, + }, +} + #[derive(Clone, Copy, Debug, ValueEnum)] enum DumpFormat { Text, @@ -316,8 +337,7 @@ impl PowerActionArg { #[tokio::main] async fn main() -> Result<()> { - let env_filter = - EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); + let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); tracing_subscriber::fmt() .with_writer(std::io::stderr) .with_env_filter(env_filter) @@ -459,10 +479,9 @@ async fn main() -> Result<()> { } } Command::Deployment { command } => { - let cluster_id = cli - .cluster_id - .as_deref() - .ok_or_else(|| anyhow::anyhow!("--cluster-id is required for deployment commands"))?; + let cluster_id = cli.cluster_id.as_deref().ok_or_else(|| { + anyhow::anyhow!("--cluster-id is required for deployment commands") + })?; match command { DeploymentCommand::Inspect { name, format } => { @@ -506,6 +525,30 @@ async fn main() -> Result<()> { } } } + Command::Service { command } => { + let cluster_id = cli + .cluster_id + .as_deref() + .ok_or_else(|| anyhow::anyhow!("--cluster-id is required for service commands"))?; + + match command { + ServiceCommand::Inspect { + name, + include_instances, + format, + } => { + chainfire::inspect_service( + &cli.chainfire_endpoint, + &cli.cluster_namespace, + cluster_id, + &name, + include_instances, + matches!(format, DumpFormat::Json), + ) + .await?; + } + } + } } Ok(()) diff --git a/deployer/scripts/verify-fleet-scheduler-e2e.sh b/deployer/scripts/verify-fleet-scheduler-e2e.sh index ac7ebf8..ecb99ed 100755 --- a/deployer/scripts/verify-fleet-scheduler-e2e.sh +++ b/deployer/scripts/verify-fleet-scheduler-e2e.sh @@ -438,6 +438,22 @@ if not blockers or "dependency api has 0/2 healthy instance(s)" not in blockers[ print("worker service status reports dependency block") PY +run_deployer_ctl service inspect --name worker >"$tmp_dir/worker-inspect-blocked.json" +python3 - "$tmp_dir/worker-inspect-blocked.json" <<'PY' +import json +import sys + +payload = json.load(open(sys.argv[1], "r", encoding="utf-8")) +status = payload.get("status") or {} + +if payload.get("spec", {}).get("name") != "worker": + raise SystemExit("service inspect did not return worker spec") +if status.get("phase") != "blocked": + raise SystemExit(f"expected worker inspect phase=blocked, found {status.get('phase')}") + +print("service inspect reports blocked dependency state") +PY + echo "Reconciling processes and health for api" for _ in 1 2 3; do run_node_agent_once node01 @@ -541,6 +557,26 @@ if states != ["healthy", "healthy"]: print("Observed two healthy dependent worker instances across node01 and node02") PY +echo "Refreshing aggregated service status after worker became healthy" +run_scheduler_once + +run_deployer_ctl service inspect --name worker --include-instances >"$tmp_dir/worker-inspect-healthy.json" +python3 - "$tmp_dir/worker-inspect-healthy.json" <<'PY' +import json +import sys + +payload = json.load(open(sys.argv[1], "r", encoding="utf-8")) +status = payload.get("status") or {} +instances = payload.get("instances") or [] + +if status.get("phase") != "healthy": + raise SystemExit(f"expected worker inspect phase=healthy, found {status.get('phase')}") +if len(instances) != 2: + raise SystemExit(f"expected 2 worker instances from service inspect, found {len(instances)}") + +print("service inspect reports healthy dependent instances") +PY + echo "Applying scaled declaration" run_deployer_ctl apply --config "$tmp_dir/cluster-scaled.yaml" --prune