diff --git a/deploy/nats/nats-server.conf b/deploy/nats/nats-server.conf index 4992b50..67ef3cf 100644 --- a/deploy/nats/nats-server.conf +++ b/deploy/nats/nats-server.conf @@ -1,6 +1,7 @@ # NATS 零拷贝骨干网 — JetStream 开启 port: 4222 http_port: 8222 # 监控端点 +max_payload: 8MB # 容纳文件(base64)经工具调用传输;大文件应改走对象存储 jetstream { store_dir: "/data/jetstream" diff --git a/sundynix-desktop/frontend/src/lib/api.ts b/sundynix-desktop/frontend/src/lib/api.ts index e01850b..c259472 100644 --- a/sundynix-desktop/frontend/src/lib/api.ts +++ b/sundynix-desktop/frontend/src/lib/api.ts @@ -59,6 +59,17 @@ export async function ingestKb(kb: string, text: string): Promise { return data.message ?? "ok"; } +// ingestFile: POST /api/v1/kb/ingest_file(multipart)—— 上传文件入库(docx/xlsx/pdf… → mcp-py 解析)。 +export async function ingestFile(kb: string, file: File): Promise { + const fd = new FormData(); + fd.append("kb", kb); + fd.append("file", file); + const res = await fetch(`${GATEWAY}/api/v1/kb/ingest_file`, { method: "POST", body: fd }); + const data = (await res.json()) as { message?: string; chars?: number; error?: string }; + if (!res.ok) throw new Error(data.error ?? `ingest file failed: ${res.status}`); + return `${file.name}:解析 ${data.chars ?? 0} 字 → ${data.message ?? "ok"}`; +} + export interface KbHit { text: string; score: number; diff --git a/sundynix-desktop/frontend/src/views/KbView.tsx b/sundynix-desktop/frontend/src/views/KbView.tsx index cf7fd9d..c1be71b 100644 --- a/sundynix-desktop/frontend/src/views/KbView.tsx +++ b/sundynix-desktop/frontend/src/views/KbView.tsx @@ -1,5 +1,5 @@ -import { useState } from "react"; -import { ingestKb, searchKb, type KbHit } from "../lib/api"; +import { useRef, useState } from "react"; +import { ingestKb, ingestFile, searchKb, type KbHit } from "../lib/api"; interface IngestLog { t: string; @@ -14,6 +14,8 @@ export function KbView() { const [logs, setLogs] = useState([]); const [ingesting, setIngesting] = useState(false); + const fileRef = useRef(null); + const [q, setQ] = useState(""); const [topK, setTopK] = useState(5); const [hits, setHits] = useState(null); @@ -36,6 +38,20 @@ export function KbView() { } }; + const onFile = async (file?: File) => { + if (!file) return; + setIngesting(true); + try { + const msg = await ingestFile(kb, file); + setLogs((l) => [{ t: stamp(), msg, ok: true }, ...l]); + } catch (e) { + setLogs((l) => [{ t: stamp(), msg: `${file.name}: ${(e as Error).message}`, ok: false }, ...l]); + } finally { + setIngesting(false); + if (fileRef.current) fileRef.current.value = ""; + } + }; + const onSearch = async () => { if (!q.trim()) return; setSearching(true); @@ -74,13 +90,25 @@ export function KbView() { onChange={(e) => setText(e.target.value)} placeholder={"每行一条知识,例如:\nsundynix 用 Milvus 做向量库\nsundynix 用 NATS 做消息总线"} /> - +
+ + + onFile(e.target.files?.[0])} + disabled={ingesting} + className="text-xs file:mr-2 file:rounded file:border file:bg-gray-50 file:px-2 file:py-1 file:text-xs" + /> +
+ 支持 txt/md/csv/docx/xlsx/pdf(docx/xlsx/pdf 经 mcp-py 解析)

