feat(logging): add configurable logging with file output support

Introduce --log-level and --log-file CLI arguments.
Add execution time tracking and detailed logs across all modules.
This commit is contained in:
2026-04-02 10:40:39 +08:00
parent eef3464257
commit 0bdebd5368
8 changed files with 248 additions and 15 deletions

View File

@@ -39,6 +39,8 @@ GEMINI_MODEL=gemini-2.5-flash
- `--model`: 临时覆盖 `.env` 里的模型名 - `--model`: 临时覆盖 `.env` 里的模型名
- `--skip-analysis`: 只做复制、清理和剪枝,不调用 Gemini - `--skip-analysis`: 只做复制、清理和剪枝,不调用 Gemini
- `--force`: 允许覆盖已有输出目录 - `--force`: 允许覆盖已有输出目录
- `--log-level`: 日志级别,支持 `DEBUG` / `INFO` / `WARNING` / `ERROR` / `CRITICAL`
- `--log-file`: 把完整日志同时写入文件,便于回溯扫描、剪枝和 Gemini 分析过程
## 输出内容 ## 输出内容
@@ -60,3 +62,24 @@ GEMINI_MODEL=gemini-2.5-flash
- 只解析能静态识别出来的 `Invoke Workflow` / `Invoke VBA` 文件路径。 - 只解析能静态识别出来的 `Invoke Workflow` / `Invoke VBA` 文件路径。
- 对动态拼接路径会记录 warning但不会猜测。 - 对动态拼接路径会记录 warning但不会猜测。
- `Invoke VBA` 默认识别常见文本型 VBA 文件扩展名:`.txt``.bas``.vb``.vbs``.vba``.cls``.frm` - `Invoke VBA` 默认识别常见文本型 VBA 文件扩展名:`.txt``.bas``.vb``.vbs``.vba``.cls``.frm`
## 日志说明
运行时默认输出 `INFO` 级别日志,会记录:
- CLI 启动参数、配置加载结果、是否启用 Gemini
- 依赖扫描开始/结束、文件数量、warning 数量
- 复制、清理 Comment Out、二次扫描、剪枝、生成文档等阶段摘要
如果需要看更细的文件级排查信息,直接把日志级别调到 `DEBUG`
```bash
./.venv/bin/python -m uipath_explainator \
/path/to/uipath-project \
--output-dir workspace/project-bundle \
--log-level DEBUG \
--log-file workspace/project-bundle/run.log \
--force
```
`DEBUG` 会额外记录每个文件的复制、依赖解析、路径解析、剪枝、分析文档写出等细节。

View File

