package eino

import (
	"context"
	"encoding/json"
	"strings"
	"sync"
	"testing"

	"github.com/sundynix/sundynix-dispatcher/internal/harness"
	"github.com/sundynix/sundynix-dispatcher/internal/llm"
	"github.com/sundynix/sundynix-shared/contract"
)

// ---- Test doubles ----

// fakeLLM is a scriptable LLM stand-in. Each behavior is supplied as a
// closure so individual tests can decide what the model "says".
type fakeLLM struct {
	ready  bool
	stream func(msgs []llm.ChatMessage) string          // full text that ChatStream sends back
	chat   func(msgs []llm.ChatMessage) (string, error) // result returned by Chat
}

// Ready reports the configured readiness flag.
func (f *fakeLLM) Ready() bool { return f.ready }

// ChatStream delivers the whole scripted reply as a single token. When no
// stream closure is configured, onToken is never invoked.
func (f *fakeLLM) ChatStream(_ context.Context, msgs []llm.ChatMessage, onToken func(string)) error {
	if f.stream != nil {
		onToken(f.stream(msgs))
	}
	return nil
}

// StreamText passes text to onToken unchanged, as one chunk.
func (f *fakeLLM) StreamText(_ context.Context, text string, onToken func([]byte)) error {
	onToken([]byte(text))
	return nil
}

// Chat returns the scripted reply, or an empty string when no chat closure
// is configured.
func (f *fakeLLM) Chat(_ context.Context, msgs []llm.ChatMessage) (string, error) {
	if f.chat != nil {
		return f.chat(msgs)
	}
	return "", nil
}

// fakeSink records published tokens and whether the stream was completed.
// Both fields are guarded by mu.
type fakeSink struct {
	mu     sync.Mutex
	tokens []string
	done   bool
}

// PublishToken appends tok to the recorded token list.
func (s *fakeSink) PublishToken(_ string, tok []byte) error {
	s.mu.Lock()
	s.tokens = append(s.tokens, string(tok))
	s.mu.Unlock()
	return nil
}

// CompleteStream marks the stream as finished. It takes mu like every other
// accessor of the sink's state, so the write cannot race with readers.
func (s *fakeSink) CompleteStream(_ string) error {
	s.mu.Lock()
	s.done = true
	s.mu.Unlock()
	return nil
}

// completed reports whether CompleteStream has been called.
func (s *fakeSink) completed() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.done
}

// text returns all recorded tokens concatenated in publish order.
func (s *fakeSink) text() string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return strings.Join(s.tokens, "")
}

// fakeTools records the name of every tool invoked and optionally delegates
// the result to fn.
type fakeTools struct {
	mu    sync.Mutex
	calls []string
	fn    func(call *contract.ToolCall) *contract.ToolResult
}

// CallTool records call.Tool, then returns fn's result when fn is set, or an
// OK result with empty content otherwise.
func (t *fakeTools) CallTool(_ context.Context, _ string, call *contract.ToolCall) (*contract.ToolResult, error) {
	t.mu.Lock()
	t.calls = append(t.calls, call.Tool)
	t.mu.Unlock()
	if t.fn != nil {
		return t.fn(call), nil
	}
	return &contract.ToolResult{OK: true, Content: ""}, nil
}

// called reports whether the named tool was invoked at least once.
func (t *fakeTools) called(tool string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	for _, c := range t.calls {
		if c == tool {
			return true
		}
	}
	return false
}

// fakeExec collects the execution events that decode successfully.
type fakeExec struct {
	mu     sync.Mutex
	events []contract.ExecEvent
}

// PublishExec decodes data as a contract.ExecEvent and records it. Payloads
// that fail to decode are deliberately dropped and nil is still returned, so
// a malformed event never surfaces as a publish error.
func (e *fakeExec) PublishExec(_ string, data []byte) error {
	var ev contract.ExecEvent
	if json.Unmarshal(data, &ev) == nil {
		e.mu.Lock()
		e.events = append(e.events, ev)
		e.mu.Unlock()
	}
	return nil
}

