8f619c2a62
把 Orchestrator.pool 从 *llm.Pool 抽成 LLM 接口(Ready/ChatStream/StreamText/Chat), *llm.Pool 天然满足、main 不变;从而可注入假模型做端到端测试,不依赖网络/Docker/LLM。 新增 integration_test.go(假 LLM/工具/sink/exec 替身): - runGraph 分支路由:true/false 边标签精确选路(true 边故意列后)。 - runGraph 工具→agent:工具产出注入 agent 上下文。 - runGraph map fan-out:拆项 → 各章并行撰写 → 多章成稿。 - runGraph 输出护栏:流式 token 中疑似密钥被脱敏。 - handleReport:规划 → 分章撰写 → report_store 存源 → 流含标题/各章 + CompleteStream。 全部 go test -race 通过(修了测试替身 fakeExec 的并发追加竞态;生产 ExecSink 安全)。 至此引擎与报告主链路从"仅手动验证"升级为自动化端到端覆盖。 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
239 lines
7.4 KiB
Go
239 lines
7.4 KiB
Go
package eino
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"strings"
|
||
"sync"
|
||
"testing"
|
||
|
||
"github.com/sundynix/sundynix-dispatcher/internal/harness"
|
||
"github.com/sundynix/sundynix-dispatcher/internal/llm"
|
||
"github.com/sundynix/sundynix-shared/contract"
|
||
)
|
||
|
||
// ---- 测试替身 ----
|
||
|
||
type fakeLLM struct {
|
||
ready bool
|
||
stream func(msgs []llm.ChatMessage) string // ChatStream 要回流的整段文本
|
||
chat func(msgs []llm.ChatMessage) (string, error) // Chat 返回
|
||
}
|
||
|
||
func (f *fakeLLM) Ready() bool { return f.ready }
|
||
func (f *fakeLLM) ChatStream(_ context.Context, msgs []llm.ChatMessage, onToken func(string)) error {
|
||
if f.stream != nil {
|
||
onToken(f.stream(msgs))
|
||
}
|
||
return nil
|
||
}
|
||
func (f *fakeLLM) StreamText(_ context.Context, text string, onToken func([]byte)) error {
|
||
onToken([]byte(text))
|
||
return nil
|
||
}
|
||
func (f *fakeLLM) Chat(_ context.Context, msgs []llm.ChatMessage) (string, error) {
|
||
if f.chat != nil {
|
||
return f.chat(msgs)
|
||
}
|
||
return "", nil
|
||
}
|
||
|
||
type fakeSink struct {
|
||
mu sync.Mutex
|
||
tokens []string
|
||
done bool
|
||
}
|
||
|
||
func (s *fakeSink) PublishToken(_ string, tok []byte) error {
|
||
s.mu.Lock()
|
||
s.tokens = append(s.tokens, string(tok))
|
||
s.mu.Unlock()
|
||
return nil
|
||
}
|
||
func (s *fakeSink) CompleteStream(_ string) error { s.done = true; return nil }
|
||
func (s *fakeSink) text() string {
|
||
s.mu.Lock()
|
||
defer s.mu.Unlock()
|
||
return strings.Join(s.tokens, "")
|
||
}
|
||
|
||
type fakeTools struct {
|
||
mu sync.Mutex
|
||
calls []string
|
||
fn func(call *contract.ToolCall) *contract.ToolResult
|
||
}
|
||
|
||
func (t *fakeTools) CallTool(_ context.Context, _ string, call *contract.ToolCall) (*contract.ToolResult, error) {
|
||
t.mu.Lock()
|
||
t.calls = append(t.calls, call.Tool)
|
||
t.mu.Unlock()
|
||
if t.fn != nil {
|
||
return t.fn(call), nil
|
||
}
|
||
return &contract.ToolResult{OK: true, Content: ""}, nil
|
||
}
|
||
func (t *fakeTools) called(tool string) bool {
|
||
t.mu.Lock()
|
||
defer t.mu.Unlock()
|
||
for _, c := range t.calls {
|
||
if c == tool {
|
||
return true
|
||
}
|
||
}
|
||
return false
|
||
}
|
||
|
||
type fakeExec struct {
|
||
mu sync.Mutex
|
||
events []contract.ExecEvent
|
||
}
|
||
|
||
func (e *fakeExec) PublishExec(_ string, data []byte) error {
|
||
var ev contract.ExecEvent
|
||
if json.Unmarshal(data, &ev) == nil {
|
||
e.mu.Lock()
|
||
e.events = append(e.events, ev)
|
||
e.mu.Unlock()
|
||
}
|
||
return nil
|
||
}
|
||
func (e *fakeExec) CompleteExec(_ string) error { return nil }
|
||
|
||
func newOrch(ll *fakeLLM, ft *fakeTools, fs *fakeSink, fe *fakeExec) *Orchestrator {
|
||
return &Orchestrator{pool: ll, breaker: harness.NewCircuitBreaker(), sink: fs, tools: ft, exec: fe}
|
||
}
|
||
|
||
func task(graph string) *contract.Task {
|
||
return &contract.Task{ID: "t1", Graph: json.RawMessage(graph), Meta: map[string]any{}}
|
||
}
|
||
|
||
// ---- 集成测试:runGraph ----
|
||
|
||
// 分支按 true/false 边标签精确选路(true 边故意列第二位)。
|
||
func TestRunGraph_BranchRouting(t *testing.T) {
|
||
g := `{"nodes":[
|
||
{"id":"i","kind":"input","config":{"text":"hi"}},
|
||
{"id":"b","kind":"branch","config":{"condition":"%s"}},
|
||
{"id":"a","kind":"agent","config":{"system":"AAA"}},
|
||
{"id":"c","kind":"agent","config":{"system":"BBB"}}
|
||
],"edges":[
|
||
{"source":"i","target":"b"},
|
||
{"source":"b","target":"c","sourceHandle":"false"},
|
||
{"source":"b","target":"a","sourceHandle":"true"}
|
||
]}`
|
||
// fakeLLM 回流"系统提示词",借此判断哪个 agent 跑了。
|
||
ll := &fakeLLM{ready: true, stream: func(m []llm.ChatMessage) string { return m[0].Content }}
|
||
run := func(cond string) string {
|
||
o := newOrch(ll, &fakeTools{}, &fakeSink{}, &fakeExec{})
|
||
ans, err := o.runGraph(context.Background(), task(strings.Replace(g, "%s", cond, 1)), o.tracer("t1"))
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
return ans
|
||
}
|
||
if a := run("2>1"); !strings.Contains(a, "AAA") || strings.Contains(a, "BBB") {
|
||
t.Errorf("条件真应走 true 标签(AAA),got %q", a)
|
||
}
|
||
if a := run("1>2"); !strings.Contains(a, "BBB") || strings.Contains(a, "AAA") {
|
||
t.Errorf("条件假应走 false 标签(BBB),got %q", a)
|
||
}
|
||
}
|
||
|
||
// 工具节点产出注入 agent 上下文。
|
||
func TestRunGraph_ToolFeedsAgent(t *testing.T) {
|
||
g := `{"nodes":[
|
||
{"id":"i","kind":"input","config":{"text":"hi"}},
|
||
{"id":"t","kind":"tool","config":{"tool":"wiki_search"}},
|
||
{"id":"a","kind":"agent","config":{"system":"S"}}
|
||
],"edges":[{"source":"i","target":"t"},{"source":"t","target":"a"}]}`
|
||
ll := &fakeLLM{ready: true, stream: func(m []llm.ChatMessage) string { return m[0].Content }}
|
||
ft := &fakeTools{fn: func(c *contract.ToolCall) *contract.ToolResult {
|
||
return &contract.ToolResult{OK: true, Content: "TOOLDATA"}
|
||
}}
|
||
o := newOrch(ll, ft, &fakeSink{}, &fakeExec{})
|
||
ans, err := o.runGraph(context.Background(), task(g), o.tracer("t1"))
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
if !ft.called("wiki_search") {
|
||
t.Error("应调用 wiki_search 工具")
|
||
}
|
||
if !strings.Contains(ans, "TOOLDATA") {
|
||
t.Errorf("工具产出应注入 agent 上下文,got %q", ans)
|
||
}
|
||
}
|
||
|
||
// map 并行 fan-out:拆项 → 各章撰写 → 汇成多章成稿。
|
||
func TestRunGraph_MapFanout(t *testing.T) {
|
||
g := `{"nodes":[
|
||
{"id":"i","kind":"input","config":{"text":"猫"}},
|
||
{"id":"m","kind":"map","config":{"splitBy":"要点"}}
|
||
],"edges":[{"source":"i","target":"m"}]}`
|
||
ll := &fakeLLM{ready: true, chat: func(m []llm.ChatMessage) (string, error) {
|
||
u := m[len(m)-1].Content
|
||
if strings.Contains(u, "拆分") {
|
||
return `["第一章","第二章"]`, nil
|
||
}
|
||
return "正文XYZ", nil
|
||
}}
|
||
o := newOrch(ll, &fakeTools{}, &fakeSink{}, &fakeExec{})
|
||
ans, err := o.runGraph(context.Background(), task(g), o.tracer("t1"))
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
if !strings.Contains(ans, "## 第一章") || !strings.Contains(ans, "## 第二章") || !strings.Contains(ans, "正文XYZ") {
|
||
t.Errorf("map 应产出多章成稿,got %q", ans)
|
||
}
|
||
}
|
||
|
||
// 输出护栏:发射的 token 中疑似密钥被脱敏。
|
||
func TestRunGraph_OutputRedaction(t *testing.T) {
|
||
g := `{"nodes":[{"id":"i","kind":"input","config":{"text":"hi"}},{"id":"a","kind":"agent","config":{"system":"S"}}],"edges":[{"source":"i","target":"a"}]}`
|
||
ll := &fakeLLM{ready: true, stream: func(m []llm.ChatMessage) string {
|
||
return "你的 key 是 sk-912cf85b16d04b22bcb95f4576423bfb 请保密"
|
||
}}
|
||
fs := &fakeSink{}
|
||
o := newOrch(ll, &fakeTools{}, fs, &fakeExec{})
|
||
if _, err := o.runGraph(context.Background(), task(g), o.tracer("t1")); err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
out := fs.text()
|
||
if strings.Contains(out, "sk-912cf85b16d04b22bcb95f4576423bfb") {
|
||
t.Errorf("流式输出未脱敏密钥: %q", out)
|
||
}
|
||
if !strings.Contains(out, "[已脱敏]") {
|
||
t.Errorf("应出现脱敏标记: %q", out)
|
||
}
|
||
}
|
||
|
||
// ---- 集成测试:handleReport ----
|
||
|
||
func TestHandleReport_PlanWriteStore(t *testing.T) {
|
||
ll := &fakeLLM{ready: true, chat: func(m []llm.ChatMessage) (string, error) {
|
||
u := m[len(m)-1].Content
|
||
if strings.Contains(u, "大纲") {
|
||
return `{"title":"咖啡报告","sections":["提神","风险"]}`, nil
|
||
}
|
||
return "本章正文。", nil
|
||
}}
|
||
ft := &fakeTools{fn: func(c *contract.ToolCall) *contract.ToolResult {
|
||
return &contract.ToolResult{OK: true, Content: "/tmp/x.json"}
|
||
}}
|
||
fs := &fakeSink{}
|
||
o := newOrch(ll, ft, fs, &fakeExec{})
|
||
tk := &contract.Task{ID: "r1", Graph: json.RawMessage(`{}`), Meta: map[string]any{contract.MetaTopic: "咖啡"}}
|
||
if err := o.handleReport(context.Background(), tk, o.tracer("r1")); err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
if !ft.called("report_store") {
|
||
t.Error("应调用 report_store 存源")
|
||
}
|
||
out := fs.text()
|
||
if !strings.Contains(out, "# 咖啡报告") || !strings.Contains(out, "## 提神") || !strings.Contains(out, "## 风险") {
|
||
t.Errorf("报告流应含标题与各章,got %q", out)
|
||
}
|
||
if !fs.done {
|
||
t.Error("报告结束应 CompleteStream")
|
||
}
|
||
}
|