@@ -2,12 +2,17 @@ from __future__ import annotations
from argparse import ArgumentParser from argparse import ArgumentParser
from pathlib import Path from pathlib import Path
import logging
from .config import Settings from .config import Settings
from .gemini import GeminiAnalyzer from .gemini import GeminiAnalyzer
from .logging_utils import configure_logging
from .pipeline import ProjectPipeline from .pipeline import ProjectPipeline
logger = logging.getLogger(__name__)
def build_parser() -> ArgumentParser: def build_parser() -> ArgumentParser:
parser = ArgumentParser(description="Extract and explain UiPath project dependencies.") parser = ArgumentParser(description="Extract and explain UiPath project dependencies.")
parser.add_argument("project_dir", type=Path, help="UiPath project root directory") parser.add_argument("project_dir", type=Path, help="UiPath project root directory")
@@ -22,6 +27,13 @@ def build_parser() -> ArgumentParser:
parser.add_argument("--model", help="Override GEMINI_MODEL") parser.add_argument("--model", help="Override GEMINI_MODEL")
parser.add_argument("--skip-analysis", action="store_true", help="Skip Gemini analysis and only prepare files") parser.add_argument("--skip-analysis", action="store_true", help="Skip Gemini analysis and only prepare files")
parser.add_argument("--force", action="store_true", help="Overwrite the output directory if it already exists") parser.add_argument("--force", action="store_true", help="Overwrite the output directory if it already exists")
parser.add_argument(
"--log-level",
default="INFO",
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
help="Logging verbosity",
)
parser.add_argument("--log-file", type=Path, help="Optional file path to write logs to")
return parser return parser
@@ -37,7 +49,25 @@ def main(argv: list[str] | None = None) -> int:
if not output_dir.is_absolute(): if not output_dir.is_absolute():
output_dir = Path.cwd() / output_dir output_dir = Path.cwd() / output_dir
settings = Settings.from_env(args.env_file if args.env_file.exists() else None, model_override=args.model) log_file = args.log_file.expanduser() if args.log_file else None
if log_file is not None and not log_file.is_absolute():
log_file = Path.cwd() / log_file
configure_logging(level_name=args.log_level, log_file=log_file)
logger.info(
"Starting CLI run: project_root=%s output_dir=%s entry=%s analysis=%s",
project_root,
output_dir,
args.entry,
not args.skip_analysis,
)
env_file = args.env_file if args.env_file.exists() else None
if args.env_file and env_file is None:
logger.warning("Environment file not found, falling back to default dotenv lookup: %s", args.env_file)
try:
settings = Settings.from_env(env_file, model_override=args.model)
analyzer = None if args.skip_analysis else GeminiAnalyzer(settings) analyzer = None if args.skip_analysis else GeminiAnalyzer(settings)
pipeline = ProjectPipeline( pipeline = ProjectPipeline(
@@ -47,6 +77,9 @@ def main(argv: list[str] | None = None) -> int:
force=args.force, force=args.force,
) )
report = pipeline.run(analyzer=analyzer) report = pipeline.run(analyzer=analyzer)
except Exception:
logger.exception("CLI run failed")
return 1
print(f"Output written to: {report.output_root}") print(f"Output written to: {report.output_root}")
print(f"Code written to: {report.code_root}") print(f"Code written to: {report.code_root}")

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
import logging
import re import re
from typing import Self from typing import Self
@@ -9,6 +10,9 @@ from dotenv import load_dotenv
import os import os
logger = logging.getLogger(__name__)
@dataclass(slots=True) @dataclass(slots=True)
class Settings: class Settings:
api_key: str | None api_key: str | None
@@ -18,19 +22,29 @@ class Settings:
@classmethod @classmethod
def from_env(cls, env_file: Path | None = None, model_override: str | None = None) -> Self: def from_env(cls, env_file: Path | None = None, model_override: str | None = None) -> Self:
if env_file: if env_file:
logger.info("Loading environment variables from %s", env_file)
load_dotenv(env_file) load_dotenv(env_file)
else: else:
logger.debug("Loading environment variables using default dotenv lookup")
load_dotenv() load_dotenv()
return cls( settings = cls(
api_key=os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY"), api_key=os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY"),
base_url=os.getenv("GEMINI_BASE_URL") or None, base_url=os.getenv("GEMINI_BASE_URL") or None,
model=model_override or os.getenv("GEMINI_MODEL") or "gemini-2.5-flash", model=model_override or os.getenv("GEMINI_MODEL") or "gemini-2.5-flash",
) )
logger.info(
"Configuration loaded: model=%s base_url=%s api_key=%s",
settings.model,
settings.normalized_base_url() or "<default>",
"set" if settings.api_key else "missing",
)
return settings
def require_api_key(self) -> None: def require_api_key(self) -> None:
if not self.api_key: if not self.api_key:
raise ValueError("Missing GEMINI_API_KEY (or GOOGLE_API_KEY) in the environment.") raise ValueError("Missing GEMINI_API_KEY (or GOOGLE_API_KEY) in the environment.")
logger.debug("Gemini API key is available")
def normalized_base_url(self) -> str | None: def normalized_base_url(self) -> str | None:
if not self.base_url: if not self.base_url:

View File

