elasticsearch indices.recovery 流程分析(索引的_open操作也会触发recovery)——主分片recovery主要是从translog里恢复之前未写完的index,副分片recovery主要是从主分片copy segment和translog来进行恢复

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
日志服务 SLS,月写入数据量 50GB 1个月
简介:

摘自:https://www.easyice.cn/archives/231

elasticsearch indices.recovery 流程分析与速度优化



基于版本:5.5.3

recovery 是 es 数据恢复,保持数据一致性的过程,触发条件包括:从快照备份恢复,节点加入和离开,索引的_open操作等.

recovery 由clusterChanged触发,进入到:

根据数据分片性质,分为主分片和副本分片恢复流程.
主分片从 translog 自我恢复,副本分片从主分片拉取数据进行恢复.

经历的阶段为:

init: Recovery has not started
index: Reading index meta-data and copying bytes from source to destination
start: Starting the engine; opening the index for use
translog: Replaying transaction log
finalize: Cleanup
done: Complete

触发,进入

每次处理一个 shard

主分片恢复流程


实现的主要思路是:系统的每次 flush 操作会清理相关 translog, 因此 translog 中存在的数据就是 lucene 索引中可能尚未刷入的数据,主分片的 recovery 就是把 translog 中的内容转移到 lucene.

具体做法是:把当前 translog 做快照,重放每条记录,调用标准的index 操作创建或更新 doc来恢复,然后再处理recovery期间新写入的数据.

路径:org/elasticsearch/index/shard/StoreRecoveryService.java
在新的线程池任务中执行:

然后会进入InternalEngine构造函数:

skipInitialTranslogRecovery一定为 false, 进入recoverFromTranslog,从 translog 做个快照,挨个恢复:

重放完毕后,如果重放写入的数据大于0,则 flush, 否则写一个 synced flush id:syncId

副本分片恢复流程


从主分片恢复到副本分片主要有两个阶段(在主分片节点执行):
phase1 对比分段信息,如果 syncid 相同且 doc 数量相同,则跳过,否则复制整个分段
phase2 将当前 translog 做快照,发送所有的 translog operation 到对端节点,不限速

恢复过程中的数据传输方向,主分片节点为 Source,副本分片节点为 Target
主要处理逻辑:副本分片节点为 RecoveryTarget类,主分片节点为 RecoverySource 类.
首先,副本分片的恢复也会启动一个新的线程池任务:

任务处理模块:indices/recovery/RecoveryTarget.java
在doRecovery函数中,将本次要恢复的 shard 相关信息,如 shardid,metadataSnapshot 重要的是metadataSnapshot中包含 syncid等,封装成 StartRecoveryRequest ,RPC 发送出去:

对端(主分片节点)处理模块:/indices/recovery/RecoverySource.java
入口:StartRecoveryTransportRequestHandler.messageReceived

主要处理逻辑:RecoverySourceHandler.recoverToTarget()

在第一阶段,值得注意的是关于 syncid 的处理,如果两个分片有一致的 syncid, 且 doc 数相同,则跳过第一阶段.

在第二阶段,从当前translogView进行快照后批量发送,
对端的处理模块:RecoveryTarget.TranslogOperationsRequestHandler
主要是调用 recoveryStatus.indexShard().performBatchRecovery重放 translog

recovery 慢的原因分析

最慢的过程在于副本分片恢复的第一阶段,各节点单独执行分段合并逻辑,合并后的分段基本不会相同,所以拷贝 lucene 分段是最耗时的,其中有一些相关的限速配置:

即使关闭限速,这个阶段仍然可能非常漫长,目前最好的方式就是先执行 synced flush, 但是 syncd flush 并且本身也可能比较慢,因为我们常常为了优化写入速度而加大 translog 刷盘周期,也会延长 translog 恢复阶段时间

在 es 6.0中再次优化这个问题,思路是给每次写入成功的操作都分配一个序号,通过对比序号就可以计算出差异范围,在实现方式上, 添加了global checkpoint 和 local checkpoint,checkpoint,主分片负责维护global checkpoint,代表所有分片都已写入到了这个序号的位置,local checkpoint代表当前分片已写入成功的最新位置,恢复时通过对比两个序列号,计算出缺失的数据范围,然后通过translog重放这部分数据,同时 translog 会为此保留更长的时间.

参考:
https://www.elastic.co/blog/elasticsearch-sequence-ids-6-0
https://github.com/elastic/elasticsearch/issues/10708

synced flush 机制

es 为了解决副本分片恢复过程第一阶段的漫长过程引入synced flush,默认情况下5分钟没有写入操作的索引被标记为inactive,执行 synced flush,生成一个唯一的 syncid,写入到所有 shard, 这个 syncid是shard 级,拥有相同syncid的 shard具有相同的 lucene 索引.

