Files
sundynix-agentix/sundynix-dispatcher/internal/eino/compile.go
T
Blizzard 337d4d7619 feat(studio): 完善编排 —— 检索接本人知识库 + 真实模型下拉 + 模板/示例
把编排从"演示桩"接到真实平台:检索节点查本人 owner 隔离的知识库,节点下拉用真实数据。

- dispatcher:makeToolNode 用 task user_id 给检索类工具的 kb 加 owner 前缀("uid/kb"),
  编排里的「检索(RAG)」节点真正命中本人知识库(与隔离对齐)。
- 前端 StudioView:加 identity,载入 /kb/list 与 chat 模型作为「检索.kb」「Agent.model」下拉真值;
  Inspector 支持 dynamicOptions(无真值时提示去创建)。
- 编辑体验:示例(一键加载 输入→检索→Agent→输出 可运行图)/ 清空 / 模板名+保存(localStorage,
  含布局)/ 载入下拉;ReactFlow deleteKeyCode 支持 Del/Backspace 删节点。

验证:示例图运行 → gateway 发布任务 → dispatcher 编译 → mcp-go 日志 `tool=wiki_search
args=[kb:wt/default ...]`(kb 已按 owner 作用域)→ 命中本人库 → DeepSeek 流式作答;
底部抽屉 完成 ✓ · 工具调用 1。tsc+vite+dispatcher build 通过;重建 .app。

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-13 16:14:54 +08:00