@@ -4,11 +4,14 @@ from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
import json import json
import logging
from time import perf_counter
from .config import Settings from .config import Settings
UNKNOWN_TEXT = "无法从当前文件确定" UNKNOWN_TEXT = "无法从当前文件确定"
logger = logging.getLogger(__name__)
@dataclass(slots=True) @dataclass(slots=True)
@@ -30,8 +33,15 @@ class GeminiAnalyzer:
self._types = types self._types = types
self._client = genai.Client(api_key=self.settings.api_key, http_options=http_options) self._client = genai.Client(api_key=self.settings.api_key, http_options=http_options)
logger.info(
"Gemini analyzer initialized: model=%s base_url=%s",
self.settings.model,
base_url or "<default>",
)
def analyze(self, relative_path: Path, content: str) -> str: def analyze(self, relative_path: Path, content: str) -> str:
started = perf_counter()
logger.info("Submitting Gemini analysis for %s (%d chars)", relative_path.as_posix(), len(content))
prompt = self._build_prompt(relative_path, content) prompt = self._build_prompt(relative_path, content)
response = self._client.models.generate_content( response = self._client.models.generate_content(
model=self.settings.model, model=self.settings.model,
@@ -43,7 +53,26 @@ class GeminiAnalyzer:
), ),
) )
payload = json.loads(response.text) response_text = response.text or ""
logger.debug(
"Gemini response received for %s (%d chars)",
relative_path.as_posix(),
len(response_text),
)
try:
payload = json.loads(response_text)
except json.JSONDecodeError:
logger.exception(
"Gemini returned invalid JSON for %s. Response snippet: %r",
relative_path.as_posix(),
response_text[:500],
)
raise
logger.info(
"Gemini analysis completed for %s in %.2fs",
relative_path.as_posix(),
perf_counter() - started,
)
return self._to_markdown(relative_path, payload) return self._to_markdown(relative_path, payload)
def _response_schema(self) -> dict[str, Any]: def _response_schema(self) -> dict[str, Any]:

View File

@@ -0,0 +1,28 @@
from __future__ import annotations
from pathlib import Path
import logging
LOG_FORMAT = "%(asctime)s %(levelname)s [%(name)s] %(message)s"
def configure_logging(level_name: str = "INFO", log_file: Path | None = None) -> None:
level = getattr(logging, level_name.upper(), logging.INFO)
formatter = logging.Formatter(LOG_FORMAT)
handlers: list[logging.Handler] = [logging.StreamHandler()]
if log_file is not None:
log_file.parent.mkdir(parents=True, exist_ok=True)
handlers.append(logging.FileHandler(log_file, encoding="utf-8"))
root_logger = logging.getLogger()
root_logger.handlers.clear()
root_logger.setLevel(level)
for handler in handlers:
handler.setLevel(level)
handler.setFormatter(formatter)
root_logger.addHandler(handler)
logging.captureWarnings(True)

View File

