Eureka 由浅入深解读,8W+篇幅,再也不想学下去了(二)

简介: Eureka 由浅入深解读,8W+篇幅,再也不想学下去了(二)


3.6 服务注册

在拉取完 Eureka Server 中的注册表信息并将其缓存在本地后,Eureka Client 将向 Eureka Server 注册自身服务实例元数据,主要逻辑在 DiscoveryClient # register 方法中。代码如下:

boolean register() throws Throwable {
    logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
    EurekaHttpResponse<Void> httpResponse;
    try {
        // 发起注册
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
    }
    // 注册成功
    return httpResponse.getStatusCode() == 204;
}


在注册的时候,Eureka CLient 会将自身信息封装成实例元数据(InstanceInfo中)发送到 Eureka Server 中请求服务注册,当 Eureka Server 返回 204 状态码,表示注册成功。


相关的注册 url 在追踪到 AbstractJerseyEurekaHttpClient # register即可看出为,http://localhost:9000/eureka/apps/SERVER-USER,其中http://localhost:9000/eureka/是配置配置文件指定的注册中心地址,/apps/${app_name}则是具体的服务注册,参数为 InstanceInfo 实例名称。


debug 查看注册url,如下图:


image.png


3.7 初始化定时任务

在 Eureka Client 应用中,服务的注册是一个持续的过程,所以 Eureka Client 会通过定时发送心跳的方式于 Eureka Server 进行通信,维持自己在 Server 注册表上的续租。


同时,Eureka Server 注册表中的服务实例信息是动态变化的,为了保持 Eureka Client 与 Eureka Server 的注册表信息一致性,Eureka Client 会定时向 Eureka Server 拉取服务注册表信息并更新本地缓存。


并且 Eureka Cliten 为了监控自身应用信息和状态的变化,Eureka Client 设置了一个按需注册的定时器,定时检查自身应用信息活者状态变化,并在发生变化时向 Eureka Server 重新注册,避免注册表中的本服务实例信息不可用。


在 DiscoveryClient # initScheduledTasks 方法中初始化了三个定时器任务:


  • 一个用于向 Eureka Server 拉取注册表信息刷新本地缓存
  • 一个用于向 Eureka Server 发送心跳
  • 一个用于进行按需注册操作


相关代码如下:

