from __future__ import annotations
from pathlib import Path
from types import ModuleType, SimpleNamespace
import sys
import unittest
from unittest.mock import patch
# Project root: the parent of the directory that contains this test file.
ROOT = Path(__file__).resolve().parents[1]
# Put <ROOT>/src at the front of sys.path so the project imports that follow
# resolve from the source tree (presumably where `uipath_explainator` lives).
sys.path.insert(0, str(ROOT / "src"))
# Minimal stand-in for the `dotenv` package: its `load_dotenv` accepts any
# arguments and does nothing.
fake_dotenv = ModuleType("dotenv")
fake_dotenv.load_dotenv = lambda *args, **kwargs: None
# setdefault registers the stub only when no `dotenv` entry already exists in
# sys.modules; a `dotenv` module that was imported earlier is left untouched.
sys.modules.setdefault("dotenv", fake_dotenv)
from uipath_explainator.config import Settings
from uipath_explainator.gemini import AnalysisError, GeminiAnalyzer
class FakeHttpOptions:
    """Test double installed as ``types.HttpOptions`` on the fake ``google.genai`` module.

    It performs no validation; it only records the keyword arguments it was
    constructed with so tests can assert on them.
    """

    def __init__(self, **kwargs: object) -> None:
        """Store every keyword argument verbatim in ``self.kwargs``."""
        self.kwargs = kwargs
class FakeGenerateContentConfig:
    """Test double installed as ``types.GenerateContentConfig`` on the fake ``google.genai`` module.

    It performs no validation; it only records the keyword arguments it was
    constructed with so tests can inspect them.
    """

    def __init__(self, **kwargs: object) -> None:
        """Store every keyword argument verbatim in ``self.kwargs``."""
        self.kwargs = kwargs
class FakeClient:
    """Test double installed as ``Client`` on the fake ``google.genai`` module.

    Attributes are plain recordings of the constructor arguments, plus an empty
    ``models`` namespace onto which individual tests attach whatever callables
    they need (for example ``generate_content``).
    """

    def __init__(self, api_key: str, http_options: FakeHttpOptions) -> None:
        """Record the credentials and HTTP options without contacting any service.

        Args:
            api_key: API key the code under test passed to the client.
            http_options: Options object the code under test passed to the client.
        """
        self.api_key = api_key
        self.http_options = http_options
        # Starts empty on purpose: tests assign attributes such as
        # ``models.generate_content`` per scenario.
        self.models = SimpleNamespace()