@@ -4,10 +4,15 @@ from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from shutil import copy2, rmtree from shutil import copy2, rmtree
import json import json
import logging
from time import perf_counter
from .scanner import ScanResult, crawl_dependencies, find_entry_file, read_text, strip_comment_out_blocks from .scanner import ScanResult, crawl_dependencies, find_entry_file, read_text, strip_comment_out_blocks
logger = logging.getLogger(__name__)
@dataclass(slots=True) @dataclass(slots=True)
class PipelineReport: class PipelineReport:
project_root: Path project_root: Path
@@ -52,19 +57,41 @@ class ProjectPipeline:
self.force = force self.force = force
def run(self, analyzer=None) -> PipelineReport: def run(self, analyzer=None) -> PipelineReport:
started = perf_counter()
logger.info(
"Starting pipeline: project_root=%s output_root=%s entry=%s analysis=%s force=%s",
self.project_root,
self.output_root,
self.entry_name,
analyzer is not None,
self.force,
)
entry_file = find_entry_file(self.project_root, self.entry_name) entry_file = find_entry_file(self.project_root, self.entry_name)
entry_rel = entry_file.relative_to(self.project_root) entry_rel = entry_file.relative_to(self.project_root)
logger.debug("Using entry file %s", entry_rel.as_posix())
initial_scan = crawl_dependencies(self.project_root, entry_file) initial_scan = crawl_dependencies(self.project_root, entry_file)
initial_rel_files = self._relative_files(initial_scan, self.project_root) initial_rel_files = self._relative_files(initial_scan, self.project_root)
logger.info(
"Initial scan complete: files=%d warnings=%d",
len(initial_rel_files),
len(initial_scan.warnings),
)
self._prepare_output_root() self._prepare_output_root()
self._copy_files(initial_rel_files) self._copy_files(initial_rel_files)
cleaned_files = self._clean_copied_xaml_files(initial_rel_files) cleaned_files = self._clean_copied_xaml_files(initial_rel_files)
logger.info("Copied %d files and cleaned %d XAML files", len(initial_rel_files), len(cleaned_files))
final_scan = crawl_dependencies(self.code_root, self.code_root / entry_rel) final_scan = crawl_dependencies(self.code_root, self.code_root / entry_rel)
final_rel_files = self._relative_files(final_scan, self.code_root) final_rel_files = self._relative_files(final_scan, self.code_root)
pruned_files = self._prune_unused_files(initial_rel_files, final_rel_files) pruned_files = self._prune_unused_files(initial_rel_files, final_rel_files)
logger.info(
"Final scan complete: files=%d warnings=%d pruned=%d",
len(final_rel_files),
len(final_scan.warnings),
len(pruned_files),
)
analysis_files = self._write_analysis(final_rel_files, analyzer) analysis_files = self._write_analysis(final_rel_files, analyzer)
warnings = initial_scan.warnings + final_scan.warnings warnings = initial_scan.warnings + final_scan.warnings
@@ -83,15 +110,24 @@ class ProjectPipeline:
analysis_files=analysis_files, analysis_files=analysis_files,
) )
self._write_report_files(report) self._write_report_files(report)
logger.info(
"Pipeline completed in %.2fs: final_files=%d analysis_files=%d warnings=%d",
perf_counter() - started,
len(report.final_files),
len(report.analysis_files),
len(report.warnings),
)
return report return report
def _prepare_output_root(self) -> None: def _prepare_output_root(self) -> None:
if self.output_root.exists(): if self.output_root.exists():
if not self.force: if not self.force:
raise FileExistsError(f"Output directory already exists: {self.output_root}") raise FileExistsError(f"Output directory already exists: {self.output_root}")
logger.info("Removing existing output directory because force=True: %s", self.output_root)
rmtree(self.output_root) rmtree(self.output_root)
self.code_root.mkdir(parents=True, exist_ok=True) self.code_root.mkdir(parents=True, exist_ok=True)
self.docs_root.mkdir(parents=True, exist_ok=True) self.docs_root.mkdir(parents=True, exist_ok=True)
logger.debug("Prepared output directories: code=%s docs=%s", self.code_root, self.docs_root)
def _copy_files(self, relative_files: list[Path]) -> None: def _copy_files(self, relative_files: list[Path]) -> None:
for relative_path in relative_files: for relative_path in relative_files:
@@ -99,6 +135,7 @@ class ProjectPipeline:
destination = self.code_root / relative_path destination = self.code_root / relative_path
destination.parent.mkdir(parents=True, exist_ok=True) destination.parent.mkdir(parents=True, exist_ok=True)
copy2(source, destination) copy2(source, destination)
logger.debug("Copied file: %s -> %s", source, destination)
def _clean_copied_xaml_files(self, relative_files: list[Path]) -> list[Path]: def _clean_copied_xaml_files(self, relative_files: list[Path]) -> list[Path]:
cleaned: list[Path] = [] cleaned: list[Path] = []
@@ -111,6 +148,7 @@ class ProjectPipeline:
if updated != original: if updated != original:
output_file.write_text(updated, encoding="utf-8") output_file.write_text(updated, encoding="utf-8")
cleaned.append(relative_path) cleaned.append(relative_path)
logger.debug("Removed CommentOut blocks from %s", output_file)
return cleaned return cleaned
def _prune_unused_files(self, initial_files: list[Path], final_files: list[Path]) -> list[Path]: def _prune_unused_files(self, initial_files: list[Path], final_files: list[Path]) -> list[Path]:
@@ -123,6 +161,7 @@ class ProjectPipeline:
if target.exists(): if target.exists():
target.unlink() target.unlink()
pruned.append(relative_path) pruned.append(relative_path)
logger.debug("Pruned unreachable file: %s", target)
self._cleanup_empty_dirs() self._cleanup_empty_dirs()
return pruned return pruned
@@ -136,9 +175,11 @@ class ProjectPipeline:
if any(directory.iterdir()): if any(directory.iterdir()):
continue continue
directory.rmdir() directory.rmdir()
logger.debug("Removed empty directory: %s", directory)
def _write_analysis(self, final_files: list[Path], analyzer) -> list[Path]: def _write_analysis(self, final_files: list[Path], analyzer) -> list[Path]:
if analyzer is None: if analyzer is None:
logger.info("Skipping Gemini analysis because analyzer is disabled")
return [] return []
output_files: list[Path] = [] output_files: list[Path] = []
@@ -149,11 +190,13 @@ class ProjectPipeline:
analysis_path.parent.mkdir(parents=True, exist_ok=True) analysis_path.parent.mkdir(parents=True, exist_ok=True)
analysis_path.write_text(analysis, encoding="utf-8") analysis_path.write_text(analysis, encoding="utf-8")
output_files.append(Path(f"{relative_path.as_posix()}.analysis.md")) output_files.append(Path(f"{relative_path.as_posix()}.analysis.md"))
logger.debug("Wrote analysis file: %s", analysis_path)
return output_files return output_files
def _write_report_files(self, report: PipelineReport) -> None: def _write_report_files(self, report: PipelineReport) -> None:
(self.docs_root / "manifest.json").write_text(report.to_json(), encoding="utf-8") (self.docs_root / "manifest.json").write_text(report.to_json(), encoding="utf-8")
(self.docs_root / "OVERVIEW.md").write_text(self._build_overview(report), encoding="utf-8") (self.docs_root / "OVERVIEW.md").write_text(self._build_overview(report), encoding="utf-8")
logger.debug("Wrote report files to %s", self.docs_root)
def _build_overview(self, report: PipelineReport) -> str: def _build_overview(self, report: PipelineReport) -> str:
warnings = "\n".join(f"- {item}" for item in report.warnings) or "- 无" warnings = "\n".join(f"- {item}" for item in report.warnings) or "- 无"

