feat: add initial uipath explainator implementation

Scaffold project with pyproject.toml and environment configuration
Implement core modules including CLI, Gemini integration, and scanner
This commit is contained in:
2026-04-02 10:10:56 +08:00
parent aca26fceb5
commit 7003dfa0df
11 changed files with 792 additions and 0 deletions

View File

@@ -0,0 +1,5 @@
"""UiPath Explainator package."""
__all__ = ["__version__"]
__version__ = "0.1.0"

View File

@@ -0,0 +1,5 @@
from .cli import main

# Module entry point: allows `python -m <package>` execution and exits the
# process with the integer status code returned by main().
if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -0,0 +1,50 @@
from __future__ import annotations
from argparse import ArgumentParser
from pathlib import Path
from .config import Settings
from .gemini import GeminiAnalyzer
from .pipeline import ProjectPipeline
def build_parser() -> ArgumentParser:
    """Construct the command-line parser for the explainator CLI."""
    cli = ArgumentParser(description="Extract and explain UiPath project dependencies.")
    cli.add_argument("project_dir", type=Path, help="UiPath project root directory")
    cli.add_argument(
        "--output-dir",
        type=Path,
        default=Path("workspace"),
        help="Copied project output directory",
    )
    cli.add_argument("--entry", default="main.xaml", help="Entry XAML file name")
    cli.add_argument(
        "--env-file",
        type=Path,
        default=Path(".env"),
        help="Environment file for Gemini config",
    )
    cli.add_argument("--model", help="Override GEMINI_MODEL")
    cli.add_argument(
        "--skip-analysis",
        action="store_true",
        help="Skip Gemini analysis and only prepare files",
    )
    cli.add_argument(
        "--force",
        action="store_true",
        help="Overwrite the output directory if it already exists",
    )
    return cli
def main(argv: list[str] | None = None) -> int:
    """CLI entry point: validate arguments, run the pipeline, print a summary."""
    parser = build_parser()
    options = parser.parse_args(argv)

    project_root = options.project_dir.expanduser().resolve()
    if not project_root.is_dir():
        parser.error(f"Project directory does not exist: {project_root}")

    # Anchor a relative output directory to the current working directory.
    output_dir = options.output_dir.expanduser()
    if not output_dir.is_absolute():
        output_dir = Path.cwd() / output_dir

    env_file = options.env_file if options.env_file.exists() else None
    settings = Settings.from_env(env_file, model_override=options.model)
    analyzer = GeminiAnalyzer(settings) if not options.skip_analysis else None

    pipeline = ProjectPipeline(
        project_root=project_root,
        output_root=output_dir,
        entry_name=options.entry,
        force=options.force,
    )
    report = pipeline.run(analyzer=analyzer)

    print(f"Output written to: {report.output_root}")
    print(f"Final files: {len(report.final_files)}")
    print(f"Pruned files: {len(report.pruned_files)}")
    print(f"Warnings: {len(report.warnings)}")
    return 0

View File

@@ -0,0 +1,32 @@
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Self
from dotenv import load_dotenv
import os
@dataclass(slots=True)
class Settings:
api_key: str | None
base_url: str | None
model: str
@classmethod
def from_env(cls, env_file: Path | None = None, model_override: str | None = None) -> Self:
if env_file:
load_dotenv(env_file)
else:
load_dotenv()
return cls(
api_key=os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY"),
base_url=os.getenv("GEMINI_BASE_URL") or None,
model=model_override or os.getenv("GEMINI_MODEL") or "gemini-2.5-flash",
)
def require_api_key(self) -> None:
if not self.api_key:
raise ValueError("Missing GEMINI_API_KEY (or GOOGLE_API_KEY) in the environment.")

View File

@@ -0,0 +1,87 @@
from __future__ import annotations

import json
from dataclasses import dataclass, field
from pathlib import Path

