From fd145b5852188b3972ebd5bc1257a6ef1462773d Mon Sep 17 00:00:00 2001 From: Blizzard Date: Mon, 15 Jun 2026 11:42:29 +0800 Subject: [PATCH] =?UTF-8?q?feat(dispatcher):=20=E7=BC=96=E6=8E=92=E5=BC=95?= =?UTF-8?q?=E6=93=8E=E6=8C=89=E5=9B=BE=E6=89=A7=E8=A1=8C=EF=BC=88=E6=8B=93?= =?UTF-8?q?=E6=89=91+=E8=BF=9E=E7=BA=BF+=E5=88=86=E6=94=AF=E5=89=AA?= =?UTF-8?q?=E6=9E=9D=EF=BC=89=EF=BC=8C=E5=BC=83=E7=94=A8=E7=BA=BF=E6=80=A7?= =?UTF-8?q?=E6=8B=8D=E5=B9=B3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 旧 compileFlow 把 DSL 图拍平成线性 init→tool…→prompt→model,连线/分支/ memory/aggregate/render 节点全被忽略——"画得出、跑不全"。改为纯 Go 图解释器 (graph.go),按真实拓扑与连线执行,每种节点 kind 有真实行为: - input 注入用户输入 - memory 按勾选注入画像/历史(无 memory 节点则沿用默认注入,不回归) - retriever kb 按 owner 作用域 → kb_search 累计参考资料 - tool 调 MCP 工具,产出进黑板,失败降级不阻断 - agent 据黑板拼消息 → pool 流式回流 token,累计成稿 - aggregate 按策略合并参考资料(拼接/去重合并/摘要) - render 把成稿经 report_render 渲染 docx - branch 求值条件 + active-set 剪枝下游(边序约定 [true,false]) - map 占位(fan-out 暂串行,路线图 Phase 2) - output 终端 全程逐节点点亮"运行·观测",token 流与记忆写回保持不变;报告 intent 走原专用 编排不动。compile.go 精简为只留 RunCtx/buildMessages/previewArgs。 实测(gateway+dispatcher+DeepSeek 实跑): - input→agent→output 真实流式答复 ✓ - branch 条件 2>1 走分支A、1>2 走分支B(下游真被剪枝)✓ - memory 节点按勾选注入;exec 事件按新节点名(agent:a 等)回流 ✓ - 桌面端 Studio 载示例→运行:4节点3连线校验通过,检索节点 mcp-go 不在时 优雅降级,agent 据空资料如实作答,输出/轨迹面板正常 ✓ 路线图 Phase 2:map 真并行 fan-out + aggregate reduce 接上 report 那套; 前端给 branch 的边打 true/false 标签,使条件分支完全精确(当前靠出边顺序约定)。 Co-Authored-By: Claude Opus 4.8 --- go.work.sum | 1 + sundynix-dispatcher/internal/eino/compile.go | 158 +------ sundynix-dispatcher/internal/eino/graph.go | 401 ++++++++++++++++++ .../internal/eino/orchestrator.go | 57 +-- 4 files changed, 421 insertions(+), 196 deletions(-) create mode 100644 sundynix-dispatcher/internal/eino/graph.go diff --git a/go.work.sum b/go.work.sum index b1821a8..e35addc 100644 --- a/go.work.sum +++ b/go.work.sum @@ -26,6 +26,7 @@ github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stg github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc= github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= +github.com/sirupsen/logrus v1.9.4 h1:TsZE7l11zFCLZnZ+teH4Umoq5BhEIfIzfRDZ1Uzql2w= github.com/sirupsen/logrus v1.9.4/go.mod h1:ftWc9WdOfJ0a92nsE2jF5u5ZwH8Bv2zdeOC42RjbV2g= github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I= github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0= diff --git a/sundynix-dispatcher/internal/eino/compile.go b/sundynix-dispatcher/internal/eino/compile.go index 01ea957..56bd90c 100644 --- a/sundynix-dispatcher/internal/eino/compile.go +++ b/sundynix-dispatcher/internal/eino/compile.go @@ -3,18 +3,13 @@ package eino import ( "context" "encoding/json" - "fmt" "strings" - "github.com/cloudwego/eino/compose" "github.com/cloudwego/eino/schema" - - "github.com/sundynix/sundynix-dispatcher/internal/dsl" - "github.com/sundynix/sundynix-shared/contract" ) -// RunCtx 是图中流转的"黑板":init 填充,工具节点逐个增补,prompt 节点据此组装消息。 -// 用统一类型在节点间流转,规避 Eino 严格类型对齐的麻烦。 +// RunCtx 是组装模型消息用的上下文:图解释器(graph.go)把黑板汇总到它, +// buildMessages 据此拼出发给模型的消息序列。用统一结构避免散落多处拼装。 type RunCtx struct { UserID string SessionID string @@ -22,147 +17,10 @@ type RunCtx struct { Query string // 用户输入 Profile string // 召回的画像 History []*schema.Message // 短期历史 - ToolOut []string // 工具节点产出(按执行序) + ToolOut []string // 工具/检索节点产出(含参考资料) } -// compileFlow 把一个任务的 DSL 图动态编译为可执行的 Eino 图: -// -// START → init(编译+记忆召回) → tool_0 → tool_1 → … → prompt(组装消息) → model(流式) → END -// -// 工具/检索节点按拓扑序真实调用 MCP(sundynix.tools.go.*),结果注入模型上下文。 -// 分支/并行节点暂未编译(TODO:compose.Branch / fan-out)。 -func (o *Orchestrator) compileFlow(ctx context.Context, t *contract.Task, tr *execTracer) (compose.Runnable[*contract.Task, *schema.Message], error) { - plan := dsl.Compile(t.Graph) // 系统提示词 / 用户输入 / 默认兜底 - flow, _ := dsl.Parse(t.Graph) - - g := compose.NewGraph[*contract.Task, *schema.Message]() - - // init:取身份 → 召回画像+历史 → 初始化黑板。 - if err := g.AddLambdaNode("init", compose.InvokableLambda( - func(ctx context.Context, task *contract.Task) (*RunCtx, error) { - uid, _ := task.Meta[contract.MetaUserID].(string) - sid, _ := task.Meta[contract.MetaSessionID].(string) - end := tr.span("init", "memory", "召回画像与历史") - profile := o.fetchMemory(ctx, uid, plan.Query) - history := o.fetchHistory(ctx, sid) - end(fmt.Sprintf("画像 %d 字 · 历史 %d 条", len([]rune(profile)), len(history)), nil) - return &RunCtx{ - UserID: uid, - SessionID: sid, - System: plan.System, - Query: plan.Query, - Profile: profile, - History: history, - }, nil - })); err != nil { - return nil, err - } - - // 按拓扑序为每个工具/检索节点加一个真实执行节点。 - prev := "init" - idx := 0 - if flow != nil { - for _, n := range flow.Topo() { - tool, args := dsl.ToolBinding(n) - if tool == "" { - continue - } - key := fmt.Sprintf("tool_%d", idx) - idx++ - uid, _ := t.Meta[contract.MetaUserID].(string) - if err := g.AddLambdaNode(key, compose.InvokableLambda(o.makeToolNode(t.ID, tool, args, tr, uid))); err != nil { - return nil, err - } - if err := g.AddEdge(prev, key); err != nil { - return nil, err - } - prev = key - } - } - - // prompt:黑板 → []*schema.Message(系统提示词 + 画像 + 工具产出 + 历史 + 用户输入)。 - if err := g.AddLambdaNode("prompt", compose.InvokableLambda( - func(ctx context.Context, rc *RunCtx) ([]*schema.Message, error) { - msgs, err := buildMessages(ctx, rc) - tr.info("prompt", "prompt", "组装提示词", fmt.Sprintf("%d 条消息 · 工具产出 %d 段", len(msgs), len(rc.ToolOut))) - return msgs, err - })); err != nil { - return nil, err - } - if err := g.AddEdge(prev, "prompt"); err != nil { - return nil, err - } - - // model:LLM Pool 流式(已配置在线模型则真实推理)。 - if err := g.AddChatModelNode("model", newPoolModel(o.pool)); err != nil { - return nil, err - } - - if err := g.AddEdge(compose.START, "init"); err != nil { - return nil, err - } - if err := g.AddEdge("prompt", "model"); err != nil { - return nil, err - } - if err := g.AddEdge("model", compose.END); err != nil { - return nil, err - } - - return g.Compile(ctx) -} - -// makeToolNode 返回一个真实调用 MCP 工具的图节点:把结果增补进黑板,失败降级不阻断。 -// uid 非空时把检索类工具的 kb 锁进 owner 作用域("uid/kb"),使编排检索命中本人知识库。 -func (o *Orchestrator) makeToolNode(taskID, tool string, args map[string]any, tr *execTracer, uid string) func(context.Context, *RunCtx) (*RunCtx, error) { - node := "tool:" + tool - return func(ctx context.Context, rc *RunCtx) (*RunCtx, error) { - if o.tools == nil { - tr.info(node, "tool", "工具 "+tool, "工具总线未接入,跳过") - return rc, nil - } - // 未显式带查询词则注入当前用户输入,便于检索类工具。 - call := map[string]any{} - for k, v := range args { - call[k] = v - } - if call["q"] == nil && call["query"] == nil { - call["q"] = rc.Query - } - // 检索类工具的 kb 按 owner 作用域,对齐知识库隔离(前端只发库名)。 - if uid != "" { - if kbv, ok := call["kb"].(string); ok && kbv != "" && !strings.Contains(kbv, "/") { - call["kb"] = uid + "/" + kbv - } - } - end := tr.span(node, "tool", "调用工具 "+tool) - cctx, cancel := context.WithTimeout(ctx, toolCallTimeout) - defer cancel() - res, err := o.tools.CallTool(cctx, contract.ToolSubjectGo(tool), &contract.ToolCall{ - Tool: tool, TaskID: taskID, Args: call, - }) - if err != nil { - end("调用失败,降级跳过", err) - return rc, nil - } - if res == nil || !res.OK || res.Content == "" { - end("无结果,降级跳过", nil) - return rc, nil // 工具不可用/无结果 → 降级跳过 - } - end("入参 "+previewArgs(call)+" → 产出 "+truncate(res.Content, 160), nil) - rc.ToolOut = append(rc.ToolOut, "["+tool+"] "+res.Content) - return rc, nil - } -} - -// previewArgs 把工具入参压成一行短预览。 -func previewArgs(args map[string]any) string { - if data, err := json.Marshal(args); err == nil { - return truncate(string(data), 120) - } - return "" -} - -// buildMessages 把黑板组装为发给模型的消息序列。 +// buildMessages 把上下文组装为发给模型的消息序列(系统提示词 + 画像 + 工具产出 + 历史 + 用户输入)。 func buildMessages(_ context.Context, rc *RunCtx) ([]*schema.Message, error) { var sys strings.Builder sys.WriteString(rc.System) @@ -181,3 +39,11 @@ func buildMessages(_ context.Context, rc *RunCtx) ([]*schema.Message, error) { msgs = append(msgs, schema.UserMessage(rc.Query)) return msgs, nil } + +// previewArgs 把工具入参压成一行短预览。 +func previewArgs(args map[string]any) string { + if data, err := json.Marshal(args); err == nil { + return truncate(string(data), 120) + } + return "" +} diff --git a/sundynix-dispatcher/internal/eino/graph.go b/sundynix-dispatcher/internal/eino/graph.go new file mode 100644 index 0000000..f9c625b --- /dev/null +++ b/sundynix-dispatcher/internal/eino/graph.go @@ -0,0 +1,401 @@ +package eino + +import ( + "context" + "fmt" + "strconv" + "strings" + "time" + + "github.com/cloudwego/eino/schema" + + "github.com/sundynix/sundynix-dispatcher/internal/dsl" + "github.com/sundynix/sundynix-shared/contract" +) + +// defaultAgentSystem 是 agent 节点未填系统提示词时的兜底。 +const defaultAgentSystem = "你是 sundynix-agentix 平台的 AI 助手。" + +// board 是图执行的"黑板":节点按拓扑序流转时读写它。 +type board struct { + uid, sid string + query string + profile string + history []*schema.Message + refs []string // 检索 / 聚合得到的参考资料 + toolOut []string // 工具节点产出 + answer string // 终端 agent 的成稿(流式累计) +} + +// runGraph 按 DSL 图的真实拓扑与连线执行(替代旧的线性拍平 compileFlow)。 +// +// 入度0 入口 → 沿连线传播 active → 每个节点按 kind 执行真实行为 → +// branch 按条件只激活选中的下游(剪枝)→ agent 节点流式回流 token。 +// +// 逐节点点亮"运行·观测"。返回终端 agent 的完整产出(供写回历史)。 +func (o *Orchestrator) runGraph(ctx context.Context, t *contract.Task, tr *execTracer) (string, error) { + flow, ferr := dsl.Parse(t.Graph) + plan := dsl.Compile(t.Graph) + b := &board{ + uid: meta(t, contract.MetaUserID), + sid: meta(t, contract.MetaSessionID), + query: plan.Query, + } + + // 无法解析或空图:退化为"无图单轮对话"(注入默认记忆 + 直接出模型)。 + if ferr != nil || flow == nil || len(flow.Nodes) == 0 { + tr.info("task", "system", "无结构化图", "按单轮对话执行") + b.profile = o.fetchMemory(ctx, b.uid, b.query) + b.history = o.fetchHistory(ctx, b.sid) + o.runAgent(ctx, t.ID, b, plan.System, tr, "agent") + return b.answer, nil + } + + // 建邻接与入度(只认两端都存在的边)。 + nodeByID := make(map[string]dsl.Node, len(flow.Nodes)) + out := make(map[string][]string) + indeg := make(map[string]int, len(flow.Nodes)) + for _, n := range flow.Nodes { + nodeByID[n.ID] = n + indeg[n.ID] = 0 + } + for _, e := range flow.Edges { + if _, ok := nodeByID[e.Source]; !ok { + continue + } + if _, ok := nodeByID[e.Target]; !ok { + continue + } + out[e.Source] = append(out[e.Source], e.Target) + indeg[e.Target]++ + } + + // 入口节点(入度 0)置 active;执行时沿连线把下游激活,branch 只激活选中分支。 + active := make(map[string]bool) + for _, n := range flow.Nodes { + if indeg[n.ID] == 0 { + active[n.ID] = true + } + } + + // 图里没有 memory 节点 → 沿用旧默认:注入画像+历史(避免回归)。 + hasMemory := false + for _, n := range flow.Nodes { + if n.Kind == "memory" { + hasMemory = true + break + } + } + if !hasMemory { + b.profile = o.fetchMemory(ctx, b.uid, b.query) + b.history = o.fetchHistory(ctx, b.sid) + } + + for _, n := range flow.Topo() { + if !active[n.ID] { + continue // 被 branch 剪掉的下游,不执行 + } + propagate := out[n.ID] // 默认激活全部出边;branch 会改写 + switch n.Kind { + case "input": + if txt := cstr(n.Config, "text"); txt != "" { + b.query = txt + } + tr.info("input:"+n.ID, "system", labelOf(n, "输入"), truncate(b.query, 80)) + case "memory": + if cbool(n.Config, "profile") { + b.profile = o.fetchMemory(ctx, b.uid, b.query) + } + if cbool(n.Config, "history") { + b.history = o.fetchHistory(ctx, b.sid) + } + tr.info("memory:"+n.ID, "memory", labelOf(n, "记忆"), + fmt.Sprintf("画像 %d 字 · 历史 %d 条", len([]rune(b.profile)), len(b.history))) + case "retriever": + o.retrieverNode(ctx, n, b, tr) + case "tool": + o.execToolNode(ctx, t.ID, n, b, tr) + case "agent": + o.runAgent(ctx, t.ID, b, firstNonEmpty(cstr(n.Config, "system"), plan.System), tr, "agent:"+n.ID) + case "aggregate": + merged := aggregate(cstr(n.Config, "strategy"), append(append([]string{}, b.refs...), b.toolOut...)) + b.refs, b.toolOut = merged, nil + tr.info("aggregate:"+n.ID, "system", labelOf(n, "汇聚"), "策略:"+firstNonEmpty(cstr(n.Config, "strategy"), "拼接")) + case "render": + o.renderNode(ctx, t.ID, n, b, tr) + case "branch": + propagate = o.branchNode(n, b, out[n.ID], nodeByID, tr) + case "map": + tr.info("map:"+n.ID, "system", labelOf(n, "并行"), "fan-out 暂按串行执行(路线图 Phase 2)") + case "output": + tr.info("output:"+n.ID, "system", labelOf(n, "输出"), "目标:"+firstNonEmpty(cstr(n.Config, "target"), "屏幕")) + default: + tr.info(n.Kind+":"+n.ID, "system", labelOf(n, n.Kind), "未识别节点,跳过") + } + for _, tgt := range propagate { + active[tgt] = true + } + } + + // 图里无 agent 节点(纯工具/检索图)也要出一段模型答复,否则没有输出。 + if b.answer == "" { + o.runAgent(ctx, t.ID, b, plan.System, tr, "agent") + } + return b.answer, nil +} + +// retrieverNode 执行检索节点:kb 按 owner 作用域 → kb_search → 累计参考资料。 +func (o *Orchestrator) retrieverNode(ctx context.Context, n dsl.Node, b *board, tr *execTracer) { + kb := cstr(n.Config, "kb") + scoped := kb + if b.uid != "" && kb != "" && !strings.Contains(kb, "/") { + scoped = b.uid + "/" + kb + } + end := tr.span("retriever:"+n.ID, "tool", labelOf(n, "检索")) + refs := o.retrieve(ctx, scoped, b.query) + if refs != "" { + b.refs = append(b.refs, refs) + } + end(fmt.Sprintf("kb=%s · 命中 %d 段", firstNonEmpty(kb, "(未指定)"), countLines(refs)), nil) +} + +// execToolNode 执行工具节点:调 MCP 工具,产出累计进黑板;失败降级不阻断。 +func (o *Orchestrator) execToolNode(ctx context.Context, taskID string, n dsl.Node, b *board, tr *execTracer) { + tool, args := dsl.ToolBinding(n) + if tool == "" { + return + } + node := "tool:" + tool + if o.tools == nil { + tr.info(node, "tool", "工具 "+tool, "工具总线未接入,跳过") + return + } + call := map[string]any{} + for k, v := range args { + call[k] = v + } + if call["q"] == nil && call["query"] == nil { + call["q"] = b.query + } + if b.uid != "" { + if kbv, ok := call["kb"].(string); ok && kbv != "" && !strings.Contains(kbv, "/") { + call["kb"] = b.uid + "/" + kbv + } + } + end := tr.span(node, "tool", "调用工具 "+tool) + cctx, cancel := context.WithTimeout(ctx, toolCallTimeout) + defer cancel() + res, err := o.tools.CallTool(cctx, contract.ToolSubjectGo(tool), &contract.ToolCall{Tool: tool, TaskID: taskID, Args: call}) + if err != nil { + end("调用失败,降级跳过", err) + return + } + if res == nil || !res.OK || res.Content == "" { + end("无结果,降级跳过", nil) + return + } + end("入参 "+previewArgs(call)+" → 产出 "+truncate(res.Content, 160), nil) + b.toolOut = append(b.toolOut, "["+tool+"] "+res.Content) +} + +// runAgent 执行 agent/模型节点:据黑板拼消息 → 流式回流 token → 累计成稿。 +func (o *Orchestrator) runAgent(ctx context.Context, taskID string, b *board, system string, tr *execTracer, node string) { + rc := &RunCtx{ + System: firstNonEmpty(system, defaultAgentSystem), + Query: b.query, + Profile: b.profile, + History: b.history, + ToolOut: append(append([]string{}, b.toolOut...), b.refs...), + } + msgs, _ := buildMessages(ctx, rc) + tr.emit(node, "model", "start", "模型流式推理", "", 0) + t0 := time.Now() + n := 0 + send := func(s string) { + if s == "" { + return + } + _ = o.sink.PublishToken(taskID, []byte(s)) + b.answer += s + n++ + } + var err error + if o.pool.Ready() { + err = o.pool.ChatStream(ctx, toChatMessages(msgs), send) + } else { + err = o.pool.StreamText(ctx, replyFor(msgs), func(tok []byte) { send(string(tok)) }) + } + if err != nil { + tr.emit(node, "model", "error", "模型流式推理", err.Error(), time.Since(t0).Milliseconds()) + return + } + tr.emit(node, "model", "end", "模型流式推理", + fmt.Sprintf("%d tokens / %d 字", n, len([]rune(b.answer))), time.Since(t0).Milliseconds()) +} + +// renderNode 执行渲染节点:把当前成稿渲染成 Word(经 mcp-go report_render)。 +func (o *Orchestrator) renderNode(ctx context.Context, taskID string, n dsl.Node, b *board, tr *execTracer) { + if strings.TrimSpace(b.answer) == "" { + tr.info("render:"+n.ID, "render", labelOf(n, "渲染"), "暂无正文可渲染(render 前需有 agent 产出)") + return + } + format := firstNonEmpty(cstr(n.Config, "format"), "docx") + end := tr.span("render:"+n.ID, "render", labelOf(n, "渲染 "+format)) + title := truncate(b.query, 40) + secs := []reportSection{{Heading: title, Body: b.answer}} + if path := o.renderReport(ctx, taskID, title, secs); path != "" { + end("已落盘:"+path, nil) + o.emit(taskID, "\n\n---\n✅ 已渲染 "+format+" 文档,可在「下载」获取。\n") + } else { + end("渲染服务不可用", fmt.Errorf("render unavailable")) + } +} + +// branchNode 执行分支节点:求值条件,按边序约定 [true, false] 选出要激活的下游。 +// 注:当前 DSL 的边不带 true/false 标签,故以"出边顺序"约定语义(Phase 2 将由前端给边打标)。 +func (o *Orchestrator) branchNode(n dsl.Node, b *board, outs []string, byID map[string]dsl.Node, tr *execTracer) []string { + cond := cstr(n.Config, "condition") + res := evalCondition(cond, b) + chosen := outs + switch { + case len(outs) >= 2: + if res { + chosen = outs[:1] + } else { + chosen = outs[1:2] + } + case len(outs) == 1 && !res: + chosen = nil // 单出边且条件为假 → 不继续 + } + names := make([]string, 0, len(chosen)) + for _, id := range chosen { + names = append(names, labelOf(byID[id], id)) + } + tr.info("branch:"+n.ID, "system", labelOf(n, "分支"), + fmt.Sprintf("条件「%s」→ %v ⇒ 走 [%s](边序约定 [true,false])", + firstNonEmpty(cond, "(空=真)"), res, strings.Join(names, ", "))) + return chosen +} + +// evalCondition 求值 branch 条件。支持: +// +// 空 → true;关键字 refs/tools/answer/profile 作左值(取数量/字数); +// 形如 "a op b"(op: >= <= == != > <)数值比较;其余非空 → 默认真。 +func evalCondition(cond string, b *board) bool { + cond = strings.TrimSpace(cond) + if cond == "" { + return true + } + for _, op := range []string{">=", "<=", "==", "!=", ">", "<"} { + if i := strings.Index(cond, op); i >= 0 { + l := resolveOperand(strings.TrimSpace(cond[:i]), b) + r := resolveOperand(strings.TrimSpace(cond[i+len(op):]), b) + switch op { + case ">": + return l > r + case "<": + return l < r + case ">=": + return l >= r + case "<=": + return l <= r + case "==": + return l == r + case "!=": + return l != r + } + } + } + return true +} + +// resolveOperand 把条件里的左/右值解析为数值(关键字取运行时数量,否则按字面量)。 +func resolveOperand(s string, b *board) float64 { + switch strings.ToLower(s) { + case "refs": + return float64(len(b.refs)) + case "tools": + return float64(len(b.toolOut)) + case "answer": + return float64(len([]rune(b.answer))) + case "profile": + return float64(len([]rune(b.profile))) + } + f, _ := strconv.ParseFloat(s, 64) + return f +} + +// aggregate 按策略合并多段参考资料为一段。 +func aggregate(strategy string, parts []string) []string { + var nonEmpty []string + for _, p := range parts { + if strings.TrimSpace(p) != "" { + nonEmpty = append(nonEmpty, p) + } + } + if len(nonEmpty) == 0 { + return nil + } + switch strategy { + case "去重合并": + seen := map[string]bool{} + var uniq []string + for _, p := range nonEmpty { + if !seen[p] { + seen[p] = true + uniq = append(uniq, p) + } + } + return []string{strings.Join(uniq, "\n---\n")} + case "摘要": + return []string{truncate(strings.Join(nonEmpty, "\n"), 800)} + default: // 拼接 + return []string{strings.Join(nonEmpty, "\n---\n")} + } +} + +// ---- 小工具 ---- + +func meta(t *contract.Task, key string) string { + v, _ := t.Meta[key].(string) + return v +} + +func cstr(cfg map[string]any, key string) string { + if cfg == nil { + return "" + } + v, ok := cfg[key] + if !ok || v == nil { + return "" + } + if s, ok := v.(string); ok { + return strings.TrimSpace(s) + } + return strings.TrimSpace(fmt.Sprint(v)) +} + +func cbool(cfg map[string]any, key string) bool { + if cfg == nil { + return false + } + if v, ok := cfg[key].(bool); ok { + return v + } + return false +} + +func labelOf(n dsl.Node, def string) string { + if strings.TrimSpace(n.Label) != "" { + return n.Label + } + return def +} + +func countLines(s string) int { + s = strings.TrimSpace(s) + if s == "" { + return 0 + } + return strings.Count(s, "\n") + 1 +} diff --git a/sundynix-dispatcher/internal/eino/orchestrator.go b/sundynix-dispatcher/internal/eino/orchestrator.go index 8e6e5a6..1aa2b7a 100644 --- a/sundynix-dispatcher/internal/eino/orchestrator.go +++ b/sundynix-dispatcher/internal/eino/orchestrator.go @@ -4,11 +4,8 @@ package eino import ( "context" "encoding/json" - "errors" "fmt" - "io" "log" - "strings" "time" "github.com/cloudwego/eino/schema" @@ -61,66 +58,26 @@ func (o *Orchestrator) Handle(ctx context.Context, t *contract.Task) error { if intent, _ := t.Meta[contract.MetaIntent].(string); intent == contract.IntentReport { return o.handleReport(ctx, t, tr) } - log.Printf("[eino] task %s received (graph=%d bytes), compiling DSL → Eino graph...", t.ID, len(t.Graph)) - tr.info("task", "system", "任务受理", fmt.Sprintf("DSL %d 字节,编译 Eino 图", len(t.Graph))) + log.Printf("[eino] task %s received (graph=%d bytes), 按图执行(拓扑+连线+分支)...", t.ID, len(t.Graph)) + tr.info("task", "system", "任务受理", fmt.Sprintf("DSL %d 字节,按图执行", len(t.Graph))) - endCompile := tr.span("compile", "system", "编译 Eino 图") - run, err := o.compileFlow(ctx, t, tr) + // 按 DSL 图的真实拓扑/连线/分支执行(graph.go 解释器),agent 节点流式回流 token。 + answer, err := o.runGraph(ctx, t, tr) if err != nil { - endCompile("", err) - log.Printf("[eino] task %s compile error: %v", t.ID, err) - _ = o.sink.CompleteStream(t.ID) - o.breaker.Report(false) - return err - } - endCompile("图编译完成", nil) - - stream, err := run.Stream(ctx, t) - if err != nil { - tr.emit("model", "model", "error", "模型推理", err.Error(), 0) log.Printf("[eino] task %s graph error: %v", t.ID, err) _ = o.sink.CompleteStream(t.ID) o.breaker.Report(false) return err } - defer stream.Close() - - n := 0 - var answer strings.Builder - t0 := time.Now() - for { - chunk, rerr := stream.Recv() - if errors.Is(rerr, io.EOF) { - break - } - if rerr != nil { - log.Printf("[eino] task %s stream recv error: %v", t.ID, rerr) - break - } - if chunk == nil || chunk.Content == "" { - continue - } - if n == 0 { - tr.emit("model", "model", "start", "模型流式推理", fmt.Sprintf("首 token %dms", time.Since(t0).Milliseconds()), 0) - } - if perr := o.sink.PublishToken(t.ID, []byte(chunk.Content)); perr != nil { - log.Printf("[eino] publish token failed: %v", perr) - break - } - answer.WriteString(chunk.Content) - n++ - } - tr.emit("model", "model", "end", "模型流式推理", fmt.Sprintf("%d tokens / %d 字", n, len([]rune(answer.String()))), time.Since(t0).Milliseconds()) if cerr := o.sink.CompleteStream(t.ID); cerr != nil { log.Printf("[eino] complete stream failed: %v", cerr) } - log.Printf("[eino] task %s done, %d tokens streamed", t.ID, n) + log.Printf("[eino] task %s done (%d 字答复)", t.ID, len([]rune(answer))) o.breaker.Report(true) - // 写回阶段:流已排空(= 模型生成结束),此处离开热路径、异步落历史 + 抽取记忆。 - // 注:流式节点用 OnEndWithStreamOutput 而非 OnEndFn,故不走回调而在此触发。 - go o.memorize(t, answer.String()) + // 写回阶段:离开热路径、异步落历史 + (TODO)抽取记忆。 + go o.memorize(t, answer) return nil }