cdc5b3a847
把任务执行做成可观测:Dispatcher 在每个节点/阶段发结构化 ExecEvent, 经独立 NATS 通道回流,前端逐节点点亮(状态/耗时/工具入参产出)。 - shared: contract.ExecEvent + ExecSubject(sundynix.exec.<id>,与 Token 流分流); bus.PublishExec/CompleteExec/SubscribeExec(core NATS,复用结束头) - dispatcher: execTracer(自增 Seq 保序 + span 自动计耗时); Orchestrator 加 ExecSink;通用图(init 召回 / 各 tool 入参→产出 / prompt / model 首token+token数)与报告编排(规划大纲 / 各章并行 start-end / 渲染)全程埋点 - gateway: SubscribeExec + GET /tasks/:id/exec SSE(与 token 流并行) - desktop: streamExec + deriveNodes(按 node 归并 start/end/error/info); 复用组件 ExecTrace(竖向轨道,按 kind 着色,运行中脉冲灯); 新 RunsView(运行·观测:轨迹+输出双栏);BottomDrawer 轨迹/工具调用 tab 接真实数据; ReportView 加执行轨迹栏;左导航「运行」置就绪 实测: - 报告任务 /exec:规划(2680ms,4章) → 4 章并行(seq 交错,各~7-8s 重叠=真并行, 每章带 docs 知识库检索预览+成稿字数) → 渲染(docx 落盘) - 通用图 /exec:tool:kb_search(678ms,入参→Milvus 产出) → prompt(2消息) → model(首token 860ms / 4 tokens) - 浏览器(Preview):报告页执行轨迹逐节点点亮、章节带耗时/字数/检索片段,完成后下载 Word Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
308 lines
9.8 KiB
Go
308 lines
9.8 KiB
Go
// Package bus 封装 NATS JetStream 的连接、流声明、任务发布与消费。
|
||
// Gateway 与 Dispatcher 共用这套真实收发逻辑,e2e 测试也直接覆盖它。
|
||
package bus
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"time"
|
||
|
||
"github.com/nats-io/nats.go"
|
||
"github.com/nats-io/nats.go/jetstream"
|
||
|
||
"github.com/sundynix/sundynix-shared/contract"
|
||
)
|
||
|
||
// Bus 持有 NATS 连接与 JetStream 上下文。
|
||
type Bus struct {
|
||
nc *nats.Conn
|
||
js jetstream.JetStream
|
||
}
|
||
|
||
// Connect 接入 NATS 骨干网并初始化 JetStream,使用默认重试参数。
|
||
func Connect(url string) (*Bus, error) {
|
||
return ConnectWithRetry(url, 30, time.Second)
|
||
}
|
||
|
||
// ConnectWithRetry 在 NATS 暂不可用时按固定间隔重试,容忍服务先于 NATS 启动。
|
||
func ConnectWithRetry(url string, attempts int, interval time.Duration) (*Bus, error) {
|
||
var lastErr error
|
||
for i := 0; i < attempts; i++ {
|
||
nc, err := nats.Connect(url,
|
||
nats.Timeout(5*time.Second),
|
||
nats.RetryOnFailedConnect(true),
|
||
nats.MaxReconnects(-1),
|
||
nats.ReconnectWait(interval),
|
||
)
|
||
if err != nil {
|
||
lastErr = err
|
||
time.Sleep(interval)
|
||
continue
|
||
}
|
||
// RetryOnFailedConnect 下 Connect 可能立即返回但尚未连上,等待真正建立。
|
||
if nc.Status() != nats.CONNECTED {
|
||
if !waitConnected(nc, 5*time.Second) {
|
||
lastErr = fmt.Errorf("nats not connected within timeout")
|
||
nc.Close()
|
||
time.Sleep(interval)
|
||
continue
|
||
}
|
||
}
|
||
js, err := jetstream.New(nc)
|
||
if err != nil {
|
||
nc.Close()
|
||
return nil, fmt.Errorf("jetstream init: %w", err)
|
||
}
|
||
return &Bus{nc: nc, js: js}, nil
|
||
}
|
||
return nil, fmt.Errorf("nats connect after %d attempts: %w", attempts, lastErr)
|
||
}
|
||
|
||
func waitConnected(nc *nats.Conn, d time.Duration) bool {
|
||
deadline := time.Now().Add(d)
|
||
for time.Now().Before(deadline) {
|
||
if nc.Status() == nats.CONNECTED {
|
||
return true
|
||
}
|
||
time.Sleep(50 * time.Millisecond)
|
||
}
|
||
return nc.Status() == nats.CONNECTED
|
||
}
|
||
|
||
// Close 关闭底层连接。
|
||
func (b *Bus) Close() {
|
||
if b.nc != nil {
|
||
b.nc.Close()
|
||
}
|
||
}
|
||
|
||
// EnsureTaskStream 幂等地创建/更新任务流,捕获 sundynix.tasks.>。
|
||
func (b *Bus) EnsureTaskStream(ctx context.Context) error {
|
||
_, err := b.js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
|
||
Name: contract.StreamTasks,
|
||
Subjects: []string{contract.SubjectTasksAll},
|
||
Storage: jetstream.FileStorage,
|
||
})
|
||
return err
|
||
}
|
||
|
||
// PublishTask 把任务发布到 sundynix.tasks.<id>,返回序列号。
|
||
func (b *Bus) PublishTask(ctx context.Context, t *contract.Task) (uint64, error) {
|
||
data, err := t.Marshal()
|
||
if err != nil {
|
||
return 0, err
|
||
}
|
||
ack, err := b.js.Publish(ctx, contract.TaskSubject(t.ID), data)
|
||
if err != nil {
|
||
return 0, fmt.Errorf("publish task: %w", err)
|
||
}
|
||
return ack.Sequence, nil
|
||
}
|
||
|
||
// ---- Token 流回流(core NATS 零拷贝字节管道)----
|
||
|
||
// PublishToken 把一个推理 Token 以 core NATS 写到 sundynix.streams.<taskID>。
|
||
func (b *Bus) PublishToken(taskID string, token []byte) error {
|
||
return b.nc.Publish(contract.StreamSubject(taskID), token)
|
||
}
|
||
|
||
// CompleteStream 发送 Token 流结束信号(空体 + 结束头)。
|
||
func (b *Bus) CompleteStream(taskID string) error {
|
||
msg := nats.NewMsg(contract.StreamSubject(taskID))
|
||
msg.Header.Set(contract.HeaderStreamEnd, "1")
|
||
return b.nc.PublishMsg(msg)
|
||
}
|
||
|
||
// SubscribeTokens 订阅某 task 的 Token 流。每个 Token 触发 onToken;
|
||
// 收到结束信号后触发 onDone。返回的 unsub 用于退订。
|
||
// 注意:core NATS 无持久化,订阅须在 Token 产生前建立(SSE 客户端先连)。
|
||
func (b *Bus) SubscribeTokens(taskID string, onToken func([]byte), onDone func()) (unsub func() error, err error) {
|
||
sub, err := b.nc.Subscribe(contract.StreamSubject(taskID), func(m *nats.Msg) {
|
||
if m.Header.Get(contract.HeaderStreamEnd) == "1" {
|
||
onDone()
|
||
return
|
||
}
|
||
// 拷贝,避免 nats 复用底层 buffer。
|
||
tok := make([]byte, len(m.Data))
|
||
copy(tok, m.Data)
|
||
onToken(tok)
|
||
})
|
||
if err != nil {
|
||
return nil, fmt.Errorf("subscribe tokens: %w", err)
|
||
}
|
||
return sub.Unsubscribe, nil
|
||
}
|
||
|
||
// ---- 执行可视化事件(core NATS,与 Token 流分流)----
|
||
|
||
// PublishExec 把一条执行事件(JSON)发到 sundynix.exec.<taskID>。
|
||
func (b *Bus) PublishExec(taskID string, data []byte) error {
|
||
return b.nc.Publish(contract.ExecSubject(taskID), data)
|
||
}
|
||
|
||
// CompleteExec 发送执行事件流结束信号(空体 + 结束头)。
|
||
func (b *Bus) CompleteExec(taskID string) error {
|
||
msg := nats.NewMsg(contract.ExecSubject(taskID))
|
||
msg.Header.Set(contract.HeaderStreamEnd, "1")
|
||
return b.nc.PublishMsg(msg)
|
||
}
|
||
|
||
// SubscribeExec 订阅某 task 的执行事件流。每条事件触发 onEvent;结束触发 onDone。
|
||
func (b *Bus) SubscribeExec(taskID string, onEvent func([]byte), onDone func()) (unsub func() error, err error) {
|
||
sub, err := b.nc.Subscribe(contract.ExecSubject(taskID), func(m *nats.Msg) {
|
||
if m.Header.Get(contract.HeaderStreamEnd) == "1" {
|
||
onDone()
|
||
return
|
||
}
|
||
data := make([]byte, len(m.Data))
|
||
copy(data, m.Data)
|
||
onEvent(data)
|
||
})
|
||
if err != nil {
|
||
return nil, fmt.Errorf("subscribe exec: %w", err)
|
||
}
|
||
return sub.Unsubscribe, nil
|
||
}
|
||
|
||
// ---- MCP 工具调用(core NATS request-reply)----
|
||
|
||
// CallTool 同步调用一个 MCP 工具:发到 subject,阻塞等待应答。
|
||
// ctx 超时即视为工具不可用,由调用方决定降级。
|
||
func (b *Bus) CallTool(ctx context.Context, subject string, call *contract.ToolCall) (*contract.ToolResult, error) {
|
||
data, err := json.Marshal(call)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("marshal tool call: %w", err)
|
||
}
|
||
msg, err := b.nc.RequestWithContext(ctx, subject, data)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("call tool %s: %w", subject, err)
|
||
}
|
||
var res contract.ToolResult
|
||
if err := json.Unmarshal(msg.Data, &res); err != nil {
|
||
return nil, fmt.Errorf("unmarshal tool result: %w", err)
|
||
}
|
||
return &res, nil
|
||
}
|
||
|
||
// ToolHandler 处理一次工具调用并返回结果。
|
||
type ToolHandler func(ctx context.Context, call *contract.ToolCall) *contract.ToolResult
|
||
|
||
// ServeTool 以队列组订阅工具主题(可用通配 sundynix.tools.go.>),
|
||
// 对每个请求调用 h 并 Respond,队列组内多副本自动负载均衡。
|
||
// 返回的 unsub 用于退订。
|
||
func (b *Bus) ServeTool(subject, queue string, h ToolHandler) (unsub func() error, err error) {
|
||
sub, err := b.nc.QueueSubscribe(subject, queue, func(m *nats.Msg) {
|
||
var call contract.ToolCall
|
||
if err := json.Unmarshal(m.Data, &call); err != nil {
|
||
respond(m, &contract.ToolResult{OK: false, Error: "bad tool call: " + err.Error()})
|
||
return
|
||
}
|
||
respond(m, h(context.Background(), &call))
|
||
})
|
||
if err != nil {
|
||
return nil, fmt.Errorf("serve tool %s: %w", subject, err)
|
||
}
|
||
return sub.Unsubscribe, nil
|
||
}
|
||
|
||
func respond(m *nats.Msg, res *contract.ToolResult) {
|
||
data, err := json.Marshal(res)
|
||
if err != nil {
|
||
data, _ = json.Marshal(&contract.ToolResult{OK: false, Error: "marshal result: " + err.Error()})
|
||
}
|
||
_ = m.Respond(data)
|
||
}
|
||
|
||
// ---- 配置控制面(core NATS request-reply + broadcast)----
|
||
|
||
// RequestConfig 向控制面(Gateway)请求某 kind 当前激活配置(chat/embedding)。
|
||
// 无人应答 / 无激活配置时返回 (nil, nil),由调用方降级。
|
||
func (b *Bus) RequestConfig(ctx context.Context, kind string) (*contract.ModelConfig, error) {
|
||
msg, err := b.nc.RequestWithContext(ctx, contract.ConfigGetSubject(kind), nil)
|
||
if err != nil {
|
||
return nil, nil // 控制面暂不可用,降级
|
||
}
|
||
if len(msg.Data) == 0 {
|
||
return nil, nil
|
||
}
|
||
var cfg contract.ModelConfig
|
||
if err := json.Unmarshal(msg.Data, &cfg); err != nil {
|
||
return nil, fmt.Errorf("unmarshal %s config: %w", kind, err)
|
||
}
|
||
if !cfg.Ready() {
|
||
return nil, nil
|
||
}
|
||
return &cfg, nil
|
||
}
|
||
|
||
// ServeConfig 让控制面响应某 kind 的配置请求;provide 返回当前激活配置(可为 nil)。
|
||
func (b *Bus) ServeConfig(kind string, provide func() *contract.ModelConfig) (unsub func() error, err error) {
|
||
sub, err := b.nc.Subscribe(contract.ConfigGetSubject(kind), func(m *nats.Msg) {
|
||
var data []byte
|
||
if cfg := provide(); cfg != nil {
|
||
data, _ = json.Marshal(cfg)
|
||
}
|
||
_ = m.Respond(data)
|
||
})
|
||
if err != nil {
|
||
return nil, fmt.Errorf("serve %s config: %w", kind, err)
|
||
}
|
||
return sub.Unsubscribe, nil
|
||
}
|
||
|
||
// PublishConfigUpdated 广播某 kind 的配置变更(消费方据此热更新)。
|
||
func (b *Bus) PublishConfigUpdated(kind string, cfg *contract.ModelConfig) error {
|
||
data, err := json.Marshal(cfg)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
return b.nc.Publish(contract.ConfigUpdatedSubject(kind), data)
|
||
}
|
||
|
||
// SubscribeConfigUpdated 订阅某 kind 的配置变更。
|
||
func (b *Bus) SubscribeConfigUpdated(kind string, onUpdate func(*contract.ModelConfig)) (unsub func() error, err error) {
|
||
sub, err := b.nc.Subscribe(contract.ConfigUpdatedSubject(kind), func(m *nats.Msg) {
|
||
var cfg contract.ModelConfig
|
||
if json.Unmarshal(m.Data, &cfg) == nil {
|
||
onUpdate(&cfg)
|
||
}
|
||
})
|
||
if err != nil {
|
||
return nil, fmt.Errorf("subscribe %s config: %w", kind, err)
|
||
}
|
||
return sub.Unsubscribe, nil
|
||
}
|
||
|
||
// TaskHandler 处理一个消费到的任务。
|
||
type TaskHandler func(ctx context.Context, t *contract.Task) error
|
||
|
||
// ConsumeTasks 在持久消费者上消费任务,队列组内负载均衡。
|
||
// 返回的 stop 函数用于优雅停止消费。
|
||
func (b *Bus) ConsumeTasks(ctx context.Context, h TaskHandler) (stop func(), err error) {
|
||
cons, err := b.js.CreateOrUpdateConsumer(ctx, contract.StreamTasks, jetstream.ConsumerConfig{
|
||
Durable: contract.ConsumerDurable,
|
||
AckPolicy: jetstream.AckExplicitPolicy,
|
||
FilterSubject: contract.SubjectTasksAll,
|
||
})
|
||
if err != nil {
|
||
return nil, fmt.Errorf("create consumer: %w", err)
|
||
}
|
||
cc, err := cons.Consume(func(msg jetstream.Msg) {
|
||
t, err := contract.Unmarshal(msg.Data())
|
||
if err != nil {
|
||
_ = msg.Term() // 脏数据,丢弃不重投
|
||
return
|
||
}
|
||
if err := h(ctx, t); err != nil {
|
||
_ = msg.NakWithDelay(time.Second) // 处理失败,延迟重投
|
||
return
|
||
}
|
||
_ = msg.Ack()
|
||
})
|
||
if err != nil {
|
||
return nil, fmt.Errorf("consume: %w", err)
|
||
}
|
||
return cc.Stop, nil
|
||
}
|