from .config import Settings
@dataclass(slots=True)
class GeminiAnalyzer:
settings: Settings
def __post_init__(self) -> None:
self.settings.require_api_key()
from google import genai
from google.genai import types
http_options = types.HttpOptions(timeout=120_000)
if self.settings.base_url:
http_options = types.HttpOptions(base_url=self.settings.base_url, timeout=120_000)
self._types = types
self._client = genai.Client(api_key=self.settings.api_key, http_options=http_options)
def analyze(self, relative_path: Path, content: str) -> str:
prompt = self._build_prompt(relative_path, content)
response = self._client.models.generate_content(
model=self.settings.model,
contents=prompt,
config=self._types.GenerateContentConfig(
temperature=0.2,
response_mime_type="application/json",
response_schema={
"type": "OBJECT",
"required": ["summary", "implementation", "data", "example"],
"properties": {
"summary": {"type": "STRING"},
"implementation": {"type": "STRING"},
"data": {"type": "STRING"},
"example": {"type": "STRING"},
},
},
),
)
payload = json.loads(response.text)
return self._to_markdown(relative_path, payload)
def _build_prompt(self, relative_path: Path, content: str) -> str:
language = "xml" if relative_path.suffix.lower() == ".xaml" else "vb"
return f"""你是资深 UiPath 自动化架构师。请只基于给定文件内容分析,不要臆造未出现的系统、字段或业务规则。
请用中文返回 JSON字段固定为
- summary: 这个文件在做什么
- implementation: 这个文件怎么做
- data: 这个文件涉及的数据、变量、参数、外部文件或系统
- example: 一个贴近当前代码的说明性范例
如果信息不足,请明确写“无法从当前文件确定”。
文件路径: {relative_path.as_posix()}
代码:
```{language}
{content}
```"""
def _to_markdown(self, relative_path: Path, payload: dict[str, str]) -> str:
return "\n".join(
[
f"# {relative_path.as_posix()}",
"",
"## 做什么",
payload.get("summary", "").strip() or "无法从当前文件确定",
"",
"## 怎么做",
payload.get("implementation", "").strip() or "无法从当前文件确定",
"",
"## 涉及数据",
payload.get("data", "").strip() or "无法从当前文件确定",
"",
"## 范例",
payload.get("example", "").strip() or "无法从当前文件确定",
"",
]
)

View File