// CompleteExec is a no-op.
func (e *fakeExec) CompleteExec(_ string) error { return nil }

// newOrch builds an Orchestrator wired to the given test doubles and a fresh
// circuit breaker.
func newOrch(ll *fakeLLM, ft *fakeTools, fs *fakeSink, fe *fakeExec) *Orchestrator {
	return &Orchestrator{pool: ll, breaker: harness.NewCircuitBreaker(), sink: fs, tools: ft, exec: fe}
}

// task wraps a JSON graph definition in a Task with ID "t1" and empty Meta.
func task(graph string) *contract.Task {
	return &contract.Task{ID: "t1", Graph: json.RawMessage(graph), Meta: map[string]any{}}
}

// ---- Integration tests: runGraph ----

// A branch routes strictly by the true/false edge label; the true edge is
// deliberately listed second.
func TestRunGraph_BranchRouting(t *testing.T) {
	g := `{"nodes":[ ` +
		`{"id":"i","kind":"input","config":{"text":"hi"}}, ` +
		`{"id":"b","kind":"branch","config":{"condition":"%s"}}, ` +
		`{"id":"a","kind":"agent","config":{"system":"AAA"}}, ` +
		`{"id":"c","kind":"agent","config":{"system":"BBB"}} ` +
		`],"edges":[ ` +
		`{"source":"i","target":"b"}, ` +
		`{"source":"b","target":"c","sourceHandle":"false"}, ` +
		`{"source":"b","target":"a","sourceHandle":"true"} ` +
		`]}`
	// fakeLLM echoes the system prompt, which reveals which agent ran.
	ll := &fakeLLM{ready: true, stream: func(m []llm.ChatMessage) string { return m[0].Content }}
	run := func(cond string) string {
		t.Helper() // report failures at the calling case, not inside the closure
		o := newOrch(ll, &fakeTools{}, &fakeSink{}, &fakeExec{})
		ans, err := o.runGraph(context.Background(), task(strings.Replace(g, "%s", cond, 1)), o.tracer("t1"))
		if err != nil {
			t.Fatal(err)
		}
		return ans
	}
	if a := run("2>1"); !strings.Contains(a, "AAA") || strings.Contains(a, "BBB") {
		t.Errorf("条件真应走 true 标签(AAA),got %q", a)
	}
	if a := run("1>2"); !strings.Contains(a, "BBB") || strings.Contains(a, "AAA") {
		t.Errorf("条件假应走 false 标签(BBB),got %q", a)
	}
}

// A tool node's output is injected into the downstream agent's context.
func TestRunGraph_ToolFeedsAgent(t *testing.T) {
	g := `{"nodes":[ ` +
		`{"id":"i","kind":"input","config":{"text":"hi"}}, ` +
		`{"id":"t","kind":"tool","config":{"tool":"wiki_search"}}, ` +
		`{"id":"a","kind":"agent","config":{"system":"S"}} ` +
		`],"edges":[{"source":"i","target":"t"},{"source":"t","target":"a"}]}`
	ll := &fakeLLM{ready: true, stream: func(m []llm.ChatMessage) string { return m[0].Content }}
	ft := &fakeTools{fn: func(c *contract.ToolCall) *contract.ToolResult {
		return &contract.ToolResult{OK: true, Content: "TOOLDATA"}
	}}
	o := newOrch(ll, ft, &fakeSink{}, &fakeExec{})
	ans, err := o.runGraph(context.Background(), task(g), o.tracer("t1"))
	if err != nil {
		t.Fatal(err)
	}
	if !ft.called("wiki_search") {
		t.Error("应调用 wiki_search 工具")
	}
	if !strings.Contains(ans, "TOOLDATA") {
		t.Errorf("工具产出应注入 agent 上下文,got %q", ans)
	}
}

