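I. Background
Diamond is a configuration-center middleware that is used very widely inside Alibaba. It serves not only business applications: it also shows up inside many other middleware products, which shows how important its "configuration management" and "dynamic configuration push" capabilities are. Here I record my process of learning Diamond. In the spirit of understanding not only what it does but why, I explore this impressive piece of middleware infrastructure.
Before diving in, keep a few questions in mind: why was Diamond created? How does it work? What should you watch out for when using it?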
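As for why Diamond was created, consider two hypothetical scenarios:
Scenario 1:
Any complex real-world system has a lot of configuration data, such as user whitelists, environment identifiers, and database addresses. In a system built from many microservices, the amount of configuration is enormous. If all of this data lives in each application's own files (for example application.properties), understanding and operating the system becomes a serious challenge for developers. The problems can be summarized as:
- There are many complex configurations, which are hard to understand
- Configurations are scattered, which makes them hard to manage
What if we consolidated the configurations of all microservices in one place and managed them through a visual console? I think that would solve both problems, at least to a degree.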
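Scenario 2:
Much configuration data must change dynamically with the business, such as traffic-switching flags, administrator information, and thresholds. Imagine that every piece of configuration had to be actively requested to get the latest value: what problems would that cause? Assume the configuration is stored in a MySQL database; a single request is then processed as follows:
1. The client sends a request
2. The application service queries the database directly to get the latest configuration
3. The application service waits for the database to respond
4. The application service processes the request according to the database result, and the request completes
Every request has to query the database directly, which causes the following problems:
- Higher request RT (response time), because the database query adds processing latency
- Lower reliability: if the database fails, the service becomes unavailable
- Because the database query sits on the main path, the database may become a performance bottleneck under high QPS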
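3. Solutions
What approaches could solve these problems?
1. Replace the storage: would switching from MySQL to Redis work?
Compared with MySQL, Redis has lower RT and is unlikely to become a performance bottleneck, and cloud Redis supports cluster deployment and same-city disaster recovery, so its reliability is high. From a performance standpoint it meets the requirements. However, Redis is a cache-oriented database that is not well suited to durable persistence, while configuration data usually must be persisted; moreover, Redis is mostly used at runtime and the data in it is hard to manage. All in all, Redis does not quite meet the requirements.
2. Take configuration queries off the request path
Once configuration queries no longer sit on the request path, RT and performance bottlenecks stop being a problem; what remains is how to manage the data operationally and how to deliver updates.
Diamond addresses this from two directions. For operational management, its diamond-ops component provides a unified configuration management console, which largely removes the inconvenience of operations.
For delivering updated configuration to application services, it relies on dynamic configuration push. This is Diamond's core capability, and the rest of this article revolves around it.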
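To make the "off the request path" idea concrete, here is a minimal sketch that assumes a hypothetical `LocalConfigCache` class (it is not part of Diamond). The request path only reads an in-memory snapshot, and a new version is swapped in off the request path through `onPush`, which stands in for a listener callback such as Diamond's `receiveConfigInfo`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration: reads never touch a database; updates replace the whole snapshot.
class LocalConfigCache {
    // Immutable snapshot swapped atomically, so readers never see a half-applied update.
    private final AtomicReference<Map<String, String>> snapshot =
            new AtomicReference<>(Collections.<String, String>emptyMap());

    // Hot path: an in-memory lookup with no remote I/O.
    String get(String key, String defaultValue) {
        return snapshot.get().getOrDefault(key, defaultValue);
    }

    // Off the request path: called when a new configuration version arrives.
    void onPush(Map<String, String> newConfig) {
        // Defensive copy, so later changes to the caller's map do not leak into the snapshot.
        snapshot.set(Collections.unmodifiableMap(new HashMap<>(newConfig)));
    }
}
```

Because the whole map is replaced atomically, a request never sees a mix of old and new values; the remaining question, how `onPush` gets triggered, is exactly what Diamond's push mechanism answers.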
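II. Diamond Architecture
2.1 Components
The overall architecture of Diamond is shown in the figure; it consists of five main modules:
- diamond-server: the core server-side application of Diamond. It is built on the Spring MVC framework and exposes its functionality through RESTful interfaces.
- diamond-client: the client for accessing Diamond, embedded into application services as an SDK. It is mainly used to query and listen to configuration, but it also offers other functions such as publishing configuration.
- Database: persists the configuration of every application in Diamond; it is the key component for guaranteeing strong consistency
- http-server (address server): stores the diamond-server addresses of each cluster, lets diamond-client query them, and lets diamond-server register and deregister its address. It is the broker through which Diamond clients find Diamond servers (you can view the diamond-server IP addresses with curl: http://xxx.xxx.net:8080/diamond-server/diamond)
- diamond-ops: the Diamond management console, which provides basic operational functions such as querying, modifying, and publishing configuration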
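2.2 Overall Flow
Based on the architecture diagram, the overall flow of Diamond is as follows:
- diamond-server is deployed as a cluster and registers its IPs with the address server; scaling out or in keeps this list up to date
- When a user application starts, diamond-client is initialized and fetches the diamond-server addresses from the address server. Diamond provides region-level disaster recovery, same-city disaster recovery, unitized (cell-based) deployment, and so on for the business
- Once the list is obtained, diamond-client sends requests to the servers to query configuration and listen to it. Change notifications are received only for configurations that are being listened to, that is, after implementing the "ManagerListener" interface and overriding its "receiveConfigInfo" method
- When a user changes configuration through diamond-ops, diamond-server first writes it to the database, then refreshes the latest configuration into the server's local cache, and notifies all diamond-server instances to synchronize the data
- When the local cache of a diamond-server changes, it notifies the diamond-clients that listen to that configuration
- On receiving the change notification, diamond-client sends a configuration query to diamond-server, which returns the content of its local cache. After obtaining the latest configuration, diamond-client invokes the business listener, i.e. passes the data to the user application through the "receiveConfigInfo" method
- Diamond uses a heartbeat mechanism to make sure the server list provided by the http-server is usable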
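III. Principles
Diamond's most important feature is that a user application picks up configuration changes quickly without restarting. At its core is the dynamic configuration push service; next we analyze how Diamond implements dynamic push.
Real-time dynamic configuration push is essentially about sending configuration changes on the server to clients in real time. If there were a mechanism that let the server continuously "push" information to clients, the problem would be solved. A long-lived TCP connection is the most natural solution, but Diamond was designed to serve over HTTP, so the challenge it faced was how to solve this problem within HTTP's request-response communication model.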
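3.1 Client-side polling (pull model)
The client discovers configuration change events on the server by polling. The polling frequency determines how fresh the dynamic configuration is, much like a scheduled task (e.g. SchedulerX) that sends a processing request at fixed intervals. The conclusion up front: this approach is not efficient enough, and Diamond does not use it, for the following reasons:
Pros:
- Simple and reliable
Cons:
- As the number of users grows, a high polling frequency puts ever-increasing pressure on the whole Diamond service
- A low polling frequency, on the other hand, delays how quickly clients see configuration changes
- A configuration center is a read-heavy, write-light system, so most polling requests from clients return nothing new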
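The cost of the pull model is easy to see in a sketch. The class below is hypothetical: `fetchFromServer` stands in for a full HTTP GET of the configuration, and changes are detected by comparing content locally. Every tick costs a round trip whether or not anything changed.

```java
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical short-polling client: every tick fetches the full config and compares it locally.
class ShortPollingClient {
    private final Supplier<String> fetchFromServer; // stands in for a full HTTP GET
    private final Consumer<String> listener;
    private String lastContent;
    private int requests;

    ShortPollingClient(Supplier<String> fetchFromServer, Consumer<String> listener) {
        this.fetchFromServer = fetchFromServer;
        this.listener = listener;
    }

    // One polling round: always costs a request, even when nothing changed.
    synchronized void pollOnce() {
        requests++;
        String content = fetchFromServer.get();
        if (!Objects.equals(content, lastContent)) {
            lastContent = content;
            listener.accept(content);
        }
    }

    synchronized int requestCount() {
        return requests;
    }

    // Freshness is bounded by intervalMs, while server load grows with (clients / intervalMs).
    ScheduledExecutorService start(long intervalMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(this::pollOnce, 0L, intervalMs, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

In a write-light system most calls to `pollOnce` find `content` unchanged, which is exactly the wasted traffic listed above.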
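3.2 Client-side long polling (push-style model)
The Diamond service in Taobao's current production environment uses this approach to push dynamic configuration in real time. Based on HTTP long polling, it lets the client avoid repeated polling while no configuration changes. This cuts pointless polling requests and makes polling more efficient; it also lowers system load and improves the resource utilization of the whole system. In addition, this push-pull hybrid strikes a balance between long and short connections: in implementation, the server does not need to manage connections closely, while in effect it achieves push latency close to that of a long-lived TCP connection.
A rough diagram is shown below: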
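The hold-until-change behavior can be sketched in memory, assuming a hypothetical `LongPollingServer` that stores one md5 per key: a poll returns immediately if any client md5 is stale, and otherwise blocks until `publish` wakes it or the timeout expires. This only illustrates the technique; diamond-server does the equivalent over HTTP, and its actual implementation is not shown in this article.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory long-polling server.
class LongPollingServer {
    private final Map<String, String> md5ByKey = new HashMap<>();

    synchronized void publish(String key, String md5) {
        md5ByKey.put(key, md5);
        notifyAll(); // wake every held poll so it can re-check
    }

    // Returns keys whose server md5 differs from the client's; holds the call while nothing changed.
    synchronized List<String> poll(Map<String, String> clientMd5, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            List<String> changed = new ArrayList<>();
            for (Map.Entry<String, String> e : clientMd5.entrySet()) {
                String serverMd5 = md5ByKey.getOrDefault(e.getKey(), "");
                if (!serverMd5.equals(e.getValue())) {
                    changed.add(e.getKey());
                }
            }
            long remaining = deadline - System.currentTimeMillis();
            if (!changed.isEmpty() || remaining <= 0) {
                return changed; // an empty list means the poll timed out with no change
            }
            try {
                wait(remaining); // releases the monitor; woken by publish() or the timeout
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                return changed;
            }
        }
    }
}
```

After an empty reply the client simply issues the next poll, so an idle system carries about one request per client per timeout period instead of one per polling interval, while a change is still delivered as soon as it is published.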
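IV. Registering a Listener in the Application
First, the application registers the configuration it wants to listen to through diamond-client, identified by dataId + group: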
@Service
public class DiamondUtil implements ManagerListener, InitializingBean {
/**
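* Lets the application supply its own executor for the callback; usually null.
* You can also extend ManagerListenerAdapter, so that getExecutor does not need to be overridden.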
*
* @see com.taobao.diamond.manager.ManagerListenerAdapter
*/
@Override
public Executor getExecutor() {
return null;
}
/**
* Receives configuration content
*
* @param configInfo the configuration value
*/
@Override
public void receiveConfigInfo(String configInfo) {
System.out.println(configInfo);
}
/**
* Registers this listener for the configuration identified by dataId and group, then loads the current value once
*/
@Override
public void afterPropertiesSet() throws Exception {
String dataId = "dataId";
String group = "group";
Diamond.addListener(dataId, group, this);
String config = Diamond.getConfig(dataId, group, 10000L);
receiveConfigInfo(config);
}
}
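V. Client Analysis
5.1 Main Modules
5.2 Main Client Functions
5.2.1 Fetching the diamond-server List
The main flow is shown below. Any failing step throws an exception; only the normal path is shown here.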
public static DiamondEnv getDefaultEnv() {
if (defaultEnv == null) {
synchronized (DiamondEnv.class) {
if (defaultEnv == null) { //Double Checked
defaultEnv = new DiamondEnv(new ServerListManager());
// Create a Gauge that reports the number of listeners
Gauge defaultEnvListenerSizeGauge = new Gauge() {
@Override
public long lastUpdateTime() {
return System.currentTimeMillis();
}
@Override
public Integer getValue() {
return defaultEnv.getAllListeners().size();
}
};
// Register it with the metric registry
DiamondMetric.getMetricRegistry().register(
"middleware.diamond.defaultEnv.listenerSize", defaultEnvListenerSizeGauge);
}
}
}
return defaultEnv;
}
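defaultEnv is initialized as a singleton using double-checked locking. Note that double-checked locking is only safe under the Java memory model if the defaultEnv field is declared volatile; the field declaration is not shown in this excerpt.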
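For comparison, here is a self-contained version of the same pattern with the field declared `volatile`. `LazyEnv` is a hypothetical stand-in for `DiamondEnv`, and the `CREATED` counter exists only to make the single construction observable.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for DiamondEnv, showing a correct double-checked locking singleton.
class LazyEnv {
    // volatile forbids publishing a reference to a not-yet-fully-constructed object
    private static volatile LazyEnv defaultEnv;
    static final AtomicInteger CREATED = new AtomicInteger();

    private LazyEnv() {
        CREATED.incrementAndGet();
    }

    static LazyEnv getDefaultEnv() {
        LazyEnv env = defaultEnv;          // single volatile read on the fast path
        if (env == null) {
            synchronized (LazyEnv.class) {
                env = defaultEnv;          // second check under the lock
                if (env == null) {
                    env = new LazyEnv();
                    defaultEnv = env;      // volatile write publishes the fully built instance
                }
            }
        }
        return env;
    }
}
```

Where lazy construction is all that is needed, the initialization-on-demand holder idiom achieves the same result without `volatile` or explicit locking.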
public ServerListManager() {
isFixed = false;
isStarted = false;
name = DEFAULT_NAME;
}
static {
String defaultServerPort = "8080";
if (TlsUtil.tlsEnable()) {
defaultServerPort = "443";
}
serverPort = System.getProperty("diamond.server.port", defaultServerPort);
log.info("settings","[req-serv] diamond-server port:{}", serverPort);
DIAMOND_SERVER_IPS = System.getProperty("DIAMOND.SERVER.IPS","");
if(!StringUtils.isBlank(DIAMOND_SERVER_IPS)) {
try {
String[] ips = DIAMOND_SERVER_IPS.split(",");
for(String ip : ips) InetAddress.getByName(ip);
} catch (UnknownHostException e) {
final String msg = "[custom-serverlist] invalid custom server ips:" + DIAMOND_SERVER_IPS;
log.error("settings", "DIAMOND-XXXX", msg, e);
throw new IllegalArgumentException(msg, e);
}
log.info("settings", "[custom-serverlist] use custom server ips:{}", DIAMOND_SERVER_IPS);
}
ADDRESS_SERVER_URL = "http://"+ServerHttpAgent.domainName+":" + ServerHttpAgent.addressPort + "/diamond-server/diamond?labels=" + getLabelString();
log.info("ADDRESS_SERVER_URL=" + ADDRESS_SERVER_URL);
}
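ServerListManager is initialized next. Its main job here is to build the address server URL, for example: http://xxx.xxx.net:8080/diamond-server/diamond?labels=app:app-name,site:site_test,unit:Center.center,stage:DAILY, . The static block also determines the server port (8080 by default, 443 when TLS is enabled) and validates any custom server IPs passed through the DIAMOND.SERVER.IPS system property.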
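The shape of ADDRESS_SERVER_URL can be reproduced with a small helper. `AddressServerUrl.build` is hypothetical, and because `getLabelString()` is not shown in the excerpt, the label order and separator handling are assumptions (the sample URL above ends with a trailing comma, which this sketch does not emit).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper producing a URL in the shape of ADDRESS_SERVER_URL.
class AddressServerUrl {
    static String build(String domain, String port, Map<String, String> labels) {
        // Labels are rendered as key:value pairs joined by commas, in map iteration order.
        StringJoiner joiner = new StringJoiner(",");
        for (Map.Entry<String, String> e : labels.entrySet()) {
            joiner.add(e.getKey() + ":" + e.getValue());
        }
        return "http://" + domain + ":" + port + "/diamond-server/diamond?labels=" + joiner;
    }
}
```

Passing a `LinkedHashMap` keeps the labels in insertion order, so the resulting URL is deterministic.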
protected DiamondEnv(ServerListManager serverListMgr) {
serverListMgr.setEnv(this);
try {
PER_TASK_CONFIG_SIZE = Double.valueOf(System.getProperty("PER_TASK_CONFIG_SIZE", "3000"));
log.warn("PER_TASK_CONFIG_SIZE:", PER_TASK_CONFIG_SIZE);
} catch (Throwable t) {
log.error("PER_TASK_CONFIG_SIZE", "PER_TASK_CONFIG_SIZE invalid", t);
}
initServerManager(serverListMgr);
cacheMap = new AtomicReference<Map<String, CacheData>>(new HashMap<String, CacheData>());
worker = new ClientWorker(this);
}
Constructing DiamondEnv also initializes the serverListManager, cacheMap, and clientWorker.
public void initServerManager(ServerListManager _serverMgr) {
_serverMgr.setEnv(this);
serverMgr = _serverMgr;
serverMgr.start();
agent = new ServerHttpAgent(serverMgr);
}
public synchronized void start() {
if (MockUtil.isMock()) {
return;
}
if (isStarted || isFixed) {
return;
}
GetServerListTask getServersTask = new GetServerListTask(ADDRESS_SERVER_URL);
for (int i = 0; i < 5 && serverUrls.isEmpty(); ++i) {
getServersTask.run();
try {
Thread.sleep((i + 1) * 100L);
} catch (Exception e) {
}
}
if (serverUrls.isEmpty()) {
log.error("Diamond-0008", LoggerHelper.getErrorCodeStr("Diamond", "Diamond-0008", "environment issue",
"fail to get diamond-server serverlist! env:" + name + ", not connnect url:" + ADDRESS_SERVER_URL));
log.error(name, "DIAMOND-XXXX", "[init-serverlist] fail to get diamond-server serverlist!");
RuntimeException e = new RuntimeException(
"fail to get diamond-server serverlist! env:" + name + ", not connnect url:" + ADDRESS_SERVER_URL);
// Prints the stack trace to System.err
e.printStackTrace();
throw e;
}
TimerService.scheduleWithFixedDelay(getServersTask, 0L, 30L, TimeUnit.SECONDS);
isStarted = true;
}
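After ServerListManager starts, it synchronously tries up to 5 times to fetch the diamond-server addresses, sleeping (i + 1) × 100 ms after each attempt. If it still has no addresses, it throws an exception and startup fails. Once it succeeds, it schedules a task that refreshes the server addresses every 30 s.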
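This start-up strategy (bounded synchronous retries with a growing sleep, then a periodic refresh) is a common bootstrap pattern. The sketch below restates it with a hypothetical `ServerListBootstrap` class and a `Supplier` in place of `GetServerListTask`. Unlike the original, it stops sleeping as soon as a list is obtained, and keeping the previous list when a refresh returns nothing is an assumption about what `updateIfChanged` does.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch of ServerListManager.start(): bounded retries, fail fast, then periodic refresh.
class ServerListBootstrap {
    volatile List<String> servers = Collections.emptyList();

    ScheduledExecutorService start(Supplier<List<String>> fetch, int attempts, long refreshSeconds) {
        for (int i = 0; i < attempts && servers.isEmpty(); i++) {
            List<String> fetched = fetch.get();
            if (fetched != null && !fetched.isEmpty()) {
                servers = fetched;
                break;
            }
            try {
                Thread.sleep((i + 1) * 100L); // linear back-off: 100 ms, 200 ms, 300 ms, ...
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        if (servers.isEmpty()) {
            throw new IllegalStateException("fail to get server list after " + attempts + " attempts");
        }
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "server-list-refresh");
            t.setDaemon(true); // do not keep the JVM alive just for refreshing
            return t;
        });
        scheduler.scheduleWithFixedDelay(() -> {
            List<String> fetched = fetch.get();
            if (fetched != null && !fetched.isEmpty()) {
                servers = fetched; // keep the previous list when a refresh returns nothing
            }
        }, refreshSeconds, refreshSeconds, TimeUnit.SECONDS);
        return scheduler;
    }
}
```

Failing fast at startup surfaces a misconfigured environment immediately, while the periodic refresh lets a running client follow scale-out and scale-in of the server cluster.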
class GetServerListTask implements Runnable {
final String url;
GetServerListTask(String url) {
this.url = url;
}
@Override
public void run() {
// IPs of the default (local) cluster, which can be set through a startup parameter
if(!StringUtils.isEmpty(DIAMOND_SERVER_IPS) && DEFAULT_NAME.equals(name)) {
List<String> customIps = new ArrayList<String>();
String[] ips = DIAMOND_SERVER_IPS.split(",");
for(String ip : ips) {
customIps.add(ip);
}
updateIfChanged(customIps);
return;
}
// get server ips from jmenv
try {
updateIfChanged(getApacheServerList(url, name));
} catch (Throwable e) {
log.error(name, "DIAMOND-XXXX", "[update-serverlist] failed to update serverlist from address server!",
e);
}
}
}
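Server IPs are taken from the startup parameter first; if none is configured, they are fetched from the URL built earlier.
// Fetches the address list from the address server; returns null if the server failed.
static List<String> getApacheServerList(String url, String name) {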
try {
HttpResult httpResult = HttpSimpleClient.httpGet(url, null, null, null, 3000);
if (200 == httpResult.code) {
if (DEFAULT_NAME.equals(name) ) {
EnvUtil.setSelfEnv(httpResult.headers);
}
List<String> lines = IOUtils.readLines(new StringReader(httpResult.content));
List<String> result = new ArrayList<String>(lines.size());
for (String line : lines) {
if (null == line || line.trim().isEmpty()) {
continue;
} else {
String[] ipPort = line.trim().split(":");
String ip = ipPort[0].trim();
if (ipPort.length > 1) {
ipPortMap.put(ip, ipPort[1].trim());
}
result.add(ip);
}
}
return result;
} else {
log.error(ADDRESS_SERVER_URL, "DIAMOND-XXXX", "[check-serverlist] error. code={}", httpResult.code);
return null;
}
} catch (IOException e) {
log.error("Diamond-0001", LoggerHelper.getErrorCodeStr("Diamond", "Diamond-0001", "environment issue",e.toString()));
log.error(ADDRESS_SERVER_URL, "DIAMOND-XXXX", "[check-serverlist] exception. msg={}", e.toString(), e);
return null;
}
}
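The response handled above contains one server per line, either `ip` or `ip:port`. A hypothetical `ServerListParser` that applies the same rules (skip blank lines, trim, remember an explicit port per IP) looks like this:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical parser mirroring getApacheServerList(): one "ip" or "ip:port" per line.
class ServerListParser {
    final List<String> ips = new ArrayList<>();
    final Map<String, String> portByIp = new HashMap<>();

    ServerListParser(String body) {
        for (String line : body.split("\\r?\\n")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue; // blank lines are ignored
            }
            String[] ipPort = trimmed.split(":");
            String ip = ipPort[0].trim();
            if (ipPort.length > 1) {
                portByIp.put(ip, ipPort[1].trim()); // remember the per-server port
            }
            ips.add(ip);
        }
    }
}
```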
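5.2.2 Fetching Configuration Synchronously
Diamond looks for configuration in three places, in this order: the local failover directory, the server, and the local snapshot cache. This makes reads highly reliable.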
/**
* Local failover -> Server -> local cache (Snapshot)
*/
private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws DiamondException {
group = null2defaultGroup(group);
ParamUtils.checkKeyParam(dataId, group);
if (MockServer.isTestMode()) {
return MockServer.getConfigInfo(dataId, group, this);
}
// Local failover configuration takes precedence
String content = LocalConfigInfoProcessor.getFailover(this, dataId, group, tenant);
if (content != null) {
log.warn(getName(), "[get-config] get failover ok, dataId={}, group={}, tenant={}, config={}", dataId,
group, tenant, ContentUtils.truncateContent(content));
String encryptedDataKey = LocalEncryptedDataKeyProcessor.getEncryptDataKeyFailover(this, dataId, group, tenant);
return doFilter(dataId, group, tenant, content, encryptedDataKey);
}
// Fetch the configuration from the server
try {
ConfigInfo configInfo = ClientWorker.getServerConfig(this, dataId, group, tenant, false, timeoutMs);
return doFilter(dataId, group, tenant, configInfo);
} catch (DiamondException ioe) {
if (DiamondException.NO_RIGHT == ioe.getErrCode()) {
throw ioe;
}
log.warn("Diamond-0003",
LoggerHelper.getErrorCodeStr("Diamond", "Diamond-0003", "environment issue", "get from server error"));
log.warn(getName(), "[get-config] get from server error, dataId={}, group={}, tenant={}, msg={}", dataId,
group, tenant, ioe.toString());
}
// Server error: read from the local SNAPSHOT cache file
content = LocalConfigInfoProcessor.getSnapshot(this, dataId, group, tenant);
log.warn(getName(), "[get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}", dataId, group,
tenant, ContentUtils.truncateContent(content));
String encryptedDataKey = LocalEncryptedDataKeyProcessor.getEncryptDataKeySnapshot(this, dataId, group, tenant);
return doFilter(dataId, group, tenant, content, encryptedDataKey);
}
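First, configuration is read from the local failover directory:
1. Read the file content
- Path: /home/admin/diamond/default_diamond/data/config-data
2. Read the data key for encrypted content, encryptedDataKey
- Path: /home/admin/diamond/default_diamond/encrypted-data-key/failover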
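The lookup order of getConfigInner() reduces to a small fallback chain. In this sketch each source is a `Supplier` (a hypothetical simplification, as is the `ConfigFallback` class); a `null` from the server means "not found" and is returned as-is, while an exception falls through to the snapshot. The original additionally rethrows permission errors (`NO_RIGHT`) instead of falling back, which the sketch omits.

```java
import java.util.function.Supplier;

// Hypothetical sketch of the failover -> server -> snapshot order.
class ConfigFallback {
    static String get(Supplier<String> failover, Supplier<String> server, Supplier<String> snapshot) {
        String content = failover.get();   // 1. a manually placed failover file always wins
        if (content != null) {
            return content;
        }
        try {
            return server.get();           // 2. normal path; null means the config does not exist
        } catch (RuntimeException serverDown) {
            return snapshot.get();         // 3. server unreachable: use the last saved snapshot
        }
    }
}
```

Because the snapshot is written after every successful server read, step 3 returns the most recent value this client has seen.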
Fetching configuration from the server:
/**
* Returns null for a 404 response.
*
* @throws DiamondException
*/
static ConfigInfo getServerConfig(DiamondEnv env, String dataId, String group, String tenant, boolean notify, long readTimeout) throws DiamondException {
if (StringUtils.isBlank(group)) {
group = Constants.DEFAULT_GROUP;
}
if (MockServer.isTestMode()) {
ConfigInfo configInfo = new ConfigInfo();
configInfo.setContent(MockServer.getConfigInfo(dataId, group, env));
return configInfo;
}
HttpResult result = null;
FastCompass compass = DiamondMetric.getConfigCompass();
long start = System.currentTimeMillis();
long end = 0;
try {
List<String> params = null;
if (StringUtils.isBlank(tenant)) {
params = Arrays.asList("dataId", dataId, "group", group);
} else {
params = Arrays.asList("dataId", dataId, "group", group, "tenant", tenant);
}
List<String> headers = null;
if (notify) {
headers = Arrays.asList("notify", String.valueOf(notify));
}
result = env.agent.httpGet("/config.co", headers, params, Constants.ENCODE, readTimeout);
} catch (IOException e) {
log.error(env.getName(), "DIAMOND-XXXX",
"[sub-server] get server config exception, dataId={}, group={}, tenant={}, msg={}", dataId, group,
tenant, e.toString());
compass.record(0, "error");
end = System.currentTimeMillis();
DiamondMetric.getClusterHistogram().update(end - start);
throw new DiamondException(DiamondException.SERVER_ERROR, e.getMessage(), e);
}
compass.record(0, "success");
end = System.currentTimeMillis();
DiamondMetric.getClusterHistogram().update(end - start);
switch (result.code) {
case HttpURLConnection.HTTP_OK:
ConfigInfo configInfo = new ConfigInfo();
configInfo.setDataId(dataId);
configInfo.setGroup(group);
configInfo.setTenant(tenant);
configInfo.setContent(result.content);
setEncryptedDataKey(configInfo, result);
LocalConfigInfoProcessor.saveSnapshot(env, dataId, group, tenant, result.content);
LocalEncryptedDataKeyProcessor.saveEncryptDataKeySnapshot(env, dataId, group, tenant, configInfo.getEncryptedDataKey());
return configInfo;
case HttpURLConnection.HTTP_NOT_FOUND:
LocalConfigInfoProcessor.saveSnapshot(env, dataId, group, tenant, null);
LocalEncryptedDataKeyProcessor.saveEncryptDataKeySnapshot(env, dataId, group, tenant, null);
return null;
case HttpURLConnection.HTTP_CONFLICT: {
log.error(env.getName(), "DIAMOND-XXXX",
"[sub-server-error] get server config being modified concurrently, dataId={}, group={}, tenant={}", dataId,
group, tenant);
throw new DiamondException(DiamondException.CONFLICT,
"data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
}
case HttpURLConnection.HTTP_FORBIDDEN: {
log.error(env.getName(), "DIAMOND-XXXX", "[sub-server-error] no right, dataId={}, group={}, tenant={}",
dataId, group, tenant);
throw new DiamondException(result.code, result.content);
}
default: {
log.error(env.getName(), "DIAMOND-XXXX", "[sub-server-error] dataId={}, group={}, tenant={}, code={}",
dataId, group, tenant, result.code);
throw new DiamondException(result.code,
"http error, code=" + result.code + ",dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
}
}
}
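The agent first determines which server IP to call, using a random function to decide the order, then sends an HTTP GET request and handles the result by status code (success, deleted, conflict, etc.). On 200 the content is also saved as the local snapshot; on 404 the snapshot is cleared.
The server IP ordering code is as follows:
public ServerAddressIterator(List<String> source) {
sorted = new ArrayList<RandomizedServerAddress>();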
for (String address : source) {
sorted.add(new RandomizedServerAddress(address));
}
Collections.sort(sorted);
iter = sorted.iterator();
}
static class RandomizedServerAddress implements Comparable<RandomizedServerAddress> {
static Random random = new Random();
String serverIp;
int priority = 0;
int seed;
public RandomizedServerAddress(String ip) {
try {
this.serverIp = ip;
this.seed = random.nextInt(Integer.MAX_VALUE); //change random scope from 32 to Integer.MAX_VALUE to fix load balance issue
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public int compareTo(RandomizedServerAddress other) {
if (priority != other.priority) {
return other.priority - priority;
} else {
return other.seed - seed;
}
}
}
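The server IPs are sorted with higher priority first and, within the same priority, by a seed drawn from Random, so the order is effectively random and client load is spread across servers.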
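The same ordering can be written with `Comparator` combinators. The `RandomizedOrder` class and its per-IP priority map are hypothetical (the excerpt leaves every priority at 0). The sketch uses `Comparator.comparingInt` instead of subtraction; the original `other.seed - seed` avoids overflow only because both seeds are non-negative.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical restatement of the RandomizedServerAddress ordering.
class RandomizedOrder {
    private static final class Candidate {
        final String ip;
        final int priority;
        final int seed;

        Candidate(String ip, int priority, int seed) {
            this.ip = ip;
            this.priority = priority;
            this.seed = seed;
        }
    }

    // Higher priority first; within one priority, order by a random seed (i.e. randomly).
    static List<String> order(List<String> servers, Map<String, Integer> priorityByIp, Random random) {
        List<Candidate> candidates = new ArrayList<>();
        for (String ip : servers) {
            candidates.add(new Candidate(ip, priorityByIp.getOrDefault(ip, 0), random.nextInt(Integer.MAX_VALUE)));
        }
        candidates.sort(Comparator.comparingInt((Candidate c) -> c.priority).reversed()
                .thenComparing(Comparator.comparingInt((Candidate c) -> c.seed).reversed()));
        List<String> ordered = new ArrayList<>();
        for (Candidate candidate : candidates) {
            ordered.add(candidate.ip);
        }
        return ordered;
    }
}
```

Drawing seeds from the full non-negative `int` range makes ties rare, which matters for an even spread; the comment in the original code notes that a small range of 32 had caused a load-balancing issue.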
/**
* Reads the local snapshot file. Returns null if there is no local file or reading fails.
*/
static public String getSnapshot(DiamondEnv env, String dataId, String group, String tenant) {
if (!SnapShotSwitch.getIsSnapShot()) {
return null;
}
File file = getSnapshotFile(env, dataId, group, tenant);
if (!file.exists() || !file.isFile()) {
return null;
}
try {
return readFile(file);
} catch (IOException ioe) {
log.error(env.getName(), "DIAMOND-XXXX","get snapshot error, " + file + ", " + ioe.toString());
return null;
}
}
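If the server fails, the configuration is read from the local snapshot file.
5.2.3 Dynamic Configuration Updates
The overall flow is shown in the figure below:
Adding a listener
public void addListeners(String dataId, String group, List<? extends ManagerListener> listeners) {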
group = null2defaultGroup(group);
CacheData cache = addCacheDataIfAbsent(dataId, group);
for (ManagerListener listener : listeners) {
cache.addListener(listener);
}
}
/**
* Adds a listener
*/
public void addListener(ManagerListener listener) {
if (null == listener) {
throw new IllegalArgumentException("listener is null");
}
ManagerListenerWrap wrap = new ManagerListenerWrap(listener);
if (listeners.addIfAbsent(wrap)) {
log.info(env.getName(), "[add-listener] ok, tenant={}, dataId={}, group={}, cnt={}", tenant, dataId, group,
listeners.size());
}
}
/**
* Looks up the CacheData and creates it if absent.
*/
public CacheData addCacheDataIfAbsent(String dataId, String group) {
CacheData cache = getCache(dataId, group);
if (null != cache) {
return cache;
}
String key = GroupKey.getKeyTenant(dataId, group, TenantUtil.getUserTenant());
cache = new CacheData(this, dataId, group);
synchronized (cacheMap) {
CacheData cacheFromMap = getCache(dataId, group);
// multiple listeners on the same dataid+group and race condition,so double check again
if(null != cacheFromMap) { //other listener thread beat me to set to cacheMap
cache = cacheFromMap;
cache.setInitializing(true); //reset so that server not hang this check
} else {
int taskId = getAllCacheDataSize() / (int) getPER_TASK_CONFIG_SIZE();
cache.setTaskId(taskId);
}
Map<String, CacheData> copy = new HashMap<String, CacheData>(cacheMap.get());
copy.put(key, cache);
cacheMap.set(copy);
}
log.info(getName(), "[subscribe] {}", key);
return cache;
}
// CacheData constructor
public CacheData(DiamondEnv env, String dataId, String group) {
if (null == dataId || null == group) {
throw new IllegalArgumentException("dataId=" + dataId + ", group=" + group);
}
this.env = env;
this.dataId = dataId;
this.group = group;
this.tenant = TenantUtil.getUserTenant();
listeners = new CopyOnWriteArrayList<ManagerListenerWrap>();
this.isInitializing = true;
this.content = loadCacheContentFromDiskLocal(env, dataId, group, tenant);
this.md5 = getMd5String(content);
this.encryptedDataKey = loadEncryptedDataKeyFromDiskLocal(env, dataId, group, tenant);
}
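The client listens to configuration through addListeners, which can add multiple listeners. A CacheData is created from the DiamondEnv, dataId, and group. Its listeners field is a thread-safe CopyOnWriteArrayList, and isInitializing marks that this configuration is being listened to for the first time. The constructor loads the content and encryptedDataKey from the failover directory and then the snapshot cache, and computes the md5 of the content. The CacheData is then put into cacheMap (copy-on-write: a copy of the map is modified and swapped in), and finally the listener is added to CacheData.listeners.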
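The cacheMap update follows a copy-on-write pattern: readers take the current map without locking, and writers copy it, modify the copy, and swap the reference. Below is a generic sketch using a hypothetical `CopyOnWriteRegistry` class; unlike `addCacheDataIfAbsent`, it builds the new value inside the lock, so a losing thread never constructs a throwaway object.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

// Hypothetical copy-on-write registry in the style of DiamondEnv.cacheMap.
class CopyOnWriteRegistry<V> {
    private final AtomicReference<Map<String, V>> ref =
            new AtomicReference<>(Collections.<String, V>emptyMap());

    V get(String key) {
        return ref.get().get(key);
    }

    // Safe to iterate without locking: a published map is never modified again.
    Map<String, V> snapshot() {
        return ref.get();
    }

    V addIfAbsent(String key, Function<String, V> factory) {
        V existing = ref.get().get(key);           // lock-free fast path
        if (existing != null) {
            return existing;
        }
        synchronized (this) {
            existing = ref.get().get(key);         // double check: another thread may have added it
            if (existing != null) {
                return existing;
            }
            V created = factory.apply(key);
            Map<String, V> copy = new HashMap<>(ref.get());
            copy.put(key, created);
            ref.set(Collections.unmodifiableMap(copy)); // publish the new version atomically
            return created;
        }
    }
}
```

The trade-off suits a configuration client: subscriptions are added rarely, while the long-polling loop iterates the map constantly.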
ClientWorker
ClientWorker is created together with DiamondEnv during initialization.
ClientWorker(final DiamondEnv env) {
this.env = env;
executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.taobao.diamond.client.Worker." + env.serverMgr.name);
t.setDaemon(true);
return t;
}
});
executorService = Executors.newCachedThreadPool(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.taobao.diamond.client.Worker.longPulling" + env.serverMgr.name);
t.setDaemon(true);
return t;
}
});
executor.scheduleWithFixedDelay(new Runnable() {
public void run() {
try {
checkConfigInfo();
} catch (Throwable e) {
log.error(env.getName(), "DIAMOND-XXXX", "[sub-check] rotate check error", e);
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
public void checkConfigInfo(DiamondEnv env) {
// Split into tasks
int listenerSize = env.getAllCacheDataSize();
// Round up to the number of batches, 3000 configs per batch
int longingTaskCount = (int) Math.ceil(listenerSize / env.getPER_TASK_CONFIG_SIZE());
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
// Need to think carefully about whether a task is still running here. The task list is currently unordered, so the transition may be problematic.
executorService.execute(new LongPullingRunnable(i));
}
currentLongingTaskCount = longingTaskCount;
}
}
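ClientWorker creates two thread pools. executor is a single-threaded scheduler that runs checkConfigInfo with a fixed delay of 10 ms (the TimeUnit is MILLISECONDS, after an initial delay of 1 ms). checkConfigInfo does not create one task per cacheData: it divides the cacheData entries in cacheMap into batches of PER_TASK_CONFIG_SIZE (3000 by default), and whenever the number of batches grows it submits one LongPullingRunnable per new batch, which checks with the server whether any configuration in that batch has changed. executorService is the cached thread pool that runs these long-polling tasks; each task resubmits itself in its finally block, so it keeps polling indefinitely.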
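The batch arithmetic is worth spelling out. A new CacheData gets `taskId = existingCount / 3000`, and checkConfigInfo needs `ceil(count / 3000)` tasks, so every taskId has a task to serve it. The helper class below is hypothetical; 3000 is the default of the `PER_TASK_CONFIG_SIZE` system property read in the DiamondEnv constructor.

```java
// Hypothetical restatement of the batching arithmetic in addCacheDataIfAbsent() and checkConfigInfo().
class LongPollingBatches {
    static final int PER_TASK_CONFIG_SIZE = 3000; // default of the PER_TASK_CONFIG_SIZE property

    // Task id assigned to a new CacheData when `alreadyRegistered` entries exist.
    static int taskIdFor(int alreadyRegistered) {
        return alreadyRegistered / PER_TASK_CONFIG_SIZE;
    }

    // Number of long-polling tasks needed for `registered` entries: ceil(registered / 3000).
    static int taskCount(int registered) {
        return (int) Math.ceil(registered / (double) PER_TASK_CONFIG_SIZE);
    }
}
```

For example, 6500 registered configurations receive task ids 0, 1, and 2, and exactly three long-polling tasks are started.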
class LongPullingRunnable implements Runnable {
private int taskId;
public LongPullingRunnable(int taskId) {
this.taskId = taskId;
}
public void run() {
try {
List<CacheData> cacheDatas = new ArrayList<CacheData>();
// check failover config
for (CacheData cacheData : env.getAllCacheDataSnapshot()) {
if (cacheData.getTaskId() == taskId) {
cacheDatas.add(cacheData);
try {
checkLocalConfig(env, cacheData);
if (cacheData.isUseLocalConfigInfo()) {
cacheData.checkListenerMd5();
}
} catch (Exception e) {
log.error("DIAMOND-CLIENT", "get local config info error", e);
}
}
}
List<String> inInitializingCacheList = new ArrayList<String>();
// check server config
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
for (String groupKey : changedGroupKeys) {
String key[] = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
if (key.length == 3) {
tenant = key[2];
}
try {
CacheData cache = env.getCache(dataId, group, tenant);
ConfigInfo configInfo;
// The first listen is not counted toward the push-latency SLI
if (cache.isInitializing()) {
configInfo = getServerConfig(env, dataId, group, tenant, false, 3000L);
} else {
configInfo = getServerConfig(env, dataId, group, tenant, true, 3000L);
}
String content = null;
String encryptedDataKey = null;
if (configInfo != null) {
content = configInfo.getContent();
encryptedDataKey = configInfo.getEncryptedDataKey();
}
cache.setContent(content);
cache.setEncryptedDataKey(encryptedDataKey);
log.info(env.getName(), "[data-received] dataId={}, group={}, tenant={}, md5={}, content={}", dataId,
group, tenant, cache.getMd5(), ContentUtils.truncateContent(content));
} catch (DiamondException ioe) {
log.error(env.getName(), "DIAMOND-XXXX",
"[get-update] get changed config exception. dataId={}, group={}, tenant={}, msg={}",
dataId, group, tenant, ioe.toString());
}
}
for (CacheData cacheData : cacheDatas) {
// cacheData.checkListenerMd5();
if (!cacheData.isInitializing() || inInitializingCacheList
.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
cacheData.checkListenerMd5();
cacheData.setInitializing(false);
}
}
inInitializingCacheList.clear();
} catch (Throwable e) {
log.error("500", "longPulling error", e);
} finally {
executorService.execute(this);
}
}
}
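Each long-polling task first checks whether a configuration should be served from the failover directory. There are three cases: 1. failover is not in use but a failover file exists: read the configuration from it and refresh content, md5, and encryptedDataKey in the cacheData; 2. failover is in use but the failover file no longer exists: mark the cacheData as not using failover; 3. failover is in use and the file exists: compare the file's last-modified time with the version recorded in the cacheData to detect whether the failover configuration was edited.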
/**
* Checks the local failover file and promptly updates the use-local-failover flag, the failover timestamp, and the md5.
*/
static void checkLocalConfig(DiamondEnv env, CacheData cacheData) {
final String dataId = cacheData.dataId;
final String group = cacheData.group;
final String tenant = cacheData.tenant;
File path = LocalConfigInfoProcessor.getFailoverFile(env, dataId, group, tenant);
// absent -> present
if (!cacheData.isUseLocalConfigInfo() && path.exists()) {
String content = LocalConfigInfoProcessor.getFailover(env, dataId, group, tenant);
String md5 = MD5.getInstance().getMD5String(content);
cacheData.setUseLocalConfigInfo(true);
cacheData.setLocalConfigInfoVersion(path.lastModified());
cacheData.setContent(content);
String encryptedDataKey = LocalEncryptedDataKeyProcessor.getEncryptDataKeyFailover(env, dataId, group, tenant);
cacheData.setEncryptedDataKey(encryptedDataKey);
log.warn(env.getName(),
"[failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}",
dataId, group, tenant, md5, ContentUtils.truncateContent(content));
return;
}
// present -> absent. Do not notify business listeners now; notify after the configuration is fetched from the server.
if (cacheData.isUseLocalConfigInfo() && !path.exists()) {
cacheData.setUseLocalConfigInfo(false);
log.warn(env.getName(), "[failover-change] failover file deleted. dataId={}, group={}, tenant={}", dataId,
group, tenant);
return;
}
// changed
if (cacheData.isUseLocalConfigInfo() && path.exists()
&& cacheData.getLocalConfigInfoVersion() != path.lastModified()) {
String content = LocalConfigInfoProcessor.getFailover(env, dataId, group, tenant);
String md5 = MD5.getInstance().getMD5String(content);
cacheData.setUseLocalConfigInfo(true);
cacheData.setLocalConfigInfoVersion(path.lastModified());
cacheData.setContent(content);
String encryptedDataKey = LocalEncryptedDataKeyProcessor.getEncryptDataKeyFailover(env, dataId, group, tenant);
cacheData.setEncryptedDataKey(encryptedDataKey);
log.warn(env.getName(),
"[failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}",
dataId, group, tenant, md5, ContentUtils.truncateContent(content));
return;
}
}
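The three branches of checkLocalConfig() can be read as a small state machine. The pure function below is a hypothetical restatement, with an explicit `Action` enum in place of the in-place field updates, which makes the transitions easy to test.

```java
// Hypothetical pure-function version of checkLocalConfig(): decides the action from
// (failover in use?, failover file exists?, recorded version, file's last-modified time).
class FailoverCheck {
    enum Action { NONE, SWITCH_TO_FAILOVER, SWITCH_BACK_TO_SERVER, RELOAD_FAILOVER }

    static Action decide(boolean usingFailover, boolean fileExists, long cachedVersion, long fileLastModified) {
        if (!usingFailover && fileExists) {
            return Action.SWITCH_TO_FAILOVER;      // absent -> present: load the file
        }
        if (usingFailover && !fileExists) {
            return Action.SWITCH_BACK_TO_SERVER;   // present -> absent: wait for the server value
        }
        if (usingFailover && cachedVersion != fileLastModified) {
            return Action.RELOAD_FAILOVER;         // the failover file was edited
        }
        return Action.NONE;
    }
}
```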
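After the failover check, each cacheData falls into one of two categories. 1. For configurations served from failover, the md5 is checked for changes and the user application is notified if it changed.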
void checkListenerMd5() {
for (ManagerListenerWrap wrap : listeners) {
if (!md5.equals(wrap.lastCallMd5)) {
safeNotifyListener(env, dataId, group, content, md5, encryptedDataKey, wrap);
}
}
}
static void safeNotifyListener(final DiamondEnv env, final String dataId, final String group, final String content,
final String md5, final String encryptedDataKey, final ManagerListenerWrap listenerWrap) {
final ManagerListener listener = listenerWrap.listener;
Runnable job = new Runnable() {
public void run() {
ClassLoader diamondClassLoader = Thread.currentThread().getContextClassLoader();
ClassLoader appClassLoader= listener.getClass().getClassLoader();
try {
if(listener instanceof SharedManagerListenerAdapter){
SharedManagerListenerAdapter adapter = (SharedManagerListenerAdapter) listener;
adapter.fillContext(dataId, group);
log.info(env.getName(), "[notify-context] dataId={}, group={}, md5={}", dataId, group, md5);
}
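// Before invoking the callback, set the thread context classloader to the webapp's classloader, so that SPI calls inside the callback do not fail or pick the wrong implementation (this only happens when multiple applications are deployed together).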
Thread.currentThread().setContextClassLoader(appClassLoader);
ConfigResponse cr = new ConfigResponse();
cr.setDataId(dataId);
cr.setGroup(group);
cr.setContent(content);
cr.setEncryptedDataKey(encryptedDataKey);
env.getConfigFilterChainManager().doFilter(null, cr);
String contentTmp = cr.getContent();
listener.receiveConfigInfo(contentTmp);
listenerWrap.lastCallMd5 = md5;
log.info(
env.getName(),
"[notify-ok] dataId={}, group={}, md5={}, listener={} ",
dataId, group, md5, listener);
} catch (DiamondException de) {
log.error(env.getName(), "DIAMOND-XXXX",
"[notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}", dataId,
group, md5, listener, de.getErrCode(), de.getErrMsg());
} catch (Throwable t) {
log.error(env.getName(), "DIAMOND-XXXX",
"[notify-error] dataId={}, group={}, md5={}, listener={} tx={}", dataId, group, md5,
listener, t.getCause());
}
finally
{
Thread.currentThread().setContextClassLoader(diamondClassLoader);
}
}
};
final long startNotify = System.currentTimeMillis();
try {
if (null != listener.getExecutor()) {
listener.getExecutor().execute(job);
} else {
job.run();
}
} catch (Throwable t) {
log.error(
env.getName(),
"DIAMOND-XXXX",
"[notify-error] dataId={}, group={}, md5={}, listener={} throwable={}",
dataId, group, md5, listener, t.getCause());
}
final long finishNotify = System.currentTimeMillis();
log.info(env.getName(), "[notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",(finishNotify - startNotify), dataId ,group, md5, listener);
}
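The core of checkListenerMd5() is "notify only when the md5 differs from the last one delivered". A hypothetical `Md5GuardedListener` shows the same idea with the JDK's `MessageDigest`. As in safeNotifyListener(), the md5 is recorded only after the callback returns normally, so a callback that throws is retried on the next round.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.function.Consumer;

// Hypothetical listener wrapper: notify only when the content's MD5 differs from the last one delivered.
class Md5GuardedListener {
    private final Consumer<String> listener;
    private volatile String lastCallMd5 = "";

    Md5GuardedListener(Consumer<String> listener) {
        this.listener = listener;
    }

    static String md5(String content) {
        if (content == null) {
            return "";
        }
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(content.getBytes(StandardCharsets.UTF_8));
            return String.format("%032x", new BigInteger(1, digest)); // 32 lowercase hex digits
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // every Java platform is required to support MD5
        }
    }

    void check(String content) {
        String md5 = md5(content);
        if (!md5.equals(lastCallMd5)) {
            listener.accept(content);
            lastCallMd5 = md5; // not reached if the callback throws, so it is retried next time
        }
    }
}
```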
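2. For configurations not served from failover, the client asks diamond-server which dataId/group pairs have changed, then queries the full configuration of each one.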
/**
* Gets from DiamondServer the list of DataIDs whose values have changed. Only dataId and group are valid in the returned objects. Never returns null.
*/
List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) {
if (MockServer.isTestMode()) {
// Avoid high CPU usage in test mode
try {
Thread.sleep(3000l);
} catch (InterruptedException e) {
}
List<String> updateList = new ArrayList<String>();
for (CacheData cacheData : cacheDatas) {
if (cacheData.isInitializing()) {
// cacheData appears in cacheMap for the first time & is checked for updates for the first time
inInitializingCacheList
.add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));
}
if (!CacheData
.getMd5String(
MockServer.getConfigInfo(cacheData.dataId, cacheData.group, cacheData.tenant, env))
.equals(cacheData.getMd5())) {
updateList.add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));
}
}
return updateList;
} else {
StringBuilder sb = new StringBuilder();
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isUseLocalConfigInfo()) {
sb.append(cacheData.dataId).append(WORD_SEPARATOR);
sb.append(cacheData.group).append(WORD_SEPARATOR);
if (StringUtils.isBlank(cacheData.tenant)) {
sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
} else {
sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);
}
if (cacheData.isInitializing()) {
// cacheData appears in cacheMap for the first time & is checked for updates for the first time
inInitializingCacheList
.add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));
}
}
}
boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
}
}
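For every cacheData that is not using failover, dataId, group, and md5 (plus tenant, when it is not blank) are concatenated into a single probe string, and the method also records which configurations are being checked for the first time.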
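The probe string can be reproduced with a short builder. The separator values are not shown in this excerpt; the sketch assumes `(char) 2` for WORD_SEPARATOR and `(char) 1` for LINE_SEPARATOR, the values used by the open-source Nacos client, which descends from Diamond. The `ProbeString` class and its `{dataId, group, md5, tenant}` array layout are hypothetical.

```java
import java.util.List;

// Hypothetical probe-string builder in the shape of checkUpdateDataIds().
// ASSUMPTION: separator values borrowed from the Nacos client; Diamond's are not shown.
class ProbeString {
    static final char WORD_SEPARATOR = (char) 2;
    static final char LINE_SEPARATOR = (char) 1;

    // Each entry is {dataId, group, md5, tenant-or-null}.
    static String build(List<String[]> entries) {
        StringBuilder sb = new StringBuilder();
        for (String[] e : entries) {
            sb.append(e[0]).append(WORD_SEPARATOR).append(e[1]).append(WORD_SEPARATOR);
            if (e[3] == null || e[3].trim().isEmpty()) {
                sb.append(e[2]).append(LINE_SEPARATOR);                      // no tenant
            } else {
                sb.append(e[2]).append(WORD_SEPARATOR).append(e[3]).append(LINE_SEPARATOR);
            }
        }
        return sb.toString();
    }
}
```

Control characters are a practical choice for separators because they cannot appear in ordinary dataId, group, or md5 values, so no escaping is needed.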
/**
* Gets from DiamondServer the list of DataIDs whose values have changed. Only dataId and group are valid in the returned objects. Never returns null.
*/
List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) {
List<String> params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
long timeout = TimeUnit.SECONDS.toMillis(30L);
List<String> headers = new ArrayList<String>(2);
headers.add("longPullingTimeout");
headers.add("" + timeout);
// told server do not hang me up if new initializing cacheData added in
if (isInitializingCacheList) {
headers.add("longPullingNoHangUp");
headers.add("true");
}
if (StringUtils.isBlank(probeUpdateString)) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
return Collections.emptyList();
}
try {
HttpResult result = env.agent.httpPostLongPolling("/config.co", headers, params, Constants.ENCODE, timeout);
if (HttpURLConnection.HTTP_OK == result.code) {
setHealthServer(true);
return parseUpdateDataIdResponse(env, result.content);
} else {
setHealthServer(false);
if (result.code == 500) {
log.error("Diamond-0007", LoggerHelper.getErrorCodeStr("Diamond", "Diamond-0007", "environment issue",
"[check-update] get changed dataId error"));
}
log.error(env.getName(), "DIAMOND-XXXX", "[check-update] get changed dataId error, code={}",
result.code);
}
} catch (IOException e) {
setHealthServer(false);
log.error(env.getName(), "DIAMOND-XXXX", "[check-update] get changed dataId exception, msg={}",
e.toString());
}
return Collections.emptyList();
}
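The client builds an HTTP request whose header longPullingTimeout=30000 (30 s, in milliseconds) marks it as a long-polling request; the server uses this header to decide whether to hold the request while nothing has changed. If any configuration is being checked for the first time, the header longPullingNoHangUp=true is added so that the server does not hold the request. The client then sends an HTTP POST. If the probe string is empty, no request is sent at all: the method sleeps for 1 s and returns an empty list.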
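How a server might act on these two headers can be sketched as a pure decision function. This is an assumption for illustration: diamond-server's handling of `longPullingTimeout` and `longPullingNoHangUp` is not shown in this excerpt, the `LongPollingDecision` class is hypothetical, and the margin in `holdMillis` is a hypothetical design choice that lets the empty reply arrive before the client's 30 s timeout.

```java
// Hypothetical sketch of a server-side decision for a long-polling probe.
class LongPollingDecision {
    enum Reply { RESPOND_NOW, HOLD }

    static Reply decide(boolean anyMd5Changed, String longPullingTimeout, String longPullingNoHangUp) {
        if (anyMd5Changed) {
            return Reply.RESPOND_NOW;                  // changed keys go back immediately
        }
        if ("true".equals(longPullingNoHangUp)) {
            return Reply.RESPOND_NOW;                  // a new subscription must not wait for the timeout
        }
        if (longPullingTimeout == null) {
            return Reply.RESPOND_NOW;                  // not a long-polling request
        }
        return Reply.HOLD;                             // nothing changed: keep the request open
    }

    // ASSUMPTION: hold slightly less than the client's timeout so the empty reply is not cut off.
    static long holdMillis(String longPullingTimeout, long marginMs) {
        return Math.max(0L, Long.parseLong(longPullingTimeout) - marginMs);
    }
}
```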
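Base URL: http(s)://ip:port/diamond-server (the request path is /config.co)
After receiving the changed dataId/group pairs, the client queries the server for the full configuration of each one (see 5.2.2 for the flow) and refreshes content, md5, and encryptedDataKey in the cacheData.
Finally, the task iterates over its cacheData list. For each cacheData that is not initializing, or that appears in this round's list of first-time configurations, it calls checkListenerMd5 to compare md5 values; every listener whose last delivered md5 differs is called back through receiveConfigInfo with the new configuration. The cacheData's initializing flag is then set to false.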
for (CacheData cacheData : cacheDatas) {
// cacheData.checkListenerMd5();
if (!cacheData.isInitializing() || inInitializingCacheList
.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
cacheData.checkListenerMd5();
cacheData.setInitializing(false);
}
}
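VI. Server Analysis
The server code is only visible after downloading the Diamond source; GitLab address: http://code.aone.alibaba-inc.com/middleware-diamond/diamond-server
diamond-server is a web application built on Spring MVC. It interacts with diamond-client and diamond-ops through controllers, and error responses are rendered as JSP pages for the corresponding HTTP status code.
6.1 Fetching Data Synchronously
The client queries configuration details by dataId and group; the controller is com.taobao.diamond.server.controller.ConfigServlet#doGet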
/**
* Fetches data
*/
@RequestMapping(method = RequestMethod.GET)
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
// check params
String dataId = request.getParameter("dataId");
if (StringUtils.isBlank(dataId)) {
forwardErrorPage(request, response, HttpServletResponse.SC_BAD_REQUEST);
return;
}
String group = request.getParameter("group");
if (StringUtils.isBlank(group)) {
group = Constants.DEFAULT_GROUP;
}
final String clientIp = RequestUtil.getRemoteIp(request);
inner.doGetConfig(request, response, dataId, group, clientIp);
}
com.taobao.diamond.server.controller.ConfigServletInner#doGetConfig
/**
* Synchronous configuration retrieval endpoint
*/
public String doGetConfig(HttpServletRequest request, HttpServletResponse response, String dataId, String group, String clientIp)
throws IOException, ServletException {
final String groupKey = GroupKey2.getKey(dataId, group);
int lockResult = tryConfigReadLock(request, response, groupKey);
final String requestIp = RequestUtil.getRemoteIp(request);
if (lockResult > 0) {
FileInputStream fis = null;
try {
File file = DiskUtil.targetFile(dataId, group);
fis = new FileInputStream(file);
String md5 = ConfigService.getContentMd5(groupKey);
long lastModified = ConfigService.getLastModifiedTs(groupKey);
response.setHeader(Constants.CONTENT_MD5, md5);
response.setHeader("Pragma", "no-cache"); // disable caching
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
response.setDateHeader("Last-Modified", file.lastModified());
fis.getChannel().transferTo(0L, fis.getChannel().size(),
Channels.newChannel(response.getOutputStream()));
LogUtil.pullCheckLog.warn("{}|{}|{}|{}", new Object[]{groupKey,requestIp,md5, TimeUtil.getCurrentTime()});
final long delayed = System.currentTimeMillis() - lastModified;
// TODO distinguish pull-get && push-get; otherwise delayed cannot be used as the push latency, because delayed is always large for active get requests
ConfigTraceService.logPullEvent(dataId, group, lastModified, ConfigTraceService.PULL_EVENT_OK, delayed, requestIp);
} finally {
releaseConfigReadLock(groupKey);
if (null != fis) fis.close();
}
} else if (lockResult == 0) {
// FIXME the CacheItem no longer exists, so the push delay cannot simply be computed; record it as -1 here
ConfigTraceService.logPullEvent(dataId, group, -1, ConfigTraceService.PULL_EVENT_NOTFOUND, -1, requestIp);
//pullLog.info("[client-get] clientIp={}, {}, no data", new Object[]{clientIp, groupKey});
ConfigServlet.forwardErrorPage(request, response, HttpServletResponse.SC_NOT_FOUND);
return HttpServletResponse.SC_NOT_FOUND + "";
} else {
// ConfigTraceService.logPullEvent(dataId, group, -1, ConfigTraceService.PULL_EVENT_CONFLICT, -1, requestIp);
pullLog.info("[client-get] clientIp={}, {}, get data during dump", new Object[]{clientIp, groupKey});
ConfigServlet.forwardErrorPage(request, response, HttpServletResponse.SC_CONFLICT);
return HttpServletResponse.SC_CONFLICT + "";
}
return HttpServletResponse.SC_OK + "";
}
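A groupKey is built from dataId and group, and the server then tries up to 10 times to acquire the read lock, sleeping 1 ms between attempts. A result of 0 (no such data) or a positive value (lock acquired) ends the retries immediately.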
private static int tryConfigReadLock(HttpServletRequest request, HttpServletResponse response, String groupKey) throws IOException, ServletException {
int lockResult = -1; // default: lock not acquired
// try to acquire the lock, at most 10 times
for (int i = 9; i >= 0; --i) {
lockResult = ConfigService.tryReadLock(groupKey);
if (0 == lockResult) { // data does not exist
break;
}
if (lockResult > 0) { // success
break;
}
if (i > 0) { // retry
try {
Thread.sleep(1);
} catch (Exception e) {
}
}
}
return lockResult;
}
static private final ConcurrentHashMap<String, CacheItem> cache =
new ConcurrentHashMap<String, CacheItem>();
/**
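* Acquire a read lock on the data. On success, {@link #releaseReadLock(String)} must be called afterwards; on failure no release is needed.
*
* @param groupKey
* @return zero means there is no data (failure); a positive value means success; a negative value means acquisition failed because a write lock is held.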
*/
static public int tryReadLock(String groupKey) {
CacheItem groupItem = cache.get(groupKey);
int result = (null == groupItem) ? 0 : (groupItem.rwLock.tryReadLock() ? 1 : -1);
if (result < 0) {
defaultLog.warn("[read-lock] failed, {}, {}", result, groupKey);
}
return result;
}
The simple read-write lock is implemented as follows:
/**
* The simplest possible read-write lock. Lock and unlock calls must be strictly paired.
*
*/
public class SimpleReadWriteLock {
public synchronized boolean tryReadLock() {
if (isWriteLocked()) {
return false;
} else {
status++;
return true;
}
}
public synchronized void releaseReadLock() {
status--;
}
public synchronized boolean tryWriteLock() {
if (!isFree()) {
return false;
} else {
status = -1;
return true;
}
}
public synchronized void releaseWriteLock() {
status = 0;
}
private boolean isWriteLocked() {
return status < 0;
}
private boolean isFree() {
return status == 0;
}
// ================
// zero: no lock; negative: write-locked; positive: read-locked, where the value is the number of read locks held.
private int status = 0;
}
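To see how the tri-state result of tryReadLock and the retry loop in tryConfigReadLock interact with the counting lock, here is a small self-contained sketch. `CountingRwLock` copies the counting scheme of SimpleReadWriteLock; the map of groupKey to lock is a simplification of the real `cache` of `CacheItem`s, and the key strings are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;

public class ReadLockSketch {
    /** Same counting scheme as SimpleReadWriteLock: 0 free, >0 readers, -1 one writer. */
    static class CountingRwLock {
        private int status = 0;

        synchronized boolean tryReadLock() {
            if (status < 0) return false;   // a writer holds the lock
            status++;
            return true;
        }
        synchronized void releaseReadLock() { status--; }
        synchronized boolean tryWriteLock() {
            if (status != 0) return false;  // readers or a writer present
            status = -1;
            return true;
        }
        synchronized void releaseWriteLock() { status = 0; }
    }

    // groupKey -> lock; a missing entry means the config does not exist
    static final ConcurrentHashMap<String, CountingRwLock> cache = new ConcurrentHashMap<>();

    /** 0 = no such config, 1 = read lock acquired, -1 = blocked by a writer. */
    static int tryReadLock(String groupKey) {
        CountingRwLock lock = cache.get(groupKey);
        if (lock == null) return 0;
        return lock.tryReadLock() ? 1 : -1;
    }

    /** Up to 10 attempts with a 1 ms pause, following tryConfigReadLock. */
    static int tryReadLockWithRetry(String groupKey) {
        int result = -1;
        for (int i = 9; i >= 0; --i) {
            result = tryReadLock(groupKey);
            if (result >= 0) break;          // missing (0) or acquired (1): stop retrying
            if (i > 0) {
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        cache.put("demo-key", new CountingRwLock());
        System.out.println(tryReadLockWithRetry("missing-key"));  // 0
        System.out.println(tryReadLockWithRetry("demo-key"));     // 1
        System.out.println(cache.get("demo-key").tryWriteLock()); // false: a reader is active
        cache.get("demo-key").releaseReadLock();
        System.out.println(cache.get("demo-key").tryWriteLock()); // true
        System.out.println(tryReadLockWithRetry("demo-key"));     // -1 after 10 attempts
    }
}
```

A writer (the dump path) therefore briefly makes readers fail with -1 and retry, while a config that was never dumped returns 0 at once and becomes a 404.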
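After acquiring the lock, the server reads the file holding the config from its local disk and writes the file stream into the HttpServletResponse's OutputStream with FileChannel.transferTo, intended here as a zero-copy transfer. Strictly speaking, because the target is Channels.newChannel(...) wrapping the servlet OutputStream rather than a FileChannel or socket channel, the JDK generally cannot use a sendfile-style transfer here and copies through a buffer instead, so this is not kernel-level zero-copy.
File path: /home/admin/diamond/data/beta-data/group/dataId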
fis.getChannel().transferTo(0L, fis.getChannel().size(),
Channels.newChannel(response.getOutputStream()));
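To see transferTo in isolation, here is a small self-contained sketch that streams a temp file into an OutputStream; a ByteArrayOutputStream stands in for response.getOutputStream(), and the helper name `copyFile` is hypothetical. Unlike the single call in doGetConfig, it loops, because the FileChannel.transferTo contract allows one call to transfer fewer bytes than requested.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class TransferToSketch {
    /** Streams the whole file into out and returns the number of bytes written. */
    static long copyFile(Path file, OutputStream out) throws IOException {
        try (FileChannel in = FileChannel.open(file, StandardOpenOption.READ)) {
            WritableByteChannel target = Channels.newChannel(out); // not closed: that would close out
            long size = in.size();
            long pos = 0;
            while (pos < size) {
                long n = in.transferTo(pos, size - pos, target);
                if (n <= 0) break;  // file shrank underneath us; stop instead of spinning
                pos += n;
            }
            return pos;
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("diamond-demo", ".txt");
        Files.write(tmp, "timeout=3000".getBytes(StandardCharsets.UTF_8));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long n = copyFile(tmp, out);
        System.out.println(n + " bytes: " + new String(out.toByteArray(), StandardCharsets.UTF_8));
        Files.delete(tmp);
    }
}
```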
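6.2 Long polling for config changes
The client sends the md5 of every config it listens to so the server can compare them; the handling controller method is com.taobao.diamond.server.controller.ConfigServlet#doPost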
/**
* Compare MD5
*/
@RequestMapping(method = RequestMethod.POST)
@Override
protected void doPost(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
String probeModify = request.getParameter(Constants.PROBE_MODIFY_REQUEST);
if (StringUtils.isBlank(probeModify)) {
throw new IOException("invalid probeModify");
}
Map<String, String> clientMd5Map = ConfigController.getClientMd5Map(probeModify);
// do long-polling
inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}
The Probe-Modify-Request request parameter is parsed into a map whose key is the groupKey (dataId + group) and whose value is the md5 the client currently holds.
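The body of getClientMd5Map is not shown, so the wire format below is an assumption: it follows the convention Nacos inherited from Diamond, with `\u0002` between dataId, group and md5 and `\u0001` after each entry. The groupKey format `dataId + "+" + group` is likewise a hypothetical simplification of GroupKey2.getKey.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ProbeParseSketch {
    // Assumed separators (not confirmed for diamond-server).
    static final String WORD_SEPARATOR = Character.toString((char) 2);
    static final String LINE_SEPARATOR = Character.toString((char) 1);

    /** Parses "dataId\u0002group\u0002md5\u0001..." into groupKey -> md5. */
    static Map<String, String> parse(String probeModify) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String line : probeModify.split(LINE_SEPARATOR)) {
            if (line.isEmpty()) continue;
            String[] words = line.split(WORD_SEPARATOR, -1); // -1 keeps an empty md5
            if (words.length < 3) continue;                  // skip malformed entries
            result.put(words[0] + "+" + words[1], words[2]); // hypothetical groupKey format
        }
        return result;
    }

    public static void main(String[] args) {
        String probe = "app.yml" + WORD_SEPARATOR + "DEFAULT_GROUP" + WORD_SEPARATOR + "md5-v1" + LINE_SEPARATOR;
        System.out.println(parse(probe)); // {app.yml+DEFAULT_GROUP=md5-v1}
    }
}
```

An empty md5 is kept on purpose: a client that has never received a config can still subscribe to it and is woken once it appears.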
com.taobao.diamond.server.controller.ConfigServletInner#doPollingConfig
/**
* Polling endpoint
*/
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response, Map<String, String> clientMd5Map, int probeRequestSize) throws IOException, ServletException {
// long polling
if (LongPullingService.isSupportLongPulling(request)) {
longPullingService.addLongPullingClient(request, response, clientMd5Map, probeRequestSize);
return HttpServletResponse.SC_OK + "";
}
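// otherwise, fall back to the legacy short-polling logic
List<String> changedGroups = ConfigController.compareMd5(clientMd5Map);
// results in the legacy short-polling format, for backward compatibility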
String oldResult = ConfigController.compareMd5OldResult(changedGroups);
String newResult = ConfigController.compareMd5ResultString(changedGroups);
String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
if (version == null) {
version = "2.0.0";
}
int versionNum = Protocol.getVersionNumber(version);
if (versionNum < 204) { // before version 2.0.4, results are returned in the header
response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
} else {
request.setAttribute("content", newResult);
}
// disable caching
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
ConfigServlet.forwardErrorPage(request, response, HttpServletResponse.SC_OK);
return HttpServletResponse.SC_OK + "";
}
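The LongPullingService constructor creates a single-threaded scheduler for long-polling work and schedules StatTask on it every 10 s; StatTask only logs the current number of long-polling clients.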
public LongPullingService() {
allSubs = new LinkedList<ClientLongPulling>();
scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("com.taobao.diamond.LongPulling");
return t;
}
});
scheduler.scheduleWithFixedDelay(new StatTask(), 0L, 10L, TimeUnit.SECONDS);
}
class StatTask implements Runnable {
@Override
public void run() {
memoryLog.info("[long-pulling] client count " + allSubs.size());
}
}
addLongPullingClient switches the request into async mode and submits a ClientLongPulling task to the long-polling scheduler.
public void addLongPullingClient(HttpServletRequest req, HttpServletResponse rsp,
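Map<String, String> clientMd5Map, int probeRequestSize) {
// must be called from the HTTP thread; otherwise the container sends the response as soon as the thread leaves
final AsyncContext asyncContext = req.startAsync();
// AsyncContext.setTimeout() is not precise, so the timeout is controlled manually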
asyncContext.setTimeout(0L);
scheduler.execute(new ClientLongPulling(asyncContext, clientMd5Map, probeRequestSize));
}
class ClientLongPulling implements Runnable {
@Override
public void run() {
HttpServletRequest request = (HttpServletRequest) asyncContext.getRequest();
List<String> changedGroups = ConfigController.compareMd5(
clientMd5Map);
if (changedGroups.size() > 0) {
sendResponse(changedGroups, System.currentTimeMillis());
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
new Object[]{ (System.currentTimeMillis() - createTime),
"instant", RequestUtil.getRemoteIp(request), "polling",
clientMd5Map.size(), probeRequestSize, changedGroups.size()});
return;
}
asyncTimeoutFuture = scheduler.schedule(new Runnable() {
@Override
public void run() {
allSubs.remove(ClientLongPulling.this); // remove the subscription
sendResponse(null, 0L);
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
new Object[]{ (System.currentTimeMillis() - createTime),
"timeout", RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()), "polling",
clientMd5Map.size(), probeRequestSize});
}
}, getLongPullingTimeout(request), TimeUnit.MILLISECONDS);
allSubs.add(this);
}
void sendResponse(List<String> changedGroups, long changeTime) {
if (null != asyncTimeoutFuture) { // cancel the timeout task
asyncTimeoutFuture.cancel(false);
}
generateResponse(changedGroups);
asyncContext.complete(); // tell the container to send the HTTP response
}
void generateResponse(List<String> changedGroups) {
HttpServletRequest request = (HttpServletRequest) asyncContext.getRequest();
HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
//String clientIp = RequestUtil.getRemoteIp(request);
//long now = System.currentTimeMillis();
if (null == changedGroups) {
//pullLog.info("[long-pulling] " + clientIp + ", timeout delayMs " + (now - createTime));
return;
}
try {
String respString = ConfigController.compareMd5ResultString(changedGroups);
request.setAttribute("content", respString);
// disable caching
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
ConfigServlet.forwardErrorPage(request, response, HttpServletResponse.SC_OK);
/*
pullLog.info("[long-pulling] send response to " + clientIp + ", delayMs="
+ (now - createTime) + ", notifyDelay=" + (now - changeTime));
*/
} catch (Exception se) {
pullLog.error(se.toString(), se);
}
}
ClientLongPulling(AsyncContext ac, Map<String, String> clientMd5Map, int probeRequestSize) {
this.asyncContext = ac;
this.clientMd5Map = clientMd5Map;
this.probeRequestSize = probeRequestSize;
this.createTime = System.currentTimeMillis();
}
// =================
final AsyncContext asyncContext;
final Map<String, String> clientMd5Map;
final long createTime;
final int probeRequestSize;
Future<?> asyncTimeoutFuture;
}
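The hold-and-timeout pattern of ClientLongPulling can be sketched without a servlet container. In this sketch a `CompletableFuture` replaces `AsyncContext`, and `publish` plays the role of the LocalDataChangeEvent handling that wakes held requests; the class and method names are hypothetical. As in LongPullingService, a single scheduler thread owns the subscription list, which is why a plain `LinkedList` is safe here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class LongPollSketch {
    // Single thread, like LongPullingService: allSubs is only touched from scheduler tasks.
    final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    final List<Sub> allSubs = new LinkedList<>();
    final Map<String, String> serverMd5 = new ConcurrentHashMap<>();

    /** One held request; the CompletableFuture stands in for AsyncContext. */
    static final class Sub {
        final Map<String, String> clientMd5;
        final CompletableFuture<List<String>> result = new CompletableFuture<>();
        ScheduledFuture<?> timeout;

        Sub(Map<String, String> clientMd5) { this.clientMd5 = clientMd5; }
    }

    List<String> changed(Map<String, String> clientMd5) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> e : clientMd5.entrySet()) {
            if (!Objects.equals(e.getValue(), serverMd5.getOrDefault(e.getKey(), ""))) {
                out.add(e.getKey());
            }
        }
        return out;
    }

    /** Completes with the changed keys, or with an empty list once timeoutMs elapses. */
    CompletableFuture<List<String>> poll(Map<String, String> clientMd5, long timeoutMs) {
        Sub sub = new Sub(clientMd5);
        scheduler.execute(() -> {
            List<String> now = changed(clientMd5);
            if (!now.isEmpty()) {            // something already changed: answer at once
                sub.result.complete(now);
                return;
            }
            sub.timeout = scheduler.schedule(() -> {
                allSubs.remove(sub);         // drop the subscription, reply "no change"
                sub.result.complete(Collections.emptyList());
            }, timeoutMs, TimeUnit.MILLISECONDS);
            allSubs.add(sub);
        });
        return sub.result;
    }

    /** Like reacting to LocalDataChangeEvent: wake every held request that watches groupKey. */
    void publish(String groupKey, String md5) {
        serverMd5.put(groupKey, md5);
        scheduler.execute(() -> {
            for (Iterator<Sub> it = allSubs.iterator(); it.hasNext(); ) {
                Sub s = it.next();
                if (s.clientMd5.containsKey(groupKey)) {
                    it.remove();
                    s.timeout.cancel(false); // same role as asyncTimeoutFuture.cancel(false)
                    s.result.complete(changed(s.clientMd5));
                }
            }
        });
    }

    public static void main(String[] args) throws Exception {
        LongPollSketch svc = new LongPollSketch();
        svc.serverMd5.put("app+DEFAULT_GROUP", "md5-v1");
        Map<String, String> client = Collections.singletonMap("app+DEFAULT_GROUP", "md5-v1");
        CompletableFuture<List<String>> pending = svc.poll(client, 30_000);
        svc.publish("app+DEFAULT_GROUP", "md5-v2");            // wakes the held request early
        System.out.println(pending.get(1, TimeUnit.SECONDS));   // [app+DEFAULT_GROUP]
        svc.scheduler.shutdown();
    }
}
```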
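ConfigController compares the client md5 with the server md5 to detect config changes. If anything has changed, the response is returned immediately; otherwise a delayed task is scheduled, and when it fires the response is returned whether or not the config has changed. The delay is the long-polling timeout sent by the client in a request header minus 500 ms (29.5 s if the client asks for 30 s), with a floor of 10 s.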
static long getLongPullingTimeout(HttpServletRequest req) {
String str = req.getHeader(LongPullingService.LONG_PULLING_HEADER);
return Math.max(10000, Long.parseLong(str) - 500); // respond 500 ms early to avoid a client-side timeout (changed by @qiaoyi.dingqy, 2013.10.22)
}
static public List<String> compareMd5(Map<String, String> clientMd5Map) {
List<String> changedGroupKeys = new ArrayList<String>();
for (Map.Entry<String, String> entry : clientMd5Map.entrySet()) {
String groupKey = entry.getKey();
String clientMd5 = entry.getValue();
boolean isUptodate = ConfigService.isUptodate(groupKey, clientMd5);
if (!isUptodate) {
changedGroupKeys.add(groupKey);
}
}
return changedGroupKeys;
}
static public boolean isUptodate(String groupKey, String md5) {
String serverMd5 = ConfigService.getContentMd5(groupKey);
return StringUtils.equals(md5, serverMd5);
}
/**
* Returns the cached md5. A zero-length string means there is no such data.
*/
static public String getContentMd5(String groupKey) {
CacheItem item = cache.get(groupKey);
return (null != item) ? item.md5 : Constants.NULL;
}
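The timeout arithmetic and the md5 comparison above can be checked with a self-contained sketch. It assumes `Constants.NULL` is the empty string, which the getContentMd5 comment suggests but the excerpt does not show, and it passes the server md5s in as a plain map instead of reading ConfigService's cache.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class CompareMd5Sketch {
    /** Mirrors getLongPullingTimeout: client value minus 500 ms, but never below 10 s. */
    static long longPollingTimeout(String headerValue) {
        return Math.max(10000L, Long.parseLong(headerValue) - 500L);
    }

    /** Returns the groupKeys whose client md5 differs from the server md5 ("" when absent). */
    static List<String> compareMd5(Map<String, String> clientMd5Map, Map<String, String> serverMd5Map) {
        List<String> changed = new ArrayList<>();
        for (Map.Entry<String, String> e : clientMd5Map.entrySet()) {
            String serverMd5 = serverMd5Map.getOrDefault(e.getKey(), ""); // assumed Constants.NULL
            if (!Objects.equals(e.getValue(), serverMd5)) {
                changed.add(e.getKey());
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        System.out.println(longPollingTimeout("30000")); // 29500
        System.out.println(longPollingTimeout("8000"));  // 10000
        Map<String, String> client = new LinkedHashMap<>();
        client.put("a+G", "m1");
        client.put("b+G", "m2");
        Map<String, String> server = new HashMap<>();
        server.put("a+G", "m1");
        System.out.println(compareMd5(client, server));  // [b+G]: missing on server counts as changed
    }
}
```

One consequence worth noting: a config deleted on the server also shows up as "changed" for a client that still holds a non-empty md5, which is how clients learn about removals.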
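6.3 Config changes
6.3.1 Receiving notifications
On the server, update notifications are handled by NotifyController in its notifyConfigInfo method, whose job is to refresh the config held in local files. It creates a DumpTask and hands it to a TaskManager, whose processor for DumpTask is DumpProcessor. DumpProcessor first looks up the changed data (ConfigInfo) in the database. If it is found, the latest content is written to the local file and the md5 is updated; if not, the local copy is removed. Similar to the client's CacheData, the server keeps a CacheItem per config, keyed by the groupKey built from dataId and group, which holds the md5 (plus the last-modified time and a read-write lock, as the code shows). When the md5 actually changes, a LocalDataChangeEvent is fired. LongPullingService responds to this event by running a DataChangeTask, which walks all current subscriptions and returns the long-polling response to every subscriber watching the changed data.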
@Controller
@RequestMapping("/notify.do")
public class NotifyController {
@Autowired
private DumpService dumpService;
/**
* Notify that config info has changed
*/
@RequestMapping(method = RequestMethod.GET, params = "method=notifyConfigInfo")
public String notifyConfigInfo(HttpServletRequest request, HttpServletResponse response,
@RequestParam("dataId")String dataId,
@RequestParam("group")String group) {
dataId = dataId.trim();
group = group.trim();
String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);
String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
dumpService.dump(dataId, group, lastModifiedTs, handleIp);
return String.valueOf(HttpServletResponse.SC_OK);
}
}
@Autowired
public DumpService(PersistService persistService) {
DiskUtil.clearAll();
this.persistService = persistService;
DumpProcessor processor = new DumpProcessor(this);
DumpAllProcessor dumpAllProcessor = new DumpAllProcessor(this);
dumpTaskMgr = new TaskManager("com.taobao.diamond.server.DumpTaskManager");
dumpTaskMgr.setDefaultTaskProcessor(processor);
dumpTaskMgr.addProcessor(DumpAllTask.taskId, dumpAllProcessor);
Runnable dumpAll = new Runnable() {
@Override
public void run() {
dumpTaskMgr.addTask(DumpAllTask.taskId, new DumpAllTask());
}
};
TimerTaskService.scheduleWithFixedDelay(dumpAll, dumpAllIntervalInHour,
dumpAllIntervalInHour, TimeUnit.HOURS);
// initial dump all
dumpAllProcessor.process(DumpAllTask.taskId, new DumpAllTask());
}
class DumpProcessor implements TaskProcessor {
DumpProcessor(DumpService dumpService) {
this.dumpService = dumpService;
}
@Override
public boolean process(String taskType, Task task) {
DumpTask dumpTask = (DumpTask) task;
String[] pair = GroupKey2.parseKey(dumpTask.groupKey);
String dataId = pair[0];
String group = pair[1];
long lastModified = dumpTask.lastModified;
String handleIp = dumpTask.handleIp;
ConfigInfo cf = dumpService.persistService.findConfigInfo(dataId, group);
if (dataId.equals(AggrWhitelist.AGGRIDS_METADATA)) {
if (null != cf) {
AggrWhitelist.load(cf.getContent());
} else {
AggrWhitelist.load(null);
}
}
if (dataId.equals(ACLService.ACL_REGISTERAPP_DATAID)) {
if (null != cf) {
ACLService.loadRegisterApp(cf.getContent());
} else {
ACLService.loadRegisterApp(null);
}
}
if (dataId.equals(ACLService.ACL_TRUSTIPS_DATAID)) {
if (null != cf) {
ACLService.loadTrustIps(cf.getContent());
} else {
ACLService.loadTrustIps(null);
}
}
long delayed = System.currentTimeMillis() - lastModified;
boolean result;
if (null != cf) {
result = ConfigService.dump(dataId, group, cf.getContent(), lastModified);
if (result) {
ConfigTraceService.logDumpEvent(dataId, group, lastModified, handleIp, ConfigTraceService.DUMP_EVENT_OK, delayed, cf.getContent().length());
}
} else {
result = ConfigService.remove(dataId, group);
if (result) {
ConfigTraceService.logDumpEvent(dataId, group, lastModified, handleIp, ConfigTraceService.DUMP_EVENT_REMOVE_OK, delayed, 0);
}
}
return result;
}
// =====================
final DumpService dumpService;
}
/**
* Save the config file and cache its md5.
*/
static public boolean dump(String dataId, String group, String content, long lastModifiedTs) {
final String groupKey = GroupKey2.getKey(dataId, group);
makeSure(groupKey);
final int lockResult = tryWriteLock(groupKey);
assert (lockResult != 0);
if (lockResult < 0) {
dumpLog.warn("[dump-error] write lock failed. {}", groupKey);
return false;
}
try {
DiskUtil.saveToDisk(dataId, group, content);
final String md5 = MD5.getInstance().getMD5String(content);
updateMd5(groupKey, md5, lastModifiedTs);
/*
dumpLog.info("[dump-ok] {}, length={}, md5={}, content={}", new Object[] { groupKey,
content.length(), md5, ContentUtils.truncateContent(content) });
*/
return true;
} catch (IOException ioe) {
dumpLog.error("[dump-exception] save disk error. " + groupKey + ", " + ioe.toString(),
ioe);
return false;
} finally {
releaseWriteLock(groupKey);
}
}
static void updateMd5(String groupKey, String md5, long lastModifiedTs) {
CacheItem cache = makeSure(groupKey);
if (!cache.md5.equals(md5)) {
cache.md5 = md5;
cache.lastModifiedTs = lastModifiedTs;
EventDispatcher.fireEvent(new LocalDataChangeEvent(groupKey));
}
}
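The key property of updateMd5 is that a LocalDataChangeEvent is fired only when the md5 really changes, so re-dumping identical content wakes no long-polling clients. Here is a self-contained sketch of that rule; the listener list is a hypothetical stand-in for EventDispatcher, and hashing the content as UTF-8 is an assumption (the encoding used by diamond's MD5 utility is not shown).

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class UpdateMd5Sketch {
    static final Map<String, String> md5Cache = new ConcurrentHashMap<>();          // groupKey -> md5
    static final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();   // stand-in for EventDispatcher

    static String md5Hex(String content) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(content.getBytes(StandardCharsets.UTF_8));
            return String.format("%032x", new BigInteger(1, d));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Stores the new md5 and notifies listeners only if it differs from the cached one. */
    static boolean updateMd5(String groupKey, String content) {
        String md5 = md5Hex(content);
        String old = md5Cache.put(groupKey, md5);   // atomic swap; the real code holds a write lock
        if (md5.equals(old)) {
            return false;                           // same content: no event
        }
        for (Consumer<String> l : listeners) {
            l.accept(groupKey);                     // like firing LocalDataChangeEvent(groupKey)
        }
        return true;
    }

    public static void main(String[] args) {
        listeners.add(key -> System.out.println("LocalDataChangeEvent for " + key));
        updateMd5("app+DEFAULT_GROUP", "timeout=1000"); // fires
        updateMd5("app+DEFAULT_GROUP", "timeout=1000"); // same md5: no event
        updateMd5("app+DEFAULT_GROUP", "timeout=3000"); // fires
    }
}
```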
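6.3.2 Local update
When a client publishes data, it requests /basestone.do?method=syncUpdateAll; the handling method is BaseStoneController.syncUpdateConfigAll(). It first validates the parameters and checks permissions, then builds a ConfigInfo from dataId, group, content and the other parameters, persists it through persistService, and publishes a ConfigDataChangeEvent so that the event's listeners can handle the change. persistService is the persistence service that operates on the database. It uses insertOrUpdate here: it tries an insert first and performs an update if the insert fails. Every write is recorded in the his_config_info table as history, while the latest value is kept in the config_info table.
In the scenarios I have worked with this feature is rarely used, since diamond changes are usually made through diamond-ops, so I won't go into it further here.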
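For completeness, the insertOrUpdate behaviour of persistService (insert first, update on failure, always keep history) can be sketched in memory. Everything here is hypothetical: the maps stand in for the config_info and his_config_info tables, and an `IllegalStateException` stands in for the duplicate-key error a real JDBC insert would raise; what exactly diamond writes to his_config_info is not shown, so the history row format is invented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class InsertOrUpdateSketch {
    // In-memory stand-ins for the config_info and his_config_info tables (hypothetical).
    static final Map<String, String> configInfo = new HashMap<>();
    static final List<String> hisConfigInfo = new ArrayList<>();

    /** Simulates an INSERT that fails when the primary key already exists. */
    static void insert(String groupKey, String content) {
        if (configInfo.containsKey(groupKey)) {
            throw new IllegalStateException("duplicate key: " + groupKey);
        }
        configInfo.put(groupKey, content);
    }

    static void update(String groupKey, String content) {
        configInfo.put(groupKey, content);
    }

    /** Insert first; on a duplicate key, update instead. Every write is also logged to history. */
    static synchronized void insertOrUpdate(String groupKey, String content) {
        String op;
        try {
            insert(groupKey, content);
            op = "I";
        } catch (IllegalStateException duplicateKey) {
            update(groupKey, content);
            op = "U";
        }
        hisConfigInfo.add(op + "|" + groupKey + "|" + content);
    }

    public static void main(String[] args) {
        insertOrUpdate("app+DEFAULT_GROUP", "timeout=1000");
        insertOrUpdate("app+DEFAULT_GROUP", "timeout=3000");
        System.out.println(configInfo);     // {app+DEFAULT_GROUP=timeout=3000}
        System.out.println(hisConfigInfo);  // [I|app+DEFAULT_GROUP|timeout=1000, U|app+DEFAULT_GROUP|timeout=3000]
    }
}
```

The current table therefore only ever holds the latest value, which is the root of the "overwrite" behaviour discussed in the caveats.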
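6.3.3 Sending notifications
Configs can be changed through diamond-ops. Because Diamond is deployed as a cluster, each change is propagated to every instance through HTTP requests telling it to refresh the config.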
com.taobao.diamond.server.service.notify.NotifyTaskProcessor
@Override
public boolean process(String taskType, Task task) {
NotifyTask notifyTask = (NotifyTask) task;
String dataId = notifyTask.getDataId();
String group = notifyTask.getGroup();
long lastModified = notifyTask.getLastModified();
boolean isok = true;
for (String ip : serverListService.getServerList()) {
isok = notifyToDump(dataId, group,lastModified, ip) && isok;
}
return isok;
}
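The NotifyTask processor iterates over the server list and sends each server an HTTP GET request; because the loop evaluates notifyToDump(...) && isok, a failure on one server does not stop the remaining servers from being notified. The request URL is: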
http://{0}:8080/diamond-server/notify.do?method=notifyConfigInfo&dataId={1}&group={2}
/**
* Notify the other servers
*/
boolean notifyToDump(String dataId, String group,long lastModified, String serverIp) {
long delayed = System.currentTimeMillis() - lastModified;
try {
// XXX: to ease beta testing without changing the notify.do interface, the new lastModified parameter is passed via an HTTP header
List<String> headers = Arrays.asList(
NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(lastModified),
NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, SystemConfig.LOCAL_IP);
String urlString = MessageFormat.format(URL_PATTERN, serverIp, dataId, group);
HttpResult result = NotifyService.invokeURL(urlString, headers, Constants.ENCODE);
if (result.code == 200) {
//log.info("[notify-ok] {}, {}, to {}", new Object[] { dataId, group, serverIp });
ConfigTraceService.logNotifyEvent(dataId, group, lastModified, SystemConfig.LOCAL_IP, ConfigTraceService.NOTIFY_EVENT_OK, delayed, serverIp);
return true;
} else {
log.error("[notify-error] {}, {}, to {}, result {}", new Object[] { dataId, group,
serverIp, result.code });
// ConfigTraceService.logNotifyEvent(dataId, group, lastModified, ConfigTraceService.NOTIFY_EVENT_ERROR, delayed, serverIp);
return false;
}
} catch (Exception e) {
log.error(
"[notify-exception] " + dataId + ", " + group + ", to " + serverIp + ", "
+ e.toString(), e);
// ConfigTraceService.logNotifyEvent(dataId, group, lastModified, ConfigTraceService.NOTIFY_EVENT_EXCEPTION, delayed, serverIp);
return false;
}
}
static final String URL_PATTERN = "http://{0}:8080/diamond-server/notify.do?method=notifyConfigInfo&dataId={1}&group={2}";
static public HttpResult invokeURL(String url, List<String> headers, String encoding) throws IOException {
HttpURLConnection conn = null;
try {
conn = (HttpURLConnection) new URL(url).openConnection();
conn.setConnectTimeout(TIMEOUT);
conn.setReadTimeout(TIMEOUT);
conn.setRequestMethod("GET");
if (null != headers && !StringUtils.isEmpty(encoding)) {
for (Iterator<String> iter = headers.iterator(); iter.hasNext();) {
conn.addRequestProperty(iter.next(), iter.next());
}
}
conn.addRequestProperty("Content-Type", "application/x-www-form-urlencoded;charset=" + encoding);
conn.connect(); // establish the TCP connection
int respCode = conn.getResponseCode(); // the request is actually sent here
String resp = null;
if (HttpServletResponse.SC_OK == respCode) {
resp = IOUtils.toString(conn.getInputStream());
} else {
resp = IOUtils.toString(conn.getErrorStream());
}
return new HttpResult(respCode, resp);
} finally {
if (conn != null) {
conn.disconnect();
}
}
}
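The fan-out in NotifyTaskProcessor.process and the URL built by notifyToDump can be exercised without a network. In this sketch the HTTP call is replaced by a `Predicate<String>` that receives the URL and reports success; that, the method names, and URL-encoding dataId and group (the original passes them to MessageFormat unencoded) are assumptions made for illustration.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class NotifyFanOutSketch {
    static final String URL_PATTERN =
        "http://{0}:8080/diamond-server/notify.do?method=notifyConfigInfo&dataId={1}&group={2}";

    static String buildUrl(String ip, String dataId, String group) {
        try {
            return MessageFormat.format(URL_PATTERN, ip,
                URLEncoder.encode(dataId, "UTF-8"), URLEncoder.encode(group, "UTF-8"));
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always supported
        }
    }

    /** Notifies every server; a failure does not stop the loop, and the result is the AND of all calls. */
    static boolean notifyServers(List<String> servers, String dataId, String group, Predicate<String> sender) {
        boolean ok = true;
        for (String ip : servers) {
            ok = sender.test(buildUrl(ip, dataId, group)) && ok; // call first, so no short-circuit skip
        }
        return ok;
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("10.0.0.1", "10.0.0.2");
        boolean ok = notifyServers(servers, "app", "DEFAULT_GROUP", url -> {
            System.out.println("GET " + url);
            return !url.startsWith("http://10.0.0.1:"); // pretend the first server is down
        });
        System.out.println("all ok: " + ok); // all ok: false
    }
}
```

Writing `ok && sender.test(...)` instead would silently skip every server after the first failure, which is exactly what the original operand order avoids.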
NotifyController receives the request and calls dumpService.dump:
@Controller
@RequestMapping("/notify.do")
public class NotifyController {
@Autowired
private DumpService dumpService;
/**
* Notify that config info has changed
*
*/
@RequestMapping(method = RequestMethod.GET, params = "method=notifyConfigInfo")
public String notifyConfigInfo(HttpServletRequest request, HttpServletResponse response,
@RequestParam("dataId")String dataId,
@RequestParam("group")String group) {
dataId = dataId.trim();
group = group.trim();
String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);
String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
dumpService.dump(dataId, group, lastModifiedTs, handleIp);
return String.valueOf(HttpServletResponse.SC_OK);
}
}
DumpService relies on TimerTaskService, which holds a scheduled thread pool with 10 core threads, to run the periodic full dump.
/**
* Scheduled task service
*/
public class TimerTaskService {
private static ScheduledExecutorService scheduledExecutorService = Executors
.newScheduledThreadPool(10, new ThreadFactory() {
AtomicInteger count = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("com.taobao.diamond.server.Timer-" + count.getAndIncrement());
return t;
}
});
static public void scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,
TimeUnit unit) {
scheduledExecutorService.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
}
@Service
public class DumpService {
@Autowired
public DumpService(PersistService persistService) {
DiskUtil.clearAll();
this.persistService = persistService;
DumpProcessor processor = new DumpProcessor(this);
DumpAllProcessor dumpAllProcessor = new DumpAllProcessor(this);
dumpTaskMgr = new TaskManager("com.taobao.diamond.server.DumpTaskManager");
dumpTaskMgr.setDefaultTaskProcessor(processor);
dumpTaskMgr.addProcessor(DumpAllTask.taskId, dumpAllProcessor);
Runnable dumpAll = new Runnable() {
@Override
public void run() {
dumpTaskMgr.addTask(DumpAllTask.taskId, new DumpAllTask());
}
};
TimerTaskService.scheduleWithFixedDelay(dumpAll, dumpAllIntervalInHour,
dumpAllIntervalInHour, TimeUnit.HOURS);
// initial dump all
dumpAllProcessor.process(DumpAllTask.taskId, new DumpAllTask());
}
public void dump(String dataId, String group, long lastModified, String handleIp) {
String groupKey = GroupKey2.getKey(dataId, group);
dumpTaskMgr.addTask(groupKey, new DumpTask(groupKey, lastModified, handleIp));
}
// =====================
static final int dumpAllIntervalInHour = 6; // interval between full dumps, in hours
final PersistService persistService;
final TaskManager dumpTaskMgr;
}
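dump(dataId, group, ...) only enqueues a DumpTask keyed by groupKey into dumpTaskMgr, where it is handled by DumpProcessor (see 6.3.1). DumpAllProcessor.process, shown below, performs the full dump: it runs once in the DumpService constructor at startup and then every dumpAllIntervalInHour (6) hours on TimerTaskService's thread pool.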
class DumpAllProcessor implements TaskProcessor {
DumpAllProcessor(DumpService dumpService) {
this.dumpService = dumpService;
this.persistService = dumpService.persistService;
}
@Override
public boolean process(String taskType, Task task) {
int rowCount = persistService.configInfoCount();
int pageCount = (int) Math.ceil(rowCount * 1.0 / PAGE_SIZE);
int actualRowCount = 0;
for (int pageNo = 1; pageNo <= pageCount; pageNo++) {
Page<PersistService.ConfigInfoWrapper> page = persistService.findAllConfigInfoForDumpAll(pageNo, PAGE_SIZE);
if (page != null) {
for (PersistService.ConfigInfoWrapper cf : page.getPageItems()) {
if (cf.getDataId().equals(AggrWhitelist.AGGRIDS_METADATA)) {
AggrWhitelist.load(cf.getContent());
}
if (cf.getDataId().equals(ACLService.ACL_REGISTERAPP_DATAID)) {
ACLService.loadRegisterApp(cf.getContent());
}
if (cf.getDataId().equals(ACLService.ACL_TRUSTIPS_DATAID)) {
ACLService.loadTrustIps(cf.getContent());
}
boolean result = ConfigService.dump(cf.getDataId(), cf.getGroup(), cf.getContent(), cf.getLastModified());
/*
ConfigTraceService.logDumpAllEvent(cf.getDataId(), cf.getGroup(), cf.getLastModified(),
result ? ConfigTraceService.DUMP_EVENT_OK : ConfigTraceService.DUMP_EVENT_ERROR);
*/
final String content = cf.getContent();
final String md5 = MD5.getInstance().getMD5String(content);
LogUtil.dumpLog.info("[dump-all-ok] {}, {}, length={}, md5={}", new Object[]{
GroupKey2.getKey(cf.getDataId(), cf.getGroup()), cf.getLastModified(),
content.length(), md5});
}
actualRowCount += page.getPageItems().size();
defaultLog.info("[all-dump] {} / {}", actualRowCount, rowCount);
}
}
return true;
}
// =====================
static final int PAGE_SIZE = 1000;
final DumpService dumpService;
final PersistService persistService;
}
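The paging in DumpAllProcessor is plain ceiling division over PAGE_SIZE = 1000 with 1-based page numbers. The sketch below reproduces that loop over an in-memory list; `dumpAll` and the `Consumer` callback are hypothetical replacements for findAllConfigInfoForDumpAll and ConfigService.dump.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class DumpAllPagingSketch {
    static final int PAGE_SIZE = 1000;

    /** Same arithmetic as DumpAllProcessor: ceil(rowCount / PAGE_SIZE). */
    static int pageCount(int rowCount) {
        return (int) Math.ceil(rowCount * 1.0 / PAGE_SIZE);
    }

    /** Walks rows page by page (pageNo starts at 1) and returns how many rows were dumped. */
    static <T> int dumpAll(List<T> rows, Consumer<T> dumpOne) {
        int pages = pageCount(rows.size());
        int dumped = 0;
        for (int pageNo = 1; pageNo <= pages; pageNo++) {
            int from = (pageNo - 1) * PAGE_SIZE;
            List<T> page = rows.subList(from, Math.min(from + PAGE_SIZE, rows.size()));
            page.forEach(dumpOne);
            dumped += page.size();
        }
        return dumped;
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 2500; i++) rows.add(i);
        System.out.println(pageCount(rows.size()));      // 3
        System.out.println(dumpAll(rows, row -> { }));   // 2500
    }
}
```

Because the row count is read once before paging, configs added during a full dump may be missed by that pass; the per-change notify path and the next 6-hourly pass cover them.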
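process reads the latest configs from the DB and calls ConfigService.dump for each of them, which writes the config to the server's disk file and updates the server cache (the disk file holds the content, the cache holds the md5 and last-modified time). In this way the configs cached by every diamond-server instance in the cluster are brought up to date.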
/**
* Save the config file and cache its md5.
*/
static public boolean dump(String dataId, String group, String content, long lastModifiedTs) {
final String groupKey = GroupKey2.getKey(dataId, group);
makeSure(groupKey);
final int lockResult = tryWriteLock(groupKey);
assert (lockResult != 0);
if (lockResult < 0) {
dumpLog.warn("[dump-error] write lock failed. {}", groupKey);
return false;
}
try {
DiskUtil.saveToDisk(dataId, group, content);
final String md5 = MD5.getInstance().getMD5String(content);
updateMd5(groupKey, md5, lastModifiedTs);
/*
dumpLog.info("[dump-ok] {}, length={}, md5={}, content={}", new Object[] { groupKey,
content.length(), md5, ContentUtils.truncateContent(content) });
*/
return true;
} catch (IOException ioe) {
dumpLog.error("[dump-exception] save disk error. " + groupKey + ", " + ioe.toString(),
ioe);
return false;
} finally {
releaseWriteLock(groupKey);
}
}
static void updateMd5(String groupKey, String md5, long lastModifiedTs) {
CacheItem cache = makeSure(groupKey);
if (!cache.md5.equals(md5)) {
cache.md5 = md5;
cache.lastModifiedTs = lastModifiedTs;
EventDispatcher.fireEvent(new LocalDataChangeEvent(groupKey));
}
}
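7. Caveats
Excerpted from the official diamond documentation
Diamond is best suited to managing dynamic configs that change infrequently. For config changes it guarantees only eventual consistency, not consistency of every intermediate state, and it requires the integrating business logic to be idempotent. Some users use Diamond for event notification, which can cause events to be lost.
- Low frequency: a back-office configuration system should only be used for data that is updated infrequently. A configuration management system is not positioned as message middleware, and its performance is limited. Config changes should be triggered manually from a back office (either directly in the Diamond console or indirectly through another business back office). Triggering config changes from external conditions is dangerous: if external traffic peaks, the change frequency on Diamond will inevitably rise with it. If a single config item changes on the order of once per minute, something is definitely wrong.
- Eventual consistency: Diamond only guarantees that the last pushed value will arrive; it does not guarantee that every intermediate change reaches subscribers. Config values are saved to Diamond's database by overwriting, so if a config item changes several times in quick succession, clients that come to read it one after another will very likely see only the latest value and never notice the intermediate changes. The config center itself also optimizes push tasks that arrive at very short intervals and pushes only the latest version. All of Diamond's reliability measures are designed around eventual consistency. When managing configs with Diamond you must not use an incremental mode; every push must carry the full value.
- Idempotency: Diamond requires business logic to be idempotent: if the last config value pushed to the cluster is pushed again, it must have no business impact.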
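Why intermediate values can disappear is easy to see if pending work is keyed by groupKey, as DumpService.dump does when it adds a DumpTask under the groupKey. How diamond's TaskManager merges tasks is not shown, so this sketch simply assumes that a newer pending task for the same key replaces the older one; `CoalescingSketch` and its methods are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CoalescingSketch {
    // Pending tasks keyed by groupKey; a newer task for the same key replaces the older one.
    private final Map<String, String> pending = new LinkedHashMap<>();

    synchronized void addTask(String groupKey, String value) {
        pending.put(groupKey, value);
    }

    /** Drains pending tasks; only the latest value per key is delivered. */
    synchronized Map<String, String> drain() {
        Map<String, String> out = new LinkedHashMap<>(pending);
        pending.clear();
        return out;
    }

    public static void main(String[] args) {
        CoalescingSketch mgr = new CoalescingSketch();
        mgr.addTask("switch", "on");
        mgr.addTask("switch", "off");
        mgr.addTask("switch", "on");
        // Subscribers never see "off": fine for a full value, fatal for a delta such as "+1".
        System.out.println(mgr.drain()); // {switch=on}
    }
}
```

This is why pushes must carry the full value and consumers must be idempotent: applying `{switch=on}` once or twice yields the same state, whereas a lost or repeated delta would not.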