feat(dispatcher): 熔断降级真三态状态机(弃用空桩)+ 单测
CircuitBreaker 此前是空桩(Allow 恒 true、Report 空操作),dispatcher 调 LLM/工具 无任何失败保护——今天就撞上 DeepSeek 流连接累积把报告卡死。改为真实三态熔断: - Closed:正常放行;连续失败达阈值(默认5) → Open。 - Open:快速拒绝;冷却(默认10s)到点 → HalfOpen 放行少量探测(默认1)。 - HalfOpen:探测成功 → Closed 恢复;探测失败 → 重新 Open。 - sync.Mutex 并发安全(多任务 goroutine 共享);时钟可注入便于确定性测试。 orchestrator.Handle:熔断开启时不再静默丢弃任务,改为回流"服务繁忙"提示 + CompleteStream 收尾,让客户端解阻不挂死。 测试(含 -race):达阈值断开、成功清零、半开恢复、探测失败重断、并发安全 —— 全过。 PROGRESS.md 勾掉熔断项。 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
+2
-2
@@ -52,7 +52,7 @@
|
|||||||
- [x] 记忆召回(画像 + 历史注入)
|
- [x] 记忆召回(画像 + 历史注入)
|
||||||
- [x] 报告专用编排(规划 → 分章并行 → 汇聚 → 存源)
|
- [x] 报告专用编排(规划 → 分章并行 → 汇聚 → 存源)
|
||||||
- [x] 会话历史写回
|
- [x] 会话历史写回
|
||||||
- [ ] Harness 熔断降级中心(`CircuitBreaker.Allow()` 恒 true,桩)
|
- [x] Harness 熔断降级中心(真三态状态机 Closed/Open/HalfOpen + 单测含 -race;熔断时回流提示并收尾流,不静默丢弃)
|
||||||
- [ ] Harness LLM 自动化评测(桩)
|
- [ ] Harness LLM 自动化评测(桩)
|
||||||
- [ ] 长期偏好记忆抽取(LLM 抽取 → 去重 → memory_upsert,TODO)
|
- [ ] 长期偏好记忆抽取(LLM 抽取 → 去重 → memory_upsert,TODO)
|
||||||
|
|
||||||
@@ -89,7 +89,7 @@
|
|||||||
|
|
||||||
- [ ] **真实登录 / 鉴权 / 会话**(替掉裸 `X-User-ID`,最影响"能否交付他人用")
|
- [ ] **真实登录 / 鉴权 / 会话**(替掉裸 `X-User-ID`,最影响"能否交付他人用")
|
||||||
- [ ] **代码解释器 + 安全沙箱**(mcp-py 核心能力,目前全桩)
|
- [ ] **代码解释器 + 安全沙箱**(mcp-py 核心能力,目前全桩)
|
||||||
- [ ] **Harness 三件**:输入/输出护栏 · 熔断降级状态机 · LLM 自动化评测
|
- [ ] **Harness 余下两件**:输入/输出护栏 · LLM 自动化评测(熔断降级已完成 ✅)
|
||||||
- [ ] **长期记忆抽取** + external_api 工具
|
- [ ] **长期记忆抽取** + external_api 工具
|
||||||
- [ ] **计费 / 商业化**真实实现
|
- [ ] **计费 / 商业化**真实实现
|
||||||
- [ ] 微服务化拆分(Morph B)—— 现为 Monolith First,**按设计如此,非缺陷**
|
- [ ] 微服务化拆分(Morph B)—— 现为 Monolith First,**按设计如此,非缺陷**
|
||||||
|
|||||||
@@ -47,13 +47,18 @@ func NewOrchestrator(pool *llm.Pool, breaker *harness.CircuitBreaker, sink Token
|
|||||||
|
|
||||||
// Handle 消费一个任务:按 DSL 编译 Eino 图并执行,把 Token 流回流到 sundynix.streams.<id>。
|
// Handle 消费一个任务:按 DSL 编译 Eino 图并执行,把 Token 流回流到 sundynix.streams.<id>。
|
||||||
func (o *Orchestrator) Handle(ctx context.Context, t *contract.Task) error {
|
func (o *Orchestrator) Handle(ctx context.Context, t *contract.Task) error {
|
||||||
if !o.breaker.Allow() {
|
|
||||||
log.Printf("[eino] circuit open, drop task %s", t.ID)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
tr := o.tracer(t.ID)
|
tr := o.tracer(t.ID)
|
||||||
defer tr.done()
|
defer tr.done()
|
||||||
|
|
||||||
|
// 熔断开启:快速拒绝,但要让客户端解阻(回流提示 + 收尾流),不静默丢弃。
|
||||||
|
if !o.breaker.Allow() {
|
||||||
|
log.Printf("[eino] 熔断开启,拒绝任务 %s", t.ID)
|
||||||
|
tr.info("task", "system", "服务熔断", "后端连续失败,暂时拒绝新任务,请稍后重试")
|
||||||
|
_ = o.sink.PublishToken(t.ID, []byte("⚠️ 服务繁忙(已触发熔断保护),请稍后重试。"))
|
||||||
|
_ = o.sink.CompleteStream(t.ID)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// 报告生成走专用多步编排(规划→分章并行检索撰写→汇聚→渲染 Word),而非通用对话图。
|
// 报告生成走专用多步编排(规划→分章并行检索撰写→汇聚→渲染 Word),而非通用对话图。
|
||||||
if intent, _ := t.Meta[contract.MetaIntent].(string); intent == contract.IntentReport {
|
if intent, _ := t.Meta[contract.MetaIntent].(string); intent == contract.IntentReport {
|
||||||
return o.handleReport(ctx, t, tr)
|
return o.handleReport(ctx, t, tr)
|
||||||
|
|||||||
@@ -1,12 +1,126 @@
|
|||||||
|
// Package harness 提供 dispatcher 的治理组件(熔断降级 / 评测等)。
|
||||||
package harness
|
package harness
|
||||||
|
|
||||||
// CircuitBreaker 实现熔断降级中心:后端异常时熔断并切换降级策略。
|
import (
|
||||||
type CircuitBreaker struct{ /* state, counters */ }
|
"log"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
func NewCircuitBreaker() *CircuitBreaker { return &CircuitBreaker{} }
|
// State 是熔断器状态。
|
||||||
|
type State int
|
||||||
|
|
||||||
// Allow 判定当前是否放行请求。
|
const (
|
||||||
func (c *CircuitBreaker) Allow() bool { return true } // TODO: half-open / open 状态机
|
Closed State = iota // 闭合:正常放行
|
||||||
|
Open // 断开:连续失败超阈值,快速拒绝
|
||||||
|
HalfOpen // 半开:冷却后放行少量探测,成功则恢复
|
||||||
|
)
|
||||||
|
|
||||||
|
func (s State) String() string {
|
||||||
|
switch s {
|
||||||
|
case Open:
|
||||||
|
return "open"
|
||||||
|
case HalfOpen:
|
||||||
|
return "half-open"
|
||||||
|
default:
|
||||||
|
return "closed"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 默认参数。
|
||||||
|
const (
|
||||||
|
defaultThreshold = 5 // 闭合态连续失败达此数 → 断开
|
||||||
|
defaultCooldown = 10 * time.Second // 断开后多久转半开
|
||||||
|
defaultHalfOpenMax = 1 // 半开态最多放行的探测数
|
||||||
|
)
|
||||||
|
|
||||||
|
// CircuitBreaker 实现熔断降级中心:后端连续失败时断开、快速拒绝,冷却后半开探测,
|
||||||
|
// 探测成功则恢复闭合、失败则重新断开。并发安全(多任务 goroutine 共享一个实例)。
|
||||||
|
type CircuitBreaker struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
state State
|
||||||
|
fails int // 闭合态连续失败计数
|
||||||
|
openUntil time.Time // 断开持续到的时间点
|
||||||
|
halfOpenProbes int // 半开态已放行的探测数
|
||||||
|
|
||||||
|
threshold int
|
||||||
|
cooldown time.Duration
|
||||||
|
halfOpenMax int
|
||||||
|
now func() time.Time // 可注入时钟(测试用)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewCircuitBreaker() *CircuitBreaker {
|
||||||
|
return &CircuitBreaker{
|
||||||
|
state: Closed,
|
||||||
|
threshold: defaultThreshold,
|
||||||
|
cooldown: defaultCooldown,
|
||||||
|
halfOpenMax: defaultHalfOpenMax,
|
||||||
|
now: time.Now,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Allow 判定当前是否放行请求,并在冷却到点时把断开切换为半开。
|
||||||
|
func (c *CircuitBreaker) Allow() bool {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
switch c.state {
|
||||||
|
case Open:
|
||||||
|
if !c.now().Before(c.openUntil) { // 冷却到点 → 转半开,放行首个探测
|
||||||
|
c.state = HalfOpen
|
||||||
|
c.halfOpenProbes = 1
|
||||||
|
log.Printf("[harness] 熔断器 open → half-open(放行探测)")
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
case HalfOpen:
|
||||||
|
if c.halfOpenProbes < c.halfOpenMax {
|
||||||
|
c.halfOpenProbes++
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false // 探测名额用尽,待 Report 决出结果
|
||||||
|
default: // Closed
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Report 上报一次调用结果以驱动状态机。
|
// Report 上报一次调用结果以驱动状态机。
|
||||||
func (c *CircuitBreaker) Report(success bool) {} // TODO
|
func (c *CircuitBreaker) Report(success bool) {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
if success {
|
||||||
|
switch c.state {
|
||||||
|
case HalfOpen:
|
||||||
|
c.state = Closed
|
||||||
|
c.fails = 0
|
||||||
|
c.halfOpenProbes = 0
|
||||||
|
log.Printf("[harness] 熔断器 half-open → closed(已恢复)")
|
||||||
|
default:
|
||||||
|
c.fails = 0
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
switch c.state {
|
||||||
|
case HalfOpen:
|
||||||
|
c.trip() // 探测失败 → 重新断开
|
||||||
|
case Closed:
|
||||||
|
c.fails++
|
||||||
|
if c.fails >= c.threshold {
|
||||||
|
c.trip()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// trip 切换到断开态并设定冷却到点。调用方须持锁。
|
||||||
|
func (c *CircuitBreaker) trip() {
|
||||||
|
c.state = Open
|
||||||
|
c.openUntil = c.now().Add(c.cooldown)
|
||||||
|
c.halfOpenProbes = 0
|
||||||
|
log.Printf("[harness] 熔断器断开(连续失败),%.0fs 后转半开", c.cooldown.Seconds())
|
||||||
|
}
|
||||||
|
|
||||||
|
// State 返回当前状态(观测 / 测试用)。
|
||||||
|
func (c *CircuitBreaker) State() State {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
return c.state
|
||||||
|
}
|
||||||
|
|||||||
@@ -0,0 +1,112 @@
|
|||||||
|
package harness
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// newTestCB 造一个可控时钟、低阈值的熔断器,便于确定性测试。
|
||||||
|
func newTestCB(threshold int, cooldown time.Duration, clock *time.Time) *CircuitBreaker {
|
||||||
|
c := NewCircuitBreaker()
|
||||||
|
c.threshold = threshold
|
||||||
|
c.cooldown = cooldown
|
||||||
|
c.now = func() time.Time { return *clock }
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCircuitBreaker_OpensAfterThreshold(t *testing.T) {
|
||||||
|
now := time.Unix(0, 0)
|
||||||
|
c := newTestCB(3, 10*time.Second, &now)
|
||||||
|
|
||||||
|
if !c.Allow() || c.State() != Closed {
|
||||||
|
t.Fatal("初始应闭合放行")
|
||||||
|
}
|
||||||
|
c.Report(false)
|
||||||
|
c.Report(false)
|
||||||
|
if c.State() != Closed {
|
||||||
|
t.Fatal("未达阈值不应断开")
|
||||||
|
}
|
||||||
|
c.Report(false) // 第 3 次连续失败 → 断开
|
||||||
|
if c.State() != Open {
|
||||||
|
t.Fatalf("达阈值应断开, got %v", c.State())
|
||||||
|
}
|
||||||
|
if c.Allow() {
|
||||||
|
t.Error("断开态应拒绝放行")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCircuitBreaker_SuccessResetsFails(t *testing.T) {
|
||||||
|
now := time.Unix(0, 0)
|
||||||
|
c := newTestCB(3, 10*time.Second, &now)
|
||||||
|
c.Report(false)
|
||||||
|
c.Report(false)
|
||||||
|
c.Report(true) // 成功清零连续失败
|
||||||
|
c.Report(false)
|
||||||
|
c.Report(false)
|
||||||
|
if c.State() != Closed {
|
||||||
|
t.Errorf("成功应清零计数,不应断开, got %v", c.State())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCircuitBreaker_HalfOpenRecovers(t *testing.T) {
|
||||||
|
now := time.Unix(0, 0)
|
||||||
|
c := newTestCB(2, 10*time.Second, &now)
|
||||||
|
c.Report(false)
|
||||||
|
c.Report(false) // 断开
|
||||||
|
if c.State() != Open || c.Allow() {
|
||||||
|
t.Fatal("应断开并拒绝")
|
||||||
|
}
|
||||||
|
// 冷却未到 → 仍拒绝。
|
||||||
|
now = now.Add(5 * time.Second)
|
||||||
|
if c.Allow() {
|
||||||
|
t.Fatal("冷却未到不应放行")
|
||||||
|
}
|
||||||
|
// 冷却到点 → 半开放行一个探测,第二个被拒。
|
||||||
|
now = now.Add(6 * time.Second)
|
||||||
|
if !c.Allow() || c.State() != HalfOpen {
|
||||||
|
t.Fatalf("冷却到点应转半开并放行探测, state=%v", c.State())
|
||||||
|
}
|
||||||
|
if c.Allow() {
|
||||||
|
t.Error("半开态超过探测名额应拒绝")
|
||||||
|
}
|
||||||
|
// 探测成功 → 恢复闭合。
|
||||||
|
c.Report(true)
|
||||||
|
if c.State() != Closed || !c.Allow() {
|
||||||
|
t.Errorf("探测成功应恢复闭合, state=%v", c.State())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCircuitBreaker_HalfOpenProbeFailReopens(t *testing.T) {
|
||||||
|
now := time.Unix(0, 0)
|
||||||
|
c := newTestCB(1, 10*time.Second, &now)
|
||||||
|
c.Report(false) // 阈值 1 → 立即断开
|
||||||
|
now = now.Add(11 * time.Second)
|
||||||
|
if !c.Allow() || c.State() != HalfOpen {
|
||||||
|
t.Fatal("应转半开")
|
||||||
|
}
|
||||||
|
c.Report(false) // 探测失败 → 重新断开,冷却从此刻重算
|
||||||
|
if c.State() != Open {
|
||||||
|
t.Fatalf("探测失败应重新断开, got %v", c.State())
|
||||||
|
}
|
||||||
|
if c.Allow() {
|
||||||
|
t.Error("重新断开后冷却内应拒绝")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCircuitBreaker_ConcurrentSafe(t *testing.T) {
|
||||||
|
now := time.Now()
|
||||||
|
c := newTestCB(1000000, time.Second, &now)
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for i := 0; i < 50; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for j := 0; j < 200; j++ {
|
||||||
|
c.Allow()
|
||||||
|
c.Report(j%2 == 0)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait() // 仅验证无数据竞争 / 死锁(配合 -race)
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user