Files
sundynix-agentix/sundynix-gateway/internal/handler/kb.go
T
Blizzard 5d76652bff refactor(kb): 文件主表 + 文档关联改用雪花ID(弃用按名关联)
把 sundynix_doc 明确为"文件主表",补齐文件基础信息字段;
文档间 [[双链]] 改为以 Doc.ID 关联,查询/渲染一律按文件 ID。

store:
- Doc 增 Ext(后缀)/MD5(内容指纹) 字段;ObjectKey 即"存放链接"
- DocLink 由 (FromName,ToName) 改为 (FromID,ToID,ToName)
  · FromID/ToID 关联 Doc.ID;ToName 保留用于悬空链接展示与回填
- SaveDoc 返回新建/更新文件的雪花 ID(供建链)
- 新增 GetDocByID(按 ID + owner 取正文,防越权)
- ReplaceDocLinks 以 fromID 重建出链,按 [[名称]] 解析目标 ID
- 新增 ResolveInboundLinks:目标入库后回填指向它的悬空链接
- ListLinks 只返回已解析(to_id 非空)的 ID→ID 边
- migrateDocLinkToID:旧按名双链表无 from_id 列则重建为按 ID 关联

gateway/handler:
- runIngest 计算 ext/md5,SaveDoc 取回 ID 后建链 + 回填悬空
- KbDoc 改为 GET ?id=(按文件 ID 取全文)
- KbVault 返回 id+ext;KbLinks 返回 from/to 为 ID

desktop:
- VaultDoc 增 id/ext;getDoc(docId) 按 ID 取正文
- VaultPanel 选中态/正文/反链/关系图改用 ID,名↔ID 双向映射
  渲染;保存笔记后按名定位回其新 ID

验证(gateway+PG+MinIO 实测):vault 带 id+ext;双链 ID→ID 且
A→B 悬空链接在 B 入库后成功回填;按 ID 取大文档(15006字)从
MinIO 完整取回;跨 owner 按 ID 取文档 404(隔离生效)。桌面端
文库 Tab 按 ID 选中/载入/反链渲染正常,无控制台报错。

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-15 09:38:02 +08:00

