Files
sundynix-agentix/sundynix-gateway/internal/handler/task_handler.go
T
Blizzard d5ae2f71d4 feat(desktop): 工业化升级 D —— 内容可信(Markdown 渲染 + 健康五灯全真)
- components/Markdown.tsx:零依赖、行级 Markdown 渲染(# 标题 / **粗** *斜* `码` /
  - 与 1. 列表 / > 引用 / --- 分隔 / 段落),流式安全(每 token 重渲染容忍残缺)。
  报告正文与运行输出从裸 <pre> 换成真排版,瞬间像份报告。
- 健康聚合:mcp-go 加 rag.Status() + health 工具(milvus/neo4j/embedding 就绪);
  gateway GET /api/v1/health 聚合 gateway/nats/db/redis(本地) + milvus/neo4j(经 mcp-go);
  health.ts 轮询 /health,TopBar 五盏灯(Gateway/DB/NATS/Milvus/Neo4j)从"灰=未知"变真实绿/红。

验证:浏览器(Preview)跑报告——正文以标题/有序列表/引用/分隔线/二级标题排版呈现;
五盏灯全绿(/health 返回 db/gateway/milvus/nats/neo4j/redis 全 true)。tsc + vite build + 后端 build 通过。

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-12 17:34:54 +08:00