class GeminiAnalyzerTests(unittest.TestCase):
    """Tests for ``GeminiAnalyzer`` that run against a stubbed ``google.genai`` SDK.

    Every test needs the same fake ``google`` / ``google.genai`` modules, so the
    fixture is built once per test in ``setUp`` rather than repeated in each
    test body.
    """

    def setUp(self) -> None:
        """Install the fake ``google`` and ``google.genai`` modules for one test."""
        self.fake_types = SimpleNamespace(
            HttpOptions=FakeHttpOptions,
            GenerateContentConfig=FakeGenerateContentConfig,
        )
        fake_genai = ModuleType("google.genai")
        fake_genai.Client = FakeClient
        fake_genai.types = self.fake_types
        fake_google = ModuleType("google")
        fake_google.genai = fake_genai

        # start() + addCleanup(stop) keeps the fake modules in sys.modules for
        # the whole test (construction and any later call on the analyzer) and
        # restores the previous sys.modules contents afterwards, even when the
        # test fails.
        patcher = patch.dict(
            sys.modules,
            {"google": fake_google, "google.genai": fake_genai},
        )
        patcher.start()
        self.addCleanup(patcher.stop)

    def _make_analyzer(self, base_url: str | None = None) -> GeminiAnalyzer:
        """Build a ``GeminiAnalyzer`` with the shared test key and model name.

        Args:
            base_url: Value for ``Settings.base_url``; ``None`` means no custom endpoint.
        """
        return GeminiAnalyzer(
            Settings(api_key="test-key", base_url=base_url, model="gemini-test")
        )

    def test_init_with_slots_declares_runtime_fields(self) -> None:
        """Construction exposes ``_types`` and ``_client`` built from the fake SDK."""
        analyzer = self._make_analyzer()

        self.assertIs(analyzer._types, self.fake_types)
        self.assertIsInstance(analyzer._client, FakeClient)
        self.assertEqual(analyzer._client.api_key, "test-key")
        self.assertEqual(analyzer._client.http_options.kwargs, {"timeout": 120_000})

    def test_init_strips_version_suffix_from_custom_base_url(self) -> None:
        """A custom base URL is passed to the client without its ``/v1beta/`` suffix."""
        analyzer = self._make_analyzer(base_url="https://newapi.tootaio.com/v1beta/")

        self.assertEqual(
            analyzer._client.http_options.kwargs,
            {"base_url": "https://newapi.tootaio.com", "timeout": 120_000},
        )

    def test_markdown_output_uses_consulting_format(self) -> None:
        """``_to_markdown`` renders the expected section headings and step title."""
        analyzer = self._make_analyzer()

        analysis = {
            "overview": {
                "purpose": "负责调用 VBA 并准备执行上下文",
                "role": "主流程中的子流程节点",
                "trigger": "由上游工作流通过 Invoke Workflow 调用",
            },
            "logic": {
                "steps": [
                    {
                        "title": "读取 VBA 文件路径",
                        "detail": "从 `CodeFilePath` 读取脚本位置。",
                        "why": "定位需要执行的 VBA 代码。",
                        "result": "得到待执行的脚本文件。",
                    }
                ],
                "decision_logic": ["如果 `CodeFilePath` 为空,则无法继续执行脚本。"],
                "exceptions": ["当前文件未展示脚本执行失败后的补偿逻辑。"],
            },
            "data": {
                "inputs": ["`Scripts/Keep.bas` 路径"],
                "outputs": ["VBA 执行结果未直接在当前文件中落盘"],
                "variables": ["`CodeFilePath`"],
                "external_dependencies": ["外部 VBA 文件 `Scripts/Keep.bas`"],
            },
            "consultation": {
                "business_meaning": "这是把业务动作下沉到 VBA 的桥接层。",
                "risks": ["脚本文件缺失会导致执行失败。"],
                "example": "例如:财务流程在这里调用 Excel VBA 完成批量格式整理。",
                "unknowns": ["无法从当前文件确定 VBA 内部实现逻辑。"],
            },
        }
        markdown = analyzer._to_markdown(Path("Flows/Active.xaml"), analysis)

        expected_fragments = (
            "## 文件定位",
            "## 流程拆解",
            "1. **读取 VBA 文件路径**",
            "### 输入",
            "## 咨询视角",
            "## 场景范例",
        )
        for fragment in expected_fragments:
            # subTest reports every missing fragment instead of stopping at the first.
            with self.subTest(fragment=fragment):
                self.assertIn(fragment, markdown)

    def test_prompt_requires_strict_structured_json(self) -> None:
        """``_build_prompt`` contains the strict-JSON and structure instructions."""
        analyzer = self._make_analyzer()

        prompt = analyzer._build_prompt(Path("main.xaml"), "")

        self.assertIn("请严格返回 JSON", prompt)
        self.assertIn("先讲这个文件在整个流程中的定位", prompt)
        self.assertIn("判断逻辑、调用链、输入输出、关键变量、外部依赖", prompt)

    def test_analyze_wraps_rate_limit_error_with_clear_message(self) -> None:
        """A 429 from the SDK surfaces as ``AnalysisError`` with actionable details."""
        analyzer = self._make_analyzer()

        error = RuntimeError("quota exceeded")
        error.status_code = 429
        error.response_json = {
            "error": {"message": "Resource has been exhausted (e.g. check quota)."}
        }

        def raise_rate_limit(**_: object) -> None:
            """Stand in for ``generate_content`` and fail with the prepared error."""
            raise error

        analyzer._client.models.generate_content = raise_rate_limit

        with self.assertRaises(AnalysisError) as captured:
            analyzer.analyze(Path("main.xaml"), "")

        message = str(captured.exception)
        self.assertIn("HTTP 429", message)
        self.assertIn("Resource has been exhausted", message)
        self.assertIn("--skip-analysis", message)
if __name__ == "__main__":
    # Run the tests in this module when the file is executed directly.
    unittest.main()