Files
sundynix-agentix/sundynix-gateway/internal/handler/kb.go
T
Blizzard 3a175e46f3 feat(kb): 批量文件入库(文件列表) + 项目/案件知识库 + owner 作用域隔离
回应三点诉求:一次入一批文件、按文件夹/项目/案件组织、且只有我能查我的库。

隔离(核心):知识库实际分区键 = "owner/name",owner 由网关从 X-User-ID 注入,
客户端只发库名、发不了 owner —— 故任何人都只能查到自己 owner 前缀下的数据。
- gateway: scopedKB(owner/kb) 注入 ingest/search/graph;ingest/search/graph 全部带身份头。
- store: sundynix_kb 注册表(owner+name 唯一 + kind),ListKB/EnsureKB(OnConflict DoNothing)。

项目/案件组织:
- gateway: GET /kb/list(owner 隔离列表)、POST /kb/create(folder/project/case/general);
  入库时 EnsureKB 自动登记。
- 前端: KbView 顶部知识库下拉 + 新建(项目/案件/文件夹/通用),检索/图谱/入库都绑定所选库。

批量文件:
- 前端: 选择文件(multiple) + 选择文件夹(webkitdirectory) + 拖拽一批 → 每文件一个 job,
  文件列表实时显示各自状态(排队/解析/向量化/写入/抽取/完成/失败)+ 完成/失败计数。

验证:curl 证隔离 —— wt 入 default→可检索;alice 查同名 default→[] 空;alice 列表不含 wt 案件库。
Preview 证 UI —— 知识库下拉含 案件-2024-001(案件)+default(通用)、owner 隔离徽标、批量/文件夹按钮。
tsc+vite+gateway build 通过;重建 .app 重启窗口。

注:身份目前来自 X-User-ID 头(可信前端),生产应换 JWT 鉴权中间件——隔离机制(owner 前缀)已就位。

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-13 14:50:33 +08:00

