冷饭新炒:理解断路器CircuitBreaker的原理与实现(下)

本文涉及的产品
传统型负载均衡 CLB,每月750个小时 15LCU
EMR Serverless StarRocks,5000CU*H 48000GB*H
应用型负载均衡 ALB,每月750个小时 15LCU
简介: 笔者之前在查找Sentinel相关资料的时候,偶然中找到了Martin Fowler大神的一篇文章《CircuitBreaker》。于是花了点时间仔细阅读,顺便温习一下断路器CircuitBreaker的原理与实现。

CircuitBreaker的简易实现



基于异常阈值并且能够自恢复的实现


基于异常阈值、能够自恢复的CircuitBreaker实现需要引入Half_Open状态,同时需要记录最后一次失败调用的时间戳以及reset_timeout(断路器的当前的系统时间戳减去上一阶段最后一次失败调用的时间差,大于某个值的时候,并且当前的失败调用大于失败阈值则需要把状态重置为Half_Open,这里的"某个值"定义为reset_timeout),示意图如下:


网络异常,图片无法展示
|


假设当前的调用为圆形6,当前系统时间戳减去(上一轮)最后一个失败调用(圆形5)的时间戳大于预设的reset_timeout的时候,不论当次调用是成功还是失败,直到下一次调用失败或者失败调用数降低到转换为Closed状态之前,都处于Half_Open状态,会对单个调用进行放行(并发场景下也有可能同时放行多个调用)。代码实现如下:


// 添加一个Monitor用于记录状态变更
public enum CircuitBreakerStatusMonitor {
    /**
     * 单例
     */
    X;
    public void report(String name, CircuitBreakerStatus o, CircuitBreakerStatus n) {
        System.out.println(String.format("断路器[%s]状态变更,[%s]->[%s]", name, o, n));
    }
    public void reset(String name) {
        System.out.println(String.format("断路器[%s]重置", name));
    }
}
@Getter
public class RestCircuitBreaker {
    private final long failureThreshold;
    private final long resetTimeout;
    private LongAdder failureCounter;
    private LongAdder callCounter;
    private AtomicReference<CircuitBreakerStatus> status;
    private final Object fallback = null;
    /**
     * 最后一次调用失败的时间戳
     */
    private long lastFailureTime;
    public RestCircuitBreaker(long failureThreshold, long resetTimeout) {
        this.failureThreshold = failureThreshold;
        this.resetTimeout = resetTimeout;
        reset();
    }
    public void reset() {
        CircuitBreakerStatusMonitor.X.reset("RestCircuitBreaker");
        this.callCounter = new LongAdder();
        this.failureCounter = new LongAdder();
        this.status = new AtomicReference<>(CircuitBreakerStatus.CLOSED);
        this.lastFailureTime = -1L;
    }
    @SuppressWarnings("unchecked")
    public <T> T call(Supplier<T> supplier) {
        try {
            if (shouldAllowExecution()) {
                T result = supplier.get();
                markSuccess();
                return result;
            }
        } catch (Exception e) {
            markNoneSuccess();
        } finally {
            this.callCounter.increment();
        }
        return (T) fallback;
    }
    public void call(Runnable runnable) {
        call(() -> {
            runnable.run();
            return null;
        });
    }
    boolean shouldAllowExecution() {
        // 本质是Closed状态
        if (lastFailureTime == -1L) {
            return true;
        }
        // 没到达阈值
        if (failureThreshold > failureCounter.sum()) {
            return true;
        }
        return shouldTryAfterRestTimeoutWindow()
                && changeStatus(CircuitBreakerStatus.OPEN, CircuitBreakerStatus.HALF_OPEN);
    }
    boolean changeStatus(CircuitBreakerStatus o, CircuitBreakerStatus n) {
        boolean r = status.compareAndSet(o, n);
        if (r) {
            CircuitBreakerStatusMonitor.X.report("RestCircuitBreaker", o, n);
        }
        return r;
    }
    boolean shouldTryAfterRestTimeoutWindow() {
        long lastFailureTimeSnap = lastFailureTime;
        long currentTime = System.currentTimeMillis();
        return currentTime > lastFailureTimeSnap + resetTimeout;
    }
    public void markSuccess() {
        if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.CLOSED)) {
            reset();
        }
    }
    public void markNoneSuccess() {
        this.failureCounter.increment();
        if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.OPEN)) {
            this.lastFailureTime = System.currentTimeMillis();
        }
        if (this.failureCounter.sum() >= failureThreshold &&
                changeStatus(CircuitBreakerStatus.CLOSED, CircuitBreakerStatus.OPEN)) {
            this.lastFailureTime = System.currentTimeMillis();
        }
    }
}
复制代码


编写一个测试客户端RestCircuitBreakerClient


