public void evict(long additionalLeaseMs) { // 判断是否开启自我保护,自我保护期间不剔除任何任务 if (!isLeaseExpirationEnabled()) { logger.debug("DS: lease expiration is currently disabled."); return; } List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>(); //循环获得 所有过期的租约 for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) { Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue(); if (leaseMap != null) { for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) { Lease<InstanceInfo> lease = leaseEntry.getValue(); // 判断是否过期 if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) { expiredLeases.add(lease); } } } } // 计算 最大允许清理租约数量 int registrySize = (int) getLocalRegistrySize(); int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold()); int evictionLimit = registrySize - registrySizeThreshold; // 计算 清理租约数量 int toEvict = Math.min(expiredLeases.size(), evictionLimit); if (toEvict > 0) { Random random = new Random(System.currentTimeMillis()); // 遍历清理。 for (int i = 0; i < toEvict; i++) { int next = i + random.nextInt(expiredLeases.size() - i); Collections.swap(expiredLeases, i, next); Lease<InstanceInfo> lease = expiredLeases.get(i); String appName = lease.getHolder().getAppName(); String id = lease.getHolder().getId(); EXPIRED.increment(); internalCancel(appName, id, false); } } }
isLeaseExpirationEnabled():判断是否开启自我保护的两个条件
- 自我保护配置处于开启状态
- 当前单位续约数(renewsLastMin统计器统计的数据)<阈值
Lease.isExpire():是否过期的判断:
public boolean isExpired(long additionalLeaseMs) { return ( //或者明确实例下线时间。 evictionTimestamp > 0 //或者距离最后更新时间已经过去至少3分钟 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs)); }
- evictionTimestamp : 实例下线时间,当客户端下线时,会更新这个时间
- duration : 过期间隔,默认为90秒
- lastUpdateTimestamp : 为最后更新时间
//续约时更新lastUpdateTimestamp,加上了过期间隔? public void renew() { lastUpdateTimestamp = System.currentTimeMillis() + duration; }
过期时间判断: System.currentTimeMillis()> lastUpdateTimestamp + duration + additionalLeaseMs 这里加了两次duration, 也就是180秒,加上延迟下线时间。也就是最少需要3分钟才判断下线。
3.3 小结
至此Eureka server的初始化就完成了。 这里通过debug模式来看看初始化过程中的定时任务。
4.API接口
Eureka Server 启动后,就是对外提供服务了。等待客户端来注册。
Eureka是一个基于REST(Representational State Transfer)服务,我们从官方文档中可以看到其对外提供的接口: 官方文档
可以推测,客户端注册时也是调用了这些接口来进行与服务端的通信的。
上文说过,Eureka 使用jersey框架来做MVC框架,暴露接口。ApplicationResource
类似springmvc中的Controller。
在com.netflix.eureka.resources
包下我们可以看到这些ApplicationResource
4.1注册接口
ApplicationResource.addInstance
对应的就是服务注册接口
@POST @Consumes({"application/json", "application/xml"}) public Response addInstance(InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { .... //使用PeerAwareInstanceRegistryImpl#register() 注册实例信息。 registry.register(info, "true".equals(isReplication)); return Response.status(204).build(); // 204 to be backwards compatible }
InstanceRegistry @Override public void register(final InstanceInfo info, final boolean isReplication) { //发布注册事件, handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication); super.register(info, isReplication); } PeerAwareInstanceRegistryImpl @Override public void register(final InstanceInfo info, final boolean isReplication) { //租期90s int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) { leaseDuration = info.getLeaseInfo().getDurationInSecs(); } //注册实例 super.register(info, leaseDuration, isReplication); //复制到其他节点。 replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication); }
4.1.1注册到当前Eureka
AbstractInstanceRegistry public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { read.lock()读锁 1.从缓存中获取实例名称对应的租约信息 Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName()); REGISTER.increment(isReplication); 2.统计数+1 REGISTER.increment(isReplication); //gmap为null.则创建一个Map。 3.租约的处理分两种情况: 租约已经存在: 比较新租约与旧租约的LastDirtyTimestamp,使用LastDirtyTimestamp最晚的租约 租约不存在,即新注册: synchronized (lock) { 更新期待每分钟续约数 更新续约阈值 } 将租约放入appname对应的map中。 4.在最近注册队(recentRegisteredQueue)里添加一个当前注册信息 5.状态的处理: 将当前实例的OverriddenStatus状态,放到Eureka Server的overriddenInstanceStatusMap; 根据OverriddenStatus状态,设置状态 7.实例actionType=ADDED registrant.setActionType(ActionType.ADDED); 8. 维护recentlyChangedQueue,保存最近操作 recentlyChangedQueue.add(new RecentlyChangedItem(lease)); 9.更新最后更新时间 registrant.setLastUpdatedTimestamp(); 10.使当前实例的结果缓存ResponseCache失效() invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); }
4.1.2复制到其他节点
此处可以看源码阅读,在此不讲了
4.2查询接口
我们获取的实例信息,其实都是从缓存中获取的String payLoad = responseCache.get(cacheKey);
@GET public Response getApplication(@PathParam("version") String version, @HeaderParam("Accept") final String acceptHeader, @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept) { if (!registry.shouldAllowAccess(false)) { return Response.status(Status.FORBIDDEN).build(); } EurekaMonitors.GET_APPLICATION.increment(); CurrentRequestVersion.set(Version.toEnum(version)); KeyType keyType = Key.KeyType.JSON; if (acceptHeader == null || !acceptHeader.contains("json")) { keyType = Key.KeyType.XML; } Key cacheKey = new Key( Key.EntityType.Application, appName, keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept) ); String payLoad = responseCache.get(cacheKey); if (payLoad != null) { logger.debug("Found: {}", appName); return Response.ok(payLoad).build(); } else { logger.debug("Not Found: {}", appName); return Response.status(Status.NOT_FOUND).build(); } }
总结
由于篇幅限制:
- Renew: 服务续约
- Cancel: 服务下线 不说了。
至此:Eureka服务端内容大体讲完,只讲了些大概,具体建议跟源码。
如有错误,敬请指出