Compare commits


6 Commits

0cf62d1ac5 feat(pipeline): support incremental runs and analysis caching
Reuse existing output directory by default instead of failing
Cache successful Gemini analysis results using content hashing
Skip unchanged files and retry failed analyses on subsequent runs
Update --force flag to explicitly delete and rebuild the output
2026-04-02 11:03:04 +08:00
c73767073e feat(analysis): prevent pipeline crash on single file analysis failure
Catch Gemini API errors (e.g., HTTP 429) and summarize upstream messages
Generate fallback markdown for failed files instead of aborting
Append analysis failures to pipeline warnings
2026-04-02 10:59:04 +08:00
0bdebd5368 feat(logging): add configurable logging with file output support
Introduce --log-level and --log-file CLI arguments.
Add execution time tracking and detailed logs across all modules.
2026-04-02 10:40:39 +08:00
eef3464257 feat(output): separate code and docs into subdirectories
Group copied project files under a dedicated code/ directory
Group generated analysis, manifest, and overview under docs/
2026-04-02 10:36:29 +08:00
5ba2e3217a feat(report): restructure Gemini analysis and overview documentation
Update Gemini response schema to extract detailed overview, logic steps, data dependencies, and
consultation insights.
Add pipeline processing logic and file categorization explanations to OVERVIEW.md.
2026-04-02 10:33:19 +08:00
d6218d6bad feat(gemini): normalize base URL to strip API version suffixes
Automatically remove trailing slashes and version paths (e.g., /v1beta) from GEMINI_BASE_URL
Update GeminiAnalyzer to use the normalized URL and add type hints
Add test coverage for Gemini client configuration
2026-04-02 10:25:18 +08:00
11 changed files with 1000 additions and 96 deletions

.codex (new file)

.gitignore (vendored)

@@ -214,3 +214,5 @@ __marimo__/
 # Streamlit
 .streamlit/secrets.toml
+workspace

README.md

@@ -2,6 +2,8 @@
 `uipath-explainator` starts from the UiPath project entry `main.xaml`, recursively extracts `Invoke Workflow` and `Invoke VBA` targets, copies them into a designated workspace, removes dead code inside `<ui:CommentOut>` blocks, rescans to prune files that are no longer used, and finally calls Gemini on each remaining file to generate documentation.
+The output is automatically split into separate `code/` and `docs/` subdirectories so generated documentation never mixes with project files.
 ## Installation
 ```bash
@@ -19,14 +21,14 @@ GEMINI_MODEL=gemini-2.5-flash
 ```
 Leave `GEMINI_BASE_URL` empty to use the official default endpoint; if a proxy or gateway sits in front, fill in a custom address.
+Provide only the gateway root here, e.g. `https://your-gateway.example.com`; do not append `/v1beta`, `/v1alpha`, or `/v1` yourself.
 ## Usage
 ```bash
 ./.venv/bin/python -m uipath_explainator \
   /path/to/uipath-project \
-  --output-dir workspace/project-copy \
-  --force
+  --output-dir workspace/project-bundle
 ```
 Common options:
@@ -35,18 +37,49 @@ GEMINI_MODEL=gemini-2.5-flash
 - `--env-file`: custom `.env` path
 - `--model`: temporarily override the model name from `.env`
 - `--skip-analysis`: only copy, clean, and prune; skip the Gemini calls
-- `--force`: allow overwriting an existing output directory
+- `--force`: delete and rebuild an existing output directory; without it, the run resumes by default and skips files that were already analyzed successfully and whose source is unchanged
+- `--log-level`: log verbosity, one of `DEBUG` / `INFO` / `WARNING` / `ERROR` / `CRITICAL`
+- `--log-file`: also write the full log to a file, handy for reviewing the scan, prune, and Gemini analysis stages afterwards
+If a run is interrupted, simply rerun against the same `--output-dir` to continue. The program reuses the local analysis cache under `docs/` and resubmits to Gemini only new files, files whose content changed, and files that failed last time.
 ## Output
-The output directory preserves the original project's relative structure and additionally contains:
-- `manifest.json`: inventory of scan, prune, warning, and analysis results
-- `OVERVIEW.md`: overview summary
-- `*.analysis.md`: a Gemini explanation for each file that survives pruning
+The output directory is now a bundle that always contains:
+- `code/`: the copied, cleaned, and pruned project files, preserving the original relative layout
+- `docs/manifest.json`: inventory of scan, prune, warning, and analysis results
+- `docs/OVERVIEW.md`: the overview document; beyond file counts it explains the whole pipeline of initial scan -> CommentOut cleanup -> rescan -> prune -> Gemini analysis
+- `docs/*.analysis.md`: a consulting-style Gemini write-up for each surviving file, always covering:
+  - File positioning: its role in the overall process, how it is triggered, and its core purpose
+  - Process breakdown: a step-by-step account of how it executes
+  - Key logic: conditionals, call relationships, edge cases
+  - Data and dependencies: inputs, outputs, variables, external files or systems
+  - Consulting view: business meaning, risk points, information that cannot be confirmed
+  - Scenario example: an illustrative example to aid handover and understanding
 ## Current Implementation Constraints
 - Only `Invoke Workflow` / `Invoke VBA` file paths that can be statically identified are resolved.
 - Dynamically concatenated paths are recorded as warnings, never guessed.
 - `Invoke VBA` recognizes common text-based VBA extensions by default: `.txt`, `.bas`, `.vb`, `.vbs`, `.vba`, `.cls`, `.frm`.
+## Logging
+Runs emit `INFO`-level logs by default, recording:
+- CLI startup arguments, configuration loading results, and whether Gemini is enabled
+- dependency scan start/end, file counts, and warning counts
+- stage summaries for copying, CommentOut cleanup, rescanning, pruning, and document generation
+For finer file-level troubleshooting detail, raise the level to `DEBUG`:
+```bash
+./.venv/bin/python -m uipath_explainator \
+  /path/to/uipath-project \
+  --output-dir workspace/project-bundle \
+  --log-level DEBUG \
+  --log-file workspace/project-bundle/run.log
+```
+`DEBUG` additionally records per-file details for copying, dependency resolution, path resolution, pruning, and writing the analysis documents.
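The resume behavior described above hinges on a small content-hash cache the pipeline keeps in `docs/.analysis-cache.json` (see `pipeline.py` further down). A minimal sketch of the reuse decision, with a hypothetical `cache` dict standing in for the loaded cache file:

```python
from hashlib import sha256
from pathlib import Path

# Hypothetical in-memory view of docs/.analysis-cache.json:
# {"Flows/Active.xaml": {"content_hash": "<sha256 hex>", "status": "success"}}
cache: dict[str, dict[str, str]] = {}

def should_reanalyze(relative_path: Path, content: str, analysis_path: Path) -> bool:
    """Mirror of the pipeline's reuse check: resubmit to Gemini only when the
    file is new, previously failed, changed on disk, or its doc is missing."""
    entry = cache.get(relative_path.as_posix())
    content_hash = sha256(content.encode("utf-8")).hexdigest()
    if entry is None:
        return True  # never analyzed before
    if entry.get("status") != "success":
        return True  # retry last run's failures
    if entry.get("content_hash") != content_hash:
        return True  # source content changed
    return not analysis_path.exists()  # analysis doc was deleted by hand
```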

uipath_explainator CLI module

@@ -2,21 +2,38 @@ from __future__ import annotations
 from argparse import ArgumentParser
 from pathlib import Path
+import logging
 from .config import Settings
 from .gemini import GeminiAnalyzer
+from .logging_utils import configure_logging
 from .pipeline import ProjectPipeline
+logger = logging.getLogger(__name__)
 def build_parser() -> ArgumentParser:
     parser = ArgumentParser(description="Extract and explain UiPath project dependencies.")
     parser.add_argument("project_dir", type=Path, help="UiPath project root directory")
-    parser.add_argument("--output-dir", type=Path, default=Path("workspace"), help="Copied project output directory")
+    parser.add_argument(
+        "--output-dir",
+        type=Path,
+        default=Path("workspace"),
+        help="Output bundle directory containing separate code/ and docs/ folders",
+    )
     parser.add_argument("--entry", default="main.xaml", help="Entry XAML file name")
     parser.add_argument("--env-file", type=Path, default=Path(".env"), help="Environment file for Gemini config")
     parser.add_argument("--model", help="Override GEMINI_MODEL")
     parser.add_argument("--skip-analysis", action="store_true", help="Skip Gemini analysis and only prepare files")
-    parser.add_argument("--force", action="store_true", help="Overwrite the output directory if it already exists")
+    parser.add_argument("--force", action="store_true", help="Delete and rebuild the output directory instead of resuming")
+    parser.add_argument(
+        "--log-level",
+        default="INFO",
+        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
+        help="Logging verbosity",
+    )
+    parser.add_argument("--log-file", type=Path, help="Optional file path to write logs to")
     return parser
@@ -32,7 +49,25 @@ def main(argv: list[str] | None = None) -> int:
     if not output_dir.is_absolute():
         output_dir = Path.cwd() / output_dir
-    settings = Settings.from_env(args.env_file if args.env_file.exists() else None, model_override=args.model)
+    log_file = args.log_file.expanduser() if args.log_file else None
+    if log_file is not None and not log_file.is_absolute():
+        log_file = Path.cwd() / log_file
+    configure_logging(level_name=args.log_level, log_file=log_file)
+    logger.info(
+        "Starting CLI run: project_root=%s output_dir=%s entry=%s analysis=%s",
+        project_root,
+        output_dir,
+        args.entry,
+        not args.skip_analysis,
+    )
+    env_file = args.env_file if args.env_file.exists() else None
+    if args.env_file and env_file is None:
+        logger.warning("Environment file not found, falling back to default dotenv lookup: %s", args.env_file)
+    try:
+        settings = Settings.from_env(env_file, model_override=args.model)
         analyzer = None if args.skip_analysis else GeminiAnalyzer(settings)
         pipeline = ProjectPipeline(
@@ -42,8 +77,13 @@ def main(argv: list[str] | None = None) -> int:
             force=args.force,
         )
         report = pipeline.run(analyzer=analyzer)
+    except Exception:
+        logger.exception("CLI run failed")
+        return 1
     print(f"Output written to: {report.output_root}")
+    print(f"Code written to: {report.code_root}")
+    print(f"Docs written to: {report.docs_root}")
     print(f"Final files: {len(report.final_files)}")
     print(f"Pruned files: {len(report.pruned_files)}")
     print(f"Warnings: {len(report.warnings)}")

src/uipath_explainator/config.py

@@ -2,12 +2,17 @@ from __future__ import annotations
 from dataclasses import dataclass
 from pathlib import Path
+import logging
+import re
 from typing import Self
 from dotenv import load_dotenv
 import os
+logger = logging.getLogger(__name__)
 @dataclass(slots=True)
 class Settings:
     api_key: str | None
@@ -17,16 +22,33 @@ class Settings:
     @classmethod
     def from_env(cls, env_file: Path | None = None, model_override: str | None = None) -> Self:
         if env_file:
+            logger.info("Loading environment variables from %s", env_file)
             load_dotenv(env_file)
         else:
+            logger.debug("Loading environment variables using default dotenv lookup")
             load_dotenv()
-        return cls(
+        settings = cls(
             api_key=os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY"),
             base_url=os.getenv("GEMINI_BASE_URL") or None,
             model=model_override or os.getenv("GEMINI_MODEL") or "gemini-2.5-flash",
         )
+        logger.info(
+            "Configuration loaded: model=%s base_url=%s api_key=%s",
+            settings.model,
+            settings.normalized_base_url() or "<default>",
+            "set" if settings.api_key else "missing",
+        )
+        return settings
     def require_api_key(self) -> None:
         if not self.api_key:
             raise ValueError("Missing GEMINI_API_KEY (or GOOGLE_API_KEY) in the environment.")
+        logger.debug("Gemini API key is available")
+    def normalized_base_url(self) -> str | None:
+        if not self.base_url:
+            return None
+        base_url = self.base_url.strip().rstrip("/")
+        return re.sub(r"/v\d+(?:alpha|beta)?$", "", base_url, flags=re.IGNORECASE)
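A quick illustration of what `normalized_base_url()` strips; the gateway host is a placeholder:

```python
from uipath_explainator.config import Settings

for raw in (
    "https://your-gateway.example.com/v1beta/",  # version suffix + trailing slash
    "https://your-gateway.example.com/V1",       # the regex is case-insensitive
    "https://your-gateway.example.com",          # already clean, passes through
):
    settings = Settings(api_key="key", base_url=raw, model="gemini-2.5-flash")
    print(settings.normalized_base_url())
# All three print: https://your-gateway.example.com
```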

src/uipath_explainator/gemini.py

@@ -1,15 +1,28 @@
 from __future__ import annotations
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from pathlib import Path
+from typing import Any
 import json
+import logging
+from time import perf_counter
 from .config import Settings
+UNKNOWN_TEXT = "无法从当前文件确定"
+logger = logging.getLogger(__name__)
+class AnalysisError(RuntimeError):
+    """Raised when a single file analysis cannot be completed."""
 @dataclass(slots=True)
 class GeminiAnalyzer:
     settings: Settings
+    _types: Any = field(init=False, repr=False)
+    _client: Any = field(init=False, repr=False)
     def __post_init__(self) -> None:
         self.settings.require_api_key()
@@ -18,47 +31,187 @@ class GeminiAnalyzer:
         from google.genai import types
         http_options = types.HttpOptions(timeout=120_000)
-        if self.settings.base_url:
-            http_options = types.HttpOptions(base_url=self.settings.base_url, timeout=120_000)
+        base_url = self.settings.normalized_base_url()
+        if base_url:
+            http_options = types.HttpOptions(base_url=base_url, timeout=120_000)
         self._types = types
         self._client = genai.Client(api_key=self.settings.api_key, http_options=http_options)
+        logger.info(
+            "Gemini analyzer initialized: model=%s base_url=%s",
+            self.settings.model,
+            base_url or "<default>",
+        )
     def analyze(self, relative_path: Path, content: str) -> str:
+        started = perf_counter()
+        logger.info("Submitting Gemini analysis for %s (%d chars)", relative_path.as_posix(), len(content))
         prompt = self._build_prompt(relative_path, content)
+        try:
             response = self._client.models.generate_content(
                 model=self.settings.model,
                 contents=prompt,
                 config=self._types.GenerateContentConfig(
                     temperature=0.2,
                     response_mime_type="application/json",
-                    response_schema={
-                        "type": "OBJECT",
-                        "required": ["summary", "implementation", "data", "example"],
-                        "properties": {
-                            "summary": {"type": "STRING"},
-                            "implementation": {"type": "STRING"},
-                            "data": {"type": "STRING"},
-                            "example": {"type": "STRING"},
-                        },
-                    },
+                    response_schema=self._response_schema(),
                 ),
             )
-        payload = json.loads(response.text)
+            response_text = response.text or ""
+            logger.debug(
+                "Gemini response received for %s (%d chars)",
+                relative_path.as_posix(),
+                len(response_text),
+            )
+            payload = json.loads(response_text)
+        except json.JSONDecodeError as exc:
+            logger.exception(
+                "Gemini returned invalid JSON for %s. Response snippet: %r",
+                relative_path.as_posix(),
+                response_text[:500],
+            )
+            raise AnalysisError(
+                f"Gemini 返回了无法解析的 JSON,无法生成该文件说明。原始错误: {exc}"
+            ) from exc
+        except Exception as exc:
+            summary = self._summarize_error(exc)
+            logger.exception("Gemini analysis failed for %s: %s", relative_path.as_posix(), summary)
+            raise AnalysisError(summary) from exc
+        logger.info(
+            "Gemini analysis completed for %s in %.2fs",
+            relative_path.as_posix(),
+            perf_counter() - started,
+        )
         return self._to_markdown(relative_path, payload)
+    def _summarize_error(self, exc: Exception) -> str:
+        status_code = getattr(exc, "status_code", None)
+        response_json = getattr(exc, "response_json", None)
+        upstream_message = self._extract_error_message(response_json)
+        if status_code == 429:
+            detail = upstream_message or "Resource has been exhausted"
+            return (
+                "Gemini 配额或速率限制已触发(HTTP 429),当前文件说明未生成。"
+                f"上游信息: {detail}。可稍后重试,或使用 --skip-analysis 仅导出代码与依赖。"
+            )
+        if status_code is not None:
+            detail = upstream_message or str(exc).strip() or exc.__class__.__name__
+            return f"Gemini 调用失败(HTTP {status_code})。上游信息: {detail}"
+        detail = str(exc).strip()
+        if detail:
+            return f"Gemini 分析失败: {detail}"
+        return f"Gemini 分析失败: {exc.__class__.__name__}"
+    def _extract_error_message(self, response_json: Any) -> str | None:
+        if not isinstance(response_json, dict):
+            return None
+        error = response_json.get("error")
+        if isinstance(error, dict):
+            message = error.get("message")
+            if isinstance(message, str) and message.strip():
+                return message.strip()
+        return None
+    def _response_schema(self) -> dict[str, Any]:
+        return {
+            "type": "OBJECT",
+            "required": ["overview", "logic", "data", "consultation"],
+            "properties": {
+                "overview": {
+                    "type": "OBJECT",
+                    "required": ["purpose", "role", "trigger"],
+                    "properties": {
+                        "purpose": {"type": "STRING"},
+                        "role": {"type": "STRING"},
+                        "trigger": {"type": "STRING"},
+                    },
+                },
+                "logic": {
+                    "type": "OBJECT",
+                    "required": ["steps", "decision_logic", "exceptions"],
+                    "properties": {
+                        "steps": {
+                            "type": "ARRAY",
+                            "items": {
+                                "type": "OBJECT",
+                                "required": ["title", "detail", "why", "result"],
+                                "properties": {
+                                    "title": {"type": "STRING"},
+                                    "detail": {"type": "STRING"},
+                                    "why": {"type": "STRING"},
+                                    "result": {"type": "STRING"},
+                                },
+                            },
+                        },
+                        "decision_logic": {"type": "ARRAY", "items": {"type": "STRING"}},
+                        "exceptions": {"type": "ARRAY", "items": {"type": "STRING"}},
+                    },
+                },
+                "data": {
+                    "type": "OBJECT",
+                    "required": ["inputs", "outputs", "variables", "external_dependencies"],
+                    "properties": {
+                        "inputs": {"type": "ARRAY", "items": {"type": "STRING"}},
+                        "outputs": {"type": "ARRAY", "items": {"type": "STRING"}},
+                        "variables": {"type": "ARRAY", "items": {"type": "STRING"}},
+                        "external_dependencies": {"type": "ARRAY", "items": {"type": "STRING"}},
+                    },
+                },
+                "consultation": {
+                    "type": "OBJECT",
+                    "required": ["business_meaning", "risks", "example", "unknowns"],
+                    "properties": {
+                        "business_meaning": {"type": "STRING"},
+                        "risks": {"type": "ARRAY", "items": {"type": "STRING"}},
+                        "example": {"type": "STRING"},
+                        "unknowns": {"type": "ARRAY", "items": {"type": "STRING"}},
+                    },
+                },
+            },
+        }
     def _build_prompt(self, relative_path: Path, content: str) -> str:
         language = "xml" if relative_path.suffix.lower() == ".xaml" else "vb"
         return f"""你是资深 UiPath 自动化架构师。请只基于给定文件内容分析,不要臆造未出现的系统、字段或业务规则。
-用中文返回 JSON,字段固定为:
-- summary: 这个文件在做什么
-- implementation: 这个文件怎么做
-- data: 这个文件涉及的数据、变量、参数、外部文件或系统
-- example: 一个贴近当前代码的说明性范例
-如果信息不足,请明确写“无法从当前文件确定”。
+请严格返回 JSON,并且完全匹配给定 schema。所有字段都必须填写。
+输出要求:
+1. 用中文,写成“给交接人员/业务方看的咨询说明”,不要只给一句笼统概括。
+2. 先讲这个文件在整个流程中的定位,再按执行顺序拆解步骤。
+3. 重点解释判断逻辑、调用链、输入输出、关键变量、外部依赖。
+4. 如果代码里能看出顺序动作,请在 steps 里拆成明确步骤;不要只复述节点名称。
+5. 无法确认的信息必须明确写“无法从当前文件确定”,数组字段则写成仅包含这一项的数组。
+6. 不要输出 JSON 以外的任何文字。
 文件路径: {relative_path.as_posix()}
 代码:
@@ -66,22 +219,109 @@ class GeminiAnalyzer:
 {content}
 ```"""
-    def _to_markdown(self, relative_path: Path, payload: dict[str, str]) -> str:
+    def _to_markdown(self, relative_path: Path, payload: dict[str, Any]) -> str:
+        overview = payload.get("overview") or {}
+        logic = payload.get("logic") or {}
+        data = payload.get("data") or {}
+        consultation = payload.get("consultation") or {}
         return "\n".join(
             [
                 f"# {relative_path.as_posix()}",
                 "",
-                "## 做什么",
-                payload.get("summary", "").strip() or "无法从当前文件确定",
+                "## 文件定位",
+                f"- 核心作用:{self._clean_text(overview.get('purpose'))}",
+                f"- 流程角色:{self._clean_text(overview.get('role'))}",
+                f"- 触发方式:{self._clean_text(overview.get('trigger'))}",
                 "",
-                "## 怎么做",
-                payload.get("implementation", "").strip() or "无法从当前文件确定",
+                "## 流程拆解",
+                self._render_steps(logic.get("steps")),
                 "",
-                "## 涉及数据",
-                payload.get("data", "").strip() or "无法从当前文件确定",
+                "## 关键逻辑",
+                self._render_bullets(logic.get("decision_logic")),
                 "",
-                "## 范例",
-                payload.get("example", "").strip() or "无法从当前文件确定",
+                "## 异常与边界",
+                self._render_bullets(logic.get("exceptions")),
+                "",
+                "## 数据与依赖",
+                "### 输入",
+                self._render_bullets(data.get("inputs")),
+                "",
+                "### 输出",
+                self._render_bullets(data.get("outputs")),
+                "",
+                "### 关键变量",
+                self._render_bullets(data.get("variables")),
+                "",
+                "### 外部依赖",
+                self._render_bullets(data.get("external_dependencies")),
+                "",
+                "## 咨询视角",
+                f"- 业务含义:{self._clean_text(consultation.get('business_meaning'))}",
+                "",
+                "### 风险与注意点",
+                self._render_bullets(consultation.get("risks")),
+                "",
+                "### 无法确认的点",
+                self._render_bullets(consultation.get("unknowns")),
+                "",
+                "## 场景范例",
+                self._clean_text(consultation.get("example")),
                 "",
             ]
-        )
+        ).rstrip() + "\n"
+    def _render_steps(self, items: Any) -> str:
+        if not isinstance(items, list):
+            items = []
+        normalized: list[dict[str, str]] = []
+        for item in items:
+            if not isinstance(item, dict):
+                continue
+            normalized.append(
+                {
+                    "title": self._clean_text(item.get("title")),
+                    "detail": self._clean_text(item.get("detail")),
+                    "why": self._clean_text(item.get("why")),
+                    "result": self._clean_text(item.get("result")),
+                }
+            )
+        if not normalized:
+            normalized = [
+                {
+                    "title": UNKNOWN_TEXT,
+                    "detail": UNKNOWN_TEXT,
+                    "why": UNKNOWN_TEXT,
+                    "result": UNKNOWN_TEXT,
+                }
+            ]
+        lines: list[str] = []
+        for index, item in enumerate(normalized, start=1):
+            lines.extend(
+                [
+                    f"{index}. **{item['title']}**",
+                    f"   - 动作:{item['detail']}",
+                    f"   - 目的:{item['why']}",
+                    f"   - 结果:{item['result']}",
+                ]
+            )
+        return "\n".join(lines)
+    def _render_bullets(self, items: Any) -> str:
+        if not isinstance(items, list):
+            items = []
+        cleaned = [self._clean_text(item) for item in items if isinstance(item, str) and item.strip()]
+        if not cleaned:
+            cleaned = [UNKNOWN_TEXT]
+        return "\n".join(f"- {item}" for item in cleaned)
+    def _clean_text(self, value: Any) -> str:
+        if isinstance(value, str):
+            stripped = value.strip()
+            if stripped:
+                return stripped
+        return UNKNOWN_TEXT
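To make the new response contract concrete, a sketch that pushes a minimal schema-shaped payload through `_to_markdown` (mirroring the unit test below; bypassing `__init__` via `__new__` is a liberty taken so no live Gemini client is needed):

```python
from pathlib import Path
from uipath_explainator.gemini import GeminiAnalyzer

# _to_markdown only touches the payload, so skip client construction entirely.
analyzer = GeminiAnalyzer.__new__(GeminiAnalyzer)

payload = {
    "overview": {"purpose": "调用 VBA", "role": "子流程", "trigger": "Invoke Workflow"},
    "logic": {"steps": [], "decision_logic": [], "exceptions": []},
    "data": {"inputs": [], "outputs": [], "variables": [], "external_dependencies": []},
    "consultation": {"business_meaning": "桥接层", "risks": [], "example": "", "unknowns": []},
}

# Empty lists and blank strings all fall back to UNKNOWN_TEXT
# ("无法从当前文件确定"), so the markdown sections are always populated.
print(analyzer._to_markdown(Path("Flows/Active.xaml"), payload))
```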

src/uipath_explainator/logging_utils.py (new file)

@@ -0,0 +1,28 @@
from __future__ import annotations
from pathlib import Path
import logging
LOG_FORMAT = "%(asctime)s %(levelname)s [%(name)s] %(message)s"
def configure_logging(level_name: str = "INFO", log_file: Path | None = None) -> None:
    level = getattr(logging, level_name.upper(), logging.INFO)
    formatter = logging.Formatter(LOG_FORMAT)
    handlers: list[logging.Handler] = [logging.StreamHandler()]
    if log_file is not None:
        log_file.parent.mkdir(parents=True, exist_ok=True)
        handlers.append(logging.FileHandler(log_file, encoding="utf-8"))
    root_logger = logging.getLogger()
    root_logger.handlers.clear()
    root_logger.setLevel(level)
    for handler in handlers:
        handler.setLevel(level)
        handler.setFormatter(formatter)
        root_logger.addHandler(handler)
    logging.captureWarnings(True)
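A short usage sketch of the helper above; the log path is illustrative:

```python
import logging
from pathlib import Path

from uipath_explainator.logging_utils import configure_logging

# Console + file handlers at DEBUG; the log file's parent directory is
# created on demand by configure_logging.
configure_logging(level_name="DEBUG", log_file=Path("workspace/run.log"))

logging.getLogger("uipath_explainator.pipeline").info("visible on console and in run.log")

# Calling configure_logging again is safe: root handlers are cleared first,
# so repeated runs in one process do not duplicate log lines.
configure_logging(level_name="INFO")
```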

src/uipath_explainator/pipeline.py

@@ -1,17 +1,26 @@
 from __future__ import annotations
 from dataclasses import dataclass
+from hashlib import sha256
 from pathlib import Path
 from shutil import copy2, rmtree
 import json
+import logging
+from time import perf_counter
 from .scanner import ScanResult, crawl_dependencies, find_entry_file, read_text, strip_comment_out_blocks
+logger = logging.getLogger(__name__)
+ANALYSIS_CACHE_NAME = ".analysis-cache.json"
 @dataclass(slots=True)
 class PipelineReport:
     project_root: Path
     output_root: Path
+    code_root: Path
+    docs_root: Path
     entry_file: Path
     initial_files: list[Path]
     final_files: list[Path]
@@ -25,6 +34,8 @@ class PipelineReport:
         {
             "project_root": self.project_root.as_posix(),
             "output_root": self.output_root.as_posix(),
+            "code_root": self.code_root.as_posix(),
+            "docs_root": self.docs_root.as_posix(),
             "entry_file": self.entry_file.as_posix(),
             "initial_files": [item.as_posix() for item in self.initial_files],
             "final_files": [item.as_posix() for item in self.final_files],
@@ -42,30 +53,56 @@ class ProjectPipeline:
     def __init__(self, project_root: Path, output_root: Path, entry_name: str, force: bool = False) -> None:
         self.project_root = project_root.resolve()
         self.output_root = output_root.resolve()
+        self.code_root = self.output_root / "code"
+        self.docs_root = self.output_root / "docs"
         self.entry_name = entry_name
         self.force = force
     def run(self, analyzer=None) -> PipelineReport:
+        started = perf_counter()
+        logger.info(
+            "Starting pipeline: project_root=%s output_root=%s entry=%s analysis=%s force=%s",
+            self.project_root,
+            self.output_root,
+            self.entry_name,
+            analyzer is not None,
+            self.force,
+        )
         entry_file = find_entry_file(self.project_root, self.entry_name)
         entry_rel = entry_file.relative_to(self.project_root)
+        logger.debug("Using entry file %s", entry_rel.as_posix())
         initial_scan = crawl_dependencies(self.project_root, entry_file)
         initial_rel_files = self._relative_files(initial_scan, self.project_root)
+        logger.info(
+            "Initial scan complete: files=%d warnings=%d",
+            len(initial_rel_files),
+            len(initial_scan.warnings),
+        )
         self._prepare_output_root()
         self._copy_files(initial_rel_files)
         cleaned_files = self._clean_copied_xaml_files(initial_rel_files)
+        logger.info("Copied %d files and cleaned %d XAML files", len(initial_rel_files), len(cleaned_files))
-        final_scan = crawl_dependencies(self.output_root, self.output_root / entry_rel)
-        final_rel_files = self._relative_files(final_scan, self.output_root)
-        pruned_files = self._prune_unused_files(initial_rel_files, final_rel_files)
+        final_scan = crawl_dependencies(self.code_root, self.code_root / entry_rel)
+        final_rel_files = self._relative_files(final_scan, self.code_root)
+        pruned_files = self._prune_unused_files(final_rel_files)
+        logger.info(
+            "Final scan complete: files=%d warnings=%d pruned=%d",
+            len(final_rel_files),
+            len(final_scan.warnings),
+            len(pruned_files),
+        )
-        analysis_files = self._write_analysis(final_rel_files, analyzer)
-        warnings = initial_scan.warnings + final_scan.warnings
+        analysis_files, analysis_warnings = self._write_analysis(final_rel_files, analyzer)
+        warnings = initial_scan.warnings + final_scan.warnings + analysis_warnings
         report = PipelineReport(
             project_root=self.project_root,
             output_root=self.output_root,
+            code_root=self.code_root,
+            docs_root=self.docs_root,
             entry_file=entry_rel,
             initial_files=initial_rel_files,
             final_files=final_rel_files,
@@ -75,85 +112,122 @@ class ProjectPipeline:
             analysis_files=analysis_files,
         )
         self._write_report_files(report)
+        logger.info(
+            "Pipeline completed in %.2fs: final_files=%d analysis_files=%d warnings=%d",
+            perf_counter() - started,
+            len(report.final_files),
+            len(report.analysis_files),
+            len(report.warnings),
+        )
         return report
     def _prepare_output_root(self) -> None:
         if self.output_root.exists():
-            if not self.force:
-                raise FileExistsError(f"Output directory already exists: {self.output_root}")
-            rmtree(self.output_root)
-        self.output_root.mkdir(parents=True, exist_ok=True)
+            if self.force:
+                logger.info("Removing existing output directory because force=True: %s", self.output_root)
+                rmtree(self.output_root)
+            else:
+                logger.info("Reusing existing output directory for incremental run: %s", self.output_root)
+        self.code_root.mkdir(parents=True, exist_ok=True)
+        self.docs_root.mkdir(parents=True, exist_ok=True)
+        logger.debug("Prepared output directories: code=%s docs=%s", self.code_root, self.docs_root)
     def _copy_files(self, relative_files: list[Path]) -> None:
         for relative_path in relative_files:
             source = self.project_root / relative_path
-            destination = self.output_root / relative_path
+            destination = self.code_root / relative_path
             destination.parent.mkdir(parents=True, exist_ok=True)
             copy2(source, destination)
+            logger.debug("Copied file: %s -> %s", source, destination)
     def _clean_copied_xaml_files(self, relative_files: list[Path]) -> list[Path]:
         cleaned: list[Path] = []
         for relative_path in relative_files:
             if relative_path.suffix.lower() != ".xaml":
                 continue
-            output_file = self.output_root / relative_path
+            output_file = self.code_root / relative_path
             original = read_text(output_file)
             updated = strip_comment_out_blocks(original)
             if updated != original:
                 output_file.write_text(updated, encoding="utf-8")
                 cleaned.append(relative_path)
+                logger.debug("Removed CommentOut blocks from %s", output_file)
         return cleaned
-    def _prune_unused_files(self, initial_files: list[Path], final_files: list[Path]) -> list[Path]:
+    def _prune_unused_files(self, final_files: list[Path]) -> list[Path]:
         final_set = set(final_files)
         pruned: list[Path] = []
-        for relative_path in initial_files:
+        for relative_path in self._existing_relative_files(self.code_root):
             if relative_path in final_set:
                 continue
-            target = self.output_root / relative_path
+            target = self.code_root / relative_path
             if target.exists():
                 target.unlink()
                 pruned.append(relative_path)
-        self._cleanup_empty_dirs()
+                logger.debug("Pruned unreachable file: %s", target)
+        self._cleanup_empty_dirs(self.code_root)
         return pruned
-    def _cleanup_empty_dirs(self) -> None:
-        directories = sorted(
-            [path for path in self.output_root.rglob("*") if path.is_dir()],
-            key=lambda item: len(item.parts),
-            reverse=True,
-        )
+    def _cleanup_empty_dirs(self, root: Path) -> None:
+        directories = sorted([path for path in root.rglob("*") if path.is_dir()], key=lambda item: len(item.parts), reverse=True)
         for directory in directories:
             if any(directory.iterdir()):
                 continue
             directory.rmdir()
+            logger.debug("Removed empty directory: %s", directory)
-    def _write_analysis(self, final_files: list[Path], analyzer) -> list[Path]:
+    def _write_analysis(self, final_files: list[Path], analyzer) -> tuple[list[Path], list[str]]:
         if analyzer is None:
-            return []
+            logger.info("Skipping Gemini analysis because analyzer is disabled")
+            self._clear_analysis_outputs()
+            return [], []
         output_files: list[Path] = []
+        warnings: list[str] = []
+        cache = self._load_analysis_cache()
+        cache = self._prune_stale_analysis_outputs(final_files, cache)
+        self._save_analysis_cache(cache)
         for relative_path in self._ordered_files(final_files):
-            content = read_text(self.output_root / relative_path)
-            analysis = analyzer.analyze(relative_path, content)
-            analysis_path = self.output_root / f"{relative_path.as_posix()}.analysis.md"
+            analysis_rel_path = Path(f"{relative_path.as_posix()}.analysis.md")
+            analysis_path = self.docs_root / analysis_rel_path
+            content = read_text(self.code_root / relative_path)
+            content_hash = self._content_hash(content)
+            cache_entry = cache.get(relative_path.as_posix())
+            if self._can_reuse_analysis(relative_path, analysis_path, content_hash, cache_entry):
+                output_files.append(analysis_rel_path)
+                continue
+            try:
+                analysis = analyzer.analyze(relative_path, content)
+                cache[relative_path.as_posix()] = {"content_hash": content_hash, "status": "success"}
+            except Exception as exc:
+                warning = f"Analysis failed for {relative_path.as_posix()}: {self._format_analysis_error(exc)}"
+                warnings.append(warning)
+                logger.warning(warning)
+                analysis = self._build_failed_analysis(relative_path, exc)
+                cache[relative_path.as_posix()] = {"content_hash": content_hash, "status": "failed"}
             analysis_path.parent.mkdir(parents=True, exist_ok=True)
             analysis_path.write_text(analysis, encoding="utf-8")
-            output_files.append(Path(f"{relative_path.as_posix()}.analysis.md"))
-        return output_files
+            self._save_analysis_cache(cache)
+            output_files.append(analysis_rel_path)
+            logger.debug("Wrote analysis file: %s", analysis_path)
+        return output_files, warnings
     def _write_report_files(self, report: PipelineReport) -> None:
-        (self.output_root / "manifest.json").write_text(report.to_json(), encoding="utf-8")
-        (self.output_root / "OVERVIEW.md").write_text(self._build_overview(report), encoding="utf-8")
+        (self.docs_root / "manifest.json").write_text(report.to_json(), encoding="utf-8")
+        (self.docs_root / "OVERVIEW.md").write_text(self._build_overview(report), encoding="utf-8")
+        logger.debug("Wrote report files to %s", self.docs_root)
     def _build_overview(self, report: PipelineReport) -> str:
         warnings = "\n".join(f"- {item}" for item in report.warnings) or "- 无"
+        cleaned = "\n".join(f"- {item.as_posix()}" for item in report.cleaned_files) or "- 无"
         pruned = "\n".join(f"- {item.as_posix()}" for item in report.pruned_files) or "- 无"
         analyses = "\n".join(f"- {item.as_posix()}" for item in report.analysis_files) or "- 未启用 Gemini 分析"
         return f"""# UiPath Explainator Overview
 - Project Root: `{report.project_root.as_posix()}`
 - Output Root: `{report.output_root.as_posix()}`
+- Code Root: `{report.code_root.as_posix()}`
+- Docs Root: `{report.docs_root.as_posix()}`
 - Entry File: `{report.entry_file.as_posix()}`
 - Initial Files: {len(report.initial_files)}
 - Final Files: {len(report.final_files)}
@@ -164,6 +238,24 @@ class ProjectPipeline:
 ## Final Files
 {chr(10).join(f"- {item.as_posix()}" for item in report.final_files)}
+## Processing Logic
+1. **Initial Scan**: 从入口文件开始递归扫描 `Invoke Workflow` 与 `Invoke VBA`,得到初始依赖集合。这个阶段会把 `Comment Out` 里的引用也一起算进去,因为此时还未清理失效代码。
+2. **Copy to Workspace**: 将初始依赖集合完整复制到 `code/` 目录,保持相对目录结构不变,确保后续裁剪只发生在代码副本中。
+3. **Comment Cleanup**: 对已复制的 XAML 删除 `<ui:CommentOut>` 代码块,让后续扫描只看到当前真正生效的流程节点。
+4. **Rescan After Cleanup**: 以清理后的入口文件重新递归扫描,得到清理后的实际依赖集合。
+5. **Prune Unused Files**: 删除 `code/` 目录中首次扫描能到达、但二次扫描已不可达的文件。这些文件通常来自被注释掉的工作流、VBA,或清理后失效的调用链。
+6. **Gemini Analysis**: 仅针对最终保留文件在 `docs/` 目录生成 `*.analysis.md`,让说明文档与实际可执行流程保持一致。
+## How To Read This Output
+- `Initial Files`: 清理 `<ui:CommentOut>` 前扫描到的所有文件,代表“理论上被引用过”的集合。
+- `Final Files`: 清理后 `code/` 目录中仍可达的文件,代表“当前实际流程会用到”的集合。
+- `Cleaned XAML Files`: 被移除 `<ui:CommentOut>` 代码块的文件。
+- `Pruned Files`: 只在失效分支中出现、已从 `code/` 目录删除的文件。
+- `Analysis Files`: `docs/` 目录下 Gemini 生成的逐文件说明,重点解释该文件的流程、逻辑、数据与风险点。
+## Cleaned XAML Files
+{cleaned}
 ## Pruned Files
 {pruned}
@@ -177,5 +269,106 @@ class ProjectPipeline:
     def _relative_files(self, scan: ScanResult, root: Path) -> list[Path]:
         return sorted(path.relative_to(root) for path in scan.files)
+    def _existing_relative_files(self, root: Path) -> list[Path]:
+        return sorted(path.relative_to(root) for path in root.rglob("*") if path.is_file())
     def _ordered_files(self, paths: list[Path]) -> list[Path]:
         return sorted(paths, key=lambda item: (item.suffix.lower() != ".xaml", item.as_posix().lower()))
+    def _analysis_cache_path(self) -> Path:
+        return self.docs_root / ANALYSIS_CACHE_NAME
+    def _load_analysis_cache(self) -> dict[str, dict[str, str]]:
+        cache_path = self._analysis_cache_path()
+        if not cache_path.exists():
+            return {}
+        try:
+            raw = json.loads(cache_path.read_text(encoding="utf-8"))
+        except (OSError, json.JSONDecodeError):
+            logger.warning("Ignoring invalid analysis cache file: %s", cache_path)
+            return {}
+        if not isinstance(raw, dict):
+            logger.warning("Ignoring unexpected analysis cache payload: %s", cache_path)
+            return {}
+        cache: dict[str, dict[str, str]] = {}
+        for relative_path, metadata in raw.items():
+            if not isinstance(relative_path, str) or not isinstance(metadata, dict):
+                continue
+            content_hash = metadata.get("content_hash")
+            status = metadata.get("status")
+            if not isinstance(content_hash, str) or not isinstance(status, str):
+                continue
+            cache[relative_path] = {"content_hash": content_hash, "status": status}
+        return cache
+    def _save_analysis_cache(self, cache: dict[str, dict[str, str]]) -> None:
+        self._analysis_cache_path().write_text(json.dumps(cache, ensure_ascii=False, indent=2), encoding="utf-8")
+    def _prune_stale_analysis_outputs(
+        self, final_files: list[Path], cache: dict[str, dict[str, str]]
+    ) -> dict[str, dict[str, str]]:
+        final_paths = {relative_path.as_posix() for relative_path in final_files}
+        valid_analysis_paths = {Path(f"{relative_path}.analysis.md") for relative_path in final_paths}
+        for analysis_path in self.docs_root.rglob("*.analysis.md"):
+            relative_analysis_path = analysis_path.relative_to(self.docs_root)
+            if relative_analysis_path in valid_analysis_paths:
+                continue
+            analysis_path.unlink()
+            logger.debug("Removed stale analysis file: %s", analysis_path)
+        pruned_cache = {relative_path: metadata for relative_path, metadata in cache.items() if relative_path in final_paths}
+        self._cleanup_empty_dirs(self.docs_root)
+        return pruned_cache
+    def _clear_analysis_outputs(self) -> None:
+        for analysis_path in self.docs_root.rglob("*.analysis.md"):
+            analysis_path.unlink()
+            logger.debug("Removed analysis file because analysis is disabled: %s", analysis_path)
+        cache_path = self._analysis_cache_path()
+        if cache_path.exists():
+            cache_path.unlink()
+            logger.debug("Removed analysis cache file: %s", cache_path)
+        self._cleanup_empty_dirs(self.docs_root)
+    def _can_reuse_analysis(
+        self,
+        relative_path: Path,
+        analysis_path: Path,
+        content_hash: str,
+        cache_entry: dict[str, str] | None,
+    ) -> bool:
+        if cache_entry is None:
+            return False
+        if cache_entry.get("status") != "success":
+            return False
+        if cache_entry.get("content_hash") != content_hash:
+            return False
+        if not analysis_path.exists():
+            return False
+        logger.info("Reusing cached Gemini analysis for %s", relative_path.as_posix())
+        return True
+    def _content_hash(self, content: str) -> str:
+        return sha256(content.encode("utf-8")).hexdigest()
+    def _build_failed_analysis(self, relative_path: Path, exc: Exception) -> str:
+        reason = self._format_analysis_error(exc)
+        return "\n".join(
+            [
+                f"# {relative_path.as_posix()}",
+                "",
+                "## 分析状态",
+                "- 状态:Gemini 分析失败,当前文件未生成结构化说明。",
+                f"- 原因:{reason}",
+                "- 建议:稍后重试;如果当前只需要导出代码与依赖,可使用 `--skip-analysis`。",
+                "",
+            ]
+        )
+    def _format_analysis_error(self, exc: Exception) -> str:
+        message = str(exc).strip()
+        if message:
+            return message
+        return exc.__class__.__name__
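Since the cache file drives the resume behavior, here is a hedged snapshot of what `docs/.analysis-cache.json` might contain after a partially failed run (paths and hashes illustrative):

```python
import json

cache = {
    "Flows/Active.xaml": {
        "content_hash": "0" * 64,  # sha256 hex digest of the copied file text
        "status": "failed",        # retried on the next incremental run
    },
    "Scripts/Keep.bas": {
        "content_hash": "1" * 64,
        "status": "success",       # reused while the hash still matches
    },
}
# Written with the same options as _save_analysis_cache:
print(json.dumps(cache, ensure_ascii=False, indent=2))
```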

src/uipath_explainator/scanner.py

@@ -4,6 +4,7 @@ from collections import deque
 from dataclasses import dataclass, field
 from pathlib import Path
 import html
+import logging
 import re
 import xml.etree.ElementTree as ET
@@ -15,6 +16,7 @@ TAG_PATTERN = re.compile(
     r"<(?P<close>/)?(?P<name>[A-Za-z_][\w:.-]*)(?P<body>[^<>]*?)(?P<self>/)?>",
     re.DOTALL,
 )
+logger = logging.getLogger(__name__)
 @dataclass(slots=True, frozen=True)
@@ -81,30 +83,37 @@ def _find_comment_end(text: str, start: re.Match[str]) -> int:
 def find_entry_file(project_root: Path, entry_name: str) -> Path:
+    logger.debug("Resolving entry file %s within %s", entry_name, project_root)
     direct = project_root / entry_name
     if direct.exists():
+        logger.info("Entry file resolved directly: %s", direct)
         return direct
     target = entry_name.lower()
     for child in project_root.iterdir():
         if child.is_file() and child.name.lower() == target:
+            logger.info("Entry file resolved case-insensitively: %s", child)
            return child
     raise FileNotFoundError(f"Entry file not found: {entry_name}")
 def crawl_dependencies(project_root: Path, entry_file: Path) -> ScanResult:
+    logger.info("Starting dependency crawl from %s", entry_file)
     result = ScanResult()
     queue: deque[Path] = deque([entry_file])
     visited: set[Path] = set()
     while queue:
         current = queue.popleft()
+        logger.debug("Dequeued file for scan: %s", current)
         if current in visited:
+            logger.debug("Skipping already visited file: %s", current)
             continue
         visited.add(current)
         result.files.add(current)
         if current.suffix.lower() != ".xaml":
+            logger.debug("Skipping recursive scan for non-XAML file: %s", current)
             continue
         dependencies, warnings = extract_dependencies(project_root, current)
@@ -114,12 +123,24 @@ def crawl_dependencies(project_root: Path, entry_file: Path) -> ScanResult:
         for dependency in dependencies:
             result.files.add(dependency.target)
             if dependency.kind == "workflow":
+                logger.debug(
+                    "Queueing workflow dependency: %s -> %s",
+                    current,
+                    dependency.target,
+                )
                 queue.append(dependency.target)
+    logger.info(
+        "Dependency crawl complete: files=%d dependency_sources=%d warnings=%d",
+        len(result.files),
+        len(result.dependencies),
+        len(result.warnings),
+    )
     return result
 def extract_dependencies(project_root: Path, file_path: Path) -> tuple[list[Dependency], list[str]]:
+    logger.debug("Extracting dependencies from %s", file_path)
     text = read_text(file_path)
     warnings: list[str] = []
     dependencies: list[Dependency] = []
@@ -127,7 +148,9 @@ def extract_dependencies(project_root: Path, file_path: Path) -> tuple[list[Depe
     try:
         root = ET.fromstring(text)
     except ET.ParseError as exc:
-        return [], [f"{file_path}: XML parse failed: {exc}"]
+        warning = f"{file_path}: XML parse failed: {exc}"
+        logger.warning(warning)
+        return [], [warning]
     for element in root.iter():
         activity_type = classify_invoke_activity(local_name(element.tag))
@@ -136,17 +159,26 @@ def extract_dependencies(project_root: Path, file_path: Path) -> tuple[list[Depe
         literals = extract_literal_targets(element, activity_type)
         if not literals:
-            warnings.append(
-                f"{file_path}: unable to statically resolve {activity_type} path from <{local_name(element.tag)}>"
-            )
+            warning = f"{file_path}: unable to statically resolve {activity_type} path from <{local_name(element.tag)}>"
+            warnings.append(warning)
+            logger.warning(warning)
             continue
         for literal in literals:
             target = resolve_project_path(project_root, file_path, literal)
             if target is None:
-                warnings.append(f"{file_path}: referenced file not found for {activity_type}: {literal}")
+                warning = f"{file_path}: referenced file not found for {activity_type}: {literal}"
+                warnings.append(warning)
+                logger.warning(warning)
                 continue
             dependencies.append(Dependency(activity_type, literal, file_path, target))
+            logger.debug(
+                "Resolved %s dependency: source=%s literal=%s target=%s",
+                activity_type,
+                file_path,
+                literal,
+                target,
+            )
     return _dedupe_dependencies(dependencies), warnings
@@ -217,7 +249,15 @@ def resolve_project_path(project_root: Path, source_file: Path, literal: str) ->
     for base in search_roots:
         resolved = (base / candidate).resolve()
         if resolved.exists() and _is_within_root(project_root.resolve(), resolved):
+            logger.debug(
+                "Resolved literal path %s from %s using base %s -> %s",
+                literal,
+                source_file,
+                base,
+                resolved,
+            )
             return resolved
+    logger.debug("Failed to resolve literal path %s from %s", literal, source_file)
     return None
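A hedged usage sketch of the scanner's public surface, assuming a project checked out on disk:

```python
from pathlib import Path

from uipath_explainator.scanner import crawl_dependencies, find_entry_file

project_root = Path("/path/to/uipath-project")
entry = find_entry_file(project_root, "main.xaml")  # falls back to case-insensitive lookup

# BFS over Invoke Workflow targets; Invoke VBA targets are collected but not
# recursed into, since only .xaml files are scanned further.
result = crawl_dependencies(project_root, entry)
for warning in result.warnings:
    print("WARN:", warning)  # e.g. dynamically concatenated paths
print(f"{len(result.files)} reachable files")
```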

tests/test_gemini.py (new file, 191 lines)

@@ -0,0 +1,191 @@
from __future__ import annotations
from pathlib import Path
from types import ModuleType, SimpleNamespace
import sys
import unittest
from unittest.mock import patch
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
fake_dotenv = ModuleType("dotenv")
fake_dotenv.load_dotenv = lambda *args, **kwargs: None
sys.modules.setdefault("dotenv", fake_dotenv)
from uipath_explainator.config import Settings
from uipath_explainator.gemini import AnalysisError, GeminiAnalyzer
class FakeHttpOptions:
def __init__(self, **kwargs) -> None:
self.kwargs = kwargs
class FakeGenerateContentConfig:
def __init__(self, **kwargs) -> None:
self.kwargs = kwargs
class FakeClient:
def __init__(self, api_key: str, http_options: FakeHttpOptions) -> None:
self.api_key = api_key
self.http_options = http_options
self.models = SimpleNamespace()
class GeminiAnalyzerTests(unittest.TestCase):
def test_init_with_slots_declares_runtime_fields(self) -> None:
fake_types = SimpleNamespace(
HttpOptions=FakeHttpOptions,
GenerateContentConfig=FakeGenerateContentConfig,
)
fake_genai = ModuleType("google.genai")
fake_genai.Client = FakeClient
fake_genai.types = fake_types
fake_google = ModuleType("google")
fake_google.genai = fake_genai
with patch.dict(sys.modules, {"google": fake_google, "google.genai": fake_genai}):
analyzer = GeminiAnalyzer(Settings(api_key="test-key", base_url=None, model="gemini-test"))
self.assertIs(analyzer._types, fake_types)
self.assertIsInstance(analyzer._client, FakeClient)
self.assertEqual(analyzer._client.api_key, "test-key")
self.assertEqual(analyzer._client.http_options.kwargs, {"timeout": 120_000})
def test_init_strips_version_suffix_from_custom_base_url(self) -> None:
fake_types = SimpleNamespace(
HttpOptions=FakeHttpOptions,
GenerateContentConfig=FakeGenerateContentConfig,
)
fake_genai = ModuleType("google.genai")
fake_genai.Client = FakeClient
fake_genai.types = fake_types
fake_google = ModuleType("google")
fake_google.genai = fake_genai
with patch.dict(sys.modules, {"google": fake_google, "google.genai": fake_genai}):
analyzer = GeminiAnalyzer(
Settings(
api_key="test-key",
base_url="https://newapi.tootaio.com/v1beta/",
model="gemini-test",
)
)
self.assertEqual(
analyzer._client.http_options.kwargs,
{"base_url": "https://newapi.tootaio.com", "timeout": 120_000},
)
def test_markdown_output_uses_consulting_format(self) -> None:
fake_types = SimpleNamespace(
HttpOptions=FakeHttpOptions,
GenerateContentConfig=FakeGenerateContentConfig,
)
fake_genai = ModuleType("google.genai")
fake_genai.Client = FakeClient
fake_genai.types = fake_types
fake_google = ModuleType("google")
fake_google.genai = fake_genai
with patch.dict(sys.modules, {"google": fake_google, "google.genai": fake_genai}):
analyzer = GeminiAnalyzer(Settings(api_key="test-key", base_url=None, model="gemini-test"))
markdown = analyzer._to_markdown(
Path("Flows/Active.xaml"),
{
"overview": {
"purpose": "负责调用 VBA 并准备执行上下文",
"role": "主流程中的子流程节点",
"trigger": "由上游工作流通过 Invoke Workflow 调用",
},
"logic": {
"steps": [
{
"title": "读取 VBA 文件路径",
"detail": "从 `CodeFilePath` 读取脚本位置。",
"why": "定位需要执行的 VBA 代码。",
"result": "得到待执行的脚本文件。",
}
],
"decision_logic": ["如果 `CodeFilePath` 为空,则无法继续执行脚本。"],
"exceptions": ["当前文件未展示脚本执行失败后的补偿逻辑。"],
},
"data": {
"inputs": ["`Scripts/Keep.bas` 路径"],
"outputs": ["VBA 执行结果未直接在当前文件中落盘"],
"variables": ["`CodeFilePath`"],
"external_dependencies": ["外部 VBA 文件 `Scripts/Keep.bas`"],
},
"consultation": {
"business_meaning": "这是把业务动作下沉到 VBA 的桥接层。",
"risks": ["脚本文件缺失会导致执行失败。"],
"example": "例如:财务流程在这里调用 Excel VBA 完成批量格式整理。",
"unknowns": ["无法从当前文件确定 VBA 内部实现逻辑。"],
},
},
)
self.assertIn("## 文件定位", markdown)
self.assertIn("## 流程拆解", markdown)
self.assertIn("1. **读取 VBA 文件路径**", markdown)
self.assertIn("### 输入", markdown)
self.assertIn("## 咨询视角", markdown)
self.assertIn("## 场景范例", markdown)
def test_prompt_requires_strict_structured_json(self) -> None:
fake_types = SimpleNamespace(
HttpOptions=FakeHttpOptions,
GenerateContentConfig=FakeGenerateContentConfig,
)
fake_genai = ModuleType("google.genai")
fake_genai.Client = FakeClient
fake_genai.types = fake_types
fake_google = ModuleType("google")
fake_google.genai = fake_genai
with patch.dict(sys.modules, {"google": fake_google, "google.genai": fake_genai}):
analyzer = GeminiAnalyzer(Settings(api_key="test-key", base_url=None, model="gemini-test"))
prompt = analyzer._build_prompt(Path("main.xaml"), "<Sequence />")
self.assertIn("请严格返回 JSON", prompt)
self.assertIn("先讲这个文件在整个流程中的定位", prompt)
self.assertIn("判断逻辑、调用链、输入输出、关键变量、外部依赖", prompt)
def test_analyze_wraps_rate_limit_error_with_clear_message(self) -> None:
fake_types = SimpleNamespace(
HttpOptions=FakeHttpOptions,
GenerateContentConfig=FakeGenerateContentConfig,
)
fake_genai = ModuleType("google.genai")
fake_genai.Client = FakeClient
fake_genai.types = fake_types
fake_google = ModuleType("google")
fake_google.genai = fake_genai
with patch.dict(sys.modules, {"google": fake_google, "google.genai": fake_genai}):
analyzer = GeminiAnalyzer(Settings(api_key="test-key", base_url=None, model="gemini-test"))
error = RuntimeError("quota exceeded")
error.status_code = 429
error.response_json = {"error": {"message": "Resource has been exhausted (e.g. check quota)."}}
analyzer._client.models.generate_content = lambda **_: (_ for _ in ()).throw(error)
with self.assertRaises(AnalysisError) as captured:
analyzer.analyze(Path("main.xaml"), "<Sequence />")
self.assertIn("HTTP 429", str(captured.exception))
self.assertIn("Resource has been exhausted", str(captured.exception))
self.assertIn("--skip-analysis", str(captured.exception))
if __name__ == "__main__":
unittest.main()

tests/test_pipeline.py

@@ -57,6 +57,22 @@ class StubAnalyzer:
         return f"# {relative_path.as_posix()}\n\n{len(content)}"
+class FlakyAnalyzer:
+    def analyze(self, relative_path: Path, content: str) -> str:
+        if relative_path.name == "Active.xaml":
+            raise RuntimeError("HTTP 429 quota exhausted")
+        return f"# {relative_path.as_posix()}\n\n{len(content)}"
+class RecordingAnalyzer:
+    def __init__(self) -> None:
+        self.paths: list[str] = []
+    def analyze(self, relative_path: Path, content: str) -> str:
+        self.paths.append(relative_path.as_posix())
+        return f"# {relative_path.as_posix()}\n\n{len(content)}"
 class PipelineTests(unittest.TestCase):
     def test_strip_comment_out_blocks_removes_nested_blocks(self) -> None:
         source = "<root><ui:CommentOut><x/><ui:CommentOut><y/></ui:CommentOut></ui:CommentOut><z/></root>"
@@ -83,6 +99,8 @@ class PipelineTests(unittest.TestCase):
             tmp_path = Path(tmp)
             project_root = tmp_path / "project"
             output_root = tmp_path / "workspace"
+            code_root = output_root / "code"
+            docs_root = output_root / "docs"
             (project_root / "Flows").mkdir(parents=True)
             (project_root / "Flows" / "Active.xaml").write_text(ACTIVE_XAML, encoding="utf-8")
             (project_root / "Flows" / "Old.xaml").write_text(OLD_XAML, encoding="utf-8")
@@ -104,9 +122,106 @@ class PipelineTests(unittest.TestCase):
             self.assertIn("Scripts/Keep.bas", final_files)
             self.assertNotIn("Flows/Old.xaml", final_files)
             self.assertNotIn("Scripts/Drop.bas", final_files)
-            self.assertFalse((output_root / "Flows" / "Old.xaml").exists())
-            self.assertFalse((output_root / "Scripts" / "Drop.bas").exists())
-            self.assertTrue((output_root / "Flows" / "Active.xaml.analysis.md").exists())
+            self.assertEqual(report.code_root, code_root.resolve())
+            self.assertEqual(report.docs_root, docs_root.resolve())
+            self.assertFalse((code_root / "Flows" / "Old.xaml").exists())
+            self.assertFalse((code_root / "Scripts" / "Drop.bas").exists())
+            self.assertTrue((code_root / "Flows" / "Active.xaml").exists())
+            self.assertTrue((docs_root / "Flows" / "Active.xaml.analysis.md").exists())
+            self.assertTrue((docs_root / "manifest.json").exists())
+            overview = (docs_root / "OVERVIEW.md").read_text(encoding="utf-8")
+            self.assertIn("## Processing Logic", overview)
+            self.assertIn("Initial Scan", overview)
+            self.assertIn("## How To Read This Output", overview)
+            self.assertIn("## Cleaned XAML Files", overview)
+            self.assertIn("Code Root", overview)
+            self.assertIn("Docs Root", overview)
+    def test_pipeline_emits_stage_logs(self) -> None:
+        with TemporaryDirectory() as tmp:
+            tmp_path = Path(tmp)
+            project_root = tmp_path / "project"
+            output_root = tmp_path / "workspace"
+            (project_root / "Flows").mkdir(parents=True)
+            (project_root / "Flows" / "Active.xaml").write_text(ACTIVE_XAML, encoding="utf-8")
+            (project_root / "Flows" / "Old.xaml").write_text(OLD_XAML, encoding="utf-8")
+            (project_root / "Scripts").mkdir()
+            (project_root / "Scripts" / "Keep.bas").write_text("Sub Keep()\nEnd Sub", encoding="utf-8")
+            (project_root / "Scripts" / "Drop.bas").write_text("Sub Drop()\nEnd Sub", encoding="utf-8")
+            (project_root / "main.xaml").write_text(MAIN_XAML, encoding="utf-8")
+            with self.assertLogs("uipath_explainator", level="INFO") as captured:
+                ProjectPipeline(project_root, output_root, "main.xaml", force=True).run(StubAnalyzer())
+            combined = "\n".join(captured.output)
+            self.assertIn("Starting pipeline:", combined)
+            self.assertIn("Initial scan complete:", combined)
+            self.assertIn("Copied 5 files and cleaned 1 XAML files", combined)
+            self.assertIn("Final scan complete:", combined)
+            self.assertIn("Pipeline completed in", combined)
+    def test_pipeline_keeps_running_when_single_analysis_fails(self) -> None:
+        with TemporaryDirectory() as tmp:
+            tmp_path = Path(tmp)
+            project_root = tmp_path / "project"
+            output_root = tmp_path / "workspace"
+            docs_root = output_root / "docs"
+            (project_root / "Flows").mkdir(parents=True)
+            (project_root / "Flows" / "Active.xaml").write_text(ACTIVE_XAML, encoding="utf-8")
+            (project_root / "Scripts").mkdir()
+            (project_root / "Scripts" / "Keep.bas").write_text("Sub Keep()\nEnd Sub", encoding="utf-8")
+            (project_root / "main.xaml").write_text(MAIN_XAML, encoding="utf-8")
+            report = ProjectPipeline(project_root, output_root, "main.xaml", force=True).run(FlakyAnalyzer())
+            self.assertTrue((docs_root / "Flows" / "Active.xaml.analysis.md").exists())
+            fallback = (docs_root / "Flows" / "Active.xaml.analysis.md").read_text(encoding="utf-8")
+            self.assertIn("Gemini 分析失败", fallback)
+            self.assertIn("HTTP 429 quota exhausted", fallback)
+            self.assertTrue((docs_root / "Scripts" / "Keep.bas.analysis.md").exists())
+            self.assertTrue(any("Analysis failed for Flows/Active.xaml" in item for item in report.warnings))
+    def test_pipeline_resume_skips_successfully_cached_analyses(self) -> None:
+        with TemporaryDirectory() as tmp:
+            tmp_path = Path(tmp)
+            project_root = tmp_path / "project"
+            output_root = tmp_path / "workspace"
+            (project_root / "Flows").mkdir(parents=True)
+            (project_root / "Flows" / "Active.xaml").write_text(ACTIVE_XAML, encoding="utf-8")
+            (project_root / "Scripts").mkdir()
+            (project_root / "Scripts" / "Keep.bas").write_text("Sub Keep()\nEnd Sub", encoding="utf-8")
+            (project_root / "main.xaml").write_text(MAIN_XAML, encoding="utf-8")
+            first = RecordingAnalyzer()
+            ProjectPipeline(project_root, output_root, "main.xaml", force=True).run(first)
+            self.assertEqual(
+                first.paths,
+                ["Flows/Active.xaml", "main.xaml", "Scripts/Keep.bas"],
+            )
+            second = RecordingAnalyzer()
+            ProjectPipeline(project_root, output_root, "main.xaml", force=False).run(second)
+            self.assertEqual(second.paths, [])
+    def test_pipeline_resume_retries_failed_analysis_and_reanalyzes_changed_files(self) -> None:
+        with TemporaryDirectory() as tmp:
+            tmp_path = Path(tmp)
+            project_root = tmp_path / "project"
+            output_root = tmp_path / "workspace"
+            (project_root / "Flows").mkdir(parents=True)
+            (project_root / "Flows" / "Active.xaml").write_text(ACTIVE_XAML, encoding="utf-8")
+            (project_root / "Scripts").mkdir()
+            (project_root / "Scripts" / "Keep.bas").write_text("Sub Keep()\nEnd Sub", encoding="utf-8")
+            (project_root / "main.xaml").write_text(MAIN_XAML, encoding="utf-8")
+            ProjectPipeline(project_root, output_root, "main.xaml", force=True).run(FlakyAnalyzer())
+            (project_root / "Scripts" / "Keep.bas").write_text("Sub Keep()\nMsgBox \"updated\"\nEnd Sub", encoding="utf-8")
+            retry = RecordingAnalyzer()
+            ProjectPipeline(project_root, output_root, "main.xaml", force=False).run(retry)
+            self.assertEqual(retry.paths, ["Flows/Active.xaml", "Scripts/Keep.bas"])
 if __name__ == "__main__":