public class RestCircuitBreakerClient {
    public static void main(String[] args) throws Exception {
        Service service = new Service();
        RestCircuitBreaker cb = new RestCircuitBreaker(5, 500);
        for (int i = 0; i < 10; i++) {
            int temp = i;
            String result = cb.call(() -> service.process(temp));
            System.out.println(String.format("返回结果:%s,number:%d", result, temp));
        }
        Thread.sleep(501L);
        // 故意成功
        cb.call(service::processSuccess);
        for (int i = 0; i < 3; i++) {
            int temp = i;
            String result = cb.call(() -> service.process(temp));
            System.out.println(String.format("返回结果:%s,number:%d", result, temp));
        }
    }
    public static class Service {
        public String process(int i) {
            System.out.println("进入process方法,number:" + i);
            throw new RuntimeException(String.valueOf(i));
        }
        public void processSuccess() {
            System.out.println("调用processSuccess方法");
        }
    }
}
复制代码


输出结果如下:


断路器[RestCircuitBreaker]重置
进入process方法,number:0
返回结果:null,number:0
进入process方法,number:1
返回结果:null,number:1
进入process方法,number:2
返回结果:null,number:2
进入process方法,number:3
返回结果:null,number:3
进入process方法,number:4
断路器[RestCircuitBreaker]状态变更,[CLOSED]->[OPEN]
返回结果:null,number:4
返回结果:null,number:5
返回结果:null,number:6
返回结果:null,number:7
返回结果:null,number:8
返回结果:null,number:9
断路器[RestCircuitBreaker]状态变更,[OPEN]->[HALF_OPEN]
调用processSuccess方法   # <------ 这个位置的成功调用重置了断路器的状态
断路器[RestCircuitBreaker]状态变更,[HALF_OPEN]->[CLOSED]
断路器[RestCircuitBreaker]重置
进入process方法,number:0
返回结果:null,number:0
进入process方法,number:1
返回结果:null,number:1
进入process方法,number:2
返回结果:null,number:2
复制代码


基于线程池隔离和超时控制


在使用CircuitBreaker的时候,可以基于不同的资源(唯一标识可以使用resource_key或者resource_name)创建单独的线程池,让资源基于线程池进行隔离调用。这种设计的原则借鉴于运货船的船舱设计,每个船舱都使用绝缘的材料进行分隔,一旦某个船舱出现了火情,也不会蔓延到其他船舱。在Java体系中,可以使用线程池ThreadPoolExecutor#submit(Callable<T> task)进行指定超时上限限制的任务提交和结果获取,这样就可以预设一个调用超时时间上限,限制每个调用的可用的最大调用时间。


网络异常,图片无法展示
|


首先需要设计一个轻量级的资源线程池管理模块:


// 资源配置
@Data
public class CircuitBreakerResourceConf {
    private String resourceName;
    private int coreSize;
    private int queueSize;
    private long timeout;
}
public enum CircuitBreakerResourceManager {
    /**
     * 单例
     */
    X;
    public final Map<String, CircuitBreakerResource> cache = new ConcurrentHashMap<>(8);
    public void register(CircuitBreakerResourceConf conf) {
        cache.computeIfAbsent(conf.getResourceName(), rn -> {
            int coreSize = conf.getCoreSize();
            int queueSize = conf.getQueueSize();
            BlockingQueue<Runnable> queue;
            if (queueSize > 0) {
                queue = new ArrayBlockingQueue<>(queueSize);
            } else {
                queue = new SynchronousQueue<>();
            }
            ThreadPoolExecutor executor = new ThreadPoolExecutor(
                    coreSize,
                    coreSize,
                    0,
                    TimeUnit.SECONDS,
                    queue,
                    new ThreadFactory() {
                        private final AtomicInteger counter = new AtomicInteger();
                        @Override
                        public Thread newThread(Runnable r) {
                            Thread thread = new Thread(r);
                            thread.setDaemon(true);
                            thread.setName(rn + "-CircuitBreakerWorker-" + counter.getAndIncrement());
                            return thread;
                        }
                    },
                    new ThreadPoolExecutor.AbortPolicy()
            );
            CircuitBreakerResource resource = new CircuitBreakerResource();
            resource.setExecutor(executor);
            resource.setTimeout(conf.getTimeout());
            return resource;
        });
    }
    public CircuitBreakerResource get(String resourceName) {
        return Optional.ofNullable(cache.get(resourceName)).orElseThrow(() -> new IllegalArgumentException(resourceName));
    }
}
复制代码


编写断路器ResourceCircuitBreaker的实现代码:


