feat: RAG 核心链 — embedding(provider) + Milvus 真连 + 入库/检索
mcp-go 接通向量 RAG:embedding(OpenAI 兼容 provider 抽象) + Milvus 真实连接, kb_ingest 入库、wiki_search 真检索。retriever 节点一行不改即从桩变真。 - mcp-go internal/rag: embed.go(OpenAI 兼容 /embeddings 客户端) + milvus.go(milvus-sdk-go 真连,集合按首次 embedding 维度懒建+AUTOINDEX/COSINE索引+加载,insert/向量search) + rag.go(Engine: 切块→embed→insert / embed query→search;embedding 或 Milvus 缺则降级) - mcp-go gateway: 新工具 kb_ingest,wiki_search 换真(RAG 向量检索,kb 过滤 topK) - mcp-go main: rag.Open 读 MILVUS_ADDR/EMBED_BASE_URL/EMBED_API_KEY/EMBED_MODEL 环境变量 - gateway: POST /api/v1/kb/ingest → kb_ingest(供知识库页/脚本) - scripts/mock_embeddings.py: 确定性词法向量(字+bigram 哈希),无真 key 验证检索 - 开发期 embedding 接在线 API(无真 key 用 mock),见 llm-provider-strategy - 验证: 全模块 build✓ + e2e PASS; live——入库5条→Milvus;retriever 节点查'向量数据库' →召回 Milvus 那条→DeepSeek 答'Milvus';查'知识图谱'→Neo4j(向量检索区分正确) 注: 当前向量单路;Bleve/Neo4j 融合 + rerank + 真实语义 embedding 为后续。 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -6,12 +6,14 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
|
||||
sharedbus "github.com/sundynix/sundynix-shared/bus"
|
||||
"github.com/sundynix/sundynix-shared/contract"
|
||||
|
||||
"github.com/sundynix/sundynix-mcp-go/internal/history"
|
||||
"github.com/sundynix/sundynix-mcp-go/internal/memory"
|
||||
"github.com/sundynix/sundynix-mcp-go/internal/rag"
|
||||
"github.com/sundynix/sundynix-mcp-go/internal/search"
|
||||
)
|
||||
|
||||
@@ -21,10 +23,11 @@ type Gateway struct {
|
||||
search *search.Hybrid
|
||||
memory *memory.Store
|
||||
history *history.Store
|
||||
rag *rag.Engine
|
||||
}
|
||||
|
||||
func NewGateway(b *sharedbus.Bus, s *search.Hybrid, m *memory.Store, h *history.Store) *Gateway {
|
||||
return &Gateway{bus: b, search: s, memory: m, history: h}
|
||||
func NewGateway(b *sharedbus.Bus, s *search.Hybrid, m *memory.Store, h *history.Store, r *rag.Engine) *Gateway {
|
||||
return &Gateway{bus: b, search: s, memory: m, history: h, rag: r}
|
||||
}
|
||||
|
||||
// Serve 以队列组通配订阅 sundynix.tools.go.>,按工具名分发并阻塞。
|
||||
@@ -34,7 +37,7 @@ func (g *Gateway) Serve(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
defer func() { _ = unsub() }()
|
||||
log.Printf("[mcp_go] tools ready on %s (queue=%s): wiki_search, memory_get, memory_upsert, history_get, history_append, echo",
|
||||
log.Printf("[mcp_go] tools ready on %s (queue=%s): wiki_search, kb_ingest, memory_get, memory_upsert, history_get, history_append, echo",
|
||||
contract.SubjectToolsGoAll, contract.QueueToolsGo)
|
||||
<-ctx.Done()
|
||||
return ctx.Err()
|
||||
@@ -46,6 +49,8 @@ func (g *Gateway) dispatch(ctx context.Context, call *contract.ToolCall) *contra
|
||||
switch call.Tool {
|
||||
case "wiki_search":
|
||||
return g.wikiSearch(ctx, call)
|
||||
case "kb_ingest":
|
||||
return g.kbIngest(ctx, call)
|
||||
case "memory_get":
|
||||
return g.memoryGet(ctx, call)
|
||||
case "memory_upsert":
|
||||
@@ -110,24 +115,40 @@ func (g *Gateway) memoryUpsert(ctx context.Context, call *contract.ToolCall) *co
|
||||
return &contract.ToolResult{OK: true, Content: fmt.Sprintf("已记住 %s 的「%s」", uid, key)}
|
||||
}
|
||||
|
||||
// wikiSearch 调 Hybrid 混合检索引擎。引擎目前为桩(返回空),
|
||||
// 这里仍把调用链路做真:真实接入 Bleve/Milvus/Neo4j 后无需改动协议。
|
||||
// wikiSearch 经 RAG 引擎做向量检索(embedding + Milvus)。
|
||||
// RAG 未就绪时降级返回空命中(不阻断图执行)。
|
||||
func (g *Gateway) wikiSearch(ctx context.Context, call *contract.ToolCall) *contract.ToolResult {
|
||||
q, _ := call.Args["q"].(string)
|
||||
results, err := g.search.Query(ctx, q, 5)
|
||||
kb, _ := call.Args["kb"].(string)
|
||||
topK := 5
|
||||
if v, ok := call.Args["topK"].(float64); ok && v > 0 {
|
||||
topK = int(v)
|
||||
}
|
||||
if !g.rag.Ready() {
|
||||
return &contract.ToolResult{OK: true, Content: "[wiki_search] RAG 未配置(需 embedding + Milvus),无召回"}
|
||||
}
|
||||
hits, err := g.rag.Search(ctx, kb, q, topK)
|
||||
if err != nil {
|
||||
return &contract.ToolResult{OK: false, Error: "wiki_search: " + err.Error()}
|
||||
}
|
||||
return &contract.ToolResult{
|
||||
OK: true,
|
||||
Content: fmt.Sprintf("[wiki_search] 命中 %d 条(Bleve+Milvus+Neo4j 混合检索桩)查询=%q", len(results), preview(q)),
|
||||
var b strings.Builder
|
||||
fmt.Fprintf(&b, "[wiki_search] 命中 %d 条(Milvus 向量检索):\n", len(hits))
|
||||
for i, h := range hits {
|
||||
fmt.Fprintf(&b, "%d. (%.3f) %s\n", i+1, h.Score, h.Text)
|
||||
}
|
||||
return &contract.ToolResult{OK: true, Content: strings.TrimRight(b.String(), "\n")}
|
||||
}
|
||||
|
||||
func preview(s string) string {
|
||||
r := []rune(s)
|
||||
if len(r) > 40 {
|
||||
return string(r[:40]) + "…"
|
||||
// kbIngest 把文本入库(切块→embedding→Milvus)。
|
||||
func (g *Gateway) kbIngest(ctx context.Context, call *contract.ToolCall) *contract.ToolResult {
|
||||
kb, _ := call.Args["kb"].(string)
|
||||
text, _ := call.Args["text"].(string)
|
||||
if text == "" {
|
||||
return &contract.ToolResult{OK: false, Error: "kb_ingest: text 必填"}
|
||||
}
|
||||
return s
|
||||
n, err := g.rag.Ingest(ctx, kb, text)
|
||||
if err != nil {
|
||||
return &contract.ToolResult{OK: false, Error: "kb_ingest: " + err.Error()}
|
||||
}
|
||||
return &contract.ToolResult{OK: true, Content: fmt.Sprintf("已入库 %d 块到知识库 %q", n, kb)}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,72 @@
|
||||
package rag
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
// embedClient 是 OpenAI 兼容的 embeddings 客户端(provider 抽象)。
|
||||
// 开发期指向第三方在线 API 或本地 mock;生产期换自部署/在线 embedding 模型。
|
||||
type embedClient struct {
|
||||
baseURL string
|
||||
apiKey string
|
||||
model string
|
||||
hc *http.Client
|
||||
}
|
||||
|
||||
func newEmbedClient(baseURL, apiKey, model string) *embedClient {
|
||||
return &embedClient{
|
||||
baseURL: baseURL,
|
||||
apiKey: apiKey,
|
||||
model: model,
|
||||
hc: &http.Client{Timeout: 30 * time.Second},
|
||||
}
|
||||
}
|
||||
|
||||
func (e *embedClient) ready() bool { return e != nil && e.baseURL != "" && e.model != "" }
|
||||
|
||||
// Embed 把若干文本向量化(OpenAI 兼容 /embeddings)。
|
||||
func (e *embedClient) Embed(ctx context.Context, texts []string) ([][]float32, error) {
|
||||
if !e.ready() {
|
||||
return nil, fmt.Errorf("embedding not configured")
|
||||
}
|
||||
body, _ := json.Marshal(map[string]any{"model": e.model, "input": texts})
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, e.baseURL+"/embeddings", bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
if e.apiKey != "" {
|
||||
req.Header.Set("Authorization", "Bearer "+e.apiKey)
|
||||
}
|
||||
resp, err := e.hc.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("embed request: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode >= 400 {
|
||||
buf := new(bytes.Buffer)
|
||||
_, _ = buf.ReadFrom(resp.Body)
|
||||
return nil, fmt.Errorf("embed http %d: %s", resp.StatusCode, buf.String())
|
||||
}
|
||||
var out struct {
|
||||
Data []struct {
|
||||
Embedding []float32 `json:"embedding"`
|
||||
Index int `json:"index"`
|
||||
} `json:"data"`
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
|
||||
return nil, fmt.Errorf("embed decode: %w", err)
|
||||
}
|
||||
vecs := make([][]float32, len(out.Data))
|
||||
for _, d := range out.Data {
|
||||
if d.Index >= 0 && d.Index < len(vecs) {
|
||||
vecs[d.Index] = d.Embedding
|
||||
}
|
||||
}
|
||||
return vecs, nil
|
||||
}
|
||||
@@ -0,0 +1,133 @@
|
||||
package rag
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/milvus-io/milvus-sdk-go/v2/client"
|
||||
"github.com/milvus-io/milvus-sdk-go/v2/entity"
|
||||
)
|
||||
|
||||
const collection = "sundynix_wiki" // Wiki/知识库向量集合
|
||||
|
||||
// milvusStore 封装 Milvus 连接与集合管理(集合按首次写入的向量维度懒建)。
|
||||
type milvusStore struct {
|
||||
cli client.Client
|
||||
mu sync.Mutex
|
||||
dim int // 已建集合的维度(0=未建)
|
||||
ok bool // 集合是否就绪
|
||||
}
|
||||
|
||||
func openMilvus(ctx context.Context, addr string) (*milvusStore, error) {
|
||||
cli, err := client.NewClient(ctx, client.Config{Address: addr})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &milvusStore{cli: cli}, nil
|
||||
}
|
||||
|
||||
func (m *milvusStore) close() {
|
||||
if m != nil && m.cli != nil {
|
||||
_ = m.cli.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// ensure 幂等地按维度 dim 建集合 + 向量索引 + 加载(首次写入时调用)。
|
||||
func (m *milvusStore) ensure(ctx context.Context, dim int) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
if m.ok && m.dim == dim {
|
||||
return nil
|
||||
}
|
||||
has, err := m.cli.HasCollection(ctx, collection)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !has {
|
||||
schema := entity.NewSchema().WithName(collection).WithDescription("sundynix wiki vectors").
|
||||
WithField(entity.NewField().WithName("id").WithDataType(entity.FieldTypeInt64).WithIsPrimaryKey(true).WithIsAutoID(true)).
|
||||
WithField(entity.NewField().WithName("kb").WithDataType(entity.FieldTypeVarChar).WithMaxLength(64)).
|
||||
WithField(entity.NewField().WithName("text").WithDataType(entity.FieldTypeVarChar).WithMaxLength(8192)).
|
||||
WithField(entity.NewField().WithName("vector").WithDataType(entity.FieldTypeFloatVector).WithDim(int64(dim)))
|
||||
if err := m.cli.CreateCollection(ctx, schema, 1); err != nil {
|
||||
return fmt.Errorf("create collection: %w", err)
|
||||
}
|
||||
idx, _ := entity.NewIndexAUTOINDEX(entity.COSINE)
|
||||
if err := m.cli.CreateIndex(ctx, collection, "vector", idx, false); err != nil {
|
||||
return fmt.Errorf("create index: %w", err)
|
||||
}
|
||||
}
|
||||
if err := m.cli.LoadCollection(ctx, collection, false); err != nil {
|
||||
return fmt.Errorf("load collection: %w", err)
|
||||
}
|
||||
m.dim, m.ok = dim, true
|
||||
return nil
|
||||
}
|
||||
|
||||
// insert 写入若干 (kb, text, vector)。
|
||||
func (m *milvusStore) insert(ctx context.Context, kb string, texts []string, vecs [][]float32) error {
|
||||
if len(vecs) == 0 {
|
||||
return nil
|
||||
}
|
||||
if err := m.ensure(ctx, len(vecs[0])); err != nil {
|
||||
return err
|
||||
}
|
||||
kbs := make([]string, len(texts))
|
||||
for i := range kbs {
|
||||
kbs[i] = kb
|
||||
}
|
||||
_, err := m.cli.Insert(ctx, collection, "",
|
||||
entity.NewColumnVarChar("kb", kbs),
|
||||
entity.NewColumnVarChar("text", texts),
|
||||
entity.NewColumnFloatVector("vector", len(vecs[0]), vecs),
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("insert: %w", err)
|
||||
}
|
||||
return m.cli.Flush(ctx, collection, false)
|
||||
}
|
||||
|
||||
// Hit 是一条检索结果。
|
||||
type Hit struct {
|
||||
Text string
|
||||
Score float32
|
||||
}
|
||||
|
||||
// search 用查询向量做 topK 向量检索(可按 kb 过滤)。
|
||||
func (m *milvusStore) search(ctx context.Context, kb string, qvec []float32, topK int) ([]Hit, error) {
|
||||
if !m.ok {
|
||||
// 集合未建(还没入过库)→ 尝试确保(按查询维度),无则空结果。
|
||||
if err := m.ensure(ctx, len(qvec)); err != nil {
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
expr := ""
|
||||
if kb != "" {
|
||||
expr = fmt.Sprintf("kb == \"%s\"", kb)
|
||||
}
|
||||
sp, _ := entity.NewIndexAUTOINDEXSearchParam(1)
|
||||
results, err := m.cli.Search(ctx, collection, nil, expr, []string{"text"},
|
||||
[]entity.Vector{entity.FloatVector(qvec)}, "vector", entity.COSINE, topK, sp)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("search: %w", err)
|
||||
}
|
||||
var hits []Hit
|
||||
for _, r := range results {
|
||||
textCol := r.Fields.GetColumn("text")
|
||||
for i := 0; i < r.ResultCount; i++ {
|
||||
text := ""
|
||||
if textCol != nil {
|
||||
if s, err := textCol.GetAsString(i); err == nil {
|
||||
text = s
|
||||
}
|
||||
}
|
||||
var score float32
|
||||
if i < len(r.Scores) {
|
||||
score = r.Scores[i]
|
||||
}
|
||||
hits = append(hits, Hit{Text: text, Score: score})
|
||||
}
|
||||
}
|
||||
return hits, nil
|
||||
}
|
||||
@@ -0,0 +1,97 @@
|
||||
// Package rag 实现 RAG 核心链:embedding(provider 抽象) + Milvus 向量库 + 入库/检索。
|
||||
// 是 LLM Wiki 混合检索的向量路;Bleve/Neo4j 融合为后续扩展。
|
||||
package rag
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"log"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// Engine 聚合 embedding 与 Milvus,对外提供入库/检索。
|
||||
type Engine struct {
|
||||
emb *embedClient
|
||||
mv *milvusStore
|
||||
}
|
||||
|
||||
// Open 建立 RAG 引擎。embedding 未配 / Milvus 连不上 → 降级(检索返回空,不阻断工具服务)。
|
||||
func Open(ctx context.Context, milvusAddr, embBase, embKey, embModel string) *Engine {
|
||||
e := &Engine{}
|
||||
if embBase != "" && embModel != "" {
|
||||
e.emb = newEmbedClient(embBase, embKey, embModel)
|
||||
log.Printf("[rag] embedding: %s model=%s", embBase, embModel)
|
||||
} else {
|
||||
log.Println("[rag] embedding 未配置,向量检索降级")
|
||||
}
|
||||
if milvusAddr != "" {
|
||||
mv, err := openMilvus(ctx, milvusAddr)
|
||||
if err != nil {
|
||||
log.Printf("[rag] Milvus 不可用,向量检索降级: %v", err)
|
||||
} else {
|
||||
e.mv = mv
|
||||
log.Printf("[rag] Milvus connected %s", milvusAddr)
|
||||
}
|
||||
}
|
||||
return e
|
||||
}
|
||||
|
||||
// Ready 报告 RAG 是否可用(embedding + Milvus 均就绪)。
|
||||
func (e *Engine) Ready() bool { return e.emb.ready() && e.mv != nil }
|
||||
|
||||
// Ingest 把一段文本切块 → 向量化 → 写入 Milvus,返回块数。
|
||||
func (e *Engine) Ingest(ctx context.Context, kb, text string) (int, error) {
|
||||
if !e.Ready() {
|
||||
return 0, errors.New("rag 未配置(需 embedding + Milvus)")
|
||||
}
|
||||
chunks := chunk(text)
|
||||
if len(chunks) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
vecs, err := e.emb.Embed(ctx, chunks)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if err := e.mv.insert(ctx, kb, chunks, vecs); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return len(chunks), nil
|
||||
}
|
||||
|
||||
// Search 向量化查询 → Milvus topK 检索。降级时返回空。
|
||||
func (e *Engine) Search(ctx context.Context, kb, query string, topK int) ([]Hit, error) {
|
||||
if !e.Ready() {
|
||||
return nil, nil
|
||||
}
|
||||
if topK <= 0 {
|
||||
topK = 5
|
||||
}
|
||||
vecs, err := e.emb.Embed(ctx, []string{query})
|
||||
if err != nil || len(vecs) == 0 {
|
||||
return nil, err
|
||||
}
|
||||
return e.mv.search(ctx, kb, vecs[0], topK)
|
||||
}
|
||||
|
||||
func (e *Engine) Close() {
|
||||
if e.mv != nil {
|
||||
e.mv.close()
|
||||
}
|
||||
}
|
||||
|
||||
// chunk 朴素切块:按行切,去空白;过长再按长度切。真实系统应做版面/语义切块。
|
||||
func chunk(text string) []string {
|
||||
var out []string
|
||||
for _, line := range strings.Split(text, "\n") {
|
||||
s := strings.TrimSpace(line)
|
||||
if s == "" {
|
||||
continue
|
||||
}
|
||||
for len(s) > 2000 {
|
||||
out = append(out, s[:2000])
|
||||
s = s[2000:]
|
||||
}
|
||||
out = append(out, s)
|
||||
}
|
||||
return out
|
||||
}
|
||||
Reference in New Issue
Block a user