Files
sundynix-agentix/sundynix-gateway/internal/handler/kb.go
T
Blizzard 69967ea534 refactor(kb): 文库列表/正文分离 + [[双链]]索引表(可扛大文件)
不再一次拉回整库正文、不再前端扫全文 —— 列表只读元数据,正文按需取,链接走索引。

- store: SaveDoc 维护 size+preview(前 500 字);ListVault 仅 Select 元数据(name/size/preview,
  不含 content);GetDoc 取单篇全文;DocLink 表 + ReplaceDocLinks(入库/编辑时按 from 重建出链)
  + ListLinks。
- gateway: 入库/笔记保存时正则抽 [[链接]]→ReplaceDocLinks 维护索引;
  /kb/vault 改返元数据+预览;新增 /kb/doc(单篇全文) 与 /kb/links(全库双链)。
- 前端:listVault 返元数据,新增 getDoc/listLinks;VaultPanel 列表只展示名/字数,
  选中后 getDoc 按需载正文(带加载态),反链/笔记关系图改用服务端 links 索引(不扫全文)。

验证:curl /kb/vault 仅 name/size/preview;/kb/doc 取单篇;/kb/links 返 3 条双链。
Preview:文库点「架构总览」按需载正文(平台分五层)、反向链接(1)=Dispatcher(来自索引)。tsc+vite+gateway build 通过。

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-13 16:53:23 +08:00