@Getter
public class ResourceCircuitBreaker {
    private final long failureThreshold;
    private final long resetTimeout;
    private LongAdder failureCounter;
    private LongAdder callCounter;
    private AtomicReference<CircuitBreakerStatus> status;
    private final ThreadPoolExecutor executor;
    private final Object fallback = null;
    private final String circuitBreakerName;
    /**
     * 最后一次调用失败的时间戳
     */
    private long lastFailureTime;
    /**
     * 执行超时上限,单位毫秒
     */
    private final long executionTimeout;
    public ResourceCircuitBreaker(String resourceName, long failureThreshold, long resetTimeout) {
        CircuitBreakerResource resource = CircuitBreakerResourceManager.X.get(resourceName);
        this.circuitBreakerName = "ResourceCircuitBreaker-" + resourceName;
        this.executor = resource.getExecutor();
        this.executionTimeout = resource.getTimeout();
        this.failureThreshold = failureThreshold;
        this.resetTimeout = resetTimeout;
        reset();
    }
    public void reset() {
        CircuitBreakerStatusMonitor.X.reset(this.circuitBreakerName);
        this.callCounter = new LongAdder();
        this.failureCounter = new LongAdder();
        this.status = new AtomicReference<>(CircuitBreakerStatus.CLOSED);
        this.lastFailureTime = -1L;
    }
    @SuppressWarnings("unchecked")
    public <T> T call(Supplier<T> supplier) {
        try {
            if (shouldAllowExecution()) {
                Future<T> future = this.executor.submit(warp(supplier));
                T result = future.get(executionTimeout, TimeUnit.MILLISECONDS);
                markSuccess();
                return result;
            }
        } catch (Exception e) {
            markNoneSuccess();
        } finally {
            this.callCounter.increment();
        }
        return (T) fallback;
    }
    <T> Callable<T> warp(Supplier<T> supplier) {
        return supplier::get;
    }
    public void call(Runnable runnable) {
        call(() -> {
            runnable.run();
            return null;
        });
    }
    boolean shouldAllowExecution() {
        // 本质是Closed状态
        if (lastFailureTime == -1L) {
            return true;
        }
        // 没到达阈值
        if (failureThreshold > failureCounter.sum()) {
            return true;
        }
        return shouldTryAfterRestTimeoutWindow()
                && changeStatus(CircuitBreakerStatus.OPEN, CircuitBreakerStatus.HALF_OPEN);
    }
    boolean changeStatus(CircuitBreakerStatus o, CircuitBreakerStatus n) {
        boolean r = status.compareAndSet(o, n);
        if (r) {
            CircuitBreakerStatusMonitor.X.report(this.circuitBreakerName, o, n);
        }
        return r;
    }
    boolean shouldTryAfterRestTimeoutWindow() {
        long lastFailureTimeSnap = lastFailureTime;
        long currentTime = System.currentTimeMillis();
        return currentTime > lastFailureTimeSnap + resetTimeout;
    }
    public void markSuccess() {
        if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.CLOSED)) {
            reset();
        }
    }
    public void markNoneSuccess() {
        this.failureCounter.increment();
        if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.OPEN)) {
            this.lastFailureTime = System.currentTimeMillis();
        }
        if (this.failureCounter.sum() >= failureThreshold &&
                changeStatus(CircuitBreakerStatus.CLOSED, CircuitBreakerStatus.OPEN)) {
            this.lastFailureTime = System.currentTimeMillis();
        }
    }
}
复制代码


编写测试场景类ResourceCircuitBreakerClient


public class ResourceCircuitBreakerClient {
    public static void main(String[] args) throws Exception {
        CircuitBreakerResourceConf conf = new CircuitBreakerResourceConf();
        conf.setCoreSize(10);
        conf.setQueueSize(0);
        conf.setResourceName("SERVICE");
        conf.setTimeout(50);
        CircuitBreakerResourceManager.X.register(conf);
        Service service = new Service();
        ResourceCircuitBreaker cb = new ResourceCircuitBreaker("SERVICE", 5, 500);
        for (int i = 0; i < 10; i++) {
            int temp = i;
            String result = cb.call(() -> service.process(temp));
            System.out.println(String.format("返回结果:%s,number:%d", result, temp));
        }
        Thread.sleep(501L);
        cb.call(service::processSuccess);
        for (int i = 0; i < 3; i++) {
            int temp = i;
            String result = cb.call(() -> service.process(temp));
            System.out.println(String.format("返回结果:%s,number:%d", result, temp));
        }
    }
    public static class Service {
        private final Random r = new Random();
        public String process(int i) {
            int sleep = r.nextInt(200);
            System.out.println(String.format("线程[%s]-进入process方法,number:%d,休眠%d毫秒",
                    Thread.currentThread().getName(), i, sleep));
            try {
                Thread.sleep(sleep);
            } catch (InterruptedException ignore) {
            }
            return String.valueOf(i);
        }
        public void processSuccess() {
            System.out.println(String.format("线程[%s]-调用processSuccess方法", Thread.currentThread().getName()));
        }
    }
}
复制代码


某次执行的输出结果如下:


断路器[ResourceCircuitBreaker-SERVICE]重置
线程[SERVICE-CircuitBreakerWorker-0]-进入process方法,number:0,休眠67毫秒
返回结果:null,number:0
线程[SERVICE-CircuitBreakerWorker-1]-进入process方法,number:1,休眠85毫秒
返回结果:null,number:1
线程[SERVICE-CircuitBreakerWorker-2]-进入process方法,number:2,休眠72毫秒
返回结果:null,number:2
线程[SERVICE-CircuitBreakerWorker-3]-进入process方法,number:3,休眠88毫秒
返回结果:null,number:3
线程[SERVICE-CircuitBreakerWorker-4]-进入process方法,number:4,休眠28毫秒
返回结果:4,number:4
线程[SERVICE-CircuitBreakerWorker-5]-进入process方法,number:5,休眠102毫秒
断路器[ResourceCircuitBreaker-SERVICE]状态变更,[CLOSED]->[OPEN]
返回结果:null,number:5
返回结果:null,number:6
返回结果:null,number:7
返回结果:null,number:8
返回结果:null,number:9
断路器[ResourceCircuitBreaker-SERVICE]状态变更,[OPEN]->[HALF_OPEN]
线程[SERVICE-CircuitBreakerWorker-6]-调用processSuccess方法
断路器[ResourceCircuitBreaker-SERVICE]状态变更,[HALF_OPEN]->[CLOSED]
断路器[ResourceCircuitBreaker-SERVICE]重置
线程[SERVICE-CircuitBreakerWorker-7]-进入process方法,number:0,休眠74毫秒
返回结果:null,number:0
线程[SERVICE-CircuitBreakerWorker-8]-进入process方法,number:1,休眠111毫秒
返回结果:null,number:1
线程[SERVICE-CircuitBreakerWorker-9]-进入process方法,number:2,休眠183毫秒
返回结果:null,number:2
复制代码


