diff --git a/PROGRESS.md b/PROGRESS.md index ae30bd2..12d1ffc 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -52,7 +52,7 @@ - [x] 记忆召回(画像 + 历史注入) - [x] 报告专用编排(规划 → 分章并行 → 汇聚 → 存源) - [x] 会话历史写回 -- [ ] Harness 熔断降级中心(`CircuitBreaker.Allow()` 恒 true,桩) +- [x] Harness 熔断降级中心(真三态状态机 Closed/Open/HalfOpen + 单测含 -race;熔断时回流提示并收尾流,不静默丢弃) - [ ] Harness LLM 自动化评测(桩) - [ ] 长期偏好记忆抽取(LLM 抽取 → 去重 → memory_upsert,TODO) @@ -89,7 +89,7 @@ - [ ] **真实登录 / 鉴权 / 会话**(替掉裸 `X-User-ID`,最影响"能否交付他人用") - [ ] **代码解释器 + 安全沙箱**(mcp-py 核心能力,目前全桩) -- [ ] **Harness 三件**:输入/输出护栏 · 熔断降级状态机 · LLM 自动化评测 +- [ ] **Harness 余下两件**:输入/输出护栏 · LLM 自动化评测(熔断降级已完成 ✅) - [ ] **长期记忆抽取** + external_api 工具 - [ ] **计费 / 商业化**真实实现 - [ ] 微服务化拆分(Morph B)—— 现为 Monolith First,**按设计如此,非缺陷** diff --git a/sundynix-dispatcher/internal/eino/orchestrator.go b/sundynix-dispatcher/internal/eino/orchestrator.go index 1aa2b7a..eef7fff 100644 --- a/sundynix-dispatcher/internal/eino/orchestrator.go +++ b/sundynix-dispatcher/internal/eino/orchestrator.go @@ -47,13 +47,18 @@ func NewOrchestrator(pool *llm.Pool, breaker *harness.CircuitBreaker, sink Token // Handle 消费一个任务:按 DSL 编译 Eino 图并执行,把 Token 流回流到 sundynix.streams.。 func (o *Orchestrator) Handle(ctx context.Context, t *contract.Task) error { - if !o.breaker.Allow() { - log.Printf("[eino] circuit open, drop task %s", t.ID) - return nil - } tr := o.tracer(t.ID) defer tr.done() + // 熔断开启:快速拒绝,但要让客户端解阻(回流提示 + 收尾流),不静默丢弃。 + if !o.breaker.Allow() { + log.Printf("[eino] 熔断开启,拒绝任务 %s", t.ID) + tr.info("task", "system", "服务熔断", "后端连续失败,暂时拒绝新任务,请稍后重试") + _ = o.sink.PublishToken(t.ID, []byte("⚠️ 服务繁忙(已触发熔断保护),请稍后重试。")) + _ = o.sink.CompleteStream(t.ID) + return nil + } + // 报告生成走专用多步编排(规划→分章并行检索撰写→汇聚→渲染 Word),而非通用对话图。 if intent, _ := t.Meta[contract.MetaIntent].(string); intent == contract.IntentReport { return o.handleReport(ctx, t, tr) diff --git a/sundynix-dispatcher/internal/harness/circuitbreaker.go b/sundynix-dispatcher/internal/harness/circuitbreaker.go index 0710c7a..6037ff9 100644 --- a/sundynix-dispatcher/internal/harness/circuitbreaker.go +++ b/sundynix-dispatcher/internal/harness/circuitbreaker.go @@ -1,12 +1,126 @@ +// Package harness 提供 dispatcher 的治理组件(熔断降级 / 评测等)。 package harness -// CircuitBreaker 实现熔断降级中心:后端异常时熔断并切换降级策略。 -type CircuitBreaker struct{ /* state, counters */ } +import ( + "log" + "sync" + "time" +) -func NewCircuitBreaker() *CircuitBreaker { return &CircuitBreaker{} } +// State 是熔断器状态。 +type State int -// Allow 判定当前是否放行请求。 -func (c *CircuitBreaker) Allow() bool { return true } // TODO: half-open / open 状态机 +const ( + Closed State = iota // 闭合:正常放行 + Open // 断开:连续失败超阈值,快速拒绝 + HalfOpen // 半开:冷却后放行少量探测,成功则恢复 +) + +func (s State) String() string { + switch s { + case Open: + return "open" + case HalfOpen: + return "half-open" + default: + return "closed" + } +} + +// 默认参数。 +const ( + defaultThreshold = 5 // 闭合态连续失败达此数 → 断开 + defaultCooldown = 10 * time.Second // 断开后多久转半开 + defaultHalfOpenMax = 1 // 半开态最多放行的探测数 +) + +// CircuitBreaker 实现熔断降级中心:后端连续失败时断开、快速拒绝,冷却后半开探测, +// 探测成功则恢复闭合、失败则重新断开。并发安全(多任务 goroutine 共享一个实例)。 +type CircuitBreaker struct { + mu sync.Mutex + state State + fails int // 闭合态连续失败计数 + openUntil time.Time // 断开持续到的时间点 + halfOpenProbes int // 半开态已放行的探测数 + + threshold int + cooldown time.Duration + halfOpenMax int + now func() time.Time // 可注入时钟(测试用) +} + +func NewCircuitBreaker() *CircuitBreaker { + return &CircuitBreaker{ + state: Closed, + threshold: defaultThreshold, + cooldown: defaultCooldown, + halfOpenMax: defaultHalfOpenMax, + now: time.Now, + } +} + +// Allow 判定当前是否放行请求,并在冷却到点时把断开切换为半开。 +func (c *CircuitBreaker) Allow() bool { + c.mu.Lock() + defer c.mu.Unlock() + switch c.state { + case Open: + if !c.now().Before(c.openUntil) { // 冷却到点 → 转半开,放行首个探测 + c.state = HalfOpen + c.halfOpenProbes = 1 + log.Printf("[harness] 熔断器 open → half-open(放行探测)") + return true + } + return false + case HalfOpen: + if c.halfOpenProbes < c.halfOpenMax { + c.halfOpenProbes++ + return true + } + return false // 探测名额用尽,待 Report 决出结果 + default: // Closed + return true + } +} // Report 上报一次调用结果以驱动状态机。 -func (c *CircuitBreaker) Report(success bool) {} // TODO +func (c *CircuitBreaker) Report(success bool) { + c.mu.Lock() + defer c.mu.Unlock() + if success { + switch c.state { + case HalfOpen: + c.state = Closed + c.fails = 0 + c.halfOpenProbes = 0 + log.Printf("[harness] 熔断器 half-open → closed(已恢复)") + default: + c.fails = 0 + } + return + } + switch c.state { + case HalfOpen: + c.trip() // 探测失败 → 重新断开 + case Closed: + c.fails++ + if c.fails >= c.threshold { + c.trip() + } + } +} + +// trip 切换到断开态并设定冷却到点。调用方须持锁。 +func (c *CircuitBreaker) trip() { + c.state = Open + c.openUntil = c.now().Add(c.cooldown) + c.halfOpenProbes = 0 + log.Printf("[harness] 熔断器断开(连续失败),%.0fs 后转半开", c.cooldown.Seconds()) +} + +// State 返回当前状态(观测 / 测试用)。 +func (c *CircuitBreaker) State() State { + c.mu.Lock() + defer c.mu.Unlock() + return c.state +} diff --git a/sundynix-dispatcher/internal/harness/circuitbreaker_test.go b/sundynix-dispatcher/internal/harness/circuitbreaker_test.go new file mode 100644 index 0000000..51bff92 --- /dev/null +++ b/sundynix-dispatcher/internal/harness/circuitbreaker_test.go @@ -0,0 +1,112 @@ +package harness + +import ( + "sync" + "testing" + "time" +) + +// newTestCB 造一个可控时钟、低阈值的熔断器,便于确定性测试。 +func newTestCB(threshold int, cooldown time.Duration, clock *time.Time) *CircuitBreaker { + c := NewCircuitBreaker() + c.threshold = threshold + c.cooldown = cooldown + c.now = func() time.Time { return *clock } + return c +} + +func TestCircuitBreaker_OpensAfterThreshold(t *testing.T) { + now := time.Unix(0, 0) + c := newTestCB(3, 10*time.Second, &now) + + if !c.Allow() || c.State() != Closed { + t.Fatal("初始应闭合放行") + } + c.Report(false) + c.Report(false) + if c.State() != Closed { + t.Fatal("未达阈值不应断开") + } + c.Report(false) // 第 3 次连续失败 → 断开 + if c.State() != Open { + t.Fatalf("达阈值应断开, got %v", c.State()) + } + if c.Allow() { + t.Error("断开态应拒绝放行") + } +} + +func TestCircuitBreaker_SuccessResetsFails(t *testing.T) { + now := time.Unix(0, 0) + c := newTestCB(3, 10*time.Second, &now) + c.Report(false) + c.Report(false) + c.Report(true) // 成功清零连续失败 + c.Report(false) + c.Report(false) + if c.State() != Closed { + t.Errorf("成功应清零计数,不应断开, got %v", c.State()) + } +} + +func TestCircuitBreaker_HalfOpenRecovers(t *testing.T) { + now := time.Unix(0, 0) + c := newTestCB(2, 10*time.Second, &now) + c.Report(false) + c.Report(false) // 断开 + if c.State() != Open || c.Allow() { + t.Fatal("应断开并拒绝") + } + // 冷却未到 → 仍拒绝。 + now = now.Add(5 * time.Second) + if c.Allow() { + t.Fatal("冷却未到不应放行") + } + // 冷却到点 → 半开放行一个探测,第二个被拒。 + now = now.Add(6 * time.Second) + if !c.Allow() || c.State() != HalfOpen { + t.Fatalf("冷却到点应转半开并放行探测, state=%v", c.State()) + } + if c.Allow() { + t.Error("半开态超过探测名额应拒绝") + } + // 探测成功 → 恢复闭合。 + c.Report(true) + if c.State() != Closed || !c.Allow() { + t.Errorf("探测成功应恢复闭合, state=%v", c.State()) + } +} + +func TestCircuitBreaker_HalfOpenProbeFailReopens(t *testing.T) { + now := time.Unix(0, 0) + c := newTestCB(1, 10*time.Second, &now) + c.Report(false) // 阈值 1 → 立即断开 + now = now.Add(11 * time.Second) + if !c.Allow() || c.State() != HalfOpen { + t.Fatal("应转半开") + } + c.Report(false) // 探测失败 → 重新断开,冷却从此刻重算 + if c.State() != Open { + t.Fatalf("探测失败应重新断开, got %v", c.State()) + } + if c.Allow() { + t.Error("重新断开后冷却内应拒绝") + } +} + +func TestCircuitBreaker_ConcurrentSafe(t *testing.T) { + now := time.Now() + c := newTestCB(1000000, time.Second, &now) + var wg sync.WaitGroup + for i := 0; i < 50; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 200; j++ { + c.Allow() + c.Report(j%2 == 0) + } + }() + } + wg.Wait() // 仅验证无数据竞争 / 死锁(配合 -race) +}