private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // 注册表缓存刷新定时器
        // 获取配置文件中刷新间隔,默认为 30s,可以通过 eureka.client.registry-fetch-interval-seconds进行设置
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        scheduler.schedule(
            new TimedSupervisorTask(
                "cacheRefresh",
                scheduler,
                cacheRefreshExecutor,
                registryFetchIntervalSeconds,
                TimeUnit.SECONDS,
                expBackOffBound,
                new CacheRefreshThread()
            ),
            registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
    if (clientConfig.shouldRegisterWithEureka()) {
        // 发送心跳定时器,默认30秒发送一次心跳
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
        // 心跳定时器
        scheduler.schedule(
            new TimedSupervisorTask(
                "heartbeat",
                scheduler,
                heartbeatExecutor,
                renewalIntervalInSecs,
                TimeUnit.SECONDS,
                expBackOffBound,
                new HeartbeatThread()
            ),
            renewalIntervalInSecs, TimeUnit.SECONDS);
        // 下面是按需注册定时器相关逻辑
        // ......
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}


3.7.1 缓存刷新定时任务与发送心跳定时任务

在 DiscoveryClient # initScheduledTasks 方法中,通过 ScheduledExecutorService # schedule 的方式提交缓存刷新任务和发送心跳任务,任务执行的方式为延时执行并且不循环,这两个任务的定时循环逻辑由 TimedSupervisorTask 提供实现。


TimedSupervisorTask 继承了 TimeTask ,提供执行定时任务的功能。它在 run 方法中定义执行定时任务的逻辑,具体代码如下:

public class TimedSupervisorTask extends TimerTask {
    // ......
    @Override
    public void run() {
        Future<?> future = null;
        try {
            // 执行任务
            future = executor.submit(task);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            // 等待任务执行结果
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout
            // 执行完成,设置下次任务执行频率(时间间隔)
            delay.set(timeoutMillis);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
        } catch (TimeoutException e) {
            logger.warn("task supervisor timed out", e);
            // 执行任务超时
            timeoutCounter.increment();
    // 设置下次任务执行频率(时间间隔)
            long currentDelay = delay.get();
            long newDelay = Math.min(maxDelay, currentDelay * 2);
            delay.compareAndSet(currentDelay, newDelay);
        } catch (RejectedExecutionException e) {
            if (executor.isShutdown() || scheduler.isShutdown()) {
                logger.warn("task supervisor shutting down, reject the task", e);
            } else {
                logger.warn("task supervisor rejected the task", e);
            }
    // 执行任务被拒绝,统计被拒绝次数
            rejectedCounter.increment();
        } catch (Throwable e) {
            if (executor.isShutdown() || scheduler.isShutdown()) {
                logger.warn("task supervisor shutting down, can't accept the task");
            } else {
                logger.warn("task supervisor threw an exception", e);
            }
    // 其他异常,统计异常次数
            throwableCounter.increment();
        } finally {
            // 取消非结束任务
            if (future != null) {
                future.cancel(true);
            }
    // 如果定时任务未关闭,定义下一次任务
            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }
    // ......
}


rum方法中主要实现功能就是定时执行一个任务调度,其过程如下:


  • schedule 初始化并延迟执行 TimedSupervisorTask。
  • TimedSupervisorTask 将 task 提交 executor 中执行,task 和 executor 在初始化 TimedSupervisorTask 时传入。
  • 当 task 正常执行,TimedSupervisorTask 将自己提交到 schedule ,延迟 delay 时间后再此执行。
  • 当 task 执行超时,计算新的 delay 时间 (不超过 maxDelay ),TimedSupervisorTask将自己提交到 schedule,延迟 delay 时间后再次执行。


其执行流程图:


image.png


TimedSupervisorTask通过这种不断循环提交任务的方式,完成定时执行任务的功能。


在 DiscoveryClient # initScheduledTasks 方法中,提交缓存刷新定时任务的线程任务为 CacheRefreshThread,提交发送心跳定时任务的线程为 HeartThread。


CacheRefreshThread继承了 Runnable 接口,代码如下:

class CacheRefreshThread implements Runnable {
    public void run() {
        refreshRegistry();
    }
    @VisibleForTesting
    void refreshRegistry() {
        try {
            // .....
            // 判断远程Region是否改变(即 Eureka Server 地址是否发生改变),决定进行全量拉去还是增量拉取
            boolean success = fetchRegistry(remoteRegionsModified);
            // .....
            // 打印更新注册表缓存后的变化
        } catch (Throwable e) {
            logger.error("Cannot fetch registry from server", e);
        }
    }
}


CacheRefreshThread 线程任务将委托 DiscoveryClient # fetchRegistry 方法进行缓存化系的具体操作。


HeartThread同样继承了 Runnable 接口,该任务的作用是向 Eureka Server 发送心跳请求,维持 Eureka Client 在注册表中的续约,代码如下:

private class HeartbeatThread implements Runnable {
    public void run() {
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}


可看出,其主要的逻辑代码在 renew() 方法中,代码如下:

boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        // 调用 HTTP 发送心跳到 Eureka Server 中维持租续
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
        // Eureka Server 中不存在该应用实例
        if (httpResponse.getStatusCode() == 404) {
            REREGISTER_COUNTER.increment();
            logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            // 重新注册
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        // 续约成功
        return httpResponse.getStatusCode() == 200;
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
        return false;
    }
}


Eureka Server 会根据续租提交的 appName 与 instanceInfoId 来更新注册表中的服务实例的续租。当注册表中不存在该服务实例时,将返回 404 状态码,发送心跳请求的 Eureka Client 在接受到 404 状态码后将会重新发起注册,如果续约成功,将会返回 200 状态码。


通过 debug 我们可以在 AbstractJerseyEurekaHttpClient # sendHeartBeat 方法中,可以发现服务续租调用的接口即传递的参数,如图:


image.png


续租的 url :http://localhost:9000/eureka/apps/A P P N A M E / {APP_NAME}/APP

N


AME/{INSTANCE_INFO_ID},HTTP方法为put,参数主要有 status(当前服务状态),lastDirtyTimestamp(上次数据变化时间)以及 overriddenStatus。


3.7.2 按需注册定时任务

按需注册定时任务的作用是当 Eureka Client 中的 InstanceInfo 或者 status 发生变化时,重新向 Eureka Server 发起注册请求,更新注册表中的服务实例信息,保证 Eureka Server 注册表中服务实例有效和可用。按需注册代码如下:

// InstanceInfo replicator
// 定时检查刷新服务实例信息,检查是或否有变化,是否需要重新注册
instanceInfoReplicator = new InstanceInfoReplicator(
    this,
    instanceInfo,
    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
    2); // burstSize
// 监控应用的 status 变化,发生变化即可发起重新注册
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
    @Override
    public String getId() {
        return "statusChangeListener";
    }
    @Override
    public void notify(StatusChangeEvent statusChangeEvent) {
        if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
            // log at warn level if DOWN was involved
            logger.warn("Saw local status change event {}", statusChangeEvent);
        } else {
            logger.info("Saw local status change event {}", statusChangeEvent);
        }
        instanceInfoReplicator.onDemandUpdate();
    }
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
    // 注册应用状态改变监听器
    applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// 启动定时按需注册定时任务
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());


按需注册功能分为两部分:


  1. 一部分定义了一个定时任务,定时刷新服务实例的信息和检查应用状态的变化,在服务实例信息发生改变的情况下向 Eureka Server 重新发起注册操作。
  2. 一部分时注册状态改变监控器,在应用状态改变的情况时,刷新服务实例信息,在服务实例信息发生改变的情况下 Eureka Server 重新发起注册操作。


instanceInfoReplicator 中的定时任务逻辑位于 #run 方法中,如下所示:

public void run() {
    try {
        // 刷新了 InstanceInfo中的服务实例信息
        discoveryClient.refreshInstanceInfo();
  // 如果数据发生改变,则返回数据更新时间
        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            // 注册服务实例
            discoveryClient.register();
            // 重置更新状态
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        // 执行下一个延时任务
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}


其中 DiscoveryClient 中刷新本地服务实例信息和检查服务状态变化的代码如下(discoveryClient.refreshInstanceInfo):

void refreshInstanceInfo() {
    // 刷新服务实例信息
    applicationInfoManager.refreshDataCenterInfoIfRequired();
    // 更新租续的信息
    applicationInfoManager.refreshLeaseInfoIfRequired();
    InstanceStatus status;
    try {
        // 调用 HealthCheckHandler 检查服务实例的状态变化
        status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
    } catch (Exception e) {
        logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
        status = InstanceStatus.DOWN;
    }
    if (null != status) {
        applicationInfoManager.setInstanceStatus(status);
    }
}


在 instanceInfoReplicator # run 方法中,首先调用了 discoveryClient.refreshInstanceInfo 方法刷新当前的服务实例信息,查看当前服务实例信息和服务状态是否发生变化,如果当前服务实例信息或者服务状态发生变化将向 Eureka Server 重新发起服务注册操作。


最后再此声明了一下延时任务,用于再测调用 run 方法,继续检查服务实例信息和服务状态的变化,在服务实例信息发生变化的情况下重新发起注册。


如果 Eureka Client 的状态发生变化(在 SpringBoot 通过 Actuator 对服务状态进行监控,具体实现为 EurekaHealthCheckHandler),注册在 ApplicationInfoManager 的状态改变监控器将会被触发,从而调用 InstanceInfoReplicator # onDemandUpdate 方法,检查服务实例信息和服务状态的变化,可能会引发按需注册任务。代码如下:

public boolean onDemandUpdate() {
    // 控制流量,当超过限制时,不能进行按需更新
    if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
        if (!scheduler.isShutdown()) {
            scheduler.submit(new Runnable() {
                @Override
                public void run() {
                    logger.debug("Executing on-demand update of local InstanceInfo");
                    Future latestPeriodic = scheduledPeriodicRef.get();
                    // 取消上次 run 任务
                    if (latestPeriodic != null && !latestPeriodic.isDone()) {
                        logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                        latestPeriodic.cancel(false);
                    }
                    // 重新启动 run 方法
                    InstanceInfoReplicator.this.run();
                }
            });
            return true;
        } else {
            logger.warn("Ignoring onDemand update due to stopped scheduler");
            return false;
        }
    } else {
        logger.warn("Ignoring onDemand update due to rate limiter");
        return false;
    }
}


