Compare commits
6 Commits: 7003dfa0df ... main

| SHA1 |
| --- |
| 0cf62d1ac5 |
| c73767073e |
| 0bdebd5368 |
| eef3464257 |
| 5ba2e3217a |
| d6218d6bad |
`.gitignore` (vendored, 2 changes)

```diff
@@ -214,3 +214,5 @@ __marimo__/
 # Streamlit
 .streamlit/secrets.toml
 
+
+workspace
```
`README.md` (47 changes)

````diff
@@ -2,6 +2,8 @@
 
 `uipath-explainator` starts from the UiPath project entry `main.xaml`, recursively extracts `Invoke Workflow` and `Invoke VBA` targets, copies them into a designated workspace, removes the dead code inside `<ui:CommentOut>`, rescans and prunes files that are no longer used, and finally calls Gemini on each remaining file to generate explanatory documentation.
 
+The output is automatically split into separate `code/` and `docs/` subdirectories, so documentation and project files are not mixed together.
+
 ## Installation
 
 ```bash
@@ -19,14 +21,14 @@ GEMINI_MODEL=gemini-2.5-flash
 ```
 
 When `GEMINI_BASE_URL` is left empty, the official default endpoint is used; if you have a proxy or gateway in front, you can supply a custom address.
+Enter only the gateway root address here, e.g. `https://your-gateway.example.com`; do not append `/v1beta`, `/v1alpha`, or `/v1` yourself.
 
 ## Usage
 
 ```bash
 ./.venv/bin/python -m uipath_explainator \
   /path/to/uipath-project \
-  --output-dir workspace/project-copy \
-  --force
+  --output-dir workspace/project-bundle
 ```
 
 Common options:
@@ -35,18 +37,49 @@ GEMINI_MODEL=gemini-2.5-flash
 - `--env-file`: custom `.env` path
 - `--model`: temporarily override the model name from `.env`
 - `--skip-analysis`: only copy, clean, and prune; do not call Gemini
-- `--force`: allow overwriting an existing output directory
+- `--force`: delete and rebuild an existing output directory; without it, the run resumes by default and skips files that were already analyzed successfully and whose source is unchanged
+- `--log-level`: log level; supports `DEBUG` / `INFO` / `WARNING` / `ERROR` / `CRITICAL`
+- `--log-file`: also write the full log to a file, which makes it easy to trace the scan, prune, and Gemini analysis afterwards
 
+After an interruption, simply run again with the same `--output-dir` to continue. The program reuses the local analysis cache in `docs/` and only resends new files, files whose content changed, and files that failed last time to Gemini.
+
 ## Output
 
-The output directory preserves the original project's relative directory structure and additionally generates:
+The output directory is now a bundle directory that always contains:
 
-- `manifest.json`: manifest of the scan, prune, warnings, and analysis results
-- `OVERVIEW.md`: overview summary
-- `*.analysis.md`: the Gemini explanation for each file that is finally kept
+- `code/`: the copied project files after cleanup and pruning, preserving the original project's relative directory structure
+- `docs/manifest.json`: manifest of the scan, prune, warnings, and analysis results
+- `docs/OVERVIEW.md`: overview document; besides file counts, it explains the logic of the whole "initial scan -> clean Comment Out -> second scan -> prune -> Gemini analysis" pipeline
+- `docs/*.analysis.md`: a consulting-style Gemini explanation for each file that is finally kept, always containing:
+  - File positioning: the file's role in the overall process, how it is triggered, its core purpose
+  - Process breakdown: step-by-step explanation of how it executes
+  - Key logic: conditionals, call relationships, edge cases
+  - Data and dependencies: inputs, outputs, variables, external files or systems
+  - Consulting perspective: business meaning, risks, information that cannot be confirmed
+  - Scenario example: an example to aid handover and understanding
 
 ## Current implementation constraints
 
 - Only `Invoke Workflow` / `Invoke VBA` file paths that can be statically identified are resolved.
 - Dynamically concatenated paths are recorded as warnings, but no guessing is attempted.
 - `Invoke VBA` recognizes common text-based VBA file extensions by default: `.txt`, `.bas`, `.vb`, `.vbs`, `.vba`, `.cls`, `.frm`.
+
+## Logging
+
+By default a run emits INFO-level logs, which record:
+
+- CLI startup arguments, configuration load results, whether Gemini is enabled
+- dependency scan start/end, file counts, warning counts
+- stage summaries for copying, Comment Out cleanup, the second scan, pruning, and documentation generation
+
+For finer, file-level troubleshooting detail, set the log level to `DEBUG`:
+
+```bash
+./.venv/bin/python -m uipath_explainator \
+  /path/to/uipath-project \
+  --output-dir workspace/project-bundle \
+  --log-level DEBUG \
+  --log-file workspace/project-bundle/run.log
+```
+
+`DEBUG` additionally records per-file details of copying, dependency resolution, path resolution, pruning, and analysis document writes.
````
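As a quick orientation aid, here is a minimal sketch that walks a finished bundle and pairs each kept source file with its generated explanation; the `docs/<relative-path>.analysis.md` naming follows the pipeline code shown further below, and the bundle path is a placeholder:

```python
from pathlib import Path

# Hypothetical finished bundle produced by a previous run.
bundle = Path("workspace/project-bundle")
code_root, docs_root = bundle / "code", bundle / "docs"

for source in sorted(code_root.rglob("*")):
    if not source.is_file():
        continue
    rel = source.relative_to(code_root)
    # Pipeline convention: docs/<relative-path>.analysis.md
    analysis = docs_root / f"{rel.as_posix()}.analysis.md"
    print(rel.as_posix(), "->", "ok" if analysis.exists() else "(no analysis)")
```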
`src/uipath_explainator/__main__.py`

```diff
@@ -2,21 +2,38 @@ from __future__ import annotations
 
 from argparse import ArgumentParser
 from pathlib import Path
+import logging
 
 from .config import Settings
 from .gemini import GeminiAnalyzer
+from .logging_utils import configure_logging
 from .pipeline import ProjectPipeline
 
 
+logger = logging.getLogger(__name__)
+
+
 def build_parser() -> ArgumentParser:
     parser = ArgumentParser(description="Extract and explain UiPath project dependencies.")
     parser.add_argument("project_dir", type=Path, help="UiPath project root directory")
-    parser.add_argument("--output-dir", type=Path, default=Path("workspace"), help="Copied project output directory")
+    parser.add_argument(
+        "--output-dir",
+        type=Path,
+        default=Path("workspace"),
+        help="Output bundle directory containing separate code/ and docs/ folders",
+    )
     parser.add_argument("--entry", default="main.xaml", help="Entry XAML file name")
     parser.add_argument("--env-file", type=Path, default=Path(".env"), help="Environment file for Gemini config")
     parser.add_argument("--model", help="Override GEMINI_MODEL")
     parser.add_argument("--skip-analysis", action="store_true", help="Skip Gemini analysis and only prepare files")
-    parser.add_argument("--force", action="store_true", help="Overwrite the output directory if it already exists")
+    parser.add_argument("--force", action="store_true", help="Delete and rebuild the output directory instead of resuming")
+    parser.add_argument(
+        "--log-level",
+        default="INFO",
+        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
+        help="Logging verbosity",
+    )
+    parser.add_argument("--log-file", type=Path, help="Optional file path to write logs to")
     return parser
 
 
@@ -32,18 +49,41 @@ def main(argv: list[str] | None = None) -> int:
     if not output_dir.is_absolute():
         output_dir = Path.cwd() / output_dir
 
-    settings = Settings.from_env(args.env_file if args.env_file.exists() else None, model_override=args.model)
-    analyzer = None if args.skip_analysis else GeminiAnalyzer(settings)
+    log_file = args.log_file.expanduser() if args.log_file else None
+    if log_file is not None and not log_file.is_absolute():
+        log_file = Path.cwd() / log_file
+    configure_logging(level_name=args.log_level, log_file=log_file)
 
-    pipeline = ProjectPipeline(
-        project_root=project_root,
-        output_root=output_dir,
-        entry_name=args.entry,
-        force=args.force,
+    logger.info(
+        "Starting CLI run: project_root=%s output_dir=%s entry=%s analysis=%s",
+        project_root,
+        output_dir,
+        args.entry,
+        not args.skip_analysis,
     )
-    report = pipeline.run(analyzer=analyzer)
+
+    env_file = args.env_file if args.env_file.exists() else None
+    if args.env_file and env_file is None:
+        logger.warning("Environment file not found, falling back to default dotenv lookup: %s", args.env_file)
+
+    try:
+        settings = Settings.from_env(env_file, model_override=args.model)
+        analyzer = None if args.skip_analysis else GeminiAnalyzer(settings)
+
+        pipeline = ProjectPipeline(
+            project_root=project_root,
+            output_root=output_dir,
+            entry_name=args.entry,
+            force=args.force,
+        )
+        report = pipeline.run(analyzer=analyzer)
+    except Exception:
+        logger.exception("CLI run failed")
+        return 1
 
     print(f"Output written to: {report.output_root}")
+    print(f"Code written to: {report.code_root}")
+    print(f"Docs written to: {report.docs_root}")
     print(f"Final files: {len(report.final_files)}")
     print(f"Pruned files: {len(report.pruned_files)}")
     print(f"Warnings: {len(report.warnings)}")
```
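Because `main()` now configures logging up front and returns `1` on failure instead of raising, it can also be driven programmatically. A minimal sketch, assuming the module shown above is `uipath_explainator/__main__.py` (consistent with the README's `python -m uipath_explainator` invocation); all paths are placeholders:

```python
from uipath_explainator.__main__ import main

# Mirrors the README invocation; --skip-analysis avoids needing a Gemini key,
# since GeminiAnalyzer (which enforces the key) is never constructed.
exit_code = main(
    [
        "/path/to/uipath-project",
        "--output-dir", "workspace/project-bundle",
        "--skip-analysis",
        "--log-level", "DEBUG",
        "--log-file", "workspace/project-bundle/run.log",
    ]
)
print("ok" if exit_code == 0 else "failed")
```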
`src/uipath_explainator/config.py`

```diff
@@ -2,12 +2,17 @@ from __future__ import annotations
 
 from dataclasses import dataclass
 from pathlib import Path
+import logging
+import re
 from typing import Self
 
 from dotenv import load_dotenv
 import os
 
 
+logger = logging.getLogger(__name__)
+
+
 @dataclass(slots=True)
 class Settings:
     api_key: str | None
@@ -17,16 +22,33 @@ class Settings:
     @classmethod
     def from_env(cls, env_file: Path | None = None, model_override: str | None = None) -> Self:
         if env_file:
+            logger.info("Loading environment variables from %s", env_file)
            load_dotenv(env_file)
         else:
+            logger.debug("Loading environment variables using default dotenv lookup")
             load_dotenv()
 
-        return cls(
+        settings = cls(
             api_key=os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY"),
             base_url=os.getenv("GEMINI_BASE_URL") or None,
             model=model_override or os.getenv("GEMINI_MODEL") or "gemini-2.5-flash",
         )
+        logger.info(
+            "Configuration loaded: model=%s base_url=%s api_key=%s",
+            settings.model,
+            settings.normalized_base_url() or "<default>",
+            "set" if settings.api_key else "missing",
+        )
+        return settings
 
     def require_api_key(self) -> None:
         if not self.api_key:
             raise ValueError("Missing GEMINI_API_KEY (or GOOGLE_API_KEY) in the environment.")
+        logger.debug("Gemini API key is available")
+
+    def normalized_base_url(self) -> str | None:
+        if not self.base_url:
+            return None
+
+        base_url = self.base_url.strip().rstrip("/")
+        return re.sub(r"/v\d+(?:alpha|beta)?$", "", base_url, flags=re.IGNORECASE)
```
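`normalized_base_url()` is what keeps the README's advice honest: even if someone pastes a `/v1beta` suffix into `GEMINI_BASE_URL`, it is stripped before the client is built. A self-contained sketch of the same normalization (example URLs are placeholders):

```python
import re

def normalized(base_url: str) -> str | None:
    # Same steps as Settings.normalized_base_url(): trim, drop a trailing
    # slash, then remove one trailing /v<N>, /v<N>alpha, or /v<N>beta.
    if not base_url:
        return None
    base_url = base_url.strip().rstrip("/")
    return re.sub(r"/v\d+(?:alpha|beta)?$", "", base_url, flags=re.IGNORECASE)

assert normalized("https://your-gateway.example.com/v1beta/") == "https://your-gateway.example.com"
assert normalized("https://your-gateway.example.com/V1/") == "https://your-gateway.example.com"
assert normalized("https://your-gateway.example.com") == "https://your-gateway.example.com"
```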
`src/uipath_explainator/gemini.py`

```diff
@@ -1,15 +1,28 @@
 from __future__ import annotations
 
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from pathlib import Path
+from typing import Any
 import json
+import logging
+from time import perf_counter
 
 from .config import Settings
 
 
+UNKNOWN_TEXT = "无法从当前文件确定"
+logger = logging.getLogger(__name__)
+
+
+class AnalysisError(RuntimeError):
+    """Raised when a single file analysis cannot be completed."""
+
+
 @dataclass(slots=True)
 class GeminiAnalyzer:
     settings: Settings
+    _types: Any = field(init=False, repr=False)
+    _client: Any = field(init=False, repr=False)
 
     def __post_init__(self) -> None:
         self.settings.require_api_key()
@@ -18,47 +31,187 @@ class GeminiAnalyzer:
         from google.genai import types
 
         http_options = types.HttpOptions(timeout=120_000)
-        if self.settings.base_url:
-            http_options = types.HttpOptions(base_url=self.settings.base_url, timeout=120_000)
+        base_url = self.settings.normalized_base_url()
+        if base_url:
+            http_options = types.HttpOptions(base_url=base_url, timeout=120_000)
 
         self._types = types
         self._client = genai.Client(api_key=self.settings.api_key, http_options=http_options)
+        logger.info(
+            "Gemini analyzer initialized: model=%s base_url=%s",
+            self.settings.model,
+            base_url or "<default>",
+        )
 
     def analyze(self, relative_path: Path, content: str) -> str:
-        prompt = self._build_prompt(relative_path, content)
-        response = self._client.models.generate_content(
-            model=self.settings.model,
-            contents=prompt,
-            config=self._types.GenerateContentConfig(
-                temperature=0.2,
-                response_mime_type="application/json",
-                response_schema={
-                    "type": "OBJECT",
-                    "required": ["summary", "implementation", "data", "example"],
-                    "properties": {
-                        "summary": {"type": "STRING"},
-                        "implementation": {"type": "STRING"},
-                        "data": {"type": "STRING"},
-                        "example": {"type": "STRING"},
-                    },
-                },
-            ),
-        )
-
-        payload = json.loads(response.text)
+        started = perf_counter()
+        logger.info("Submitting Gemini analysis for %s (%d chars)", relative_path.as_posix(), len(content))
+        prompt = self._build_prompt(relative_path, content)
+        try:
+            response = self._client.models.generate_content(
+                model=self.settings.model,
+                contents=prompt,
+                config=self._types.GenerateContentConfig(
+                    temperature=0.2,
+                    response_mime_type="application/json",
+                    response_schema=self._response_schema(),
+                ),
+            )
+
+            response_text = response.text or ""
+            logger.debug(
+                "Gemini response received for %s (%d chars)",
+                relative_path.as_posix(),
+                len(response_text),
+            )
+            payload = json.loads(response_text)
+        except json.JSONDecodeError as exc:
+            logger.exception(
+                "Gemini returned invalid JSON for %s. Response snippet: %r",
+                relative_path.as_posix(),
+                response_text[:500],
+            )
+            raise AnalysisError(
+                f"Gemini 返回了无法解析的 JSON,无法生成该文件说明。原始错误: {exc}"
+            ) from exc
+        except Exception as exc:
+            summary = self._summarize_error(exc)
+            logger.exception("Gemini analysis failed for %s: %s", relative_path.as_posix(), summary)
+            raise AnalysisError(summary) from exc
+        logger.info(
+            "Gemini analysis completed for %s in %.2fs",
+            relative_path.as_posix(),
+            perf_counter() - started,
+        )
         return self._to_markdown(relative_path, payload)
+
+    def _summarize_error(self, exc: Exception) -> str:
+        status_code = getattr(exc, "status_code", None)
+        response_json = getattr(exc, "response_json", None)
+        upstream_message = self._extract_error_message(response_json)
+
+        if status_code == 429:
+            detail = upstream_message or "Resource has been exhausted"
+            return (
+                "Gemini 配额或速率限制已触发(HTTP 429),当前文件说明未生成。"
+                f"上游信息: {detail}。可稍后重试,或使用 --skip-analysis 仅导出代码与依赖。"
+            )
+
+        if status_code is not None:
+            detail = upstream_message or str(exc).strip() or exc.__class__.__name__
+            return f"Gemini 调用失败(HTTP {status_code})。上游信息: {detail}"
+
+        detail = str(exc).strip()
+        if detail:
+            return f"Gemini 分析失败: {detail}"
+        return f"Gemini 分析失败: {exc.__class__.__name__}"
+
+    def _extract_error_message(self, response_json: Any) -> str | None:
+        if not isinstance(response_json, dict):
+            return None
+
+        error = response_json.get("error")
+        if isinstance(error, dict):
+            message = error.get("message")
+            if isinstance(message, str) and message.strip():
+                return message.strip()
+        return None
+
+    def _response_schema(self) -> dict[str, Any]:
+        return {
+            "type": "OBJECT",
+            "required": ["overview", "logic", "data", "consultation"],
+            "properties": {
+                "overview": {
+                    "type": "OBJECT",
+                    "required": ["purpose", "role", "trigger"],
+                    "properties": {
+                        "purpose": {"type": "STRING"},
+                        "role": {"type": "STRING"},
+                        "trigger": {"type": "STRING"},
+                    },
+                },
+                "logic": {
+                    "type": "OBJECT",
+                    "required": ["steps", "decision_logic", "exceptions"],
+                    "properties": {
+                        "steps": {
+                            "type": "ARRAY",
+                            "items": {
+                                "type": "OBJECT",
+                                "required": ["title", "detail", "why", "result"],
+                                "properties": {
+                                    "title": {"type": "STRING"},
+                                    "detail": {"type": "STRING"},
+                                    "why": {"type": "STRING"},
+                                    "result": {"type": "STRING"},
+                                },
+                            },
+                        },
+                        "decision_logic": {
+                            "type": "ARRAY",
+                            "items": {"type": "STRING"},
+                        },
+                        "exceptions": {
+                            "type": "ARRAY",
+                            "items": {"type": "STRING"},
+                        },
+                    },
+                },
+                "data": {
+                    "type": "OBJECT",
+                    "required": ["inputs", "outputs", "variables", "external_dependencies"],
+                    "properties": {
+                        "inputs": {
+                            "type": "ARRAY",
+                            "items": {"type": "STRING"},
+                        },
+                        "outputs": {
+                            "type": "ARRAY",
+                            "items": {"type": "STRING"},
+                        },
+                        "variables": {
+                            "type": "ARRAY",
+                            "items": {"type": "STRING"},
+                        },
+                        "external_dependencies": {
+                            "type": "ARRAY",
+                            "items": {"type": "STRING"},
+                        },
+                    },
+                },
+                "consultation": {
+                    "type": "OBJECT",
+                    "required": ["business_meaning", "risks", "example", "unknowns"],
+                    "properties": {
+                        "business_meaning": {"type": "STRING"},
+                        "risks": {
+                            "type": "ARRAY",
+                            "items": {"type": "STRING"},
+                        },
+                        "example": {"type": "STRING"},
+                        "unknowns": {
+                            "type": "ARRAY",
+                            "items": {"type": "STRING"},
+                        },
+                    },
+                },
+            },
+        }
 
     def _build_prompt(self, relative_path: Path, content: str) -> str:
         language = "xml" if relative_path.suffix.lower() == ".xaml" else "vb"
         return f"""你是资深 UiPath 自动化架构师。请只基于给定文件内容分析,不要臆造未出现的系统、字段或业务规则。
 
-请用中文返回 JSON,字段固定为:
-- summary: 这个文件在做什么
-- implementation: 这个文件怎么做
-- data: 这个文件涉及的数据、变量、参数、外部文件或系统
-- example: 一个贴近当前代码的说明性范例
+请严格返回 JSON,并且完全匹配给定 schema。所有字段都必须填写。
 
-如果信息不足,请明确写“无法从当前文件确定”。
+输出要求:
+1. 用中文,写成“给交接人员/业务方看的咨询说明”,不要只给一句笼统概括。
+2. 先讲这个文件在整个流程中的定位,再按执行顺序拆解步骤。
+3. 重点解释判断逻辑、调用链、输入输出、关键变量、外部依赖。
+4. 如果代码里能看出顺序动作,请在 steps 里拆成明确步骤;不要只复述节点名称。
+5. 无法确认的信息必须明确写“无法从当前文件确定”,数组字段则写成仅包含这一项的数组。
+6. 不要输出 JSON 以外的任何文字。
 
 文件路径: {relative_path.as_posix()}
 代码:
@@ -66,22 +219,109 @@ class GeminiAnalyzer:
 {content}
 ```"""
 
-    def _to_markdown(self, relative_path: Path, payload: dict[str, str]) -> str:
+    def _to_markdown(self, relative_path: Path, payload: dict[str, Any]) -> str:
+        overview = payload.get("overview") or {}
+        logic = payload.get("logic") or {}
+        data = payload.get("data") or {}
+        consultation = payload.get("consultation") or {}
+
         return "\n".join(
             [
                 f"# {relative_path.as_posix()}",
                 "",
-                "## 做什么",
-                payload.get("summary", "").strip() or "无法从当前文件确定",
+                "## 文件定位",
+                f"- 核心作用:{self._clean_text(overview.get('purpose'))}",
+                f"- 流程角色:{self._clean_text(overview.get('role'))}",
+                f"- 触发方式:{self._clean_text(overview.get('trigger'))}",
                 "",
-                "## 怎么做",
-                payload.get("implementation", "").strip() or "无法从当前文件确定",
+                "## 流程拆解",
+                self._render_steps(logic.get("steps")),
                 "",
-                "## 涉及数据",
-                payload.get("data", "").strip() or "无法从当前文件确定",
+                "## 关键逻辑",
+                self._render_bullets(logic.get("decision_logic")),
                 "",
-                "## 范例",
-                payload.get("example", "").strip() or "无法从当前文件确定",
+                "## 异常与边界",
+                self._render_bullets(logic.get("exceptions")),
+                "",
+                "## 数据与依赖",
+                "### 输入",
+                self._render_bullets(data.get("inputs")),
+                "",
+                "### 输出",
+                self._render_bullets(data.get("outputs")),
+                "",
+                "### 关键变量",
+                self._render_bullets(data.get("variables")),
+                "",
+                "### 外部依赖",
+                self._render_bullets(data.get("external_dependencies")),
+                "",
+                "## 咨询视角",
+                f"- 业务含义:{self._clean_text(consultation.get('business_meaning'))}",
+                "",
+                "### 风险与注意点",
+                self._render_bullets(consultation.get("risks")),
+                "",
+                "### 无法确认的点",
+                self._render_bullets(consultation.get("unknowns")),
+                "",
+                "## 场景范例",
+                self._clean_text(consultation.get("example")),
                 "",
             ]
-        )
+        ).rstrip() + "\n"
+
+    def _render_steps(self, items: Any) -> str:
+        if not isinstance(items, list):
+            items = []
+
+        normalized: list[dict[str, str]] = []
+        for item in items:
+            if not isinstance(item, dict):
+                continue
+            normalized.append(
+                {
+                    "title": self._clean_text(item.get("title")),
+                    "detail": self._clean_text(item.get("detail")),
+                    "why": self._clean_text(item.get("why")),
+                    "result": self._clean_text(item.get("result")),
+                }
+            )
+
+        if not normalized:
+            normalized = [
+                {
+                    "title": UNKNOWN_TEXT,
+                    "detail": UNKNOWN_TEXT,
+                    "why": UNKNOWN_TEXT,
+                    "result": UNKNOWN_TEXT,
+                }
+            ]
+
+        lines: list[str] = []
+        for index, item in enumerate(normalized, start=1):
+            lines.extend(
+                [
+                    f"{index}. **{item['title']}**",
+                    f"   - 动作:{item['detail']}",
+                    f"   - 目的:{item['why']}",
+                    f"   - 结果:{item['result']}",
+                ]
+            )
+        return "\n".join(lines)
+
+    def _render_bullets(self, items: Any) -> str:
+        if not isinstance(items, list):
+            items = []
+
+        cleaned = [self._clean_text(item) for item in items if isinstance(item, str) and item.strip()]
+        if not cleaned:
+            cleaned = [UNKNOWN_TEXT]
+        return "\n".join(f"- {item}" for item in cleaned)
+
+    def _clean_text(self, value: Any) -> str:
+        if isinstance(value, str):
+            stripped = value.strip()
+            if stripped:
+                return stripped
+        return UNKNOWN_TEXT
```
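`_summarize_error` only relies on duck-typed `status_code` / `response_json` attributes (the shape google-genai API errors expose via `getattr`), so the quota path can be exercised without a real client. A hedged sketch, assuming the package and python-dotenv are importable; `FakeQuotaError` is invented for illustration:

```python
from uipath_explainator.gemini import GeminiAnalyzer

class FakeQuotaError(Exception):
    # Mimics the attributes read via getattr() in _summarize_error.
    status_code = 429
    response_json = {"error": {"message": "Resource has been exhausted"}}

# Bypass __init__/__post_init__ (which would import google.genai) and call
# the helper directly; it only uses _extract_error_message, no instance state.
stub = object.__new__(GeminiAnalyzer)
print(GeminiAnalyzer._summarize_error(stub, FakeQuotaError()))
# -> the HTTP-429 summary that the pipeline later embeds in the failure stub
```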
`src/uipath_explainator/logging_utils.py` (new file, 28 lines)

```diff
@@ -0,0 +1,28 @@
+from __future__ import annotations
+
+from pathlib import Path
+import logging
+
+
+LOG_FORMAT = "%(asctime)s %(levelname)s [%(name)s] %(message)s"
+
+
+def configure_logging(level_name: str = "INFO", log_file: Path | None = None) -> None:
+    level = getattr(logging, level_name.upper(), logging.INFO)
+    formatter = logging.Formatter(LOG_FORMAT)
+
+    handlers: list[logging.Handler] = [logging.StreamHandler()]
+    if log_file is not None:
+        log_file.parent.mkdir(parents=True, exist_ok=True)
+        handlers.append(logging.FileHandler(log_file, encoding="utf-8"))
+
+    root_logger = logging.getLogger()
+    root_logger.handlers.clear()
+    root_logger.setLevel(level)
+
+    for handler in handlers:
+        handler.setLevel(level)
+        handler.setFormatter(formatter)
+        root_logger.addHandler(handler)
+
+    logging.captureWarnings(True)
```
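`configure_logging` replaces the root logger's handlers, so it should run once at startup before any pipeline logging happens; module-level `logging.getLogger(__name__)` loggers then inherit the level and format. A short usage sketch (the log path is a placeholder):

```python
import logging
from pathlib import Path

from uipath_explainator.logging_utils import configure_logging

# Console + file handlers at DEBUG; the file's parent directory is created.
configure_logging(level_name="DEBUG", log_file=Path("workspace/run.log"))
logging.getLogger("uipath_explainator.demo").debug("visible on stderr and in workspace/run.log")
```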
`src/uipath_explainator/pipeline.py`

```diff
@@ -1,17 +1,26 @@
 from __future__ import annotations
 
 from dataclasses import dataclass
+from hashlib import sha256
 from pathlib import Path
 from shutil import copy2, rmtree
 import json
+import logging
+from time import perf_counter
 
 from .scanner import ScanResult, crawl_dependencies, find_entry_file, read_text, strip_comment_out_blocks
 
 
+logger = logging.getLogger(__name__)
+ANALYSIS_CACHE_NAME = ".analysis-cache.json"
+
+
 @dataclass(slots=True)
 class PipelineReport:
     project_root: Path
     output_root: Path
+    code_root: Path
+    docs_root: Path
     entry_file: Path
     initial_files: list[Path]
     final_files: list[Path]
@@ -25,6 +34,8 @@ class PipelineReport:
             {
                 "project_root": self.project_root.as_posix(),
                 "output_root": self.output_root.as_posix(),
+                "code_root": self.code_root.as_posix(),
+                "docs_root": self.docs_root.as_posix(),
                 "entry_file": self.entry_file.as_posix(),
                 "initial_files": [item.as_posix() for item in self.initial_files],
                 "final_files": [item.as_posix() for item in self.final_files],
@@ -42,30 +53,56 @@ class ProjectPipeline:
     def __init__(self, project_root: Path, output_root: Path, entry_name: str, force: bool = False) -> None:
        self.project_root = project_root.resolve()
         self.output_root = output_root.resolve()
+        self.code_root = self.output_root / "code"
+        self.docs_root = self.output_root / "docs"
         self.entry_name = entry_name
         self.force = force
 
     def run(self, analyzer=None) -> PipelineReport:
+        started = perf_counter()
+        logger.info(
+            "Starting pipeline: project_root=%s output_root=%s entry=%s analysis=%s force=%s",
+            self.project_root,
+            self.output_root,
+            self.entry_name,
+            analyzer is not None,
+            self.force,
+        )
         entry_file = find_entry_file(self.project_root, self.entry_name)
         entry_rel = entry_file.relative_to(self.project_root)
+        logger.debug("Using entry file %s", entry_rel.as_posix())
 
         initial_scan = crawl_dependencies(self.project_root, entry_file)
         initial_rel_files = self._relative_files(initial_scan, self.project_root)
+        logger.info(
+            "Initial scan complete: files=%d warnings=%d",
+            len(initial_rel_files),
+            len(initial_scan.warnings),
+        )
 
         self._prepare_output_root()
         self._copy_files(initial_rel_files)
         cleaned_files = self._clean_copied_xaml_files(initial_rel_files)
+        logger.info("Copied %d files and cleaned %d XAML files", len(initial_rel_files), len(cleaned_files))
 
-        final_scan = crawl_dependencies(self.output_root, self.output_root / entry_rel)
-        final_rel_files = self._relative_files(final_scan, self.output_root)
-        pruned_files = self._prune_unused_files(initial_rel_files, final_rel_files)
+        final_scan = crawl_dependencies(self.code_root, self.code_root / entry_rel)
+        final_rel_files = self._relative_files(final_scan, self.code_root)
+        pruned_files = self._prune_unused_files(final_rel_files)
+        logger.info(
+            "Final scan complete: files=%d warnings=%d pruned=%d",
+            len(final_rel_files),
+            len(final_scan.warnings),
+            len(pruned_files),
+        )
 
-        analysis_files = self._write_analysis(final_rel_files, analyzer)
-        warnings = initial_scan.warnings + final_scan.warnings
+        analysis_files, analysis_warnings = self._write_analysis(final_rel_files, analyzer)
+        warnings = initial_scan.warnings + final_scan.warnings + analysis_warnings
 
         report = PipelineReport(
             project_root=self.project_root,
             output_root=self.output_root,
+            code_root=self.code_root,
+            docs_root=self.docs_root,
             entry_file=entry_rel,
             initial_files=initial_rel_files,
             final_files=final_rel_files,
@@ -75,85 +112,122 @@ class ProjectPipeline:
             analysis_files=analysis_files,
         )
         self._write_report_files(report)
+        logger.info(
+            "Pipeline completed in %.2fs: final_files=%d analysis_files=%d warnings=%d",
+            perf_counter() - started,
+            len(report.final_files),
+            len(report.analysis_files),
+            len(report.warnings),
+        )
         return report
 
     def _prepare_output_root(self) -> None:
         if self.output_root.exists():
-            if not self.force:
-                raise FileExistsError(f"Output directory already exists: {self.output_root}")
-            rmtree(self.output_root)
-        self.output_root.mkdir(parents=True, exist_ok=True)
+            if self.force:
+                logger.info("Removing existing output directory because force=True: %s", self.output_root)
+                rmtree(self.output_root)
+            else:
+                logger.info("Reusing existing output directory for incremental run: %s", self.output_root)
+        self.code_root.mkdir(parents=True, exist_ok=True)
+        self.docs_root.mkdir(parents=True, exist_ok=True)
+        logger.debug("Prepared output directories: code=%s docs=%s", self.code_root, self.docs_root)
 
     def _copy_files(self, relative_files: list[Path]) -> None:
         for relative_path in relative_files:
             source = self.project_root / relative_path
-            destination = self.output_root / relative_path
+            destination = self.code_root / relative_path
             destination.parent.mkdir(parents=True, exist_ok=True)
             copy2(source, destination)
+            logger.debug("Copied file: %s -> %s", source, destination)
 
     def _clean_copied_xaml_files(self, relative_files: list[Path]) -> list[Path]:
         cleaned: list[Path] = []
         for relative_path in relative_files:
             if relative_path.suffix.lower() != ".xaml":
                 continue
-            output_file = self.output_root / relative_path
+            output_file = self.code_root / relative_path
             original = read_text(output_file)
             updated = strip_comment_out_blocks(original)
             if updated != original:
                 output_file.write_text(updated, encoding="utf-8")
                 cleaned.append(relative_path)
+                logger.debug("Removed CommentOut blocks from %s", output_file)
         return cleaned
 
-    def _prune_unused_files(self, initial_files: list[Path], final_files: list[Path]) -> list[Path]:
+    def _prune_unused_files(self, final_files: list[Path]) -> list[Path]:
         final_set = set(final_files)
         pruned: list[Path] = []
-        for relative_path in initial_files:
+        for relative_path in self._existing_relative_files(self.code_root):
             if relative_path in final_set:
                 continue
-            target = self.output_root / relative_path
+            target = self.code_root / relative_path
             if target.exists():
                 target.unlink()
             pruned.append(relative_path)
-        self._cleanup_empty_dirs()
+            logger.debug("Pruned unreachable file: %s", target)
+        self._cleanup_empty_dirs(self.code_root)
         return pruned
 
-    def _cleanup_empty_dirs(self) -> None:
-        directories = sorted(
-            [path for path in self.output_root.rglob("*") if path.is_dir()],
-            key=lambda item: len(item.parts),
-            reverse=True,
-        )
+    def _cleanup_empty_dirs(self, root: Path) -> None:
+        directories = sorted([path for path in root.rglob("*") if path.is_dir()], key=lambda item: len(item.parts), reverse=True)
         for directory in directories:
             if any(directory.iterdir()):
                 continue
             directory.rmdir()
+            logger.debug("Removed empty directory: %s", directory)
 
-    def _write_analysis(self, final_files: list[Path], analyzer) -> list[Path]:
+    def _write_analysis(self, final_files: list[Path], analyzer) -> tuple[list[Path], list[str]]:
         if analyzer is None:
-            return []
+            logger.info("Skipping Gemini analysis because analyzer is disabled")
+            self._clear_analysis_outputs()
+            return [], []
 
         output_files: list[Path] = []
+        warnings: list[str] = []
+        cache = self._load_analysis_cache()
+        cache = self._prune_stale_analysis_outputs(final_files, cache)
+        self._save_analysis_cache(cache)
         for relative_path in self._ordered_files(final_files):
-            content = read_text(self.output_root / relative_path)
-            analysis = analyzer.analyze(relative_path, content)
-            analysis_path = self.output_root / f"{relative_path.as_posix()}.analysis.md"
+            analysis_rel_path = Path(f"{relative_path.as_posix()}.analysis.md")
+            analysis_path = self.docs_root / analysis_rel_path
+            content = read_text(self.code_root / relative_path)
+            content_hash = self._content_hash(content)
+            cache_entry = cache.get(relative_path.as_posix())
+            if self._can_reuse_analysis(relative_path, analysis_path, content_hash, cache_entry):
+                output_files.append(analysis_rel_path)
+                continue
+            try:
+                analysis = analyzer.analyze(relative_path, content)
+                cache[relative_path.as_posix()] = {"content_hash": content_hash, "status": "success"}
+            except Exception as exc:
+                warning = f"Analysis failed for {relative_path.as_posix()}: {self._format_analysis_error(exc)}"
+                warnings.append(warning)
+                logger.warning(warning)
+                analysis = self._build_failed_analysis(relative_path, exc)
+                cache[relative_path.as_posix()] = {"content_hash": content_hash, "status": "failed"}
             analysis_path.parent.mkdir(parents=True, exist_ok=True)
             analysis_path.write_text(analysis, encoding="utf-8")
-            output_files.append(Path(f"{relative_path.as_posix()}.analysis.md"))
-        return output_files
+            self._save_analysis_cache(cache)
+            output_files.append(analysis_rel_path)
+            logger.debug("Wrote analysis file: %s", analysis_path)
+        return output_files, warnings
 
     def _write_report_files(self, report: PipelineReport) -> None:
-        (self.output_root / "manifest.json").write_text(report.to_json(), encoding="utf-8")
-        (self.output_root / "OVERVIEW.md").write_text(self._build_overview(report), encoding="utf-8")
+        (self.docs_root / "manifest.json").write_text(report.to_json(), encoding="utf-8")
+        (self.docs_root / "OVERVIEW.md").write_text(self._build_overview(report), encoding="utf-8")
+        logger.debug("Wrote report files to %s", self.docs_root)
 
     def _build_overview(self, report: PipelineReport) -> str:
         warnings = "\n".join(f"- {item}" for item in report.warnings) or "- 无"
+        cleaned = "\n".join(f"- {item.as_posix()}" for item in report.cleaned_files) or "- 无"
         pruned = "\n".join(f"- {item.as_posix()}" for item in report.pruned_files) or "- 无"
         analyses = "\n".join(f"- {item.as_posix()}" for item in report.analysis_files) or "- 未启用 Gemini 分析"
         return f"""# UiPath Explainator Overview
 
 - Project Root: `{report.project_root.as_posix()}`
 - Output Root: `{report.output_root.as_posix()}`
+- Code Root: `{report.code_root.as_posix()}`
+- Docs Root: `{report.docs_root.as_posix()}`
 - Entry File: `{report.entry_file.as_posix()}`
 - Initial Files: {len(report.initial_files)}
 - Final Files: {len(report.final_files)}
@@ -164,6 +238,24 @@ class ProjectPipeline:
 ## Final Files
 {chr(10).join(f"- {item.as_posix()}" for item in report.final_files)}
 
+## Processing Logic
+1. **Initial Scan**: 从入口文件开始递归扫描 `Invoke Workflow` 与 `Invoke VBA`,得到初始依赖集合。这个阶段会把 `Comment Out` 里的引用也一起算进去,因为此时还未清理失效代码。
+2. **Copy to Workspace**: 将初始依赖集合完整复制到 `code/` 目录,保持相对目录结构不变,确保后续裁剪只发生在代码副本中。
+3. **Comment Cleanup**: 对已复制的 XAML 删除 `<ui:CommentOut>` 代码块,让后续扫描只看到当前真正生效的流程节点。
+4. **Rescan After Cleanup**: 以清理后的入口文件重新递归扫描,得到清理后的实际依赖集合。
+5. **Prune Unused Files**: 删除 `code/` 目录中首次扫描能到达、但二次扫描已不可达的文件。这些文件通常来自被注释掉的工作流、VBA,或清理后失效的调用链。
+6. **Gemini Analysis**: 仅针对最终保留文件在 `docs/` 目录生成 `*.analysis.md`,让说明文档与实际可执行流程保持一致。
+
+## How To Read This Output
+- `Initial Files`: 清理 `<ui:CommentOut>` 前扫描到的所有文件,代表“理论上被引用过”的集合。
+- `Final Files`: 清理后 `code/` 目录中仍可达的文件,代表“当前实际流程会用到”的集合。
+- `Cleaned XAML Files`: 被移除 `<ui:CommentOut>` 代码块的文件。
+- `Pruned Files`: 只在失效分支中出现、已从 `code/` 目录删除的文件。
+- `Analysis Files`: `docs/` 目录下 Gemini 生成的逐文件说明,重点解释该文件的流程、逻辑、数据与风险点。
+
+## Cleaned XAML Files
+{cleaned}
+
 ## Pruned Files
 {pruned}
 
@@ -177,5 +269,106 @@ class ProjectPipeline:
     def _relative_files(self, scan: ScanResult, root: Path) -> list[Path]:
         return sorted(path.relative_to(root) for path in scan.files)
 
+    def _existing_relative_files(self, root: Path) -> list[Path]:
+        return sorted(path.relative_to(root) for path in root.rglob("*") if path.is_file())
+
     def _ordered_files(self, paths: list[Path]) -> list[Path]:
         return sorted(paths, key=lambda item: (item.suffix.lower() != ".xaml", item.as_posix().lower()))
+
+    def _analysis_cache_path(self) -> Path:
+        return self.docs_root / ANALYSIS_CACHE_NAME
+
+    def _load_analysis_cache(self) -> dict[str, dict[str, str]]:
+        cache_path = self._analysis_cache_path()
+        if not cache_path.exists():
+            return {}
+        try:
+            raw = json.loads(cache_path.read_text(encoding="utf-8"))
+        except (OSError, json.JSONDecodeError):
+            logger.warning("Ignoring invalid analysis cache file: %s", cache_path)
+            return {}
+        if not isinstance(raw, dict):
+            logger.warning("Ignoring unexpected analysis cache payload: %s", cache_path)
+            return {}
+
+        cache: dict[str, dict[str, str]] = {}
+        for relative_path, metadata in raw.items():
+            if not isinstance(relative_path, str) or not isinstance(metadata, dict):
+                continue
+            content_hash = metadata.get("content_hash")
+            status = metadata.get("status")
+            if not isinstance(content_hash, str) or not isinstance(status, str):
+                continue
+            cache[relative_path] = {"content_hash": content_hash, "status": status}
+        return cache
+
+    def _save_analysis_cache(self, cache: dict[str, dict[str, str]]) -> None:
+        self._analysis_cache_path().write_text(json.dumps(cache, ensure_ascii=False, indent=2), encoding="utf-8")
+
+    def _prune_stale_analysis_outputs(
+        self, final_files: list[Path], cache: dict[str, dict[str, str]]
+    ) -> dict[str, dict[str, str]]:
+        final_paths = {relative_path.as_posix() for relative_path in final_files}
+        valid_analysis_paths = {Path(f"{relative_path}.analysis.md") for relative_path in final_paths}
+
+        for analysis_path in self.docs_root.rglob("*.analysis.md"):
+            relative_analysis_path = analysis_path.relative_to(self.docs_root)
+            if relative_analysis_path in valid_analysis_paths:
+                continue
+            analysis_path.unlink()
+            logger.debug("Removed stale analysis file: %s", analysis_path)
+
+        pruned_cache = {relative_path: metadata for relative_path, metadata in cache.items() if relative_path in final_paths}
+        self._cleanup_empty_dirs(self.docs_root)
+        return pruned_cache
+
+    def _clear_analysis_outputs(self) -> None:
+        for analysis_path in self.docs_root.rglob("*.analysis.md"):
+            analysis_path.unlink()
+            logger.debug("Removed analysis file because analysis is disabled: %s", analysis_path)
+        cache_path = self._analysis_cache_path()
+        if cache_path.exists():
+            cache_path.unlink()
+            logger.debug("Removed analysis cache file: %s", cache_path)
+        self._cleanup_empty_dirs(self.docs_root)
+
+    def _can_reuse_analysis(
+        self,
+        relative_path: Path,
+        analysis_path: Path,
+        content_hash: str,
+        cache_entry: dict[str, str] | None,
+    ) -> bool:
+        if cache_entry is None:
+            return False
+        if cache_entry.get("status") != "success":
+            return False
+        if cache_entry.get("content_hash") != content_hash:
+            return False
+        if not analysis_path.exists():
+            return False
+        logger.info("Reusing cached Gemini analysis for %s", relative_path.as_posix())
+        return True
+
+    def _content_hash(self, content: str) -> str:
+        return sha256(content.encode("utf-8")).hexdigest()
+
+    def _build_failed_analysis(self, relative_path: Path, exc: Exception) -> str:
+        reason = self._format_analysis_error(exc)
+        return "\n".join(
+            [
+                f"# {relative_path.as_posix()}",
+                "",
+                "## 分析状态",
+                "- 状态:Gemini 分析失败,当前文件未生成结构化说明。",
+                f"- 原因:{reason}",
+                "- 建议:稍后重试;如果当前只需要导出代码与依赖,可使用 `--skip-analysis`。",
+                "",
+            ]
+        )
+
+    def _format_analysis_error(self, exc: Exception) -> str:
+        message = str(exc).strip()
+        if message:
+            return message
+        return exc.__class__.__name__
```
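The whole resume behavior hangs on `docs/.analysis-cache.json`: an analysis is reused only when the cached status is `success`, the SHA-256 of the copied source still matches, and the `*.analysis.md` file is still present. A sketch of the on-disk shape and the decision, mirroring `_can_reuse_analysis`; paths and hashes are illustrative:

```python
from hashlib import sha256

# docs/.analysis-cache.json as _save_analysis_cache writes it (illustrative):
cache = {
    "main.xaml": {
        "content_hash": sha256(b"<xaml source>").hexdigest(),
        "status": "success",
    },
    "Subflows/Init.xaml": {"content_hash": "abc123", "status": "failed"},
}

def can_reuse(entry: dict | None, current_hash: str, analysis_exists: bool) -> bool:
    # Mirrors _can_reuse_analysis: any mismatch forces re-analysis.
    if entry is None or entry.get("status") != "success":
        return False
    if entry.get("content_hash") != current_hash:
        return False
    return analysis_exists

print(can_reuse(cache["Subflows/Init.xaml"], "abc123", True))   # False: failed last run
print(can_reuse(cache["main.xaml"], sha256(b"<xaml source>").hexdigest(), True))  # True
```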
`src/uipath_explainator/scanner.py`

```diff
@@ -4,6 +4,7 @@ from collections import deque
 from dataclasses import dataclass, field
 from pathlib import Path
 import html
+import logging
 import re
 import xml.etree.ElementTree as ET
 
@@ -15,6 +16,7 @@ TAG_PATTERN = re.compile(
     r"<(?P<close>/)?(?P<name>[A-Za-z_][\w:.-]*)(?P<body>[^<>]*?)(?P<self>/)?>",
     re.DOTALL,
 )
+logger = logging.getLogger(__name__)
 
 
 @dataclass(slots=True, frozen=True)
@@ -81,30 +83,37 @@ def _find_comment_end(text: str, start: re.Match[str]) -> int:
 
 
 def find_entry_file(project_root: Path, entry_name: str) -> Path:
+    logger.debug("Resolving entry file %s within %s", entry_name, project_root)
     direct = project_root / entry_name
     if direct.exists():
+        logger.info("Entry file resolved directly: %s", direct)
         return direct
 
     target = entry_name.lower()
     for child in project_root.iterdir():
         if child.is_file() and child.name.lower() == target:
+            logger.info("Entry file resolved case-insensitively: %s", child)
            return child
     raise FileNotFoundError(f"Entry file not found: {entry_name}")
 
 
 def crawl_dependencies(project_root: Path, entry_file: Path) -> ScanResult:
+    logger.info("Starting dependency crawl from %s", entry_file)
     result = ScanResult()
     queue: deque[Path] = deque([entry_file])
     visited: set[Path] = set()
 
     while queue:
         current = queue.popleft()
+        logger.debug("Dequeued file for scan: %s", current)
         if current in visited:
+            logger.debug("Skipping already visited file: %s", current)
             continue
         visited.add(current)
         result.files.add(current)
 
         if current.suffix.lower() != ".xaml":
+            logger.debug("Skipping recursive scan for non-XAML file: %s", current)
             continue
 
         dependencies, warnings = extract_dependencies(project_root, current)
@@ -114,12 +123,24 @@ def crawl_dependencies(project_root: Path, entry_file: Path) -> ScanResult:
         for dependency in dependencies:
             result.files.add(dependency.target)
             if dependency.kind == "workflow":
+                logger.debug(
+                    "Queueing workflow dependency: %s -> %s",
+                    current,
+                    dependency.target,
+                )
                 queue.append(dependency.target)
 
+    logger.info(
+        "Dependency crawl complete: files=%d dependency_sources=%d warnings=%d",
+        len(result.files),
+        len(result.dependencies),
+        len(result.warnings),
+    )
     return result
 
 
 def extract_dependencies(project_root: Path, file_path: Path) -> tuple[list[Dependency], list[str]]:
+    logger.debug("Extracting dependencies from %s", file_path)
     text = read_text(file_path)
     warnings: list[str] = []
     dependencies: list[Dependency] = []
@@ -127,7 +148,9 @@ def extract_dependencies(project_root: Path, file_path: Path) -> tuple[list[Dependency], list[str]]:
     try:
         root = ET.fromstring(text)
     except ET.ParseError as exc:
-        return [], [f"{file_path}: XML parse failed: {exc}"]
+        warning = f"{file_path}: XML parse failed: {exc}"
+        logger.warning(warning)
+        return [], [warning]
 
     for element in root.iter():
         activity_type = classify_invoke_activity(local_name(element.tag))
@@ -136,17 +159,26 @@ def extract_dependencies(project_root: Path, file_path: Path) -> tuple[list[Dependency], list[str]]:
 
         literals = extract_literal_targets(element, activity_type)
         if not literals:
-            warnings.append(
-                f"{file_path}: unable to statically resolve {activity_type} path from <{local_name(element.tag)}>"
-            )
+            warning = f"{file_path}: unable to statically resolve {activity_type} path from <{local_name(element.tag)}>"
+            warnings.append(warning)
+            logger.warning(warning)
             continue
 
         for literal in literals:
             target = resolve_project_path(project_root, file_path, literal)
             if target is None:
-                warnings.append(f"{file_path}: referenced file not found for {activity_type}: {literal}")
+                warning = f"{file_path}: referenced file not found for {activity_type}: {literal}"
+                warnings.append(warning)
+                logger.warning(warning)
                 continue
             dependencies.append(Dependency(activity_type, literal, file_path, target))
+            logger.debug(
+                "Resolved %s dependency: source=%s literal=%s target=%s",
+                activity_type,
+                file_path,
+                literal,
+                target,
+            )
 
     return _dedupe_dependencies(dependencies), warnings
 
@@ -217,7 +249,15 @@ def resolve_project_path(project_root: Path, source_file: Path, literal: str) -> Path | None:
     for base in search_roots:
         resolved = (base / candidate).resolve()
         if resolved.exists() and _is_within_root(project_root.resolve(), resolved):
+            logger.debug(
+                "Resolved literal path %s from %s using base %s -> %s",
+                literal,
+                source_file,
+                base,
+                resolved,
+            )
             return resolved
+    logger.debug("Failed to resolve literal path %s from %s", literal, source_file)
     return None
 
 
```
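`find_entry_file`'s two-step resolution (exact path first, then a case-insensitive directory scan) is easy to verify in isolation, and the new log lines make it visible which branch fired. A minimal sketch on a throwaway directory; note that the case-insensitive branch only triggers on a case-sensitive filesystem, since `direct.exists()` already succeeds on macOS or Windows:

```python
import logging
from pathlib import Path
from tempfile import TemporaryDirectory

from uipath_explainator.scanner import find_entry_file

logging.basicConfig(level=logging.INFO)

with TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "Main.xaml").write_text("<Activity />", encoding="utf-8")
    # "main.xaml" does not exist verbatim here, so iterdir() finds Main.xaml
    # and logs "Entry file resolved case-insensitively: .../Main.xaml".
    print(find_entry_file(root, "main.xaml"))
```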
191 tests/test_gemini.py Normal file
@@ -0,0 +1,191 @@
from __future__ import annotations

from pathlib import Path
from types import ModuleType, SimpleNamespace
import sys
import unittest
from unittest.mock import patch

ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))

fake_dotenv = ModuleType("dotenv")
fake_dotenv.load_dotenv = lambda *args, **kwargs: None
sys.modules.setdefault("dotenv", fake_dotenv)

from uipath_explainator.config import Settings
from uipath_explainator.gemini import AnalysisError, GeminiAnalyzer


class FakeHttpOptions:
    def __init__(self, **kwargs) -> None:
        self.kwargs = kwargs


class FakeGenerateContentConfig:
    def __init__(self, **kwargs) -> None:
        self.kwargs = kwargs


class FakeClient:
    def __init__(self, api_key: str, http_options: FakeHttpOptions) -> None:
        self.api_key = api_key
        self.http_options = http_options
        self.models = SimpleNamespace()


class GeminiAnalyzerTests(unittest.TestCase):
    def test_init_with_slots_declares_runtime_fields(self) -> None:
        fake_types = SimpleNamespace(
            HttpOptions=FakeHttpOptions,
            GenerateContentConfig=FakeGenerateContentConfig,
        )
        fake_genai = ModuleType("google.genai")
        fake_genai.Client = FakeClient
        fake_genai.types = fake_types

        fake_google = ModuleType("google")
        fake_google.genai = fake_genai

        with patch.dict(sys.modules, {"google": fake_google, "google.genai": fake_genai}):
            analyzer = GeminiAnalyzer(Settings(api_key="test-key", base_url=None, model="gemini-test"))

        self.assertIs(analyzer._types, fake_types)
        self.assertIsInstance(analyzer._client, FakeClient)
        self.assertEqual(analyzer._client.api_key, "test-key")
        self.assertEqual(analyzer._client.http_options.kwargs, {"timeout": 120_000})

    def test_init_strips_version_suffix_from_custom_base_url(self) -> None:
        fake_types = SimpleNamespace(
            HttpOptions=FakeHttpOptions,
            GenerateContentConfig=FakeGenerateContentConfig,
        )
        fake_genai = ModuleType("google.genai")
        fake_genai.Client = FakeClient
        fake_genai.types = fake_types

        fake_google = ModuleType("google")
        fake_google.genai = fake_genai

        with patch.dict(sys.modules, {"google": fake_google, "google.genai": fake_genai}):
            analyzer = GeminiAnalyzer(
                Settings(
                    api_key="test-key",
                    base_url="https://newapi.tootaio.com/v1beta/",
                    model="gemini-test",
                )
            )

        self.assertEqual(
            analyzer._client.http_options.kwargs,
            {"base_url": "https://newapi.tootaio.com", "timeout": 120_000},
        )

    def test_markdown_output_uses_consulting_format(self) -> None:
        fake_types = SimpleNamespace(
            HttpOptions=FakeHttpOptions,
            GenerateContentConfig=FakeGenerateContentConfig,
        )
        fake_genai = ModuleType("google.genai")
        fake_genai.Client = FakeClient
        fake_genai.types = fake_types

        fake_google = ModuleType("google")
        fake_google.genai = fake_genai

        with patch.dict(sys.modules, {"google": fake_google, "google.genai": fake_genai}):
            analyzer = GeminiAnalyzer(Settings(api_key="test-key", base_url=None, model="gemini-test"))

        markdown = analyzer._to_markdown(
            Path("Flows/Active.xaml"),
            {
                "overview": {
                    "purpose": "负责调用 VBA 并准备执行上下文",
                    "role": "主流程中的子流程节点",
                    "trigger": "由上游工作流通过 Invoke Workflow 调用",
                },
                "logic": {
                    "steps": [
                        {
                            "title": "读取 VBA 文件路径",
                            "detail": "从 `CodeFilePath` 读取脚本位置。",
                            "why": "定位需要执行的 VBA 代码。",
                            "result": "得到待执行的脚本文件。",
                        }
                    ],
                    "decision_logic": ["如果 `CodeFilePath` 为空,则无法继续执行脚本。"],
                    "exceptions": ["当前文件未展示脚本执行失败后的补偿逻辑。"],
                },
                "data": {
                    "inputs": ["`Scripts/Keep.bas` 路径"],
                    "outputs": ["VBA 执行结果未直接在当前文件中落盘"],
                    "variables": ["`CodeFilePath`"],
                    "external_dependencies": ["外部 VBA 文件 `Scripts/Keep.bas`"],
                },
                "consultation": {
                    "business_meaning": "这是把业务动作下沉到 VBA 的桥接层。",
                    "risks": ["脚本文件缺失会导致执行失败。"],
                    "example": "例如:财务流程在这里调用 Excel VBA 完成批量格式整理。",
                    "unknowns": ["无法从当前文件确定 VBA 内部实现逻辑。"],
                },
            },
        )

        self.assertIn("## 文件定位", markdown)
        self.assertIn("## 流程拆解", markdown)
        self.assertIn("1. **读取 VBA 文件路径**", markdown)
        self.assertIn("### 输入", markdown)
        self.assertIn("## 咨询视角", markdown)
        self.assertIn("## 场景范例", markdown)

    def test_prompt_requires_strict_structured_json(self) -> None:
        fake_types = SimpleNamespace(
            HttpOptions=FakeHttpOptions,
            GenerateContentConfig=FakeGenerateContentConfig,
        )
        fake_genai = ModuleType("google.genai")
        fake_genai.Client = FakeClient
        fake_genai.types = fake_types

        fake_google = ModuleType("google")
        fake_google.genai = fake_genai

        with patch.dict(sys.modules, {"google": fake_google, "google.genai": fake_genai}):
            analyzer = GeminiAnalyzer(Settings(api_key="test-key", base_url=None, model="gemini-test"))

        prompt = analyzer._build_prompt(Path("main.xaml"), "<Sequence />")

        self.assertIn("请严格返回 JSON", prompt)
        self.assertIn("先讲这个文件在整个流程中的定位", prompt)
        self.assertIn("判断逻辑、调用链、输入输出、关键变量、外部依赖", prompt)

    def test_analyze_wraps_rate_limit_error_with_clear_message(self) -> None:
        fake_types = SimpleNamespace(
            HttpOptions=FakeHttpOptions,
            GenerateContentConfig=FakeGenerateContentConfig,
        )
        fake_genai = ModuleType("google.genai")
        fake_genai.Client = FakeClient
        fake_genai.types = fake_types

        fake_google = ModuleType("google")
        fake_google.genai = fake_genai

        with patch.dict(sys.modules, {"google": fake_google, "google.genai": fake_genai}):
            analyzer = GeminiAnalyzer(Settings(api_key="test-key", base_url=None, model="gemini-test"))

        error = RuntimeError("quota exceeded")
        error.status_code = 429
        error.response_json = {"error": {"message": "Resource has been exhausted (e.g. check quota)."}}
        analyzer._client.models.generate_content = lambda **_: (_ for _ in ()).throw(error)

        with self.assertRaises(AnalysisError) as captured:
            analyzer.analyze(Path("main.xaml"), "<Sequence />")

        self.assertIn("HTTP 429", str(captured.exception))
        self.assertIn("Resource has been exhausted", str(captured.exception))
        self.assertIn("--skip-analysis", str(captured.exception))


if __name__ == "__main__":
    unittest.main()
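The test module above never touches the real `google-genai` SDK or the network: it builds stand-in modules with `types.ModuleType` and installs them via `unittest.mock.patch.dict(sys.modules, ...)`, so the import inside `GeminiAnalyzer` resolves to the fakes and `sys.modules` is restored when the block exits. The same technique in isolation (`somepkg` is a made-up module name):

```python
import sys
from types import ModuleType, SimpleNamespace
from unittest.mock import patch

# Build a fake package so code under test can import it without the
# real dependency being installed.
fake_pkg = ModuleType("somepkg")
fake_pkg.client = SimpleNamespace(connect=lambda url: f"connected to {url}")

with patch.dict(sys.modules, {"somepkg": fake_pkg}):
    import somepkg  # resolves to fake_pkg via sys.modules

    assert somepkg.client.connect("http://example") == "connected to http://example"
# On exit, patch.dict restores sys.modules, so other tests are unaffected.
```

Note also that `test_init_strips_version_suffix_from_custom_base_url` pins down the gateway-URL normalization: a trailing `/v1beta/` segment is stripped so that only the gateway root reaches the client.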
@@ -57,6 +57,22 @@ class StubAnalyzer:
         return f"# {relative_path.as_posix()}\n\n{len(content)}"
 
 
+class FlakyAnalyzer:
+    def analyze(self, relative_path: Path, content: str) -> str:
+        if relative_path.name == "Active.xaml":
+            raise RuntimeError("HTTP 429 quota exhausted")
+        return f"# {relative_path.as_posix()}\n\n{len(content)}"
+
+
+class RecordingAnalyzer:
+    def __init__(self) -> None:
+        self.paths: list[str] = []
+
+    def analyze(self, relative_path: Path, content: str) -> str:
+        self.paths.append(relative_path.as_posix())
+        return f"# {relative_path.as_posix()}\n\n{len(content)}"
+
+
 class PipelineTests(unittest.TestCase):
     def test_strip_comment_out_blocks_removes_nested_blocks(self) -> None:
         source = "<root><ui:CommentOut><x/><ui:CommentOut><y/></ui:CommentOut></ui:CommentOut><z/></root>"
@@ -83,6 +99,8 @@ class PipelineTests(unittest.TestCase):
             tmp_path = Path(tmp)
             project_root = tmp_path / "project"
             output_root = tmp_path / "workspace"
+            code_root = output_root / "code"
+            docs_root = output_root / "docs"
             (project_root / "Flows").mkdir(parents=True)
             (project_root / "Flows" / "Active.xaml").write_text(ACTIVE_XAML, encoding="utf-8")
             (project_root / "Flows" / "Old.xaml").write_text(OLD_XAML, encoding="utf-8")
@@ -104,9 +122,106 @@ class PipelineTests(unittest.TestCase):
             self.assertIn("Scripts/Keep.bas", final_files)
             self.assertNotIn("Flows/Old.xaml", final_files)
             self.assertNotIn("Scripts/Drop.bas", final_files)
-            self.assertFalse((output_root / "Flows" / "Old.xaml").exists())
-            self.assertFalse((output_root / "Scripts" / "Drop.bas").exists())
-            self.assertTrue((output_root / "Flows" / "Active.xaml.analysis.md").exists())
+            self.assertEqual(report.code_root, code_root.resolve())
+            self.assertEqual(report.docs_root, docs_root.resolve())
+            self.assertFalse((code_root / "Flows" / "Old.xaml").exists())
+            self.assertFalse((code_root / "Scripts" / "Drop.bas").exists())
+            self.assertTrue((code_root / "Flows" / "Active.xaml").exists())
+            self.assertTrue((docs_root / "Flows" / "Active.xaml.analysis.md").exists())
+            self.assertTrue((docs_root / "manifest.json").exists())
+
+            overview = (docs_root / "OVERVIEW.md").read_text(encoding="utf-8")
+            self.assertIn("## Processing Logic", overview)
+            self.assertIn("Initial Scan", overview)
+            self.assertIn("## How To Read This Output", overview)
+            self.assertIn("## Cleaned XAML Files", overview)
+            self.assertIn("Code Root", overview)
+            self.assertIn("Docs Root", overview)
+
+    def test_pipeline_emits_stage_logs(self) -> None:
+        with TemporaryDirectory() as tmp:
+            tmp_path = Path(tmp)
+            project_root = tmp_path / "project"
+            output_root = tmp_path / "workspace"
+            (project_root / "Flows").mkdir(parents=True)
+            (project_root / "Flows" / "Active.xaml").write_text(ACTIVE_XAML, encoding="utf-8")
+            (project_root / "Flows" / "Old.xaml").write_text(OLD_XAML, encoding="utf-8")
+            (project_root / "Scripts").mkdir()
+            (project_root / "Scripts" / "Keep.bas").write_text("Sub Keep()\nEnd Sub", encoding="utf-8")
+            (project_root / "Scripts" / "Drop.bas").write_text("Sub Drop()\nEnd Sub", encoding="utf-8")
+            (project_root / "main.xaml").write_text(MAIN_XAML, encoding="utf-8")
+
+            with self.assertLogs("uipath_explainator", level="INFO") as captured:
+                ProjectPipeline(project_root, output_root, "main.xaml", force=True).run(StubAnalyzer())
+
+            combined = "\n".join(captured.output)
+            self.assertIn("Starting pipeline:", combined)
+            self.assertIn("Initial scan complete:", combined)
+            self.assertIn("Copied 5 files and cleaned 1 XAML files", combined)
+            self.assertIn("Final scan complete:", combined)
+            self.assertIn("Pipeline completed in", combined)
+
+    def test_pipeline_keeps_running_when_single_analysis_fails(self) -> None:
+        with TemporaryDirectory() as tmp:
+            tmp_path = Path(tmp)
+            project_root = tmp_path / "project"
+            output_root = tmp_path / "workspace"
+            docs_root = output_root / "docs"
+            (project_root / "Flows").mkdir(parents=True)
+            (project_root / "Flows" / "Active.xaml").write_text(ACTIVE_XAML, encoding="utf-8")
+            (project_root / "Scripts").mkdir()
+            (project_root / "Scripts" / "Keep.bas").write_text("Sub Keep()\nEnd Sub", encoding="utf-8")
+            (project_root / "main.xaml").write_text(MAIN_XAML, encoding="utf-8")
+
+            report = ProjectPipeline(project_root, output_root, "main.xaml", force=True).run(FlakyAnalyzer())
+
+            self.assertTrue((docs_root / "Flows" / "Active.xaml.analysis.md").exists())
+            fallback = (docs_root / "Flows" / "Active.xaml.analysis.md").read_text(encoding="utf-8")
+            self.assertIn("Gemini 分析失败", fallback)
+            self.assertIn("HTTP 429 quota exhausted", fallback)
+            self.assertTrue((docs_root / "Scripts" / "Keep.bas.analysis.md").exists())
+            self.assertTrue(any("Analysis failed for Flows/Active.xaml" in item for item in report.warnings))
+
+    def test_pipeline_resume_skips_successfully_cached_analyses(self) -> None:
+        with TemporaryDirectory() as tmp:
+            tmp_path = Path(tmp)
+            project_root = tmp_path / "project"
+            output_root = tmp_path / "workspace"
+            (project_root / "Flows").mkdir(parents=True)
+            (project_root / "Flows" / "Active.xaml").write_text(ACTIVE_XAML, encoding="utf-8")
+            (project_root / "Scripts").mkdir()
+            (project_root / "Scripts" / "Keep.bas").write_text("Sub Keep()\nEnd Sub", encoding="utf-8")
+            (project_root / "main.xaml").write_text(MAIN_XAML, encoding="utf-8")
+
+            first = RecordingAnalyzer()
+            ProjectPipeline(project_root, output_root, "main.xaml", force=True).run(first)
+            self.assertEqual(
+                first.paths,
+                ["Flows/Active.xaml", "main.xaml", "Scripts/Keep.bas"],
+            )
+
+            second = RecordingAnalyzer()
+            ProjectPipeline(project_root, output_root, "main.xaml", force=False).run(second)
+            self.assertEqual(second.paths, [])
+
+    def test_pipeline_resume_retries_failed_analysis_and_reanalyzes_changed_files(self) -> None:
+        with TemporaryDirectory() as tmp:
+            tmp_path = Path(tmp)
+            project_root = tmp_path / "project"
+            output_root = tmp_path / "workspace"
+            (project_root / "Flows").mkdir(parents=True)
+            (project_root / "Flows" / "Active.xaml").write_text(ACTIVE_XAML, encoding="utf-8")
+            (project_root / "Scripts").mkdir()
+            (project_root / "Scripts" / "Keep.bas").write_text("Sub Keep()\nEnd Sub", encoding="utf-8")
+            (project_root / "main.xaml").write_text(MAIN_XAML, encoding="utf-8")
+
+            ProjectPipeline(project_root, output_root, "main.xaml", force=True).run(FlakyAnalyzer())
+
+            (project_root / "Scripts" / "Keep.bas").write_text("Sub Keep()\nMsgBox \"updated\"\nEnd Sub", encoding="utf-8")
+
+            retry = RecordingAnalyzer()
+            ProjectPipeline(project_root, output_root, "main.xaml", force=False).run(retry)
+            self.assertEqual(retry.paths, ["Flows/Active.xaml", "Scripts/Keep.bas"])
 
 
 if __name__ == "__main__":
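The two resume tests pin down the cache contract: with `force=False`, a second run re-sends nothing when every analysis succeeded and the sources are unchanged, retries the file that failed last time, and re-analyzes files whose content changed. One way to satisfy that contract is a content-hash record per relative path; the sketch below assumes a JSON cache of that shape, since the actual manifest layout is not part of this diff:

```python
import hashlib
import json
from pathlib import Path

def needs_analysis(cache_file: Path, relative_path: str, content: str) -> bool:
    """Hypothetical resume check, not the pipeline's real implementation."""
    # Assumed cache shape: {"Flows/Active.xaml": {"sha256": "...", "ok": true}, ...}
    cache = json.loads(cache_file.read_text(encoding="utf-8")) if cache_file.exists() else {}
    entry = cache.get(relative_path)
    digest = hashlib.sha256(content.encode("utf-8")).hexdigest()
    # Re-analyze when the file is new, previously failed, or its content changed.
    return entry is None or not entry.get("ok") or entry.get("sha256") != digest
```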