Files
sundynix-agentix/sundynix-dispatcher/internal/llm/pool.go
T
Blizzard ba8c6b3c43 feat(report): 报告生成端到端 — 规划→分章并行检索撰写→渲染真实 Word
- shared: 新增 intent=report 任务约定 + ReportPath(跨进程共享落盘目录,零配置对齐)
- dispatcher: handleReport 专用编排(DeepSeek 规划大纲 → 各章并行 RAG 检索+撰写
  → 汇聚 → report_render),Pool.Chat 非流式聚合;进度与正文经 Token 流实时回流
- mcp-go: 用标准库 archive/zip + OOXML 拼出真实可打开的 .docx(零额外依赖),
  report_render 工具落盘到共享目录;附 docx 有效性测试
- gateway: POST /reports 触发;GET /reports/:id/download 下发 Word
- desktop: 新增「报告」页(主题→实时编排进度→下载 Word),左导航置为就绪

实测:DeepSeek 生成 5 章报告 → 渲染 5KB docx → file 识别为 Microsoft Word 2007+
→ textutil 提取标题/各章正文完整。

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-12 14:02:21 +08:00

157 lines
4.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package llm 抽象 LLM PoolvLLM / Ollama / 第三方在线 API)的负载均衡与流式推理。
package llm
import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"sync"
"time"
"github.com/sundynix/sundynix-shared/contract"
)
// ChatMessage 是一条对话消息(role: system/user/assistant)。
type ChatMessage struct {
Role string `json:"role"`
Content string `json:"content"`
}
// Pool 维护当前激活的后端配置(由控制面经 NATS 下发,可热更新)。
type Pool struct {
mu sync.RWMutex
cfg *contract.ModelConfig
hc *http.Client
}
func NewPool() *Pool {
return &Pool{hc: &http.Client{Timeout: 120 * time.Second}}
}
// SetConfig 热更新后端配置(控制面变更时调用)。
func (p *Pool) SetConfig(cfg *contract.ModelConfig) {
p.mu.Lock()
p.cfg = cfg
p.mu.Unlock()
if cfg != nil {
// 不打印 api_key。
fmt.Printf("[llm] model config set: provider=%s base=%s model=%s\n", cfg.Provider, cfg.BaseURL, cfg.Model)
}
}
func (p *Pool) config() *contract.ModelConfig {
p.mu.RLock()
defer p.mu.RUnlock()
return p.cfg
}
// Ready 报告是否已配置可用后端。
func (p *Pool) Ready() bool { return p.config().Ready() }
// ChatStream 以 OpenAI 兼容协议流式推理,逐 token 回调 onToken。
// 仅在 Ready() 时可用(调用方据此决定真实推理或降级桩)。
func (p *Pool) ChatStream(ctx context.Context, msgs []ChatMessage, onToken func(string)) error {
cfg := p.config()
if !cfg.Ready() {
return fmt.Errorf("no model configured")
}
body, _ := json.Marshal(map[string]any{
"model": cfg.Model,
"messages": msgs,
"stream": true,
})
req, err := http.NewRequestWithContext(ctx, http.MethodPost, cfg.BaseURL+"/chat/completions", bytes.NewReader(body))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
if cfg.APIKey != "" {
req.Header.Set("Authorization", "Bearer "+cfg.APIKey)
}
resp, err := p.hc.Do(req)
if err != nil {
return fmt.Errorf("llm request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
buf := new(bytes.Buffer)
_, _ = buf.ReadFrom(resp.Body)
return fmt.Errorf("llm http %d: %s", resp.StatusCode, strings.TrimSpace(buf.String()))
}
// 解析 OpenAI 兼容 SSEdata: {choices:[{delta:{content}}]} … data: [DONE]
sc := bufio.NewScanner(resp.Body)
sc.Buffer(make([]byte, 0, 64*1024), 1024*1024)
for sc.Scan() {
line := strings.TrimSpace(sc.Text())
if !strings.HasPrefix(line, "data:") {
continue
}
payload := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
if payload == "[DONE]" {
break
}
var chunk struct {
Choices []struct {
Delta struct {
Content string `json:"content"`
} `json:"delta"`
} `json:"choices"`
}
if json.Unmarshal([]byte(payload), &chunk) != nil {
continue
}
if len(chunk.Choices) > 0 && chunk.Choices[0].Delta.Content != "" {
onToken(chunk.Choices[0].Delta.Content)
}
}
return sc.Err()
}
// Chat 非流式:内部复用 ChatStream 聚合全部 token,返回整段文本。
// 报告生成的「规划大纲 / 撰写章节」等需要拿到完整结果再继续,用它而非流式。
func (p *Pool) Chat(ctx context.Context, msgs []ChatMessage) (string, error) {
var b strings.Builder
err := p.ChatStream(ctx, msgs, func(tok string) { b.WriteString(tok) })
return b.String(), err
}
// ---- 占位降级(未配置后端时)----
// 占位参数:模拟真实后端的 TTFT(首 token 延迟) 与逐 token 间隔。
const (
timeToFirstToken = 700 * time.Millisecond
interTokenDelay = 60 * time.Millisecond
)
// StreamText 按节奏把给定文本流式回调(未配置真实后端时的降级桩)。
func (p *Pool) StreamText(ctx context.Context, text string, onToken func([]byte)) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(timeToFirstToken):
}
for _, tok := range tokenize(text) {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
onToken([]byte(tok))
time.Sleep(interTokenDelay)
}
return nil
}
func tokenize(s string) []string {
out := make([]string, 0, len(s))
for _, r := range s {
out = append(out, string(r))
}
return out
}