diff --git a/PROGRESS.md b/PROGRESS.md index b6d5476..0b64e56 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -84,7 +84,7 @@ - [x] 后端首批单测(19 纯逻辑用例:引擎/DSL/docx/报告)+ mcp-go 集成测试(Profile 迁移) - [x] **真实鉴权(JWT)闭环**:后端注册/登录/校验 + RequireAuth 保护路由 + owner=已验证 uid(去掉 header 兜底);前端登录/注册门 + 存 token + Bearer + 401 自动登出 + 顶栏用户/登出。实跑验证(含 CORS Authorization 修复) - [x] 生产安全硬化:JWT 默认密钥生产 fail-fast · /admin 加 RequireAdmin(ADMIN_USER_IDS 白名单)· CORS 来源可配(CORS_ALLOW_ORIGIN) -- [ ] 集成/前端测试(`runGraph` / `handleReport` 需 mock pool/tools/sink;前端无测试) +- [x] 后端集成测试:pool 抽成 LLM 接口,runGraph(分支/工具/map/脱敏)+handleReport 端到端假替身测试(含 -race);🟡 前端测试仍无 --- diff --git a/sundynix-dispatcher/internal/eino/integration_test.go b/sundynix-dispatcher/internal/eino/integration_test.go new file mode 100644 index 0000000..ba0b58f --- /dev/null +++ b/sundynix-dispatcher/internal/eino/integration_test.go @@ -0,0 +1,238 @@ +package eino + +import ( + "context" + "encoding/json" + "strings" + "sync" + "testing" + + "github.com/sundynix/sundynix-dispatcher/internal/harness" + "github.com/sundynix/sundynix-dispatcher/internal/llm" + "github.com/sundynix/sundynix-shared/contract" +) + +// ---- 测试替身 ---- + +type fakeLLM struct { + ready bool + stream func(msgs []llm.ChatMessage) string // ChatStream 要回流的整段文本 + chat func(msgs []llm.ChatMessage) (string, error) // Chat 返回 +} + +func (f *fakeLLM) Ready() bool { return f.ready } +func (f *fakeLLM) ChatStream(_ context.Context, msgs []llm.ChatMessage, onToken func(string)) error { + if f.stream != nil { + onToken(f.stream(msgs)) + } + return nil +} +func (f *fakeLLM) StreamText(_ context.Context, text string, onToken func([]byte)) error { + onToken([]byte(text)) + return nil +} +func (f *fakeLLM) Chat(_ context.Context, msgs []llm.ChatMessage) (string, error) { + if f.chat != nil { + return f.chat(msgs) + } + return "", nil +} + +type fakeSink struct { + mu sync.Mutex + tokens []string + done bool +} + +func (s *fakeSink) PublishToken(_ string, tok []byte) error { + s.mu.Lock() + s.tokens = append(s.tokens, string(tok)) + s.mu.Unlock() + return nil +} +func (s *fakeSink) CompleteStream(_ string) error { s.done = true; return nil } +func (s *fakeSink) text() string { + s.mu.Lock() + defer s.mu.Unlock() + return strings.Join(s.tokens, "") +} + +type fakeTools struct { + mu sync.Mutex + calls []string + fn func(call *contract.ToolCall) *contract.ToolResult +} + +func (t *fakeTools) CallTool(_ context.Context, _ string, call *contract.ToolCall) (*contract.ToolResult, error) { + t.mu.Lock() + t.calls = append(t.calls, call.Tool) + t.mu.Unlock() + if t.fn != nil { + return t.fn(call), nil + } + return &contract.ToolResult{OK: true, Content: ""}, nil +} +func (t *fakeTools) called(tool string) bool { + t.mu.Lock() + defer t.mu.Unlock() + for _, c := range t.calls { + if c == tool { + return true + } + } + return false +} + +type fakeExec struct { + mu sync.Mutex + events []contract.ExecEvent +} + +func (e *fakeExec) PublishExec(_ string, data []byte) error { + var ev contract.ExecEvent + if json.Unmarshal(data, &ev) == nil { + e.mu.Lock() + e.events = append(e.events, ev) + e.mu.Unlock() + } + return nil +} +func (e *fakeExec) CompleteExec(_ string) error { return nil } + +func newOrch(ll *fakeLLM, ft *fakeTools, fs *fakeSink, fe *fakeExec) *Orchestrator { + return &Orchestrator{pool: ll, breaker: harness.NewCircuitBreaker(), sink: fs, tools: ft, exec: fe} +} + +func task(graph string) *contract.Task { + return &contract.Task{ID: "t1", Graph: json.RawMessage(graph), Meta: map[string]any{}} +} + +// ---- 集成测试:runGraph ---- + +// 分支按 true/false 边标签精确选路(true 边故意列第二位)。 +func TestRunGraph_BranchRouting(t *testing.T) { + g := `{"nodes":[ + {"id":"i","kind":"input","config":{"text":"hi"}}, + {"id":"b","kind":"branch","config":{"condition":"%s"}}, + {"id":"a","kind":"agent","config":{"system":"AAA"}}, + {"id":"c","kind":"agent","config":{"system":"BBB"}} + ],"edges":[ + {"source":"i","target":"b"}, + {"source":"b","target":"c","sourceHandle":"false"}, + {"source":"b","target":"a","sourceHandle":"true"} + ]}` + // fakeLLM 回流"系统提示词",借此判断哪个 agent 跑了。 + ll := &fakeLLM{ready: true, stream: func(m []llm.ChatMessage) string { return m[0].Content }} + run := func(cond string) string { + o := newOrch(ll, &fakeTools{}, &fakeSink{}, &fakeExec{}) + ans, err := o.runGraph(context.Background(), task(strings.Replace(g, "%s", cond, 1)), o.tracer("t1")) + if err != nil { + t.Fatal(err) + } + return ans + } + if a := run("2>1"); !strings.Contains(a, "AAA") || strings.Contains(a, "BBB") { + t.Errorf("条件真应走 true 标签(AAA),got %q", a) + } + if a := run("1>2"); !strings.Contains(a, "BBB") || strings.Contains(a, "AAA") { + t.Errorf("条件假应走 false 标签(BBB),got %q", a) + } +} + +// 工具节点产出注入 agent 上下文。 +func TestRunGraph_ToolFeedsAgent(t *testing.T) { + g := `{"nodes":[ + {"id":"i","kind":"input","config":{"text":"hi"}}, + {"id":"t","kind":"tool","config":{"tool":"wiki_search"}}, + {"id":"a","kind":"agent","config":{"system":"S"}} + ],"edges":[{"source":"i","target":"t"},{"source":"t","target":"a"}]}` + ll := &fakeLLM{ready: true, stream: func(m []llm.ChatMessage) string { return m[0].Content }} + ft := &fakeTools{fn: func(c *contract.ToolCall) *contract.ToolResult { + return &contract.ToolResult{OK: true, Content: "TOOLDATA"} + }} + o := newOrch(ll, ft, &fakeSink{}, &fakeExec{}) + ans, err := o.runGraph(context.Background(), task(g), o.tracer("t1")) + if err != nil { + t.Fatal(err) + } + if !ft.called("wiki_search") { + t.Error("应调用 wiki_search 工具") + } + if !strings.Contains(ans, "TOOLDATA") { + t.Errorf("工具产出应注入 agent 上下文,got %q", ans) + } +} + +// map 并行 fan-out:拆项 → 各章撰写 → 汇成多章成稿。 +func TestRunGraph_MapFanout(t *testing.T) { + g := `{"nodes":[ + {"id":"i","kind":"input","config":{"text":"猫"}}, + {"id":"m","kind":"map","config":{"splitBy":"要点"}} + ],"edges":[{"source":"i","target":"m"}]}` + ll := &fakeLLM{ready: true, chat: func(m []llm.ChatMessage) (string, error) { + u := m[len(m)-1].Content + if strings.Contains(u, "拆分") { + return `["第一章","第二章"]`, nil + } + return "正文XYZ", nil + }} + o := newOrch(ll, &fakeTools{}, &fakeSink{}, &fakeExec{}) + ans, err := o.runGraph(context.Background(), task(g), o.tracer("t1")) + if err != nil { + t.Fatal(err) + } + if !strings.Contains(ans, "## 第一章") || !strings.Contains(ans, "## 第二章") || !strings.Contains(ans, "正文XYZ") { + t.Errorf("map 应产出多章成稿,got %q", ans) + } +} + +// 输出护栏:发射的 token 中疑似密钥被脱敏。 +func TestRunGraph_OutputRedaction(t *testing.T) { + g := `{"nodes":[{"id":"i","kind":"input","config":{"text":"hi"}},{"id":"a","kind":"agent","config":{"system":"S"}}],"edges":[{"source":"i","target":"a"}]}` + ll := &fakeLLM{ready: true, stream: func(m []llm.ChatMessage) string { + return "你的 key 是 sk-912cf85b16d04b22bcb95f4576423bfb 请保密" + }} + fs := &fakeSink{} + o := newOrch(ll, &fakeTools{}, fs, &fakeExec{}) + if _, err := o.runGraph(context.Background(), task(g), o.tracer("t1")); err != nil { + t.Fatal(err) + } + out := fs.text() + if strings.Contains(out, "sk-912cf85b16d04b22bcb95f4576423bfb") { + t.Errorf("流式输出未脱敏密钥: %q", out) + } + if !strings.Contains(out, "[已脱敏]") { + t.Errorf("应出现脱敏标记: %q", out) + } +} + +// ---- 集成测试:handleReport ---- + +func TestHandleReport_PlanWriteStore(t *testing.T) { + ll := &fakeLLM{ready: true, chat: func(m []llm.ChatMessage) (string, error) { + u := m[len(m)-1].Content + if strings.Contains(u, "大纲") { + return `{"title":"咖啡报告","sections":["提神","风险"]}`, nil + } + return "本章正文。", nil + }} + ft := &fakeTools{fn: func(c *contract.ToolCall) *contract.ToolResult { + return &contract.ToolResult{OK: true, Content: "/tmp/x.json"} + }} + fs := &fakeSink{} + o := newOrch(ll, ft, fs, &fakeExec{}) + tk := &contract.Task{ID: "r1", Graph: json.RawMessage(`{}`), Meta: map[string]any{contract.MetaTopic: "咖啡"}} + if err := o.handleReport(context.Background(), tk, o.tracer("r1")); err != nil { + t.Fatal(err) + } + if !ft.called("report_store") { + t.Error("应调用 report_store 存源") + } + out := fs.text() + if !strings.Contains(out, "# 咖啡报告") || !strings.Contains(out, "## 提神") || !strings.Contains(out, "## 风险") { + t.Errorf("报告流应含标题与各章,got %q", out) + } + if !fs.done { + t.Error("报告结束应 CompleteStream") + } +} diff --git a/sundynix-dispatcher/internal/eino/orchestrator.go b/sundynix-dispatcher/internal/eino/orchestrator.go index 03b5bcb..3639bbf 100644 --- a/sundynix-dispatcher/internal/eino/orchestrator.go +++ b/sundynix-dispatcher/internal/eino/orchestrator.go @@ -27,12 +27,20 @@ type ToolCaller interface { CallTool(ctx context.Context, subject string, call *contract.ToolCall) (*contract.ToolResult, error) } +// LLM 是编排所需的语言模型能力(生产由 *llm.Pool 实现)。抽成接口便于测试注入假模型。 +type LLM interface { + Ready() bool + ChatStream(ctx context.Context, msgs []llm.ChatMessage, onToken func(string)) error + StreamText(ctx context.Context, text string, onToken func([]byte)) error + Chat(ctx context.Context, msgs []llm.ChatMessage) (string, error) +} + // 工具调用超时;超时即降级(不带工具上下文继续推理)。 const toolCallTimeout = 3 * time.Second // Orchestrator 把每个 DSL 任务动态编译为 Eino 图并执行(记忆召回 → 工具节点 → 注入 → 流式)。 type Orchestrator struct { - pool *llm.Pool + pool LLM breaker *harness.CircuitBreaker eval *harness.Evaluator sink TokenSink @@ -42,7 +50,7 @@ type Orchestrator struct { // NewOrchestrator 持有依赖;图按任务的 DSL 在 Handle 内动态编译。 // exec 为执行可视化事件出口(可为 nil,则不发轨迹事件);eval 为自动化评测(可为 nil)。 -func NewOrchestrator(pool *llm.Pool, breaker *harness.CircuitBreaker, eval *harness.Evaluator, sink TokenSink, tools ToolCaller, exec ExecSink) (*Orchestrator, error) { +func NewOrchestrator(pool LLM, breaker *harness.CircuitBreaker, eval *harness.Evaluator, sink TokenSink, tools ToolCaller, exec ExecSink) (*Orchestrator, error) { return &Orchestrator{pool: pool, breaker: breaker, eval: eval, sink: sink, tools: tools, exec: exec}, nil }