// Package bus 封装 NATS JetStream 的连接、流声明、任务发布与消费。 // Gateway 与 Dispatcher 共用这套真实收发逻辑,e2e 测试也直接覆盖它。 package bus import ( "context" "encoding/json" "fmt" "time" "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" "github.com/sundynix/sundynix-shared/contract" ) // Bus 持有 NATS 连接与 JetStream 上下文。 type Bus struct { nc *nats.Conn js jetstream.JetStream } // Connect 接入 NATS 骨干网并初始化 JetStream,使用默认重试参数。 func Connect(url string) (*Bus, error) { return ConnectWithRetry(url, 30, time.Second) } // ConnectWithRetry 在 NATS 暂不可用时按固定间隔重试,容忍服务先于 NATS 启动。 func ConnectWithRetry(url string, attempts int, interval time.Duration) (*Bus, error) { var lastErr error for i := 0; i < attempts; i++ { nc, err := nats.Connect(url, nats.Timeout(5*time.Second), nats.RetryOnFailedConnect(true), nats.MaxReconnects(-1), nats.ReconnectWait(interval), ) if err != nil { lastErr = err time.Sleep(interval) continue } // RetryOnFailedConnect 下 Connect 可能立即返回但尚未连上,等待真正建立。 if nc.Status() != nats.CONNECTED { if !waitConnected(nc, 5*time.Second) { lastErr = fmt.Errorf("nats not connected within timeout") nc.Close() time.Sleep(interval) continue } } js, err := jetstream.New(nc) if err != nil { nc.Close() return nil, fmt.Errorf("jetstream init: %w", err) } return &Bus{nc: nc, js: js}, nil } return nil, fmt.Errorf("nats connect after %d attempts: %w", attempts, lastErr) } func waitConnected(nc *nats.Conn, d time.Duration) bool { deadline := time.Now().Add(d) for time.Now().Before(deadline) { if nc.Status() == nats.CONNECTED { return true } time.Sleep(50 * time.Millisecond) } return nc.Status() == nats.CONNECTED } // Close 关闭底层连接。 func (b *Bus) Close() { if b.nc != nil { b.nc.Close() } } // EnsureTaskStream 幂等地创建/更新任务流,捕获 sundynix.tasks.>。 func (b *Bus) EnsureTaskStream(ctx context.Context) error { _, err := b.js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{ Name: contract.StreamTasks, Subjects: []string{contract.SubjectTasksAll}, Storage: jetstream.FileStorage, }) return err } // PublishTask 把任务发布到 sundynix.tasks.,返回序列号。 func (b *Bus) PublishTask(ctx context.Context, t *contract.Task) (uint64, error) { data, err := t.Marshal() if err != nil { return 0, err } ack, err := b.js.Publish(ctx, contract.TaskSubject(t.ID), data) if err != nil { return 0, fmt.Errorf("publish task: %w", err) } return ack.Sequence, nil } // ---- Token 流回流(core NATS 零拷贝字节管道)---- // PublishToken 把一个推理 Token 以 core NATS 写到 sundynix.streams.。 func (b *Bus) PublishToken(taskID string, token []byte) error { return b.nc.Publish(contract.StreamSubject(taskID), token) } // CompleteStream 发送 Token 流结束信号(空体 + 结束头)。 func (b *Bus) CompleteStream(taskID string) error { msg := nats.NewMsg(contract.StreamSubject(taskID)) msg.Header.Set(contract.HeaderStreamEnd, "1") return b.nc.PublishMsg(msg) } // SubscribeTokens 订阅某 task 的 Token 流。每个 Token 触发 onToken; // 收到结束信号后触发 onDone。返回的 unsub 用于退订。 // 注意:core NATS 无持久化,订阅须在 Token 产生前建立(SSE 客户端先连)。 func (b *Bus) SubscribeTokens(taskID string, onToken func([]byte), onDone func()) (unsub func() error, err error) { sub, err := b.nc.Subscribe(contract.StreamSubject(taskID), func(m *nats.Msg) { if m.Header.Get(contract.HeaderStreamEnd) == "1" { onDone() return } // 拷贝,避免 nats 复用底层 buffer。 tok := make([]byte, len(m.Data)) copy(tok, m.Data) onToken(tok) }) if err != nil { return nil, fmt.Errorf("subscribe tokens: %w", err) } return sub.Unsubscribe, nil } // ---- 执行可视化事件(core NATS,与 Token 流分流)---- // PublishExec 把一条执行事件(JSON)发到 sundynix.exec.。 func (b *Bus) PublishExec(taskID string, data []byte) error { return b.nc.Publish(contract.ExecSubject(taskID), data) } // CompleteExec 发送执行事件流结束信号(空体 + 结束头)。 func (b *Bus) CompleteExec(taskID string) error { msg := nats.NewMsg(contract.ExecSubject(taskID)) msg.Header.Set(contract.HeaderStreamEnd, "1") return b.nc.PublishMsg(msg) } // SubscribeExec 订阅某 task 的执行事件流。每条事件触发 onEvent;结束触发 onDone。 func (b *Bus) SubscribeExec(taskID string, onEvent func([]byte), onDone func()) (unsub func() error, err error) { sub, err := b.nc.Subscribe(contract.ExecSubject(taskID), func(m *nats.Msg) { if m.Header.Get(contract.HeaderStreamEnd) == "1" { onDone() return } data := make([]byte, len(m.Data)) copy(data, m.Data) onEvent(data) }) if err != nil { return nil, fmt.Errorf("subscribe exec: %w", err) } return sub.Unsubscribe, nil } // ---- MCP 工具调用(core NATS request-reply)---- // CallTool 同步调用一个 MCP 工具:发到 subject,阻塞等待应答。 // ctx 超时即视为工具不可用,由调用方决定降级。 func (b *Bus) CallTool(ctx context.Context, subject string, call *contract.ToolCall) (*contract.ToolResult, error) { data, err := json.Marshal(call) if err != nil { return nil, fmt.Errorf("marshal tool call: %w", err) } msg, err := b.nc.RequestWithContext(ctx, subject, data) if err != nil { return nil, fmt.Errorf("call tool %s: %w", subject, err) } var res contract.ToolResult if err := json.Unmarshal(msg.Data, &res); err != nil { return nil, fmt.Errorf("unmarshal tool result: %w", err) } return &res, nil } // ToolHandler 处理一次工具调用并返回结果。 type ToolHandler func(ctx context.Context, call *contract.ToolCall) *contract.ToolResult // ServeTool 以队列组订阅工具主题(可用通配 sundynix.tools.go.>), // 对每个请求调用 h 并 Respond,队列组内多副本自动负载均衡。 // 返回的 unsub 用于退订。 func (b *Bus) ServeTool(subject, queue string, h ToolHandler) (unsub func() error, err error) { sub, err := b.nc.QueueSubscribe(subject, queue, func(m *nats.Msg) { var call contract.ToolCall if err := json.Unmarshal(m.Data, &call); err != nil { respond(m, &contract.ToolResult{OK: false, Error: "bad tool call: " + err.Error()}) return } respond(m, h(context.Background(), &call)) }) if err != nil { return nil, fmt.Errorf("serve tool %s: %w", subject, err) } return sub.Unsubscribe, nil } func respond(m *nats.Msg, res *contract.ToolResult) { data, err := json.Marshal(res) if err != nil { data, _ = json.Marshal(&contract.ToolResult{OK: false, Error: "marshal result: " + err.Error()}) } _ = m.Respond(data) } // ---- 配置控制面(core NATS request-reply + broadcast)---- // RequestConfig 向控制面(Gateway)请求某 kind 当前激活配置(chat/embedding)。 // 无人应答 / 无激活配置时返回 (nil, nil),由调用方降级。 func (b *Bus) RequestConfig(ctx context.Context, kind string) (*contract.ModelConfig, error) { msg, err := b.nc.RequestWithContext(ctx, contract.ConfigGetSubject(kind), nil) if err != nil { return nil, nil // 控制面暂不可用,降级 } if len(msg.Data) == 0 { return nil, nil } var cfg contract.ModelConfig if err := json.Unmarshal(msg.Data, &cfg); err != nil { return nil, fmt.Errorf("unmarshal %s config: %w", kind, err) } if !cfg.Ready() { return nil, nil } return &cfg, nil } // ServeConfig 让控制面响应某 kind 的配置请求;provide 返回当前激活配置(可为 nil)。 func (b *Bus) ServeConfig(kind string, provide func() *contract.ModelConfig) (unsub func() error, err error) { sub, err := b.nc.Subscribe(contract.ConfigGetSubject(kind), func(m *nats.Msg) { var data []byte if cfg := provide(); cfg != nil { data, _ = json.Marshal(cfg) } _ = m.Respond(data) }) if err != nil { return nil, fmt.Errorf("serve %s config: %w", kind, err) } return sub.Unsubscribe, nil } // PublishConfigUpdated 广播某 kind 的配置变更(消费方据此热更新)。 func (b *Bus) PublishConfigUpdated(kind string, cfg *contract.ModelConfig) error { data, err := json.Marshal(cfg) if err != nil { return err } return b.nc.Publish(contract.ConfigUpdatedSubject(kind), data) } // SubscribeConfigUpdated 订阅某 kind 的配置变更。 func (b *Bus) SubscribeConfigUpdated(kind string, onUpdate func(*contract.ModelConfig)) (unsub func() error, err error) { sub, err := b.nc.Subscribe(contract.ConfigUpdatedSubject(kind), func(m *nats.Msg) { var cfg contract.ModelConfig if json.Unmarshal(m.Data, &cfg) == nil { onUpdate(&cfg) } }) if err != nil { return nil, fmt.Errorf("subscribe %s config: %w", kind, err) } return sub.Unsubscribe, nil } // TaskHandler 处理一个消费到的任务。 type TaskHandler func(ctx context.Context, t *contract.Task) error // ConsumeTasks 在持久消费者上消费任务,队列组内负载均衡。 // 返回的 stop 函数用于优雅停止消费。 func (b *Bus) ConsumeTasks(ctx context.Context, h TaskHandler) (stop func(), err error) { cons, err := b.js.CreateOrUpdateConsumer(ctx, contract.StreamTasks, jetstream.ConsumerConfig{ Durable: contract.ConsumerDurable, AckPolicy: jetstream.AckExplicitPolicy, FilterSubject: contract.SubjectTasksAll, }) if err != nil { return nil, fmt.Errorf("create consumer: %w", err) } cc, err := cons.Consume(func(msg jetstream.Msg) { t, err := contract.Unmarshal(msg.Data()) if err != nil { _ = msg.Term() // 脏数据,丢弃不重投 return } if err := h(ctx, t); err != nil { _ = msg.NakWithDelay(time.Second) // 处理失败,延迟重投 return } _ = msg.Ack() }) if err != nil { return nil, fmt.Errorf("consume: %w", err) } return cc.Stop, nil }