// Package eino 封装基于 CloudWeGo Eino 的 Agent 图编排引擎。 package eino import ( "context" "log" "time" "github.com/sundynix/sundynix-dispatcher/internal/harness" "github.com/sundynix/sundynix-dispatcher/internal/llm" "github.com/sundynix/sundynix-shared/contract" ) // TokenSink 是 Token 流回流出口(由 NATS bus 实现)。 type TokenSink interface { PublishToken(taskID string, token []byte) error CompleteStream(taskID string) error } // ToolCaller 经 NATS 调起第 5 层 MCP 工具(由 NATS bus 实现)。 type ToolCaller interface { CallTool(ctx context.Context, subject string, call *contract.ToolCall) (*contract.ToolResult, error) } // 工具调用超时;超时即降级(不带工具上下文继续推理)。 const toolCallTimeout = 3 * time.Second // Orchestrator 将 DSL 图编译为 Eino Graph 并驱动执行。 type Orchestrator struct { pool *llm.Pool breaker *harness.CircuitBreaker sink TokenSink tools ToolCaller } func NewOrchestrator(pool *llm.Pool, breaker *harness.CircuitBreaker, sink TokenSink, tools ToolCaller) *Orchestrator { return &Orchestrator{pool: pool, breaker: breaker, sink: sink, tools: tools} } // Handle 消费一个任务:编译图 → 流式推理 → 经 sink 把 Token 回流到 sundynix.streams.。 func (o *Orchestrator) Handle(ctx context.Context, t *contract.Task) error { if !o.breaker.Allow() { log.Printf("[eino] circuit open, drop task %s", t.ID) return nil } log.Printf("[eino] task %s received (graph=%d bytes), streaming tokens...", t.ID, len(t.Graph)) // TODO: compose.NewGraph(...) 编译 DSL;此处 prompt 占位为图原文。 prompt := string(t.Graph) // 工具节点:经 NATS 调用第 5 层 MCP(sundynix.tools.go.*)。 // 这里以 wiki_search 演示完整调用链路;真实 Eino 图会按 DSL 节点择机调用。 if ctxNote := o.retrieveContext(ctx, t); ctxNote != "" { prompt = ctxNote + "\n" + prompt } n := 0 err := o.pool.Stream(ctx, prompt, func(tok []byte) { if perr := o.sink.PublishToken(t.ID, tok); perr != nil { log.Printf("[eino] publish token failed: %v", perr) return } n++ }) if err != nil { log.Printf("[eino] task %s stream error: %v", t.ID, err) } if cerr := o.sink.CompleteStream(t.ID); cerr != nil { log.Printf("[eino] complete stream failed: %v", cerr) } log.Printf("[eino] task %s done, %d tokens streamed", t.ID, n) o.breaker.Report(err == nil) return err } // retrieveContext 经 MCP wiki_search 工具拉取检索上下文。 // 工具不可用/超时时返回空串,降级为无工具上下文推理(不阻断主流程)。 func (o *Orchestrator) retrieveContext(ctx context.Context, t *contract.Task) string { if o.tools == nil { return "" } cctx, cancel := context.WithTimeout(ctx, toolCallTimeout) defer cancel() res, err := o.tools.CallTool(cctx, contract.ToolSubjectGo("wiki_search"), &contract.ToolCall{ Tool: "wiki_search", TaskID: t.ID, Args: map[string]any{"q": string(t.Graph)}, }) if err != nil { log.Printf("[eino] task %s wiki_search unavailable, degrade: %v", t.ID, err) return "" } if !res.OK { log.Printf("[eino] task %s wiki_search error: %s", t.ID, res.Error) return "" } log.Printf("[eino] task %s wiki_search ok: %s", t.ID, res.Content) return res.Content }