feat: 配置控制面 + LLM Pool 接第三方在线 API (OpenAI 兼容)
后端从占位回显变为真实生成:管理员经控制面登记/激活模型,Gateway 经 NATS 下发,Dispatcher 热更新 LLM Pool,Eino 图用 OpenAI 兼容流式真实推理。 - shared: contract.ModelConfig(provider/base_url/api_key/model) + 配置 subjects; bus.RequestModelConfig/ServeModelConfig/Publish/Subscribe ModelConfigUpdated - gateway: store.LLMModel→sundynix_model(AutoMigrate,唯一激活) + admin REST (GET/POST/active/delete/test models, api_key 脱敏) + main ServeModelConfig + 变更广播; 路由 /api/v1/admin/models* - dispatcher: llm.Pool OpenAI 兼容 SSE 流式客户端(ChatStream) + 热更新配置 + 未配置则降级桩; poolModel.Ready()?真实流式:注入记忆的桩; main 取配置+订阅 - 开发期接在线 API 不拉本地模型(见 llm-provider-strategy memory) - 验证: 4 模块 build✓ + e2e PASS; mock OpenAI 服务 live 跑通——登记/测试连接✓/ 激活→NATS 热更新→提交→真实 SSE 流出 mock 回复, mock 日志证明端点被调用且 注入画像(老王)进了模型上下文 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -7,6 +7,7 @@ import (
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/sundynix/sundynix-dispatcher/internal/eino"
|
||||
"github.com/sundynix/sundynix-dispatcher/internal/harness"
|
||||
@@ -23,6 +24,18 @@ func main() {
|
||||
sub := dnats.MustConnect(natsURL)
|
||||
defer sub.Close()
|
||||
|
||||
// 配置控制面:启动时取激活模型配置,并订阅热更新。
|
||||
cctx, ccancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
if cfg, _ := sub.RequestModelConfig(cctx); cfg != nil {
|
||||
pool.SetConfig(cfg)
|
||||
} else {
|
||||
log.Println("[dispatcher] 未取到在线模型配置,降级桩运行(控制台配置后将热更新)")
|
||||
}
|
||||
ccancel()
|
||||
if _, err := sub.SubscribeModelConfigUpdated(pool.SetConfig); err != nil {
|
||||
log.Printf("[dispatcher] subscribe model config: %v", err)
|
||||
}
|
||||
|
||||
// sub 同时作为 Token 回流出口(TokenSink)与 MCP 工具调用出口(ToolCaller)。
|
||||
orch, err := eino.NewOrchestrator(pool, breaker, sub, sub)
|
||||
if err != nil {
|
||||
|
||||
@@ -22,27 +22,55 @@ var _ model.BaseChatModel = (*poolModel)(nil)
|
||||
// Generate 阻塞式生成(图被 Invoke 时用)。
|
||||
func (pm *poolModel) Generate(ctx context.Context, input []*schema.Message, _ ...model.Option) (*schema.Message, error) {
|
||||
var sb strings.Builder
|
||||
if err := pm.pool.StreamText(ctx, replyFor(input), func(tok []byte) { sb.Write(tok) }); err != nil {
|
||||
var err error
|
||||
if pm.pool.Ready() {
|
||||
err = pm.pool.ChatStream(ctx, toChatMessages(input), func(tok string) { sb.WriteString(tok) })
|
||||
} else {
|
||||
err = pm.pool.StreamText(ctx, replyFor(input), func(tok []byte) { sb.Write(tok) })
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return schema.AssistantMessage(sb.String(), nil), nil
|
||||
}
|
||||
|
||||
// Stream 流式生成(图被 Stream 时用):把回复按 token 推进 pipe。
|
||||
// 已配置在线模型 → 真实 OpenAI 兼容流式;否则 → 注入记忆的降级桩。
|
||||
func (pm *poolModel) Stream(ctx context.Context, input []*schema.Message, _ ...model.Option) (*schema.StreamReader[*schema.Message], error) {
|
||||
sr, sw := schema.Pipe[*schema.Message](32)
|
||||
text := replyFor(input)
|
||||
ready := pm.pool.Ready()
|
||||
go func() {
|
||||
defer sw.Close()
|
||||
if err := pm.pool.StreamText(ctx, text, func(tok []byte) {
|
||||
sw.Send(schema.AssistantMessage(string(tok), nil), nil)
|
||||
}); err != nil {
|
||||
send := func(s string) { sw.Send(schema.AssistantMessage(s, nil), nil) }
|
||||
var err error
|
||||
if ready {
|
||||
err = pm.pool.ChatStream(ctx, toChatMessages(input), send)
|
||||
} else {
|
||||
err = pm.pool.StreamText(ctx, replyFor(input), func(tok []byte) { send(string(tok)) })
|
||||
}
|
||||
if err != nil {
|
||||
sw.Send(nil, err)
|
||||
}
|
||||
}()
|
||||
return sr, nil
|
||||
}
|
||||
|
||||
// toChatMessages 把 Eino 消息转为 LLM Pool 的 OpenAI 兼容消息。
|
||||
func toChatMessages(msgs []*schema.Message) []llm.ChatMessage {
|
||||
out := make([]llm.ChatMessage, 0, len(msgs))
|
||||
for _, m := range msgs {
|
||||
role := "user"
|
||||
switch m.Role {
|
||||
case schema.System:
|
||||
role = "system"
|
||||
case schema.Assistant:
|
||||
role = "assistant"
|
||||
}
|
||||
out = append(out, llm.ChatMessage{Role: role, Content: m.Content})
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// replyFor 是占位"模型":从消息中取出注入的画像与用户输入,
|
||||
// 生成一段能体现"记忆已注入"的确定性回复(证明 recall→prompt 链路真的把画像喂进来了)。
|
||||
// 真实模型不需要本函数。
|
||||
|
||||
@@ -1,16 +1,118 @@
|
||||
// Package llm 抽象 LLM Pool(vLLM / Ollama 集群)的负载均衡与流式推理。
|
||||
// Package llm 抽象 LLM Pool(vLLM / Ollama / 第三方在线 API)的负载均衡与流式推理。
|
||||
package llm
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/sundynix/sundynix-shared/contract"
|
||||
)
|
||||
|
||||
// Pool 维护后端 LLM 实例列表与路由策略。
|
||||
type Pool struct{ /* backends []Backend */ }
|
||||
// ChatMessage 是一条对话消息(role: system/user/assistant)。
|
||||
type ChatMessage struct {
|
||||
Role string `json:"role"`
|
||||
Content string `json:"content"`
|
||||
}
|
||||
|
||||
func NewPool() *Pool { return &Pool{} }
|
||||
// Pool 维护当前激活的后端配置(由控制面经 NATS 下发,可热更新)。
|
||||
type Pool struct {
|
||||
mu sync.RWMutex
|
||||
cfg *contract.ModelConfig
|
||||
hc *http.Client
|
||||
}
|
||||
|
||||
func NewPool() *Pool {
|
||||
return &Pool{hc: &http.Client{Timeout: 120 * time.Second}}
|
||||
}
|
||||
|
||||
// SetConfig 热更新后端配置(控制面变更时调用)。
|
||||
func (p *Pool) SetConfig(cfg *contract.ModelConfig) {
|
||||
p.mu.Lock()
|
||||
p.cfg = cfg
|
||||
p.mu.Unlock()
|
||||
if cfg != nil {
|
||||
// 不打印 api_key。
|
||||
fmt.Printf("[llm] model config set: provider=%s base=%s model=%s\n", cfg.Provider, cfg.BaseURL, cfg.Model)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Pool) config() *contract.ModelConfig {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
return p.cfg
|
||||
}
|
||||
|
||||
// Ready 报告是否已配置可用后端。
|
||||
func (p *Pool) Ready() bool { return p.config().Ready() }
|
||||
|
||||
// ChatStream 以 OpenAI 兼容协议流式推理,逐 token 回调 onToken。
|
||||
// 仅在 Ready() 时可用(调用方据此决定真实推理或降级桩)。
|
||||
func (p *Pool) ChatStream(ctx context.Context, msgs []ChatMessage, onToken func(string)) error {
|
||||
cfg := p.config()
|
||||
if !cfg.Ready() {
|
||||
return fmt.Errorf("no model configured")
|
||||
}
|
||||
body, _ := json.Marshal(map[string]any{
|
||||
"model": cfg.Model,
|
||||
"messages": msgs,
|
||||
"stream": true,
|
||||
})
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, cfg.BaseURL+"/chat/completions", bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
if cfg.APIKey != "" {
|
||||
req.Header.Set("Authorization", "Bearer "+cfg.APIKey)
|
||||
}
|
||||
resp, err := p.hc.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("llm request: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode >= 400 {
|
||||
buf := new(bytes.Buffer)
|
||||
_, _ = buf.ReadFrom(resp.Body)
|
||||
return fmt.Errorf("llm http %d: %s", resp.StatusCode, strings.TrimSpace(buf.String()))
|
||||
}
|
||||
|
||||
// 解析 OpenAI 兼容 SSE:data: {choices:[{delta:{content}}]} … data: [DONE]
|
||||
sc := bufio.NewScanner(resp.Body)
|
||||
sc.Buffer(make([]byte, 0, 64*1024), 1024*1024)
|
||||
for sc.Scan() {
|
||||
line := strings.TrimSpace(sc.Text())
|
||||
if !strings.HasPrefix(line, "data:") {
|
||||
continue
|
||||
}
|
||||
payload := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
|
||||
if payload == "[DONE]" {
|
||||
break
|
||||
}
|
||||
var chunk struct {
|
||||
Choices []struct {
|
||||
Delta struct {
|
||||
Content string `json:"content"`
|
||||
} `json:"delta"`
|
||||
} `json:"choices"`
|
||||
}
|
||||
if json.Unmarshal([]byte(payload), &chunk) != nil {
|
||||
continue
|
||||
}
|
||||
if len(chunk.Choices) > 0 && chunk.Choices[0].Delta.Content != "" {
|
||||
onToken(chunk.Choices[0].Delta.Content)
|
||||
}
|
||||
}
|
||||
return sc.Err()
|
||||
}
|
||||
|
||||
// ---- 占位降级(未配置后端时)----
|
||||
|
||||
// 占位参数:模拟真实后端的 TTFT(首 token 延迟) 与逐 token 间隔。
|
||||
const (
|
||||
@@ -18,24 +120,13 @@ const (
|
||||
interTokenDelay = 60 * time.Millisecond
|
||||
)
|
||||
|
||||
// Stream 选择一个后端进行流式推理,逐 Token 回调 onToken。
|
||||
// 当前为占位实现:把对 prompt 的确定性回复按 token 流式返回,
|
||||
// 真实接入 vLLM/Ollama 时替换为后端 streaming API 即可(回调签名不变)。
|
||||
func (p *Pool) Stream(ctx context.Context, prompt string, onToken func([]byte)) error {
|
||||
// TODO: 选路 (least-load / 模型亲和) → 调 vLLM/Ollama streaming API
|
||||
return p.StreamText(ctx, buildReply(prompt), onToken)
|
||||
}
|
||||
|
||||
// StreamText 按真实后端的 TTFT/逐 token 节奏把给定文本流式回调。
|
||||
// 把"说什么"(由上层/Eino 图决定)与"怎么流"(后端节奏)解耦:
|
||||
// 真实接入 vLLM/Ollama 后,由后端 streaming API 直接驱动,无需本方法。
|
||||
// StreamText 按节奏把给定文本流式回调(未配置真实后端时的降级桩)。
|
||||
func (p *Pool) StreamText(ctx context.Context, text string, onToken func([]byte)) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-time.After(timeToFirstToken): // 模拟 TTFT
|
||||
case <-time.After(timeToFirstToken):
|
||||
}
|
||||
|
||||
for _, tok := range tokenize(text) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -48,16 +139,6 @@ func (p *Pool) StreamText(ctx context.Context, text string, onToken func([]byte)
|
||||
return nil
|
||||
}
|
||||
|
||||
// buildReply 占位:真实实现应由 DSL 编排出的对话上下文驱动后端生成。
|
||||
func buildReply(prompt string) string {
|
||||
p := strings.TrimSpace(prompt)
|
||||
if len(p) > 40 {
|
||||
p = p[:40] + "…"
|
||||
}
|
||||
return "已编排执行该 Agent 图,输入摘要: " + p
|
||||
}
|
||||
|
||||
// tokenize 占位分词:按 rune 切,保证多字节中文也能逐字流式。
|
||||
func tokenize(s string) []string {
|
||||
out := make([]string, 0, len(s))
|
||||
for _, r := range s {
|
||||
|
||||
@@ -58,4 +58,14 @@ func (s *Subscriber) CallTool(ctx context.Context, subject string, call *contrac
|
||||
return s.inner.CallTool(ctx, subject, call)
|
||||
}
|
||||
|
||||
// RequestModelConfig 向控制面(Gateway)取当前激活的模型配置。
|
||||
func (s *Subscriber) RequestModelConfig(ctx context.Context) (*contract.ModelConfig, error) {
|
||||
return s.inner.RequestModelConfig(ctx)
|
||||
}
|
||||
|
||||
// SubscribeModelConfigUpdated 订阅模型配置热更新。
|
||||
func (s *Subscriber) SubscribeModelConfigUpdated(onUpdate func(*contract.ModelConfig)) (func() error, error) {
|
||||
return s.inner.SubscribeModelConfigUpdated(onUpdate)
|
||||
}
|
||||
|
||||
func (s *Subscriber) Close() { s.inner.Close() }
|
||||
|
||||
Reference in New Issue
Block a user