feat(pipeline): support incremental runs and analysis caching

Reuse existing output directory by default instead of failing
Cache successful Gemini analysis results using content hashing
Skip unchanged files and retry failed analyses on subsequent runs
Update the --force flag to explicitly delete and rebuild the output directory
2026-04-02 11:03:04 +08:00
parent c73767073e
commit 0cf62d1ac5
4 changed files with 165 additions and 22 deletions

View File

@@ -28,8 +28,7 @@ GEMINI_MODEL=gemini-2.5-flash
```bash
./.venv/bin/python -m uipath_explainator \
/path/to/uipath-project \
--output-dir workspace/project-bundle \
--force
--output-dir workspace/project-bundle
```
Common options:
@@ -38,10 +37,12 @@ GEMINI_MODEL=gemini-2.5-flash
- `--env-file`: custom `.env` file path
- `--model`: temporarily override the model name from `.env`
- `--skip-analysis`: only copy, clean, and prune files; do not call Gemini
- `--force`: allow overwriting an existing output directory
- `--force`: delete and rebuild the existing output directory; without it, the run resumes by default and skips files that were already analyzed successfully and whose source is unchanged
- `--log-level`: log level; one of `DEBUG` / `INFO` / `WARNING` / `ERROR` / `CRITICAL`
- `--log-file`: also write the full log to a file, which makes it easy to review the scan, prune, and Gemini analysis steps afterwards
After an interruption, simply run the tool again against the same `--output-dir` to continue. The program reuses the local analysis cache under `docs/` and only re-sends new files, files whose content has changed, and files whose previous analysis failed to Gemini.
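For illustration, a typical sequence looks like this (same placeholder paths as the examples above; resume state is tracked in the analysis cache under the bundle's `docs/` directory):
```bash
# First run; it may be interrupted partway through the Gemini analysis.
./.venv/bin/python -m uipath_explainator \
  /path/to/uipath-project \
  --output-dir workspace/project-bundle

# Resume: run the exact same command again. Files that were already analyzed
# successfully and are unchanged are skipped; new, changed, and previously
# failed files are re-sent to Gemini.
./.venv/bin/python -m uipath_explainator \
  /path/to/uipath-project \
  --output-dir workspace/project-bundle

# Start over: delete and rebuild the output directory.
./.venv/bin/python -m uipath_explainator \
  /path/to/uipath-project \
  --output-dir workspace/project-bundle \
  --force
```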
## Output contents
The output directory is now a bundle directory that always contains:
@@ -78,8 +79,7 @@ GEMINI_MODEL=gemini-2.5-flash
/path/to/uipath-project \
--output-dir workspace/project-bundle \
--log-level DEBUG \
--log-file workspace/project-bundle/run.log \
--force
--log-file workspace/project-bundle/run.log
```
`DEBUG` additionally logs per-file details such as copying, dependency resolution, path resolution, pruning, and the writing of analysis documents.

View File

@@ -26,7 +26,7 @@ def build_parser() -> ArgumentParser:
parser.add_argument("--env-file", type=Path, default=Path(".env"), help="Environment file for Gemini config")
parser.add_argument("--model", help="Override GEMINI_MODEL")
parser.add_argument("--skip-analysis", action="store_true", help="Skip Gemini analysis and only prepare files")
parser.add_argument("--force", action="store_true", help="Overwrite the output directory if it already exists")
parser.add_argument("--force", action="store_true", help="Delete and rebuild the output directory instead of resuming")
parser.add_argument(
"--log-level",
default="INFO",

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
from dataclasses import dataclass
from hashlib import sha256
from pathlib import Path
from shutil import copy2, rmtree
import json
@@ -11,6 +12,7 @@ from .scanner import ScanResult, crawl_dependencies, find_entry_file, read_text,
logger = logging.getLogger(__name__)
ANALYSIS_CACHE_NAME = ".analysis-cache.json"
@dataclass(slots=True)
@@ -85,7 +87,7 @@ class ProjectPipeline:
final_scan = crawl_dependencies(self.code_root, self.code_root / entry_rel)
final_rel_files = self._relative_files(final_scan, self.code_root)
pruned_files = self._prune_unused_files(initial_rel_files, final_rel_files)
pruned_files = self._prune_unused_files(final_rel_files)
logger.info(
"Final scan complete: files=%d warnings=%d pruned=%d",
len(final_rel_files),
@@ -121,10 +123,11 @@ class ProjectPipeline:
def _prepare_output_root(self) -> None:
if self.output_root.exists():
if not self.force:
raise FileExistsError(f"Output directory already exists: {self.output_root}")
if self.force:
logger.info("Removing existing output directory because force=True: %s", self.output_root)
rmtree(self.output_root)
else:
logger.info("Reusing existing output directory for incremental run: %s", self.output_root)
self.code_root.mkdir(parents=True, exist_ok=True)
self.docs_root.mkdir(parents=True, exist_ok=True)
logger.debug("Prepared output directories: code=%s docs=%s", self.code_root, self.docs_root)
@@ -151,10 +154,10 @@ class ProjectPipeline:
logger.debug("Removed CommentOut blocks from %s", output_file)
return cleaned
def _prune_unused_files(self, initial_files: list[Path], final_files: list[Path]) -> list[Path]:
def _prune_unused_files(self, final_files: list[Path]) -> list[Path]:
final_set = set(final_files)
pruned: list[Path] = []
for relative_path in initial_files:
for relative_path in self._existing_relative_files(self.code_root):
if relative_path in final_set:
continue
target = self.code_root / relative_path
@@ -162,15 +165,11 @@ class ProjectPipeline:
target.unlink()
pruned.append(relative_path)
logger.debug("Pruned unreachable file: %s", target)
self._cleanup_empty_dirs()
self._cleanup_empty_dirs(self.code_root)
return pruned
def _cleanup_empty_dirs(self) -> None:
directories = sorted(
[path for path in self.code_root.rglob("*") if path.is_dir()],
key=lambda item: len(item.parts),
reverse=True,
)
def _cleanup_empty_dirs(self, root: Path) -> None:
directories = sorted([path for path in root.rglob("*") if path.is_dir()], key=lambda item: len(item.parts), reverse=True)
for directory in directories:
if any(directory.iterdir()):
continue
@@ -180,23 +179,36 @@ class ProjectPipeline:
def _write_analysis(self, final_files: list[Path], analyzer) -> tuple[list[Path], list[str]]:
if analyzer is None:
logger.info("Skipping Gemini analysis because analyzer is disabled")
self._clear_analysis_outputs()
return [], []
output_files: list[Path] = []
warnings: list[str] = []
cache = self._load_analysis_cache()
cache = self._prune_stale_analysis_outputs(final_files, cache)
self._save_analysis_cache(cache)
for relative_path in self._ordered_files(final_files):
analysis_rel_path = Path(f"{relative_path.as_posix()}.analysis.md")
analysis_path = self.docs_root / analysis_rel_path
content = read_text(self.code_root / relative_path)
content_hash = self._content_hash(content)
cache_entry = cache.get(relative_path.as_posix())
if self._can_reuse_analysis(relative_path, analysis_path, content_hash, cache_entry):
output_files.append(analysis_rel_path)
continue
try:
analysis = analyzer.analyze(relative_path, content)
cache[relative_path.as_posix()] = {"content_hash": content_hash, "status": "success"}
except Exception as exc:
warning = f"Analysis failed for {relative_path.as_posix()}: {self._format_analysis_error(exc)}"
warnings.append(warning)
logger.warning(warning)
analysis = self._build_failed_analysis(relative_path, exc)
analysis_path = self.docs_root / f"{relative_path.as_posix()}.analysis.md"
cache[relative_path.as_posix()] = {"content_hash": content_hash, "status": "failed"}
analysis_path.parent.mkdir(parents=True, exist_ok=True)
analysis_path.write_text(analysis, encoding="utf-8")
output_files.append(Path(f"{relative_path.as_posix()}.analysis.md"))
self._save_analysis_cache(cache)
output_files.append(analysis_rel_path)
logger.debug("Wrote analysis file: %s", analysis_path)
return output_files, warnings
@@ -257,9 +269,90 @@ class ProjectPipeline:
def _relative_files(self, scan: ScanResult, root: Path) -> list[Path]:
return sorted(path.relative_to(root) for path in scan.files)
def _existing_relative_files(self, root: Path) -> list[Path]:
return sorted(path.relative_to(root) for path in root.rglob("*") if path.is_file())
def _ordered_files(self, paths: list[Path]) -> list[Path]:
return sorted(paths, key=lambda item: (item.suffix.lower() != ".xaml", item.as_posix().lower()))
def _analysis_cache_path(self) -> Path:
return self.docs_root / ANALYSIS_CACHE_NAME
def _load_analysis_cache(self) -> dict[str, dict[str, str]]:
cache_path = self._analysis_cache_path()
if not cache_path.exists():
return {}
try:
raw = json.loads(cache_path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
logger.warning("Ignoring invalid analysis cache file: %s", cache_path)
return {}
if not isinstance(raw, dict):
logger.warning("Ignoring unexpected analysis cache payload: %s", cache_path)
return {}
cache: dict[str, dict[str, str]] = {}
for relative_path, metadata in raw.items():
if not isinstance(relative_path, str) or not isinstance(metadata, dict):
continue
content_hash = metadata.get("content_hash")
status = metadata.get("status")
if not isinstance(content_hash, str) or not isinstance(status, str):
continue
cache[relative_path] = {"content_hash": content_hash, "status": status}
return cache
def _save_analysis_cache(self, cache: dict[str, dict[str, str]]) -> None:
self._analysis_cache_path().write_text(json.dumps(cache, ensure_ascii=False, indent=2), encoding="utf-8")
def _prune_stale_analysis_outputs(
self, final_files: list[Path], cache: dict[str, dict[str, str]]
) -> dict[str, dict[str, str]]:
final_paths = {relative_path.as_posix() for relative_path in final_files}
valid_analysis_paths = {Path(f"{relative_path}.analysis.md") for relative_path in final_paths}
for analysis_path in self.docs_root.rglob("*.analysis.md"):
relative_analysis_path = analysis_path.relative_to(self.docs_root)
if relative_analysis_path in valid_analysis_paths:
continue
analysis_path.unlink()
logger.debug("Removed stale analysis file: %s", analysis_path)
pruned_cache = {relative_path: metadata for relative_path, metadata in cache.items() if relative_path in final_paths}
self._cleanup_empty_dirs(self.docs_root)
return pruned_cache
def _clear_analysis_outputs(self) -> None:
for analysis_path in self.docs_root.rglob("*.analysis.md"):
analysis_path.unlink()
logger.debug("Removed analysis file because analysis is disabled: %s", analysis_path)
cache_path = self._analysis_cache_path()
if cache_path.exists():
cache_path.unlink()
logger.debug("Removed analysis cache file: %s", cache_path)
self._cleanup_empty_dirs(self.docs_root)
def _can_reuse_analysis(
self,
relative_path: Path,
analysis_path: Path,
content_hash: str,
cache_entry: dict[str, str] | None,
) -> bool:
if cache_entry is None:
return False
if cache_entry.get("status") != "success":
return False
if cache_entry.get("content_hash") != content_hash:
return False
if not analysis_path.exists():
return False
logger.info("Reusing cached Gemini analysis for %s", relative_path.as_posix())
return True
def _content_hash(self, content: str) -> str:
return sha256(content.encode("utf-8")).hexdigest()
def _build_failed_analysis(self, relative_path: Path, exc: Exception) -> str:
reason = self._format_analysis_error(exc)
return "\n".join(

View File

@@ -64,6 +64,15 @@ class FlakyAnalyzer:
return f"# {relative_path.as_posix()}\n\n{len(content)}"
class RecordingAnalyzer:
def __init__(self) -> None:
self.paths: list[str] = []
def analyze(self, relative_path: Path, content: str) -> str:
self.paths.append(relative_path.as_posix())
return f"# {relative_path.as_posix()}\n\n{len(content)}"
class PipelineTests(unittest.TestCase):
def test_strip_comment_out_blocks_removes_nested_blocks(self) -> None:
source = "<root><ui:CommentOut><x/><ui:CommentOut><y/></ui:CommentOut></ui:CommentOut><z/></root>"
@@ -173,6 +182,47 @@ class PipelineTests(unittest.TestCase):
self.assertTrue((docs_root / "Scripts" / "Keep.bas.analysis.md").exists())
self.assertTrue(any("Analysis failed for Flows/Active.xaml" in item for item in report.warnings))
def test_pipeline_resume_skips_successfully_cached_analyses(self) -> None:
with TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
project_root = tmp_path / "project"
output_root = tmp_path / "workspace"
(project_root / "Flows").mkdir(parents=True)
(project_root / "Flows" / "Active.xaml").write_text(ACTIVE_XAML, encoding="utf-8")
(project_root / "Scripts").mkdir()
(project_root / "Scripts" / "Keep.bas").write_text("Sub Keep()\nEnd Sub", encoding="utf-8")
(project_root / "main.xaml").write_text(MAIN_XAML, encoding="utf-8")
first = RecordingAnalyzer()
ProjectPipeline(project_root, output_root, "main.xaml", force=True).run(first)
self.assertEqual(
first.paths,
["Flows/Active.xaml", "main.xaml", "Scripts/Keep.bas"],
)
second = RecordingAnalyzer()
ProjectPipeline(project_root, output_root, "main.xaml", force=False).run(second)
self.assertEqual(second.paths, [])
def test_pipeline_resume_retries_failed_analysis_and_reanalyzes_changed_files(self) -> None:
with TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
project_root = tmp_path / "project"
output_root = tmp_path / "workspace"
(project_root / "Flows").mkdir(parents=True)
(project_root / "Flows" / "Active.xaml").write_text(ACTIVE_XAML, encoding="utf-8")
(project_root / "Scripts").mkdir()
(project_root / "Scripts" / "Keep.bas").write_text("Sub Keep()\nEnd Sub", encoding="utf-8")
(project_root / "main.xaml").write_text(MAIN_XAML, encoding="utf-8")
ProjectPipeline(project_root, output_root, "main.xaml", force=True).run(FlakyAnalyzer())
(project_root / "Scripts" / "Keep.bas").write_text("Sub Keep()\nMsgBox \"updated\"\nEnd Sub", encoding="utf-8")
retry = RecordingAnalyzer()
ProjectPipeline(project_root, output_root, "main.xaml", force=False).run(retry)
self.assertEqual(retry.paths, ["Flows/Active.xaml", "Scripts/Keep.bas"])
if __name__ == "__main__":
unittest.main()