Files
sundynix-agentix/sundynix-gateway/internal/handler/task_handler.go
T
Blizzard 9657a07bb5 feat(auth): 鉴权片2 —— 前端登录闭环 + 保护路由 + 去掉 header 兜底
把 JWT 鉴权从后端核心闭环到端到端:

后端:
- middleware.RequireAuth:上下文无已验证 uid 则 401;挂在 owner 作用域业务路由组。
- 路由拆 公开/受保护:公开=auth/health + 按 task_id 寻址的 SSE 与报告导出
  (EventSource/下载无法带 Bearer);受保护=tasks/memory/kb*/agents/reports/billing。
- userID(c) 去掉 X-User-ID 兜底,仅信任 JWT 注入的 uid。
- 修 CORS:Allow-Headers 增 Authorization(否则浏览器拦截带 Bearer 的请求)。

前端:
- lib/api:token 存 localStorage + Bearer 头(不再发 X-User-ID)+ authRegister/Login/Me
  + 401 清令牌并广播 sdx:logout;submitTask/report/memory/列表加载走 Bearer 与 401 守卫。
- views/Login:登录/注册全屏门。
- App:启动校验令牌 → 无则渲染 Login,有则进主应用;identity.userId=已验证 user.id;
  监听 sdx:logout 回登录页。
- TopBar:去掉可编辑身份输入,改显登录用户 + 登出。

实跑验证(docker+gateway+preview):
- RequireAuth:无 token /kb/list、/agents → 401;/health → 200;带 token → 200。
- 前端:无 token 显登录门;注入有效 token 重载 → 进主应用、顶栏显 Dexter、KB 加载本人库、
  隔离徽标显雪花 uid。控制台无错、生产构建通过。
- 过程中发现并修复 CORS 缺 Authorization 头的真实 bug。

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-17 16:32:00 +08:00

