// Package bus 封装 NATS JetStream 的连接、流声明、任务发布与消费。 // Gateway 与 Dispatcher 共用这套真实收发逻辑,e2e 测试也直接覆盖它。 package bus import ( "context" "fmt" "time" "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" "github.com/sundynix/sundynix-shared/contract" ) // Bus 持有 NATS 连接与 JetStream 上下文。 type Bus struct { nc *nats.Conn js jetstream.JetStream } // Connect 接入 NATS 骨干网并初始化 JetStream,使用默认重试参数。 func Connect(url string) (*Bus, error) { return ConnectWithRetry(url, 30, time.Second) } // ConnectWithRetry 在 NATS 暂不可用时按固定间隔重试,容忍服务先于 NATS 启动。 func ConnectWithRetry(url string, attempts int, interval time.Duration) (*Bus, error) { var lastErr error for i := 0; i < attempts; i++ { nc, err := nats.Connect(url, nats.Timeout(5*time.Second), nats.RetryOnFailedConnect(true), nats.MaxReconnects(-1), nats.ReconnectWait(interval), ) if err != nil { lastErr = err time.Sleep(interval) continue } // RetryOnFailedConnect 下 Connect 可能立即返回但尚未连上,等待真正建立。 if nc.Status() != nats.CONNECTED { if !waitConnected(nc, 5*time.Second) { lastErr = fmt.Errorf("nats not connected within timeout") nc.Close() time.Sleep(interval) continue } } js, err := jetstream.New(nc) if err != nil { nc.Close() return nil, fmt.Errorf("jetstream init: %w", err) } return &Bus{nc: nc, js: js}, nil } return nil, fmt.Errorf("nats connect after %d attempts: %w", attempts, lastErr) } func waitConnected(nc *nats.Conn, d time.Duration) bool { deadline := time.Now().Add(d) for time.Now().Before(deadline) { if nc.Status() == nats.CONNECTED { return true } time.Sleep(50 * time.Millisecond) } return nc.Status() == nats.CONNECTED } // Close 关闭底层连接。 func (b *Bus) Close() { if b.nc != nil { b.nc.Close() } } // EnsureTaskStream 幂等地创建/更新任务流,捕获 sundynix.tasks.>。 func (b *Bus) EnsureTaskStream(ctx context.Context) error { _, err := b.js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{ Name: contract.StreamTasks, Subjects: []string{contract.SubjectTasksAll}, Storage: jetstream.FileStorage, }) return err } // PublishTask 把任务发布到 sundynix.tasks.,返回序列号。 func (b *Bus) PublishTask(ctx context.Context, t *contract.Task) (uint64, error) { data, err := t.Marshal() if err != nil { return 0, err } ack, err := b.js.Publish(ctx, contract.TaskSubject(t.ID), data) if err != nil { return 0, fmt.Errorf("publish task: %w", err) } return ack.Sequence, nil } // ---- Token 流回流(core NATS 零拷贝字节管道)---- // PublishToken 把一个推理 Token 以 core NATS 写到 sundynix.streams.。 func (b *Bus) PublishToken(taskID string, token []byte) error { return b.nc.Publish(contract.StreamSubject(taskID), token) } // CompleteStream 发送 Token 流结束信号(空体 + 结束头)。 func (b *Bus) CompleteStream(taskID string) error { msg := nats.NewMsg(contract.StreamSubject(taskID)) msg.Header.Set(contract.HeaderStreamEnd, "1") return b.nc.PublishMsg(msg) } // SubscribeTokens 订阅某 task 的 Token 流。每个 Token 触发 onToken; // 收到结束信号后触发 onDone。返回的 unsub 用于退订。 // 注意:core NATS 无持久化,订阅须在 Token 产生前建立(SSE 客户端先连)。 func (b *Bus) SubscribeTokens(taskID string, onToken func([]byte), onDone func()) (unsub func() error, err error) { sub, err := b.nc.Subscribe(contract.StreamSubject(taskID), func(m *nats.Msg) { if m.Header.Get(contract.HeaderStreamEnd) == "1" { onDone() return } // 拷贝,避免 nats 复用底层 buffer。 tok := make([]byte, len(m.Data)) copy(tok, m.Data) onToken(tok) }) if err != nil { return nil, fmt.Errorf("subscribe tokens: %w", err) } return sub.Unsubscribe, nil } // TaskHandler 处理一个消费到的任务。 type TaskHandler func(ctx context.Context, t *contract.Task) error // ConsumeTasks 在持久消费者上消费任务,队列组内负载均衡。 // 返回的 stop 函数用于优雅停止消费。 func (b *Bus) ConsumeTasks(ctx context.Context, h TaskHandler) (stop func(), err error) { cons, err := b.js.CreateOrUpdateConsumer(ctx, contract.StreamTasks, jetstream.ConsumerConfig{ Durable: contract.ConsumerDurable, AckPolicy: jetstream.AckExplicitPolicy, FilterSubject: contract.SubjectTasksAll, }) if err != nil { return nil, fmt.Errorf("create consumer: %w", err) } cc, err := cons.Consume(func(msg jetstream.Msg) { t, err := contract.Unmarshal(msg.Data()) if err != nil { _ = msg.Term() // 脏数据,丢弃不重投 return } if err := h(ctx, t); err != nil { _ = msg.NakWithDelay(time.Second) // 处理失败,延迟重投 return } _ = msg.Ack() }) if err != nil { return nil, fmt.Errorf("consume: %w", err) } return cc.Stop, nil }