photoncloud-monorepo/nix/tests/verify-fleet-scheduler-e2e-stable.sh

284 lines
8.4 KiB
Bash

#!/usr/bin/env bash
set -euo pipefail

# Repository root: honour a non-empty ULTRACLOUD_FLEET_E2E_REPO_ROOT override;
# otherwise resolve it as two directories above this script's location.
if [[ -n "${ULTRACLOUD_FLEET_E2E_REPO_ROOT:-}" ]]; then
  ROOT="${ULTRACLOUD_FLEET_E2E_REPO_ROOT}"
else
  ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"
fi

# Upstream verifier to patch, and a unique temp path for the patched copy.
ORIGINAL_SCRIPT="${ROOT}/deployer/scripts/verify-fleet-scheduler-e2e.sh"
PATCHED_SCRIPT="$(mktemp "${TMPDIR:-/tmp}/verify-fleet-scheduler-e2e-stable.XXXXXX.sh")"
# Remove the temporary patched script. Registered on EXIT so the temp file is
# deleted on success, failure, and signal-driven exits alike.
cleanup() {
  rm -f -- "${PATCHED_SCRIPT}"
}
trap cleanup EXIT
# Build the patched verifier: the embedded Python program applies exact-text
# substitutions to the upstream script so that the e2e test (a) uses this
# checkout as ROOT and (b) accepts EITHER node as the scale-down survivor
# instead of hard-coding node01.
python3 - "${ORIGINAL_SCRIPT}" "${PATCHED_SCRIPT}" "${ROOT}" <<'PATCHPY'
from __future__ import annotations

import sys
from pathlib import Path

# argv[1]: upstream verifier script to read
# argv[2]: output path for the patched copy
# argv[3]: absolute repo root to bake into the patched script
source_path = Path(sys.argv[1])
patched_path = Path(sys.argv[2])
repo_root = sys.argv[3]
source = source_path.read_text()

