diff --git a/sundynix-admin/src/api.ts b/sundynix-admin/src/api.ts index b21d9a3..2362800 100644 --- a/sundynix-admin/src/api.ts +++ b/sundynix-admin/src/api.ts @@ -3,8 +3,11 @@ export const GATEWAY: string = (import.meta.env.VITE_GATEWAY as string | undefined) ?? "http://localhost:8080"; const ADMIN = `${GATEWAY}/api/v1/admin`; +export type Kind = "chat" | "embedding"; + export interface Model { id: number; + kind: Kind; provider: string; base_url: string; api_key: string; // 列表里是脱敏值 @@ -14,14 +17,15 @@ export interface Model { export interface ModelInput { id?: number; + kind: Kind; provider: string; base_url: string; api_key: string; model: string; } -export async function listModels(): Promise { - const res = await fetch(`${ADMIN}/models`); +export async function listModels(kind: Kind): Promise { + const res = await fetch(`${ADMIN}/models?kind=${kind}`); if (!res.ok) throw new Error(`list failed: ${res.status}`); return ((await res.json()) as { models: Model[] }).models; } diff --git a/sundynix-admin/src/components/ModelManager.tsx b/sundynix-admin/src/components/ModelManager.tsx new file mode 100644 index 0000000..268019e --- /dev/null +++ b/sundynix-admin/src/components/ModelManager.tsx @@ -0,0 +1,182 @@ +import { useEffect, useState } from "react"; +import { + listModels, + saveModel, + setActive, + deleteModel, + testModel, + type Kind, + type Model, + type ModelInput, +} from "../api"; + +// 复用的模型控制面:列表(激活/脱敏key) + 登记表单 + 测试连接 + 激活/删除。 +// 按 kind(chat/embedding) 区分,激活后经 NATS 热更新对应消费方。 +export function ModelManager({ + kind, + title, + baseUrlHint, + modelHint, +}: { + kind: Kind; + title: string; + baseUrlHint: string; + modelHint: string; +}) { + const empty: ModelInput = { kind, provider: "openai-compatible", base_url: "", api_key: "", model: "" }; + const [models, setModels] = useState([]); + const [form, setForm] = useState(empty); + const [msg, setMsg] = useState(""); + const [testing, setTesting] = useState(false); + + const refresh = () => listModels(kind).then(setModels).catch((e) => setMsg(`✗ ${e.message}`)); + useEffect(() => { + refresh(); + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [kind]); + + const set = (k: keyof ModelInput, v: string) => setForm((f) => ({ ...f, [k]: v })); + + const onSave = async () => { + try { + await saveModel({ ...form, kind }); + setMsg("✓ 已保存"); + setForm(empty); + refresh(); + } catch (e) { + setMsg(`✗ ${(e as Error).message}`); + } + }; + + const onTest = async () => { + setTesting(true); + try { + const r = await testModel({ ...form, kind }); + setMsg(r.ok ? `✓ 连接成功(${r.message})` : `✗ 连接失败:${r.message}`); + } catch (e) { + setMsg(`✗ ${(e as Error).message}`); + } finally { + setTesting(false); + } + }; + + return ( +
+
+

{title}

+
+ + + + + + + + + + + + + {models.length === 0 && ( + + + + )} + {models.map((m) => ( + + + + + + + + + ))} + +
状态ProviderBase URLModelAPI Key操作
+ 暂无,使用下方表单登记。 +
+ {m.active ? ( + 激活 + ) : ( + + )} + {m.provider}{m.base_url}{m.model}{m.api_key || "—"} +
+ {!m.active && ( + + )} + +
+
+
+
+ +
+

登记(开发期:第三方在线 API,OpenAI 兼容)

+
+ + + + +
+
+ + + {msg && {msg}} +
+
+
+ ); +} diff --git a/sundynix-admin/src/pages/DatasourcesPage.tsx b/sundynix-admin/src/pages/DatasourcesPage.tsx new file mode 100644 index 0000000..09f6dcd --- /dev/null +++ b/sundynix-admin/src/pages/DatasourcesPage.tsx @@ -0,0 +1,20 @@ +import { ModelManager } from "../components/ModelManager"; +import { Soon } from "../components/Soon"; + +// 数据源页:Embedding 模型(RAG 向量路,→ mcp-go 热更新)+ 向量库/图库(规划)。 +export function DatasourcesPage() { + return ( +
+ + +
+ ); +} diff --git a/sundynix-admin/src/pages/ModelsPage.tsx b/sundynix-admin/src/pages/ModelsPage.tsx index 0e6fdd0..80b199e 100644 --- a/sundynix-admin/src/pages/ModelsPage.tsx +++ b/sundynix-admin/src/pages/ModelsPage.tsx @@ -1,179 +1,13 @@ -import { useEffect, useState } from "react"; -import { - listModels, - saveModel, - setActive, - deleteModel, - testModel, - type Model, - type ModelInput, -} from "../api"; +import { ModelManager } from "../components/ModelManager"; -const EMPTY: ModelInput = { - provider: "openai-compatible", - base_url: "", - api_key: "", - model: "", -}; - -// 模型配置页:登记/激活/删除 + 测试连接。激活后经 NATS 热更新到 Dispatcher。 +// 对话模型(chat)配置页 → Dispatcher 经 NATS 热更新。 export function ModelsPage() { - const [models, setModels] = useState([]); - const [form, setForm] = useState(EMPTY); - const [msg, setMsg] = useState(""); - const [testing, setTesting] = useState(false); - - const refresh = () => listModels().then(setModels).catch((e) => setMsg(`✗ ${e.message}`)); - useEffect(() => { - refresh(); - }, []); - - const set = (k: keyof ModelInput, v: string) => setForm((f) => ({ ...f, [k]: v })); - - const onSave = async () => { - try { - await saveModel(form); - setMsg("✓ 已保存"); - setForm(EMPTY); - refresh(); - } catch (e) { - setMsg(`✗ ${(e as Error).message}`); - } - }; - - const onTest = async () => { - setTesting(true); - try { - const r = await testModel(form); - setMsg(r.ok ? `✓ 连接成功(${r.message})` : `✗ 连接失败:${r.message}`); - } catch (e) { - setMsg(`✗ ${(e as Error).message}`); - } finally { - setTesting(false); - } - }; - return ( -
-
-

