ba8c6b3c43
- shared: 新增 intent=report 任务约定 + ReportPath(跨进程共享落盘目录,零配置对齐) - dispatcher: handleReport 专用编排(DeepSeek 规划大纲 → 各章并行 RAG 检索+撰写 → 汇聚 → report_render),Pool.Chat 非流式聚合;进度与正文经 Token 流实时回流 - mcp-go: 用标准库 archive/zip + OOXML 拼出真实可打开的 .docx(零额外依赖), report_render 工具落盘到共享目录;附 docx 有效性测试 - gateway: POST /reports 触发;GET /reports/:id/download 下发 Word - desktop: 新增「报告」页(主题→实时编排进度→下载 Word),左导航置为就绪 实测:DeepSeek 生成 5 章报告 → 渲染 5KB docx → file 识别为 Microsoft Word 2007+ → textutil 提取标题/各章正文完整。 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
157 lines
4.1 KiB
Go
157 lines
4.1 KiB
Go
// Package llm 抽象 LLM Pool(vLLM / Ollama / 第三方在线 API)的负载均衡与流式推理。
|
||
package llm
|
||
|
||
import (
|
||
"bufio"
|
||
"bytes"
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"net/http"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
"github.com/sundynix/sundynix-shared/contract"
|
||
)
|
||
|
||
// ChatMessage 是一条对话消息(role: system/user/assistant)。
|
||
type ChatMessage struct {
|
||
Role string `json:"role"`
|
||
Content string `json:"content"`
|
||
}
|
||
|
||
// Pool 维护当前激活的后端配置(由控制面经 NATS 下发,可热更新)。
|
||
type Pool struct {
|
||
mu sync.RWMutex
|
||
cfg *contract.ModelConfig
|
||
hc *http.Client
|
||
}
|
||
|
||
func NewPool() *Pool {
|
||
return &Pool{hc: &http.Client{Timeout: 120 * time.Second}}
|
||
}
|
||
|
||
// SetConfig 热更新后端配置(控制面变更时调用)。
|
||
func (p *Pool) SetConfig(cfg *contract.ModelConfig) {
|
||
p.mu.Lock()
|
||
p.cfg = cfg
|
||
p.mu.Unlock()
|
||
if cfg != nil {
|
||
// 不打印 api_key。
|
||
fmt.Printf("[llm] model config set: provider=%s base=%s model=%s\n", cfg.Provider, cfg.BaseURL, cfg.Model)
|
||
}
|
||
}
|
||
|
||
func (p *Pool) config() *contract.ModelConfig {
|
||
p.mu.RLock()
|
||
defer p.mu.RUnlock()
|
||
return p.cfg
|
||
}
|
||
|
||
// Ready 报告是否已配置可用后端。
|
||
func (p *Pool) Ready() bool { return p.config().Ready() }
|
||
|
||
// ChatStream 以 OpenAI 兼容协议流式推理,逐 token 回调 onToken。
|
||
// 仅在 Ready() 时可用(调用方据此决定真实推理或降级桩)。
|
||
func (p *Pool) ChatStream(ctx context.Context, msgs []ChatMessage, onToken func(string)) error {
|
||
cfg := p.config()
|
||
if !cfg.Ready() {
|
||
return fmt.Errorf("no model configured")
|
||
}
|
||
body, _ := json.Marshal(map[string]any{
|
||
"model": cfg.Model,
|
||
"messages": msgs,
|
||
"stream": true,
|
||
})
|
||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, cfg.BaseURL+"/chat/completions", bytes.NewReader(body))
|
||
if err != nil {
|
||
return err
|
||
}
|
||
req.Header.Set("Content-Type", "application/json")
|
||
if cfg.APIKey != "" {
|
||
req.Header.Set("Authorization", "Bearer "+cfg.APIKey)
|
||
}
|
||
resp, err := p.hc.Do(req)
|
||
if err != nil {
|
||
return fmt.Errorf("llm request: %w", err)
|
||
}
|
||
defer resp.Body.Close()
|
||
if resp.StatusCode >= 400 {
|
||
buf := new(bytes.Buffer)
|
||
_, _ = buf.ReadFrom(resp.Body)
|
||
return fmt.Errorf("llm http %d: %s", resp.StatusCode, strings.TrimSpace(buf.String()))
|
||
}
|
||
|
||
// 解析 OpenAI 兼容 SSE:data: {choices:[{delta:{content}}]} … data: [DONE]
|
||
sc := bufio.NewScanner(resp.Body)
|
||
sc.Buffer(make([]byte, 0, 64*1024), 1024*1024)
|
||
for sc.Scan() {
|
||
line := strings.TrimSpace(sc.Text())
|
||
if !strings.HasPrefix(line, "data:") {
|
||
continue
|
||
}
|
||
payload := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
|
||
if payload == "[DONE]" {
|
||
break
|
||
}
|
||
var chunk struct {
|
||
Choices []struct {
|
||
Delta struct {
|
||
Content string `json:"content"`
|
||
} `json:"delta"`
|
||
} `json:"choices"`
|
||
}
|
||
if json.Unmarshal([]byte(payload), &chunk) != nil {
|
||
continue
|
||
}
|
||
if len(chunk.Choices) > 0 && chunk.Choices[0].Delta.Content != "" {
|
||
onToken(chunk.Choices[0].Delta.Content)
|
||
}
|
||
}
|
||
return sc.Err()
|
||
}
|
||
|
||
// Chat 非流式:内部复用 ChatStream 聚合全部 token,返回整段文本。
|
||
// 报告生成的「规划大纲 / 撰写章节」等需要拿到完整结果再继续,用它而非流式。
|
||
func (p *Pool) Chat(ctx context.Context, msgs []ChatMessage) (string, error) {
|
||
var b strings.Builder
|
||
err := p.ChatStream(ctx, msgs, func(tok string) { b.WriteString(tok) })
|
||
return b.String(), err
|
||
}
|
||
|
||
// ---- 占位降级(未配置后端时)----
|
||
|
||
// 占位参数:模拟真实后端的 TTFT(首 token 延迟) 与逐 token 间隔。
|
||
const (
|
||
timeToFirstToken = 700 * time.Millisecond
|
||
interTokenDelay = 60 * time.Millisecond
|
||
)
|
||
|
||
// StreamText 按节奏把给定文本流式回调(未配置真实后端时的降级桩)。
|
||
func (p *Pool) StreamText(ctx context.Context, text string, onToken func([]byte)) error {
|
||
select {
|
||
case <-ctx.Done():
|
||
return ctx.Err()
|
||
case <-time.After(timeToFirstToken):
|
||
}
|
||
for _, tok := range tokenize(text) {
|
||
select {
|
||
case <-ctx.Done():
|
||
return ctx.Err()
|
||
default:
|
||
}
|
||
onToken([]byte(tok))
|
||
time.Sleep(interTokenDelay)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func tokenize(s string) []string {
|
||
out := make([]string, 0, len(s))
|
||
for _, r := range s {
|
||
out = append(out, string(r))
|
||
}
|
||
return out
|
||
}
|