View File

@@ -4,6 +4,7 @@ from collections import deque
from dataclasses import dataclass, field from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
import html import html
import logging
import re import re
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
@@ -15,6 +16,7 @@ TAG_PATTERN = re.compile(
r"<(?P<close>/)?(?P<name>[A-Za-z_][\w:.-]*)(?P<body>[^<>]*?)(?P<self>/)?>", r"<(?P<close>/)?(?P<name>[A-Za-z_][\w:.-]*)(?P<body>[^<>]*?)(?P<self>/)?>",
re.DOTALL, re.DOTALL,
) )
logger = logging.getLogger(__name__)
@dataclass(slots=True, frozen=True) @dataclass(slots=True, frozen=True)
@@ -81,30 +83,37 @@ def _find_comment_end(text: str, start: re.Match[str]) -> int:
def find_entry_file(project_root: Path, entry_name: str) -> Path: def find_entry_file(project_root: Path, entry_name: str) -> Path:
logger.debug("Resolving entry file %s within %s", entry_name, project_root)
direct = project_root / entry_name direct = project_root / entry_name
if direct.exists(): if direct.exists():
logger.info("Entry file resolved directly: %s", direct)
return direct return direct
target = entry_name.lower() target = entry_name.lower()
for child in project_root.iterdir(): for child in project_root.iterdir():
if child.is_file() and child.name.lower() == target: if child.is_file() and child.name.lower() == target:
logger.info("Entry file resolved case-insensitively: %s", child)
return child return child
raise FileNotFoundError(f"Entry file not found: {entry_name}") raise FileNotFoundError(f"Entry file not found: {entry_name}")
def crawl_dependencies(project_root: Path, entry_file: Path) -> ScanResult: def crawl_dependencies(project_root: Path, entry_file: Path) -> ScanResult:
logger.info("Starting dependency crawl from %s", entry_file)
result = ScanResult() result = ScanResult()
queue: deque[Path] = deque([entry_file]) queue: deque[Path] = deque([entry_file])
visited: set[Path] = set() visited: set[Path] = set()
while queue: while queue:
current = queue.popleft() current = queue.popleft()
logger.debug("Dequeued file for scan: %s", current)
if current in visited: if current in visited:
logger.debug("Skipping already visited file: %s", current)
continue continue
visited.add(current) visited.add(current)
result.files.add(current) result.files.add(current)
if current.suffix.lower() != ".xaml": if current.suffix.lower() != ".xaml":
logger.debug("Skipping recursive scan for non-XAML file: %s", current)
continue continue
dependencies, warnings = extract_dependencies(project_root, current) dependencies, warnings = extract_dependencies(project_root, current)
@@ -114,12 +123,24 @@ def crawl_dependencies(project_root: Path, entry_file: Path) -> ScanResult:
for dependency in dependencies: for dependency in dependencies:
result.files.add(dependency.target) result.files.add(dependency.target)
if dependency.kind == "workflow": if dependency.kind == "workflow":
logger.debug(
"Queueing workflow dependency: %s -> %s",
current,
dependency.target,
)
queue.append(dependency.target) queue.append(dependency.target)
logger.info(
"Dependency crawl complete: files=%d dependency_sources=%d warnings=%d",
len(result.files),
len(result.dependencies),
len(result.warnings),
)
return result return result
def extract_dependencies(project_root: Path, file_path: Path) -> tuple[list[Dependency], list[str]]: def extract_dependencies(project_root: Path, file_path: Path) -> tuple[list[Dependency], list[str]]:
logger.debug("Extracting dependencies from %s", file_path)
text = read_text(file_path) text = read_text(file_path)
warnings: list[str] = [] warnings: list[str] = []
dependencies: list[Dependency] = [] dependencies: list[Dependency] = []
@@ -127,7 +148,9 @@ def extract_dependencies(project_root: Path, file_path: Path) -> tuple[list[Depe
try: try:
root = ET.fromstring(text) root = ET.fromstring(text)
except ET.ParseError as exc: except ET.ParseError as exc:
return [], [f"{file_path}: XML parse failed: {exc}"] warning = f"{file_path}: XML parse failed: {exc}"
logger.warning(warning)
return [], [warning]
for element in root.iter(): for element in root.iter():
activity_type = classify_invoke_activity(local_name(element.tag)) activity_type = classify_invoke_activity(local_name(element.tag))
@@ -136,17 +159,26 @@ def extract_dependencies(project_root: Path, file_path: Path) -> tuple[list[Depe
literals = extract_literal_targets(element, activity_type) literals = extract_literal_targets(element, activity_type)
if not literals: if not literals:
warnings.append( warning = f"{file_path}: unable to statically resolve {activity_type} path from <{local_name(element.tag)}>"
f"{file_path}: unable to statically resolve {activity_type} path from <{local_name(element.tag)}>" warnings.append(warning)
) logger.warning(warning)
continue continue
for literal in literals: for literal in literals:
target = resolve_project_path(project_root, file_path, literal) target = resolve_project_path(project_root, file_path, literal)
if target is None: if target is None:
warnings.append(f"{file_path}: referenced file not found for {activity_type}: {literal}") warning = f"{file_path}: referenced file not found for {activity_type}: {literal}"
warnings.append(warning)
logger.warning(warning)
continue continue
dependencies.append(Dependency(activity_type, literal, file_path, target)) dependencies.append(Dependency(activity_type, literal, file_path, target))
logger.debug(
"Resolved %s dependency: source=%s literal=%s target=%s",
activity_type,
file_path,
literal,
target,
)
return _dedupe_dependencies(dependencies), warnings return _dedupe_dependencies(dependencies), warnings
@@ -217,7 +249,15 @@ def resolve_project_path(project_root: Path, source_file: Path, literal: str) ->
for base in search_roots: for base in search_roots:
resolved = (base / candidate).resolve() resolved = (base / candidate).resolve()
if resolved.exists() and _is_within_root(project_root.resolve(), resolved): if resolved.exists() and _is_within_root(project_root.resolve(), resolved):
logger.debug(
"Resolved literal path %s from %s using base %s -> %s",
literal,
source_file,
base,
resolved,
)
return resolved return resolved
logger.debug("Failed to resolve literal path %s from %s", literal, source_file)
return None return None

View File

@@ -122,6 +122,29 @@ class PipelineTests(unittest.TestCase):
self.assertIn("Code Root", overview) self.assertIn("Code Root", overview)
self.assertIn("Docs Root", overview) self.assertIn("Docs Root", overview)
def test_pipeline_emits_stage_logs(self) -> None:
with TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
project_root = tmp_path / "project"
output_root = tmp_path / "workspace"
(project_root / "Flows").mkdir(parents=True)
(project_root / "Flows" / "Active.xaml").write_text(ACTIVE_XAML, encoding="utf-8")
(project_root / "Flows" / "Old.xaml").write_text(OLD_XAML, encoding="utf-8")
(project_root / "Scripts").mkdir()
(project_root / "Scripts" / "Keep.bas").write_text("Sub Keep()\nEnd Sub", encoding="utf-8")
(project_root / "Scripts" / "Drop.bas").write_text("Sub Drop()\nEnd Sub", encoding="utf-8")
(project_root / "main.xaml").write_text(MAIN_XAML, encoding="utf-8")
with self.assertLogs("uipath_explainator", level="INFO") as captured:
ProjectPipeline(project_root, output_root, "main.xaml", force=True).run(StubAnalyzer())
combined = "\n".join(captured.output)
self.assertIn("Starting pipeline:", combined)
self.assertIn("Initial scan complete:", combined)
self.assertIn("Copied 5 files and cleaned 1 XAML files", combined)
self.assertIn("Final scan complete:", combined)
self.assertIn("Pipeline completed in", combined)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()