Files
sundynix-agentix/sundynix-mcp-go/internal/mcp/gateway.go
T
Blizzard 1dd6b0cce3 feat(report): 生成只出 Markdown 预览,导出时再渲染 Word/PDF/Markdown
把"渲染"从生成阶段解耦到导出阶段(导出时再处理):
- 生成阶段:报告正文按 Markdown 流式预览(前端 <Markdown> 已渲染),
  dispatcher 不再 eager 渲染 docx,改为经 mcp-go report_store 落盘报告源(title+sections JSON)。
- 导出阶段(按需现渲染):
  - GET /reports/:id/export?format=docx → mcp-go report_export 读源渲染 .docx;
  - ?format=md → 返回 Markdown 文本;
  - PDF → 前端把已渲染的 Markdown 送进打印视图出 PDF(CJK 零字体依赖)。
- 旧 /reports/:id/download 兼容保留(默认 docx)。

改动:
- contract: ReportSourcePath(id) = <id>.json。
- mcp-go: 新增 report_store / report_export 工具(report_render 保留给 Studio render 节点)。
- dispatcher: handleReport 末尾 renderReport → storeReport。
- gateway: DownloadReport → ExportReport(经 NATS 调 report_export)。
- 前端: ReportView 单个「Word」→「导出」组 Word/PDF/Markdown;
  desktop.printReportHtml 客户端打印;api.reportExportUrl。

