feat: 文件入库 — docx/xlsx/pdf/csv 经 mcp-py 解析 → RAG

入库从纯文本升级为多文件类型:解析(mcp-py 算法层)与切块/embedding 解耦。
上传文件 → Gateway 按类型路由 → mcp-py parse_document 解析为文本 → kb_ingest。

- mcp-py: parsers.py(docx=python-docx / xlsx=openpyxl / pdf=pypdf / csv / txt→文本);
  parse_document 工具做真(base64 文件→文本,线程池跑 CPU 密集解析);pyproject 加依赖
- gateway: POST /api/v1/kb/ingest_file(multipart);parseFile 文本类直读、office/pdf→mcp-py
- nats-server.conf: max_payload 8MB(容纳 base64 文件经工具调用;大文件应走对象存储)
- frontend: KbView 加文件上传(accept docx/xlsx/pdf/csv...);api.ingestFile
- 验证: 全模块 build✓ + e2e PASS; live——4 类文件上传→mcp-py 解析→入库→检索命中:
  docx(营收报告)/xlsx(销量表行)/pdf(Q2计划)/csv(城市人口) 全部正确
- 边界: 扫描件/版面 OCR(MinerU/PaddleOCR)推迟;大文件 base64 走 NATS 受 max_payload
  限,生产应走对象存储(MinIO)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Blizzard