InstanceInfoReplicator # onDemandUpdate 方法调用 InstanceInfoReplicator # run 方法检查服务实例信息和服务状态的变化,并在服务实例信息发生变化的情况下向 Eureka Server 发起重新注册的请求。


为了防止重新重复执行 run 方法, onDemandUpdate 方法还会取消执行上次已提交且为未完成的 run 方法,执行最新的按需注册任务。


按需注册定时任务的处理流程如图:


微信图片_20220427002019.png


3.8 服务下线

服务下线就是 Eureka Client 向 Eureka Server 注销自身在注册表中的信息,DiscoveryClient 中对象在销毁前执行的清理方法如下:

@PreDestroy
@Override
public synchronized void shutdown() {
    // 同步方法
    if (isShutdown.compareAndSet(false, true)) {
        logger.info("Shutting down DiscoveryClient ...");
        // 原子操作,确保只执行一次
        if (statusChangeListener != null && applicationInfoManager != null) {
            // 注销状态监听器
            applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
        }
        // 取消定时任务
        cancelScheduledTasks();
        // If APPINFO was registered
        if (applicationInfoManager != null
            && clientConfig.shouldRegisterWithEureka()
            && clientConfig.shouldUnregisterOnShutdown()) {
            // 服务下线
            applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
            unregister();
        }
        // 关闭 Jersy 客户端
        if (eurekaTransport != null) {
            eurekaTransport.shutdown();
        }
        // 关闭相关 Monitor
        heartbeatStalenessMonitor.shutdown();
        registryStalenessMonitor.shutdown();
        logger.info("Completed shut down of DiscoveryClient");
    }
}


在销毁 DiscoveryClient 之前,会进行一系列的清理工作,包括 ApplicationInfoManager 中的 StatusChangeListener、取消定时任务、服务下线和关闭 Jersey 客户端等。


下面我们主要看 unregister 服务下线方法,代码如下:

void unregister() {
    // It can be null if shouldRegisterWithEureka == false
    if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
        try {
            logger.info("Unregistering ...");
            EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
            logger.info(PREFIX + "{} - deregister  status: {}", appPathIdentifier, httpResponse.getStatusCode());
        } catch (Exception e) {
            logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
        }
    }
}


其中调用了 AbstractJerseyEurekaHttpClient # cancel 方法中,可以发现服务下线调用的接口以及传递的参数,代码如下:

@Override
public EurekaHttpResponse<Void> cancel(String appName, String id) {
    String urlPath = "apps/" + appName + '/' + id;
    ClientResponse response = null;
    try {
        Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
        addExtraHeaders(resourceBuilder);
        response = resourceBuilder.delete(ClientResponse.class);
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP DELETE {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}


服务下线的接口地址为 apps/A P P N A M E / {APP_NAME}/APP

N


AME/{INSTANCE_INFO_ID},参数为服务名称和服务实例id,HTTP方法为delete。


3.9 Eureka Client源码执行流程总图


微信图片_20220427002202.png


四、Eureka Server 源码解析

Eureak Server 作为一个开箱即用的服务注册中心,提供了以下功能,用以满足与 Eureka Client 交互的需求:


  1. 服务注册
  2. 接受服务心跳
  3. 服务剔除
  4. 服务下线
  5. 集群同步
  6. 获取注册表中服务实例信息


同时 Eureka Server 它也是一个 Eureka Client,所以在不禁用 Eureka Server 的客户端行为时,它是会向它配置文件中的其他 Eureka Server 进行拉取注册表、服务注册和发送心跳等操作。


下面先看注册表的类关系 InstanceRegistry ,为后面的服务注册、Eureka-Server 集群复制做整体的铺垫。


4.1 服务实例注册表

InstanceRegistry 是 Eureka Server 中注册表管理的核心接口,类结构图如下:


image.png


  • InstanceRegistry类,对 Eureka Server 的注册表实现类 PeerAwareInstanceRegistryImpl 进行了继承和扩展,使其适配 Spring Cloud 的使用环境,主要实现由 PeerAwareInstanceRegistryImpl 提供。
  • InstanceRegistry接口,是 Eureka Server 注册表的最核心接口,其职责是在内存中管理注册到 Eureka Server 中的服务实例信息。
  • LeaseManager接口,是对注册到 Eureka Server 中的服务实例租续进行管理。
  • LookupService接口,是提供对服务实例进行检索,发现活跃的服务实例功能(Eureka Client源码介绍中讲过)。


从上而下,LeaseManager 接口提供的方法代码如下:

public interface LeaseManager<T> {
    // 注册,创建新的租约
    void register(T r, int leaseDuration, boolean isReplication);
    // 下线,取消指定服务的租约
    boolean cancel(String appName, String id, boolean isReplication);
    // 续约,更新指定服务的租约
    boolean renew(String appName, String id, boolean isReplication);
    // 剔除,剔除服务实例
    void evict();
}


LeaseManager 接口的作用是对注册到 Eureka Server 中的服务实例续约进行管理,分别有服务注册、服务下线、服务租续更新以及服务剔除等操作。


在 LeaseManager 中管理的对象是 Lease ,Lease 代表一个 Eureka Clietn 服务实例信息的租续,它提供了对其内持有的类的事件有效性操作。Lease 持有的类是代表服务实例信息的 InstanceInfo。Lease中定义了租约的操作,分别是注册、下线、更新,同时提供了对租约中事件属性的各项操作。租约默认的有效时长(duration)为 90 秒


InstanceRegistry接口 在继承 LeaseManager 和 LookupService 接口的基础上,还添加了一些特有的方法,可以更为简单地管理服务实例租约和查询注册表中的服务实例信息。可以通过 AbstractInstanceRegistry 查看 InstanceRegistry 接口方法的具体实现。


PeerAwareInstanceRegistry 继承了 InstanceRegistry 接口,在其基础上添加了 Eureka Server 集群同步的操作,其实现类 PeerAwareInstanceRegistryImpl 继承了 AbstractInstanceRegistry 的实现,在对本地注册表操作的基础上添加了对其 peer 节点的同步复制操作,使得 Eureka Server 集群中的注册表信息保持一致。


4.2 服务注册

Eureka Client 在发起服务注册时会将自身服务实例元数据封装在 InstanceInfo 中,然后将 InstanceInfo 发送到 Eureka Server。Eureka Server 在接收到 Eureka Client 发送的 InstanceInfo 后将会尝试将其放到本地注册表中以供其他 Eureka Client 进行服务发现。


服务注册的主要实现位于 AbstractInstanceRegistry # registry 方法中,代码如下:


public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        // 获取读锁
        read.lock();
        // 这里的 registry 是 ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> 类型,根据 appName 对服务实例集群进行分类
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            // putIfAbsent(key,value):key和value进行了关联则返回 value,否则将 key和value进行关联,返回null
            // 这里有一个比较严谨的操作,防止再添加新得服务实例集群租续时把已有其他线程添加的集群租续覆盖掉,如果存在该键值,直接返回已存在的值;否者添加改键值对,返回null
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        // 根据 instanceId 获取实例租续
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // Retain the last dirty timestamp without overwriting it, if there is already a lease
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
            // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
            // InstanceInfo instead of the server local copy.
            // 如果该实例的租续已经存在,比较最后更新时间戳大小,取最大值的注册信息为有效
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                registrant = existingLease.getHolder();
            }
        } else {
            // 租续不存在,这是一个新的注册实例
            synchronized (lock) {
                if (this.expectedNumberOfRenewsPerMin > 0) {
                    // 自我保护机制
                    // Since the client wants to cancel it, reduce the threshold
                    // (1
                    // for 30 seconds, 2 for a minute)
                    this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                    this.numberOfRenewsPerMinThreshold =
                        (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        // 创建新的租续
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            // 如果租续存在,继承租续的服务上线初始值
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        // 保存租续
        gMap.put(registrant.getId(), lease);
        // 用来统计最近注册服务实例的数据
        synchronized (recentRegisteredQueue) {
            // 添加最近注册队列
            recentRegisteredQueue.add(new Pair<Long, String>(
                System.currentTimeMillis(),
                registrant.getAppName() + "(" + registrant.getId() + ")"));
        }
        // This is where the initial state transfer of overridden status happens
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                         + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }
        // Set the status based on the overridden status rules
        // 根据服务状态规则得到服务hi里的最终状态,并设置服务实例的当前状态
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);
        // If the lease is registered with UP status, set lease service up timestamp
        // 如果实例状态为 UP,设置租约的服务上线时间,只有第一次设置有效
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        registrant.setActionType(ActionType.ADDED);
        // 添加最近租约变更记录队列,表示 ActionType 为 ADDED,这将用于 Eureka Client 增量式获取注册表信息
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        // 设置服务实例信息更新时间
        registrant.setLastUpdatedTimestamp();
        // 设置 response 缓存过期,这将用于 Eurkea Client 全量获取注册表信息
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        // 释放锁
        read.unlock();
    }
}


registry 的结构:

registry:ConcurrentHashMap<appName, Map<instanceId, Lease<InstanceInfo>>>


在 registry 中,服务实例的 InstanceInfo 保存在 Lease 中,Lease 在 AbstractInstanceRegistry 中统一通过 ConcurrentHashMap 保存在内存中。


在服务注册过程中,会先获取一个读锁,防止其他线程对 registry 注册表进行数据操作,避免数据的不一致。然后从 resgitry 查询对应的 InstanceInfo 租续是否已经存在注册表中,根据 appName 划分服务集群,使用 InstanceId 唯一标记服务实例。如果租约存在,比较两个租约中的 InstanceInfo 的最后更新时间 lastDirtyTimestamp,保留时间戳最大的服务实例信息 InstanceInfo。如果租约不存在,意味着这是一个全新的服务注册,将会进行自我保护的统计,创建新的租约保存 InstanceInfo。接着将租约放到 resgitry 注册表中。


之后将进行一系列缓存操作并根据覆盖状态规则设置服务实例的状态,缓存操作包括将 InstanceInfo 加入用于统计 Eureka Client 增量式获取注册表信息得 recentlyChangedQueue 和失败 responseCache 中对应的缓存。最后设置服务实例租约的上线时间用于计算租约的有效时间,释放读锁完成服务注册。


在服务注册中,registry 方法为了防止数据被错误的覆盖而进行了大量的同步操作(读写锁,synchronized 锁)。