305 lines
8.9 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package handler
import (
"context"
"crypto/rand"
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"path/filepath"
"strings"
"time"
"github.com/gin-gonic/gin"
"github.com/sundynix/sundynix-shared/contract"
)
// rawKB 规整知识库名(去空白,空则 default)—— 注册表里的展示名。
func rawKB(kb string) string {
kb = strings.TrimSpace(kb)
if kb == "" {
return "default"
}
return kb
}
// scopedKB 把知识库名锁进当前用户作用域:"owner/name"。
// owner 来自身份(X-User-ID),客户端只发库名、发不了 owner,故无法越权查到他人的库。
func scopedKB(c *gin.Context, kb string) string {
return userID(c) + "/" + rawKB(kb)
}
// KbList: GET /api/v1/kb/list —— 当前用户的知识库列表(按 owner 隔离)。
func (h *Handler) KbList(c *gin.Context) {
rows, err := h.db.ListKB(c.Request.Context(), userID(c))
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
out := make([]gin.H, 0, len(rows))
for _, r := range rows {
out = append(out, gin.H{"name": r.Name, "kind": r.Kind})
}
c.JSON(http.StatusOK, gin.H{"kbs": out})
}
// KbCreate: POST /api/v1/kb/create {name, kind} —— 新建知识库(folder/project/case/general)。
func (h *Handler) KbCreate(c *gin.Context) {
var body struct {
Name string `json:"name"`
Kind string `json:"kind"`
}
if err := c.ShouldBindJSON(&body); err != nil || strings.TrimSpace(body.Name) == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "name required"})
return
}
if err := h.db.EnsureKB(c.Request.Context(), userID(c), rawKB(body.Name), body.Kind); err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{"name": rawKB(body.Name), "kind": body.Kind})
}
// KbIngest: POST /api/v1/kb/ingest —— 文本入库(异步,返回 job_id;进度经 SSE 看)。
func (h *Handler) KbIngest(c *gin.Context) {
var body struct {
KB string `json:"kb"`
Text string `json:"text"`
}
if err := c.ShouldBindJSON(&body); err != nil || body.Text == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "text required"})
return
}
_ = h.db.EnsureKB(c.Request.Context(), userID(c), rawKB(body.KB), "general")
job := newJobID()
go h.runIngest(job, scopedKB(c, body.KB), "", nil, body.Text)
c.JSON(http.StatusAccepted, gin.H{"job_id": job})
}
// KbIngestFile: POST /api/v1/kb/ingest_filemultipart)—— 文件入库(异步,返回 job_id)。
// 流水线(解析→切块→向量化→写入)的进度经 sundynix.streams.<job_id> 回流,UI 用 SSE 看。
func (h *Handler) KbIngestFile(c *gin.Context) {
kb := c.PostForm("kb")
fh, err := c.FormFile("file")
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "file required"})
return
}
f, err := fh.Open()
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
defer f.Close()
data, err := io.ReadAll(f)
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
_ = h.db.EnsureKB(c.Request.Context(), userID(c), rawKB(kb), "general")
job := newJobID()
go h.runIngest(job, scopedKB(c, kb), fh.Filename, data, "")
c.JSON(http.StatusAccepted, gin.H{"job_id": job, "file": fh.Filename})
}
// runIngest 后台跑入库流水线,逐阶段把进度发到 sundynix.streams.<job>。
// filename 非空表示文件入库(先经 mcp-py 解析);否则用 rawText。
func (h *Handler) runIngest(job, kb, filename string, data []byte, rawText string) {
ctx := context.Background()
emit := func(ev contract.IngestEvent) { _ = h.bus.PublishIngest(job, &ev) }
time.Sleep(400 * time.Millisecond) // 给 SSE 客户端订阅时间(core NATS 无缓冲)
text := rawText
if filename != "" {
emit(contract.IngestEvent{Stage: "解析", Msg: filename})
parsed, err := h.parseFile(ctx, filename, data)
if err != nil {
emit(contract.IngestEvent{Stage: "失败", Error: "解析失败: " + err.Error()})
_ = h.bus.CompleteStream(job)
return
}
emit(contract.IngestEvent{
Stage: "解析完成",
Msg: fmt.Sprintf("%s · 解析出 %d 字", fileKind(filename), len([]rune(parsed))),
Preview: head(parsed, 240),
})
text = parsed
}
// 调 mcp-go kb_ingest(带 job_id):它会发 切块/向量化/写入/完成 事件 + CompleteStream。
res, err := h.bus.CallTool(ctx, contract.ToolSubjectGo("kb_ingest"),
&contract.ToolCall{Tool: "kb_ingest", Args: map[string]any{"kb": kb, "text": text, "job_id": job}})
if err != nil || res == nil || !res.OK {
msg := "kb_ingest 失败"
if err != nil {
msg = err.Error()
} else if res != nil {
msg = res.Error
}
emit(contract.IngestEvent{Stage: "失败", Error: msg})
_ = h.bus.CompleteStream(job)
}
}
// KbIngestStream: GET /api/v1/kb/ingest/:id/stream —— SSE 实时推送入库进度事件。
func (h *Handler) KbIngestStream(c *gin.Context) {
job := c.Param("id")
c.Writer.Header().Set("Content-Type", "text/event-stream")
c.Writer.Header().Set("Cache-Control", "no-cache")
c.Writer.Header().Set("Connection", "keep-alive")
events := make(chan []byte, 64)
done := make(chan struct{})
unsub, err := h.bus.SubscribeTokens(job,
func(ev []byte) {
select {
case events <- ev:
default:
}
},
func() { close(done) },
)
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
defer func() { _ = unsub() }()
c.Stream(func(w io.Writer) bool {
select {
case ev := <-events:
c.SSEvent("progress", string(ev))
return true
case <-done:
c.SSEvent("done", job)
return false
case <-c.Request.Context().Done():
return false
}
})
}
func newJobID() string {
var b [8]byte
_, _ = rand.Read(b[:])
return "ingest_" + hex.EncodeToString(b[:])
}
// itoa 简易整数转字符串(避免引入 strconv)。
func itoa(n int) string {
if n == 0 {
return "0"
}
neg := n < 0
if neg {
n = -n
}
var b []byte
for n > 0 {
b = append([]byte{byte('0' + n%10)}, b...)
n /= 10
}
if neg {
b = append([]byte{'-'}, b...)
}
return string(b)
}
// head 取文本前 n 个字符(按 rune),用于解析预览。
func head(s string, n int) string {
s = strings.TrimSpace(s)
r := []rune(s)
if len(r) <= n {
return s
}
return string(r[:n]) + "…"
}
// fileKind 由扩展名给出可读的文件类型标签。
func fileKind(filename string) string {
switch strings.ToLower(filepath.Ext(filename)) {
case ".docx":
return "Word 文档"
case ".xlsx", ".xls":
return "Excel 表格"
case ".pdf":
return "PDF"
case ".csv":
return "CSV"
case ".md", ".markdown":
return "Markdown"
default:
return "文本"
}
}
// parseFile 把文件字节转为纯文本:文本类直读,其余经 mcp-py parse_document(算法层)。
func (h *Handler) parseFile(ctx context.Context, filename string, data []byte) (string, error) {
switch strings.ToLower(filepath.Ext(filename)) {
case ".txt", ".md", ".markdown", ".text":
return string(data), nil
}
res, err := h.bus.CallTool(ctx, contract.ToolSubjectPy("parse_document"),
&contract.ToolCall{Tool: "parse_document", Args: map[string]any{
"filename": filename, "content_b64": base64.StdEncoding.EncodeToString(data),
}})
if err != nil {
return "", err
}
if res == nil || !res.OK {
if res != nil && res.Error != "" {
return "", errors.New(res.Error)
}
return "", errors.New("parse_document 无响应(mcp-py 未运行?)")
}
return res.Content, nil
}
// KbSearch: POST /api/v1/kb/search —— 检索台:查某知识库,返回带分数的命中(→ mcp-go kb_search)。
func (h *Handler) KbSearch(c *gin.Context) {
var body struct {
KB string `json:"kb"`
Q string `json:"q"`
TopK int `json:"topK"`
}
if err := c.ShouldBindJSON(&body); err != nil || body.Q == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "q required"})
return
}
args := map[string]any{"kb": scopedKB(c, body.KB), "q": body.Q}
if body.TopK > 0 {
args["topK"] = body.TopK
}
res, err := h.bus.CallTool(c.Request.Context(), contract.ToolSubjectGo("kb_search"),
&contract.ToolCall{Tool: "kb_search", Args: args})
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
if !res.OK {
c.JSON(http.StatusUnprocessableEntity, gin.H{"error": res.Error})
return
}
var hits []map[string]any
_ = json.Unmarshal([]byte(res.Content), &hits)
c.JSON(http.StatusOK, gin.H{"hits": hits})
}
// KbGraph: GET /api/v1/kb/graph?kb= —— 某知识库的图谱三元组(→ mcp-go kb_graphNeo4j)。
func (h *Handler) KbGraph(c *gin.Context) {
res, err := h.bus.CallTool(c.Request.Context(), contract.ToolSubjectGo("kb_graph"),
&contract.ToolCall{Tool: "kb_graph", Args: map[string]any{"kb": scopedKB(c, c.Query("kb")), "limit": 100}})
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
var triples []map[string]any
_ = json.Unmarshal([]byte(res.Content), &triples)
c.JSON(http.StatusOK, gin.H{"triples": triples})
}