2026-06-11 10:10:07 +08:00
parent 85a5c2c1e7
commit 3550a22557
8 changed files with 198 additions and 14 deletions
+1
View File
@@ -1,6 +1,7 @@
# NATS 零拷贝骨干网 — JetStream 开启 # NATS 零拷贝骨干网 — JetStream 开启
port: 4222 port: 4222
http_port: 8222 # 监控端点 http_port: 8222 # 监控端点
max_payload: 8MB # 容纳文件(base64)经工具调用传输;大文件应改走对象存储
jetstream { jetstream {
store_dir: "/data/jetstream" store_dir: "/data/jetstream"
+11
View File
@@ -59,6 +59,17 @@ export async function ingestKb(kb: string, text: string): Promise<string> {
return data.message ?? "ok"; return data.message ?? "ok";
} }
// ingestFile: POST /api/v1/kb/ingest_filemultipart)—— 上传文件入库(docx/xlsx/pdf… → mcp-py 解析)。
export async function ingestFile(kb: string, file: File): Promise<string> {
const fd = new FormData();
fd.append("kb", kb);
fd.append("file", file);
const res = await fetch(`${GATEWAY}/api/v1/kb/ingest_file`, { method: "POST", body: fd });
const data = (await res.json()) as { message?: string; chars?: number; error?: string };
if (!res.ok) throw new Error(data.error ?? `ingest file failed: ${res.status}`);
return `${file.name}:解析 ${data.chars ?? 0} 字 → ${data.message ?? "ok"}`;
}
export interface KbHit { export interface KbHit {
text: string; text: string;
score: number; score: number;
+37 -9
View File
@@ -1,5 +1,5 @@
import { useState } from "react"; import { useRef, useState } from "react";
import { ingestKb, searchKb, type KbHit } from "../lib/api"; import { ingestKb, ingestFile, searchKb, type KbHit } from "../lib/api";
interface IngestLog { interface IngestLog {
t: string; t: string;
@@ -14,6 +14,8 @@ export function KbView() {
const [logs, setLogs] = useState<IngestLog[]>([]); const [logs, setLogs] = useState<IngestLog[]>([]);
const [ingesting, setIngesting] = useState(false); const [ingesting, setIngesting] = useState(false);
const fileRef = useRef<HTMLInputElement>(null);
const [q, setQ] = useState(""); const [q, setQ] = useState("");
const [topK, setTopK] = useState(5); const [topK, setTopK] = useState(5);
const [hits, setHits] = useState<KbHit[] | null>(null); const [hits, setHits] = useState<KbHit[] | null>(null);
@@ -36,6 +38,20 @@ export function KbView() {
} }
}; };
const onFile = async (file?: File) => {
if (!file) return;
setIngesting(true);
try {
const msg = await ingestFile(kb, file);
setLogs((l) => [{ t: stamp(), msg, ok: true }, ...l]);
} catch (e) {
setLogs((l) => [{ t: stamp(), msg: `${file.name}: ${(e as Error).message}`, ok: false }, ...l]);
} finally {
setIngesting(false);
if (fileRef.current) fileRef.current.value = "";
}
};
const onSearch = async () => { const onSearch = async () => {
if (!q.trim()) return; if (!q.trim()) return;
setSearching(true); setSearching(true);
@@ -74,13 +90,25 @@ export function KbView() {
onChange={(e) => setText(e.target.value)} onChange={(e) => setText(e.target.value)}
placeholder={"每行一条知识,例如:\nsundynix 用 Milvus 做向量库\nsundynix 用 NATS 做消息总线"} placeholder={"每行一条知识,例如:\nsundynix 用 Milvus 做向量库\nsundynix 用 NATS 做消息总线"}
/> />
<button <div className="mt-2 flex items-center gap-2">
onClick={onIngest} <button
disabled={ingesting || !text.trim()} onClick={onIngest}
className="mt-2 self-start rounded bg-emerald-600 px-3 py-1 text-sm text-white disabled:opacity-40" disabled={ingesting || !text.trim()}
> className="rounded bg-emerald-600 px-3 py-1 text-sm text-white disabled:opacity-40"
{ingesting ? "入库中…" : "⬆ 入库"} >
</button> {ingesting ? "入库中…" : "⬆ 入库文本"}
</button>
<span className="text-[11px] text-gray-400"></span>
<input
ref={fileRef}
type="file"
accept=".txt,.md,.csv,.docx,.xlsx,.pdf"
onChange={(e) => onFile(e.target.files?.[0])}
disabled={ingesting}
className="text-xs file:mr-2 file:rounded file:border file:bg-gray-50 file:px-2 file:py-1 file:text-xs"
/>
</div>
<span className="mt-1 text-[10px] text-gray-400"> txt/md/csv/docx/xlsx/pdfdocx/xlsx/pdf mcp-py </span>
<h3 className="mb-1 mt-4 text-xs font-semibold text-gray-600"></h3> <h3 className="mb-1 mt-4 text-xs font-semibold text-gray-600"></h3>
<ul className="flex-1 space-y-1 overflow-auto"> <ul className="flex-1 space-y-1 overflow-auto">
{logs.length === 0 && <li className="text-xs text-gray-400"></li>} {logs.length === 0 && <li className="text-xs text-gray-400"></li>}
+66
View File
@@ -1,8 +1,14 @@
package handler package handler
import ( import (
"context"
"encoding/base64"
"encoding/json" "encoding/json"
"errors"
"io"
"net/http" "net/http"
"path/filepath"
"strings"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
@@ -33,6 +39,66 @@ func (h *Handler) KbIngest(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"status": "ok", "message": res.Content}) c.JSON(http.StatusOK, gin.H{"status": "ok", "message": res.Content})
} }
// KbIngestFile: POST /api/v1/kb/ingest_filemultipart)—— 上传文件入库。
// 按类型路由:文本直读;docx/xlsx/pdf/csv → mcp-py parse_document 解析为文本 → kb_ingest。
func (h *Handler) KbIngestFile(c *gin.Context) {
kb := c.PostForm("kb")
fh, err := c.FormFile("file")
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "file required"})
return
}
f, err := fh.Open()
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
defer f.Close()
data, err := io.ReadAll(f)
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
text, err := h.parseFile(c.Request.Context(), fh.Filename, data)
if err != nil {
c.JSON(http.StatusUnprocessableEntity, gin.H{"error": "解析失败: " + err.Error()})
return
}
res, err := h.bus.CallTool(c.Request.Context(), contract.ToolSubjectGo("kb_ingest"),
&contract.ToolCall{Tool: "kb_ingest", Args: map[string]any{"kb": kb, "text": text}})
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
if !res.OK {
c.JSON(http.StatusUnprocessableEntity, gin.H{"error": res.Error})
return
}
c.JSON(http.StatusOK, gin.H{"status": "ok", "file": fh.Filename, "chars": len([]rune(text)), "message": res.Content})
}
// parseFile 把文件字节转为纯文本:文本类直读,其余经 mcp-py parse_document(算法层)。
func (h *Handler) parseFile(ctx context.Context, filename string, data []byte) (string, error) {
switch strings.ToLower(filepath.Ext(filename)) {
case ".txt", ".md", ".markdown", ".text":
return string(data), nil
}
res, err := h.bus.CallTool(ctx, contract.ToolSubjectPy("parse_document"),
&contract.ToolCall{Tool: "parse_document", Args: map[string]any{
"filename": filename, "content_b64": base64.StdEncoding.EncodeToString(data),
}})
if err != nil {
return "", err
}
if res == nil || !res.OK {
if res != nil && res.Error != "" {
return "", errors.New(res.Error)
}
return "", errors.New("parse_document 无响应(mcp-py 未运行?)")
}
return res.Content, nil
}
// KbSearch: POST /api/v1/kb/search —— 检索台:查某知识库,返回带分数的命中(→ mcp-go kb_search)。 // KbSearch: POST /api/v1/kb/search —— 检索台:查某知识库,返回带分数的命中(→ mcp-go kb_search)。
func (h *Handler) KbSearch(c *gin.Context) { func (h *Handler) KbSearch(c *gin.Context) {
var body struct { var body struct {
+2 -1
View File
@@ -23,7 +23,8 @@ func New(db *store.Postgres, cache *store.Redis, bus *nats.Bus) *gin.Engine {
api.POST("/tasks", h.SubmitTask) // 1. 解析 DSL 并 Publish 到 NATS api.POST("/tasks", h.SubmitTask) // 1. 解析 DSL 并 Publish 到 NATS
api.GET("/tasks/:id/stream", h.StreamTask) // 4. SSE/WS 回流 Token Stream api.GET("/tasks/:id/stream", h.StreamTask) // 4. SSE/WS 回流 Token Stream
api.PUT("/memory", h.SetMemory) // 偏好记忆登记(→ mcp-go memory_upsert api.PUT("/memory", h.SetMemory) // 偏好记忆登记(→ mcp-go memory_upsert
api.POST("/kb/ingest", h.KbIngest) // 知识库入库(→ mcp-go kb_ingestRAG api.POST("/kb/ingest", h.KbIngest) // 知识库入库(文本,→ mcp-go kb_ingest
api.POST("/kb/ingest_file", h.KbIngestFile) // 文件入库(docx/xlsx/pdf… → mcp-py 解析 → 入库)
api.POST("/kb/search", h.KbSearch) // 知识库检索台(→ mcp-go kb_search api.POST("/kb/search", h.KbSearch) // 知识库检索台(→ mcp-go kb_search
api.GET("/billing", h.Billing) api.GET("/billing", h.Billing)
+4 -1
View File
@@ -7,7 +7,10 @@ dependencies = [
"mcp>=1.2.0", # MCP 协议 "mcp>=1.2.0", # MCP 协议
"nats-py>=2.7.0", # 接入 NATS 骨干网 "nats-py>=2.7.0", # 接入 NATS 骨干网
"docker>=7.1.0", # Docker 隔离沙箱 / Code Interpreter "docker>=7.1.0", # Docker 隔离沙箱 / Code Interpreter
# "magic-pdf", # MinerU 多模态解析 (PaddleOCR),按需安装 "python-docx>=1.1.0", # Word 解析
"openpyxl>=3.1.0", # Excel 解析
"pypdf>=4.0.0", # PDF 文本解析
# "magic-pdf", # MinerU 多模态解析 (PaddleOCR),扫描件 OCR,按需安装
] ]
[build-system] [build-system]
@@ -98,9 +98,18 @@ class McpGateway:
return f"[run_code] Docker 隔离执行(桩) stdout={result.get('stdout','')!r}" return f"[run_code] Docker 隔离执行(桩) stdout={result.get('stdout','')!r}"
async def _parse_document(self, args: dict) -> str: async def _parse_document(self, args: dict) -> str:
path = str(args.get("path", "")) """文件 → 纯文本。content_b64=文件内容(base64)filename 决定解析器。"""
result = await self.parser.parse(path) # MinerU / PaddleOCR(桩) import base64
return f"[parse_document] MinerU 解析(桩) path={result.get('path','')!r} blocks={len(result.get('blocks', []))}"
from . import parsers
filename = str(args.get("filename", ""))
content_b64 = str(args.get("content_b64", ""))
if not content_b64:
return str(args.get("text", ""))
data = base64.b64decode(content_b64)
# 解析是 CPU 密集,丢到线程池避免阻塞事件循环。
return await asyncio.to_thread(parsers.parse, filename, data)
async def _secure_sandbox(self, args: dict) -> str: async def _secure_sandbox(self, args: dict) -> str:
code = str(args.get("code", "")) code = str(args.get("code", ""))
@@ -0,0 +1,65 @@
"""文档解析:各类文件 → 纯文本(供 RAG 切块/embedding)。
按扩展名路由:txt/md/csv 直读;docx(python-docx)xlsx(openpyxl)pdf(pypdf)。
扫描件/版面 OCR(MinerU/PaddleOCR) 为后续。
"""
from __future__ import annotations
import csv
import io
def parse(filename: str, data: bytes) -> str:
ext = filename.lower().rsplit(".", 1)[-1] if "." in filename else ""
if ext in ("txt", "md", "markdown", "text", ""):
return data.decode("utf-8", errors="replace")
if ext == "csv":
return _csv(data)
if ext == "docx":
return _docx(data)
if ext == "xlsx":
return _xlsx(data)
if ext == "pdf":
return _pdf(data)
raise ValueError(f"暂不支持的文件类型: .{ext}")
def _csv(data: bytes) -> str:
text = data.decode("utf-8", errors="replace")
rows = list(csv.reader(io.StringIO(text)))
return "\n".join(" | ".join(r) for r in rows if any(c.strip() for c in r))
def _docx(data: bytes) -> str:
from docx import Document # python-docx
doc = Document(io.BytesIO(data))
lines: list[str] = [p.text for p in doc.paragraphs if p.text.strip()]
for table in doc.tables:
for row in table.rows:
cells = [c.text.strip() for c in row.cells]
if any(cells):
lines.append(" | ".join(cells))
return "\n".join(lines)
def _xlsx(data: bytes) -> str:
from openpyxl import load_workbook
wb = load_workbook(io.BytesIO(data), read_only=True, data_only=True)
lines: list[str] = []
for ws in wb.worksheets:
lines.append(f"# 工作表: {ws.title}")
for row in ws.iter_rows(values_only=True):
cells = [str(c) for c in row if c is not None]
if cells:
lines.append(" | ".join(cells))
return "\n".join(lines)
def _pdf(data: bytes) -> str:
from pypdf import PdfReader
reader = PdfReader(io.BytesIO(data))
pages = [(page.extract_text() or "").strip() for page in reader.pages]
return "\n\n".join(p for p in pages if p)