diff --git a/sundynix-desktop/frontend/src/lib/dsl.ts b/sundynix-desktop/frontend/src/lib/dsl.ts index fafbd7f..6667044 100644 --- a/sundynix-desktop/frontend/src/lib/dsl.ts +++ b/sundynix-desktop/frontend/src/lib/dsl.ts @@ -5,7 +5,8 @@ import { NODE_KINDS } from "../studio/nodeCatalog"; export interface TaskDsl { version: "1"; nodes: Array<{ id: string; kind: string; label?: string; config: unknown }>; - edges: Array<{ source: string; target: string }>; + // sourceHandle 标记从 branch 节点引出的边走真/假分支("true"/"false")。 + edges: Array<{ source: string; target: string; sourceHandle?: string }>; } // exportDsl 把画布的节点/连线导出为类型化 JSON DSL。 @@ -16,7 +17,11 @@ export function exportDsl(nodes: Node[], edges: Edge[]): TaskDsl { const d = n.data as { kind: string; label?: string; config?: unknown }; return { id: n.id, kind: d.kind, label: d.label, config: d.config ?? {} }; }), - edges: edges.map((e) => ({ source: e.source, target: e.target })), + edges: edges.map((e) => ({ + source: e.source, + target: e.target, + ...(e.sourceHandle ? { sourceHandle: e.sourceHandle } : {}), + })), }; } diff --git a/sundynix-desktop/frontend/src/studio/TypedNode.tsx b/sundynix-desktop/frontend/src/studio/TypedNode.tsx index f5ddc78..65f55f7 100644 --- a/sundynix-desktop/frontend/src/studio/TypedNode.tsx +++ b/sundynix-desktop/frontend/src/studio/TypedNode.tsx @@ -19,7 +19,7 @@ export function TypedNode({ data, selected }: NodeProps) { return (
@@ -32,7 +32,17 @@ export function TypedNode({ data, selected }: NodeProps) {
{d.label || k.desc}
{d.summary &&
{d.summary}
}
- + {d.kind === "branch" ? ( + // 分支节点:两个出口手柄 —— 真(绿)/假(红),连线时各自带 sourceHandle,引擎据此精确选路。 + <> + + + + + + ) : ( + + )} ); } diff --git a/sundynix-dispatcher/internal/dsl/compile.go b/sundynix-dispatcher/internal/dsl/compile.go index 1fcf29a..3197b0b 100644 --- a/sundynix-dispatcher/internal/dsl/compile.go +++ b/sundynix-dispatcher/internal/dsl/compile.go @@ -17,10 +17,11 @@ type Node struct { Config map[string]any `json:"config"` } -// Edge 是一条连线。 +// Edge 是一条连线。SourceHandle 标记从 branch 节点引出的边走真/假分支("true"/"false")。 type Edge struct { - Source string `json:"source"` - Target string `json:"target"` + Source string `json:"source"` + Target string `json:"target"` + SourceHandle string `json:"sourceHandle"` } // Flow 是整张图。 diff --git a/sundynix-dispatcher/internal/eino/graph.go b/sundynix-dispatcher/internal/eino/graph.go index f9c625b..290e59d 100644 --- a/sundynix-dispatcher/internal/eino/graph.go +++ b/sundynix-dispatcher/internal/eino/graph.go @@ -22,9 +22,11 @@ type board struct { query string profile string history []*schema.Message - refs []string // 检索 / 聚合得到的参考资料 - toolOut []string // 工具节点产出 - answer string // 终端 agent 的成稿(流式累计) + kb string // 最近一个检索节点的 owner 作用域库名(供 map 并行各项检索) + refs []string // 检索 / 聚合得到的参考资料 + toolOut []string // 工具节点产出 + sections []reportSection // map 并行 fan-out 产出的分项成稿(供 render 多章渲染) + answer string // 终端 agent / map 的成稿(流式累计) } // runGraph 按 DSL 图的真实拓扑与连线执行(替代旧的线性拍平 compileFlow)。 @@ -51,9 +53,9 @@ func (o *Orchestrator) runGraph(ctx context.Context, t *contract.Task, tr *execT return b.answer, nil } - // 建邻接与入度(只认两端都存在的边)。 + // 建邻接与入度(只认两端都存在的边)。保留整条边以便 branch 按 true/false 标签选路。 nodeByID := make(map[string]dsl.Node, len(flow.Nodes)) - out := make(map[string][]string) + outE := make(map[string][]dsl.Edge) indeg := make(map[string]int, len(flow.Nodes)) for _, n := range flow.Nodes { nodeByID[n.ID] = n @@ -66,7 +68,7 @@ func (o *Orchestrator) runGraph(ctx context.Context, t *contract.Task, tr *execT if _, ok := nodeByID[e.Target]; !ok { continue } - out[e.Source] = append(out[e.Source], e.Target) + outE[e.Source] = append(outE[e.Source], e) indeg[e.Target]++ } @@ -95,7 +97,7 @@ func (o *Orchestrator) runGraph(ctx context.Context, t *contract.Task, tr *execT if !active[n.ID] { continue // 被 branch 剪掉的下游,不执行 } - propagate := out[n.ID] // 默认激活全部出边;branch 会改写 + propagate := targetsOf(outE[n.ID]) // 默认激活全部出边;branch 会改写 switch n.Kind { case "input": if txt := cstr(n.Config, "text"); txt != "" { @@ -124,9 +126,9 @@ func (o *Orchestrator) runGraph(ctx context.Context, t *contract.Task, tr *execT case "render": o.renderNode(ctx, t.ID, n, b, tr) case "branch": - propagate = o.branchNode(n, b, out[n.ID], nodeByID, tr) + propagate = o.branchNode(n, b, outE[n.ID], nodeByID, tr) case "map": - tr.info("map:"+n.ID, "system", labelOf(n, "并行"), "fan-out 暂按串行执行(路线图 Phase 2)") + o.mapNode(ctx, t.ID, n, b, tr) case "output": tr.info("output:"+n.ID, "system", labelOf(n, "输出"), "目标:"+firstNonEmpty(cstr(n.Config, "target"), "屏幕")) default: @@ -151,6 +153,9 @@ func (o *Orchestrator) retrieverNode(ctx context.Context, n dsl.Node, b *board, if b.uid != "" && kb != "" && !strings.Contains(kb, "/") { scoped = b.uid + "/" + kb } + if scoped != "" { + b.kb = scoped // 记下作用域库名,供后续 map 并行各项检索复用 + } end := tr.span("retriever:"+n.ID, "tool", labelOf(n, "检索")) refs := o.retrieve(ctx, scoped, b.query) if refs != "" { @@ -159,6 +164,24 @@ func (o *Orchestrator) retrieverNode(ctx context.Context, n dsl.Node, b *board, end(fmt.Sprintf("kb=%s · 命中 %d 段", firstNonEmpty(kb, "(未指定)"), countLines(refs)), nil) } +// mapNode 执行并行 fan-out:把主题拆成若干项,各项有界并发撰写(复用 report 的 writeSections), +// 成稿存入黑板(结构化 sections + 拼进 answer),并流式呈现进度。 +func (o *Orchestrator) mapNode(ctx context.Context, taskID string, n dsl.Node, b *board, tr *execTracer) { + end := tr.span("map:"+n.ID, "plan", labelOf(n, "并行 fan-out")) + items := o.planItems(ctx, b.query, cstr(n.Config, "splitBy")) + end(fmt.Sprintf("拆出 %d 项:%s", len(items), strings.Join(items, " / ")), nil) + + o.emit(taskID, fmt.Sprintf("\n> 并行处理 %d 项…\n\n", len(items))) + secs := o.writeSections(ctx, b.query, b.kb, items, tr) // 有界并发,trace 出 section:i 各项 + b.sections = secs + for _, s := range secs { + chunk := "## " + s.Heading + "\n\n" + s.Body + "\n\n" + o.emit(taskID, chunk) + b.answer += chunk + b.refs = append(b.refs, s.Heading+":"+s.Body) + } +} + // execToolNode 执行工具节点:调 MCP 工具,产出累计进黑板;失败降级不阻断。 func (o *Orchestrator) execToolNode(ctx context.Context, taskID string, n dsl.Node, b *board, tr *execTracer) { tool, args := dsl.ToolBinding(n) @@ -242,7 +265,10 @@ func (o *Orchestrator) renderNode(ctx context.Context, taskID string, n dsl.Node format := firstNonEmpty(cstr(n.Config, "format"), "docx") end := tr.span("render:"+n.ID, "render", labelOf(n, "渲染 "+format)) title := truncate(b.query, 40) - secs := []reportSection{{Heading: title, Body: b.answer}} + secs := b.sections // map 产出的多章优先;否则把整段成稿当单章 + if len(secs) == 0 { + secs = []reportSection{{Heading: title, Body: b.answer}} + } if path := o.renderReport(ctx, taskID, title, secs); path != "" { end("已落盘:"+path, nil) o.emit(taskID, "\n\n---\n✅ 已渲染 "+format+" 文档,可在「下载」获取。\n") @@ -251,32 +277,70 @@ func (o *Orchestrator) renderNode(ctx context.Context, taskID string, n dsl.Node } } -// branchNode 执行分支节点:求值条件,按边序约定 [true, false] 选出要激活的下游。 -// 注:当前 DSL 的边不带 true/false 标签,故以"出边顺序"约定语义(Phase 2 将由前端给边打标)。 -func (o *Orchestrator) branchNode(n dsl.Node, b *board, outs []string, byID map[string]dsl.Node, tr *execTracer) []string { +// branchNode 执行分支节点:求值条件,按出边的 true/false 标签选出要激活的下游。 +// 边带 sourceHandle("true"/"false") 时按标签精确选路(前端 Phase 2 给 branch 的边打标); +// 无标签的旧图退回"出边顺序 [true,false]"约定,保持向后兼容。 +func (o *Orchestrator) branchNode(n dsl.Node, b *board, outs []dsl.Edge, byID map[string]dsl.Node, tr *execTracer) []string { cond := cstr(n.Config, "condition") res := evalCondition(cond, b) - chosen := outs - switch { - case len(outs) >= 2: - if res { - chosen = outs[:1] - } else { - chosen = outs[1:2] + + var truthy, falsy []string + labeled := false + for _, e := range outs { + switch e.SourceHandle { + case "true": + truthy = append(truthy, e.Target) + labeled = true + case "false": + falsy = append(falsy, e.Target) + labeled = true } - case len(outs) == 1 && !res: - chosen = nil // 单出边且条件为假 → 不继续 } + + var chosen []string + mode := "边标签" + if labeled { + if res { + chosen = truthy + } else { + chosen = falsy + } + } else { + mode = "边序约定[true,false]" + targets := targetsOf(outs) + switch { + case len(targets) >= 2: + if res { + chosen = targets[:1] + } else { + chosen = targets[1:2] + } + case len(targets) == 1 && !res: + chosen = nil + default: + chosen = targets + } + } + names := make([]string, 0, len(chosen)) for _, id := range chosen { names = append(names, labelOf(byID[id], id)) } tr.info("branch:"+n.ID, "system", labelOf(n, "分支"), - fmt.Sprintf("条件「%s」→ %v ⇒ 走 [%s](边序约定 [true,false])", - firstNonEmpty(cond, "(空=真)"), res, strings.Join(names, ", "))) + fmt.Sprintf("条件「%s」→ %v ⇒ 走 [%s](%s)", + firstNonEmpty(cond, "(空=真)"), res, strings.Join(names, ", "), mode)) return chosen } +// targetsOf 取一组边的目标节点 ID(保持顺序)。 +func targetsOf(edges []dsl.Edge) []string { + out := make([]string, 0, len(edges)) + for _, e := range edges { + out = append(out, e.Target) + } + return out +} + // evalCondition 求值 branch 条件。支持: // // 空 → true;关键字 refs/tools/answer/profile 作左值(取数量/字数); diff --git a/sundynix-dispatcher/internal/eino/report.go b/sundynix-dispatcher/internal/eino/report.go index a0d1c97..affd634 100644 --- a/sundynix-dispatcher/internal/eino/report.go +++ b/sundynix-dispatcher/internal/eino/report.go @@ -122,6 +122,35 @@ func (o *Orchestrator) planOutline(ctx context.Context, topic string) reportOutl return out } +// planItems 为 map 节点把主题拆成一组并行子项(splitBy 为拆分依据提示)。 +// 模型不可用/解析失败则用通用兜底分项。 +func (o *Orchestrator) planItems(ctx context.Context, topic, splitBy string) []string { + fallback := []string{"背景与现状", "核心分析", "结论与建议"} + if !o.pool.Ready() { + return fallback + } + hint := splitBy + if hint == "" { + hint = "合理的章节" + } + user := fmt.Sprintf("请把主题《%s》拆分为一组「%s」,用于并行撰写。"+ + "只输出 JSON 数组:[\"子项1\",\"子项2\", ...],3 到 6 项,不要任何多余文字。", topic, hint) + txt, err := o.pool.Chat(ctx, []llm.ChatMessage{ + {Role: "system", Content: "你擅长把一个任务拆解为可并行处理的若干子项。"}, + {Role: "user", Content: user}, + }) + if err != nil { + log.Printf("[map] 拆分子项失败,用兜底: %v", err) + return fallback + } + var items []string + if json.Unmarshal([]byte(stripFence(txt)), &items) != nil || len(items) == 0 { + log.Printf("[map] 子项 JSON 解析失败,用兜底。原文: %s", truncate(txt, 200)) + return fallback + } + return items +} + // writeSections 各章节并行撰写(有界并发),结果按原顺序返回。 func (o *Orchestrator) writeSections(ctx context.Context, topic, kb string, headings []string, tr *execTracer) []reportSection { out := make([]reportSection, len(headings))