// Package llm 抽象 LLM Pool(vLLM / Ollama / 第三方在线 API)的负载均衡与流式推理。 package llm import ( "bufio" "bytes" "context" "encoding/json" "fmt" "net/http" "strings" "sync" "time" "github.com/sundynix/sundynix-shared/contract" ) // ChatMessage 是一条对话消息(role: system/user/assistant)。 type ChatMessage struct { Role string `json:"role"` Content string `json:"content"` } // Pool 维护当前激活的后端配置(由控制面经 NATS 下发,可热更新)。 type Pool struct { mu sync.RWMutex cfg *contract.ModelConfig hc *http.Client } func NewPool() *Pool { return &Pool{hc: &http.Client{Timeout: 120 * time.Second}} } // SetConfig 热更新后端配置(控制面变更时调用)。 func (p *Pool) SetConfig(cfg *contract.ModelConfig) { p.mu.Lock() p.cfg = cfg p.mu.Unlock() if cfg != nil { // 不打印 api_key。 fmt.Printf("[llm] model config set: provider=%s base=%s model=%s\n", cfg.Provider, cfg.BaseURL, cfg.Model) } } func (p *Pool) config() *contract.ModelConfig { p.mu.RLock() defer p.mu.RUnlock() return p.cfg } // Ready 报告是否已配置可用后端。 func (p *Pool) Ready() bool { return p.config().Ready() } // ChatStream 以 OpenAI 兼容协议流式推理,逐 token 回调 onToken。 // 仅在 Ready() 时可用(调用方据此决定真实推理或降级桩)。 func (p *Pool) ChatStream(ctx context.Context, msgs []ChatMessage, onToken func(string)) error { cfg := p.config() if !cfg.Ready() { return fmt.Errorf("no model configured") } body, _ := json.Marshal(map[string]any{ "model": cfg.Model, "messages": msgs, "stream": true, }) req, err := http.NewRequestWithContext(ctx, http.MethodPost, cfg.BaseURL+"/chat/completions", bytes.NewReader(body)) if err != nil { return err } req.Header.Set("Content-Type", "application/json") if cfg.APIKey != "" { req.Header.Set("Authorization", "Bearer "+cfg.APIKey) } resp, err := p.hc.Do(req) if err != nil { return fmt.Errorf("llm request: %w", err) } defer resp.Body.Close() if resp.StatusCode >= 400 { buf := new(bytes.Buffer) _, _ = buf.ReadFrom(resp.Body) return fmt.Errorf("llm http %d: %s", resp.StatusCode, strings.TrimSpace(buf.String())) } // 解析 OpenAI 兼容 SSE:data: {choices:[{delta:{content}}]} … data: [DONE] sc := bufio.NewScanner(resp.Body) sc.Buffer(make([]byte, 0, 64*1024), 1024*1024) for sc.Scan() { line := strings.TrimSpace(sc.Text()) if !strings.HasPrefix(line, "data:") { continue } payload := strings.TrimSpace(strings.TrimPrefix(line, "data:")) if payload == "[DONE]" { break } var chunk struct { Choices []struct { Delta struct { Content string `json:"content"` } `json:"delta"` } `json:"choices"` } if json.Unmarshal([]byte(payload), &chunk) != nil { continue } if len(chunk.Choices) > 0 && chunk.Choices[0].Delta.Content != "" { onToken(chunk.Choices[0].Delta.Content) } } return sc.Err() } // Chat 非流式:内部复用 ChatStream 聚合全部 token,返回整段文本。 // 报告生成的「规划大纲 / 撰写章节」等需要拿到完整结果再继续,用它而非流式。 func (p *Pool) Chat(ctx context.Context, msgs []ChatMessage) (string, error) { var b strings.Builder err := p.ChatStream(ctx, msgs, func(tok string) { b.WriteString(tok) }) return b.String(), err } // ---- 占位降级(未配置后端时)---- // 占位参数:模拟真实后端的 TTFT(首 token 延迟) 与逐 token 间隔。 const ( timeToFirstToken = 700 * time.Millisecond interTokenDelay = 60 * time.Millisecond ) // StreamText 按节奏把给定文本流式回调(未配置真实后端时的降级桩)。 func (p *Pool) StreamText(ctx context.Context, text string, onToken func([]byte)) error { select { case <-ctx.Done(): return ctx.Err() case <-time.After(timeToFirstToken): } for _, tok := range tokenize(text) { select { case <-ctx.Done(): return ctx.Err() default: } onToken([]byte(tok)) time.Sleep(interTokenDelay) } return nil } func tokenize(s string) []string { out := make([]string, 0, len(s)) for _, r := range s { out = append(out, string(r)) } return out }