79f9912615
项目级 DB 规约(雪花字符串 id + created_at/updated_at + 软删索引)此前只在网关落地, mcp-go 的 sundynix_user_profile(Profile) 仍是旧复合主键 (user_id,key)、无 id/时间戳—— 本次对齐,全库统一。 - 新增 BaseModel + NewID(snowflake node=2,与网关 node=1 区分,避免共享 PG 中 id 冲突)。 - Profile 嵌 BaseModel:雪花 id 主键 + (user_id,key) 唯一索引。 - Upsert 改 OnConflict((user_id,key)),冲突覆盖 value 且保留原 id/created_at。 - 一次性迁移 migrateLegacyProfile:检测旧表(无 id 列)→ 备份偏好 → drop → 按新规约重建 → 回灌。 - 加 mcp-go 首个集成测试(store_test.go,gated on MEMORY_TEST_DSN): · TestBaseModelID 纯单测(id 非空/不重/BeforeCreate 补id不覆盖); · Integration:迁移后字段齐全 + Upsert 同键覆盖不新增 + id 稳定 + Get 渲染; · LegacyMigration:旧复合主键表 + 1 条偏好 → 迁移后 id 列存在、数据保留补新 id。 实测(docker postgres):三测全过,迁移日志确认旧偏好回灌保留 ✓。 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
146 lines
5.1 KiB
Go
146 lines
5.1 KiB
Go
// Package memory 是偏好记忆的存储后端(第 5 层 I/O 型工具持有)。
|
||
// 常驻画像存 Postgres,按 sundynix_ 前缀约定 + AutoMigrate 自动迁移。
|
||
package memory
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"log"
|
||
"sort"
|
||
"strings"
|
||
"time"
|
||
|
||
"github.com/bwmarrin/snowflake"
|
||
"gorm.io/driver/postgres"
|
||
"gorm.io/gorm"
|
||
"gorm.io/gorm/clause"
|
||
"gorm.io/gorm/logger"
|
||
)
|
||
|
||
// sfNode 是本服务的雪花 ID 生成器。node=2 与网关(node=1)区分,避免共享 PG 中 ID 冲突。
|
||
var sfNode *snowflake.Node
|
||
|
||
func init() { sfNode, _ = snowflake.NewNode(2) }
|
||
|
||
// NewID 生成字符串型雪花 ID(项目级 DB 规约:主键统一用它)。
|
||
func NewID() string { return sfNode.Generate().String() }
|
||
|
||
// BaseModel 是所有 DB 映射结构体的基础字段(项目级规约):
|
||
// 字符串雪花 ID 主键 + 创建/更新时间 + GORM 软删(带索引)。与网关 store.BaseModel 同规约。
|
||
type BaseModel struct {
|
||
ID string `gorm:"primaryKey;size:24" json:"id"`
|
||
CreatedAt time.Time `json:"created_at"`
|
||
UpdatedAt time.Time `json:"updated_at"`
|
||
DeletedAt gorm.DeletedAt `gorm:"index" json:"-"`
|
||
}
|
||
|
||
// BeforeCreate 在插入前补雪花 ID。
|
||
func (b *BaseModel) BeforeCreate(*gorm.DB) error {
|
||
if b.ID == "" {
|
||
b.ID = NewID()
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// Profile 是用户常驻画像的一条 key/value 偏好(always-on memory)。
|
||
// 套用 BaseModel 规约:雪花 ID 主键;(user_id, key) 唯一索引 —— 同一用户同一键 upsert 覆盖。
|
||
type Profile struct {
|
||
BaseModel
|
||
UserID string `gorm:"column:user_id;size:64;uniqueIndex:idx_profile_uk"`
|
||
Key string `gorm:"size:64;uniqueIndex:idx_profile_uk"`
|
||
Value string `gorm:"type:text"`
|
||
}
|
||
|
||
// TableName 固定表名,遵守 sundynix_ 前缀约定。
|
||
func (Profile) TableName() string { return "sundynix_user_profile" }
|
||
|
||
// Store 封装画像读写。db 为 nil 表示降级(无 Postgres 时记忆功能空转,不阻断工具服务)。
|
||
type Store struct{ db *gorm.DB }
|
||
|
||
// Open 连接 Postgres 并自动迁移 sundynix_user_profile。连接失败不 fatal:返回降级实例。
|
||
func Open(dsn string) *Store {
|
||
db, err := gorm.Open(postgres.New(postgres.Config{DSN: dsn}), &gorm.Config{
|
||
Logger: logger.Default.LogMode(logger.Silent),
|
||
})
|
||
if err != nil {
|
||
log.Printf("[memory] postgres 不可用,记忆降级(召回为空): %v", err)
|
||
return &Store{}
|
||
}
|
||
// 一次性迁移:旧表用复合主键 (user_id,key) 无 id/时间戳,与雪花规约不兼容。
|
||
migrateLegacyProfile(db)
|
||
if err := db.AutoMigrate(&Profile{}); err != nil {
|
||
log.Printf("[memory] AutoMigrate 失败,记忆降级: %v", err)
|
||
return &Store{}
|
||
}
|
||
log.Println("[memory] postgres connected, migrated sundynix_user_profile (雪花 id + 软删 规约)")
|
||
return &Store{db: db}
|
||
}
|
||
|
||
// migrateLegacyProfile 检测旧 Profile 表(无 id 列)则备份偏好 → 重建为雪花规约 → 回灌。
|
||
func migrateLegacyProfile(db *gorm.DB) {
|
||
m := db.Migrator()
|
||
if !m.HasTable("sundynix_user_profile") {
|
||
return // 全新库,AutoMigrate 直接建新表
|
||
}
|
||
if m.HasColumn(&Profile{}, "id") {
|
||
return // 已是新规约
|
||
}
|
||
log.Println("[memory] 检测到旧 Profile 表(复合主键无 id),迁移到雪花规约(保留偏好)")
|
||
var saved []struct {
|
||
UserID string `gorm:"column:user_id"`
|
||
Key string `gorm:"column:key"`
|
||
Value string `gorm:"column:value"`
|
||
}
|
||
db.Raw(`SELECT user_id, "key", value FROM sundynix_user_profile`).Scan(&saved)
|
||
if err := m.DropTable("sundynix_user_profile"); err != nil {
|
||
log.Printf("[memory] 迁移 drop 旧表失败: %v", err)
|
||
return
|
||
}
|
||
if err := db.AutoMigrate(&Profile{}); err != nil {
|
||
log.Printf("[memory] 迁移建新表失败: %v", err)
|
||
return
|
||
}
|
||
for _, r := range saved {
|
||
_ = db.Create(&Profile{UserID: r.UserID, Key: r.Key, Value: r.Value}).Error
|
||
}
|
||
log.Printf("[memory] 已回灌 %d 条偏好(新雪花 id)", len(saved))
|
||
}
|
||
|
||
// Get 返回某用户的画像,渲染为可直接注入 prompt 的多行文本(按 key 排序,稳定输出)。
|
||
func (s *Store) Get(ctx context.Context, userID string) (string, error) {
|
||
if s.db == nil || userID == "" {
|
||
return "", nil
|
||
}
|
||
var rows []Profile
|
||
if err := s.db.WithContext(ctx).Where("user_id = ?", userID).Find(&rows).Error; err != nil {
|
||
return "", err
|
||
}
|
||
sort.Slice(rows, func(i, j int) bool { return rows[i].Key < rows[j].Key })
|
||
var b strings.Builder
|
||
for _, r := range rows {
|
||
fmt.Fprintf(&b, "- %s:%s\n", r.Key, r.Value)
|
||
}
|
||
return strings.TrimRight(b.String(), "\n"), nil
|
||
}
|
||
|
||
// Upsert 写入/更新一条画像偏好((user_id,key) 冲突即覆盖 value,保留原 id)。
|
||
func (s *Store) Upsert(ctx context.Context, userID, key, value string) error {
|
||
if s.db == nil {
|
||
return fmt.Errorf("memory store disabled")
|
||
}
|
||
return s.db.WithContext(ctx).Clauses(clause.OnConflict{
|
||
Columns: []clause.Column{{Name: "user_id"}, {Name: "key"}},
|
||
DoUpdates: clause.Assignments(map[string]any{"value": value, "updated_at": time.Now()}),
|
||
}).Create(&Profile{UserID: userID, Key: key, Value: value}).Error
|
||
}
|
||
|
||
// Close 释放连接。
|
||
func (s *Store) Close() {
|
||
if s.db == nil {
|
||
return
|
||
}
|
||
if sqlDB, err := s.db.DB(); err == nil {
|
||
_ = sqlDB.Close()
|
||
}
|
||
}
|