From adc521f94ddc0053e3a94d4b2d3457f5ee5f1031 Mon Sep 17 00:00:00 2001 From: Blizzard Date: Wed, 10 Jun 2026 11:31:58 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=89=93=E9=80=9A=20Dispatcher?= =?UTF-8?q?=E2=86=92MCP=20=E5=B7=A5=E5=85=B7=E8=B0=83=E7=94=A8=E9=93=BE?= =?UTF-8?q?=E8=B7=AF=20(core=20NATS=20request-reply)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 第 4 层 Dispatcher 经 NATS request-reply + 队列组同步调用第 5 层 MCP 工具, 工具不可用/超时即降级,不阻断主流程。 - shared/contract: ToolCall/ToolResult + sundynix.tools.go.* subject 约定 + ToolSubjectGo/Py - shared/bus: CallTool(发起) / ServeTool(队列组订阅+应答) - mcp-go: 接共享 bus,gateway 通配订阅按工具名分发(wiki_search/echo),main 优雅退出 - dispatcher: ToolCaller 接口 + Orchestrator.retrieveContext(调 wiki_search,超时3s降级) - e2e: TestToolCallRoundTrip(PASS);demo.sh 加 mcp-go(就绪门避免启动竞态),live 跑通 Co-Authored-By: Claude Opus 4.8 --- README.md | 6 +- scripts/demo.sh | 18 +++-- sundynix-dispatcher/cmd/dispatcher/main.go | 4 +- .../internal/eino/orchestrator.go | 47 +++++++++++- .../internal/nats/subscriber.go | 5 ++ sundynix-mcp-go/cmd/server/main.go | 33 +++++++-- sundynix-mcp-go/go.mod | 15 ++-- sundynix-mcp-go/go.sum | 24 ++++++- sundynix-mcp-go/internal/mcp/gateway.go | 71 ++++++++++++++++--- sundynix-shared/bus/bus.go | 50 +++++++++++++ sundynix-shared/bus/bus_e2e_test.go | 47 ++++++++++++ sundynix-shared/contract/task.go | 27 +++++++ 12 files changed, 315 insertions(+), 32 deletions(-) diff --git a/README.md b/README.md index 1061401..29323ef 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ 1. `Gateway` 解析 DSL → **Publish** `sundynix.tasks.*` (NATS Queue) 2. `Dispatcher` 订阅任务 → `Eino` 图编排 → 调用 `LLM Pool` -3. 经 NATS 调用第 5 层 `MCP Tools` +3. 经 NATS **request-reply** 同步调用第 5 层 `MCP Tools`(`sundynix.tools.go.*` 队列组负载均衡,工具不可用即降级) 4. 结果以零拷贝 Token Stream 经 `sundynix.streams.task_id` 回流 → SSE/WS 推给 `Client` ## 快速开始 @@ -27,8 +27,8 @@ ### 无 Docker — 一键验证任务流(推荐先跑这个) ```bash -make demo # 内嵌NATS + Gateway + Dispatcher,提交一个 DSL 任务,看 Dispatcher 消费到 -make e2e # 仅跑共享 bus 的端到端测试(go test,内嵌 NATS) +make demo # 内嵌NATS + Gateway + Dispatcher + MCP-Go,提交 DSL 任务,看任务流+工具调用+Token流闭环 +make e2e # 仅跑共享 bus 的端到端测试(go test,内嵌 NATS):任务流 / 工具调用 / Token 流 ``` `make demo` 实测输出: diff --git a/scripts/demo.sh b/scripts/demo.sh index 210fd50..83bc3f2 100755 --- a/scripts/demo.sh +++ b/scripts/demo.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash -# 无 Docker 的最小任务流演示:devnats(内嵌NATS) + gateway + dispatcher。 -# 提交一个 DSL 任务,验证 Gateway → NATS → Dispatcher 全链路。 +# 无 Docker 的最小任务流演示:devnats(内嵌NATS) + gateway + dispatcher + mcp-go。 +# 提交一个 DSL 任务,验证 Gateway → NATS → Dispatcher → MCP 工具 全链路。 set -euo pipefail cd "$(dirname "$0")/.." @@ -9,8 +9,9 @@ echo "== 编译 ==" ( cd sundynix-shared && go build -o ../.bin/devnats ./cmd/devnats ) ( cd sundynix-gateway && go build -o ../.bin/gateway ./cmd/server ) ( cd sundynix-dispatcher && go build -o ../.bin/dispatcher ./cmd/dispatcher ) +( cd sundynix-mcp-go && go build -o ../.bin/mcp-go ./cmd/server ) -cleanup() { kill "${GW_PID:-}" "${DISP_PID:-}" "${NATS_PID:-}" 2>/dev/null || true; } +cleanup() { kill "${GW_PID:-}" "${DISP_PID:-}" "${MCP_PID:-}" "${NATS_PID:-}" 2>/dev/null || true; } trap cleanup EXIT # 若 :4222 已有 NATS(docker compose 的容器),直接复用;否则起内嵌 devnats。 @@ -22,7 +23,8 @@ else for _ in $(seq 1 30); do nc -z 127.0.0.1 4222 2>/dev/null && break || sleep 0.2; done fi -echo "== 启动 dispatcher / gateway ==" +echo "== 启动 mcp-go / dispatcher / gateway ==" +.bin/mcp-go > .bin/mcp-go.log 2>&1 & MCP_PID=$! .bin/dispatcher > .bin/dispatcher.log 2>&1 & DISP_PID=$! .bin/gateway > .bin/gateway.log 2>&1 & GW_PID=$! @@ -30,6 +32,11 @@ for _ in $(seq 1 30); do curl -s -o /dev/null http://127.0.0.1:8080/api/v1/billing && break || sleep 0.3 done +# 等 mcp-go 订阅就绪后再提交,否则工具调用会撞上启动竞态而降级。 +for _ in $(seq 1 30); do + grep -q "tools ready" .bin/mcp-go.log 2>/dev/null && break || sleep 0.1 +done + echo "== 提交 DSL 任务 ==" RESP=$(curl -s -X POST http://127.0.0.1:8080/api/v1/tasks \ -H 'Content-Type: application/json' \ @@ -42,5 +49,8 @@ echo "== 订阅 SSE Token 流 (Gateway ← NATS ← Dispatcher) ==" curl -sN --max-time 10 "http://127.0.0.1:8080/api/v1/tasks/$TASK_ID/stream" || true echo +echo "== mcp-go 日志 (工具被调用) ==" +cat .bin/mcp-go.log + echo "== dispatcher 日志 ==" cat .bin/dispatcher.log diff --git a/sundynix-dispatcher/cmd/dispatcher/main.go b/sundynix-dispatcher/cmd/dispatcher/main.go index 2a2184a..1870795 100644 --- a/sundynix-dispatcher/cmd/dispatcher/main.go +++ b/sundynix-dispatcher/cmd/dispatcher/main.go @@ -23,8 +23,8 @@ func main() { sub := dnats.MustConnect(natsURL) defer sub.Close() - // sub 同时作为 Token 回流出口(TokenSink)。 - orch := eino.NewOrchestrator(pool, breaker, sub) + // sub 同时作为 Token 回流出口(TokenSink)与 MCP 工具调用出口(ToolCaller)。 + orch := eino.NewOrchestrator(pool, breaker, sub, sub) // 监听退出信号,优雅停止消费。 ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) diff --git a/sundynix-dispatcher/internal/eino/orchestrator.go b/sundynix-dispatcher/internal/eino/orchestrator.go index 9b49f29..4e35b83 100644 --- a/sundynix-dispatcher/internal/eino/orchestrator.go +++ b/sundynix-dispatcher/internal/eino/orchestrator.go @@ -4,6 +4,7 @@ package eino import ( "context" "log" + "time" "github.com/sundynix/sundynix-dispatcher/internal/harness" "github.com/sundynix/sundynix-dispatcher/internal/llm" @@ -16,15 +17,24 @@ type TokenSink interface { CompleteStream(taskID string) error } +// ToolCaller 经 NATS 调起第 5 层 MCP 工具(由 NATS bus 实现)。 +type ToolCaller interface { + CallTool(ctx context.Context, subject string, call *contract.ToolCall) (*contract.ToolResult, error) +} + +// 工具调用超时;超时即降级(不带工具上下文继续推理)。 +const toolCallTimeout = 3 * time.Second + // Orchestrator 将 DSL 图编译为 Eino Graph 并驱动执行。 type Orchestrator struct { pool *llm.Pool breaker *harness.CircuitBreaker sink TokenSink + tools ToolCaller } -func NewOrchestrator(pool *llm.Pool, breaker *harness.CircuitBreaker, sink TokenSink) *Orchestrator { - return &Orchestrator{pool: pool, breaker: breaker, sink: sink} +func NewOrchestrator(pool *llm.Pool, breaker *harness.CircuitBreaker, sink TokenSink, tools ToolCaller) *Orchestrator { + return &Orchestrator{pool: pool, breaker: breaker, sink: sink, tools: tools} } // Handle 消费一个任务:编译图 → 流式推理 → 经 sink 把 Token 回流到 sundynix.streams.。 @@ -36,9 +46,14 @@ func (o *Orchestrator) Handle(ctx context.Context, t *contract.Task) error { log.Printf("[eino] task %s received (graph=%d bytes), streaming tokens...", t.ID, len(t.Graph)) // TODO: compose.NewGraph(...) 编译 DSL;此处 prompt 占位为图原文。 - // 工具节点经 NATS 调用第 5 层 MCP(sundynix.tools.go.* / sundynix.tools.py.*)。 prompt := string(t.Graph) + // 工具节点:经 NATS 调用第 5 层 MCP(sundynix.tools.go.*)。 + // 这里以 wiki_search 演示完整调用链路;真实 Eino 图会按 DSL 节点择机调用。 + if ctxNote := o.retrieveContext(ctx, t); ctxNote != "" { + prompt = ctxNote + "\n" + prompt + } + n := 0 err := o.pool.Stream(ctx, prompt, func(tok []byte) { if perr := o.sink.PublishToken(t.ID, tok); perr != nil { @@ -58,3 +73,29 @@ func (o *Orchestrator) Handle(ctx context.Context, t *contract.Task) error { o.breaker.Report(err == nil) return err } + +// retrieveContext 经 MCP wiki_search 工具拉取检索上下文。 +// 工具不可用/超时时返回空串,降级为无工具上下文推理(不阻断主流程)。 +func (o *Orchestrator) retrieveContext(ctx context.Context, t *contract.Task) string { + if o.tools == nil { + return "" + } + cctx, cancel := context.WithTimeout(ctx, toolCallTimeout) + defer cancel() + + res, err := o.tools.CallTool(cctx, contract.ToolSubjectGo("wiki_search"), &contract.ToolCall{ + Tool: "wiki_search", + TaskID: t.ID, + Args: map[string]any{"q": string(t.Graph)}, + }) + if err != nil { + log.Printf("[eino] task %s wiki_search unavailable, degrade: %v", t.ID, err) + return "" + } + if !res.OK { + log.Printf("[eino] task %s wiki_search error: %s", t.ID, res.Error) + return "" + } + log.Printf("[eino] task %s wiki_search ok: %s", t.ID, res.Content) + return res.Content +} diff --git a/sundynix-dispatcher/internal/nats/subscriber.go b/sundynix-dispatcher/internal/nats/subscriber.go index ddad315..8cd71e1 100644 --- a/sundynix-dispatcher/internal/nats/subscriber.go +++ b/sundynix-dispatcher/internal/nats/subscriber.go @@ -53,4 +53,9 @@ func (s *Subscriber) CompleteStream(taskID string) error { return s.inner.CompleteStream(taskID) } +// CallTool 让 Subscriber 满足 eino.ToolCaller,经 NATS request-reply 调起第 5 层 MCP 工具。 +func (s *Subscriber) CallTool(ctx context.Context, subject string, call *contract.ToolCall) (*contract.ToolResult, error) { + return s.inner.CallTool(ctx, subject, call) +} + func (s *Subscriber) Close() { s.inner.Close() } diff --git a/sundynix-mcp-go/cmd/server/main.go b/sundynix-mcp-go/cmd/server/main.go index c0d493c..73d8147 100644 --- a/sundynix-mcp-go/cmd/server/main.go +++ b/sundynix-mcp-go/cmd/server/main.go @@ -2,18 +2,43 @@ package main import ( + "context" "log" + "os" + "os/signal" + "syscall" + + sharedbus "github.com/sundynix/sundynix-shared/bus" "github.com/sundynix/sundynix-mcp-go/internal/mcp" "github.com/sundynix/sundynix-mcp-go/internal/search" ) func main() { - engine := search.NewHybrid() // LLM Wiki 混合检索:Bleve + Milvus + Neo4j - gw := mcp.NewGateway(engine) + natsURL := envOr("NATS_URL", "nats://localhost:4222") - log.Println("[mcp_go] serving MCP over sundynix.tools.go.*") - if err := gw.Serve(); err != nil { + b, err := sharedbus.Connect(natsURL) + if err != nil { + log.Fatalf("[mcp_go] nats connect: %v", err) + } + defer b.Close() + log.Printf("[mcp_go] connected %s", natsURL) + + engine := search.NewHybrid() // LLM Wiki 混合检索:Bleve + Milvus + Neo4j + gw := mcp.NewGateway(b, engine) + + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + log.Println("[mcp_go] serving MCP over sundynix.tools.go.* (Ctrl-C to quit)") + if err := gw.Serve(ctx); err != nil && err != context.Canceled { log.Fatalf("[mcp_go] exit: %v", err) } } + +func envOr(key, def string) string { + if v := os.Getenv(key); v != "" { + return v + } + return def +} diff --git a/sundynix-mcp-go/go.mod b/sundynix-mcp-go/go.mod index dfa14a7..9d636d5 100644 --- a/sundynix-mcp-go/go.mod +++ b/sundynix-mcp-go/go.mod @@ -2,9 +2,16 @@ module github.com/sundynix/sundynix-mcp-go go 1.23 +require github.com/sundynix/sundynix-shared v0.0.0 + require ( - github.com/blevesearch/bleve/v2 v2.4.2 - github.com/milvus-io/milvus-sdk-go/v2 v2.4.1 - github.com/neo4j/neo4j-go-driver/v5 v5.24.0 - github.com/nats-io/nats.go v1.37.0 + github.com/klauspost/compress v1.17.9 // indirect + github.com/nats-io/nats.go v1.37.0 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + golang.org/x/crypto v0.26.0 // indirect + golang.org/x/sys v0.24.0 // indirect + golang.org/x/text v0.17.0 // indirect ) + +replace github.com/sundynix/sundynix-shared => ../sundynix-shared diff --git a/sundynix-mcp-go/go.sum b/sundynix-mcp-go/go.sum index 181e66b..7483077 100644 --- a/sundynix-mcp-go/go.sum +++ b/sundynix-mcp-go/go.sum @@ -1,4 +1,22 @@ -github.com/blevesearch/bleve/v2 v2.4.2/go.mod h1:ATNKj7Yl2oJv/lGuF4kx39bST2dveX6w0th2FFYLkc8= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= +github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= +github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE= +github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= +github.com/nats-io/nats-server/v2 v2.10.20 h1:CXDTYNHeBiAKBTAIP2gjpgbWap2GhATnTLgP8etyvEI= +github.com/nats-io/nats-server/v2 v2.10.20/go.mod h1:hgcPnoUtMfxz1qVOvLZGurVypQ+Cg6GXVXjG53iHk+M= +github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= -github.com/neo4j/neo4j-go-driver/v5 v5.24.0/go.mod h1:Vff8OwT7QpLm7L2yYr85XNWe9Rbqlbeb9asNXJTHO4k= -github.com/qdrant/go-client v1.11.0/go.mod h1:j+OVRsJIZhOSRK2toPl8tTBOhwr4AxXCz9RACzv0JB4= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= +golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U= +golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= diff --git a/sundynix-mcp-go/internal/mcp/gateway.go b/sundynix-mcp-go/internal/mcp/gateway.go index 987dbf4..6ad77d9 100644 --- a/sundynix-mcp-go/internal/mcp/gateway.go +++ b/sundynix-mcp-go/internal/mcp/gateway.go @@ -1,18 +1,71 @@ // Package mcp 实现 MCP 协议网关,把工具注册到 NATS 并响应调用。 package mcp -import "github.com/sundynix/sundynix-mcp-go/internal/search" +import ( + "context" + "fmt" + "log" -// Gateway 暴露 MCP 协议端点(stdio / HTTP / NATS)。 + sharedbus "github.com/sundynix/sundynix-shared/bus" + "github.com/sundynix/sundynix-shared/contract" + + "github.com/sundynix/sundynix-mcp-go/internal/search" +) + +// Gateway 暴露 MCP 协议端点,经共享 bus 订阅 sundynix.tools.go.* 响应调用。 type Gateway struct { + bus *sharedbus.Bus search *search.Hybrid } -func NewGateway(s *search.Hybrid) *Gateway { return &Gateway{search: s} } - -// Serve 监听 sundynix.tools.go.* 并按 MCP 协议分发工具调用。 -func (g *Gateway) Serve() error { - // TODO: 注册工具清单 (wiki_search / render_doc / call_external_api ...) - // 订阅 NATS,按 MCP JSON-RPC 解析并路由 - select {} +func NewGateway(b *sharedbus.Bus, s *search.Hybrid) *Gateway { + return &Gateway{bus: b, search: s} +} + +// Serve 以队列组通配订阅 sundynix.tools.go.>,按工具名分发并阻塞。 +func (g *Gateway) Serve(ctx context.Context) error { + unsub, err := g.bus.ServeTool(contract.SubjectToolsGoAll, contract.QueueToolsGo, g.dispatch) + if err != nil { + return err + } + defer func() { _ = unsub() }() + log.Printf("[mcp_go] tools ready on %s (queue=%s): wiki_search, echo", + contract.SubjectToolsGoAll, contract.QueueToolsGo) + <-ctx.Done() + return ctx.Err() +} + +// dispatch 按 ToolCall.Tool 路由到具体工具实现。 +func (g *Gateway) dispatch(ctx context.Context, call *contract.ToolCall) *contract.ToolResult { + log.Printf("[mcp_go] tool=%s task=%s args=%v", call.Tool, call.TaskID, call.Args) + switch call.Tool { + case "wiki_search": + return g.wikiSearch(ctx, call) + case "echo": + return &contract.ToolResult{OK: true, Content: fmt.Sprint(call.Args["text"])} + default: + return &contract.ToolResult{OK: false, Error: "unknown tool: " + call.Tool} + } +} + +// wikiSearch 调 Hybrid 混合检索引擎。引擎目前为桩(返回空), +// 这里仍把调用链路做真:真实接入 Bleve/Milvus/Neo4j 后无需改动协议。 +func (g *Gateway) wikiSearch(ctx context.Context, call *contract.ToolCall) *contract.ToolResult { + q, _ := call.Args["q"].(string) + results, err := g.search.Query(ctx, q, 5) + if err != nil { + return &contract.ToolResult{OK: false, Error: "wiki_search: " + err.Error()} + } + return &contract.ToolResult{ + OK: true, + Content: fmt.Sprintf("[wiki_search] 命中 %d 条(Bleve+Milvus+Neo4j 混合检索桩)查询=%q", len(results), preview(q)), + } +} + +func preview(s string) string { + r := []rune(s) + if len(r) > 40 { + return string(r[:40]) + "…" + } + return s } diff --git a/sundynix-shared/bus/bus.go b/sundynix-shared/bus/bus.go index 9e7645a..f63273d 100644 --- a/sundynix-shared/bus/bus.go +++ b/sundynix-shared/bus/bus.go @@ -4,6 +4,7 @@ package bus import ( "context" + "encoding/json" "fmt" "time" @@ -133,6 +134,55 @@ func (b *Bus) SubscribeTokens(taskID string, onToken func([]byte), onDone func() return sub.Unsubscribe, nil } +// ---- MCP 工具调用(core NATS request-reply)---- + +// CallTool 同步调用一个 MCP 工具:发到 subject,阻塞等待应答。 +// ctx 超时即视为工具不可用,由调用方决定降级。 +func (b *Bus) CallTool(ctx context.Context, subject string, call *contract.ToolCall) (*contract.ToolResult, error) { + data, err := json.Marshal(call) + if err != nil { + return nil, fmt.Errorf("marshal tool call: %w", err) + } + msg, err := b.nc.RequestWithContext(ctx, subject, data) + if err != nil { + return nil, fmt.Errorf("call tool %s: %w", subject, err) + } + var res contract.ToolResult + if err := json.Unmarshal(msg.Data, &res); err != nil { + return nil, fmt.Errorf("unmarshal tool result: %w", err) + } + return &res, nil +} + +// ToolHandler 处理一次工具调用并返回结果。 +type ToolHandler func(ctx context.Context, call *contract.ToolCall) *contract.ToolResult + +// ServeTool 以队列组订阅工具主题(可用通配 sundynix.tools.go.>), +// 对每个请求调用 h 并 Respond,队列组内多副本自动负载均衡。 +// 返回的 unsub 用于退订。 +func (b *Bus) ServeTool(subject, queue string, h ToolHandler) (unsub func() error, err error) { + sub, err := b.nc.QueueSubscribe(subject, queue, func(m *nats.Msg) { + var call contract.ToolCall + if err := json.Unmarshal(m.Data, &call); err != nil { + respond(m, &contract.ToolResult{OK: false, Error: "bad tool call: " + err.Error()}) + return + } + respond(m, h(context.Background(), &call)) + }) + if err != nil { + return nil, fmt.Errorf("serve tool %s: %w", subject, err) + } + return sub.Unsubscribe, nil +} + +func respond(m *nats.Msg, res *contract.ToolResult) { + data, err := json.Marshal(res) + if err != nil { + data, _ = json.Marshal(&contract.ToolResult{OK: false, Error: "marshal result: " + err.Error()}) + } + _ = m.Respond(data) +} + // TaskHandler 处理一个消费到的任务。 type TaskHandler func(ctx context.Context, t *contract.Task) error diff --git a/sundynix-shared/bus/bus_e2e_test.go b/sundynix-shared/bus/bus_e2e_test.go index 8dd1541..bed00f8 100644 --- a/sundynix-shared/bus/bus_e2e_test.go +++ b/sundynix-shared/bus/bus_e2e_test.go @@ -93,6 +93,53 @@ func TestTaskRoundTrip(t *testing.T) { } } +// TestToolCallRoundTrip 模拟 Dispatcher 经 NATS 调用 → mcp-go 响应 的工具调用闭环。 +func TestToolCallRoundTrip(t *testing.T) { + url := startEmbeddedNATS(t) + + // --- mcp-go 侧:以队列组订阅工具主题并响应 --- + srv, err := bus.Connect(url) + if err != nil { + t.Fatalf("mcp connect: %v", err) + } + defer srv.Close() + + unsub, err := srv.ServeTool(contract.SubjectToolsGoAll, contract.QueueToolsGo, + func(_ context.Context, call *contract.ToolCall) *contract.ToolResult { + if call.Tool != "wiki_search" { + return &contract.ToolResult{OK: false, Error: "unknown tool"} + } + return &contract.ToolResult{OK: true, Content: "命中:" + call.Args["q"].(string)} + }) + if err != nil { + t.Fatalf("serve tool: %v", err) + } + defer func() { _ = unsub() }() + + // --- Dispatcher 侧:同步调用工具 --- + dp, err := bus.Connect(url) + if err != nil { + t.Fatalf("dispatcher connect: %v", err) + } + defer dp.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + res, err := dp.CallTool(ctx, contract.ToolSubjectGo("wiki_search"), &contract.ToolCall{ + Tool: "wiki_search", + TaskID: "task_tool_001", + Args: map[string]any{"q": "向量检索"}, + }) + if err != nil { + t.Fatalf("call tool: %v", err) + } + if !res.OK || res.Content != "命中:向量检索" { + t.Fatalf("tool result = %+v, want ok content=命中:向量检索", res) + } + t.Logf("✓ 工具调用闭环:Dispatcher → sundynix.tools.go.wiki_search → mcp-go → %q", res.Content) +} + // TestTokenStreamRoundTrip 模拟 Dispatcher 回流 Token → Gateway 订阅 的流式闭环。 func TestTokenStreamRoundTrip(t *testing.T) { url := startEmbeddedNATS(t) diff --git a/sundynix-shared/contract/task.go b/sundynix-shared/contract/task.go index 60c6478..1a16670 100644 --- a/sundynix-shared/contract/task.go +++ b/sundynix-shared/contract/task.go @@ -15,6 +15,15 @@ const ( // HeaderStreamEnd 是 Token 流的结束信号(core NATS 消息头)。 // 置为 "1" 的消息体为空,表示该 task 的 Token 流结束。 HeaderStreamEnd = "X-Stream-End" + + // MCP 工具调用约定(第 4 层 Dispatcher → 第 5 层 MCP Tools)。 + // 用 core NATS request-reply:同步拿结果,队列组内负载均衡。 + SubjectToolsGo = "sundynix.tools.go" // Go I/O 型工具前缀;实际 sundynix.tools.go. + SubjectToolsGoAll = "sundynix.tools.go.>" // mcp-go 通配订阅 + SubjectToolsPy = "sundynix.tools.py" // Python 算法型工具前缀;实际 sundynix.tools.py. + SubjectToolsPyAll = "sundynix.tools.py.>" // mcp-py 通配订阅 + QueueToolsGo = "mcp-go-workers" // mcp-go 队列组(多副本负载均衡) + QueueToolsPy = "mcp-py-workers" // mcp-py 队列组 ) // Task 是 DSL 解析组装后的可调度任务,在 NATS 上以 JSON 传输。 @@ -30,6 +39,24 @@ func TaskSubject(id string) string { return SubjectTasks + "." + id } // StreamSubject 返回某任务的 Token 回流主题。 func StreamSubject(id string) string { return SubjectStream + "." + id } +// ToolSubjectGo / ToolSubjectPy 返回某工具的调用主题。 +func ToolSubjectGo(tool string) string { return SubjectToolsGo + "." + tool } +func ToolSubjectPy(tool string) string { return SubjectToolsPy + "." + tool } + +// ToolCall 是 Dispatcher 对一个 MCP 工具的调用请求(NATS request 体)。 +type ToolCall struct { + Tool string `json:"tool"` // 工具名,如 wiki_search + Args map[string]any `json:"args,omitempty"` // 工具参数 + TaskID string `json:"task_id,omitempty"` // 触发该调用的任务(便于追踪) +} + +// ToolResult 是 MCP 工具的应答(NATS reply 体)。 +type ToolResult struct { + OK bool `json:"ok"` + Content string `json:"content,omitempty"` // 工具产出(如检索结果文本) + Error string `json:"error,omitempty"` // 非空表示工具内部出错 +} + // Marshal / Unmarshal 便捷方法。 func (t *Task) Marshal() ([]byte, error) { return json.Marshal(t) } func Unmarshal(b []byte) (*Task, error) {