feat: embedding 配置搬上控制面 — 数据源页可视化配置 + 热更新

embedding 从 env 改为控制面驱动(持久化+可视化),复用 chat 模型同套范式:
配置控制面泛化为按 kind(chat/embedding),加 embedding kind。

- shared: 配置 subjects 泛化 sundynix.config.<kind>.get/.updated;bus 方法改 kind 参数
  (RequestConfig/ServeConfig/PublishConfigUpdated/SubscribeConfigUpdated)
- gateway: sundynix_model 加 kind 列(每 kind 唯一激活)+旧行回填 chat;admin 按 kind
  增删改/激活/列表,测试连接 embedding 走 POST /embeddings;main 按 kind ServeConfig;
  变更广播各 kind
- dispatcher: 取 chat 配置(kind 化)
- mcp-go: rag.Engine.SetEmbedding 热更新(RWMutex);main 取/订阅 embedding 控制面配置
  (覆盖 env)
- admin 控制台: api 按 kind;抽出复用 ModelManager;ModelsPage(chat)+新 DatasourcesPage
  (embedding + 向量/图库占位);routes 数据源页就绪
- 验证: 全模块 build✓ + e2e PASS + 控制台 npm build✓;live 全跑通——chat(DeepSeek 回填
  kind 仍工作);mcp-go 不带 EMBED env 启动→控制台配 embedding(百炼)→测试连接✓→激活
  →NATS 热更新 mcp-go→入库+语义检索'存向量的数据库'→Milvus;浏览器数据源页拉到激活配置

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Blizzard
2026-06-10 17:25:54 +08:00
parent e5bbe7318c
commit 3b54e59ecf
15 changed files with 373 additions and 261 deletions
+6 -2
View File
@@ -3,8 +3,11 @@ export const GATEWAY: string =
(import.meta.env.VITE_GATEWAY as string | undefined) ?? "http://localhost:8080";
const ADMIN = `${GATEWAY}/api/v1/admin`;
export type Kind = "chat" | "embedding";
export interface Model {
id: number;
kind: Kind;
provider: string;
base_url: string;
api_key: string; // 列表里是脱敏值
@@ -14,14 +17,15 @@ export interface Model {
export interface ModelInput {
id?: number;
kind: Kind;
provider: string;
base_url: string;
api_key: string;
model: string;
}
export async function listModels(): Promise<Model[]> {
const res = await fetch(`${ADMIN}/models`);
export async function listModels(kind: Kind): Promise<Model[]> {
const res = await fetch(`${ADMIN}/models?kind=${kind}`);
if (!res.ok) throw new Error(`list failed: ${res.status}`);
return ((await res.json()) as { models: Model[] }).models;
}
@@ -0,0 +1,182 @@
import { useEffect, useState } from "react";
import {
listModels,
saveModel,
setActive,
deleteModel,
testModel,
type Kind,
type Model,
type ModelInput,
} from "../api";
// 复用的模型控制面:列表(激活/脱敏key) + 登记表单 + 测试连接 + 激活/删除。
// 按 kind(chat/embedding) 区分,激活后经 NATS 热更新对应消费方。
export function ModelManager({
kind,
title,
baseUrlHint,
modelHint,
}: {
kind: Kind;
title: string;
baseUrlHint: string;
modelHint: string;
}) {
const empty: ModelInput = { kind, provider: "openai-compatible", base_url: "", api_key: "", model: "" };
const [models, setModels] = useState<Model[]>([]);
const [form, setForm] = useState<ModelInput>(empty);
const [msg, setMsg] = useState("");
const [testing, setTesting] = useState(false);
const refresh = () => listModels(kind).then(setModels).catch((e) => setMsg(`${e.message}`));
useEffect(() => {
refresh();
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [kind]);
const set = (k: keyof ModelInput, v: string) => setForm((f) => ({ ...f, [k]: v }));
const onSave = async () => {
try {
await saveModel({ ...form, kind });
setMsg("✓ 已保存");
setForm(empty);
refresh();
} catch (e) {
setMsg(`${(e as Error).message}`);
}
};
const onTest = async () => {
setTesting(true);
try {
const r = await testModel({ ...form, kind });
setMsg(r.ok ? `✓ 连接成功(${r.message}` : `✗ 连接失败:${r.message}`);
} catch (e) {
setMsg(`${(e as Error).message}`);
} finally {
setTesting(false);
}
};
return (
<div className="flex flex-col gap-6">
<section>
<h2 className="mb-3 text-sm font-semibold text-gray-700">{title}</h2>
<div className="overflow-hidden rounded border">
<table className="w-full text-sm">
<thead className="bg-gray-50 text-left text-xs text-gray-500">
<tr>
<th className="px-3 py-2"></th>
<th className="px-3 py-2">Provider</th>
<th className="px-3 py-2">Base URL</th>
<th className="px-3 py-2">Model</th>
<th className="px-3 py-2">API Key</th>
<th className="px-3 py-2"></th>
</tr>
</thead>
<tbody>
{models.length === 0 && (
<tr>
<td colSpan={6} className="px-3 py-4 text-center text-xs text-gray-400">
使
</td>
</tr>
)}
{models.map((m) => (
<tr key={m.id} className="border-t">
<td className="px-3 py-2">
{m.active ? (
<span className="rounded bg-emerald-100 px-1.5 py-0.5 text-[10px] text-emerald-700"></span>
) : (
<span className="text-[10px] text-gray-400"></span>
)}
</td>
<td className="px-3 py-2 text-gray-600">{m.provider}</td>
<td className="px-3 py-2 font-mono text-xs text-gray-600">{m.base_url}</td>
<td className="px-3 py-2 text-gray-800">{m.model}</td>
<td className="px-3 py-2 font-mono text-xs text-gray-400">{m.api_key || "—"}</td>
<td className="px-3 py-2">
<div className="flex gap-2">
{!m.active && (
<button
onClick={() => setActive(m.id).then(() => { setMsg("✓ 已激活并热更新"); refresh(); })}
className="rounded border px-2 py-0.5 text-xs text-violet-600 hover:bg-violet-50"
>
</button>
)}
<button
onClick={() => deleteModel(m.id).then(refresh)}
className="rounded border px-2 py-0.5 text-xs text-rose-500 hover:bg-rose-50"
>
</button>
</div>
</td>
</tr>
))}
</tbody>
</table>
</div>
</section>
<section className="max-w-xl">
<h2 className="mb-3 text-sm font-semibold text-gray-700">线 APIOpenAI </h2>
<div className="grid grid-cols-2 gap-3">
<label className="text-xs text-gray-500">
Provider
<select
className="mt-1 w-full rounded border px-2 py-1 text-sm text-gray-900"
value={form.provider}
onChange={(e) => set("provider", e.target.value)}
>
<option value="openai-compatible">openai-compatible</option>
<option value="vllm">vllm</option>
</select>
</label>
<label className="text-xs text-gray-500">
Model
<input
className="mt-1 w-full rounded border px-2 py-1 text-sm"
value={form.model}
onChange={(e) => set("model", e.target.value)}
placeholder={modelHint}
/>
</label>
<label className="col-span-2 text-xs text-gray-500">
Base URL
<input
className="mt-1 w-full rounded border px-2 py-1 text-sm font-mono"
value={form.base_url}
onChange={(e) => set("base_url", e.target.value)}
placeholder={baseUrlHint}
/>
</label>
<label className="col-span-2 text-xs text-gray-500">
API Key
<input
type="password"
className="mt-1 w-full rounded border px-2 py-1 text-sm font-mono"
value={form.api_key}
onChange={(e) => set("api_key", e.target.value)}
placeholder="sk-…"
/>
</label>
</div>
<div className="mt-3 flex items-center gap-2">
<button onClick={onSave} className="rounded bg-violet-600 px-3 py-1 text-sm text-white"></button>
<button
onClick={onTest}
disabled={testing || !form.base_url}
className="rounded border px-3 py-1 text-sm disabled:opacity-40"
>
{testing ? "测试中…" : "测试连接"}
</button>
{msg && <span className="text-xs text-gray-600">{msg}</span>}
</div>
</section>
</div>
);
}
@@ -0,0 +1,20 @@
import { ModelManager } from "../components/ModelManager";
import { Soon } from "../components/Soon";
// 数据源页:Embedding 模型(RAG 向量路,→ mcp-go 热更新)+ 向量库/图库(规划)。
export function DatasourcesPage() {
return (
<div className="flex flex-col gap-8">
<ModelManager
kind="embedding"
title="Embedding 模型(embedding → mcp-go RAG"
baseUrlHint="https://dashscope.aliyuncs.com/compatible-mode/v1"
modelHint="text-embedding-v3"
/>
<Soon
title="向量库 / 图库 / 全文"
desc="Milvus(:19530) / Neo4j(:7687) / Bleve 连接配置 + 测试连接 + 状态。当前经 env,规划同 Embedding 走控制面。"
/>
</div>
);
}
+8 -174
View File
@@ -1,179 +1,13 @@
import { useEffect, useState } from "react";
import {
listModels,
saveModel,
setActive,
deleteModel,
testModel,
type Model,
type ModelInput,
} from "../api";
import { ModelManager } from "../components/ModelManager";
const EMPTY: ModelInput = {
provider: "openai-compatible",
base_url: "",
api_key: "",
model: "",
};
// 模型配置页:登记/激活/删除 + 测试连接。激活后经 NATS 热更新到 Dispatcher。
// 对话模型(chat)配置页 → Dispatcher 经 NATS 热更新。
export function ModelsPage() {
const [models, setModels] = useState<Model[]>([]);
const [form, setForm] = useState<ModelInput>(EMPTY);
const [msg, setMsg] = useState("");
const [testing, setTesting] = useState(false);
const refresh = () => listModels().then(setModels).catch((e) => setMsg(`${e.message}`));
useEffect(() => {
refresh();
}, []);
const set = (k: keyof ModelInput, v: string) => setForm((f) => ({ ...f, [k]: v }));
const onSave = async () => {
try {
await saveModel(form);
setMsg("✓ 已保存");
setForm(EMPTY);
refresh();
} catch (e) {
setMsg(`${(e as Error).message}`);
}
};
const onTest = async () => {
setTesting(true);
try {
const r = await testModel(form);
setMsg(r.ok ? `✓ 连接成功(${r.message}` : `✗ 连接失败:${r.message}`);
} catch (e) {
setMsg(`${(e as Error).message}`);
} finally {
setTesting(false);
}
};
return (
<div className="flex flex-col gap-6">
<section>
<h2 className="mb-3 text-sm font-semibold text-gray-700"></h2>
<div className="overflow-hidden rounded border">
<table className="w-full text-sm">
<thead className="bg-gray-50 text-left text-xs text-gray-500">
<tr>
<th className="px-3 py-2"></th>
<th className="px-3 py-2">Provider</th>
<th className="px-3 py-2">Base URL</th>
<th className="px-3 py-2">Model</th>
<th className="px-3 py-2">API Key</th>
<th className="px-3 py-2"></th>
</tr>
</thead>
<tbody>
{models.length === 0 && (
<tr>
<td colSpan={6} className="px-3 py-4 text-center text-xs text-gray-400">
使
</td>
</tr>
)}
{models.map((m) => (
<tr key={m.id} className="border-t">
<td className="px-3 py-2">
{m.active ? (
<span className="rounded bg-emerald-100 px-1.5 py-0.5 text-[10px] text-emerald-700">
</span>
) : (
<span className="text-[10px] text-gray-400"></span>
)}
</td>
<td className="px-3 py-2 text-gray-600">{m.provider}</td>
<td className="px-3 py-2 font-mono text-xs text-gray-600">{m.base_url}</td>
<td className="px-3 py-2 text-gray-800">{m.model}</td>
<td className="px-3 py-2 font-mono text-xs text-gray-400">{m.api_key || "—"}</td>
<td className="px-3 py-2">
<div className="flex gap-2">
{!m.active && (
<button
onClick={() => setActive(m.id).then(() => { setMsg("✓ 已激活并热更新 Dispatcher"); refresh(); })}
className="rounded border px-2 py-0.5 text-xs text-violet-600 hover:bg-violet-50"
>
</button>
)}
<button
onClick={() => deleteModel(m.id).then(refresh)}
className="rounded border px-2 py-0.5 text-xs text-rose-500 hover:bg-rose-50"
>
</button>
</div>
</td>
</tr>
))}
</tbody>
</table>
</div>
</section>
<section className="max-w-xl">
<h2 className="mb-3 text-sm font-semibold text-gray-700">线 APIOpenAI </h2>
<div className="grid grid-cols-2 gap-3">
<label className="text-xs text-gray-500">
Provider
<select
className="mt-1 w-full rounded border px-2 py-1 text-sm text-gray-900"
value={form.provider}
onChange={(e) => set("provider", e.target.value)}
>
<option value="openai-compatible">openai-compatible</option>
<option value="vllm">vllm</option>
</select>
</label>
<label className="text-xs text-gray-500">
Model
<input
className="mt-1 w-full rounded border px-2 py-1 text-sm"
value={form.model}
onChange={(e) => set("model", e.target.value)}
placeholder="deepseek-chat"
/>
</label>
<label className="col-span-2 text-xs text-gray-500">
Base URL
<input
className="mt-1 w-full rounded border px-2 py-1 text-sm font-mono"
value={form.base_url}
onChange={(e) => set("base_url", e.target.value)}
placeholder="https://api.deepseek.com/v1"
/>
</label>
<label className="col-span-2 text-xs text-gray-500">
API Key
<input
type="password"
className="mt-1 w-full rounded border px-2 py-1 text-sm font-mono"
value={form.api_key}
onChange={(e) => set("api_key", e.target.value)}
placeholder="sk-…"
/>
</label>
</div>
<div className="mt-3 flex items-center gap-2">
<button onClick={onSave} className="rounded bg-violet-600 px-3 py-1 text-sm text-white">
</button>
<button
onClick={onTest}
disabled={testing || !form.base_url}
className="rounded border px-3 py-1 text-sm disabled:opacity-40"
>
{testing ? "测试中…" : "测试连接"}
</button>
{msg && <span className="text-xs text-gray-600">{msg}</span>}
</div>
</section>
</div>
<ModelManager
kind="chat"
title="对话模型(chat → Dispatcher"
baseUrlHint="https://api.deepseek.com"
modelHint="deepseek-chat"
/>
);
}
+3 -6
View File
@@ -4,6 +4,7 @@ import { Soon } from "./components/Soon";
// 路由注册表 —— 控制台的单一事实源:导航 + 内容都从这里派生。
// 新增页面 = 在此加一条;real 页面用 lazy 懒加载(代码分割)。
const ModelsPage = lazy(() => import("./pages/ModelsPage").then((m) => ({ default: m.ModelsPage })));
const DatasourcesPage = lazy(() => import("./pages/DatasourcesPage").then((m) => ({ default: m.DatasourcesPage })));
export interface RouteDef {
path: string;
@@ -25,12 +26,8 @@ export const routes: RouteDef[] = [
path: "/datasources",
label: "数据源",
group: "配置",
element: (
<Soon
title="数据源(向量库 / 图库 / 全文)"
desc="配置 Milvus(:19530) / Neo4j(:7687) / Bleve 连接 + 测试连接 + 状态。复用模型控制面同套路(配置→NATS 下发→mcp-go 热更新)。RAG 核心链。"
/>
),
ready: true,
element: <DatasourcesPage />,
},
{
path: "/tenants",
@@ -58,14 +58,14 @@ func (s *Subscriber) CallTool(ctx context.Context, subject string, call *contrac
return s.inner.CallTool(ctx, subject, call)
}
// RequestModelConfig 向控制面(Gateway)取当前激活的模型配置。
// RequestModelConfig 向控制面(Gateway)取当前激活的对话模型配置。
func (s *Subscriber) RequestModelConfig(ctx context.Context) (*contract.ModelConfig, error) {
return s.inner.RequestModelConfig(ctx)
return s.inner.RequestConfig(ctx, contract.ConfigKindChat)
}
// SubscribeModelConfigUpdated 订阅模型配置热更新。
// SubscribeModelConfigUpdated 订阅对话模型配置热更新。
func (s *Subscriber) SubscribeModelConfigUpdated(onUpdate func(*contract.ModelConfig)) (func() error, error) {
return s.inner.SubscribeModelConfigUpdated(onUpdate)
return s.inner.SubscribeConfigUpdated(contract.ConfigKindChat, onUpdate)
}
func (s *Subscriber) Close() { s.inner.Close() }
+11 -8
View File
@@ -24,15 +24,18 @@ func main() {
bus := nats.MustConnect(natsURL) // 接入 NATS 零拷贝骨干网 + 声明任务流
defer bus.Close()
// 配置控制面:响应 Dispatcher 对当前激活模型配置请求。
if _, err := bus.ServeModelConfig(func() *contract.ModelConfig {
row, _ := db.GetActiveModel(context.Background())
if row == nil {
return nil
// 配置控制面:按 kind 响应消费方(Dispatcher=chat / mcp-go=embedding)的配置请求。
for _, kind := range []string{contract.ConfigKindChat, contract.ConfigKindEmbedding} {
k := kind
if _, err := bus.ServeConfig(k, func() *contract.ModelConfig {
row, _ := db.GetActiveModel(context.Background(), k)
if row == nil {
return nil
}
return &contract.ModelConfig{Provider: row.Provider, BaseURL: row.BaseURL, APIKey: row.APIKey, Model: row.Model}
}); err != nil {
log.Printf("[gateway] serve %s config: %v", k, err)
}
return &contract.ModelConfig{Provider: row.Provider, BaseURL: row.BaseURL, APIKey: row.APIKey, Model: row.Model}
}); err != nil {
log.Printf("[gateway] serve model config: %v", err)
}
r := router.New(db, cache, bus)
+35 -18
View File
@@ -1,7 +1,9 @@
package handler
import (
"bytes"
"context"
"encoding/json"
"net/http"
"strconv"
"time"
@@ -17,15 +19,16 @@ import (
type modelBody struct {
ID uint `json:"id"`
Kind string `json:"kind"`
Provider string `json:"provider"`
BaseURL string `json:"base_url"`
APIKey string `json:"api_key"`
Model string `json:"model"`
}
// ListModels: GET /api/v1/admin/models —— 列出模型(api_key 脱敏)。
// ListModels: GET /api/v1/admin/models?kind=chat|embedding —— 列出模型(api_key 脱敏)。
func (h *Handler) ListModels(c *gin.Context) {
rows, err := h.db.ListModels(c.Request.Context())
rows, err := h.db.ListModels(c.Request.Context(), c.Query("kind"))
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
@@ -33,7 +36,7 @@ func (h *Handler) ListModels(c *gin.Context) {
out := make([]gin.H, 0, len(rows))
for _, m := range rows {
out = append(out, gin.H{
"id": m.ID, "provider": m.Provider, "base_url": m.BaseURL,
"id": m.ID, "kind": m.Kind, "provider": m.Provider, "base_url": m.BaseURL,
"model": m.Model, "active": m.Active, "api_key": mask(m.APIKey),
})
}
@@ -51,12 +54,16 @@ func (h *Handler) SaveModel(c *gin.Context) {
if provider == "" {
provider = "openai-compatible"
}
m := &store.LLMModel{ID: b.ID, Provider: provider, BaseURL: b.BaseURL, APIKey: b.APIKey, Model: b.Model}
kind := b.Kind
if kind == "" {
kind = contract.ConfigKindChat
}
m := &store.LLMModel{ID: b.ID, Kind: kind, Provider: provider, BaseURL: b.BaseURL, APIKey: b.APIKey, Model: b.Model}
if err := h.db.SaveModel(c.Request.Context(), m); err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
h.broadcastActiveModel(c.Request.Context())
h.broadcastActive(c.Request.Context())
c.JSON(http.StatusOK, gin.H{"id": m.ID})
}
@@ -67,7 +74,7 @@ func (h *Handler) SetActiveModel(c *gin.Context) {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
h.broadcastActiveModel(c.Request.Context())
h.broadcastActive(c.Request.Context())
c.JSON(http.StatusOK, gin.H{"status": "ok", "active": id})
}
@@ -78,7 +85,7 @@ func (h *Handler) DeleteModel(c *gin.Context) {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
h.broadcastActiveModel(c.Request.Context())
h.broadcastActive(c.Request.Context())
c.JSON(http.StatusOK, gin.H{"status": "ok"})
}
@@ -92,7 +99,7 @@ func (h *Handler) TestModel(c *gin.Context) {
// 若传了已存的 id 但未带 key,用库里的真实 key。
key := b.APIKey
if key == "" && b.ID != 0 {
if rows, _ := h.db.ListModels(c.Request.Context()); rows != nil {
if rows, _ := h.db.ListModels(c.Request.Context(), ""); rows != nil {
for _, m := range rows {
if m.ID == b.ID {
key = m.APIKey
@@ -100,9 +107,17 @@ func (h *Handler) TestModel(c *gin.Context) {
}
}
}
ctx, cancel := context.WithTimeout(c.Request.Context(), 8*time.Second)
ctx, cancel := context.WithTimeout(c.Request.Context(), 10*time.Second)
defer cancel()
req, _ := http.NewRequestWithContext(ctx, http.MethodGet, b.BaseURL+"/models", nil)
var req *http.Request
if b.Kind == contract.ConfigKindEmbedding {
// embedding 端点多无 /models,发一个最小 /embeddings 探测。
payload, _ := json.Marshal(map[string]any{"model": b.Model, "input": []string{"ping"}})
req, _ = http.NewRequestWithContext(ctx, http.MethodPost, b.BaseURL+"/embeddings", bytes.NewReader(payload))
req.Header.Set("Content-Type", "application/json")
} else {
req, _ = http.NewRequestWithContext(ctx, http.MethodGet, b.BaseURL+"/models", nil)
}
if key != "" {
req.Header.Set("Authorization", "Bearer "+key)
}
@@ -115,15 +130,17 @@ func (h *Handler) TestModel(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"ok": resp.StatusCode < 400, "message": "HTTP " + resp.Status})
}
// broadcastActiveModel 读当前激活配置并经 NATS 广播,触发 Dispatcher 热更新。
func (h *Handler) broadcastActiveModel(ctx context.Context) {
row, _ := h.db.GetActiveModel(ctx)
if row == nil {
return
// broadcastActive 重新广播各 kind 当前激活配置,触发对应消费方热更新。
func (h *Handler) broadcastActive(ctx context.Context) {
for _, kind := range []string{contract.ConfigKindChat, contract.ConfigKindEmbedding} {
row, _ := h.db.GetActiveModel(ctx, kind)
if row == nil {
continue
}
_ = h.bus.PublishConfigUpdated(kind, &contract.ModelConfig{
Provider: row.Provider, BaseURL: row.BaseURL, APIKey: row.APIKey, Model: row.Model,
})
}
_ = h.bus.PublishModelConfigUpdated(&contract.ModelConfig{
Provider: row.Provider, BaseURL: row.BaseURL, APIKey: row.APIKey, Model: row.Model,
})
}
func mask(s string) string {
+6 -6
View File
@@ -48,14 +48,14 @@ func (b *Bus) CallTool(ctx context.Context, subject string, call *contract.ToolC
return b.inner.CallTool(ctx, subject, call)
}
// ServeModelConfig 让网关作为配置控制面,响应 Dispatcher 的模型配置请求。
func (b *Bus) ServeModelConfig(provide func() *contract.ModelConfig) (func() error, error) {
return b.inner.ServeModelConfig(provide)
// ServeConfig 让网关作为配置控制面,响应某 kind 的配置请求。
func (b *Bus) ServeConfig(kind string, provide func() *contract.ModelConfig) (func() error, error) {
return b.inner.ServeConfig(kind, provide)
}
// PublishModelConfigUpdated 广播模型配置变更。
func (b *Bus) PublishModelConfigUpdated(cfg *contract.ModelConfig) error {
return b.inner.PublishModelConfigUpdated(cfg)
// PublishConfigUpdated 广播某 kind 的配置变更。
func (b *Bus) PublishConfigUpdated(kind string, cfg *contract.ModelConfig) error {
return b.inner.PublishConfigUpdated(kind, cfg)
}
func (b *Bus) Close() { b.inner.Close() }
+22 -13
View File
@@ -6,26 +6,31 @@ import (
"gorm.io/gorm"
)
// LLMModel 是一个 LLM 后端配置(控制面:管理员在此登记可用模型)。
// 表名 sundynix_model(遵守前缀约定)。同一时刻仅一条 Active=true。
// LLMModel 是一个模型后端配置(控制面:管理员在此登记可用模型)。
// 表名 sundynix_model(遵守前缀约定)。每个 kind 同一时刻仅一条 Active=true。
type LLMModel struct {
ID uint `gorm:"primaryKey"`
Provider string `gorm:"size:32"` // openai-compatible / vllm
BaseURL string `gorm:"size:255"` // 如 https://api.deepseek.com/v1
Kind string `gorm:"size:16;index"` // chat / embedding
Provider string `gorm:"size:32"` // openai-compatible / vllm
BaseURL string `gorm:"size:255"` // 如 https://api.deepseek.com
APIKey string `gorm:"size:255"`
Model string `gorm:"size:64"` // 如 deepseek-chat
Model string `gorm:"size:64"` // 如 deepseek-chat / text-embedding-v3
Active bool
}
func (LLMModel) TableName() string { return "sundynix_model" }
// ListModels 列出全部模型配置
func (p *Postgres) ListModels(ctx context.Context) ([]LLMModel, error) {
// ListModels 列出某 kind 的模型配置(kind 空则全部)
func (p *Postgres) ListModels(ctx context.Context, kind string) ([]LLMModel, error) {
if p.db == nil {
return nil, nil
}
var rows []LLMModel
err := p.db.WithContext(ctx).Order("id").Find(&rows).Error
q := p.db.WithContext(ctx).Order("id")
if kind != "" {
q = q.Where("kind = ?", kind)
}
err := q.Find(&rows).Error
return rows, err
}
@@ -37,26 +42,30 @@ func (p *Postgres) SaveModel(ctx context.Context, m *LLMModel) error {
return p.db.WithContext(ctx).Save(m).Error
}
// SetActiveModel 把指定模型设为激活(其余取消),事务保证唯一激活。
// SetActiveModel 把指定模型设为激活(同 kind 内其余取消),事务保证每 kind 唯一激活。
func (p *Postgres) SetActiveModel(ctx context.Context, id uint) error {
if p.db == nil {
return errStoreDisabled
}
return p.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
if err := tx.Model(&LLMModel{}).Where("active = ?", true).Update("active", false).Error; err != nil {
var m LLMModel
if err := tx.First(&m, id).Error; err != nil {
return err
}
if err := tx.Model(&LLMModel{}).Where("kind = ? AND active = ?", m.Kind, true).Update("active", false).Error; err != nil {
return err
}
return tx.Model(&LLMModel{}).Where("id = ?", id).Update("active", true).Error
})
}
// GetActiveModel 返回当前激活模型(无则 nil)。
func (p *Postgres) GetActiveModel(ctx context.Context) (*LLMModel, error) {
// GetActiveModel 返回某 kind 当前激活模型(无则 nil)。
func (p *Postgres) GetActiveModel(ctx context.Context, kind string) (*LLMModel, error) {
if p.db == nil {
return nil, nil
}
var m LLMModel
err := p.db.WithContext(ctx).Where("active = ?", true).First(&m).Error
err := p.db.WithContext(ctx).Where("kind = ? AND active = ?", kind, true).First(&m).Error
if err != nil {
return nil, nil // 未配置激活模型
}
+2
View File
@@ -38,6 +38,8 @@ func OpenPostgres(dsn string) *Postgres {
log.Printf("[store] postgres AutoMigrate 失败,降级运行: %v", err)
return &Postgres{}
}
// 回填:kind 列新增前的旧模型行默认归为 chat(幂等)。
db.Model(&LLMModel{}).Where("kind = '' OR kind IS NULL").Update("kind", "chat")
log.Println("[store] postgres connected, migrated sundynix_user / sundynix_task")
return &Postgres{db: db}
}
+21 -1
View File
@@ -7,8 +7,10 @@ import (
"os"
"os/signal"
"syscall"
"time"
sharedbus "github.com/sundynix/sundynix-shared/bus"
"github.com/sundynix/sundynix-shared/contract"
"github.com/sundynix/sundynix-mcp-go/internal/history"
"github.com/sundynix/sundynix-mcp-go/internal/mcp"
@@ -42,8 +44,26 @@ func main() {
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
ragEngine := rag.Open(ctx, milvusAddr, embBase, embKey, embModel) // RAG 核心链:embedding + Milvus
ragEngine := rag.Open(ctx, milvusAddr, embBase, embKey, embModel) // RAG 核心链:embedding(env 初值) + Milvus
defer ragEngine.Close()
// 配置控制面:启动取激活 embedding 配置 + 订阅热更新(覆盖 env,持久化由 Gateway 管)。
applyEmbed := func(cfg *contract.ModelConfig) {
if cfg != nil {
ragEngine.SetEmbedding(cfg.BaseURL, cfg.APIKey, cfg.Model)
}
}
cctx, ccancel := context.WithTimeout(ctx, 3*time.Second)
if cfg, _ := b.RequestConfig(cctx, contract.ConfigKindEmbedding); cfg != nil {
applyEmbed(cfg)
} else {
log.Println("[mcp_go] 未取到 embedding 控制面配置(用 env 或降级)")
}
ccancel()
if _, err := b.SubscribeConfigUpdated(contract.ConfigKindEmbedding, applyEmbed); err != nil {
log.Printf("[mcp_go] subscribe embedding config: %v", err)
}
gw := mcp.NewGateway(b, engine, mem, hist, ragEngine)
log.Println("[mcp_go] serving MCP over sundynix.tools.go.* (Ctrl-C to quit)")
+26 -7
View File
@@ -7,22 +7,41 @@ import (
"errors"
"log"
"strings"
"sync"
)
// Engine 聚合 embedding 与 Milvus,对外提供入库/检索。
// Engine 聚合 embedding 与 Milvus,对外提供入库/检索。embedding 可热更新(控制面下发)。
type Engine struct {
mu sync.RWMutex
emb *embedClient
mv *milvusStore
}
// SetEmbedding 热更新 embedding 配置(控制面变更时调用)。空配置=关闭向量检索。
func (e *Engine) SetEmbedding(base, key, model string) {
e.mu.Lock()
defer e.mu.Unlock()
if base == "" || model == "" {
e.emb = nil
return
}
e.emb = newEmbedClient(base, key, model)
log.Printf("[rag] embedding 配置: %s model=%s", base, model)
}
func (e *Engine) embed() *embedClient {
e.mu.RLock()
defer e.mu.RUnlock()
return e.emb
}
// Open 建立 RAG 引擎。embedding 未配 / Milvus 连不上 → 降级(检索返回空,不阻断工具服务)。
func Open(ctx context.Context, milvusAddr, embBase, embKey, embModel string) *Engine {
e := &Engine{}
if embBase != "" && embModel != "" {
e.emb = newEmbedClient(embBase, embKey, embModel)
log.Printf("[rag] embedding: %s model=%s", embBase, embModel)
e.SetEmbedding(embBase, embKey, embModel) // env 初值(控制面会覆盖)
} else {
log.Println("[rag] embedding 未配置,向量检索降级")
log.Println("[rag] embedding 未配置(待控制面下发),向量检索降级")
}
if milvusAddr != "" {
mv, err := openMilvus(ctx, milvusAddr)
@@ -37,7 +56,7 @@ func Open(ctx context.Context, milvusAddr, embBase, embKey, embModel string) *En
}
// Ready 报告 RAG 是否可用(embedding + Milvus 均就绪)。
func (e *Engine) Ready() bool { return e.emb.ready() && e.mv != nil }
func (e *Engine) Ready() bool { return e.embed().ready() && e.mv != nil }
// Ingest 把一段文本切块 → 向量化 → 写入 Milvus,返回块数。
func (e *Engine) Ingest(ctx context.Context, kb, text string) (int, error) {
@@ -48,7 +67,7 @@ func (e *Engine) Ingest(ctx context.Context, kb, text string) (int, error) {
if len(chunks) == 0 {
return 0, nil
}
vecs, err := e.emb.Embed(ctx, chunks)
vecs, err := e.embed().Embed(ctx, chunks)
if err != nil {
return 0, err
}
@@ -66,7 +85,7 @@ func (e *Engine) Search(ctx context.Context, kb, query string, topK int) ([]Hit,
if topK <= 0 {
topK = 5
}
vecs, err := e.emb.Embed(ctx, []string{query})
vecs, err := e.embed().Embed(ctx, []string{query})
if err != nil || len(vecs) == 0 {
return nil, err
}
+15 -15
View File
@@ -185,10 +185,10 @@ func respond(m *nats.Msg, res *contract.ToolResult) {
// ---- 配置控制面(core NATS request-reply + broadcast----
// RequestModelConfig 向控制面(Gateway)请求当前激活的模型配置
// RequestConfig 向控制面(Gateway)请求某 kind 当前激活配置(chat/embedding
// 无人应答 / 无激活配置时返回 (nil, nil),由调用方降级。
func (b *Bus) RequestModelConfig(ctx context.Context) (*contract.ModelConfig, error) {
msg, err := b.nc.RequestWithContext(ctx, contract.SubjectConfigModelGet, nil)
func (b *Bus) RequestConfig(ctx context.Context, kind string) (*contract.ModelConfig, error) {
msg, err := b.nc.RequestWithContext(ctx, contract.ConfigGetSubject(kind), nil)
if err != nil {
return nil, nil // 控制面暂不可用,降级
}
@@ -197,7 +197,7 @@ func (b *Bus) RequestModelConfig(ctx context.Context) (*contract.ModelConfig, er
}
var cfg contract.ModelConfig
if err := json.Unmarshal(msg.Data, &cfg); err != nil {
return nil, fmt.Errorf("unmarshal model config: %w", err)
return nil, fmt.Errorf("unmarshal %s config: %w", kind, err)
}
if !cfg.Ready() {
return nil, nil
@@ -205,9 +205,9 @@ func (b *Bus) RequestModelConfig(ctx context.Context) (*contract.ModelConfig, er
return &cfg, nil
}
// ServeModelConfig 让控制面响应配置请求;provide 返回当前激活配置(可为 nil)。
func (b *Bus) ServeModelConfig(provide func() *contract.ModelConfig) (unsub func() error, err error) {
sub, err := b.nc.Subscribe(contract.SubjectConfigModelGet, func(m *nats.Msg) {
// ServeConfig 让控制面响应某 kind 的配置请求;provide 返回当前激活配置(可为 nil)。
func (b *Bus) ServeConfig(kind string, provide func() *contract.ModelConfig) (unsub func() error, err error) {
sub, err := b.nc.Subscribe(contract.ConfigGetSubject(kind), func(m *nats.Msg) {
var data []byte
if cfg := provide(); cfg != nil {
data, _ = json.Marshal(cfg)
@@ -215,30 +215,30 @@ func (b *Bus) ServeModelConfig(provide func() *contract.ModelConfig) (unsub func
_ = m.Respond(data)
})
if err != nil {
return nil, fmt.Errorf("serve model config: %w", err)
return nil, fmt.Errorf("serve %s config: %w", kind, err)
}
return sub.Unsubscribe, nil
}
// PublishModelConfigUpdated 广播模型配置变更(Dispatcher 据此热更新)。
func (b *Bus) PublishModelConfigUpdated(cfg *contract.ModelConfig) error {
// PublishConfigUpdated 广播某 kind 的配置变更(消费方据此热更新)。
func (b *Bus) PublishConfigUpdated(kind string, cfg *contract.ModelConfig) error {
data, err := json.Marshal(cfg)
if err != nil {
return err
}
return b.nc.Publish(contract.SubjectConfigModelUpdated, data)
return b.nc.Publish(contract.ConfigUpdatedSubject(kind), data)
}
// SubscribeModelConfigUpdated 订阅模型配置变更。
func (b *Bus) SubscribeModelConfigUpdated(onUpdate func(*contract.ModelConfig)) (unsub func() error, err error) {
sub, err := b.nc.Subscribe(contract.SubjectConfigModelUpdated, func(m *nats.Msg) {
// SubscribeConfigUpdated 订阅某 kind 的配置变更。
func (b *Bus) SubscribeConfigUpdated(kind string, onUpdate func(*contract.ModelConfig)) (unsub func() error, err error) {
sub, err := b.nc.Subscribe(contract.ConfigUpdatedSubject(kind), func(m *nats.Msg) {
var cfg contract.ModelConfig
if json.Unmarshal(m.Data, &cfg) == nil {
onUpdate(&cfg)
}
})
if err != nil {
return nil, fmt.Errorf("subscribe model config: %w", err)
return nil, fmt.Errorf("subscribe %s config: %w", kind, err)
}
return sub.Unsubscribe, nil
}
+12 -7
View File
@@ -30,18 +30,23 @@ const (
// MetaSessionID 是 Task.Meta 中承载会话标识的键(用于短期多轮历史)。
MetaSessionID = "session_id"
// 配置控制面Gateway 持有配置,Dispatcher 经 NATS 取用/订阅变更)
SubjectConfigModelGet = "sundynix.config.model.get" // request-reply:取当前激活模型配置
SubjectConfigModelUpdated = "sundynix.config.model.updated" // broadcast:模型配置变更通知
// 配置控制面按 kind 寻址:sundynix.config.<kind>.get / .updated
// Gateway 持有配置,消费方(Dispatcher/mcp-go)经 NATS 取用/订阅变更。
ConfigKindChat = "chat" // 对话模型(Dispatcher 用)
ConfigKindEmbedding = "embedding" // 向量模型(mcp-go RAG 用)
)
// ModelConfig 是一个 LLM 后端的连接配置(provider 抽象)
// 开发期指向第三方在线 API(OpenAI 兼容);生产期可换自部署(vLLM)或其它在线模型。
// ConfigGetSubject / ConfigUpdatedSubject 返回某类配置的 request / 广播主题
func ConfigGetSubject(kind string) string { return "sundynix.config." + kind + ".get" }
func ConfigUpdatedSubject(kind string) string { return "sundynix.config." + kind + ".updated" }
// ModelConfig 是一个模型后端的连接配置(provider 抽象,chat 与 embedding 同形)。
// 开发期指向第三方在线 API(OpenAI 兼容);生产期可换自部署或其它在线模型。
type ModelConfig struct {
Provider string `json:"provider"` // openai-compatible / vllm / ...
BaseURL string `json:"base_url"` // 如 https://api.deepseek.com/v1
BaseURL string `json:"base_url"` // 如 https://api.deepseek.com
APIKey string `json:"api_key,omitempty"`
Model string `json:"model"` // 如 deepseek-chat
Model string `json:"model"` // 如 deepseek-chat / text-embedding-v3
}
// Ready 报告该配置是否足以发起真实推理。