From 3c65189f30cc7d6e5074a0fc16d9fb175cd77fd9 Mon Sep 17 00:00:00 2001 From: Blizzard Date: Wed, 10 Jun 2026 15:41:39 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E9=85=8D=E7=BD=AE=E6=8E=A7=E5=88=B6?= =?UTF-8?q?=E9=9D=A2=20+=20LLM=20Pool=20=E6=8E=A5=E7=AC=AC=E4=B8=89?= =?UTF-8?q?=E6=96=B9=E5=9C=A8=E7=BA=BF=20API=20(OpenAI=20=E5=85=BC?= =?UTF-8?q?=E5=AE=B9)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 后端从占位回显变为真实生成:管理员经控制面登记/激活模型,Gateway 经 NATS 下发,Dispatcher 热更新 LLM Pool,Eino 图用 OpenAI 兼容流式真实推理。 - shared: contract.ModelConfig(provider/base_url/api_key/model) + 配置 subjects; bus.RequestModelConfig/ServeModelConfig/Publish/Subscribe ModelConfigUpdated - gateway: store.LLMModel→sundynix_model(AutoMigrate,唯一激活) + admin REST (GET/POST/active/delete/test models, api_key 脱敏) + main ServeModelConfig + 变更广播; 路由 /api/v1/admin/models* - dispatcher: llm.Pool OpenAI 兼容 SSE 流式客户端(ChatStream) + 热更新配置 + 未配置则降级桩; poolModel.Ready()?真实流式:注入记忆的桩; main 取配置+订阅 - 开发期接在线 API 不拉本地模型(见 llm-provider-strategy memory) - 验证: 4 模块 build✓ + e2e PASS; mock OpenAI 服务 live 跑通——登记/测试连接✓/ 激活→NATS 热更新→提交→真实 SSE 流出 mock 回复, mock 日志证明端点被调用且 注入画像(老王)进了模型上下文 Co-Authored-By: Claude Opus 4.8 --- scripts/mock_openai.py | 59 ++++++++ sundynix-dispatcher/cmd/dispatcher/main.go | 13 ++ sundynix-dispatcher/internal/eino/model.go | 38 ++++- sundynix-dispatcher/internal/llm/pool.go | 135 +++++++++++++---- .../internal/nats/subscriber.go | 10 ++ sundynix-gateway/cmd/server/main.go | 13 ++ sundynix-gateway/internal/handler/admin.go | 137 ++++++++++++++++++ sundynix-gateway/internal/nats/publisher.go | 10 ++ sundynix-gateway/internal/router/router.go | 10 ++ sundynix-gateway/internal/store/model.go | 72 +++++++++ sundynix-gateway/internal/store/pgsql.go | 6 +- sundynix-shared/bus/bus.go | 60 ++++++++ sundynix-shared/contract/task.go | 18 +++ 13 files changed, 548 insertions(+), 33 deletions(-) create mode 100644 scripts/mock_openai.py create mode 100644 sundynix-gateway/internal/handler/admin.go create mode 100644 sundynix-gateway/internal/store/model.go diff --git a/scripts/mock_openai.py b/scripts/mock_openai.py new file mode 100644 index 0000000..ddeac6e --- /dev/null +++ b/scripts/mock_openai.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python3 +"""最小 OpenAI 兼容 mock 服务 —— 无真实 key 时验证 LLM Pool 流式链路。 +- GET /models → 200(供"测试连接") +- POST /chat/completions → SSE 流式,回复里回显注入的画像,证明记忆真进了模型上下文。 +用法: python3 scripts/mock_openai.py 11999 +""" +import json +import sys +import time +from http.server import BaseHTTPRequestHandler, HTTPServer + + +class H(BaseHTTPRequestHandler): + def log_message(self, *a): + pass + + def do_GET(self): + if self.path.endswith("/models"): + body = json.dumps({"object": "list", "data": [{"id": "mock-model"}]}).encode() + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + else: + self.send_response(404) + self.end_headers() + + def do_POST(self): + n = int(self.headers.get("Content-Length", 0)) + req = json.loads(self.rfile.read(n) or b"{}") + msgs = req.get("messages", []) + system = next((m["content"] for m in msgs if m.get("role") == "system"), "") + # 从 system 里抽出"称呼",证明画像注入真到了模型这层。 + who = "(未知)" + for line in system.replace(":", ":").splitlines(): + if "称呼" in line: + who = line.split(":")[-1].strip() + reply = f"你好{who},这是来自【在线模型 mock】的真实流式回复——我已读取你的偏好与历史并据此作答。" + sys.stderr.write(f"[mock-openai] POST /chat/completions 命中, 注入称呼={who}\n") + sys.stderr.flush() + + self.send_response(200) + self.send_header("Content-Type", "text/event-stream") + self.end_headers() + time.sleep(0.8) # 模拟 TTFT,给 SSE 客户端订阅时间(便于演示捕获) + for ch in reply: + chunk = {"choices": [{"delta": {"content": ch}}]} + self.wfile.write(f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n".encode()) + self.wfile.flush() + time.sleep(0.02) + self.wfile.write(b"data: [DONE]\n\n") + self.wfile.flush() + + +if __name__ == "__main__": + port = int(sys.argv[1]) if len(sys.argv) > 1 else 11999 + print(f"[mock-openai] listening on :{port}") + HTTPServer(("127.0.0.1", port), H).serve_forever() diff --git a/sundynix-dispatcher/cmd/dispatcher/main.go b/sundynix-dispatcher/cmd/dispatcher/main.go index 6b09113..b829ce6 100644 --- a/sundynix-dispatcher/cmd/dispatcher/main.go +++ b/sundynix-dispatcher/cmd/dispatcher/main.go @@ -7,6 +7,7 @@ import ( "os" "os/signal" "syscall" + "time" "github.com/sundynix/sundynix-dispatcher/internal/eino" "github.com/sundynix/sundynix-dispatcher/internal/harness" @@ -23,6 +24,18 @@ func main() { sub := dnats.MustConnect(natsURL) defer sub.Close() + // 配置控制面:启动时取激活模型配置,并订阅热更新。 + cctx, ccancel := context.WithTimeout(context.Background(), 3*time.Second) + if cfg, _ := sub.RequestModelConfig(cctx); cfg != nil { + pool.SetConfig(cfg) + } else { + log.Println("[dispatcher] 未取到在线模型配置,降级桩运行(控制台配置后将热更新)") + } + ccancel() + if _, err := sub.SubscribeModelConfigUpdated(pool.SetConfig); err != nil { + log.Printf("[dispatcher] subscribe model config: %v", err) + } + // sub 同时作为 Token 回流出口(TokenSink)与 MCP 工具调用出口(ToolCaller)。 orch, err := eino.NewOrchestrator(pool, breaker, sub, sub) if err != nil { diff --git a/sundynix-dispatcher/internal/eino/model.go b/sundynix-dispatcher/internal/eino/model.go index 8afc1f8..b24759d 100644 --- a/sundynix-dispatcher/internal/eino/model.go +++ b/sundynix-dispatcher/internal/eino/model.go @@ -22,27 +22,55 @@ var _ model.BaseChatModel = (*poolModel)(nil) // Generate 阻塞式生成(图被 Invoke 时用)。 func (pm *poolModel) Generate(ctx context.Context, input []*schema.Message, _ ...model.Option) (*schema.Message, error) { var sb strings.Builder - if err := pm.pool.StreamText(ctx, replyFor(input), func(tok []byte) { sb.Write(tok) }); err != nil { + var err error + if pm.pool.Ready() { + err = pm.pool.ChatStream(ctx, toChatMessages(input), func(tok string) { sb.WriteString(tok) }) + } else { + err = pm.pool.StreamText(ctx, replyFor(input), func(tok []byte) { sb.Write(tok) }) + } + if err != nil { return nil, err } return schema.AssistantMessage(sb.String(), nil), nil } // Stream 流式生成(图被 Stream 时用):把回复按 token 推进 pipe。 +// 已配置在线模型 → 真实 OpenAI 兼容流式;否则 → 注入记忆的降级桩。 func (pm *poolModel) Stream(ctx context.Context, input []*schema.Message, _ ...model.Option) (*schema.StreamReader[*schema.Message], error) { sr, sw := schema.Pipe[*schema.Message](32) - text := replyFor(input) + ready := pm.pool.Ready() go func() { defer sw.Close() - if err := pm.pool.StreamText(ctx, text, func(tok []byte) { - sw.Send(schema.AssistantMessage(string(tok), nil), nil) - }); err != nil { + send := func(s string) { sw.Send(schema.AssistantMessage(s, nil), nil) } + var err error + if ready { + err = pm.pool.ChatStream(ctx, toChatMessages(input), send) + } else { + err = pm.pool.StreamText(ctx, replyFor(input), func(tok []byte) { send(string(tok)) }) + } + if err != nil { sw.Send(nil, err) } }() return sr, nil } +// toChatMessages 把 Eino 消息转为 LLM Pool 的 OpenAI 兼容消息。 +func toChatMessages(msgs []*schema.Message) []llm.ChatMessage { + out := make([]llm.ChatMessage, 0, len(msgs)) + for _, m := range msgs { + role := "user" + switch m.Role { + case schema.System: + role = "system" + case schema.Assistant: + role = "assistant" + } + out = append(out, llm.ChatMessage{Role: role, Content: m.Content}) + } + return out +} + // replyFor 是占位"模型":从消息中取出注入的画像与用户输入, // 生成一段能体现"记忆已注入"的确定性回复(证明 recall→prompt 链路真的把画像喂进来了)。 // 真实模型不需要本函数。 diff --git a/sundynix-dispatcher/internal/llm/pool.go b/sundynix-dispatcher/internal/llm/pool.go index 2ecce8f..61985e9 100644 --- a/sundynix-dispatcher/internal/llm/pool.go +++ b/sundynix-dispatcher/internal/llm/pool.go @@ -1,16 +1,118 @@ -// Package llm 抽象 LLM Pool(vLLM / Ollama 集群)的负载均衡与流式推理。 +// Package llm 抽象 LLM Pool(vLLM / Ollama / 第三方在线 API)的负载均衡与流式推理。 package llm import ( + "bufio" + "bytes" "context" + "encoding/json" + "fmt" + "net/http" "strings" + "sync" "time" + + "github.com/sundynix/sundynix-shared/contract" ) -// Pool 维护后端 LLM 实例列表与路由策略。 -type Pool struct{ /* backends []Backend */ } +// ChatMessage 是一条对话消息(role: system/user/assistant)。 +type ChatMessage struct { + Role string `json:"role"` + Content string `json:"content"` +} -func NewPool() *Pool { return &Pool{} } +// Pool 维护当前激活的后端配置(由控制面经 NATS 下发,可热更新)。 +type Pool struct { + mu sync.RWMutex + cfg *contract.ModelConfig + hc *http.Client +} + +func NewPool() *Pool { + return &Pool{hc: &http.Client{Timeout: 120 * time.Second}} +} + +// SetConfig 热更新后端配置(控制面变更时调用)。 +func (p *Pool) SetConfig(cfg *contract.ModelConfig) { + p.mu.Lock() + p.cfg = cfg + p.mu.Unlock() + if cfg != nil { + // 不打印 api_key。 + fmt.Printf("[llm] model config set: provider=%s base=%s model=%s\n", cfg.Provider, cfg.BaseURL, cfg.Model) + } +} + +func (p *Pool) config() *contract.ModelConfig { + p.mu.RLock() + defer p.mu.RUnlock() + return p.cfg +} + +// Ready 报告是否已配置可用后端。 +func (p *Pool) Ready() bool { return p.config().Ready() } + +// ChatStream 以 OpenAI 兼容协议流式推理,逐 token 回调 onToken。 +// 仅在 Ready() 时可用(调用方据此决定真实推理或降级桩)。 +func (p *Pool) ChatStream(ctx context.Context, msgs []ChatMessage, onToken func(string)) error { + cfg := p.config() + if !cfg.Ready() { + return fmt.Errorf("no model configured") + } + body, _ := json.Marshal(map[string]any{ + "model": cfg.Model, + "messages": msgs, + "stream": true, + }) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, cfg.BaseURL+"/chat/completions", bytes.NewReader(body)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + if cfg.APIKey != "" { + req.Header.Set("Authorization", "Bearer "+cfg.APIKey) + } + resp, err := p.hc.Do(req) + if err != nil { + return fmt.Errorf("llm request: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode >= 400 { + buf := new(bytes.Buffer) + _, _ = buf.ReadFrom(resp.Body) + return fmt.Errorf("llm http %d: %s", resp.StatusCode, strings.TrimSpace(buf.String())) + } + + // 解析 OpenAI 兼容 SSE:data: {choices:[{delta:{content}}]} … data: [DONE] + sc := bufio.NewScanner(resp.Body) + sc.Buffer(make([]byte, 0, 64*1024), 1024*1024) + for sc.Scan() { + line := strings.TrimSpace(sc.Text()) + if !strings.HasPrefix(line, "data:") { + continue + } + payload := strings.TrimSpace(strings.TrimPrefix(line, "data:")) + if payload == "[DONE]" { + break + } + var chunk struct { + Choices []struct { + Delta struct { + Content string `json:"content"` + } `json:"delta"` + } `json:"choices"` + } + if json.Unmarshal([]byte(payload), &chunk) != nil { + continue + } + if len(chunk.Choices) > 0 && chunk.Choices[0].Delta.Content != "" { + onToken(chunk.Choices[0].Delta.Content) + } + } + return sc.Err() +} + +// ---- 占位降级(未配置后端时)---- // 占位参数:模拟真实后端的 TTFT(首 token 延迟) 与逐 token 间隔。 const ( @@ -18,24 +120,13 @@ const ( interTokenDelay = 60 * time.Millisecond ) -// Stream 选择一个后端进行流式推理,逐 Token 回调 onToken。 -// 当前为占位实现:把对 prompt 的确定性回复按 token 流式返回, -// 真实接入 vLLM/Ollama 时替换为后端 streaming API 即可(回调签名不变)。 -func (p *Pool) Stream(ctx context.Context, prompt string, onToken func([]byte)) error { - // TODO: 选路 (least-load / 模型亲和) → 调 vLLM/Ollama streaming API - return p.StreamText(ctx, buildReply(prompt), onToken) -} - -// StreamText 按真实后端的 TTFT/逐 token 节奏把给定文本流式回调。 -// 把"说什么"(由上层/Eino 图决定)与"怎么流"(后端节奏)解耦: -// 真实接入 vLLM/Ollama 后,由后端 streaming API 直接驱动,无需本方法。 +// StreamText 按节奏把给定文本流式回调(未配置真实后端时的降级桩)。 func (p *Pool) StreamText(ctx context.Context, text string, onToken func([]byte)) error { select { case <-ctx.Done(): return ctx.Err() - case <-time.After(timeToFirstToken): // 模拟 TTFT + case <-time.After(timeToFirstToken): } - for _, tok := range tokenize(text) { select { case <-ctx.Done(): @@ -48,16 +139,6 @@ func (p *Pool) StreamText(ctx context.Context, text string, onToken func([]byte) return nil } -// buildReply 占位:真实实现应由 DSL 编排出的对话上下文驱动后端生成。 -func buildReply(prompt string) string { - p := strings.TrimSpace(prompt) - if len(p) > 40 { - p = p[:40] + "…" - } - return "已编排执行该 Agent 图,输入摘要: " + p -} - -// tokenize 占位分词:按 rune 切,保证多字节中文也能逐字流式。 func tokenize(s string) []string { out := make([]string, 0, len(s)) for _, r := range s { diff --git a/sundynix-dispatcher/internal/nats/subscriber.go b/sundynix-dispatcher/internal/nats/subscriber.go index 8cd71e1..6411b32 100644 --- a/sundynix-dispatcher/internal/nats/subscriber.go +++ b/sundynix-dispatcher/internal/nats/subscriber.go @@ -58,4 +58,14 @@ func (s *Subscriber) CallTool(ctx context.Context, subject string, call *contrac return s.inner.CallTool(ctx, subject, call) } +// RequestModelConfig 向控制面(Gateway)取当前激活的模型配置。 +func (s *Subscriber) RequestModelConfig(ctx context.Context) (*contract.ModelConfig, error) { + return s.inner.RequestModelConfig(ctx) +} + +// SubscribeModelConfigUpdated 订阅模型配置热更新。 +func (s *Subscriber) SubscribeModelConfigUpdated(onUpdate func(*contract.ModelConfig)) (func() error, error) { + return s.inner.SubscribeModelConfigUpdated(onUpdate) +} + func (s *Subscriber) Close() { s.inner.Close() } diff --git a/sundynix-gateway/cmd/server/main.go b/sundynix-gateway/cmd/server/main.go index 4e4adc6..664e052 100644 --- a/sundynix-gateway/cmd/server/main.go +++ b/sundynix-gateway/cmd/server/main.go @@ -2,12 +2,14 @@ package main import ( + "context" "log" "os" "github.com/sundynix/sundynix-gateway/internal/nats" "github.com/sundynix/sundynix-gateway/internal/router" "github.com/sundynix/sundynix-gateway/internal/store" + "github.com/sundynix/sundynix-shared/contract" ) func main() { @@ -22,6 +24,17 @@ func main() { bus := nats.MustConnect(natsURL) // 接入 NATS 零拷贝骨干网 + 声明任务流 defer bus.Close() + // 配置控制面:响应 Dispatcher 对当前激活模型配置的请求。 + if _, err := bus.ServeModelConfig(func() *contract.ModelConfig { + row, _ := db.GetActiveModel(context.Background()) + if row == nil { + return nil + } + return &contract.ModelConfig{Provider: row.Provider, BaseURL: row.BaseURL, APIKey: row.APIKey, Model: row.Model} + }); err != nil { + log.Printf("[gateway] serve model config: %v", err) + } + r := router.New(db, cache, bus) addr := envOr("GATEWAY_ADDR", ":8080") log.Printf("[gateway] listening on %s", addr) diff --git a/sundynix-gateway/internal/handler/admin.go b/sundynix-gateway/internal/handler/admin.go new file mode 100644 index 0000000..d16c4b3 --- /dev/null +++ b/sundynix-gateway/internal/handler/admin.go @@ -0,0 +1,137 @@ +package handler + +import ( + "context" + "net/http" + "strconv" + "time" + + "github.com/gin-gonic/gin" + + "github.com/sundynix/sundynix-gateway/internal/store" + "github.com/sundynix/sundynix-shared/contract" +) + +// 控制面(运维管理):LLM 模型配置 CRUD + 测试连接 + 变更广播。 +// 表 sundynix_model 由 Gateway 持有;Dispatcher 经 NATS 取激活配置。 + +type modelBody struct { + ID uint `json:"id"` + Provider string `json:"provider"` + BaseURL string `json:"base_url"` + APIKey string `json:"api_key"` + Model string `json:"model"` +} + +// ListModels: GET /api/v1/admin/models —— 列出模型(api_key 脱敏)。 +func (h *Handler) ListModels(c *gin.Context) { + rows, err := h.db.ListModels(c.Request.Context()) + if err != nil { + c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()}) + return + } + out := make([]gin.H, 0, len(rows)) + for _, m := range rows { + out = append(out, gin.H{ + "id": m.ID, "provider": m.Provider, "base_url": m.BaseURL, + "model": m.Model, "active": m.Active, "api_key": mask(m.APIKey), + }) + } + c.JSON(http.StatusOK, gin.H{"models": out}) +} + +// SaveModel: POST /api/v1/admin/models —— 新增/更新一条模型配置。 +func (h *Handler) SaveModel(c *gin.Context) { + var b modelBody + if err := c.ShouldBindJSON(&b); err != nil || b.BaseURL == "" || b.Model == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "provider/base_url/model required"}) + return + } + provider := b.Provider + if provider == "" { + provider = "openai-compatible" + } + m := &store.LLMModel{ID: b.ID, Provider: provider, BaseURL: b.BaseURL, APIKey: b.APIKey, Model: b.Model} + if err := h.db.SaveModel(c.Request.Context(), m); err != nil { + c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()}) + return + } + h.broadcastActiveModel(c.Request.Context()) + c.JSON(http.StatusOK, gin.H{"id": m.ID}) +} + +// SetActiveModel: POST /api/v1/admin/models/:id/active —— 设为激活并广播。 +func (h *Handler) SetActiveModel(c *gin.Context) { + id, _ := strconv.Atoi(c.Param("id")) + if err := h.db.SetActiveModel(c.Request.Context(), uint(id)); err != nil { + c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()}) + return + } + h.broadcastActiveModel(c.Request.Context()) + c.JSON(http.StatusOK, gin.H{"status": "ok", "active": id}) +} + +// DeleteModel: DELETE /api/v1/admin/models/:id +func (h *Handler) DeleteModel(c *gin.Context) { + id, _ := strconv.Atoi(c.Param("id")) + if err := h.db.DeleteModel(c.Request.Context(), uint(id)); err != nil { + c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()}) + return + } + h.broadcastActiveModel(c.Request.Context()) + c.JSON(http.StatusOK, gin.H{"status": "ok"}) +} + +// TestModel: POST /api/v1/admin/models/test —— 探测 OpenAI 兼容端点连通性。 +func (h *Handler) TestModel(c *gin.Context) { + var b modelBody + if err := c.ShouldBindJSON(&b); err != nil || b.BaseURL == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "base_url required"}) + return + } + // 若传了已存的 id 但未带 key,用库里的真实 key。 + key := b.APIKey + if key == "" && b.ID != 0 { + if rows, _ := h.db.ListModels(c.Request.Context()); rows != nil { + for _, m := range rows { + if m.ID == b.ID { + key = m.APIKey + } + } + } + } + ctx, cancel := context.WithTimeout(c.Request.Context(), 8*time.Second) + defer cancel() + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, b.BaseURL+"/models", nil) + if key != "" { + req.Header.Set("Authorization", "Bearer "+key) + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + c.JSON(http.StatusOK, gin.H{"ok": false, "message": err.Error()}) + return + } + defer resp.Body.Close() + c.JSON(http.StatusOK, gin.H{"ok": resp.StatusCode < 400, "message": "HTTP " + resp.Status}) +} + +// broadcastActiveModel 读当前激活配置并经 NATS 广播,触发 Dispatcher 热更新。 +func (h *Handler) broadcastActiveModel(ctx context.Context) { + row, _ := h.db.GetActiveModel(ctx) + if row == nil { + return + } + _ = h.bus.PublishModelConfigUpdated(&contract.ModelConfig{ + Provider: row.Provider, BaseURL: row.BaseURL, APIKey: row.APIKey, Model: row.Model, + }) +} + +func mask(s string) string { + if len(s) <= 4 { + if s == "" { + return "" + } + return "••••" + } + return "••••" + s[len(s)-4:] +} diff --git a/sundynix-gateway/internal/nats/publisher.go b/sundynix-gateway/internal/nats/publisher.go index 5b4a152..a6fde3d 100644 --- a/sundynix-gateway/internal/nats/publisher.go +++ b/sundynix-gateway/internal/nats/publisher.go @@ -48,4 +48,14 @@ func (b *Bus) CallTool(ctx context.Context, subject string, call *contract.ToolC return b.inner.CallTool(ctx, subject, call) } +// ServeModelConfig 让网关作为配置控制面,响应 Dispatcher 的模型配置请求。 +func (b *Bus) ServeModelConfig(provide func() *contract.ModelConfig) (func() error, error) { + return b.inner.ServeModelConfig(provide) +} + +// PublishModelConfigUpdated 广播模型配置变更。 +func (b *Bus) PublishModelConfigUpdated(cfg *contract.ModelConfig) error { + return b.inner.PublishModelConfigUpdated(cfg) +} + func (b *Bus) Close() { b.inner.Close() } diff --git a/sundynix-gateway/internal/router/router.go b/sundynix-gateway/internal/router/router.go index 62a50e8..acc13a2 100644 --- a/sundynix-gateway/internal/router/router.go +++ b/sundynix-gateway/internal/router/router.go @@ -24,6 +24,16 @@ func New(db *store.Postgres, cache *store.Redis, bus *nats.Bus) *gin.Engine { api.GET("/tasks/:id/stream", h.StreamTask) // 4. SSE/WS 回流 Token Stream api.PUT("/memory", h.SetMemory) // 偏好记忆登记(→ mcp-go memory_upsert) api.GET("/billing", h.Billing) + + // 运维控制面:LLM 模型配置(独立运维控制台调用)。 + admin := api.Group("/admin") + { + admin.GET("/models", h.ListModels) + admin.POST("/models", h.SaveModel) + admin.POST("/models/:id/active", h.SetActiveModel) + admin.DELETE("/models/:id", h.DeleteModel) + admin.POST("/models/test", h.TestModel) + } } return r } diff --git a/sundynix-gateway/internal/store/model.go b/sundynix-gateway/internal/store/model.go new file mode 100644 index 0000000..0c10ca5 --- /dev/null +++ b/sundynix-gateway/internal/store/model.go @@ -0,0 +1,72 @@ +package store + +import ( + "context" + + "gorm.io/gorm" +) + +// LLMModel 是一个 LLM 后端配置(控制面:管理员在此登记可用模型)。 +// 表名 sundynix_model(遵守前缀约定)。同一时刻仅一条 Active=true。 +type LLMModel struct { + ID uint `gorm:"primaryKey"` + Provider string `gorm:"size:32"` // openai-compatible / vllm + BaseURL string `gorm:"size:255"` // 如 https://api.deepseek.com/v1 + APIKey string `gorm:"size:255"` + Model string `gorm:"size:64"` // 如 deepseek-chat + Active bool +} + +func (LLMModel) TableName() string { return "sundynix_model" } + +// ListModels 列出全部模型配置。 +func (p *Postgres) ListModels(ctx context.Context) ([]LLMModel, error) { + if p.db == nil { + return nil, nil + } + var rows []LLMModel + err := p.db.WithContext(ctx).Order("id").Find(&rows).Error + return rows, err +} + +// SaveModel 新增或更新一条模型配置(ID==0 新增)。 +func (p *Postgres) SaveModel(ctx context.Context, m *LLMModel) error { + if p.db == nil { + return errStoreDisabled + } + return p.db.WithContext(ctx).Save(m).Error +} + +// SetActiveModel 把指定模型设为激活(其余取消),事务保证唯一激活。 +func (p *Postgres) SetActiveModel(ctx context.Context, id uint) error { + if p.db == nil { + return errStoreDisabled + } + return p.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + if err := tx.Model(&LLMModel{}).Where("active = ?", true).Update("active", false).Error; err != nil { + return err + } + return tx.Model(&LLMModel{}).Where("id = ?", id).Update("active", true).Error + }) +} + +// GetActiveModel 返回当前激活模型(无则 nil)。 +func (p *Postgres) GetActiveModel(ctx context.Context) (*LLMModel, error) { + if p.db == nil { + return nil, nil + } + var m LLMModel + err := p.db.WithContext(ctx).Where("active = ?", true).First(&m).Error + if err != nil { + return nil, nil // 未配置激活模型 + } + return &m, nil +} + +// DeleteModel 删除一条模型配置。 +func (p *Postgres) DeleteModel(ctx context.Context, id uint) error { + if p.db == nil { + return errStoreDisabled + } + return p.db.WithContext(ctx).Delete(&LLMModel{}, id).Error +} diff --git a/sundynix-gateway/internal/store/pgsql.go b/sundynix-gateway/internal/store/pgsql.go index c95a155..8b40fce 100644 --- a/sundynix-gateway/internal/store/pgsql.go +++ b/sundynix-gateway/internal/store/pgsql.go @@ -3,6 +3,7 @@ package store import ( "context" + "errors" "log" "gorm.io/driver/postgres" @@ -10,6 +11,9 @@ import ( "gorm.io/gorm/schema" ) +// errStoreDisabled 表示 Postgres 处于降级(未连接)模式,写操作无法进行。 +var errStoreDisabled = errors.New("postgres store disabled") + // Postgres 持有 MainDB 连接(Users / Billing / DSL)。 // db 为 nil 表示降级模式(连接失败时仍允许网关启动)。 type Postgres struct { @@ -30,7 +34,7 @@ func OpenPostgres(dsn string) *Postgres { log.Printf("[store] postgres 不可用,降级运行(不持久化): %v", err) return &Postgres{} } - if err := db.AutoMigrate(&User{}, &Task{}); err != nil { + if err := db.AutoMigrate(&User{}, &Task{}, &LLMModel{}); err != nil { log.Printf("[store] postgres AutoMigrate 失败,降级运行: %v", err) return &Postgres{} } diff --git a/sundynix-shared/bus/bus.go b/sundynix-shared/bus/bus.go index f63273d..f0e7e17 100644 --- a/sundynix-shared/bus/bus.go +++ b/sundynix-shared/bus/bus.go @@ -183,6 +183,66 @@ func respond(m *nats.Msg, res *contract.ToolResult) { _ = m.Respond(data) } +// ---- 配置控制面(core NATS request-reply + broadcast)---- + +// RequestModelConfig 向控制面(Gateway)请求当前激活的模型配置。 +// 无人应答 / 无激活配置时返回 (nil, nil),由调用方降级。 +func (b *Bus) RequestModelConfig(ctx context.Context) (*contract.ModelConfig, error) { + msg, err := b.nc.RequestWithContext(ctx, contract.SubjectConfigModelGet, nil) + if err != nil { + return nil, nil // 控制面暂不可用,降级 + } + if len(msg.Data) == 0 { + return nil, nil + } + var cfg contract.ModelConfig + if err := json.Unmarshal(msg.Data, &cfg); err != nil { + return nil, fmt.Errorf("unmarshal model config: %w", err) + } + if !cfg.Ready() { + return nil, nil + } + return &cfg, nil +} + +// ServeModelConfig 让控制面响应配置请求;provide 返回当前激活配置(可为 nil)。 +func (b *Bus) ServeModelConfig(provide func() *contract.ModelConfig) (unsub func() error, err error) { + sub, err := b.nc.Subscribe(contract.SubjectConfigModelGet, func(m *nats.Msg) { + var data []byte + if cfg := provide(); cfg != nil { + data, _ = json.Marshal(cfg) + } + _ = m.Respond(data) + }) + if err != nil { + return nil, fmt.Errorf("serve model config: %w", err) + } + return sub.Unsubscribe, nil +} + +// PublishModelConfigUpdated 广播模型配置变更(Dispatcher 据此热更新)。 +func (b *Bus) PublishModelConfigUpdated(cfg *contract.ModelConfig) error { + data, err := json.Marshal(cfg) + if err != nil { + return err + } + return b.nc.Publish(contract.SubjectConfigModelUpdated, data) +} + +// SubscribeModelConfigUpdated 订阅模型配置变更。 +func (b *Bus) SubscribeModelConfigUpdated(onUpdate func(*contract.ModelConfig)) (unsub func() error, err error) { + sub, err := b.nc.Subscribe(contract.SubjectConfigModelUpdated, func(m *nats.Msg) { + var cfg contract.ModelConfig + if json.Unmarshal(m.Data, &cfg) == nil { + onUpdate(&cfg) + } + }) + if err != nil { + return nil, fmt.Errorf("subscribe model config: %w", err) + } + return sub.Unsubscribe, nil +} + // TaskHandler 处理一个消费到的任务。 type TaskHandler func(ctx context.Context, t *contract.Task) error diff --git a/sundynix-shared/contract/task.go b/sundynix-shared/contract/task.go index 1358b95..a68ab91 100644 --- a/sundynix-shared/contract/task.go +++ b/sundynix-shared/contract/task.go @@ -29,8 +29,26 @@ const ( MetaUserID = "user_id" // MetaSessionID 是 Task.Meta 中承载会话标识的键(用于短期多轮历史)。 MetaSessionID = "session_id" + + // 配置控制面(Gateway 持有配置,Dispatcher 经 NATS 取用/订阅变更)。 + SubjectConfigModelGet = "sundynix.config.model.get" // request-reply:取当前激活模型配置 + SubjectConfigModelUpdated = "sundynix.config.model.updated" // broadcast:模型配置变更通知 ) +// ModelConfig 是一个 LLM 后端的连接配置(provider 抽象)。 +// 开发期指向第三方在线 API(OpenAI 兼容);生产期可换自部署(vLLM)或其它在线模型。 +type ModelConfig struct { + Provider string `json:"provider"` // openai-compatible / vllm / ... + BaseURL string `json:"base_url"` // 如 https://api.deepseek.com/v1 + APIKey string `json:"api_key,omitempty"` + Model string `json:"model"` // 如 deepseek-chat +} + +// Ready 报告该配置是否足以发起真实推理。 +func (m *ModelConfig) Ready() bool { + return m != nil && m.BaseURL != "" && m.Model != "" +} + // Task 是 DSL 解析组装后的可调度任务,在 NATS 上以 JSON 传输。 type Task struct { ID string `json:"id"`