入库监控

    {logs.length === 0 &&
  • 尚无入库记录。
  • } diff --git a/sundynix-gateway/internal/handler/kb.go b/sundynix-gateway/internal/handler/kb.go index 51ff0e6..a5595f6 100644 --- a/sundynix-gateway/internal/handler/kb.go +++ b/sundynix-gateway/internal/handler/kb.go @@ -1,8 +1,14 @@ package handler import ( + "context" + "encoding/base64" "encoding/json" + "errors" + "io" "net/http" + "path/filepath" + "strings" "github.com/gin-gonic/gin" @@ -33,6 +39,66 @@ func (h *Handler) KbIngest(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"status": "ok", "message": res.Content}) } +// KbIngestFile: POST /api/v1/kb/ingest_file(multipart)—— 上传文件入库。 +// 按类型路由:文本直读;docx/xlsx/pdf/csv → mcp-py parse_document 解析为文本 → kb_ingest。 +func (h *Handler) KbIngestFile(c *gin.Context) { + kb := c.PostForm("kb") + fh, err := c.FormFile("file") + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "file required"}) + return + } + f, err := fh.Open() + if err != nil { + c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()}) + return + } + defer f.Close() + data, err := io.ReadAll(f) + if err != nil { + c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()}) + return + } + text, err := h.parseFile(c.Request.Context(), fh.Filename, data) + if err != nil { + c.JSON(http.StatusUnprocessableEntity, gin.H{"error": "解析失败: " + err.Error()}) + return + } + res, err := h.bus.CallTool(c.Request.Context(), contract.ToolSubjectGo("kb_ingest"), + &contract.ToolCall{Tool: "kb_ingest", Args: map[string]any{"kb": kb, "text": text}}) + if err != nil { + c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()}) + return + } + if !res.OK { + c.JSON(http.StatusUnprocessableEntity, gin.H{"error": res.Error}) + return + } + c.JSON(http.StatusOK, gin.H{"status": "ok", "file": fh.Filename, "chars": len([]rune(text)), "message": res.Content}) +} + +// parseFile 把文件字节转为纯文本:文本类直读,其余经 mcp-py parse_document(算法层)。 +func (h *Handler) parseFile(ctx context.Context, filename string, data []byte) (string, error) { + switch strings.ToLower(filepath.Ext(filename)) { + case ".txt", ".md", ".markdown", ".text": + return string(data), nil + } + res, err := h.bus.CallTool(ctx, contract.ToolSubjectPy("parse_document"), + &contract.ToolCall{Tool: "parse_document", Args: map[string]any{ + "filename": filename, "content_b64": base64.StdEncoding.EncodeToString(data), + }}) + if err != nil { + return "", err + } + if res == nil || !res.OK { + if res != nil && res.Error != "" { + return "", errors.New(res.Error) + } + return "", errors.New("parse_document 无响应(mcp-py 未运行?)") + } + return res.Content, nil +} + // KbSearch: POST /api/v1/kb/search —— 检索台:查某知识库,返回带分数的命中(→ mcp-go kb_search)。 func (h *Handler) KbSearch(c *gin.Context) { var body struct { diff --git a/sundynix-gateway/internal/router/router.go b/sundynix-gateway/internal/router/router.go index bfb57c2..5ca3302 100644 --- a/sundynix-gateway/internal/router/router.go +++ b/sundynix-gateway/internal/router/router.go @@ -23,7 +23,8 @@ func New(db *store.Postgres, cache *store.Redis, bus *nats.Bus) *gin.Engine { api.POST("/tasks", h.SubmitTask) // 1. 解析 DSL 并 Publish 到 NATS api.GET("/tasks/:id/stream", h.StreamTask) // 4. SSE/WS 回流 Token Stream api.PUT("/memory", h.SetMemory) // 偏好记忆登记(→ mcp-go memory_upsert) - api.POST("/kb/ingest", h.KbIngest) // 知识库入库(→ mcp-go kb_ingest,RAG) + api.POST("/kb/ingest", h.KbIngest) // 知识库入库(文本,→ mcp-go kb_ingest) + api.POST("/kb/ingest_file", h.KbIngestFile) // 文件入库(docx/xlsx/pdf… → mcp-py 解析 → 入库) api.POST("/kb/search", h.KbSearch) // 知识库检索台(→ mcp-go kb_search) api.GET("/billing", h.Billing) diff --git a/sundynix-mcp-py/pyproject.toml b/sundynix-mcp-py/pyproject.toml index c0118bf..e1561bc 100644 --- a/sundynix-mcp-py/pyproject.toml +++ b/sundynix-mcp-py/pyproject.toml @@ -7,7 +7,10 @@ dependencies = [ "mcp>=1.2.0", # MCP 协议 "nats-py>=2.7.0", # 接入 NATS 骨干网 "docker>=7.1.0", # Docker 隔离沙箱 / Code Interpreter - # "magic-pdf", # MinerU 多模态解析 (PaddleOCR),按需安装 + "python-docx>=1.1.0", # Word 解析 + "openpyxl>=3.1.0", # Excel 解析 + "pypdf>=4.0.0", # PDF 文本解析 + # "magic-pdf", # MinerU 多模态解析 (PaddleOCR),扫描件 OCR,按需安装 ] [build-system] diff --git a/sundynix-mcp-py/src/sundynix_mcp_py/mcp_gateway.py b/sundynix-mcp-py/src/sundynix_mcp_py/mcp_gateway.py index 7f52edc..227ad4b 100644 --- a/sundynix-mcp-py/src/sundynix_mcp_py/mcp_gateway.py +++ b/sundynix-mcp-py/src/sundynix_mcp_py/mcp_gateway.py @@ -98,9 +98,18 @@ class McpGateway: return f"[run_code] Docker 隔离执行(桩) stdout={result.get('stdout','')!r}" async def _parse_document(self, args: dict) -> str: - path = str(args.get("path", "")) - result = await self.parser.parse(path) # MinerU / PaddleOCR(桩) - return f"[parse_document] MinerU 解析(桩) path={result.get('path','')!r} blocks={len(result.get('blocks', []))}" + """文件 → 纯文本。content_b64=文件内容(base64),filename 决定解析器。""" + import base64 + + from . import parsers + + filename = str(args.get("filename", "")) + content_b64 = str(args.get("content_b64", "")) + if not content_b64: + return str(args.get("text", "")) + data = base64.b64decode(content_b64) + # 解析是 CPU 密集,丢到线程池避免阻塞事件循环。 + return await asyncio.to_thread(parsers.parse, filename, data) async def _secure_sandbox(self, args: dict) -> str: code = str(args.get("code", "")) diff --git a/sundynix-mcp-py/src/sundynix_mcp_py/parsers.py b/sundynix-mcp-py/src/sundynix_mcp_py/parsers.py new file mode 100644 index 0000000..f1ff9ef --- /dev/null +++ b/sundynix-mcp-py/src/sundynix_mcp_py/parsers.py @@ -0,0 +1,65 @@ +"""文档解析:各类文件 → 纯文本(供 RAG 切块/embedding)。 +按扩展名路由:txt/md/csv 直读;docx(python-docx);xlsx(openpyxl);pdf(pypdf)。 +扫描件/版面 OCR(MinerU/PaddleOCR) 为后续。 +""" + +from __future__ import annotations + +import csv +import io + + +def parse(filename: str, data: bytes) -> str: + ext = filename.lower().rsplit(".", 1)[-1] if "." in filename else "" + if ext in ("txt", "md", "markdown", "text", ""): + return data.decode("utf-8", errors="replace") + if ext == "csv": + return _csv(data) + if ext == "docx": + return _docx(data) + if ext == "xlsx": + return _xlsx(data) + if ext == "pdf": + return _pdf(data) + raise ValueError(f"暂不支持的文件类型: .{ext}") + + +def _csv(data: bytes) -> str: + text = data.decode("utf-8", errors="replace") + rows = list(csv.reader(io.StringIO(text))) + return "\n".join(" | ".join(r) for r in rows if any(c.strip() for c in r)) + + +def _docx(data: bytes) -> str: + from docx import Document # python-docx + + doc = Document(io.BytesIO(data)) + lines: list[str] = [p.text for p in doc.paragraphs if p.text.strip()] + for table in doc.tables: + for row in table.rows: + cells = [c.text.strip() for c in row.cells] + if any(cells): + lines.append(" | ".join(cells)) + return "\n".join(lines) + + +def _xlsx(data: bytes) -> str: + from openpyxl import load_workbook + + wb = load_workbook(io.BytesIO(data), read_only=True, data_only=True) + lines: list[str] = [] + for ws in wb.worksheets: + lines.append(f"# 工作表: {ws.title}") + for row in ws.iter_rows(values_only=True): + cells = [str(c) for c in row if c is not None] + if cells: + lines.append(" | ".join(cells)) + return "\n".join(lines) + + +def _pdf(data: bytes) -> str: + from pypdf import PdfReader + + reader = PdfReader(io.BytesIO(data)) + pages = [(page.extract_text() or "").strip() for page in reader.pages] + return "\n\n".join(p for p in pages if p)