Files
sundynix-agentix/sundynix-gateway/internal/handler/kb.go
T
Blizzard f610d8d2da feat(kb): 大文档正文存 MinIO(PG 只留元数据+预览+对象键)
超过阈值(8000 字)的正文落对象存储,彻底解决十几万字文件塞 PG 的问题。

- internal/blob:minio-go 封装 Store(Open/Put/Get/Delete + Ready 降级);连不上则降级内联。
- docker-compose:milvus-minio 暴露 9000 端口供网关用作文档对象存储(bucket sundynix-docs)。
- main/router/handler:注入 blob.Store(env MINIO_*,默认 localhost:9000 minioadmin)。
- runIngest:size>8000 且 MinIO 可用 → 正文 Put 到 owner/kb/name,PG content 置空仅存
  object_key+preview+size;否则内联。SaveDoc 改为按全文显式传 preview(offload 后内联为空也有预览)。
- KbDoc:object_key 非空时从 MinIO 取回全文。

验证:入 12182 字笔记 → PG content_len=0、object_key=wt/default/超大文件测试、preview 非空、
size=12182;/kb/doc 取回完整 12182 字(来自 MinIO);6321 字的仍内联(object_key 空)。
列表只读元数据+预览。gateway build 通过。

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-13 17:02:44 +08:00

431 lines
14 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package handler
import (
"context"
"crypto/rand"
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"path/filepath"
"regexp"
"strings"
"time"
"github.com/gin-gonic/gin"
"github.com/sundynix/sundynix-shared/contract"
)
// docInlineMax is the upper bound, in characters, for document bodies stored
// inline in PG; larger bodies are offloaded to MinIO and PG keeps only the
// metadata, the preview and the object key.
const docInlineMax = 8000
// rawKB normalizes a knowledge-base name: surrounding whitespace is trimmed
// and a blank result falls back to "default". The result is the display name
// used in the registry.
func rawKB(kb string) string {
	if name := strings.TrimSpace(kb); name != "" {
		return name
	}
	return "default"
}
// scopedKB pins a knowledge-base name to the current user's scope, yielding
// "owner/name". The owner part is taken from userID(c) rather than from the
// kb argument, so the caller-supplied name alone cannot select another owner.
func scopedKB(c *gin.Context, kb string) string {
	owner := userID(c)
	return owner + "/" + rawKB(kb)
}
// KbList handles GET /api/v1/kb/list: it returns the knowledge bases owned by
// the current user as {"kbs": [{name, kind}, ...]}. A store error is reported
// as 502 with the error text.
func (h *Handler) KbList(c *gin.Context) {
	kbs, err := h.db.ListKB(c.Request.Context(), userID(c))
	if err != nil {
		c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
		return
	}

	items := make([]gin.H, len(kbs))
	for i, kb := range kbs {
		items[i] = gin.H{"name": kb.Name, "kind": kb.Kind}
	}
	c.JSON(http.StatusOK, gin.H{"kbs": items})
}
// KbCreate handles POST /api/v1/kb/create {name, kind}: it registers a
// knowledge base (kinds such as folder/project/case/general) for the current
// user. A missing or blank name yields 400; a store error yields 502.
func (h *Handler) KbCreate(c *gin.Context) {
	var req struct {
		Name string `json:"name"`
		Kind string `json:"kind"`
	}
	bindErr := c.ShouldBindJSON(&req)
	if bindErr != nil || strings.TrimSpace(req.Name) == "" {
		c.JSON(http.StatusBadRequest, gin.H{"error": "name required"})
		return
	}

	// Normalize once; the same value is stored and echoed back.
	name := rawKB(req.Name)
	err := h.db.EnsureKB(c.Request.Context(), userID(c), name, req.Kind)
	if err != nil {
		c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
		return
	}
	c.JSON(http.StatusOK, gin.H{"name": name, "kind": req.Kind})
}
// KbIngest handles POST /api/v1/kb/ingest: plain-text ingestion. The work runs
// asynchronously; the response is 202 with a job_id whose progress can be
// followed over SSE. An empty text yields 400.
func (h *Handler) KbIngest(c *gin.Context) {
	var req struct {
		KB   string `json:"kb"`
		Text string `json:"text"`
	}
	if err := c.ShouldBindJSON(&req); err != nil || req.Text == "" {
		c.JSON(http.StatusBadRequest, gin.H{"error": "text required"})
		return
	}

	owner, kbName := userID(c), rawKB(req.KB)
	// Best-effort registration of the knowledge base; a failure here does not
	// block ingestion.
	_ = h.db.EnsureKB(c.Request.Context(), owner, kbName, "general")

	jobID := newJobID()
	// All arguments are evaluated here, before the goroutine starts, so the
	// background job never touches the gin context.
	go h.runIngest(jobID, owner, kbName, scopedKB(c, req.KB), "", "", nil, req.Text)
	c.JSON(http.StatusAccepted, gin.H{"job_id": jobID})
}
// KbSaveNote handles POST /api/v1/kb/note {kb, name, content}: create or edit
// a note. Persisting the note and rebuilding its index are both delegated to
// the background runIngest job, which is given the note name as the forced
// document name so the note keeps a stable identity. Responds 202 with the
// job_id and the name; a blank name or content yields 400.
func (h *Handler) KbSaveNote(c *gin.Context) {
	var req struct {
		KB      string `json:"kb"`
		Name    string `json:"name"`
		Content string `json:"content"`
	}
	bindErr := c.ShouldBindJSON(&req)
	if bindErr != nil || strings.TrimSpace(req.Name) == "" || strings.TrimSpace(req.Content) == "" {
		c.JSON(http.StatusBadRequest, gin.H{"error": "name/content required"})
		return
	}

	owner, kbName := userID(c), rawKB(req.KB)
	// Best-effort registration of the knowledge base.
	_ = h.db.EnsureKB(c.Request.Context(), owner, kbName, "general")

	jobID := newJobID()
	go h.runIngest(jobID, owner, kbName, scopedKB(c, req.KB), req.Name, "", nil, req.Content)
	c.JSON(http.StatusAccepted, gin.H{"job_id": jobID, "name": req.Name})
}
// wikiRe matches a [[name]] or [[name|alias]] wiki link; capture group 1 is
// the name part.
var wikiRe = regexp.MustCompile(`\[\[([^\]|]+)(\|[^\]]*)?\]\]`)