滑动窗口和百分比统计


上一个小节已经实现了资源基于线程池隔离进行调用,但是有一点明显的不足就是:断路器的状态管理和重置并不符合生产场景,HALF_OPEN -> CLOSED的状态切换和重置不应该在放行单个调用成功之后立刻触发,而应该建立在一定时间范围内,调用的(平均)失败率下降到某个阈值或者调用的(平均)成功率恢复到某个阈值,否则很多场景下会导致断路器的状态频繁发生切换,功能基本处于失效的状态。也就是大多数场景下,一段时间内的failurePercent会比异常计数和failureThreshold的直接对比更加准确。可以引入滑动窗口(Sliding Window)的概念,记录每个时间单元内的调用总次数、调用成功次数、调用超时次数和非超时的调用失败次数,为了简化操作这个时间单元定义为1秒:


网络异常,图片无法展示
|


定义一个用于记录这四种调用次数的桶Bucket类(这里的实现稍微跟上图有点不同,非超时失败修改为线程池拒绝的任务统计,而失败统计包括了任务超时执行和一般的业务异常):


@RequiredArgsConstructor
@Getter
public class MetricInfo {
    private final long total;
    private final long success;
    private final long failure;
    private final long reject;
    public static final MetricInfo EMPTY = new MetricInfo(0, 0, 0, 0);
    public MetricInfo merge(MetricInfo other) {
        return new MetricInfo(
                this.total + other.getTotal(),
                this.success + other.getSuccess(),
                this.failure + other.getFailure(),
                this.reject + other.getReject()
        );
    }
}
public class Bucket {
    // 记录窗口开始的时间戳
    @Getter
    private final long windowStartTimestamp;
    private final LongAdder total;
    private final LongAdder success;
    private final LongAdder failure;
    private final LongAdder reject;
    public Bucket(long windowStartTimestamp) {
        this.windowStartTimestamp = windowStartTimestamp;
        this.total = new LongAdder();
        this.success = new LongAdder();
        this.reject = new LongAdder();
        this.failure = new LongAdder();
    }
    public void increaseTotal() {
        this.total.increment();
    }
    public void increaseSuccess() {
        this.success.increment();
    }
    public void increaseFailure() {
        this.failure.increment();
    }
    public void increaseReject() {
        this.reject.increment();
    }
    public long totalCount() {
        return this.total.sum();
    }
    public long successCount() {
        return this.success.sum();
    }
    public long failureCount() {
        return this.failure.sum();
    }
    public long rejectCount() {
        return this.reject.sum();
    }
    public void reset() {
        this.total.reset();
        this.success.reset();
        this.failure.reset();
        this.reject.reset();
    }
    public MetricInfo metricInfo() {
        return new MetricInfo(
                totalCount(),
                successCount(),
                failureCount(),
                rejectCount()
        );
    }
    @Override
    public String toString() {
        return String.format("Bucket[wt=%d,t=%d,s=%d,f=%d,r=%d]",
                windowStartTimestamp,
                totalCount(),
                successCount(),
                failureCount(),
                rejectCount()
        );
    }
}
复制代码


Hystrix中,为了更加灵活,Bucket中的计数器设计为LongAdder[]类型,便于通过各种需要计数事件枚举的顺序值来直接进行计数和累加,而为了节约内存空间,滑动窗口设计成一个容量固定可复用的环形队列BucketCircularArray#ListState,这里可以站在巨人的肩膀上借鉴其思路实现BucketCircular


