adc521f94d
第 4 层 Dispatcher 经 NATS request-reply + 队列组同步调用第 5 层 MCP 工具, 工具不可用/超时即降级,不阻断主流程。 - shared/contract: ToolCall/ToolResult + sundynix.tools.go.* subject 约定 + ToolSubjectGo/Py - shared/bus: CallTool(发起) / ServeTool(队列组订阅+应答) - mcp-go: 接共享 bus,gateway 通配订阅按工具名分发(wiki_search/echo),main 优雅退出 - dispatcher: ToolCaller 接口 + Orchestrator.retrieveContext(调 wiki_search,超时3s降级) - e2e: TestToolCallRoundTrip(PASS);demo.sh 加 mcp-go(就绪门避免启动竞态),live 跑通 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
102 lines
3.3 KiB
Go
102 lines
3.3 KiB
Go
// Package eino 封装基于 CloudWeGo Eino 的 Agent 图编排引擎。
|
||
package eino
|
||
|
||
import (
|
||
"context"
|
||
"log"
|
||
"time"
|
||
|
||
"github.com/sundynix/sundynix-dispatcher/internal/harness"
|
||
"github.com/sundynix/sundynix-dispatcher/internal/llm"
|
||
"github.com/sundynix/sundynix-shared/contract"
|
||
)
|
||
|
||
// TokenSink 是 Token 流回流出口(由 NATS bus 实现)。
|
||
type TokenSink interface {
|
||
PublishToken(taskID string, token []byte) error
|
||
CompleteStream(taskID string) error
|
||
}
|
||
|
||
// ToolCaller 经 NATS 调起第 5 层 MCP 工具(由 NATS bus 实现)。
|
||
type ToolCaller interface {
|
||
CallTool(ctx context.Context, subject string, call *contract.ToolCall) (*contract.ToolResult, error)
|
||
}
|
||
|
||
// 工具调用超时;超时即降级(不带工具上下文继续推理)。
|
||
const toolCallTimeout = 3 * time.Second
|
||
|
||
// Orchestrator 将 DSL 图编译为 Eino Graph 并驱动执行。
|
||
type Orchestrator struct {
|
||
pool *llm.Pool
|
||
breaker *harness.CircuitBreaker
|
||
sink TokenSink
|
||
tools ToolCaller
|
||
}
|
||
|
||
func NewOrchestrator(pool *llm.Pool, breaker *harness.CircuitBreaker, sink TokenSink, tools ToolCaller) *Orchestrator {
|
||
return &Orchestrator{pool: pool, breaker: breaker, sink: sink, tools: tools}
|
||
}
|
||
|
||
// Handle 消费一个任务:编译图 → 流式推理 → 经 sink 把 Token 回流到 sundynix.streams.<id>。
|
||
func (o *Orchestrator) Handle(ctx context.Context, t *contract.Task) error {
|
||
if !o.breaker.Allow() {
|
||
log.Printf("[eino] circuit open, drop task %s", t.ID)
|
||
return nil
|
||
}
|
||
log.Printf("[eino] task %s received (graph=%d bytes), streaming tokens...", t.ID, len(t.Graph))
|
||
|
||
// TODO: compose.NewGraph(...) 编译 DSL;此处 prompt 占位为图原文。
|
||
prompt := string(t.Graph)
|
||
|
||
// 工具节点:经 NATS 调用第 5 层 MCP(sundynix.tools.go.*)。
|
||
// 这里以 wiki_search 演示完整调用链路;真实 Eino 图会按 DSL 节点择机调用。
|
||
if ctxNote := o.retrieveContext(ctx, t); ctxNote != "" {
|
||
prompt = ctxNote + "\n" + prompt
|
||
}
|
||
|
||
n := 0
|
||
err := o.pool.Stream(ctx, prompt, func(tok []byte) {
|
||
if perr := o.sink.PublishToken(t.ID, tok); perr != nil {
|
||
log.Printf("[eino] publish token failed: %v", perr)
|
||
return
|
||
}
|
||
n++
|
||
})
|
||
if err != nil {
|
||
log.Printf("[eino] task %s stream error: %v", t.ID, err)
|
||
}
|
||
|
||
if cerr := o.sink.CompleteStream(t.ID); cerr != nil {
|
||
log.Printf("[eino] complete stream failed: %v", cerr)
|
||
}
|
||
log.Printf("[eino] task %s done, %d tokens streamed", t.ID, n)
|
||
o.breaker.Report(err == nil)
|
||
return err
|
||
}
|
||
|
||
// retrieveContext 经 MCP wiki_search 工具拉取检索上下文。
|
||
// 工具不可用/超时时返回空串,降级为无工具上下文推理(不阻断主流程)。
|
||
func (o *Orchestrator) retrieveContext(ctx context.Context, t *contract.Task) string {
|
||
if o.tools == nil {
|
||
return ""
|
||
}
|
||
cctx, cancel := context.WithTimeout(ctx, toolCallTimeout)
|
||
defer cancel()
|
||
|
||
res, err := o.tools.CallTool(cctx, contract.ToolSubjectGo("wiki_search"), &contract.ToolCall{
|
||
Tool: "wiki_search",
|
||
TaskID: t.ID,
|
||
Args: map[string]any{"q": string(t.Graph)},
|
||
})
|
||
if err != nil {
|
||
log.Printf("[eino] task %s wiki_search unavailable, degrade: %v", t.ID, err)
|
||
return ""
|
||
}
|
||
if !res.OK {
|
||
log.Printf("[eino] task %s wiki_search error: %s", t.ID, res.Error)
|
||
return ""
|
||
}
|
||
log.Printf("[eino] task %s wiki_search ok: %s", t.ID, res.Content)
|
||
return res.Content
|
||
}
|