feat: GraphRAG — LLM 抽三元组建 Neo4j 图谱 + 混合检索加图谱第三路

混合检索从 2 路(向量+全文)升级为 3 路(+图谱)。入库时 LLM 抽实体/关系建
Neo4j 图,检索时图谱路(实体关联三元组)融进 RRF;UI 可视化图谱。

- mcp-go rag: chat.go(OpenAI 兼容非流式 chat 客户端,抽取用) + graph.go(neo4j-go-driver
  连接 + LLM 抽三元组 + MERGE 实体/关系 + 图谱召回/全量三元组) + rag.go(Config 结构;
  graph+chat 路;Ingest 加 抽实体/写Neo4j 阶段;Search 三路 RRF 融合;SetChat 热更新)
- mcp-go: Neo4j env(默认 neo4j://localhost:7687, neo4j/sundynix);订阅 chat 控制面配置
  (复用 DeepSeek 做抽取);新工具 kb_graph(返回三元组)
- gateway: GET /api/v1/kb/graph;frontend KbView 知识图谱面板(实体—关系→实体)
- 验证: 全模块 build✓ + e2e PASS; live——入库'sundynix用Milvus...'→DeepSeek 抽 4 三元组
  →Neo4j(8 实体);检索三路融合 向量=4 全文=2 图谱=1;浏览器图谱面板渲染 4 三元组
- 边界: 实体链接用 CONTAINS 朴素匹配(可升级 LLM 查询实体抽取);全文/图谱重启随入库重建

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Blizzard
2026-06-11 11:10:22 +08:00
parent 2d5fd2fca5
commit d623b8590e
11 changed files with 399 additions and 22 deletions
+14
View File
@@ -105,6 +105,20 @@ export interface KbHit {
score: number; score: number;
} }
export interface Triple {
s: string;
p: string;
o: string;
}
// graphKb: GET /api/v1/kb/graph —— 取某知识库的图谱三元组(→ mcp-go kb_graphNeo4j)。
export async function graphKb(kb: string): Promise<Triple[]> {
const res = await fetch(`${GATEWAY}/api/v1/kb/graph?kb=${encodeURIComponent(kb)}`);
const data = (await res.json()) as { triples?: Triple[]; error?: string };
if (!res.ok) throw new Error(data.error ?? `graph failed: ${res.status}`);
return data.triples ?? [];
}
// searchKb: POST /api/v1/kb/search,检索台查询(→ mcp-go kb_search,带分数)。 // searchKb: POST /api/v1/kb/search,检索台查询(→ mcp-go kb_search,带分数)。
export async function searchKb(kb: string, q: string, topK = 5): Promise<KbHit[]> { export async function searchKb(kb: string, q: string, topK = 5): Promise<KbHit[]> {
const res = await fetch(`${GATEWAY}/api/v1/kb/search`, { const res = await fetch(`${GATEWAY}/api/v1/kb/search`, {
+32 -2
View File
@@ -1,5 +1,5 @@
import { useRef, useState } from "react"; import { useRef, useState } from "react";
import { ingestKb, ingestFile, streamIngest, searchKb, type IngestEvent, type KbHit } from "../lib/api"; import { ingestKb, ingestFile, streamIngest, searchKb, graphKb, type IngestEvent, type KbHit, type Triple } from "../lib/api";
interface IngestLog { interface IngestLog {
t: string; t: string;
@@ -29,6 +29,15 @@ export function KbView() {
const [hits, setHits] = useState<KbHit[] | null>(null); const [hits, setHits] = useState<KbHit[] | null>(null);
const [searching, setSearching] = useState(false); const [searching, setSearching] = useState(false);
const [err, setErr] = useState(""); const [err, setErr] = useState("");
const [graph, setGraph] = useState<Triple[] | null>(null);
const onGraph = async () => {
try {
setGraph(await graphKb(kb));
} catch (e) {
setErr((e as Error).message);
}
};
const stamp = () => new Date().toLocaleTimeString(); const stamp = () => new Date().toLocaleTimeString();
const ingesting = prog?.active ?? false; const ingesting = prog?.active ?? false;
@@ -234,7 +243,7 @@ export function KbView() {
</button> </button>
</div> </div>
{err && <p className="mt-2 text-xs text-rose-600"> {err}</p>} {err && <p className="mt-2 text-xs text-rose-600"> {err}</p>}
<ul className="mt-3 flex-1 space-y-2 overflow-auto"> <ul className="mt-3 max-h-[40%] space-y-2 overflow-auto">
{hits === null && <li className="text-xs text-gray-400"></li>} {hits === null && <li className="text-xs text-gray-400"></li>}
{hits !== null && hits.length === 0 && ( {hits !== null && hits.length === 0 && (
<li className="text-xs text-gray-400"> RAG </li> <li className="text-xs text-gray-400"> RAG </li>
@@ -250,6 +259,27 @@ export function KbView() {
</li> </li>
))} ))}
</ul> </ul>
{/* 知识图谱(Neo4j / GraphRAG */}
<div className="mt-3 flex items-center justify-between border-t pt-2">
<h3 className="text-xs font-semibold text-gray-600">Neo4j</h3>
<button onClick={onGraph} className="rounded border px-2 py-0.5 text-xs hover:bg-gray-50">
</button>
</div>
<ul className="mt-2 flex-1 space-y-1 overflow-auto">
{graph === null && <li className="text-[11px] text-gray-400"></li>}
{graph !== null && graph.length === 0 && (
<li className="text-[11px] text-gray-400"> chat + </li>
)}
{graph?.map((t, i) => (
<li key={i} className="flex items-center gap-1 text-[11px]">
<span className="rounded bg-amber-100 px-1.5 py-0.5 text-amber-700">{t.s}</span>
<span className="text-gray-400">{t.p}</span>
<span className="rounded bg-emerald-100 px-1.5 py-0.5 text-emerald-700">{t.o}</span>
</li>
))}
</ul>
</section> </section>
</div> </div>
</div> </div>
+13
View File
@@ -208,3 +208,16 @@ func (h *Handler) KbSearch(c *gin.Context) {
_ = json.Unmarshal([]byte(res.Content), &hits) _ = json.Unmarshal([]byte(res.Content), &hits)
c.JSON(http.StatusOK, gin.H{"hits": hits}) c.JSON(http.StatusOK, gin.H{"hits": hits})
} }
// KbGraph: GET /api/v1/kb/graph?kb= —— 某知识库的图谱三元组(→ mcp-go kb_graphNeo4j)。
func (h *Handler) KbGraph(c *gin.Context) {
res, err := h.bus.CallTool(c.Request.Context(), contract.ToolSubjectGo("kb_graph"),
&contract.ToolCall{Tool: "kb_graph", Args: map[string]any{"kb": c.Query("kb"), "limit": 100}})
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
var triples []map[string]any
_ = json.Unmarshal([]byte(res.Content), &triples)
c.JSON(http.StatusOK, gin.H{"triples": triples})
}
@@ -27,6 +27,7 @@ func New(db *store.Postgres, cache *store.Redis, bus *nats.Bus) *gin.Engine {
api.POST("/kb/ingest_file", h.KbIngestFile) // 文件入库(docx/xlsx/pdf… 异步) api.POST("/kb/ingest_file", h.KbIngestFile) // 文件入库(docx/xlsx/pdf… 异步)
api.GET("/kb/ingest/:id/stream", h.KbIngestStream) // 入库进度 SSE(实时监控) api.GET("/kb/ingest/:id/stream", h.KbIngestStream) // 入库进度 SSE(实时监控)
api.POST("/kb/search", h.KbSearch) // 知识库检索台(→ mcp-go kb_search api.POST("/kb/search", h.KbSearch) // 知识库检索台(→ mcp-go kb_search
api.GET("/kb/graph", h.KbGraph) // 知识图谱三元组(→ mcp-go kb_graphNeo4j
api.GET("/billing", h.Billing) api.GET("/billing", h.Billing)
// 运维控制面:LLM 模型配置(独立运维控制台调用)。 // 运维控制面:LLM 模型配置(独立运维控制台调用)。
+22 -3
View File
@@ -30,6 +30,9 @@ func main() {
rerankBase := envOr("RERANK_BASE_URL", "") // DashScope 文本重排端点(空=不启用 rerank) rerankBase := envOr("RERANK_BASE_URL", "") // DashScope 文本重排端点(空=不启用 rerank)
rerankKey := envOr("RERANK_API_KEY", "") rerankKey := envOr("RERANK_API_KEY", "")
rerankModel := envOr("RERANK_MODEL", "") rerankModel := envOr("RERANK_MODEL", "")
neo4jURI := envOr("NEO4J_URI", "neo4j://localhost:7687") // GraphRAG 图谱(连不上则降级)
neo4jUser := envOr("NEO4J_USER", "neo4j")
neo4jPass := envOr("NEO4J_PASS", "sundynix")
b, err := sharedbus.Connect(natsURL) b, err := sharedbus.Connect(natsURL)
if err != nil { if err != nil {
@@ -47,23 +50,39 @@ func main() {
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop() defer stop()
// RAG 核心链:embedding(env 初值) + Milvus(向量) + Bleve(全文) + 可选 rerank // RAG 核心链:embedding + Milvus(向量) + Bleve(全文) + Neo4j(图谱) + 可选 rerank
ragEngine := rag.Open(ctx, milvusAddr, embBase, embKey, embModel, rerankBase, rerankKey, rerankModel) ragEngine := rag.Open(ctx, rag.Config{
MilvusAddr: milvusAddr,
EmbedBase: embBase, EmbedKey: embKey, EmbedModel: embModel,
RerankBase: rerankBase, RerankKey: rerankKey, RerankModel: rerankModel,
Neo4jURI: neo4jURI, Neo4jUser: neo4jUser, Neo4jPass: neo4jPass,
})
defer ragEngine.Close() defer ragEngine.Close()
// 配置控制面:启动取激活 embedding 配置 + 订阅热更新(覆盖 env,持久化由 Gateway 管) // 配置控制面:取激活 embedding(向量) + chat(图谱抽取) 配置并订阅热更新
applyEmbed := func(cfg *contract.ModelConfig) { applyEmbed := func(cfg *contract.ModelConfig) {
if cfg != nil { if cfg != nil {
ragEngine.SetEmbedding(cfg.BaseURL, cfg.APIKey, cfg.Model) ragEngine.SetEmbedding(cfg.BaseURL, cfg.APIKey, cfg.Model)
} }
} }
applyChat := func(cfg *contract.ModelConfig) {
if cfg != nil {
ragEngine.SetChat(cfg.BaseURL, cfg.APIKey, cfg.Model)
}
}
cctx, ccancel := context.WithTimeout(ctx, 3*time.Second) cctx, ccancel := context.WithTimeout(ctx, 3*time.Second)
if cfg, _ := b.RequestConfig(cctx, contract.ConfigKindEmbedding); cfg != nil { if cfg, _ := b.RequestConfig(cctx, contract.ConfigKindEmbedding); cfg != nil {
applyEmbed(cfg) applyEmbed(cfg)
} else { } else {
log.Println("[mcp_go] 未取到 embedding 控制面配置(用 env 或降级)") log.Println("[mcp_go] 未取到 embedding 控制面配置(用 env 或降级)")
} }
if cfg, _ := b.RequestConfig(cctx, contract.ConfigKindChat); cfg != nil {
applyChat(cfg)
}
ccancel() ccancel()
if _, err := b.SubscribeConfigUpdated(contract.ConfigKindChat, applyChat); err != nil {
log.Printf("[mcp_go] subscribe chat config: %v", err)
}
if _, err := b.SubscribeConfigUpdated(contract.ConfigKindEmbedding, applyEmbed); err != nil { if _, err := b.SubscribeConfigUpdated(contract.ConfigKindEmbedding, applyEmbed); err != nil {
log.Printf("[mcp_go] subscribe embedding config: %v", err) log.Printf("[mcp_go] subscribe embedding config: %v", err)
} }
+1
View File
@@ -5,6 +5,7 @@ go 1.24
require ( require (
github.com/blevesearch/bleve/v2 v2.4.2 github.com/blevesearch/bleve/v2 v2.4.2
github.com/milvus-io/milvus-sdk-go/v2 v2.4.1 github.com/milvus-io/milvus-sdk-go/v2 v2.4.1
github.com/neo4j/neo4j-go-driver/v5 v5.24.0
github.com/redis/go-redis/v9 v9.20.0 github.com/redis/go-redis/v9 v9.20.0
github.com/sundynix/sundynix-shared v0.0.0 github.com/sundynix/sundynix-shared v0.0.0
gorm.io/driver/postgres v1.6.0 gorm.io/driver/postgres v1.6.0
+2
View File
@@ -256,6 +256,8 @@ github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/neo4j/neo4j-go-driver/v5 v5.24.0 h1:7MAFoB7L6f9heQUo/tJ5EnrrpVzm9ZBHgH8ew03h6Eo=
github.com/neo4j/neo4j-go-driver/v5 v5.24.0/go.mod h1:Vff8OwT7QpLm7L2yYr85XNWe9Rbqlbeb9asNXJTHO4k=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.10.3/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.10.3/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
+15 -1
View File
@@ -37,7 +37,7 @@ func (g *Gateway) Serve(ctx context.Context) error {
return err return err
} }
defer func() { _ = unsub() }() defer func() { _ = unsub() }()
log.Printf("[mcp_go] tools ready on %s (queue=%s): wiki_search, kb_ingest, kb_search, memory_get, memory_upsert, history_get, history_append, echo", log.Printf("[mcp_go] tools ready on %s (queue=%s): wiki_search, kb_ingest, kb_search, kb_graph, memory_*, history_*, echo",
contract.SubjectToolsGoAll, contract.QueueToolsGo) contract.SubjectToolsGoAll, contract.QueueToolsGo)
<-ctx.Done() <-ctx.Done()
return ctx.Err() return ctx.Err()
@@ -53,6 +53,8 @@ func (g *Gateway) dispatch(ctx context.Context, call *contract.ToolCall) *contra
return g.kbIngest(ctx, call) return g.kbIngest(ctx, call)
case "kb_search": case "kb_search":
return g.kbSearch(ctx, call) return g.kbSearch(ctx, call)
case "kb_graph":
return g.kbGraph(ctx, call)
case "memory_get": case "memory_get":
return g.memoryGet(ctx, call) return g.memoryGet(ctx, call)
case "memory_upsert": case "memory_upsert":
@@ -160,6 +162,18 @@ func (g *Gateway) kbSearch(ctx context.Context, call *contract.ToolCall) *contra
return &contract.ToolResult{OK: true, Content: string(data)} return &contract.ToolResult{OK: true, Content: string(data)}
} }
// kbGraph 返回某知识库的图谱三元组 JSON [{s,p,o},...](供 UI 可视化 Neo4j 情况)。
func (g *Gateway) kbGraph(ctx context.Context, call *contract.ToolCall) *contract.ToolResult {
kb, _ := call.Args["kb"].(string)
limit := 100
if v, ok := call.Args["limit"].(float64); ok && v > 0 {
limit = int(v)
}
triples := g.rag.Triples(ctx, kb, limit)
data, _ := json.Marshal(triples)
return &contract.ToolResult{OK: true, Content: string(data)}
}
// kbIngest 把文本入库(切块→embedding→Milvus+Bleve)。 // kbIngest 把文本入库(切块→embedding→Milvus+Bleve)。
// 带 job_id 时逐阶段把进度发到 sundynix.streams.<job_id>,供 UI 实时入库监控。 // 带 job_id 时逐阶段把进度发到 sundynix.streams.<job_id>,供 UI 实时入库监控。
func (g *Gateway) kbIngest(ctx context.Context, call *contract.ToolCall) *contract.ToolResult { func (g *Gateway) kbIngest(ctx context.Context, call *contract.ToolCall) *contract.ToolResult {
+72
View File
@@ -0,0 +1,72 @@
package rag
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"time"
)
// chatClient 是 OpenAI 兼容的非流式对话客户端,供图谱实体抽取用。
// 配置由控制面(chat kind)经 NATS 下发(与 Dispatcher 共用同一个模型)。
type chatClient struct {
baseURL string
apiKey string
model string
hc *http.Client
}
func newChatClient(baseURL, apiKey, model string) *chatClient {
if baseURL == "" || model == "" {
return nil
}
return &chatClient{baseURL: baseURL, apiKey: apiKey, model: model, hc: &http.Client{Timeout: 60 * time.Second}}
}
func (c *chatClient) ready() bool { return c != nil && c.baseURL != "" }
// complete 一次性补全(非流式),返回助手回复文本。
func (c *chatClient) complete(ctx context.Context, system, user string) (string, error) {
body, _ := json.Marshal(map[string]any{
"model": c.model,
"messages": []map[string]string{
{"role": "system", "content": system},
{"role": "user", "content": user},
},
"stream": false,
})
req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/chat/completions", bytes.NewReader(body))
if err != nil {
return "", err
}
req.Header.Set("Content-Type", "application/json")
if c.apiKey != "" {
req.Header.Set("Authorization", "Bearer "+c.apiKey)
}
resp, err := c.hc.Do(req)
if err != nil {
return "", err
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
buf := new(bytes.Buffer)
_, _ = buf.ReadFrom(resp.Body)
return "", fmt.Errorf("chat http %d: %s", resp.StatusCode, buf.String())
}
var out struct {
Choices []struct {
Message struct {
Content string `json:"content"`
} `json:"message"`
} `json:"choices"`
}
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
return "", err
}
if len(out.Choices) == 0 {
return "", fmt.Errorf("chat: empty choices")
}
return out.Choices[0].Message.Content, nil
}
+158
View File
@@ -0,0 +1,158 @@
package rag
import (
"context"
"encoding/json"
"fmt"
"log"
"strings"
"github.com/neo4j/neo4j-go-driver/v5/neo4j"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/auth"
)
// Triple 是一条知识三元组(主体-关系-客体)。
type Triple struct {
S string `json:"s"`
P string `json:"p"`
O string `json:"o"`
}
// graphStore 是 GraphRAG 的图路:实体/关系存 Neo4j。
type graphStore struct {
driver neo4j.DriverWithContext
}
func openGraph(ctx context.Context, uri, user, pass string) *graphStore {
if uri == "" {
return &graphStore{}
}
drv, err := neo4j.NewDriverWithContext(uri,
auth.BasicTokenManager(func(context.Context) (neo4j.AuthToken, error) {
return neo4j.BasicAuth(user, pass, ""), nil
}))
if err != nil {
log.Printf("[rag] Neo4j 连接失败,图谱路降级: %v", err)
return &graphStore{}
}
if err := drv.VerifyConnectivity(ctx); err != nil {
log.Printf("[rag] Neo4j 不可用,图谱路降级: %v", err)
return &graphStore{}
}
// 实体唯一约束(kb+name)。
_, _ = neo4j.ExecuteQuery(ctx, drv,
"CREATE CONSTRAINT entity_key IF NOT EXISTS FOR (e:Entity) REQUIRE (e.kb, e.name) IS UNIQUE",
nil, neo4j.EagerResultTransformer)
log.Printf("[rag] Neo4j connected %s", uri)
return &graphStore{driver: drv}
}
func (g *graphStore) ready() bool { return g != nil && g.driver != nil }
func (g *graphStore) close(ctx context.Context) {
if g.ready() {
_ = g.driver.Close(ctx)
}
}
// store 把三元组 MERGE 进 Neo4j(实体 + 关系,按 kb 隔离)。
func (g *graphStore) store(ctx context.Context, kb string, triples []Triple) (int, error) {
if !g.ready() {
return 0, nil
}
n := 0
for _, t := range triples {
if t.S == "" || t.O == "" || t.P == "" {
continue
}
_, err := neo4j.ExecuteQuery(ctx, g.driver,
`MERGE (a:Entity {kb:$kb, name:$s})
MERGE (b:Entity {kb:$kb, name:$o})
MERGE (a)-[r:REL {type:$p}]->(b)`,
map[string]any{"kb": kb, "s": t.S, "o": t.O, "p": t.P},
neo4j.EagerResultTransformer, neo4j.ExecuteQueryWithDatabase("neo4j"))
if err != nil {
return n, err
}
n++
}
return n, nil
}
// search 图谱召回:找查询里提到的实体,返回其相连三元组(文本化)。
func (g *graphStore) search(ctx context.Context, kb, query string, limit int) []Hit {
if !g.ready() || query == "" {
return nil
}
res, err := neo4j.ExecuteQuery(ctx, g.driver,
`MATCH (a:Entity {kb:$kb})-[r:REL]->(b:Entity {kb:$kb})
WHERE $q CONTAINS a.name OR $q CONTAINS b.name
RETURN a.name AS s, r.type AS p, b.name AS o LIMIT $k`,
map[string]any{"kb": kb, "q": query, "k": limit},
neo4j.EagerResultTransformer, neo4j.ExecuteQueryWithDatabase("neo4j"))
if err != nil {
return nil
}
var hits []Hit
for _, rec := range res.Records {
s, _ := rec.Get("s")
p, _ := rec.Get("p")
o, _ := rec.Get("o")
hits = append(hits, Hit{Text: fmt.Sprintf("%v —%v→ %v", s, p, o), Score: 1})
}
return hits
}
// triples 返回某 kb 的全部三元组(供 UI 图谱可视化)。
func (g *graphStore) triples(ctx context.Context, kb string, limit int) []Triple {
if !g.ready() {
return nil
}
res, err := neo4j.ExecuteQuery(ctx, g.driver,
`MATCH (a:Entity {kb:$kb})-[r:REL]->(b:Entity {kb:$kb})
RETURN a.name AS s, r.type AS p, b.name AS o LIMIT $k`,
map[string]any{"kb": kb, "k": limit},
neo4j.EagerResultTransformer, neo4j.ExecuteQueryWithDatabase("neo4j"))
if err != nil {
return nil
}
var out []Triple
for _, rec := range res.Records {
s, _ := rec.Get("s")
p, _ := rec.Get("p")
o, _ := rec.Get("o")
out = append(out, Triple{S: fmt.Sprint(s), P: fmt.Sprint(p), O: fmt.Sprint(o)})
}
return out
}
// extractTriples 用 LLM 从文本抽取知识三元组。
func extractTriples(ctx context.Context, chat *chatClient, text string) ([]Triple, error) {
if !chat.ready() {
return nil, nil
}
const sys = "你是知识图谱抽取器。从用户文本中抽取知识三元组,输出 JSON 数组,每项形如 {\"s\":\"主体\",\"p\":\"关系\",\"o\":\"客体\"}。实体用简洁名词,关系用简短动词短语。只输出 JSON,不要任何解释或代码块标记。"
out, err := chat.complete(ctx, sys, text)
if err != nil {
return nil, err
}
return parseTriples(out), nil
}
// parseTriples 容忍代码块/前后噪声地解析三元组 JSON。
func parseTriples(s string) []Triple {
s = strings.TrimSpace(s)
s = strings.TrimPrefix(s, "```json")
s = strings.TrimPrefix(s, "```")
s = strings.TrimSuffix(s, "```")
if i := strings.Index(s, "["); i >= 0 {
if j := strings.LastIndex(s, "]"); j > i {
s = s[i : j+1]
}
}
var triples []Triple
if json.Unmarshal([]byte(s), &triples) != nil {
return nil
}
return triples
}
+69 -16
View File
@@ -15,14 +15,24 @@ import (
// embedBatch 是每批向量化的块数(让大文件的入库进度可观测)。 // embedBatch 是每批向量化的块数(让大文件的入库进度可观测)。
const embedBatch = 10 const embedBatch = 10
// Engine 聚合 embedding + Milvus(向量) + Bleve(全文) + RRF 融合 + 可选 rerank // Config 是 RAG 引擎的初始化配置
// embedding 可热更新(控制面下发)。 type Config struct {
MilvusAddr string
EmbedBase, EmbedKey, EmbedModel string
RerankBase, RerankKey, RerankModel string
Neo4jURI, Neo4jUser, Neo4jPass string
}
// Engine 聚合 embedding + Milvus(向量) + Bleve(全文) + Neo4j(图谱) → RRF 融合 + 可选 rerank。
// embedding 与 chat(图谱抽取用)可热更新(控制面下发)。
type Engine struct { type Engine struct {
mu sync.RWMutex mu sync.RWMutex
emb *embedClient emb *embedClient
chat *chatClient
mv *milvusStore mv *milvusStore
bleve *bleveStore bleve *bleveStore
rerank *rerankClient rerank *rerankClient
graph *graphStore
} }
// SetEmbedding 热更新 embedding 配置(控制面变更时调用)。空配置=关闭向量检索。 // SetEmbedding 热更新 embedding 配置(控制面变更时调用)。空配置=关闭向量检索。
@@ -37,36 +47,60 @@ func (e *Engine) SetEmbedding(base, key, model string) {
log.Printf("[rag] embedding 配置: %s model=%s", base, model) log.Printf("[rag] embedding 配置: %s model=%s", base, model)
} }
// SetChat 热更新对话模型配置(图谱实体抽取用,复用控制面 chat 模型)。
func (e *Engine) SetChat(base, key, model string) {
e.mu.Lock()
defer e.mu.Unlock()
e.chat = newChatClient(base, key, model)
if e.chat.ready() {
log.Printf("[rag] 图谱抽取模型: %s model=%s", base, model)
}
}
func (e *Engine) embed() *embedClient { func (e *Engine) embed() *embedClient {
e.mu.RLock() e.mu.RLock()
defer e.mu.RUnlock() defer e.mu.RUnlock()
return e.emb return e.emb
} }
// Open 建立 RAG 引擎。embedding 未配 / Milvus 连不上 → 降级(检索返回空,不阻断工具服务)。 func (e *Engine) chatClient() *chatClient {
// rerank* 为空则不启用重排(融合结果直接返回)。 e.mu.RLock()
func Open(ctx context.Context, milvusAddr, embBase, embKey, embModel, rerankBase, rerankKey, rerankModel string) *Engine { defer e.mu.RUnlock()
e := &Engine{bleve: openBleve(), rerank: newRerankClient(rerankBase, rerankKey, rerankModel)} return e.chat
if e.rerank.ready() { }
log.Printf("[rag] rerank: %s model=%s", rerankBase, rerankModel)
// Open 建立 RAG 引擎。各路连不上 → 降级(不阻断工具服务)。
func Open(ctx context.Context, cfg Config) *Engine {
e := &Engine{
bleve: openBleve(),
rerank: newRerankClient(cfg.RerankBase, cfg.RerankKey, cfg.RerankModel),
graph: openGraph(ctx, cfg.Neo4jURI, cfg.Neo4jUser, cfg.Neo4jPass),
} }
if embBase != "" && embModel != "" { if e.rerank.ready() {
e.SetEmbedding(embBase, embKey, embModel) // env 初值(控制面会覆盖) log.Printf("[rag] rerank: %s model=%s", cfg.RerankBase, cfg.RerankModel)
}
if cfg.EmbedBase != "" && cfg.EmbedModel != "" {
e.SetEmbedding(cfg.EmbedBase, cfg.EmbedKey, cfg.EmbedModel)
} else { } else {
log.Println("[rag] embedding 未配置(待控制面下发),向量检索暂降级") log.Println("[rag] embedding 未配置(待控制面下发),向量检索暂降级")
} }
if milvusAddr != "" { if cfg.MilvusAddr != "" {
mv, err := openMilvus(ctx, milvusAddr) mv, err := openMilvus(ctx, cfg.MilvusAddr)
if err != nil { if err != nil {
log.Printf("[rag] Milvus 不可用,向量检索降级: %v", err) log.Printf("[rag] Milvus 不可用,向量检索降级: %v", err)
} else { } else {
e.mv = mv e.mv = mv
log.Printf("[rag] Milvus connected %s", milvusAddr) log.Printf("[rag] Milvus connected %s", cfg.MilvusAddr)
} }
} }
return e return e
} }
// Triples 返回某 kb 的图谱三元组(供 UI 可视化)。
func (e *Engine) Triples(ctx context.Context, kb string, limit int) []Triple {
return e.graph.triples(ctx, kb, limit)
}
// Ready 报告 RAG 是否可用(embedding + Milvus 均就绪)。 // Ready 报告 RAG 是否可用(embedding + Milvus 均就绪)。
func (e *Engine) Ready() bool { return e.embed().ready() && e.mv != nil } func (e *Engine) Ready() bool { return e.embed().ready() && e.mv != nil }
@@ -108,6 +142,22 @@ func (e *Engine) Ingest(ctx context.Context, kb, text string, onProgress func(co
emit(contract.IngestEvent{Stage: "写Bleve", Msg: "全文索引写入中"}) emit(contract.IngestEvent{Stage: "写Bleve", Msg: "全文索引写入中"})
_ = e.bleve.index(kb, chunks) // 同步写全文索引(失败不阻断向量入库) _ = e.bleve.index(kb, chunks) // 同步写全文索引(失败不阻断向量入库)
// 图谱路:LLM 抽实体/关系 → Neo4j(可降级,不阻断向量入库)。
if e.graph.ready() && e.chatClient().ready() {
emit(contract.IngestEvent{Stage: "抽实体", Msg: "LLM 抽取知识三元组"})
triples, terr := extractTriples(ctx, e.chatClient(), text)
if terr != nil {
log.Printf("[rag] 三元组抽取失败(图谱降级): %v", terr)
} else if len(triples) > 0 {
emit(contract.IngestEvent{Stage: "写Neo4j", Total: len(triples), Msg: itoa(len(triples)) + " 条三元组写入图谱"})
if n, gerr := e.graph.store(ctx, kb, triples); gerr != nil {
log.Printf("[rag] 写 Neo4j 失败(图谱降级): %v", gerr)
} else {
log.Printf("[rag] 图谱: 写入 %d 条三元组到 kb=%s", n, kb)
}
}
}
return len(chunks), nil return len(chunks), nil
} }
@@ -155,9 +205,11 @@ func (e *Engine) Search(ctx context.Context, kb, query string, topK int) ([]Hit,
vecHits, _ := e.mv.search(ctx, kb, vecs[0], fanout) vecHits, _ := e.mv.search(ctx, kb, vecs[0], fanout)
// 全文路 // 全文路
ftHits := e.bleve.search(kb, query, fanout) ftHits := e.bleve.search(kb, query, fanout)
// RRF 融合(按文本去重 // 图谱路(GraphRAG:查询提到的实体的相连三元组
cand := rrf([][]Hit{vecHits, ftHits}, fanout) graphHits := e.graph.search(ctx, kb, query, fanout)
log.Printf("[rag] hybrid: 向量=%d 全文=%d → 融合=%d", len(vecHits), len(ftHits), len(cand)) // RRF 融合(三路,按文本去重)
cand := rrf([][]Hit{vecHits, ftHits, graphHits}, fanout)
log.Printf("[rag] hybrid: 向量=%d 全文=%d 图谱=%d → 融合=%d", len(vecHits), len(ftHits), len(graphHits), len(cand))
// 可选 rerank:对融合候选重排取 topK // 可选 rerank:对融合候选重排取 topK
if e.rerank.ready() && len(cand) > 1 { if e.rerank.ready() && len(cand) > 1 {
@@ -177,6 +229,7 @@ func (e *Engine) Close() {
if e.mv != nil { if e.mv != nil {
e.mv.close() e.mv.close()
} }
e.graph.close(context.Background())
} }
// chunk 朴素切块:按行切,去空白;过长再按长度切。真实系统应做版面/语义切块。 // chunk 朴素切块:按行切,去空白;过长再按长度切。真实系统应做版面/语义切块。