Files
sundynix-agentix/sundynix-mcp-go/internal/rag/rag.go
T
Blizzard 72e008bfe8 feat(kb): 入库可视化做厚 —— 文件解析/知识抽取过程 + 力导向知识图谱
把"进度条"升级成可观测的入库工作台,回应三点诉求:解析过程、知识抽取过程、丰富图谱。

- contract: IngestEvent 加 Preview(解析文本预览)+ Triples[]TripleView(抽出的三元组)。
- 后端回流:rag.Ingest 抽实体阶段把 LLM 抽出的三元组实时回流(边出现边渲染);
  gateway 解析完成回流文件类型 + 文本预览片段。
- 前端 GraphView.tsx:零依赖自建力导向布局(斥力+边弹簧+居中静态收敛),实体=节点
  按度着色(枢纽紫/关联青/叶子)、关系=带标签边、hover 高亮邻域、节点过多按度裁剪。
- 前端 KbView 重做:入库从"阶段徽标+进度条"→竖向时间线(解析预览/切块块/向量化进度/
  抽取知识三元组 chips + 实时小图谱逐步浮现);右侧知识图谱从扁平列表→GraphView,
  入库完成自动刷新整库图谱。

验证(Preview):入库一段多事实文本 → 时间线逐阶段点亮、抽出 17 条三元组实时浮现、
右侧力导向图渲染 sundynix-agentix/知识库 为枢纽 + 带标签关系边。tsc+vite+后端 build 通过。

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-13 14:22:50 +08:00

