Spring Cloud Hystrix源码分析

简介: Spring Cloud Hystrix源码分析Spring Cloud Hystrix源码解读@EnableCircuitBreaker职责:激活Circuit Breaker初始化顺序@EnableCircuitBreakerEnableCircuitBreakerImportSel...

Spring Cloud Hystrix源码分析

Spring Cloud Hystrix源码解读
@EnableCircuitBreaker
职责:

  • 激活Circuit Breaker
    初始化顺序
  • @EnableCircuitBreaker
  • EnableCircuitBreakerImportSelector
  • HystrixCircuitBreakerConfiguration

HystrixCircuitBreakerConfiguration
初始化组件

  • HystrixCommandAspect
  • HystrixShutdownHook
  • HystrixStreamEndpoint:Servlet
  • HystrixMetricsPollerConfiguration

Netflix Hystrix源码解读

HystrixCommandAspect
依赖组件

  • MetaholderFactory
  • HystrixCommandFactory:生成HystriInvokable
  • HystrixInvokable

    CommandCollapser
    GenericObservableCommand
    GenericCommand

Future实现服务熔断

package com.segumentfault.springcloudlesson9.future;

import java.util.Random;
import java.util.concurrent.*;

/**
 * 通过 {@link Future} 实现 服务熔断
 *
 */
public class FutureCircuitBreakerDemo {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        // 初始化线程池
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        RandomCommand command = new RandomCommand();

        Future<String> future = executorService.submit(command::run);

        String result = null;
        // 100 毫秒超时时间
        try {
            result = future.get(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // fallback 方法调用
            result = command.fallback();
        }

        System.out.println(result);

        executorService.shutdown();

    }

    /**
     * 随机对象
     */
    private static final Random random = new Random();

    /**
     * 随机事件执行命令
     */
    public static class RandomCommand implements Command<String> {


        @Override
        public String run() throws InterruptedException {

            long executeTime = random.nextInt(200);

            // 通过休眠来模拟执行时间
            System.out.println("Execute Time : " + executeTime + " ms");

            Thread.sleep(executeTime);

            return "Hello,World";
        }

        @Override
        public String fallback() {
            return "Fallback";
        }
    }


    public interface Command<T> {

        /**
         * 正常执行,并且返回结果
         *
         * @return
         */
        T run() throws Exception;

        /**
         * 错误时,返回容错结果
         *
         * @return
         */
        T fallback();

    }
}

RxJava基础
单数据:Single

Single.just("Hello,World") // 仅能发布单个数据
        .subscribeOn(Schedulers.io()) // 在 I/O 线程执行
        .subscribe(RxJavaDemo::println) // 订阅并且消费数据
;

多数据:Observable

List<Integer> values = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);

Observable.from(values) //发布多个数据
        .subscribeOn(Schedulers.computation()) // 在 I/O 线程执行
        .subscribe(RxJavaDemo::println) // 订阅并且消费数据
;

// 等待线程执行完毕
Thread.sleep(100);

使用标准Reactive模式

List<Integer> values = Arrays.asList(1, 2, 3);

Observable.from(values) //发布多个数据
        .subscribeOn(Schedulers.newThread()) // 在 newThread 线程执行
        .subscribe(value -> {

            if (value > 2)
                throw new IllegalStateException("数据不应许大于 2");

            //消费数据
            println("消费数据:" + value);

        }, e -> {
            // 当异常情况,中断执行
            println("发生异常 , " + e.getMessage());
        }, () -> {
            // 当整体流程完成时
            println("流程执行完成");
        })

;

// 等待线程执行完毕
Thread.sleep(100);

Java 9 Flow API

只做了解

package concurrent.java9;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

/**
 * {@link SubmissionPublisher}
 *
 * @author mercyblitz
 **/
public class SubmissionPublisherDemo {

    public static void main(String[] args) throws InterruptedException {

        try (SubmissionPublisher<Integer> publisher =
                     new SubmissionPublisher<>()) {

            //Publisher(100) => A -> B -> C => Done
            publisher.subscribe(new IntegerSubscriber("A"));
            publisher.subscribe(new IntegerSubscriber("B"));
            publisher.subscribe(new IntegerSubscriber("C"));

            // 提交数据到各个订阅器
            publisher.submit(100);

        }


        Thread.currentThread().join(1000L);

    }

