# photoncloud-monorepo/scripts/ci_changed_workspaces.py
#!/usr/bin/env python3
import argparse
import fnmatch
import json
from pathlib import Path
import tomllib
from typing import Any
def load_changed_files(args: argparse.Namespace) -> list[str]:
    """Collect changed file paths from list files and direct CLI flags.

    Paths read from each ``--changed-files-file`` (one per line) come
    first, followed by ``--changed-file`` values in CLI order.  Blank
    entries and surrounding whitespace are discarded.
    """
    from_files = [
        stripped
        for list_file in args.changed_files_file
        for stripped in (raw.strip() for raw in Path(list_file).read_text().splitlines())
        if stripped
    ]
    from_flags = [raw.strip() for raw in args.changed_file if raw.strip()]
    return from_files + from_flags
def matches_any(path: str, patterns: list[str]) -> bool:
    """Return True when *path* matches at least one glob *pattern*.

    Matching is case-sensitive (``fnmatchcase``); note that ``*`` in
    ``fnmatch`` patterns also matches ``/``.
    """
    for pattern in patterns:
        if fnmatch.fnmatchcase(path, pattern):
            return True
    return False
def workspace_root(workspace: dict[str, Any]) -> str:
    """Infer a workspace's top-level directory from its path patterns.

    Uses the first path component of the first pattern whose leading
    segment is a non-empty literal (contains no glob metacharacters).

    Raises:
        ValueError: if no pattern yields a usable root.
    """
    for pattern in workspace["paths"]:
        head = pattern.split("/", 1)[0]
        if head and not any(meta in head for meta in ("*", "?")):
            return head
    raise ValueError(f"Could not infer workspace root for {workspace['name']}")
def collect_path_dependencies(obj: Any) -> list[str]:
    """Recursively gather every string-valued ``"path"`` key inside *obj*.

    Mirrors Cargo's ``path = "..."`` dependency entries; scalars and
    other non-container values contribute nothing.  Results are in
    pre-order traversal order.
    """
    found: list[str] = []
    if isinstance(obj, dict):
        candidate = obj.get("path")
        if isinstance(candidate, str):
            found.append(candidate)
        children = list(obj.values())
    elif isinstance(obj, list):
        children = obj
    else:
        return found
    for child in children:
        found.extend(collect_path_dependencies(child))
    return found
def build_nodes(config: dict[str, Any], repo_root: Path) -> dict[str, dict[str, Any]]:
    """Build the node table: one entry per workspace plus one per shared crate.

    Workspace nodes are keyed by workspace name; crates found under
    ``<repo_root>/crates/*`` (by the presence of a ``Cargo.toml``) are
    keyed ``crate:<dirname>``.  Each node records its name, kind, and
    filesystem root.
    """
    nodes: dict[str, dict[str, Any]] = {}
    for workspace in config["workspaces"]:
        name = workspace["name"]
        nodes[name] = {
            "name": name,
            "kind": "workspace",
            "root": repo_root / workspace_root(workspace),
        }
    crates_dir = repo_root / "crates"
    if crates_dir.is_dir():
        for manifest in sorted(crates_dir.glob("*/Cargo.toml")):
            crate_dir = manifest.parent
            nodes[f"crate:{crate_dir.name}"] = {
                "name": crate_dir.name,
                "kind": "shared_crate",
                "root": crate_dir,
            }
    return nodes
def resolve_node_for_path(path: Path, nodes: dict[str, dict[str, Any]]) -> str | None:
for node_name, node in sorted(
nodes.items(),
key=lambda item: len(str(item[1]["root"])),
reverse=True,
):
try:
path.relative_to(node["root"])
except ValueError:
continue
return node_name
return None
def build_dependency_graph(
    nodes: dict[str, dict[str, Any]],
) -> dict[str, set[str]]:
    """Map each node name to the set of other nodes it depends on.

    Edges are discovered by parsing every ``Cargo.toml`` under a node's
    root and resolving its ``path = "..."`` dependency entries back to
    nodes.  Self-references and paths outside every node are ignored.
    """
    graph: dict[str, set[str]] = {name: set() for name in nodes}
    for name, node in nodes.items():
        for manifest in node["root"].rglob("Cargo.toml"):
            parsed = tomllib.loads(manifest.read_text())
            for dep_path in collect_path_dependencies(parsed):
                # Dependency paths are relative to the manifest's directory.
                target = resolve_node_for_path((manifest.parent / dep_path).resolve(), nodes)
                if target is not None and target != name:
                    graph[name].add(target)
    return graph
def reverse_graph(graph: dict[str, set[str]]) -> dict[str, set[str]]:
    """Invert dependency edges: the result maps each node to its dependents."""
    inverted: dict[str, set[str]] = {name: set() for name in graph}
    for source, targets in graph.items():
        for target in targets:
            inverted[target].add(source)
    return inverted
def impacted_nodes(
    changed_nodes: set[str],
    reversed_dependencies: dict[str, set[str]],
) -> set[str]:
    """Return *changed_nodes* plus every transitive dependent.

    A reverse-reachability walk over *reversed_dependencies* (node ->
    nodes that depend on it).
    """
    visited = set(changed_nodes)
    frontier = list(changed_nodes)
    while frontier:
        node = frontier.pop()
        newly_reached = reversed_dependencies[node] - visited
        visited |= newly_reached
        frontier.extend(newly_reached)
    return visited