@@ -0,0 +1,181 @@
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from shutil import copy2, rmtree
import json
from .scanner import ScanResult, crawl_dependencies, find_entry_file, read_text, strip_comment_out_blocks
@dataclass(slots=True)
class PipelineReport:
project_root: Path
output_root: Path
entry_file: Path
initial_files: list[Path]
final_files: list[Path]
pruned_files: list[Path]
cleaned_files: list[Path]
warnings: list[str]
analysis_files: list[Path]
def to_json(self) -> str:
return json.dumps(
{
"project_root": self.project_root.as_posix(),
"output_root": self.output_root.as_posix(),
"entry_file": self.entry_file.as_posix(),
"initial_files": [item.as_posix() for item in self.initial_files],
"final_files": [item.as_posix() for item in self.final_files],
"pruned_files": [item.as_posix() for item in self.pruned_files],
"cleaned_files": [item.as_posix() for item in self.cleaned_files],
"warnings": self.warnings,
"analysis_files": [item.as_posix() for item in self.analysis_files],
},
ensure_ascii=False,
indent=2,
)
class ProjectPipeline:
    """Copy a UiPath project's reachable files, clean them, and report.

    Steps performed by run(): locate the entry XAML, crawl its dependency
    graph, copy reachable files to the output root, strip CommentOut blocks
    from the copies, re-crawl the cleaned copy, prune files that became
    unreachable, optionally write Gemini analyses, and emit manifest.json
    plus OVERVIEW.md.
    """
    def __init__(self, project_root: Path, output_root: Path, entry_name: str, force: bool = False) -> None:
        # Resolve both roots eagerly so later relative_to() calls are stable.
        self.project_root = project_root.resolve()
        self.output_root = output_root.resolve()
        self.entry_name = entry_name
        # When True, an existing output directory is deleted and recreated.
        self.force = force
    def run(self, analyzer=None) -> PipelineReport:
        """Execute the full pipeline and return its report.

        analyzer: optional object exposing analyze(relative_path, content) -> str
        (e.g. GeminiAnalyzer); when None the analysis step is skipped.
        Raises FileNotFoundError (missing entry) and FileExistsError (output
        exists without force).
        """
        entry_file = find_entry_file(self.project_root, self.entry_name)
        entry_rel = entry_file.relative_to(self.project_root)
        initial_scan = crawl_dependencies(self.project_root, entry_file)
        initial_rel_files = self._relative_files(initial_scan, self.project_root)
        self._prepare_output_root()
        self._copy_files(initial_rel_files)
        cleaned_files = self._clean_copied_xaml_files(initial_rel_files)
        # Re-crawl the cleaned copies: stripping CommentOut blocks can remove
        # references, leaving some initially-copied files unreachable.
        final_scan = crawl_dependencies(self.output_root, self.output_root / entry_rel)
        final_rel_files = self._relative_files(final_scan, self.output_root)
        pruned_files = self._prune_unused_files(initial_rel_files, final_rel_files)
        analysis_files = self._write_analysis(final_rel_files, analyzer)
        warnings = initial_scan.warnings + final_scan.warnings
        report = PipelineReport(
            project_root=self.project_root,
            output_root=self.output_root,
            entry_file=entry_rel,
            initial_files=initial_rel_files,
            final_files=final_rel_files,
            pruned_files=pruned_files,
            cleaned_files=cleaned_files,
            warnings=warnings,
            analysis_files=analysis_files,
        )
        self._write_report_files(report)
        return report
    def _prepare_output_root(self) -> None:
        # Refuse to clobber an existing directory unless force was requested.
        if self.output_root.exists():
            if not self.force:
                raise FileExistsError(f"Output directory already exists: {self.output_root}")
            rmtree(self.output_root)
        self.output_root.mkdir(parents=True, exist_ok=True)
    def _copy_files(self, relative_files: list[Path]) -> None:
        """Mirror each file from the project root into the output root."""
        for relative_path in relative_files:
            source = self.project_root / relative_path
            destination = self.output_root / relative_path
            destination.parent.mkdir(parents=True, exist_ok=True)
            # copy2 preserves metadata (timestamps) along with contents.
            copy2(source, destination)
    def _clean_copied_xaml_files(self, relative_files: list[Path]) -> list[Path]:
        """Strip CommentOut blocks from copied XAML files; return files changed."""
        cleaned: list[Path] = []
        for relative_path in relative_files:
            if relative_path.suffix.lower() != ".xaml":
                continue
            output_file = self.output_root / relative_path
            original = read_text(output_file)
            updated = strip_comment_out_blocks(original)
            if updated != original:
                output_file.write_text(updated, encoding="utf-8")
                cleaned.append(relative_path)
        return cleaned
    def _prune_unused_files(self, initial_files: list[Path], final_files: list[Path]) -> list[Path]:
        """Delete copied files absent from the final scan; return what was pruned."""
        final_set = set(final_files)
        pruned: list[Path] = []
        for relative_path in initial_files:
            if relative_path in final_set:
                continue
            target = self.output_root / relative_path
            if target.exists():
                target.unlink()
            pruned.append(relative_path)
        self._cleanup_empty_dirs()
        return pruned
    def _cleanup_empty_dirs(self) -> None:
        """Remove directories emptied by pruning, deepest first."""
        # Deepest-first ordering so a parent emptied by removing its last
        # child directory is itself removed in the same pass.
        directories = sorted(
            [path for path in self.output_root.rglob("*") if path.is_dir()],
            key=lambda item: len(item.parts),
            reverse=True,
        )
        for directory in directories:
            if any(directory.iterdir()):
                continue
            directory.rmdir()
    def _write_analysis(self, final_files: list[Path], analyzer) -> list[Path]:
        """Write one <file>.analysis.md per final file; return their relative paths."""
        if analyzer is None:
            return []
        output_files: list[Path] = []
        for relative_path in self._ordered_files(final_files):
            content = read_text(self.output_root / relative_path)
            analysis = analyzer.analyze(relative_path, content)
            analysis_path = self.output_root / f"{relative_path.as_posix()}.analysis.md"
            analysis_path.parent.mkdir(parents=True, exist_ok=True)
            analysis_path.write_text(analysis, encoding="utf-8")
            output_files.append(Path(f"{relative_path.as_posix()}.analysis.md"))
        return output_files
    def _write_report_files(self, report: PipelineReport) -> None:
        # Machine-readable manifest plus a human-readable overview.
        (self.output_root / "manifest.json").write_text(report.to_json(), encoding="utf-8")
        (self.output_root / "OVERVIEW.md").write_text(self._build_overview(report), encoding="utf-8")
    def _build_overview(self, report: PipelineReport) -> str:
        """Render OVERVIEW.md (Markdown; list fallbacks are Chinese UI strings)."""
        warnings = "\n".join(f"- {item}" for item in report.warnings) or "- 无"
        pruned = "\n".join(f"- {item.as_posix()}" for item in report.pruned_files) or "- 无"
        analyses = "\n".join(f"- {item.as_posix()}" for item in report.analysis_files) or "- 未启用 Gemini 分析"
        return f"""# UiPath Explainator Overview
- Project Root: `{report.project_root.as_posix()}`
- Output Root: `{report.output_root.as_posix()}`
- Entry File: `{report.entry_file.as_posix()}`
- Initial Files: {len(report.initial_files)}
- Final Files: {len(report.final_files)}
- Cleaned XAML Files: {len(report.cleaned_files)}
- Pruned Files: {len(report.pruned_files)}
- Analysis Files: {len(report.analysis_files)}
## Final Files
{chr(10).join(f"- {item.as_posix()}" for item in report.final_files)}
## Pruned Files
{pruned}
## Analysis Files
{analyses}
## Warnings
{warnings}
"""
    def _relative_files(self, scan: ScanResult, root: Path) -> list[Path]:
        """Return the scan's files relative to *root*, sorted for stable output."""
        return sorted(path.relative_to(root) for path in scan.files)
    def _ordered_files(self, paths: list[Path]) -> list[Path]:
        """Order files for analysis: XAML first, then others, alphabetically."""
        return sorted(paths, key=lambda item: (item.suffix.lower() != ".xaml", item.as_posix().lower()))