public class BucketCircular implements Iterable<Bucket> {
    private final AtomicReference<BucketArray> bucketArray;
    public BucketCircular(int bucketNumber) {
        // 这里有个技巧,初始化数组的时候让数组的总长度为桶数量+1,便于额外的添加和移除桶操作
        AtomicReferenceArray<Bucket> buckets = new AtomicReferenceArray<>(bucketNumber + 1);
        this.bucketArray = new AtomicReference<>(new BucketArray(buckets, 0, 0, bucketNumber));
    }
    public Bucket getTail() {
        return this.bucketArray.get().tail();
    }
    /**
     * 在环形队列尾部添加一个桶
     */
    public void addTail(Bucket bucket) {
        BucketArray bucketArray = this.bucketArray.get();
        BucketArray newBucketArray = bucketArray.addBucket(bucket);
        // 这个方法会在锁中执行,理论上不会CAS失败
        this.bucketArray.compareAndSet(bucketArray, newBucketArray);
    }
    public Bucket[] toArray() {
        return this.bucketArray.get().toArray();
    }
    public int size() {
        return this.bucketArray.get().getSize();
    }
    @Override
    public Iterator<Bucket> iterator() {
        return Collections.unmodifiableList(Arrays.asList(toArray())).iterator();
    }
    public void clear() {
        while (true) {
            BucketArray bucketArray = this.bucketArray.get();
            BucketArray clear = bucketArray.clear();
            if (this.bucketArray.compareAndSet(bucketArray, clear)) {
                return;
            }
        }
    }
}
复制代码


网络异常,图片无法展示
|


添加一个新的Bucket到循环队列的尾部的时候,因为队列的长度是固定的,需要判断是否需要重新计算头指针和尾指针。测试一下:


public static void main(String[] args) throws Exception {
    BucketCircular circular = new BucketCircular(5);
    circular.addTail(new Bucket(111L));
    circular.addTail(new Bucket(System.currentTimeMillis()));
    circular.addTail(new Bucket(System.currentTimeMillis()));
    circular.addTail(new Bucket(System.currentTimeMillis()));
    circular.addTail(new Bucket(System.currentTimeMillis()));
    circular.addTail(new Bucket(System.currentTimeMillis()));
    circular.addTail(new Bucket(222L));
    Stream.of(circular.toArray()).forEach(System.out::println);
}
// 输出结果
Bucket[wt=1603613365205,t=0,s=0,f=0,r=0]
Bucket[wt=1603613365205,t=0,s=0,f=0,r=0]
Bucket[wt=1603613365205,t=0,s=0,f=0,r=0]
Bucket[wt=1603613365205,t=0,s=0,f=0,r=0]
Bucket[wt=222,t=0,s=0,f=0,r=0]
复制代码


接着编写一个用于管理Bucket和提供数据统计入口的SlidingWindowMonitor


// 累计数据累加器
public class BucketCumulativeCalculator {
    private LongAdder total = new LongAdder();
    private LongAdder success = new LongAdder();
    private LongAdder failure = new LongAdder();
    private LongAdder reject = new LongAdder();
    public void addBucket(Bucket lb) {
        total.add(lb.totalCount());
        success.add(lb.successCount());
        failure.add(lb.failureCount());
        reject.add(lb.rejectCount());
    }
    public MetricInfo sum() {
        return new MetricInfo(
                total.sum(),
                success.sum(),
                failure.sum(),
                reject.sum()
        );
    }
    public void reset() {
        total = new LongAdder();
        success = new LongAdder();
        failure = new LongAdder();
        reject = new LongAdder();
    }
}
// 下面的几个参数为了简单起见暂时固定
public class SlidingWindowMonitor {
    /**
     * 窗口长度 - 10秒
     */
    private final int windowDuration = 10000;
    /**
     * 桶的大小 - 时间单位为1秒
     */
    private final int bucketSizeInTimeUint = 1000;
    /**
     * 桶的数量 - 必须满足windowDuration % bucketSizeInTimeUint = 0
     */
    private final int bucketNumber = windowDuration / bucketSizeInTimeUint;
    private final BucketCircular bucketCircular;
    /**
     * 用于创建桶的时候进行锁定
     */
    private final ReentrantLock lock;
    /**
     * 累计计数器
     */
    private final BucketCumulativeCalculator calculator = new BucketCumulativeCalculator();
    public SlidingWindowMonitor() {
        this.bucketCircular = new BucketCircular(bucketNumber);
        this.lock = new ReentrantLock();
    }
    void reset() {
        Bucket tailBucket = bucketCircular.getTail();
        if (null != tailBucket) {
            calculator.addBucket(tailBucket);
        }
        bucketCircular.clear();
    }
    /**
     * 累计统计
     */
    public MetricInfo getCumulativeMetricInfo() {
        return getCurrentMetricInfo().merge(calculator.sum());
    }
    /**
     * 当前统计
     */
    public MetricInfo getCurrentMetricInfo() {
        Bucket currentBucket = getCurrentBucket();
        if (null == currentBucket) {
            return MetricInfo.EMPTY;
        }
        return currentBucket.metricInfo();
    }
    /**
     * 滚动统计 - 这个就是断路器计算错误请求百分比的来源数据
     */
    public MetricInfo getRollingMetricInfo() {
        Bucket currentBucket = getCurrentBucket();
        if (null == currentBucket) {
            return MetricInfo.EMPTY;
        }
        MetricInfo info = new MetricInfo(0, 0, 0, 0);
        for (Bucket bucket : this.bucketCircular) {
            info = info.merge(bucket.metricInfo());
        }
        return info;
    }
    /**
     * 这个方法是核心 - 用于获取当前系统时间对应的Bucket
     */
    Bucket getCurrentBucket() {
        long time = System.currentTimeMillis();
        Bucket tailBucket = bucketCircular.getTail();
        // 队尾的桶还在当前的时间所在的桶区间内则直接使用此桶
        if (null != tailBucket && time < tailBucket.getWindowStartTimestamp() + bucketSizeInTimeUint) {
            return tailBucket;
        }
        if (lock.tryLock()) {
            try {
                // 循环队列为空
                if (null == bucketCircular.getTail()) {
                    Bucket newBucket = new Bucket(time);
                    bucketCircular.addTail(newBucket);
                    return newBucket;
                } else {
                    // 需要创建足够多的桶以追上当前的时间
                    for (int i = 0; i < bucketNumber; i++) {
                        tailBucket = bucketCircular.getTail();
                        // 最新的一个桶已经追上了当前时间
                        if (time < tailBucket.getWindowStartTimestamp() + bucketSizeInTimeUint) {
                            return tailBucket;
                        }
                        // 当前时间已经到了下一个窗口
                        else if (time > tailBucket.getWindowStartTimestamp() + bucketSizeInTimeUint + windowDuration) {
                            reset();
                            return getCurrentBucket();
                        }
                        // 这种情况是当前最新时间比窗口超前,要填补过去的桶
                        else {
                            bucketCircular.addTail(new Bucket(tailBucket.getWindowStartTimestamp() + bucketSizeInTimeUint));
                            calculator.addBucket(tailBucket);
                        }
                    }
                    return bucketCircular.getTail();
                }
            } finally {
                lock.unlock();
            }
        } else {
            // 获取锁失败说明多线程并发创建桶,再获取一次不空则为另一个获取锁成功的线程创建的最新的桶,否则需要进行线程等待和递归获取
            tailBucket = bucketCircular.getTail();
            if (null != tailBucket) {
                return tailBucket;
            }
            try {
                Thread.sleep(5);
            } catch (InterruptedException ignore) {
            }
            // 递归
            return getCurrentBucket();
        }
    }
    public void incrementTotal() {
        getCurrentBucket().increaseTotal();
    }
    public void incrementSuccess() {
        getCurrentBucket().increaseSuccess();
    }
    public void incrementFailure() {
        getCurrentBucket().increaseFailure();
    }
    public void incrementReject() {
        getCurrentBucket().increaseReject();
    }
}
复制代码


