#!/usr/bin/env python3 import argparse import fnmatch import json from pathlib import Path import tomllib from typing import Any def load_changed_files(args: argparse.Namespace) -> list[str]: changed_files: list[str] = [] for path in args.changed_files_file: for line in Path(path).read_text().splitlines(): candidate = line.strip() if candidate: changed_files.append(candidate) changed_files.extend(path.strip() for path in args.changed_file if path.strip()) return changed_files def matches_any(path: str, patterns: list[str]) -> bool: return any(fnmatch.fnmatchcase(path, pattern) for pattern in patterns) def workspace_root(workspace: dict[str, Any]) -> str: for pattern in workspace["paths"]: root = pattern.split("/", 1)[0] if root and "*" not in root and "?" not in root: return root raise ValueError(f"Could not infer workspace root for {workspace['name']}") def collect_path_dependencies(obj: Any) -> list[str]: path_dependencies: list[str] = [] if isinstance(obj, dict): path = obj.get("path") if isinstance(path, str): path_dependencies.append(path) for value in obj.values(): path_dependencies.extend(collect_path_dependencies(value)) elif isinstance(obj, list): for value in obj: path_dependencies.extend(collect_path_dependencies(value)) return path_dependencies def build_nodes(config: dict[str, Any], repo_root: Path) -> dict[str, dict[str, Any]]: nodes: dict[str, dict[str, Any]] = {} for workspace in config["workspaces"]: nodes[workspace["name"]] = { "name": workspace["name"], "kind": "workspace", "root": repo_root / workspace_root(workspace), } crates_dir = repo_root / "crates" if crates_dir.is_dir(): for manifest in sorted(crates_dir.glob("*/Cargo.toml")): crate_name = manifest.parent.name nodes[f"crate:{crate_name}"] = { "name": crate_name, "kind": "shared_crate", "root": manifest.parent, } return nodes def resolve_node_for_path(path: Path, nodes: dict[str, dict[str, Any]]) -> str | None: for node_name, node in sorted( nodes.items(), key=lambda item: len(str(item[1]["root"])), reverse=True, ): try: path.relative_to(node["root"]) except ValueError: continue return node_name return None def build_dependency_graph( nodes: dict[str, dict[str, Any]], ) -> dict[str, set[str]]: graph = {node_name: set() for node_name in nodes} for node_name, node in nodes.items(): root = node["root"] for manifest in root.rglob("Cargo.toml"): manifest_data = tomllib.loads(manifest.read_text()) for dependency_path in collect_path_dependencies(manifest_data): resolved_dependency = (manifest.parent / dependency_path).resolve() dependency_node = resolve_node_for_path(resolved_dependency, nodes) if dependency_node is None or dependency_node == node_name: continue graph[node_name].add(dependency_node) return graph def reverse_graph(graph: dict[str, set[str]]) -> dict[str, set[str]]: reversed_graph = {node_name: set() for node_name in graph} for node_name, dependencies in graph.items(): for dependency in dependencies: reversed_graph[dependency].add(node_name) return reversed_graph def impacted_nodes( changed_nodes: set[str], reversed_dependencies: dict[str, set[str]], ) -> set[str]: selected = set(changed_nodes) queue = list(changed_nodes) while queue: current = queue.pop() for dependent in reversed_dependencies[current]: if dependent in selected: continue selected.add(dependent) queue.append(dependent) return selected def detect_changes( config: dict[str, Any], changed_files: list[str], repo_root: Path, ) -> dict[str, Any]: workspaces: list[dict[str, Any]] = config["workspaces"] all_workspace_names = [workspace["name"] for workspace in workspaces] nodes = build_nodes(config, repo_root) dependency_graph = build_dependency_graph(nodes) reversed_dependencies = reverse_graph(dependency_graph) global_changed = any( matches_any(path, config["global_paths"]) for path in changed_files ) directly_changed_nodes = { node_name for path in changed_files for node_name in [resolve_node_for_path((repo_root / path).resolve(), nodes)] if node_name is not None } shared_crates = sorted( nodes[node_name]["name"] for node_name in directly_changed_nodes if nodes[node_name]["kind"] == "shared_crate" ) shared_crates_changed = bool(shared_crates) if global_changed: changed_workspaces = all_workspace_names else: selected_nodes = impacted_nodes(directly_changed_nodes, reversed_dependencies) changed_workspaces = [ workspace["name"] for workspace in workspaces if workspace["name"] in selected_nodes ] selected_workspaces = set(changed_workspaces) build_targets: list[dict[str, str]] = [] seen_build_targets: set[tuple[str, str]] = set() for workspace in workspaces: if workspace["name"] not in selected_workspaces: continue for package in workspace.get("build_packages", []): key = (workspace["name"], package) if key in seen_build_targets: continue seen_build_targets.add(key) build_targets.append({ "workspace": workspace["name"], "package": package, }) return { "workspaces": changed_workspaces, "build_targets": build_targets, "any_changed": global_changed or bool(changed_workspaces), "build_changed": bool(build_targets), "global_changed": global_changed, "shared_crates": shared_crates, "shared_crates_changed": shared_crates_changed, } def write_github_output(path: Path, result: dict[str, Any]) -> None: serialized = { "workspaces": json.dumps(result["workspaces"], separators=(",", ":")), "build_targets": json.dumps(result["build_targets"], separators=(",", ":")), "any_changed": str(result["any_changed"]).lower(), "build_changed": str(result["build_changed"]).lower(), "global_changed": str(result["global_changed"]).lower(), "shared_crates": json.dumps(result["shared_crates"], separators=(",", ":")), "shared_crates_changed": str(result["shared_crates_changed"]).lower(), } with path.open("a", encoding="utf-8") as handle: for key, value in serialized.items(): handle.write(f"{key}={value}\n") def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Map changed files to PhotonCloud CI workspaces." ) parser.add_argument( "--config", required=True, help="Path to the JSON CI workspace inventory.", ) parser.add_argument( "--changed-files-file", action="append", default=[], help="File containing newline-separated changed paths.", ) parser.add_argument( "--changed-file", action="append", default=[], help="Single changed path. Can be repeated.", ) parser.add_argument( "--github-output", help="Optional path to append GitHub Actions step outputs.", ) return parser.parse_args() def main() -> int: args = parse_args() config_path = Path(args.config).resolve() repo_root = config_path.parents[2] config = json.loads(config_path.read_text()) changed_files = load_changed_files(args) result = detect_changes(config, changed_files, repo_root) if args.github_output: write_github_output(Path(args.github_output), result) print(json.dumps(result, indent=2, sort_keys=True)) return 0 if __name__ == "__main__": raise SystemExit(main())