Files
sundynix-agentix/sundynix-mcp-go/internal/mcp/gateway.go
T
Blizzard 2d5fd2fca5 feat: 实时入库监控 + 向量拆分可视化(异步入库 + 进度 SSE)
入库从同步改为异步流水线 + 进度回流(复用 token 流 NATS streaming)。
UI 实时看到 解析→切块→向量化(分批)→写入 各阶段 + 拆分块预览。

- shared: contract.IngestEvent(stage/done/total/chunks/error)
- mcp-go: rag.Ingest 加 onProgress + 分批向量化(10/批)逐批回报;kb_ingest 带 job_id
  把进度发到 sundynix.streams.<job_id> + CompleteStream
- gateway: 入库异步返回 job_id,后台 runIngest 发进度;GET /kb/ingest/:id/stream SSE
- frontend: streamIngest(EventSource);KbView 实时进度面板(阶段徽标+进度条+拆分列表)
- 验证: build✓+e2e PASS; 浏览器 12 行→6 阶段点亮+进度条 12/12+拆分 12 块逐条

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-11 10:33:36 +08:00

194 lines
7.2 KiB
Go

// Package mcp 实现 MCP 协议网关,把工具注册到 NATS 并响应调用。
package mcp
import (
"context"
"encoding/json"
"fmt"
"log"
"strings"
sharedbus "github.com/sundynix/sundynix-shared/bus"
"github.com/sundynix/sundynix-shared/contract"
"github.com/sundynix/sundynix-mcp-go/internal/history"
"github.com/sundynix/sundynix-mcp-go/internal/memory"
"github.com/sundynix/sundynix-mcp-go/internal/rag"
"github.com/sundynix/sundynix-mcp-go/internal/search"
)
// Gateway exposes the MCP tool endpoints: it registers a handler on the
// shared bus for the Go tool subjects (sundynix.tools.go.*) and answers
// incoming tool calls by delegating to the backends below.
type Gateway struct {
// bus is the shared message bus. It is used to register the tool handler
// and to publish ingest progress events / stream completion.
bus *sharedbus.Bus
// search is the hybrid search backend.
// NOTE(review): no handler visible in this part of the file reads this
// field — confirm whether it is still needed.
search *search.Hybrid
// memory backs the memory_get and memory_upsert tools.
memory *memory.Store
// history backs the history_get and history_append tools.
history *history.Store
// rag backs the wiki_search, kb_search and kb_ingest tools.
rag *rag.Engine
}
// NewGateway returns a Gateway wired to the given bus and backends.
// The arguments are stored as-is; nothing is validated or started here.
func NewGateway(b *sharedbus.Bus, s *search.Hybrid, m *memory.Store, h *history.Store, r *rag.Engine) *Gateway {
	gw := new(Gateway)
	gw.bus = b
	gw.search = s
	gw.memory = m
	gw.history = h
	gw.rag = r
	return gw
}
// Serve registers dispatch as the tool handler for
// contract.SubjectToolsGoAll under the contract.QueueToolsGo queue
// (presumably a queue-group wildcard subscription on sundynix.tools.go.>),
// then blocks until ctx is done.
//
// If registration fails, that error is returned immediately. Otherwise the
// subscription is released on the way out and ctx.Err() is returned.
func (g *Gateway) Serve(ctx context.Context) error {
	subject, queue := contract.SubjectToolsGoAll, contract.QueueToolsGo

	stop, err := g.bus.ServeTool(subject, queue, g.dispatch)
	if err != nil {
		return err
	}
	// The unsubscribe result is deliberately dropped: we are shutting down
	// and there is nothing useful to do with a failure at this point.
	defer func() {
		_ = stop()
	}()

	log.Printf("[mcp_go] tools ready on %s (queue=%s): wiki_search, kb_ingest, kb_search, memory_get, memory_upsert, history_get, history_append, echo",
		subject, queue)

	// Park here until the caller cancels.
	<-ctx.Done()
	return ctx.Err()
}
// dispatch routes a tool call to its implementation based on call.Tool.
//
// Every call is logged (tool name, task id, raw args) before routing. The
// "echo" tool is answered inline with the formatted "text" argument; an
// unrecognised tool name yields a failed result naming the tool.
func (g *Gateway) dispatch(ctx context.Context, call *contract.ToolCall) *contract.ToolResult {
	log.Printf("[mcp_go] tool=%s task=%s args=%v", call.Tool, call.TaskID, call.Args)

	// Pick the handler first, invoke it once at the bottom.
	var handle func(context.Context, *contract.ToolCall) *contract.ToolResult
	switch name := call.Tool; name {
	case "wiki_search":
		handle = g.wikiSearch
	case "kb_ingest":
		handle = g.kbIngest
	case "kb_search":
		handle = g.kbSearch
	case "memory_get":
		handle = g.memoryGet
	case "memory_upsert":
		handle = g.memoryUpsert
	case "history_get":
		handle = g.historyGet
	case "history_append":
		handle = g.historyAppend
	case "echo":
		// No backend involved: reflect the "text" argument back.
		return &contract.ToolResult{OK: true, Content: fmt.Sprint(call.Args["text"])}
	default:
		return &contract.ToolResult{OK: false, Error: "unknown tool: " + name}
	}
	return handle(ctx, call)
}
// memoryGet recalls the persistent profile of the user named by the
// "user_id" argument. Per the original note, the store returns the profile
// already rendered as multi-line text suitable for injecting into a prompt.
//
// A missing or non-string "user_id" is passed to the store as "".
// Store errors are reported as a failed result prefixed with "memory_get: ".
func (g *Gateway) memoryGet(ctx context.Context, call *contract.ToolCall) *contract.ToolResult {
	userID, _ := call.Args["user_id"].(string)

	rendered, err := g.memory.Get(ctx, userID)
	if err == nil {
		return &contract.ToolResult{OK: true, Content: rendered}
	}
	return &contract.ToolResult{OK: false, Error: "memory_get: " + err.Error()}
}
// historyGet recalls the stored turns of the session named by the
// "session_id" argument and returns them JSON-encoded in Content. Per the
// original note the payload is an array [{role,content},...] in
// chronological order (most recent rounds only — the window is decided by
// the history store, not here).
//
// A missing or non-string "session_id" is passed to the store as "".
// Store and encoding errors are reported as a failed result prefixed with
// "history_get: ".
func (g *Gateway) historyGet(ctx context.Context, call *contract.ToolCall) *contract.ToolResult {
	session, _ := call.Args["session_id"].(string)

	turns, err := g.history.Get(ctx, session)
	if err != nil {
		return &contract.ToolResult{OK: false, Error: "history_get: " + err.Error()}
	}

	data, err := json.Marshal(turns)
	if err != nil {
		// Previously ignored: a marshal failure would have produced an
		// "OK" result with empty Content.
		return &contract.ToolResult{OK: false, Error: "history_get: encode turns: " + err.Error()}
	}
	// encoding/json renders a nil slice as "null"; normalise so that Content
	// is always a JSON array, as the contract above promises.
	if string(data) == "null" {
		data = []byte("[]")
	}
	return &contract.ToolResult{OK: true, Content: string(data)}
}
// historyAppend appends one message to a session's history.
//
// Arguments (all read as strings from call.Args):
//   - "session_id": required
//   - "role":       required
//   - "content":    optional, defaults to ""
//
// Returns a failed result when a required argument is empty or when the
// store rejects the write; otherwise an OK result with no content.
func (g *Gateway) historyAppend(ctx context.Context, call *contract.ToolCall) *contract.ToolResult {
	args := call.Args
	sessionID, _ := args["session_id"].(string)
	role, _ := args["role"].(string)
	body, _ := args["content"].(string)

	fail := func(msg string) *contract.ToolResult {
		return &contract.ToolResult{OK: false, Error: "history_append: " + msg}
	}

	if len(sessionID) == 0 || len(role) == 0 {
		return fail("session_id 和 role 必填")
	}

	err := g.history.Append(ctx, sessionID, role, body)
	if err != nil {
		return fail(err.Error())
	}
	return &contract.ToolResult{OK: true}
}
// memoryUpsert writes or updates one profile preference for a user.
//
// Arguments (all read as strings from call.Args):
//   - "user_id": required
//   - "key":     required
//   - "value":   optional, defaults to ""
//
// Returns a failed result when a required argument is empty or when the
// store rejects the write; otherwise an OK result with a confirmation text.
func (g *Gateway) memoryUpsert(ctx context.Context, call *contract.ToolCall) *contract.ToolResult {
	args := call.Args
	userID, _ := args["user_id"].(string)
	prefKey, _ := args["key"].(string)
	prefVal, _ := args["value"].(string)

	fail := func(msg string) *contract.ToolResult {
		return &contract.ToolResult{OK: false, Error: "memory_upsert: " + msg}
	}

	if len(userID) == 0 || len(prefKey) == 0 {
		return fail("user_id 和 key 必填")
	}

	err := g.memory.Upsert(ctx, userID, prefKey, prefVal)
	if err != nil {
		return fail(err.Error())
	}

	confirmation := fmt.Sprintf("已记住 %s 的「%s」", userID, prefKey)
	return &contract.ToolResult{OK: true, Content: confirmation}
}
// wikiSearch runs a retrieval query through the RAG engine and renders the
// hits as a numbered, human-readable list (score + text per line).
//
// Arguments read from call.Args:
//   - "q":    query string
//   - "kb":   knowledge-base name
//   - "topK": number of hits to request; defaults to 5 when absent, not a
//     number, or less than 1
//
// When the RAG engine reports it is not ready, the call degrades to an OK
// result carrying an explanatory message instead of failing, so the calling
// graph is not interrupted. Search errors are reported as a failed result
// prefixed with "wiki_search: ".
func (g *Gateway) wikiSearch(ctx context.Context, call *contract.ToolCall) *contract.ToolResult {
	q, _ := call.Args["q"].(string)
	kb, _ := call.Args["kb"].(string)

	topK := 5
	// Require at least 1: a fractional value such as 0.5 used to pass the
	// "> 0" check and then truncate to 0, silently defeating the default.
	if v, ok := call.Args["topK"].(float64); ok && v >= 1 {
		// Converting an out-of-range float to int is implementation-dependent
		// and may come back non-positive; keep the default in that case.
		if n := int(v); n >= 1 {
			topK = n
		}
	}

	if !g.rag.Ready() {
		return &contract.ToolResult{OK: true, Content: "[wiki_search] RAG 未配置(需 embedding + Milvus),无召回"}
	}

	hits, err := g.rag.Search(ctx, kb, q, topK)
	if err != nil {
		return &contract.ToolResult{OK: false, Error: "wiki_search: " + err.Error()}
	}

	var b strings.Builder
	fmt.Fprintf(&b, "[wiki_search] 命中 %d 条(Milvus 向量检索):\n", len(hits))
	for i, h := range hits {
		fmt.Fprintf(&b, "%d. (%.3f) %s\n", i+1, h.Score, h.Text)
	}
	// Drop the trailing newline left by the last line written.
	return &contract.ToolResult{OK: true, Content: strings.TrimRight(b.String(), "\n")}
}
// kbSearch is the retrieval-console variant of wikiSearch: it returns the
// hits as structured JSON (per the original note, [{text,score},...]) so the
// console can display scores.
//
// Arguments read from call.Args:
//   - "q":    query string
//   - "kb":   knowledge-base name
//   - "topK": number of hits to request; defaults to 5 when absent, not a
//     number, or less than 1
//
// Content is always a JSON array: "[]" when the RAG engine is not ready or
// when there are no hits. Search and encoding errors are reported as a
// failed result prefixed with "kb_search: ".
func (g *Gateway) kbSearch(ctx context.Context, call *contract.ToolCall) *contract.ToolResult {
	q, _ := call.Args["q"].(string)
	kb, _ := call.Args["kb"].(string)

	topK := 5
	// Require at least 1: a fractional value such as 0.5 used to pass the
	// "> 0" check and then truncate to 0, silently defeating the default.
	if v, ok := call.Args["topK"].(float64); ok && v >= 1 {
		// Converting an out-of-range float to int is implementation-dependent
		// and may come back non-positive; keep the default in that case.
		if n := int(v); n >= 1 {
			topK = n
		}
	}

	if !g.rag.Ready() {
		return &contract.ToolResult{OK: true, Content: "[]"}
	}

	hits, err := g.rag.Search(ctx, kb, q, topK)
	if err != nil {
		return &contract.ToolResult{OK: false, Error: "kb_search: " + err.Error()}
	}

	data, err := json.Marshal(hits)
	if err != nil {
		// Previously ignored: a marshal failure would have produced an
		// "OK" result with empty (non-JSON) Content.
		return &contract.ToolResult{OK: false, Error: "kb_search: encode hits: " + err.Error()}
	}
	// encoding/json renders a nil slice as "null"; normalise so an empty
	// result has the same shape ("[]") as the not-ready branch above.
	if string(data) == "null" {
		data = []byte("[]")
	}
	return &contract.ToolResult{OK: true, Content: string(data)}
}
// kbIngest ingests text into a knowledge base via the RAG engine (per the
// original note: chunking → embedding → Milvus + Bleve).
//
// Arguments read from call.Args:
//   - "kb":     knowledge-base name
//   - "text":   text to ingest; required
//   - "job_id": optional; when set, progress events are published as JSON
//     on the stream for that job (sundynix.streams.<job_id> per the original
//     note) so a UI can monitor ingestion live
//
// When "job_id" is set, every exit path publishes a terminal event (stage
// "失败" with the error, or "完成" with the chunk count) and then completes
// the stream, so a subscriber is never left waiting. Streaming is
// best-effort: publish/complete failures are logged and do not affect the
// tool result.
//
// Returns a failed result prefixed with "kb_ingest: " when text is empty or
// ingestion fails; otherwise an OK result stating how many chunks were stored.
func (g *Gateway) kbIngest(ctx context.Context, call *contract.ToolCall) *contract.ToolResult {
	kb, _ := call.Args["kb"].(string)
	text, _ := call.Args["text"].(string)
	jobID, _ := call.Args["job_id"].(string)

	// onProgress stays nil without a job id; it is handed to rag.Ingest as-is,
	// exactly as before.
	var onProgress func(contract.IngestEvent)
	if jobID != "" {
		onProgress = func(ev contract.IngestEvent) {
			data, err := json.Marshal(ev)
			if err != nil {
				log.Printf("[mcp_go] kb_ingest job=%s: encode progress event: %v", jobID, err)
				return
			}
			if err := g.bus.PublishToken(jobID, data); err != nil {
				log.Printf("[mcp_go] kb_ingest job=%s: publish progress event: %v", jobID, err)
			}
		}
	}

	// finish emits the terminal event and closes the job's stream.
	// It is a no-op when the call is not bound to a job.
	finish := func(ev contract.IngestEvent) {
		if jobID == "" {
			return
		}
		onProgress(ev)
		if err := g.bus.CompleteStream(jobID); err != nil {
			log.Printf("[mcp_go] kb_ingest job=%s: complete stream: %v", jobID, err)
		}
	}

	if text == "" {
		// This early return used to skip the terminal event and stream
		// completion, unlike the ingestion-failure path below.
		finish(contract.IngestEvent{Stage: "失败", Error: "text 必填"})
		return &contract.ToolResult{OK: false, Error: "kb_ingest: text 必填"}
	}

	n, err := g.rag.Ingest(ctx, kb, text, onProgress)
	if err != nil {
		finish(contract.IngestEvent{Stage: "失败", Error: err.Error()})
		return &contract.ToolResult{OK: false, Error: "kb_ingest: " + err.Error()}
	}

	finish(contract.IngestEvent{Stage: "完成", Done: n, Total: n, Msg: fmt.Sprintf("已入库 %d 块", n)})
	return &contract.ToolResult{OK: true, Content: fmt.Sprintf("已入库 %d 块到知识库 %q", n, kb)}
}