// wikiLinks extracts every [[name]] target from s (aliases are ignored),
// trimmed and de-duplicated in order of first appearance. It is used to
// maintain the wiki-link index. The result is nil when s has no links.
func wikiLinks(s string) []string {
	var names []string
	seen := make(map[string]struct{})
	for _, match := range wikiRe.FindAllStringSubmatch(s, -1) {
		name := strings.TrimSpace(match[1])
		if name == "" {
			continue
		}
		if _, dup := seen[name]; dup {
			continue
		}
		seen[name] = struct{}{}
		names = append(names, name)
	}
	return names
}
// KbVault handles GET /api/v1/kb/vault?kb=: it lists the documents kept for a
// knowledge base as {"docs": [{name, size, preview}, ...]} for an
// Obsidian-style vault browser. Full bodies are not included in the listing.
// A store error yields 502.
func (h *Handler) KbVault(c *gin.Context) {
	kbName := rawKB(c.Query("kb"))
	entries, err := h.db.ListVault(c.Request.Context(), userID(c), kbName)
	if err != nil {
		c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
		return
	}

	docs := make([]gin.H, len(entries))
	for i, e := range entries {
		docs[i] = gin.H{"name": e.Name, "size": e.Size, "preview": e.Preview}
	}
	c.JSON(http.StatusOK, gin.H{"docs": docs})
}
// KbDoc handles GET /api/v1/kb/doc?kb=&name=: it returns the full text of a
// single document, loaded on demand rather than as part of the listing.
//
// When the record carries an object key the body is read back from the blob
// store. If the store is unavailable or the read fails, any inline content is
// served as a fallback; when there is none the handler answers 502 rather than
// returning an empty body with 200, which would look like an empty document.
// A lookup error or missing record yields 404.
func (h *Handler) KbDoc(c *gin.Context) {
	ctx := c.Request.Context()
	d, err := h.db.GetDoc(ctx, userID(c), rawKB(c.Query("kb")), c.Query("name"))
	if err != nil || d == nil {
		c.JSON(http.StatusNotFound, gin.H{"error": "文档不存在"})
		return
	}

	content := d.Content
	if d.ObjectKey != "" { // large document: the body lives in the blob store
		body, ok := "", false
		if !h.blob.Ready() {
			log.Printf("[gateway] 读取大文档失败 %s: 对象存储不可用", d.ObjectKey)
		} else if obj, oerr := h.blob.Get(ctx, d.ObjectKey); oerr != nil {
			log.Printf("[gateway] 读取大文档失败 %s: %v", d.ObjectKey, oerr)
		} else {
			body, ok = obj, true
		}
		switch {
		case ok:
			content = body
		case content == "":
			// Nothing inline to fall back to: report the failure explicitly.
			c.JSON(http.StatusBadGateway, gin.H{"error": "文档正文暂不可用(对象存储读取失败)"})
			return
		}
	}
	c.JSON(http.StatusOK, gin.H{"name": d.Name, "content": content, "size": d.Size})
}
// KbLinks handles GET /api/v1/kb/links?kb=: it returns every [[wiki link]]
// edge of a knowledge base as {"links": [{from, to}, ...]}, for backlinks and
// the note relationship graph. A store error yields 502.
func (h *Handler) KbLinks(c *gin.Context) {
	kbName := rawKB(c.Query("kb"))
	edges, err := h.db.ListLinks(c.Request.Context(), userID(c), kbName)
	if err != nil {
		c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
		return
	}

	links := make([]gin.H, len(edges))
	for i, e := range edges {
		links[i] = gin.H{"from": e.FromName, "to": e.ToName}
	}
	c.JSON(http.StatusOK, gin.H{"links": links})
}
// noteName derives a note name from free text: the first non-blank line,
// trimmed and cut to at most 40 characters (runes). It is used to name the
// vault copy of text ingested without an explicit name. Text with no
// non-blank line yields the default name "笔记".
func noteName(text string) string {
	const maxRunes = 40
	lines := strings.Split(text, "\n")
	for i := 0; i < len(lines); i++ {
		candidate := strings.TrimSpace(lines[i])
		if candidate == "" {
			continue
		}
		if runes := []rune(candidate); len(runes) > maxRunes {
			return string(runes[:maxRunes])
		}
		return candidate
	}
	return "笔记"
}
// KbIngestFile handles POST /api/v1/kb/ingest_file (multipart): file
// ingestion. The upload is read fully into memory and handed to the background
// pipeline (parse -> chunk -> embed -> write), whose progress flows back on
// sundynix.streams.<job_id> for the UI to watch over SSE. Responds 202 with
// the job_id and file name; a missing "file" part yields 400, and a failure to
// open or read the upload yields 502.
func (h *Handler) KbIngestFile(c *gin.Context) {
	kbParam := c.PostForm("kb")
	upload, err := c.FormFile("file")
	if err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": "file required"})
		return
	}

	src, err := upload.Open()
	if err != nil {
		c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
		return
	}
	defer src.Close()

	payload, err := io.ReadAll(src)
	if err != nil {
		c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
		return
	}

	owner, kbName := userID(c), rawKB(kbParam)
	// Best-effort registration of the knowledge base.
	_ = h.db.EnsureKB(c.Request.Context(), owner, kbName, "general")

	jobID := newJobID()
	go h.runIngest(jobID, owner, kbName, scopedKB(c, kbParam), "", upload.Filename, payload, "")
	c.JSON(http.StatusAccepted, gin.H{"job_id": jobID, "file": upload.Filename})
}
// runIngest runs the ingestion pipeline in the background and publishes
// per-stage progress to sundynix.streams.<job>.
//
//   - owner and kbName identify the vault record that keeps the original text.
//   - scoped is the "owner/kb" key passed to kb_ingest as the knowledge base
//     (the partition key for vectors / full text / graph).
//   - forceDoc, when non-empty, forces the document name (used by note edits
//     so the note keeps a stable identity).
//   - filename, when non-empty, means file ingestion: data is parsed first via
//     parseFile; otherwise rawText is used as the text.
//
// Vault persistence is best-effort: failures are logged and indexing still
// proceeds.
func (h *Handler) runIngest(job, owner, kbName, scoped, forceDoc, filename string, data []byte, rawText string) {
	ctx := context.Background()
	emit := func(ev contract.IngestEvent) { _ = h.bus.PublishIngest(job, &ev) }
	time.Sleep(400 * time.Millisecond) // give the SSE client time to subscribe (core NATS does not buffer)

	text := rawText
	if filename != "" {
		emit(contract.IngestEvent{Stage: "解析", Msg: filename})
		parsed, err := h.parseFile(ctx, filename, data)
		if err != nil {
			emit(contract.IngestEvent{Stage: "失败", Error: "解析失败: " + err.Error()})
			_ = h.bus.CompleteStream(job)
			return
		}
		emit(contract.IngestEvent{
			Stage:   "解析完成",
			Msg:     fmt.Sprintf("%s · 解析出 %d 字", fileKind(filename), len([]rune(parsed))),
			Preview: head(parsed, 240),
		})
		text = parsed
	}

	// Name of the vault copy: explicit edit name > file name > first line of text.
	docName := forceDoc
	if docName == "" {
		docName = filename
	}
	if docName == "" {
		docName = noteName(text)
	}

	if text != "" {
		size := len([]rune(text))
		inline, objectKey := text, ""
		// Large bodies go to object storage; PG keeps only metadata, preview
		// and the object key. Any failure falls back to inline storage.
		if size > docInlineMax && h.blob.Ready() {
			key := owner + "/" + kbName + "/" + docName
			if err := h.blob.Put(ctx, key, text); err == nil {
				inline, objectKey = "", key
			} else {
				log.Printf("[gateway] 大文档转 MinIO 失败,回退内联: %v", err)
			}
		}
		// The preview is always computed from the full text, so it is present
		// even when the inline content was emptied by the offload above.
		if err := h.db.SaveDoc(ctx, owner, kbName, docName, inline, objectKey, size, head(text, 500)); err != nil {
			log.Printf("[gateway] 文库留存失败 %s/%s/%s: %v", owner, kbName, docName, err)
		}
		// Keep the [[wiki link]] index in sync with the new text.
		if err := h.db.ReplaceDocLinks(ctx, owner, kbName, docName, wikiLinks(text)); err != nil {
			log.Printf("[gateway] 双链索引更新失败 %s/%s/%s: %v", owner, kbName, docName, err)
		}
	}

	// Call mcp-go kb_ingest with the job_id. On success this function does not
	// complete the stream itself; per the original note, kb_ingest emits the
	// chunk/embed/write/done events and completes the stream.
	res, err := h.bus.CallTool(ctx, contract.ToolSubjectGo("kb_ingest"),
		&contract.ToolCall{Tool: "kb_ingest", Args: map[string]any{"kb": scoped, "doc": docName, "text": text, "job_id": job}})
	if err != nil || res == nil || !res.OK {
		msg := "kb_ingest 失败"
		if err != nil {
			msg = err.Error()
		} else if res != nil {
			msg = res.Error
		}
		emit(contract.IngestEvent{Stage: "失败", Error: msg})
		_ = h.bus.CompleteStream(job)
	}
}
// KbIngestStream handles GET /api/v1/kb/ingest/:id/stream: it pushes the
// ingestion progress events of one job to the client over SSE.
//
// Each event is forwarded as a "progress" SSE event; when the stream
// completes a final "done" event carrying the job id is sent. A subscribe
// failure yields a 502 JSON error.
func (h *Handler) KbIngestStream(c *gin.Context) {
	job := c.Param("id")

	events := make(chan []byte, 64)
	done := make(chan struct{})
	unsub, err := h.bus.SubscribeTokens(job,
		func(ev []byte) {
			// Never block the bus callback: drop the event if the client
			// cannot keep up and the buffer is full.
			select {
			case events <- ev:
			default:
			}
		},
		func() { close(done) },
	)
	if err != nil {
		c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
		return
	}
	defer func() { _ = unsub() }()

	// Set the SSE headers only once streaming is certain, so the JSON error
	// response above is not labelled as text/event-stream.
	hdr := c.Writer.Header()
	hdr.Set("Content-Type", "text/event-stream")
	hdr.Set("Cache-Control", "no-cache")
	hdr.Set("Connection", "keep-alive")

	c.Stream(func(w io.Writer) bool {
		select {
		case ev := <-events:
			c.SSEvent("progress", string(ev))
			return true
		case <-done:
			// select picks randomly among ready cases, so events may still be
			// buffered when done wins; flush them before signalling done.
		drain:
			for {
				select {
				case ev := <-events:
					c.SSEvent("progress", string(ev))
				default:
					break drain
				}
			}
			c.SSEvent("done", job)
			return false
		case <-c.Request.Context().Done():
			return false
		}
	})
}
// newJobID returns a fresh ingestion job id: the prefix "ingest_" followed by
// 8 random bytes from crypto/rand, hex-encoded (16 lowercase hex characters).
func newJobID() string {
	raw := make([]byte, 8)
	_, _ = rand.Read(raw)
	return "ingest_" + hex.EncodeToString(raw)
}
// itoa converts n to its base-10 string form without importing strconv.
//
// The magnitude is computed in unsigned arithmetic so that the minimum int,
// whose negation overflows a signed int, is formatted correctly. Digits are
// written right-to-left into a fixed buffer.
func itoa(n int) string {
	mag := uint(n)
	if n < 0 {
		mag = -mag // two's-complement negate; correct even for the minimum int
	}

	var buf [20]byte // sign + 19 digits covers every 64-bit int
	i := len(buf)
	for {
		i--
		buf[i] = byte('0' + mag%10)
		mag /= 10
		if mag == 0 {
			break
		}
	}
	if n < 0 {
		i--
		buf[i] = '-'
	}
	return string(buf[i:])
}
// head returns the first n characters (runes) of s after trimming surrounding
// whitespace, for use as a preview. When the trimmed text is longer than n
// runes the cut result is suffixed with "…"; otherwise the trimmed text is
// returned unchanged.
func head(s string, n int) string {
	trimmed := strings.TrimSpace(s)
	runes := []rune(trimmed)
	if len(runes) > n {
		return string(runes[:n]) + "…"
	}
	return trimmed
}
// fileKindLabels maps a lower-cased file extension to its human-readable
// type label.
var fileKindLabels = map[string]string{
	".docx":     "Word 文档",
	".xlsx":     "Excel 表格",
	".xls":      "Excel 表格",
	".pdf":      "PDF",
	".csv":      "CSV",
	".md":       "Markdown",
	".markdown": "Markdown",
}