已配置模型

-
- - - - - - - - - - - - - {models.length === 0 && ( - - - - )} - {models.map((m) => ( - - - - - - - - - ))} - -
状态ProviderBase URLModelAPI Key操作
- 暂无模型,使用下方表单登记。 -
- {m.active ? ( - - 激活 - - ) : ( - - )} - {m.provider}{m.base_url}{m.model}{m.api_key || "—"} -
- {!m.active && ( - - )} - -
-
-
-
- -
-

登记模型(开发期:第三方在线 API,OpenAI 兼容)

-
- - - - -
-
- - - {msg && {msg}} -
-
-
+ ); } diff --git a/sundynix-admin/src/routes.tsx b/sundynix-admin/src/routes.tsx index 05e2d1f..832015e 100644 --- a/sundynix-admin/src/routes.tsx +++ b/sundynix-admin/src/routes.tsx @@ -4,6 +4,7 @@ import { Soon } from "./components/Soon"; // 路由注册表 —— 控制台的单一事实源:导航 + 内容都从这里派生。 // 新增页面 = 在此加一条;real 页面用 lazy 懒加载(代码分割)。 const ModelsPage = lazy(() => import("./pages/ModelsPage").then((m) => ({ default: m.ModelsPage }))); +const DatasourcesPage = lazy(() => import("./pages/DatasourcesPage").then((m) => ({ default: m.DatasourcesPage }))); export interface RouteDef { path: string; @@ -25,12 +26,8 @@ export const routes: RouteDef[] = [ path: "/datasources", label: "数据源", group: "配置", - element: ( - - ), + ready: true, + element: , }, { path: "/tenants", diff --git a/sundynix-dispatcher/internal/nats/subscriber.go b/sundynix-dispatcher/internal/nats/subscriber.go index 6411b32..b6402e5 100644 --- a/sundynix-dispatcher/internal/nats/subscriber.go +++ b/sundynix-dispatcher/internal/nats/subscriber.go @@ -58,14 +58,14 @@ func (s *Subscriber) CallTool(ctx context.Context, subject string, call *contrac return s.inner.CallTool(ctx, subject, call) } -// RequestModelConfig 向控制面(Gateway)取当前激活的模型配置。 +// RequestModelConfig 向控制面(Gateway)取当前激活的对话模型配置。 func (s *Subscriber) RequestModelConfig(ctx context.Context) (*contract.ModelConfig, error) { - return s.inner.RequestModelConfig(ctx) + return s.inner.RequestConfig(ctx, contract.ConfigKindChat) } -// SubscribeModelConfigUpdated 订阅模型配置热更新。 +// SubscribeModelConfigUpdated 订阅对话模型配置热更新。 func (s *Subscriber) SubscribeModelConfigUpdated(onUpdate func(*contract.ModelConfig)) (func() error, error) { - return s.inner.SubscribeModelConfigUpdated(onUpdate) + return s.inner.SubscribeConfigUpdated(contract.ConfigKindChat, onUpdate) } func (s *Subscriber) Close() { s.inner.Close() } diff --git a/sundynix-gateway/cmd/server/main.go b/sundynix-gateway/cmd/server/main.go index 664e052..88bc56c 100644 --- a/sundynix-gateway/cmd/server/main.go +++ b/sundynix-gateway/cmd/server/main.go @@ -24,15 +24,18 @@ func main() { bus := nats.MustConnect(natsURL) // 接入 NATS 零拷贝骨干网 + 声明任务流 defer bus.Close() - // 配置控制面:响应 Dispatcher 对当前激活模型配置的请求。 - if _, err := bus.ServeModelConfig(func() *contract.ModelConfig { - row, _ := db.GetActiveModel(context.Background()) - if row == nil { - return nil + // 配置控制面:按 kind 响应消费方(Dispatcher=chat / mcp-go=embedding)的配置请求。 + for _, kind := range []string{contract.ConfigKindChat, contract.ConfigKindEmbedding} { + k := kind + if _, err := bus.ServeConfig(k, func() *contract.ModelConfig { + row, _ := db.GetActiveModel(context.Background(), k) + if row == nil { + return nil + } + return &contract.ModelConfig{Provider: row.Provider, BaseURL: row.BaseURL, APIKey: row.APIKey, Model: row.Model} + }); err != nil { + log.Printf("[gateway] serve %s config: %v", k, err) } - return &contract.ModelConfig{Provider: row.Provider, BaseURL: row.BaseURL, APIKey: row.APIKey, Model: row.Model} - }); err != nil { - log.Printf("[gateway] serve model config: %v", err) } r := router.New(db, cache, bus) diff --git a/sundynix-gateway/internal/handler/admin.go b/sundynix-gateway/internal/handler/admin.go index d16c4b3..8a19c4c 100644 --- a/sundynix-gateway/internal/handler/admin.go +++ b/sundynix-gateway/internal/handler/admin.go @@ -1,7 +1,9 @@ package handler import ( + "bytes" "context" + "encoding/json" "net/http" "strconv" "time" @@ -17,15 +19,16 @@ import ( type modelBody struct { ID uint `json:"id"` + Kind string `json:"kind"` Provider string `json:"provider"` BaseURL string `json:"base_url"` APIKey string `json:"api_key"` Model string `json:"model"` } -// ListModels: GET /api/v1/admin/models —— 列出模型(api_key 脱敏)。 +// ListModels: GET /api/v1/admin/models?kind=chat|embedding —— 列出模型(api_key 脱敏)。 func (h *Handler) ListModels(c *gin.Context) { - rows, err := h.db.ListModels(c.Request.Context()) + rows, err := h.db.ListModels(c.Request.Context(), c.Query("kind")) if err != nil { c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()}) return @@ -33,7 +36,7 @@ func (h *Handler) ListModels(c *gin.Context) { out := make([]gin.H, 0, len(rows)) for _, m := range rows { out = append(out, gin.H{ - "id": m.ID, "provider": m.Provider, "base_url": m.BaseURL, + "id": m.ID, "kind": m.Kind, "provider": m.Provider, "base_url": m.BaseURL, "model": m.Model, "active": m.Active, "api_key": mask(m.APIKey), }) } @@ -51,12 +54,16 @@ func (h *Handler) SaveModel(c *gin.Context) { if provider == "" { provider = "openai-compatible" } - m := &store.LLMModel{ID: b.ID, Provider: provider, BaseURL: b.BaseURL, APIKey: b.APIKey, Model: b.Model} + kind := b.Kind + if kind == "" { + kind = contract.ConfigKindChat + } + m := &store.LLMModel{ID: b.ID, Kind: kind, Provider: provider, BaseURL: b.BaseURL, APIKey: b.APIKey, Model: b.Model} if err := h.db.SaveModel(c.Request.Context(), m); err != nil { c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()}) return } - h.broadcastActiveModel(c.Request.Context()) + h.broadcastActive(c.Request.Context()) c.JSON(http.StatusOK, gin.H{"id": m.ID}) } @@ -67,7 +74,7 @@ func (h *Handler) SetActiveModel(c *gin.Context) { c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()}) return } - h.broadcastActiveModel(c.Request.Context()) + h.broadcastActive(c.Request.Context()) c.JSON(http.StatusOK, gin.H{"status": "ok", "active": id}) } @@ -78,7 +85,7 @@ func (h *Handler) DeleteModel(c *gin.Context) { c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()}) return } - h.broadcastActiveModel(c.Request.Context()) + h.broadcastActive(c.Request.Context()) c.JSON(http.StatusOK, gin.H{"status": "ok"}) } @@ -92,7 +99,7 @@ func (h *Handler) TestModel(c *gin.Context) { // 若传了已存的 id 但未带 key,用库里的真实 key。 key := b.APIKey if key == "" && b.ID != 0 { - if rows, _ := h.db.ListModels(c.Request.Context()); rows != nil { + if rows, _ := h.db.ListModels(c.Request.Context(), ""); rows != nil { for _, m := range rows { if m.ID == b.ID { key = m.APIKey @@ -100,9 +107,17 @@ func (h *Handler) TestModel(c *gin.Context) { } } } - ctx, cancel := context.WithTimeout(c.Request.Context(), 8*time.Second) + ctx, cancel := context.WithTimeout(c.Request.Context(), 10*time.Second) defer cancel() - req, _ := http.NewRequestWithContext(ctx, http.MethodGet, b.BaseURL+"/models", nil) + var req *http.Request + if b.Kind == contract.ConfigKindEmbedding { + // embedding 端点多无 /models,发一个最小 /embeddings 探测。 + payload, _ := json.Marshal(map[string]any{"model": b.Model, "input": []string{"ping"}}) + req, _ = http.NewRequestWithContext(ctx, http.MethodPost, b.BaseURL+"/embeddings", bytes.NewReader(payload)) + req.Header.Set("Content-Type", "application/json") + } else { + req, _ = http.NewRequestWithContext(ctx, http.MethodGet, b.BaseURL+"/models", nil) + } if key != "" { req.Header.Set("Authorization", "Bearer "+key) } @@ -115,15 +130,17 @@ func (h *Handler) TestModel(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"ok": resp.StatusCode < 400, "message": "HTTP " + resp.Status}) } -// broadcastActiveModel 读当前激活配置并经 NATS 广播,触发 Dispatcher 热更新。 -func (h *Handler) broadcastActiveModel(ctx context.Context) { - row, _ := h.db.GetActiveModel(ctx) - if row == nil { - return +// broadcastActive 重新广播各 kind 当前激活配置,触发对应消费方热更新。 +func (h *Handler) broadcastActive(ctx context.Context) { + for _, kind := range []string{contract.ConfigKindChat, contract.ConfigKindEmbedding} { + row, _ := h.db.GetActiveModel(ctx, kind) + if row == nil { + continue + } + _ = h.bus.PublishConfigUpdated(kind, &contract.ModelConfig{ + Provider: row.Provider, BaseURL: row.BaseURL, APIKey: row.APIKey, Model: row.Model, + }) } - _ = h.bus.PublishModelConfigUpdated(&contract.ModelConfig{ - Provider: row.Provider, BaseURL: row.BaseURL, APIKey: row.APIKey, Model: row.Model, - }) } func mask(s string) string { diff --git a/sundynix-gateway/internal/nats/publisher.go b/sundynix-gateway/internal/nats/publisher.go index a6fde3d..8103923 100644 --- a/sundynix-gateway/internal/nats/publisher.go +++ b/sundynix-gateway/internal/nats/publisher.go @@ -48,14 +48,14 @@ func (b *Bus) CallTool(ctx context.Context, subject string, call *contract.ToolC return b.inner.CallTool(ctx, subject, call) } -// ServeModelConfig 让网关作为配置控制面,响应 Dispatcher 的模型配置请求。 -func (b *Bus) ServeModelConfig(provide func() *contract.ModelConfig) (func() error, error) { - return b.inner.ServeModelConfig(provide) +// ServeConfig 让网关作为配置控制面,响应某 kind 的配置请求。 +func (b *Bus) ServeConfig(kind string, provide func() *contract.ModelConfig) (func() error, error) { + return b.inner.ServeConfig(kind, provide) } -// PublishModelConfigUpdated 广播模型配置变更。 -func (b *Bus) PublishModelConfigUpdated(cfg *contract.ModelConfig) error { - return b.inner.PublishModelConfigUpdated(cfg) +// PublishConfigUpdated 广播某 kind 的配置变更。 +func (b *Bus) PublishConfigUpdated(kind string, cfg *contract.ModelConfig) error { + return b.inner.PublishConfigUpdated(kind, cfg) } func (b *Bus) Close() { b.inner.Close() } diff --git a/sundynix-gateway/internal/store/model.go b/sundynix-gateway/internal/store/model.go index 0c10ca5..1672e6e 100644 --- a/sundynix-gateway/internal/store/model.go +++ b/sundynix-gateway/internal/store/model.go @@ -6,26 +6,31 @@ import ( "gorm.io/gorm" ) -// LLMModel 是一个 LLM 后端配置(控制面:管理员在此登记可用模型)。 -// 表名 sundynix_model(遵守前缀约定)。同一时刻仅一条 Active=true。 +// LLMModel 是一个模型后端配置(控制面:管理员在此登记可用模型)。 +// 表名 sundynix_model(遵守前缀约定)。每个 kind 同一时刻仅一条 Active=true。 type LLMModel struct { ID uint `gorm:"primaryKey"` - Provider string `gorm:"size:32"` // openai-compatible / vllm - BaseURL string `gorm:"size:255"` // 如 https://api.deepseek.com/v1 + Kind string `gorm:"size:16;index"` // chat / embedding + Provider string `gorm:"size:32"` // openai-compatible / vllm + BaseURL string `gorm:"size:255"` // 如 https://api.deepseek.com APIKey string `gorm:"size:255"` - Model string `gorm:"size:64"` // 如 deepseek-chat + Model string `gorm:"size:64"` // 如 deepseek-chat / text-embedding-v3 Active bool } func (LLMModel) TableName() string { return "sundynix_model" } -// ListModels 列出全部模型配置。 -func (p *Postgres) ListModels(ctx context.Context) ([]LLMModel, error) { +// ListModels 列出某 kind 的模型配置(kind 空则全部)。 +func (p *Postgres) ListModels(ctx context.Context, kind string) ([]LLMModel, error) { if p.db == nil { return nil, nil } var rows []LLMModel - err := p.db.WithContext(ctx).Order("id").Find(&rows).Error + q := p.db.WithContext(ctx).Order("id") + if kind != "" { + q = q.Where("kind = ?", kind) + } + err := q.Find(&rows).Error return rows, err } @@ -37,26 +42,30 @@ func (p *Postgres) SaveModel(ctx context.Context, m *LLMModel) error { return p.db.WithContext(ctx).Save(m).Error } -// SetActiveModel 把指定模型设为激活(其余取消),事务保证唯一激活。 +// SetActiveModel 把指定模型设为激活(同 kind 内其余取消),事务保证每 kind 唯一激活。 func (p *Postgres) SetActiveModel(ctx context.Context, id uint) error { if p.db == nil { return errStoreDisabled } return p.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { - if err := tx.Model(&LLMModel{}).Where("active = ?", true).Update("active", false).Error; err != nil { + var m LLMModel + if err := tx.First(&m, id).Error; err != nil { + return err + } + if err := tx.Model(&LLMModel{}).Where("kind = ? AND active = ?", m.Kind, true).Update("active", false).Error; err != nil { return err } return tx.Model(&LLMModel{}).Where("id = ?", id).Update("active", true).Error }) } -// GetActiveModel 返回当前激活模型(无则 nil)。 -func (p *Postgres) GetActiveModel(ctx context.Context) (*LLMModel, error) { +// GetActiveModel 返回某 kind 当前激活模型(无则 nil)。 +func (p *Postgres) GetActiveModel(ctx context.Context, kind string) (*LLMModel, error) { if p.db == nil { return nil, nil } var m LLMModel - err := p.db.WithContext(ctx).Where("active = ?", true).First(&m).Error + err := p.db.WithContext(ctx).Where("kind = ? AND active = ?", kind, true).First(&m).Error if err != nil { return nil, nil // 未配置激活模型 } diff --git a/sundynix-gateway/internal/store/pgsql.go b/sundynix-gateway/internal/store/pgsql.go index 8b40fce..587657e 100644 --- a/sundynix-gateway/internal/store/pgsql.go +++ b/sundynix-gateway/internal/store/pgsql.go @@ -38,6 +38,8 @@ func OpenPostgres(dsn string) *Postgres { log.Printf("[store] postgres AutoMigrate 失败,降级运行: %v", err) return &Postgres{} } + // 回填:kind 列新增前的旧模型行默认归为 chat(幂等)。 + db.Model(&LLMModel{}).Where("kind = '' OR kind IS NULL").Update("kind", "chat") log.Println("[store] postgres connected, migrated sundynix_user / sundynix_task") return &Postgres{db: db} } diff --git a/sundynix-mcp-go/cmd/server/main.go b/sundynix-mcp-go/cmd/server/main.go index f7d80f1..8dfab5b 100644 --- a/sundynix-mcp-go/cmd/server/main.go +++ b/sundynix-mcp-go/cmd/server/main.go @@ -7,8 +7,10 @@ import ( "os" "os/signal" "syscall" + "time" sharedbus "github.com/sundynix/sundynix-shared/bus" + "github.com/sundynix/sundynix-shared/contract" "github.com/sundynix/sundynix-mcp-go/internal/history" "github.com/sundynix/sundynix-mcp-go/internal/mcp" @@ -42,8 +44,26 @@ func main() { ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() - ragEngine := rag.Open(ctx, milvusAddr, embBase, embKey, embModel) // RAG 核心链:embedding + Milvus + ragEngine := rag.Open(ctx, milvusAddr, embBase, embKey, embModel) // RAG 核心链:embedding(env 初值) + Milvus defer ragEngine.Close() + + // 配置控制面:启动取激活 embedding 配置 + 订阅热更新(覆盖 env,持久化由 Gateway 管)。 + applyEmbed := func(cfg *contract.ModelConfig) { + if cfg != nil { + ragEngine.SetEmbedding(cfg.BaseURL, cfg.APIKey, cfg.Model) + } + } + cctx, ccancel := context.WithTimeout(ctx, 3*time.Second) + if cfg, _ := b.RequestConfig(cctx, contract.ConfigKindEmbedding); cfg != nil { + applyEmbed(cfg) + } else { + log.Println("[mcp_go] 未取到 embedding 控制面配置(用 env 或降级)") + } + ccancel() + if _, err := b.SubscribeConfigUpdated(contract.ConfigKindEmbedding, applyEmbed); err != nil { + log.Printf("[mcp_go] subscribe embedding config: %v", err) + } + gw := mcp.NewGateway(b, engine, mem, hist, ragEngine) log.Println("[mcp_go] serving MCP over sundynix.tools.go.* (Ctrl-C to quit)") diff --git a/sundynix-mcp-go/internal/rag/rag.go b/sundynix-mcp-go/internal/rag/rag.go index 9758d49..86e1670 100644 --- a/sundynix-mcp-go/internal/rag/rag.go +++ b/sundynix-mcp-go/internal/rag/rag.go @@ -7,22 +7,41 @@ import ( "errors" "log" "strings" + "sync" ) -// Engine 聚合 embedding 与 Milvus,对外提供入库/检索。 +// Engine 聚合 embedding 与 Milvus,对外提供入库/检索。embedding 可热更新(控制面下发)。 type Engine struct { + mu sync.RWMutex emb *embedClient mv *milvusStore } +// SetEmbedding 热更新 embedding 配置(控制面变更时调用)。空配置=关闭向量检索。 +func (e *Engine) SetEmbedding(base, key, model string) { + e.mu.Lock() + defer e.mu.Unlock() + if base == "" || model == "" { + e.emb = nil + return + } + e.emb = newEmbedClient(base, key, model) + log.Printf("[rag] embedding 配置: %s model=%s", base, model) +} + +func (e *Engine) embed() *embedClient { + e.mu.RLock() + defer e.mu.RUnlock() + return e.emb +} + // Open 建立 RAG 引擎。embedding 未配 / Milvus 连不上 → 降级(检索返回空,不阻断工具服务)。 func Open(ctx context.Context, milvusAddr, embBase, embKey, embModel string) *Engine { e := &Engine{} if embBase != "" && embModel != "" { - e.emb = newEmbedClient(embBase, embKey, embModel) - log.Printf("[rag] embedding: %s model=%s", embBase, embModel) + e.SetEmbedding(embBase, embKey, embModel) // env 初值(控制面会覆盖) } else { - log.Println("[rag] embedding 未配置,向量检索降级") + log.Println("[rag] embedding 未配置(待控制面下发),向量检索暂降级") } if milvusAddr != "" { mv, err := openMilvus(ctx, milvusAddr) @@ -37,7 +56,7 @@ func Open(ctx context.Context, milvusAddr, embBase, embKey, embModel string) *En } // Ready 报告 RAG 是否可用(embedding + Milvus 均就绪)。 -func (e *Engine) Ready() bool { return e.emb.ready() && e.mv != nil } +func (e *Engine) Ready() bool { return e.embed().ready() && e.mv != nil } // Ingest 把一段文本切块 → 向量化 → 写入 Milvus,返回块数。 func (e *Engine) Ingest(ctx context.Context, kb, text string) (int, error) { @@ -48,7 +67,7 @@ func (e *Engine) Ingest(ctx context.Context, kb, text string) (int, error) { if len(chunks) == 0 { return 0, nil } - vecs, err := e.emb.Embed(ctx, chunks) + vecs, err := e.embed().Embed(ctx, chunks) if err != nil { return 0, err } @@ -66,7 +85,7 @@ func (e *Engine) Search(ctx context.Context, kb, query string, topK int) ([]Hit, if topK <= 0 { topK = 5 } - vecs, err := e.emb.Embed(ctx, []string{query}) + vecs, err := e.embed().Embed(ctx, []string{query}) if err != nil || len(vecs) == 0 { return nil, err } diff --git a/sundynix-shared/bus/bus.go b/sundynix-shared/bus/bus.go index f0e7e17..dc0443a 100644 --- a/sundynix-shared/bus/bus.go +++ b/sundynix-shared/bus/bus.go @@ -185,10 +185,10 @@ func respond(m *nats.Msg, res *contract.ToolResult) { // ---- 配置控制面(core NATS request-reply + broadcast)---- -// RequestModelConfig 向控制面(Gateway)请求当前激活的模型配置。 +// RequestConfig 向控制面(Gateway)请求某 kind 当前激活配置(chat/embedding)。 // 无人应答 / 无激活配置时返回 (nil, nil),由调用方降级。 -func (b *Bus) RequestModelConfig(ctx context.Context) (*contract.ModelConfig, error) { - msg, err := b.nc.RequestWithContext(ctx, contract.SubjectConfigModelGet, nil) +func (b *Bus) RequestConfig(ctx context.Context, kind string) (*contract.ModelConfig, error) { + msg, err := b.nc.RequestWithContext(ctx, contract.ConfigGetSubject(kind), nil) if err != nil { return nil, nil // 控制面暂不可用,降级 } @@ -197,7 +197,7 @@ func (b *Bus) RequestModelConfig(ctx context.Context) (*contract.ModelConfig, er } var cfg contract.ModelConfig if err := json.Unmarshal(msg.Data, &cfg); err != nil { - return nil, fmt.Errorf("unmarshal model config: %w", err) + return nil, fmt.Errorf("unmarshal %s config: %w", kind, err) } if !cfg.Ready() { return nil, nil @@ -205,9 +205,9 @@ func (b *Bus) RequestModelConfig(ctx context.Context) (*contract.ModelConfig, er return &cfg, nil } -// ServeModelConfig 让控制面响应配置请求;provide 返回当前激活配置(可为 nil)。 -func (b *Bus) ServeModelConfig(provide func() *contract.ModelConfig) (unsub func() error, err error) { - sub, err := b.nc.Subscribe(contract.SubjectConfigModelGet, func(m *nats.Msg) { +// ServeConfig 让控制面响应某 kind 的配置请求;provide 返回当前激活配置(可为 nil)。 +func (b *Bus) ServeConfig(kind string, provide func() *contract.ModelConfig) (unsub func() error, err error) { + sub, err := b.nc.Subscribe(contract.ConfigGetSubject(kind), func(m *nats.Msg) { var data []byte if cfg := provide(); cfg != nil { data, _ = json.Marshal(cfg) @@ -215,30 +215,30 @@ func (b *Bus) ServeModelConfig(provide func() *contract.ModelConfig) (unsub func _ = m.Respond(data) }) if err != nil { - return nil, fmt.Errorf("serve model config: %w", err) + return nil, fmt.Errorf("serve %s config: %w", kind, err) } return sub.Unsubscribe, nil } -// PublishModelConfigUpdated 广播模型配置变更(Dispatcher 据此热更新)。 -func (b *Bus) PublishModelConfigUpdated(cfg *contract.ModelConfig) error { +// PublishConfigUpdated 广播某 kind 的配置变更(消费方据此热更新)。 +func (b *Bus) PublishConfigUpdated(kind string, cfg *contract.ModelConfig) error { data, err := json.Marshal(cfg) if err != nil { return err } - return b.nc.Publish(contract.SubjectConfigModelUpdated, data) + return b.nc.Publish(contract.ConfigUpdatedSubject(kind), data) } -// SubscribeModelConfigUpdated 订阅模型配置变更。 -func (b *Bus) SubscribeModelConfigUpdated(onUpdate func(*contract.ModelConfig)) (unsub func() error, err error) { - sub, err := b.nc.Subscribe(contract.SubjectConfigModelUpdated, func(m *nats.Msg) { +// SubscribeConfigUpdated 订阅某 kind 的配置变更。 +func (b *Bus) SubscribeConfigUpdated(kind string, onUpdate func(*contract.ModelConfig)) (unsub func() error, err error) { + sub, err := b.nc.Subscribe(contract.ConfigUpdatedSubject(kind), func(m *nats.Msg) { var cfg contract.ModelConfig if json.Unmarshal(m.Data, &cfg) == nil { onUpdate(&cfg) } }) if err != nil { - return nil, fmt.Errorf("subscribe model config: %w", err) + return nil, fmt.Errorf("subscribe %s config: %w", kind, err) } return sub.Unsubscribe, nil } diff --git a/sundynix-shared/contract/task.go b/sundynix-shared/contract/task.go index a68ab91..aadaa54 100644 --- a/sundynix-shared/contract/task.go +++ b/sundynix-shared/contract/task.go @@ -30,18 +30,23 @@ const ( // MetaSessionID 是 Task.Meta 中承载会话标识的键(用于短期多轮历史)。 MetaSessionID = "session_id" - // 配置控制面(Gateway 持有配置,Dispatcher 经 NATS 取用/订阅变更)。 - SubjectConfigModelGet = "sundynix.config.model.get" // request-reply:取当前激活模型配置 - SubjectConfigModelUpdated = "sundynix.config.model.updated" // broadcast:模型配置变更通知 + // 配置控制面按 kind 寻址:sundynix.config..get / .updated。 + // Gateway 持有配置,消费方(Dispatcher/mcp-go)经 NATS 取用/订阅变更。 + ConfigKindChat = "chat" // 对话模型(Dispatcher 用) + ConfigKindEmbedding = "embedding" // 向量模型(mcp-go RAG 用) ) -// ModelConfig 是一个 LLM 后端的连接配置(provider 抽象)。 -// 开发期指向第三方在线 API(OpenAI 兼容);生产期可换自部署(vLLM)或其它在线模型。 +// ConfigGetSubject / ConfigUpdatedSubject 返回某类配置的 request / 广播主题。 +func ConfigGetSubject(kind string) string { return "sundynix.config." + kind + ".get" } +func ConfigUpdatedSubject(kind string) string { return "sundynix.config." + kind + ".updated" } + +// ModelConfig 是一个模型后端的连接配置(provider 抽象,chat 与 embedding 同形)。 +// 开发期指向第三方在线 API(OpenAI 兼容);生产期可换自部署或其它在线模型。 type ModelConfig struct { Provider string `json:"provider"` // openai-compatible / vllm / ... - BaseURL string `json:"base_url"` // 如 https://api.deepseek.com/v1 + BaseURL string `json:"base_url"` // 如 https://api.deepseek.com APIKey string `json:"api_key,omitempty"` - Model string `json:"model"` // 如 deepseek-chat + Model string `json:"model"` // 如 deepseek-chat / text-embedding-v3 } // Ready 报告该配置是否足以发起真实推理。