feat(dispatcher): 编排引擎按图执行(拓扑+连线+分支剪枝),弃用线性拍平

旧 compileFlow 把 DSL 图拍平成线性 init→tool…→prompt→model,连线/分支/
memory/aggregate/render 节点全被忽略——"画得出、跑不全"。改为纯 Go 图解释器
(graph.go),按真实拓扑与连线执行,每种节点 kind 有真实行为:

- input     注入用户输入
- memory    按勾选注入画像/历史(无 memory 节点则沿用默认注入,不回归)
- retriever kb 按 owner 作用域 → kb_search 累计参考资料
- tool      调 MCP 工具,产出进黑板,失败降级不阻断
- agent     据黑板拼消息 → pool 流式回流 token,累计成稿
- aggregate 按策略合并参考资料(拼接/去重合并/摘要)
- render    把成稿经 report_render 渲染 docx
- branch    求值条件 + active-set 剪枝下游(边序约定 [true,false])
- map       占位(fan-out 暂串行,路线图 Phase 2)
- output    终端

全程逐节点点亮"运行·观测",token 流与记忆写回保持不变;报告 intent 走原专用
编排不动。compile.go 精简为只留 RunCtx/buildMessages/previewArgs。

实测(gateway+dispatcher+DeepSeek 实跑):
- input→agent→output 真实流式答复 ✓
- branch 条件 2>1 走分支A、1>2 走分支B(下游真被剪枝)✓
- memory 节点按勾选注入;exec 事件按新节点名(agent:a 等)回流 ✓
- 桌面端 Studio 载示例→运行:4节点3连线校验通过,检索节点 mcp-go 不在时
  优雅降级,agent 据空资料如实作答,输出/轨迹面板正常 ✓

