Compare commits

...

10 Commits

Author SHA1 Message Date
Blizzard b06c768f11 feat: 长期记忆抽取设计优化md 2026-06-19 22:18:54 +08:00
Blizzard 034ee51b0d chore: 更新 go.work.sum(prometheus client 依赖)
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-19 11:52:08 +08:00
Blizzard 597665f3c8 feat(admin): 计价配置(按模型·分输入/输出单价)—— 计费比率配置落地
计费需 token↔真钱比率,配置归管理端。本次落地"按模型·分输入/输出"粒度:

后端(gateway):
- store.Pricing 模型(BaseModel + model_id 唯一 + input_per_1k/output_per_1k + currency),
  AutoMigrate 建 sundynix_pricing;ListPricing/UpsertPricing(OnConflict model_id 覆盖)。
- admin handler:GET /admin/pricing 列表、PUT /admin/pricing 设置(校验非负,币种默认 CNY),
  挂在 RequireAdmin 组下。

前端(admin):
- api:listPricing/savePricing(带 Bearer)。
- PricingPage:列出所有已登记模型(chat+embedding),每行可编辑 输入/输出每1K单价 + 币种,逐行保存。
- routes 新增「计价」页(配置组)。

实测:PUT→ok;GET 返回正确行;重复 PUT 同 model_id 仍 1 行且值更新(upsert 生效);表自动迁移。
前端 tsc 干净。下一步可做用量计量 × 单价折算(真正计费)。

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-19 11:25:24 +08:00
Blizzard 030dcda9b4 feat(admin): 管理端接登录 + Bearer 鉴权 + 修雪花ID/探活(适配硬化后的网关)
/admin 加 RequireAdmin 后管理端原来无鉴权会 401。本次打通:
- api.ts:JWT token 存 localStorage + 所有 /admin 调用带 Bearer + 401 清令牌广播登出;
  login/me;gatewayOnline 改用公开 /healthz(原 /billing 已转受保护会误判离线)。
- 修类型:Model.id number → string(模型 id 早已迁雪花字符串)。
- Login 登录门 + App 鉴权门(启动校验 me,无则登录页)+ AppShell 显示用户/登出。

实测(硬化网关):无 token /admin/models → 401;登录拿 token → 200 返回模型(string id)。
dev 未配 ADMIN_USER_IDS 时任意登录账号放行;生产须在白名单。

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-19 11:09:02 +08:00
Blizzard 8f619c2a62 test(dispatcher): 引擎主链路集成测试(pool 抽接口 + 假替身端到端)
把 Orchestrator.pool 从 *llm.Pool 抽成 LLM 接口(Ready/ChatStream/StreamText/Chat),
*llm.Pool 天然满足、main 不变;从而可注入假模型做端到端测试,不依赖网络/Docker/LLM。

新增 integration_test.go(假 LLM/工具/sink/exec 替身):
- runGraph 分支路由:true/false 边标签精确选路(true 边故意列后)。
- runGraph 工具→agent:工具产出注入 agent 上下文。
- runGraph map fan-out:拆项 → 各章并行撰写 → 多章成稿。
- runGraph 输出护栏:流式 token 中疑似密钥被脱敏。
- handleReport:规划 → 分章撰写 → report_store 存源 → 流含标题/各章 + CompleteStream。

全部 go test -race 通过(修了测试替身 fakeExec 的并发追加竞态;生产 ExecSink 安全)。
至此引擎与报告主链路从"仅手动验证"升级为自动化端到端覆盖。

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-19 10:51:39 +08:00
Blizzard 9b33a62573 docs: PROGRESS 对齐实况(记忆抽取已完成勾选 + push 债更新 + 日期)
记忆抽取(9c19bb4)已完成并 push,但 PROGRESS 那两行回退成未勾;push 债行停留在旧值。
按已提交实况校正,并把可观测性纳入。

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-19 10:42:40 +08:00
Blizzard b6a6875795 feat(gateway): 可观测性 —— Prometheus 指标 + 结构化日志 + 探针
往"生产可运维"推一步(网关前门):
- Prometheus /metrics:sundynix_http_requests_total{method,route,status}、
  request_duration_seconds 直方图、requests_in_flight。route 用 c.FullPath()
  路由模板(/tasks/:id/...)避免按真实路径高基数。
- 结构化访问日志:slog JSON 到 stderr(request_id/method/route/status/latency_ms/
  ip/uid/bytes),替代 gin 默认文本日志;gin.New()+Recovery 自管中间件链。
- RequestID 中间件:生成/透传 X-Request-ID,写上下文+响应头,供日志关联。
- 探针:/healthz(liveness,不查依赖)、/readyz(readiness,DB+Redis 就绪才 200,
  否则 503),供 k8s 等导流判断;/api/v1/health 深度聚合保留。
- 三个根端点不挂业务鉴权(/metrics 生产应由网络层限制抓取来源)。

验证:单测(计数 +1 / X-Request-ID 生成与透传);实跑 /healthz 200、/readyz 200
(db,redis ready)、/metrics 输出真实指标、访问日志 JSON 正常、X-Request-ID 回写。

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-19 10:38:31 +08:00
Blizzard e05e6f5903 fix(gateway): 三处生产安全硬化(默认密钥/admin裸奔/CORS)
1) JWT 默认密钥:生产模式(APP_ENV=production|prod 或 GIN_MODE=release)下若未设
   JWT_SECRET 直接 log.Fatal,杜绝用开发默认值签发可伪造令牌;开发期警告并放行。
2) /admin 运维控制面(含模型 API 密钥管理)改挂 RequireAdmin:必须登录 +
   (设了 ADMIN_USER_IDS 则)uid 须在白名单;生产期未配置管理员直接 403。
3) CORS Allow-Origin 由 CORS_ALLOW_ORIGIN 配置(缺省 * 仅开发),非 * 时加 Vary。

build + auth 单测通过。仍属"小范围灰度"级,TLS/可观测/集成测试/HA 见 PROGRESS。

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-18 12:55:04 +08:00
Blizzard 9c19bb44f1 feat(dispatcher): 长期偏好记忆抽取(补全记忆闭环)
memorize 的 TODO 落地:写回阶段(异步、离热路径)从本轮对话用 LLM 抽取用户
长期稳定偏好 → 与已有画像去重 → memory_upsert 登记。

- extractMemory:模型/工具不可用或输入过短则跳过;复用 llmCtx 超时;
  抽取 prompt 只取长期偏好、忽略一次性信息。
- 纯逻辑(可单测):parsePrefs(容忍 json 代码围栏)、parseProfile(把 memory_get
  渲染的"- 维度:值"解析回 map,兼容全/半角冒号)、filterNewPrefs(新增/变更才留,
  同批同 key 去重)。
- 单测覆盖三者;LLM 抽取调用沿用已验证的 pool.Chat 模式。

至此记忆闭环:召回(memory_get) + 历史写回 + 偏好自动抽取 全通。

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-18 12:47:49 +08:00
Blizzard aa3139da68 feat(mcp-go): external_api 通用出站 HTTP 工具(带 SSRF 防护)
新增 external_api 工具(GET/POST):agent 图可调外部 API。安全为先:
- SSRF 防护 validateExternalURL/isBlockedIP:scheme 限 http/https;拒环回/内网
  /链路本地(含 169.254.169.254 云元数据)/未指定 IP;重定向同样校验、限 3 跳。
- 可选 EXTERNAL_API_ALLOWLIST(逗号分隔主机,支持子域)收窄到白名单。
- 超时 10s + 响应体限 256KB。
- 校验逻辑纯函数,单测覆盖(内网/元数据/scheme/白名单,字面量 IP 离线判定)。

