From d623b8590ef209c581a3ff68bc92b17c3d7ec724 Mon Sep 17 00:00:00 2001 From: Blizzard Date: Thu, 11 Jun 2026 11:10:22 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20GraphRAG=20=E2=80=94=20LLM=20=E6=8A=BD?= =?UTF-8?q?=E4=B8=89=E5=85=83=E7=BB=84=E5=BB=BA=20Neo4j=20=E5=9B=BE?= =?UTF-8?q?=E8=B0=B1=20+=20=E6=B7=B7=E5=90=88=E6=A3=80=E7=B4=A2=E5=8A=A0?= =?UTF-8?q?=E5=9B=BE=E8=B0=B1=E7=AC=AC=E4=B8=89=E8=B7=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 混合检索从 2 路(向量+全文)升级为 3 路(+图谱)。入库时 LLM 抽实体/关系建 Neo4j 图,检索时图谱路(实体关联三元组)融进 RRF;UI 可视化图谱。 - mcp-go rag: chat.go(OpenAI 兼容非流式 chat 客户端,抽取用) + graph.go(neo4j-go-driver 连接 + LLM 抽三元组 + MERGE 实体/关系 + 图谱召回/全量三元组) + rag.go(Config 结构; graph+chat 路;Ingest 加 抽实体/写Neo4j 阶段;Search 三路 RRF 融合;SetChat 热更新) - mcp-go: Neo4j env(默认 neo4j://localhost:7687, neo4j/sundynix);订阅 chat 控制面配置 (复用 DeepSeek 做抽取);新工具 kb_graph(返回三元组) - gateway: GET /api/v1/kb/graph;frontend KbView 知识图谱面板(实体—关系→实体) - 验证: 全模块 build✓ + e2e PASS; live——入库'sundynix用Milvus...'→DeepSeek 抽 4 三元组 →Neo4j(8 实体);检索三路融合 向量=4 全文=2 图谱=1;浏览器图谱面板渲染 4 三元组 - 边界: 实体链接用 CONTAINS 朴素匹配(可升级 LLM 查询实体抽取);全文/图谱重启随入库重建 Co-Authored-By: Claude Opus 4.8 --- sundynix-desktop/frontend/src/lib/api.ts | 14 ++ .../frontend/src/views/KbView.tsx | 34 +++- sundynix-gateway/internal/handler/kb.go | 13 ++ sundynix-gateway/internal/router/router.go | 1 + sundynix-mcp-go/cmd/server/main.go | 25 ++- sundynix-mcp-go/go.mod | 1 + sundynix-mcp-go/go.sum | 2 + sundynix-mcp-go/internal/mcp/gateway.go | 16 +- sundynix-mcp-go/internal/rag/chat.go | 72 ++++++++ sundynix-mcp-go/internal/rag/graph.go | 158 ++++++++++++++++++ sundynix-mcp-go/internal/rag/rag.go | 85 ++++++++-- 11 files changed, 399 insertions(+), 22 deletions(-) create mode 100644 sundynix-mcp-go/internal/rag/chat.go create mode 100644 sundynix-mcp-go/internal/rag/graph.go diff --git a/sundynix-desktop/frontend/src/lib/api.ts b/sundynix-desktop/frontend/src/lib/api.ts index 6aaff2c..a93d2b1 100644 --- a/sundynix-desktop/frontend/src/lib/api.ts +++ b/sundynix-desktop/frontend/src/lib/api.ts @@ -105,6 +105,20 @@ export interface KbHit { score: number; } +export interface Triple { + s: string; + p: string; + o: string; +} + +// graphKb: GET /api/v1/kb/graph —— 取某知识库的图谱三元组(→ mcp-go kb_graph,Neo4j)。 +export async function graphKb(kb: string): Promise { + const res = await fetch(`${GATEWAY}/api/v1/kb/graph?kb=${encodeURIComponent(kb)}`); + const data = (await res.json()) as { triples?: Triple[]; error?: string }; + if (!res.ok) throw new Error(data.error ?? `graph failed: ${res.status}`); + return data.triples ?? []; +} + // searchKb: POST /api/v1/kb/search,检索台查询(→ mcp-go kb_search,带分数)。 export async function searchKb(kb: string, q: string, topK = 5): Promise { const res = await fetch(`${GATEWAY}/api/v1/kb/search`, { diff --git a/sundynix-desktop/frontend/src/views/KbView.tsx b/sundynix-desktop/frontend/src/views/KbView.tsx index 033f7c4..c6250fb 100644 --- a/sundynix-desktop/frontend/src/views/KbView.tsx +++ b/sundynix-desktop/frontend/src/views/KbView.tsx @@ -1,5 +1,5 @@ import { useRef, useState } from "react"; -import { ingestKb, ingestFile, streamIngest, searchKb, type IngestEvent, type KbHit } from "../lib/api"; +import { ingestKb, ingestFile, streamIngest, searchKb, graphKb, type IngestEvent, type KbHit, type Triple } from "../lib/api"; interface IngestLog { t: string; @@ -29,6 +29,15 @@ export function KbView() { const [hits, setHits] = useState(null); const [searching, setSearching] = useState(false); const [err, setErr] = useState(""); + const [graph, setGraph] = useState(null); + + const onGraph = async () => { + try { + setGraph(await graphKb(kb)); + } catch (e) { + setErr((e as Error).message); + } + }; const stamp = () => new Date().toLocaleTimeString(); const ingesting = prog?.active ?? false; @@ -234,7 +243,7 @@ export function KbView() { {err &&

✗ {err}

} -
    +
      {hits === null &&
    • 输入查询后展示命中片段与分数。
    • } {hits !== null && hits.length === 0 && (
    • 无命中(知识库为空或 RAG 未配置)。
    • @@ -250,6 +259,27 @@ export function KbView() { ))}
    + + {/* 知识图谱(Neo4j / GraphRAG) */} +
    +

    知识图谱(Neo4j)

    + +
    +
      + {graph === null &&
    • 点「查看图谱」展示入库抽取的实体关系。
    • } + {graph !== null && graph.length === 0 && ( +
    • 该库暂无图谱(需配置 chat 模型 + 入库触发抽取)。
    • + )} + {graph?.map((t, i) => ( +
    • + {t.s} + —{t.p}→ + {t.o} +
    • + ))} +
    diff --git a/sundynix-gateway/internal/handler/kb.go b/sundynix-gateway/internal/handler/kb.go index 0e53bac..60d3ad3 100644 --- a/sundynix-gateway/internal/handler/kb.go +++ b/sundynix-gateway/internal/handler/kb.go @@ -208,3 +208,16 @@ func (h *Handler) KbSearch(c *gin.Context) { _ = json.Unmarshal([]byte(res.Content), &hits) c.JSON(http.StatusOK, gin.H{"hits": hits}) } + +// KbGraph: GET /api/v1/kb/graph?kb= —— 某知识库的图谱三元组(→ mcp-go kb_graph,Neo4j)。 +func (h *Handler) KbGraph(c *gin.Context) { + res, err := h.bus.CallTool(c.Request.Context(), contract.ToolSubjectGo("kb_graph"), + &contract.ToolCall{Tool: "kb_graph", Args: map[string]any{"kb": c.Query("kb"), "limit": 100}}) + if err != nil { + c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()}) + return + } + var triples []map[string]any + _ = json.Unmarshal([]byte(res.Content), &triples) + c.JSON(http.StatusOK, gin.H{"triples": triples}) +} diff --git a/sundynix-gateway/internal/router/router.go b/sundynix-gateway/internal/router/router.go index 5b3a0c2..138ed09 100644 --- a/sundynix-gateway/internal/router/router.go +++ b/sundynix-gateway/internal/router/router.go @@ -27,6 +27,7 @@ func New(db *store.Postgres, cache *store.Redis, bus *nats.Bus) *gin.Engine { api.POST("/kb/ingest_file", h.KbIngestFile) // 文件入库(docx/xlsx/pdf… 异步) api.GET("/kb/ingest/:id/stream", h.KbIngestStream) // 入库进度 SSE(实时监控) api.POST("/kb/search", h.KbSearch) // 知识库检索台(→ mcp-go kb_search) + api.GET("/kb/graph", h.KbGraph) // 知识图谱三元组(→ mcp-go kb_graph,Neo4j) api.GET("/billing", h.Billing) // 运维控制面:LLM 模型配置(独立运维控制台调用)。 diff --git a/sundynix-mcp-go/cmd/server/main.go b/sundynix-mcp-go/cmd/server/main.go index 5a6c385..66920f5 100644 --- a/sundynix-mcp-go/cmd/server/main.go +++ b/sundynix-mcp-go/cmd/server/main.go @@ -30,6 +30,9 @@ func main() { rerankBase := envOr("RERANK_BASE_URL", "") // DashScope 文本重排端点(空=不启用 rerank) rerankKey := envOr("RERANK_API_KEY", "") rerankModel := envOr("RERANK_MODEL", "") + neo4jURI := envOr("NEO4J_URI", "neo4j://localhost:7687") // GraphRAG 图谱(连不上则降级) + neo4jUser := envOr("NEO4J_USER", "neo4j") + neo4jPass := envOr("NEO4J_PASS", "sundynix") b, err := sharedbus.Connect(natsURL) if err != nil { @@ -47,23 +50,39 @@ func main() { ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() - // RAG 核心链:embedding(env 初值) + Milvus(向量) + Bleve(全文) + 可选 rerank - ragEngine := rag.Open(ctx, milvusAddr, embBase, embKey, embModel, rerankBase, rerankKey, rerankModel) + // RAG 核心链:embedding + Milvus(向量) + Bleve(全文) + Neo4j(图谱) + 可选 rerank + ragEngine := rag.Open(ctx, rag.Config{ + MilvusAddr: milvusAddr, + EmbedBase: embBase, EmbedKey: embKey, EmbedModel: embModel, + RerankBase: rerankBase, RerankKey: rerankKey, RerankModel: rerankModel, + Neo4jURI: neo4jURI, Neo4jUser: neo4jUser, Neo4jPass: neo4jPass, + }) defer ragEngine.Close() - // 配置控制面:启动取激活 embedding 配置 + 订阅热更新(覆盖 env,持久化由 Gateway 管)。 + // 配置控制面:取激活 embedding(向量) + chat(图谱抽取) 配置并订阅热更新。 applyEmbed := func(cfg *contract.ModelConfig) { if cfg != nil { ragEngine.SetEmbedding(cfg.BaseURL, cfg.APIKey, cfg.Model) } } + applyChat := func(cfg *contract.ModelConfig) { + if cfg != nil { + ragEngine.SetChat(cfg.BaseURL, cfg.APIKey, cfg.Model) + } + } cctx, ccancel := context.WithTimeout(ctx, 3*time.Second) if cfg, _ := b.RequestConfig(cctx, contract.ConfigKindEmbedding); cfg != nil { applyEmbed(cfg) } else { log.Println("[mcp_go] 未取到 embedding 控制面配置(用 env 或降级)") } + if cfg, _ := b.RequestConfig(cctx, contract.ConfigKindChat); cfg != nil { + applyChat(cfg) + } ccancel() + if _, err := b.SubscribeConfigUpdated(contract.ConfigKindChat, applyChat); err != nil { + log.Printf("[mcp_go] subscribe chat config: %v", err) + } if _, err := b.SubscribeConfigUpdated(contract.ConfigKindEmbedding, applyEmbed); err != nil { log.Printf("[mcp_go] subscribe embedding config: %v", err) } diff --git a/sundynix-mcp-go/go.mod b/sundynix-mcp-go/go.mod index 9a287a0..dca2b32 100644 --- a/sundynix-mcp-go/go.mod +++ b/sundynix-mcp-go/go.mod @@ -5,6 +5,7 @@ go 1.24 require ( github.com/blevesearch/bleve/v2 v2.4.2 github.com/milvus-io/milvus-sdk-go/v2 v2.4.1 + github.com/neo4j/neo4j-go-driver/v5 v5.24.0 github.com/redis/go-redis/v9 v9.20.0 github.com/sundynix/sundynix-shared v0.0.0 gorm.io/driver/postgres v1.6.0 diff --git a/sundynix-mcp-go/go.sum b/sundynix-mcp-go/go.sum index 728305c..f8e5ada 100644 --- a/sundynix-mcp-go/go.sum +++ b/sundynix-mcp-go/go.sum @@ -256,6 +256,8 @@ github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/neo4j/neo4j-go-driver/v5 v5.24.0 h1:7MAFoB7L6f9heQUo/tJ5EnrrpVzm9ZBHgH8ew03h6Eo= +github.com/neo4j/neo4j-go-driver/v5 v5.24.0/go.mod h1:Vff8OwT7QpLm7L2yYr85XNWe9Rbqlbeb9asNXJTHO4k= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.10.3/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= diff --git a/sundynix-mcp-go/internal/mcp/gateway.go b/sundynix-mcp-go/internal/mcp/gateway.go index b493c33..4e4cbd7 100644 --- a/sundynix-mcp-go/internal/mcp/gateway.go +++ b/sundynix-mcp-go/internal/mcp/gateway.go @@ -37,7 +37,7 @@ func (g *Gateway) Serve(ctx context.Context) error { return err } defer func() { _ = unsub() }() - log.Printf("[mcp_go] tools ready on %s (queue=%s): wiki_search, kb_ingest, kb_search, memory_get, memory_upsert, history_get, history_append, echo", + log.Printf("[mcp_go] tools ready on %s (queue=%s): wiki_search, kb_ingest, kb_search, kb_graph, memory_*, history_*, echo", contract.SubjectToolsGoAll, contract.QueueToolsGo) <-ctx.Done() return ctx.Err() @@ -53,6 +53,8 @@ func (g *Gateway) dispatch(ctx context.Context, call *contract.ToolCall) *contra return g.kbIngest(ctx, call) case "kb_search": return g.kbSearch(ctx, call) + case "kb_graph": + return g.kbGraph(ctx, call) case "memory_get": return g.memoryGet(ctx, call) case "memory_upsert": @@ -160,6 +162,18 @@ func (g *Gateway) kbSearch(ctx context.Context, call *contract.ToolCall) *contra return &contract.ToolResult{OK: true, Content: string(data)} } +// kbGraph 返回某知识库的图谱三元组 JSON [{s,p,o},...](供 UI 可视化 Neo4j 情况)。 +func (g *Gateway) kbGraph(ctx context.Context, call *contract.ToolCall) *contract.ToolResult { + kb, _ := call.Args["kb"].(string) + limit := 100 + if v, ok := call.Args["limit"].(float64); ok && v > 0 { + limit = int(v) + } + triples := g.rag.Triples(ctx, kb, limit) + data, _ := json.Marshal(triples) + return &contract.ToolResult{OK: true, Content: string(data)} +} + // kbIngest 把文本入库(切块→embedding→Milvus+Bleve)。 // 带 job_id 时逐阶段把进度发到 sundynix.streams.,供 UI 实时入库监控。 func (g *Gateway) kbIngest(ctx context.Context, call *contract.ToolCall) *contract.ToolResult { diff --git a/sundynix-mcp-go/internal/rag/chat.go b/sundynix-mcp-go/internal/rag/chat.go new file mode 100644 index 0000000..5cebba6 --- /dev/null +++ b/sundynix-mcp-go/internal/rag/chat.go @@ -0,0 +1,72 @@ +package rag + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "time" +) + +// chatClient 是 OpenAI 兼容的非流式对话客户端,供图谱实体抽取用。 +// 配置由控制面(chat kind)经 NATS 下发(与 Dispatcher 共用同一个模型)。 +type chatClient struct { + baseURL string + apiKey string + model string + hc *http.Client +} + +func newChatClient(baseURL, apiKey, model string) *chatClient { + if baseURL == "" || model == "" { + return nil + } + return &chatClient{baseURL: baseURL, apiKey: apiKey, model: model, hc: &http.Client{Timeout: 60 * time.Second}} +} + +func (c *chatClient) ready() bool { return c != nil && c.baseURL != "" } + +// complete 一次性补全(非流式),返回助手回复文本。 +func (c *chatClient) complete(ctx context.Context, system, user string) (string, error) { + body, _ := json.Marshal(map[string]any{ + "model": c.model, + "messages": []map[string]string{ + {"role": "system", "content": system}, + {"role": "user", "content": user}, + }, + "stream": false, + }) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/chat/completions", bytes.NewReader(body)) + if err != nil { + return "", err + } + req.Header.Set("Content-Type", "application/json") + if c.apiKey != "" { + req.Header.Set("Authorization", "Bearer "+c.apiKey) + } + resp, err := c.hc.Do(req) + if err != nil { + return "", err + } + defer resp.Body.Close() + if resp.StatusCode >= 400 { + buf := new(bytes.Buffer) + _, _ = buf.ReadFrom(resp.Body) + return "", fmt.Errorf("chat http %d: %s", resp.StatusCode, buf.String()) + } + var out struct { + Choices []struct { + Message struct { + Content string `json:"content"` + } `json:"message"` + } `json:"choices"` + } + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + return "", err + } + if len(out.Choices) == 0 { + return "", fmt.Errorf("chat: empty choices") + } + return out.Choices[0].Message.Content, nil +} diff --git a/sundynix-mcp-go/internal/rag/graph.go b/sundynix-mcp-go/internal/rag/graph.go new file mode 100644 index 0000000..6589c05 --- /dev/null +++ b/sundynix-mcp-go/internal/rag/graph.go @@ -0,0 +1,158 @@ +package rag + +import ( + "context" + "encoding/json" + "fmt" + "log" + "strings" + + "github.com/neo4j/neo4j-go-driver/v5/neo4j" + "github.com/neo4j/neo4j-go-driver/v5/neo4j/auth" +) + +// Triple 是一条知识三元组(主体-关系-客体)。 +type Triple struct { + S string `json:"s"` + P string `json:"p"` + O string `json:"o"` +} + +// graphStore 是 GraphRAG 的图路:实体/关系存 Neo4j。 +type graphStore struct { + driver neo4j.DriverWithContext +} + +func openGraph(ctx context.Context, uri, user, pass string) *graphStore { + if uri == "" { + return &graphStore{} + } + drv, err := neo4j.NewDriverWithContext(uri, + auth.BasicTokenManager(func(context.Context) (neo4j.AuthToken, error) { + return neo4j.BasicAuth(user, pass, ""), nil + })) + if err != nil { + log.Printf("[rag] Neo4j 连接失败,图谱路降级: %v", err) + return &graphStore{} + } + if err := drv.VerifyConnectivity(ctx); err != nil { + log.Printf("[rag] Neo4j 不可用,图谱路降级: %v", err) + return &graphStore{} + } + // 实体唯一约束(kb+name)。 + _, _ = neo4j.ExecuteQuery(ctx, drv, + "CREATE CONSTRAINT entity_key IF NOT EXISTS FOR (e:Entity) REQUIRE (e.kb, e.name) IS UNIQUE", + nil, neo4j.EagerResultTransformer) + log.Printf("[rag] Neo4j connected %s", uri) + return &graphStore{driver: drv} +} + +func (g *graphStore) ready() bool { return g != nil && g.driver != nil } + +func (g *graphStore) close(ctx context.Context) { + if g.ready() { + _ = g.driver.Close(ctx) + } +} + +// store 把三元组 MERGE 进 Neo4j(实体 + 关系,按 kb 隔离)。 +func (g *graphStore) store(ctx context.Context, kb string, triples []Triple) (int, error) { + if !g.ready() { + return 0, nil + } + n := 0 + for _, t := range triples { + if t.S == "" || t.O == "" || t.P == "" { + continue + } + _, err := neo4j.ExecuteQuery(ctx, g.driver, + `MERGE (a:Entity {kb:$kb, name:$s}) + MERGE (b:Entity {kb:$kb, name:$o}) + MERGE (a)-[r:REL {type:$p}]->(b)`, + map[string]any{"kb": kb, "s": t.S, "o": t.O, "p": t.P}, + neo4j.EagerResultTransformer, neo4j.ExecuteQueryWithDatabase("neo4j")) + if err != nil { + return n, err + } + n++ + } + return n, nil +} + +// search 图谱召回:找查询里提到的实体,返回其相连三元组(文本化)。 +func (g *graphStore) search(ctx context.Context, kb, query string, limit int) []Hit { + if !g.ready() || query == "" { + return nil + } + res, err := neo4j.ExecuteQuery(ctx, g.driver, + `MATCH (a:Entity {kb:$kb})-[r:REL]->(b:Entity {kb:$kb}) + WHERE $q CONTAINS a.name OR $q CONTAINS b.name + RETURN a.name AS s, r.type AS p, b.name AS o LIMIT $k`, + map[string]any{"kb": kb, "q": query, "k": limit}, + neo4j.EagerResultTransformer, neo4j.ExecuteQueryWithDatabase("neo4j")) + if err != nil { + return nil + } + var hits []Hit + for _, rec := range res.Records { + s, _ := rec.Get("s") + p, _ := rec.Get("p") + o, _ := rec.Get("o") + hits = append(hits, Hit{Text: fmt.Sprintf("%v —%v→ %v", s, p, o), Score: 1}) + } + return hits +} + +// triples 返回某 kb 的全部三元组(供 UI 图谱可视化)。 +func (g *graphStore) triples(ctx context.Context, kb string, limit int) []Triple { + if !g.ready() { + return nil + } + res, err := neo4j.ExecuteQuery(ctx, g.driver, + `MATCH (a:Entity {kb:$kb})-[r:REL]->(b:Entity {kb:$kb}) + RETURN a.name AS s, r.type AS p, b.name AS o LIMIT $k`, + map[string]any{"kb": kb, "k": limit}, + neo4j.EagerResultTransformer, neo4j.ExecuteQueryWithDatabase("neo4j")) + if err != nil { + return nil + } + var out []Triple + for _, rec := range res.Records { + s, _ := rec.Get("s") + p, _ := rec.Get("p") + o, _ := rec.Get("o") + out = append(out, Triple{S: fmt.Sprint(s), P: fmt.Sprint(p), O: fmt.Sprint(o)}) + } + return out +} + +// extractTriples 用 LLM 从文本抽取知识三元组。 +func extractTriples(ctx context.Context, chat *chatClient, text string) ([]Triple, error) { + if !chat.ready() { + return nil, nil + } + const sys = "你是知识图谱抽取器。从用户文本中抽取知识三元组,输出 JSON 数组,每项形如 {\"s\":\"主体\",\"p\":\"关系\",\"o\":\"客体\"}。实体用简洁名词,关系用简短动词短语。只输出 JSON,不要任何解释或代码块标记。" + out, err := chat.complete(ctx, sys, text) + if err != nil { + return nil, err + } + return parseTriples(out), nil +} + +// parseTriples 容忍代码块/前后噪声地解析三元组 JSON。 +func parseTriples(s string) []Triple { + s = strings.TrimSpace(s) + s = strings.TrimPrefix(s, "```json") + s = strings.TrimPrefix(s, "```") + s = strings.TrimSuffix(s, "```") + if i := strings.Index(s, "["); i >= 0 { + if j := strings.LastIndex(s, "]"); j > i { + s = s[i : j+1] + } + } + var triples []Triple + if json.Unmarshal([]byte(s), &triples) != nil { + return nil + } + return triples +} diff --git a/sundynix-mcp-go/internal/rag/rag.go b/sundynix-mcp-go/internal/rag/rag.go index 54c9a74..b61a0ce 100644 --- a/sundynix-mcp-go/internal/rag/rag.go +++ b/sundynix-mcp-go/internal/rag/rag.go @@ -15,14 +15,24 @@ import ( // embedBatch 是每批向量化的块数(让大文件的入库进度可观测)。 const embedBatch = 10 -// Engine 聚合 embedding + Milvus(向量) + Bleve(全文) + RRF 融合 + 可选 rerank。 -// embedding 可热更新(控制面下发)。 +// Config 是 RAG 引擎的初始化配置。 +type Config struct { + MilvusAddr string + EmbedBase, EmbedKey, EmbedModel string + RerankBase, RerankKey, RerankModel string + Neo4jURI, Neo4jUser, Neo4jPass string +} + +// Engine 聚合 embedding + Milvus(向量) + Bleve(全文) + Neo4j(图谱) → RRF 融合 + 可选 rerank。 +// embedding 与 chat(图谱抽取用)可热更新(控制面下发)。 type Engine struct { mu sync.RWMutex emb *embedClient + chat *chatClient mv *milvusStore bleve *bleveStore rerank *rerankClient + graph *graphStore } // SetEmbedding 热更新 embedding 配置(控制面变更时调用)。空配置=关闭向量检索。 @@ -37,36 +47,60 @@ func (e *Engine) SetEmbedding(base, key, model string) { log.Printf("[rag] embedding 配置: %s model=%s", base, model) } +// SetChat 热更新对话模型配置(图谱实体抽取用,复用控制面 chat 模型)。 +func (e *Engine) SetChat(base, key, model string) { + e.mu.Lock() + defer e.mu.Unlock() + e.chat = newChatClient(base, key, model) + if e.chat.ready() { + log.Printf("[rag] 图谱抽取模型: %s model=%s", base, model) + } +} + func (e *Engine) embed() *embedClient { e.mu.RLock() defer e.mu.RUnlock() return e.emb } -// Open 建立 RAG 引擎。embedding 未配 / Milvus 连不上 → 降级(检索返回空,不阻断工具服务)。 -// rerank* 为空则不启用重排(融合结果直接返回)。 -func Open(ctx context.Context, milvusAddr, embBase, embKey, embModel, rerankBase, rerankKey, rerankModel string) *Engine { - e := &Engine{bleve: openBleve(), rerank: newRerankClient(rerankBase, rerankKey, rerankModel)} - if e.rerank.ready() { - log.Printf("[rag] rerank: %s model=%s", rerankBase, rerankModel) +func (e *Engine) chatClient() *chatClient { + e.mu.RLock() + defer e.mu.RUnlock() + return e.chat +} + +// Open 建立 RAG 引擎。各路连不上 → 降级(不阻断工具服务)。 +func Open(ctx context.Context, cfg Config) *Engine { + e := &Engine{ + bleve: openBleve(), + rerank: newRerankClient(cfg.RerankBase, cfg.RerankKey, cfg.RerankModel), + graph: openGraph(ctx, cfg.Neo4jURI, cfg.Neo4jUser, cfg.Neo4jPass), } - if embBase != "" && embModel != "" { - e.SetEmbedding(embBase, embKey, embModel) // env 初值(控制面会覆盖) + if e.rerank.ready() { + log.Printf("[rag] rerank: %s model=%s", cfg.RerankBase, cfg.RerankModel) + } + if cfg.EmbedBase != "" && cfg.EmbedModel != "" { + e.SetEmbedding(cfg.EmbedBase, cfg.EmbedKey, cfg.EmbedModel) } else { log.Println("[rag] embedding 未配置(待控制面下发),向量检索暂降级") } - if milvusAddr != "" { - mv, err := openMilvus(ctx, milvusAddr) + if cfg.MilvusAddr != "" { + mv, err := openMilvus(ctx, cfg.MilvusAddr) if err != nil { log.Printf("[rag] Milvus 不可用,向量检索降级: %v", err) } else { e.mv = mv - log.Printf("[rag] Milvus connected %s", milvusAddr) + log.Printf("[rag] Milvus connected %s", cfg.MilvusAddr) } } return e } +// Triples 返回某 kb 的图谱三元组(供 UI 可视化)。 +func (e *Engine) Triples(ctx context.Context, kb string, limit int) []Triple { + return e.graph.triples(ctx, kb, limit) +} + // Ready 报告 RAG 是否可用(embedding + Milvus 均就绪)。 func (e *Engine) Ready() bool { return e.embed().ready() && e.mv != nil } @@ -108,6 +142,22 @@ func (e *Engine) Ingest(ctx context.Context, kb, text string, onProgress func(co emit(contract.IngestEvent{Stage: "写Bleve", Msg: "全文索引写入中"}) _ = e.bleve.index(kb, chunks) // 同步写全文索引(失败不阻断向量入库) + // 图谱路:LLM 抽实体/关系 → Neo4j(可降级,不阻断向量入库)。 + if e.graph.ready() && e.chatClient().ready() { + emit(contract.IngestEvent{Stage: "抽实体", Msg: "LLM 抽取知识三元组"}) + triples, terr := extractTriples(ctx, e.chatClient(), text) + if terr != nil { + log.Printf("[rag] 三元组抽取失败(图谱降级): %v", terr) + } else if len(triples) > 0 { + emit(contract.IngestEvent{Stage: "写Neo4j", Total: len(triples), Msg: itoa(len(triples)) + " 条三元组写入图谱"}) + if n, gerr := e.graph.store(ctx, kb, triples); gerr != nil { + log.Printf("[rag] 写 Neo4j 失败(图谱降级): %v", gerr) + } else { + log.Printf("[rag] 图谱: 写入 %d 条三元组到 kb=%s", n, kb) + } + } + } + return len(chunks), nil } @@ -155,9 +205,11 @@ func (e *Engine) Search(ctx context.Context, kb, query string, topK int) ([]Hit, vecHits, _ := e.mv.search(ctx, kb, vecs[0], fanout) // 全文路 ftHits := e.bleve.search(kb, query, fanout) - // RRF 融合(按文本去重) - cand := rrf([][]Hit{vecHits, ftHits}, fanout) - log.Printf("[rag] hybrid: 向量=%d 全文=%d → 融合=%d", len(vecHits), len(ftHits), len(cand)) + // 图谱路(GraphRAG:查询提到的实体的相连三元组) + graphHits := e.graph.search(ctx, kb, query, fanout) + // RRF 融合(三路,按文本去重) + cand := rrf([][]Hit{vecHits, ftHits, graphHits}, fanout) + log.Printf("[rag] hybrid: 向量=%d 全文=%d 图谱=%d → 融合=%d", len(vecHits), len(ftHits), len(graphHits), len(cand)) // 可选 rerank:对融合候选重排取 topK if e.rerank.ready() && len(cand) > 1 { @@ -177,6 +229,7 @@ func (e *Engine) Close() { if e.mv != nil { e.mv.close() } + e.graph.close(context.Background()) } // chunk 朴素切块:按行切,去空白;过长再按长度切。真实系统应做版面/语义切块。