package rag import ( "context" "fmt" "log" "strconv" "strings" "sync" "github.com/milvus-io/milvus-sdk-go/v2/client" "github.com/milvus-io/milvus-sdk-go/v2/entity" ) const collection = "sundynix_wiki" // Wiki/知识库向量集合 // milvusStore 封装 Milvus 连接与集合管理(集合按首次写入的向量维度懒建)。 type milvusStore struct { cli client.Client mu sync.Mutex dim int // 已建集合的维度(0=未建) ok bool // 集合是否就绪 } func openMilvus(ctx context.Context, addr string) (*milvusStore, error) { cli, err := client.NewClient(ctx, client.Config{Address: addr}) if err != nil { return nil, err } return &milvusStore{cli: cli}, nil } func (m *milvusStore) close() { if m != nil && m.cli != nil { _ = m.cli.Close() } } // ensure 幂等地按维度 dim 建集合 + 向量索引 + 加载(首次写入时调用)。 func (m *milvusStore) ensure(ctx context.Context, dim int) error { m.mu.Lock() defer m.mu.Unlock() if m.ok && m.dim == dim { return nil } has, err := m.cli.HasCollection(ctx, collection) if err != nil { return err } // 已存集合维度不一致(如切换 embedding 模型)→ 重建。 if has { if coll, derr := m.cli.DescribeCollection(ctx, collection); derr == nil { if existing := vectorDim(coll); existing != 0 && existing != dim { log.Printf("[rag] 集合维度 %d≠%d,重建 %s", existing, dim, collection) if err := m.cli.DropCollection(ctx, collection); err != nil { return fmt.Errorf("drop collection: %w", err) } has = false } } } if !has { schema := entity.NewSchema().WithName(collection).WithDescription("sundynix wiki vectors"). WithField(entity.NewField().WithName("id").WithDataType(entity.FieldTypeInt64).WithIsPrimaryKey(true).WithIsAutoID(true)). WithField(entity.NewField().WithName("kb").WithDataType(entity.FieldTypeVarChar).WithMaxLength(64)). WithField(entity.NewField().WithName("text").WithDataType(entity.FieldTypeVarChar).WithMaxLength(8192)). WithField(entity.NewField().WithName("vector").WithDataType(entity.FieldTypeFloatVector).WithDim(int64(dim))) if err := m.cli.CreateCollection(ctx, schema, 1); err != nil { return fmt.Errorf("create collection: %w", err) } idx, _ := entity.NewIndexAUTOINDEX(entity.COSINE) if err := m.cli.CreateIndex(ctx, collection, "vector", idx, false); err != nil { return fmt.Errorf("create index: %w", err) } } if err := m.cli.LoadCollection(ctx, collection, false); err != nil { return fmt.Errorf("load collection: %w", err) } m.dim, m.ok = dim, true return nil } // invalidate 让集合就绪缓存失效(集合被外部删除/基础设施重启丢失后,强制下次重建)。 func (m *milvusStore) invalidate() { m.mu.Lock() m.ok = false m.mu.Unlock() } // isCollectionGone 判断错误是否为"集合不存在"(Milvus 重启丢集合后写/查会报此类)。 func isCollectionGone(err error) bool { if err == nil { return false } s := strings.ToLower(err.Error()) return strings.Contains(s, "collection not found") || strings.Contains(s, "can't find collection") || strings.Contains(s, "collection not exist") || strings.Contains(s, "collection not loaded") } // insert 写入若干 (kb, text, vector)。 // 若集合在运行期被丢失(如 Milvus 重启)→ 清缓存、重建集合后重试一次,避免必须重启进程才能恢复。 func (m *milvusStore) insert(ctx context.Context, kb string, texts []string, vecs [][]float32) error { if len(vecs) == 0 { return nil } dim := len(vecs[0]) kbs := make([]string, len(texts)) for i := range kbs { kbs[i] = kb } do := func() error { if err := m.ensure(ctx, dim); err != nil { return err } if _, err := m.cli.Insert(ctx, collection, "", entity.NewColumnVarChar("kb", kbs), entity.NewColumnVarChar("text", texts), entity.NewColumnFloatVector("vector", dim, vecs), ); err != nil { return fmt.Errorf("insert: %w", err) } return m.cli.Flush(ctx, collection, false) } err := do() if err != nil && isCollectionGone(err) { log.Printf("[rag] 集合不存在(疑似 Milvus 重启丢失),清缓存重建后重试写入") m.invalidate() err = do() } return err } // vectorDim 从集合 schema 读出向量字段维度(用于检测维度变化)。 func vectorDim(coll *entity.Collection) int { if coll == nil || coll.Schema == nil { return 0 } for _, f := range coll.Schema.Fields { if f.DataType == entity.FieldTypeFloatVector { if d, ok := f.TypeParams["dim"]; ok { n, _ := strconv.Atoi(d) return n } } } return 0 } // Hit 是一条检索结果。 type Hit struct { Text string `json:"text"` Score float32 `json:"score"` } // search 用查询向量做 topK 向量检索(可按 kb 过滤)。 // 集合未建(还没入过库)→ 返回空结果;集合运行期丢失 → 清缓存重建后重试一次。 func (m *milvusStore) search(ctx context.Context, kb string, qvec []float32, topK int) ([]Hit, error) { expr := "" if kb != "" { expr = fmt.Sprintf("kb == \"%s\"", kb) } sp, _ := entity.NewIndexAUTOINDEXSearchParam(1) do := func() ([]client.SearchResult, error) { if err := m.ensure(ctx, len(qvec)); err != nil { return nil, err } return m.cli.Search(ctx, collection, nil, expr, []string{"text"}, []entity.Vector{entity.FloatVector(qvec)}, "vector", entity.COSINE, topK, sp) } results, err := do() if err != nil && isCollectionGone(err) { log.Printf("[rag] 检索时集合不存在,清缓存重建后重试") m.invalidate() results, err = do() } if err != nil { // 集合尚未就绪/无法重建 → 降级空结果(不阻断混合检索其它路)。 return nil, nil } var hits []Hit for _, r := range results { textCol := r.Fields.GetColumn("text") for i := 0; i < r.ResultCount; i++ { text := "" if textCol != nil { if s, err := textCol.GetAsString(i); err == nil { text = s } } var score float32 if i < len(r.Scores) { score = r.Scores[i] } hits = append(hits, Hit{Text: text, Score: score}) } } return hits, nil }