From a67604f4b7b706be3c8fca57c7f2029769bf4c64 Mon Sep 17 00:00:00 2001 From: Blizzard Date: Wed, 10 Jun 2026 12:58:38 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20mcp-py=20=E6=8E=A5=E5=85=A5=E5=B7=A5?= =?UTF-8?q?=E5=85=B7=E6=80=BB=E7=BA=BF=20sundynix.tools.py.*=20(=E4=B8=8E?= =?UTF-8?q?=20Go=20=E5=90=8C=E5=A5=91=E7=BA=A6)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 第 5b 层 Python 算法工具挂上 NATS,core NATS request-reply + 队列组, 与 Go 侧 ServeTool 字节级同契约({tool,args,task_id}/{ok,content,error})。 - mcp_gateway: nats-py 连接(无限重连) + queue subscribe(mcp-py-workers) + 按工具名路由 工具 echo / run_code(Docker桩) / parse_document(MinerU桩) / secure_sandbox(gVisor桩) - Makefile: 新增 mcp-py-setup(venv + pip install -e .),mcp-py 缺 venv 自动 setup e2e 补上 TestToolCallRoundTrip - 验证: live 对 NATS 容器跑通 4 类调用(含未知工具错误);3 个 Go e2e PASS Co-Authored-By: Claude Opus 4.8 --- Makefile | 14 ++- .../src/sundynix_mcp_py/mcp_gateway.py | 102 ++++++++++++++++-- 2 files changed, 104 insertions(+), 12 deletions(-) diff --git a/Makefile b/Makefile index fee9a3a..1209e40 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: infra infra-down devnats demo e2e gateway dispatcher mcp-go mcp-py desktop tidy +.PHONY: infra infra-down devnats demo e2e gateway dispatcher mcp-go mcp-py mcp-py-setup desktop tidy infra: ## 启动基础设施 (NATS / Postgres / Redis / Milvus / Neo4j) docker compose up -d @@ -12,8 +12,8 @@ devnats: ## 启动内嵌 JetStream NATS(无 Docker 本地联调) demo: ## 一键演示 Gateway→NATS→Dispatcher 任务流(无需 Docker) bash scripts/demo.sh -e2e: ## 跑共享 bus 的端到端测试(内嵌 NATS) - cd sundynix-shared && go test ./bus/ -run 'TestTaskRoundTrip|TestTokenStreamRoundTrip' -v +e2e: ## 跑共享 bus 的端到端测试(内嵌 NATS):任务流 / 工具调用 / Token 流 + cd sundynix-shared && go test ./bus/ -run 'TestTaskRoundTrip|TestToolCallRoundTrip|TestTokenStreamRoundTrip' -v gateway: cd sundynix-gateway && go run ./cmd/server @@ -24,8 +24,12 @@ dispatcher: mcp-go: cd sundynix-mcp-go && go run ./cmd/server -mcp-py: - cd sundynix-mcp-py && python -m sundynix_mcp_py.main +mcp-py-setup: ## 创建 venv 并安装 mcp-py 依赖(首次运行 mcp-py 前执行) + cd sundynix-mcp-py && python3 -m venv .venv && .venv/bin/pip install -q -e . + +mcp-py: ## 运行 Python 算法型 MCP 工具服务(缺 venv 则自动 setup) + cd sundynix-mcp-py && [ -x .venv/bin/python ] || $(MAKE) mcp-py-setup + cd sundynix-mcp-py && .venv/bin/python -m sundynix_mcp_py.main desktop: cd sundynix-desktop && wails dev diff --git a/sundynix-mcp-py/src/sundynix_mcp_py/mcp_gateway.py b/sundynix-mcp-py/src/sundynix_mcp_py/mcp_gateway.py index cd6e60b..7f52edc 100644 --- a/sundynix-mcp-py/src/sundynix_mcp_py/mcp_gateway.py +++ b/sundynix-mcp-py/src/sundynix_mcp_py/mcp_gateway.py @@ -1,24 +1,112 @@ -"""MCP 协议网关:注册算法型工具并经 NATS 分发调用。""" +"""MCP 协议网关:注册算法型工具并经 NATS 分发调用。 + +与 Go 侧 sundynix-shared/bus 的 ServeTool 同契约(core NATS request-reply): + 请求体 {"tool": str, "args": {...}, "task_id": str} + 应答体 {"ok": bool, "content": str, "error": str} +订阅 sundynix.tools.py.>,队列组 mcp-py-workers(多副本负载均衡)。 +""" from __future__ import annotations +import asyncio +import json +import logging +import os + +import nats + from .interpreter import CodeInterpreter from .mineru import MultimodalParser from .sandbox import SecureSandbox +log = logging.getLogger("mcp_py") + +# 与 contract.SubjectToolsPyAll / QueueToolsPy 保持一致。 +SUBJECT_PY_ALL = "sundynix.tools.py.>" +QUEUE_PY = "mcp-py-workers" + class McpGateway: def __init__(self) -> None: self.sandbox = SecureSandbox() # gVisor / KataVM + Static Code Guard self.parser = MultimodalParser() # MinerU / PaddleOCR self.interpreter = CodeInterpreter() # Docker 隔离沙箱 + self._nc: nats.NATS | None = None + self._tools: dict[str, callable] = {} async def register_tools(self) -> None: - # TODO: 向 MCP server 注册工具 schema - ... - - async def serve(self) -> None: - # TODO: 连接 NATS,订阅 sundynix.tools.py.*,按 MCP JSON-RPC 路由 - import asyncio + """注册工具名 → 处理协程。""" + self._tools = { + "echo": self._echo, + "run_code": self._run_code, + "parse_document": self._parse_document, + "secure_sandbox": self._secure_sandbox, + } + async def serve(self, url: str | None = None) -> None: + url = url or os.getenv("NATS_URL", "nats://localhost:4222") + # 容忍服务先于 NATS 启动:无限重连 + 等待间隔。 + self._nc = await nats.connect( + url, + allow_reconnect=True, + max_reconnect_attempts=-1, + reconnect_time_wait=1, + connect_timeout=5, + ) + log.info("[mcp_py] connected %s", url) + await self._nc.subscribe(SUBJECT_PY_ALL, queue=QUEUE_PY, cb=self._on_call) + log.info( + "[mcp_py] tools ready on %s (queue=%s): %s", + SUBJECT_PY_ALL, QUEUE_PY, ", ".join(self._tools), + ) await asyncio.Event().wait() + + async def _on_call(self, msg) -> None: + """解析 ToolCall → 路由到工具 → Respond ToolResult。""" + try: + req = json.loads(msg.data) + except Exception as e: # noqa: BLE001 + await self._reply(msg, ok=False, error=f"bad tool call: {e}") + return + tool = req.get("tool", "") + task_id = req.get("task_id", "") + args = req.get("args") or {} + log.info("[mcp_py] tool=%s task=%s args=%s", tool, task_id, args) + fn = self._tools.get(tool) + if fn is None: + await self._reply(msg, ok=False, error=f"unknown tool: {tool}") + return + try: + content = await fn(args) + await self._reply(msg, ok=True, content=content) + except Exception as e: # noqa: BLE001 + await self._reply(msg, ok=False, error=str(e)) + + @staticmethod + async def _reply(msg, *, ok: bool, content: str = "", error: str = "") -> None: + payload = json.dumps({"ok": ok, "content": content, "error": error}) + await msg.respond(payload.encode()) + + # ---- 工具实现(算法层目前为桩,但调用链路做真)---- + + async def _echo(self, args: dict) -> str: + return str(args.get("text", "")) + + async def _run_code(self, args: dict) -> str: + code = str(args.get("code", "")) + result = await self.interpreter.execute(code) # Docker 隔离沙箱(桩) + return f"[run_code] Docker 隔离执行(桩) stdout={result.get('stdout','')!r}" + + async def _parse_document(self, args: dict) -> str: + path = str(args.get("path", "")) + result = await self.parser.parse(path) # MinerU / PaddleOCR(桩) + return f"[parse_document] MinerU 解析(桩) path={result.get('path','')!r} blocks={len(result.get('blocks', []))}" + + async def _secure_sandbox(self, args: dict) -> str: + code = str(args.get("code", "")) + out = await self.sandbox.run(code) # gVisor/KataVM(桩,含静态守卫) + return f"[secure_sandbox] gVisor 隔离执行(桩) out={out!r}" + + async def close(self) -> None: + if self._nc is not None: + await self._nc.drain()