package eino import ( "context" "fmt" "strconv" "strings" "time" "github.com/cloudwego/eino/schema" "github.com/sundynix/sundynix-dispatcher/internal/dsl" "github.com/sundynix/sundynix-shared/contract" ) // defaultAgentSystem 是 agent 节点未填系统提示词时的兜底。 const defaultAgentSystem = "你是 sundynix-agentix 平台的 AI 助手。" // board 是图执行的"黑板":节点按拓扑序流转时读写它。 type board struct { uid, sid string query string profile string history []*schema.Message kb string // 最近一个检索节点的 owner 作用域库名(供 map 并行各项检索) refs []string // 检索 / 聚合得到的参考资料 toolOut []string // 工具节点产出 sections []reportSection // map 并行 fan-out 产出的分项成稿(供 render 多章渲染) answer string // 终端 agent / map 的成稿(流式累计) } // runGraph 按 DSL 图的真实拓扑与连线执行(替代旧的线性拍平 compileFlow)。 // // 入度0 入口 → 沿连线传播 active → 每个节点按 kind 执行真实行为 → // branch 按条件只激活选中的下游(剪枝)→ agent 节点流式回流 token。 // // 逐节点点亮"运行·观测"。返回终端 agent 的完整产出(供写回历史)。 func (o *Orchestrator) runGraph(ctx context.Context, t *contract.Task, tr *execTracer) (string, error) { flow, ferr := dsl.Parse(t.Graph) plan := dsl.Compile(t.Graph) b := &board{ uid: meta(t, contract.MetaUserID), sid: meta(t, contract.MetaSessionID), query: plan.Query, } // 无法解析或空图:退化为"无图单轮对话"(注入默认记忆 + 直接出模型)。 if ferr != nil || flow == nil || len(flow.Nodes) == 0 { tr.info("task", "system", "无结构化图", "按单轮对话执行") b.profile = o.fetchMemory(ctx, b.uid, b.query) b.history = o.fetchHistory(ctx, b.sid) o.runAgent(ctx, t.ID, b, plan.System, tr, "agent") return b.answer, nil } // 建邻接与入度(只认两端都存在的边)。保留整条边以便 branch 按 true/false 标签选路。 nodeByID := make(map[string]dsl.Node, len(flow.Nodes)) outE := make(map[string][]dsl.Edge) indeg := make(map[string]int, len(flow.Nodes)) for _, n := range flow.Nodes { nodeByID[n.ID] = n indeg[n.ID] = 0 } for _, e := range flow.Edges { if _, ok := nodeByID[e.Source]; !ok { continue } if _, ok := nodeByID[e.Target]; !ok { continue } outE[e.Source] = append(outE[e.Source], e) indeg[e.Target]++ } // 入口节点(入度 0)置 active;执行时沿连线把下游激活,branch 只激活选中分支。 active := make(map[string]bool) for _, n := range flow.Nodes { if indeg[n.ID] == 0 { active[n.ID] = true } } // 图里没有 memory 节点 → 沿用旧默认:注入画像+历史(避免回归)。 hasMemory := false for _, n := range flow.Nodes { if n.Kind == "memory" { hasMemory = true break } } if !hasMemory { b.profile = o.fetchMemory(ctx, b.uid, b.query) b.history = o.fetchHistory(ctx, b.sid) } for _, n := range flow.Topo() { if !active[n.ID] { continue // 被 branch 剪掉的下游,不执行 } propagate := targetsOf(outE[n.ID]) // 默认激活全部出边;branch 会改写 switch n.Kind { case "input": if txt := cstr(n.Config, "text"); txt != "" { b.query = txt } tr.info("input:"+n.ID, "system", labelOf(n, "输入"), truncate(b.query, 80)) case "memory": if cbool(n.Config, "profile") { b.profile = o.fetchMemory(ctx, b.uid, b.query) } if cbool(n.Config, "history") { b.history = o.fetchHistory(ctx, b.sid) } tr.info("memory:"+n.ID, "memory", labelOf(n, "记忆"), fmt.Sprintf("画像 %d 字 · 历史 %d 条", len([]rune(b.profile)), len(b.history))) case "retriever": o.retrieverNode(ctx, n, b, tr) case "tool": o.execToolNode(ctx, t.ID, n, b, tr) case "agent": o.runAgent(ctx, t.ID, b, firstNonEmpty(cstr(n.Config, "system"), plan.System), tr, "agent:"+n.ID) case "aggregate": merged := aggregate(cstr(n.Config, "strategy"), append(append([]string{}, b.refs...), b.toolOut...)) b.refs, b.toolOut = merged, nil tr.info("aggregate:"+n.ID, "system", labelOf(n, "汇聚"), "策略:"+firstNonEmpty(cstr(n.Config, "strategy"), "拼接")) case "render": o.renderNode(ctx, t.ID, n, b, tr) case "branch": propagate = o.branchNode(n, b, outE[n.ID], nodeByID, tr) case "map": o.mapNode(ctx, t.ID, n, b, tr) case "output": tr.info("output:"+n.ID, "system", labelOf(n, "输出"), "目标:"+firstNonEmpty(cstr(n.Config, "target"), "屏幕")) default: tr.info(n.Kind+":"+n.ID, "system", labelOf(n, n.Kind), "未识别节点,跳过") } for _, tgt := range propagate { active[tgt] = true } } // 图里无 agent 节点(纯工具/检索图)也要出一段模型答复,否则没有输出。 if b.answer == "" { o.runAgent(ctx, t.ID, b, plan.System, tr, "agent") } return b.answer, nil } // retrieverNode 执行检索节点:kb 按 owner 作用域 → kb_search → 累计参考资料。 func (o *Orchestrator) retrieverNode(ctx context.Context, n dsl.Node, b *board, tr *execTracer) { kb := cstr(n.Config, "kb") scoped := kb if b.uid != "" && kb != "" && !strings.Contains(kb, "/") { scoped = b.uid + "/" + kb } if scoped != "" { b.kb = scoped // 记下作用域库名,供后续 map 并行各项检索复用 } end := tr.span("retriever:"+n.ID, "tool", labelOf(n, "检索")) refs := o.retrieve(ctx, scoped, b.query) if refs != "" { b.refs = append(b.refs, refs) } end(fmt.Sprintf("kb=%s · 命中 %d 段", firstNonEmpty(kb, "(未指定)"), countLines(refs)), nil) } // mapNode 执行并行 fan-out:把主题拆成若干项,各项有界并发撰写(复用 report 的 writeSections), // 成稿存入黑板(结构化 sections + 拼进 answer),并流式呈现进度。 func (o *Orchestrator) mapNode(ctx context.Context, taskID string, n dsl.Node, b *board, tr *execTracer) { end := tr.span("map:"+n.ID, "plan", labelOf(n, "并行 fan-out")) items := o.planItems(ctx, b.query, cstr(n.Config, "splitBy")) end(fmt.Sprintf("拆出 %d 项:%s", len(items), strings.Join(items, " / ")), nil) o.emit(taskID, fmt.Sprintf("\n> 并行处理 %d 项…\n\n", len(items))) secs := o.writeSections(ctx, b.query, b.kb, items, tr) // 有界并发,trace 出 section:i 各项 b.sections = secs for _, s := range secs { chunk := "## " + s.Heading + "\n\n" + s.Body + "\n\n" o.emit(taskID, chunk) b.answer += chunk b.refs = append(b.refs, s.Heading+":"+s.Body) } } // execToolNode 执行工具节点:调 MCP 工具,产出累计进黑板;失败降级不阻断。 func (o *Orchestrator) execToolNode(ctx context.Context, taskID string, n dsl.Node, b *board, tr *execTracer) { tool, args := dsl.ToolBinding(n) if tool == "" { return } node := "tool:" + tool if o.tools == nil { tr.info(node, "tool", "工具 "+tool, "工具总线未接入,跳过") return } call := map[string]any{} for k, v := range args { call[k] = v } if call["q"] == nil && call["query"] == nil { call["q"] = b.query } if b.uid != "" { if kbv, ok := call["kb"].(string); ok && kbv != "" && !strings.Contains(kbv, "/") { call["kb"] = b.uid + "/" + kbv } } end := tr.span(node, "tool", "调用工具 "+tool) cctx, cancel := context.WithTimeout(ctx, toolCallTimeout) defer cancel() res, err := o.tools.CallTool(cctx, contract.ToolSubjectGo(tool), &contract.ToolCall{Tool: tool, TaskID: taskID, Args: call}) if err != nil { end("调用失败,降级跳过", err) return } if res == nil || !res.OK || res.Content == "" { end("无结果,降级跳过", nil) return } end("入参 "+previewArgs(call)+" → 产出 "+truncate(res.Content, 160), nil) b.toolOut = append(b.toolOut, "["+tool+"] "+res.Content) } // runAgent 执行 agent/模型节点:据黑板拼消息 → 流式回流 token → 累计成稿。 func (o *Orchestrator) runAgent(ctx context.Context, taskID string, b *board, system string, tr *execTracer, node string) { rc := &RunCtx{ System: firstNonEmpty(system, defaultAgentSystem), Query: b.query, Profile: b.profile, History: b.history, ToolOut: append(append([]string{}, b.toolOut...), b.refs...), } msgs, _ := buildMessages(ctx, rc) tr.emit(node, "model", "start", "模型流式推理", "", 0) t0 := time.Now() n := 0 send := func(s string) { if s == "" { return } _ = o.sink.PublishToken(taskID, []byte(s)) b.answer += s n++ } var err error if o.pool.Ready() { err = o.pool.ChatStream(ctx, toChatMessages(msgs), send) } else { err = o.pool.StreamText(ctx, replyFor(msgs), func(tok []byte) { send(string(tok)) }) } if err != nil { tr.emit(node, "model", "error", "模型流式推理", err.Error(), time.Since(t0).Milliseconds()) return } tr.emit(node, "model", "end", "模型流式推理", fmt.Sprintf("%d tokens / %d 字", n, len([]rune(b.answer))), time.Since(t0).Milliseconds()) } // renderNode 执行渲染节点:把当前成稿渲染成 Word(经 mcp-go report_render)。 func (o *Orchestrator) renderNode(ctx context.Context, taskID string, n dsl.Node, b *board, tr *execTracer) { if strings.TrimSpace(b.answer) == "" { tr.info("render:"+n.ID, "render", labelOf(n, "渲染"), "暂无正文可渲染(render 前需有 agent 产出)") return } format := firstNonEmpty(cstr(n.Config, "format"), "docx") end := tr.span("render:"+n.ID, "render", labelOf(n, "渲染 "+format)) title := truncate(b.query, 40) secs := b.sections // map 产出的多章优先;否则把整段成稿当单章 if len(secs) == 0 { secs = []reportSection{{Heading: title, Body: b.answer}} } if path := o.renderReport(ctx, taskID, title, secs); path != "" { end("已落盘:"+path, nil) o.emit(taskID, "\n\n---\n✅ 已渲染 "+format+" 文档,可在「下载」获取。\n") } else { end("渲染服务不可用", fmt.Errorf("render unavailable")) } } // branchNode 执行分支节点:求值条件,按出边的 true/false 标签选出要激活的下游。 // 边带 sourceHandle("true"/"false") 时按标签精确选路(前端 Phase 2 给 branch 的边打标); // 无标签的旧图退回"出边顺序 [true,false]"约定,保持向后兼容。 func (o *Orchestrator) branchNode(n dsl.Node, b *board, outs []dsl.Edge, byID map[string]dsl.Node, tr *execTracer) []string { cond := cstr(n.Config, "condition") res := evalCondition(cond, b) var truthy, falsy []string labeled := false for _, e := range outs { switch e.SourceHandle { case "true": truthy = append(truthy, e.Target) labeled = true case "false": falsy = append(falsy, e.Target) labeled = true } } var chosen []string mode := "边标签" if labeled { if res { chosen = truthy } else { chosen = falsy } } else { mode = "边序约定[true,false]" targets := targetsOf(outs) switch { case len(targets) >= 2: if res { chosen = targets[:1] } else { chosen = targets[1:2] } case len(targets) == 1 && !res: chosen = nil default: chosen = targets } } names := make([]string, 0, len(chosen)) for _, id := range chosen { names = append(names, labelOf(byID[id], id)) } tr.info("branch:"+n.ID, "system", labelOf(n, "分支"), fmt.Sprintf("条件「%s」→ %v ⇒ 走 [%s](%s)", firstNonEmpty(cond, "(空=真)"), res, strings.Join(names, ", "), mode)) return chosen } // targetsOf 取一组边的目标节点 ID(保持顺序)。 func targetsOf(edges []dsl.Edge) []string { out := make([]string, 0, len(edges)) for _, e := range edges { out = append(out, e.Target) } return out } // evalCondition 求值 branch 条件。支持: // // 空 → true;关键字 refs/tools/answer/profile 作左值(取数量/字数); // 形如 "a op b"(op: >= <= == != > <)数值比较;其余非空 → 默认真。 func evalCondition(cond string, b *board) bool { cond = strings.TrimSpace(cond) if cond == "" { return true } for _, op := range []string{">=", "<=", "==", "!=", ">", "<"} { if i := strings.Index(cond, op); i >= 0 { l := resolveOperand(strings.TrimSpace(cond[:i]), b) r := resolveOperand(strings.TrimSpace(cond[i+len(op):]), b) switch op { case ">": return l > r case "<": return l < r case ">=": return l >= r case "<=": return l <= r case "==": return l == r case "!=": return l != r } } } return true } // resolveOperand 把条件里的左/右值解析为数值(关键字取运行时数量,否则按字面量)。 func resolveOperand(s string, b *board) float64 { switch strings.ToLower(s) { case "refs": return float64(len(b.refs)) case "tools": return float64(len(b.toolOut)) case "answer": return float64(len([]rune(b.answer))) case "profile": return float64(len([]rune(b.profile))) } f, _ := strconv.ParseFloat(s, 64) return f } // aggregate 按策略合并多段参考资料为一段。 func aggregate(strategy string, parts []string) []string { var nonEmpty []string for _, p := range parts { if strings.TrimSpace(p) != "" { nonEmpty = append(nonEmpty, p) } } if len(nonEmpty) == 0 { return nil } switch strategy { case "去重合并": seen := map[string]bool{} var uniq []string for _, p := range nonEmpty { if !seen[p] { seen[p] = true uniq = append(uniq, p) } } return []string{strings.Join(uniq, "\n---\n")} case "摘要": return []string{truncate(strings.Join(nonEmpty, "\n"), 800)} default: // 拼接 return []string{strings.Join(nonEmpty, "\n---\n")} } } // ---- 小工具 ---- func meta(t *contract.Task, key string) string { v, _ := t.Meta[key].(string) return v } func cstr(cfg map[string]any, key string) string { if cfg == nil { return "" } v, ok := cfg[key] if !ok || v == nil { return "" } if s, ok := v.(string); ok { return strings.TrimSpace(s) } return strings.TrimSpace(fmt.Sprint(v)) } func cbool(cfg map[string]any, key string) bool { if cfg == nil { return false } if v, ok := cfg[key].(bool); ok { return v } return false } func labelOf(n dsl.Node, def string) string { if strings.TrimSpace(n.Label) != "" { return n.Label } return def } func countLines(s string) int { s = strings.TrimSpace(s) if s == "" { return 0 } return strings.Count(s, "\n") + 1 }