路线图 Phase 2:map 真并行 fan-out + aggregate reduce 接上 report 那套;
前端给 branch 的边打 true/false 标签,使条件分支完全精确(当前靠出边顺序约定)。

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Blizzard
2026-06-15 11:42:29 +08:00
parent 5d76652bff
commit fd145b5852
4 changed files with 421 additions and 196 deletions
+12 -146
View File
@@ -3,18 +3,13 @@ package eino
import (
"context"
"encoding/json"
"fmt"
"strings"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
"github.com/sundynix/sundynix-dispatcher/internal/dsl"
"github.com/sundynix/sundynix-shared/contract"
)
// RunCtx 是图中流转的"黑板":init 填充,工具节点逐个增补,prompt 节点据此组装消息。
// 用统一类型在节点间流转,规避 Eino 严格类型对齐的麻烦
// RunCtx 是组装模型消息用的上下文:图解释器(graph.go)把黑板汇总到它,
// buildMessages 据此拼出发给模型的消息序列。用统一结构避免散落多处拼装
type RunCtx struct {
UserID string
SessionID string
@@ -22,147 +17,10 @@ type RunCtx struct {
Query string // 用户输入
Profile string // 召回的画像
History []*schema.Message // 短期历史
ToolOut []string // 工具节点产出(按执行序
ToolOut []string // 工具/检索节点产出(含参考资料
}
// compileFlow 把一个任务的 DSL 图动态编译为可执行的 Eino 图:
//
// START → init(编译+记忆召回) → tool_0 → tool_1 → … → prompt(组装消息) → model(流式) → END
//
// 工具/检索节点按拓扑序真实调用 MCPsundynix.tools.go.*),结果注入模型上下文。
// 分支/并行节点暂未编译(TODOcompose.Branch / fan-out)。
func (o *Orchestrator) compileFlow(ctx context.Context, t *contract.Task, tr *execTracer) (compose.Runnable[*contract.Task, *schema.Message], error) {
plan := dsl.Compile(t.Graph) // 系统提示词 / 用户输入 / 默认兜底
flow, _ := dsl.Parse(t.Graph)
g := compose.NewGraph[*contract.Task, *schema.Message]()
// init:取身份 → 召回画像+历史 → 初始化黑板。
if err := g.AddLambdaNode("init", compose.InvokableLambda(
func(ctx context.Context, task *contract.Task) (*RunCtx, error) {
uid, _ := task.Meta[contract.MetaUserID].(string)
sid, _ := task.Meta[contract.MetaSessionID].(string)
end := tr.span("init", "memory", "召回画像与历史")
profile := o.fetchMemory(ctx, uid, plan.Query)
history := o.fetchHistory(ctx, sid)
end(fmt.Sprintf("画像 %d 字 · 历史 %d 条", len([]rune(profile)), len(history)), nil)
return &RunCtx{
UserID: uid,
SessionID: sid,
System: plan.System,
Query: plan.Query,
Profile: profile,
History: history,
}, nil
})); err != nil {
return nil, err
}
// 按拓扑序为每个工具/检索节点加一个真实执行节点。
prev := "init"
idx := 0
if flow != nil {
for _, n := range flow.Topo() {
tool, args := dsl.ToolBinding(n)
if tool == "" {
continue
}
key := fmt.Sprintf("tool_%d", idx)
idx++
uid, _ := t.Meta[contract.MetaUserID].(string)
if err := g.AddLambdaNode(key, compose.InvokableLambda(o.makeToolNode(t.ID, tool, args, tr, uid))); err != nil {
return nil, err
}
if err := g.AddEdge(prev, key); err != nil {
return nil, err
}
prev = key
}
}
// prompt:黑板 → []*schema.Message(系统提示词 + 画像 + 工具产出 + 历史 + 用户输入)。
if err := g.AddLambdaNode("prompt", compose.InvokableLambda(
func(ctx context.Context, rc *RunCtx) ([]*schema.Message, error) {
msgs, err := buildMessages(ctx, rc)
tr.info("prompt", "prompt", "组装提示词", fmt.Sprintf("%d 条消息 · 工具产出 %d 段", len(msgs), len(rc.ToolOut)))
return msgs, err
})); err != nil {
return nil, err
}
if err := g.AddEdge(prev, "prompt"); err != nil {
return nil, err
}
// modelLLM Pool 流式(已配置在线模型则真实推理)。
if err := g.AddChatModelNode("model", newPoolModel(o.pool)); err != nil {
return nil, err
}
if err := g.AddEdge(compose.START, "init"); err != nil {
return nil, err
}
if err := g.AddEdge("prompt", "model"); err != nil {
return nil, err
}
if err := g.AddEdge("model", compose.END); err != nil {
return nil, err
}
return g.Compile(ctx)
}
// makeToolNode 返回一个真实调用 MCP 工具的图节点:把结果增补进黑板,失败降级不阻断。
// uid 非空时把检索类工具的 kb 锁进 owner 作用域("uid/kb"),使编排检索命中本人知识库。
func (o *Orchestrator) makeToolNode(taskID, tool string, args map[string]any, tr *execTracer, uid string) func(context.Context, *RunCtx) (*RunCtx, error) {
node := "tool:" + tool
return func(ctx context.Context, rc *RunCtx) (*RunCtx, error) {
if o.tools == nil {
tr.info(node, "tool", "工具 "+tool, "工具总线未接入,跳过")
return rc, nil
}
// 未显式带查询词则注入当前用户输入,便于检索类工具。
call := map[string]any{}
for k, v := range args {
call[k] = v
}
if call["q"] == nil && call["query"] == nil {
call["q"] = rc.Query
}
// 检索类工具的 kb 按 owner 作用域,对齐知识库隔离(前端只发库名)。
if uid != "" {
if kbv, ok := call["kb"].(string); ok && kbv != "" && !strings.Contains(kbv, "/") {
call["kb"] = uid + "/" + kbv
}
}
end := tr.span(node, "tool", "调用工具 "+tool)
cctx, cancel := context.WithTimeout(ctx, toolCallTimeout)
defer cancel()
res, err := o.tools.CallTool(cctx, contract.ToolSubjectGo(tool), &contract.ToolCall{
Tool: tool, TaskID: taskID, Args: call,
})
if err != nil {
end("调用失败,降级跳过", err)
return rc, nil
}
if res == nil || !res.OK || res.Content == "" {
end("无结果,降级跳过", nil)
return rc, nil // 工具不可用/无结果 → 降级跳过
}
end("入参 "+previewArgs(call)+" → 产出 "+truncate(res.Content, 160), nil)
rc.ToolOut = append(rc.ToolOut, "["+tool+"] "+res.Content)
return rc, nil
}
}
// previewArgs 把工具入参压成一行短预览。
func previewArgs(args map[string]any) string {
if data, err := json.Marshal(args); err == nil {
return truncate(string(data), 120)
}
return ""
}
// buildMessages 把黑板组装为发给模型的消息序列。
// buildMessages 把上下文组装为发给模型的消息序列(系统提示词 + 画像 + 工具产出 + 历史 + 用户输入)。
func buildMessages(_ context.Context, rc *RunCtx) ([]*schema.Message, error) {
var sys strings.Builder
sys.WriteString(rc.System)
@@ -181,3 +39,11 @@ func buildMessages(_ context.Context, rc *RunCtx) ([]*schema.Message, error) {
msgs = append(msgs, schema.UserMessage(rc.Query))
return msgs, nil
}
// previewArgs 把工具入参压成一行短预览。
func previewArgs(args map[string]any) string {
if data, err := json.Marshal(args); err == nil {
return truncate(string(data), 120)
}
return ""
}
+401
View File
@@ -0,0 +1,401 @@
package eino
import (
"context"
"fmt"
"strconv"
"strings"
"time"
"github.com/cloudwego/eino/schema"
"github.com/sundynix/sundynix-dispatcher/internal/dsl"
"github.com/sundynix/sundynix-shared/contract"
)
// defaultAgentSystem 是 agent 节点未填系统提示词时的兜底。
const defaultAgentSystem = "你是 sundynix-agentix 平台的 AI 助手。"
// board 是图执行的"黑板":节点按拓扑序流转时读写它。
type board struct {
uid, sid string
query string
profile string
history []*schema.Message
refs []string // 检索 / 聚合得到的参考资料
toolOut []string // 工具节点产出
answer string // 终端 agent 的成稿(流式累计)
}
// runGraph 按 DSL 图的真实拓扑与连线执行(替代旧的线性拍平 compileFlow)。
//
// 入度0 入口 → 沿连线传播 active → 每个节点按 kind 执行真实行为 →
// branch 按条件只激活选中的下游(剪枝)→ agent 节点流式回流 token。
//
// 逐节点点亮"运行·观测"。返回终端 agent 的完整产出(供写回历史)。
func (o *Orchestrator) runGraph(ctx context.Context, t *contract.Task, tr *execTracer) (string, error) {
flow, ferr := dsl.Parse(t.Graph)
plan := dsl.Compile(t.Graph)
b := &board{
uid: meta(t, contract.MetaUserID),
sid: meta(t, contract.MetaSessionID),
query: plan.Query,
}
// 无法解析或空图:退化为"无图单轮对话"(注入默认记忆 + 直接出模型)。
if ferr != nil || flow == nil || len(flow.Nodes) == 0 {
tr.info("task", "system", "无结构化图", "按单轮对话执行")
b.profile = o.fetchMemory(ctx, b.uid, b.query)
b.history = o.fetchHistory(ctx, b.sid)
o.runAgent(ctx, t.ID, b, plan.System, tr, "agent")
return b.answer, nil
}
// 建邻接与入度(只认两端都存在的边)。
nodeByID := make(map[string]dsl.Node, len(flow.Nodes))
out := make(map[string][]string)
indeg := make(map[string]int, len(flow.Nodes))
for _, n := range flow.Nodes {
nodeByID[n.ID] = n
indeg[n.ID] = 0
}
for _, e := range flow.Edges {
if _, ok := nodeByID[e.Source]; !ok {
continue
}
if _, ok := nodeByID[e.Target]; !ok {
continue
}
out[e.Source] = append(out[e.Source], e.Target)
indeg[e.Target]++
}
// 入口节点(入度 0)置 active;执行时沿连线把下游激活,branch 只激活选中分支。
active := make(map[string]bool)
for _, n := range flow.Nodes {
if indeg[n.ID] == 0 {
active[n.ID] = true
}
}
// 图里没有 memory 节点 → 沿用旧默认:注入画像+历史(避免回归)。
hasMemory := false
for _, n := range flow.Nodes {
if n.Kind == "memory" {
hasMemory = true
break
}
}
if !hasMemory {
b.profile = o.fetchMemory(ctx, b.uid, b.query)
b.history = o.fetchHistory(ctx, b.sid)
}
for _, n := range flow.Topo() {
if !active[n.ID] {
continue // 被 branch 剪掉的下游,不执行
}
propagate := out[n.ID] // 默认激活全部出边;branch 会改写
switch n.Kind {
case "input":
if txt := cstr(n.Config, "text"); txt != "" {
b.query = txt
}
tr.info("input:"+n.ID, "system", labelOf(n, "输入"), truncate(b.query, 80))
case "memory":
if cbool(n.Config, "profile") {
b.profile = o.fetchMemory(ctx, b.uid, b.query)
}
if cbool(n.Config, "history") {
b.history = o.fetchHistory(ctx, b.sid)
}
tr.info("memory:"+n.ID, "memory", labelOf(n, "记忆"),
fmt.Sprintf("画像 %d 字 · 历史 %d 条", len([]rune(b.profile)), len(b.history)))
case "retriever":
o.retrieverNode(ctx, n, b, tr)
case "tool":
o.execToolNode(ctx, t.ID, n, b, tr)
case "agent":
o.runAgent(ctx, t.ID, b, firstNonEmpty(cstr(n.Config, "system"), plan.System), tr, "agent:"+n.ID)
case "aggregate":
merged := aggregate(cstr(n.Config, "strategy"), append(append([]string{}, b.refs...), b.toolOut...))
b.refs, b.toolOut = merged, nil
tr.info("aggregate:"+n.ID, "system", labelOf(n, "汇聚"), "策略:"+firstNonEmpty(cstr(n.Config, "strategy"), "拼接"))
case "render":
o.renderNode(ctx, t.ID, n, b, tr)
case "branch":
propagate = o.branchNode(n, b, out[n.ID], nodeByID, tr)
case "map":
tr.info("map:"+n.ID, "system", labelOf(n, "并行"), "fan-out 暂按串行执行(路线图 Phase 2")
case "output":
tr.info("output:"+n.ID, "system", labelOf(n, "输出"), "目标:"+firstNonEmpty(cstr(n.Config, "target"), "屏幕"))
default:
tr.info(n.Kind+":"+n.ID, "system", labelOf(n, n.Kind), "未识别节点,跳过")
}
for _, tgt := range propagate {
active[tgt] = true
}
}
// 图里无 agent 节点(纯工具/检索图)也要出一段模型答复,否则没有输出。
if b.answer == "" {
o.runAgent(ctx, t.ID, b, plan.System, tr, "agent")
}
return b.answer, nil
}
// retrieverNode 执行检索节点:kb 按 owner 作用域 → kb_search → 累计参考资料。
func (o *Orchestrator) retrieverNode(ctx context.Context, n dsl.Node, b *board, tr *execTracer) {
kb := cstr(n.Config, "kb")
scoped := kb
if b.uid != "" && kb != "" && !strings.Contains(kb, "/") {
scoped = b.uid + "/" + kb
}
end := tr.span("retriever:"+n.ID, "tool", labelOf(n, "检索"))
refs := o.retrieve(ctx, scoped, b.query)
if refs != "" {
b.refs = append(b.refs, refs)
}
end(fmt.Sprintf("kb=%s · 命中 %d 段", firstNonEmpty(kb, "(未指定)"), countLines(refs)), nil)
}
// execToolNode 执行工具节点:调 MCP 工具,产出累计进黑板;失败降级不阻断。
func (o *Orchestrator) execToolNode(ctx context.Context, taskID string, n dsl.Node, b *board, tr *execTracer) {
tool, args := dsl.ToolBinding(n)
if tool == "" {
return
}
node := "tool:" + tool
if o.tools == nil {
tr.info(node, "tool", "工具 "+tool, "工具总线未接入,跳过")
return
}
call := map[string]any{}
for k, v := range args {
call[k] = v
}
if call["q"] == nil && call["query"] == nil {
call["q"] = b.query
}
if b.uid != "" {
if kbv, ok := call["kb"].(string); ok && kbv != "" && !strings.Contains(kbv, "/") {
call["kb"] = b.uid + "/" + kbv
}
}
end := tr.span(node, "tool", "调用工具 "+tool)
cctx, cancel := context.WithTimeout(ctx, toolCallTimeout)
defer cancel()
res, err := o.tools.CallTool(cctx, contract.ToolSubjectGo(tool), &contract.ToolCall{Tool: tool, TaskID: taskID, Args: call})
if err != nil {
end("调用失败,降级跳过", err)
return
}
if res == nil || !res.OK || res.Content == "" {
end("无结果,降级跳过", nil)
return
}
end("入参 "+previewArgs(call)+" → 产出 "+truncate(res.Content, 160), nil)
b.toolOut = append(b.toolOut, "["+tool+"] "+res.Content)
}
// runAgent 执行 agent/模型节点:据黑板拼消息 → 流式回流 token → 累计成稿。
func (o *Orchestrator) runAgent(ctx context.Context, taskID string, b *board, system string, tr *execTracer, node string) {
rc := &RunCtx{
System: firstNonEmpty(system, defaultAgentSystem),
Query: b.query,
Profile: b.profile,
History: b.history,
ToolOut: append(append([]string{}, b.toolOut...), b.refs...),
}
msgs, _ := buildMessages(ctx, rc)
tr.emit(node, "model", "start", "模型流式推理", "", 0)
t0 := time.Now()
n := 0
send := func(s string) {
if s == "" {
return
}
_ = o.sink.PublishToken(taskID, []byte(s))
b.answer += s
n++
}
var err error
if o.pool.Ready() {
err = o.pool.ChatStream(ctx, toChatMessages(msgs), send)
} else {
err = o.pool.StreamText(ctx, replyFor(msgs), func(tok []byte) { send(string(tok)) })
}
if err != nil {
tr.emit(node, "model", "error", "模型流式推理", err.Error(), time.Since(t0).Milliseconds())
return
}
tr.emit(node, "model", "end", "模型流式推理",
fmt.Sprintf("%d tokens / %d 字", n, len([]rune(b.answer))), time.Since(t0).Milliseconds())
}
// renderNode 执行渲染节点:把当前成稿渲染成 Word(经 mcp-go report_render)。
func (o *Orchestrator) renderNode(ctx context.Context, taskID string, n dsl.Node, b *board, tr *execTracer) {
if strings.TrimSpace(b.answer) == "" {
tr.info("render:"+n.ID, "render", labelOf(n, "渲染"), "暂无正文可渲染(render 前需有 agent 产出)")
return
}
format := firstNonEmpty(cstr(n.Config, "format"), "docx")
end := tr.span("render:"+n.ID, "render", labelOf(n, "渲染 "+format))
title := truncate(b.query, 40)
secs := []reportSection{{Heading: title, Body: b.answer}}
if path := o.renderReport(ctx, taskID, title, secs); path != "" {
end("已落盘:"+path, nil)
o.emit(taskID, "\n\n---\n✅ 已渲染 "+format+" 文档,可在「下载」获取。\n")
} else {
end("渲染服务不可用", fmt.Errorf("render unavailable"))
}
}
// branchNode 执行分支节点:求值条件,按边序约定 [true, false] 选出要激活的下游。
// 注:当前 DSL 的边不带 true/false 标签,故以"出边顺序"约定语义(Phase 2 将由前端给边打标)。
func (o *Orchestrator) branchNode(n dsl.Node, b *board, outs []string, byID map[string]dsl.Node, tr *execTracer) []string {
cond := cstr(n.Config, "condition")
res := evalCondition(cond, b)
chosen := outs
switch {
case len(outs) >= 2:
if res {
chosen = outs[:1]
} else {
chosen = outs[1:2]
}
case len(outs) == 1 && !res:
chosen = nil // 单出边且条件为假 → 不继续
}
names := make([]string, 0, len(chosen))
for _, id := range chosen {
names = append(names, labelOf(byID[id], id))
}
tr.info("branch:"+n.ID, "system", labelOf(n, "分支"),
fmt.Sprintf("条件「%s」→ %v ⇒ 走 [%s](边序约定 [true,false]",
firstNonEmpty(cond, "(空=真)"), res, strings.Join(names, ", ")))
return chosen
}
// evalCondition 求值 branch 条件。支持:
//
// 空 → true;关键字 refs/tools/answer/profile 作左值(取数量/字数);
// 形如 "a op b"op: >= <= == != > <)数值比较;其余非空 → 默认真。
func evalCondition(cond string, b *board) bool {
cond = strings.TrimSpace(cond)
if cond == "" {
return true
}
for _, op := range []string{">=", "<=", "==", "!=", ">", "<"} {
if i := strings.Index(cond, op); i >= 0 {
l := resolveOperand(strings.TrimSpace(cond[:i]), b)
r := resolveOperand(strings.TrimSpace(cond[i+len(op):]), b)
switch op {
case ">":
return l > r
case "<":
return l < r
case ">=":
return l >= r
case "<=":
return l <= r
case "==":
return l == r
case "!=":
return l != r
}
}
}
return true
}
// resolveOperand 把条件里的左/右值解析为数值(关键字取运行时数量,否则按字面量)。
func resolveOperand(s string, b *board) float64 {
switch strings.ToLower(s) {
case "refs":
return float64(len(b.refs))
case "tools":
return float64(len(b.toolOut))
case "answer":
return float64(len([]rune(b.answer)))
case "profile":
return float64(len([]rune(b.profile)))
}
f, _ := strconv.ParseFloat(s, 64)
return f
}
// aggregate 按策略合并多段参考资料为一段。
func aggregate(strategy string, parts []string) []string {
var nonEmpty []string
for _, p := range parts {
if strings.TrimSpace(p) != "" {
nonEmpty = append(nonEmpty, p)
}
}
if len(nonEmpty) == 0 {
return nil
}
switch strategy {
case "去重合并":
seen := map[string]bool{}
var uniq []string
for _, p := range nonEmpty {
if !seen[p] {
seen[p] = true
uniq = append(uniq, p)
}
}
return []string{strings.Join(uniq, "\n---\n")}
case "摘要":
return []string{truncate(strings.Join(nonEmpty, "\n"), 800)}
default: // 拼接
return []string{strings.Join(nonEmpty, "\n---\n")}
}
}
// ---- 小工具 ----
func meta(t *contract.Task, key string) string {
v, _ := t.Meta[key].(string)
return v
}
func cstr(cfg map[string]any, key string) string {
if cfg == nil {
return ""
}
v, ok := cfg[key]
if !ok || v == nil {
return ""
}
if s, ok := v.(string); ok {
return strings.TrimSpace(s)
}
return strings.TrimSpace(fmt.Sprint(v))
}
func cbool(cfg map[string]any, key string) bool {
if cfg == nil {
return false
}
if v, ok := cfg[key].(bool); ok {
return v
}
return false
}
func labelOf(n dsl.Node, def string) string {
if strings.TrimSpace(n.Label) != "" {
return n.Label
}
return def
}
func countLines(s string) int {
s = strings.TrimSpace(s)
if s == "" {
return 0
}
return strings.Count(s, "\n") + 1
}
@@ -4,11 +4,8 @@ package eino
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"strings"
"time"
"github.com/cloudwego/eino/schema"
@@ -61,66 +58,26 @@ func (o *Orchestrator) Handle(ctx context.Context, t *contract.Task) error {
if intent, _ := t.Meta[contract.MetaIntent].(string); intent == contract.IntentReport {
return o.handleReport(ctx, t, tr)
}
log.Printf("[eino] task %s received (graph=%d bytes), compiling DSL → Eino graph...", t.ID, len(t.Graph))
tr.info("task", "system", "任务受理", fmt.Sprintf("DSL %d 字节,编译 Eino 图", len(t.Graph)))
log.Printf("[eino] task %s received (graph=%d bytes), 按图执行(拓扑+连线+分支)...", t.ID, len(t.Graph))
tr.info("task", "system", "任务受理", fmt.Sprintf("DSL %d 字节,按图执行", len(t.Graph)))
endCompile := tr.span("compile", "system", "编译 Eino 图")
run, err := o.compileFlow(ctx, t, tr)
// 按 DSL 图的真实拓扑/连线/分支执行(graph.go 解释器),agent 节点流式回流 token。
answer, err := o.runGraph(ctx, t, tr)
if err != nil {
endCompile("", err)
log.Printf("[eino] task %s compile error: %v", t.ID, err)
_ = o.sink.CompleteStream(t.ID)
o.breaker.Report(false)
return err
}
endCompile("图编译完成", nil)
stream, err := run.Stream(ctx, t)
if err != nil {
tr.emit("model", "model", "error", "模型推理", err.Error(), 0)
log.Printf("[eino] task %s graph error: %v", t.ID, err)
_ = o.sink.CompleteStream(t.ID)
o.breaker.Report(false)
return err
}
defer stream.Close()
n := 0
var answer strings.Builder
t0 := time.Now()
for {
chunk, rerr := stream.Recv()
if errors.Is(rerr, io.EOF) {
break
}
if rerr != nil {
log.Printf("[eino] task %s stream recv error: %v", t.ID, rerr)
break
}
if chunk == nil || chunk.Content == "" {
continue
}
if n == 0 {
tr.emit("model", "model", "start", "模型流式推理", fmt.Sprintf("首 token %dms", time.Since(t0).Milliseconds()), 0)
}
if perr := o.sink.PublishToken(t.ID, []byte(chunk.Content)); perr != nil {
log.Printf("[eino] publish token failed: %v", perr)
break
}
answer.WriteString(chunk.Content)
n++
}
tr.emit("model", "model", "end", "模型流式推理", fmt.Sprintf("%d tokens / %d 字", n, len([]rune(answer.String()))), time.Since(t0).Milliseconds())
if cerr := o.sink.CompleteStream(t.ID); cerr != nil {
log.Printf("[eino] complete stream failed: %v", cerr)
}
log.Printf("[eino] task %s done, %d tokens streamed", t.ID, n)
log.Printf("[eino] task %s done (%d 字答复)", t.ID, len([]rune(answer)))
o.breaker.Report(true)
// 写回阶段:流已排空(= 模型生成结束),此处离开热路径、异步落历史 + 抽取记忆。
// 注:流式节点用 OnEndWithStreamOutput 而非 OnEndFn,故不走回调而在此触发。
go o.memorize(t, answer.String())
// 写回阶段:离开热路径、异步落历史 + TODO抽取记忆。
go o.memorize(t, answer)
return nil
}