synced flush的实现思路是先执行普通的 flush 操作,各分片 flush 成功后,他们理应有相同的 lucene 索引内容,无论分段是否一致.于是给大家分配一个 id, 表示数据一致.但是显然 synced flush 期间不能有新写入的内容,对于这种情况, es 的处理是:让 synced flush 失败,让写操作成功.在没有执行 flush 的情况下已有 syncid 不会失效.当某个 shard 上执行了普通 flush 操作会删除已有 syncid,因此,synced flush操作是一个不可靠操作,只适用于冷索引.

主要实现:

indexShard.syncFlush只是写了一个 id 进去:
代码路径:

副分片如何做到和主分片一致的


index.recovery 的一个难题在于如何维护主副分片一致性。假设从副分片 recovery 之前到 recovery 完毕一致有写操作,他是如何实现一致的呢?

在2.0 版本之前,副本recovery 要经历三个阶段:

  • phase1:将主分片的 lucene做快照,发送到 target。期间不阻塞索引操作,新增数据写到主分片的 translog
  • phase2:将主分片 translog 做快照,发送到 target 重放,期间不阻塞索引操作。
  • phase3:为主分片加写锁,将剩余的translog 发送到 target。此时数据量很小,写入过程的阻塞很短。

从第一阶段开始,就要阻止 lucene 执行commit 操作,避免 translog 被刷盘后清除。

本质上来说,只要流程上允许将写操作阻塞一段时间,实现主副一致是比较容易的。但是后来(从2.0开始)官方觉得不太好:

为了安全地完成 recoveries / relocations,我们必须在 recovery 开始后保持所有的operation全部 done,以便重放。目前我们实现这点是通过防止engine flush,从而确保操作operations都在 translog 中。这不是一个问题,因为我们确实需要这些operations。但是如果另一个 recovery 并发启动,可能会有不必要的长时间重试。另外如果我们在这个时候因为某种原因关闭了engine(比如一个节点重新启动),当我们回来的时候,我们需要恢复一个很大的 translog。

为了解决这个问题,translog被改为基于多个文件而不是一个文件。 这允许recovery保留所需的文件,同时允许engine执行flush,以及执行lucene的commit(这将创建一个新的translog文件)。

重构了 translog 文件管理模块,允许多个文件。

translog 维护一个引用文件的列表。包括未完成的recovery 以及那些包含尚未提交到 lucene 的operations的文件

引入了新的 translog.view概念,允许 recovery 获取一个引用,包括所有当前未提交的 translog 文件,以及所有未来新创建的 translog 文件,直到 view 关闭。他们可以使用这个 view 做operations的遍历操作

phase3被删除,这个阶段是重放operations,同时防止新的写入到engine。这是不必要的,因为自 recovery 开始,标准的 index 操作会发送所有的operations到正在recovery中的 shard。重放recovery 开始时获取的 view 中的所有operations足够保证不丢失任何operations。

从2.0开始,phase3被删除。对于如何做到主副一致的,描述的很模糊。分析完相关代码后,整理流程如下:

先创建一个 Translog.view,然后

  • phase1:将主分片的 lucene 做快照,发送到 target。期间允许索引操作和 flush 操作。发送完毕后,告知 target 启动 engine,phase2开始之前,新的索引操作都会转发副分片正常执行。
  • phase2:将主分片的 translog 做快照,发送到 target 去重放。

完整性:
phase2 对translog 的快照包含了从 phase1开始的新增操作,而 phase2开始之前,副分片已经可以正常处理写操作,只要把 phase2的 translog 重放,就可以保证副分片不丢数据

一致性:
由于没有了阻塞写操作的第三阶段,接下来的问题就是解决 phase1和 phase2之间的写操作,与 phase2重放操作之间的时序和冲突问题。在 phase1执行完毕后,副分片已经可以正常处理写请求,副分片的新增写操作和 translog 重放的写操作是并行执行的。如果 translog 重放慢,又把他写会老数据怎么办?

es 现在的机制是在写操作中做异常处理。
写操作有三种类型:新增、更新、删除,分别看一下处理机制:

新增:不存在冲突问题,不需要处理。

更新:判断本次操作的版本号是否小于 lucene 中 doc 的版本号,如果小于,则放弃本次操作。

Index,Delete,都继承自Operation,每个Operation都有一个版本号,这个版本号就是 doc 版本号。对于副分片的写流程来说,正常情况下是主分片写成功后,相应 doc 写入的版本号被放到转发写副分片的请求中。对于更新来说,就是主分片将原 doc 版本号+1后转发的副分片来的。在对比版本号的时候:

副分片在InternalEngine#index函数中通过plan判断是否写到 lucene:

在planIndexingAsNonPrimary函数中,通过

判断当前操作的版本号是否低于 lucene 中的版本号。

对比部分:

如果 translog 重放的操作在写一条老数据,compareOpToLuceneDocBasedOnVersions 会返回:OpVsLuceneDocStatus.OP_STALE_OR_EQUAL,

plan 的最终结果就是:plan = IndexingStrategy.skipAsStale,后面就会跳过写 lucene 和 translog 的逻辑。

删除:判断本次操作中的版本号是否小于 lucene 中 doc 的版本号,如果小于,放弃本次操作。
同样,在InternalEngine#delete函数中,

判断是否要从 lucene 删除:

通过compareOpToLuceneDocBasedOnVersions判断本次操作是否小于 lucenne 中 doc 的版本号,与 Index 操作时使用相同的函数。

如果 translog 重放的是一个 的删除操作,compareOpToLuceneDocBasedOnVersions 会返回:OpVsLuceneDocStatus.OP_STALE_OR_EQUAL,

plan 的最终结果就是:plan = DeletionStrategy.processButSkipLucene,后面就会跳过从 lucene 删除的逻辑。

提升 recovery 速度的建议


使用 _forcemerge

由于 synced flush 不是可靠操作,以下操作都会将其打断:
1. 因写入过程被打断
2. 因普通 flush 被删除 syncdid
3. 因系统自行merge后 flush 删除syncdid

对于冷索引,可以考虑将 segment 强制合并为一个分段,这样各分片 segment 一致,可以跳过副本恢复的第一阶段.
执行:

集群 FullRestart 的建议操作过程

  1. 停止写入
  2. 禁用 shard allocation
  1. 执行 synced flush
  1. 重启集群
  2. 等待到 yellow 状态后,启用 allocation
  1. 等待 recovery 完毕
  2. 开启写入程序

一些用于查看 recovery 状态的命令


问题

副分片的恢复如何做到和主分片完全一致的?假设从副分片开始恢复之前,一致有数据持续写入,删除等操作,phase1阶段结束前 source 端 prepareTargetForTranslog 函数会向 target 端中发送启动 engine 的指令,这个指令执行完之后,target 端已经可以接受写入请求,那么从 phase2阶段的快照直接发给 target 去重放,会存在执行顺序的问题,2.x 之前的版本会有 phase3阶段,将主分片上的写操作阻塞一段时间,但5.x版本中没看到写阻塞过程,那么主副如何做到一致的?

参考:


https://www.elastic.co/guide/en/elasticsearch/reference/2.3/indices-optimize.html
https://www.elastic.co/guide/en/elasticsearch/reference/5.5/indices-recovery.html
https://www.elastic.co/guide/en/elasticsearch/reference/5.5/restart-upgrade.html
http://www.jianshu.com/p/0d0f3d2b9ecd
https://elasticsearch.cn/article/38
https://www.elastic.co/guide/en/elasticsearch/reference/5.5/shards-allocation.html
https://github.com/elastic/elasticsearch/pull/10624

 

 

ElasticSearch Recovery 分析

2016.07.01 18:00* 字数 1869 阅读 2524评论 9

上周出现了一次故障,recovery的过程比较慢,然后发现Shard 在做恢复的过程一般都是卡在TRANSLOG阶段,所以好奇这块是怎么完成的,于是有了这篇文章

这是一篇源码分析类的文章,大家需要先建立一个整体的概念,建议参看这篇文章

另外你可能还需要了解下 Recovery 阶段迁移过程:

INIT -> INDEX -> VERIFY_INDEX -> TRANSLOG -> FINALIZE -> DONE

概览

Recovery 其实有两种:

  1. Primary的迁移/Replication的生成和迁移
  2. Primary的恢复

org.elasticsearch.indices.cluster.IndicesClusterStateService.clusterChanged 被触发后,会触发applyNewOrUpdatedShards 函数的调用,这里是我们整个分析的起点。大家可以跑进去看看,然后跟着文章打开对应的源码浏览。

阅读完这篇文章,我们能够得到:

  1. 熟悉整个recovery 流程
  2. 了解translog机制
  3. 掌握对应的代码体系结构

Primary的恢复

这个是一般出现故障集群重启的时候可能遇到的。首先需要从Store里进行恢复。

