From 2d5fd2fca54edb9a8df008b4934e5d7d067dc1f4 Mon Sep 17 00:00:00 2001 From: Blizzard Date: Thu, 11 Jun 2026 10:33:36 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=9E=E6=97=B6=E5=85=A5=E5=BA=93?= =?UTF-8?q?=E7=9B=91=E6=8E=A7=20+=20=E5=90=91=E9=87=8F=E6=8B=86=E5=88=86?= =?UTF-8?q?=E5=8F=AF=E8=A7=86=E5=8C=96=EF=BC=88=E5=BC=82=E6=AD=A5=E5=85=A5?= =?UTF-8?q?=E5=BA=93=20+=20=E8=BF=9B=E5=BA=A6=20SSE=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 入库从同步改为异步流水线 + 进度回流(复用 token 流 NATS streaming)。 UI 实时看到 解析→切块→向量化(分批)→写入 各阶段 + 拆分块预览。 - shared: contract.IngestEvent(stage/done/total/chunks/error) - mcp-go: rag.Ingest 加 onProgress + 分批向量化(10/批)逐批回报;kb_ingest 带 job_id 把进度发到 sundynix.streams. + CompleteStream - gateway: 入库异步返回 job_id,后台 runIngest 发进度;GET /kb/ingest/:id/stream SSE - frontend: streamIngest(EventSource);KbView 实时进度面板(阶段徽标+进度条+拆分列表) - 验证: build✓+e2e PASS; 浏览器 12 行→6 阶段点亮+进度条 12/12+拆分 12 块逐条 Co-Authored-By: Claude Opus 4.8 --- sundynix-desktop/frontend/src/lib/api.ts | 46 ++++-- .../frontend/src/views/KbView.tsx | 133 +++++++++++++++--- sundynix-gateway/internal/handler/kb.go | 130 +++++++++++++---- sundynix-gateway/internal/nats/publisher.go | 13 ++ sundynix-gateway/internal/router/router.go | 5 +- sundynix-mcp-go/internal/mcp/gateway.go | 22 ++- sundynix-mcp-go/internal/rag/rag.go | 62 +++++++- sundynix-shared/contract/task.go | 10 ++ 8 files changed, 358 insertions(+), 63 deletions(-) diff --git a/sundynix-desktop/frontend/src/lib/api.ts b/sundynix-desktop/frontend/src/lib/api.ts index c259472..6aaff2c 100644 --- a/sundynix-desktop/frontend/src/lib/api.ts +++ b/sundynix-desktop/frontend/src/lib/api.ts @@ -47,27 +47,57 @@ export function streamTokens( return () => es.close(); } -// ingestKb: POST /api/v1/kb/ingest,把文本入库(→ mcp-go kb_ingest:切块/embedding/Milvus)。 +// 入库进度事件(与后端 contract.IngestEvent 对应)。 +export interface IngestEvent { + stage: string; + msg?: string; + done?: number; + total?: number; + chunks?: string[]; + error?: string; +} + +// ingestKb: POST /api/v1/kb/ingest —— 文本入库(异步,返回 job_id)。 export async function ingestKb(kb: string, text: string): Promise { const res = await fetch(`${GATEWAY}/api/v1/kb/ingest`, { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ kb, text }), }); - const data = (await res.json()) as { message?: string; error?: string }; - if (!res.ok) throw new Error(data.error ?? `ingest failed: ${res.status}`); - return data.message ?? "ok"; + const data = (await res.json()) as { job_id?: string; error?: string }; + if (!res.ok || !data.job_id) throw new Error(data.error ?? `ingest failed: ${res.status}`); + return data.job_id; } -// ingestFile: POST /api/v1/kb/ingest_file(multipart)—— 上传文件入库(docx/xlsx/pdf… → mcp-py 解析)。 +// ingestFile: POST /api/v1/kb/ingest_file(multipart)—— 文件入库(异步,返回 job_id)。 export async function ingestFile(kb: string, file: File): Promise { const fd = new FormData(); fd.append("kb", kb); fd.append("file", file); const res = await fetch(`${GATEWAY}/api/v1/kb/ingest_file`, { method: "POST", body: fd }); - const data = (await res.json()) as { message?: string; chars?: number; error?: string }; - if (!res.ok) throw new Error(data.error ?? `ingest file failed: ${res.status}`); - return `${file.name}:解析 ${data.chars ?? 0} 字 → ${data.message ?? "ok"}`; + const data = (await res.json()) as { job_id?: string; error?: string }; + if (!res.ok || !data.job_id) throw new Error(data.error ?? `ingest file failed: ${res.status}`); + return data.job_id; +} + +// streamIngest: SSE 订阅入库进度(/kb/ingest/:id/stream)。返回关闭函数。 +export function streamIngest( + jobId: string, + onEvent: (ev: IngestEvent) => void, + onDone: () => void, + onError?: () => void, +): () => void { + const es = new EventSource(`${GATEWAY}/api/v1/kb/ingest/${jobId}/stream`); + es.addEventListener("progress", (e) => onEvent(JSON.parse((e as MessageEvent).data) as IngestEvent)); + es.addEventListener("done", () => { + es.close(); + onDone(); + }); + es.onerror = () => { + es.close(); + onError?.(); + }; + return () => es.close(); } export interface KbHit { diff --git a/sundynix-desktop/frontend/src/views/KbView.tsx b/sundynix-desktop/frontend/src/views/KbView.tsx index c1be71b..033f7c4 100644 --- a/sundynix-desktop/frontend/src/views/KbView.tsx +++ b/sundynix-desktop/frontend/src/views/KbView.tsx @@ -1,5 +1,5 @@ import { useRef, useState } from "react"; -import { ingestKb, ingestFile, searchKb, type KbHit } from "../lib/api"; +import { ingestKb, ingestFile, streamIngest, searchKb, type IngestEvent, type KbHit } from "../lib/api"; interface IngestLog { t: string; @@ -7,13 +7,21 @@ interface IngestLog { ok: boolean; } -// 知识库管理:入库监控(切块/embedding/Milvus)+ 检索调试台(带分数与来源)。 +interface Progress { + active: boolean; + stage: string; + done?: number; + total?: number; + chunks: string[]; + error?: string; +} + +// 知识库管理:实时入库监控(解析→切块→向量化→写入 + 拆分可视化)+ 检索调试台。 export function KbView() { const [kb, setKb] = useState("docs"); const [text, setText] = useState(""); const [logs, setLogs] = useState([]); - const [ingesting, setIngesting] = useState(false); - + const [prog, setProg] = useState(null); const fileRef = useRef(null); const [q, setQ] = useState(""); @@ -23,31 +31,54 @@ export function KbView() { const [err, setErr] = useState(""); const stamp = () => new Date().toLocaleTimeString(); + const ingesting = prog?.active ?? false; + + // 订阅某入库 job 的进度流。 + const follow = (job: string, label: string) => { + setProg({ active: true, stage: "提交", chunks: [] }); + streamIngest( + job, + (ev: IngestEvent) => + setProg((p) => ({ + active: ev.stage !== "完成" && ev.stage !== "失败", + stage: ev.stage, + done: ev.done ?? p?.done, + total: ev.total ?? p?.total, + chunks: ev.chunks ?? p?.chunks ?? [], + error: ev.error, + })), + () => + setProg((p) => { + const ok = p?.stage !== "失败"; + setLogs((l) => [ + { t: stamp(), msg: ok ? `${label}:${p?.total ?? 0} 块入库完成` : `${label}:${p?.error ?? "失败"}`, ok }, + ...l, + ]); + return p ? { ...p, active: false } : null; + }), + () => setProg((p) => (p ? { ...p, active: false, stage: "连接中断" } : null)), + ); + }; const onIngest = async () => { if (!text.trim()) return; - setIngesting(true); try { - const msg = await ingestKb(kb, text); - setLogs((l) => [{ t: stamp(), msg, ok: true }, ...l]); + const job = await ingestKb(kb, text); setText(""); + follow(job, "文本"); } catch (e) { setLogs((l) => [{ t: stamp(), msg: (e as Error).message, ok: false }, ...l]); - } finally { - setIngesting(false); } }; const onFile = async (file?: File) => { if (!file) return; - setIngesting(true); try { - const msg = await ingestFile(kb, file); - setLogs((l) => [{ t: stamp(), msg, ok: true }, ...l]); + const job = await ingestFile(kb, file); + follow(job, file.name); } catch (e) { setLogs((l) => [{ t: stamp(), msg: `${file.name}: ${(e as Error).message}`, ok: false }, ...l]); } finally { - setIngesting(false); if (fileRef.current) fileRef.current.value = ""; } }; @@ -66,6 +97,8 @@ export function KbView() { } }; + const pct = prog?.total ? Math.round(((prog.done ?? 0) / prog.total) * 100) : 0; + return (
@@ -77,18 +110,18 @@ export function KbView() { placeholder="知识库名" title="知识库(Milvus kb 字段分区)" /> - 入库 → 切块 / embedding / Milvus;检索 → 向量召回 + 入库 → 解析 / 切块 / 向量化 / 写入;检索 → 混合召回
- {/* 左:入库 + 监控日志 */} + {/* 左:入库 + 实时监控 */}
-

入库(按行切块)

+

入库