// fileKind returns a human-readable file type label chosen from the
// extension of filename (case-insensitive). Unknown or missing extensions are
// labelled "文本".
func fileKind(filename string) string {
	ext := strings.ToLower(filepath.Ext(filename))
	if label, known := fileKindLabels[ext]; known {
		return label
	}
	return "文本"
}
// parseFile turns file bytes into plain text. Plain-text extensions (.txt,
// .md, .markdown, .text) are returned as-is; everything else is sent,
// base64-encoded, to the mcp-py parse_document tool.
//
// It returns the transport error from the tool call, the tool's own error
// message when it reports one, or a generic "no response" error when the
// result is missing or failed without a message.
func (h *Handler) parseFile(ctx context.Context, filename string, data []byte) (string, error) {
	ext := strings.ToLower(filepath.Ext(filename))
	if ext == ".txt" || ext == ".md" || ext == ".markdown" || ext == ".text" {
		return string(data), nil
	}

	subject := contract.ToolSubjectPy("parse_document")
	call := &contract.ToolCall{
		Tool: "parse_document",
		Args: map[string]any{
			"filename":    filename,
			"content_b64": base64.StdEncoding.EncodeToString(data),
		},
	}
	res, err := h.bus.CallTool(ctx, subject, call)
	switch {
	case err != nil:
		return "", err
	case res != nil && res.OK:
		return res.Content, nil
	case res != nil && res.Error != "":
		return "", errors.New(res.Error)
	default:
		return "", errors.New("parse_document 无响应(mcp-py 未运行?)")
	}
}
// KbSearch handles POST /api/v1/kb/search: the retrieval console. It queries
// one knowledge base through the mcp-go kb_search tool and returns the scored
// hits as {"hits": [...]}.
//
// The knowledge base is always scoped to the current user. topK is forwarded
// only when positive. Responses: 400 for an empty query, 502 when the tool
// call fails or returns no result, 422 when the tool reports an error.
func (h *Handler) KbSearch(c *gin.Context) {
	var body struct {
		KB   string `json:"kb"`
		Q    string `json:"q"`
		TopK int    `json:"topK"`
	}
	if err := c.ShouldBindJSON(&body); err != nil || body.Q == "" {
		c.JSON(http.StatusBadRequest, gin.H{"error": "q required"})
		return
	}

	args := map[string]any{"kb": scopedKB(c, body.KB), "q": body.Q}
	if body.TopK > 0 {
		args["topK"] = body.TopK
	}
	res, err := h.bus.CallTool(c.Request.Context(), contract.ToolSubjectGo("kb_search"),
		&contract.ToolCall{Tool: "kb_search", Args: args})
	if err != nil {
		c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
		return
	}
	// Other callers in this file treat a nil result as possible; guard before
	// dereferencing instead of panicking.
	if res == nil {
		c.JSON(http.StatusBadGateway, gin.H{"error": "kb_search 无响应"})
		return
	}
	if !res.OK {
		c.JSON(http.StatusUnprocessableEntity, gin.H{"error": res.Error})
		return
	}

	var hits []map[string]any
	// Best-effort decode: malformed or empty content yields null hits.
	_ = json.Unmarshal([]byte(res.Content), &hits)
	c.JSON(http.StatusOK, gin.H{"hits": hits})
}
// KbGraph handles GET /api/v1/kb/graph?kb=: it returns the graph triples of a
// knowledge base as {"triples": [...]}, fetched through the mcp-go kb_graph
// tool (Neo4j-backed, per the original note) with a limit of 100.
//
// The knowledge base is always scoped to the current user. Responses: 502
// when the tool call fails or returns no result, 422 when the tool reports an
// error (matching KbSearch).
func (h *Handler) KbGraph(c *gin.Context) {
	res, err := h.bus.CallTool(c.Request.Context(), contract.ToolSubjectGo("kb_graph"),
		&contract.ToolCall{Tool: "kb_graph", Args: map[string]any{"kb": scopedKB(c, c.Query("kb")), "limit": 100}})
	if err != nil {
		c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
		return
	}
	// Guard against a nil result before dereferencing it.
	if res == nil {
		c.JSON(http.StatusBadGateway, gin.H{"error": "kb_graph 无响应"})
		return
	}
	// Surface tool-level failures instead of answering 200 with null triples.
	if !res.OK {
		c.JSON(http.StatusUnprocessableEntity, gin.H{"error": res.Error})
		return
	}

	var triples []map[string]any
	// Best-effort decode: malformed or empty content yields null triples.
	_ = json.Unmarshal([]byte(res.Content), &triples)
	c.JSON(http.StatusOK, gin.H{"triples": triples})
}