Files
Blizzard cdc5b3a847 feat(observability): 执行可视化 — 节点级实时轨迹(运行·观测)
把任务执行做成可观测:Dispatcher 在每个节点/阶段发结构化 ExecEvent,
经独立 NATS 通道回流,前端逐节点点亮(状态/耗时/工具入参产出)。

- shared: contract.ExecEvent + ExecSubject(sundynix.exec.<id>,与 Token 流分流);
  bus.PublishExec/CompleteExec/SubscribeExec(core NATS,复用结束头)
- dispatcher: execTracer(自增 Seq 保序 + span 自动计耗时);
  Orchestrator 加 ExecSink;通用图(init 召回 / 各 tool 入参→产出 / prompt / model
  首token+token数)与报告编排(规划大纲 / 各章并行 start-end / 渲染)全程埋点
- gateway: SubscribeExec + GET /tasks/:id/exec SSE(与 token 流并行)
- desktop: streamExec + deriveNodes(按 node 归并 start/end/error/info);
  复用组件 ExecTrace(竖向轨道,按 kind 着色,运行中脉冲灯);
  新 RunsView(运行·观测:轨迹+输出双栏);BottomDrawer 轨迹/工具调用 tab 接真实数据;
  ReportView 加执行轨迹栏;左导航「运行」置就绪

实测:
- 报告任务 /exec:规划(2680ms,4章) → 4 章并行(seq 交错,各~7-8s 重叠=真并行,
  每章带 docs 知识库检索预览+成稿字数) → 渲染(docx 落盘)
- 通用图 /exec:tool:kb_search(678ms,入参→Milvus 产出) → prompt(2消息) →
  model(首token 860ms / 4 tokens)
- 浏览器(Preview):报告页执行轨迹逐节点点亮、章节带耗时/字数/检索片段,完成后下载 Word

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-12 14:29:28 +08:00

