Files
Blizzard 8f619c2a62 test(dispatcher): 引擎主链路集成测试(pool 抽接口 + 假替身端到端)
把 Orchestrator.pool 从 *llm.Pool 抽成 LLM 接口(Ready/ChatStream/StreamText/Chat),
*llm.Pool 天然满足、main 不变;从而可注入假模型做端到端测试,不依赖网络/Docker/LLM。

新增 integration_test.go(假 LLM/工具/sink/exec 替身):
- runGraph 分支路由:true/false 边标签精确选路(true 边故意列后)。
- runGraph 工具→agent:工具产出注入 agent 上下文。
- runGraph map fan-out:拆项 → 各章并行撰写 → 多章成稿。
- runGraph 输出护栏:流式 token 中疑似密钥被脱敏。
- handleReport:规划 → 分章撰写 → report_store 存源 → 流含标题/各章 + CompleteStream。

全部 go test -race 通过(修了测试替身 fakeExec 的并发追加竞态;生产 ExecSink 安全)。
至此引擎与报告主链路从"仅手动验证"升级为自动化端到端覆盖。

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-19 10:51:39 +08:00

239 lines
7.4 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package eino
import (
"context"
"encoding/json"
"strings"
"sync"
"testing"
"github.com/sundynix/sundynix-dispatcher/internal/harness"
"github.com/sundynix/sundynix-dispatcher/internal/llm"
"github.com/sundynix/sundynix-shared/contract"
)
// ---- Test doubles ----

// fakeLLM is a scripted model stand-in that the tests plug into the
// Orchestrator's pool field. Behaviour is supplied through optional callbacks;
// a nil callback turns the matching method into a no-op.
type fakeLLM struct {
	// ready is the value reported by Ready.
	ready bool

	// stream yields the full text that ChatStream hands to its token callback.
	stream func(msgs []llm.ChatMessage) string

	// chat yields the reply and error returned by Chat.
	chat func(msgs []llm.ChatMessage) (string, error)
}

// Ready reports the preconfigured readiness flag.
func (f *fakeLLM) Ready() bool {
	return f.ready
}

// ChatStream delivers the entire output of the stream callback to onToken as a
// single token. When no stream callback is set, onToken is never invoked. The
// returned error is always nil.
func (f *fakeLLM) ChatStream(_ context.Context, msgs []llm.ChatMessage, onToken func(string)) error {
	if f.stream == nil {
		return nil
	}
	reply := f.stream(msgs)
	onToken(reply)
	return nil
}

// StreamText passes text to onToken unchanged, as one chunk of bytes. The
// returned error is always nil.
func (f *fakeLLM) StreamText(_ context.Context, text string, onToken func([]byte)) error {
	chunk := []byte(text)
	onToken(chunk)
	return nil
}

// Chat returns whatever the chat callback produces for msgs, or an empty reply
// with a nil error when no callback is set.
func (f *fakeLLM) Chat(_ context.Context, msgs []llm.ChatMessage) (string, error) {
	if f.chat == nil {
		return "", nil
	}
	return f.chat(msgs)
}
// fakeSink is an in-memory token sink for tests. It records every published
// token and whether the stream has been completed.
type fakeSink struct {
	// mu guards tokens and every write to done.
	mu sync.Mutex

	// tokens holds the published tokens in arrival order.
	tokens []string

	// done is set once CompleteStream has been called.
	// NOTE(review): tests read this field directly; that is only race-free
	// when the read happens after the code under test has returned — confirm
	// the producer does not complete the stream from a detached goroutine.
	done bool
}

// PublishToken appends tok to the recorded tokens. The first argument is
// ignored. The returned error is always nil.
func (s *fakeSink) PublishToken(_ string, tok []byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.tokens = append(s.tokens, string(tok))
	return nil
}

// CompleteStream marks the stream as finished. The argument is ignored and the
// returned error is always nil.
//
// The flag is written while holding mu, like the token slice, so that
// concurrent callers of the sink's methods never write shared state outside
// the lock.
func (s *fakeSink) CompleteStream(_ string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.done = true
	return nil
}

// text returns all recorded tokens concatenated in arrival order.
func (s *fakeSink) text() string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return strings.Join(s.tokens, "")
}
// fakeTools is a tool-caller stand-in that remembers which tools were invoked
// and answers with a scripted result.
type fakeTools struct {
	// mu guards calls.
	mu sync.Mutex

	// calls lists the Tool name of every call, in invocation order.
	calls []string

	// fn, when non-nil, computes the result returned by CallTool.
	fn func(call *contract.ToolCall) *contract.ToolResult
}

