diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..f0965f2 --- /dev/null +++ b/.env.example @@ -0,0 +1,3 @@ +GEMINI_API_KEY=your_api_key +GEMINI_BASE_URL= +GEMINI_MODEL=gemini-2.5-flash diff --git a/README.md b/README.md new file mode 100644 index 0000000..1db6230 --- /dev/null +++ b/README.md @@ -0,0 +1,52 @@ +# UiPath Explainator + +`uipath-explainator` 会按 UiPath 项目入口 `main.xaml` 递归提取 `Invoke Workflow` 和 `Invoke VBA`,复制到指定工作区,删除 `<CommentOut>` 里的无效代码,再重新扫描并裁掉不再使用的文件,最后逐个调用 Gemini 生成说明文档。 + +## 安装 + +```bash +./.venv/bin/pip install -e . +``` + +## 配置 + +把 `.env.example` 复制成 `.env`,至少填入: + +```env +GEMINI_API_KEY=your_api_key +GEMINI_BASE_URL= +GEMINI_MODEL=gemini-2.5-flash +``` + +`GEMINI_BASE_URL` 留空时走官方默认地址;如果你前面挂了代理或网关,可以填自定义地址。 + +## 使用 + +```bash +./.venv/bin/python -m uipath_explainator \ + /path/to/uipath-project \ + --output-dir workspace/project-copy \ + --force +``` + +常用参数: + +- `--entry`: 入口文件名,默认 `main.xaml` +- `--env-file`: 自定义 `.env` 路径 +- `--model`: 临时覆盖 `.env` 里的模型名 +- `--skip-analysis`: 只做复制、清理和剪枝,不调用 Gemini +- `--force`: 允许覆盖已有输出目录 + +## 输出内容 + +输出目录会保留原项目的相对目录结构,并额外生成: + +- `manifest.json`: 扫描、剪枝、告警和分析结果清单 +- `OVERVIEW.md`: 总览摘要 +- `*.analysis.md`: 每个最终保留文件对应的 Gemini 说明 + +## 当前实现约束 + +- 只解析能静态识别出来的 `Invoke Workflow` / `Invoke VBA` 文件路径。 +- 对动态拼接路径会记录 warning,但不会猜测。 +- `Invoke VBA` 默认识别常见文本型 VBA 文件扩展名:`.txt`、`.bas`、`.vb`、`.vbs`、`.vba`、`.cls`、`.frm`。 diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..54c928f --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,23 @@ +[build-system] +requires = ["setuptools>=68"] +build-backend = "setuptools.build_meta" + +[project] +name = "uipath-explainator" +version = "0.1.0" +description = "Extract, prune, and explain UiPath workflow dependencies with Gemini." 
+readme = "README.md" +requires-python = ">=3.12" +dependencies = [ + "google-genai>=1.12.1", + "python-dotenv>=1.0.1", +] + +[project.scripts] +uipath-explainator = "uipath_explainator.cli:main" + +[tool.setuptools] +package-dir = {"" = "src"} + +[tool.setuptools.packages.find] +where = ["src"] diff --git a/src/uipath_explainator/__init__.py b/src/uipath_explainator/__init__.py new file mode 100644 index 0000000..0f2cc0e --- /dev/null +++ b/src/uipath_explainator/__init__.py @@ -0,0 +1,5 @@ +"""UiPath Explainator package.""" + +__all__ = ["__version__"] + +__version__ = "0.1.0" diff --git a/src/uipath_explainator/__main__.py b/src/uipath_explainator/__main__.py new file mode 100644 index 0000000..a049ad7 --- /dev/null +++ b/src/uipath_explainator/__main__.py @@ -0,0 +1,5 @@ +from .cli import main + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/uipath_explainator/cli.py b/src/uipath_explainator/cli.py new file mode 100644 index 0000000..903e360 --- /dev/null +++ b/src/uipath_explainator/cli.py @@ -0,0 +1,50 @@ +from __future__ import annotations + +from argparse import ArgumentParser +from pathlib import Path + +from .config import Settings +from .gemini import GeminiAnalyzer +from .pipeline import ProjectPipeline + + +def build_parser() -> ArgumentParser: + parser = ArgumentParser(description="Extract and explain UiPath project dependencies.") + parser.add_argument("project_dir", type=Path, help="UiPath project root directory") + parser.add_argument("--output-dir", type=Path, default=Path("workspace"), help="Copied project output directory") + parser.add_argument("--entry", default="main.xaml", help="Entry XAML file name") + parser.add_argument("--env-file", type=Path, default=Path(".env"), help="Environment file for Gemini config") + parser.add_argument("--model", help="Override GEMINI_MODEL") + parser.add_argument("--skip-analysis", action="store_true", help="Skip Gemini analysis and only prepare files") + 
parser.add_argument("--force", action="store_true", help="Overwrite the output directory if it already exists") + return parser + + +def main(argv: list[str] | None = None) -> int: + parser = build_parser() + args = parser.parse_args(argv) + + project_root = args.project_dir.expanduser().resolve() + if not project_root.is_dir(): + parser.error(f"Project directory does not exist: {project_root}") + + output_dir = args.output_dir.expanduser() + if not output_dir.is_absolute(): + output_dir = Path.cwd() / output_dir + + settings = Settings.from_env(args.env_file if args.env_file.exists() else None, model_override=args.model) + analyzer = None if args.skip_analysis else GeminiAnalyzer(settings) + + pipeline = ProjectPipeline( + project_root=project_root, + output_root=output_dir, + entry_name=args.entry, + force=args.force, + ) + report = pipeline.run(analyzer=analyzer) + + print(f"Output written to: {report.output_root}") + print(f"Final files: {len(report.final_files)}") + print(f"Pruned files: {len(report.pruned_files)}") + print(f"Warnings: {len(report.warnings)}") + return 0 diff --git a/src/uipath_explainator/config.py b/src/uipath_explainator/config.py new file mode 100644 index 0000000..58cdfa2 --- /dev/null +++ b/src/uipath_explainator/config.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import Self + +from dotenv import load_dotenv +import os + + +@dataclass(slots=True) +class Settings: + api_key: str | None + base_url: str | None + model: str + + @classmethod + def from_env(cls, env_file: Path | None = None, model_override: str | None = None) -> Self: + if env_file: + load_dotenv(env_file) + else: + load_dotenv() + + return cls( + api_key=os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY"), + base_url=os.getenv("GEMINI_BASE_URL") or None, + model=model_override or os.getenv("GEMINI_MODEL") or "gemini-2.5-flash", + ) + + def require_api_key(self) -> None: + if not 
self.api_key: + raise ValueError("Missing GEMINI_API_KEY (or GOOGLE_API_KEY) in the environment.") diff --git a/src/uipath_explainator/gemini.py b/src/uipath_explainator/gemini.py new file mode 100644 index 0000000..e1228b6 --- /dev/null +++ b/src/uipath_explainator/gemini.py @@ -0,0 +1,87 @@ +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +import json + +from .config import Settings + + +@dataclass(slots=True) +class GeminiAnalyzer: + settings: Settings + + def __post_init__(self) -> None: + self.settings.require_api_key() + + from google import genai + from google.genai import types + + http_options = types.HttpOptions(timeout=120_000) + if self.settings.base_url: + http_options = types.HttpOptions(base_url=self.settings.base_url, timeout=120_000) + + self._types = types + self._client = genai.Client(api_key=self.settings.api_key, http_options=http_options) + + def analyze(self, relative_path: Path, content: str) -> str: + prompt = self._build_prompt(relative_path, content) + response = self._client.models.generate_content( + model=self.settings.model, + contents=prompt, + config=self._types.GenerateContentConfig( + temperature=0.2, + response_mime_type="application/json", + response_schema={ + "type": "OBJECT", + "required": ["summary", "implementation", "data", "example"], + "properties": { + "summary": {"type": "STRING"}, + "implementation": {"type": "STRING"}, + "data": {"type": "STRING"}, + "example": {"type": "STRING"}, + }, + }, + ), + ) + + payload = json.loads(response.text) + return self._to_markdown(relative_path, payload) + + def _build_prompt(self, relative_path: Path, content: str) -> str: + language = "xml" if relative_path.suffix.lower() == ".xaml" else "vb" + return f"""你是资深 UiPath 自动化架构师。请只基于给定文件内容分析,不要臆造未出现的系统、字段或业务规则。 + +请用中文返回 JSON,字段固定为: +- summary: 这个文件在做什么 +- implementation: 这个文件怎么做 +- data: 这个文件涉及的数据、变量、参数、外部文件或系统 +- example: 一个贴近当前代码的说明性范例 + +如果信息不足,请明确写“无法从当前文件确定”。 + +文件路径: 
{relative_path.as_posix()} +代码: +```{language} +{content} +```""" + + def _to_markdown(self, relative_path: Path, payload: dict[str, str]) -> str: + return "\n".join( + [ + f"# {relative_path.as_posix()}", + "", + "## 做什么", + payload.get("summary", "").strip() or "无法从当前文件确定", + "", + "## 怎么做", + payload.get("implementation", "").strip() or "无法从当前文件确定", + "", + "## 涉及数据", + payload.get("data", "").strip() or "无法从当前文件确定", + "", + "## 范例", + payload.get("example", "").strip() or "无法从当前文件确定", + "", + ] + ) diff --git a/src/uipath_explainator/pipeline.py b/src/uipath_explainator/pipeline.py new file mode 100644 index 0000000..9ce7897 --- /dev/null +++ b/src/uipath_explainator/pipeline.py @@ -0,0 +1,181 @@ +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from shutil import copy2, rmtree +import json + +from .scanner import ScanResult, crawl_dependencies, find_entry_file, read_text, strip_comment_out_blocks + + +@dataclass(slots=True) +class PipelineReport: + project_root: Path + output_root: Path + entry_file: Path + initial_files: list[Path] + final_files: list[Path] + pruned_files: list[Path] + cleaned_files: list[Path] + warnings: list[str] + analysis_files: list[Path] + + def to_json(self) -> str: + return json.dumps( + { + "project_root": self.project_root.as_posix(), + "output_root": self.output_root.as_posix(), + "entry_file": self.entry_file.as_posix(), + "initial_files": [item.as_posix() for item in self.initial_files], + "final_files": [item.as_posix() for item in self.final_files], + "pruned_files": [item.as_posix() for item in self.pruned_files], + "cleaned_files": [item.as_posix() for item in self.cleaned_files], + "warnings": self.warnings, + "analysis_files": [item.as_posix() for item in self.analysis_files], + }, + ensure_ascii=False, + indent=2, + ) + + +class ProjectPipeline: + def __init__(self, project_root: Path, output_root: Path, entry_name: str, force: bool = False) -> None: + self.project_root = 
project_root.resolve() + self.output_root = output_root.resolve() + self.entry_name = entry_name + self.force = force + + def run(self, analyzer=None) -> PipelineReport: + entry_file = find_entry_file(self.project_root, self.entry_name) + entry_rel = entry_file.relative_to(self.project_root) + + initial_scan = crawl_dependencies(self.project_root, entry_file) + initial_rel_files = self._relative_files(initial_scan, self.project_root) + + self._prepare_output_root() + self._copy_files(initial_rel_files) + cleaned_files = self._clean_copied_xaml_files(initial_rel_files) + + final_scan = crawl_dependencies(self.output_root, self.output_root / entry_rel) + final_rel_files = self._relative_files(final_scan, self.output_root) + pruned_files = self._prune_unused_files(initial_rel_files, final_rel_files) + + analysis_files = self._write_analysis(final_rel_files, analyzer) + warnings = initial_scan.warnings + final_scan.warnings + + report = PipelineReport( + project_root=self.project_root, + output_root=self.output_root, + entry_file=entry_rel, + initial_files=initial_rel_files, + final_files=final_rel_files, + pruned_files=pruned_files, + cleaned_files=cleaned_files, + warnings=warnings, + analysis_files=analysis_files, + ) + self._write_report_files(report) + return report + + def _prepare_output_root(self) -> None: + if self.output_root.exists(): + if not self.force: + raise FileExistsError(f"Output directory already exists: {self.output_root}") + rmtree(self.output_root) + self.output_root.mkdir(parents=True, exist_ok=True) + + def _copy_files(self, relative_files: list[Path]) -> None: + for relative_path in relative_files: + source = self.project_root / relative_path + destination = self.output_root / relative_path + destination.parent.mkdir(parents=True, exist_ok=True) + copy2(source, destination) + + def _clean_copied_xaml_files(self, relative_files: list[Path]) -> list[Path]: + cleaned: list[Path] = [] + for relative_path in relative_files: + if 
relative_path.suffix.lower() != ".xaml": + continue + output_file = self.output_root / relative_path + original = read_text(output_file) + updated = strip_comment_out_blocks(original) + if updated != original: + output_file.write_text(updated, encoding="utf-8") + cleaned.append(relative_path) + return cleaned + + def _prune_unused_files(self, initial_files: list[Path], final_files: list[Path]) -> list[Path]: + final_set = set(final_files) + pruned: list[Path] = [] + for relative_path in initial_files: + if relative_path in final_set: + continue + target = self.output_root / relative_path + if target.exists(): + target.unlink() + pruned.append(relative_path) + self._cleanup_empty_dirs() + return pruned + + def _cleanup_empty_dirs(self) -> None: + directories = sorted( + [path for path in self.output_root.rglob("*") if path.is_dir()], + key=lambda item: len(item.parts), + reverse=True, + ) + for directory in directories: + if any(directory.iterdir()): + continue + directory.rmdir() + + def _write_analysis(self, final_files: list[Path], analyzer) -> list[Path]: + if analyzer is None: + return [] + + output_files: list[Path] = [] + for relative_path in self._ordered_files(final_files): + content = read_text(self.output_root / relative_path) + analysis = analyzer.analyze(relative_path, content) + analysis_path = self.output_root / f"{relative_path.as_posix()}.analysis.md" + analysis_path.parent.mkdir(parents=True, exist_ok=True) + analysis_path.write_text(analysis, encoding="utf-8") + output_files.append(Path(f"{relative_path.as_posix()}.analysis.md")) + return output_files + + def _write_report_files(self, report: PipelineReport) -> None: + (self.output_root / "manifest.json").write_text(report.to_json(), encoding="utf-8") + (self.output_root / "OVERVIEW.md").write_text(self._build_overview(report), encoding="utf-8") + + def _build_overview(self, report: PipelineReport) -> str: + warnings = "\n".join(f"- {item}" for item in report.warnings) or "- 无" + pruned = 
"\n".join(f"- {item.as_posix()}" for item in report.pruned_files) or "- 无" + analyses = "\n".join(f"- {item.as_posix()}" for item in report.analysis_files) or "- 未启用 Gemini 分析" + return f"""# UiPath Explainator Overview + +- Project Root: `{report.project_root.as_posix()}` +- Output Root: `{report.output_root.as_posix()}` +- Entry File: `{report.entry_file.as_posix()}` +- Initial Files: {len(report.initial_files)} +- Final Files: {len(report.final_files)} +- Cleaned XAML Files: {len(report.cleaned_files)} +- Pruned Files: {len(report.pruned_files)} +- Analysis Files: {len(report.analysis_files)} + +## Final Files +{chr(10).join(f"- {item.as_posix()}" for item in report.final_files)} + +## Pruned Files +{pruned} + +## Analysis Files +{analyses} + +## Warnings +{warnings} +""" + + def _relative_files(self, scan: ScanResult, root: Path) -> list[Path]: + return sorted(path.relative_to(root) for path in scan.files) + + def _ordered_files(self, paths: list[Path]) -> list[Path]: + return sorted(paths, key=lambda item: (item.suffix.lower() != ".xaml", item.as_posix().lower())) diff --git a/src/uipath_explainator/scanner.py b/src/uipath_explainator/scanner.py new file mode 100644 index 0000000..55d0503 --- /dev/null +++ b/src/uipath_explainator/scanner.py @@ -0,0 +1,241 @@ +from __future__ import annotations + +from collections import deque +from dataclasses import dataclass, field +from pathlib import Path +import html +import re +import xml.etree.ElementTree as ET + + +COMMENT_TAG = "CommentOut" +WORKFLOW_EXTENSIONS = {".xaml"} +VBA_EXTENSIONS = {".txt", ".bas", ".vb", ".vbs", ".vba", ".cls", ".frm"} +TAG_PATTERN = re.compile( + r"<(?P/)?(?P[A-Za-z_][\w:.-]*)(?P[^<>]*?)(?P/)?>", + re.DOTALL, +) + + +@dataclass(slots=True, frozen=True) +class Dependency: + kind: str + raw_value: str + source: Path + target: Path + + +@dataclass(slots=True) +class ScanResult: + files: set[Path] = field(default_factory=set) + dependencies: dict[Path, list[Dependency]] = 
field(default_factory=dict) + warnings: list[str] = field(default_factory=list) + + +def local_name(tag: str) -> str: + if tag.startswith("{"): + return tag.rsplit("}", 1)[-1] + return tag.split(":", 1)[-1] + + +def read_text(path: Path) -> str: + return path.read_text(encoding="utf-8-sig") + + +def strip_comment_out_blocks(text: str) -> str: + output: list[str] = [] + cursor = 0 + + while True: + start = _find_comment_start(text, cursor) + if start is None: + output.append(text[cursor:]) + return "".join(output) + + output.append(text[cursor:start.start()]) + cursor = _find_comment_end(text, start) + + +def _find_comment_start(text: str, cursor: int) -> re.Match[str] | None: + for match in TAG_PATTERN.finditer(text, cursor): + if not match.group("close") and local_name(match.group("name")) == COMMENT_TAG: + return match + return None + + +def _find_comment_end(text: str, start: re.Match[str]) -> int: + if start.group("self"): + return start.end() + + depth = 1 + for match in TAG_PATTERN.finditer(text, start.end()): + if local_name(match.group("name")) != COMMENT_TAG: + continue + if match.group("close"): + depth -= 1 + elif not match.group("self"): + depth += 1 + if depth == 0: + return match.end() + return start.end() + + +def find_entry_file(project_root: Path, entry_name: str) -> Path: + direct = project_root / entry_name + if direct.exists(): + return direct + + target = entry_name.lower() + for child in project_root.iterdir(): + if child.is_file() and child.name.lower() == target: + return child + raise FileNotFoundError(f"Entry file not found: {entry_name}") + + +def crawl_dependencies(project_root: Path, entry_file: Path) -> ScanResult: + result = ScanResult() + queue: deque[Path] = deque([entry_file]) + visited: set[Path] = set() + + while queue: + current = queue.popleft() + if current in visited: + continue + visited.add(current) + result.files.add(current) + + if current.suffix.lower() != ".xaml": + continue + + dependencies, warnings = 
extract_dependencies(project_root, current) + result.dependencies[current] = dependencies + result.warnings.extend(warnings) + + for dependency in dependencies: + result.files.add(dependency.target) + if dependency.kind == "workflow": + queue.append(dependency.target) + + return result + + +def extract_dependencies(project_root: Path, file_path: Path) -> tuple[list[Dependency], list[str]]: + text = read_text(file_path) + warnings: list[str] = [] + dependencies: list[Dependency] = [] + + try: + root = ET.fromstring(text) + except ET.ParseError as exc: + return [], [f"{file_path}: XML parse failed: {exc}"] + + for element in root.iter(): + activity_type = classify_invoke_activity(local_name(element.tag)) + if not activity_type: + continue + + literals = extract_literal_targets(element, activity_type) + if not literals: + warnings.append( + f"{file_path}: unable to statically resolve {activity_type} path from <{local_name(element.tag)}>" + ) + continue + + for literal in literals: + target = resolve_project_path(project_root, file_path, literal) + if target is None: + warnings.append(f"{file_path}: referenced file not found for {activity_type}: {literal}") + continue + dependencies.append(Dependency(activity_type, literal, file_path, target)) + + return _dedupe_dependencies(dependencies), warnings + + +def classify_invoke_activity(tag_name: str) -> str | None: + lowered = tag_name.lower() + if "invokeworkflow" in lowered: + return "workflow" + if "invokevba" in lowered: + return "vba" + return None + + +def extract_literal_targets(element: ET.Element, kind: str) -> list[str]: + suffixes = WORKFLOW_EXTENSIONS if kind == "workflow" else VBA_EXTENSIONS + found: list[str] = [] + + for value in iter_element_values(element): + found.extend(extract_literal_paths(value, suffixes)) + + deduped: list[str] = [] + seen: set[str] = set() + for item in found: + if item not in seen: + deduped.append(item) + seen.add(item) + return deduped + + +def iter_element_values(element: 
ET.Element): + for nested in element.iter(): + for value in nested.attrib.values(): + yield value + if nested.text and nested.text.strip(): + yield nested.text.strip() + + +def extract_literal_paths(raw_value: str, suffixes: set[str]) -> list[str]: + value = html.unescape(raw_value).strip() + if not value: + return [] + + suffix_pattern = "|".join(re.escape(item.lstrip(".")) for item in sorted(suffixes)) + quoted_pattern = re.compile(rf"""['"]([^'"]+?\.(?:{suffix_pattern}))['"]""", re.IGNORECASE) + direct_pattern = re.compile(rf"""(? str: + return value.strip().replace("\\", "/").lstrip("./") + + +def resolve_project_path(project_root: Path, source_file: Path, literal: str) -> Path | None: + candidate = Path(literal) + search_roots = [project_root, source_file.parent] + + for base in search_roots: + resolved = (base / candidate).resolve() + if resolved.exists() and _is_within_root(project_root.resolve(), resolved): + return resolved + return None + + +def _is_within_root(root: Path, path: Path) -> bool: + try: + path.relative_to(root) + except ValueError: + return False + return True + + +def _dedupe_dependencies(items: list[Dependency]) -> list[Dependency]: + seen: set[tuple[str, Path]] = set() + deduped: list[Dependency] = [] + for item in items: + key = (item.kind, item.target) + if key in seen: + continue + seen.add(key) + deduped.append(item) + return deduped diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py new file mode 100644 index 0000000..ae5aeb4 --- /dev/null +++ b/tests/test_pipeline.py @@ -0,0 +1,113 @@ +from __future__ import annotations + +from pathlib import Path +from tempfile import TemporaryDirectory +import sys +import unittest + +ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(ROOT / "src")) + +from uipath_explainator.pipeline import ProjectPipeline +from uipath_explainator.scanner import crawl_dependencies, extract_dependencies, strip_comment_out_blocks + + +MAIN_XAML = """ + + + + + + + + + + +""" + +ACTIVE_XAML 
= """ + + + + + ["Scripts/Keep.bas"] + + + + +""" + +OLD_XAML = """ + + + + + +""" + + +class StubAnalyzer: + def analyze(self, relative_path: Path, content: str) -> str: + return f"# {relative_path.as_posix()}\n\n{len(content)}" + + +class PipelineTests(unittest.TestCase): + def test_strip_comment_out_blocks_removes_nested_blocks(self) -> None: + source = "" + cleaned = strip_comment_out_blocks(source) + self.assertEqual(cleaned, "") + + def test_extract_dependencies_reads_property_element_paths(self) -> None: + with TemporaryDirectory() as tmp: + root = Path(tmp) + flows = root / "Flows" + flows.mkdir() + (root / "Main.xaml").write_text(ACTIVE_XAML, encoding="utf-8") + (root / "Scripts").mkdir() + (root / "Scripts" / "Keep.bas").write_text("Sub Keep()", encoding="utf-8") + + dependencies, warnings = extract_dependencies(root, root / "Main.xaml") + + self.assertEqual(len(warnings), 0) + self.assertEqual(len(dependencies), 1) + self.assertEqual(dependencies[0].target, root / "Scripts" / "Keep.bas") + + def test_pipeline_prunes_files_only_reachable_before_comment_cleanup(self) -> None: + with TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + project_root = tmp_path / "project" + output_root = tmp_path / "workspace" + (project_root / "Flows").mkdir(parents=True) + (project_root / "Flows" / "Active.xaml").write_text(ACTIVE_XAML, encoding="utf-8") + (project_root / "Flows" / "Old.xaml").write_text(OLD_XAML, encoding="utf-8") + (project_root / "Scripts").mkdir() + (project_root / "Scripts" / "Keep.bas").write_text("Sub Keep()\nEnd Sub", encoding="utf-8") + (project_root / "Scripts" / "Drop.bas").write_text("Sub Drop()\nEnd Sub", encoding="utf-8") + (project_root / "main.xaml").write_text(MAIN_XAML, encoding="utf-8") + + initial_scan = crawl_dependencies(project_root, project_root / "main.xaml") + initial_files = {path.relative_to(project_root).as_posix() for path in initial_scan.files} + self.assertIn("Flows/Old.xaml", initial_files) + 
self.assertIn("Scripts/Drop.bas", initial_files) + + report = ProjectPipeline(project_root, output_root, "main.xaml", force=True).run(StubAnalyzer()) + + final_files = {path.as_posix() for path in report.final_files} + self.assertIn("main.xaml", final_files) + self.assertIn("Flows/Active.xaml", final_files) + self.assertIn("Scripts/Keep.bas", final_files) + self.assertNotIn("Flows/Old.xaml", final_files) + self.assertNotIn("Scripts/Drop.bas", final_files) + self.assertFalse((output_root / "Flows" / "Old.xaml").exists()) + self.assertFalse((output_root / "Scripts" / "Drop.bas").exists()) + self.assertTrue((output_root / "Flows" / "Active.xaml.analysis.md").exists()) + + +if __name__ == "__main__": + unittest.main()