// Map parallel fan-out: split into items, write each chapter, then merge
// them into a multi-chapter draft.
func TestRunGraph_MapFanout(t *testing.T) {
	g := `{"nodes":[ ` +
		`{"id":"i","kind":"input","config":{"text":"猫"}}, ` +
		`{"id":"m","kind":"map","config":{"splitBy":"要点"}} ` +
		`],"edges":[{"source":"i","target":"m"}]}`
	ll := &fakeLLM{ready: true, chat: func(m []llm.ChatMessage) (string, error) {
		u := m[len(m)-1].Content
		// A prompt containing "拆分" gets the chapter list; every other
		// prompt gets the chapter body text.
		if strings.Contains(u, "拆分") {
			return `["第一章","第二章"]`, nil
		}
		return "正文XYZ", nil
	}}
	o := newOrch(ll, &fakeTools{}, &fakeSink{}, &fakeExec{})
	ans, err := o.runGraph(context.Background(), task(g), o.tracer("t1"))
	if err != nil {
		t.Fatal(err)
	}
	if !strings.Contains(ans, "## 第一章") || !strings.Contains(ans, "## 第二章") || !strings.Contains(ans, "正文XYZ") {
		t.Errorf("map 应产出多章成稿,got %q", ans)
	}
}

// Output guardrail: suspected secrets in emitted tokens are redacted.
func TestRunGraph_OutputRedaction(t *testing.T) {
	g := `{"nodes":[{"id":"i","kind":"input","config":{"text":"hi"}},{"id":"a","kind":"agent","config":{"system":"S"}}],"edges":[{"source":"i","target":"a"}]}`
	ll := &fakeLLM{ready: true, stream: func(m []llm.ChatMessage) string {
		return "你的 key 是 sk-912cf85b16d04b22bcb95f4576423bfb 请保密"
	}}
	fs := &fakeSink{}
	o := newOrch(ll, &fakeTools{}, fs, &fakeExec{})
	if _, err := o.runGraph(context.Background(), task(g), o.tracer("t1")); err != nil {
		t.Fatal(err)
	}
	out := fs.text()
	if strings.Contains(out, "sk-912cf85b16d04b22bcb95f4576423bfb") {
		t.Errorf("流式输出未脱敏密钥: %q", out)
	}
	if !strings.Contains(out, "[已脱敏]") {
		t.Errorf("应出现脱敏标记: %q", out)
	}
}

// ---- Integration tests: handleReport ----

// handleReport should call the report_store tool, stream the title and every
// section heading, and complete the stream.
func TestHandleReport_PlanWriteStore(t *testing.T) {
	ll := &fakeLLM{ready: true, chat: func(m []llm.ChatMessage) (string, error) {
		u := m[len(m)-1].Content
		// A prompt containing "大纲" gets the outline JSON; every other
		// prompt gets the section body text.
		if strings.Contains(u, "大纲") {
			return `{"title":"咖啡报告","sections":["提神","风险"]}`, nil
		}
		return "本章正文。", nil
	}}
	ft := &fakeTools{fn: func(c *contract.ToolCall) *contract.ToolResult {
		return &contract.ToolResult{OK: true, Content: "/tmp/x.json"}
	}}
	fs := &fakeSink{}
	o := newOrch(ll, ft, fs, &fakeExec{})
	tk := &contract.Task{ID: "r1", Graph: json.RawMessage(`{}`), Meta: map[string]any{contract.MetaTopic: "咖啡"}}
	if err := o.handleReport(context.Background(), tk, o.tracer("r1")); err != nil {
		t.Fatal(err)
	}
	if !ft.called("report_store") {
		t.Error("应调用 report_store 存源")
	}
	out := fs.text()
	if !strings.Contains(out, "# 咖啡报告") || !strings.Contains(out, "## 提神") || !strings.Contains(out, "## 风险") {
		t.Errorf("报告流应含标题与各章,got %q", out)
	}
	if !fs.completed() {
		t.Error("报告结束应 CompleteStream")
	}
}