package eino import ( "context" "encoding/json" "fmt" "strings" "github.com/cloudwego/eino/compose" "github.com/cloudwego/eino/schema" "github.com/sundynix/sundynix-dispatcher/internal/dsl" "github.com/sundynix/sundynix-shared/contract" ) // RunCtx 是图中流转的"黑板":init 填充,工具节点逐个增补,prompt 节点据此组装消息。 // 用统一类型在节点间流转,规避 Eino 严格类型对齐的麻烦。 type RunCtx struct { UserID string SessionID string System string // Agent 节点系统提示词 Query string // 用户输入 Profile string // 召回的画像 History []*schema.Message // 短期历史 ToolOut []string // 工具节点产出(按执行序) } // compileFlow 把一个任务的 DSL 图动态编译为可执行的 Eino 图: // // START → init(编译+记忆召回) → tool_0 → tool_1 → … → prompt(组装消息) → model(流式) → END // // 工具/检索节点按拓扑序真实调用 MCP(sundynix.tools.go.*),结果注入模型上下文。 // 分支/并行节点暂未编译(TODO:compose.Branch / fan-out)。 func (o *Orchestrator) compileFlow(ctx context.Context, t *contract.Task, tr *execTracer) (compose.Runnable[*contract.Task, *schema.Message], error) { plan := dsl.Compile(t.Graph) // 系统提示词 / 用户输入 / 默认兜底 flow, _ := dsl.Parse(t.Graph) g := compose.NewGraph[*contract.Task, *schema.Message]() // init:取身份 → 召回画像+历史 → 初始化黑板。 if err := g.AddLambdaNode("init", compose.InvokableLambda( func(ctx context.Context, task *contract.Task) (*RunCtx, error) { uid, _ := task.Meta[contract.MetaUserID].(string) sid, _ := task.Meta[contract.MetaSessionID].(string) end := tr.span("init", "memory", "召回画像与历史") profile := o.fetchMemory(ctx, uid, plan.Query) history := o.fetchHistory(ctx, sid) end(fmt.Sprintf("画像 %d 字 · 历史 %d 条", len([]rune(profile)), len(history)), nil) return &RunCtx{ UserID: uid, SessionID: sid, System: plan.System, Query: plan.Query, Profile: profile, History: history, }, nil })); err != nil { return nil, err } // 按拓扑序为每个工具/检索节点加一个真实执行节点。 prev := "init" idx := 0 if flow != nil { for _, n := range flow.Topo() { tool, args := dsl.ToolBinding(n) if tool == "" { continue } key := fmt.Sprintf("tool_%d", idx) idx++ if err := g.AddLambdaNode(key, compose.InvokableLambda(o.makeToolNode(t.ID, tool, args, tr))); err != nil { return nil, err } if err := g.AddEdge(prev, key); err != nil { return nil, err } prev = key } } // prompt:黑板 → []*schema.Message(系统提示词 + 画像 + 工具产出 + 历史 + 用户输入)。 if err := g.AddLambdaNode("prompt", compose.InvokableLambda( func(ctx context.Context, rc *RunCtx) ([]*schema.Message, error) { msgs, err := buildMessages(ctx, rc) tr.info("prompt", "prompt", "组装提示词", fmt.Sprintf("%d 条消息 · 工具产出 %d 段", len(msgs), len(rc.ToolOut))) return msgs, err })); err != nil { return nil, err } if err := g.AddEdge(prev, "prompt"); err != nil { return nil, err } // model:LLM Pool 流式(已配置在线模型则真实推理)。 if err := g.AddChatModelNode("model", newPoolModel(o.pool)); err != nil { return nil, err } if err := g.AddEdge(compose.START, "init"); err != nil { return nil, err } if err := g.AddEdge("prompt", "model"); err != nil { return nil, err } if err := g.AddEdge("model", compose.END); err != nil { return nil, err } return g.Compile(ctx) } // makeToolNode 返回一个真实调用 MCP 工具的图节点:把结果增补进黑板,失败降级不阻断。 func (o *Orchestrator) makeToolNode(taskID, tool string, args map[string]any, tr *execTracer) func(context.Context, *RunCtx) (*RunCtx, error) { node := "tool:" + tool return func(ctx context.Context, rc *RunCtx) (*RunCtx, error) { if o.tools == nil { tr.info(node, "tool", "工具 "+tool, "工具总线未接入,跳过") return rc, nil } // 未显式带查询词则注入当前用户输入,便于检索类工具。 call := map[string]any{} for k, v := range args { call[k] = v } if call["q"] == nil && call["query"] == nil { call["q"] = rc.Query } end := tr.span(node, "tool", "调用工具 "+tool) cctx, cancel := context.WithTimeout(ctx, toolCallTimeout) defer cancel() res, err := o.tools.CallTool(cctx, contract.ToolSubjectGo(tool), &contract.ToolCall{ Tool: tool, TaskID: taskID, Args: call, }) if err != nil { end("调用失败,降级跳过", err) return rc, nil } if res == nil || !res.OK || res.Content == "" { end("无结果,降级跳过", nil) return rc, nil // 工具不可用/无结果 → 降级跳过 } end("入参 "+previewArgs(call)+" → 产出 "+truncate(res.Content, 160), nil) rc.ToolOut = append(rc.ToolOut, "["+tool+"] "+res.Content) return rc, nil } } // previewArgs 把工具入参压成一行短预览。 func previewArgs(args map[string]any) string { if data, err := json.Marshal(args); err == nil { return truncate(string(data), 120) } return "" } // buildMessages 把黑板组装为发给模型的消息序列。 func buildMessages(_ context.Context, rc *RunCtx) ([]*schema.Message, error) { var sys strings.Builder sys.WriteString(rc.System) if rc.Profile != "" { sys.WriteString("\n\n关于当前用户的已知信息:\n") sys.WriteString(rc.Profile) sys.WriteString("\n请据此个性化作答并保持其偏好。") } if len(rc.ToolOut) > 0 { sys.WriteString("\n\n以下是工具/检索得到的参考资料:\n") sys.WriteString(strings.Join(rc.ToolOut, "\n---\n")) } msgs := make([]*schema.Message, 0, len(rc.History)+2) msgs = append(msgs, schema.SystemMessage(sys.String())) msgs = append(msgs, rc.History...) msgs = append(msgs, schema.UserMessage(rc.Query)) return msgs, nil }