View File

@@ -0,0 +1,241 @@
from __future__ import annotations
from collections import deque
from dataclasses import dataclass, field
from pathlib import Path
import html
import re
import xml.etree.ElementTree as ET
COMMENT_TAG = "CommentOut"
WORKFLOW_EXTENSIONS = {".xaml"}
VBA_EXTENSIONS = {".txt", ".bas", ".vb", ".vbs", ".vba", ".cls", ".frm"}
TAG_PATTERN = re.compile(
r"<(?P<close>/)?(?P<name>[A-Za-z_][\w:.-]*)(?P<body>[^<>]*?)(?P<self>/)?>",
re.DOTALL,
)
@dataclass(slots=True, frozen=True)
class Dependency:
    """One resolved file reference discovered inside a XAML workflow."""
    # Dependency category: "workflow" or "vba" (see classify_invoke_activity).
    kind: str
    # The literal path text as written in the XAML.
    raw_value: str
    # File the reference was found in.
    source: Path
    # Resolved absolute path of the referenced file.
    target: Path
@dataclass(slots=True)
class ScanResult:
    """Accumulated output of a dependency crawl."""
    # Every file reached from the entry point, including the entry itself.
    files: set[Path] = field(default_factory=set)
    # Per-XAML-file list of resolved dependencies.
    dependencies: dict[Path, list[Dependency]] = field(default_factory=dict)
    # Human-readable problems: parse failures, unresolved or missing references.
    warnings: list[str] = field(default_factory=list)
def local_name(tag: str) -> str:
    """Strip a Clark-notation namespace ("{uri}Name") or prefix ("ns:Name") from an XML tag."""
    if tag.startswith("{"):
        return tag.rpartition("}")[2]
    head, colon, remainder = tag.partition(":")
    return remainder if colon else head
def read_text(path: Path) -> str:
    """Read a text file as UTF-8, transparently dropping a leading BOM if present."""
    return path.read_text(encoding="utf-8-sig")
