diff --git a/README.md b/README.md
index 1aac596..685c372 100644
--- a/README.md
+++ b/README.md
@@ -39,6 +39,8 @@ GEMINI_MODEL=gemini-2.5-flash
 - `--model`: temporarily override the model name from `.env`
 - `--skip-analysis`: only copy, clean, and prune; skip the Gemini call
 - `--force`: allow overwriting an existing output directory
+- `--log-level`: logging level; one of `DEBUG` / `INFO` / `WARNING` / `ERROR` / `CRITICAL`
+- `--log-file`: additionally write the full log to a file, useful for retracing the scan, prune, and Gemini analysis stages
 
 ## Output
 
@@ -60,3 +62,24 @@
 - Only `Invoke Workflow` / `Invoke VBA` file paths that can be identified statically are resolved.
 - Dynamically concatenated paths are recorded as warnings; no guessing is attempted.
 - `Invoke VBA` recognizes the common text-based VBA file extensions by default: `.txt`, `.bas`, `.vb`, `.vbs`, `.vba`, `.cls`, `.frm`.
+
+## Logging
+
+By default the tool emits `INFO`-level logs at runtime, covering:
+
+- CLI startup arguments, configuration loading results, and whether Gemini is enabled
+- dependency scan start/end, file counts, and warning counts
+- stage summaries for copying, Comment Out cleanup, the second scan, pruning, and document generation
+
+For finer, file-level troubleshooting detail, raise the log level to `DEBUG`:
+
+```bash
+./.venv/bin/python -m uipath_explainator \
+    /path/to/uipath-project \
+    --output-dir workspace/project-bundle \
+    --log-level DEBUG \
+    --log-file workspace/project-bundle/run.log \
+    --force
+```
+
+`DEBUG` additionally records per-file detail: each file copy, dependency resolution, path resolution, pruning decision, and analysis document write.
diff --git a/src/uipath_explainator/cli.py b/src/uipath_explainator/cli.py
index cfead57..d61cf7c 100644
--- a/src/uipath_explainator/cli.py
+++ b/src/uipath_explainator/cli.py
@@ -2,12 +2,17 @@
 from __future__ import annotations
 
 from argparse import ArgumentParser
 from pathlib import Path
+import logging
 
 from .config import Settings
 from .gemini import GeminiAnalyzer
+from .logging_utils import configure_logging
 from .pipeline import ProjectPipeline
 
+logger = logging.getLogger(__name__)
+
+
 def build_parser() -> ArgumentParser:
     parser = ArgumentParser(description="Extract and explain UiPath project dependencies.")
     parser.add_argument("project_dir", type=Path, help="UiPath project root directory")
@@ -22,6 +27,13 @@
     parser.add_argument("--model", help="Override GEMINI_MODEL")
     parser.add_argument("--skip-analysis", action="store_true", help="Skip Gemini analysis and only prepare files")
     parser.add_argument("--force", action="store_true", help="Overwrite the output directory if it already exists")
+    parser.add_argument(
+        "--log-level",
+        default="INFO",
+        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
+        help="Logging verbosity",
+    )
+    parser.add_argument("--log-file", type=Path, help="Optional file path to write logs to")
     return parser
 
 
@@ -37,16 +49,38 @@ def main(argv: list[str] | None = None) -> int:
     if not output_dir.is_absolute():
         output_dir = Path.cwd() / output_dir
 
-    settings = Settings.from_env(args.env_file if args.env_file.exists() else None, model_override=args.model)
-    analyzer = None if args.skip_analysis else GeminiAnalyzer(settings)
+    log_file = args.log_file.expanduser() if args.log_file else None
+    if log_file is not None and not log_file.is_absolute():
+        log_file = Path.cwd() / log_file
+    configure_logging(level_name=args.log_level, log_file=log_file)
 
-    pipeline = ProjectPipeline(
-        project_root=project_root,
-        output_root=output_dir,
-        entry_name=args.entry,
-        force=args.force,
+    logger.info(
+        "Starting CLI run: project_root=%s output_dir=%s entry=%s analysis=%s",
+        project_root,
+        output_dir,
+        args.entry,
+        not args.skip_analysis,
     )
-    report = pipeline.run(analyzer=analyzer)
+
+    env_file = args.env_file if args.env_file.exists() else None
+    if args.env_file and env_file is None:
+        logger.warning("Environment file not found, falling back to default dotenv lookup: %s", args.env_file)
+
+    try:
+        settings = Settings.from_env(env_file, model_override=args.model)
+        analyzer = None if args.skip_analysis else GeminiAnalyzer(settings)
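+
+        # Build and run the pipeline inside the try block so any failure is logged with a traceback before the CLI exits with code 1.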
+        pipeline = ProjectPipeline(
+            project_root=project_root,
+            output_root=output_dir,
+            entry_name=args.entry,
+            force=args.force,
+        )
+        report = pipeline.run(analyzer=analyzer)
+    except Exception:
+        logger.exception("CLI run failed")
+        return 1
 
     print(f"Output written to: {report.output_root}")
     print(f"Code written to: {report.code_root}")
diff --git a/src/uipath_explainator/config.py b/src/uipath_explainator/config.py
index c24f583..bff2305 100644
--- a/src/uipath_explainator/config.py
+++ b/src/uipath_explainator/config.py
@@ -2,6 +2,7 @@
 from __future__ import annotations
 
 from dataclasses import dataclass
 from pathlib import Path
+import logging
 import re
 from typing import Self
@@ -9,6 +10,9 @@
 from dotenv import load_dotenv
 
 import os
+logger = logging.getLogger(__name__)
+
+
 @dataclass(slots=True)
 class Settings:
     api_key: str | None
     base_url: str | None
     model: str
@@ -18,19 +22,29 @@
     @classmethod
     def from_env(cls, env_file: Path | None = None, model_override: str | None = None) -> Self:
         if env_file:
+            logger.info("Loading environment variables from %s", env_file)
             load_dotenv(env_file)
         else:
+            logger.debug("Loading environment variables using default dotenv lookup")
             load_dotenv()
 
-        return cls(
+        settings = cls(
             api_key=os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY"),
             base_url=os.getenv("GEMINI_BASE_URL") or None,
             model=model_override or os.getenv("GEMINI_MODEL") or "gemini-2.5-flash",
         )
+        logger.info(
+            "Configuration loaded: model=%s base_url=%s api_key=%s",
+            settings.model,
+            settings.normalized_base_url() or "",
+            "set" if settings.api_key else "missing",
+        )
+        return settings
 
     def require_api_key(self) -> None:
         if not self.api_key:
             raise ValueError("Missing GEMINI_API_KEY (or GOOGLE_API_KEY) in the environment.")
+        logger.debug("Gemini API key is available")
 
     def normalized_base_url(self) -> str | None:
         if not self.base_url:
diff --git a/src/uipath_explainator/gemini.py b/src/uipath_explainator/gemini.py
index 99d9fa3..6f968e6 100644
--- a/src/uipath_explainator/gemini.py
+++ b/src/uipath_explainator/gemini.py
@@ -4,11 +4,14 @@
 from dataclasses import dataclass, field
 from pathlib import Path
 from typing import Any
 import json
+import logging
+from time import perf_counter
 
 from .config import Settings
 
 UNKNOWN_TEXT = "无法从当前文件确定"
+logger = logging.getLogger(__name__)
 
 
 @dataclass(slots=True)
@@ -30,8 +33,15 @@
         self._types = types
         self._client = genai.Client(api_key=self.settings.api_key, http_options=http_options)
+        logger.info(
+            "Gemini analyzer initialized: model=%s base_url=%s",
+            self.settings.model,
+            base_url or "",
+        )
 
     def analyze(self, relative_path: Path, content: str) -> str:
+        started = perf_counter()
+        logger.info("Submitting Gemini analysis for %s (%d chars)", relative_path.as_posix(), len(content))
         prompt = self._build_prompt(relative_path, content)
         response = self._client.models.generate_content(
             model=self.settings.model,
@@ -43,7 +53,27 @@
             ),
         )
 
-        payload = json.loads(response.text)
+        response_text = response.text or ""
+        logger.debug(
+            "Gemini response received for %s (%d chars)",
+            relative_path.as_posix(),
+            len(response_text),
+        )
+        try:
+            payload = json.loads(response_text)
+        except json.JSONDecodeError:
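+            # The logged snippet is capped at 500 characters so an oversized response cannot flood the log.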
+            logger.exception(
+                "Gemini returned invalid JSON for %s. Response snippet: %r",
+                relative_path.as_posix(),
+                response_text[:500],
+            )
+            raise
+        logger.info(
+            "Gemini analysis completed for %s in %.2fs",
+            relative_path.as_posix(),
+            perf_counter() - started,
+        )
         return self._to_markdown(relative_path, payload)
 
     def _response_schema(self) -> dict[str, Any]:
diff --git a/src/uipath_explainator/logging_utils.py b/src/uipath_explainator/logging_utils.py
new file mode 100644
index 0000000..6894c53
--- /dev/null
+++ b/src/uipath_explainator/logging_utils.py
@@ -0,0 +1,28 @@
+from __future__ import annotations
+
+from pathlib import Path
+import logging
+
+
+LOG_FORMAT = "%(asctime)s %(levelname)s [%(name)s] %(message)s"
+
+
+def configure_logging(level_name: str = "INFO", log_file: Path | None = None) -> None:
+    level = getattr(logging, level_name.upper(), logging.INFO)
+    formatter = logging.Formatter(LOG_FORMAT)
+
+    handlers: list[logging.Handler] = [logging.StreamHandler()]
+    if log_file is not None:
+        log_file.parent.mkdir(parents=True, exist_ok=True)
+        handlers.append(logging.FileHandler(log_file, encoding="utf-8"))
+
+    root_logger = logging.getLogger()
+    root_logger.handlers.clear()
+    root_logger.setLevel(level)
+
+    for handler in handlers:
+        handler.setLevel(level)
+        handler.setFormatter(formatter)
+        root_logger.addHandler(handler)
+
+    logging.captureWarnings(True)
diff --git a/src/uipath_explainator/pipeline.py b/src/uipath_explainator/pipeline.py
index 596ae29..34a68bb 100644
--- a/src/uipath_explainator/pipeline.py
+++ b/src/uipath_explainator/pipeline.py
@@ -4,10 +4,15 @@
 from dataclasses import dataclass
 from pathlib import Path
 from shutil import copy2, rmtree
 import json
+import logging
+from time import perf_counter
 
 from .scanner import ScanResult, crawl_dependencies, find_entry_file, read_text, strip_comment_out_blocks
 
+logger = logging.getLogger(__name__)
+
+
 @dataclass(slots=True)
 class PipelineReport:
     project_root: Path
@@ -52,19 +57,41 @@
         self.force = force
 
     def run(self, analyzer=None) -> PipelineReport:
+        started = perf_counter()
+        logger.info(
+            "Starting pipeline: project_root=%s output_root=%s entry=%s analysis=%s force=%s",
+            self.project_root,
+            self.output_root,
+            self.entry_name,
+            analyzer is not None,
+            self.force,
+        )
         entry_file = find_entry_file(self.project_root, self.entry_name)
         entry_rel = entry_file.relative_to(self.project_root)
+        logger.debug("Using entry file %s", entry_rel.as_posix())
 
         initial_scan = crawl_dependencies(self.project_root, entry_file)
         initial_rel_files = self._relative_files(initial_scan, self.project_root)
+        logger.info(
+            "Initial scan complete: files=%d warnings=%d",
+            len(initial_rel_files),
+            len(initial_scan.warnings),
+        )
 
         self._prepare_output_root()
         self._copy_files(initial_rel_files)
         cleaned_files = self._clean_copied_xaml_files(initial_rel_files)
+        logger.info("Copied %d files and cleaned %d XAML files", len(initial_rel_files), len(cleaned_files))
 
         final_scan = crawl_dependencies(self.code_root, self.code_root / entry_rel)
         final_rel_files = self._relative_files(final_scan, self.code_root)
         pruned_files = self._prune_unused_files(initial_rel_files, final_rel_files)
+        logger.info(
+            "Final scan complete: files=%d warnings=%d pruned=%d",
+            len(final_rel_files),
+            len(final_scan.warnings),
+            len(pruned_files),
+        )
 
         analysis_files = self._write_analysis(final_rel_files, analyzer)
         warnings = initial_scan.warnings + final_scan.warnings
@@ -83,15 +110,25 @@
             analysis_files=analysis_files,
         )
         self._write_report_files(report)
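+        # The summary below reuses the perf_counter() reading captured at the top of run().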
+        logger.info(
+            "Pipeline completed in %.2fs: final_files=%d analysis_files=%d warnings=%d",
+            perf_counter() - started,
+            len(report.final_files),
+            len(report.analysis_files),
+            len(report.warnings),
+        )
         return report
 
     def _prepare_output_root(self) -> None:
         if self.output_root.exists():
             if not self.force:
                 raise FileExistsError(f"Output directory already exists: {self.output_root}")
+            logger.info("Removing existing output directory because force=True: %s", self.output_root)
             rmtree(self.output_root)
         self.code_root.mkdir(parents=True, exist_ok=True)
         self.docs_root.mkdir(parents=True, exist_ok=True)
+        logger.debug("Prepared output directories: code=%s docs=%s", self.code_root, self.docs_root)
 
     def _copy_files(self, relative_files: list[Path]) -> None:
         for relative_path in relative_files:
@@ -99,6 +136,7 @@
             source = self.project_root / relative_path
             destination = self.code_root / relative_path
             destination.parent.mkdir(parents=True, exist_ok=True)
             copy2(source, destination)
+            logger.debug("Copied file: %s -> %s", source, destination)
 
     def _clean_copied_xaml_files(self, relative_files: list[Path]) -> list[Path]:
         cleaned: list[Path] = []
@@ -111,6 +149,7 @@
             if updated != original:
                 output_file.write_text(updated, encoding="utf-8")
                 cleaned.append(relative_path)
+                logger.debug("Removed CommentOut blocks from %s", output_file)
         return cleaned
 
     def _prune_unused_files(self, initial_files: list[Path], final_files: list[Path]) -> list[Path]:
@@ -123,6 +162,7 @@
             if target.exists():
                 target.unlink()
                 pruned.append(relative_path)
+                logger.debug("Pruned unreachable file: %s", target)
         self._cleanup_empty_dirs()
         return pruned
@@ -136,9 +176,11 @@
             if any(directory.iterdir()):
                 continue
             directory.rmdir()
+            logger.debug("Removed empty directory: %s", directory)
 
     def _write_analysis(self, final_files: list[Path], analyzer) -> list[Path]:
         if analyzer is None:
+            logger.info("Skipping Gemini analysis because analyzer is disabled")
             return []
 
         output_files: list[Path] = []
@@ -149,11 +191,13 @@
             analysis_path.parent.mkdir(parents=True, exist_ok=True)
             analysis_path.write_text(analysis, encoding="utf-8")
             output_files.append(Path(f"{relative_path.as_posix()}.analysis.md"))
+            logger.debug("Wrote analysis file: %s", analysis_path)
         return output_files
 
     def _write_report_files(self, report: PipelineReport) -> None:
         (self.docs_root / "manifest.json").write_text(report.to_json(), encoding="utf-8")
         (self.docs_root / "OVERVIEW.md").write_text(self._build_overview(report), encoding="utf-8")
+        logger.debug("Wrote report files to %s", self.docs_root)
 
     def _build_overview(self, report: PipelineReport) -> str:
         warnings = "\n".join(f"- {item}" for item in report.warnings) or "- 无"
diff --git a/src/uipath_explainator/scanner.py b/src/uipath_explainator/scanner.py
index 55d0503..cefbdd6 100644
--- a/src/uipath_explainator/scanner.py
+++ b/src/uipath_explainator/scanner.py
@@ -4,6 +4,7 @@
 from collections import deque
 from dataclasses import dataclass, field
 from pathlib import Path
 import html
+import logging
 import re
 import xml.etree.ElementTree as ET
@@ -15,6 +16,7 @@ TAG_PATTERN = re.compile(
     r"<(?P<closing>/)?(?P<name>[A-Za-z_][\w:.-]*)(?P<attrs>[^<>]*?)(?P<selfclosing>/)?>",
     re.DOTALL,
 )
+logger = logging.getLogger(__name__)
 
 
 @dataclass(slots=True, frozen=True)
@@ -81,30 +83,38 @@ def _find_comment_end(text: str, start: re.Match[str]) -> int:
 
 
 def find_entry_file(project_root: Path, entry_name: str) -> Path:
+    logger.debug("Resolving entry file %s within %s", entry_name, project_root)
     direct = project_root / entry_name
     if direct.exists():
+        logger.info("Entry file resolved directly: %s", direct)
         return direct
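+    # Fall back to a case-insensitive filename match so a differently cased entry file still resolves.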
     target = entry_name.lower()
     for child in project_root.iterdir():
         if child.is_file() and child.name.lower() == target:
+            logger.info("Entry file resolved case-insensitively: %s", child)
             return child
     raise FileNotFoundError(f"Entry file not found: {entry_name}")
 
 
 def crawl_dependencies(project_root: Path, entry_file: Path) -> ScanResult:
+    logger.info("Starting dependency crawl from %s", entry_file)
     result = ScanResult()
     queue: deque[Path] = deque([entry_file])
     visited: set[Path] = set()
 
     while queue:
         current = queue.popleft()
+        logger.debug("Dequeued file for scan: %s", current)
         if current in visited:
+            logger.debug("Skipping already visited file: %s", current)
             continue
         visited.add(current)
         result.files.add(current)
 
         if current.suffix.lower() != ".xaml":
+            logger.debug("Skipping recursive scan for non-XAML file: %s", current)
             continue
 
         dependencies, warnings = extract_dependencies(project_root, current)
@@ -114,12 +124,24 @@
         for dependency in dependencies:
             result.files.add(dependency.target)
             if dependency.kind == "workflow":
+                logger.debug(
+                    "Queueing workflow dependency: %s -> %s",
+                    current,
+                    dependency.target,
+                )
                 queue.append(dependency.target)
 
+    logger.info(
+        "Dependency crawl complete: files=%d dependency_sources=%d warnings=%d",
+        len(result.files),
+        len(result.dependencies),
+        len(result.warnings),
+    )
     return result
 
 
 def extract_dependencies(project_root: Path, file_path: Path) -> tuple[list[Dependency], list[str]]:
+    logger.debug("Extracting dependencies from %s", file_path)
     text = read_text(file_path)
     warnings: list[str] = []
     dependencies: list[Dependency] = []
@@ -127,7 +149,9 @@ def extract_dependencies(project_root: Path, file_path: Path) -> tuple[list[Depe
     try:
         root = ET.fromstring(text)
     except ET.ParseError as exc:
-        return [], [f"{file_path}: XML parse failed: {exc}"]
+        warning = f"{file_path}: XML parse failed: {exc}"
+        logger.warning(warning)
+        return [], [warning]
 
     for element in root.iter():
         activity_type = classify_invoke_activity(local_name(element.tag))
@@ -136,17 +160,26 @@
         literals = extract_literal_targets(element, activity_type)
         if not literals:
-            warnings.append(
-                f"{file_path}: unable to statically resolve {activity_type} path from <{local_name(element.tag)}>"
-            )
+            warning = f"{file_path}: unable to statically resolve {activity_type} path from <{local_name(element.tag)}>"
+            warnings.append(warning)
+            logger.warning(warning)
             continue
 
         for literal in literals:
             target = resolve_project_path(project_root, file_path, literal)
             if target is None:
-                warnings.append(f"{file_path}: referenced file not found for {activity_type}: {literal}")
+                warning = f"{file_path}: referenced file not found for {activity_type}: {literal}"
+                warnings.append(warning)
+                logger.warning(warning)
                 continue
             dependencies.append(Dependency(activity_type, literal, file_path, target))
+            logger.debug(
+                "Resolved %s dependency: source=%s literal=%s target=%s",
+                activity_type,
+                file_path,
+                literal,
+                target,
+            )
 
     return _dedupe_dependencies(dependencies), warnings
@@ -217,7 +250,16 @@ def resolve_project_path(project_root: Path, source_file: Path, literal: str) ->
     for base in search_roots:
         resolved = (base / candidate).resolve()
         if resolved.exists() and _is_within_root(project_root.resolve(), resolved):
+            logger.debug(
+                "Resolved literal path %s from %s using base %s -> %s",
+                literal,
+                source_file,
+                base,
+                resolved,
+            )
             return resolved
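+    # Nothing matched inside the project root; log the miss and let the caller record a warning instead of guessing.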
using base %s -> %s", + literal, + source_file, + base, + resolved, + ) return resolved + logger.debug("Failed to resolve literal path %s from %s", literal, source_file) return None diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 980ed57..25ee037 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -122,6 +122,29 @@ class PipelineTests(unittest.TestCase): self.assertIn("Code Root", overview) self.assertIn("Docs Root", overview) + def test_pipeline_emits_stage_logs(self) -> None: + with TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + project_root = tmp_path / "project" + output_root = tmp_path / "workspace" + (project_root / "Flows").mkdir(parents=True) + (project_root / "Flows" / "Active.xaml").write_text(ACTIVE_XAML, encoding="utf-8") + (project_root / "Flows" / "Old.xaml").write_text(OLD_XAML, encoding="utf-8") + (project_root / "Scripts").mkdir() + (project_root / "Scripts" / "Keep.bas").write_text("Sub Keep()\nEnd Sub", encoding="utf-8") + (project_root / "Scripts" / "Drop.bas").write_text("Sub Drop()\nEnd Sub", encoding="utf-8") + (project_root / "main.xaml").write_text(MAIN_XAML, encoding="utf-8") + + with self.assertLogs("uipath_explainator", level="INFO") as captured: + ProjectPipeline(project_root, output_root, "main.xaml", force=True).run(StubAnalyzer()) + + combined = "\n".join(captured.output) + self.assertIn("Starting pipeline:", combined) + self.assertIn("Initial scan complete:", combined) + self.assertIn("Copied 5 files and cleaned 1 XAML files", combined) + self.assertIn("Final scan complete:", combined) + self.assertIn("Pipeline completed in", combined) + if __name__ == "__main__": unittest.main()