注册进 mcp-go dispatch(external_api → externalAPI)。

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-18 11:58:45 +08:00
27 changed files with 1559 additions and 62 deletions
+10 -7
View File
@@ -2,7 +2,7 @@
对照 [architecture.md](architecture.md)(5 层 + NATS 总线)与功能规划的完成度。**活文档:完成一项就把 `[ ]` 勾成 `[x]`。**
图例:`[x]` 已完成 · `[ ]` 未做 · 🟡 部分/桩(旁注说明)。最近更新:2026-06-17(至提交 `79f9912`)。
图例:`[x]` 已完成 · `[ ]` 未做 · 🟡 部分/桩(旁注说明)。最近更新:2026-06-19(至提交 `b6a6875`)。
---
@@ -31,9 +31,11 @@
- [x] 模型配置控制面(按 kind 经 NATS 下发给 dispatcher / mcp-go
- [x] 独立运维控制台 sundynix-admin(模型 / 数据源页)
- [x] SSE 回流:Token 流 / 执行轨迹 / 入库进度
- [x] 可观测性:Prometheus /metrics(请求数/耗时/在途,路由模板低基数)· 结构化 JSON 访问日志 + X-Request-ID · /healthz(存活) + /readyz(就绪) 探针
- [x] Harness **输入**护栏(拦提示词注入 + 超大体,纯逻辑 `internal/guardrail` + 单测 + 实跑验证)
- [x] Harness **输出**护栏(dispatcher 发射层逐片脱敏疑似密钥/令牌 sk-/AKIA/JWT/Bearer + 轨迹标记 + 单测)
- [ ] 🟡 商业化与计费模块(占位,仅统计任务数
- [x] 计价配置(按模型·分输入/输出每1K单价+币种;admin 计价页 + /admin/pricing 端点 + sundynix_pricing 表
- [ ] 🟡 计费/商业化:用量计量×单价折算 + 配额(计价配置已完成,计量待做)
## 第 3 层 · MESSAGE BUSNATS 零拷贝骨干网)
@@ -55,7 +57,7 @@
- [x] 会话历史写回
- [x] Harness 熔断降级中心(真三态状态机 Closed/Open/HalfOpen + 单测含 -race;熔断时回流提示并收尾流,不静默丢弃)
- [x] Harness LLM 自动化评测(规则检查 + LLM-as-judge,异步 off 热路径评分记录 + 单测)
- [ ] 长期偏好记忆抽取(LLM 抽取 → 去重 → memory_upsertTODO
- [x] 长期偏好记忆抽取(writeback 异步:LLM 抽取 → 解析画像去重 → memory_upsert;纯逻辑单测
## 第 5 层 · MCP TOOLS
@@ -67,7 +69,7 @@
- [x] memory_get / memory_upsert · history_get / history_append
- [x] report_render / report_store / report_export
- [x] Word 渲染(🟡 自建零依赖 OOXML,非 UniOffice —— 偏差,UniOffice 商业授权)
- [ ] external_api外部 API 工具
- [x] external_api通用出站 HTTP:SSRF 防护 + 可选主机白名单 + 超时/限重定向/限体;含单测
### sundynix-mcp-py(算法型)
@@ -82,7 +84,8 @@
- [x] 文件主表,文档间关联用雪花 ID(弃用按名关联)
- [x] 后端首批单测(19 纯逻辑用例:引擎/DSL/docx/报告)+ mcp-go 集成测试(Profile 迁移)
- [x] **真实鉴权(JWT)闭环**:后端注册/登录/校验 + RequireAuth 保护路由 + owner=已验证 uid(去掉 header 兜底);前端登录/注册门 + 存 token + Bearer + 401 自动登出 + 顶栏用户/登出。实跑验证(含 CORS Authorization 修复)
- [ ] 集成/前端测试(`runGraph` / `handleReport` 需 mock pool/tools/sink;前端无测试
- [x] 生产安全硬化:JWT 默认密钥生产 fail-fast · /admin 加 RequireAdminADMIN_USER_IDS 白名单)· CORS 来源可配(CORS_ALLOW_ORIGIN
- [x] 后端集成测试:pool 抽成 LLM 接口,runGraph(分支/工具/map/脱敏)+handleReport 端到端假替身测试(含 -race);🟡 前端测试仍无
---
@@ -91,13 +94,13 @@
- [x] **真实登录 / 鉴权(JWT** —— 后端 + 前端闭环已完成 ✅
- [x] **代码解释器 + 安全沙箱**AST 守卫 + Docker 隔离已落地 ✅;生产可换 gVisor/Kata
- [x] **Harness 三件全完成** ✅:熔断降级 · 输入护栏 · LLM 评测 · 输出护栏(密钥脱敏)
- [ ] **长期记忆抽取** + external_api 工具
- [x] **长期记忆抽取** + external_api ✅(记忆闭环全通)
- [ ] **计费 / 商业化**真实实现
- [ ] 微服务化拆分(Morph B)—— 现为 Monolith First**按设计如此,非缺陷**
## 收尾小债
- [ ] 6 个提交待 push`5d76652``79f9912`,需在普通终端 `git push origin main`
- [ ] push:本地 1 个提交待 pushb6a6875);其余已 push
- [ ] PDF 导出 Wails 真机验证(不行则回退后端内嵌 CJK 字体出 PDF)
- [x] 报告生成并发健壮性(每次 LLM 调用 60s 超时上限,挂死自释放;规划/分章/撰写均套)
- [x] MinIO 孤儿 GC:重名覆盖后旧对象(转内联/换键)从 MinIO 删除(SaveDoc 返回旧键,runIngest 清理)
+5
View File
@@ -27,6 +27,7 @@ github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stg
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8=
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/sirupsen/logrus v1.9.4 h1:TsZE7l11zFCLZnZ+teH4Umoq5BhEIfIzfRDZ1Uzql2w=
github.com/sirupsen/logrus v1.9.4/go.mod h1:ftWc9WdOfJ0a92nsE2jF5u5ZwH8Bv2zdeOC42RjbV2g=
github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I=
@@ -45,6 +46,7 @@ go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnw
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M=
golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q=
golang.org/x/crypto v0.51.0/go.mod h1:8AdwkbraGNABw2kOX6YFPs3WM22XqI4EXEd8g+x7Oc8=
golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA=
golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.21.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY=
@@ -56,6 +58,7 @@ golang.org/x/net v0.54.0/go.mod h1:Sj4oj8jK6XmHpBZU/zWHw3BV3abl4Kvi+Ut7cQcY+cQ=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
@@ -63,6 +66,7 @@ golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/sys v0.45.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2 h1:IRJeR9r1pYWsHKTRe/IInb7lYvbBVIqOgsX/u0mbOWY=
golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE=
@@ -80,6 +84,7 @@ golang.org/x/term v0.44.0/go.mod h1:7ze4MdzUzLXpSAoFP1H0bOI9aXDqveSvatT5vKcFh2Y=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY=
golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164=
golang.org/x/text v0.37.0/go.mod h1:a5sjxXGs9hsn/AJVwuElvCAo9v8QYLzvavO5z2PiM38=
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
+228
View File
@@ -0,0 +1,228 @@
# 生产级 AI Agent 长期记忆 — 行业主流方案
## 行业共识:四层记忆架构
2025 年以来,行业在长期记忆上已经形成了**四层分类共识**(模仿认知科学):
| 层 | 类型 | 内容 | 生命周期 |
|---|---|---|---|
| L1 | **工作记忆** | 当前对话的上下文窗口 | 会话内 |
| L2 | **短期/情景记忆** | 近期对话摘要、事件日志 | 数小时 ~ 数天 |
| L3 | **语义记忆** | 提炼出的事实、偏好、知识 | 持久(可衰减) |
| L4 | **程序记忆** | 学到的行为规则、约束、SOP | 持久 |
> **sundynix-agentix 当前只覆盖了 L1context window+ L2history+ L3 的最简形态(KV profile)。**
---
## 四大主流方案详解
### 1️⃣ OpenAI ChatGPT — "Dreaming" 机制
**思路**:不靠向量检索,靠**后台异步合成摘要**。
```
用户对话 → 结束后异步触发 "Dreaming"
分析历史 → 合成/更新 Memory Summary(结构化事实列表)
下次对话 → 整个 Memory Summary 注入 context window
```
**架构特点**
- **存储**:结构化事实列表(非向量),存为 user-scoped 文档
- **写入**:后台异步 "Dreaming" 进程,周期性回顾历史、合成偏好
- **读取****全量注入** context window(不做检索,依赖结构化摘要足够紧凑)
- **治理**:自动失效旧记忆("traveling to Singapore" → "went to Singapore"),用户可查看/编辑/删除
- **不用 RAG**:刻意避免向量检索的延迟和复杂度,用轻量摘要替代
> [!NOTE]
> sundynix-agentix 的方案与 ChatGPT 早期版本(2024.4 Saved Memories)非常接近——都是**逐轮 LLM 抽取 + 全量注入**。
> ChatGPT 后来演进到 Dreaming 是因为发现逐轮抽取会导致"记忆腐烂"noisy/stale),需要**周期性整理合并**。
---
### 2️⃣ Mem0 — 行业最主流的记忆中间件
**思路**:三阶段流水线 + 混合存储。被认为是当前**事实上的行业标准**。
```
对话消息 → Extract → Consolidate → Store
┌───────────┴───────────┐
Vector Store Graph Store (Mem0g)
(语义检索) (关系推理)
查询时 → Retrieve (语义相似 + 图谱关联) → 注入 prompt
```
**三阶段流水线**
| 阶段 | 做什么 | 关键区别 |
|---|---|---|
| **Extract** | LLM 从对话中抽取结构化事实和关系 | 类似 sundynix 的 `extractMemory` |
| **Consolidate** | LLM 将新事实与已有记忆**对账**,输出 ADD / UPDATE / DELETE / NOOP | ⚠️ sundynix 缺失这一层 |
| **Store** | 分别写入 Vector Store + Graph Store + KV Store | sundynix 只有 KV |
**Consolidate 是核心差异**
```
已有记忆: "用户喜欢 Vue"
新抽取: "用户说最近在学 React,打算迁移"
Mem0 Consolidate → UPDATE: "用户从 Vue 迁移到 React"
sundynix → 如果 key 不同 → 两条并存("前端框架:Vue" + "技术学习:React"
如果 key 恰好相同 → 覆盖(但依赖 LLM 用同一个 key,不可控)
```
**检索方式**
- **不是全量注入**,而是用当前 query embedding 做 top-K 语义检索
- 同时走 Graph Store 做关系关联检索("用户在 X 公司" → 关联出 "X 公司用 Go 技术栈"
- 大幅减少 token 浪费
---
### 3️⃣ Letta(原 MemGPT)— 操作系统式自管理记忆
**思路**:Agent 像操作系统一样**自己管理**自己的内存层级。
```
┌─────────────────────────────────────────┐
│ Context Window ("RAM") │
│ ┌─────────┐ ┌──────────┐ ┌─────┐ │
│ │ System │ │ Recall │ │User │ │
│ │ Prompt │ │ Memory │ │Msg │ │
│ │(可编辑) │ │(检索结果) │ │ │ │
│ └─────────┘ └──────────┘ └─────┘ │
└─────────────┬───────────────────────────┘
│ Agent 主动调用内存管理工具
┌─────────────────────────────────────────┐
│ Archival Memory ("Disk") — 向量存储 │
│ Agent 可以 search / insert / delete │
└─────────────────────────────────────────┘
```
**关键设计**
- Agent 自己决定什么时候存记忆(不是外部异步抽取)
- Agent 自己决定什么时候查记忆(通过 tool call)
- System prompt 是**可编辑的**Agent 可以修改自己的"人格"和"核心记忆"
- Archival Memory 是无限的向量存储,Agent 按需检索
**与 sundynix 的对比**sundynix 是外部驱动(orchestrator 异步抽取),Letta 是 Agent 自驱动。
---
### 4️⃣ Zep / Graphiti — 双时态知识图谱
**思路**:面向**时间敏感**场景,用图谱追踪事实的时间演变。
```
对话/文档 → Graphiti 引擎 → 三层子图:
├─ Episode 子图:原始事件(无损保留,带时间戳)
├─ Semantic Entity 子图:提取的实体和关系(语义检索)
└─ Community 子图:实体聚类(高层概览)
```
**双时态模型(Bi-temporal**
每条事实记录两个时间维度:
| 维度 | 含义 | 示例 |
|---|---|---|
| **Valid Time** | 事实在现实中为真的时间 | "2024-01 ~ 2025-03 用户在 A 公司" |
| **Ingestion Time** | 系统何时发现这个事实 | "2025-04 从对话中得知" |
```
2024-01: 用户说 "我在 A 公司做后端"
2025-03: 用户说 "我上个月从 A 离职了"
Zep 存储:
(用户) -[就职于]-> (A公司) valid: 2024-01 ~ 2025-02, invalidated_at: 2025-03
(用户) -[离职于]-> (A公司) valid: 2025-03 ~
普通 KV 存储:
公司: A公司 → 被覆盖为空,或两条矛盾并存
```
- 可以回答"用户去年这个时候在做什么?"
- 旧事实**失效但不删除**,保留完整时间线
- 企业级场景(合规、审计、保单变更)的首选
---
## 对比总览
| 维度 | sundynix 现状 | ChatGPT | Mem0 | Letta | Zep |
|---|---|---|---|---|---|
| **存储** | PG 扁平 KV | 结构化文档 | Vector + Graph + KV | Vector (Archival) | 双时态知识图谱 |
| **抽取** | 逐轮 LLM 抽取 | 后台周期 Dreaming | Extract 阶段 | Agent 自主存储 | Graphiti 引擎 |
| **去重/合并** | exact key match | Dreaming 合成 | **Consolidate (ADD/UPDATE/DELETE)** | Agent 自管理 | 双时态失效 |
| **检索** | **全量注入** | 全量注入(紧凑摘要) | **语义 top-K + 图谱关联** | Agent 主动搜索 | 语义 + 时间 + 图谱 |
| **时间维度** | ❌ | 有(自动过时) | ❌ | ❌ | ✅ 双时态 |
| **遗忘** | ❌ | ✅ 自动 | ✅ DELETE 操作 | ✅ Agent 决定 | ✅ 失效不删 |
| **用户控制** | ❌ | ✅ 查看/编辑/删除 | 有 API | 有 API | 有 API |
| **复杂度** | ★☆☆☆☆ | ★★☆☆☆ | ★★★☆☆ | ★★★★☆ | ★★★★★ |
---
## sundynix-agentix 的差距与改造路线
### 与行业标准(Mem0 模式)的 3 个关键差距
```mermaid
flowchart LR
A["1. 缺 Consolidate 层<br/>无法 ADD/UPDATE/DELETE<br/>只有盲目 upsert"] --> D["记忆腐烂<br/>冗余/过时/矛盾"]
B["2. 全量注入<br/>不做相关性检索"] --> E["token 浪费<br/>噪声干扰"]
C["3. 无时间/置信度<br/>维度"] --> F["无法遗忘<br/>无法区分强弱偏好"]
```
### 建议改造路线(利用已有基础设施)
项目已有 **Milvus + Neo4j + Postgres**,完全可以不引入新依赖,渐进实现 Mem0 级别的能力:
#### Phase 1 — 加 Consolidate 层(改 `memory_extract.go`,约 50 行)
把现在的 "抽取 → 盲目 upsert" 改为 "抽取 → LLM 对账 → 决策执行":
```go
// 现在的流程:
fresh := filterNewPrefs(parsePrefs(txt), existing) // exact match 去重
for _, p := range fresh {
o.upsertMemory(ctx, uid, p.Key, p.Value) // 盲 upsert
}
// 改为 Mem0 式 Consolidate
// 1. 把新抽取 + 已有画像一起交给 LLM
// 2. LLM 输出操作列表:[{op:"ADD/UPDATE/DELETE", key, value, reason}]
// 3. 按操作执行
```
#### Phase 2 — 按需检索代替全量注入(复用 Milvus,改 `compile.go`
```go
// 现在 (compile.go:27-31):全量注入
if rc.Profile != "" {
sys.WriteString("\n\n关于当前用户的已知信息:\n")
sys.WriteString(rc.Profile) // 全部偏好
}
// 改为:memory_get 传入 queryMilvus 语义检索 top-5 相关偏好
// memory store 存偏好时同时 embedding → Milvus memory collection
// 读取时用 query embedding → top-K → 只注入相关的
```
#### Phase 3 — 图谱记忆(复用 Neo4j)
```
memory_upsert 时同时写入 Neo4j
(User:uid) -[偏好 {key, since, confidence}]-> (Value:value)
检索时可以做关系推理:
"用户的公司" → "公司的技术栈" → 关联出相关偏好
```
> [!TIP]
> **最务实的改造优先级**Phase 1Consolidate> Phase 2(语义检索)> Phase 3(图谱)。
> Phase 1 改动最小(只改 `memory_extract.go` 的抽取 prompt 和处理逻辑),
> 但效果最显著——直接解决记忆腐烂这个最大的 Day-2 问题。
+29 -1
View File
@@ -1,11 +1,39 @@
import { useEffect, useState } from "react";
import { HashRouter } from "react-router-dom";
import { AppShell } from "./shell/AppShell";
import { Login } from "./Login";
import { me, clearToken, type AuthUser } from "./api";
// 用 HashRouter:纯静态托管/桌面内嵌都能深链,无需服务端路由配置。
export default function App() {
const [user, setUser] = useState<AuthUser | null>(null);
const [loading, setLoading] = useState(true);
useEffect(() => {
me()
.then(setUser)
.catch(() => setUser(null))
.finally(() => setLoading(false));
const onLogout = () => setUser(null);
window.addEventListener("sdx:logout", onLogout);
return () => window.removeEventListener("sdx:logout", onLogout);
}, []);
if (loading) {
return <div className="flex h-screen w-screen items-center justify-center text-sm text-gray-400"></div>;
}
if (!user) {
return <Login onAuthed={setUser} />;
}
return (
<HashRouter>
<AppShell />
<AppShell
user={user}
onLogout={() => {
clearToken();
setUser(null);
}}
/>
</HashRouter>
);
}
+62
View File
@@ -0,0 +1,62 @@
import { useState } from "react";
import { login, type AuthUser } from "./api";
// 管理端登录门:登录成功回调 onAuthed。需管理员账号(生产须在 ADMIN_USER_IDS 白名单)。
export function Login({ onAuthed }: { onAuthed: (u: AuthUser) => void }) {
const [email, setEmail] = useState("");
const [password, setPassword] = useState("");
const [busy, setBusy] = useState(false);
const [err, setErr] = useState("");
const submit = async () => {
if (busy || !email.trim() || !password) return;
setErr("");
setBusy(true);
try {
onAuthed(await login(email.trim(), password));
} catch (e) {
setErr((e as Error).message);
} finally {
setBusy(false);
}
};
return (
<div className="flex h-screen w-screen items-center justify-center bg-gray-50 text-gray-900">
<div className="w-[340px] rounded-xl border bg-white p-6 shadow-sm">
<div className="text-sm font-bold text-gray-800">sundynix-agentix</div>
<div className="mb-5 text-[11px] text-gray-400"> · </div>
<label className="mb-1 block text-xs text-gray-500"></label>
<input
className="mb-3 w-full rounded border px-3 py-2 text-sm focus:border-violet-500 focus:outline-none"
type="email"
value={email}
onChange={(e) => setEmail(e.target.value)}
placeholder="admin@example.com"
autoFocus
onKeyDown={(e) => e.key === "Enter" && submit()}
/>
<label className="mb-1 block text-xs text-gray-500"></label>
<input
className="mb-4 w-full rounded border px-3 py-2 text-sm focus:border-violet-500 focus:outline-none"
type="password"
value={password}
onChange={(e) => setPassword(e.target.value)}
placeholder="••••••••"
onKeyDown={(e) => e.key === "Enter" && submit()}
/>
{err && <div className="mb-3 rounded bg-rose-50 px-3 py-2 text-xs text-rose-600">{err}</div>}
<button
className="w-full rounded bg-violet-600 py-2 text-sm font-medium text-white hover:bg-violet-700 disabled:opacity-50"
disabled={busy || !email.trim() || !password}
onClick={submit}
>
{busy ? "登录中…" : "登录"}
</button>
<p className="mt-3 text-[10px] leading-relaxed text-gray-400">
ADMIN_USER_IDS
</p>
</div>
</div>
);
}
+105 -22
View File
@@ -1,12 +1,80 @@
// 运维控制台 → Gateway 控制面 API。
// 运维控制台 → Gateway 控制面 API(带 JWT 鉴权;/admin 受 RequireAdmin 保护)
export const GATEWAY: string =
(import.meta.env.VITE_GATEWAY as string | undefined) ?? "http://localhost:8080";
const ADMIN = `${GATEWAY}/api/v1/admin`;
// ---- 鉴权(JWT,存 localStorage----
const TOKEN_KEY = "sdx_admin_token";
let token = typeof localStorage !== "undefined" ? localStorage.getItem(TOKEN_KEY) ?? "" : "";
export function setToken(t: string): void {
token = t;
try {
localStorage.setItem(TOKEN_KEY, t);
} catch {
/* ignore */
}
}
export function clearToken(): void {
token = "";
try {
localStorage.removeItem(TOKEN_KEY);
} catch {
/* ignore */
}
}
export function getToken(): string {
return token;
}
function authHeaders(json = false): Record<string, string> {
const h: Record<string, string> = token ? { Authorization: `Bearer ${token}` } : {};
if (json) h["Content-Type"] = "application/json";
return h;
}
// guard 在 401(未登录) 时清令牌并广播登出(403=已登录但非管理员,照常抛错)。
function guard(res: Response): Response {
if (res.status === 401) {
clearToken();
if (typeof window !== "undefined") window.dispatchEvent(new Event("sdx:logout"));
}
return res;
}
export interface AuthUser {
id: string;
email: string;
name?: string;
}
export async function login(email: string, password: string): Promise<AuthUser> {
const res = await fetch(`${GATEWAY}/api/v1/auth/login`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ email, password }),
});
const data = (await res.json()) as { token?: string; user?: AuthUser; error?: string };
if (!res.ok || !data.token || !data.user) throw new Error(data.error ?? `登录失败: ${res.status}`);
setToken(data.token);
return data.user;
}
export async function me(): Promise<AuthUser | null> {
if (!token) return null;
const res = await fetch(`${GATEWAY}/api/v1/auth/me`, { headers: authHeaders() });
if (!res.ok) {
clearToken();
return null;
}
return ((await res.json()) as { user?: AuthUser }).user ?? null;
}
// ---- 模型配置(id 为雪花字符串)----
export type Kind = "chat" | "embedding";
export interface Model {
id: number;
id: string;
kind: Kind;
provider: string;
base_url: string;
@@ -16,7 +84,7 @@ export interface Model {
}
export interface ModelInput {
id?: number;
id?: string;
kind: Kind;
provider: string;
base_url: string;
@@ -25,44 +93,59 @@ export interface ModelInput {
}
export async function listModels(kind: Kind): Promise<Model[]> {
const res = await fetch(`${ADMIN}/models?kind=${kind}`);
const res = guard(await fetch(`${ADMIN}/models?kind=${kind}`, { headers: authHeaders() }));
if (!res.ok) throw new Error(`list failed: ${res.status}`);
return ((await res.json()) as { models: Model[] }).models;
}
export async function saveModel(m: ModelInput): Promise<number> {
const res = await fetch(`${ADMIN}/models`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(m),
});
const data = (await res.json()) as { id?: number; error?: string };
export async function saveModel(m: ModelInput): Promise<string> {
const res = guard(await fetch(`${ADMIN}/models`, { method: "POST", headers: authHeaders(true), body: JSON.stringify(m) }));
const data = (await res.json()) as { id?: string; error?: string };
if (!res.ok) throw new Error(data.error ?? `save failed: ${res.status}`);
return data.id ?? 0;
return data.id ?? "";
}
export async function setActive(id: number): Promise<void> {
const res = await fetch(`${ADMIN}/models/${id}/active`, { method: "POST" });
export async function setActive(id: string): Promise<void> {
const res = guard(await fetch(`${ADMIN}/models/${id}/active`, { method: "POST", headers: authHeaders() }));
if (!res.ok) throw new Error(`activate failed: ${res.status}`);
}
export async function deleteModel(id: number): Promise<void> {
const res = await fetch(`${ADMIN}/models/${id}`, { method: "DELETE" });
export async function deleteModel(id: string): Promise<void> {
const res = guard(await fetch(`${ADMIN}/models/${id}`, { method: "DELETE", headers: authHeaders() }));
if (!res.ok) throw new Error(`delete failed: ${res.status}`);
}
export async function testModel(m: ModelInput): Promise<{ ok: boolean; message: string }> {
const res = await fetch(`${ADMIN}/models/test`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(m),
});
const res = guard(await fetch(`${ADMIN}/models/test`, { method: "POST", headers: authHeaders(true), body: JSON.stringify(m) }));
return (await res.json()) as { ok: boolean; message: string };
}
// ---- 计价(token↔真钱,按模型分输入/输出)----
export interface Pricing {
model_id: string;
input_per_1k: number;
output_per_1k: number;
currency: string;
}
export async function listPricing(): Promise<Pricing[]> {
const res = guard(await fetch(`${ADMIN}/pricing`, { headers: authHeaders() }));
if (!res.ok) throw new Error(`list pricing failed: ${res.status}`);
return ((await res.json()) as { pricing: Pricing[] }).pricing ?? [];
}
export async function savePricing(p: Pricing): Promise<void> {
const res = guard(await fetch(`${ADMIN}/pricing`, { method: "PUT", headers: authHeaders(true), body: JSON.stringify(p) }));
if (!res.ok) {
const d = (await res.json().catch(() => ({}))) as { error?: string };
throw new Error(d.error ?? `save pricing failed: ${res.status}`);
}
}
// gatewayOnline 用公开的 /healthz 探活(不受鉴权影响)。
export async function gatewayOnline(): Promise<boolean> {
try {
const res = await fetch(`${GATEWAY}/api/v1/billing`);
const res = await fetch(`${GATEWAY}/healthz`);
return res.ok;
} catch {
return false;
+144
View File
@@ -0,0 +1,144 @@
import { useEffect, useState } from "react";
import { listModels, listPricing, savePricing, type Model, type Pricing } from "../api";
// 每个模型一行的本地编辑态。
interface Row {
model: Model;
inPer1k: string;
outPer1k: string;
currency: string;
dirty: boolean;
saving: boolean;
msg: string;
}
// 计价配置:为每个已登记模型设「输入 / 输出 每 1K token 单价」+ 币种(token↔真钱)。供计费折算。
export function PricingPage() {
const [rows, setRows] = useState<Row[]>([]);
const [loading, setLoading] = useState(true);
const [err, setErr] = useState("");
const load = async () => {
setLoading(true);
setErr("");
try {
const [chat, emb, pricing] = await Promise.all([listModels("chat"), listModels("embedding"), listPricing()]);
const byID = new Map<string, Pricing>(pricing.map((p) => [p.model_id, p]));
const mk = (m: Model): Row => {
const p = byID.get(m.id);
return {
model: m,
inPer1k: p ? String(p.input_per_1k) : "",
outPer1k: p ? String(p.output_per_1k) : "",
currency: p?.currency || "CNY",
dirty: false,
saving: false,
msg: "",
};
};
setRows([...chat.map(mk), ...emb.map(mk)]);
} catch (e) {
setErr((e as Error).message);
} finally {
setLoading(false);
}
};
useEffect(() => {
void load();
}, []);
const patch = (id: string, p: Partial<Row>) => setRows((rs) => rs.map((r) => (r.model.id === id ? { ...r, ...p, dirty: true, msg: "" } : r)));
const save = async (r: Row) => {
setRows((rs) => rs.map((x) => (x.model.id === r.model.id ? { ...x, saving: true, msg: "" } : x)));
try {
await savePricing({
model_id: r.model.id,
input_per_1k: Number(r.inPer1k) || 0,
output_per_1k: Number(r.outPer1k) || 0,
currency: r.currency || "CNY",
});
setRows((rs) => rs.map((x) => (x.model.id === r.model.id ? { ...x, saving: false, dirty: false, msg: "✓ 已保存" } : x)));
} catch (e) {
setRows((rs) => rs.map((x) => (x.model.id === r.model.id ? { ...x, saving: false, msg: (e as Error).message } : x)));
}
};
if (loading) return <div className="text-sm text-gray-400"></div>;
if (err) return <div className="text-sm text-rose-600">{err}</div>;
return (
<div>
<p className="mb-4 text-sm text-gray-500"> token 1K token × </p>
{rows.length === 0 ? (
<div className="text-sm text-gray-400"></div>
) : (
<table className="w-full text-sm">
<thead>
<tr className="border-b text-left text-xs text-gray-400">
<th className="py-2"></th>
<th></th>
<th> / 1K</th>
<th> / 1K</th>
<th></th>
<th></th>
</tr>
</thead>
<tbody>
{rows.map((r) => (
<tr key={r.model.id} className="border-t">
<td className="py-2">
<div className="font-medium text-gray-800">{r.model.model}</div>
<div className="text-[11px] text-gray-400">{r.model.provider}</div>
</td>
<td className="text-gray-500">{r.model.kind}</td>
<td>
<input
className="w-24 rounded border px-2 py-1 focus:border-violet-500 focus:outline-none"
type="number"
step="0.0001"
min="0"
value={r.inPer1k}
onChange={(e) => patch(r.model.id, { inPer1k: e.target.value })}
placeholder="0"
/>
</td>
<td>
<input
className="w-24 rounded border px-2 py-1 focus:border-violet-500 focus:outline-none"
type="number"
step="0.0001"
min="0"
value={r.outPer1k}
onChange={(e) => patch(r.model.id, { outPer1k: e.target.value })}
placeholder="0"
/>
</td>
<td>
<select
className="rounded border px-2 py-1 focus:border-violet-500 focus:outline-none"
value={r.currency}
onChange={(e) => patch(r.model.id, { currency: e.target.value })}
>
<option value="CNY">CNY</option>
<option value="USD">USD</option>
</select>
</td>
<td className="text-right">
<button
className="rounded bg-violet-600 px-3 py-1 text-xs text-white hover:bg-violet-700 disabled:opacity-40"
disabled={!r.dirty || r.saving}
onClick={() => save(r)}
>
{r.saving ? "保存中…" : "保存"}
</button>
{r.msg && <span className={`ml-2 text-[11px] ${r.msg.startsWith("✓") ? "text-emerald-600" : "text-rose-600"}`}>{r.msg}</span>}
</td>
</tr>
))}
</tbody>
</table>
)}
</div>
);
}
+8
View File
@@ -5,6 +5,7 @@ import { Soon } from "./components/Soon";
// 新增页面 = 在此加一条;real 页面用 lazy 懒加载(代码分割)。
const ModelsPage = lazy(() => import("./pages/ModelsPage").then((m) => ({ default: m.ModelsPage })));
const DatasourcesPage = lazy(() => import("./pages/DatasourcesPage").then((m) => ({ default: m.DatasourcesPage })));
const PricingPage = lazy(() => import("./pages/PricingPage").then((m) => ({ default: m.PricingPage })));
export interface RouteDef {
path: string;
@@ -29,6 +30,13 @@ export const routes: RouteDef[] = [
ready: true,
element: <DatasourcesPage />,
},
{
path: "/pricing",
label: "计价",
group: "配置",
ready: true,
element: <PricingPage />,
},
{
path: "/tenants",
label: "租户",
+11 -5
View File
@@ -2,10 +2,10 @@ import { Suspense, useEffect, useState } from "react";
import { NavLink, Routes, Route, Navigate, useLocation } from "react-router-dom";
import { routes, navGroups, defaultPath } from "../routes";
import { gatewayOnline } from "../api";
import { gatewayOnline, type AuthUser } from "../api";
// 控制台外壳:导航与内容均由路由注册表派生(动态路由)。
export function AppShell() {
export function AppShell({ user, onLogout }: { user: AuthUser; onLogout: () => void }) {
const [online, setOnline] = useState(false);
const loc = useLocation();
const current = routes.find((r) => r.path === loc.pathname);
@@ -45,9 +45,15 @@ export function AppShell() {
</div>
))}
</nav>
<div className="mt-auto flex items-center gap-2 border-t p-4 text-[11px] text-gray-500">
<span className={`h-2 w-2 rounded-full ${online ? "bg-emerald-500" : "bg-rose-500"}`} />
Gateway {online ? "在线" : "离线"}
<div className="mt-auto border-t p-4 text-[11px] text-gray-500">
<div className="mb-2 flex items-center justify-between">
<span className="truncate text-gray-600" title={user.email}>{user.name || user.email}</span>
<button onClick={onLogout} className="text-gray-400 hover:text-rose-600"></button>
</div>
<div className="flex items-center gap-2">
<span className={`h-2 w-2 rounded-full ${online ? "bg-emerald-500" : "bg-rose-500"}`} />
Gateway {online ? "在线" : "离线"}
</div>
</div>
</aside>
@@ -0,0 +1,238 @@
package eino
import (
"context"
"encoding/json"
"strings"
"sync"
"testing"
"github.com/sundynix/sundynix-dispatcher/internal/harness"
"github.com/sundynix/sundynix-dispatcher/internal/llm"
"github.com/sundynix/sundynix-shared/contract"
)
// ---- 测试替身 ----
type fakeLLM struct {
ready bool
stream func(msgs []llm.ChatMessage) string // ChatStream 要回流的整段文本
chat func(msgs []llm.ChatMessage) (string, error) // Chat 返回
}
func (f *fakeLLM) Ready() bool { return f.ready }
func (f *fakeLLM) ChatStream(_ context.Context, msgs []llm.ChatMessage, onToken func(string)) error {
if f.stream != nil {
onToken(f.stream(msgs))
}
return nil
}
func (f *fakeLLM) StreamText(_ context.Context, text string, onToken func([]byte)) error {
onToken([]byte(text))
return nil
}
func (f *fakeLLM) Chat(_ context.Context, msgs []llm.ChatMessage) (string, error) {
if f.chat != nil {
return f.chat(msgs)
}
return "", nil
}
type fakeSink struct {
mu sync.Mutex
tokens []string
done bool
}
func (s *fakeSink) PublishToken(_ string, tok []byte) error {
s.mu.Lock()
s.tokens = append(s.tokens, string(tok))
s.mu.Unlock()
return nil
}
func (s *fakeSink) CompleteStream(_ string) error { s.done = true; return nil }
func (s *fakeSink) text() string {
s.mu.Lock()
defer s.mu.Unlock()
return strings.Join(s.tokens, "")
}
type fakeTools struct {
mu sync.Mutex
calls []string
fn func(call *contract.ToolCall) *contract.ToolResult
}
func (t *fakeTools) CallTool(_ context.Context, _ string, call *contract.ToolCall) (*contract.ToolResult, error) {
t.mu.Lock()
t.calls = append(t.calls, call.Tool)
t.mu.Unlock()
if t.fn != nil {
return t.fn(call), nil
}
return &contract.ToolResult{OK: true, Content: ""}, nil
}
func (t *fakeTools) called(tool string) bool {
t.mu.Lock()
defer t.mu.Unlock()
for _, c := range t.calls {
if c == tool {
return true
}
}
return false
}
type fakeExec struct {
mu sync.Mutex
events []contract.ExecEvent
}
func (e *fakeExec) PublishExec(_ string, data []byte) error {
var ev contract.ExecEvent
if json.Unmarshal(data, &ev) == nil {
e.mu.Lock()
e.events = append(e.events, ev)
e.mu.Unlock()
}
return nil
}
func (e *fakeExec) CompleteExec(_ string) error { return nil }
func newOrch(ll *fakeLLM, ft *fakeTools, fs *fakeSink, fe *fakeExec) *Orchestrator {
return &Orchestrator{pool: ll, breaker: harness.NewCircuitBreaker(), sink: fs, tools: ft, exec: fe}
}
func task(graph string) *contract.Task {
return &contract.Task{ID: "t1", Graph: json.RawMessage(graph), Meta: map[string]any{}}
}
// ---- 集成测试:runGraph ----
// 分支按 true/false 边标签精确选路(true 边故意列第二位)。
func TestRunGraph_BranchRouting(t *testing.T) {
g := `{"nodes":[
{"id":"i","kind":"input","config":{"text":"hi"}},
{"id":"b","kind":"branch","config":{"condition":"%s"}},
{"id":"a","kind":"agent","config":{"system":"AAA"}},
{"id":"c","kind":"agent","config":{"system":"BBB"}}
],"edges":[
{"source":"i","target":"b"},
{"source":"b","target":"c","sourceHandle":"false"},
{"source":"b","target":"a","sourceHandle":"true"}
]}`
// fakeLLM 回流"系统提示词",借此判断哪个 agent 跑了。
ll := &fakeLLM{ready: true, stream: func(m []llm.ChatMessage) string { return m[0].Content }}
run := func(cond string) string {
o := newOrch(ll, &fakeTools{}, &fakeSink{}, &fakeExec{})
ans, err := o.runGraph(context.Background(), task(strings.Replace(g, "%s", cond, 1)), o.tracer("t1"))
if err != nil {
t.Fatal(err)
}
return ans
}
if a := run("2>1"); !strings.Contains(a, "AAA") || strings.Contains(a, "BBB") {
t.Errorf("条件真应走 true 标签(AAA)got %q", a)
}
if a := run("1>2"); !strings.Contains(a, "BBB") || strings.Contains(a, "AAA") {
t.Errorf("条件假应走 false 标签(BBB)got %q", a)
}
}
// 工具节点产出注入 agent 上下文。
func TestRunGraph_ToolFeedsAgent(t *testing.T) {
g := `{"nodes":[
{"id":"i","kind":"input","config":{"text":"hi"}},
{"id":"t","kind":"tool","config":{"tool":"wiki_search"}},
{"id":"a","kind":"agent","config":{"system":"S"}}
],"edges":[{"source":"i","target":"t"},{"source":"t","target":"a"}]}`
ll := &fakeLLM{ready: true, stream: func(m []llm.ChatMessage) string { return m[0].Content }}
ft := &fakeTools{fn: func(c *contract.ToolCall) *contract.ToolResult {
return &contract.ToolResult{OK: true, Content: "TOOLDATA"}
}}
o := newOrch(ll, ft, &fakeSink{}, &fakeExec{})
ans, err := o.runGraph(context.Background(), task(g), o.tracer("t1"))
if err != nil {
t.Fatal(err)
}
if !ft.called("wiki_search") {
t.Error("应调用 wiki_search 工具")
}
if !strings.Contains(ans, "TOOLDATA") {
t.Errorf("工具产出应注入 agent 上下文,got %q", ans)
}
}
// map 并行 fan-out:拆项 → 各章撰写 → 汇成多章成稿。
func TestRunGraph_MapFanout(t *testing.T) {
g := `{"nodes":[
{"id":"i","kind":"input","config":{"text":"猫"}},
{"id":"m","kind":"map","config":{"splitBy":"要点"}}
],"edges":[{"source":"i","target":"m"}]}`
ll := &fakeLLM{ready: true, chat: func(m []llm.ChatMessage) (string, error) {
u := m[len(m)-1].Content
if strings.Contains(u, "拆分") {
return `["第一章","第二章"]`, nil
}
return "正文XYZ", nil
}}
o := newOrch(ll, &fakeTools{}, &fakeSink{}, &fakeExec{})
ans, err := o.runGraph(context.Background(), task(g), o.tracer("t1"))
if err != nil {
t.Fatal(err)
}
if !strings.Contains(ans, "## 第一章") || !strings.Contains(ans, "## 第二章") || !strings.Contains(ans, "正文XYZ") {
t.Errorf("map 应产出多章成稿,got %q", ans)
}
}
// 输出护栏:发射的 token 中疑似密钥被脱敏。
func TestRunGraph_OutputRedaction(t *testing.T) {
g := `{"nodes":[{"id":"i","kind":"input","config":{"text":"hi"}},{"id":"a","kind":"agent","config":{"system":"S"}}],"edges":[{"source":"i","target":"a"}]}`
ll := &fakeLLM{ready: true, stream: func(m []llm.ChatMessage) string {
return "你的 key 是 sk-912cf85b16d04b22bcb95f4576423bfb 请保密"
}}
fs := &fakeSink{}
o := newOrch(ll, &fakeTools{}, fs, &fakeExec{})
if _, err := o.runGraph(context.Background(), task(g), o.tracer("t1")); err != nil {
t.Fatal(err)
}
out := fs.text()
if strings.Contains(out, "sk-912cf85b16d04b22bcb95f4576423bfb") {
t.Errorf("流式输出未脱敏密钥: %q", out)
}
if !strings.Contains(out, "[已脱敏]") {
t.Errorf("应出现脱敏标记: %q", out)
}
}
// ---- 集成测试:handleReport ----
func TestHandleReport_PlanWriteStore(t *testing.T) {
ll := &fakeLLM{ready: true, chat: func(m []llm.ChatMessage) (string, error) {
u := m[len(m)-1].Content
if strings.Contains(u, "大纲") {
return `{"title":"咖啡报告","sections":["提神","风险"]}`, nil
}
return "本章正文。", nil
}}
ft := &fakeTools{fn: func(c *contract.ToolCall) *contract.ToolResult {
return &contract.ToolResult{OK: true, Content: "/tmp/x.json"}
}}
fs := &fakeSink{}
o := newOrch(ll, ft, fs, &fakeExec{})
tk := &contract.Task{ID: "r1", Graph: json.RawMessage(`{}`), Meta: map[string]any{contract.MetaTopic: "咖啡"}}
if err := o.handleReport(context.Background(), tk, o.tracer("r1")); err != nil {
t.Fatal(err)
}
if !ft.called("report_store") {
t.Error("应调用 report_store 存源")
}
out := fs.text()
if !strings.Contains(out, "# 咖啡报告") || !strings.Contains(out, "## 提神") || !strings.Contains(out, "## 风险") {
t.Errorf("报告流应含标题与各章,got %q", out)
}
if !fs.done {
t.Error("报告结束应 CompleteStream")
}
}
@@ -0,0 +1,113 @@
package eino
import (
"context"
"encoding/json"
"fmt"
"log"
"strings"
"github.com/sundynix/sundynix-dispatcher/internal/llm"
"github.com/sundynix/sundynix-shared/contract"
)
// Pref 是从对话抽取出的一条长期偏好(key/value)。
type Pref struct {
Key string `json:"key"`
Value string `json:"value"`
}
// extractMemory 写回阶段(异步、离开热路径):从本轮对话用 LLM 抽取用户长期偏好,
// 与已有画像去重后经 memory_upsert 登记。模型/工具不可用或输入过短则跳过。
func (o *Orchestrator) extractMemory(ctx context.Context, uid, input, answer string) {
if uid == "" || o.tools == nil || o.pool == nil || !o.pool.Ready() {
return
}
if len([]rune(strings.TrimSpace(input))) < 2 || len([]rune(strings.TrimSpace(answer))) < 20 {
return // 太短,不值得抽取
}
existing := parseProfile(o.fetchMemory(ctx, uid, ""))
cctx, cancel := llmCtx(ctx)
defer cancel()
sys := "你从对话中提取【用户的长期稳定偏好或事实】(如称呼、语言、职业、专业领域、口味、常用工具、固定要求等)," +
"忽略一次性的临时信息与你自己的话。"
user := fmt.Sprintf("用户输入:%s\n助手回答:%s\n请抽取。只输出 JSON 数组 [{\"key\":\"偏好维度\",\"value\":\"值\"}]"+
"没有可抽取的就输出 [],不要任何多余文字。", truncate(input, 800), truncate(answer, 1200))
txt, err := o.pool.Chat(cctx, []llm.ChatMessage{{Role: "system", Content: sys}, {Role: "user", Content: user}})
if err != nil {
log.Printf("[eino] (writeback) 偏好抽取失败 user=%s: %v", uid, err)
return
}
fresh := filterNewPrefs(parsePrefs(txt), existing)
for _, p := range fresh {
o.upsertMemory(ctx, uid, p.Key, p.Value)
}
if len(fresh) > 0 {
log.Printf("[eino] (writeback) 已登记 %d 条新偏好 user=%s", len(fresh), uid)
}
}
// upsertMemory 经 mcp-go memory_upsert 工具登记一条偏好。
func (o *Orchestrator) upsertMemory(ctx context.Context, uid, key, value string) {
cctx, cancel := context.WithTimeout(ctx, toolCallTimeout)
defer cancel()
if _, err := o.tools.CallTool(cctx, contract.ToolSubjectGo("memory_upsert"),
&contract.ToolCall{Tool: "memory_upsert", Args: map[string]any{"user_id": uid, "key": key, "value": value}}); err != nil {
log.Printf("[eino] memory_upsert 失败 %s=%s: %v", key, value, err)
}
}
// parsePrefs 解析 LLM 抽取结果(容忍 ```json 围栏)为 []Pref,过滤空项。
func parsePrefs(txt string) []Pref {
var ps []Pref
if json.Unmarshal([]byte(stripFence(txt)), &ps) != nil {
return nil
}
out := make([]Pref, 0, len(ps))
for _, p := range ps {
p.Key, p.Value = strings.TrimSpace(p.Key), strings.TrimSpace(p.Value)
if p.Key != "" && p.Value != "" {
out = append(out, p)
}
}
return out
}
// parseProfile 把 memory_get 渲染的画像("- 维度:值" 多行)解析回 map,供去重。
func parseProfile(s string) map[string]string {
m := map[string]string{}
for _, line := range strings.Split(s, "\n") {
line = strings.TrimSpace(strings.TrimPrefix(strings.TrimSpace(line), "-"))
if line == "" {
continue
}
for _, sep := range []string{"", ":"} { // 兼容全角/半角冒号
if i := strings.Index(line, sep); i > 0 {
k := strings.TrimSpace(line[:i])
if k != "" {
m[k] = strings.TrimSpace(line[i+len(sep):])
}
break
}
}
}
return m
}
// filterNewPrefs 保留新增或值有变化的偏好(同批同 key 去重;已有且相同则跳过)。
func filterNewPrefs(extracted []Pref, existing map[string]string) []Pref {
out := make([]Pref, 0, len(extracted))
seen := map[string]bool{}
for _, p := range extracted {
if seen[p.Key] {
continue
}
seen[p.Key] = true
if cur, ok := existing[p.Key]; ok && cur == p.Value {
continue
}
out = append(out, p)
}
return out
}
@@ -0,0 +1,43 @@
package eino
import "testing"
func TestParsePrefs(t *testing.T) {
got := parsePrefs("```json\n[{\"key\":\"称呼\",\"value\":\"Dexter\"},{\"key\":\"语言\",\"value\":\"中文\"},{\"key\":\"\",\"value\":\"空\"}]\n```")
if len(got) != 2 {
t.Fatalf("应解析出 2 条(过滤空 key),got %d: %v", len(got), got)
}
if got[0].Key != "称呼" || got[0].Value != "Dexter" {
t.Errorf("解析错: %v", got[0])
}
if parsePrefs("不是 JSON") != nil {
t.Error("非 JSON 应返回 nil")
}
}
func TestParseProfile(t *testing.T) {
m := parseProfile("- 称呼:Dexter\n- 语言: 中文\n\n- 职业:律师")
if m["称呼"] != "Dexter" || m["语言"] != "中文" || m["职业"] != "律师" {
t.Errorf("画像解析错: %v", m)
}
if len(parseProfile("")) != 0 {
t.Error("空画像应得空 map")
}
}
func TestFilterNewPrefs(t *testing.T) {
existing := map[string]string{"称呼": "Dexter", "语言": "中文"}
in := []Pref{
{"称呼", "Dexter"}, // 已有且相同 → 跳
{"语言", "英文"}, // 已有但变了 → 留
{"职业", "律师"}, // 新 → 留
{"职业", "工程师"}, // 同批重复 key → 跳(保留首个)
}
got := filterNewPrefs(in, existing)
if len(got) != 2 {
t.Fatalf("应剩 2 条(语言变更 + 新职业),got %d: %v", len(got), got)
}
if got[0].Key != "语言" || got[0].Value != "英文" || got[1].Key != "职业" || got[1].Value != "律师" {
t.Errorf("过滤结果不符: %v", got)
}
}
@@ -27,12 +27,20 @@ type ToolCaller interface {
CallTool(ctx context.Context, subject string, call *contract.ToolCall) (*contract.ToolResult, error)
}
// LLM 是编排所需的语言模型能力(生产由 *llm.Pool 实现)。抽成接口便于测试注入假模型。
type LLM interface {
Ready() bool
ChatStream(ctx context.Context, msgs []llm.ChatMessage, onToken func(string)) error
StreamText(ctx context.Context, text string, onToken func([]byte)) error
Chat(ctx context.Context, msgs []llm.ChatMessage) (string, error)
}
// 工具调用超时;超时即降级(不带工具上下文继续推理)。
const toolCallTimeout = 3 * time.Second
// Orchestrator 把每个 DSL 任务动态编译为 Eino 图并执行(记忆召回 → 工具节点 → 注入 → 流式)。
type Orchestrator struct {
pool *llm.Pool
pool LLM
breaker *harness.CircuitBreaker
eval *harness.Evaluator
sink TokenSink
@@ -42,7 +50,7 @@ type Orchestrator struct {
// NewOrchestrator 持有依赖;图按任务的 DSL 在 Handle 内动态编译。
// exec 为执行可视化事件出口(可为 nil,则不发轨迹事件);eval 为自动化评测(可为 nil)。
func NewOrchestrator(pool *llm.Pool, breaker *harness.CircuitBreaker, eval *harness.Evaluator, sink TokenSink, tools ToolCaller, exec ExecSink) (*Orchestrator, error) {
func NewOrchestrator(pool LLM, breaker *harness.CircuitBreaker, eval *harness.Evaluator, sink TokenSink, tools ToolCaller, exec ExecSink) (*Orchestrator, error) {
return &Orchestrator{pool: pool, breaker: breaker, eval: eval, sink: sink, tools: tools, exec: exec}, nil
}
@@ -172,8 +180,8 @@ func (o *Orchestrator) memorize(t *contract.Task, answer string) {
log.Printf("[eino] (writeback) task %s 已落会话历史 session=%s", t.ID, sid)
}
if uid != "" {
log.Printf("[eino] (writeback) task %s 待抽取 user=%s 的新偏好记忆", t.ID, uid)
// TODO: 抽取 LLM → 去重/更新 → memory_upsert
// 从本轮对话抽取长期偏好 → 去重 → memory_upsert(离开热路径,已在 goroutine 内)。
o.extractMemory(context.Background(), uid, dsl.Compile(t.Graph).Query, answer)
}
}
+10 -2
View File
@@ -5,9 +5,12 @@ go 1.25.0
require (
github.com/bwmarrin/snowflake v0.3.0
github.com/gin-gonic/gin v1.10.0
github.com/golang-jwt/jwt/v5 v5.3.1
github.com/minio/minio-go/v7 v7.2.0
github.com/prometheus/client_golang v1.23.2
github.com/redis/go-redis/v9 v9.20.0
github.com/sundynix/sundynix-shared v0.0.0
golang.org/x/crypto v0.53.0
gorm.io/driver/postgres v1.6.0
gorm.io/gorm v1.31.1
)
@@ -15,6 +18,7 @@ require (
replace github.com/sundynix/sundynix-shared => ../sundynix-shared
require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bytedance/gopkg v0.1.3 // indirect
github.com/bytedance/sonic v1.15.0 // indirect
github.com/bytedance/sonic/loader v0.5.0 // indirect
@@ -27,7 +31,6 @@ require (
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.20.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/golang-jwt/jwt/v5 v5.3.1 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
@@ -40,26 +43,31 @@ require (
github.com/klauspost/cpuid/v2 v2.2.11 // indirect
github.com/klauspost/crc32 v1.3.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/minio/crc64nvme v1.1.1 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nats-io/nats.go v1.37.0 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pelletier/go-toml/v2 v2.3.1 // indirect
github.com/philhofer/fwd v1.2.0 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.66.1 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
github.com/rs/xid v1.6.0 // indirect
github.com/tinylib/msgp v1.6.1 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect
github.com/zeebo/xxh3 v1.1.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.yaml.in/yaml/v2 v2.4.2 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/arch v0.11.0 // indirect
golang.org/x/crypto v0.53.0 // indirect
golang.org/x/net v0.55.0 // indirect
golang.org/x/sync v0.21.0 // indirect
golang.org/x/sys v0.46.0 // indirect
+20 -12
View File
@@ -1,3 +1,5 @@
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
@@ -64,10 +66,12 @@ github.com/klauspost/cpuid/v2 v2.2.11 h1:0OwqZRYI2rFrjS4kvkDnqJkKHdHaRnCm68/DY4O
github.com/klauspost/cpuid/v2 v2.2.11/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
github.com/klauspost/crc32 v1.3.0 h1:sSmTt3gUt81RP655XGZPElI0PelVTZ6YwCRnPSupoFM=
github.com/klauspost/crc32 v1.3.0/go.mod h1:D7kQaZhnkX/Y0tstFGf8VUzv2UofNGqCjnC3zdHB0Hw=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
@@ -85,6 +89,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE=
github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/nats-server/v2 v2.10.20 h1:CXDTYNHeBiAKBTAIP2gjpgbWap2GhATnTLgP8etyvEI=
@@ -101,6 +107,14 @@ github.com/philhofer/fwd v1.2.0 h1:e6DnBTl7vGY+Gz322/ASL4Gyp1FspeMvx1RNDoToZuM=
github.com/philhofer/fwd v1.2.0/go.mod h1:RqIHx9QI14HlwKwm98g9Re5prTQ6LdeRQn+gXJFxsJM=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o=
github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg=
github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk=
github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE=
github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs=
github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA=
github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg=
github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is=
github.com/redis/go-redis/v9 v9.20.0 h1:WnQYxLkgO2xiXTCJY0ldIiI8dNqCDlQAG+AtaH7a2a0=
github.com/redis/go-redis/v9 v9.20.0/go.mod h1:v/M13XI1PVCDcm01VtPFOADfZtHf8YW3baQf57KlIkA=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
@@ -131,29 +145,23 @@ github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs=
github.com/zeebo/xxh3 v1.1.0/go.mod h1:IisAie1LELR4xhVinxWS5+zf1lA4p0MW4T+w+W07F5s=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI=
go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
golang.org/x/arch v0.11.0 h1:KXV8WWKCXm6tRpLirl2szsO5j/oOODwZf4hATmGVNs4=
golang.org/x/arch v0.11.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys=
golang.org/x/crypto v0.51.0 h1:IBPXwPfKxY7cWQZ38ZCIRPI50YLeevDLlLnyC5wRGTI=
golang.org/x/crypto v0.51.0/go.mod h1:8AdwkbraGNABw2kOX6YFPs3WM22XqI4EXEd8g+x7Oc8=
golang.org/x/crypto v0.53.0 h1:QZ4Muo8THX6CizN2vPPd5fBGHyogrdK9fG4wLPFUsto=
golang.org/x/crypto v0.53.0/go.mod h1:DNLU434OwVakk9PzuwV8w62mAJpRJL3vsgcfp4Qnsio=
golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA=
golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs=
golang.org/x/net v0.55.0 h1:bcvxaJn3e1U6InsFWt1JUq1aSjnRxLzT2rtD2KfkDF8=
golang.org/x/net v0.55.0/go.mod h1:L5U2KuzuOe1lY7Z+aWVIKK6qEeJXnXV9yzGA+WCHJww=
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
golang.org/x/sync v0.21.0 h1:HLII4xRRTtCRkxYp4HNFF0Js/Og6q2i++KXbg0gHCwM=
golang.org/x/sync v0.21.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ=
golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/sys v0.46.0 h1:noSf2Fq6F8DBgS+LysIkx7rIExoNHJsxOAtPp4rthXw=
golang.org/x/sys v0.46.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/text v0.37.0 h1:Cqjiwd9eSg8e0QAkyCaQTNHFIIzWtidPahFWR83rTrc=
golang.org/x/text v0.37.0/go.mod h1:a5sjxXGs9hsn/AJVwuElvCAo9v8QYLzvavO5z2PiM38=
golang.org/x/text v0.38.0 h1:sXmwo9DwP3OK9EZ7PqAdaooSGozfl/3a6/xJcbzPRhE=
golang.org/x/text v0.38.0/go.mod h1:YXZt3QhHUKYT53r2lLKFIVi6Ao1jdzrTR/KQ09qyxF4=
golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U=
+24 -2
View File
@@ -3,7 +3,9 @@ package auth
import (
"errors"
"log"
"os"
"strings"
"time"
"github.com/golang-jwt/jwt/v5"
@@ -13,8 +15,28 @@ import (
// TokenTTL 是访问令牌有效期。
const TokenTTL = 24 * time.Hour
// secret 是 JWT 签名密钥。生产经环境变量 JWT_SECRET 注入;缺省仅供开发(务必覆盖)。
var secret = []byte(envOr("JWT_SECRET", "sundynix-dev-secret-change-me"))
const devSecret = "sundynix-dev-secret-change-me"
// secret 是 JWT 签名密钥。生产必须经 JWT_SECRET 注入强密钥;
// 生产模式(APP_ENV=production/prod 或 GIN_MODE=release)下未设则直接 fatal,杜绝可伪造令牌。
var secret = []byte(resolveSecret())
func resolveSecret() string {
if s := os.Getenv("JWT_SECRET"); s != "" {
return s
}
if isProd() {
log.Fatal("[auth] 生产模式必须设置 JWT_SECRET(强随机密钥),拒绝使用开发默认值")
}
log.Println("[auth] ⚠️ 使用开发默认 JWT 密钥,生产务必设置 JWT_SECRET")
return devSecret
}
// isProd 判定是否生产环境。
func isProd() bool {
env := strings.ToLower(os.Getenv("APP_ENV"))
return env == "production" || env == "prod" || strings.ToLower(os.Getenv("GIN_MODE")) == "release"
}
// ErrInvalidToken 表示令牌无效/过期/签名不符。
var ErrInvalidToken = errors.New("invalid token")
@@ -42,6 +42,48 @@ func (h *Handler) ListModels(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"models": out})
}
// ListPricing: GET /api/v1/admin/pricing —— 列出各模型的计价配置(token↔真钱)。
func (h *Handler) ListPricing(c *gin.Context) {
rows, err := h.db.ListPricing(c.Request.Context())
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
out := make([]gin.H, 0, len(rows))
for _, p := range rows {
out = append(out, gin.H{
"model_id": p.ModelID, "input_per_1k": p.InputPer1K, "output_per_1k": p.OutputPer1K, "currency": p.Currency,
})
}
c.JSON(http.StatusOK, gin.H{"pricing": out})
}
// SavePricing: PUT /api/v1/admin/pricing —— 设置某模型的输入/输出单价(每 1K token)。
func (h *Handler) SavePricing(c *gin.Context) {
var b struct {
ModelID string `json:"model_id"`
InputPer1K float64 `json:"input_per_1k"`
OutputPer1K float64 `json:"output_per_1k"`
Currency string `json:"currency"`
}
if err := c.ShouldBindJSON(&b); err != nil || b.ModelID == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "model_id required"})
return
}
if b.InputPer1K < 0 || b.OutputPer1K < 0 {
c.JSON(http.StatusBadRequest, gin.H{"error": "单价不能为负"})
return
}
if b.Currency == "" {
b.Currency = "CNY"
}
if err := h.db.UpsertPricing(c.Request.Context(), b.ModelID, b.InputPer1K, b.OutputPer1K, b.Currency); err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{"status": "ok"})
}
// SaveModel: POST /api/v1/admin/models —— 新增/更新一条模型配置。
func (h *Handler) SaveModel(c *gin.Context) {
var b modelBody
@@ -95,6 +95,22 @@ func (h *Handler) StreamTask(c *gin.Context) {
})
}
// Healthz: GET /healthz —— 存活探针(liveness):进程能应答即 200,不查依赖。
func (h *Handler) Healthz(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"status": "ok"})
}
// Readyz: GET /readyz —— 就绪探针(readiness):核心依赖(DB/Redis)可用才 200,否则 503。
// 供 k8s 等编排器在依赖未就绪时暂不导流。NATS 在启动时即连(连不上会 fatal),故不单列。
func (h *Handler) Readyz(c *gin.Context) {
deps := gin.H{"db": h.db.Enabled(), "redis": h.cache.Enabled()}
if h.db.Enabled() && h.cache.Enabled() {
c.JSON(http.StatusOK, gin.H{"status": "ready", "deps": deps})
return
}
c.JSON(http.StatusServiceUnavailable, gin.H{"status": "not_ready", "deps": deps})
}
// Health: GET /api/v1/health —— 聚合各依赖子系统健康,供桌面端顶栏五盏灯实时点亮。
// gateway/db/redis/nats 网关本地可判;milvus/neo4j 经 mcp-go health 工具取(不可用则置否)。
func (h *Handler) Health(c *gin.Context) {
@@ -2,6 +2,7 @@ package middleware
import (
"net/http"
"os"
"strings"
"github.com/gin-gonic/gin"
@@ -40,3 +41,45 @@ func RequireAuth() gin.HandlerFunc {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "需要登录"})
}
}
// RequireAdmin 保护运维控制面:必须登录,且(设了 ADMIN_USER_IDS 时)uid 须在白名单内。
// ADMIN_USER_IDS 为空:开发期放行任意登录用户;生产期(APP_ENV=prod/GIN_MODE=release)直接拒绝
// ——逼运维显式配置管理员,杜绝"任意账号改模型/密钥配置"。
func RequireAdmin() gin.HandlerFunc {
allow := splitEnv("ADMIN_USER_IDS")
prod := strings.EqualFold(os.Getenv("APP_ENV"), "production") || strings.EqualFold(os.Getenv("APP_ENV"), "prod") ||
strings.EqualFold(os.Getenv("GIN_MODE"), "release")
return func(c *gin.Context) {
uid, _ := c.Get(CtxUserID)
id, _ := uid.(string)
if id == "" {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "需要登录"})
return
}
if len(allow) == 0 {
if prod {
c.AbortWithStatusJSON(http.StatusForbidden, gin.H{"error": "未配置管理员(ADMIN_USER_IDS"})
return
}
c.Next() // 开发期放行
return
}
for _, a := range allow {
if a == id {
c.Next()
return
}
}
c.AbortWithStatusJSON(http.StatusForbidden, gin.H{"error": "需要管理员权限"})
}
}
func splitEnv(key string) []string {
var out []string
for _, p := range strings.Split(os.Getenv(key), ",") {
if p = strings.TrimSpace(p); p != "" {
out = append(out, p)
}
}
return out
}
@@ -0,0 +1,94 @@
package middleware
import (
"crypto/rand"
"encoding/hex"
"log/slog"
"os"
"strconv"
"time"
"github.com/gin-gonic/gin"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
// CtxRequestID 是请求 ID 在 gin.Context 中的键。
const CtxRequestID = "request_id"
// ---- Prometheus 指标 ----
var (
httpRequests = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "sundynix_http_requests_total",
Help: "HTTP 请求总数(按方法/路由模板/状态码)。",
}, []string{"method", "route", "status"})
httpDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "sundynix_http_request_duration_seconds",
Help: "HTTP 请求耗时(秒)。",
Buckets: prometheus.DefBuckets,
}, []string{"method", "route"})
httpInFlight = promauto.NewGauge(prometheus.GaugeOpts{
Name: "sundynix_http_requests_in_flight",
Help: "当前处理中的 HTTP 请求数。",
})
)
// accessLogger 是结构化访问日志器(JSON 到 stderr)。
var accessLogger = slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}))
// RequestID 为每个请求生成/透传 X-Request-ID,写入上下文与响应头,供日志关联。
func RequestID() gin.HandlerFunc {
return func(c *gin.Context) {
id := c.GetHeader("X-Request-ID")
if id == "" {
id = newRequestID()
}
c.Set(CtxRequestID, id)
c.Header("X-Request-ID", id)
c.Next()
}
}
// Observe 记录 Prometheus 指标 + 结构化访问日志。放在中间件链较前位置。
func Observe() gin.HandlerFunc {
return func(c *gin.Context) {
start := time.Now()
httpInFlight.Inc()
c.Next()
httpInFlight.Dec()
route := c.FullPath() // 路由模板(/tasks/:id/...),避免按真实路径产生高基数
if route == "" {
route = "unmatched"
}
status := c.Writer.Status()
dur := time.Since(start)
method := c.Request.Method
httpRequests.WithLabelValues(method, route, strconv.Itoa(status)).Inc()
httpDuration.WithLabelValues(method, route).Observe(dur.Seconds())
uid, _ := c.Get(CtxUserID)
rid, _ := c.Get(CtxRequestID)
accessLogger.Info("http",
"request_id", rid,
"method", method,
"route", route,
"path", c.Request.URL.Path,
"status", status,
"latency_ms", dur.Milliseconds(),
"ip", c.ClientIP(),
"uid", uid,
"bytes", c.Writer.Size(),
)
}
}
func newRequestID() string {
var b [8]byte
_, _ = rand.Read(b[:])
return hex.EncodeToString(b[:])
}
@@ -0,0 +1,48 @@
package middleware
import (
"net/http"
"net/http/httptest"
"testing"
"github.com/gin-gonic/gin"
"github.com/prometheus/client_golang/prometheus/testutil"
)
func newEngine() *gin.Engine {
gin.SetMode(gin.TestMode)
r := gin.New()
r.Use(RequestID(), Observe())
r.GET("/ping", func(c *gin.Context) { c.String(http.StatusOK, "pong") })
return r
}
func TestObserve_CountsAndRequestID(t *testing.T) {
r := newEngine()
before := testutil.ToFloat64(httpRequests.WithLabelValues("GET", "/ping", "200"))
w := httptest.NewRecorder()
r.ServeHTTP(w, httptest.NewRequest(http.MethodGet, "/ping", nil))
if w.Code != 200 {
t.Fatalf("状态码=%d", w.Code)
}
if w.Header().Get("X-Request-ID") == "" {
t.Error("应自动生成并回写 X-Request-ID")
}
after := testutil.ToFloat64(httpRequests.WithLabelValues("GET", "/ping", "200"))
if after != before+1 {
t.Errorf("请求计数应 +1before=%v after=%v", before, after)
}
}
func TestRequestID_PropagatesIncoming(t *testing.T) {
r := newEngine()
w := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodGet, "/ping", nil)
req.Header.Set("X-Request-ID", "trace-abc-123")
r.ServeHTTP(w, req)
if got := w.Header().Get("X-Request-ID"); got != "trace-abc-123" {
t.Errorf("应透传入站 X-Request-IDgot %q", got)
}
}
+28 -6
View File
@@ -2,7 +2,10 @@
package router
import (
"os"
"github.com/gin-gonic/gin"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sundynix/sundynix-gateway/internal/blob"
"github.com/sundynix/sundynix-gateway/internal/handler"
@@ -13,13 +16,22 @@ import (
// New 构建带有 Guardrail / 限流中间件的 Gin 引擎。
func New(db *store.Postgres, cache *store.Redis, bus *nats.Bus, blobStore *blob.Store) *gin.Engine {
r := gin.Default()
r.Use(cors()) // 桌面端/浏览器跨源访问(开发期放开)
r := gin.New()
r.Use(gin.Recovery()) // panic 兜底
r.Use(middleware.RequestID()) // 生成/透传 X-Request-ID(日志关联)
r.Use(middleware.Observe()) // Prometheus 指标 + 结构化访问日志(替代 gin 默认文本日志)
r.Use(cors()) // 桌面端/浏览器跨源访问
r.Use(middleware.RateLimit(cache))
r.Use(middleware.Auth()) // 解析 Bearer JWT,注入已验证 userID(非阻断)
r.Use(middleware.Guardrail()) // Harness: Input Guardrail
h := handler.New(db, cache, bus, blobStore)
// 可观测性根端点:Prometheus 抓取 + k8s 存活/就绪探针(不挂业务中间件鉴权)。
r.GET("/metrics", gin.WrapH(promhttp.Handler()))
r.GET("/healthz", h.Healthz)
r.GET("/readyz", h.Readyz)
api := r.Group("/api/v1")
{
// —— 公开:鉴权端点 / 健康 / 按 task_id 寻址的 SSE 与导出(EventSource/下载无法带 Bearer)——
@@ -55,23 +67,33 @@ func New(db *store.Postgres, cache *store.Redis, bus *nats.Bus, blobStore *blob.
p.GET("/billing", h.Billing)
}
// 运维控制面:LLM 模型配置(独立运维控制台调用;鉴权待后续接管理员角色)。
admin := api.Group("/admin")
// 运维控制面:LLM 模型配置(含 API 密钥管理)—— 必须管理员(RequireAdmin)。
admin := api.Group("/admin", middleware.RequireAdmin())
{
admin.GET("/models", h.ListModels)
admin.POST("/models", h.SaveModel)
admin.POST("/models/:id/active", h.SetActiveModel)
admin.DELETE("/models/:id", h.DeleteModel)
admin.POST("/models/test", h.TestModel)
admin.GET("/pricing", h.ListPricing) // 各模型计价(token↔真钱)
admin.PUT("/pricing", h.SavePricing) // 设置某模型输入/输出单价
}
}
return r
}
// cors 放开跨源访问允许桌面端/浏览器带自定义身份头与 SSE 访问网关(开发期)。
// cors 控制跨源访问允许来源经 CORS_ALLOW_ORIGIN 配置(缺省 "*" 仅供开发;
// 生产应设为具体源,如 https://app.example.com)。Vary 保证按 Origin 正确缓存。
func cors() gin.HandlerFunc {
origin := "*"
if v := os.Getenv("CORS_ALLOW_ORIGIN"); v != "" {
origin = v
}
return func(c *gin.Context) {
c.Header("Access-Control-Allow-Origin", "*")
c.Header("Access-Control-Allow-Origin", origin)
if origin != "*" {
c.Header("Vary", "Origin")
}
c.Header("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
c.Header("Access-Control-Allow-Headers", "Content-Type, Authorization, X-Session-ID, X-User-ID")
if c.Request.Method == "OPTIONS" {
+33
View File
@@ -216,6 +216,39 @@ func (p *Postgres) ListLinks(ctx context.Context, owner, kb string) ([]DocLink,
return rows, err
}
// Pricing 是某模型的计价配置(token↔真钱):按模型分输入/输出单价(每 1K token)。
// 表名 sundynix_pricing。ModelID 关联 sundynix_model.id,唯一。
type Pricing struct {
BaseModel
ModelID string `gorm:"size:24;uniqueIndex"` // 关联 sundynix_model.id
InputPer1K float64 `gorm:"column:input_per_1k"` // 每 1K 输入 token 单价
OutputPer1K float64 `gorm:"column:output_per_1k"` // 每 1K 输出 token 单价
Currency string `gorm:"size:8"` // 币种(CNY / USD…)
}
func (Pricing) TableName() string { return "sundynix_pricing" }
// ListPricing 列出全部计价配置。
func (p *Postgres) ListPricing(ctx context.Context) ([]Pricing, error) {
if p.db == nil {
return nil, nil
}
var rows []Pricing
err := p.db.WithContext(ctx).Find(&rows).Error
return rows, err
}
// UpsertPricing 写入/更新某模型的计价(model_id 唯一,重复即覆盖单价/币种)。
func (p *Postgres) UpsertPricing(ctx context.Context, modelID string, inPer1K, outPer1K float64, currency string) error {
if p.db == nil {
return errStoreDisabled
}
return p.db.WithContext(ctx).Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "model_id"}},
DoUpdates: clause.Assignments(map[string]any{"input_per_1k": inPer1K, "output_per_1k": outPer1K, "currency": currency, "updated_at": time.Now()}),
}).Create(&Pricing{ModelID: modelID, InputPer1K: inPer1K, OutputPer1K: outPer1K, Currency: currency}).Error
}
// LLMModel 是一个模型后端配置(控制面:管理员在此登记可用模型)。
// 表名 sundynix_model(遵守前缀约定)。每个 kind 同一时刻仅一条 Active=true。
type LLMModel struct {
+1 -1
View File
@@ -39,7 +39,7 @@ func OpenPostgres(dsn string) *Postgres {
migrateLegacyIntIDs(db)
migrateDocLinkToID(db)
if err := db.AutoMigrate(&User{}, &Task{}, &LLMModel{}, &KB{}, &Doc{}, &Agent{}, &DocLink{}); err != nil {
if err := db.AutoMigrate(&User{}, &Task{}, &LLMModel{}, &KB{}, &Doc{}, &Agent{}, &DocLink{}, &Pricing{}); err != nil {
log.Printf("[store] postgres AutoMigrate 失败,降级运行: %v", err)
return &Postgres{}
}
+135
View File
@@ -0,0 +1,135 @@
package mcp
import (
"context"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
"strings"
"time"
"github.com/sundynix/sundynix-shared/contract"
)
const (
extTimeout = 10 * time.Second
extMaxBytes = 256 * 1024 // 响应体读取上限
)
// externalAPI 是通用出站 HTTP 工具(GET/POST)。带 SSRF 防护:拒环回/内网/链路本地/
// 云元数据地址;可选 EXTERNAL_API_ALLOWLIST 收窄到白名单主机。限超时 + 限响应体大小 + 限重定向。
func (g *Gateway) externalAPI(ctx context.Context, call *contract.ToolCall) *contract.ToolResult {
raw, _ := call.Args["url"].(string)
raw = strings.TrimSpace(raw)
if raw == "" {
return &contract.ToolResult{OK: false, Error: "external_api: url 必填"}
}
method := strings.ToUpper(strings.TrimSpace(fmt.Sprint(call.Args["method"])))
if method == "" || method == "<NIL>" {
method = "GET"
}
if method != "GET" && method != "POST" {
return &contract.ToolResult{OK: false, Error: "external_api: 仅支持 GET/POST"}
}
allow := extAllowlist()
if reason, ok := validateExternalURL(raw, allow); !ok {
return &contract.ToolResult{OK: false, Error: "external_api: URL 被拦截 —— " + reason}
}
var body io.Reader
if b, _ := call.Args["body"].(string); b != "" {
body = strings.NewReader(b)
}
req, err := http.NewRequestWithContext(ctx, method, raw, body)
if err != nil {
return &contract.ToolResult{OK: false, Error: "external_api: " + err.Error()}
}
if hm, ok := call.Args["headers"].(map[string]any); ok {
for k, v := range hm {
req.Header.Set(k, fmt.Sprint(v))
}
}
client := &http.Client{
Timeout: extTimeout,
CheckRedirect: func(r *http.Request, via []*http.Request) error {
if len(via) >= 3 {
return fmt.Errorf("重定向过多")
}
if reason, ok := validateExternalURL(r.URL.String(), allow); !ok {
return fmt.Errorf("重定向被拦截:%s", reason)
}
return nil
},
}
resp, err := client.Do(req)
if err != nil {
return &contract.ToolResult{OK: false, Error: "external_api: " + err.Error()}
}
defer resp.Body.Close()
data, _ := io.ReadAll(io.LimitReader(resp.Body, extMaxBytes))
return &contract.ToolResult{OK: true, Content: fmt.Sprintf("HTTP %d\n%s", resp.StatusCode, string(data))}
}
// extAllowlist 读取 EXTERNAL_API_ALLOWLIST(逗号分隔主机);空则不限主机(仍有 SSRF 防护)。
func extAllowlist() []string {
v := strings.TrimSpace(os.Getenv("EXTERNAL_API_ALLOWLIST"))
if v == "" {
return nil
}
var out []string
for _, p := range strings.Split(v, ",") {
if p = strings.TrimSpace(p); p != "" {
out = append(out, p)
}
}
return out
}
// validateExternalURL 校验出站 URLscheme 限 http/https;可选白名单;解析出的 IP 不得为
// 环回/内网/链路本地/未指定(防 SSRF 打内部服务与 169.254.169.254 云元数据)。
func validateExternalURL(raw string, allow []string) (reason string, ok bool) {
u, err := url.Parse(raw)
if err != nil || (u.Scheme != "http" && u.Scheme != "https") {
return "仅支持 http/https", false
}
host := u.Hostname()
if host == "" {
return "缺少主机", false
}
if len(allow) > 0 && !hostAllowed(host, allow) {
return "主机不在允许清单", false
}
ips, err := net.LookupIP(host)
if err != nil || len(ips) == 0 {
return "域名解析失败", false
}
for _, ip := range ips {
if isBlockedIP(ip) {
return "禁止访问内网/环回/元数据地址", false
}
}
return "", true
}
func hostAllowed(host string, allow []string) bool {
host = strings.ToLower(host)
for _, a := range allow {
a = strings.ToLower(a)
if host == a || strings.HasSuffix(host, "."+a) {
return true
}
}
return false
}
// isBlockedIP 判断 IP 是否属于禁止出站的范围(SSRF 防护)。
func isBlockedIP(ip net.IP) bool {
return ip.IsLoopback() || // 127.0.0.0/8, ::1
ip.IsPrivate() || // 10/8, 172.16/12, 192.168/16, fc00::/7
ip.IsLinkLocalUnicast() || // 169.254/16(含云元数据), fe80::/10
ip.IsLinkLocalMulticast() ||
ip.IsUnspecified() // 0.0.0.0, ::
}
@@ -0,0 +1,55 @@
package mcp
import (
"net"
"testing"
)
func TestIsBlockedIP(t *testing.T) {
blocked := []string{"127.0.0.1", "10.1.2.3", "172.16.0.5", "192.168.1.1", "169.254.169.254", "0.0.0.0", "::1"}
for _, s := range blocked {
if !isBlockedIP(net.ParseIP(s)) {
t.Errorf("%s 应被拦截(内网/环回/元数据)", s)
}
}
allowed := []string{"8.8.8.8", "1.1.1.1", "93.184.216.34"}
for _, s := range allowed {
if isBlockedIP(net.ParseIP(s)) {
t.Errorf("%s 是公网,不应被拦截", s)
}
}
}
func TestValidateExternalURL_Scheme(t *testing.T) {
for _, raw := range []string{"ftp://x/y", "file:///etc/passwd", "not a url", "ws://h"} {
if reason, ok := validateExternalURL(raw, nil); ok {
t.Errorf("%q 应被拒(非 http/https, got ok reason=%q", raw, reason)
}
}
}
func TestValidateExternalURL_SSRF(t *testing.T) {
// 字面量 IP 不走真实 DNS,可离线判定。
for _, raw := range []string{"http://127.0.0.1/admin", "http://169.254.169.254/latest/meta-data/", "http://10.0.0.1/"} {
if _, ok := validateExternalURL(raw, nil); ok {
t.Errorf("%q 应被 SSRF 防护拦截", raw)
}
}
// 公网字面量放行。
if reason, ok := validateExternalURL("http://8.8.8.8/", nil); !ok {
t.Errorf("公网 IP 应放行, got %q", reason)
}
}
func TestValidateExternalURL_Allowlist(t *testing.T) {
allow := []string{"api.github.com"}
if _, ok := validateExternalURL("http://8.8.8.8/", allow); ok {
t.Error("白名单生效时,非白名单主机应被拒")
}
if !hostAllowed("api.github.com", allow) || !hostAllowed("sub.api.github.com", allow) {
t.Error("白名单主机及其子域应放行")
}
if hostAllowed("evil.com", allow) {
t.Error("非白名单主机不应放行")
}
}
+2
View File
@@ -64,6 +64,8 @@ func (g *Gateway) dispatch(ctx context.Context, call *contract.ToolCall) *contra
return g.reportStore(ctx, call)
case "report_export":
return g.reportExport(ctx, call)
case "external_api":
return g.externalAPI(ctx, call)
case "health":
data, _ := json.Marshal(g.rag.Status())
return &contract.ToolResult{OK: true, Content: string(data)}