Files
sundynix-agentix/sundynix-gateway/internal/handler/task_handler.go
T
Blizzard b6a6875795 feat(gateway): 可观测性 —— Prometheus 指标 + 结构化日志 + 探针
往"生产可运维"推一步(网关前门):
- Prometheus /metrics:sundynix_http_requests_total{method,route,status}、
  request_duration_seconds 直方图、requests_in_flight。route 用 c.FullPath()
  路由模板(/tasks/:id/...)避免按真实路径高基数。
- 结构化访问日志:slog JSON 到 stderr(request_id/method/route/status/latency_ms/
  ip/uid/bytes),替代 gin 默认文本日志;gin.New()+Recovery 自管中间件链。
- RequestID 中间件:生成/透传 X-Request-ID,写上下文+响应头,供日志关联。
- 探针:/healthz(liveness,不查依赖)、/readyz(readiness,DB+Redis 就绪才 200,
  否则 503),供 k8s 等导流判断;/api/v1/health 深度聚合保留。
- 三个根端点不挂业务鉴权(/metrics 生产应由网络层限制抓取来源)。

验证:单测(计数 +1 / X-Request-ID 生成与透传);实跑 /healthz 200、/readyz 200
(db,redis ready)、/metrics 输出真实指标、访问日志 JSON 正常、X-Request-ID 回写。

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-19 10:38:31 +08:00

