SpringCloud源码阅读2-Eureka客户端的秘密(中)

简介: SpringCloud源码阅读2-Eureka客户端的秘密(中)

2.1.2 fetchRegistry

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
        try {
            //获取本地缓存
            Applications applications = getApplications();
      //如果增量拉取被禁用或是第一次拉取,全量拉取server端已经注册的服务实例信息
            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
            {
                //获取全部实例
                getAndStoreFullRegistry();
            } else {
              //增量拉取服务实例
                getAndUpdateDelta(applications);
            }
            applications.setAppsHashCode(applications.getReconcileHashCode());
            logTotalInstances();
        } catch (Throwable e) {
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }
        // 刷新本地缓存
        onCacheRefreshed();
        // 基于缓存中的实例数据更新远程实例状态, (发布StatusChangeEvent)
        updateInstanceRemoteStatus();
       // 注册表拉取成功后返回true
        return true;
    }

全量获取最终调用

EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());

增量获取

EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());

可以看出,客户端与服务端通信底层是EurekaTransport在提供支持。


2.1.3 initScheduledTasks

在此之前有必要说说:TimedSupervisorTask。 TimedSupervisorTask 是自动调节间隔的周期性任务,当不超时,将以初始化的间隔执行。当任务超时时,将下一个周期的间隔调大。每次超时都会增大相应倍数,直到外部设置的最大参数。一旦新任务不再超时,间隔自动恢复默认值。

也就是说,这是一个具有自适应的周期性任务。(非常棒的设计啊)

private void initScheduledTasks() {
    //1.如果获取服务列表,则创建周期性缓存更新(即获取服务列表任务)任务
        if (clientConfig.shouldFetchRegistry()) {
          //初始间隔时间(默认30秒)
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            //最大倍数 默认10倍
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            //执行TimedSupervisorTask ,监督CacheRefreshThread任务的执行。
            //具体执行线程池cacheRefreshExecutor,具体任务CacheRefreshThread
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()//缓存刷新,调用fetchRegistry()获取服务列表
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }
    //2. 如何注册,就创建周期性续租任务,维持心跳。
        if (clientConfig.shouldRegisterWithEureka()) {
          //心跳间隔,默认30秒。
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            //最大倍数 默认10倍
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            //执行TimedSupervisorTask ,监督HeartbeatThread任务的执行。
            //具体执行线程池heartbeatExecutor,具体任务HeartbeatThread
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);
            //3.创建应用实例信息复制器。
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize
      //4.创建状态改变监听器,监听StatusChangeEvent 
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }
                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    //状态有变化,使用信息复制器,执行一个任务,更新状态变化到注册中心
                    instanceInfoReplicator.onDemandUpdate();
                }
            };
      //是否关注状态变化,将监听器添加到applicationInfoManager
            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }
            // 启动InstanceInfo复制器
       instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }

总结下initScheduledTasks()的工作: 如何配置获取服务

  1. 创建监控任务线程cacheRefresh,监督服务获取CacheRefreshThread(DiscoveryClient-CacheRefreshExecutor-%d)线程的执行,默认每30秒执行一次。获取服务列表更新到本地缓存。任务内容refreshRegistry,本地缓存localRegionApps

如何配置注册,

  1. 创建监控任务线程"heartbeat",监督续约任务HeartbeatThread(DiscoveryClient-HeartbeatExecutor-%d)的执行,默认每30秒执行一次。任务内容为renew()
  2. 创建实例状态监听器,监听当前实例的状态变化,并通过InstanceInfo复制器,执行onDemandUpdate()方法,更新变化到远程server
  3. 启动InstanceInfo复制器定时线程(DiscoveryClient-InstanceInfoReplicator-%d),定时(默认40秒)检测当前实例的DataCenterInfo、LeaseInfo、InstanceStatus,如果有变更,执行InstanceInfoReplicator.this.run()方法将变更信息同步到server

