diff --git a/README.md b/README.md
index 685c372..376f1d4 100644
--- a/README.md
+++ b/README.md
@@ -28,8 +28,7 @@ GEMINI_MODEL=gemini-2.5-flash
 ```bash
 ./.venv/bin/python -m uipath_explainator \
   /path/to/uipath-project \
-  --output-dir workspace/project-bundle \
-  --force
+  --output-dir workspace/project-bundle
 ```

 Common options:
@@ -38,10 +37,12 @@
 - `--env-file`: custom `.env` path
 - `--model`: temporarily override the model name from `.env`
 - `--skip-analysis`: only copy, clean, and prune; do not call Gemini
-- `--force`: allow overwriting an existing output directory
+- `--force`: delete and rebuild an existing output directory; without it, the run resumes by default and skips files that were already analyzed successfully and whose source has not changed
 - `--log-level`: log level; supports `DEBUG` / `INFO` / `WARNING` / `ERROR` / `CRITICAL`
 - `--log-file`: also write the full log to a file, useful for tracing the scan, pruning, and Gemini analysis steps

+After an interruption, simply run again against the same `--output-dir` to continue. The program reuses the local analysis cache in `docs/` and only resends new files, files whose content has changed, and files that failed last time to Gemini.
+
 ## Output

 The output directory is now a bundle directory that always contains:
@@ -78,8 +79,7 @@ GEMINI_MODEL=gemini-2.5-flash
   /path/to/uipath-project \
   --output-dir workspace/project-bundle \
   --log-level DEBUG \
-  --log-file workspace/project-bundle/run.log \
-  --force
+  --log-file workspace/project-bundle/run.log
 ```

 `DEBUG` additionally logs per-file details such as copying, dependency resolution, path resolution, pruning, and writing of the analysis documents.
diff --git a/src/uipath_explainator/cli.py b/src/uipath_explainator/cli.py
index d61c372..03d98fc 100644
--- a/src/uipath_explainator/cli.py
+++ b/src/uipath_explainator/cli.py
@@ -26,7 +26,7 @@ def build_parser() -> ArgumentParser:
     parser.add_argument("--env-file", type=Path, default=Path(".env"), help="Environment file for Gemini config")
     parser.add_argument("--model", help="Override GEMINI_MODEL")
     parser.add_argument("--skip-analysis", action="store_true", help="Skip Gemini analysis and only prepare files")
-    parser.add_argument("--force", action="store_true", help="Overwrite the output directory if it already exists")
+    parser.add_argument("--force", action="store_true", help="Delete and rebuild the output directory instead of resuming")
     parser.add_argument(
         "--log-level",
         default="INFO",
diff --git a/src/uipath_explainator/pipeline.py b/src/uipath_explainator/pipeline.py
index c8432f4..0f688f9 100644
--- a/src/uipath_explainator/pipeline.py
+++ b/src/uipath_explainator/pipeline.py
@@ -1,6 +1,7 @@
 from __future__ import annotations

 from dataclasses import dataclass
+from hashlib import sha256
 from pathlib import Path
 from shutil import copy2, rmtree
 import json
@@ -11,6 +12,7 @@ from .scanner import ScanResult, crawl_dependencies, find_entry_file, read_text,

 logger = logging.getLogger(__name__)

+ANALYSIS_CACHE_NAME = ".analysis-cache.json"


 @dataclass(slots=True)
@@ -85,7 +87,7 @@ class ProjectPipeline:
         final_scan = crawl_dependencies(self.code_root, self.code_root / entry_rel)
         final_rel_files = self._relative_files(final_scan, self.code_root)

-        pruned_files = self._prune_unused_files(initial_rel_files, final_rel_files)
+        pruned_files = self._prune_unused_files(final_rel_files)
         logger.info(
             "Final scan complete: files=%d warnings=%d pruned=%d",
             len(final_rel_files),
@@ -121,10 +123,11 @@ class ProjectPipeline:

     def _prepare_output_root(self) -> None:
         if self.output_root.exists():
-            if not self.force:
-                raise FileExistsError(f"Output directory already exists: {self.output_root}")
-            logger.info("Removing existing output directory because force=True: %s", self.output_root)
-            rmtree(self.output_root)
+            if self.force:
+                logger.info("Removing existing output directory because force=True: %s", self.output_root)
+                rmtree(self.output_root)
+            else:
+                logger.info("Reusing existing output directory for incremental run: %s", self.output_root)
         self.code_root.mkdir(parents=True, exist_ok=True)
         self.docs_root.mkdir(parents=True, exist_ok=True)
         logger.debug("Prepared output directories: code=%s docs=%s", self.code_root, self.docs_root)
@@ -151,10 +154,10 @@ class ProjectPipeline:
             logger.debug("Removed CommentOut blocks from %s", output_file)
         return cleaned

-    def _prune_unused_files(self, initial_files: list[Path], final_files: list[Path]) -> list[Path]:
+    def _prune_unused_files(self, final_files: list[Path]) -> list[Path]:
         final_set = set(final_files)
         pruned: list[Path] = []
-        for relative_path in initial_files:
+        for relative_path in self._existing_relative_files(self.code_root):
             if relative_path in final_set:
                 continue
             target = self.code_root / relative_path
@@ -162,15 +165,11 @@ class ProjectPipeline:
             target.unlink()
             pruned.append(relative_path)
             logger.debug("Pruned unreachable file: %s", target)
-        self._cleanup_empty_dirs()
+        self._cleanup_empty_dirs(self.code_root)
         return pruned

-    def _cleanup_empty_dirs(self) -> None:
-        directories = sorted(
-            [path for path in self.code_root.rglob("*") if path.is_dir()],
-            key=lambda item: len(item.parts),
-            reverse=True,
-        )
+    def _cleanup_empty_dirs(self, root: Path) -> None:
+        directories = sorted([path for path in root.rglob("*") if path.is_dir()], key=lambda item: len(item.parts), reverse=True)
         for directory in directories:
             if any(directory.iterdir()):
                 continue
@@ -180,23 +179,36 @@ class ProjectPipeline:

     def _write_analysis(self, final_files: list[Path], analyzer) -> tuple[list[Path], list[str]]:
         if analyzer is None:
             logger.info("Skipping Gemini analysis because analyzer is disabled")
+            self._clear_analysis_outputs()
             return [], []

         output_files: list[Path] = []
         warnings: list[str] = []
+        cache = self._load_analysis_cache()
+        cache = self._prune_stale_analysis_outputs(final_files, cache)
+        self._save_analysis_cache(cache)
         for relative_path in self._ordered_files(final_files):
+            analysis_rel_path = Path(f"{relative_path.as_posix()}.analysis.md")
+            analysis_path = self.docs_root / analysis_rel_path
             content = read_text(self.code_root / relative_path)
+            content_hash = self._content_hash(content)
+            cache_entry = cache.get(relative_path.as_posix())
+            if self._can_reuse_analysis(relative_path, analysis_path, content_hash, cache_entry):
+                output_files.append(analysis_rel_path)
+                continue
             try:
                 analysis = analyzer.analyze(relative_path, content)
+                cache[relative_path.as_posix()] = {"content_hash": content_hash, "status": "success"}
             except Exception as exc:
                 warning = f"Analysis failed for {relative_path.as_posix()}: {self._format_analysis_error(exc)}"
                 warnings.append(warning)
                 logger.warning(warning)
                 analysis = self._build_failed_analysis(relative_path, exc)
-            analysis_path = self.docs_root / f"{relative_path.as_posix()}.analysis.md"
+                cache[relative_path.as_posix()] = {"content_hash": content_hash, "status": "failed"}
             analysis_path.parent.mkdir(parents=True, exist_ok=True)
             analysis_path.write_text(analysis, encoding="utf-8")
-            output_files.append(Path(f"{relative_path.as_posix()}.analysis.md"))
+            self._save_analysis_cache(cache)
+            output_files.append(analysis_rel_path)
             logger.debug("Wrote analysis file: %s", analysis_path)
         return output_files, warnings
@@ -257,9 +269,90 @@ class ProjectPipeline:
     def _relative_files(self, scan: ScanResult, root: Path) -> list[Path]:
         return sorted(path.relative_to(root) for path in scan.files)

+    def _existing_relative_files(self, root: Path) -> list[Path]:
+        return sorted(path.relative_to(root) for path in root.rglob("*") if path.is_file())
+
     def _ordered_files(self, paths: list[Path]) -> list[Path]:
         return sorted(paths, key=lambda item: (item.suffix.lower() != ".xaml", item.as_posix().lower()))

+    def _analysis_cache_path(self) -> Path:
+        return self.docs_root / ANALYSIS_CACHE_NAME
+
+    def _load_analysis_cache(self) -> dict[str, dict[str, str]]:
+        cache_path = self._analysis_cache_path()
+        if not cache_path.exists():
+            return {}
+        try:
+            raw = json.loads(cache_path.read_text(encoding="utf-8"))
+        except (OSError, json.JSONDecodeError):
+            logger.warning("Ignoring invalid analysis cache file: %s", cache_path)
+            return {}
+        if not isinstance(raw, dict):
+            logger.warning("Ignoring unexpected analysis cache payload: %s", cache_path)
+            return {}
+
+        cache: dict[str, dict[str, str]] = {}
+        for relative_path, metadata in raw.items():
+            if not isinstance(relative_path, str) or not isinstance(metadata, dict):
+                continue
+            content_hash = metadata.get("content_hash")
+            status = metadata.get("status")
+            if not isinstance(content_hash, str) or not isinstance(status, str):
+                continue
+            cache[relative_path] = {"content_hash": content_hash, "status": status}
+        return cache
+
+    def _save_analysis_cache(self, cache: dict[str, dict[str, str]]) -> None:
+        self._analysis_cache_path().write_text(json.dumps(cache, ensure_ascii=False, indent=2), encoding="utf-8")
+
+    def _prune_stale_analysis_outputs(
+        self, final_files: list[Path], cache: dict[str, dict[str, str]]
+    ) -> dict[str, dict[str, str]]:
+        final_paths = {relative_path.as_posix() for relative_path in final_files}
+        valid_analysis_paths = {Path(f"{relative_path}.analysis.md") for relative_path in final_paths}
+
+        for analysis_path in self.docs_root.rglob("*.analysis.md"):
+            relative_analysis_path = analysis_path.relative_to(self.docs_root)
+            if relative_analysis_path in valid_analysis_paths:
+                continue
+            analysis_path.unlink()
+            logger.debug("Removed stale analysis file: %s", analysis_path)
+
+        pruned_cache = {relative_path: metadata for relative_path, metadata in cache.items() if relative_path in final_paths}
+        self._cleanup_empty_dirs(self.docs_root)
+        return pruned_cache
+
+    def _clear_analysis_outputs(self) -> None:
+        for analysis_path in self.docs_root.rglob("*.analysis.md"):
+            analysis_path.unlink()
+            logger.debug("Removed analysis file because analysis is disabled: %s", analysis_path)
+        cache_path = self._analysis_cache_path()
+        if cache_path.exists():
+            cache_path.unlink()
+            logger.debug("Removed analysis cache file: %s", cache_path)
+        self._cleanup_empty_dirs(self.docs_root)
+
+    def _can_reuse_analysis(
+        self,
+        relative_path: Path,
+        analysis_path: Path,
+        content_hash: str,
+        cache_entry: dict[str, str] | None,
+    ) -> bool:
+        if cache_entry is None:
+            return False
+        if cache_entry.get("status") != "success":
+            return False
+        if cache_entry.get("content_hash") != content_hash:
+            return False
+        if not analysis_path.exists():
+            return False
+        logger.info("Reusing cached Gemini analysis for %s", relative_path.as_posix())
+        return True
+
+    def _content_hash(self, content: str) -> str:
+        return sha256(content.encode("utf-8")).hexdigest()
+
     def _build_failed_analysis(self, relative_path: Path, exc: Exception) -> str:
         reason = self._format_analysis_error(exc)
         return "\n".join(
diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py
index b773357..c1fe94e 100644
--- a/tests/test_pipeline.py
+++ b/tests/test_pipeline.py
@@ -64,6 +64,15 @@ class FlakyAnalyzer:
         return f"# {relative_path.as_posix()}\n\n{len(content)}"


+class RecordingAnalyzer:
+    def __init__(self) -> None:
+        self.paths: list[str] = []
+
+    def analyze(self, relative_path: Path, content: str) -> str:
+        self.paths.append(relative_path.as_posix())
+        return f"# {relative_path.as_posix()}\n\n{len(content)}"
+
+
 class PipelineTests(unittest.TestCase):
     def test_strip_comment_out_blocks_removes_nested_blocks(self) -> None:
         source = ""
@@ -173,6 +182,47 @@ class PipelineTests(unittest.TestCase):
         self.assertTrue((docs_root / "Scripts" / "Keep.bas.analysis.md").exists())
         self.assertTrue(any("Analysis failed for Flows/Active.xaml" in item for item in report.warnings))

+    def test_pipeline_resume_skips_successfully_cached_analyses(self) -> None:
+        with TemporaryDirectory() as tmp:
+            tmp_path = Path(tmp)
+            project_root = tmp_path / "project"
+            output_root = tmp_path / "workspace"
+            (project_root / "Flows").mkdir(parents=True)
+            (project_root / "Flows" / "Active.xaml").write_text(ACTIVE_XAML, encoding="utf-8")
+            (project_root / "Scripts").mkdir()
+            (project_root / "Scripts" / "Keep.bas").write_text("Sub Keep()\nEnd Sub", encoding="utf-8")
+            (project_root / "main.xaml").write_text(MAIN_XAML, encoding="utf-8")
+
+            first = RecordingAnalyzer()
+            ProjectPipeline(project_root, output_root, "main.xaml", force=True).run(first)
+            self.assertEqual(
+                first.paths,
+                ["Flows/Active.xaml", "main.xaml", "Scripts/Keep.bas"],
+            )
+
+            second = RecordingAnalyzer()
+            ProjectPipeline(project_root, output_root, "main.xaml", force=False).run(second)
+            self.assertEqual(second.paths, [])
+
+    def test_pipeline_resume_retries_failed_analysis_and_reanalyzes_changed_files(self) -> None:
+        with TemporaryDirectory() as tmp:
+            tmp_path = Path(tmp)
+            project_root = tmp_path / "project"
+            output_root = tmp_path / "workspace"
+            (project_root / "Flows").mkdir(parents=True)
+            (project_root / "Flows" / "Active.xaml").write_text(ACTIVE_XAML, encoding="utf-8")
+            (project_root / "Scripts").mkdir()
+            (project_root / "Scripts" / "Keep.bas").write_text("Sub Keep()\nEnd Sub", encoding="utf-8")
+            (project_root / "main.xaml").write_text(MAIN_XAML, encoding="utf-8")
+
+            ProjectPipeline(project_root, output_root, "main.xaml", force=True).run(FlakyAnalyzer())
+
+            (project_root / "Scripts" / "Keep.bas").write_text("Sub Keep()\nMsgBox \"updated\"\nEnd Sub", encoding="utf-8")
+
+            retry = RecordingAnalyzer()
+            ProjectPipeline(project_root, output_root, "main.xaml", force=False).run(retry)
+            self.assertEqual(retry.paths, ["Flows/Active.xaml", "Scripts/Keep.bas"])
+

 if __name__ == "__main__":
     unittest.main()
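
For quick reference, here is a minimal standalone sketch of the per-file reuse check that the resume path applies, assuming the cache entry shape that `_write_analysis` writes to `docs/.analysis-cache.json` (`{"content_hash": ..., "status": ...}`); the free function and its arguments are illustrative only and are not part of the patch.

```python
from hashlib import sha256
from pathlib import Path


def can_reuse_analysis(analysis_path: Path, content: str, cache_entry: dict[str, str] | None) -> bool:
    """Sketch of the resume decision: skip Gemini only when the previous run
    succeeded, the source content is unchanged, and the cached .analysis.md exists."""
    content_hash = sha256(content.encode("utf-8")).hexdigest()
    if cache_entry is None:                               # never analyzed before -> analyze
        return False
    if cache_entry.get("status") != "success":            # last attempt failed -> retry
        return False
    if cache_entry.get("content_hash") != content_hash:   # source changed -> reanalyze
        return False
    return analysis_path.exists()                         # cached document must still be on disk
```

Any file that fails one of these checks is sent back to Gemini, which is what the two new resume tests assert for unchanged, changed, and previously failed files.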