def strip_comment_out_blocks(text: str) -> str:
    """Return *text* with every CommentOut tag span removed."""
    pieces: list[str] = []
    position = 0
    match = _find_comment_start(text, position)
    while match is not None:
        # Keep everything before the CommentOut block, then skip past it.
        pieces.append(text[position:match.start()])
        position = _find_comment_end(text, match)
        match = _find_comment_start(text, position)
    pieces.append(text[position:])
    return "".join(pieces)
def _find_comment_start(text: str, cursor: int) -> re.Match[str] | None:
    """Locate the next opening (or self-closing) CommentOut tag at or after *cursor*."""
    for candidate in TAG_PATTERN.finditer(text, cursor):
        is_opening = not candidate.group("close")
        if is_opening and local_name(candidate.group("name")) == COMMENT_TAG:
            return candidate
    return None
def _find_comment_end(text: str, start: re.Match[str]) -> int:
    """Return the index just past the CommentOut block opened at *start*.

    Tracks nesting depth so nested CommentOut blocks are consumed as one
    span; self-closing tags leave the depth unchanged. Falls back to the
    end of the opening tag when no matching close tag is found.
    """
    # A self-closing <CommentOut .../> spans only its own tag.
    if start.group("self"):
        return start.end()
    depth = 1
    for match in TAG_PATTERN.finditer(text, start.end()):
        if local_name(match.group("name")) != COMMENT_TAG:
            continue
        if match.group("close"):
            depth -= 1
        elif not match.group("self"):
            depth += 1
        if depth == 0:
            return match.end()
    # Unbalanced input: give up and keep everything after the opening tag.
    return start.end()
def find_entry_file(project_root: Path, entry_name: str) -> Path:
    """Resolve the entry workflow in *project_root*, falling back to a case-insensitive match."""
    exact = project_root / entry_name
    if exact.exists():
        return exact
    wanted = entry_name.lower()
    candidates = (
        child
        for child in project_root.iterdir()
        if child.is_file() and child.name.lower() == wanted
    )
    found = next(candidates, None)
    if found is None:
        raise FileNotFoundError(f"Entry file not found: {entry_name}")
    return found
def crawl_dependencies(project_root: Path, entry_file: Path) -> ScanResult:
    """Breadth-first crawl of workflow/VBA references starting at *entry_file*."""
    scan = ScanResult()
    pending: deque[Path] = deque([entry_file])
    seen: set[Path] = set()
    while pending:
        node = pending.popleft()
        if node in seen:
            continue
        seen.add(node)
        scan.files.add(node)
        # Only XAML files can declare further dependencies.
        if node.suffix.lower() != ".xaml":
            continue
        deps, issues = extract_dependencies(project_root, node)
        scan.dependencies[node] = deps
        scan.warnings.extend(issues)
        for dep in deps:
            scan.files.add(dep.target)
            if dep.kind == "workflow":
                pending.append(dep.target)
    return scan
def extract_dependencies(project_root: Path, file_path: Path) -> tuple[list[Dependency], list[str]]:
    """Parse one XAML file and return (resolved dependencies, warnings)."""
    text = read_text(file_path)
    warnings: list[str] = []
    dependencies: list[Dependency] = []
    try:
        root = ET.fromstring(text)
    except ET.ParseError as exc:
        # An unparseable file contributes no dependencies but is reported.
        return [], [f"{file_path}: XML parse failed: {exc}"]
    for element in root.iter():
        activity_type = classify_invoke_activity(local_name(element.tag))
        if not activity_type:
            continue
        literals = extract_literal_targets(element, activity_type)
        if not literals:
            # Likely a variable/expression path that cannot be resolved statically.
            warnings.append(
                f"{file_path}: unable to statically resolve {activity_type} path from <{local_name(element.tag)}>"
            )
            continue
        for literal in literals:
            target = resolve_project_path(project_root, file_path, literal)
            if target is None:
                warnings.append(f"{file_path}: referenced file not found for {activity_type}: {literal}")
                continue
            dependencies.append(Dependency(activity_type, literal, file_path, target))
    return _dedupe_dependencies(dependencies), warnings
