Files
sundynix-agentix/sundynix-mcp-go/internal/rag/rag.go
T
Blizzard 85a5c2c1e7 feat(rag): 混合检索融合 — Milvus 向量 + Bleve 全文 + RRF + DashScope rerank
检索从向量单路升级为混合:向量(Milvus) + 全文(Bleve BM25) → RRF 融合 →
可选 rerank(DashScope gte-rerank)。

- rag/bleve.go: Bleve 全文索引(内存,随 ingest 写入;kb 过滤);ingest 同步写 Milvus+Bleve
- rag/fuse.go: RRF(Reciprocal Rank Fusion, k=60, 按文本去重)融合多路排序
- rag/rerank.go: DashScope gte-rerank 客户端(可选,env 配置,失败降级 RRF)
- rag/rag.go: Search 改混合(向量+全文→RRF→可选rerank→topK);main 读 RERANK_* env
- 验证: 全模块 build✓ + e2e PASS; live——入库写双索引;查'NATS'→全文精确命中#1+向量
  →RRF NATS 排首(向量=4 全文=1);接 DashScope gte-rerank(百炼 key 有权限)→relevance
  score 0.19 真重排;retriever 节点端到端→DeepSeek 答 Milvus
- 边界: Neo4j 图路(GraphRAG,需实体抽取)推迟;Bleve 内存索引重启重建;rerank 走 env
  (TODO 同 embedding 搬控制面 kind=rerank)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-11 09:53:37 +08:00

146 lines
4.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package rag 实现 RAG 核心链:embedding(provider 抽象) + Milvus 向量库 + 入库/检索。
// 是 LLM Wiki 混合检索的向量路;Bleve/Neo4j 融合为后续扩展。
package rag
import (
"context"
"errors"
"log"
"strings"
"sync"
)
// Engine 聚合 embedding + Milvus(向量) + Bleve(全文) + RRF 融合 + 可选 rerank。
// embedding 可热更新(控制面下发)。
type Engine struct {
mu sync.RWMutex
emb *embedClient
mv *milvusStore
bleve *bleveStore
rerank *rerankClient
}
// SetEmbedding 热更新 embedding 配置(控制面变更时调用)。空配置=关闭向量检索。
func (e *Engine) SetEmbedding(base, key, model string) {
e.mu.Lock()
defer e.mu.Unlock()
if base == "" || model == "" {
e.emb = nil
return
}
e.emb = newEmbedClient(base, key, model)
log.Printf("[rag] embedding 配置: %s model=%s", base, model)
}
func (e *Engine) embed() *embedClient {
e.mu.RLock()
defer e.mu.RUnlock()
return e.emb
}
// Open 建立 RAG 引擎。embedding 未配 / Milvus 连不上 → 降级(检索返回空,不阻断工具服务)。
// rerank* 为空则不启用重排(融合结果直接返回)。
func Open(ctx context.Context, milvusAddr, embBase, embKey, embModel, rerankBase, rerankKey, rerankModel string) *Engine {
e := &Engine{bleve: openBleve(), rerank: newRerankClient(rerankBase, rerankKey, rerankModel)}
if e.rerank.ready() {
log.Printf("[rag] rerank: %s model=%s", rerankBase, rerankModel)
}
if embBase != "" && embModel != "" {
e.SetEmbedding(embBase, embKey, embModel) // env 初值(控制面会覆盖)
} else {
log.Println("[rag] embedding 未配置(待控制面下发),向量检索暂降级")
}
if milvusAddr != "" {
mv, err := openMilvus(ctx, milvusAddr)
if err != nil {
log.Printf("[rag] Milvus 不可用,向量检索降级: %v", err)
} else {
e.mv = mv
log.Printf("[rag] Milvus connected %s", milvusAddr)
}
}
return e
}
// Ready 报告 RAG 是否可用(embedding + Milvus 均就绪)。
func (e *Engine) Ready() bool { return e.embed().ready() && e.mv != nil }
// Ingest 把一段文本切块 → 向量化 → 写入 Milvus,返回块数。
func (e *Engine) Ingest(ctx context.Context, kb, text string) (int, error) {
if !e.Ready() {
return 0, errors.New("rag 未配置(需 embedding + Milvus")
}
chunks := chunk(text)
if len(chunks) == 0 {
return 0, nil
}
vecs, err := e.embed().Embed(ctx, chunks)
if err != nil {
return 0, err
}
if err := e.mv.insert(ctx, kb, chunks, vecs); err != nil {
return 0, err
}
_ = e.bleve.index(kb, chunks) // 同步写全文索引(失败不阻断向量入库)
return len(chunks), nil
}
// Search 混合检索:Milvus(向量) + Bleve(全文) → RRF 融合 → 可选 rerank → topK。降级时返回空。
func (e *Engine) Search(ctx context.Context, kb, query string, topK int) ([]Hit, error) {
if !e.Ready() {
return nil, nil
}
if topK <= 0 {
topK = 5
}
fanout := topK * 3
// 向量路
vecs, err := e.embed().Embed(ctx, []string{query})
if err != nil || len(vecs) == 0 {
return nil, err
}
vecHits, _ := e.mv.search(ctx, kb, vecs[0], fanout)
// 全文路
ftHits := e.bleve.search(kb, query, fanout)
// RRF 融合(按文本去重)
cand := rrf([][]Hit{vecHits, ftHits}, fanout)
log.Printf("[rag] hybrid: 向量=%d 全文=%d → 融合=%d", len(vecHits), len(ftHits), len(cand))
// 可选 rerank:对融合候选重排取 topK
if e.rerank.ready() && len(cand) > 1 {
if rr, rerr := e.rerank.rerank(ctx, query, cand, topK); rerr == nil {
return rr, nil
} else {
log.Printf("[rag] rerank 降级(用 RRF 结果): %v", rerr)
}
}
if len(cand) > topK {
cand = cand[:topK]
}
return cand, nil
}
func (e *Engine) Close() {
if e.mv != nil {
e.mv.close()
}
}
// chunk 朴素切块:按行切,去空白;过长再按长度切。真实系统应做版面/语义切块。
func chunk(text string) []string {
var out []string
for _, line := range strings.Split(text, "\n") {
s := strings.TrimSpace(line)
if s == "" {
continue
}
for len(s) > 2000 {
out = append(out, s[:2000])
s = s[2000:]
}
out = append(out, s)
}
return out
}