Files
sundynix-agentix/sundynix-gateway/internal/handler/task_handler.go
T
Blizzard 4928ffc0f7 feat: 短期多轮历史接入 Eino 图 MessagesPlaceholder (⑨)
会话历史(Redis,易失,与长期画像分开)经 MCP 工具进出 Eino 图:
recall 召回历史填 MessagesPlaceholder,写回把本轮 user/assistant 落历史。

- mcp-go: internal/history(go-redis, sundynix:history:<session>, LPUSH+LTRIM 保留近20条,
  24h TTL) + 工具 history_get(返回JSON turns)/history_append; main 开 Redis(降级)
- dispatcher Eino: 模板加 MessagesPlaceholder('history'); recall 调 history_get→转 schema.Message;
  Handle 累积 answer; memorize 异步 history_append(user+assistant)
- shared: contract.MetaSessionID; gateway: SubmitTask 注入 Meta[session_id](X-Session-ID 头,缺省 default)
- demo.sh: 同会话两轮提交,验证第2轮召回第1轮历史
- 验证: 4 模块 build✓ + 3 e2e PASS; live 跑通——轮1=0轮历史→落库, 轮2 history_get 命中→注入

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-10 14:18:45 +08:00

145 lines
4.5 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package handler 实现网关的 HTTP 处理器。
package handler
import (
"encoding/json"
"io"
"log"
"net/http"
"github.com/gin-gonic/gin"
"github.com/sundynix/sundynix-gateway/internal/dsl"
"github.com/sundynix/sundynix-gateway/internal/nats"
"github.com/sundynix/sundynix-gateway/internal/store"
"github.com/sundynix/sundynix-shared/contract"
)
type Handler struct {
db *store.Postgres
cache *store.Redis
bus *nats.Bus
}
func New(db *store.Postgres, cache *store.Redis, bus *nats.Bus) *Handler {
return &Handler{db: db, cache: cache, bus: bus}
}
// SubmitTask: 解析客户端导出的 JSON DSL,组装为 TaskPublish 到 sundynix.tasks.*。
func (h *Handler) SubmitTask(c *gin.Context) {
var raw json.RawMessage
if err := c.ShouldBindJSON(&raw); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
task, err := dsl.ParseAndAssemble(raw) // Task DSL Parser & Assembly
if err != nil {
c.JSON(http.StatusUnprocessableEntity, gin.H{"error": err.Error()})
return
}
// 附上用户标识(召回偏好记忆)与会话标识(召回短期多轮历史)。
// 真实场景由鉴权/会话中间件注入;此处用请求头,缺省匿名/默认会话。
task.Meta[contract.MetaUserID] = userID(c)
task.Meta[contract.MetaSessionID] = sessionID(c)
// 持久化任务提交(best-effort:降级模式下静默跳过,不阻断发布)。
if err := h.db.SaveTask(c.Request.Context(), task.ID, string(task.Graph)); err != nil {
log.Printf("[gateway] save task %s failed: %v", task.ID, err)
}
if err := h.bus.PublishTask(c.Request.Context(), task); err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusAccepted, gin.H{"task_id": task.ID})
}
// StreamTask: 订阅 sundynix.streams.<task_id>,以 SSE 把零拷贝 Token Stream 推给客户端。
func (h *Handler) StreamTask(c *gin.Context) {
taskID := c.Param("id")
c.Writer.Header().Set("Content-Type", "text/event-stream")
c.Writer.Header().Set("Cache-Control", "no-cache")
c.Writer.Header().Set("Connection", "keep-alive")
tokens := make(chan []byte, 256)
done := make(chan struct{})
unsub, err := h.bus.SubscribeTokens(taskID,
func(tok []byte) {
select {
case tokens <- tok:
default: // 背压保护:客户端过慢则丢弃,避免阻塞 NATS 回调
}
},
func() { close(done) },
)
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
defer func() { _ = unsub() }()
// gin 的流式写:返回 false 即结束响应。
c.Stream(func(w io.Writer) bool {
select {
case tok := <-tokens:
c.SSEvent("token", string(tok))
return true
case <-done:
c.SSEvent("done", taskID)
return false
case <-c.Request.Context().Done():
return false
}
})
}
// SetMemory: 写入/更新一条用户偏好记忆,经 NATS 调 mcp-go 的 memory_upsert 工具。
// 桌面端"偏好记忆面板"可用它让用户显式登记/纠正模型对自己的记忆。
func (h *Handler) SetMemory(c *gin.Context) {
var body struct {
Key string `json:"key"`
Value string `json:"value"`
}
if err := c.ShouldBindJSON(&body); err != nil || body.Key == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "key/value required"})
return
}
res, err := h.bus.CallTool(c.Request.Context(), contract.ToolSubjectGo("memory_upsert"),
&contract.ToolCall{Tool: "memory_upsert", Args: map[string]any{
"user_id": userID(c), "key": body.Key, "value": body.Value,
}})
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
if !res.OK {
c.JSON(http.StatusUnprocessableEntity, gin.H{"error": res.Error})
return
}
c.JSON(http.StatusOK, gin.H{"status": "ok", "message": res.Content})
}
// userID 从请求取已登录用户标识(真实场景应由鉴权中间件注入)。
func userID(c *gin.Context) string {
if u := c.GetHeader("X-User-ID"); u != "" {
return u
}
return "anonymous"
}
// sessionID 从请求取会话标识(真实场景应由会话中间件注入)。
func sessionID(c *gin.Context) string {
if s := c.GetHeader("X-Session-ID"); s != "" {
return s
}
return "default"
}
func (h *Handler) Billing(c *gin.Context) {
// TODO: 商业化与计费模块;暂以已提交任务计数演示真实读库。
n, err := h.db.CountTasks(c.Request.Context())
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{"status": "ok", "tasks_submitted": n, "persisted": h.db.Enabled()})
}