def classify_invoke_activity(tag_name: str) -> str | None:
    """Map an activity tag name to its dependency kind, or None for other activities."""
    name = tag_name.lower()
    for marker, kind in (("invokeworkflow", "workflow"), ("invokevba", "vba")):
        if marker in name:
            return kind
    return None
def extract_literal_targets(element: ET.Element, kind: str) -> list[str]:
    """Collect unique literal file paths matching *kind* from an activity element."""
    wanted = WORKFLOW_EXTENSIONS if kind == "workflow" else VBA_EXTENSIONS
    collected: list[str] = []
    for value in iter_element_values(element):
        collected.extend(extract_literal_paths(value, wanted))
    # dict.fromkeys keeps first-seen order while removing duplicates.
    return list(dict.fromkeys(collected))
def iter_element_values(element: ET.Element):
    """Yield every attribute value and non-blank stripped text of *element* and its descendants."""
    for node in element.iter():
        yield from node.attrib.values()
        text = (node.text or "").strip()
        if text:
            yield text
def extract_literal_paths(raw_value: str, suffixes: set[str]) -> list[str]:
    """Extract literal file paths ending in one of *suffixes* from an attribute/text value.

    Preference order: quoted paths, then a whole bracketed/quoted value,
    then bare path-looking substrings.
    """
    text = html.unescape(raw_value).strip()
    if not text:
        return []
    ext_alternatives = "|".join(re.escape(entry.lstrip(".")) for entry in sorted(suffixes))
    quoted = re.compile(rf"""['"]([^'"]+?\.(?:{ext_alternatives}))['"]""", re.IGNORECASE)
    bare = re.compile(rf"""(?<![\w])([A-Za-z0-9_./\\ -]+?\.(?:{ext_alternatives}))(?![\w])""", re.IGNORECASE)
    quoted_hits = [normalize_path(hit) for hit in quoted.findall(text)]
    if quoted_hits:
        return [hit for hit in quoted_hits if hit]
    stripped = text.removeprefix("[").removesuffix("]").strip()
    if any(stripped.lower().endswith(ext) for ext in suffixes):
        cleaned = normalize_path(stripped.strip("'\""))
        return [cleaned] if cleaned else []
    bare_hits = [normalize_path(hit) for hit in bare.findall(text)]
    return [hit for hit in bare_hits if hit]
def normalize_path(value: str) -> str:
    """Normalize a literal path: forward slashes, no leading "./" or "/".

    The previous implementation used lstrip("./"), which strips a *character
    set*: it also consumed the leading ".." of a parent-relative path
    ("../shared/x.xaml" -> "shared/x.xaml") and the dot of a leading
    dotfile name, silently changing which file the reference points to.
    Prefix removal keeps those intact.
    """
    cleaned = value.strip().replace("\\", "/")
    while cleaned.startswith("./"):
        cleaned = cleaned[2:]
    return cleaned.lstrip("/")
def resolve_project_path(project_root: Path, source_file: Path, literal: str) -> Path | None:
    """Resolve *literal* against the project root, then the source file's folder.

    Returns None when the file does not exist or escapes the project root.
    """
    relative = Path(literal)
    resolved_root = project_root.resolve()
    for base in (project_root, source_file.parent):
        candidate = (base / relative).resolve()
        if candidate.exists() and _is_within_root(resolved_root, candidate):
            return candidate
    return None
def _is_within_root(root: Path, path: Path) -> bool:
    """Return True when *path* is *root* itself or located underneath it.

    Path.is_relative_to (3.9+) expresses the intent directly, replacing the
    try/except around relative_to; the file already requires >=3.11 via
    typing.Self, so the method is available.
    """
    return path.is_relative_to(root)
def _dedupe_dependencies(items: list[Dependency]) -> list[Dependency]:
    """Drop later duplicates sharing the same (kind, target) pair, keeping order."""
    unique: dict[tuple[str, Path], Dependency] = {}
    for entry in items:
        # setdefault keeps the first occurrence; dicts preserve insertion order.
        unique.setdefault((entry.kind, entry.target), entry)
    return list(unique.values())