Files
sundynix-agentix/sundynix-dispatcher/internal/eino/graph.go
T
Blizzard b7360439ab feat(dispatcher): 输出护栏 —— 发射层脱敏疑似密钥/令牌
补齐 Harness 输出侧:harness.RedactSecrets 识别并脱敏 sk-/AKIA/JWT/Bearer 等
疑似密钥令牌(纯逻辑 + 单测)。runAgent 在每个 token 分片发射前调用(流式无法回收
已发,故逐片脱敏),脱敏会累计进 b.answer(写回历史也是脱敏版);有命中则在
运行·观测打一条'已脱敏 N 处'轨迹。

注:跨分片的密钥可能漏(流式现实),逐片为最佳努力;生产可加滑窗缓冲增强。

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-18 11:52:53 +08:00

473 lines
14 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package eino
import (
"context"
"fmt"
"strconv"
"strings"
"time"
"github.com/cloudwego/eino/schema"
"github.com/sundynix/sundynix-dispatcher/internal/dsl"
"github.com/sundynix/sundynix-dispatcher/internal/harness"
"github.com/sundynix/sundynix-shared/contract"
)
// defaultAgentSystem 是 agent 节点未填系统提示词时的兜底。
const defaultAgentSystem = "你是 sundynix-agentix 平台的 AI 助手。"
// board 是图执行的"黑板":节点按拓扑序流转时读写它。
type board struct {
uid, sid string
query string
profile string
history []*schema.Message
kb string // 最近一个检索节点的 owner 作用域库名(供 map 并行各项检索)
refs []string // 检索 / 聚合得到的参考资料
toolOut []string // 工具节点产出
sections []reportSection // map 并行 fan-out 产出的分项成稿(供 render 多章渲染)
answer string // 终端 agent / map 的成稿(流式累计)
}
// runGraph 按 DSL 图的真实拓扑与连线执行(替代旧的线性拍平 compileFlow)。
//
// 入度0 入口 → 沿连线传播 active → 每个节点按 kind 执行真实行为 →
// branch 按条件只激活选中的下游(剪枝)→ agent 节点流式回流 token。
//
// 逐节点点亮"运行·观测"。返回终端 agent 的完整产出(供写回历史)。
func (o *Orchestrator) runGraph(ctx context.Context, t *contract.Task, tr *execTracer) (string, error) {
flow, ferr := dsl.Parse(t.Graph)
plan := dsl.Compile(t.Graph)
b := &board{
uid: meta(t, contract.MetaUserID),
sid: meta(t, contract.MetaSessionID),
query: plan.Query,
}
// 无法解析或空图:退化为"无图单轮对话"(注入默认记忆 + 直接出模型)。
if ferr != nil || flow == nil || len(flow.Nodes) == 0 {
tr.info("task", "system", "无结构化图", "按单轮对话执行")
b.profile = o.fetchMemory(ctx, b.uid, b.query)
b.history = o.fetchHistory(ctx, b.sid)
o.runAgent(ctx, t.ID, b, plan.System, tr, "agent")
return b.answer, nil
}
// 建邻接与入度(只认两端都存在的边)。保留整条边以便 branch 按 true/false 标签选路。
nodeByID := make(map[string]dsl.Node, len(flow.Nodes))
outE := make(map[string][]dsl.Edge)
indeg := make(map[string]int, len(flow.Nodes))
for _, n := range flow.Nodes {
nodeByID[n.ID] = n
indeg[n.ID] = 0
}
for _, e := range flow.Edges {
if _, ok := nodeByID[e.Source]; !ok {
continue
}
if _, ok := nodeByID[e.Target]; !ok {
continue
}
outE[e.Source] = append(outE[e.Source], e)
indeg[e.Target]++
}
// 入口节点(入度 0)置 active;执行时沿连线把下游激活,branch 只激活选中分支。
active := make(map[string]bool)
for _, n := range flow.Nodes {
if indeg[n.ID] == 0 {
active[n.ID] = true
}
}
// 图里没有 memory 节点 → 沿用旧默认:注入画像+历史(避免回归)。
hasMemory := false
for _, n := range flow.Nodes {
if n.Kind == "memory" {
hasMemory = true
break
}
}
if !hasMemory {
b.profile = o.fetchMemory(ctx, b.uid, b.query)
b.history = o.fetchHistory(ctx, b.sid)
}
for _, n := range flow.Topo() {
if !active[n.ID] {
continue // 被 branch 剪掉的下游,不执行
}
propagate := targetsOf(outE[n.ID]) // 默认激活全部出边;branch 会改写
switch n.Kind {
case "input":
if txt := cstr(n.Config, "text"); txt != "" {
b.query = txt
}
tr.info("input:"+n.ID, "system", labelOf(n, "输入"), truncate(b.query, 80))
case "memory":
if cbool(n.Config, "profile") {
b.profile = o.fetchMemory(ctx, b.uid, b.query)
}
if cbool(n.Config, "history") {
b.history = o.fetchHistory(ctx, b.sid)
}
tr.info("memory:"+n.ID, "memory", labelOf(n, "记忆"),
fmt.Sprintf("画像 %d 字 · 历史 %d 条", len([]rune(b.profile)), len(b.history)))
case "retriever":
o.retrieverNode(ctx, n, b, tr)
case "tool":
o.execToolNode(ctx, t.ID, n, b, tr)
case "agent":
o.runAgent(ctx, t.ID, b, firstNonEmpty(cstr(n.Config, "system"), plan.System), tr, "agent:"+n.ID)
case "aggregate":
merged := aggregate(cstr(n.Config, "strategy"), append(append([]string{}, b.refs...), b.toolOut...))
b.refs, b.toolOut = merged, nil
tr.info("aggregate:"+n.ID, "system", labelOf(n, "汇聚"), "策略:"+firstNonEmpty(cstr(n.Config, "strategy"), "拼接"))
case "render":
o.renderNode(ctx, t.ID, n, b, tr)
case "branch":
propagate = o.branchNode(n, b, outE[n.ID], nodeByID, tr)
case "map":
o.mapNode(ctx, t.ID, n, b, tr)
case "output":
tr.info("output:"+n.ID, "system", labelOf(n, "输出"), "目标:"+firstNonEmpty(cstr(n.Config, "target"), "屏幕"))
default:
tr.info(n.Kind+":"+n.ID, "system", labelOf(n, n.Kind), "未识别节点,跳过")
}
for _, tgt := range propagate {
active[tgt] = true
}
}
// 图里无 agent 节点(纯工具/检索图)也要出一段模型答复,否则没有输出。
if b.answer == "" {
o.runAgent(ctx, t.ID, b, plan.System, tr, "agent")
}
return b.answer, nil
}
// retrieverNode 执行检索节点:kb 按 owner 作用域 → kb_search → 累计参考资料。
func (o *Orchestrator) retrieverNode(ctx context.Context, n dsl.Node, b *board, tr *execTracer) {
kb := cstr(n.Config, "kb")
scoped := kb
if b.uid != "" && kb != "" && !strings.Contains(kb, "/") {
scoped = b.uid + "/" + kb
}
if scoped != "" {
b.kb = scoped // 记下作用域库名,供后续 map 并行各项检索复用
}
end := tr.span("retriever:"+n.ID, "tool", labelOf(n, "检索"))
refs := o.retrieve(ctx, scoped, b.query)
if refs != "" {
b.refs = append(b.refs, refs)
}
end(fmt.Sprintf("kb=%s · 命中 %d 段", firstNonEmpty(kb, "(未指定)"), countLines(refs)), nil)
}
// mapNode 执行并行 fan-out:把主题拆成若干项,各项有界并发撰写(复用 report 的 writeSections),
// 成稿存入黑板(结构化 sections + 拼进 answer),并流式呈现进度。
func (o *Orchestrator) mapNode(ctx context.Context, taskID string, n dsl.Node, b *board, tr *execTracer) {
end := tr.span("map:"+n.ID, "plan", labelOf(n, "并行 fan-out"))
items := o.planItems(ctx, b.query, cstr(n.Config, "splitBy"))
end(fmt.Sprintf("拆出 %d 项:%s", len(items), strings.Join(items, " / ")), nil)
o.emit(taskID, fmt.Sprintf("\n> 并行处理 %d 项…\n\n", len(items)))
secs := o.writeSections(ctx, b.query, b.kb, items, tr) // 有界并发,trace 出 section:i 各项
b.sections = secs
for _, s := range secs {
chunk := "## " + s.Heading + "\n\n" + s.Body + "\n\n"
o.emit(taskID, chunk)
b.answer += chunk
b.refs = append(b.refs, s.Heading+""+s.Body)
}
}
// execToolNode 执行工具节点:调 MCP 工具,产出累计进黑板;失败降级不阻断。
func (o *Orchestrator) execToolNode(ctx context.Context, taskID string, n dsl.Node, b *board, tr *execTracer) {
tool, args := dsl.ToolBinding(n)
if tool == "" {
return
}
node := "tool:" + tool
if o.tools == nil {
tr.info(node, "tool", "工具 "+tool, "工具总线未接入,跳过")
return
}
call := map[string]any{}
for k, v := range args {
call[k] = v
}
if call["q"] == nil && call["query"] == nil {
call["q"] = b.query
}
if b.uid != "" {
if kbv, ok := call["kb"].(string); ok && kbv != "" && !strings.Contains(kbv, "/") {
call["kb"] = b.uid + "/" + kbv
}
}
end := tr.span(node, "tool", "调用工具 "+tool)
cctx, cancel := context.WithTimeout(ctx, toolCallTimeout)
defer cancel()
res, err := o.tools.CallTool(cctx, contract.ToolSubjectGo(tool), &contract.ToolCall{Tool: tool, TaskID: taskID, Args: call})
if err != nil {
end("调用失败,降级跳过", err)
return
}
if res == nil || !res.OK || res.Content == "" {
end("无结果,降级跳过", nil)
return
}
end("入参 "+previewArgs(call)+" → 产出 "+truncate(res.Content, 160), nil)
b.toolOut = append(b.toolOut, "["+tool+"] "+res.Content)
}
// runAgent 执行 agent/模型节点:据黑板拼消息 → 流式回流 token → 累计成稿。
func (o *Orchestrator) runAgent(ctx context.Context, taskID string, b *board, system string, tr *execTracer, node string) {
rc := &RunCtx{
System: firstNonEmpty(system, defaultAgentSystem),
Query: b.query,
Profile: b.profile,
History: b.history,
ToolOut: append(append([]string{}, b.toolOut...), b.refs...),
}
msgs, _ := buildMessages(ctx, rc)
tr.emit(node, "model", "start", "模型流式推理", "", 0)
t0 := time.Now()
n, redacted := 0, 0
send := func(s string) {
if s == "" {
return
}
// 输出护栏:发射前逐片脱敏疑似密钥/令牌(流式无法回收已发,故逐片处理)。
safe, hit := harness.RedactSecrets(s)
redacted += hit
_ = o.sink.PublishToken(taskID, []byte(safe))
b.answer += safe
n++
}
var err error
if o.pool.Ready() {
err = o.pool.ChatStream(ctx, toChatMessages(msgs), send)
} else {
err = o.pool.StreamText(ctx, replyFor(msgs), func(tok []byte) { send(string(tok)) })
}
if err != nil {
tr.emit(node, "model", "error", "模型流式推理", err.Error(), time.Since(t0).Milliseconds())
return
}
if redacted > 0 {
tr.info(node, "system", "输出护栏", fmt.Sprintf("已脱敏 %d 处疑似密钥/令牌", redacted))
}
tr.emit(node, "model", "end", "模型流式推理",
fmt.Sprintf("%d tokens / %d 字", n, len([]rune(b.answer))), time.Since(t0).Milliseconds())
}
// renderNode 执行渲染节点:把当前成稿渲染成 Word(经 mcp-go report_render)。
func (o *Orchestrator) renderNode(ctx context.Context, taskID string, n dsl.Node, b *board, tr *execTracer) {
if strings.TrimSpace(b.answer) == "" {
tr.info("render:"+n.ID, "render", labelOf(n, "渲染"), "暂无正文可渲染(render 前需有 agent 产出)")
return
}
format := firstNonEmpty(cstr(n.Config, "format"), "docx")
end := tr.span("render:"+n.ID, "render", labelOf(n, "渲染 "+format))
title := truncate(b.query, 40)
secs := b.sections // map 产出的多章优先;否则把整段成稿当单章
if len(secs) == 0 {
secs = []reportSection{{Heading: title, Body: b.answer}}
}
if path := o.renderReport(ctx, taskID, title, secs); path != "" {
end("已落盘:"+path, nil)
o.emit(taskID, "\n\n---\n✅ 已渲染 "+format+" 文档,可在「下载」获取。\n")
} else {
end("渲染服务不可用", fmt.Errorf("render unavailable"))
}
}
// branchNode 执行分支节点:求值条件,按出边的 true/false 标签选出要激活的下游。
// 边带 sourceHandle("true"/"false") 时按标签精确选路(前端 Phase 2 给 branch 的边打标);
// 无标签的旧图退回"出边顺序 [true,false]"约定,保持向后兼容。
func (o *Orchestrator) branchNode(n dsl.Node, b *board, outs []dsl.Edge, byID map[string]dsl.Node, tr *execTracer) []string {
cond := cstr(n.Config, "condition")
res := evalCondition(cond, b)
var truthy, falsy []string
labeled := false
for _, e := range outs {
switch e.SourceHandle {
case "true":
truthy = append(truthy, e.Target)
labeled = true
case "false":
falsy = append(falsy, e.Target)
labeled = true
}
}
var chosen []string
mode := "边标签"
if labeled {
if res {
chosen = truthy
} else {
chosen = falsy
}
} else {
mode = "边序约定[true,false]"
targets := targetsOf(outs)
switch {
case len(targets) >= 2:
if res {
chosen = targets[:1]
} else {
chosen = targets[1:2]
}
case len(targets) == 1 && !res:
chosen = nil
default:
chosen = targets
}
}
names := make([]string, 0, len(chosen))
for _, id := range chosen {
names = append(names, labelOf(byID[id], id))
}
tr.info("branch:"+n.ID, "system", labelOf(n, "分支"),
fmt.Sprintf("条件「%s」→ %v ⇒ 走 [%s]%s",
firstNonEmpty(cond, "(空=真)"), res, strings.Join(names, ", "), mode))
return chosen
}
// targetsOf 取一组边的目标节点 ID(保持顺序)。
func targetsOf(edges []dsl.Edge) []string {
out := make([]string, 0, len(edges))
for _, e := range edges {
out = append(out, e.Target)
}
return out
}
// evalCondition 求值 branch 条件。支持:
//
// 空 → true;关键字 refs/tools/answer/profile 作左值(取数量/字数);
// 形如 "a op b"op: >= <= == != > <)数值比较;其余非空 → 默认真。
func evalCondition(cond string, b *board) bool {
cond = strings.TrimSpace(cond)
if cond == "" {
return true
}
for _, op := range []string{">=", "<=", "==", "!=", ">", "<"} {
if i := strings.Index(cond, op); i >= 0 {
l := resolveOperand(strings.TrimSpace(cond[:i]), b)
r := resolveOperand(strings.TrimSpace(cond[i+len(op):]), b)
switch op {
case ">":
return l > r
case "<":
return l < r
case ">=":
return l >= r
case "<=":
return l <= r
case "==":
return l == r
case "!=":
return l != r
}
}
}
return true
}
// resolveOperand 把条件里的左/右值解析为数值(关键字取运行时数量,否则按字面量)。
func resolveOperand(s string, b *board) float64 {
switch strings.ToLower(s) {
case "refs":
return float64(len(b.refs))
case "tools":
return float64(len(b.toolOut))
case "answer":
return float64(len([]rune(b.answer)))
case "profile":
return float64(len([]rune(b.profile)))
}
f, _ := strconv.ParseFloat(s, 64)
return f
}
// aggregate 按策略合并多段参考资料为一段。
func aggregate(strategy string, parts []string) []string {
var nonEmpty []string
for _, p := range parts {
if strings.TrimSpace(p) != "" {
nonEmpty = append(nonEmpty, p)
}
}
if len(nonEmpty) == 0 {
return nil
}
switch strategy {
case "去重合并":
seen := map[string]bool{}
var uniq []string
for _, p := range nonEmpty {
if !seen[p] {
seen[p] = true
uniq = append(uniq, p)
}
}
return []string{strings.Join(uniq, "\n---\n")}
case "摘要":
return []string{truncate(strings.Join(nonEmpty, "\n"), 800)}
default: // 拼接
return []string{strings.Join(nonEmpty, "\n---\n")}
}
}
// ---- 小工具 ----
func meta(t *contract.Task, key string) string {
v, _ := t.Meta[key].(string)
return v
}
func cstr(cfg map[string]any, key string) string {
if cfg == nil {
return ""
}
v, ok := cfg[key]
if !ok || v == nil {
return ""
}
if s, ok := v.(string); ok {
return strings.TrimSpace(s)
}
return strings.TrimSpace(fmt.Sprint(v))
}
func cbool(cfg map[string]any, key string) bool {
if cfg == nil {
return false
}
if v, ok := cfg[key].(bool); ok {
return v
}
return false
}
func labelOf(n dsl.Node, def string) string {
if strings.TrimSpace(n.Label) != "" {
return n.Label
}
return def
}
func countLines(s string) int {
s = strings.TrimSpace(s)
if s == "" {
return 0
}
return strings.Count(s, "\n") + 1
}