Files
sundynix-agentix/sundynix-dispatcher/internal/eino/compile.go
T
Blizzard 71db0e295f feat: compose.NewGraph 全图编译 — 工具节点在 Eino 图里真实执行
dispatcher 按每个任务的 DSL 动态编译 Eino 图:工具/检索节点按拓扑序作为真实图
节点经 NATS 调 MCP,产出注入模型上下文。不再是固定的 recall→prompt→model。

- dsl: 加 Parse(图结构) + (Flow)Topo(Kahn 拓扑序,环退化声明序) + ToolBinding(tool/
  retriever 节点→工具名+参数)
- eino/compile.go: 逐任务 compileFlow —— START→init(身份+记忆召回)→tool_n(真调 MCP,
  失败降级)→prompt(黑板 RunCtx 组装 system+画像+工具产出+历史+输入)→model→END
- eino/orchestrator: 去掉启动期静态图,Handle 内按 DSL 动态编译;删旧 graph.go/state.go
- 工具节点产出作为参考资料注入 system,模型据此作答
- 验证: 全模块 build✓ + e2e PASS; 真实 DeepSeek 双证——回归(input+agent)→'蓝色';
  工具节点(echo 注入事实)→mcp-go 日志证明图里真调 echo→模型据参考资料答'…Milvus…'

注: 分支/并行节点(compose.Branch/fan-out)暂未编译,是更大 TODO。

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-10 16:45:33 +08:00

149 lines
4.9 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package eino
import (
"context"
"fmt"
"strings"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
"github.com/sundynix/sundynix-dispatcher/internal/dsl"
"github.com/sundynix/sundynix-shared/contract"
)
// RunCtx 是图中流转的"黑板":init 填充,工具节点逐个增补,prompt 节点据此组装消息。
// 用统一类型在节点间流转,规避 Eino 严格类型对齐的麻烦。
type RunCtx struct {
UserID string
SessionID string
System string // Agent 节点系统提示词
Query string // 用户输入
Profile string // 召回的画像
History []*schema.Message // 短期历史
ToolOut []string // 工具节点产出(按执行序)
}
// compileFlow 把一个任务的 DSL 图动态编译为可执行的 Eino 图:
//
// START → init(编译+记忆召回) → tool_0 → tool_1 → … → prompt(组装消息) → model(流式) → END
//
// 工具/检索节点按拓扑序真实调用 MCPsundynix.tools.go.*),结果注入模型上下文。
// 分支/并行节点暂未编译(TODOcompose.Branch / fan-out)。
func (o *Orchestrator) compileFlow(ctx context.Context, t *contract.Task) (compose.Runnable[*contract.Task, *schema.Message], error) {
plan := dsl.Compile(t.Graph) // 系统提示词 / 用户输入 / 默认兜底
flow, _ := dsl.Parse(t.Graph)
g := compose.NewGraph[*contract.Task, *schema.Message]()
// init:取身份 → 召回画像+历史 → 初始化黑板。
if err := g.AddLambdaNode("init", compose.InvokableLambda(
func(ctx context.Context, task *contract.Task) (*RunCtx, error) {
uid, _ := task.Meta[contract.MetaUserID].(string)
sid, _ := task.Meta[contract.MetaSessionID].(string)
return &RunCtx{
UserID: uid,
SessionID: sid,
System: plan.System,
Query: plan.Query,
Profile: o.fetchMemory(ctx, uid, plan.Query),
History: o.fetchHistory(ctx, sid),
}, nil
})); err != nil {
return nil, err
}
// 按拓扑序为每个工具/检索节点加一个真实执行节点。
prev := "init"
idx := 0
if flow != nil {
for _, n := range flow.Topo() {
tool, args := dsl.ToolBinding(n)
if tool == "" {
continue
}
key := fmt.Sprintf("tool_%d", idx)
idx++
if err := g.AddLambdaNode(key, compose.InvokableLambda(o.makeToolNode(t.ID, tool, args))); err != nil {
return nil, err
}
if err := g.AddEdge(prev, key); err != nil {
return nil, err
}
prev = key
}
}
// prompt:黑板 → []*schema.Message(系统提示词 + 画像 + 工具产出 + 历史 + 用户输入)。
if err := g.AddLambdaNode("prompt", compose.InvokableLambda(buildMessages)); err != nil {
return nil, err
}
if err := g.AddEdge(prev, "prompt"); err != nil {
return nil, err
}
// modelLLM Pool 流式(已配置在线模型则真实推理)。
if err := g.AddChatModelNode("model", newPoolModel(o.pool)); err != nil {
return nil, err
}
if err := g.AddEdge(compose.START, "init"); err != nil {
return nil, err
}
if err := g.AddEdge("prompt", "model"); err != nil {
return nil, err
}
if err := g.AddEdge("model", compose.END); err != nil {
return nil, err
}
return g.Compile(ctx)
}
// makeToolNode 返回一个真实调用 MCP 工具的图节点:把结果增补进黑板,失败降级不阻断。
func (o *Orchestrator) makeToolNode(taskID, tool string, args map[string]any) func(context.Context, *RunCtx) (*RunCtx, error) {
return func(ctx context.Context, rc *RunCtx) (*RunCtx, error) {
if o.tools == nil {
return rc, nil
}
// 未显式带查询词则注入当前用户输入,便于检索类工具。
call := map[string]any{}
for k, v := range args {
call[k] = v
}
if call["q"] == nil && call["query"] == nil {
call["q"] = rc.Query
}
cctx, cancel := context.WithTimeout(ctx, toolCallTimeout)
defer cancel()
res, err := o.tools.CallTool(cctx, contract.ToolSubjectGo(tool), &contract.ToolCall{
Tool: tool, TaskID: taskID, Args: call,
})
if err != nil || res == nil || !res.OK || res.Content == "" {
return rc, nil // 工具不可用/无结果 → 降级跳过
}
rc.ToolOut = append(rc.ToolOut, "["+tool+"] "+res.Content)
return rc, nil
}
}
// buildMessages 把黑板组装为发给模型的消息序列。
func buildMessages(_ context.Context, rc *RunCtx) ([]*schema.Message, error) {
var sys strings.Builder
sys.WriteString(rc.System)
if rc.Profile != "" {
sys.WriteString("\n\n关于当前用户的已知信息:\n")
sys.WriteString(rc.Profile)
sys.WriteString("\n请据此个性化作答并保持其偏好。")
}
if len(rc.ToolOut) > 0 {
sys.WriteString("\n\n以下是工具/检索得到的参考资料:\n")
sys.WriteString(strings.Join(rc.ToolOut, "\n---\n"))
}
msgs := make([]*schema.Message, 0, len(rc.History)+2)
msgs = append(msgs, schema.SystemMessage(sys.String()))
msgs = append(msgs, rc.History...)
msgs = append(msgs, schema.UserMessage(rc.Query))
return msgs, nil
}