4.3 接受服务心跳

我们知道,Eureka Client 为了向服务端证明自己是有用的,会定时向 Eureka Server 发送一个心跳(默认30秒一次),以此来证明自己可用不被 Eureka Server 从注册表中剔除掉。


在 Eureka Server 中处理心跳请求的核心逻辑位于 AbstractInstanceRegistry # renew。


renew 方法是对 Eureka Client 位于注册表中的租约的续租操作,不像 register 方法需要服务实例信息,进根据服务实例的服务名和服务实例 id 即可更新对应租约的有效时间,源码如下:

public boolean renew(String appName, String id, boolean isReplication) {
    RENEW.increment(isReplication);
    // 根据appName获取服务集群的租约集合
    Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
    Lease<InstanceInfo> leaseToRenew = null;
    if (gMap != null) {
        // 从集合中获取租约
        leaseToRenew = gMap.get(id);
    }
    if (leaseToRenew == null) {
        // 租约不存在 直接返回 false
        RENEW_NOT_FOUND.increment(isReplication);
        logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
        return false;
    } else {
        InstanceInfo instanceInfo = leaseToRenew.getHolder();
        if (instanceInfo != null) {
            // touchASGCache(instanceInfo.getASGName());
            // 根据覆盖状态规则得到服务实例的最终状态
            InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                    instanceInfo, leaseToRenew, isReplication);
            if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                // 如果得到的服务实例最后状态是 UNKNOWN 取消续约
                logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                        + "; re-register required", instanceInfo.getId());
                RENEW_NOT_FOUND.increment(isReplication);
                return false;
            }
            // instanceInfo 中的状态改变,更新
            if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                logger.info(
                        "The instance status {} is different from overridden instance status {} for instance {}. "
                                + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                                instanceInfo.getOverriddenStatus().name(),
                                instanceInfo.getId());
                // 更新服务状态
                instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
            }
        }
        // 统计每分钟续租的次数 用于自我保护
        renewsLastMin.increment();
        // 更新租约中的有效时间
        leaseToRenew.renew();
        return true;
    }
}


在 renew 方法中,不关注 InstanceInfo ,仅关注于租约本身以及租约的服务实例状态。如果根据服务实例的 appName 和 instanceInfoId 查询出服务实例的租约,并且根据 getOverriddenInstanceStatus 方法等到的 instanceStatus 不为 InstanceStatus.UNKNOWN 那么更新租约中的有效时间,即更新租约 Lease 中 lastUpdateTimestamp ,达到续约的目的;如果租约不存在,那么返回续租失败。


4.4 服务剔除

如果 Eureka Client 在注册后,既没有续约,也没有下线(服务崩溃或者网络异常等原因),那么服务的状态就处于不可知的状态,不能保证能够从该服务中获取到反馈,所以需要服务剔除 AbstractInstanceRegistry # evict 方法定时清理这些不稳定的服务,该方法会批量将注册表中所有过期续租剔除。源代码如下:

@Override
public void evict() {
    evict(0l);
}
public void evict(long additionalLeaseMs) {
    logger.debug("Running the evict task");
    if (!isLeaseExpirationEnabled()) {
        // 自我保护相关,出现该状态,不允许剔除服务
        logger.debug("DS: lease expiration is currently disabled.");
        return;
    }
    // 遍历注册表register 一次性获取所有的过期租约
    // We collect first all expired items, to evict them in random order. For large eviction sets,
    // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
    // the impact should be evenly distributed across all applications.
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    expiredLeases.add(lease);
                }
            }
        }
    }
    // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
    // triggering self-preservation. Without that we would wipe out full registry.
    // 计算最大允许剔除的租约的数量 获取注册表租约总数
    int registrySize = (int) getLocalRegistrySize();
    // 计算注册表租约的阀值 与自我保护相关
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    int evictionLimit = registrySize - registrySizeThreshold;
    // 计算剔除租约的数量
    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
        logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
        Random random = new Random(System.currentTimeMillis());
        // 逐个随机剔除
        for (int i = 0; i < toEvict; i++) {
            // Pick a random item (Knuth shuffle algorithm)
            int next = i + random.nextInt(expiredLeases.size() - i);
            Collections.swap(expiredLeases, i, next);
            Lease<InstanceInfo> lease = expiredLeases.get(i);
            String appName = lease.getHolder().getAppName();
            String id = lease.getHolder().getId();
            EXPIRED.increment();
            logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
            // 逐个剔除
            internalCancel(appName, id, false);
        }
    }
}


服务剔除将会遍历 registry 注册表,找出其中所有的过期租约,然后根据配置文件中续租百分比阈值和当前注册表的租约总数量计算出最大允许的剔除租约的数量(当前注册表中租约总数量减去当前注册表租约阈值),分批次剔除过期的服务实例租约。


对过期的服务实例租约调用 AbstractInstanceRegistry # internalCancel 服务下线的方法将其从注册表中清除掉。


为了保证 Eureka Server 的可用性,在服务剔除 evict 方法中有很多限制:


  1. 自我保护时期不能进行服务剔除操作。
  2. 过期操作是分批进行。
  3. 服务剔除是随机逐个剔除,剔除均匀分布在所有应用中,防止在同一时间内同意服务集群中的服务全部过期被剔除,以致大量剔除发生,在未进行自我保护前提促使程序崩溃。


服务剔除是一个定时的任务,所以 AbstractInstanceRegistry 中定义了一个 EvictionTask 用于定时执行服务剔除,默认 60 秒一次。


服务剔除的定时任务一般在 AbstractInstanceRegistry 初始化结束后进行,按照执行频率 evictionIntervalTimerInMs 的设定,定时剔除过期的服务实例租约。