215 lines
6.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package handler 实现网关的 HTTP 处理器。
package handler
import (
"context"
"encoding/json"
"io"
"log"
"net/http"
"time"
"github.com/gin-gonic/gin"
"github.com/sundynix/sundynix-gateway/internal/blob"
"github.com/sundynix/sundynix-gateway/internal/dsl"
"github.com/sundynix/sundynix-gateway/internal/nats"
"github.com/sundynix/sundynix-gateway/internal/store"
"github.com/sundynix/sundynix-shared/contract"
)
type Handler struct {
db *store.Postgres
cache *store.Redis
bus *nats.Bus
blob *blob.Store
}
func New(db *store.Postgres, cache *store.Redis, bus *nats.Bus, blob *blob.Store) *Handler {
return &Handler{db: db, cache: cache, bus: bus, blob: blob}
}
// SubmitTask: 解析客户端导出的 JSON DSL,组装为 TaskPublish 到 sundynix.tasks.*。
func (h *Handler) SubmitTask(c *gin.Context) {
var raw json.RawMessage
if err := c.ShouldBindJSON(&raw); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
task, err := dsl.ParseAndAssemble(raw) // Task DSL Parser & Assembly
if err != nil {
c.JSON(http.StatusUnprocessableEntity, gin.H{"error": err.Error()})
return
}
// 附上用户标识(召回偏好记忆)与会话标识(召回短期多轮历史)。
// 真实场景由鉴权/会话中间件注入;此处用请求头,缺省匿名/默认会话。
task.Meta[contract.MetaUserID] = userID(c)
task.Meta[contract.MetaSessionID] = sessionID(c)
// 持久化任务提交(best-effort:降级模式下静默跳过,不阻断发布)。
if err := h.db.SaveTask(c.Request.Context(), task.ID, string(task.Graph)); err != nil {
log.Printf("[gateway] save task %s failed: %v", task.ID, err)
}
if err := h.bus.PublishTask(c.Request.Context(), task); err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusAccepted, gin.H{"task_id": task.ID})
}
// StreamTask: 订阅 sundynix.streams.<task_id>,以 SSE 把零拷贝 Token Stream 推给客户端。
func (h *Handler) StreamTask(c *gin.Context) {
taskID := c.Param("id")
c.Writer.Header().Set("Content-Type", "text/event-stream")
c.Writer.Header().Set("Cache-Control", "no-cache")
c.Writer.Header().Set("Connection", "keep-alive")
tokens := make(chan []byte, 256)
done := make(chan struct{})
unsub, err := h.bus.SubscribeTokens(taskID,
func(tok []byte) {
select {
case tokens <- tok:
default: // 背压保护:客户端过慢则丢弃,避免阻塞 NATS 回调
}
},
func() { close(done) },
)
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
defer func() { _ = unsub() }()
// gin 的流式写:返回 false 即结束响应。
c.Stream(func(w io.Writer) bool {
select {
case tok := <-tokens:
c.SSEvent("token", string(tok))
return true
case <-done:
c.SSEvent("done", taskID)
return false
case <-c.Request.Context().Done():
return false
}
})
}
// Health: GET /api/v1/health —— 聚合各依赖子系统健康,供桌面端顶栏五盏灯实时点亮。
// gateway/db/redis/nats 网关本地可判;milvus/neo4j 经 mcp-go health 工具取(不可用则置否)。
func (h *Handler) Health(c *gin.Context) {
status := gin.H{
"gateway": true, // 能应答即在线
"nats": true, // 网关启动即连上 NATS(连不上会 fatal)
"db": h.db.Enabled(), // Postgres
"redis": h.cache.Enabled(), // Redis
"milvus": false,
"neo4j": false,
}
cctx, cancel := context.WithTimeout(c.Request.Context(), 2*time.Second)
defer cancel()
if res, err := h.bus.CallTool(cctx, contract.ToolSubjectGo("health"),
&contract.ToolCall{Tool: "health"}); err == nil && res != nil && res.OK {
var sub map[string]bool
if json.Unmarshal([]byte(res.Content), &sub) == nil {
status["milvus"] = sub["milvus"]
status["neo4j"] = sub["neo4j"]
}
}
c.JSON(http.StatusOK, status)
}
// StreamExec: 订阅 sundynix.exec.<task_id>,以 SSE 把执行轨迹事件推给客户端(运行·观测)。
// 与 StreamTasktoken 流)并行:前端同时连两路,token 走输出、exec 走轨迹/工具面板。
func (h *Handler) StreamExec(c *gin.Context) {
taskID := c.Param("id")
c.Writer.Header().Set("Content-Type", "text/event-stream")
c.Writer.Header().Set("Cache-Control", "no-cache")
c.Writer.Header().Set("Connection", "keep-alive")
events := make(chan []byte, 256)
done := make(chan struct{})
unsub, err := h.bus.SubscribeExec(taskID,
func(ev []byte) {
select {
case events <- ev:
default: // 背压保护:客户端过慢则丢弃,避免阻塞 NATS 回调
}
},
func() { close(done) },
)
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
defer func() { _ = unsub() }()
c.Stream(func(w io.Writer) bool {
select {
case ev := <-events:
c.SSEvent("exec", string(ev))
return true
case <-done:
c.SSEvent("done", taskID)
return false
case <-c.Request.Context().Done():
return false
}
})
}
// SetMemory: 写入/更新一条用户偏好记忆,经 NATS 调 mcp-go 的 memory_upsert 工具。
// 桌面端"偏好记忆面板"可用它让用户显式登记/纠正模型对自己的记忆。
func (h *Handler) SetMemory(c *gin.Context) {
var body struct {
Key string `json:"key"`
Value string `json:"value"`
}
if err := c.ShouldBindJSON(&body); err != nil || body.Key == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "key/value required"})
return
}
res, err := h.bus.CallTool(c.Request.Context(), contract.ToolSubjectGo("memory_upsert"),
&contract.ToolCall{Tool: "memory_upsert", Args: map[string]any{
"user_id": userID(c), "key": body.Key, "value": body.Value,
}})
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
if !res.OK {
c.JSON(http.StatusUnprocessableEntity, gin.H{"error": res.Error})
return
}
c.JSON(http.StatusOK, gin.H{"status": "ok", "message": res.Content})
}
// userID 取当前用户标识 —— 仅信任 JWT 鉴权中间件注入的已验证 uid(不再认 header)。
// 受保护路由有 RequireAuth 兜底,此处理论上不会返回 anonymous。
func userID(c *gin.Context) string {
if v, ok := c.Get("uid"); ok {
if s, _ := v.(string); s != "" {
return s
}
}
return "anonymous"
}
// sessionID 从请求取会话标识(真实场景应由会话中间件注入)。
func sessionID(c *gin.Context) string {
if s := c.GetHeader("X-Session-ID"); s != "" {
return s
}
return "default"
}
func (h *Handler) Billing(c *gin.Context) {
// TODO: 商业化与计费模块;暂以已提交任务计数演示真实读库。
n, err := h.db.CountTasks(c.Request.Context())
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{"status": "ok", "tasks_submitted": n, "persisted": h.db.Enabled()})
}