440 lines
14 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package handler
import (
"context"
"crypto/md5"
"crypto/rand"
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"path/filepath"
"regexp"
"strings"
"time"
"github.com/gin-gonic/gin"
"github.com/sundynix/sundynix-shared/contract"
)
// docInlineMax 是内联存 PG 的正文字数上限;超过则正文落 MinIO,PG 只留元数据+预览+对象键。
const docInlineMax = 8000
// rawKB 规整知识库名(去空白,空则 default)—— 注册表里的展示名。
func rawKB(kb string) string {
kb = strings.TrimSpace(kb)
if kb == "" {
return "default"
}
return kb
}
// scopedKB 把知识库名锁进当前用户作用域:"owner/name"。
// owner 来自身份(X-User-ID),客户端只发库名、发不了 owner,故无法越权查到他人的库。
func scopedKB(c *gin.Context, kb string) string {
return userID(c) + "/" + rawKB(kb)
}
// KbList: GET /api/v1/kb/list —— 当前用户的知识库列表(按 owner 隔离)。
func (h *Handler) KbList(c *gin.Context) {
rows, err := h.db.ListKB(c.Request.Context(), userID(c))
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
out := make([]gin.H, 0, len(rows))
for _, r := range rows {
out = append(out, gin.H{"name": r.Name, "kind": r.Kind})
}
c.JSON(http.StatusOK, gin.H{"kbs": out})
}
// KbCreate: POST /api/v1/kb/create {name, kind} —— 新建知识库(folder/project/case/general)。
func (h *Handler) KbCreate(c *gin.Context) {
var body struct {
Name string `json:"name"`
Kind string `json:"kind"`
}
if err := c.ShouldBindJSON(&body); err != nil || strings.TrimSpace(body.Name) == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "name required"})
return
}
if err := h.db.EnsureKB(c.Request.Context(), userID(c), rawKB(body.Name), body.Kind); err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{"name": rawKB(body.Name), "kind": body.Kind})
}
// KbIngest: POST /api/v1/kb/ingest —— 文本入库(异步,返回 job_id;进度经 SSE 看)。
func (h *Handler) KbIngest(c *gin.Context) {
var body struct {
KB string `json:"kb"`
Text string `json:"text"`
}
if err := c.ShouldBindJSON(&body); err != nil || body.Text == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "text required"})
return
}
_ = h.db.EnsureKB(c.Request.Context(), userID(c), rawKB(body.KB), "general")
job := newJobID()
go h.runIngest(job, userID(c), rawKB(body.KB), scopedKB(c, body.KB), "", "", nil, body.Text)
c.JSON(http.StatusAccepted, gin.H{"job_id": job})
}
// KbSaveNote: POST /api/v1/kb/note {kb, name, content} —— 新建/编辑笔记。
// 立即落库(文库可见),并以 name 为 doc 重新入库(替换旧块,搜索/图谱同步)。返回 job_id。
func (h *Handler) KbSaveNote(c *gin.Context) {
var body struct {
KB string `json:"kb"`
Name string `json:"name"`
Content string `json:"content"`
}
if err := c.ShouldBindJSON(&body); err != nil || strings.TrimSpace(body.Name) == "" || strings.TrimSpace(body.Content) == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "name/content required"})
return
}
owner := userID(c)
_ = h.db.EnsureKB(c.Request.Context(), owner, rawKB(body.KB), "general")
// 落库 + 重建索引由后台 runIngest 统一处理(forceDoc=name 保持笔记身份)。
job := newJobID()
go h.runIngest(job, owner, rawKB(body.KB), scopedKB(c, body.KB), body.Name, "", nil, body.Content)
c.JSON(http.StatusAccepted, gin.H{"job_id": job, "name": body.Name})
}
// wikiLinks 从内容抽取所有 [[名称]](忽略别名)去重,用于维护双链索引。
func wikiLinks(s string) []string {
seen := map[string]bool{}
var out []string
for _, m := range wikiRe.FindAllStringSubmatch(s, -1) {
n := strings.TrimSpace(m[1])
if n != "" && !seen[n] {
seen[n] = true
out = append(out, n)
}
}
return out
}
var wikiRe = regexp.MustCompile(`\[\[([^\]|]+)(\|[^\]]*)?\]\]`)
// KbVault: GET /api/v1/kb/vault?kb= —— 某知识库的全部原始文档(名+内容),供 Obsidian 式文库浏览。
func (h *Handler) KbVault(c *gin.Context) {
rows, err := h.db.ListVault(c.Request.Context(), userID(c), rawKB(c.Query("kb")))
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
docs := make([]gin.H, 0, len(rows))
for _, r := range rows {
docs = append(docs, gin.H{"id": r.ID, "name": r.Name, "ext": r.Ext, "size": r.Size, "preview": r.Preview})
}
c.JSON(http.StatusOK, gin.H{"docs": docs})
}
// KbDoc: GET /api/v1/kb/doc?id= —— 按文件 ID 取单篇全文(按需加载,不在列表里拉全量)。
func (h *Handler) KbDoc(c *gin.Context) {
d, err := h.db.GetDocByID(c.Request.Context(), userID(c), c.Query("id"))
if err != nil || d == nil {
c.JSON(http.StatusNotFound, gin.H{"error": "文档不存在"})
return
}
content := d.Content
if d.ObjectKey != "" && h.blob.Ready() { // 大文档:从 MinIO 取回正文
if obj, oerr := h.blob.Get(c.Request.Context(), d.ObjectKey); oerr == nil {
content = obj
}
}
c.JSON(http.StatusOK, gin.H{"id": d.ID, "name": d.Name, "ext": d.Ext, "content": content, "size": d.Size})
}
// KbLinks: GET /api/v1/kb/links?kb= —— 某库已解析的 [[双链]](FromID→ToID),供反链/笔记关系图按 ID 渲染。
func (h *Handler) KbLinks(c *gin.Context) {
rows, err := h.db.ListLinks(c.Request.Context(), userID(c), rawKB(c.Query("kb")))
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
links := make([]gin.H, 0, len(rows))
for _, l := range rows {
links = append(links, gin.H{"from": l.FromID, "to": l.ToID})
}
c.JSON(http.StatusOK, gin.H{"links": links})
}
// noteName 取文本首个非空行作笔记名(截断 40 字),用于文本入库的文库留存。
func noteName(text string) string {
for _, line := range strings.Split(text, "\n") {
line = strings.TrimSpace(line)
if line != "" {
r := []rune(line)
if len(r) > 40 {
return string(r[:40])
}
return line
}
}
return "笔记"
}
// KbIngestFile: POST /api/v1/kb/ingest_filemultipart)—— 文件入库(异步,返回 job_id)。
// 流水线(解析→切块→向量化→写入)的进度经 sundynix.streams.<job_id> 回流,UI 用 SSE 看。
func (h *Handler) KbIngestFile(c *gin.Context) {
kb := c.PostForm("kb")
fh, err := c.FormFile("file")
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "file required"})
return
}
f, err := fh.Open()
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
defer f.Close()
data, err := io.ReadAll(f)
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
_ = h.db.EnsureKB(c.Request.Context(), userID(c), rawKB(kb), "general")
job := newJobID()
go h.runIngest(job, userID(c), rawKB(kb), scopedKB(c, kb), "", fh.Filename, data, "")
c.JSON(http.StatusAccepted, gin.H{"job_id": job, "file": fh.Filename})
}
// runIngest 后台跑入库流水线,逐阶段把进度发到 sundynix.streams.<job>。
// owner+kbName 用于"文库"原文留存;scoped 是 owner/kb 作向量/全文/图谱分区键。
// forceDoc 非空时强制以它为文档名(笔记编辑用,保持笔记身份稳定)。
// filename 非空表示文件入库(先经 mcp-py 解析);否则用 rawText。
func (h *Handler) runIngest(job, owner, kbName, scoped, forceDoc, filename string, data []byte, rawText string) {
ctx := context.Background()
emit := func(ev contract.IngestEvent) { _ = h.bus.PublishIngest(job, &ev) }
time.Sleep(400 * time.Millisecond) // 给 SSE 客户端订阅时间(core NATS 无缓冲)
text := rawText
if filename != "" {
emit(contract.IngestEvent{Stage: "解析", Msg: filename})
parsed, err := h.parseFile(ctx, filename, data)
if err != nil {
emit(contract.IngestEvent{Stage: "失败", Error: "解析失败: " + err.Error()})
_ = h.bus.CompleteStream(job)
return
}
emit(contract.IngestEvent{
Stage: "解析完成",
Msg: fmt.Sprintf("%s · 解析出 %d 字", fileKind(filename), len([]rune(parsed))),
Preview: head(parsed, 240),
})
text = parsed
}
// 文库留存原文:编辑指定名 > 文件名 > 文本首行。
docName := forceDoc
if docName == "" {
docName = filename
}
if docName == "" {
docName = noteName(text)
}
if text != "" {
size := len([]rune(text))
ext := strings.ToLower(filepath.Ext(filename)) // 笔记/文本入库时 filename 为空 → ext 为空
sum := md5.Sum([]byte(text))
md5hex := hex.EncodeToString(sum[:])
inline, objectKey := text, ""
// 大文档正文落对象存储,PG 只留元数据+预览+对象键(避免把十几万字塞进 PG)。
if size > docInlineMax && h.blob.Ready() {
key := owner + "/" + kbName + "/" + docName
if err := h.blob.Put(ctx, key, text); err == nil {
inline, objectKey = "", key
} else {
log.Printf("[gateway] 大文档转 MinIO 失败,回退内联: %v", err)
}
}
docID, err := h.db.SaveDoc(ctx, owner, kbName, docName, ext, md5hex, inline, objectKey, size, head(text, 500))
if err != nil {
log.Printf("[gateway] 文件入库失败: %v", err)
} else if docID != "" {
_ = h.db.ReplaceDocLinks(ctx, owner, kbName, docID, wikiLinks(text)) // 以本文件 ID 维护出链
_ = h.db.ResolveInboundLinks(ctx, owner, kbName, docName, docID) // 回填指向本文件的悬空链接
}
}
// 调 mcp-go kb_ingest(带 job_id):它会发 切块/向量化/写入/完成 事件 + CompleteStream。
res, err := h.bus.CallTool(ctx, contract.ToolSubjectGo("kb_ingest"),
&contract.ToolCall{Tool: "kb_ingest", Args: map[string]any{"kb": scoped, "doc": docName, "text": text, "job_id": job}})
if err != nil || res == nil || !res.OK {
msg := "kb_ingest 失败"
if err != nil {
msg = err.Error()
} else if res != nil {
msg = res.Error
}
emit(contract.IngestEvent{Stage: "失败", Error: msg})
_ = h.bus.CompleteStream(job)
}
}
// KbIngestStream: GET /api/v1/kb/ingest/:id/stream —— SSE 实时推送入库进度事件。
func (h *Handler) KbIngestStream(c *gin.Context) {
job := c.Param("id")
c.Writer.Header().Set("Content-Type", "text/event-stream")
c.Writer.Header().Set("Cache-Control", "no-cache")
c.Writer.Header().Set("Connection", "keep-alive")
events := make(chan []byte, 64)
done := make(chan struct{})
unsub, err := h.bus.SubscribeTokens(job,
func(ev []byte) {
select {
case events <- ev:
default:
}
},
func() { close(done) },
)
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
defer func() { _ = unsub() }()
c.Stream(func(w io.Writer) bool {
select {
case ev := <-events:
c.SSEvent("progress", string(ev))
return true
case <-done:
c.SSEvent("done", job)
return false
case <-c.Request.Context().Done():
return false
}
})
}
func newJobID() string {
var b [8]byte
_, _ = rand.Read(b[:])
return "ingest_" + hex.EncodeToString(b[:])
}
// itoa 简易整数转字符串(避免引入 strconv)。
func itoa(n int) string {
if n == 0 {
return "0"
}
neg := n < 0
if neg {
n = -n
}
var b []byte
for n > 0 {
b = append([]byte{byte('0' + n%10)}, b...)
n /= 10
}
if neg {
b = append([]byte{'-'}, b...)
}
return string(b)
}
// head 取文本前 n 个字符(按 rune),用于解析预览。
func head(s string, n int) string {
s = strings.TrimSpace(s)
r := []rune(s)
if len(r) <= n {
return s
}
return string(r[:n]) + "…"
}
// fileKind 由扩展名给出可读的文件类型标签。
func fileKind(filename string) string {
switch strings.ToLower(filepath.Ext(filename)) {
case ".docx":
return "Word 文档"
case ".xlsx", ".xls":
return "Excel 表格"
case ".pdf":
return "PDF"
case ".csv":
return "CSV"
case ".md", ".markdown":
return "Markdown"
default:
return "文本"
}
}
// parseFile 把文件字节转为纯文本:文本类直读,其余经 mcp-py parse_document(算法层)。
func (h *Handler) parseFile(ctx context.Context, filename string, data []byte) (string, error) {
switch strings.ToLower(filepath.Ext(filename)) {
case ".txt", ".md", ".markdown", ".text":
return string(data), nil
}
res, err := h.bus.CallTool(ctx, contract.ToolSubjectPy("parse_document"),
&contract.ToolCall{Tool: "parse_document", Args: map[string]any{
"filename": filename, "content_b64": base64.StdEncoding.EncodeToString(data),
}})
if err != nil {
return "", err
}
if res == nil || !res.OK {
if res != nil && res.Error != "" {
return "", errors.New(res.Error)
}
return "", errors.New("parse_document 无响应(mcp-py 未运行?)")
}
return res.Content, nil
}
// KbSearch: POST /api/v1/kb/search —— 检索台:查某知识库,返回带分数的命中(→ mcp-go kb_search)。
func (h *Handler) KbSearch(c *gin.Context) {
var body struct {
KB string `json:"kb"`
Q string `json:"q"`
TopK int `json:"topK"`
}
if err := c.ShouldBindJSON(&body); err != nil || body.Q == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "q required"})
return
}
args := map[string]any{"kb": scopedKB(c, body.KB), "q": body.Q}
if body.TopK > 0 {
args["topK"] = body.TopK
}
res, err := h.bus.CallTool(c.Request.Context(), contract.ToolSubjectGo("kb_search"),
&contract.ToolCall{Tool: "kb_search", Args: args})
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
if !res.OK {
c.JSON(http.StatusUnprocessableEntity, gin.H{"error": res.Error})
return
}
var hits []map[string]any
_ = json.Unmarshal([]byte(res.Content), &hits)
c.JSON(http.StatusOK, gin.H{"hits": hits})
}
// KbGraph: GET /api/v1/kb/graph?kb= —— 某知识库的图谱三元组(→ mcp-go kb_graphNeo4j)。
func (h *Handler) KbGraph(c *gin.Context) {
res, err := h.bus.CallTool(c.Request.Context(), contract.ToolSubjectGo("kb_graph"),
&contract.ToolCall{Tool: "kb_graph", Args: map[string]any{"kb": scopedKB(c, c.Query("kb")), "limit": 100}})
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
var triples []map[string]any
_ = json.Unmarshal([]byte(res.Content), &triples)
c.JSON(http.StatusOK, gin.H{"triples": triples})
}