// Package mcp 实现 MCP 协议网关,把工具注册到 NATS 并响应调用。 package mcp import ( "context" "encoding/json" "fmt" "log" "os" "path/filepath" "strings" sharedbus "github.com/sundynix/sundynix-shared/bus" "github.com/sundynix/sundynix-shared/contract" "github.com/sundynix/sundynix-mcp-go/internal/history" "github.com/sundynix/sundynix-mcp-go/internal/memory" "github.com/sundynix/sundynix-mcp-go/internal/office" "github.com/sundynix/sundynix-mcp-go/internal/rag" "github.com/sundynix/sundynix-mcp-go/internal/search" ) // Gateway 暴露 MCP 协议端点,经共享 bus 订阅 sundynix.tools.go.* 响应调用。 type Gateway struct { bus *sharedbus.Bus search *search.Hybrid memory *memory.Store history *history.Store rag *rag.Engine } func NewGateway(b *sharedbus.Bus, s *search.Hybrid, m *memory.Store, h *history.Store, r *rag.Engine) *Gateway { return &Gateway{bus: b, search: s, memory: m, history: h, rag: r} } // Serve 以队列组通配订阅 sundynix.tools.go.>,按工具名分发并阻塞。 func (g *Gateway) Serve(ctx context.Context) error { unsub, err := g.bus.ServeTool(contract.SubjectToolsGoAll, contract.QueueToolsGo, g.dispatch) if err != nil { return err } defer func() { _ = unsub() }() log.Printf("[mcp_go] tools ready on %s (queue=%s): wiki_search, kb_ingest, kb_search, kb_graph, report_render, memory_*, history_*, echo", contract.SubjectToolsGoAll, contract.QueueToolsGo) <-ctx.Done() return ctx.Err() } // dispatch 按 ToolCall.Tool 路由到具体工具实现。 func (g *Gateway) dispatch(ctx context.Context, call *contract.ToolCall) *contract.ToolResult { log.Printf("[mcp_go] tool=%s task=%s args=%v", call.Tool, call.TaskID, call.Args) switch call.Tool { case "wiki_search": return g.wikiSearch(ctx, call) case "kb_ingest": return g.kbIngest(ctx, call) case "kb_search": return g.kbSearch(ctx, call) case "kb_graph": return g.kbGraph(ctx, call) case "report_render": return g.reportRender(ctx, call) case "report_store": return g.reportStore(ctx, call) case "report_export": return g.reportExport(ctx, call) case "external_api": return g.externalAPI(ctx, call) case "health": data, _ := json.Marshal(g.rag.Status()) return &contract.ToolResult{OK: true, Content: string(data)} case "memory_get": return g.memoryGet(ctx, call) case "memory_upsert": return g.memoryUpsert(ctx, call) case "history_get": return g.historyGet(ctx, call) case "history_append": return g.historyAppend(ctx, call) case "echo": return &contract.ToolResult{OK: true, Content: fmt.Sprint(call.Args["text"])} default: return &contract.ToolResult{OK: false, Error: "unknown tool: " + call.Tool} } } // memoryGet 召回某用户的常驻画像(已渲染为可注入 prompt 的多行文本)。 func (g *Gateway) memoryGet(ctx context.Context, call *contract.ToolCall) *contract.ToolResult { uid, _ := call.Args["user_id"].(string) profile, err := g.memory.Get(ctx, uid) if err != nil { return &contract.ToolResult{OK: false, Error: "memory_get: " + err.Error()} } return &contract.ToolResult{OK: true, Content: profile} } // historyGet 召回某会话最近多轮历史,Content 为 JSON 数组 [{role,content},...](正序)。 func (g *Gateway) historyGet(ctx context.Context, call *contract.ToolCall) *contract.ToolResult { session, _ := call.Args["session_id"].(string) turns, err := g.history.Get(ctx, session) if err != nil { return &contract.ToolResult{OK: false, Error: "history_get: " + err.Error()} } data, _ := json.Marshal(turns) return &contract.ToolResult{OK: true, Content: string(data)} } // historyAppend 追加一条会话消息(session_id + role + content)。 func (g *Gateway) historyAppend(ctx context.Context, call *contract.ToolCall) *contract.ToolResult { session, _ := call.Args["session_id"].(string) role, _ := call.Args["role"].(string) content, _ := call.Args["content"].(string) if session == "" || role == "" { return &contract.ToolResult{OK: false, Error: "history_append: session_id 和 role 必填"} } if err := g.history.Append(ctx, session, role, content); err != nil { return &contract.ToolResult{OK: false, Error: "history_append: " + err.Error()} } return &contract.ToolResult{OK: true} } // memoryUpsert 写入/更新一条画像偏好(user_id + key + value)。 func (g *Gateway) memoryUpsert(ctx context.Context, call *contract.ToolCall) *contract.ToolResult { uid, _ := call.Args["user_id"].(string) key, _ := call.Args["key"].(string) val, _ := call.Args["value"].(string) if uid == "" || key == "" { return &contract.ToolResult{OK: false, Error: "memory_upsert: user_id 和 key 必填"} } if err := g.memory.Upsert(ctx, uid, key, val); err != nil { return &contract.ToolResult{OK: false, Error: "memory_upsert: " + err.Error()} } return &contract.ToolResult{OK: true, Content: fmt.Sprintf("已记住 %s 的「%s」", uid, key)} } // wikiSearch 经 RAG 引擎做向量检索(embedding + Milvus)。 // RAG 未就绪时降级返回空命中(不阻断图执行)。 func (g *Gateway) wikiSearch(ctx context.Context, call *contract.ToolCall) *contract.ToolResult { q, _ := call.Args["q"].(string) kb, _ := call.Args["kb"].(string) topK := 5 if v, ok := call.Args["topK"].(float64); ok && v > 0 { topK = int(v) } if !g.rag.Ready() { return &contract.ToolResult{OK: true, Content: "[wiki_search] RAG 未配置(需 embedding + Milvus),无召回"} } hits, err := g.rag.Search(ctx, kb, q, topK) if err != nil { return &contract.ToolResult{OK: false, Error: "wiki_search: " + err.Error()} } var b strings.Builder fmt.Fprintf(&b, "[wiki_search] 命中 %d 条(Milvus 向量检索):\n", len(hits)) for i, h := range hits { fmt.Fprintf(&b, "%d. (%.3f) %s\n", i+1, h.Score, h.Text) } return &contract.ToolResult{OK: true, Content: strings.TrimRight(b.String(), "\n")} } // kbSearch 检索台用:返回结构化命中 JSON [{text,score},...](供检索台展示分数)。 func (g *Gateway) kbSearch(ctx context.Context, call *contract.ToolCall) *contract.ToolResult { q, _ := call.Args["q"].(string) kb, _ := call.Args["kb"].(string) topK := 5 if v, ok := call.Args["topK"].(float64); ok && v > 0 { topK = int(v) } if !g.rag.Ready() { return &contract.ToolResult{OK: true, Content: "[]"} } hits, err := g.rag.Search(ctx, kb, q, topK) if err != nil { return &contract.ToolResult{OK: false, Error: "kb_search: " + err.Error()} } data, _ := json.Marshal(hits) return &contract.ToolResult{OK: true, Content: string(data)} } // kbGraph 返回某知识库的图谱三元组 JSON [{s,p,o},...](供 UI 可视化 Neo4j 情况)。 func (g *Gateway) kbGraph(ctx context.Context, call *contract.ToolCall) *contract.ToolResult { kb, _ := call.Args["kb"].(string) limit := 100 if v, ok := call.Args["limit"].(float64); ok && v > 0 { limit = int(v) } triples := g.rag.Triples(ctx, kb, limit) data, _ := json.Marshal(triples) return &contract.ToolResult{OK: true, Content: string(data)} } // reportRender 把结构化报告(title + sections[{heading,body}])渲染为真实 .docx, // 落盘到 contract.ReportPath(task_id),返回绝对路径供 Gateway 提供下载。 func (g *Gateway) reportRender(ctx context.Context, call *contract.ToolCall) *contract.ToolResult { title, _ := call.Args["title"].(string) id, _ := call.Args["task_id"].(string) if id == "" { id = call.TaskID } if id == "" { return &contract.ToolResult{OK: false, Error: "report_render: task_id 必填"} } // sections 经 NATS JSON 透传,统一 re-marshal 再解出强类型。 var secs []office.Section if raw, err := json.Marshal(call.Args["sections"]); err == nil { _ = json.Unmarshal(raw, &secs) } data, err := office.NewRenderer().RenderReport(ctx, title, secs) if err != nil { return &contract.ToolResult{OK: false, Error: "report_render: " + err.Error()} } path := contract.ReportPath(id) if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { return &contract.ToolResult{OK: false, Error: "report_render: mkdir " + err.Error()} } if err := os.WriteFile(path, data, 0o644); err != nil { return &contract.ToolResult{OK: false, Error: "report_render: write " + err.Error()} } log.Printf("[mcp_go] report_render 已生成 %s (%d 字节, %d 章节)", path, len(data), len(secs)) return &contract.ToolResult{OK: true, Content: path} } // reportSource 是报告的可序列化源数据(标题 + 章节),导出时据此渲染各格式。 type reportSource struct { Title string `json:"title"` Sections []office.Section `json:"sections"` } // reportStore 把报告源数据(title + sections)落盘为 JSON,供导出时按需渲染 Word/PDF/Markdown。 // 生成阶段只存源、不渲染("导出时再处理")。 func (g *Gateway) reportStore(_ context.Context, call *contract.ToolCall) *contract.ToolResult { id, _ := call.Args["task_id"].(string) if id == "" { id = call.TaskID } if id == "" { return &contract.ToolResult{OK: false, Error: "report_store: task_id 必填"} } title, _ := call.Args["title"].(string) var secs []office.Section if raw, err := json.Marshal(call.Args["sections"]); err == nil { _ = json.Unmarshal(raw, &secs) } data, _ := json.Marshal(reportSource{Title: title, Sections: secs}) path := contract.ReportSourcePath(id) if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { return &contract.ToolResult{OK: false, Error: "report_store: mkdir " + err.Error()} } if err := os.WriteFile(path, data, 0o644); err != nil { return &contract.ToolResult{OK: false, Error: "report_store: write " + err.Error()} } log.Printf("[mcp_go] report_store 已存源 %s (%d 章节)", path, len(secs)) return &contract.ToolResult{OK: true, Content: path} } // reportExport 按需把已存报告源渲染为指定格式: // docx → 渲染并落盘,返回 .docx 路径;md → 返回 Markdown 文本。 func (g *Gateway) reportExport(ctx context.Context, call *contract.ToolCall) *contract.ToolResult { id, _ := call.Args["task_id"].(string) if id == "" { id = call.TaskID } if id == "" { return &contract.ToolResult{OK: false, Error: "report_export: task_id 必填"} } format, _ := call.Args["format"].(string) raw, err := os.ReadFile(contract.ReportSourcePath(id)) if err != nil { return &contract.ToolResult{OK: false, Error: "report_export: 报告尚未生成或已过期"} } var src reportSource if err := json.Unmarshal(raw, &src); err != nil { return &contract.ToolResult{OK: false, Error: "report_export: 源解析失败"} } switch format { case "md", "markdown": return &contract.ToolResult{OK: true, Content: reportMarkdown(src)} default: // docx data, rerr := office.NewRenderer().RenderReport(ctx, src.Title, src.Sections) if rerr != nil { return &contract.ToolResult{OK: false, Error: "report_export: " + rerr.Error()} } path := contract.ReportPath(id) if err := os.WriteFile(path, data, 0o644); err != nil { return &contract.ToolResult{OK: false, Error: "report_export: write " + err.Error()} } log.Printf("[mcp_go] report_export 已渲染 docx %s (%d 字节)", path, len(data)) return &contract.ToolResult{OK: true, Content: path} } } // reportMarkdown 把报告源拼为 Markdown(标题 + 各章 ## 小标题 + 正文)。 func reportMarkdown(src reportSource) string { var b strings.Builder if src.Title != "" { b.WriteString("# " + src.Title + "\n\n") } for _, s := range src.Sections { if s.Heading != "" { b.WriteString("## " + s.Heading + "\n\n") } b.WriteString(strings.TrimSpace(s.Body) + "\n\n") } return b.String() } // kbIngest 把文本入库(切块→embedding→Milvus+Bleve)。 // 带 job_id 时逐阶段把进度发到 sundynix.streams.,供 UI 实时入库监控。 func (g *Gateway) kbIngest(ctx context.Context, call *contract.ToolCall) *contract.ToolResult { kb, _ := call.Args["kb"].(string) doc, _ := call.Args["doc"].(string) text, _ := call.Args["text"].(string) jobID, _ := call.Args["job_id"].(string) if text == "" { return &contract.ToolResult{OK: false, Error: "kb_ingest: text 必填"} } var onProgress func(contract.IngestEvent) if jobID != "" { onProgress = func(ev contract.IngestEvent) { if data, err := json.Marshal(ev); err == nil { _ = g.bus.PublishToken(jobID, data) } } } n, err := g.rag.Ingest(ctx, kb, doc, text, onProgress) if jobID != "" { if err != nil { onProgress(contract.IngestEvent{Stage: "失败", Error: err.Error()}) } else { onProgress(contract.IngestEvent{Stage: "完成", Done: n, Total: n, Msg: fmt.Sprintf("已入库 %d 块", n)}) } _ = g.bus.CompleteStream(jobID) } if err != nil { return &contract.ToolResult{OK: false, Error: "kb_ingest: " + err.Error()} } return &contract.ToolResult{OK: true, Content: fmt.Sprintf("已入库 %d 块到知识库 %q", n, kb)} }