Files
sundynix-agentix/sundynix-dispatcher/internal/nats/subscriber.go
T
Blizzard cdc5b3a847 feat(observability): 执行可视化 — 节点级实时轨迹(运行·观测)
把任务执行做成可观测:Dispatcher 在每个节点/阶段发结构化 ExecEvent,
经独立 NATS 通道回流,前端逐节点点亮(状态/耗时/工具入参产出)。

- shared: contract.ExecEvent + ExecSubject(sundynix.exec.<id>,与 Token 流分流);
  bus.PublishExec/CompleteExec/SubscribeExec(core NATS,复用结束头)
- dispatcher: execTracer(自增 Seq 保序 + span 自动计耗时);
  Orchestrator 加 ExecSink;通用图(init 召回 / 各 tool 入参→产出 / prompt / model
  首token+token数)与报告编排(规划大纲 / 各章并行 start-end / 渲染)全程埋点
- gateway: SubscribeExec + GET /tasks/:id/exec SSE(与 token 流并行)
- desktop: streamExec + deriveNodes(按 node 归并 start/end/error/info);
  复用组件 ExecTrace(竖向轨道,按 kind 着色,运行中脉冲灯);
  新 RunsView(运行·观测:轨迹+输出双栏);BottomDrawer 轨迹/工具调用 tab 接真实数据;
  ReportView 加执行轨迹栏;左导航「运行」置就绪

实测:
- 报告任务 /exec:规划(2680ms,4章) → 4 章并行(seq 交错,各~7-8s 重叠=真并行,
  每章带 docs 知识库检索预览+成稿字数) → 渲染(docx 落盘)
- 通用图 /exec:tool:kb_search(678ms,入参→Milvus 产出) → prompt(2消息) →
  model(首token 860ms / 4 tokens)
- 浏览器(Preview):报告页执行轨迹逐节点点亮、章节带耗时/字数/检索片段,完成后下载 Word

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-12 14:29:28 +08:00

82 lines
2.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package nats 是调度器对共享 bus 的薄封装(消费任务 / 回写 Token)。
package nats
import (
"context"
"log"
sharedbus "github.com/sundynix/sundynix-shared/bus"
"github.com/sundynix/sundynix-shared/contract"
)
// TaskHandler 处理单个任务。
type TaskHandler func(ctx context.Context, t *contract.Task) error
// Subscriber 包装共享 bus,向调度器暴露消费能力。
type Subscriber struct {
inner *sharedbus.Bus
}
// MustConnect 接入 NATS 并确保任务流存在(消费者声明在 Consume 时完成)。
func MustConnect(url string) *Subscriber {
inner, err := sharedbus.Connect(url)
if err != nil {
log.Fatalf("[dispatcher/nats] connect: %v", err)
}
if err := inner.EnsureTaskStream(context.Background()); err != nil {
log.Fatalf("[dispatcher/nats] ensure stream: %v", err)
}
log.Printf("[dispatcher/nats] connected %s", url)
return &Subscriber{inner: inner}
}
// ConsumeTasks 从 sundynix.tasks.* 持续消费任务(队列组负载均衡),阻塞至 ctx 取消。
func (s *Subscriber) ConsumeTasks(ctx context.Context, h TaskHandler) error {
stop, err := s.inner.ConsumeTasks(ctx, func(c context.Context, t *contract.Task) error {
return h(c, t)
})
if err != nil {
return err
}
defer stop()
<-ctx.Done()
return ctx.Err()
}
// PublishToken / CompleteStream 让 Subscriber 满足 eino.TokenSink
// 把推理 Token 回流到 sundynix.streams.<taskID>。
func (s *Subscriber) PublishToken(taskID string, token []byte) error {
return s.inner.PublishToken(taskID, token)
}
func (s *Subscriber) CompleteStream(taskID string) error {
return s.inner.CompleteStream(taskID)
}
// PublishExec / CompleteExec 让 Subscriber 满足 eino.ExecSink
// 把执行轨迹事件回流到 sundynix.exec.<taskID>(与 Token 流分开)。
func (s *Subscriber) PublishExec(taskID string, data []byte) error {
return s.inner.PublishExec(taskID, data)
}
func (s *Subscriber) CompleteExec(taskID string) error {
return s.inner.CompleteExec(taskID)
}
// CallTool 让 Subscriber 满足 eino.ToolCaller,经 NATS request-reply 调起第 5 层 MCP 工具。
func (s *Subscriber) CallTool(ctx context.Context, subject string, call *contract.ToolCall) (*contract.ToolResult, error) {
return s.inner.CallTool(ctx, subject, call)
}
// RequestModelConfig 向控制面(Gateway)取当前激活的对话模型配置。
func (s *Subscriber) RequestModelConfig(ctx context.Context) (*contract.ModelConfig, error) {
return s.inner.RequestConfig(ctx, contract.ConfigKindChat)
}
// SubscribeModelConfigUpdated 订阅对话模型配置热更新。
func (s *Subscriber) SubscribeModelConfigUpdated(onUpdate func(*contract.ModelConfig)) (func() error, error) {
return s.inner.SubscribeConfigUpdated(contract.ConfigKindChat, onUpdate)
}
func (s *Subscriber) Close() { s.inner.Close() }