// CallTool records call.Tool and then returns the result of fn, or a
// successful result with empty content when fn is nil. The context and the
// string argument are ignored; the returned error is always nil.
func (t *fakeTools) CallTool(_ context.Context, _ string, call *contract.ToolCall) (*contract.ToolResult, error) {
	t.record(call.Tool)
	// fn runs after the lock has been released.
	if t.fn == nil {
		return &contract.ToolResult{OK: true, Content: ""}, nil
	}
	return t.fn(call), nil
}

// record appends tool to the call log under the lock.
func (t *fakeTools) record(tool string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.calls = append(t.calls, tool)
}

// called reports whether a tool with the given name has been invoked at least
// once.
func (t *fakeTools) called(tool string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	seen := false
	for i := 0; i < len(t.calls) && !seen; i++ {
		seen = t.calls[i] == tool
	}
	return seen
}
// fakeExec is an execution-event sink stand-in that collects the events it is
// able to decode.
type fakeExec struct {
	// mu guards events.
	mu sync.Mutex

	// events holds the successfully decoded events in arrival order.
	events []contract.ExecEvent
}

// PublishExec decodes data as a JSON contract.ExecEvent and stores it.
// Payloads that fail to decode are dropped without being reported. The string
// argument is ignored and the returned error is always nil.
func (e *fakeExec) PublishExec(_ string, data []byte) error {
	var decoded contract.ExecEvent
	if err := json.Unmarshal(data, &decoded); err != nil {
		// Undecodable payloads are skipped rather than surfaced as an error.
		return nil
	}
	e.mu.Lock()
	defer e.mu.Unlock()
	e.events = append(e.events, decoded)
	return nil
}

// CompleteExec is a no-op that always returns nil.
func (e *fakeExec) CompleteExec(_ string) error {
	return nil
}
// newOrch assembles an Orchestrator wired to the given fakes and a freshly
// created circuit breaker.
func newOrch(ll *fakeLLM, ft *fakeTools, fs *fakeSink, fe *fakeExec) *Orchestrator {
	orch := &Orchestrator{
		pool:    ll,
		breaker: harness.NewCircuitBreaker(),
		sink:    fs,
		tools:   ft,
		exec:    fe,
	}
	return orch
}
// task builds a contract.Task with the fixed ID "t1", the given graph JSON
// carried verbatim, and an empty, non-nil Meta map.
func task(graph string) *contract.Task {
	raw := json.RawMessage(graph)
	return &contract.Task{
		ID:    "t1",
		Graph: raw,
		Meta:  map[string]any{},
	}
}
// ---- Integration tests: runGraph ----