自我保护机制主要在 Eureka Client 和 Eureka Server 之间存在网络分区的情况下发挥保护作用,在服务器端和客户端都有对应的实现。


假设在某种特定的情况下(网络故障…), Eureka Client 和 Eureka Server 无法进行通信,此使 Eureka Client 无法向 Eureka Server 发起祖册和续约请求,Eureka Server 中就可能因注册表中的服务实例租约出现大量过期而面临被剔除的危险,然而此使的 Eureka Client 可能是处于健康状态的(可接受服务访问),如果直接将注册表中大量过期的服务实例租约剔除掉先然是不合理的。


针对上面那种情况,Eureka 设计了”自我保护机制“。在 Eureka Server 处,如果出现大量服务实例过期被剔除的现象,那么改 Server 节点将进入自我保护模式,保护注册表中的信息不再被剔除,在通信稳定后在退出该模式;在Eureka Client 处,如果向 Eureka Server 注册失败,将快速超时并尝试于其他的 Eureka Server 进行通信。”自我保护机制“的设计大大提高了 Eureka 的可用性。


4.5 服务下线

Eureka Client 在应用销毁时,会向 Eureka Server 发送服务下线请求,清除注册表中关于本应用的租约,避免无效的服务调用。在服务剔除的过程中,也是通过服务下线的逻辑完成对单个服务实例过期租约的清楚工作。


服务下线的主要实现代码位于 AbstractInstanceRegistry # internalCancel 方法中,仅需要服务实例的服务名和服务实例的 id 即可完成服务下线。源代码如下:

protected boolean internalCancel(String appName, String id, boolean isReplication) {
    try {
        // 获取读锁,防止被其他线程进行修改
        read.lock();
        CANCEL.increment(isReplication);
        // 根据appName获取服务实例的集群
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToCancel = null;
        if (gMap != null) {
            // 移除服务实例的租约
            leaseToCancel = gMap.remove(id);
        }
        // 将服务实例信息添加到最近下线服务实例统计队列
        synchronized (recentCanceledQueue) {
            recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
        }
        // 移除实例的状态信息
        InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
        if (instanceStatus != null) {
            logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
        }
        // 租约不存在,返回 false
        if (leaseToCancel == null) {
            CANCEL_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
            return false;
        } else {
            // 设置租约的下线时间
            leaseToCancel.cancel();
            InstanceInfo instanceInfo = leaseToCancel.getHolder();
            String vip = null;
            String svip = null;
            if (instanceInfo != null) {
                instanceInfo.setActionType(ActionType.DELETED);
                // 添加最近租约变更记录队列,标识ActionType DELETED
          // 这将用于Eureka Client 增量式获取注册表信息
                recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                instanceInfo.setLastUpdatedTimestamp();
                vip = instanceInfo.getVIPAddress();
                svip = instanceInfo.getSecureVipAddress();
            }
            // 设置 response 缓存过期
            invalidateCache(appName, vip, svip);
            logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
            // 下线成功
            return true;
        }
    } finally {
        // 释放锁
        read.unlock();
    }
}


internalCancel 方法与 register 方法的行为过程很类似,首先通过 register 根据服务名和服务实例 id 查询关于服务实例的租约 Lease 是否存在,统计最近下线的服务实例用于 Eureka Server 主页展示。如果租约不存在,返回下线失败;如果租约存在,从 register 注册表中移除,设置租约的下线时间,同时在最近租约变更记录队列中添加新的下线记录,以用于 Eureka Client 的增量式获取注册表信息,最后设置 reposonse 缓存过期。


internalCancel 方法中同样通过读锁保证 register 注册表中数据的一致性,避免脏读。


4.6 集群同步

如果,Eurkea Server 是通过集群的方式进行部署,那么为了维护整个集群中 Eureka Server 注册表数据的一致性,势必需要一个机制同步 Eureka Server 集群中注册表的数据。


Eureka Server 集群同步包含两个部分:


  1. 一部分是 Eureka Server 在启动过程中从它的 peer 节点中拉取注册表信息,并将这些服务实例的信息注册到本地注册表中。
  2. 另一部分是 Eureka Server 每次对本地注册表进行操作时,同时会将操作同步到它的 peer 节点中,达到集群注册表数据统一的目的。


4.6.1 Eureka Server 初始化本地注册表信息

在 Eureka Server 启动的过程中,会从它的 peer 节点中拉取注册表来初始化本地注册表,这部分主要通过 PeerAwareInstanceRegistry # syncUp 方法完成,他将从可能存在 peer 节点中,拉取 peer 节点中的注册表信息,并将其中的服务实例信息注册到本地注册表中,源代码如下:

public int syncUp() {
    // Copy entire entry from neighboring DS node
    // 从临近的 peer 中复制整个注册表
    int count = 0;
    // 如果获取不到,线程等待
    for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
        if (i > 0) {
            try {
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
            } catch (InterruptedException e) {
                logger.warn("Interrupted during registry transfer..");
                break;
            }
        }
        // 获取所有的服务实例
        Applications apps = eurekaClient.getApplications();
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                try {
                    // 判断是否可以注册,主要用于 AWS 环境下进行,若部署在其他环境,直接返回ture
                    if (isRegisterable(instance)) {
                        // 注册到自身的注册表中
                        register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                        count++;
                    }
                } catch (Throwable t) {
                    logger.error("During DS init copy", t);
                }
            }
        }
    }
    return count;
}


Eureka Server 也是一个 Eureka Client ,在启动的时候也会进行 DiscoveryClient 的初始化,会从其对应的 Eureka Server 中拉取全量的注册表信息。


在 Eureka Server 集群部署的情况下, Eureka Server 从它的 peer 节点中拉取到注册表信息后,将遍历这个 Applications,将所有的服务实例通过 AbstractRegistry # register 方法啊注册到自身注册表中。