    private static class IntegerSubscriber implements
            Flow.Subscriber<Integer> {

        private final String name;

        private Flow.Subscription subscription;

        private IntegerSubscriber(String name) {
            this.name = name;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            System.out.printf(
                    "Thread[%s] Current Subscriber[%s] " +
                            "subscribes subscription[%s]\n",
                    Thread.currentThread().getName(),
                    name,
                    subscription);
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(Integer item) {
            System.out.printf(
                    "Thread[%s] Current Subscriber[%s] " +
                            "receives item[%d]\n",
                    Thread.currentThread().getName(),
                    name,
                    item);
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.printf(
                    "Thread[%s] Current Subscriber[%s] " +
                            "is completed!\n",
                    Thread.currentThread().getName(),
                    name);
        }

    }

}
目录
相关文章
|
24天前
|
监控 Java 应用服务中间件
Spring Boot整合Tomcat底层源码分析
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置和起步依赖等特性,大大简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是其与Tomcat的整合。
48 1
|
4月前
|
Java UED 开发者
Spring Boot 降级功能的神秘面纱:Hystrix 与 Resilience4j 究竟藏着怎样的秘密?
【8月更文挑战第29天】在分布式系统中,服务稳定性至关重要。为应对故障,Spring Boot 提供了 Hystrix 和 Resilience4j 两种降级工具。Hystrix 作为 Netflix 的容错框架,通过隔离依赖、控制并发及降级机制增强系统稳定性;Resilience4j 则是一个轻量级库,提供丰富的降级策略。两者均可有效提升系统可靠性,具体选择取决于需求与场景。在面对服务故障时,合理运用这些工具能确保系统基本功能正常运作,优化用户体验。以上简介包括了两个工具的简单示例代码,帮助开发者更好地理解和应用。
88 0
|
11天前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
26 6
|
11天前
|
Java 关系型数据库 MySQL
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
30 5
|
11天前
|
缓存 监控 Java
如何将Spring Boot应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot应用程序部署到Pivotal Cloud Foundry (PCF)
22 5
|
1月前
|
前端开发 Java Spring
Spring MVC源码分析之DispatcherServlet#getHandlerAdapter方法
`DispatcherServlet`的 `getHandlerAdapter`方法是Spring MVC处理请求的核心部分之一。它通过遍历预定义的 `HandlerAdapter`列表,找到适用于当前处理器的适配器,并调用适配器执行具体的处理逻辑。理解这个方法有助于深入了解Spring MVC的工作机制和扩展点。
33 1
|
1月前
|
前端开发 Java Spring
Spring MVC源码分析之DispatcherServlet#getHandlerAdapter方法
`DispatcherServlet`的 `getHandlerAdapter`方法是Spring MVC处理请求的核心部分之一。它通过遍历预定义的 `HandlerAdapter`列表,找到适用于当前处理器的适配器,并调用适配器执行具体的处理逻辑。理解这个方法有助于深入了解Spring MVC的工作机制和扩展点。
31 1
|
2月前
|
缓存 JavaScript Java
Spring之FactoryBean的处理底层源码分析
本文介绍了Spring框架中FactoryBean的重要作用及其使用方法。通过一个简单的示例展示了如何通过FactoryBean返回一个User对象,并解释了在调用`getBean()`方法时,传入名称前添加`&`符号会改变返回对象类型的原因。进一步深入源码分析,详细说明了`getBean()`方法内部对FactoryBean的处理逻辑,解释了为何添加`&`符号会导致不同的行为。最后,通过具体代码片段展示了这一过程的关键步骤。
Spring之FactoryBean的处理底层源码分析
|
1月前
|
前端开发 Java Spring
Spring MVC源码分析之DispatcherServlet#getHandlerAdapter方法
`DispatcherServlet`的 `getHandlerAdapter`方法是Spring MVC处理请求的核心部分之一。它通过遍历预定义的 `HandlerAdapter`列表,找到适用于当前处理器的适配器,并调用适配器执行具体的处理逻辑。理解这个方法有助于深入了解Spring MVC的工作机制和扩展点。
24 0
|
3月前
|
XML 监控 Java
Spring Cloud全解析:熔断之Hystrix简介
Hystrix 是由 Netflix 开源的延迟和容错库,用于提高分布式系统的弹性。它通过断路器模式、资源隔离、服务降级及限流等机制防止服务雪崩。Hystrix 基于命令模式,通过 `HystrixCommand` 封装对外部依赖的调用逻辑。断路器能在依赖服务故障时快速返回备选响应,避免长时间等待。此外,Hystrix 还提供了监控功能,能够实时监控运行指标和配置变化。依赖管理方面,可通过 `@EnableHystrix` 启用 Hystrix 支持,并配置全局或局部的降级策略。结合 Feign 可实现客户端的服务降级。
188 23