2.1.2 fetchRegistry
private boolean fetchRegistry(boolean forceFullRegistryFetch) { Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { //获取本地缓存 Applications applications = getApplications(); //如果增量拉取被禁用或是第一次拉取,全量拉取server端已经注册的服务实例信息 if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { //获取全部实例 getAndStoreFullRegistry(); } else { //增量拉取服务实例 getAndUpdateDelta(applications); } applications.setAppsHashCode(applications.getReconcileHashCode()); logTotalInstances(); } catch (Throwable e) { return false; } finally { if (tracer != null) { tracer.stop(); } } // 刷新本地缓存 onCacheRefreshed(); // 基于缓存中的实例数据更新远程实例状态, (发布StatusChangeEvent) updateInstanceRemoteStatus(); // 注册表拉取成功后返回true return true; }
全量获取最终调用
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
增量获取
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
可以看出,客户端与服务端通信底层是EurekaTransport在提供支持。
2.1.3 initScheduledTasks
在此之前有必要说说:TimedSupervisorTask。 TimedSupervisorTask 是自动调节间隔的周期性任务,当不超时,将以初始化的间隔执行。当任务超时时,将下一个周期的间隔调大。每次超时都会增大相应倍数,直到外部设置的最大参数。一旦新任务不再超时,间隔自动恢复默认值。
也就是说,这是一个具有自适应的周期性任务。(非常棒的设计啊)
private void initScheduledTasks() { //1.如果获取服务列表,则创建周期性缓存更新(即获取服务列表任务)任务 if (clientConfig.shouldFetchRegistry()) { //初始间隔时间(默认30秒) int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); //最大倍数 默认10倍 int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); //执行TimedSupervisorTask ,监督CacheRefreshThread任务的执行。 //具体执行线程池cacheRefreshExecutor,具体任务CacheRefreshThread scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread()//缓存刷新,调用fetchRegistry()获取服务列表 ), registryFetchIntervalSeconds, TimeUnit.SECONDS); } //2. 如何注册,就创建周期性续租任务,维持心跳。 if (clientConfig.shouldRegisterWithEureka()) { //心跳间隔,默认30秒。 int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); //最大倍数 默认10倍 int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); //执行TimedSupervisorTask ,监督HeartbeatThread任务的执行。 //具体执行线程池heartbeatExecutor,具体任务HeartbeatThread scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS); //3.创建应用实例信息复制器。 instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize //4.创建状态改变监听器,监听StatusChangeEvent statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } //状态有变化,使用信息复制器,执行一个任务,更新状态变化到注册中心 instanceInfoReplicator.onDemandUpdate(); } }; //是否关注状态变化,将监听器添加到applicationInfoManager if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } // 启动InstanceInfo复制器 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } }
总结下initScheduledTasks()的工作: 如何配置获取服务
- 创建监控任务线程
cacheRefresh
,监督服务获取CacheRefreshThread(DiscoveryClient-CacheRefreshExecutor-%d)
线程的执行,默认每30秒执行一次。获取服务列表更新到本地缓存。任务内容refreshRegistry
,本地缓存localRegionApps
如何配置注册,
- 创建监控任务线程
"heartbeat",
监督续约任务HeartbeatThread(DiscoveryClient-HeartbeatExecutor-%d)
的执行,默认每30秒执行一次。任务内容为renew()
- 创建实例状态监听器,监听当前实例的状态变化,并通过InstanceInfo复制器,执行
onDemandUpdate()
方法,更新变化到远程server - 启动InstanceInfo复制器定时线程
(DiscoveryClient-InstanceInfoReplicator-%d)
,定时(默认40秒)检测当前实例的DataCenterInfo、LeaseInfo、InstanceStatus,如果有变更,执行InstanceInfoReplicator.this.run()
方法将变更信息同步到server
下面我们看看这几个重要的任务内容:
- refreshRegistry刷新缓存: refreshRegistry最终调用fetchRegistry获取服务列表,更新本地缓存。fetchRegistry在DiscoveryClient初始化时,主动获取一次;之后都是定时获取。
- renew() 续约:通过
eurekaTransport.registrationClient.sendHeartBeat
向server发送当前实例信息 - InstanceInfoReplicator.onDemandUpdate() 状态更新: 一旦有状态变化,停掉定时复制线程,立刻把状态更新到server. 最终调用的是InstanceInfoReplicator.this.run()
- InstanceInfoReplicator.this.run() 复制当前实例信息到Server: 状态变化时,立即执行;平时每40秒执行一次。