From cdc5b3a8476be121ba47dc22aa7c92ffe031a380 Mon Sep 17 00:00:00 2001 From: Blizzard Date: Fri, 12 Jun 2026 14:29:28 +0800 Subject: [PATCH] =?UTF-8?q?feat(observability):=20=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E5=8F=AF=E8=A7=86=E5=8C=96=20=E2=80=94=20=E8=8A=82=E7=82=B9?= =?UTF-8?q?=E7=BA=A7=E5=AE=9E=E6=97=B6=E8=BD=A8=E8=BF=B9=EF=BC=88=E8=BF=90?= =?UTF-8?q?=E8=A1=8C=C2=B7=E8=A7=82=E6=B5=8B=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 把任务执行做成可观测:Dispatcher 在每个节点/阶段发结构化 ExecEvent, 经独立 NATS 通道回流,前端逐节点点亮(状态/耗时/工具入参产出)。 - shared: contract.ExecEvent + ExecSubject(sundynix.exec.,与 Token 流分流); bus.PublishExec/CompleteExec/SubscribeExec(core NATS,复用结束头) - dispatcher: execTracer(自增 Seq 保序 + span 自动计耗时); Orchestrator 加 ExecSink;通用图(init 召回 / 各 tool 入参→产出 / prompt / model 首token+token数)与报告编排(规划大纲 / 各章并行 start-end / 渲染)全程埋点 - gateway: SubscribeExec + GET /tasks/:id/exec SSE(与 token 流并行) - desktop: streamExec + deriveNodes(按 node 归并 start/end/error/info); 复用组件 ExecTrace(竖向轨道,按 kind 着色,运行中脉冲灯); 新 RunsView(运行·观测:轨迹+输出双栏);BottomDrawer 轨迹/工具调用 tab 接真实数据; ReportView 加执行轨迹栏;左导航「运行」置就绪 实测: - 报告任务 /exec:规划(2680ms,4章) → 4 章并行(seq 交错,各~7-8s 重叠=真并行, 每章带 docs 知识库检索预览+成稿字数) → 渲染(docx 落盘) - 通用图 /exec:tool:kb_search(678ms,入参→Milvus 产出) → prompt(2消息) → model(首token 860ms / 4 tokens) - 浏览器(Preview):报告页执行轨迹逐节点点亮、章节带耗时/字数/检索片段,完成后下载 Word Co-Authored-By: Claude Opus 4.8 --- sundynix-desktop/frontend/src/App.tsx | 17 +++- .../frontend/src/components/ExecTrace.tsx | 92 +++++++++++++++++++ sundynix-desktop/frontend/src/lib/api.ts | 32 +++++++ sundynix-desktop/frontend/src/lib/run.ts | 55 ++++++++++- .../frontend/src/shell/BottomDrawer.tsx | 48 +++++++--- .../frontend/src/shell/LeftNav.tsx | 2 +- .../frontend/src/views/ReportView.tsx | 52 +++++++---- .../frontend/src/views/RunsView.tsx | 57 ++++++++++++ sundynix-dispatcher/cmd/dispatcher/main.go | 4 +- sundynix-dispatcher/internal/eino/compile.go | 41 +++++++-- sundynix-dispatcher/internal/eino/exec.go | 68 ++++++++++++++ .../internal/eino/orchestrator.go | 24 ++++- sundynix-dispatcher/internal/eino/report.go | 30 +++++- .../internal/nats/subscriber.go | 10 ++ .../internal/handler/task_handler.go | 39 ++++++++ sundynix-gateway/internal/nats/publisher.go | 5 + sundynix-gateway/internal/router/router.go | 11 ++- sundynix-shared/bus/bus.go | 31 +++++++ sundynix-shared/contract/task.go | 32 +++++-- 19 files changed, 587 insertions(+), 63 deletions(-) create mode 100644 sundynix-desktop/frontend/src/components/ExecTrace.tsx create mode 100644 sundynix-desktop/frontend/src/views/RunsView.tsx create mode 100644 sundynix-dispatcher/internal/eino/exec.go diff --git a/sundynix-desktop/frontend/src/App.tsx b/sundynix-desktop/frontend/src/App.tsx index 6e038ac..3dc1514 100644 --- a/sundynix-desktop/frontend/src/App.tsx +++ b/sundynix-desktop/frontend/src/App.tsx @@ -7,9 +7,10 @@ import { StudioView } from "./studio/StudioView"; import { MemoryView } from "./views/MemoryView"; import { KbView } from "./views/KbView"; import { ReportView } from "./views/ReportView"; +import { RunsView } from "./views/RunsView"; import { Home } from "./views/Home"; import { Placeholder } from "./views/Placeholder"; -import { submitTask, streamTokens, type Identity } from "./lib/api"; +import { submitTask, streamTokens, streamExec, type Identity } from "./lib/api"; import type { TaskDsl } from "./lib/dsl"; import { emptyRun, type RunState } from "./lib/run"; @@ -28,11 +29,14 @@ export default function App() { const [run, setRun] = useState(emptyRun); const closeRef = useRef<(() => void) | null>(null); + const execCloseRef = useRef<(() => void) | null>(null); + const onRun = useCallback( async (dsl: TaskDsl) => { closeRef.current?.(); + execCloseRef.current?.(); const t0 = Date.now(); - setRun({ phase: "submitting", output: "", events: [{ t: 0, label: "提交任务" }] }); + setRun({ phase: "submitting", output: "", events: [{ t: 0, label: "提交任务" }], exec: [] }); try { const taskId = await submitTask(dsl, identity); let first = true; @@ -42,6 +46,13 @@ export default function App() { taskId, events: [...r.events, { t: Date.now() - t0, label: `已发布 ${taskId}` }], })); + // 执行轨迹(运行·观测):与 token 流并行订阅,逐节点点亮。 + execCloseRef.current = streamExec( + taskId, + (ev) => setRun((r) => ({ ...r, exec: [...r.exec, ev] })), + () => {}, + () => {}, + ); closeRef.current = streamTokens( taskId, (tok) => @@ -84,6 +95,8 @@ export default function App() { ) : view === "report" ? ( + ) : view === "runs" ? ( + ) : view === "memory" ? ( ) : ( diff --git a/sundynix-desktop/frontend/src/components/ExecTrace.tsx b/sundynix-desktop/frontend/src/components/ExecTrace.tsx new file mode 100644 index 0000000..0cab1ea --- /dev/null +++ b/sundynix-desktop/frontend/src/components/ExecTrace.tsx @@ -0,0 +1,92 @@ +import { deriveNodes, type NodeTrace, type RunPhase } from "../lib/run"; +import type { ExecEvent } from "../lib/api"; + +// 各节点类别的图标与配色(与后端 ExecEvent.kind 对应)。 +const KIND: Record = { + system: { icon: "▸", cls: "text-slate-400", name: "系统" }, + memory: { icon: "◇", cls: "text-violet-300", name: "记忆召回" }, + tool: { icon: "⚙", cls: "text-amber-300", name: "工具调用" }, + prompt: { icon: "▤", cls: "text-sky-300", name: "提示词" }, + model: { icon: "✦", cls: "text-emerald-300", name: "模型推理" }, + plan: { icon: "◷", cls: "text-cyan-300", name: "规划" }, + section: { icon: "¶", cls: "text-indigo-300", name: "章节" }, + render: { icon: "▦", cls: "text-rose-300", name: "渲染" }, +}; + +function meta(kind: string) { + return KIND[kind] ?? { icon: "•", cls: "text-slate-400", name: kind }; +} + +function StatusDot({ status }: { status: NodeTrace["status"] }) { + if (status === "running") + return ; + if (status === "done") return ; + if (status === "error") return ; + return ; +} + +// ExecTrace 把执行事件流渲染为竖向轨道:每个节点一颗灯,实时点亮 + 耗时 + 入参/产出。 +export function ExecTrace({ + events, + phase, + compact, +}: { + events: ExecEvent[]; + phase?: RunPhase; + compact?: boolean; +}) { + const nodes = deriveNodes(events); + if (nodes.length === 0) { + return ( +
+ 运行后,这里会逐节点点亮执行轨迹:记忆召回 → 工具调用(入参/产出)→ 提示词组装 → 模型推理。 +
+ 报告任务则显示:规划大纲 → 各章并行检索撰写 → 渲染 Word。 +
+ ); + } + const total = nodes.reduce((s, n) => s + (n.ms ?? 0), 0); + return ( +
+ {!compact && ( +
+ {nodes.length} 个节点 + · + 累计 {total} ms + {phase === "streaming" && ● 执行中} + {phase === "done" && ✓ 完成} + {phase === "error" && ✗ 出错} +
+ )} +
    + {nodes.map((n) => { + const m = meta(n.kind); + return ( +
  1. + + + +
    +
    + {m.icon} + {n.label} + {m.name} + {n.ms != null && n.ms > 0 && ( + {n.ms} ms + )} + {n.status === "running" && 运行中…} +
    + {n.detail &&

    {n.detail}

    } + {n.notes.map((note, i) => ( +

    + {note} +

    + ))} +
    +
  2. + ); + })} +
+
+ ); +} diff --git a/sundynix-desktop/frontend/src/lib/api.ts b/sundynix-desktop/frontend/src/lib/api.ts index b424d0b..cdd4a53 100644 --- a/sundynix-desktop/frontend/src/lib/api.ts +++ b/sundynix-desktop/frontend/src/lib/api.ts @@ -47,6 +47,38 @@ export function streamTokens( return () => es.close(); } +// 执行轨迹事件(与后端 contract.ExecEvent 对应):运行·观测的节点级实时事件。 +export interface ExecEvent { + seq: number; + ts: number; + node: string; // init / tool:wiki_search / prompt / model / plan / section:0 / render / task / compile + kind: string; // memory|tool|prompt|model|plan|section|render|system + phase: string; // start|end|error|info + label: string; + detail?: string; + ms?: number; +} + +// streamExec: 订阅 SSE /api/v1/tasks/:id/exec —— 与 token 流并行,逐节点点亮执行轨迹。 +export function streamExec( + taskId: string, + onEvent: (ev: ExecEvent) => void, + onDone: () => void, + onError?: (e: unknown) => void, +): () => void { + const es = new EventSource(`${GATEWAY}/api/v1/tasks/${taskId}/exec`); + es.addEventListener("exec", (e) => onEvent(JSON.parse((e as MessageEvent).data) as ExecEvent)); + es.addEventListener("done", () => { + es.close(); + onDone(); + }); + es.onerror = (e) => { + es.close(); + onError?.(e); + }; + return () => es.close(); +} + // 入库进度事件(与后端 contract.IngestEvent 对应)。 export interface IngestEvent { stage: string; diff --git a/sundynix-desktop/frontend/src/lib/run.ts b/sundynix-desktop/frontend/src/lib/run.ts index 6191180..f22d5ca 100644 --- a/sundynix-desktop/frontend/src/lib/run.ts +++ b/sundynix-desktop/frontend/src/lib/run.ts @@ -1,4 +1,6 @@ // 运行状态 —— 跨 Studio 与底部抽屉共享。 +import type { ExecEvent } from "./api"; + export type RunPhase = "idle" | "submitting" | "streaming" | "done" | "error"; export interface RunEvent { @@ -11,7 +13,58 @@ export interface RunState { taskId?: string; output: string; events: RunEvent[]; + exec: ExecEvent[]; // 后端回流的节点级执行轨迹(运行·观测) error?: string; } -export const emptyRun: RunState = { phase: "idle", output: "", events: [] }; +export const emptyRun: RunState = { phase: "idle", output: "", events: [], exec: [] }; + +// ---- 执行轨迹派生:把扁平 ExecEvent 流归并为按节点聚合的轨迹 ---- + +export type NodeStatus = "running" | "done" | "error" | "info"; + +export interface NodeTrace { + node: string; + kind: string; // memory|tool|prompt|model|plan|section|render|system + label: string; + status: NodeStatus; + ms?: number; + detail?: string; + notes: string[]; // info 子事件(如检索到的参考资料预览) + order: number; +} + +// deriveNodes 把事件流按 node 归并:start→running,end→done(带耗时),error→error,info→点事件/附注。 +export function deriveNodes(events: ExecEvent[]): NodeTrace[] { + const map = new Map(); + let order = 0; + for (const e of events) { + let n = map.get(e.node); + if (!n) { + n = { node: e.node, kind: e.kind, label: e.label, status: "info", notes: [], order: order++ }; + map.set(e.node, n); + } + if (e.label) n.label = e.label; + n.kind = e.kind; + switch (e.phase) { + case "start": + if (n.status !== "done" && n.status !== "error") n.status = "running"; + break; + case "end": + n.status = "done"; + n.ms = e.ms; + if (e.detail) n.detail = e.detail; + break; + case "error": + n.status = "error"; + n.ms = e.ms; + if (e.detail) n.detail = e.detail; + break; + case "info": + if (e.detail) n.notes.push(e.detail); + else if (e.label) n.notes.push(e.label); + break; + } + } + return [...map.values()].sort((a, b) => a.order - b.order); +} diff --git a/sundynix-desktop/frontend/src/shell/BottomDrawer.tsx b/sundynix-desktop/frontend/src/shell/BottomDrawer.tsx index 94f5b94..bbf5441 100644 --- a/sundynix-desktop/frontend/src/shell/BottomDrawer.tsx +++ b/sundynix-desktop/frontend/src/shell/BottomDrawer.tsx @@ -1,5 +1,6 @@ import { useState } from "react"; -import type { RunState } from "../lib/run"; +import { deriveNodes, type RunState } from "../lib/run"; +import { ExecTrace } from "../components/ExecTrace"; type Tab = "output" | "trace" | "tools" | "cite" | "eval"; const TABS: Array<{ key: Tab; label: string }> = [ @@ -43,24 +44,14 @@ export function BottomDrawer({ run }: { run: RunState }) { {open && ( -
+
{tab === "output" && (
               {run.output || "在编排页搭图 → 运行,模型注入画像与历史后流式作答,token 在此呈现。"}
             
)} - {tab === "trace" && ( -
    - {run.events.length === 0 &&
  • 尚无运行。
  • } - {run.events.map((e, i) => ( -
  • - +{e.t}ms · {e.label} -
  • - ))} - {run.events.length > 0 &&
  • (节点级轨迹待后端回流节点事件后逐节点点亮)
  • } -
- )} - {tab === "tools" &&

工具调用日志:每次 sundynix.tools.* 的请求/响应(需后端回流工具事件)。

} + {tab === "trace" && } + {tab === "tools" && } {tab === "cite" &&

引用列表:RAG 答案的来源块(源文档 + 分数 + 来源徽标)。

} {tab === "eval" &&

评测:忠实度 / 完整度质量门结果(需 harness eval)。

}
@@ -68,3 +59,32 @@ export function BottomDrawer({ run }: { run: RunState }) {
); } + +// ToolCalls:从执行事件里筛出工具调用节点,逐条展示入参 → 产出 + 耗时/状态。 +function ToolCalls({ run }: { run: RunState }) { + const tools = deriveNodes(run.exec).filter((n) => n.kind === "tool"); + if (tools.length === 0) { + return