308 lines
9.8 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package bus 封装 NATS JetStream 的连接、流声明、任务发布与消费。
// Gateway 与 Dispatcher 共用这套真实收发逻辑,e2e 测试也直接覆盖它。
package bus
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/sundynix/sundynix-shared/contract"
)
// Bus 持有 NATS 连接与 JetStream 上下文。
type Bus struct {
nc *nats.Conn
js jetstream.JetStream
}
// Connect 接入 NATS 骨干网并初始化 JetStream,使用默认重试参数。
func Connect(url string) (*Bus, error) {
return ConnectWithRetry(url, 30, time.Second)
}
// ConnectWithRetry 在 NATS 暂不可用时按固定间隔重试,容忍服务先于 NATS 启动。
func ConnectWithRetry(url string, attempts int, interval time.Duration) (*Bus, error) {
var lastErr error
for i := 0; i < attempts; i++ {
nc, err := nats.Connect(url,
nats.Timeout(5*time.Second),
nats.RetryOnFailedConnect(true),
nats.MaxReconnects(-1),
nats.ReconnectWait(interval),
)
if err != nil {
lastErr = err
time.Sleep(interval)
continue
}
// RetryOnFailedConnect 下 Connect 可能立即返回但尚未连上,等待真正建立。
if nc.Status() != nats.CONNECTED {
if !waitConnected(nc, 5*time.Second) {
lastErr = fmt.Errorf("nats not connected within timeout")
nc.Close()
time.Sleep(interval)
continue
}
}
js, err := jetstream.New(nc)
if err != nil {
nc.Close()
return nil, fmt.Errorf("jetstream init: %w", err)
}
return &Bus{nc: nc, js: js}, nil
}
return nil, fmt.Errorf("nats connect after %d attempts: %w", attempts, lastErr)
}
func waitConnected(nc *nats.Conn, d time.Duration) bool {
deadline := time.Now().Add(d)
for time.Now().Before(deadline) {
if nc.Status() == nats.CONNECTED {
return true
}
time.Sleep(50 * time.Millisecond)
}
return nc.Status() == nats.CONNECTED
}
// Close 关闭底层连接。
func (b *Bus) Close() {
if b.nc != nil {
b.nc.Close()
}
}
// EnsureTaskStream 幂等地创建/更新任务流,捕获 sundynix.tasks.>。
func (b *Bus) EnsureTaskStream(ctx context.Context) error {
_, err := b.js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
Name: contract.StreamTasks,
Subjects: []string{contract.SubjectTasksAll},
Storage: jetstream.FileStorage,
})
return err
}
// PublishTask 把任务发布到 sundynix.tasks.<id>,返回序列号。
func (b *Bus) PublishTask(ctx context.Context, t *contract.Task) (uint64, error) {
data, err := t.Marshal()
if err != nil {
return 0, err
}
ack, err := b.js.Publish(ctx, contract.TaskSubject(t.ID), data)
if err != nil {
return 0, fmt.Errorf("publish task: %w", err)
}
return ack.Sequence, nil
}
// ---- Token 流回流(core NATS 零拷贝字节管道)----
// PublishToken 把一个推理 Token 以 core NATS 写到 sundynix.streams.<taskID>。
func (b *Bus) PublishToken(taskID string, token []byte) error {
return b.nc.Publish(contract.StreamSubject(taskID), token)
}
// CompleteStream 发送 Token 流结束信号(空体 + 结束头)。
func (b *Bus) CompleteStream(taskID string) error {
msg := nats.NewMsg(contract.StreamSubject(taskID))
msg.Header.Set(contract.HeaderStreamEnd, "1")
return b.nc.PublishMsg(msg)
}
// SubscribeTokens 订阅某 task 的 Token 流。每个 Token 触发 onToken
// 收到结束信号后触发 onDone。返回的 unsub 用于退订。
// 注意:core NATS 无持久化,订阅须在 Token 产生前建立(SSE 客户端先连)。
func (b *Bus) SubscribeTokens(taskID string, onToken func([]byte), onDone func()) (unsub func() error, err error) {
sub, err := b.nc.Subscribe(contract.StreamSubject(taskID), func(m *nats.Msg) {
if m.Header.Get(contract.HeaderStreamEnd) == "1" {
onDone()
return
}
// 拷贝,避免 nats 复用底层 buffer。
tok := make([]byte, len(m.Data))
copy(tok, m.Data)
onToken(tok)
})
if err != nil {
return nil, fmt.Errorf("subscribe tokens: %w", err)
}
return sub.Unsubscribe, nil
}
// ---- 执行可视化事件(core NATS,与 Token 流分流)----
// PublishExec 把一条执行事件(JSON)发到 sundynix.exec.<taskID>。
func (b *Bus) PublishExec(taskID string, data []byte) error {
return b.nc.Publish(contract.ExecSubject(taskID), data)
}
// CompleteExec 发送执行事件流结束信号(空体 + 结束头)。
func (b *Bus) CompleteExec(taskID string) error {
msg := nats.NewMsg(contract.ExecSubject(taskID))
msg.Header.Set(contract.HeaderStreamEnd, "1")
return b.nc.PublishMsg(msg)
}
// SubscribeExec 订阅某 task 的执行事件流。每条事件触发 onEvent;结束触发 onDone。
func (b *Bus) SubscribeExec(taskID string, onEvent func([]byte), onDone func()) (unsub func() error, err error) {
sub, err := b.nc.Subscribe(contract.ExecSubject(taskID), func(m *nats.Msg) {
if m.Header.Get(contract.HeaderStreamEnd) == "1" {
onDone()
return
}
data := make([]byte, len(m.Data))
copy(data, m.Data)
onEvent(data)
})
if err != nil {
return nil, fmt.Errorf("subscribe exec: %w", err)
}
return sub.Unsubscribe, nil
}
// ---- MCP 工具调用(core NATS request-reply----
// CallTool 同步调用一个 MCP 工具:发到 subject,阻塞等待应答。
// ctx 超时即视为工具不可用,由调用方决定降级。
func (b *Bus) CallTool(ctx context.Context, subject string, call *contract.ToolCall) (*contract.ToolResult, error) {
data, err := json.Marshal(call)
if err != nil {
return nil, fmt.Errorf("marshal tool call: %w", err)
}
msg, err := b.nc.RequestWithContext(ctx, subject, data)
if err != nil {
return nil, fmt.Errorf("call tool %s: %w", subject, err)
}
var res contract.ToolResult
if err := json.Unmarshal(msg.Data, &res); err != nil {
return nil, fmt.Errorf("unmarshal tool result: %w", err)
}
return &res, nil
}
// ToolHandler 处理一次工具调用并返回结果。
type ToolHandler func(ctx context.Context, call *contract.ToolCall) *contract.ToolResult
// ServeTool 以队列组订阅工具主题(可用通配 sundynix.tools.go.>),
// 对每个请求调用 h 并 Respond,队列组内多副本自动负载均衡。
// 返回的 unsub 用于退订。
func (b *Bus) ServeTool(subject, queue string, h ToolHandler) (unsub func() error, err error) {
sub, err := b.nc.QueueSubscribe(subject, queue, func(m *nats.Msg) {
var call contract.ToolCall
if err := json.Unmarshal(m.Data, &call); err != nil {
respond(m, &contract.ToolResult{OK: false, Error: "bad tool call: " + err.Error()})
return
}
respond(m, h(context.Background(), &call))
})
if err != nil {
return nil, fmt.Errorf("serve tool %s: %w", subject, err)
}
return sub.Unsubscribe, nil
}
func respond(m *nats.Msg, res *contract.ToolResult) {
data, err := json.Marshal(res)
if err != nil {
data, _ = json.Marshal(&contract.ToolResult{OK: false, Error: "marshal result: " + err.Error()})
}
_ = m.Respond(data)
}
// ---- 配置控制面(core NATS request-reply + broadcast----
// RequestConfig 向控制面(Gateway)请求某 kind 当前激活配置(chat/embedding)。
// 无人应答 / 无激活配置时返回 (nil, nil),由调用方降级。
func (b *Bus) RequestConfig(ctx context.Context, kind string) (*contract.ModelConfig, error) {
msg, err := b.nc.RequestWithContext(ctx, contract.ConfigGetSubject(kind), nil)
if err != nil {
return nil, nil // 控制面暂不可用,降级
}
if len(msg.Data) == 0 {
return nil, nil
}
var cfg contract.ModelConfig
if err := json.Unmarshal(msg.Data, &cfg); err != nil {
return nil, fmt.Errorf("unmarshal %s config: %w", kind, err)
}
if !cfg.Ready() {
return nil, nil
}
return &cfg, nil
}
// ServeConfig 让控制面响应某 kind 的配置请求;provide 返回当前激活配置(可为 nil)。
func (b *Bus) ServeConfig(kind string, provide func() *contract.ModelConfig) (unsub func() error, err error) {
sub, err := b.nc.Subscribe(contract.ConfigGetSubject(kind), func(m *nats.Msg) {
var data []byte
if cfg := provide(); cfg != nil {
data, _ = json.Marshal(cfg)
}
_ = m.Respond(data)
})
if err != nil {
return nil, fmt.Errorf("serve %s config: %w", kind, err)
}
return sub.Unsubscribe, nil
}
// PublishConfigUpdated 广播某 kind 的配置变更(消费方据此热更新)。
func (b *Bus) PublishConfigUpdated(kind string, cfg *contract.ModelConfig) error {
data, err := json.Marshal(cfg)
if err != nil {
return err
}
return b.nc.Publish(contract.ConfigUpdatedSubject(kind), data)
}
// SubscribeConfigUpdated 订阅某 kind 的配置变更。
func (b *Bus) SubscribeConfigUpdated(kind string, onUpdate func(*contract.ModelConfig)) (unsub func() error, err error) {
sub, err := b.nc.Subscribe(contract.ConfigUpdatedSubject(kind), func(m *nats.Msg) {
var cfg contract.ModelConfig
if json.Unmarshal(m.Data, &cfg) == nil {
onUpdate(&cfg)
}
})
if err != nil {
return nil, fmt.Errorf("subscribe %s config: %w", kind, err)
}
return sub.Unsubscribe, nil
}
// TaskHandler 处理一个消费到的任务。
type TaskHandler func(ctx context.Context, t *contract.Task) error
// ConsumeTasks 在持久消费者上消费任务,队列组内负载均衡。
// 返回的 stop 函数用于优雅停止消费。
func (b *Bus) ConsumeTasks(ctx context.Context, h TaskHandler) (stop func(), err error) {
cons, err := b.js.CreateOrUpdateConsumer(ctx, contract.StreamTasks, jetstream.ConsumerConfig{
Durable: contract.ConsumerDurable,
AckPolicy: jetstream.AckExplicitPolicy,
FilterSubject: contract.SubjectTasksAll,
})
if err != nil {
return nil, fmt.Errorf("create consumer: %w", err)
}
cc, err := cons.Consume(func(msg jetstream.Msg) {
t, err := contract.Unmarshal(msg.Data())
if err != nil {
_ = msg.Term() // 脏数据,丢弃不重投
return
}
if err := h(ctx, t); err != nil {
_ = msg.NakWithDelay(time.Second) // 处理失败,延迟重投
return
}
_ = msg.Ack()
})
if err != nil {
return nil, fmt.Errorf("consume: %w", err)
}
return cc.Stop, nil
}