最后,把SlidingWindowMonitor和之前的ResourceCircuitBreaker做一次融合进化,得到SlidingWindowCircuitBreaker


package cn.throwx.cb;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
/**
 * @author throwable
 * @version v1
 * @description
 * @since 2020/10/25 17:14
 */
public class SlidingWindowCircuitBreaker {
    /**
     * 失败百分比阈值
     */
    private final long errorPercentThreshold;
    /**
     * 熔断等待窗口
     */
    private final long resetTimeout;
    private AtomicReference<CircuitBreakerStatus> status;
    private final ThreadPoolExecutor executor;
    private final String circuitBreakerName;
    /**
     * 最后一次调用失败的时间戳
     */
    private long lastFailureTime;
    /**
     * 执行超时上限,单位毫秒
     */
    private final long executionTimeout;
    /**
     * 滑动窗口监视器
     */
    private final SlidingWindowMonitor slidingWindowMonitor;
    public SlidingWindowCircuitBreaker(String resourceName,
                                       long errorPercentThreshold,
                                       long resetTimeout) {
        CircuitBreakerResource resource = CircuitBreakerResourceManager.X.get(resourceName);
        this.circuitBreakerName = "SlidingWindowCircuitBreaker-" + resourceName;
        this.executor = resource.getExecutor();
        this.executionTimeout = resource.getTimeout();
        this.errorPercentThreshold = errorPercentThreshold;
        this.resetTimeout = resetTimeout;
        this.slidingWindowMonitor = new SlidingWindowMonitor();
        reset();
    }
    public void reset() {
        CircuitBreakerStatusMonitor.X.reset(this.circuitBreakerName);
        this.status = new AtomicReference<>(CircuitBreakerStatus.CLOSED);
        this.lastFailureTime = -1L;
    }
    @SuppressWarnings("unchecked")
    public <T> T call(Supplier<T> supplier) {
        return call(supplier, (Fallback<T>) Fallback.F);
    }
    public <T> T call(Supplier<T> supplier, Fallback<T> fallback) {
        try {
            if (shouldAllowExecution()) {
                slidingWindowMonitor.incrementTotal();
                Future<T> future = this.executor.submit(warp(supplier));
                T result = future.get(executionTimeout, TimeUnit.MILLISECONDS);
                markSuccess();
                return result;
            }
        } catch (RejectedExecutionException ree) {
            markReject();
        } catch (Exception e) {
            markFailure();
        }
        return fallback.fallback();
    }
    <T> Callable<T> warp(Supplier<T> supplier) {
        return supplier::get;
    }
    public void call(Runnable runnable) {
        call(() -> {
            runnable.run();
            return null;
        });
    }
    boolean shouldAllowExecution() {
        // 本质是Closed状态
        if (lastFailureTime == -1L) {
            return true;
        }
        // 没到达阈值
        if (errorPercentThreshold > rollingErrorPercentage()) {
            return false;
        }
        return shouldTryAfterRestTimeoutWindow()
                && changeStatus(CircuitBreakerStatus.OPEN, CircuitBreakerStatus.HALF_OPEN);
    }
    boolean changeStatus(CircuitBreakerStatus o, CircuitBreakerStatus n) {
        boolean r = status.compareAndSet(o, n);
        if (r) {
            CircuitBreakerStatusMonitor.X.report(this.circuitBreakerName, o, n);
        }
        return r;
    }
    boolean shouldTryAfterRestTimeoutWindow() {
        long lastFailureTimeSnap = lastFailureTime;
        long currentTime = System.currentTimeMillis();
        return currentTime > lastFailureTimeSnap + resetTimeout;
    }
    public void markSuccess() {
        slidingWindowMonitor.incrementSuccess();
        if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.CLOSED)) {
            reset();
        }
    }
    public void markReject() {
        slidingWindowMonitor.incrementReject();
        if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.OPEN)) {
            this.lastFailureTime = System.currentTimeMillis();
        }
    }
    public int rollingErrorPercentage() {
        MetricInfo rollingMetricInfo = slidingWindowMonitor.getRollingMetricInfo();
        long rejectCount = rollingMetricInfo.getReject();
        long failureCount = rollingMetricInfo.getFailure();
        long totalCount = rollingMetricInfo.getTotal();
        int errorPercentage = (int) ((double) (rejectCount + failureCount) / totalCount * 100);
        CircuitBreakerStatusMonitor.X.report(this.circuitBreakerName, String.format("错误百分比:%d", errorPercentage));
        return errorPercentage;
    }
    public void markFailure() {
        slidingWindowMonitor.incrementFailure();
        if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.OPEN)) {
            this.lastFailureTime = System.currentTimeMillis();
        }
        if (rollingErrorPercentage() >= errorPercentThreshold &&
                changeStatus(CircuitBreakerStatus.CLOSED, CircuitBreakerStatus.OPEN)) {
            this.lastFailureTime = System.currentTimeMillis();
        }
    }
}
复制代码