184 lines
6.4 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package eino
import (
"context"
"encoding/json"
"fmt"
"strings"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
"github.com/sundynix/sundynix-dispatcher/internal/dsl"
"github.com/sundynix/sundynix-shared/contract"
)
// RunCtx 是图中流转的"黑板":init 填充,工具节点逐个增补,prompt 节点据此组装消息。
// 用统一类型在节点间流转,规避 Eino 严格类型对齐的麻烦。
type RunCtx struct {
UserID string
SessionID string
System string // Agent 节点系统提示词
Query string // 用户输入
Profile string // 召回的画像
History []*schema.Message // 短期历史
ToolOut []string // 工具节点产出(按执行序)
}
// compileFlow 把一个任务的 DSL 图动态编译为可执行的 Eino 图:
//
// START → init(编译+记忆召回) → tool_0 → tool_1 → … → prompt(组装消息) → model(流式) → END
//
// 工具/检索节点按拓扑序真实调用 MCPsundynix.tools.go.*),结果注入模型上下文。
// 分支/并行节点暂未编译(TODOcompose.Branch / fan-out)。
func (o *Orchestrator) compileFlow(ctx context.Context, t *contract.Task, tr *execTracer) (compose.Runnable[*contract.Task, *schema.Message], error) {
plan := dsl.Compile(t.Graph) // 系统提示词 / 用户输入 / 默认兜底
flow, _ := dsl.Parse(t.Graph)
g := compose.NewGraph[*contract.Task, *schema.Message]()
// init:取身份 → 召回画像+历史 → 初始化黑板。
if err := g.AddLambdaNode("init", compose.InvokableLambda(
func(ctx context.Context, task *contract.Task) (*RunCtx, error) {
uid, _ := task.Meta[contract.MetaUserID].(string)
sid, _ := task.Meta[contract.MetaSessionID].(string)
end := tr.span("init", "memory", "召回画像与历史")
profile := o.fetchMemory(ctx, uid, plan.Query)
history := o.fetchHistory(ctx, sid)
end(fmt.Sprintf("画像 %d 字 · 历史 %d 条", len([]rune(profile)), len(history)), nil)
return &RunCtx{
UserID: uid,
SessionID: sid,
System: plan.System,
Query: plan.Query,
Profile: profile,
History: history,
}, nil
})); err != nil {
return nil, err
}
// 按拓扑序为每个工具/检索节点加一个真实执行节点。
prev := "init"
idx := 0
if flow != nil {
for _, n := range flow.Topo() {
tool, args := dsl.ToolBinding(n)
if tool == "" {
continue
}
key := fmt.Sprintf("tool_%d", idx)
idx++
uid, _ := t.Meta[contract.MetaUserID].(string)
if err := g.AddLambdaNode(key, compose.InvokableLambda(o.makeToolNode(t.ID, tool, args, tr, uid))); err != nil {
return nil, err
}
if err := g.AddEdge(prev, key); err != nil {
return nil, err
}
prev = key
}
}
// prompt:黑板 → []*schema.Message(系统提示词 + 画像 + 工具产出 + 历史 + 用户输入)。
if err := g.AddLambdaNode("prompt", compose.InvokableLambda(
func(ctx context.Context, rc *RunCtx) ([]*schema.Message, error) {
msgs, err := buildMessages(ctx, rc)
tr.info("prompt", "prompt", "组装提示词", fmt.Sprintf("%d 条消息 · 工具产出 %d 段", len(msgs), len(rc.ToolOut)))
return msgs, err
})); err != nil {
return nil, err
}
if err := g.AddEdge(prev, "prompt"); err != nil {
return nil, err
}
// modelLLM Pool 流式(已配置在线模型则真实推理)。
if err := g.AddChatModelNode("model", newPoolModel(o.pool)); err != nil {
return nil, err
}
if err := g.AddEdge(compose.START, "init"); err != nil {
return nil, err
}
if err := g.AddEdge("prompt", "model"); err != nil {
return nil, err
}
if err := g.AddEdge("model", compose.END); err != nil {
return nil, err
}
return g.Compile(ctx)
}
// makeToolNode 返回一个真实调用 MCP 工具的图节点:把结果增补进黑板,失败降级不阻断。
// uid 非空时把检索类工具的 kb 锁进 owner 作用域("uid/kb"),使编排检索命中本人知识库。
func (o *Orchestrator) makeToolNode(taskID, tool string, args map[string]any, tr *execTracer, uid string) func(context.Context, *RunCtx) (*RunCtx, error) {
node := "tool:" + tool
return func(ctx context.Context, rc *RunCtx) (*RunCtx, error) {
if o.tools == nil {
tr.info(node, "tool", "工具 "+tool, "工具总线未接入,跳过")
return rc, nil
}
// 未显式带查询词则注入当前用户输入,便于检索类工具。
call := map[string]any{}
for k, v := range args {
call[k] = v
}
if call["q"] == nil && call["query"] == nil {
call["q"] = rc.Query
}
// 检索类工具的 kb 按 owner 作用域,对齐知识库隔离(前端只发库名)。
if uid != "" {
if kbv, ok := call["kb"].(string); ok && kbv != "" && !strings.Contains(kbv, "/") {
call["kb"] = uid + "/" + kbv
}
}
end := tr.span(node, "tool", "调用工具 "+tool)
cctx, cancel := context.WithTimeout(ctx, toolCallTimeout)
defer cancel()
res, err := o.tools.CallTool(cctx, contract.ToolSubjectGo(tool), &contract.ToolCall{
Tool: tool, TaskID: taskID, Args: call,
})
if err != nil {
end("调用失败,降级跳过", err)
return rc, nil
}
if res == nil || !res.OK || res.Content == "" {
end("无结果,降级跳过", nil)
return rc, nil // 工具不可用/无结果 → 降级跳过
}
end("入参 "+previewArgs(call)+" → 产出 "+truncate(res.Content, 160), nil)
rc.ToolOut = append(rc.ToolOut, "["+tool+"] "+res.Content)
return rc, nil
}
}
// previewArgs 把工具入参压成一行短预览。
func previewArgs(args map[string]any) string {
if data, err := json.Marshal(args); err == nil {
return truncate(string(data), 120)
}
return ""
}
// buildMessages 把黑板组装为发给模型的消息序列。
func buildMessages(_ context.Context, rc *RunCtx) ([]*schema.Message, error) {
var sys strings.Builder
sys.WriteString(rc.System)
if rc.Profile != "" {
sys.WriteString("\n\n关于当前用户的已知信息:\n")
sys.WriteString(rc.Profile)
sys.WriteString("\n请据此个性化作答并保持其偏好。")
}
if len(rc.ToolOut) > 0 {
sys.WriteString("\n\n以下是工具/检索得到的参考资料:\n")
sys.WriteString(strings.Join(rc.ToolOut, "\n---\n"))
}
msgs := make([]*schema.Message, 0, len(rc.History)+2)
msgs = append(msgs, schema.SystemMessage(sys.String()))
msgs = append(msgs, rc.History...)
msgs = append(msgs, schema.UserMessage(rc.Query))
return msgs, nil
}