// TestRunGraph_BranchRouting checks that a branch node selects its successor
// by the "true"/"false" edge label and not by edge order: the "true" edge is
// deliberately listed second.
func TestRunGraph_BranchRouting(t *testing.T) {
	const placeholder = "%s"
	graphTmpl := `{"nodes":[
{"id":"i","kind":"input","config":{"text":"hi"}},
{"id":"b","kind":"branch","config":{"condition":"%s"}},
{"id":"a","kind":"agent","config":{"system":"AAA"}},
{"id":"c","kind":"agent","config":{"system":"BBB"}}
],"edges":[
{"source":"i","target":"b"},
{"source":"b","target":"c","sourceHandle":"false"},
{"source":"b","target":"a","sourceHandle":"true"}
]}`

	// The fake model streams back the "system prompt" (first message), which
	// reveals which agent node actually ran.
	echo := &fakeLLM{ready: true, stream: func(m []llm.ChatMessage) string { return m[0].Content }}

	// route runs the graph with the given branch condition on a fresh
	// orchestrator and returns the final answer.
	route := func(condition string) string {
		orch := newOrch(echo, &fakeTools{}, &fakeSink{}, &fakeExec{})
		graph := strings.Replace(graphTmpl, placeholder, condition, 1)
		answer, err := orch.runGraph(context.Background(), task(graph), orch.tracer("t1"))
		if err != nil {
			t.Fatal(err)
		}
		return answer
	}

	viaTrue := route("2>1")
	if hitA, hitB := strings.Contains(viaTrue, "AAA"), strings.Contains(viaTrue, "BBB"); !hitA || hitB {
		t.Errorf("条件真应走 true 标签(AAA)got %q", viaTrue)
	}

	viaFalse := route("1>2")
	if hitA, hitB := strings.Contains(viaFalse, "AAA"), strings.Contains(viaFalse, "BBB"); !hitB || hitA {
		t.Errorf("条件假应走 false 标签(BBB)got %q", viaFalse)
	}
}
// 工具节点产出注入 agent 上下文。
func TestRunGraph_ToolFeedsAgent(t *testing.T) {
g := `{"nodes":[
{"id":"i","kind":"input","config":{"text":"hi"}},
{"id":"t","kind":"tool","config":{"tool":"wiki_search"}},
{"id":"a","kind":"agent","config":{"system":"S"}}
],"edges":[{"source":"i","target":"t"},{"source":"t","target":"a"}]}`
ll := &fakeLLM{ready: true, stream: func(m []llm.ChatMessage) string { return m[0].Content }}
ft := &fakeTools{fn: func(c *contract.ToolCall) *contract.ToolResult {
return &contract.ToolResult{OK: true, Content: "TOOLDATA"}
}}
o := newOrch(ll, ft, &fakeSink{}, &fakeExec{})
ans, err := o.runGraph(context.Background(), task(g), o.tracer("t1"))
if err != nil {
t.Fatal(err)
}
if !ft.called("wiki_search") {
t.Error("应调用 wiki_search 工具")
}
if !strings.Contains(ans, "TOOLDATA") {
t.Errorf("工具产出应注入 agent 上下文,got %q", ans)
}
}
// map 并行 fan-out:拆项 → 各章撰写 → 汇成多章成稿。
func TestRunGraph_MapFanout(t *testing.T) {
g := `{"nodes":[
{"id":"i","kind":"input","config":{"text":"猫"}},
{"id":"m","kind":"map","config":{"splitBy":"要点"}}
],"edges":[{"source":"i","target":"m"}]}`
ll := &fakeLLM{ready: true, chat: func(m []llm.ChatMessage) (string, error) {
u := m[len(m)-1].Content
if strings.Contains(u, "拆分") {
return `["第一章","第二章"]`, nil
}
return "正文XYZ", nil
}}
o := newOrch(ll, &fakeTools{}, &fakeSink{}, &fakeExec{})
ans, err := o.runGraph(context.Background(), task(g), o.tracer("t1"))
if err != nil {
t.Fatal(err)
}
if !strings.Contains(ans, "## 第一章") || !strings.Contains(ans, "## 第二章") || !strings.Contains(ans, "正文XYZ") {
t.Errorf("map 应产出多章成稿,got %q", ans)
}
}
// TestRunGraph_OutputRedaction checks the output guardrail: a string that
// looks like an API key must not reach the sink verbatim, and the redaction
// marker must appear in the streamed text.
func TestRunGraph_OutputRedaction(t *testing.T) {
	const (
		leakedKey = "sk-912cf85b16d04b22bcb95f4576423bfb"
		marker    = "[已脱敏]"
	)
	graph := `{"nodes":[{"id":"i","kind":"input","config":{"text":"hi"}},{"id":"a","kind":"agent","config":{"system":"S"}}],"edges":[{"source":"i","target":"a"}]}`

	// The model always answers with a sentence that embeds the key.
	leaky := &fakeLLM{ready: true, stream: func(_ []llm.ChatMessage) string {
		return "你的 key 是 sk-912cf85b16d04b22bcb95f4576423bfb 请保密"
	}}
	sink := &fakeSink{}
	orch := newOrch(leaky, &fakeTools{}, sink, &fakeExec{})

	_, err := orch.runGraph(context.Background(), task(graph), orch.tracer("t1"))
	if err != nil {
		t.Fatal(err)
	}

	streamed := sink.text()
	if leaked := strings.Contains(streamed, leakedKey); leaked {
		t.Errorf("流式输出未脱敏密钥: %q", streamed)
	}
	if redacted := strings.Contains(streamed, marker); !redacted {
		t.Errorf("应出现脱敏标记: %q", streamed)
	}
}
// ---- 集成测试:handleReport ----
func TestHandleReport_PlanWriteStore(t *testing.T) {
ll := &fakeLLM{ready: true, chat: func(m []llm.ChatMessage) (string, error) {
u := m[len(m)-1].Content
if strings.Contains(u, "大纲") {
return `{"title":"咖啡报告","sections":["提神","风险"]}`, nil
}
return "本章正文。", nil
}}
ft := &fakeTools{fn: func(c *contract.ToolCall) *contract.ToolResult {
return &contract.ToolResult{OK: true, Content: "/tmp/x.json"}
}}
fs := &fakeSink{}
o := newOrch(ll, ft, fs, &fakeExec{})
tk := &contract.Task{ID: "r1", Graph: json.RawMessage(`{}`), Meta: map[string]any{contract.MetaTopic: "咖啡"}}
if err := o.handleReport(context.Background(), tk, o.tracer("r1")); err != nil {
t.Fatal(err)
}
if !ft.called("report_store") {
t.Error("应调用 report_store 存源")
}
out := fs.text()
if !strings.Contains(out, "# 咖啡报告") || !strings.Contains(out, "## 提神") || !strings.Contains(out, "## 风险") {
t.Errorf("报告流应含标题与各章,got %q", out)
}
if !fs.done {
t.Error("报告结束应 CompleteStream")
}
}