在初始化本地注册表时,Eureka Server 并不会接受来自 Eureka Client 的通信请求(如注册、或者获取注册表信息等请求)。 在同步注册表信息结束后会通过 PeerAwareInstanceRegistryImpl # openForTraffic 方法允许该 Server 接受流量。源代码如下:

public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
    // 初始化自我保护机制统计参数
    this.expectedNumberOfRenewsPerMin = count * 2;
    this.numberOfRenewsPerMinThreshold =
            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
    logger.info("Got {} instances from neighboring DS node", count);
    logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();
    // 如果同步的应用实例数量为 0 ,将在一段时间内拒绝 Client 获取注册信息
    if (count > 0) {
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    // 判断是否 AWS 预序环境,可忽略该部分
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }
    logger.info("Changing status to UP");
    // 修改服务实例的状态为健康上线,可以接受流量
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    super.postInit();
}


在 Eureka Server 中有一个 StatusFilter 过滤器,用于检查 Eureka Server 的状态,当 Server 的状态不为 UP 时,将拒绝所有的请求。


在 Client 请求获取注册表信息时,Server 会判断此时是否允许获取注册表中的信息。这种做法是为了避免 Eureka Server 在 syncUp 方法中没有获取到任何服务实例信息时(Eureka Server 集群部署的情况下),Eureka Server 注册表中的信息影响到 Eureka Client 缓存的注册表中的信息。


如果 Eureka Server 在 syncUp 方法中没有获取任何的服务实例信息,它将把 peerInstancesTransferEmptyOnStartup 设置为 true,这时该 Eureka Server 在 WaitTimeInMsWhenSyncEmpty(可以通过 eureka.server.wait-time-in-ms-when-sync-empty 设置,默认是 5 分钟)时间后才能被 Eureka Client 访问获取注册表信息。


4.6.2 Eureka Server 之间注册表信息的同步复制

为了保证 Eureka Server 集群运行时注册表信息的一致性,每个 Eureka Server 在对本地注册表进行管理操作时,会将相应的操作同步到所有 peer 节点中。


在 PeerAwareInstanceRegistryImpl 中,对 Abstractinstanceregistry 中的 register、cancel 和 renew 等方法都添加了同步到 peer 节点的操作,使 Server 集群中注册信息保持最终一致性,部分源代码如下:


下线

public boolean cancel(final String appName, final String id,
                      final boolean isReplication) {
    // ...
    // 同步下线状态
    replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
    // ...
}


注册

public void register(final InstanceInfo info, final boolean isReplication) {
    // ...
    // 同步注册状态
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}


续约

public boolean renew(final String appName, final String id, final boolean isReplication) {
    // ...
    // 同步续约状态
    replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
    // ...
}


同步的主要操作有(枚举类):

public enum Action {
    Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;
    // ...
}


每个同步方法都是调用如下方法:

private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        // 如果 peer 集群为空,或者本来就是复制操作,那么久不在复制,防止造成循环复制
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }
        // 向 peer 集群中的每一个peer 进行同步
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            // 如果 peer 系欸但时自身的话,不进行同步复制
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            // 根据 Action 调用不同的同步请求
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}


peerEurekaNode 代表一个可同步共享数据的 Eureka Server。在 PeerEurekaNode中,具有 register、cancel、heartbeat 和 statueUpdate 等诸多用于向 peer 节点同步注册表信息的操作。


在 replicateInstanceActionsToPeers 方法中根据 action 的不同,调用 PeerEurekaNode 的不同方法进行同步复制,代码如下所示:

private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                // 同步下线
                node.cancel(appName, id);
                break;
            case Heartbeat:
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                // 同步心跳
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                // 同步注册
                node.register(info);
                break;
            case StatusUpdate:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                // 同步跟新状态
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                // 删除实例覆盖状态
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}


PeerEurekaNode 中的每一个同步复制都是通过比任务流的方式进行操作,同一时间段内相同服务实例的相同操作将使用相同的任务编号,在进行同步复制的时候根据任务编号合并操作,减少同步操作的数量和网络消耗,但是同步也造成同步复制的延时性,不满足 CAP 中的 C(强一致性)。


通过 Eureka Server 在启动过程中初始化本地注册表信息和 Eureka Server 集群间的同步复制操作,最终达到了集群中 Eureka Server 注册表信息一致的目的。


4.7 获取注册表中服务实例信息

Eureka Server 中获取注册表的服务实例信息主要通过两个方法实现:


AbstractInstanceRegistry # getApplicationsFromMultipleRegions 从多地区获取全量注册表数据

AbstractInstanceRegistry # getApplicationDeltasFromMultipleRegions 从多地区获取增量式注册表数据


4.7.1 getApplicationsFromMultipleRegions

getApplicationsFromMultipleRegions 方法将会从多个地区中获取全量注册表信息,并封装成 Application 返回,源代码如下:

public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {
    boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;
    logger.debug("Fetching applications registry with remote regions: {}, Regions argument {}",
                 includeRemoteRegion, remoteRegions);
    if (includeRemoteRegion) {
        GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS.increment();
    } else {
        GET_ALL_CACHE_MISS.increment();
    }
    Applications apps = new Applications();
    apps.setVersion(1L);
    // 从本地registry获取所有的服务实例信息工nstanceinfo
    for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
        // ...
    }
    if (includeRemoteRegion) {
        // 获取远程 Region 中的 Eureka Server 中的注册表信息
        // ...
    }
    apps.setAppsHashCode(apps.getReconcileHashCode());
    return apps;
}


它首先会将本地注册表 register 中的所有服务实例信息提取出来封装到 Applications 中,再根据是否需要拉取远程 Regist 中的注册表信息,将远程 Region 的 Eureka Server 注册表中的服务实例信息添加到 Application 中。最后将封装了全量注册表信息的 Applications 返回给 Client。