实测(docker 全栈 + mcp-go + gateway + dispatcher + DeepSeek 真跑):
- 真实生成「绿茶的功效」18s 完成,report_store 落源(5章, 6280B) ✓
- export md 返回正确 Markdown(# 标题/## 小节/正文) ✓
- export docx 为合法「Microsoft Word 2007+」(含 document.xml/Content_Types) ✓
- 前端 tsc 干净 + 生产构建通过 ✓
(注:发现并修复一处环境问题——mcp-go 启动时若 Milvus 未起会阻塞在
 rag 初始化、永不订阅工具,导致所有 mcp-go 工具"no responders";起全栈后正常。
 报告生成在累积大量未完成 DeepSeek 流连接时会偶发卡顿,干净进程下正常。
 前端导出按钮的实时点击因 React 受控输入自动化限制未在预览中走通,非代码缺陷。)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-17 14:04:06 +08:00

336 lines
13 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package mcp 实现 MCP 协议网关,把工具注册到 NATS 并响应调用。
package mcp
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"path/filepath"
"strings"
sharedbus "github.com/sundynix/sundynix-shared/bus"
"github.com/sundynix/sundynix-shared/contract"
"github.com/sundynix/sundynix-mcp-go/internal/history"
"github.com/sundynix/sundynix-mcp-go/internal/memory"
"github.com/sundynix/sundynix-mcp-go/internal/office"
"github.com/sundynix/sundynix-mcp-go/internal/rag"
"github.com/sundynix/sundynix-mcp-go/internal/search"
)
// Gateway 暴露 MCP 协议端点,经共享 bus 订阅 sundynix.tools.go.* 响应调用。
type Gateway struct {
bus *sharedbus.Bus
search *search.Hybrid
memory *memory.Store
history *history.Store
rag *rag.Engine
}
func NewGateway(b *sharedbus.Bus, s *search.Hybrid, m *memory.Store, h *history.Store, r *rag.Engine) *Gateway {
return &Gateway{bus: b, search: s, memory: m, history: h, rag: r}
}
// Serve 以队列组通配订阅 sundynix.tools.go.>,按工具名分发并阻塞。
func (g *Gateway) Serve(ctx context.Context) error {
unsub, err := g.bus.ServeTool(contract.SubjectToolsGoAll, contract.QueueToolsGo, g.dispatch)
if err != nil {
return err
}
defer func() { _ = unsub() }()
log.Printf("[mcp_go] tools ready on %s (queue=%s): wiki_search, kb_ingest, kb_search, kb_graph, report_render, memory_*, history_*, echo",
contract.SubjectToolsGoAll, contract.QueueToolsGo)
<-ctx.Done()
return ctx.Err()
}
// dispatch 按 ToolCall.Tool 路由到具体工具实现。
func (g *Gateway) dispatch(ctx context.Context, call *contract.ToolCall) *contract.ToolResult {
log.Printf("[mcp_go] tool=%s task=%s args=%v", call.Tool, call.TaskID, call.Args)
switch call.Tool {
case "wiki_search":
return g.wikiSearch(ctx, call)
case "kb_ingest":
return g.kbIngest(ctx, call)
case "kb_search":
return g.kbSearch(ctx, call)
case "kb_graph":
return g.kbGraph(ctx, call)
case "report_render":
return g.reportRender(ctx, call)
case "report_store":
return g.reportStore(ctx, call)
case "report_export":
return g.reportExport(ctx, call)
case "health":
data, _ := json.Marshal(g.rag.Status())
return &contract.ToolResult{OK: true, Content: string(data)}
case "memory_get":
return g.memoryGet(ctx, call)
case "memory_upsert":
return g.memoryUpsert(ctx, call)
case "history_get":
return g.historyGet(ctx, call)
case "history_append":
return g.historyAppend(ctx, call)
case "echo":
return &contract.ToolResult{OK: true, Content: fmt.Sprint(call.Args["text"])}
default:
return &contract.ToolResult{OK: false, Error: "unknown tool: " + call.Tool}
}
}
// memoryGet 召回某用户的常驻画像(已渲染为可注入 prompt 的多行文本)。
func (g *Gateway) memoryGet(ctx context.Context, call *contract.ToolCall) *contract.ToolResult {
uid, _ := call.Args["user_id"].(string)
profile, err := g.memory.Get(ctx, uid)
if err != nil {
return &contract.ToolResult{OK: false, Error: "memory_get: " + err.Error()}
}
return &contract.ToolResult{OK: true, Content: profile}
}
// historyGet 召回某会话最近多轮历史,Content 为 JSON 数组 [{role,content},...](正序)。
func (g *Gateway) historyGet(ctx context.Context, call *contract.ToolCall) *contract.ToolResult {
session, _ := call.Args["session_id"].(string)
turns, err := g.history.Get(ctx, session)
if err != nil {
return &contract.ToolResult{OK: false, Error: "history_get: " + err.Error()}
}
data, _ := json.Marshal(turns)
return &contract.ToolResult{OK: true, Content: string(data)}
}
// historyAppend 追加一条会话消息(session_id + role + content)。
func (g *Gateway) historyAppend(ctx context.Context, call *contract.ToolCall) *contract.ToolResult {
session, _ := call.Args["session_id"].(string)
role, _ := call.Args["role"].(string)
content, _ := call.Args["content"].(string)
if session == "" || role == "" {
return &contract.ToolResult{OK: false, Error: "history_append: session_id 和 role 必填"}
}
if err := g.history.Append(ctx, session, role, content); err != nil {
return &contract.ToolResult{OK: false, Error: "history_append: " + err.Error()}
}
return &contract.ToolResult{OK: true}
}
// memoryUpsert 写入/更新一条画像偏好(user_id + key + value)。
func (g *Gateway) memoryUpsert(ctx context.Context, call *contract.ToolCall) *contract.ToolResult {
uid, _ := call.Args["user_id"].(string)
key, _ := call.Args["key"].(string)
val, _ := call.Args["value"].(string)
if uid == "" || key == "" {
return &contract.ToolResult{OK: false, Error: "memory_upsert: user_id 和 key 必填"}
}
if err := g.memory.Upsert(ctx, uid, key, val); err != nil {
return &contract.ToolResult{OK: false, Error: "memory_upsert: " + err.Error()}
}
return &contract.ToolResult{OK: true, Content: fmt.Sprintf("已记住 %s 的「%s」", uid, key)}
}
// wikiSearch 经 RAG 引擎做向量检索(embedding + Milvus)。
// RAG 未就绪时降级返回空命中(不阻断图执行)。
func (g *Gateway) wikiSearch(ctx context.Context, call *contract.ToolCall) *contract.ToolResult {
q, _ := call.Args["q"].(string)
kb, _ := call.Args["kb"].(string)
topK := 5
if v, ok := call.Args["topK"].(float64); ok && v > 0 {
topK = int(v)
}
if !g.rag.Ready() {
return &contract.ToolResult{OK: true, Content: "[wiki_search] RAG 未配置(需 embedding + Milvus),无召回"}
}
hits, err := g.rag.Search(ctx, kb, q, topK)
if err != nil {
return &contract.ToolResult{OK: false, Error: "wiki_search: " + err.Error()}
}
var b strings.Builder
fmt.Fprintf(&b, "[wiki_search] 命中 %d 条(Milvus 向量检索):\n", len(hits))
for i, h := range hits {
fmt.Fprintf(&b, "%d. (%.3f) %s\n", i+1, h.Score, h.Text)
}
return &contract.ToolResult{OK: true, Content: strings.TrimRight(b.String(), "\n")}
}
// kbSearch 检索台用:返回结构化命中 JSON [{text,score},...](供检索台展示分数)。
func (g *Gateway) kbSearch(ctx context.Context, call *contract.ToolCall) *contract.ToolResult {
q, _ := call.Args["q"].(string)
kb, _ := call.Args["kb"].(string)
topK := 5
if v, ok := call.Args["topK"].(float64); ok && v > 0 {
topK = int(v)
}
if !g.rag.Ready() {
return &contract.ToolResult{OK: true, Content: "[]"}
}
hits, err := g.rag.Search(ctx, kb, q, topK)
if err != nil {
return &contract.ToolResult{OK: false, Error: "kb_search: " + err.Error()}
}
data, _ := json.Marshal(hits)
return &contract.ToolResult{OK: true, Content: string(data)}
}
// kbGraph 返回某知识库的图谱三元组 JSON [{s,p,o},...](供 UI 可视化 Neo4j 情况)。
func (g *Gateway) kbGraph(ctx context.Context, call *contract.ToolCall) *contract.ToolResult {
kb, _ := call.Args["kb"].(string)
limit := 100
if v, ok := call.Args["limit"].(float64); ok && v > 0 {
limit = int(v)
}
triples := g.rag.Triples(ctx, kb, limit)
data, _ := json.Marshal(triples)
return &contract.ToolResult{OK: true, Content: string(data)}
}
// reportRender 把结构化报告(title + sections[{heading,body}])渲染为真实 .docx
// 落盘到 contract.ReportPath(task_id),返回绝对路径供 Gateway 提供下载。
func (g *Gateway) reportRender(ctx context.Context, call *contract.ToolCall) *contract.ToolResult {
title, _ := call.Args["title"].(string)
id, _ := call.Args["task_id"].(string)
if id == "" {
id = call.TaskID
}
if id == "" {
return &contract.ToolResult{OK: false, Error: "report_render: task_id 必填"}
}
// sections 经 NATS JSON 透传,统一 re-marshal 再解出强类型。
var secs []office.Section
if raw, err := json.Marshal(call.Args["sections"]); err == nil {
_ = json.Unmarshal(raw, &secs)
}
data, err := office.NewRenderer().RenderReport(ctx, title, secs)
if err != nil {
return &contract.ToolResult{OK: false, Error: "report_render: " + err.Error()}
}
path := contract.ReportPath(id)
if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
return &contract.ToolResult{OK: false, Error: "report_render: mkdir " + err.Error()}
}
if err := os.WriteFile(path, data, 0o644); err != nil {
return &contract.ToolResult{OK: false, Error: "report_render: write " + err.Error()}
}
log.Printf("[mcp_go] report_render 已生成 %s (%d 字节, %d 章节)", path, len(data), len(secs))
return &contract.ToolResult{OK: true, Content: path}
}
// reportSource 是报告的可序列化源数据(标题 + 章节),导出时据此渲染各格式。
type reportSource struct {
Title string `json:"title"`
Sections []office.Section `json:"sections"`
}
// reportStore 把报告源数据(title + sections)落盘为 JSON,供导出时按需渲染 Word/PDF/Markdown。
// 生成阶段只存源、不渲染("导出时再处理")。
func (g *Gateway) reportStore(_ context.Context, call *contract.ToolCall) *contract.ToolResult {
id, _ := call.Args["task_id"].(string)
if id == "" {
id = call.TaskID
}
if id == "" {
return &contract.ToolResult{OK: false, Error: "report_store: task_id 必填"}
}
title, _ := call.Args["title"].(string)
var secs []office.Section
if raw, err := json.Marshal(call.Args["sections"]); err == nil {
_ = json.Unmarshal(raw, &secs)
}
data, _ := json.Marshal(reportSource{Title: title, Sections: secs})
path := contract.ReportSourcePath(id)
if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
return &contract.ToolResult{OK: false, Error: "report_store: mkdir " + err.Error()}
}
if err := os.WriteFile(path, data, 0o644); err != nil {
return &contract.ToolResult{OK: false, Error: "report_store: write " + err.Error()}
}
log.Printf("[mcp_go] report_store 已存源 %s (%d 章节)", path, len(secs))
return &contract.ToolResult{OK: true, Content: path}
}
// reportExport 按需把已存报告源渲染为指定格式:
// docx → 渲染并落盘,返回 .docx 路径;md → 返回 Markdown 文本。
func (g *Gateway) reportExport(ctx context.Context, call *contract.ToolCall) *contract.ToolResult {
id, _ := call.Args["task_id"].(string)
if id == "" {
id = call.TaskID
}
if id == "" {
return &contract.ToolResult{OK: false, Error: "report_export: task_id 必填"}
}
format, _ := call.Args["format"].(string)
raw, err := os.ReadFile(contract.ReportSourcePath(id))
if err != nil {
return &contract.ToolResult{OK: false, Error: "report_export: 报告尚未生成或已过期"}
}
var src reportSource
if err := json.Unmarshal(raw, &src); err != nil {
return &contract.ToolResult{OK: false, Error: "report_export: 源解析失败"}
}
switch format {
case "md", "markdown":
return &contract.ToolResult{OK: true, Content: reportMarkdown(src)}
default: // docx
data, rerr := office.NewRenderer().RenderReport(ctx, src.Title, src.Sections)
if rerr != nil {
return &contract.ToolResult{OK: false, Error: "report_export: " + rerr.Error()}
}
path := contract.ReportPath(id)
if err := os.WriteFile(path, data, 0o644); err != nil {
return &contract.ToolResult{OK: false, Error: "report_export: write " + err.Error()}
}
log.Printf("[mcp_go] report_export 已渲染 docx %s (%d 字节)", path, len(data))
return &contract.ToolResult{OK: true, Content: path}
}
}
// reportMarkdown 把报告源拼为 Markdown(标题 + 各章 ## 小标题 + 正文)。
func reportMarkdown(src reportSource) string {
var b strings.Builder
if src.Title != "" {
b.WriteString("# " + src.Title + "\n\n")
}
for _, s := range src.Sections {
if s.Heading != "" {
b.WriteString("## " + s.Heading + "\n\n")
}
b.WriteString(strings.TrimSpace(s.Body) + "\n\n")
}
return b.String()
}
// kbIngest 把文本入库(切块→embedding→Milvus+Bleve)。
// 带 job_id 时逐阶段把进度发到 sundynix.streams.<job_id>,供 UI 实时入库监控。
func (g *Gateway) kbIngest(ctx context.Context, call *contract.ToolCall) *contract.ToolResult {
kb, _ := call.Args["kb"].(string)
doc, _ := call.Args["doc"].(string)
text, _ := call.Args["text"].(string)
jobID, _ := call.Args["job_id"].(string)
if text == "" {
return &contract.ToolResult{OK: false, Error: "kb_ingest: text 必填"}
}
var onProgress func(contract.IngestEvent)
if jobID != "" {
onProgress = func(ev contract.IngestEvent) {
if data, err := json.Marshal(ev); err == nil {
_ = g.bus.PublishToken(jobID, data)
}
}
}
n, err := g.rag.Ingest(ctx, kb, doc, text, onProgress)
if jobID != "" {
if err != nil {
onProgress(contract.IngestEvent{Stage: "失败", Error: err.Error()})
} else {
onProgress(contract.IngestEvent{Stage: "完成", Done: n, Total: n, Msg: fmt.Sprintf("已入库 %d 块", n)})
}
_ = g.bus.CompleteStream(jobID)
}
if err != nil {
return &contract.ToolResult{OK: false, Error: "kb_ingest: " + err.Error()}
}
return &contract.ToolResult{OK: true, Content: fmt.Sprintf("已入库 %d 块到知识库 %q", n, kb)}
}