下面我们看看这几个重要的任务内容:

  • refreshRegistry刷新缓存: refreshRegistry最终调用fetchRegistry获取服务列表,更新本地缓存。fetchRegistry在DiscoveryClient初始化时,主动获取一次;之后都是定时获取。
  • renew() 续约:通过eurekaTransport.registrationClient.sendHeartBeat向server发送当前实例信息
  • InstanceInfoReplicator.onDemandUpdate() 状态更新: 一旦有状态变化,停掉定时复制线程,立刻把状态更新到server.  最终调用的是InstanceInfoReplicator.this.run()
  • InstanceInfoReplicator.this.run() 复制当前实例信息到Server: 状态变化时,立即执行;平时每40秒执行一次。


相关文章
|
8天前
|
负载均衡 Java Nacos
SpringCloud基础1——远程调用、Eureka,Nacos注册中心、Ribbon负载均衡
微服务介绍、SpringCloud、服务拆分和远程调用、Eureka注册中心、Ribbon负载均衡、Nacos注册中心
SpringCloud基础1——远程调用、Eureka,Nacos注册中心、Ribbon负载均衡
|
27天前
|
Java Spring
【Azure Spring Cloud】Spring Cloud Azure 4.0 调用Key Vault遇见认证错误 AADSTS90002: Tenant not found.
【Azure Spring Cloud】Spring Cloud Azure 4.0 调用Key Vault遇见认证错误 AADSTS90002: Tenant not found.
|
27天前
|
Java Spring 容器
【Azure Spring Cloud】在Azure Spring Apps上看见 App Memory Usage 和 jvm.menory.use 的指标的疑问及OOM
【Azure Spring Cloud】在Azure Spring Apps上看见 App Memory Usage 和 jvm.menory.use 的指标的疑问及OOM
|
27天前
|
存储 Java Spring
【Azure Spring Cloud】Azure Spring Cloud服务,如何获取应用程序日志文件呢?
【Azure Spring Cloud】Azure Spring Cloud服务,如何获取应用程序日志文件呢?
|
27天前
|
SQL Java 数据库连接
【Azure Spring Cloud】Azure Spring Cloud connect to SQL using MSI
【Azure Spring Cloud】Azure Spring Cloud connect to SQL using MSI
|
27天前
|
Java 开发工具 Spring
【Azure Spring Cloud】使用azure-spring-boot-starter-storage来上传文件报错: java.net.UnknownHostException: xxxxxxxx.blob.core.windows.net: Name or service not known
【Azure Spring Cloud】使用azure-spring-boot-starter-storage来上传文件报错: java.net.UnknownHostException: xxxxxxxx.blob.core.windows.net: Name or service not known
|
28天前
|
NoSQL Java Redis
【Azure Spring Cloud】Java Spring Cloud 应用部署到Azure上后,发现大量的 java.lang.NullPointerException: null at io.lettuce.core.protocol.CommandHandler.writeSingleCommand(CommandHandler.java:426) at ... 异常
【Azure Spring Cloud】Java Spring Cloud 应用部署到Azure上后,发现大量的 java.lang.NullPointerException: null at io.lettuce.core.protocol.CommandHandler.writeSingleCommand(CommandHandler.java:426) at ... 异常
|
28天前
|
Java Spring
【Azure 应用服务】记一次Azure Spring Cloud 的部署错误 (az spring-cloud app deploy -g dev -s testdemo -n demo -p ./hellospring-0.0.1-SNAPSHOT.jar --->>> Failed to wait for deployment instances to be ready)
【Azure 应用服务】记一次Azure Spring Cloud 的部署错误 (az spring-cloud app deploy -g dev -s testdemo -n demo -p ./hellospring-0.0.1-SNAPSHOT.jar --->>> Failed to wait for deployment instances to be ready)
|
28天前
|
Java Maven Python
【Azure Spring Cloud】部署Azure spring cloud 失败
【Azure Spring Cloud】部署Azure spring cloud 失败
|
1月前
|
缓存 Java Maven
SpringCloud基于Eureka的服务治理架构搭建与测试:从服务提供者到消费者的完整流程
Spring Cloud微服务框架中的Eureka是一个用于服务发现和注册的基础组件,它基于RESTful风格,为微服务架构提供了关键的服务注册与发现功能。以下是对Eureka的详细解析和搭建举例。