feat: 打通 Dispatcher→MCP 工具调用链路 (core NATS request-reply)

第 4 层 Dispatcher 经 NATS request-reply + 队列组同步调用第 5 层 MCP 工具,
工具不可用/超时即降级,不阻断主流程。

- shared/contract: ToolCall/ToolResult + sundynix.tools.go.* subject 约定 + ToolSubjectGo/Py
- shared/bus: CallTool(发起) / ServeTool(队列组订阅+应答)
- mcp-go: 接共享 bus,gateway 通配订阅按工具名分发(wiki_search/echo),main 优雅退出
- dispatcher: ToolCaller 接口 + Orchestrator.retrieveContext(调 wiki_search,超时3s降级)
- e2e: TestToolCallRoundTrip(PASS);demo.sh 加 mcp-go(就绪门避免启动竞态),live 跑通

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Blizzard
2026-06-10 11:31:58 +08:00
parent 61337b1920
commit adc521f94d
12 changed files with 315 additions and 32 deletions
+50
View File
@@ -4,6 +4,7 @@ package bus
import (
"context"
"encoding/json"
"fmt"
"time"
@@ -133,6 +134,55 @@ func (b *Bus) SubscribeTokens(taskID string, onToken func([]byte), onDone func()
return sub.Unsubscribe, nil
}
// ---- MCP 工具调用(core NATS request-reply----
// CallTool 同步调用一个 MCP 工具:发到 subject,阻塞等待应答。
// ctx 超时即视为工具不可用,由调用方决定降级。
func (b *Bus) CallTool(ctx context.Context, subject string, call *contract.ToolCall) (*contract.ToolResult, error) {
data, err := json.Marshal(call)
if err != nil {
return nil, fmt.Errorf("marshal tool call: %w", err)
}
msg, err := b.nc.RequestWithContext(ctx, subject, data)
if err != nil {
return nil, fmt.Errorf("call tool %s: %w", subject, err)
}
var res contract.ToolResult
if err := json.Unmarshal(msg.Data, &res); err != nil {
return nil, fmt.Errorf("unmarshal tool result: %w", err)
}
return &res, nil
}
// ToolHandler 处理一次工具调用并返回结果。
type ToolHandler func(ctx context.Context, call *contract.ToolCall) *contract.ToolResult
// ServeTool 以队列组订阅工具主题(可用通配 sundynix.tools.go.>),
// 对每个请求调用 h 并 Respond,队列组内多副本自动负载均衡。
// 返回的 unsub 用于退订。
func (b *Bus) ServeTool(subject, queue string, h ToolHandler) (unsub func() error, err error) {
sub, err := b.nc.QueueSubscribe(subject, queue, func(m *nats.Msg) {
var call contract.ToolCall
if err := json.Unmarshal(m.Data, &call); err != nil {
respond(m, &contract.ToolResult{OK: false, Error: "bad tool call: " + err.Error()})
return
}
respond(m, h(context.Background(), &call))
})
if err != nil {
return nil, fmt.Errorf("serve tool %s: %w", subject, err)
}
return sub.Unsubscribe, nil
}
func respond(m *nats.Msg, res *contract.ToolResult) {
data, err := json.Marshal(res)
if err != nil {
data, _ = json.Marshal(&contract.ToolResult{OK: false, Error: "marshal result: " + err.Error()})
}
_ = m.Respond(data)
}
// TaskHandler 处理一个消费到的任务。
type TaskHandler func(ctx context.Context, t *contract.Task) error
+47
View File
@@ -93,6 +93,53 @@ func TestTaskRoundTrip(t *testing.T) {
}
}
// TestToolCallRoundTrip 模拟 Dispatcher 经 NATS 调用 → mcp-go 响应 的工具调用闭环。
func TestToolCallRoundTrip(t *testing.T) {
url := startEmbeddedNATS(t)
// --- mcp-go 侧:以队列组订阅工具主题并响应 ---
srv, err := bus.Connect(url)
if err != nil {
t.Fatalf("mcp connect: %v", err)
}
defer srv.Close()
unsub, err := srv.ServeTool(contract.SubjectToolsGoAll, contract.QueueToolsGo,
func(_ context.Context, call *contract.ToolCall) *contract.ToolResult {
if call.Tool != "wiki_search" {
return &contract.ToolResult{OK: false, Error: "unknown tool"}
}
return &contract.ToolResult{OK: true, Content: "命中:" + call.Args["q"].(string)}
})
if err != nil {
t.Fatalf("serve tool: %v", err)
}
defer func() { _ = unsub() }()
// --- Dispatcher 侧:同步调用工具 ---
dp, err := bus.Connect(url)
if err != nil {
t.Fatalf("dispatcher connect: %v", err)
}
defer dp.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
res, err := dp.CallTool(ctx, contract.ToolSubjectGo("wiki_search"), &contract.ToolCall{
Tool: "wiki_search",
TaskID: "task_tool_001",
Args: map[string]any{"q": "向量检索"},
})
if err != nil {
t.Fatalf("call tool: %v", err)
}
if !res.OK || res.Content != "命中:向量检索" {
t.Fatalf("tool result = %+v, want ok content=命中:向量检索", res)
}
t.Logf("✓ 工具调用闭环:Dispatcher → sundynix.tools.go.wiki_search → mcp-go → %q", res.Content)
}
// TestTokenStreamRoundTrip 模拟 Dispatcher 回流 Token → Gateway 订阅 的流式闭环。
func TestTokenStreamRoundTrip(t *testing.T) {
url := startEmbeddedNATS(t)
+27
View File
@@ -15,6 +15,15 @@ const (
// HeaderStreamEnd 是 Token 流的结束信号(core NATS 消息头)。
// 置为 "1" 的消息体为空,表示该 task 的 Token 流结束。
HeaderStreamEnd = "X-Stream-End"
// MCP 工具调用约定(第 4 层 Dispatcher → 第 5 层 MCP Tools)。
// 用 core NATS request-reply:同步拿结果,队列组内负载均衡。
SubjectToolsGo = "sundynix.tools.go" // Go I/O 型工具前缀;实际 sundynix.tools.go.<tool>
SubjectToolsGoAll = "sundynix.tools.go.>" // mcp-go 通配订阅
SubjectToolsPy = "sundynix.tools.py" // Python 算法型工具前缀;实际 sundynix.tools.py.<tool>
SubjectToolsPyAll = "sundynix.tools.py.>" // mcp-py 通配订阅
QueueToolsGo = "mcp-go-workers" // mcp-go 队列组(多副本负载均衡)
QueueToolsPy = "mcp-py-workers" // mcp-py 队列组
)
// Task 是 DSL 解析组装后的可调度任务,在 NATS 上以 JSON 传输。
@@ -30,6 +39,24 @@ func TaskSubject(id string) string { return SubjectTasks + "." + id }
// StreamSubject 返回某任务的 Token 回流主题。
func StreamSubject(id string) string { return SubjectStream + "." + id }
// ToolSubjectGo / ToolSubjectPy 返回某工具的调用主题。
func ToolSubjectGo(tool string) string { return SubjectToolsGo + "." + tool }
func ToolSubjectPy(tool string) string { return SubjectToolsPy + "." + tool }
// ToolCall 是 Dispatcher 对一个 MCP 工具的调用请求(NATS request 体)。
type ToolCall struct {
Tool string `json:"tool"` // 工具名,如 wiki_search
Args map[string]any `json:"args,omitempty"` // 工具参数
TaskID string `json:"task_id,omitempty"` // 触发该调用的任务(便于追踪)
}
// ToolResult 是 MCP 工具的应答(NATS reply 体)。
type ToolResult struct {
OK bool `json:"ok"`
Content string `json:"content,omitempty"` // 工具产出(如检索结果文本)
Error string `json:"error,omitempty"` // 非空表示工具内部出错
}
// Marshal / Unmarshal 便捷方法。
func (t *Task) Marshal() ([]byte, error) { return json.Marshal(t) }
func Unmarshal(b []byte) (*Task, error) {