feat: 第一张真实 Eino 图 + 偏好记忆(让模型知道是我)
dispatcher 不再手搓 pool.Stream,改用编译好的 Eino 图驱动;接入用户常驻画像,
推理前召回并注入 system prompt,实现个性化(架构'心脏'首次真跳)。
Eino 图(dispatcher/internal/eino): START→recall→prompt→model→END + 全局 State
- recall(Lambda): 取 Meta[user_id] → 调 MCP memory_get → ProcessState 写画像
- prompt(ChatTemplate): {profile} 注入 system,{query} 作 user
- model: poolModel 适配 LLM Pool 为 model.BaseChatModel(Generate+Stream, schema.Pipe)
- 写回: 流排空后异步 memorize(流式节点走 OnEndWithStreamOutput 非 OnEndFn)
记忆存储(mcp-go owns): GORM Profile→sundynix_user_profile(复合主键, AutoMigrate,
遵守前缀约定), 新工具 memory_get/memory_upsert, 连不上降级
Gateway: SubmitTask 注入 Meta[user_id](X-User-ID 头), PUT /api/v1/memory→memory_upsert
shared: contract.MetaUserID; llm.Pool 拆出 StreamText
验证: 4 模块 build✓ + 3 e2e PASS; live 跑通——PUT 偏好落 sundynix_user_profile,
带 X-User-ID 提交→Eino recall 召回→注入→SSE 流出含画像的个性化回答, writeback 触发
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,72 @@
|
||||
package eino
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/cloudwego/eino/components/prompt"
|
||||
"github.com/cloudwego/eino/compose"
|
||||
"github.com/cloudwego/eino/schema"
|
||||
|
||||
"github.com/sundynix/sundynix-dispatcher/internal/llm"
|
||||
"github.com/sundynix/sundynix-shared/contract"
|
||||
)
|
||||
|
||||
// memoryFetcher 召回某用户与本次输入相关的偏好记忆(经 MCP memory_get 工具)。
|
||||
type memoryFetcher func(ctx context.Context, userID, query string) string
|
||||
|
||||
// buildGraph 编译这套"记忆增强"图:
|
||||
//
|
||||
// START → recall(召回画像→写State) → prompt(注入system) → model(流式) → END
|
||||
//
|
||||
// 返回可流式执行的 Runnable。
|
||||
func buildGraph(ctx context.Context, pool *llm.Pool, fetch memoryFetcher) (compose.Runnable[*contract.Task, *schema.Message], error) {
|
||||
g := compose.NewGraph[*contract.Task, *schema.Message](
|
||||
compose.WithGenLocalState(func(context.Context) *AgentState { return &AgentState{} }),
|
||||
)
|
||||
|
||||
// 1) recall:取 user_id → memory_get 召回画像 → 写入 State,并输出模板变量。
|
||||
if err := g.AddLambdaNode("recall", compose.InvokableLambda(
|
||||
func(ctx context.Context, t *contract.Task) (map[string]any, error) {
|
||||
uid, _ := t.Meta[contract.MetaUserID].(string)
|
||||
profile := fetch(ctx, uid, string(t.Graph))
|
||||
_ = compose.ProcessState(ctx, func(_ context.Context, s *AgentState) error {
|
||||
s.UserID, s.Profile, s.Input = uid, profile, string(t.Graph)
|
||||
return nil
|
||||
})
|
||||
if profile == "" {
|
||||
profile = "(暂无该用户的偏好记忆)"
|
||||
}
|
||||
return map[string]any{"profile": profile, "query": string(t.Graph)}, nil
|
||||
})); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 2) prompt:把画像注入 system message,用户输入作为 user message。
|
||||
tpl := prompt.FromMessages(schema.FString,
|
||||
schema.SystemMessage("你在与特定用户对话。关于该用户的已知信息:\n{profile}\n请据此个性化作答并保持其偏好。"),
|
||||
schema.UserMessage("{query}"),
|
||||
)
|
||||
if err := g.AddChatTemplateNode("prompt", tpl); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 3) model:LLM Pool 适配为 ChatModel 节点,流式产出。
|
||||
if err := g.AddChatModelNode("model", newPoolModel(pool)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := g.AddEdge(compose.START, "recall"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := g.AddEdge("recall", "prompt"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := g.AddEdge("prompt", "model"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := g.AddEdge("model", compose.END); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return g.Compile(ctx)
|
||||
}
|
||||
@@ -0,0 +1,70 @@
|
||||
package eino
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"github.com/cloudwego/eino/components/model"
|
||||
"github.com/cloudwego/eino/schema"
|
||||
|
||||
"github.com/sundynix/sundynix-dispatcher/internal/llm"
|
||||
)
|
||||
|
||||
// poolModel 把 LLM Pool 适配成 Eino 的 model.BaseChatModel,
|
||||
// 让 LLM Pool 能作为图中的 ChatModel 节点参与编排与流式。
|
||||
// 真实接入 vLLM/Ollama 后,这里替换为后端的 Generate/Stream 即可。
|
||||
type poolModel struct{ pool *llm.Pool }
|
||||
|
||||
func newPoolModel(p *llm.Pool) *poolModel { return &poolModel{pool: p} }
|
||||
|
||||
var _ model.BaseChatModel = (*poolModel)(nil)
|
||||
|
||||
// Generate 阻塞式生成(图被 Invoke 时用)。
|
||||
func (pm *poolModel) Generate(ctx context.Context, input []*schema.Message, _ ...model.Option) (*schema.Message, error) {
|
||||
var sb strings.Builder
|
||||
if err := pm.pool.StreamText(ctx, replyFor(input), func(tok []byte) { sb.Write(tok) }); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return schema.AssistantMessage(sb.String(), nil), nil
|
||||
}
|
||||
|
||||
// Stream 流式生成(图被 Stream 时用):把回复按 token 推进 pipe。
|
||||
func (pm *poolModel) Stream(ctx context.Context, input []*schema.Message, _ ...model.Option) (*schema.StreamReader[*schema.Message], error) {
|
||||
sr, sw := schema.Pipe[*schema.Message](32)
|
||||
text := replyFor(input)
|
||||
go func() {
|
||||
defer sw.Close()
|
||||
if err := pm.pool.StreamText(ctx, text, func(tok []byte) {
|
||||
sw.Send(schema.AssistantMessage(string(tok), nil), nil)
|
||||
}); err != nil {
|
||||
sw.Send(nil, err)
|
||||
}
|
||||
}()
|
||||
return sr, nil
|
||||
}
|
||||
|
||||
// replyFor 是占位"模型":从消息中取出注入的画像与用户输入,
|
||||
// 生成一段能体现"记忆已注入"的确定性回复(证明 recall→prompt 链路真的把画像喂进来了)。
|
||||
// 真实模型不需要本函数。
|
||||
func replyFor(msgs []*schema.Message) string {
|
||||
var profile, user string
|
||||
for _, m := range msgs {
|
||||
switch m.Role {
|
||||
case schema.System:
|
||||
profile = m.Content
|
||||
case schema.User:
|
||||
user = m.Content
|
||||
}
|
||||
}
|
||||
return "【已注入用户画像】" + condense(profile, 80) +
|
||||
" | 据此为你个性化作答:已编排执行该 Agent 图(输入「" + condense(user, 30) + "」)。"
|
||||
}
|
||||
|
||||
func condense(s string, max int) string {
|
||||
s = strings.TrimSpace(strings.ReplaceAll(s, "\n", " "))
|
||||
r := []rune(s)
|
||||
if len(r) > max {
|
||||
return string(r[:max]) + "…"
|
||||
}
|
||||
return s
|
||||
}
|
||||
@@ -3,9 +3,14 @@ package eino
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/cloudwego/eino/compose"
|
||||
"github.com/cloudwego/eino/schema"
|
||||
|
||||
"github.com/sundynix/sundynix-dispatcher/internal/harness"
|
||||
"github.com/sundynix/sundynix-dispatcher/internal/llm"
|
||||
"github.com/sundynix/sundynix-shared/contract"
|
||||
@@ -25,77 +30,105 @@ type ToolCaller interface {
|
||||
// 工具调用超时;超时即降级(不带工具上下文继续推理)。
|
||||
const toolCallTimeout = 3 * time.Second
|
||||
|
||||
// Orchestrator 将 DSL 图编译为 Eino Graph 并驱动执行。
|
||||
// Orchestrator 把 DSL 任务交给编译好的 Eino 图执行(记忆召回 → 注入 → 流式)。
|
||||
type Orchestrator struct {
|
||||
pool *llm.Pool
|
||||
breaker *harness.CircuitBreaker
|
||||
sink TokenSink
|
||||
tools ToolCaller
|
||||
run compose.Runnable[*contract.Task, *schema.Message]
|
||||
}
|
||||
|
||||
func NewOrchestrator(pool *llm.Pool, breaker *harness.CircuitBreaker, sink TokenSink, tools ToolCaller) *Orchestrator {
|
||||
return &Orchestrator{pool: pool, breaker: breaker, sink: sink, tools: tools}
|
||||
// NewOrchestrator 构建并编译记忆增强图。
|
||||
func NewOrchestrator(pool *llm.Pool, breaker *harness.CircuitBreaker, sink TokenSink, tools ToolCaller) (*Orchestrator, error) {
|
||||
o := &Orchestrator{breaker: breaker, sink: sink, tools: tools}
|
||||
run, err := buildGraph(context.Background(), pool, o.fetchMemory)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
o.run = run
|
||||
return o, nil
|
||||
}
|
||||
|
||||
// Handle 消费一个任务:编译图 → 流式推理 → 经 sink 把 Token 回流到 sundynix.streams.<id>。
|
||||
// Handle 消费一个任务:执行 Eino 图,把 Token 流回流到 sundynix.streams.<id>。
|
||||
func (o *Orchestrator) Handle(ctx context.Context, t *contract.Task) error {
|
||||
if !o.breaker.Allow() {
|
||||
log.Printf("[eino] circuit open, drop task %s", t.ID)
|
||||
return nil
|
||||
}
|
||||
log.Printf("[eino] task %s received (graph=%d bytes), streaming tokens...", t.ID, len(t.Graph))
|
||||
log.Printf("[eino] task %s received (graph=%d bytes), running graph...", t.ID, len(t.Graph))
|
||||
|
||||
// TODO: compose.NewGraph(...) 编译 DSL;此处 prompt 占位为图原文。
|
||||
prompt := string(t.Graph)
|
||||
|
||||
// 工具节点:经 NATS 调用第 5 层 MCP(sundynix.tools.go.*)。
|
||||
// 这里以 wiki_search 演示完整调用链路;真实 Eino 图会按 DSL 节点择机调用。
|
||||
if ctxNote := o.retrieveContext(ctx, t); ctxNote != "" {
|
||||
prompt = ctxNote + "\n" + prompt
|
||||
stream, err := o.run.Stream(ctx, t)
|
||||
if err != nil {
|
||||
log.Printf("[eino] task %s graph error: %v", t.ID, err)
|
||||
_ = o.sink.CompleteStream(t.ID)
|
||||
o.breaker.Report(false)
|
||||
return err
|
||||
}
|
||||
defer stream.Close()
|
||||
|
||||
n := 0
|
||||
err := o.pool.Stream(ctx, prompt, func(tok []byte) {
|
||||
if perr := o.sink.PublishToken(t.ID, tok); perr != nil {
|
||||
for {
|
||||
chunk, rerr := stream.Recv()
|
||||
if errors.Is(rerr, io.EOF) {
|
||||
break
|
||||
}
|
||||
if rerr != nil {
|
||||
log.Printf("[eino] task %s stream recv error: %v", t.ID, rerr)
|
||||
break
|
||||
}
|
||||
if chunk == nil || chunk.Content == "" {
|
||||
continue
|
||||
}
|
||||
if perr := o.sink.PublishToken(t.ID, []byte(chunk.Content)); perr != nil {
|
||||
log.Printf("[eino] publish token failed: %v", perr)
|
||||
return
|
||||
break
|
||||
}
|
||||
n++
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("[eino] task %s stream error: %v", t.ID, err)
|
||||
}
|
||||
|
||||
if cerr := o.sink.CompleteStream(t.ID); cerr != nil {
|
||||
log.Printf("[eino] complete stream failed: %v", cerr)
|
||||
}
|
||||
log.Printf("[eino] task %s done, %d tokens streamed", t.ID, n)
|
||||
o.breaker.Report(err == nil)
|
||||
return err
|
||||
o.breaker.Report(true)
|
||||
|
||||
// 写回阶段:流已排空(= 模型生成结束),此处离开热路径、异步抽取记忆。
|
||||
// 注:流式节点用 OnEndWithStreamOutput 而非 OnEndFn,故不走回调而在此触发。
|
||||
go o.memorize(t)
|
||||
return nil
|
||||
}
|
||||
|
||||
// retrieveContext 经 MCP wiki_search 工具拉取检索上下文。
|
||||
// 工具不可用/超时时返回空串,降级为无工具上下文推理(不阻断主流程)。
|
||||
func (o *Orchestrator) retrieveContext(ctx context.Context, t *contract.Task) string {
|
||||
if o.tools == nil {
|
||||
// fetchMemory 经 MCP memory_get 工具召回用户常驻画像。
|
||||
// 工具不可用/超时/无 user_id 时返回空串,降级为无记忆推理(不阻断主流程)。
|
||||
func (o *Orchestrator) fetchMemory(ctx context.Context, userID, _ string) string {
|
||||
if o.tools == nil || userID == "" {
|
||||
return ""
|
||||
}
|
||||
cctx, cancel := context.WithTimeout(ctx, toolCallTimeout)
|
||||
defer cancel()
|
||||
|
||||
res, err := o.tools.CallTool(cctx, contract.ToolSubjectGo("wiki_search"), &contract.ToolCall{
|
||||
Tool: "wiki_search",
|
||||
TaskID: t.ID,
|
||||
Args: map[string]any{"q": string(t.Graph)},
|
||||
res, err := o.tools.CallTool(cctx, contract.ToolSubjectGo("memory_get"), &contract.ToolCall{
|
||||
Tool: "memory_get",
|
||||
Args: map[string]any{"user_id": userID},
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("[eino] task %s wiki_search unavailable, degrade: %v", t.ID, err)
|
||||
log.Printf("[eino] memory_get unavailable for %s, degrade: %v", userID, err)
|
||||
return ""
|
||||
}
|
||||
if !res.OK {
|
||||
log.Printf("[eino] task %s wiki_search error: %s", t.ID, res.Error)
|
||||
log.Printf("[eino] memory_get error for %s: %s", userID, res.Error)
|
||||
return ""
|
||||
}
|
||||
log.Printf("[eino] task %s wiki_search ok: %s", t.ID, res.Content)
|
||||
log.Printf("[eino] memory_get ok for %s: %s", userID, res.Content)
|
||||
return res.Content
|
||||
}
|
||||
|
||||
// memorize 写回阶段:从本轮对话抽取并更新偏好记忆。
|
||||
// 目前发占位日志;真实实现应跑抽取 LLM → 去重/更新 → memory_upsert(异步,离开热路径)。
|
||||
func (o *Orchestrator) memorize(t *contract.Task) {
|
||||
uid, _ := t.Meta[contract.MetaUserID].(string)
|
||||
if uid == "" {
|
||||
return
|
||||
}
|
||||
log.Printf("[eino] (writeback) task %s 完成,待抽取 user=%s 的新偏好记忆", t.ID, uid)
|
||||
// TODO: 发 sundynix.memory.extract 事件 → memory worker 抽取 → memory_upsert
|
||||
}
|
||||
|
||||
@@ -0,0 +1,10 @@
|
||||
package eino
|
||||
|
||||
// AgentState 是 Eino 图的全局状态,贯穿 recall→prompt→model 各节点。
|
||||
// 偏好记忆经 recall 节点写入,供模板注入与写回抽取使用。
|
||||
type AgentState struct {
|
||||
UserID string // 来自 Task.Meta["user_id"]
|
||||
Profile string // 召回到的常驻画像(always-on 偏好记忆)
|
||||
Input string // 本次输入(DSL 原文)
|
||||
Answer string // 累积输出,供写回阶段抽取新记忆
|
||||
}
|
||||
@@ -23,15 +23,20 @@ const (
|
||||
// 真实接入 vLLM/Ollama 时替换为后端 streaming API 即可(回调签名不变)。
|
||||
func (p *Pool) Stream(ctx context.Context, prompt string, onToken func([]byte)) error {
|
||||
// TODO: 选路 (least-load / 模型亲和) → 调 vLLM/Ollama streaming API
|
||||
reply := buildReply(prompt)
|
||||
return p.StreamText(ctx, buildReply(prompt), onToken)
|
||||
}
|
||||
|
||||
// StreamText 按真实后端的 TTFT/逐 token 节奏把给定文本流式回调。
|
||||
// 把"说什么"(由上层/Eino 图决定)与"怎么流"(后端节奏)解耦:
|
||||
// 真实接入 vLLM/Ollama 后,由后端 streaming API 直接驱动,无需本方法。
|
||||
func (p *Pool) StreamText(ctx context.Context, text string, onToken func([]byte)) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-time.After(timeToFirstToken): // 模拟 TTFT
|
||||
}
|
||||
|
||||
for _, tok := range tokenize(reply) {
|
||||
for _, tok := range tokenize(text) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
|
||||
Reference in New Issue
Block a user