编写一个测试客户端SlidingWindowCircuitBreakerClient


public class SlidingWindowCircuitBreakerClient {
    public static void main(String[] args) throws Exception {
        CircuitBreakerResourceConf conf = new CircuitBreakerResourceConf();
        conf.setCoreSize(10);
        conf.setQueueSize(0);
        conf.setResourceName("SERVICE");
        conf.setTimeout(50);
        CircuitBreakerResourceManager.X.register(conf);
        Service service = new Service();
        SlidingWindowCircuitBreaker cb = new SlidingWindowCircuitBreaker("SERVICE", 50, 500);
        for (int i = 0; i < 10; i++) {
            int temp = i;
            String result = cb.call(() -> service.process(temp));
            System.out.println(String.format("返回结果:%s,number:%d", result, temp));
        }
        Thread.sleep(501L);
        cb.call(service::processSuccess);
        for (int i = 0; i < 3; i++) {
            int temp = i;
            String result = cb.call(() -> service.process(temp));
            System.out.println(String.format("返回结果:%s,number:%d", result, temp));
        }
        Thread.sleep(501L);
        cb.call(service::processSuccess);
        cb.call(service::processSuccess);
    }
    public static class Service {
        private final Random r = new Random();
        public String process(int i) {
            int sleep = r.nextInt(200);
            System.out.println(String.format("线程[%s]-进入process方法,number:%d,休眠%d毫秒",
                    Thread.currentThread().getName(), i, sleep));
            try {
                Thread.sleep(sleep);
            } catch (InterruptedException ignore) {
            }
            return String.valueOf(i);
        }
        public void processSuccess() {
            System.out.println(String.format("线程[%s]-调用processSuccess方法", Thread.currentThread().getName()));
        }
    }
}
复制代码


某次执行结果如下:


断路器[SlidingWindowCircuitBreaker-SERVICE]重置
线程[SERVICE-CircuitBreakerWorker-0]-进入process方法,number:0,休眠67毫秒
断路器[SlidingWindowCircuitBreaker-SERVICE]-错误百分比:100
断路器[SlidingWindowCircuitBreaker-SERVICE]状态变更,[CLOSED]->[OPEN]
返回结果:null,number:0
断路器[SlidingWindowCircuitBreaker-SERVICE]-错误百分比:100
返回结果:null,number:1
断路器[SlidingWindowCircuitBreaker-SERVICE]-错误百分比:100
返回结果:null,number:2
断路器[SlidingWindowCircuitBreaker-SERVICE]-错误百分比:100
返回结果:null,number:3
断路器[SlidingWindowCircuitBreaker-SERVICE]-错误百分比:100
返回结果:null,number:4
断路器[SlidingWindowCircuitBreaker-SERVICE]-错误百分比:100
返回结果:null,number:5
断路器[SlidingWindowCircuitBreaker-SERVICE]-错误百分比:100
返回结果:null,number:6
断路器[SlidingWindowCircuitBreaker-SERVICE]-错误百分比:100
返回结果:null,number:7
断路器[SlidingWindowCircuitBreaker-SERVICE]-错误百分比:100
返回结果:null,number:8
断路器[SlidingWindowCircuitBreaker-SERVICE]-错误百分比:100
返回结果:null,number:9
断路器[SlidingWindowCircuitBreaker-SERVICE]-错误百分比:100
断路器[SlidingWindowCircuitBreaker-SERVICE]状态变更,[OPEN]->[HALF_OPEN]
线程[SERVICE-CircuitBreakerWorker-1]-调用processSuccess方法
断路器[SlidingWindowCircuitBreaker-SERVICE]状态变更,[HALF_OPEN]->[CLOSED]
断路器[SlidingWindowCircuitBreaker-SERVICE]重置
线程[SERVICE-CircuitBreakerWorker-2]-进入process方法,number:0,休眠84毫秒
断路器[SlidingWindowCircuitBreaker-SERVICE]-错误百分比:66
断路器[SlidingWindowCircuitBreaker-SERVICE]状态变更,[CLOSED]->[OPEN]
返回结果:null,number:0
断路器[SlidingWindowCircuitBreaker-SERVICE]-错误百分比:66
返回结果:null,number:1
断路器[SlidingWindowCircuitBreaker-SERVICE]-错误百分比:66
返回结果:null,number:2
断路器[SlidingWindowCircuitBreaker-SERVICE]-错误百分比:66
断路器[SlidingWindowCircuitBreaker-SERVICE]状态变更,[OPEN]->[HALF_OPEN]
线程[SERVICE-CircuitBreakerWorker-3]-调用processSuccess方法
断路器[SlidingWindowCircuitBreaker-SERVICE]状态变更,[HALF_OPEN]->[CLOSED]
断路器[SlidingWindowCircuitBreaker-SERVICE]重置
线程[SERVICE-CircuitBreakerWorker-4]-调用processSuccess方法
复制代码


小结



生产上应用CircuitBreaker模式建议使用主流实现例如Hystrix或者更活跃的Sentinel,但是要深入学习此模式则需要老老实实做一次推演。


参考资料:

  • CircuitBreaker - by Martin Fowler
  • 《Release It! Design and Deploy Production-Ready Software》


(本文完 c-4-d e-a-20201025)


相关实践学习
SLB负载均衡实践
本场景通过使用阿里云负载均衡 SLB 以及对负载均衡 SLB 后端服务器 ECS 的权重进行修改,快速解决服务器响应速度慢的问题
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
相关文章
|
SpringCloudAlibaba 监控 Dubbo
SpringCloudAliBaba篇 之 Sentinel:图解分布式系统的流量防卫兵(上)
SpringCloudAliBaba篇 之 Sentinel:图解分布式系统的流量防卫兵
528 0
|
3月前
|
缓存 监控 负载均衡
一文讲明Hystrix熔断器
这篇文章详细阐述了Hystrix熔断器的原理和应用,解释了分布式系统中服务雪崩的问题,并展示了如何在Spring Cloud框架中使用Hystrix进行熔断和降级处理。
一文讲明Hystrix熔断器
|
缓存 SpringCloudAlibaba Linux
SpringCloudAliBaba篇 之 Sentinel:图解分布式系统的流量防卫兵(下)
SpringCloudAliBaba篇 之 Sentinel:图解分布式系统的流量防卫兵
109 0
|
算法 Dubbo Java
Sentinel流量防卫兵
Sentinel流量防卫兵
|
负载均衡 算法 Cloud Native
【云原生】springcloud09——但愿发长久,空手撕Ribbon
【云原生】springcloud09——但愿发长久,空手撕Ribbon
|
缓存 运维 监控
SpringCloud-Hystrix——让你的系统稳一点儿
在微服务架构中,我们将系统拆分成了一个个的服务单元,各单元应用间通过服务注册与订阅的方式互相依赖。由于每个单元都在不同的进程中运行,依赖通过远程调用的方式执行,这样就有可能因为网络原因或是依赖服务自身问题出现调用故障或延迟,而这些问题会直接导致调用方的对外服务也出现延迟,若此时调用方的请求不断增加,最后就会出现因等待出现故障的依赖方响应而形成任务积压,线程资源无法释放,最终导致自身服务的瘫痪,进一步甚至出现故障的蔓延最终导致整个系统的瘫痪,导致服务雪崩效应。为了解决这样的问题,产生了断路器等一系列的服务保护机制。
|
JSON 监控 算法
阿里限流神器Sentinel夺命连环 17 问?
阿里限流神器Sentinel夺命连环 17 问?
|
Shell 开发工具
浅学了波ZooKeeper,我来做个总结
浅学了波ZooKeeper,我来做个总结
184 0
浅学了波ZooKeeper,我来做个总结
|
缓存 NoSQL 算法
|
NoSQL 算法 Java
《我想进大厂》之分布式锁夺命连环9问
对于一个单机的系统,我们可以通过synchronized或者ReentrantLock等这些常规的加锁方式来实现,然而对于一个分布式集群的系统而言,单纯的本地锁已经无法解决问题,所以就需要用到分布式锁了,通常我们都会引入三方组件或者服务来解决这个问题,比如数据库、Redis、Zookeeper等。
《我想进大厂》之分布式锁夺命连环9问