if (isPeerRecovery(shardRouting)) {
   ......
}
else {
  //走的这个分支
  indexService.shard(shardId).recoverFromStore(shardRouting, 
  new StoreRecoveryService.RecoveryListener() {
}

Primary 进行自我恢复,所以并不需要其他节点的支持。所以判定的函数叫做isPeerRecovery 其实还是挺合适的。

indexService.shard(shardId).recoverFromStore 调用的是 org.elasticsearch.index.shard.IndexShard的方法。

  public void recoverFromStore(ShardRouting shard, StoreRecoveryService.RecoveryListener recoveryListener) {
    ......
    final boolean shouldExist = shard.allocatedPostIndexCreate();
    storeRecoveryService.recover(this, shouldExist, recoveryListener);
    }

逻辑还是很清晰的,判断分片必须存在,接着将任务委托给 org.elasticsearch.index.shard.StoreRecoveryService.recover 方法,该方法有个细节需要了解下:

 if (indexShard.routingEntry().restoreSource() != null) {
       indexShard.recovering("from snapshot", 
           RecoveryState.Type.SNAPSHOT, 
           indexShard.routingEntry().restoreSource());
            } else {
       indexShard.recovering("from store", 
           RecoveryState.Type.STORE, 
           clusterService.localNode());
            }

ES会根据restoreSource 决定是从SNAPSHOT或者从Store里进行恢复。这里的indexShard.recovering并没有执行真正的recovering 操作,而是返回了一个recover的信息对象,里面包含了譬如节点之类的信息。

之后就将其作为一个任务提交出去了:

threadPool.generic().execute(new Runnable() {
            @Override
  public void run() {
  try {
  final RecoveryState recoveryState = indexShard.recoveryState();
  if (indexShard.routingEntry().restoreSource() != null) {
     restore(indexShard, recoveryState);
     } else {              
     recoverFromStore(indexShard, indexShouldExists, recoveryState);
    }

这里我们只走一条线,也就是进入 recoverFromStore 方法,该方法会执行索引文件的恢复动作,本质上是进入了INDEX Stage.

接着进行TranslogRecovery了

typesToUpdate = indexShard.performTranslogRecovery(indexShouldExists);
indexShard.finalizeRecovery();

继续进入 indexShard.performTranslogRecovery 方法:

  public Map<String, Mapping> performTranslogRecovery(boolean indexExists) {
        if (indexExists == false) {            
            final RecoveryState.Translog translogStats = recoveryState().getTranslog();
            translogStats.totalOperations(0);
            translogStats.totalOperationsOnStart(0);
        }
        final Map<String, Mapping> recoveredTypes = internalPerformTranslogRecovery(false, indexExists);     
        return recoveredTypes;
    }

这个方法里面,最核心的是 internalPerformTranslogRecovery方法,进入该方法后先进入 VERIFY_INDEX Stage,进行索引的校验,校验如果没有问题,就会进入我们期待的TRANSLOG 状态了。

进入TRANSLOG 后,先进行一些设置:

engineConfig.setEnableGcDeletes(false);
engineConfig.setCreate(indexExists == false);

这里的GC 指的是tranlog日志的删除问题,也就是不允许删除translog,接着会创建一个新的InternalEngine了,然后返回调用org.elasticsearch.index.shard.TranslogRecoveryPerformer.getRecoveredTypes

不过你看这个代码会比较疑惑,其实我一开始看也觉得纳闷:

  if (skipTranslogRecovery == false) {            
       markLastWrite();
        }
 createNewEngine(skipTranslogRecovery, engineConfig);
 return  engineConfig.getTranslogRecoveryPerformer().
                                  getRecoveredTypes();

我们并没有看到做translog replay的地方,而从上层的调用方来看:

typesToUpdate = indexShard.performTranslogRecovery(indexShouldExists);
indexShard.finalizeRecovery();

performTranslogRecovery 返回后,就立马进入扫尾(finalizeRecovery)阶段。 里面唯一的动作是createNewEngine,并且传递了skipTranslogRecovery 参数。 也就说,真正的translog replay动作是在createNewEngine里完成,我们经过探索,发现是在InternalEngine 的初始化过程完成的,具体代码如下:

try {
      if (skipInitialTranslogRecovery) {
          commitIndexWriter(writer,
                                       translog, 
                                       lastCommittedSegmentInfos.
                                       getUserData().
                                       get(SYNC_COMMIT_ID));
                } else {
                    recoverFromTranslog(engineConfig, translogGeneration);
                }
            } catch (IOException | EngineException ex) {
              .......
            }

里面有个recoverFromTranslog,我们进去瞅瞅:

   final TranslogRecoveryPerformer handler = engineConfig.getTranslogRecoveryPerformer();
        try (Translog.Snapshot snapshot = translog.newSnapshot()) {
            opsRecovered = handler.recoveryFromSnapshot(this, snapshot);
        } catch (Throwable e) {
            throw new EngineException(shardId, "failed to recover from translog", e);
        }

目前来看,所有的Translog recovery 动作其实都是由 TranslogRecoveryPerformer 来完成的。当然这个名字也比较好,翻译过来就是 TranslogRecovery 执行者。先对translog 做一个snapshot,然后根据这个snapshot开始进行恢复,进入 recoveryFromSnapshot 方法我们查看细节,然后会引导你进入
下面的方法:

 public void performRecoveryOperation(Engine engine, Translog.Operation operation, boolean allowMappingUpdates) {
        try {
            switch (operation.opType()) {
                case CREATE:
                    Translog.Create create = (Translog.Create) operation;
                    Engine.Create engineCreate = IndexShard.prepareCreate(docMapper(create.type()),
                            source(create.source()).index(shardId.getIndex()).type(create.type()).id(create.id())
                                    .routing(create.routing()).parent(create.parent()).timestamp(create.timestamp()).ttl(create.ttl()),
                            create.version(), create.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true, false);
                    maybeAddMappingUpdate(engineCreate.type(), engineCreate.parsedDoc().dynamicMappingsUpdate(), engineCreate.id(), allowMappingUpdates);
                    if (logger.isTraceEnabled()) {
                        logger.trace("[translog] recover [create] op of [{}][{}]", create.type(), create.id());
                    }
                    engine.create(engineCreate);
                    break;

终于看到了实际的translog replay 逻辑了。这里调用了标准的InternalEngine.create 等方法进行日志的恢复。其实比较有意思的是,我们在日志回放的过程中,依然会继续写translog。这里就会导致一个问题,如果我在做日志回放的过程中,服务器由当掉了(或者ES instance 重启了),那么就会导致translog 变多了。这个地方是否可以再优化下?

假设我们完成了Translog 回放后,如果确实有重放,那么就行flush动作,删除translog,否则就commit Index。具体逻辑由如下的代码来完成:

if (opsRecovered > 0) {
     opsRecovered, translogGeneration == null ? null : translogGeneration.translogFileGeneration, translog
                            .currentFileGeneration());
            flush(true, true);
        } else if (translog.isCurrent(translogGeneration) == false) {
            commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
        }

接着就进入了finalizeRecovery,然后,就没然后了。

 indexShard.finalizeRecovery();
            String indexName = indexShard.shardId().index().name();
            for (Map.Entry<String, Mapping> entry : typesToUpdate.entrySet()) {
                validateMappingUpdate(indexName, entry.getKey(), entry.getValue());
            }
            indexShard.postRecovery("post recovery from shard_store");

Primary的迁移/Replication的生成和迁移

一般这种recovery其实就是发生relocation或者调整副本的时候发生的。所以集群是在正常状态,一定有健康的primary shard存在,所以我们也把这种recovery叫做Peer Recovery。 入口和前面的Primary恢复是一样的,代码如下:

if (isPeerRecovery(shardRouting)) {
 //走的这个分支
.....
RecoveryState.Type type = shardRouting.primary() ? RecoveryState.Type.RELOCATION : RecoveryState.Type.REPLICA;
                recoveryTarget.startRecovery(indexShard, type, sourceNode, new PeerRecoveryListener(shardRouting, indexService, indexMetaData));
......           
}
else {
 ......
}

核心代码自然是 recoveryTarget.startRecovery。这里的recoveryTarget的类型是: org.elasticsearch.indices.recovery.RecoveryTarget

startRecovery方法的核心代码是:

threadPool.generic().execute(new RecoveryRunner(recoveryId));

也是启动一个县城异步执行的。RecoveryRunner调用的是RecoveryTarget的 doRecovery方法,在该方法里,会发出一个RPC请求:

final StartRecoveryRequest request = new StartRecoveryRequest(recoveryStatus.shardId(), recoveryStatus.sourceNode(), clusterService.localNode(),        false, metadataSnapshot, recoveryStatus.state().getType(), recoveryStatus.recoveryId());

recoveryStatus.indexShard().prepareForIndexRecovery();
            recoveryStatus.CancellableThreads().execute(new CancellableThreads.Interruptable() {
                @Override
                public void run() throws InterruptedException {
                    responseHolder.set(transportService.submitRequest(request.sourceNode(), RecoverySource.Actions.START_RECOVERY, request, new FutureTransportResponseHandler<RecoveryResponse>() {
                        @Override
                        public RecoveryResponse newInstance() {
                            return new RecoveryResponse();
                        }
                    }).txGet());
                }
            });

这个时候进入 INDEX Stage。 那谁接受处理的呢? 我们先看看现在的类名叫啥? RecoveryTarget。 我们想当然的想,是不是有RecoverySource呢? 发现确实有,而且该类确实也有一个处理类:

 class StartRecoveryTransportRequestHandler extends TransportRequestHandler<StartRecoveryRequest> {
        @Override
        public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel) throws Exception {
            RecoveryResponse response = recover(request);
            channel.sendResponse(response);
        }
    }

ES里这种通过Netty进行交互的方式,大家可以看看我之前写文章ElasticSearch Rest/RPC 接口解析

这里我们进入RecoverSource对象的recover方法:

 private RecoveryResponse recover(final StartRecoveryRequest request) {
      .....
      if (IndexMetaData.isOnSharedFilesystem(shard.indexSettings())) {
            handler = new SharedFSRecoverySourceHandler(shard, request, recoverySettings, transportService, logger);
        } else {
            handler = new RecoverySourceHandler(shard, request, recoverySettings, transportService, logger);
        }
        ongoingRecoveries.add(shard, handler);
        try {
            return handler.recoverToTarget();
        } finally {
            ongoingRecoveries.remove(shard, handler);
        }
 }

我们看到具体负责处理的类是RecoverySourceHandler,之后调用该类的recoverToTarget方法。我对下面的代码做了精简,方便大家看清楚。

public RecoveryResponse recoverToTarget() {
        final Engine engine = shard.engine();
        assert engine.getTranslog() != null : "translog must not be null";
        try (Translog.View translogView = engine.getTranslog().newView()) {
     
            final SnapshotIndexCommit phase1Snapshot;
            phase1Snapshot = shard.snapshotIndex(false);
            phase1(phase1Snapshot, translogView);
                      
            try (Translog.Snapshot phase2Snapshot = translogView.snapshot()) {
                phase2(phase2Snapshot);
            } catch (Throwable e) {
                throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
            }

            finalizeRecovery();
        }
        return response;
    }

首先创建一个Translog的视图(创建视图的细节我现在也还没研究),接着的话对当前的索引进行snapshot。 然后进入phase1阶段,该阶段是把索引文件和请求的进行对比,然后得出有差异的部分,主动将数据推送给请求方。之后进入文件清理阶段,然后就进入translog 阶段:

protected void prepareTargetForTranslog(final Translog.View translogView) {

接着进入第二阶段:

try (Translog.Snapshot phase2Snapshot = translogView.snapshot()) {
                phase2(phase2Snapshot);           
            }

对当前的translogView 进行一次snapshot,然后进行translog发送:

int totalOperations = sendSnapshot(snapshot);

具体的发送逻辑如下:

 cancellableThreads.execute(new Interruptable() {
                    @Override
                    public void run() throws InterruptedException {
                        final RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(
                                request.recoveryId(), request.shardId(), operations, snapshot.estimatedTotalOperations());
                        transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest,
                                recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
                    }
                });

这里发的请求,都是被 RecoveryTarget的TranslogOperationsRequestHandler 处理器来完成的,具体代码是:

 @Override
        public void messageReceived(final RecoveryTranslogOperationsRequest request, final TransportChannel channel) throws Exception {
            try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) {
                final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger);
                final RecoveryStatus recoveryStatus = statusRef.status();
                final RecoveryState.Translog translog = recoveryStatus.state().getTranslog();
                translog.totalOperations(request.totalTranslogOps());
                assert recoveryStatus.indexShard().recoveryState() == recoveryStatus.state();
                try {
                    recoveryStatus.indexShard().performBatchRecovery(request.operations());

这里调用IndexShard.performBatchRecovery进行translog 的回放。

最后发送一个finalizeRecovery给target 节点,完成recovering操作。

关于Recovery translog 配置相关

在如下的类里有:

//org.elasticsearch.index.translog.TranslogService
INDEX_TRANSLOG_FLUSH_INTERVAL = "index.translog.interval";
INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS = "index.translog.flush_threshold_ops";
INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE = "index.translog.flush_threshold_size";
INDEX_TRANSLOG_FLUSH_THRESHOLD_PERIOD = "index.translog.flush_threshold_period";
INDEX_TRANSLOG_DISABLE_FLUSH = "index.translog.disable_flush";

当服务器恢复时发现有存在的translog日志,就会进入TRANSLOG 阶段进行replay。translog 的recovery 是走的标准的InternalEngine.create/update等方法,并且还会再写translog,同时还有一个影响性能的地方是很多数据可能已经存在,会走update操作,所以性能还是非常差的。这个目前能够想到的解决办法是调整flush日志的频率,保证存在的translog 尽量的少。 上面的话可以看出有三个控制选项:

//每隔interval的时间,就去检查下面三个条件决定是不是要进行flush,
//默认5s。时间过长,会超出下面阈值比较大。
index.translog.interval 

//超过多少条日志后需要flush,默认Int的最大值
index.translog.flush_threshold_ops 

//定时flush,默认30m 可动态设置
index.translog.flush_threshold_period

//translog 大小超过多少后flush,默认512m  
index.translog.flush_threshold_size

本质上translog的恢复速度和条数的影响关系更大些,所以建议大家设置下 index.translog.flush_threshold_ops,比如多少条就一定要flush,否则积累的太多,
出现故障,恢复就慢了。这些参数都可以动态设置,但建议放到配置文件。

 

 

Elasticsearch Recovery详解

原创 2016年10月21日 10:55:37
 

Elasticsearch Recovery详解

基础知识点


在Eleasticsearch中recovery指的就是一个索引的分片分配到另外一个节点的过程;一般在快照恢复、索引副本数变更、节点故障、节点重启时发生。由于master保存整个集群的状态信息,因此可以判断出哪些shard需要做再分配,以及分配到哪个结点,例如:
  1. 如果某个shard主分片在,副分片所在结点挂了,那么选择另外一个可用结点,将副分片分配(allocate)上去,然后进行主从分片的复制。
  2. 如果某个shard的主分片所在结点挂了,副分片还在,那么将副分片升级为主分片,然后做主从分片复制。
  3. 如果某个shard的主副分片所在结点都挂了,则暂时无法恢复,等待持有相关数据的结点重新加入集群后,从该结点上恢复主分片,再选择另外的结点复制副分片。
 
正常情况下,我们可以通过ES的health的API接口,查看整个集群的健康状态和整个集群数据的完整性:
EsHealth.png

状态及含义如下:
  • green: 所有的shard主副分片都是正常的;
  • yellow: 所有shard的主分片都完好,部分副分片没有或者不完整,数据完整性依然完好;
  • red: 某些shard的主副分片都没有了,对应的索引数据不完整。
 
recovery过程要消耗额外的资源,CPU、内存、结点之间的网络带宽等等。 这些额外的资源消耗,有可能会导致集群的服务性能下降,或者一部分功能暂时不可用。了解一些recovery的过程和相关的配置参数,对于减小recovery带来的资源消耗,加快集群恢复过程都是很有帮助的。
 

减少集群Full Restart造成的数据来回拷贝


ES集群可能会有整体重启的情况,比如需要升级硬件、升级操作系统或者升级ES大版本。重启所有结点可能带来的一个问题: 某些结点可能先于其他结点加入集群, 先加入集群的结点可能已经可以选举好master,并立即启动了recovery的过程,由于这个时候整个集群数据还不完整,master会指示一些结点之间相互开始复制数据。 那些晚到的结点,一旦发现本地的数据已经被复制到其他结点,则直接删除掉本地“失效”的数据。 当整个集群恢复完毕后,数据分布不均衡,显然是不均衡的,master会触发rebalance过程,将数据在节点之间挪动。整个过程无谓消耗了大量的网络流量;合理设置recovery相关参数则可以防范这种问题的发生。
gateway.expected_nodes
gateway.expected_master_nodes
gateway.expected_data_nodes
以上三个参数是说集群里一旦有多少个节点就立即开始recovery过程。 不同之处在于,第一个参数指的是master或者data都算在内,而后面两个参数则分指master和data node。
 
在期待的节点数条件满足之前, recovery过程会等待gateway.recover_after_time (默认5分钟) 这么长时间,一旦等待超时,则会根据以下条件判断是否启动:
gateway.recover_after_nodes
gateway.recover_after_master_nodes
gateway.recover_after_data_nodes
 
举例来说,对于一个有10个data node的集群,如果有以下的设置:
gateway.expected_data_nodes: 10
gateway.recover_after_time: 5m
gateway.recover_after_data_nodes: 8
那么集群5分钟以内10个data node都加入了,或者5分钟以后8个以上的data node加入了,都会立即启动recovery过程。
 

减少主副本之间的数据复制


如果不是full restart,而是重启单个data node,仍然会造成数据在不同结点之间来回复制。为避免这个问题,可以在重启之前,先关闭集群的shard allocation:
EsNone.png

然后在节点重启完成加入集群后,再重新打开:
EsAll.png

这样在节点重启完成后,尽量多的从本地直接恢复数据。

但是在ES1.6版本之前,即使做了以上措施,仍然会发现有大量主副本之间的数据拷贝。从表面去看,这点很让人不能理解。 主副本数据完全一致,ES应该直接从副本本地恢复数据就好了,为什么要重新从主片再复制一遍呢? 原因在于recovery是简单对比主副本的segment file来判断哪些数据一致可以本地恢复,哪些不一致需要远端拷贝的。而不同节点的segment merge是完全独立运行的,可能导致主副本merge的深度不完全一样,从而造成即使文档集完全一样,产生的segment file却不完全一样。
 
为了解决这个问题,ES1.6版本以后加入了synced flush的新特性。 对于5分钟没有更新过的shard,会自动synced flush一下,实质是为对应的shard加了一个synced flush ID。这样当重启节点的时候,先对比一下shard的synced flush ID,就可以知道两个shard是否完全相同,避免了不必要的segment file拷贝,极大加快了冷索引的恢复速度。
 
需要注意的是synced flush只对冷索引有效,对于热索引(5分钟内有更新的索引)没有作用。 如果重启的结点包含有热索引,那么还是免不了大量的文件拷贝。因此在重启一个结点之前,最好按照以下步骤执行,recovery几乎可以瞬间完成:
  1. 暂停数据写入程序
  2. 关闭集群shard allocation
  3. 手动执行POST /_flush/synced
  4. 重启节点
  5. 重新开启集群shard allocation 
  6. 等待recovery完成,集群health status变成green
  7. 重新开启数据写入程序
 

特大热索引为何恢复慢


对于冷索引,由于数据不再更新,利用synced flush特性,可以快速直接从本地恢复数据。 而对于热索引,特别是shard很大的热索引,除了synced flush派不上用场需要大量跨节点拷贝segment file以外,translog recovery是导致慢的更重要的原因。
 
从主片恢复数据到副片需要经历3个阶段:
  1. 对主片上的segment file做一个快照,然后拷贝到复制片分配到的结点。数据拷贝期间,不会阻塞索引请求,新增索引操作记录到translog里。
  2. 对translog做一个快照,此快照包含第一阶段新增的索引请求,然后重放快照里的索引操作。此阶段仍然不阻塞索引请求,新增索引操作记录到translog里。
  3. 为了能达到主副片完全同步,阻塞掉新索引请求,然后重放阶段二新增的translog操作。
 
可见,在recovery完成之前,translog是不能够被清除掉的(禁用掉正常运作期间后台的flush操作)。如果shard比较大,第一阶段耗时很长,会导致此阶段产生的translog很大。重放translog比起简单的文件拷贝耗时要长得多,因此第二阶段的translog耗时也会显著增加。等到第三阶段,需要重放的translog可能会比第二阶段还要多。 而第三阶段是会阻塞新索引写入的,在对写入实时性要求很高的场合,就会非常影响用户体验。 因此,要加快大的热索引恢复速度,最好的方式是遵从上一节提到的方法: 暂停新数据写入,手动sync flush,等待数据恢复完成后,重新开启数据写入,这样可以将数据延迟影响可以降到最低。
 
万一遇到Recovery慢,想知道进度怎么办呢? CAT Recovery API可以显示详细的recovery各个阶段的状态。 这个API怎么用就不在这里赘述了,参考:  CAT Recovery
 

其他Recovery相关的专家级设置


还有其他一些专家级的设置(参见:  recovery)可以影响recovery的速度,但提升速度的代价是更多的资源消耗,因此在生产集群上调整这些参数需要结合实际情况谨慎调整,一旦影响应用要立即调整回来。 对于搜索并发量要求高,延迟要求低的场合,默认设置一般就不要去动了。 对于日志实时分析类对于搜索延迟要求不高,但对于数据写入延迟期望比较低的场合,可以适当调大indices.recovery.max_bytes_per_sec,提升recovery速度,减少数据写入被阻塞的时长。
 
最后要说的一点是ES的版本迭代很快,对于Recovery的机制也在不断的优化中。 其中有一些版本甚至引入了一些bug,比如在ES1.4.x有严重的translog recovery bug,导致大的索引trans log recovery几乎无法完成 ( issue #9226)  。因此实际使用中如果遇到问题,最好在Github的issue list里搜索一下,看是否使用的版本有其他人反映同样的问题。

















本文转自张昺华-sky博客园博客,原文链接:http://www.cnblogs.com/bonelee/p/8135878.html ,如需转载请自行联系原作者

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
2月前
|
自然语言处理 大数据 应用服务中间件
大数据-172 Elasticsearch 索引操作 与 IK 分词器 自定义停用词 Nginx 服务
大数据-172 Elasticsearch 索引操作 与 IK 分词器 自定义停用词 Nginx 服务
68 5
|
2月前
|
存储 分布式计算 大数据
大数据-169 Elasticsearch 索引使用 与 架构概念 增删改查
大数据-169 Elasticsearch 索引使用 与 架构概念 增删改查
63 3
|
21天前
|
存储 缓存 监控
优化Elasticsearch 索引设计
优化Elasticsearch 索引设计
19 5
|
24天前
|
存储 索引
Elasticsearch分片和副本
【11月更文挑战第4天】
40 7
|
1月前
|
存储 SQL 监控
|
28天前
|
存储 JSON 关系型数据库
Elasticsearch 索引
【11月更文挑战第3天】
35 4
|
1月前
|
运维 监控 安全
|
1月前
|
测试技术 API 开发工具
ElasticSearch7.6.x 模板及滚动索引创建及注意事项
ElasticSearch7.6.x 模板及滚动索引创建及注意事项
44 8
|
2月前
|
存储 JSON 监控
大数据-167 ELK Elasticsearch 详细介绍 特点 分片 查询
大数据-167 ELK Elasticsearch 详细介绍 特点 分片 查询
54 4
|
2月前
|
SQL 分布式计算 大数据
大数据-168 Elasticsearch 单机云服务器部署运行 详细流程
大数据-168 Elasticsearch 单机云服务器部署运行 详细流程
60 2