From 71db0e295fea7e4418def4acb3114b67839f4279 Mon Sep 17 00:00:00 2001 From: Blizzard Date: Wed, 10 Jun 2026 16:45:33 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20compose.NewGraph=20=E5=85=A8=E5=9B=BE?= =?UTF-8?q?=E7=BC=96=E8=AF=91=20=E2=80=94=20=E5=B7=A5=E5=85=B7=E8=8A=82?= =?UTF-8?q?=E7=82=B9=E5=9C=A8=20Eino=20=E5=9B=BE=E9=87=8C=E7=9C=9F?= =?UTF-8?q?=E5=AE=9E=E6=89=A7=E8=A1=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit dispatcher 按每个任务的 DSL 动态编译 Eino 图:工具/检索节点按拓扑序作为真实图 节点经 NATS 调 MCP,产出注入模型上下文。不再是固定的 recall→prompt→model。 - dsl: 加 Parse(图结构) + (Flow)Topo(Kahn 拓扑序,环退化声明序) + ToolBinding(tool/ retriever 节点→工具名+参数) - eino/compile.go: 逐任务 compileFlow —— START→init(身份+记忆召回)→tool_n(真调 MCP, 失败降级)→prompt(黑板 RunCtx 组装 system+画像+工具产出+历史+输入)→model→END - eino/orchestrator: 去掉启动期静态图,Handle 内按 DSL 动态编译;删旧 graph.go/state.go - 工具节点产出作为参考资料注入 system,模型据此作答 - 验证: 全模块 build✓ + e2e PASS; 真实 DeepSeek 双证——回归(input+agent)→'蓝色'; 工具节点(echo 注入事实)→mcp-go 日志证明图里真调 echo→模型据参考资料答'…Milvus…' 注: 分支/并行节点(compose.Branch/fan-out)暂未编译,是更大 TODO。 Co-Authored-By: Claude Opus 4.8 --- sundynix-dispatcher/internal/dsl/compile.go | 83 +++++++++- sundynix-dispatcher/internal/eino/compile.go | 148 ++++++++++++++++++ sundynix-dispatcher/internal/eino/graph.go | 85 ---------- .../internal/eino/orchestrator.go | 29 ++-- sundynix-dispatcher/internal/eino/state.go | 11 -- 5 files changed, 244 insertions(+), 112 deletions(-) create mode 100644 sundynix-dispatcher/internal/eino/compile.go delete mode 100644 sundynix-dispatcher/internal/eino/graph.go delete mode 100644 sundynix-dispatcher/internal/eino/state.go diff --git a/sundynix-dispatcher/internal/dsl/compile.go b/sundynix-dispatcher/internal/dsl/compile.go index d1a5df7..1fcf29a 100644 --- a/sundynix-dispatcher/internal/dsl/compile.go +++ b/sundynix-dispatcher/internal/dsl/compile.go @@ -39,10 +39,69 @@ type Plan struct { const defaultSystem = "你是 sundynix-agentix 平台的 AI 助手。" +// Parse 把 DSL 原文解析为图结构。 +func Parse(graph json.RawMessage) (*Flow, error) { + var f Flow + if err := json.Unmarshal(graph, &f); err != nil { + return nil, err + } + return &f, nil +} + +// Topo 返回节点的拓扑序(Kahn);无边/有环时退化为声明顺序。 +func (f *Flow) Topo() []Node { + byID := make(map[string]Node, len(f.Nodes)) + indeg := make(map[string]int, len(f.Nodes)) + adj := make(map[string][]string) + for _, n := range f.Nodes { + byID[n.ID] = n + indeg[n.ID] = 0 + } + for _, e := range f.Edges { + if _, ok := byID[e.Source]; !ok { + continue + } + if _, ok := byID[e.Target]; !ok { + continue + } + adj[e.Source] = append(adj[e.Source], e.Target) + indeg[e.Target]++ + } + var queue, order []string + for _, n := range f.Nodes { // 按声明序入队,保证确定性 + if indeg[n.ID] == 0 { + queue = append(queue, n.ID) + } + } + for len(queue) > 0 { + id := queue[0] + queue = queue[1:] + order = append(order, id) + for _, t := range adj[id] { + indeg[t]-- + if indeg[t] == 0 { + queue = append(queue, t) + } + } + } + out := make([]Node, 0, len(f.Nodes)) + seen := make(map[string]bool) + for _, id := range order { + out = append(out, byID[id]) + seen[id] = true + } + for _, n := range f.Nodes { // 有环时补齐剩余 + if !seen[n.ID] { + out = append(out, n) + } + } + return out +} + // Compile 解析 DSL 图,抽取对话计划。无法解析时退化为把原文当输入(兼容旧行为)。 func Compile(graph json.RawMessage) Plan { - var f Flow - if err := json.Unmarshal(graph, &f); err != nil || len(f.Nodes) == 0 { + f, err := Parse(graph) + if err != nil || len(f.Nodes) == 0 { return Plan{System: defaultSystem, Query: strings.TrimSpace(string(graph))} } @@ -78,6 +137,26 @@ func Compile(graph json.RawMessage) Plan { return Plan{System: system, Query: query, Tools: tools} } +// ToolBinding 从 tool/retriever 节点抽取要调用的 MCP 工具名与参数。 +// 非工具型节点返回空工具名(由编译器跳过)。 +func ToolBinding(n Node) (tool string, args map[string]any) { + args = map[string]any{} + switch n.Kind { + case "tool": + tool = str(n.Config["tool"]) + if raw := str(n.Config["args"]); raw != "" { + _ = json.Unmarshal([]byte(raw), &args) // 前端 args 为 JSON 字符串 + } + case "retriever": + // 检索节点暂映射到 wiki_search;RAG 接真后改 memory_search / 真实混合检索。 + tool = "wiki_search" + if kb := str(n.Config["kb"]); kb != "" { + args["kb"] = kb + } + } + return tool, args +} + func str(v any) string { if v == nil { return "" diff --git a/sundynix-dispatcher/internal/eino/compile.go b/sundynix-dispatcher/internal/eino/compile.go new file mode 100644 index 0000000..5caed68 --- /dev/null +++ b/sundynix-dispatcher/internal/eino/compile.go @@ -0,0 +1,148 @@ +package eino + +import ( + "context" + "fmt" + "strings" + + "github.com/cloudwego/eino/compose" + "github.com/cloudwego/eino/schema" + + "github.com/sundynix/sundynix-dispatcher/internal/dsl" + "github.com/sundynix/sundynix-shared/contract" +) + +// RunCtx 是图中流转的"黑板":init 填充,工具节点逐个增补,prompt 节点据此组装消息。 +// 用统一类型在节点间流转,规避 Eino 严格类型对齐的麻烦。 +type RunCtx struct { + UserID string + SessionID string + System string // Agent 节点系统提示词 + Query string // 用户输入 + Profile string // 召回的画像 + History []*schema.Message // 短期历史 + ToolOut []string // 工具节点产出(按执行序) +} + +// compileFlow 把一个任务的 DSL 图动态编译为可执行的 Eino 图: +// +// START → init(编译+记忆召回) → tool_0 → tool_1 → … → prompt(组装消息) → model(流式) → END +// +// 工具/检索节点按拓扑序真实调用 MCP(sundynix.tools.go.*),结果注入模型上下文。 +// 分支/并行节点暂未编译(TODO:compose.Branch / fan-out)。 +func (o *Orchestrator) compileFlow(ctx context.Context, t *contract.Task) (compose.Runnable[*contract.Task, *schema.Message], error) { + plan := dsl.Compile(t.Graph) // 系统提示词 / 用户输入 / 默认兜底 + flow, _ := dsl.Parse(t.Graph) + + g := compose.NewGraph[*contract.Task, *schema.Message]() + + // init:取身份 → 召回画像+历史 → 初始化黑板。 + if err := g.AddLambdaNode("init", compose.InvokableLambda( + func(ctx context.Context, task *contract.Task) (*RunCtx, error) { + uid, _ := task.Meta[contract.MetaUserID].(string) + sid, _ := task.Meta[contract.MetaSessionID].(string) + return &RunCtx{ + UserID: uid, + SessionID: sid, + System: plan.System, + Query: plan.Query, + Profile: o.fetchMemory(ctx, uid, plan.Query), + History: o.fetchHistory(ctx, sid), + }, nil + })); err != nil { + return nil, err + } + + // 按拓扑序为每个工具/检索节点加一个真实执行节点。 + prev := "init" + idx := 0 + if flow != nil { + for _, n := range flow.Topo() { + tool, args := dsl.ToolBinding(n) + if tool == "" { + continue + } + key := fmt.Sprintf("tool_%d", idx) + idx++ + if err := g.AddLambdaNode(key, compose.InvokableLambda(o.makeToolNode(t.ID, tool, args))); err != nil { + return nil, err + } + if err := g.AddEdge(prev, key); err != nil { + return nil, err + } + prev = key + } + } + + // prompt:黑板 → []*schema.Message(系统提示词 + 画像 + 工具产出 + 历史 + 用户输入)。 + if err := g.AddLambdaNode("prompt", compose.InvokableLambda(buildMessages)); err != nil { + return nil, err + } + if err := g.AddEdge(prev, "prompt"); err != nil { + return nil, err + } + + // model:LLM Pool 流式(已配置在线模型则真实推理)。 + if err := g.AddChatModelNode("model", newPoolModel(o.pool)); err != nil { + return nil, err + } + + if err := g.AddEdge(compose.START, "init"); err != nil { + return nil, err + } + if err := g.AddEdge("prompt", "model"); err != nil { + return nil, err + } + if err := g.AddEdge("model", compose.END); err != nil { + return nil, err + } + + return g.Compile(ctx) +} + +// makeToolNode 返回一个真实调用 MCP 工具的图节点:把结果增补进黑板,失败降级不阻断。 +func (o *Orchestrator) makeToolNode(taskID, tool string, args map[string]any) func(context.Context, *RunCtx) (*RunCtx, error) { + return func(ctx context.Context, rc *RunCtx) (*RunCtx, error) { + if o.tools == nil { + return rc, nil + } + // 未显式带查询词则注入当前用户输入,便于检索类工具。 + call := map[string]any{} + for k, v := range args { + call[k] = v + } + if call["q"] == nil && call["query"] == nil { + call["q"] = rc.Query + } + cctx, cancel := context.WithTimeout(ctx, toolCallTimeout) + defer cancel() + res, err := o.tools.CallTool(cctx, contract.ToolSubjectGo(tool), &contract.ToolCall{ + Tool: tool, TaskID: taskID, Args: call, + }) + if err != nil || res == nil || !res.OK || res.Content == "" { + return rc, nil // 工具不可用/无结果 → 降级跳过 + } + rc.ToolOut = append(rc.ToolOut, "["+tool+"] "+res.Content) + return rc, nil + } +} + +// buildMessages 把黑板组装为发给模型的消息序列。 +func buildMessages(_ context.Context, rc *RunCtx) ([]*schema.Message, error) { + var sys strings.Builder + sys.WriteString(rc.System) + if rc.Profile != "" { + sys.WriteString("\n\n关于当前用户的已知信息:\n") + sys.WriteString(rc.Profile) + sys.WriteString("\n请据此个性化作答并保持其偏好。") + } + if len(rc.ToolOut) > 0 { + sys.WriteString("\n\n以下是工具/检索得到的参考资料:\n") + sys.WriteString(strings.Join(rc.ToolOut, "\n---\n")) + } + msgs := make([]*schema.Message, 0, len(rc.History)+2) + msgs = append(msgs, schema.SystemMessage(sys.String())) + msgs = append(msgs, rc.History...) + msgs = append(msgs, schema.UserMessage(rc.Query)) + return msgs, nil +} diff --git a/sundynix-dispatcher/internal/eino/graph.go b/sundynix-dispatcher/internal/eino/graph.go deleted file mode 100644 index 5450fb2..0000000 --- a/sundynix-dispatcher/internal/eino/graph.go +++ /dev/null @@ -1,85 +0,0 @@ -package eino - -import ( - "context" - - "github.com/cloudwego/eino/components/prompt" - "github.com/cloudwego/eino/compose" - "github.com/cloudwego/eino/schema" - - "github.com/sundynix/sundynix-dispatcher/internal/dsl" - "github.com/sundynix/sundynix-dispatcher/internal/llm" - "github.com/sundynix/sundynix-shared/contract" -) - -// memoryFetcher 召回某用户与本次输入相关的偏好记忆(经 MCP memory_get 工具)。 -type memoryFetcher func(ctx context.Context, userID, query string) string - -// historyFetcher 召回某会话的短期多轮历史(经 MCP history_get 工具)。 -type historyFetcher func(ctx context.Context, sessionID string) []*schema.Message - -// buildGraph 编译这套"记忆增强"图: -// -// START → recall(召回画像+历史→写State) → prompt(注入system+history) → model(流式) → END -// -// 返回可流式执行的 Runnable。 -func buildGraph(ctx context.Context, pool *llm.Pool, fetch memoryFetcher, fetchHist historyFetcher) (compose.Runnable[*contract.Task, *schema.Message], error) { - g := compose.NewGraph[*contract.Task, *schema.Message]( - compose.WithGenLocalState(func(context.Context) *AgentState { return &AgentState{} }), - ) - - // 1) recall:编译 DSL → 取系统提示词/用户输入 → 召回画像+历史 → 写 State,输出模板变量。 - if err := g.AddLambdaNode("recall", compose.InvokableLambda( - func(ctx context.Context, t *contract.Task) (map[string]any, error) { - uid, _ := t.Meta[contract.MetaUserID].(string) - sid, _ := t.Meta[contract.MetaSessionID].(string) - plan := dsl.Compile(t.Graph) // DSL→对话编译:抽取 system / query / tools - profile := fetch(ctx, uid, plan.Query) - hist := fetchHist(ctx, sid) - _ = compose.ProcessState(ctx, func(_ context.Context, s *AgentState) error { - s.UserID, s.SessionID, s.Profile, s.Input = uid, sid, profile, plan.Query - return nil - }) - if profile == "" { - profile = "(暂无该用户的偏好记忆)" - } - return map[string]any{ - "system": plan.System, - "profile": profile, - "query": plan.Query, - "history": hist, - }, nil - })); err != nil { - return nil, err - } - - // 2) prompt:Agent 节点系统提示词 + 画像注入 system,历史用占位符,用户输入作为 user message。 - tpl := prompt.FromMessages(schema.FString, - schema.SystemMessage("{system}\n\n关于当前用户的已知信息:\n{profile}\n请据此个性化作答并保持其偏好。"), - schema.MessagesPlaceholder("history", true), - schema.UserMessage("{query}"), - ) - if err := g.AddChatTemplateNode("prompt", tpl); err != nil { - return nil, err - } - - // 3) model:LLM Pool 适配为 ChatModel 节点,流式产出。 - if err := g.AddChatModelNode("model", newPoolModel(pool)); err != nil { - return nil, err - } - - if err := g.AddEdge(compose.START, "recall"); err != nil { - return nil, err - } - if err := g.AddEdge("recall", "prompt"); err != nil { - return nil, err - } - if err := g.AddEdge("prompt", "model"); err != nil { - return nil, err - } - if err := g.AddEdge("model", compose.END); err != nil { - return nil, err - } - - return g.Compile(ctx) -} diff --git a/sundynix-dispatcher/internal/eino/orchestrator.go b/sundynix-dispatcher/internal/eino/orchestrator.go index 8434f42..a90c9bd 100644 --- a/sundynix-dispatcher/internal/eino/orchestrator.go +++ b/sundynix-dispatcher/internal/eino/orchestrator.go @@ -10,7 +10,6 @@ import ( "strings" "time" - "github.com/cloudwego/eino/compose" "github.com/cloudwego/eino/schema" "github.com/sundynix/sundynix-dispatcher/internal/dsl" @@ -33,34 +32,36 @@ type ToolCaller interface { // 工具调用超时;超时即降级(不带工具上下文继续推理)。 const toolCallTimeout = 3 * time.Second -// Orchestrator 把 DSL 任务交给编译好的 Eino 图执行(记忆召回 → 注入 → 流式)。 +// Orchestrator 把每个 DSL 任务动态编译为 Eino 图并执行(记忆召回 → 工具节点 → 注入 → 流式)。 type Orchestrator struct { + pool *llm.Pool breaker *harness.CircuitBreaker sink TokenSink tools ToolCaller - run compose.Runnable[*contract.Task, *schema.Message] } -// NewOrchestrator 构建并编译记忆增强图。 +// NewOrchestrator 持有依赖;图按任务的 DSL 在 Handle 内动态编译。 func NewOrchestrator(pool *llm.Pool, breaker *harness.CircuitBreaker, sink TokenSink, tools ToolCaller) (*Orchestrator, error) { - o := &Orchestrator{breaker: breaker, sink: sink, tools: tools} - run, err := buildGraph(context.Background(), pool, o.fetchMemory, o.fetchHistory) - if err != nil { - return nil, err - } - o.run = run - return o, nil + return &Orchestrator{pool: pool, breaker: breaker, sink: sink, tools: tools}, nil } -// Handle 消费一个任务:执行 Eino 图,把 Token 流回流到 sundynix.streams.。 +// Handle 消费一个任务:按 DSL 编译 Eino 图并执行,把 Token 流回流到 sundynix.streams.。 func (o *Orchestrator) Handle(ctx context.Context, t *contract.Task) error { if !o.breaker.Allow() { log.Printf("[eino] circuit open, drop task %s", t.ID) return nil } - log.Printf("[eino] task %s received (graph=%d bytes), running graph...", t.ID, len(t.Graph)) + log.Printf("[eino] task %s received (graph=%d bytes), compiling DSL → Eino graph...", t.ID, len(t.Graph)) - stream, err := o.run.Stream(ctx, t) + run, err := o.compileFlow(ctx, t) + if err != nil { + log.Printf("[eino] task %s compile error: %v", t.ID, err) + _ = o.sink.CompleteStream(t.ID) + o.breaker.Report(false) + return err + } + + stream, err := run.Stream(ctx, t) if err != nil { log.Printf("[eino] task %s graph error: %v", t.ID, err) _ = o.sink.CompleteStream(t.ID) diff --git a/sundynix-dispatcher/internal/eino/state.go b/sundynix-dispatcher/internal/eino/state.go deleted file mode 100644 index 70486d1..0000000 --- a/sundynix-dispatcher/internal/eino/state.go +++ /dev/null @@ -1,11 +0,0 @@ -package eino - -// AgentState 是 Eino 图的全局状态,贯穿 recall→prompt→model 各节点。 -// 偏好记忆经 recall 节点写入,供模板注入与写回抽取使用。 -type AgentState struct { - UserID string // 来自 Task.Meta["user_id"] - SessionID string // 来自 Task.Meta["session_id"] - Profile string // 召回到的常驻画像(always-on 偏好记忆) - Input string // 本次输入(DSL 原文) - Answer string // 累积输出,供写回阶段抽取新记忆 -}