def detect_changes(
    config: dict[str, Any],
    changed_files: list[str],
    repo_root: Path,
) -> dict[str, Any]:
    """Classify changed files into impacted workspaces and build targets.

    A change matching any global path selects every workspace; otherwise
    only workspaces that (transitively) depend on a directly changed
    node are selected.  Directly changed shared crates are reported
    regardless of the global flag.
    """
    workspaces: list[dict[str, Any]] = config["workspaces"]
    nodes = build_nodes(config, repo_root)
    dependents_of = reverse_graph(build_dependency_graph(nodes))

    global_changed = any(
        matches_any(path, config["global_paths"]) for path in changed_files
    )

    # Map each changed file to the node that owns it, if any.
    directly_changed_nodes: set[str] = set()
    for path in changed_files:
        owner = resolve_node_for_path((repo_root / path).resolve(), nodes)
        if owner is not None:
            directly_changed_nodes.add(owner)

    shared_crates = sorted(
        nodes[name]["name"]
        for name in directly_changed_nodes
        if nodes[name]["kind"] == "shared_crate"
    )

    if global_changed:
        # Global change: everything builds, in config order.
        changed_workspaces = [workspace["name"] for workspace in workspaces]
    else:
        selected = impacted_nodes(directly_changed_nodes, dependents_of)
        changed_workspaces = [
            workspace["name"]
            for workspace in workspaces
            if workspace["name"] in selected
        ]

    # Expand selected workspaces into (workspace, package) build targets,
    # de-duplicated while keeping config order.
    selected_names = set(changed_workspaces)
    build_targets: list[dict[str, str]] = []
    seen: set[tuple[str, str]] = set()
    for workspace in workspaces:
        if workspace["name"] not in selected_names:
            continue
        for package in workspace.get("build_packages", []):
            key = (workspace["name"], package)
            if key in seen:
                continue
            seen.add(key)
            build_targets.append({"workspace": workspace["name"], "package": package})

    return {
        "workspaces": changed_workspaces,
        "build_targets": build_targets,
        "any_changed": global_changed or bool(changed_workspaces),
        "build_changed": bool(build_targets),
        "global_changed": global_changed,
        "shared_crates": shared_crates,
        "shared_crates_changed": bool(shared_crates),
    }
def write_github_output(path: Path, result: dict[str, Any]) -> None:
    """Append ``key=value`` step outputs in GitHub Actions format to *path*.

    List/dict values are compact JSON; booleans become lowercase
    ``true``/``false`` strings.  The file is opened in append mode so
    earlier step outputs are preserved.
    """
    def as_json(value: Any) -> str:
        return json.dumps(value, separators=(",", ":"))

    def as_flag(value: Any) -> str:
        return str(value).lower()

    lines = [
        f"workspaces={as_json(result['workspaces'])}",
        f"build_targets={as_json(result['build_targets'])}",
        f"any_changed={as_flag(result['any_changed'])}",
        f"build_changed={as_flag(result['build_changed'])}",
        f"global_changed={as_flag(result['global_changed'])}",
        f"shared_crates={as_json(result['shared_crates'])}",
        f"shared_crates_changed={as_flag(result['shared_crates_changed'])}",
    ]
    with path.open("a", encoding="utf-8") as handle:
        handle.write("".join(f"{line}\n" for line in lines))
def parse_args() -> argparse.Namespace:
    """Parse command-line options for the changed-workspace mapper."""
    parser = argparse.ArgumentParser(
        description="Map changed files to PhotonCloud CI workspaces."
    )
    parser.add_argument(
        "--config",
        required=True,
        help="Path to the JSON CI workspace inventory.",
    )
    # Both changed-file options are repeatable and accumulate.
    parser.add_argument(
        "--changed-files-file",
        action="append",
        default=[],
        help="File containing newline-separated changed paths.",
    )
    parser.add_argument(
        "--changed-file",
        action="append",
        default=[],
        help="Single changed path. Can be repeated.",
    )
    parser.add_argument(
        "--github-output",
        help="Optional path to append GitHub Actions step outputs.",
    )
    return parser.parse_args()
def main() -> int:
    """Entry point: load config, classify changes, emit results.

    Prints the full result as pretty JSON and, when ``--github-output``
    is given, also appends GitHub Actions step outputs.  Returns the
    process exit code (always 0 on success).
    """
    args = parse_args()
    config_path = Path(args.config).resolve()
    # NOTE(review): assumes the config file sits two directory levels
    # below the repository root (parents[2]) — confirm against repo layout.
    repo_root = config_path.parents[2]
    result = detect_changes(
        json.loads(config_path.read_text()),
        load_changed_files(args),
        repo_root,
    )
    if args.github_output:
        write_github_output(Path(args.github_output), result)
    print(json.dumps(result, indent=2, sort_keys=True))
    return 0
# Script entry point: exit with main()'s return code.
if __name__ == "__main__":
    raise SystemExit(main())