diff --git a/go.work.sum b/go.work.sum index 34bfde8..cb8a699 100644 --- a/go.work.sum +++ b/go.work.sum @@ -45,8 +45,13 @@ github.com/blevesearch/zapx/v15 v15.3.13 h1:6EkfaZiPlAxqXz0neniq35my6S48QI94W/wy github.com/blevesearch/zapx/v15 v15.3.13/go.mod h1:Turk/TNRKj9es7ZpKK95PS7f6D44Y7fAFy8F4LXQtGg= github.com/blevesearch/zapx/v16 v16.1.5 h1:b0sMcarqNFxuXvjoXsF8WtwVahnxyhEvBSRJi/AUHjU= github.com/blevesearch/zapx/v16 v16.1.5/go.mod h1:J4mSF39w1QELc11EWRSBFkPeZuO7r/NPKkHzDCoiaI8= +github.com/bytedance/gopkg v0.1.3/go.mod h1:576VvJ+eJgyCzdjS+c4+77QF3p7ubbtiKARP3TxducM= github.com/bytedance/sonic v1.12.2/go.mod h1:B8Gt/XvtZ3Fqj+iSKMypzymZxw/FVwgIGKzMzT9r/rk= +github.com/bytedance/sonic v1.15.0/go.mod h1:tFkWrPz0/CUCLEF4ri4UkHekCIcdnkqXw9VduqpJh0k= github.com/bytedance/sonic/loader v0.2.0/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= +github.com/bytedance/sonic/loader v0.3.0/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI= +github.com/bytedance/sonic/loader v0.5.0/go.mod h1:AR4NYCk5DdzZizZ5djGqQ92eEhCCcdf5x77udYiSJRo= +github.com/cloudwego/base64x v0.1.6/go.mod h1:OFcloc187FXDaYHvrNIjxSe8ncn0OOM8gEHfghB2IPU= github.com/cockroachdb/errors v1.9.1 h1:yFVvsI0VxmRShfawbt/laCIDy/mtTqqnvoNgiy5bEV8= github.com/cockroachdb/errors v1.9.1/go.mod h1:2sxOtL2WIc096WSZqZ5h8fa17rdDq9HZOZLBCor4mBk= github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f h1:6jduT9Hfc0njg5jJ1DdKCFPdMBrp/mdZfCpa5h+WM74= @@ -77,6 +82,7 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaW github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/klauspost/cpuid/v2 v2.2.9/go.mod h1:rqkxqrZ1EhYM9G+hXH7YdowN5R5RGN6NK4QwQ3WMXF8= github.com/knz/go-libedit v1.10.1 h1:0pHpWtx9vcvC0xGZqEQlQdfSQs7WRlAjuPvk3fOZDCo= github.com/kr/pty v1.1.1 h1:VkoXIwSboBpnk99O/KFauAEILuNHv5DVFKZMBN/gUgw= github.com/milvus-io/milvus-proto/go-api/v2 v2.4.3 h1:KUSaWVePVlHMIluAXf2qmNffI1CMlGFLLiP+4iy9014= @@ -95,6 +101,7 @@ github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRM github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tidwall/gjson v1.14.4 h1:uo0p8EbA09J7RQaflQ1aBRffTR7xedD2bcIVSYxLnkM= github.com/tidwall/gjson v1.14.4/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= @@ -109,6 +116,7 @@ go.etcd.io/bbolt v1.3.7 h1:j+zJOnnEjF/kyHlDDgGnVL/AIqIJPq8UoB2GSNfkUfQ= go.etcd.io/bbolt v1.3.7/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw= go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8= go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0= +golang.org/x/arch v0.11.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= @@ -116,6 +124,7 @@ golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2 h1:IRJeR9r1pYWsHKTRe/IInb7lYvbBVIqOgsX/u0mbOWY= golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE= @@ -123,6 +132,7 @@ golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= golang.org/x/term v0.24.0 h1:Mh5cbb+Zk2hqqXNO7S1iTjEphVL+jb8ZWaqh/g+JWkM= golang.org/x/term v0.24.0/go.mod h1:lOBK/LVxemqiMij05LGJ0tzNr8xlmwBRJ81PX6wVLH8= golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM= +golang.org/x/term v0.28.0/go.mod h1:Sw/lC2IAUZ92udQNf3WodGtn4k/XoLyZoh8v/8uiwek= golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= diff --git a/scripts/demo.sh b/scripts/demo.sh index 83bc3f2..15c91aa 100755 --- a/scripts/demo.sh +++ b/scripts/demo.sh @@ -37,9 +37,18 @@ for _ in $(seq 1 30); do grep -q "tools ready" .bin/mcp-go.log 2>/dev/null && break || sleep 0.1 done -echo "== 提交 DSL 任务 ==" +USER="wt" +echo "== 登记用户偏好记忆 (→ mcp-go memory_upsert → sundynix_user_profile) ==" +curl -s -X PUT http://127.0.0.1:8080/api/v1/memory \ + -H 'Content-Type: application/json' -H "X-User-ID: $USER" \ + -d '{"key":"称呼","value":"老王"}'; echo +curl -s -X PUT http://127.0.0.1:8080/api/v1/memory \ + -H 'Content-Type: application/json' -H "X-User-ID: $USER" \ + -d '{"key":"回答偏好","value":"简洁、中文、多给要点"}'; echo + +echo "== 提交 DSL 任务 (带 X-User-ID,Dispatcher 将召回其画像) ==" RESP=$(curl -s -X POST http://127.0.0.1:8080/api/v1/tasks \ - -H 'Content-Type: application/json' \ + -H 'Content-Type: application/json' -H "X-User-ID: $USER" \ -d '{"nodes":[{"id":"n1","type":"agent","data":{"prompt":"hello"}}],"edges":[]}') echo "$RESP" TASK_ID=$(echo "$RESP" | sed -n 's/.*"task_id":"\([^"]*\)".*/\1/p') diff --git a/sundynix-dispatcher/cmd/dispatcher/main.go b/sundynix-dispatcher/cmd/dispatcher/main.go index 1870795..6b09113 100644 --- a/sundynix-dispatcher/cmd/dispatcher/main.go +++ b/sundynix-dispatcher/cmd/dispatcher/main.go @@ -24,7 +24,10 @@ func main() { defer sub.Close() // sub 同时作为 Token 回流出口(TokenSink)与 MCP 工具调用出口(ToolCaller)。 - orch := eino.NewOrchestrator(pool, breaker, sub, sub) + orch, err := eino.NewOrchestrator(pool, breaker, sub, sub) + if err != nil { + log.Fatalf("[dispatcher] build eino graph: %v", err) + } // 监听退出信号,优雅停止消费。 ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) diff --git a/sundynix-dispatcher/go.mod b/sundynix-dispatcher/go.mod index 7499977..7c84431 100644 --- a/sundynix-dispatcher/go.mod +++ b/sundynix-dispatcher/go.mod @@ -2,16 +2,45 @@ module github.com/sundynix/sundynix-dispatcher go 1.23 -require github.com/sundynix/sundynix-shared v0.0.0 +require ( + github.com/cloudwego/eino v0.9.5 + github.com/sundynix/sundynix-shared v0.0.0 +) require ( + github.com/bahlo/generic-list-go v0.2.0 // indirect + github.com/buger/jsonparser v1.1.1 // indirect + github.com/bytedance/gopkg v0.1.3 // indirect + github.com/bytedance/sonic v1.15.0 // indirect + github.com/bytedance/sonic/loader v0.5.0 // indirect + github.com/cloudwego/base64x v0.1.6 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect + github.com/eino-contrib/jsonschema v1.0.3 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/goph/emperror v0.17.2 // indirect + github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.9 // indirect + github.com/klauspost/cpuid/v2 v2.2.9 // indirect + github.com/mailru/easyjson v0.7.7 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect github.com/nats-io/nats.go v1.37.0 // indirect github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nuid v1.0.1 // indirect + github.com/nikolalohinski/gonja v1.5.3 // indirect + github.com/pelletier/go-toml/v2 v2.0.9 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/sirupsen/logrus v1.9.3 // indirect + github.com/slongfield/pyfmt v0.0.0-20220222012616-ea85ff4c361f // indirect + github.com/twitchyliquid64/golang-asm v0.15.1 // indirect + github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect + github.com/yargevad/filepathx v1.0.0 // indirect + golang.org/x/arch v0.11.0 // indirect golang.org/x/crypto v0.26.0 // indirect - golang.org/x/sys v0.24.0 // indirect + golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect + golang.org/x/sys v0.29.0 // indirect golang.org/x/text v0.17.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) replace github.com/sundynix/sundynix-shared => ../sundynix-shared diff --git a/sundynix-dispatcher/go.sum b/sundynix-dispatcher/go.sum index 7483077..b665d36 100644 --- a/sundynix-dispatcher/go.sum +++ b/sundynix-dispatcher/go.sum @@ -1,7 +1,75 @@ +github.com/airbrake/gobrake v3.6.1+incompatible/go.mod h1:wM4gu3Cn0W0K7GUuVWnlXZU11AGBXMILnrdOU8Kn00o= +github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= +github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= +github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= +github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= +github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= +github.com/bugsnag/bugsnag-go v1.4.0/go.mod h1:2oa8nejYd4cQ/b0hMIopN0lCRxU0bueqREvZLWFrtK8= +github.com/bugsnag/panicwrap v1.2.0/go.mod h1:D/8v3kj0zr8ZAKg1AQ6crr+5VwKN5eIywRkfhyM/+dE= +github.com/bytedance/gopkg v0.1.3 h1:TPBSwH8RsouGCBcMBktLt1AymVo2TVsBVCY4b6TnZ/M= +github.com/bytedance/gopkg v0.1.3/go.mod h1:576VvJ+eJgyCzdjS+c4+77QF3p7ubbtiKARP3TxducM= +github.com/bytedance/sonic v1.15.0 h1:/PXeWFaR5ElNcVE84U0dOHjiMHQOwNIx3K4ymzh/uSE= +github.com/bytedance/sonic v1.15.0/go.mod h1:tFkWrPz0/CUCLEF4ri4UkHekCIcdnkqXw9VduqpJh0k= +github.com/bytedance/sonic/loader v0.5.0 h1:gXH3KVnatgY7loH5/TkeVyXPfESoqSBSBEiDd5VjlgE= +github.com/bytedance/sonic/loader v0.5.0/go.mod h1:AR4NYCk5DdzZizZ5djGqQ92eEhCCcdf5x77udYiSJRo= +github.com/certifi/gocertifi v0.0.0-20190105021004-abcd57078448/go.mod h1:GJKEexRPVJrBSOjoqN5VNOIKJ5Q3RViH6eu3puDRwx4= +github.com/cloudwego/base64x v0.1.6 h1:t11wG9AECkCDk5fMSoxmufanudBtJ+/HemLstXDLI2M= +github.com/cloudwego/base64x v0.1.6/go.mod h1:OFcloc187FXDaYHvrNIjxSe8ncn0OOM8gEHfghB2IPU= +github.com/cloudwego/eino v0.9.5 h1:0Nftjx9gPek/2S/hzm38LVxSjk5/6mqRr3I9VKrKvm4= +github.com/cloudwego/eino v0.9.5/go.mod h1:OBD1mrkfkt/pJa4rkg1P0VnaMeOVl7l8IAdEqY//3IQ= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/eino-contrib/jsonschema v1.0.3 h1:2Kfsm1xlMV0ssY2nuxshS4AwbLFuqmPmzIjLVJ1Fsp0= +github.com/eino-contrib/jsonschema v1.0.3/go.mod h1:cpnX4SyKjWjGC7iN2EbhxaTdLqGjCi0e9DxpLYxddD4= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ= +github.com/go-check/check v0.0.0-20180628173108-788fd7840127 h1:0gkP6mzaMqkmpcJYCFOLkIBwI7xFExG03bbkOkCvUPI= +github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98= +github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/goph/emperror v0.17.2 h1:yLapQcmEsO0ipe9p5TaN22djm3OFV/TfM/fcYP0/J18= +github.com/goph/emperror v0.17.2/go.mod h1:+ZbQ+fUNO/6FNiUo0ujtMjhgad9Xa6fQL9KhH4LNHic= +github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g= +github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= +github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8= github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/cpuid/v2 v2.2.9 h1:66ze0taIn2H33fBvCkXuv9BmCwDfafmiIVpKV9kKGuY= +github.com/klauspost/cpuid/v2 v2.2.9/go.mod h1:rqkxqrZ1EhYM9G+hXH7YdowN5R5RGN6NK4QwQ3WMXF8= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= +github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx8mU= +github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= +github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE= +github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b h1:j7+1HpAFS1zy5+Q4qx1fWh90gTKwiN4QCGoY9TWyyO4= +github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE= github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= github.com/nats-io/nats-server/v2 v2.10.20 h1:CXDTYNHeBiAKBTAIP2gjpgbWap2GhATnTLgP8etyvEI= @@ -12,11 +80,78 @@ github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/nikolalohinski/gonja v1.5.3 h1:GsA+EEaZDZPGJ8JtpeGN78jidhOlxeJROpqMT9fTj9c= +github.com/nikolalohinski/gonja v1.5.3/go.mod h1:RmjwxNiXAEqcq1HeK5SSMmqFJvKOfTfXhkJv6YBtPa4= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/pelletier/go-toml/v2 v2.0.9 h1:uH2qQXheeefCCkuBBSLi7jCiSmj3VRh2+Goq2N7Xxu0= +github.com/pelletier/go-toml/v2 v2.0.9/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= +github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rollbar/rollbar-go v1.0.2/go.mod h1:AcFs5f0I+c71bpHlXNNDbOWJiKwjFDtISeXco0L5PKQ= +github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/slongfield/pyfmt v0.0.0-20220222012616-ea85ff4c361f h1:Z2cODYsUxQPofhpYRMQVwWz4yUVpHF+vPi+eUdruUYI= +github.com/slongfield/pyfmt v0.0.0-20220222012616-ea85ff4c361f/go.mod h1:JqzWyvTuI2X4+9wOHmKSQCYxybB/8j6Ko43qVmXDuZg= +github.com/smarty/assertions v1.15.0 h1:cR//PqUBUiQRakZWqBiFFQ9wb8emQGDb0HeGdqGByCY= +github.com/smarty/assertions v1.15.0/go.mod h1:yABtdzeQs6l1brC900WlRNwj6ZR55d7B+E8C6HtKdec= +github.com/smartystreets/goconvey v1.8.1 h1:qGjIddxOk4grTu9JPOU31tVfq3cNdBlNa5sSznIX1xY= +github.com/smartystreets/goconvey v1.8.1/go.mod h1:+/u4qLyY6x1jReYOp7GOM2FSt8aP9CzCZL03bI28W60= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= +github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= +github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc= +github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= +github.com/x-cray/logrus-prefixed-formatter v0.5.2 h1:00txxvfBM9muc0jiLIEAkAcIMJzfthRT6usrui8uGmg= +github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= +github.com/yargevad/filepathx v1.0.0 h1:SYcT+N3tYGi+NvazubCNlvgIPbzAk7i7y2dwg3I5FYc= +github.com/yargevad/filepathx v1.0.0/go.mod h1:BprfX/gpYNJHJfc35GjRRpVcwWXS89gGulUIU5tK3tA= +go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= +go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= +golang.org/x/arch v0.11.0 h1:KXV8WWKCXm6tRpLirl2szsO5j/oOODwZf4hATmGVNs4= +golang.org/x/arch v0.11.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= -golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= -golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 h1:MGwJjxBy0HJshjDNfLsYO8xppfqWlA5ZT9OhtUUhTNw= +golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.28.0 h1:/Ts8HFuMR2E6IP/jlo7QVLZHggjKQbhu/7H0LJFr3Gg= +golang.org/x/term v0.28.0/go.mod h1:Sw/lC2IAUZ92udQNf3WodGtn4k/XoLyZoh8v/8uiwek= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U= golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/sundynix-dispatcher/internal/eino/graph.go b/sundynix-dispatcher/internal/eino/graph.go new file mode 100644 index 0000000..d5b601f --- /dev/null +++ b/sundynix-dispatcher/internal/eino/graph.go @@ -0,0 +1,72 @@ +package eino + +import ( + "context" + + "github.com/cloudwego/eino/components/prompt" + "github.com/cloudwego/eino/compose" + "github.com/cloudwego/eino/schema" + + "github.com/sundynix/sundynix-dispatcher/internal/llm" + "github.com/sundynix/sundynix-shared/contract" +) + +// memoryFetcher 召回某用户与本次输入相关的偏好记忆(经 MCP memory_get 工具)。 +type memoryFetcher func(ctx context.Context, userID, query string) string + +// buildGraph 编译这套"记忆增强"图: +// +// START → recall(召回画像→写State) → prompt(注入system) → model(流式) → END +// +// 返回可流式执行的 Runnable。 +func buildGraph(ctx context.Context, pool *llm.Pool, fetch memoryFetcher) (compose.Runnable[*contract.Task, *schema.Message], error) { + g := compose.NewGraph[*contract.Task, *schema.Message]( + compose.WithGenLocalState(func(context.Context) *AgentState { return &AgentState{} }), + ) + + // 1) recall:取 user_id → memory_get 召回画像 → 写入 State,并输出模板变量。 + if err := g.AddLambdaNode("recall", compose.InvokableLambda( + func(ctx context.Context, t *contract.Task) (map[string]any, error) { + uid, _ := t.Meta[contract.MetaUserID].(string) + profile := fetch(ctx, uid, string(t.Graph)) + _ = compose.ProcessState(ctx, func(_ context.Context, s *AgentState) error { + s.UserID, s.Profile, s.Input = uid, profile, string(t.Graph) + return nil + }) + if profile == "" { + profile = "(暂无该用户的偏好记忆)" + } + return map[string]any{"profile": profile, "query": string(t.Graph)}, nil + })); err != nil { + return nil, err + } + + // 2) prompt:把画像注入 system message,用户输入作为 user message。 + tpl := prompt.FromMessages(schema.FString, + schema.SystemMessage("你在与特定用户对话。关于该用户的已知信息:\n{profile}\n请据此个性化作答并保持其偏好。"), + schema.UserMessage("{query}"), + ) + if err := g.AddChatTemplateNode("prompt", tpl); err != nil { + return nil, err + } + + // 3) model:LLM Pool 适配为 ChatModel 节点,流式产出。 + if err := g.AddChatModelNode("model", newPoolModel(pool)); err != nil { + return nil, err + } + + if err := g.AddEdge(compose.START, "recall"); err != nil { + return nil, err + } + if err := g.AddEdge("recall", "prompt"); err != nil { + return nil, err + } + if err := g.AddEdge("prompt", "model"); err != nil { + return nil, err + } + if err := g.AddEdge("model", compose.END); err != nil { + return nil, err + } + + return g.Compile(ctx) +} diff --git a/sundynix-dispatcher/internal/eino/model.go b/sundynix-dispatcher/internal/eino/model.go new file mode 100644 index 0000000..8806c3d --- /dev/null +++ b/sundynix-dispatcher/internal/eino/model.go @@ -0,0 +1,70 @@ +package eino + +import ( + "context" + "strings" + + "github.com/cloudwego/eino/components/model" + "github.com/cloudwego/eino/schema" + + "github.com/sundynix/sundynix-dispatcher/internal/llm" +) + +// poolModel 把 LLM Pool 适配成 Eino 的 model.BaseChatModel, +// 让 LLM Pool 能作为图中的 ChatModel 节点参与编排与流式。 +// 真实接入 vLLM/Ollama 后,这里替换为后端的 Generate/Stream 即可。 +type poolModel struct{ pool *llm.Pool } + +func newPoolModel(p *llm.Pool) *poolModel { return &poolModel{pool: p} } + +var _ model.BaseChatModel = (*poolModel)(nil) + +// Generate 阻塞式生成(图被 Invoke 时用)。 +func (pm *poolModel) Generate(ctx context.Context, input []*schema.Message, _ ...model.Option) (*schema.Message, error) { + var sb strings.Builder + if err := pm.pool.StreamText(ctx, replyFor(input), func(tok []byte) { sb.Write(tok) }); err != nil { + return nil, err + } + return schema.AssistantMessage(sb.String(), nil), nil +} + +// Stream 流式生成(图被 Stream 时用):把回复按 token 推进 pipe。 +func (pm *poolModel) Stream(ctx context.Context, input []*schema.Message, _ ...model.Option) (*schema.StreamReader[*schema.Message], error) { + sr, sw := schema.Pipe[*schema.Message](32) + text := replyFor(input) + go func() { + defer sw.Close() + if err := pm.pool.StreamText(ctx, text, func(tok []byte) { + sw.Send(schema.AssistantMessage(string(tok), nil), nil) + }); err != nil { + sw.Send(nil, err) + } + }() + return sr, nil +} + +// replyFor 是占位"模型":从消息中取出注入的画像与用户输入, +// 生成一段能体现"记忆已注入"的确定性回复(证明 recall→prompt 链路真的把画像喂进来了)。 +// 真实模型不需要本函数。 +func replyFor(msgs []*schema.Message) string { + var profile, user string + for _, m := range msgs { + switch m.Role { + case schema.System: + profile = m.Content + case schema.User: + user = m.Content + } + } + return "【已注入用户画像】" + condense(profile, 80) + + " | 据此为你个性化作答:已编排执行该 Agent 图(输入「" + condense(user, 30) + "」)。" +} + +func condense(s string, max int) string { + s = strings.TrimSpace(strings.ReplaceAll(s, "\n", " ")) + r := []rune(s) + if len(r) > max { + return string(r[:max]) + "…" + } + return s +} diff --git a/sundynix-dispatcher/internal/eino/orchestrator.go b/sundynix-dispatcher/internal/eino/orchestrator.go index 4e35b83..16c7628 100644 --- a/sundynix-dispatcher/internal/eino/orchestrator.go +++ b/sundynix-dispatcher/internal/eino/orchestrator.go @@ -3,9 +3,14 @@ package eino import ( "context" + "errors" + "io" "log" "time" + "github.com/cloudwego/eino/compose" + "github.com/cloudwego/eino/schema" + "github.com/sundynix/sundynix-dispatcher/internal/harness" "github.com/sundynix/sundynix-dispatcher/internal/llm" "github.com/sundynix/sundynix-shared/contract" @@ -25,77 +30,105 @@ type ToolCaller interface { // 工具调用超时;超时即降级(不带工具上下文继续推理)。 const toolCallTimeout = 3 * time.Second -// Orchestrator 将 DSL 图编译为 Eino Graph 并驱动执行。 +// Orchestrator 把 DSL 任务交给编译好的 Eino 图执行(记忆召回 → 注入 → 流式)。 type Orchestrator struct { - pool *llm.Pool breaker *harness.CircuitBreaker sink TokenSink tools ToolCaller + run compose.Runnable[*contract.Task, *schema.Message] } -func NewOrchestrator(pool *llm.Pool, breaker *harness.CircuitBreaker, sink TokenSink, tools ToolCaller) *Orchestrator { - return &Orchestrator{pool: pool, breaker: breaker, sink: sink, tools: tools} +// NewOrchestrator 构建并编译记忆增强图。 +func NewOrchestrator(pool *llm.Pool, breaker *harness.CircuitBreaker, sink TokenSink, tools ToolCaller) (*Orchestrator, error) { + o := &Orchestrator{breaker: breaker, sink: sink, tools: tools} + run, err := buildGraph(context.Background(), pool, o.fetchMemory) + if err != nil { + return nil, err + } + o.run = run + return o, nil } -// Handle 消费一个任务:编译图 → 流式推理 → 经 sink 把 Token 回流到 sundynix.streams.。 +// Handle 消费一个任务:执行 Eino 图,把 Token 流回流到 sundynix.streams.。 func (o *Orchestrator) Handle(ctx context.Context, t *contract.Task) error { if !o.breaker.Allow() { log.Printf("[eino] circuit open, drop task %s", t.ID) return nil } - log.Printf("[eino] task %s received (graph=%d bytes), streaming tokens...", t.ID, len(t.Graph)) + log.Printf("[eino] task %s received (graph=%d bytes), running graph...", t.ID, len(t.Graph)) - // TODO: compose.NewGraph(...) 编译 DSL;此处 prompt 占位为图原文。 - prompt := string(t.Graph) - - // 工具节点:经 NATS 调用第 5 层 MCP(sundynix.tools.go.*)。 - // 这里以 wiki_search 演示完整调用链路;真实 Eino 图会按 DSL 节点择机调用。 - if ctxNote := o.retrieveContext(ctx, t); ctxNote != "" { - prompt = ctxNote + "\n" + prompt + stream, err := o.run.Stream(ctx, t) + if err != nil { + log.Printf("[eino] task %s graph error: %v", t.ID, err) + _ = o.sink.CompleteStream(t.ID) + o.breaker.Report(false) + return err } + defer stream.Close() n := 0 - err := o.pool.Stream(ctx, prompt, func(tok []byte) { - if perr := o.sink.PublishToken(t.ID, tok); perr != nil { + for { + chunk, rerr := stream.Recv() + if errors.Is(rerr, io.EOF) { + break + } + if rerr != nil { + log.Printf("[eino] task %s stream recv error: %v", t.ID, rerr) + break + } + if chunk == nil || chunk.Content == "" { + continue + } + if perr := o.sink.PublishToken(t.ID, []byte(chunk.Content)); perr != nil { log.Printf("[eino] publish token failed: %v", perr) - return + break } n++ - }) - if err != nil { - log.Printf("[eino] task %s stream error: %v", t.ID, err) } if cerr := o.sink.CompleteStream(t.ID); cerr != nil { log.Printf("[eino] complete stream failed: %v", cerr) } log.Printf("[eino] task %s done, %d tokens streamed", t.ID, n) - o.breaker.Report(err == nil) - return err + o.breaker.Report(true) + + // 写回阶段:流已排空(= 模型生成结束),此处离开热路径、异步抽取记忆。 + // 注:流式节点用 OnEndWithStreamOutput 而非 OnEndFn,故不走回调而在此触发。 + go o.memorize(t) + return nil } -// retrieveContext 经 MCP wiki_search 工具拉取检索上下文。 -// 工具不可用/超时时返回空串,降级为无工具上下文推理(不阻断主流程)。 -func (o *Orchestrator) retrieveContext(ctx context.Context, t *contract.Task) string { - if o.tools == nil { +// fetchMemory 经 MCP memory_get 工具召回用户常驻画像。 +// 工具不可用/超时/无 user_id 时返回空串,降级为无记忆推理(不阻断主流程)。 +func (o *Orchestrator) fetchMemory(ctx context.Context, userID, _ string) string { + if o.tools == nil || userID == "" { return "" } cctx, cancel := context.WithTimeout(ctx, toolCallTimeout) defer cancel() - - res, err := o.tools.CallTool(cctx, contract.ToolSubjectGo("wiki_search"), &contract.ToolCall{ - Tool: "wiki_search", - TaskID: t.ID, - Args: map[string]any{"q": string(t.Graph)}, + res, err := o.tools.CallTool(cctx, contract.ToolSubjectGo("memory_get"), &contract.ToolCall{ + Tool: "memory_get", + Args: map[string]any{"user_id": userID}, }) if err != nil { - log.Printf("[eino] task %s wiki_search unavailable, degrade: %v", t.ID, err) + log.Printf("[eino] memory_get unavailable for %s, degrade: %v", userID, err) return "" } if !res.OK { - log.Printf("[eino] task %s wiki_search error: %s", t.ID, res.Error) + log.Printf("[eino] memory_get error for %s: %s", userID, res.Error) return "" } - log.Printf("[eino] task %s wiki_search ok: %s", t.ID, res.Content) + log.Printf("[eino] memory_get ok for %s: %s", userID, res.Content) return res.Content } + +// memorize 写回阶段:从本轮对话抽取并更新偏好记忆。 +// 目前发占位日志;真实实现应跑抽取 LLM → 去重/更新 → memory_upsert(异步,离开热路径)。 +func (o *Orchestrator) memorize(t *contract.Task) { + uid, _ := t.Meta[contract.MetaUserID].(string) + if uid == "" { + return + } + log.Printf("[eino] (writeback) task %s 完成,待抽取 user=%s 的新偏好记忆", t.ID, uid) + // TODO: 发 sundynix.memory.extract 事件 → memory worker 抽取 → memory_upsert +} diff --git a/sundynix-dispatcher/internal/eino/state.go b/sundynix-dispatcher/internal/eino/state.go new file mode 100644 index 0000000..2949863 --- /dev/null +++ b/sundynix-dispatcher/internal/eino/state.go @@ -0,0 +1,10 @@ +package eino + +// AgentState 是 Eino 图的全局状态,贯穿 recall→prompt→model 各节点。 +// 偏好记忆经 recall 节点写入,供模板注入与写回抽取使用。 +type AgentState struct { + UserID string // 来自 Task.Meta["user_id"] + Profile string // 召回到的常驻画像(always-on 偏好记忆) + Input string // 本次输入(DSL 原文) + Answer string // 累积输出,供写回阶段抽取新记忆 +} diff --git a/sundynix-dispatcher/internal/llm/pool.go b/sundynix-dispatcher/internal/llm/pool.go index b8d6c30..2ecce8f 100644 --- a/sundynix-dispatcher/internal/llm/pool.go +++ b/sundynix-dispatcher/internal/llm/pool.go @@ -23,15 +23,20 @@ const ( // 真实接入 vLLM/Ollama 时替换为后端 streaming API 即可(回调签名不变)。 func (p *Pool) Stream(ctx context.Context, prompt string, onToken func([]byte)) error { // TODO: 选路 (least-load / 模型亲和) → 调 vLLM/Ollama streaming API - reply := buildReply(prompt) + return p.StreamText(ctx, buildReply(prompt), onToken) +} +// StreamText 按真实后端的 TTFT/逐 token 节奏把给定文本流式回调。 +// 把"说什么"(由上层/Eino 图决定)与"怎么流"(后端节奏)解耦: +// 真实接入 vLLM/Ollama 后,由后端 streaming API 直接驱动,无需本方法。 +func (p *Pool) StreamText(ctx context.Context, text string, onToken func([]byte)) error { select { case <-ctx.Done(): return ctx.Err() case <-time.After(timeToFirstToken): // 模拟 TTFT } - for _, tok := range tokenize(reply) { + for _, tok := range tokenize(text) { select { case <-ctx.Done(): return ctx.Err() diff --git a/sundynix-gateway/internal/handler/task_handler.go b/sundynix-gateway/internal/handler/task_handler.go index ae9ec0b..34454d4 100644 --- a/sundynix-gateway/internal/handler/task_handler.go +++ b/sundynix-gateway/internal/handler/task_handler.go @@ -12,6 +12,7 @@ import ( "github.com/sundynix/sundynix-gateway/internal/dsl" "github.com/sundynix/sundynix-gateway/internal/nats" "github.com/sundynix/sundynix-gateway/internal/store" + "github.com/sundynix/sundynix-shared/contract" ) type Handler struct { @@ -36,6 +37,9 @@ func (h *Handler) SubmitTask(c *gin.Context) { c.JSON(http.StatusUnprocessableEntity, gin.H{"error": err.Error()}) return } + // 附上已登录用户标识,供 Dispatcher 召回其偏好记忆。 + // 真实场景由鉴权中间件注入;此处用 X-User-ID 头,缺省匿名。 + task.Meta[contract.MetaUserID] = userID(c) // 持久化任务提交(best-effort:降级模式下静默跳过,不阻断发布)。 if err := h.db.SaveTask(c.Request.Context(), task.ID, string(task.Graph)); err != nil { log.Printf("[gateway] save task %s failed: %v", task.ID, err) @@ -86,6 +90,40 @@ func (h *Handler) StreamTask(c *gin.Context) { }) } +// SetMemory: 写入/更新一条用户偏好记忆,经 NATS 调 mcp-go 的 memory_upsert 工具。 +// 桌面端"偏好记忆面板"可用它让用户显式登记/纠正模型对自己的记忆。 +func (h *Handler) SetMemory(c *gin.Context) { + var body struct { + Key string `json:"key"` + Value string `json:"value"` + } + if err := c.ShouldBindJSON(&body); err != nil || body.Key == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "key/value required"}) + return + } + res, err := h.bus.CallTool(c.Request.Context(), contract.ToolSubjectGo("memory_upsert"), + &contract.ToolCall{Tool: "memory_upsert", Args: map[string]any{ + "user_id": userID(c), "key": body.Key, "value": body.Value, + }}) + if err != nil { + c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()}) + return + } + if !res.OK { + c.JSON(http.StatusUnprocessableEntity, gin.H{"error": res.Error}) + return + } + c.JSON(http.StatusOK, gin.H{"status": "ok", "message": res.Content}) +} + +// userID 从请求取已登录用户标识(真实场景应由鉴权中间件注入)。 +func userID(c *gin.Context) string { + if u := c.GetHeader("X-User-ID"); u != "" { + return u + } + return "anonymous" +} + func (h *Handler) Billing(c *gin.Context) { // TODO: 商业化与计费模块;暂以已提交任务计数演示真实读库。 n, err := h.db.CountTasks(c.Request.Context()) diff --git a/sundynix-gateway/internal/nats/publisher.go b/sundynix-gateway/internal/nats/publisher.go index d1ca662..5b4a152 100644 --- a/sundynix-gateway/internal/nats/publisher.go +++ b/sundynix-gateway/internal/nats/publisher.go @@ -43,4 +43,9 @@ func (b *Bus) SubscribeTokens(taskID string, onToken func([]byte), onDone func() return b.inner.SubscribeTokens(taskID, onToken, onDone) } +// CallTool 经 NATS 同步调用一个 MCP 工具(用于网关侧写偏好记忆等)。 +func (b *Bus) CallTool(ctx context.Context, subject string, call *contract.ToolCall) (*contract.ToolResult, error) { + return b.inner.CallTool(ctx, subject, call) +} + func (b *Bus) Close() { b.inner.Close() } diff --git a/sundynix-gateway/internal/router/router.go b/sundynix-gateway/internal/router/router.go index 4242f76..05cb509 100644 --- a/sundynix-gateway/internal/router/router.go +++ b/sundynix-gateway/internal/router/router.go @@ -19,8 +19,9 @@ func New(db *store.Postgres, cache *store.Redis, bus *nats.Bus) *gin.Engine { h := handler.New(db, cache, bus) api := r.Group("/api/v1") { - api.POST("/tasks", h.SubmitTask) // 1. 解析 DSL 并 Publish 到 NATS + api.POST("/tasks", h.SubmitTask) // 1. 解析 DSL 并 Publish 到 NATS api.GET("/tasks/:id/stream", h.StreamTask) // 4. SSE/WS 回流 Token Stream + api.PUT("/memory", h.SetMemory) // 偏好记忆登记(→ mcp-go memory_upsert) api.GET("/billing", h.Billing) } return r diff --git a/sundynix-mcp-go/cmd/server/main.go b/sundynix-mcp-go/cmd/server/main.go index 73d8147..138920d 100644 --- a/sundynix-mcp-go/cmd/server/main.go +++ b/sundynix-mcp-go/cmd/server/main.go @@ -11,11 +11,13 @@ import ( sharedbus "github.com/sundynix/sundynix-shared/bus" "github.com/sundynix/sundynix-mcp-go/internal/mcp" + "github.com/sundynix/sundynix-mcp-go/internal/memory" "github.com/sundynix/sundynix-mcp-go/internal/search" ) func main() { natsURL := envOr("NATS_URL", "nats://localhost:4222") + pgDSN := envOr("POSTGRES_DSN", "postgres://sundynix:sundynix@localhost:5432/sundynix?sslmode=disable") b, err := sharedbus.Connect(natsURL) if err != nil { @@ -25,7 +27,9 @@ func main() { log.Printf("[mcp_go] connected %s", natsURL) engine := search.NewHybrid() // LLM Wiki 混合检索:Bleve + Milvus + Neo4j - gw := mcp.NewGateway(b, engine) + mem := memory.Open(pgDSN) // 偏好记忆:sundynix_user_profile(连不上则降级) + defer mem.Close() + gw := mcp.NewGateway(b, engine, mem) ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() diff --git a/sundynix-mcp-go/go.mod b/sundynix-mcp-go/go.mod index 9d636d5..5e32980 100644 --- a/sundynix-mcp-go/go.mod +++ b/sundynix-mcp-go/go.mod @@ -2,16 +2,27 @@ module github.com/sundynix/sundynix-mcp-go go 1.23 -require github.com/sundynix/sundynix-shared v0.0.0 +require ( + github.com/sundynix/sundynix-shared v0.0.0 + gorm.io/driver/postgres v1.6.0 + gorm.io/gorm v1.31.1 +) require ( + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/pgx/v5 v5.6.0 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jinzhu/now v1.1.5 // indirect github.com/klauspost/compress v1.17.9 // indirect github.com/nats-io/nats.go v1.37.0 // indirect github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nuid v1.0.1 // indirect - golang.org/x/crypto v0.26.0 // indirect - golang.org/x/sys v0.24.0 // indirect - golang.org/x/text v0.17.0 // indirect + golang.org/x/crypto v0.31.0 // indirect + golang.org/x/sync v0.10.0 // indirect + golang.org/x/sys v0.28.0 // indirect + golang.org/x/text v0.21.0 // indirect ) replace github.com/sundynix/sundynix-shared => ../sundynix-shared diff --git a/sundynix-mcp-go/go.sum b/sundynix-mcp-go/go.sum index 7483077..1308d84 100644 --- a/sundynix-mcp-go/go.sum +++ b/sundynix-mcp-go/go.sum @@ -1,3 +1,18 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY= +github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= +github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= @@ -12,11 +27,28 @@ github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= -golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= -golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= -golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= -golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= +golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U= golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/driver/postgres v1.6.0 h1:2dxzU8xJ+ivvqTRph34QX+WrRaJlmfyPqXmoGVjMBa4= +gorm.io/driver/postgres v1.6.0/go.mod h1:vUw0mrGgrTK+uPHEhAdV4sfFELrByKVGnaVRkXDhtWo= +gorm.io/gorm v1.31.1 h1:7CA8FTFz/gRfgqgpeKIBcervUn3xSyPUmr6B2WXJ7kg= +gorm.io/gorm v1.31.1/go.mod h1:XyQVbO2k6YkOis7C2437jSit3SsDK72s7n7rsSHd+Gs= diff --git a/sundynix-mcp-go/internal/mcp/gateway.go b/sundynix-mcp-go/internal/mcp/gateway.go index 6ad77d9..534a812 100644 --- a/sundynix-mcp-go/internal/mcp/gateway.go +++ b/sundynix-mcp-go/internal/mcp/gateway.go @@ -9,6 +9,7 @@ import ( sharedbus "github.com/sundynix/sundynix-shared/bus" "github.com/sundynix/sundynix-shared/contract" + "github.com/sundynix/sundynix-mcp-go/internal/memory" "github.com/sundynix/sundynix-mcp-go/internal/search" ) @@ -16,10 +17,11 @@ import ( type Gateway struct { bus *sharedbus.Bus search *search.Hybrid + memory *memory.Store } -func NewGateway(b *sharedbus.Bus, s *search.Hybrid) *Gateway { - return &Gateway{bus: b, search: s} +func NewGateway(b *sharedbus.Bus, s *search.Hybrid, m *memory.Store) *Gateway { + return &Gateway{bus: b, search: s, memory: m} } // Serve 以队列组通配订阅 sundynix.tools.go.>,按工具名分发并阻塞。 @@ -29,7 +31,7 @@ func (g *Gateway) Serve(ctx context.Context) error { return err } defer func() { _ = unsub() }() - log.Printf("[mcp_go] tools ready on %s (queue=%s): wiki_search, echo", + log.Printf("[mcp_go] tools ready on %s (queue=%s): wiki_search, memory_get, memory_upsert, echo", contract.SubjectToolsGoAll, contract.QueueToolsGo) <-ctx.Done() return ctx.Err() @@ -41,6 +43,10 @@ func (g *Gateway) dispatch(ctx context.Context, call *contract.ToolCall) *contra switch call.Tool { case "wiki_search": return g.wikiSearch(ctx, call) + case "memory_get": + return g.memoryGet(ctx, call) + case "memory_upsert": + return g.memoryUpsert(ctx, call) case "echo": return &contract.ToolResult{OK: true, Content: fmt.Sprint(call.Args["text"])} default: @@ -48,6 +54,30 @@ func (g *Gateway) dispatch(ctx context.Context, call *contract.ToolCall) *contra } } +// memoryGet 召回某用户的常驻画像(已渲染为可注入 prompt 的多行文本)。 +func (g *Gateway) memoryGet(ctx context.Context, call *contract.ToolCall) *contract.ToolResult { + uid, _ := call.Args["user_id"].(string) + profile, err := g.memory.Get(ctx, uid) + if err != nil { + return &contract.ToolResult{OK: false, Error: "memory_get: " + err.Error()} + } + return &contract.ToolResult{OK: true, Content: profile} +} + +// memoryUpsert 写入/更新一条画像偏好(user_id + key + value)。 +func (g *Gateway) memoryUpsert(ctx context.Context, call *contract.ToolCall) *contract.ToolResult { + uid, _ := call.Args["user_id"].(string) + key, _ := call.Args["key"].(string) + val, _ := call.Args["value"].(string) + if uid == "" || key == "" { + return &contract.ToolResult{OK: false, Error: "memory_upsert: user_id 和 key 必填"} + } + if err := g.memory.Upsert(ctx, uid, key, val); err != nil { + return &contract.ToolResult{OK: false, Error: "memory_upsert: " + err.Error()} + } + return &contract.ToolResult{OK: true, Content: fmt.Sprintf("已记住 %s 的「%s」", uid, key)} +} + // wikiSearch 调 Hybrid 混合检索引擎。引擎目前为桩(返回空), // 这里仍把调用链路做真:真实接入 Bleve/Milvus/Neo4j 后无需改动协议。 func (g *Gateway) wikiSearch(ctx context.Context, call *contract.ToolCall) *contract.ToolResult { diff --git a/sundynix-mcp-go/internal/memory/store.go b/sundynix-mcp-go/internal/memory/store.go new file mode 100644 index 0000000..01e3ef5 --- /dev/null +++ b/sundynix-mcp-go/internal/memory/store.go @@ -0,0 +1,81 @@ +// Package memory 是偏好记忆的存储后端(第 5 层 I/O 型工具持有)。 +// 常驻画像存 Postgres,按 sundynix_ 前缀约定 + AutoMigrate 自动迁移。 +package memory + +import ( + "context" + "fmt" + "log" + "sort" + "strings" + + "gorm.io/driver/postgres" + "gorm.io/gorm" + "gorm.io/gorm/logger" +) + +// Profile 是用户常驻画像的一条 key/value 偏好(always-on memory)。 +// 复合主键 (user_id, key):同一用户同一键 upsert 覆盖。 +type Profile struct { + UserID string `gorm:"primaryKey;column:user_id;size:64"` + Key string `gorm:"primaryKey;size:64"` + Value string `gorm:"type:text"` +} + +// TableName 固定表名,遵守 sundynix_ 前缀约定。 +func (Profile) TableName() string { return "sundynix_user_profile" } + +// Store 封装画像读写。db 为 nil 表示降级(无 Postgres 时记忆功能空转,不阻断工具服务)。 +type Store struct{ db *gorm.DB } + +// Open 连接 Postgres 并自动迁移 sundynix_user_profile。连接失败不 fatal:返回降级实例。 +func Open(dsn string) *Store { + db, err := gorm.Open(postgres.New(postgres.Config{DSN: dsn}), &gorm.Config{ + Logger: logger.Default.LogMode(logger.Silent), + }) + if err != nil { + log.Printf("[memory] postgres 不可用,记忆降级(召回为空): %v", err) + return &Store{} + } + if err := db.AutoMigrate(&Profile{}); err != nil { + log.Printf("[memory] AutoMigrate 失败,记忆降级: %v", err) + return &Store{} + } + log.Println("[memory] postgres connected, migrated sundynix_user_profile") + return &Store{db: db} +} + +// Get 返回某用户的画像,渲染为可直接注入 prompt 的多行文本(按 key 排序,稳定输出)。 +func (s *Store) Get(ctx context.Context, userID string) (string, error) { + if s.db == nil || userID == "" { + return "", nil + } + var rows []Profile + if err := s.db.WithContext(ctx).Where("user_id = ?", userID).Find(&rows).Error; err != nil { + return "", err + } + sort.Slice(rows, func(i, j int) bool { return rows[i].Key < rows[j].Key }) + var b strings.Builder + for _, r := range rows { + fmt.Fprintf(&b, "- %s:%s\n", r.Key, r.Value) + } + return strings.TrimRight(b.String(), "\n"), nil +} + +// Upsert 写入/更新一条画像偏好(key 冲突即覆盖 value)。 +func (s *Store) Upsert(ctx context.Context, userID, key, value string) error { + if s.db == nil { + return fmt.Errorf("memory store disabled") + } + return s.db.WithContext(ctx).Save(&Profile{UserID: userID, Key: key, Value: value}).Error +} + +// Close 释放连接。 +func (s *Store) Close() { + if s.db == nil { + return + } + if sqlDB, err := s.db.DB(); err == nil { + _ = sqlDB.Close() + } +} diff --git a/sundynix-shared/contract/task.go b/sundynix-shared/contract/task.go index 1a16670..5db9c40 100644 --- a/sundynix-shared/contract/task.go +++ b/sundynix-shared/contract/task.go @@ -24,6 +24,9 @@ const ( SubjectToolsPyAll = "sundynix.tools.py.>" // mcp-py 通配订阅 QueueToolsGo = "mcp-go-workers" // mcp-go 队列组(多副本负载均衡) QueueToolsPy = "mcp-py-workers" // mcp-py 队列组 + + // MetaUserID 是 Task.Meta 中承载已登录用户标识的键(用于偏好记忆召回)。 + MetaUserID = "user_id" ) // Task 是 DSL 解析组装后的可调度任务,在 NATS 上以 JSON 传输。