410 lines
13 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package handler
import (
"context"
"crypto/rand"
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"path/filepath"
"regexp"
"strings"
"time"
"github.com/gin-gonic/gin"
"github.com/sundynix/sundynix-shared/contract"
)
// rawKB 规整知识库名(去空白,空则 default)—— 注册表里的展示名。
func rawKB(kb string) string {
kb = strings.TrimSpace(kb)
if kb == "" {
return "default"
}
return kb
}
// scopedKB 把知识库名锁进当前用户作用域:"owner/name"。
// owner 来自身份(X-User-ID),客户端只发库名、发不了 owner,故无法越权查到他人的库。
func scopedKB(c *gin.Context, kb string) string {
return userID(c) + "/" + rawKB(kb)
}
// KbList: GET /api/v1/kb/list —— 当前用户的知识库列表(按 owner 隔离)。
func (h *Handler) KbList(c *gin.Context) {
rows, err := h.db.ListKB(c.Request.Context(), userID(c))
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
out := make([]gin.H, 0, len(rows))
for _, r := range rows {
out = append(out, gin.H{"name": r.Name, "kind": r.Kind})
}
c.JSON(http.StatusOK, gin.H{"kbs": out})
}
// KbCreate: POST /api/v1/kb/create {name, kind} —— 新建知识库(folder/project/case/general)。
func (h *Handler) KbCreate(c *gin.Context) {
var body struct {
Name string `json:"name"`
Kind string `json:"kind"`
}
if err := c.ShouldBindJSON(&body); err != nil || strings.TrimSpace(body.Name) == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "name required"})
return
}
if err := h.db.EnsureKB(c.Request.Context(), userID(c), rawKB(body.Name), body.Kind); err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{"name": rawKB(body.Name), "kind": body.Kind})
}
// KbIngest: POST /api/v1/kb/ingest —— 文本入库(异步,返回 job_id;进度经 SSE 看)。
func (h *Handler) KbIngest(c *gin.Context) {
var body struct {
KB string `json:"kb"`
Text string `json:"text"`
}
if err := c.ShouldBindJSON(&body); err != nil || body.Text == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "text required"})
return
}
_ = h.db.EnsureKB(c.Request.Context(), userID(c), rawKB(body.KB), "general")
job := newJobID()
go h.runIngest(job, userID(c), rawKB(body.KB), scopedKB(c, body.KB), "", "", nil, body.Text)
c.JSON(http.StatusAccepted, gin.H{"job_id": job})
}
// KbSaveNote: POST /api/v1/kb/note {kb, name, content} —— 新建/编辑笔记。
// 立即落库(文库可见),并以 name 为 doc 重新入库(替换旧块,搜索/图谱同步)。返回 job_id。
func (h *Handler) KbSaveNote(c *gin.Context) {
var body struct {
KB string `json:"kb"`
Name string `json:"name"`
Content string `json:"content"`
}
if err := c.ShouldBindJSON(&body); err != nil || strings.TrimSpace(body.Name) == "" || strings.TrimSpace(body.Content) == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "name/content required"})
return
}
owner := userID(c)
_ = h.db.EnsureKB(c.Request.Context(), owner, rawKB(body.KB), "general")
// 落库 + 重建索引由后台 runIngest 统一处理(forceDoc=name 保持笔记身份)。
job := newJobID()
go h.runIngest(job, owner, rawKB(body.KB), scopedKB(c, body.KB), body.Name, "", nil, body.Content)
c.JSON(http.StatusAccepted, gin.H{"job_id": job, "name": body.Name})
}
// wikiLinks 从内容抽取所有 [[名称]](忽略别名)去重,用于维护双链索引。
func wikiLinks(s string) []string {
seen := map[string]bool{}
var out []string
for _, m := range wikiRe.FindAllStringSubmatch(s, -1) {
n := strings.TrimSpace(m[1])
if n != "" && !seen[n] {
seen[n] = true
out = append(out, n)
}
}
return out
}
var wikiRe = regexp.MustCompile(`\[\[([^\]|]+)(\|[^\]]*)?\]\]`)
// KbVault: GET /api/v1/kb/vault?kb= —— 某知识库的全部原始文档(名+内容),供 Obsidian 式文库浏览。
func (h *Handler) KbVault(c *gin.Context) {
rows, err := h.db.ListVault(c.Request.Context(), userID(c), rawKB(c.Query("kb")))
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
docs := make([]gin.H, 0, len(rows))
for _, r := range rows {
docs = append(docs, gin.H{"name": r.Name, "size": r.Size, "preview": r.Preview})
}
c.JSON(http.StatusOK, gin.H{"docs": docs})
}
// KbDoc: GET /api/v1/kb/doc?kb=&name= —— 取单篇文档全文(按需加载,不在列表里拉全量)。
func (h *Handler) KbDoc(c *gin.Context) {
d, err := h.db.GetDoc(c.Request.Context(), userID(c), rawKB(c.Query("kb")), c.Query("name"))
if err != nil || d == nil {
c.JSON(http.StatusNotFound, gin.H{"error": "文档不存在"})
return
}
c.JSON(http.StatusOK, gin.H{"name": d.Name, "content": d.Content, "size": d.Size})
}
// KbLinks: GET /api/v1/kb/links?kb= —— 某库全部 [[双链]](from→to),供反链/笔记关系图。
func (h *Handler) KbLinks(c *gin.Context) {
rows, err := h.db.ListLinks(c.Request.Context(), userID(c), rawKB(c.Query("kb")))
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
links := make([]gin.H, 0, len(rows))
for _, l := range rows {
links = append(links, gin.H{"from": l.FromName, "to": l.ToName})
}
c.JSON(http.StatusOK, gin.H{"links": links})
}
// noteName 取文本首个非空行作笔记名(截断 40 字),用于文本入库的文库留存。
func noteName(text string) string {
for _, line := range strings.Split(text, "\n") {
line = strings.TrimSpace(line)
if line != "" {
r := []rune(line)
if len(r) > 40 {
return string(r[:40])
}
return line
}
}
return "笔记"
}
// KbIngestFile: POST /api/v1/kb/ingest_filemultipart)—— 文件入库(异步,返回 job_id)。
// 流水线(解析→切块→向量化→写入)的进度经 sundynix.streams.<job_id> 回流,UI 用 SSE 看。
func (h *Handler) KbIngestFile(c *gin.Context) {
kb := c.PostForm("kb")
fh, err := c.FormFile("file")
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "file required"})
return
}
f, err := fh.Open()
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
defer f.Close()
data, err := io.ReadAll(f)
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
_ = h.db.EnsureKB(c.Request.Context(), userID(c), rawKB(kb), "general")
job := newJobID()
go h.runIngest(job, userID(c), rawKB(kb), scopedKB(c, kb), "", fh.Filename, data, "")
c.JSON(http.StatusAccepted, gin.H{"job_id": job, "file": fh.Filename})
}
// runIngest 后台跑入库流水线,逐阶段把进度发到 sundynix.streams.<job>。
// owner+kbName 用于"文库"原文留存;scoped 是 owner/kb 作向量/全文/图谱分区键。
// forceDoc 非空时强制以它为文档名(笔记编辑用,保持笔记身份稳定)。
// filename 非空表示文件入库(先经 mcp-py 解析);否则用 rawText。
func (h *Handler) runIngest(job, owner, kbName, scoped, forceDoc, filename string, data []byte, rawText string) {
ctx := context.Background()
emit := func(ev contract.IngestEvent) { _ = h.bus.PublishIngest(job, &ev) }
time.Sleep(400 * time.Millisecond) // 给 SSE 客户端订阅时间(core NATS 无缓冲)
text := rawText
if filename != "" {
emit(contract.IngestEvent{Stage: "解析", Msg: filename})
parsed, err := h.parseFile(ctx, filename, data)
if err != nil {
emit(contract.IngestEvent{Stage: "失败", Error: "解析失败: " + err.Error()})
_ = h.bus.CompleteStream(job)
return
}
emit(contract.IngestEvent{
Stage: "解析完成",
Msg: fmt.Sprintf("%s · 解析出 %d 字", fileKind(filename), len([]rune(parsed))),
Preview: head(parsed, 240),
})
text = parsed
}
// 文库留存原文:编辑指定名 > 文件名 > 文本首行。
docName := forceDoc
if docName == "" {
docName = filename
}
if docName == "" {
docName = noteName(text)
}
if text != "" {
_ = h.db.SaveDoc(ctx, owner, kbName, docName, text, "", len([]rune(text)))
_ = h.db.ReplaceDocLinks(ctx, owner, kbName, docName, wikiLinks(text)) // 维护 [[双链]] 索引
}
// 调 mcp-go kb_ingest(带 job_id):它会发 切块/向量化/写入/完成 事件 + CompleteStream。
res, err := h.bus.CallTool(ctx, contract.ToolSubjectGo("kb_ingest"),
&contract.ToolCall{Tool: "kb_ingest", Args: map[string]any{"kb": scoped, "doc": docName, "text": text, "job_id": job}})
if err != nil || res == nil || !res.OK {
msg := "kb_ingest 失败"
if err != nil {
msg = err.Error()
} else if res != nil {
msg = res.Error
}
emit(contract.IngestEvent{Stage: "失败", Error: msg})
_ = h.bus.CompleteStream(job)
}
}
// KbIngestStream: GET /api/v1/kb/ingest/:id/stream —— SSE 实时推送入库进度事件。
func (h *Handler) KbIngestStream(c *gin.Context) {
job := c.Param("id")
c.Writer.Header().Set("Content-Type", "text/event-stream")
c.Writer.Header().Set("Cache-Control", "no-cache")
c.Writer.Header().Set("Connection", "keep-alive")
events := make(chan []byte, 64)
done := make(chan struct{})
unsub, err := h.bus.SubscribeTokens(job,
func(ev []byte) {
select {
case events <- ev:
default:
}
},
func() { close(done) },
)
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
defer func() { _ = unsub() }()
c.Stream(func(w io.Writer) bool {
select {
case ev := <-events:
c.SSEvent("progress", string(ev))
return true
case <-done:
c.SSEvent("done", job)
return false
case <-c.Request.Context().Done():
return false
}
})
}
func newJobID() string {
var b [8]byte
_, _ = rand.Read(b[:])
return "ingest_" + hex.EncodeToString(b[:])
}
// itoa 简易整数转字符串(避免引入 strconv)。
func itoa(n int) string {
if n == 0 {
return "0"
}
neg := n < 0
if neg {
n = -n
}
var b []byte
for n > 0 {
b = append([]byte{byte('0' + n%10)}, b...)
n /= 10
}
if neg {
b = append([]byte{'-'}, b...)
}
return string(b)
}
// head 取文本前 n 个字符(按 rune),用于解析预览。
func head(s string, n int) string {
s = strings.TrimSpace(s)
r := []rune(s)
if len(r) <= n {
return s
}
return string(r[:n]) + "…"
}
// fileKind 由扩展名给出可读的文件类型标签。
func fileKind(filename string) string {
switch strings.ToLower(filepath.Ext(filename)) {
case ".docx":
return "Word 文档"
case ".xlsx", ".xls":
return "Excel 表格"
case ".pdf":
return "PDF"
case ".csv":
return "CSV"
case ".md", ".markdown":
return "Markdown"
default:
return "文本"
}
}
// parseFile 把文件字节转为纯文本:文本类直读,其余经 mcp-py parse_document(算法层)。
func (h *Handler) parseFile(ctx context.Context, filename string, data []byte) (string, error) {
switch strings.ToLower(filepath.Ext(filename)) {
case ".txt", ".md", ".markdown", ".text":
return string(data), nil
}
res, err := h.bus.CallTool(ctx, contract.ToolSubjectPy("parse_document"),
&contract.ToolCall{Tool: "parse_document", Args: map[string]any{
"filename": filename, "content_b64": base64.StdEncoding.EncodeToString(data),
}})
if err != nil {
return "", err
}
if res == nil || !res.OK {
if res != nil && res.Error != "" {
return "", errors.New(res.Error)
}
return "", errors.New("parse_document 无响应(mcp-py 未运行?)")
}
return res.Content, nil
}
// KbSearch: POST /api/v1/kb/search —— 检索台:查某知识库,返回带分数的命中(→ mcp-go kb_search)。
func (h *Handler) KbSearch(c *gin.Context) {
var body struct {
KB string `json:"kb"`
Q string `json:"q"`
TopK int `json:"topK"`
}
if err := c.ShouldBindJSON(&body); err != nil || body.Q == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "q required"})
return
}
args := map[string]any{"kb": scopedKB(c, body.KB), "q": body.Q}
if body.TopK > 0 {
args["topK"] = body.TopK
}
res, err := h.bus.CallTool(c.Request.Context(), contract.ToolSubjectGo("kb_search"),
&contract.ToolCall{Tool: "kb_search", Args: args})
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
if !res.OK {
c.JSON(http.StatusUnprocessableEntity, gin.H{"error": res.Error})
return
}
var hits []map[string]any
_ = json.Unmarshal([]byte(res.Content), &hits)
c.JSON(http.StatusOK, gin.H{"hits": hits})
}
// KbGraph: GET /api/v1/kb/graph?kb= —— 某知识库的图谱三元组(→ mcp-go kb_graphNeo4j)。
func (h *Handler) KbGraph(c *gin.Context) {
res, err := h.bus.CallTool(c.Request.Context(), contract.ToolSubjectGo("kb_graph"),
&contract.ToolCall{Tool: "kb_graph", Args: map[string]any{"kb": scopedKB(c, c.Query("kb")), "limit": 100}})
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
var triples []map[string]any
_ = json.Unmarshal([]byte(res.Content), &triples)
c.JSON(http.StatusOK, gin.H{"triples": triples})
}