A simple CircuitBreaker implementation
A self-recovering implementation based on a failure threshold
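A self-recovering CircuitBreaker based on a failure threshold needs a Half_Open state. It also needs to record two values: the timestamp of the last failed call, and a reset_timeout. When the current system time minus the timestamp of the last failed call in the previous phase exceeds a certain value, and the number of failed calls has reached the failure threshold, the state is moved to Half_Open. That "certain value" is what we call reset_timeout. The diagram is shown below.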
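Suppose the current call is circle 6. The breaker switches to Half_Open when the current system time minus the timestamp of the last failed call of the previous round (circle 5) exceeds the preset reset_timeout. This happens regardless of whether that call then succeeds or fails. In Half_Open the breaker lets a single trial call through; under concurrency, more than one call may occasionally get through at the same time. It stays in Half_Open until the trial call completes. A failure sends it back to Open. A success resets the failure count and moves it to Closed. The implementation is as follows: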
```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Supplier;

import lombok.Getter;

// A Monitor that records state changes
public enum CircuitBreakerStatusMonitor {

    /**
     * Singleton
     */
    X;

    public void report(String name, CircuitBreakerStatus o, CircuitBreakerStatus n) {
        System.out.println(String.format("CircuitBreaker[%s] status changed, [%s]->[%s]", name, o, n));
    }

    // Free-form message, e.g. the error percentage logged by SlidingWindowCircuitBreaker
    public void report(String name, String text) {
        System.out.println(String.format("CircuitBreaker[%s]-%s", name, text));
    }

    public void reset(String name) {
        System.out.println(String.format("CircuitBreaker[%s] reset", name));
    }
}

// CircuitBreakerStatus is an enum with CLOSED, OPEN and HALF_OPEN; its definition is not shown here
@Getter
public class RestCircuitBreaker {

    private final long failureThreshold;
    private final long resetTimeout;
    private LongAdder failureCounter;
    private LongAdder callCounter;
    private AtomicReference<CircuitBreakerStatus> status;
    private final Object fallback = null;

    /**
     * Timestamp of the last failed call (volatile so that other threads see updates)
     */
    private volatile long lastFailureTime;

    public RestCircuitBreaker(long failureThreshold, long resetTimeout) {
        this.failureThreshold = failureThreshold;
        this.resetTimeout = resetTimeout;
        reset();
    }

    public void reset() {
        CircuitBreakerStatusMonitor.X.reset("RestCircuitBreaker");
        this.callCounter = new LongAdder();
        this.failureCounter = new LongAdder();
        this.status = new AtomicReference<>(CircuitBreakerStatus.CLOSED);
        this.lastFailureTime = -1L;
    }

    @SuppressWarnings("unchecked")
    public <T> T call(Supplier<T> supplier) {
        try {
            if (shouldAllowExecution()) {
                T result = supplier.get();
                markSuccess();
                return result;
            }
        } catch (Exception e) {
            markNoneSuccess();
        } finally {
            this.callCounter.increment();
        }
        return (T) fallback;
    }

    public void call(Runnable runnable) {
        call(() -> {
            runnable.run();
            return null;
        });
    }

    boolean shouldAllowExecution() {
        // No failure recorded yet: effectively CLOSED
        if (lastFailureTime == -1L) {
            return true;
        }
        // Failure threshold not reached yet
        if (failureThreshold > failureCounter.sum()) {
            return true;
        }
        // Threshold reached: allow one trial call only after reset_timeout has elapsed
        return shouldTryAfterRestTimeoutWindow()
                && changeStatus(CircuitBreakerStatus.OPEN, CircuitBreakerStatus.HALF_OPEN);
    }

    boolean changeStatus(CircuitBreakerStatus o, CircuitBreakerStatus n) {
        boolean r = status.compareAndSet(o, n);
        if (r) {
            CircuitBreakerStatusMonitor.X.report("RestCircuitBreaker", o, n);
        }
        return r;
    }

    boolean shouldTryAfterRestTimeoutWindow() {
        long lastFailureTimeSnap = lastFailureTime;
        long currentTime = System.currentTimeMillis();
        return currentTime > lastFailureTimeSnap + resetTimeout;
    }

    public void markSuccess() {
        // A successful trial call closes the breaker and clears all counters
        if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.CLOSED)) {
            reset();
        }
    }

    public void markNoneSuccess() {
        this.failureCounter.increment();
        // A failed trial call sends HALF_OPEN straight back to OPEN
        if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.OPEN)) {
            this.lastFailureTime = System.currentTimeMillis();
        }
        if (this.failureCounter.sum() >= failureThreshold
                && changeStatus(CircuitBreakerStatus.CLOSED, CircuitBreakerStatus.OPEN)) {
            this.lastFailureTime = System.currentTimeMillis();
        }
    }
}
```
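The transition rules from the two paragraphs above can be viewed in isolation as a small model. The model below is hypothetical and single-threaded. The names BreakerState and BreakerModel are made up for this sketch. Unlike RestCircuitBreaker, which infers the state from the failure count, the model tracks the state explicitly. It takes the current time as a parameter, so it can be exercised without sleeping. It is a simplified aid for reading and not a drop-in replacement.

```java
// Hypothetical, single-threaded model of the CLOSED/OPEN/HALF_OPEN rules (not the article's class).
enum BreakerState { CLOSED, OPEN, HALF_OPEN }

final class BreakerModel {

    private final long failureThreshold;
    private final long resetTimeoutMillis;
    private BreakerState state = BreakerState.CLOSED;
    private long failures = 0;
    private long lastFailureTime = -1L;

    BreakerModel(long failureThreshold, long resetTimeoutMillis) {
        this.failureThreshold = failureThreshold;
        this.resetTimeoutMillis = resetTimeoutMillis;
    }

    // Decide whether a call may run at time 'now' (milliseconds).
    boolean tryAcquire(long now) {
        if (state == BreakerState.CLOSED) {
            return true;
        }
        if (state == BreakerState.OPEN && now > lastFailureTime + resetTimeoutMillis) {
            state = BreakerState.HALF_OPEN; // reset_timeout elapsed: let one trial call through
            return true;
        }
        return false; // OPEN within reset_timeout, or a trial call is already in flight
    }

    void onSuccess() {
        if (state == BreakerState.HALF_OPEN) {
            // Trial call succeeded: close and forget past failures
            state = BreakerState.CLOSED;
            failures = 0;
            lastFailureTime = -1L;
        }
    }

    void onFailure(long now) {
        failures++;
        if (state == BreakerState.HALF_OPEN
                || (state == BreakerState.CLOSED && failures >= failureThreshold)) {
            // Trial call failed, or the threshold was reached: (re)open and restart the timer
            state = BreakerState.OPEN;
            lastFailureTime = now;
        }
    }

    BreakerState state() {
        return state;
    }
}
```

The explicit state makes the HALF_OPEN gate easy to see. While a trial call is in flight, every other call is refused. The real code gets the same effect from the failed OPEN -> HALF_OPEN CAS.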
Write a test client, RestCircuitBreakerClient:
```java
public class RestCircuitBreakerClient {

    public static void main(String[] args) throws Exception {
        Service service = new Service();
        // Trip after 5 failures, reset_timeout of 500 ms
        RestCircuitBreaker cb = new RestCircuitBreaker(5, 500);
        for (int i = 0; i < 10; i++) {
            int temp = i;
            String result = cb.call(() -> service.process(temp));
            System.out.println(String.format("result: %s, number: %d", result, temp));
        }
        Thread.sleep(501L);
        // Deliberately succeed
        cb.call(service::processSuccess);
        for (int i = 0; i < 3; i++) {
            int temp = i;
            String result = cb.call(() -> service.process(temp));
            System.out.println(String.format("result: %s, number: %d", result, temp));
        }
    }

    public static class Service {

        // Always fails
        public String process(int i) {
            System.out.println("entering process, number: " + i);
            throw new RuntimeException(String.valueOf(i));
        }

        public void processSuccess() {
            System.out.println("calling processSuccess");
        }
    }
}
```
The output is as follows:
```text
CircuitBreaker[RestCircuitBreaker] reset
entering process, number: 0
result: null, number: 0
entering process, number: 1
result: null, number: 1
entering process, number: 2
result: null, number: 2
entering process, number: 3
result: null, number: 3
entering process, number: 4
CircuitBreaker[RestCircuitBreaker] status changed, [CLOSED]->[OPEN]
result: null, number: 4
result: null, number: 5
result: null, number: 6
result: null, number: 7
result: null, number: 8
result: null, number: 9
CircuitBreaker[RestCircuitBreaker] status changed, [OPEN]->[HALF_OPEN]
calling processSuccess  # <------ this successful call resets the breaker's state
CircuitBreaker[RestCircuitBreaker] status changed, [HALF_OPEN]->[CLOSED]
CircuitBreaker[RestCircuitBreaker] reset
entering process, number: 0
result: null, number: 0
entering process, number: 1
result: null, number: 1
entering process, number: 2
result: null, number: 2
```
Thread-pool isolation and timeout control
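When using a CircuitBreaker, you can create a separate thread pool for each resource, identified by a unique resource_key or resource_name. Calls to each resource are then isolated by their own pool. The idea comes from the compartments of a cargo ship: each compartment is sealed off with insulating material, so a fire in one does not spread to the others. This is known as the Bulkhead pattern. In Java, you can submit the task with ThreadPoolExecutor#submit(Callable<T> task) and then fetch the result with Future#get(long timeout, TimeUnit unit). This puts a preset upper bound on how long each call may take.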
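The timeout mechanism described above can be shown with the JDK alone. In this sketch the task is submitted to a pool and Future#get(long, TimeUnit) bounds the wait. The helper name callWithTimeout is an assumption of this sketch, and so is cancelling the future on timeout. The breakers in this article do not cancel timed-out tasks, so a slow call keeps its pool thread busy until it finishes on its own.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimeoutDemo {

    // Submit 'task' to 'pool' and wait at most timeoutMillis for its result.
    static <T> T callWithTimeout(ExecutorService pool, Callable<T> task, long timeoutMillis, T fallback) {
        Future<T> future = pool.submit(task);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Too slow: interrupt the worker so it does not keep the pool thread busy.
            future.cancel(true);
            return fallback;
        } catch (ExecutionException e) {
            // The task itself threw; e.getCause() holds the original exception.
            return fallback;
        } catch (InterruptedException e) {
            // The caller was interrupted while waiting: restore the flag and give up.
            Thread.currentThread().interrupt();
            future.cancel(true);
            return fallback;
        }
    }
}
```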
First, we need a lightweight module that manages a thread pool per resource:
```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import lombok.Data;

// Resource configuration
@Data
public class CircuitBreakerResourceConf {

    private String resourceName;

    private int coreSize;

    private int queueSize;

    private long timeout;
}

// Holder for a resource's thread pool and timeout. It is used below but not defined
// in the original text; this minimal definition is an assumption.
@Data
public class CircuitBreakerResource {

    private ThreadPoolExecutor executor;

    private long timeout;
}

public enum CircuitBreakerResourceManager {

    /**
     * Singleton
     */
    X;

    public final Map<String, CircuitBreakerResource> cache = new ConcurrentHashMap<>(8);

    public void register(CircuitBreakerResourceConf conf) {
        cache.computeIfAbsent(conf.getResourceName(), rn -> {
            int coreSize = conf.getCoreSize();
            int queueSize = conf.getQueueSize();
            BlockingQueue<Runnable> queue;
            if (queueSize > 0) {
                queue = new ArrayBlockingQueue<>(queueSize);
            } else {
                // No buffering: once all coreSize threads are busy, new tasks are rejected
                queue = new SynchronousQueue<>();
            }
            ThreadPoolExecutor executor = new ThreadPoolExecutor(
                    coreSize,
                    coreSize,
                    0,
                    TimeUnit.SECONDS,
                    queue,
                    new ThreadFactory() {

                        private final AtomicInteger counter = new AtomicInteger();

                        @Override
                        public Thread newThread(Runnable r) {
                            Thread thread = new Thread(r);
                            thread.setDaemon(true);
                            thread.setName(rn + "-CircuitBreakerWorker-" + counter.getAndIncrement());
                            return thread;
                        }
                    },
                    new ThreadPoolExecutor.AbortPolicy()
            );
            CircuitBreakerResource resource = new CircuitBreakerResource();
            resource.setExecutor(executor);
            resource.setTimeout(conf.getTimeout());
            return resource;
        });
    }

    public CircuitBreakerResource get(String resourceName) {
        return Optional.ofNullable(cache.get(resourceName))
                .orElseThrow(() -> new IllegalArgumentException(resourceName));
    }
}
```
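When queueSize is 0, the manager uses a SynchronousQueue together with AbortPolicy, so the pool never buffers work. Once all coreSize threads are busy, the next submission fails fast with RejectedExecutionException. The hypothetical RejectDemo below builds a pool the same way, with a single thread and the default thread factory, to check that behavior.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class RejectDemo {

    // Same construction as the manager uses when queueSize <= 0 (default thread factory here).
    static ThreadPoolExecutor newPool(int coreSize) {
        return new ThreadPoolExecutor(coreSize, coreSize, 0, TimeUnit.SECONDS,
                new SynchronousQueue<>(), new ThreadPoolExecutor.AbortPolicy());
    }

    // Occupy the only thread, then try to submit a second task.
    static boolean secondSubmitRejected() {
        ThreadPoolExecutor pool = newPool(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            pool.submit(() -> {
                release.await(); // blocks the single worker until we let it go
                return null;
            });
            try {
                pool.submit(() -> "second");
                return false; // accepted: not expected while the only thread is busy
            } catch (RejectedExecutionException e) {
                return true;  // no idle thread and no queue capacity
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

The SlidingWindowCircuitBreaker later in the article counts exactly these rejections separately from timeouts and business failures.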
Write the ResourceCircuitBreaker implementation:
```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Supplier;

import lombok.Getter;

@Getter
public class ResourceCircuitBreaker {

    private final long failureThreshold;
    private final long resetTimeout;
    private LongAdder failureCounter;
    private LongAdder callCounter;
    private AtomicReference<CircuitBreakerStatus> status;
    private final ThreadPoolExecutor executor;
    private final Object fallback = null;
    private final String circuitBreakerName;

    /**
     * Timestamp of the last failed call (volatile so that other threads see updates)
     */
    private volatile long lastFailureTime;

    /**
     * Upper bound on execution time, in milliseconds
     */
    private final long executionTimeout;

    public ResourceCircuitBreaker(String resourceName, long failureThreshold, long resetTimeout) {
        CircuitBreakerResource resource = CircuitBreakerResourceManager.X.get(resourceName);
        this.circuitBreakerName = "ResourceCircuitBreaker-" + resourceName;
        this.executor = resource.getExecutor();
        this.executionTimeout = resource.getTimeout();
        this.failureThreshold = failureThreshold;
        this.resetTimeout = resetTimeout;
        reset();
    }

    public void reset() {
        CircuitBreakerStatusMonitor.X.reset(this.circuitBreakerName);
        this.callCounter = new LongAdder();
        this.failureCounter = new LongAdder();
        this.status = new AtomicReference<>(CircuitBreakerStatus.CLOSED);
        this.lastFailureTime = -1L;
    }

    @SuppressWarnings("unchecked")
    public <T> T call(Supplier<T> supplier) {
        try {
            if (shouldAllowExecution()) {
                Future<T> future = this.executor.submit(warp(supplier));
                // A TimeoutException (or a rejection, or a business exception) counts as a failure
                T result = future.get(executionTimeout, TimeUnit.MILLISECONDS);
                markSuccess();
                return result;
            }
        } catch (Exception e) {
            markNoneSuccess();
        } finally {
            this.callCounter.increment();
        }
        return (T) fallback;
    }

    <T> Callable<T> warp(Supplier<T> supplier) {
        return supplier::get;
    }

    public void call(Runnable runnable) {
        call(() -> {
            runnable.run();
            return null;
        });
    }

    boolean shouldAllowExecution() {
        // No failure recorded yet: effectively CLOSED
        if (lastFailureTime == -1L) {
            return true;
        }
        // Failure threshold not reached yet
        if (failureThreshold > failureCounter.sum()) {
            return true;
        }
        return shouldTryAfterRestTimeoutWindow()
                && changeStatus(CircuitBreakerStatus.OPEN, CircuitBreakerStatus.HALF_OPEN);
    }

    boolean changeStatus(CircuitBreakerStatus o, CircuitBreakerStatus n) {
        boolean r = status.compareAndSet(o, n);
        if (r) {
            CircuitBreakerStatusMonitor.X.report(this.circuitBreakerName, o, n);
        }
        return r;
    }

    boolean shouldTryAfterRestTimeoutWindow() {
        long lastFailureTimeSnap = lastFailureTime;
        long currentTime = System.currentTimeMillis();
        return currentTime > lastFailureTimeSnap + resetTimeout;
    }

    public void markSuccess() {
        if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.CLOSED)) {
            reset();
        }
    }

    public void markNoneSuccess() {
        this.failureCounter.increment();
        if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.OPEN)) {
            this.lastFailureTime = System.currentTimeMillis();
        }
        if (this.failureCounter.sum() >= failureThreshold
                && changeStatus(CircuitBreakerStatus.CLOSED, CircuitBreakerStatus.OPEN)) {
            this.lastFailureTime = System.currentTimeMillis();
        }
    }
}
```
Write a test scenario class, ResourceCircuitBreakerClient:
```java
import java.util.Random;

public class ResourceCircuitBreakerClient {

    public static void main(String[] args) throws Exception {
        CircuitBreakerResourceConf conf = new CircuitBreakerResourceConf();
        conf.setCoreSize(10);
        conf.setQueueSize(0);
        conf.setResourceName("SERVICE");
        // Calls slower than 50 ms time out
        conf.setTimeout(50);
        CircuitBreakerResourceManager.X.register(conf);
        Service service = new Service();
        ResourceCircuitBreaker cb = new ResourceCircuitBreaker("SERVICE", 5, 500);
        for (int i = 0; i < 10; i++) {
            int temp = i;
            String result = cb.call(() -> service.process(temp));
            System.out.println(String.format("result: %s, number: %d", result, temp));
        }
        Thread.sleep(501L);
        cb.call(service::processSuccess);
        for (int i = 0; i < 3; i++) {
            int temp = i;
            String result = cb.call(() -> service.process(temp));
            System.out.println(String.format("result: %s, number: %d", result, temp));
        }
    }

    public static class Service {

        private final Random r = new Random();

        // Sleeps a random 0-199 ms, so most calls exceed the 50 ms timeout
        public String process(int i) {
            int sleep = r.nextInt(200);
            System.out.println(String.format("thread[%s] - entering process, number: %d, sleeping %d ms",
                    Thread.currentThread().getName(), i, sleep));
            try {
                Thread.sleep(sleep);
            } catch (InterruptedException ignore) {
            }
            return String.valueOf(i);
        }

        public void processSuccess() {
            System.out.println(String.format("thread[%s] - calling processSuccess",
                    Thread.currentThread().getName()));
        }
    }
}
```
Output from one run:
```text
CircuitBreaker[ResourceCircuitBreaker-SERVICE] reset
thread[SERVICE-CircuitBreakerWorker-0] - entering process, number: 0, sleeping 67 ms
result: null, number: 0
thread[SERVICE-CircuitBreakerWorker-1] - entering process, number: 1, sleeping 85 ms
result: null, number: 1
thread[SERVICE-CircuitBreakerWorker-2] - entering process, number: 2, sleeping 72 ms
result: null, number: 2
thread[SERVICE-CircuitBreakerWorker-3] - entering process, number: 3, sleeping 88 ms
result: null, number: 3
thread[SERVICE-CircuitBreakerWorker-4] - entering process, number: 4, sleeping 28 ms
result: 4, number: 4
thread[SERVICE-CircuitBreakerWorker-5] - entering process, number: 5, sleeping 102 ms
CircuitBreaker[ResourceCircuitBreaker-SERVICE] status changed, [CLOSED]->[OPEN]
result: null, number: 5
result: null, number: 6
result: null, number: 7
result: null, number: 8
result: null, number: 9
CircuitBreaker[ResourceCircuitBreaker-SERVICE] status changed, [OPEN]->[HALF_OPEN]
thread[SERVICE-CircuitBreakerWorker-6] - calling processSuccess
CircuitBreaker[ResourceCircuitBreaker-SERVICE] status changed, [HALF_OPEN]->[CLOSED]
CircuitBreaker[ResourceCircuitBreaker-SERVICE] reset
thread[SERVICE-CircuitBreakerWorker-7] - entering process, number: 0, sleeping 74 ms
result: null, number: 0
thread[SERVICE-CircuitBreakerWorker-8] - entering process, number: 1, sleeping 111 ms
result: null, number: 1
thread[SERVICE-CircuitBreakerWorker-9] - entering process, number: 2, sleeping 183 ms
result: null, number: 2
```
Sliding window and percentage statistics
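The previous section implemented per-resource calls isolated by thread pools, but it has an obvious shortcoming: its state management and reset do not fit production use. The HALF_OPEN -> CLOSED transition and the reset should not fire as soon as a single trial call succeeds. Instead, they should depend on behavior over a period of time: the (average) failure rate dropping below a threshold, or the (average) success rate recovering above one. Otherwise, in many scenarios the breaker flips state constantly and is effectively useless. In most scenarios, the failurePercent over a period of time is a more accurate signal than comparing a raw failure count directly against failureThreshold. To get there, we introduce a Sliding Window. For each time unit, it records four counts: total calls, successful calls, timed-out calls, and failed calls that did not time out. To keep things simple, the time unit is fixed at 1 second.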
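Define a Bucket class that records these four counts. The implementation differs slightly from the figure above. The non-timeout failure count is replaced by a count of tasks rejected by the thread pool, and the failure count covers both timed-out executions and ordinary business exceptions: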
```java
import java.util.concurrent.atomic.LongAdder;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
@Getter
public class MetricInfo {

    private final long total;
    private final long success;
    private final long failure;
    private final long reject;

    public static final MetricInfo EMPTY = new MetricInfo(0, 0, 0, 0);

    public MetricInfo merge(MetricInfo other) {
        return new MetricInfo(
                this.total + other.getTotal(),
                this.success + other.getSuccess(),
                this.failure + other.getFailure(),
                this.reject + other.getReject()
        );
    }
}

public class Bucket {

    // Timestamp at which this bucket's time unit starts
    @Getter
    private final long windowStartTimestamp;
    private final LongAdder total;
    private final LongAdder success;
    private final LongAdder failure;
    private final LongAdder reject;

    public Bucket(long windowStartTimestamp) {
        this.windowStartTimestamp = windowStartTimestamp;
        this.total = new LongAdder();
        this.success = new LongAdder();
        this.reject = new LongAdder();
        this.failure = new LongAdder();
    }

    public void increaseTotal() {
        this.total.increment();
    }

    public void increaseSuccess() {
        this.success.increment();
    }

    public void increaseFailure() {
        this.failure.increment();
    }

    public void increaseReject() {
        this.reject.increment();
    }

    public long totalCount() {
        return this.total.sum();
    }

    public long successCount() {
        return this.success.sum();
    }

    public long failureCount() {
        return this.failure.sum();
    }

    public long rejectCount() {
        return this.reject.sum();
    }

    public void reset() {
        this.total.reset();
        this.success.reset();
        this.failure.reset();
        this.reject.reset();
    }

    public MetricInfo metricInfo() {
        return new MetricInfo(
                totalCount(),
                successCount(),
                failureCount(),
                rejectCount()
        );
    }

    @Override
    public String toString() {
        return String.format("Bucket[wt=%d,t=%d,s=%d,f=%d,r=%d]",
                windowStartTimestamp,
                totalCount(),
                successCount(),
                failureCount(),
                rejectCount()
        );
    }
}
```
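Bucket above keeps one named LongAdder per counter. The next paragraph attributes an alternative layout to Hystrix: counters in a LongAdder[] indexed by an event enum's ordinal(). A sketch of that layout might look like this. CallEvent and EnumBucket are hypothetical names, not Hystrix classes.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical event types, one per counter kept by Bucket.
enum CallEvent { TOTAL, SUCCESS, FAILURE, REJECT }

final class EnumBucket {

    private final long windowStartTimestamp;
    // One LongAdder per event type, indexed by CallEvent.ordinal().
    private final LongAdder[] counters = new LongAdder[CallEvent.values().length];

    EnumBucket(long windowStartTimestamp) {
        this.windowStartTimestamp = windowStartTimestamp;
        for (int i = 0; i < counters.length; i++) {
            counters[i] = new LongAdder();
        }
    }

    void increment(CallEvent event) {
        counters[event.ordinal()].increment();
    }

    long count(CallEvent event) {
        return counters[event.ordinal()].sum();
    }

    long getWindowStartTimestamp() {
        return windowStartTimestamp;
    }
}
```

With this layout, a new kind of event only needs a new enum constant. The named-field version needs a new field plus its increment and count methods.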
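In Hystrix, the counters in a Bucket are a LongAdder[] for flexibility. Each kind of event can then be counted and accumulated directly through the ordinal of its event enum. To save memory, the sliding window is a fixed-capacity, reusable ring buffer, BucketCircularArray#ListState. Standing on the shoulders of giants, we can borrow that idea to implement BucketCircular: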
```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;

// BucketArray, the immutable ring state this class delegates to, is not shown in this text
public class BucketCircular implements Iterable<Bucket> {

    private final AtomicReference<BucketArray> bucketArray;

    public BucketCircular(int bucketNumber) {
        // Trick: the backing array has bucketNumber + 1 slots, which makes adding and dropping buckets easier
        AtomicReferenceArray<Bucket> buckets = new AtomicReferenceArray<>(bucketNumber + 1);
        this.bucketArray = new AtomicReference<>(new BucketArray(buckets, 0, 0, bucketNumber));
    }

    public Bucket getTail() {
        return this.bucketArray.get().tail();
    }

    /**
     * Append a bucket at the tail of the ring
     */
    public void addTail(Bucket bucket) {
        BucketArray bucketArray = this.bucketArray.get();
        BucketArray newBucketArray = bucketArray.addBucket(bucket);
        // Only called while holding the lock, so in theory this CAS never fails
        this.bucketArray.compareAndSet(bucketArray, newBucketArray);
    }

    public Bucket[] toArray() {
        return this.bucketArray.get().toArray();
    }

    public int size() {
        return this.bucketArray.get().getSize();
    }

    @Override
    public Iterator<Bucket> iterator() {
        return Collections.unmodifiableList(Arrays.asList(toArray())).iterator();
    }

    public void clear() {
        while (true) {
            BucketArray bucketArray = this.bucketArray.get();
            BucketArray clear = bucketArray.clear();
            if (this.bucketArray.compareAndSet(bucketArray, clear)) {
                return;
            }
        }
    }
}
```
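BucketCircular delegates to a BucketArray class whose code is not shown in this text. The sketch below shows what it could look like, modeled on the head/tail bookkeeping of Hystrix's BucketCircularArray.ListState. The constructor order (data, head, tail, numBuckets) and the method names are inferred from the calls in BucketCircular. The small Bucket class at the top is only a stand-in so that the sketch compiles on its own.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Stand-in for the Bucket class defined earlier; drop it when using the real one.
class Bucket {
    private final long windowStartTimestamp;
    Bucket(long windowStartTimestamp) { this.windowStartTimestamp = windowStartTimestamp; }
    long getWindowStartTimestamp() { return windowStartTimestamp; }
}

// Immutable snapshot of the ring: addBucket/clear return a new BucketArray,
// which BucketCircular then publishes with compareAndSet.
final class BucketArray {

    private final AtomicReferenceArray<Bucket> data; // length == numBuckets + 1
    private final int head;       // physical index of the oldest bucket
    private final int tail;       // physical index where the next bucket is written
    private final int numBuckets; // logical capacity
    private final int size;

    BucketArray(AtomicReferenceArray<Bucket> data, int head, int tail, int numBuckets) {
        this.data = data;
        this.head = head;
        this.tail = tail;
        this.numBuckets = numBuckets;
        // One slot always stays free, so head == tail unambiguously means "empty".
        this.size = (tail + data.length() - head) % data.length();
    }

    int getSize() {
        return size;
    }

    Bucket tail() {
        return size == 0 ? null : data.get(convert(size - 1));
    }

    BucketArray addBucket(Bucket bucket) {
        data.set(tail, bucket);
        int newTail = (tail + 1) % data.length();
        // Already full: also advance head, which drops the oldest bucket.
        int newHead = size == numBuckets ? (head + 1) % data.length() : head;
        return new BucketArray(data, newHead, newTail, numBuckets);
    }

    BucketArray clear() {
        return new BucketArray(new AtomicReferenceArray<>(data.length()), 0, 0, numBuckets);
    }

    Bucket[] toArray() {
        Bucket[] result = new Bucket[size];
        for (int i = 0; i < size; i++) {
            result[i] = data.get(convert(i));
        }
        return result;
    }

    // Logical index (0 = oldest) -> physical array index.
    private int convert(int index) {
        return (index + head) % data.length();
    }
}
```

With capacity 5, adding 7 buckets leaves the last 5 in order, which is the behavior the test below the next paragraph relies on.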
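When a new Bucket is appended to the tail of the ring, the ring's length is fixed. We therefore have to decide whether the head and tail pointers need to be recalculated, which drops the oldest bucket once the ring is full. A quick test: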
```java
// Requires: import java.util.stream.Stream;
public static void main(String[] args) throws Exception {
    // Capacity of 5, 7 buckets added: the two oldest are dropped
    BucketCircular circular = new BucketCircular(5);
    circular.addTail(new Bucket(111L));
    circular.addTail(new Bucket(System.currentTimeMillis()));
    circular.addTail(new Bucket(System.currentTimeMillis()));
    circular.addTail(new Bucket(System.currentTimeMillis()));
    circular.addTail(new Bucket(System.currentTimeMillis()));
    circular.addTail(new Bucket(System.currentTimeMillis()));
    circular.addTail(new Bucket(222L));
    Stream.of(circular.toArray()).forEach(System.out::println);
}

// Output:
// Bucket[wt=1603613365205,t=0,s=0,f=0,r=0]
// Bucket[wt=1603613365205,t=0,s=0,f=0,r=0]
// Bucket[wt=1603613365205,t=0,s=0,f=0,r=0]
// Bucket[wt=1603613365205,t=0,s=0,f=0,r=0]
// Bucket[wt=222,t=0,s=0,f=0,r=0]
```
Next, write SlidingWindowMonitor, which manages the Buckets and serves as the entry point for the statistics:
```java
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;

// Cumulative counter: accumulates buckets once they are no longer the current one
public class BucketCumulativeCalculator {

    private LongAdder total = new LongAdder();
    private LongAdder success = new LongAdder();
    private LongAdder failure = new LongAdder();
    private LongAdder reject = new LongAdder();

    public void addBucket(Bucket lb) {
        total.add(lb.totalCount());
        success.add(lb.successCount());
        failure.add(lb.failureCount());
        reject.add(lb.rejectCount());
    }

    public MetricInfo sum() {
        return new MetricInfo(
                total.sum(),
                success.sum(),
                failure.sum(),
                reject.sum()
        );
    }

    public void reset() {
        total = new LongAdder();
        success = new LongAdder();
        failure = new LongAdder();
        reject = new LongAdder();
    }
}

// For simplicity, the parameters below are fixed for now
public class SlidingWindowMonitor {

    /**
     * Window length - 10 seconds
     */
    private final int windowDuration = 10000;

    /**
     * Bucket size - one time unit of 1 second
     */
    private final int bucketSizeInTimeUint = 1000;

    /**
     * Number of buckets - requires windowDuration % bucketSizeInTimeUint == 0
     */
    private final int bucketNumber = windowDuration / bucketSizeInTimeUint;

    private final BucketCircular bucketCircular;

    /**
     * Lock held while creating buckets
     */
    private final ReentrantLock lock;

    /**
     * Cumulative counter
     */
    private final BucketCumulativeCalculator calculator = new BucketCumulativeCalculator();

    public SlidingWindowMonitor() {
        this.bucketCircular = new BucketCircular(bucketNumber);
        this.lock = new ReentrantLock();
    }

    void reset() {
        Bucket tailBucket = bucketCircular.getTail();
        if (null != tailBucket) {
            calculator.addBucket(tailBucket);
        }
        bucketCircular.clear();
    }

    /**
     * Cumulative statistics
     */
    public MetricInfo getCumulativeMetricInfo() {
        return getCurrentMetricInfo().merge(calculator.sum());
    }

    /**
     * Statistics of the current bucket
     */
    public MetricInfo getCurrentMetricInfo() {
        Bucket currentBucket = getCurrentBucket();
        if (null == currentBucket) {
            return MetricInfo.EMPTY;
        }
        return currentBucket.metricInfo();
    }

    /**
     * Rolling statistics - the source data from which the breaker computes the error percentage
     */
    public MetricInfo getRollingMetricInfo() {
        Bucket currentBucket = getCurrentBucket();
        if (null == currentBucket) {
            return MetricInfo.EMPTY;
        }
        MetricInfo info = new MetricInfo(0, 0, 0, 0);
        for (Bucket bucket : this.bucketCircular) {
            info = info.merge(bucket.metricInfo());
        }
        return info;
    }

    /**
     * The core method - returns the Bucket for the current system time
     */
    Bucket getCurrentBucket() {
        long time = System.currentTimeMillis();
        Bucket tailBucket = bucketCircular.getTail();
        // The tail bucket still covers the current time: use it directly
        if (null != tailBucket && time < tailBucket.getWindowStartTimestamp() + bucketSizeInTimeUint) {
            return tailBucket;
        }
        if (lock.tryLock()) {
            try {
                // The ring is empty
                if (null == bucketCircular.getTail()) {
                    Bucket newBucket = new Bucket(time);
                    bucketCircular.addTail(newBucket);
                    return newBucket;
                } else {
                    // Create as many buckets as needed to catch up with the current time
                    for (int i = 0; i < bucketNumber; i++) {
                        tailBucket = bucketCircular.getTail();
                        // The newest bucket has caught up with the current time
                        if (time < tailBucket.getWindowStartTimestamp() + bucketSizeInTimeUint) {
                            return tailBucket;
                        }
                        // The current time is more than a whole window past the tail: start over
                        else if (time > tailBucket.getWindowStartTimestamp() + bucketSizeInTimeUint + windowDuration) {
                            reset();
                            return getCurrentBucket();
                        }
                        // The current time is ahead of the tail: fill in the buckets in between
                        else {
                            bucketCircular.addTail(new Bucket(tailBucket.getWindowStartTimestamp() + bucketSizeInTimeUint));
                            calculator.addBucket(tailBucket);
                        }
                    }
                    return bucketCircular.getTail();
                }
            } finally {
                lock.unlock();
            }
        } else {
            // Another thread is creating buckets. A non-null tail is the newest bucket created by
            // the thread holding the lock; otherwise wait briefly and retry recursively
            tailBucket = bucketCircular.getTail();
            if (null != tailBucket) {
                return tailBucket;
            }
            try {
                Thread.sleep(5);
            } catch (InterruptedException ignore) {
            }
            // Recurse
            return getCurrentBucket();
        }
    }

    public void incrementTotal() {
        getCurrentBucket().increaseTotal();
    }

    public void incrementSuccess() {
        getCurrentBucket().increaseSuccess();
    }

    public void incrementFailure() {
        getCurrentBucket().increaseFailure();
    }

    public void incrementReject() {
        getCurrentBucket().increaseReject();
    }
}
```
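The branch logic in getCurrentBucket() is easier to follow with the clock passed in explicitly. The hypothetical helper below mirrors that loop without the lock or the ring buffer. It returns the start timestamps of the buckets that would be appended to catch up with now. It returns null when the gap exceeds the whole window, in which case the monitor resets instead. The filler buckets start empty. The ring holds only bucketNumber buckets, so they push older buckets out, and that is how calls older than the window stop contributing to the rolling statistics.

```java
import java.util.ArrayList;
import java.util.List;

final class BucketCatchUp {

    /**
     * Mirrors the loop in getCurrentBucket(): given the tail bucket's start time and 'now',
     * returns the start times of the buckets that would be appended, or null when the gap
     * exceeds the whole window and the monitor would reset() instead.
     */
    static List<Long> bucketsToAppend(long tailStart, long now, long bucketSize,
                                      long windowDuration, int bucketNumber) {
        List<Long> appended = new ArrayList<>();
        long start = tailStart;
        for (int i = 0; i < bucketNumber; i++) {
            if (now < start + bucketSize) {
                return appended; // the (new) tail already covers 'now'
            }
            if (now > start + bucketSize + windowDuration) {
                return null; // too stale: clear the ring and start again from 'now'
            }
            start += bucketSize; // append the next, still empty, bucket
            appended.add(start);
        }
        return appended;
    }
}
```

For example, with 1-second buckets and a tail starting at 0, a call at 3200 ms appends buckets starting at 1000, 2000 and 3000.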
Finally, combine SlidingWindowMonitor with the earlier ResourceCircuitBreaker to evolve it into SlidingWindowCircuitBreaker:
```java
package cn.throwx.cb;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
 * @author throwable
 * @version v1
 * @description
 * @since 2020/10/25 17:14
 */
public class SlidingWindowCircuitBreaker {

    /**
     * Error percentage threshold
     */
    private final long errorPercentThreshold;

    /**
     * How long to stay OPEN before allowing a trial call (reset_timeout)
     */
    private final long resetTimeout;

    private AtomicReference<CircuitBreakerStatus> status;

    private final ThreadPoolExecutor executor;

    private final String circuitBreakerName;

    /**
     * Timestamp of the last failed call (volatile so that other threads see updates)
     */
    private volatile long lastFailureTime;

    /**
     * Upper bound on execution time, in milliseconds
     */
    private final long executionTimeout;

    /**
     * Sliding window monitor
     */
    private final SlidingWindowMonitor slidingWindowMonitor;

    public SlidingWindowCircuitBreaker(String resourceName, long errorPercentThreshold, long resetTimeout) {
        CircuitBreakerResource resource = CircuitBreakerResourceManager.X.get(resourceName);
        this.circuitBreakerName = "SlidingWindowCircuitBreaker-" + resourceName;
        this.executor = resource.getExecutor();
        this.executionTimeout = resource.getTimeout();
        this.errorPercentThreshold = errorPercentThreshold;
        this.resetTimeout = resetTimeout;
        this.slidingWindowMonitor = new SlidingWindowMonitor();
        reset();
    }

    public void reset() {
        CircuitBreakerStatusMonitor.X.reset(this.circuitBreakerName);
        this.status = new AtomicReference<>(CircuitBreakerStatus.CLOSED);
        this.lastFailureTime = -1L;
    }

    @SuppressWarnings("unchecked")
    public <T> T call(Supplier<T> supplier) {
        return call(supplier, (Fallback<T>) Fallback.F);
    }

    public <T> T call(Supplier<T> supplier, Fallback<T> fallback) {
        try {
            if (shouldAllowExecution()) {
                slidingWindowMonitor.incrementTotal();
                Future<T> future = this.executor.submit(warp(supplier));
                T result = future.get(executionTimeout, TimeUnit.MILLISECONDS);
                markSuccess();
                return result;
            }
        } catch (RejectedExecutionException ree) {
            // The resource's thread pool is saturated
            markReject();
        } catch (Exception e) {
            // Timeout or business exception
            markFailure();
        }
        return fallback.fallback();
    }

    <T> Callable<T> warp(Supplier<T> supplier) {
        return supplier::get;
    }

    public void call(Runnable runnable) {
        call(() -> {
            runnable.run();
            return null;
        });
    }

    boolean shouldAllowExecution() {
        // No failure recorded yet: effectively CLOSED
        if (lastFailureTime == -1L) {
            return true;
        }
        // Error percentage below the threshold: allow the call. Returning false here would keep an
        // OPEN breaker shut for good once the window empties and the percentage falls to 0
        if (errorPercentThreshold > rollingErrorPercentage()) {
            return true;
        }
        return shouldTryAfterRestTimeoutWindow()
                && changeStatus(CircuitBreakerStatus.OPEN, CircuitBreakerStatus.HALF_OPEN);
    }

    boolean changeStatus(CircuitBreakerStatus o, CircuitBreakerStatus n) {
        boolean r = status.compareAndSet(o, n);
        if (r) {
            CircuitBreakerStatusMonitor.X.report(this.circuitBreakerName, o, n);
        }
        return r;
    }

    boolean shouldTryAfterRestTimeoutWindow() {
        long lastFailureTimeSnap = lastFailureTime;
        long currentTime = System.currentTimeMillis();
        return currentTime > lastFailureTimeSnap + resetTimeout;
    }

    public void markSuccess() {
        slidingWindowMonitor.incrementSuccess();
        if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.CLOSED)) {
            reset();
        }
    }

    public void markReject() {
        slidingWindowMonitor.incrementReject();
        if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.OPEN)) {
            this.lastFailureTime = System.currentTimeMillis();
        }
    }

    public int rollingErrorPercentage() {
        MetricInfo rollingMetricInfo = slidingWindowMonitor.getRollingMetricInfo();
        long rejectCount = rollingMetricInfo.getReject();
        long failureCount = rollingMetricInfo.getFailure();
        long totalCount = rollingMetricInfo.getTotal();
        // Guard against an empty window (0 / 0)
        int errorPercentage = totalCount == 0
                ? 0
                : (int) ((double) (rejectCount + failureCount) / totalCount * 100);
        CircuitBreakerStatusMonitor.X.report(this.circuitBreakerName, String.format("error percentage: %d", errorPercentage));
        return errorPercentage;
    }

    public void markFailure() {
        slidingWindowMonitor.incrementFailure();
        if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.OPEN)) {
            this.lastFailureTime = System.currentTimeMillis();
        }
        if (rollingErrorPercentage() >= errorPercentThreshold
                && changeStatus(CircuitBreakerStatus.CLOSED, CircuitBreakerStatus.OPEN)) {
            this.lastFailureTime = System.currentTimeMillis();
        }
    }
}

/**
 * Fallback is used above but not defined in the original text;
 * this minimal definition (the default fallback returns null) is an assumption.
 */
@FunctionalInterface
interface Fallback<T> {

    Fallback<?> F = () -> null;

    T fallback();
}
```
Write a test client, SlidingWindowCircuitBreakerClient:
```java
import java.util.Random;

public class SlidingWindowCircuitBreakerClient {

    public static void main(String[] args) throws Exception {
        CircuitBreakerResourceConf conf = new CircuitBreakerResourceConf();
        conf.setCoreSize(10);
        conf.setQueueSize(0);
        conf.setResourceName("SERVICE");
        conf.setTimeout(50);
        CircuitBreakerResourceManager.X.register(conf);
        Service service = new Service();
        // Trip at an error percentage of 50, reset_timeout of 500 ms
        SlidingWindowCircuitBreaker cb = new SlidingWindowCircuitBreaker("SERVICE", 50, 500);
        for (int i = 0; i < 10; i++) {
            int temp = i;
            String result = cb.call(() -> service.process(temp));
            System.out.println(String.format("result: %s, number: %d", result, temp));
        }
        Thread.sleep(501L);
        cb.call(service::processSuccess);
        for (int i = 0; i < 3; i++) {
            int temp = i;
            String result = cb.call(() -> service.process(temp));
            System.out.println(String.format("result: %s, number: %d", result, temp));
        }
        Thread.sleep(501L);
        cb.call(service::processSuccess);
        cb.call(service::processSuccess);
    }

    public static class Service {

        private final Random r = new Random();

        // Sleeps a random 0-199 ms, so most calls exceed the 50 ms timeout
        public String process(int i) {
            int sleep = r.nextInt(200);
            System.out.println(String.format("thread[%s] - entering process, number: %d, sleeping %d ms",
                    Thread.currentThread().getName(), i, sleep));
            try {
                Thread.sleep(sleep);
            } catch (InterruptedException ignore) {
            }
            return String.valueOf(i);
        }

        public void processSuccess() {
            System.out.println(String.format("thread[%s] - calling processSuccess",
                    Thread.currentThread().getName()));
        }
    }
}
```
Output from one run:
```text
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] reset
thread[SERVICE-CircuitBreakerWorker-0] - entering process, number: 0, sleeping 67 ms
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE]-error percentage: 100
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] status changed, [CLOSED]->[OPEN]
result: null, number: 0
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE]-error percentage: 100
result: null, number: 1
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE]-error percentage: 100
result: null, number: 2
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE]-error percentage: 100
result: null, number: 3
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE]-error percentage: 100
result: null, number: 4
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE]-error percentage: 100
result: null, number: 5
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE]-error percentage: 100
result: null, number: 6
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE]-error percentage: 100
result: null, number: 7
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE]-error percentage: 100
result: null, number: 8
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE]-error percentage: 100
result: null, number: 9
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE]-error percentage: 100
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] status changed, [OPEN]->[HALF_OPEN]
thread[SERVICE-CircuitBreakerWorker-1] - calling processSuccess
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] status changed, [HALF_OPEN]->[CLOSED]
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] reset
thread[SERVICE-CircuitBreakerWorker-2] - entering process, number: 0, sleeping 84 ms
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE]-error percentage: 66
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] status changed, [CLOSED]->[OPEN]
result: null, number: 0
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE]-error percentage: 66
result: null, number: 1
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE]-error percentage: 66
result: null, number: 2
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE]-error percentage: 66
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] status changed, [OPEN]->[HALF_OPEN]
thread[SERVICE-CircuitBreakerWorker-3] - calling processSuccess
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] status changed, [HALF_OPEN]->[CLOSED]
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] reset
thread[SERVICE-CircuitBreakerWorker-4] - calling processSuccess
```
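The 66 in the second round comes from the fact that calls short-circuited by the open breaker never reach incrementTotal(). In that round the 10-second window holds only three executed calls:

- call 0 from the first round, which timed out at 67 ms;
- the successful trial call;
- call 0 from the second round, which timed out at 84 ms.

That is 2 failures out of 3, or 66.67%, which the int cast truncates to 66. Below is the same arithmetic as rollingErrorPercentage(), written as a standalone function. ErrorPercentage.of is a hypothetical name, and the zero-total guard avoids computing 0/0 on an empty window.

```java
final class ErrorPercentage {

    // (failure + reject) / total as a percentage, truncated to an int like rollingErrorPercentage().
    static int of(long total, long failure, long reject) {
        if (total == 0) {
            return 0; // empty window: avoid computing 0 / 0
        }
        return (int) ((double) (failure + reject) / total * 100);
    }
}
```

ErrorPercentage.of(3, 2, 0) returns 66, which matches the log. With errorPercentThreshold set to 50, the breaker therefore trips again.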
Summary
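In production, use a mainstream implementation of the CircuitBreaker pattern, such as Hystrix or the more actively maintained Sentinel. To understand the pattern in depth, though, there is no substitute for working through an implementation step by step yourself.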
(End of article. c-4-d e-a-20201025)