4.7.2 getApplicationDeltasFromMultipleRegions

getApplicationDeltasFromMultipleRegions 方法将会从多个地区中获取增量式注册表信息,并封装成 Applications 返回,代码如下:

public Applications getApplicationDeltasFromMultipleRegions(String[] remoteRegions) {
    if (null == remoteRegions) {
        remoteRegions = allKnownRemoteRegions; // null means all remote regions.
    }
    boolean includeRemoteRegion = remoteRegions.length != 0;
    if (includeRemoteRegion) {
        GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS_DELTA.increment();
    } else {
        GET_ALL_CACHE_MISS_DELTA.increment();
    }
    Applications apps = new Applications();
    apps.setVersion(responseCache.getVersionDeltaWithRegions().get());
    Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
    try {
        // 开启写锁
        write.lock();
        // 遍历 recentlyChangedQueue 队列获取最近变化的服务实例信息 InstanceInfo
        Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
        logger.debug("The number of elements in the delta queue is :{}", this.recentlyChangedQueue.size());
        while (iter.hasNext()) {
            // ...
        }
        // 获取远程Region 中的 Eureka Server 的增量式注册表信息
        if (includeRemoteRegion) {
            // ...
        }
        Applications allApps = getApplicationsFromMultipleRegions(remoteRegions);
        // 计算应用集合一致性哈希码,用以在Eureka Client 拉取时进行对比
        apps.setAppsHashCode(allApps.getReconcileHashCode());
        return apps;
    } finally {
        write.unlock();
    }
}


获取增量式注册表信息将会从 recentlyChangedQueue 中获取最近变化的服务实例信息。recentlyChangedQueue 中统计了进 3 分钟内进行注册、修改和剔除的服务实例信息,在服务注册 AbstractInstanceRegistry # registry 、接受心跳请求 AbstractInstanceRegistry # renew 和服务下线 AbstractInstanceRegistry # internalCancel 等方法中均可见到 recentlyChangedQueue 对这些服务实例进行登记,用于记录增量式注册表信息。getApplicationDeltasFromMultipleRegions 方法同样提供了从远程 Region 的 Eureka Server 获取增量式注册表信息的能力。


五、最后


上面对 Eureka Client 及 Eureka Server 的主要功能进行分析(读书笔记),篇幅应该说是非常的长了,所以本篇内容还是需要长时间的研读并结合源码去理解。


对于书中的后续内容“Eureka 进阶”就不打算放出来了,太耗费时间去整理了,如果大家感兴趣可以根据末尾的参考资料去阅读,这就都不多说明了哈!


好了,今天的内容到这里就结束了,关注我,我们下期见


目录
相关文章
|
Java 数据库连接 数据库
源码分析系列教程(完) - 终章总结
源码分析系列教程(完) - 终章总结
63 0
源码分析系列教程(完) - 终章总结
|
8月前
|
XML Java 数据格式
🚀今天,我们来详细的聊一聊SpringBoot自动配置原理,学了这么久,你学废了吗?
🚀今天,我们来详细的聊一聊SpringBoot自动配置原理,学了这么久,你学废了吗?
127 0
|
缓存 Java 中间件
谈谈你对Nacos配置动态更新原理的理解,这道题一定要会
Nacos作为阿里的开源中间件在Spring Cloud生态以后,不管是作为配置中心,还是作为注册中心,因为它简单易用的特性,在互联网公司被广泛运用。随后,大家会发现Nacos相关的面试题也就越来越多了。
264 0
|
监控 NoSQL 安全
Spring Boot 怎么做监控?这篇总算整明白了。。。
Spring Boot 怎么做监控?这篇总算整明白了。。。
2122 8
Spring Boot 怎么做监控?这篇总算整明白了。。。
|
JSON Prometheus 监控
Spring Boot 怎么做监控?这篇总算整明白了。。。(2)
Spring Boot 怎么做监控?这篇总算整明白了。。。(2)
239 0
Spring Boot 怎么做监控?这篇总算整明白了。。。(2)
|
XML Dubbo Java
不服不行啊!大牛确实把SpringCloud集成Dubbo给一次性讲透了
Spring Cloud集成Dubbo 目前Dubbo在国内还是有较多公司在使用的,一方面是因为Dubbo作为阿里巴巴开源的一个SOA服务治理解决方案,在国内发展较早,有比较好的先发优势;另一方面是因为在国内很多工程师对Dubbo框架都比较熟悉,有比较完善的文档介绍和实例;还有,Dubbo框架的性能优势和基于SPI的扩展机制也是Dubbo的优势所在。
不服不行啊!大牛确实把SpringCloud集成Dubbo给一次性讲透了
|
算法 网络协议 NoSQL
面试官:Nacos 为什么这么强?讲讲实现原理?我懵了。。(1)
面试官:Nacos 为什么这么强?讲讲实现原理?我懵了。。(1)
403 0
面试官:Nacos 为什么这么强?讲讲实现原理?我懵了。。(1)
|
JSON API Nacos
面试官:Nacos 为什么这么强?讲讲实现原理?我懵了。。(3)
面试官:Nacos 为什么这么强?讲讲实现原理?我懵了。。(3)
169 0
面试官:Nacos 为什么这么强?讲讲实现原理?我懵了。。(3)
|
缓存 API Nacos
面试官:Nacos 为什么这么强?讲讲实现原理?我懵了。。(2)
面试官:Nacos 为什么这么强?讲讲实现原理?我懵了。。(2)
182 0
面试官:Nacos 为什么这么强?讲讲实现原理?我懵了。。(2)
|
Kubernetes 前端开发 Java
小六六平时的开发小技巧二(Nacos在服务配置中心的妙用)
前言 文本已收录至我的GitHub仓库,欢迎Star:github.com/bin39232820…
135 0