210 lines
6.5 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package handler 实现网关的 HTTP 处理器。
package handler
import (
"context"
"encoding/json"
"io"
"log"
"net/http"
"time"
"github.com/gin-gonic/gin"
"github.com/sundynix/sundynix-gateway/internal/dsl"
"github.com/sundynix/sundynix-gateway/internal/nats"
"github.com/sundynix/sundynix-gateway/internal/store"
"github.com/sundynix/sundynix-shared/contract"
)
type Handler struct {
db *store.Postgres
cache *store.Redis
bus *nats.Bus
}
func New(db *store.Postgres, cache *store.Redis, bus *nats.Bus) *Handler {
return &Handler{db: db, cache: cache, bus: bus}
}
// SubmitTask: 解析客户端导出的 JSON DSL,组装为 TaskPublish 到 sundynix.tasks.*。
func (h *Handler) SubmitTask(c *gin.Context) {
var raw json.RawMessage
if err := c.ShouldBindJSON(&raw); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
task, err := dsl.ParseAndAssemble(raw) // Task DSL Parser & Assembly
if err != nil {
c.JSON(http.StatusUnprocessableEntity, gin.H{"error": err.Error()})
return
}
// 附上用户标识(召回偏好记忆)与会话标识(召回短期多轮历史)。
// 真实场景由鉴权/会话中间件注入;此处用请求头,缺省匿名/默认会话。
task.Meta[contract.MetaUserID] = userID(c)
task.Meta[contract.MetaSessionID] = sessionID(c)
// 持久化任务提交(best-effort:降级模式下静默跳过,不阻断发布)。
if err := h.db.SaveTask(c.Request.Context(), task.ID, string(task.Graph)); err != nil {
log.Printf("[gateway] save task %s failed: %v", task.ID, err)
}
if err := h.bus.PublishTask(c.Request.Context(), task); err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusAccepted, gin.H{"task_id": task.ID})
}
// StreamTask: 订阅 sundynix.streams.<task_id>,以 SSE 把零拷贝 Token Stream 推给客户端。
func (h *Handler) StreamTask(c *gin.Context) {
taskID := c.Param("id")
c.Writer.Header().Set("Content-Type", "text/event-stream")
c.Writer.Header().Set("Cache-Control", "no-cache")
c.Writer.Header().Set("Connection", "keep-alive")
tokens := make(chan []byte, 256)
done := make(chan struct{})
unsub, err := h.bus.SubscribeTokens(taskID,
func(tok []byte) {
select {
case tokens <- tok:
default: // 背压保护:客户端过慢则丢弃,避免阻塞 NATS 回调
}
},
func() { close(done) },
)
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
defer func() { _ = unsub() }()
// gin 的流式写:返回 false 即结束响应。
c.Stream(func(w io.Writer) bool {
select {
case tok := <-tokens:
c.SSEvent("token", string(tok))
return true
case <-done:
c.SSEvent("done", taskID)
return false
case <-c.Request.Context().Done():
return false
}
})
}
// Health: GET /api/v1/health —— 聚合各依赖子系统健康,供桌面端顶栏五盏灯实时点亮。
// gateway/db/redis/nats 网关本地可判;milvus/neo4j 经 mcp-go health 工具取(不可用则置否)。
func (h *Handler) Health(c *gin.Context) {
status := gin.H{
"gateway": true, // 能应答即在线
"nats": true, // 网关启动即连上 NATS(连不上会 fatal)
"db": h.db.Enabled(), // Postgres
"redis": h.cache.Enabled(), // Redis
"milvus": false,
"neo4j": false,
}
cctx, cancel := context.WithTimeout(c.Request.Context(), 2*time.Second)
defer cancel()
if res, err := h.bus.CallTool(cctx, contract.ToolSubjectGo("health"),
&contract.ToolCall{Tool: "health"}); err == nil && res != nil && res.OK {
var sub map[string]bool
if json.Unmarshal([]byte(res.Content), &sub) == nil {
status["milvus"] = sub["milvus"]
status["neo4j"] = sub["neo4j"]
}
}
c.JSON(http.StatusOK, status)
}
// StreamExec: 订阅 sundynix.exec.<task_id>,以 SSE 把执行轨迹事件推给客户端(运行·观测)。
// 与 StreamTasktoken 流)并行:前端同时连两路,token 走输出、exec 走轨迹/工具面板。
func (h *Handler) StreamExec(c *gin.Context) {
taskID := c.Param("id")
c.Writer.Header().Set("Content-Type", "text/event-stream")
c.Writer.Header().Set("Cache-Control", "no-cache")
c.Writer.Header().Set("Connection", "keep-alive")
events := make(chan []byte, 256)
done := make(chan struct{})
unsub, err := h.bus.SubscribeExec(taskID,
func(ev []byte) {
select {
case events <- ev:
default: // 背压保护:客户端过慢则丢弃,避免阻塞 NATS 回调
}
},
func() { close(done) },
)
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
defer func() { _ = unsub() }()
c.Stream(func(w io.Writer) bool {
select {
case ev := <-events:
c.SSEvent("exec", string(ev))
return true
case <-done:
c.SSEvent("done", taskID)
return false
case <-c.Request.Context().Done():
return false
}
})
}
// SetMemory: 写入/更新一条用户偏好记忆,经 NATS 调 mcp-go 的 memory_upsert 工具。
// 桌面端"偏好记忆面板"可用它让用户显式登记/纠正模型对自己的记忆。
func (h *Handler) SetMemory(c *gin.Context) {
var body struct {
Key string `json:"key"`
Value string `json:"value"`
}
if err := c.ShouldBindJSON(&body); err != nil || body.Key == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "key/value required"})
return
}
res, err := h.bus.CallTool(c.Request.Context(), contract.ToolSubjectGo("memory_upsert"),
&contract.ToolCall{Tool: "memory_upsert", Args: map[string]any{
"user_id": userID(c), "key": body.Key, "value": body.Value,
}})
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
if !res.OK {
c.JSON(http.StatusUnprocessableEntity, gin.H{"error": res.Error})
return
}
c.JSON(http.StatusOK, gin.H{"status": "ok", "message": res.Content})
}
// userID 从请求取已登录用户标识(真实场景应由鉴权中间件注入)。
func userID(c *gin.Context) string {
if u := c.GetHeader("X-User-ID"); u != "" {
return u
}
return "anonymous"
}
// sessionID 从请求取会话标识(真实场景应由会话中间件注入)。
func sessionID(c *gin.Context) string {
if s := c.GetHeader("X-Session-ID"); s != "" {
return s
}
return "default"
}
func (h *Handler) Billing(c *gin.Context) {
// TODO: 商业化与计费模块;暂以已提交任务计数演示真实读库。
n, err := h.db.CountTasks(c.Request.Context())
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{"status": "ok", "tasks_submitted": n, "persisted": h.db.Enabled()})
}