266 lines
8.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package rag 实现 RAG 核心链:embedding(provider 抽象) + Milvus 向量库 + 入库/检索。
// 是 LLM Wiki 混合检索的向量路;Bleve/Neo4j 融合为后续扩展。
package rag
import (
"context"
"errors"
"log"
"strings"
"sync"
"github.com/sundynix/sundynix-shared/contract"
)
// embedBatch 是每批向量化的块数(让大文件的入库进度可观测)。
const embedBatch = 10
// Config 是 RAG 引擎的初始化配置。
type Config struct {
MilvusAddr string
EmbedBase, EmbedKey, EmbedModel string
RerankBase, RerankKey, RerankModel string
Neo4jURI, Neo4jUser, Neo4jPass string
}
// Engine 聚合 embedding + Milvus(向量) + Bleve(全文) + Neo4j(图谱) → RRF 融合 + 可选 rerank。
// embedding 与 chat(图谱抽取用)可热更新(控制面下发)。
type Engine struct {
mu sync.RWMutex
emb *embedClient
chat *chatClient
mv *milvusStore
bleve *bleveStore
rerank *rerankClient
graph *graphStore
}
// SetEmbedding 热更新 embedding 配置(控制面变更时调用)。空配置=关闭向量检索。
func (e *Engine) SetEmbedding(base, key, model string) {
e.mu.Lock()
defer e.mu.Unlock()
if base == "" || model == "" {
e.emb = nil
return
}
e.emb = newEmbedClient(base, key, model)
log.Printf("[rag] embedding 配置: %s model=%s", base, model)
}
// SetChat 热更新对话模型配置(图谱实体抽取用,复用控制面 chat 模型)。
func (e *Engine) SetChat(base, key, model string) {
e.mu.Lock()
defer e.mu.Unlock()
e.chat = newChatClient(base, key, model)
if e.chat.ready() {
log.Printf("[rag] 图谱抽取模型: %s model=%s", base, model)
}
}
func (e *Engine) embed() *embedClient {
e.mu.RLock()
defer e.mu.RUnlock()
return e.emb
}
func (e *Engine) chatClient() *chatClient {
e.mu.RLock()
defer e.mu.RUnlock()
return e.chat
}
// Open 建立 RAG 引擎。各路连不上 → 降级(不阻断工具服务)。
func Open(ctx context.Context, cfg Config) *Engine {
e := &Engine{
bleve: openBleve(),
rerank: newRerankClient(cfg.RerankBase, cfg.RerankKey, cfg.RerankModel),
graph: openGraph(ctx, cfg.Neo4jURI, cfg.Neo4jUser, cfg.Neo4jPass),
}
if e.rerank.ready() {
log.Printf("[rag] rerank: %s model=%s", cfg.RerankBase, cfg.RerankModel)
}
if cfg.EmbedBase != "" && cfg.EmbedModel != "" {
e.SetEmbedding(cfg.EmbedBase, cfg.EmbedKey, cfg.EmbedModel)
} else {
log.Println("[rag] embedding 未配置(待控制面下发),向量检索暂降级")
}
if cfg.MilvusAddr != "" {
mv, err := openMilvus(ctx, cfg.MilvusAddr)
if err != nil {
log.Printf("[rag] Milvus 不可用,向量检索降级: %v", err)
} else {
e.mv = mv
log.Printf("[rag] Milvus connected %s", cfg.MilvusAddr)
}
}
return e
}
// Triples 返回某 kb 的图谱三元组(供 UI 可视化)。
func (e *Engine) Triples(ctx context.Context, kb string, limit int) []Triple {
return e.graph.triples(ctx, kb, limit)
}
// Ready 报告 RAG 是否可用(embedding + Milvus 均就绪)。
func (e *Engine) Ready() bool { return e.embed().ready() && e.mv != nil }
// Status 报告各依赖子系统的就绪情况(供 health 工具 → 控制台健康灯)。
func (e *Engine) Status() map[string]bool {
return map[string]bool{
"milvus": e.mv != nil,
"neo4j": e.graph.ready(),
"embedding": e.embed().ready(),
}
}
// Ingest 把一段文本切块 → 分批向量化 → 写 Milvus + Bleve,返回块数。
// onProgress 非空时逐阶段/逐批回调进度(用于实时入库监控)。
func (e *Engine) Ingest(ctx context.Context, kb, text string, onProgress func(contract.IngestEvent)) (int, error) {
emit := func(ev contract.IngestEvent) {
if onProgress != nil {
onProgress(ev)
}
}
if !e.Ready() {
return 0, errors.New("rag 未配置(需 embedding + Milvus")
}
chunks := chunk(text)
if len(chunks) == 0 {
return 0, nil
}
emit(contract.IngestEvent{Stage: "切块", Total: len(chunks), Chunks: previews(chunks), Msg: "拆为 " + itoa(len(chunks)) + " 块"})
// 分批向量化(逐批回报进度)。
vecs := make([][]float32, 0, len(chunks))
for i := 0; i < len(chunks); i += embedBatch {
end := min(i+embedBatch, len(chunks))
bv, err := e.embed().Embed(ctx, chunks[i:end])
if err != nil {
emit(contract.IngestEvent{Stage: "失败", Error: "向量化: " + err.Error()})
return 0, err
}
vecs = append(vecs, bv...)
emit(contract.IngestEvent{Stage: "向量化", Done: end, Total: len(chunks)})
}
emit(contract.IngestEvent{Stage: "写Milvus", Msg: "向量库写入中"})
if err := e.mv.insert(ctx, kb, chunks, vecs); err != nil {
emit(contract.IngestEvent{Stage: "失败", Error: "写Milvus: " + err.Error()})
return 0, err
}
emit(contract.IngestEvent{Stage: "写Bleve", Msg: "全文索引写入中"})
_ = e.bleve.index(kb, chunks) // 同步写全文索引(失败不阻断向量入库)
// 图谱路:LLM 抽实体/关系 → Neo4j(可降级,不阻断向量入库)。
if e.graph.ready() && e.chatClient().ready() {
emit(contract.IngestEvent{Stage: "抽实体", Msg: "LLM 正在抽取知识三元组…"})
triples, terr := extractTriples(ctx, e.chatClient(), text)
if terr != nil {
log.Printf("[rag] 三元组抽取失败(图谱降级): %v", terr)
} else if len(triples) > 0 {
// 把抽出的三元组实时回流给 UI(边出现边渲染图谱)。
tv := make([]contract.TripleView, len(triples))
for i, t := range triples {
tv[i] = contract.TripleView{S: t.S, P: t.P, O: t.O}
}
emit(contract.IngestEvent{Stage: "抽实体", Total: len(triples), Triples: tv, Msg: "抽出 " + itoa(len(triples)) + " 条知识三元组"})
emit(contract.IngestEvent{Stage: "写Neo4j", Total: len(triples), Msg: itoa(len(triples)) + " 条三元组写入图谱"})
if n, gerr := e.graph.store(ctx, kb, triples); gerr != nil {
log.Printf("[rag] 写 Neo4j 失败(图谱降级): %v", gerr)
} else {
log.Printf("[rag] 图谱: 写入 %d 条三元组到 kb=%s", n, kb)
}
}
}
return len(chunks), nil
}
// previews 取每块的前若干字作为预览(供 UI 展示拆分情况)。
func previews(chunks []string) []string {
out := make([]string, len(chunks))
for i, c := range chunks {
r := []rune(c)
if len(r) > 50 {
out[i] = string(r[:50]) + "…"
} else {
out[i] = c
}
}
return out
}
func itoa(n int) string {
if n == 0 {
return "0"
}
var b []byte
for n > 0 {
b = append([]byte{byte('0' + n%10)}, b...)
n /= 10
}
return string(b)
}
// Search 混合检索:Milvus(向量) + Bleve(全文) → RRF 融合 → 可选 rerank → topK。降级时返回空。
func (e *Engine) Search(ctx context.Context, kb, query string, topK int) ([]Hit, error) {
if !e.Ready() {
return nil, nil
}
if topK <= 0 {
topK = 5
}
fanout := topK * 3
// 向量路
vecs, err := e.embed().Embed(ctx, []string{query})
if err != nil || len(vecs) == 0 {
return nil, err
}
vecHits, _ := e.mv.search(ctx, kb, vecs[0], fanout)
// 全文路
ftHits := e.bleve.search(kb, query, fanout)
// 图谱路(GraphRAG:查询提到的实体的相连三元组)
graphHits := e.graph.search(ctx, kb, query, fanout)
// RRF 融合(三路,按文本去重)
cand := rrf([][]Hit{vecHits, ftHits, graphHits}, fanout)
log.Printf("[rag] hybrid: 向量=%d 全文=%d 图谱=%d → 融合=%d", len(vecHits), len(ftHits), len(graphHits), len(cand))
// 可选 rerank:对融合候选重排取 topK
if e.rerank.ready() && len(cand) > 1 {
if rr, rerr := e.rerank.rerank(ctx, query, cand, topK); rerr == nil {
return rr, nil
} else {
log.Printf("[rag] rerank 降级(用 RRF 结果): %v", rerr)
}
}
if len(cand) > topK {
cand = cand[:topK]
}
return cand, nil
}
func (e *Engine) Close() {
if e.mv != nil {
e.mv.close()
}
e.graph.close(context.Background())
}
// chunk 朴素切块:按行切,去空白;过长再按长度切。真实系统应做版面/语义切块。
func chunk(text string) []string {
var out []string
for _, line := range strings.Split(text, "\n") {
s := strings.TrimSpace(line)
if s == "" {
continue
}
for len(s) > 2000 {
out = append(out, s[:2000])
s = s[2000:]
}
out = append(out, s)
}
return out
}