# Ordered (old, new) pairs.  Every `old` snippet must occur verbatim in the
# upstream script (enforced by the loop below) and is replaced exactly once.
replacements = [
    # Pin ROOT: the patched copy runs from a temp directory, so deriving the
    # repo root from BASH_SOURCE would point at the wrong place.
    (
        'ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"',
        f'ROOT="{repo_root}"',
    ),
    # Replace the node01-only convergence probe with a node-agnostic one that
    # reads the surviving node ids from files written by the dump checks
    # below, asserts HTTP 200 on the survivor, and asserts the drained
    # node(s) refuse connections.
    (
        """wait_for_endpoint_convergence() {
  local timeout_secs="${1:-60}"
  local deadline=$((SECONDS + timeout_secs))
  while (( SECONDS < deadline )); do
    if python3 - <<'PY'
import socket
import urllib.request
with urllib.request.urlopen("http://127.0.0.2:18080/", timeout=5) as response:
    if response.status != 200:
        raise SystemExit(f"node01 endpoint returned {response.status}")
with urllib.request.urlopen("http://127.0.0.2:18081/", timeout=5) as response:
    if response.status != 200:
        raise SystemExit(f"node01 worker endpoint returned {response.status}")
for port, label in ((18080, "api"), (18081, "worker")):
    sock = socket.socket()
    sock.settimeout(1.5)
    try:
        sock.connect(("127.0.0.3", port))
    except OSError:
        pass
    else:
        raise SystemExit(f"node02 {label} endpoint still accepts connections after scale-down")
    finally:
        sock.close()
PY
    then
      return 0
    fi
    sleep 1
  done
  echo "timed out waiting for endpoint convergence after scale-down" >&2
  return 1
}""",
        """wait_for_endpoint_convergence() {
  local api_node_file="$1"
  local worker_node_file="$2"
  local timeout_secs="${3:-60}"
  local deadline=$((SECONDS + timeout_secs))
  while (( SECONDS < deadline )); do
    if python3 - "$api_node_file" "$worker_node_file" <<'PY'
import socket
import sys
import urllib.request
NODE_IPS = {
    "node01": "127.0.0.2",
    "node02": "127.0.0.3",
}
def read_node(path):
    with open(path, "r", encoding="utf-8") as handle:
        node_id = handle.read().strip()
    if node_id not in NODE_IPS:
        raise SystemExit(f"unexpected scaled node id in {path}: {node_id!r}")
    return node_id
def assert_http(node_id, port, label):
    address = f"http://{NODE_IPS[node_id]}:{port}/"
    with urllib.request.urlopen(address, timeout=5) as response:
        if response.status != 200:
            raise SystemExit(f"{label} endpoint on {node_id} returned {response.status}")
def assert_closed(node_id, port, label):
    sock = socket.socket()
    sock.settimeout(1.5)
    try:
        sock.connect((NODE_IPS[node_id], port))
    except OSError:
        return
    finally:
        sock.close()
    raise SystemExit(f"{label} endpoint still accepts connections on {node_id} after scale-down")
api_node = read_node(sys.argv[1])
worker_node = read_node(sys.argv[2])
assert_http(api_node, 18080, "api")
assert_http(worker_node, 18081, "worker")
for node_id in NODE_IPS:
    if node_id != api_node:
        assert_closed(node_id, 18080, "api")
    if node_id != worker_node:
        assert_closed(node_id, 18081, "worker")
PY
    then
      return 0
    fi
    sleep 1
  done
  echo "timed out waiting for endpoint convergence after scale-down" >&2
  return 1
}""",
    ),
    # Relax the api-instance dump check: accept node01 OR node02 as the
    # survivor and record its id for the convergence probe above.
    (
        """run_deployer_ctl dump --prefix "ultracloud/clusters/test-cluster/instances/api/" >"$tmp_dir/instances-scaled.dump"
python3 - "$tmp_dir/instances-scaled.dump" <<'PY'
import json
import sys
path = sys.argv[1]
instances = []
with open(path, "r", encoding="utf-8") as handle:
    for line in handle:
        line = line.strip()
        if not line:
            continue
        marker = " value="
        if marker not in line:
            continue
        value = line.split(marker, 1)[1]
        instances.append(json.loads(value))
if len(instances) != 1:
    raise SystemExit(f"expected 1 scheduled instance after scale-down, found {len(instances)}")
instance = instances[0]
if instance["node_id"] != "node01":
    raise SystemExit(f"expected remaining instance on node01, found {instance['node_id']}")
if instance.get("state") != "healthy":
    raise SystemExit(f"expected remaining instance to be healthy, found {instance.get('state')}")
print("Observed one healthy scheduled instance on node01 after scale-down")
PY""",
        """run_deployer_ctl dump --prefix "ultracloud/clusters/test-cluster/instances/api/" >"$tmp_dir/instances-scaled.dump"
python3 - "$tmp_dir/instances-scaled.dump" "$tmp_dir/api-scaled-node.txt" <<'PY'
import json
import sys
path = sys.argv[1]
node_path = sys.argv[2]
instances = []
with open(path, "r", encoding="utf-8") as handle:
    for line in handle:
        line = line.strip()
        if not line:
            continue
        marker = " value="
        if marker not in line:
            continue
        value = line.split(marker, 1)[1]
        instances.append(json.loads(value))
if len(instances) != 1:
    raise SystemExit(f"expected 1 scheduled instance after scale-down, found {len(instances)}")
instance = instances[0]
node_id = instance["node_id"]
if node_id not in {"node01", "node02"}:
    raise SystemExit(f"unexpected remaining api instance node {node_id}")
if instance.get("state") != "healthy":
    raise SystemExit(f"expected remaining instance to be healthy, found {instance.get('state')}")
with open(node_path, "w", encoding="utf-8") as handle:
    handle.write(node_id + "\\n")
print(f"Observed one healthy scheduled instance on {node_id} after scale-down")
PY""",
    ),
    # Same relaxation for the dependent worker instance dump check.
    (
        """run_deployer_ctl dump --prefix "ultracloud/clusters/test-cluster/instances/worker/" >"$tmp_dir/worker-instances-scaled.dump"
python3 - "$tmp_dir/worker-instances-scaled.dump" <<'PY'
import json
import sys
path = sys.argv[1]
instances = []
with open(path, "r", encoding="utf-8") as handle:
    for line in handle:
        line = line.strip()
        if not line:
            continue
        marker = " value="
        if marker not in line:
            continue
        value = line.split(marker, 1)[1]
        instances.append(json.loads(value))
if len(instances) != 1:
    raise SystemExit(f"expected 1 worker instance after scale-down, found {len(instances)}")
instance = instances[0]
if instance["node_id"] != "node01":
    raise SystemExit(f"expected remaining worker instance on node01, found {instance['node_id']}")
if instance.get("state") != "healthy":
    raise SystemExit(f"expected remaining worker instance to be healthy, found {instance.get('state')}")
print("Observed one healthy dependent worker instance on node01 after scale-down")
PY""",
        """run_deployer_ctl dump --prefix "ultracloud/clusters/test-cluster/instances/worker/" >"$tmp_dir/worker-instances-scaled.dump"
python3 - "$tmp_dir/worker-instances-scaled.dump" "$tmp_dir/worker-scaled-node.txt" <<'PY'
import json
import sys
path = sys.argv[1]
node_path = sys.argv[2]
instances = []
with open(path, "r", encoding="utf-8") as handle:
    for line in handle:
        line = line.strip()
        if not line:
            continue
        marker = " value="
        if marker not in line:
            continue
        value = line.split(marker, 1)[1]
        instances.append(json.loads(value))
if len(instances) != 1:
    raise SystemExit(f"expected 1 worker instance after scale-down, found {len(instances)}")
instance = instances[0]
node_id = instance["node_id"]
if node_id not in {"node01", "node02"}:
    raise SystemExit(f"unexpected remaining worker instance node {node_id}")
if instance.get("state") != "healthy":
    raise SystemExit(f"expected remaining worker instance to be healthy, found {instance.get('state')}")
with open(node_path, "w", encoding="utf-8") as handle:
    handle.write(node_id + "\\n")
print(f"Observed one healthy dependent worker instance on {node_id} after scale-down")
PY""",
    ),
    # Wire the recorded survivor files into the convergence probe call site.
    (
        'wait_for_endpoint_convergence 60',
        'wait_for_endpoint_convergence "$tmp_dir/api-scaled-node.txt" "$tmp_dir/worker-scaled-node.txt" 60',
    ),
]

# Abort loudly if the upstream script drifted and a snippet no longer
# matches; a silent partial patch would make the e2e run meaningless.
for old, new in replacements:
    if old not in source:
        raise SystemExit(f"expected snippet not found while patching {source_path}")
    source = source.replace(old, new, 1)

patched_path.write_text(source)
PATCHPY
# Mark the generated script executable (kept for parity with direct
# invocation), then run it in a child bash process.
#
# Deliberately NOT `exec`: bash does not run the EXIT trap when `exec`
# succeeds, so `exec bash "${PATCHED_SCRIPT}"` would skip cleanup() and leak
# the mktemp file.  Running it as a child lets the trap remove the temp file
# afterwards, while `set -e` (plus this being the final command) propagates
# the patched script's exit status as this wrapper's own status.
chmod +x "${PATCHED_SCRIPT}"
bash "${PATCHED_SCRIPT}" "$@"