Files
sundynix-agentix/sundynix-dispatcher/internal/eino/orchestrator.go
T
Blizzard 8f619c2a62 test(dispatcher): 引擎主链路集成测试(pool 抽接口 + 假替身端到端)
把 Orchestrator.pool 从 *llm.Pool 抽成 LLM 接口(Ready/ChatStream/StreamText/Chat),
*llm.Pool 天然满足、main 不变;从而可注入假模型做端到端测试,不依赖网络/Docker/LLM。

新增 integration_test.go(假 LLM/工具/sink/exec 替身):
- runGraph 分支路由:true/false 边标签精确选路(true 边故意列后)。
- runGraph 工具→agent:工具产出注入 agent 上下文。
- runGraph map fan-out:拆项 → 各章并行撰写 → 多章成稿。
- runGraph 输出护栏:流式 token 中疑似密钥被脱敏。
- handleReport:规划 → 分章撰写 → report_store 存源 → 流含标题/各章 + CompleteStream。

全部 go test -race 通过(修了测试替身 fakeExec 的并发追加竞态;生产 ExecSink 安全)。
至此引擎与报告主链路从"仅手动验证"升级为自动化端到端覆盖。

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-19 10:51:39 +08:00

198 lines
7.6 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package eino 封装基于 CloudWeGo Eino 的 Agent 图编排引擎。
package eino
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/cloudwego/eino/schema"
"github.com/sundynix/sundynix-dispatcher/internal/dsl"
"github.com/sundynix/sundynix-dispatcher/internal/harness"
"github.com/sundynix/sundynix-dispatcher/internal/llm"
"github.com/sundynix/sundynix-shared/contract"
)
// TokenSink 是 Token 流回流出口(由 NATS bus 实现)。
type TokenSink interface {
PublishToken(taskID string, token []byte) error
CompleteStream(taskID string) error
}
// ToolCaller 经 NATS 调起第 5 层 MCP 工具(由 NATS bus 实现)。
type ToolCaller interface {
CallTool(ctx context.Context, subject string, call *contract.ToolCall) (*contract.ToolResult, error)
}
// LLM 是编排所需的语言模型能力(生产由 *llm.Pool 实现)。抽成接口便于测试注入假模型。
type LLM interface {
Ready() bool
ChatStream(ctx context.Context, msgs []llm.ChatMessage, onToken func(string)) error
StreamText(ctx context.Context, text string, onToken func([]byte)) error
Chat(ctx context.Context, msgs []llm.ChatMessage) (string, error)
}
// 工具调用超时;超时即降级(不带工具上下文继续推理)。
const toolCallTimeout = 3 * time.Second
// Orchestrator 把每个 DSL 任务动态编译为 Eino 图并执行(记忆召回 → 工具节点 → 注入 → 流式)。
type Orchestrator struct {
pool LLM
breaker *harness.CircuitBreaker
eval *harness.Evaluator
sink TokenSink
tools ToolCaller
exec ExecSink
}
// NewOrchestrator 持有依赖;图按任务的 DSL 在 Handle 内动态编译。
// exec 为执行可视化事件出口(可为 nil,则不发轨迹事件);eval 为自动化评测(可为 nil)。
func NewOrchestrator(pool LLM, breaker *harness.CircuitBreaker, eval *harness.Evaluator, sink TokenSink, tools ToolCaller, exec ExecSink) (*Orchestrator, error) {
return &Orchestrator{pool: pool, breaker: breaker, eval: eval, sink: sink, tools: tools, exec: exec}, nil
}
// Handle 消费一个任务:按 DSL 编译 Eino 图并执行,把 Token 流回流到 sundynix.streams.<id>。
func (o *Orchestrator) Handle(ctx context.Context, t *contract.Task) error {
tr := o.tracer(t.ID)
defer tr.done()
// 熔断开启:快速拒绝,但要让客户端解阻(回流提示 + 收尾流),不静默丢弃。
if !o.breaker.Allow() {
log.Printf("[eino] 熔断开启,拒绝任务 %s", t.ID)
tr.info("task", "system", "服务熔断", "后端连续失败,暂时拒绝新任务,请稍后重试")
_ = o.sink.PublishToken(t.ID, []byte("⚠️ 服务繁忙(已触发熔断保护),请稍后重试。"))
_ = o.sink.CompleteStream(t.ID)
return nil
}
// 报告生成走专用多步编排(规划→分章并行检索撰写→汇聚→渲染 Word),而非通用对话图。
if intent, _ := t.Meta[contract.MetaIntent].(string); intent == contract.IntentReport {
return o.handleReport(ctx, t, tr)
}
log.Printf("[eino] task %s received (graph=%d bytes), 按图执行(拓扑+连线+分支)...", t.ID, len(t.Graph))
tr.info("task", "system", "任务受理", fmt.Sprintf("DSL %d 字节,按图执行", len(t.Graph)))
// 按 DSL 图的真实拓扑/连线/分支执行(graph.go 解释器),agent 节点流式回流 token。
answer, err := o.runGraph(ctx, t, tr)
if err != nil {
log.Printf("[eino] task %s graph error: %v", t.ID, err)
_ = o.sink.CompleteStream(t.ID)
o.breaker.Report(false)
return err
}
if cerr := o.sink.CompleteStream(t.ID); cerr != nil {
log.Printf("[eino] complete stream failed: %v", cerr)
}
log.Printf("[eino] task %s done (%d 字答复)", t.ID, len([]rune(answer)))
o.breaker.Report(true)
// 写回阶段:离开热路径、异步落历史 + (TODO)抽取记忆。
go o.memorize(t, answer)
// 自动化评测:离开热路径,对本轮输出打分并记录(规则 + LLM-as-judge)。
go o.evaluate(t, dsl.Compile(t.Graph).Query, answer)
return nil
}
// evaluate 异步对一次输出做自动化评测并记录评分(off 热路径,不影响响应)。
func (o *Orchestrator) evaluate(t *contract.Task, input, output string) {
if o.eval == nil {
return
}
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
r := o.eval.Score(ctx, input, output)
log.Printf("[eval] task %s 综合 %.2f(规则 %.2f / LLM %.2fflags=%v %s",
t.ID, r.Overall, r.Rule, r.LLM, r.Flags, r.Reason)
}
// fetchMemory 经 MCP memory_get 工具召回用户常驻画像。
// 工具不可用/超时/无 user_id 时返回空串,降级为无记忆推理(不阻断主流程)。
func (o *Orchestrator) fetchMemory(ctx context.Context, userID, _ string) string {
if o.tools == nil || userID == "" {
return ""
}
cctx, cancel := context.WithTimeout(ctx, toolCallTimeout)
defer cancel()
res, err := o.tools.CallTool(cctx, contract.ToolSubjectGo("memory_get"), &contract.ToolCall{
Tool: "memory_get",
Args: map[string]any{"user_id": userID},
})
if err != nil {
log.Printf("[eino] memory_get unavailable for %s, degrade: %v", userID, err)
return ""
}
if !res.OK {
log.Printf("[eino] memory_get error for %s: %s", userID, res.Error)
return ""
}
log.Printf("[eino] memory_get ok for %s: %s", userID, res.Content)
return res.Content
}
// fetchHistory 经 MCP history_get 工具召回会话短期多轮历史,转为 Eino 消息。
// 工具不可用/无 session 时返回空,降级为无历史(不阻断主流程)。
func (o *Orchestrator) fetchHistory(ctx context.Context, sessionID string) []*schema.Message {
if o.tools == nil || sessionID == "" {
return nil
}
cctx, cancel := context.WithTimeout(ctx, toolCallTimeout)
defer cancel()
res, err := o.tools.CallTool(cctx, contract.ToolSubjectGo("history_get"), &contract.ToolCall{
Tool: "history_get",
Args: map[string]any{"session_id": sessionID},
})
if err != nil || res == nil || !res.OK || res.Content == "" {
return nil
}
var turns []struct {
Role string `json:"role"`
Content string `json:"content"`
}
if json.Unmarshal([]byte(res.Content), &turns) != nil {
return nil
}
msgs := make([]*schema.Message, 0, len(turns))
for _, tn := range turns {
if tn.Role == "assistant" {
msgs = append(msgs, schema.AssistantMessage(tn.Content, nil))
} else {
msgs = append(msgs, schema.UserMessage(tn.Content))
}
}
if len(msgs) > 0 {
log.Printf("[eino] history_get ok for %s: %d 条历史", sessionID, len(msgs))
}
return msgs
}
// memorize 写回阶段:把本轮对话落进短期历史,并(TODO)抽取长期偏好记忆。
// 异步执行,离开热路径。
func (o *Orchestrator) memorize(t *contract.Task, answer string) {
uid, _ := t.Meta[contract.MetaUserID].(string)
sid, _ := t.Meta[contract.MetaSessionID].(string)
if sid != "" && o.tools != nil {
o.appendHistory(sid, "user", dsl.Compile(t.Graph).Query) // 落真实用户输入,而非 DSL 原文
o.appendHistory(sid, "assistant", answer)
log.Printf("[eino] (writeback) task %s 已落会话历史 session=%s", t.ID, sid)
}
if uid != "" {
// 从本轮对话抽取长期偏好 → 去重 → memory_upsert(离开热路径,已在 goroutine 内)。
o.extractMemory(context.Background(), uid, dsl.Compile(t.Graph).Query, answer)
}
}
func (o *Orchestrator) appendHistory(sessionID, role, content string) {
cctx, cancel := context.WithTimeout(context.Background(), toolCallTimeout)
defer cancel()
if _, err := o.tools.CallTool(cctx, contract.ToolSubjectGo("history_append"), &contract.ToolCall{
Tool: "history_append",
Args: map[string]any{"session_id": sessionID, "role": role, "content": content},
}); err != nil {
log.Printf("[eino] history_append failed: %v", err)
}
}