231 lines
7.5 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package handler 实现网关的 HTTP 处理器。
package handler
import (
"context"
"encoding/json"
"io"
"log"
"net/http"
"time"
"github.com/gin-gonic/gin"
"github.com/sundynix/sundynix-gateway/internal/blob"
"github.com/sundynix/sundynix-gateway/internal/dsl"
"github.com/sundynix/sundynix-gateway/internal/nats"
"github.com/sundynix/sundynix-gateway/internal/store"
"github.com/sundynix/sundynix-shared/contract"
)
type Handler struct {
db *store.Postgres
cache *store.Redis
bus *nats.Bus
blob *blob.Store
}
func New(db *store.Postgres, cache *store.Redis, bus *nats.Bus, blob *blob.Store) *Handler {
return &Handler{db: db, cache: cache, bus: bus, blob: blob}
}
// SubmitTask: 解析客户端导出的 JSON DSL,组装为 TaskPublish 到 sundynix.tasks.*。
func (h *Handler) SubmitTask(c *gin.Context) {
var raw json.RawMessage
if err := c.ShouldBindJSON(&raw); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
task, err := dsl.ParseAndAssemble(raw) // Task DSL Parser & Assembly
if err != nil {
c.JSON(http.StatusUnprocessableEntity, gin.H{"error": err.Error()})
return
}
// 附上用户标识(召回偏好记忆)与会话标识(召回短期多轮历史)。
// 真实场景由鉴权/会话中间件注入;此处用请求头,缺省匿名/默认会话。
task.Meta[contract.MetaUserID] = userID(c)
task.Meta[contract.MetaSessionID] = sessionID(c)
// 持久化任务提交(best-effort:降级模式下静默跳过,不阻断发布)。
if err := h.db.SaveTask(c.Request.Context(), task.ID, string(task.Graph)); err != nil {
log.Printf("[gateway] save task %s failed: %v", task.ID, err)
}
if err := h.bus.PublishTask(c.Request.Context(), task); err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusAccepted, gin.H{"task_id": task.ID})
}
// StreamTask: 订阅 sundynix.streams.<task_id>,以 SSE 把零拷贝 Token Stream 推给客户端。
func (h *Handler) StreamTask(c *gin.Context) {
taskID := c.Param("id")
c.Writer.Header().Set("Content-Type", "text/event-stream")
c.Writer.Header().Set("Cache-Control", "no-cache")
c.Writer.Header().Set("Connection", "keep-alive")
tokens := make(chan []byte, 256)
done := make(chan struct{})
unsub, err := h.bus.SubscribeTokens(taskID,
func(tok []byte) {
select {
case tokens <- tok:
default: // 背压保护:客户端过慢则丢弃,避免阻塞 NATS 回调
}
},
func() { close(done) },
)
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
defer func() { _ = unsub() }()
// gin 的流式写:返回 false 即结束响应。
c.Stream(func(w io.Writer) bool {
select {
case tok := <-tokens:
c.SSEvent("token", string(tok))
return true
case <-done:
c.SSEvent("done", taskID)
return false
case <-c.Request.Context().Done():
return false
}
})
}
// Healthz: GET /healthz —— 存活探针(liveness):进程能应答即 200,不查依赖。
func (h *Handler) Healthz(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"status": "ok"})
}
// Readyz: GET /readyz —— 就绪探针(readiness):核心依赖(DB/Redis)可用才 200,否则 503。
// 供 k8s 等编排器在依赖未就绪时暂不导流。NATS 在启动时即连(连不上会 fatal),故不单列。
func (h *Handler) Readyz(c *gin.Context) {
deps := gin.H{"db": h.db.Enabled(), "redis": h.cache.Enabled()}
if h.db.Enabled() && h.cache.Enabled() {
c.JSON(http.StatusOK, gin.H{"status": "ready", "deps": deps})
return
}
c.JSON(http.StatusServiceUnavailable, gin.H{"status": "not_ready", "deps": deps})
}
// Health: GET /api/v1/health —— 聚合各依赖子系统健康,供桌面端顶栏五盏灯实时点亮。
// gateway/db/redis/nats 网关本地可判;milvus/neo4j 经 mcp-go health 工具取(不可用则置否)。
func (h *Handler) Health(c *gin.Context) {
status := gin.H{
"gateway": true, // 能应答即在线
"nats": true, // 网关启动即连上 NATS(连不上会 fatal)
"db": h.db.Enabled(), // Postgres
"redis": h.cache.Enabled(), // Redis
"milvus": false,
"neo4j": false,
}
cctx, cancel := context.WithTimeout(c.Request.Context(), 2*time.Second)
defer cancel()
if res, err := h.bus.CallTool(cctx, contract.ToolSubjectGo("health"),
&contract.ToolCall{Tool: "health"}); err == nil && res != nil && res.OK {
var sub map[string]bool
if json.Unmarshal([]byte(res.Content), &sub) == nil {
status["milvus"] = sub["milvus"]
status["neo4j"] = sub["neo4j"]
}
}
c.JSON(http.StatusOK, status)
}
// StreamExec: 订阅 sundynix.exec.<task_id>,以 SSE 把执行轨迹事件推给客户端(运行·观测)。
// 与 StreamTasktoken 流)并行:前端同时连两路,token 走输出、exec 走轨迹/工具面板。
func (h *Handler) StreamExec(c *gin.Context) {
taskID := c.Param("id")
c.Writer.Header().Set("Content-Type", "text/event-stream")
c.Writer.Header().Set("Cache-Control", "no-cache")
c.Writer.Header().Set("Connection", "keep-alive")
events := make(chan []byte, 256)
done := make(chan struct{})
unsub, err := h.bus.SubscribeExec(taskID,
func(ev []byte) {
select {
case events <- ev:
default: // 背压保护:客户端过慢则丢弃,避免阻塞 NATS 回调
}
},
func() { close(done) },
)
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
defer func() { _ = unsub() }()
c.Stream(func(w io.Writer) bool {
select {
case ev := <-events:
c.SSEvent("exec", string(ev))
return true
case <-done:
c.SSEvent("done", taskID)
return false
case <-c.Request.Context().Done():
return false
}
})
}
// SetMemory: 写入/更新一条用户偏好记忆,经 NATS 调 mcp-go 的 memory_upsert 工具。
// 桌面端"偏好记忆面板"可用它让用户显式登记/纠正模型对自己的记忆。
func (h *Handler) SetMemory(c *gin.Context) {
var body struct {
Key string `json:"key"`
Value string `json:"value"`
}
if err := c.ShouldBindJSON(&body); err != nil || body.Key == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "key/value required"})
return
}
res, err := h.bus.CallTool(c.Request.Context(), contract.ToolSubjectGo("memory_upsert"),
&contract.ToolCall{Tool: "memory_upsert", Args: map[string]any{
"user_id": userID(c), "key": body.Key, "value": body.Value,
}})
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
if !res.OK {
c.JSON(http.StatusUnprocessableEntity, gin.H{"error": res.Error})
return
}
c.JSON(http.StatusOK, gin.H{"status": "ok", "message": res.Content})
}
// userID 取当前用户标识 —— 仅信任 JWT 鉴权中间件注入的已验证 uid(不再认 header)。
// 受保护路由有 RequireAuth 兜底,此处理论上不会返回 anonymous。
func userID(c *gin.Context) string {
if v, ok := c.Get("uid"); ok {
if s, _ := v.(string); s != "" {
return s
}
}
return "anonymous"
}
// sessionID 从请求取会话标识(真实场景应由会话中间件注入)。
func sessionID(c *gin.Context) string {
if s := c.GetHeader("X-Session-ID"); s != "" {
return s
}
return "default"
}
func (h *Handler) Billing(c *gin.Context) {
// TODO: 商业化与计费模块;暂以已提交任务计数演示真实读库。
n, err := h.db.CountTasks(c.Request.Context())
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{"status": "ok", "tasks_submitted": n, "persisted": h.db.Enabled()})
}