本次运行暂无工具调用。图里挂了检索/工具节点,或报告挂了知识库时,每次 sundynix.tools.* 调用会在此列出入参与产出。

; + } + return ( +
    + {tools.map((t) => ( +
  • +
    + + {t.node.replace(/^tool:/, "")} + + {t.status === "error" ? "失败" : t.status === "running" ? "调用中" : "成功"} + + {t.ms != null && t.ms > 0 && {t.ms} ms} +
    + {t.detail &&

    {t.detail}

    } +
  • + ))} +
+ ); +} diff --git a/sundynix-desktop/frontend/src/shell/LeftNav.tsx b/sundynix-desktop/frontend/src/shell/LeftNav.tsx index 5813c46..abb7b47 100644 --- a/sundynix-desktop/frontend/src/shell/LeftNav.tsx +++ b/sundynix-desktop/frontend/src/shell/LeftNav.tsx @@ -21,7 +21,7 @@ const ITEMS: Item[] = [ { key: "studio", label: "编排", icon: "◆", group: "BUILD", ready: true }, { key: "kb", label: "知识库", icon: "▣", group: "BUILD", ready: true }, { key: "report", label: "报告", icon: "▦", group: "BUILD", ready: true }, - { key: "runs", label: "运行", icon: "▸", group: "RUN" }, + { key: "runs", label: "运行", icon: "▸", group: "RUN", ready: true }, { key: "memory", label: "记忆", icon: "◇", group: "MANAGE", ready: true }, { key: "market", label: "市场", icon: "⌧", group: "MANAGE" }, { key: "admin", label: "管理", icon: "⚙", group: "MANAGE" }, diff --git a/sundynix-desktop/frontend/src/views/ReportView.tsx b/sundynix-desktop/frontend/src/views/ReportView.tsx index f99cafe..a1cdd85 100644 --- a/sundynix-desktop/frontend/src/views/ReportView.tsx +++ b/sundynix-desktop/frontend/src/views/ReportView.tsx @@ -1,5 +1,6 @@ import { useRef, useState } from "react"; -import { generateReport, streamTokens, reportDownloadUrl, type Identity } from "../lib/api"; +import { generateReport, streamTokens, streamExec, reportDownloadUrl, type Identity, type ExecEvent } from "../lib/api"; +import { ExecTrace } from "../components/ExecTrace"; type Phase = "idle" | "running" | "done" | "error"; @@ -10,22 +11,32 @@ export function ReportView({ identity }: { identity: Identity }) { const [kb, setKb] = useState(""); const [phase, setPhase] = useState("idle"); const [out, setOut] = useState(""); + const [exec, setExec] = useState([]); const [taskId, setTaskId] = useState(""); const [err, setErr] = useState(""); const closeRef = useRef<(() => void) | null>(null); + const execCloseRef = useRef<(() => void) | null>(null); const running = phase === "running"; const onGenerate = async () => { if (!topic.trim() || running) return; closeRef.current?.(); + execCloseRef.current?.(); setPhase("running"); setOut(""); + setExec([]); setErr(""); setTaskId(""); try { const id = await generateReport(identity, topic.trim(), kb.trim() || undefined); setTaskId(id); + execCloseRef.current = streamExec( + id, + (ev) => setExec((xs) => [...xs, ev]), + () => {}, + () => {}, + ); closeRef.current = streamTokens( id, (tok) => setOut((o) => o + tok), @@ -95,21 +106,30 @@ export function ReportView({ identity }: { identity: Identity }) { - {/* 实时进度 / 正文 */} -
-
- - 实时编排 · {taskId || "未开始"} -
-
- {out ? ( -
{out}
- ) : ( -
- 输入主题并点击「生成报告」,这里将实时显示规划与撰写过程。 -
- )} -
+ {/* 执行轨迹 + 报告正文 */} +
+
+
执行轨迹
+
+ +
+
+ +
+
+ + 报告正文 · {taskId || "未开始"} +
+
+ {out ? ( +
{out}
+ ) : ( +
+ 输入主题并点击「生成报告」,这里将实时显示规划与撰写过程。 +
+ )} +
+
); diff --git a/sundynix-desktop/frontend/src/views/RunsView.tsx b/sundynix-desktop/frontend/src/views/RunsView.tsx new file mode 100644 index 0000000..d02093f --- /dev/null +++ b/sundynix-desktop/frontend/src/views/RunsView.tsx @@ -0,0 +1,57 @@ +import { ExecTrace } from "../components/ExecTrace"; +import { deriveNodes, type RunState } from "../lib/run"; + +// 运行·观测:把最近一次运行的执行轨迹实时可视化(节点逐个点亮 + 工具入参/产出 + 耗时), +// 右侧并列模型输出。数据来自 Studio 运行时订阅的 sundynix.exec. 事件流。 +export function RunsView({ run }: { run: RunState }) { + const nodes = deriveNodes(run.exec); + const tools = nodes.filter((n) => n.kind === "tool"); + const phaseText = + run.phase === "streaming" ? "执行中" : run.phase === "done" ? "完成" : run.phase === "error" ? "出错" : run.phase === "submitting" ? "提交中" : "就绪"; + const phaseCls = + run.phase === "streaming" ? "text-cyan-400" : run.phase === "done" ? "text-emerald-400" : run.phase === "error" ? "text-rose-400" : "text-slate-500"; + + return ( +
+
+
+

运行 · 观测

+

+ 实时执行轨迹:每个节点(记忆/工具/提示词/模型,或报告的规划/分章/渲染)逐个点亮,附入参产出与耗时。 +

+
+
+ + 任务 {run.taskId ?? "—"} + + ● {phaseText} + {nodes.length} 节点 · {tools.length} 次工具调用 +
+
+ +
+ {/* 执行轨迹 */} +
+
执行轨迹
+
+ +
+
+ + {/* 模型输出 */} +
+
模型输出
+
+ {run.output ? ( +
{run.output}
+ ) : ( +
+ 在「编排」页搭图并运行,或在「报告」页生成报告,
执行轨迹与输出会实时出现在这里。 +
+ )} +
+
+
+
+ ); +} diff --git a/sundynix-dispatcher/cmd/dispatcher/main.go b/sundynix-dispatcher/cmd/dispatcher/main.go index b829ce6..da87c8c 100644 --- a/sundynix-dispatcher/cmd/dispatcher/main.go +++ b/sundynix-dispatcher/cmd/dispatcher/main.go @@ -36,8 +36,8 @@ func main() { log.Printf("[dispatcher] subscribe model config: %v", err) } - // sub 同时作为 Token 回流出口(TokenSink)与 MCP 工具调用出口(ToolCaller)。 - orch, err := eino.NewOrchestrator(pool, breaker, sub, sub) + // sub 同时作为 Token 回流出口(TokenSink)、MCP 工具调用出口(ToolCaller)与执行事件出口(ExecSink)。 + orch, err := eino.NewOrchestrator(pool, breaker, sub, sub, sub) if err != nil { log.Fatalf("[dispatcher] build eino graph: %v", err) } diff --git a/sundynix-dispatcher/internal/eino/compile.go b/sundynix-dispatcher/internal/eino/compile.go index 5caed68..289f314 100644 --- a/sundynix-dispatcher/internal/eino/compile.go +++ b/sundynix-dispatcher/internal/eino/compile.go @@ -2,6 +2,7 @@ package eino import ( "context" + "encoding/json" "fmt" "strings" @@ -30,7 +31,7 @@ type RunCtx struct { // // 工具/检索节点按拓扑序真实调用 MCP(sundynix.tools.go.*),结果注入模型上下文。 // 分支/并行节点暂未编译(TODO:compose.Branch / fan-out)。 -func (o *Orchestrator) compileFlow(ctx context.Context, t *contract.Task) (compose.Runnable[*contract.Task, *schema.Message], error) { +func (o *Orchestrator) compileFlow(ctx context.Context, t *contract.Task, tr *execTracer) (compose.Runnable[*contract.Task, *schema.Message], error) { plan := dsl.Compile(t.Graph) // 系统提示词 / 用户输入 / 默认兜底 flow, _ := dsl.Parse(t.Graph) @@ -41,13 +42,17 @@ func (o *Orchestrator) compileFlow(ctx context.Context, t *contract.Task) (compo func(ctx context.Context, task *contract.Task) (*RunCtx, error) { uid, _ := task.Meta[contract.MetaUserID].(string) sid, _ := task.Meta[contract.MetaSessionID].(string) + end := tr.span("init", "memory", "召回画像与历史") + profile := o.fetchMemory(ctx, uid, plan.Query) + history := o.fetchHistory(ctx, sid) + end(fmt.Sprintf("画像 %d 字 · 历史 %d 条", len([]rune(profile)), len(history)), nil) return &RunCtx{ UserID: uid, SessionID: sid, System: plan.System, Query: plan.Query, - Profile: o.fetchMemory(ctx, uid, plan.Query), - History: o.fetchHistory(ctx, sid), + Profile: profile, + History: history, }, nil })); err != nil { return nil, err @@ -64,7 +69,7 @@ func (o *Orchestrator) compileFlow(ctx context.Context, t *contract.Task) (compo } key := fmt.Sprintf("tool_%d", idx) idx++ - if err := g.AddLambdaNode(key, compose.InvokableLambda(o.makeToolNode(t.ID, tool, args))); err != nil { + if err := g.AddLambdaNode(key, compose.InvokableLambda(o.makeToolNode(t.ID, tool, args, tr))); err != nil { return nil, err } if err := g.AddEdge(prev, key); err != nil { @@ -75,7 +80,12 @@ func (o *Orchestrator) compileFlow(ctx context.Context, t *contract.Task) (compo } // prompt:黑板 → []*schema.Message(系统提示词 + 画像 + 工具产出 + 历史 + 用户输入)。 - if err := g.AddLambdaNode("prompt", compose.InvokableLambda(buildMessages)); err != nil { + if err := g.AddLambdaNode("prompt", compose.InvokableLambda( + func(ctx context.Context, rc *RunCtx) ([]*schema.Message, error) { + msgs, err := buildMessages(ctx, rc) + tr.info("prompt", "prompt", "组装提示词", fmt.Sprintf("%d 条消息 · 工具产出 %d 段", len(msgs), len(rc.ToolOut))) + return msgs, err + })); err != nil { return nil, err } if err := g.AddEdge(prev, "prompt"); err != nil { @@ -101,9 +111,11 @@ func (o *Orchestrator) compileFlow(ctx context.Context, t *contract.Task) (compo } // makeToolNode 返回一个真实调用 MCP 工具的图节点:把结果增补进黑板,失败降级不阻断。 -func (o *Orchestrator) makeToolNode(taskID, tool string, args map[string]any) func(context.Context, *RunCtx) (*RunCtx, error) { +func (o *Orchestrator) makeToolNode(taskID, tool string, args map[string]any, tr *execTracer) func(context.Context, *RunCtx) (*RunCtx, error) { + node := "tool:" + tool return func(ctx context.Context, rc *RunCtx) (*RunCtx, error) { if o.tools == nil { + tr.info(node, "tool", "工具 "+tool, "工具总线未接入,跳过") return rc, nil } // 未显式带查询词则注入当前用户输入,便于检索类工具。 @@ -114,19 +126,34 @@ func (o *Orchestrator) makeToolNode(taskID, tool string, args map[string]any) fu if call["q"] == nil && call["query"] == nil { call["q"] = rc.Query } + end := tr.span(node, "tool", "调用工具 "+tool) cctx, cancel := context.WithTimeout(ctx, toolCallTimeout) defer cancel() res, err := o.tools.CallTool(cctx, contract.ToolSubjectGo(tool), &contract.ToolCall{ Tool: tool, TaskID: taskID, Args: call, }) - if err != nil || res == nil || !res.OK || res.Content == "" { + if err != nil { + end("调用失败,降级跳过", err) + return rc, nil + } + if res == nil || !res.OK || res.Content == "" { + end("无结果,降级跳过", nil) return rc, nil // 工具不可用/无结果 → 降级跳过 } + end("入参 "+previewArgs(call)+" → 产出 "+truncate(res.Content, 160), nil) rc.ToolOut = append(rc.ToolOut, "["+tool+"] "+res.Content) return rc, nil } } +// previewArgs 把工具入参压成一行短预览。 +func previewArgs(args map[string]any) string { + if data, err := json.Marshal(args); err == nil { + return truncate(string(data), 120) + } + return "" +} + // buildMessages 把黑板组装为发给模型的消息序列。 func buildMessages(_ context.Context, rc *RunCtx) ([]*schema.Message, error) { var sys strings.Builder diff --git a/sundynix-dispatcher/internal/eino/exec.go b/sundynix-dispatcher/internal/eino/exec.go new file mode 100644 index 0000000..a836741 --- /dev/null +++ b/sundynix-dispatcher/internal/eino/exec.go @@ -0,0 +1,68 @@ +package eino + +import ( + "encoding/json" + "sync/atomic" + "time" + + "github.com/sundynix/sundynix-shared/contract" +) + +// ExecSink 是执行可视化事件的回流出口(由 NATS bus 实现): +// 把节点/阶段生命周期事件发到 sundynix.exec.,供"运行·观测"实时点亮轨迹。 +type ExecSink interface { + PublishExec(taskID string, data []byte) error + CompleteExec(taskID string) error +} + +// execTracer 为一个任务发结构化执行事件,自增 Seq 保序、span 自动计耗时。 +type execTracer struct { + sink ExecSink + task string + seq int32 +} + +// tracer 为某任务建一个事件发射器(sink 为空时所有方法变空操作)。 +func (o *Orchestrator) tracer(taskID string) *execTracer { + return &execTracer{sink: o.exec, task: taskID} +} + +func (e *execTracer) emit(node, kind, phase, label, detail string, ms int64) { + if e == nil || e.sink == nil { + return + } + ev := contract.ExecEvent{ + Seq: int(atomic.AddInt32(&e.seq, 1)), TS: time.Now().UnixMilli(), + Node: node, Kind: kind, Phase: phase, Label: label, Detail: detail, MS: ms, + } + if data, err := json.Marshal(&ev); err == nil { + _ = e.sink.PublishExec(e.task, data) + } +} + +// info 发一条瞬时事件(无耗时)。 +func (e *execTracer) info(node, kind, label, detail string) { + e.emit(node, kind, "info", label, detail, 0) +} + +// span 发 start,并返回一个结束函数:调用时按 err 发 end / error,附带耗时。 +func (e *execTracer) span(node, kind, label string) func(detail string, err error) { + e.emit(node, kind, "start", label, "", 0) + t0 := time.Now() + return func(detail string, err error) { + ms := time.Since(t0).Milliseconds() + if err != nil { + e.emit(node, kind, "error", label, err.Error(), ms) + return + } + e.emit(node, kind, "end", label, detail, ms) + } +} + +// done 关闭该任务的执行事件流(让 SSE 客户端收尾)。 +func (e *execTracer) done() { + if e == nil || e.sink == nil { + return + } + _ = e.sink.CompleteExec(e.task) +} diff --git a/sundynix-dispatcher/internal/eino/orchestrator.go b/sundynix-dispatcher/internal/eino/orchestrator.go index ca5d24e..8e6e5a6 100644 --- a/sundynix-dispatcher/internal/eino/orchestrator.go +++ b/sundynix-dispatcher/internal/eino/orchestrator.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "io" "log" "strings" @@ -38,11 +39,13 @@ type Orchestrator struct { breaker *harness.CircuitBreaker sink TokenSink tools ToolCaller + exec ExecSink } // NewOrchestrator 持有依赖;图按任务的 DSL 在 Handle 内动态编译。 -func NewOrchestrator(pool *llm.Pool, breaker *harness.CircuitBreaker, sink TokenSink, tools ToolCaller) (*Orchestrator, error) { - return &Orchestrator{pool: pool, breaker: breaker, sink: sink, tools: tools}, nil +// exec 为执行可视化事件出口(可为 nil,则不发轨迹事件)。 +func NewOrchestrator(pool *llm.Pool, breaker *harness.CircuitBreaker, sink TokenSink, tools ToolCaller, exec ExecSink) (*Orchestrator, error) { + return &Orchestrator{pool: pool, breaker: breaker, sink: sink, tools: tools, exec: exec}, nil } // Handle 消费一个任务:按 DSL 编译 Eino 图并执行,把 Token 流回流到 sundynix.streams.。 @@ -51,22 +54,30 @@ func (o *Orchestrator) Handle(ctx context.Context, t *contract.Task) error { log.Printf("[eino] circuit open, drop task %s", t.ID) return nil } + tr := o.tracer(t.ID) + defer tr.done() + // 报告生成走专用多步编排(规划→分章并行检索撰写→汇聚→渲染 Word),而非通用对话图。 if intent, _ := t.Meta[contract.MetaIntent].(string); intent == contract.IntentReport { - return o.handleReport(ctx, t) + return o.handleReport(ctx, t, tr) } log.Printf("[eino] task %s received (graph=%d bytes), compiling DSL → Eino graph...", t.ID, len(t.Graph)) + tr.info("task", "system", "任务受理", fmt.Sprintf("DSL %d 字节,编译 Eino 图", len(t.Graph))) - run, err := o.compileFlow(ctx, t) + endCompile := tr.span("compile", "system", "编译 Eino 图") + run, err := o.compileFlow(ctx, t, tr) if err != nil { + endCompile("", err) log.Printf("[eino] task %s compile error: %v", t.ID, err) _ = o.sink.CompleteStream(t.ID) o.breaker.Report(false) return err } + endCompile("图编译完成", nil) stream, err := run.Stream(ctx, t) if err != nil { + tr.emit("model", "model", "error", "模型推理", err.Error(), 0) log.Printf("[eino] task %s graph error: %v", t.ID, err) _ = o.sink.CompleteStream(t.ID) o.breaker.Report(false) @@ -76,6 +87,7 @@ func (o *Orchestrator) Handle(ctx context.Context, t *contract.Task) error { n := 0 var answer strings.Builder + t0 := time.Now() for { chunk, rerr := stream.Recv() if errors.Is(rerr, io.EOF) { @@ -88,6 +100,9 @@ func (o *Orchestrator) Handle(ctx context.Context, t *contract.Task) error { if chunk == nil || chunk.Content == "" { continue } + if n == 0 { + tr.emit("model", "model", "start", "模型流式推理", fmt.Sprintf("首 token %dms", time.Since(t0).Milliseconds()), 0) + } if perr := o.sink.PublishToken(t.ID, []byte(chunk.Content)); perr != nil { log.Printf("[eino] publish token failed: %v", perr) break @@ -95,6 +110,7 @@ func (o *Orchestrator) Handle(ctx context.Context, t *contract.Task) error { answer.WriteString(chunk.Content) n++ } + tr.emit("model", "model", "end", "模型流式推理", fmt.Sprintf("%d tokens / %d 字", n, len([]rune(answer.String()))), time.Since(t0).Milliseconds()) if cerr := o.sink.CompleteStream(t.ID); cerr != nil { log.Printf("[eino] complete stream failed: %v", cerr) diff --git a/sundynix-dispatcher/internal/eino/report.go b/sundynix-dispatcher/internal/eino/report.go index 8bc3c6b..a0d1c97 100644 --- a/sundynix-dispatcher/internal/eino/report.go +++ b/sundynix-dispatcher/internal/eino/report.go @@ -38,7 +38,7 @@ type reportSection struct { // // 全程把人可读的 Markdown 进度与正文经 sundynix.streams. 流回客户端; // 最终调 mcp-go 的 report_render 落盘 docx,客户端凭 task_id 下载。 -func (o *Orchestrator) handleReport(ctx context.Context, t *contract.Task) error { +func (o *Orchestrator) handleReport(ctx context.Context, t *contract.Task, tr *execTracer) error { defer func() { _ = o.sink.CompleteStream(t.ID) }() topic, _ := t.Meta[contract.MetaTopic].(string) @@ -50,9 +50,12 @@ func (o *Orchestrator) handleReport(ctx context.Context, t *contract.Task) error topic = "未命名报告" } log.Printf("[report] task %s 生成报告: topic=%q kb=%q", t.ID, topic, kb) + tr.info("task", "system", "报告任务受理", fmt.Sprintf("主题:%s%s", topic, kbSuffix(kb))) o.emit(t.ID, "> 正在规划大纲…\n\n") + endPlan := tr.span("plan", "plan", "规划大纲") outline := o.planOutline(ctx, topic) + endPlan(fmt.Sprintf("%d 章:%s", len(outline.Sections), strings.Join(outline.Sections, " / ")), nil) o.emit(t.ID, fmt.Sprintf("**报告大纲**(%d 章)\n", len(outline.Sections))) for i, s := range outline.Sections { @@ -64,7 +67,7 @@ func (o *Orchestrator) handleReport(ctx context.Context, t *contract.Task) error o.emit(t.ID, "\n> 正在并行撰写各章…\n\n") } - sections := o.writeSections(ctx, topic, kb, outline.Sections) + sections := o.writeSections(ctx, topic, kb, outline.Sections, tr) // 把完整报告正文流式呈现给客户端。 o.emit(t.ID, "\n---\n\n# "+firstNonEmpty(outline.Title, topic)+"\n\n") @@ -74,16 +77,26 @@ func (o *Orchestrator) handleReport(ctx context.Context, t *contract.Task) error // 渲染真实 Word 文档。 o.emit(t.ID, "> 正在渲染 Word 文档…\n\n") + endRender := tr.span("render", "render", "渲染 Word 文档") if path := o.renderReport(ctx, t.ID, firstNonEmpty(outline.Title, topic), sections); path != "" { + endRender("docx 已落盘:"+path, nil) o.emit(t.ID, "---\n✅ 报告已生成 Word 文档,可点击上方「下载 Word」保存。\n") log.Printf("[report] task %s 完成,docx=%s", t.ID, path) } else { + endRender("渲染服务不可用", fmt.Errorf("render unavailable")) o.emit(t.ID, "---\n⚠️ Word 渲染未完成(渲染服务不可用),以上为报告正文。\n") } o.breaker.Report(true) return nil } +func kbSuffix(kb string) string { + if kb == "" { + return "(不挂知识库)" + } + return ",知识库 " + kb +} + // planOutline 让模型规划 3–5 章大纲;模型不可用/解析失败则用通用兜底大纲。 func (o *Orchestrator) planOutline(ctx context.Context, topic string) reportOutline { fallback := reportOutline{Title: topic, Sections: []string{"背景与现状", "核心分析", "结论与建议"}} @@ -110,7 +123,7 @@ func (o *Orchestrator) planOutline(ctx context.Context, topic string) reportOutl } // writeSections 各章节并行撰写(有界并发),结果按原顺序返回。 -func (o *Orchestrator) writeSections(ctx context.Context, topic, kb string, headings []string) []reportSection { +func (o *Orchestrator) writeSections(ctx context.Context, topic, kb string, headings []string, tr *execTracer) []reportSection { out := make([]reportSection, len(headings)) sem := make(chan struct{}, reportFanout) var wg sync.WaitGroup @@ -120,7 +133,11 @@ func (o *Orchestrator) writeSections(ctx context.Context, topic, kb string, head defer wg.Done() sem <- struct{}{} defer func() { <-sem }() - out[i] = reportSection{Heading: h, Body: o.writeSection(ctx, topic, kb, h)} + node := fmt.Sprintf("section:%d", i) + end := tr.span(node, "section", fmt.Sprintf("第%d章 %s", i+1, h)) + body := o.writeSection(ctx, topic, kb, h, tr, node) + end(fmt.Sprintf("成稿 %d 字", len([]rune(body))), nil) + out[i] = reportSection{Heading: h, Body: body} }(i, h) } wg.Wait() @@ -128,8 +145,11 @@ func (o *Orchestrator) writeSections(ctx context.Context, topic, kb string, head } // writeSection 撰写一章:先 RAG 检索参考资料(若挂了知识库),再让模型成稿。 -func (o *Orchestrator) writeSection(ctx context.Context, topic, kb, heading string) string { +func (o *Orchestrator) writeSection(ctx context.Context, topic, kb, heading string, tr *execTracer, node string) string { refs := o.retrieve(ctx, kb, topic+" "+heading) + if refs != "" { + tr.info(node, "section", "检索参考资料", truncate(strings.ReplaceAll(refs, "\n", " "), 120)) + } if !o.pool.Ready() { if refs != "" { return "(模型未配置,以下为检索到的参考资料)\n" + refs diff --git a/sundynix-dispatcher/internal/nats/subscriber.go b/sundynix-dispatcher/internal/nats/subscriber.go index b6402e5..f4c0410 100644 --- a/sundynix-dispatcher/internal/nats/subscriber.go +++ b/sundynix-dispatcher/internal/nats/subscriber.go @@ -53,6 +53,16 @@ func (s *Subscriber) CompleteStream(taskID string) error { return s.inner.CompleteStream(taskID) } +// PublishExec / CompleteExec 让 Subscriber 满足 eino.ExecSink, +// 把执行轨迹事件回流到 sundynix.exec.(与 Token 流分开)。 +func (s *Subscriber) PublishExec(taskID string, data []byte) error { + return s.inner.PublishExec(taskID, data) +} + +func (s *Subscriber) CompleteExec(taskID string) error { + return s.inner.CompleteExec(taskID) +} + // CallTool 让 Subscriber 满足 eino.ToolCaller,经 NATS request-reply 调起第 5 层 MCP 工具。 func (s *Subscriber) CallTool(ctx context.Context, subject string, call *contract.ToolCall) (*contract.ToolResult, error) { return s.inner.CallTool(ctx, subject, call) diff --git a/sundynix-gateway/internal/handler/task_handler.go b/sundynix-gateway/internal/handler/task_handler.go index 5bc4af2..4c5a7a0 100644 --- a/sundynix-gateway/internal/handler/task_handler.go +++ b/sundynix-gateway/internal/handler/task_handler.go @@ -91,6 +91,45 @@ func (h *Handler) StreamTask(c *gin.Context) { }) } +// StreamExec: 订阅 sundynix.exec.,以 SSE 把执行轨迹事件推给客户端(运行·观测)。 +// 与 StreamTask(token 流)并行:前端同时连两路,token 走输出、exec 走轨迹/工具面板。 +func (h *Handler) StreamExec(c *gin.Context) { + taskID := c.Param("id") + c.Writer.Header().Set("Content-Type", "text/event-stream") + c.Writer.Header().Set("Cache-Control", "no-cache") + c.Writer.Header().Set("Connection", "keep-alive") + + events := make(chan []byte, 256) + done := make(chan struct{}) + unsub, err := h.bus.SubscribeExec(taskID, + func(ev []byte) { + select { + case events <- ev: + default: // 背压保护:客户端过慢则丢弃,避免阻塞 NATS 回调 + } + }, + func() { close(done) }, + ) + if err != nil { + c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()}) + return + } + defer func() { _ = unsub() }() + + c.Stream(func(w io.Writer) bool { + select { + case ev := <-events: + c.SSEvent("exec", string(ev)) + return true + case <-done: + c.SSEvent("done", taskID) + return false + case <-c.Request.Context().Done(): + return false + } + }) +} + // SetMemory: 写入/更新一条用户偏好记忆,经 NATS 调 mcp-go 的 memory_upsert 工具。 // 桌面端"偏好记忆面板"可用它让用户显式登记/纠正模型对自己的记忆。 func (h *Handler) SetMemory(c *gin.Context) { diff --git a/sundynix-gateway/internal/nats/publisher.go b/sundynix-gateway/internal/nats/publisher.go index c3e574d..6b97edc 100644 --- a/sundynix-gateway/internal/nats/publisher.go +++ b/sundynix-gateway/internal/nats/publisher.go @@ -44,6 +44,11 @@ func (b *Bus) SubscribeTokens(taskID string, onToken func([]byte), onDone func() return b.inner.SubscribeTokens(taskID, onToken, onDone) } +// SubscribeExec 订阅 sundynix.exec. 的执行轨迹事件(用于"运行·观测"SSE)。 +func (b *Bus) SubscribeExec(taskID string, onEvent func([]byte), onDone func()) (func() error, error) { + return b.inner.SubscribeExec(taskID, onEvent, onDone) +} + // CallTool 经 NATS 同步调用一个 MCP 工具(用于网关侧写偏好记忆等)。 func (b *Bus) CallTool(ctx context.Context, subject string, call *contract.ToolCall) (*contract.ToolResult, error) { return b.inner.CallTool(ctx, subject, call) diff --git a/sundynix-gateway/internal/router/router.go b/sundynix-gateway/internal/router/router.go index c891b01..7ef4226 100644 --- a/sundynix-gateway/internal/router/router.go +++ b/sundynix-gateway/internal/router/router.go @@ -13,17 +13,18 @@ import ( // New 构建带有 Guardrail / 限流中间件的 Gin 引擎。 func New(db *store.Postgres, cache *store.Redis, bus *nats.Bus) *gin.Engine { r := gin.Default() - r.Use(cors()) // 桌面端/浏览器跨源访问(开发期放开) + r.Use(cors()) // 桌面端/浏览器跨源访问(开发期放开) r.Use(middleware.RateLimit(cache)) r.Use(middleware.Guardrail()) // Harness: Input/Output Guardrail h := handler.New(db, cache, bus) api := r.Group("/api/v1") { - api.POST("/tasks", h.SubmitTask) // 1. 解析 DSL 并 Publish 到 NATS - api.GET("/tasks/:id/stream", h.StreamTask) // 4. SSE/WS 回流 Token Stream - api.PUT("/memory", h.SetMemory) // 偏好记忆登记(→ mcp-go memory_upsert) - api.POST("/kb/ingest", h.KbIngest) // 知识库入库(文本,→ mcp-go kb_ingest) + api.POST("/tasks", h.SubmitTask) // 1. 解析 DSL 并 Publish 到 NATS + api.GET("/tasks/:id/stream", h.StreamTask) // 4. SSE/WS 回流 Token Stream + api.GET("/tasks/:id/exec", h.StreamExec) // 4b. SSE 回流执行轨迹事件(运行·观测) + api.PUT("/memory", h.SetMemory) // 偏好记忆登记(→ mcp-go memory_upsert) + api.POST("/kb/ingest", h.KbIngest) // 知识库入库(文本,→ mcp-go kb_ingest) api.POST("/kb/ingest_file", h.KbIngestFile) // 文件入库(docx/xlsx/pdf… 异步) api.GET("/kb/ingest/:id/stream", h.KbIngestStream) // 入库进度 SSE(实时监控) api.POST("/kb/search", h.KbSearch) // 知识库检索台(→ mcp-go kb_search) diff --git a/sundynix-shared/bus/bus.go b/sundynix-shared/bus/bus.go index dc0443a..c18e44b 100644 --- a/sundynix-shared/bus/bus.go +++ b/sundynix-shared/bus/bus.go @@ -134,6 +134,37 @@ func (b *Bus) SubscribeTokens(taskID string, onToken func([]byte), onDone func() return sub.Unsubscribe, nil } +// ---- 执行可视化事件(core NATS,与 Token 流分流)---- + +// PublishExec 把一条执行事件(JSON)发到 sundynix.exec.。 +func (b *Bus) PublishExec(taskID string, data []byte) error { + return b.nc.Publish(contract.ExecSubject(taskID), data) +} + +// CompleteExec 发送执行事件流结束信号(空体 + 结束头)。 +func (b *Bus) CompleteExec(taskID string) error { + msg := nats.NewMsg(contract.ExecSubject(taskID)) + msg.Header.Set(contract.HeaderStreamEnd, "1") + return b.nc.PublishMsg(msg) +} + +// SubscribeExec 订阅某 task 的执行事件流。每条事件触发 onEvent;结束触发 onDone。 +func (b *Bus) SubscribeExec(taskID string, onEvent func([]byte), onDone func()) (unsub func() error, err error) { + sub, err := b.nc.Subscribe(contract.ExecSubject(taskID), func(m *nats.Msg) { + if m.Header.Get(contract.HeaderStreamEnd) == "1" { + onDone() + return + } + data := make([]byte, len(m.Data)) + copy(data, m.Data) + onEvent(data) + }) + if err != nil { + return nil, fmt.Errorf("subscribe exec: %w", err) + } + return sub.Unsubscribe, nil +} + // ---- MCP 工具调用(core NATS request-reply)---- // CallTool 同步调用一个 MCP 工具:发到 subject,阻塞等待应答。 diff --git a/sundynix-shared/contract/task.go b/sundynix-shared/contract/task.go index 9c32de8..75fddf1 100644 --- a/sundynix-shared/contract/task.go +++ b/sundynix-shared/contract/task.go @@ -6,11 +6,11 @@ import "encoding/json" // NATS subject / stream 约定(与 README、各服务 config 保持一致)。 const ( - StreamTasks = "SUNDYNIX_TASKS" // JetStream stream 名 - SubjectTasks = "sundynix.tasks" // 任务发布主题前缀;实际为 sundynix.tasks. - SubjectTasksAll = "sundynix.tasks.>" // stream 捕获的通配 - SubjectStream = "sundynix.streams" // Token 回流前缀;实际 sundynix.streams. - ConsumerDurable = "dispatchers" // Dispatcher 持久消费者(队列组负载均衡) + StreamTasks = "SUNDYNIX_TASKS" // JetStream stream 名 + SubjectTasks = "sundynix.tasks" // 任务发布主题前缀;实际为 sundynix.tasks. + SubjectTasksAll = "sundynix.tasks.>" // stream 捕获的通配 + SubjectStream = "sundynix.streams" // Token 回流前缀;实际 sundynix.streams. + ConsumerDurable = "dispatchers" // Dispatcher 持久消费者(队列组负载均衡) // HeaderStreamEnd 是 Token 流的结束信号(core NATS 消息头)。 // 置为 "1" 的消息体为空,表示该 task 的 Token 流结束。 @@ -47,6 +47,26 @@ const ( func ConfigGetSubject(kind string) string { return "sundynix.config." + kind + ".get" } func ConfigUpdatedSubject(kind string) string { return "sundynix.config." + kind + ".updated" } +// SubjectExec 是执行可视化事件的回流前缀;实际 sundynix.exec.。 +// 与 Token 流(sundynix.streams.)分流:Token 是零拷贝字节,Exec 是结构化节点事件。 +const SubjectExec = "sundynix.exec" + +// ExecSubject 返回某任务的执行事件回流主题。 +func ExecSubject(id string) string { return SubjectExec + "." + id } + +// ExecEvent 是一次任务执行中某节点/阶段的生命周期事件(经 sundynix.exec. 回流给 UI, +// 用于"运行·观测"的实时轨迹:节点点亮、工具调用入参/产出、各阶段耗时)。 +type ExecEvent struct { + Seq int `json:"seq"` // 任务内自增序号(保序) + TS int64 `json:"ts"` // unix 毫秒 + Node string `json:"node"` // 稳定节点 id:init / tool:wiki_search / prompt / model / plan / section:0 / render + Kind string `json:"kind"` // 归类着色:memory|tool|prompt|model|plan|section|render|system + Phase string `json:"phase"` // start|end|error|info + Label string `json:"label"` // 人读标题 + Detail string `json:"detail,omitempty"` // 入参/产出/计数预览 + MS int64 `json:"ms,omitempty"` // end 事件的耗时(毫秒) +} + // IngestEvent 是入库流水线的实时进度事件(经 sundynix.streams. 回流给 UI)。 type IngestEvent struct { Stage string `json:"stage"` // 解析/切块/向量化/写Milvus/写Bleve/完成/失败 @@ -103,7 +123,7 @@ type ToolResult struct { } // Marshal / Unmarshal 便捷方法。 -func (t *Task) Marshal() ([]byte, error) { return json.Marshal(t) } +func (t *Task) Marshal() ([]byte, error) { return json.Marshal(t) } func Unmarshal(b []byte) (*Task, error) { var t Task if err := json.Unmarshal(b, &t); err != nil {