es实战-rebalance功能及源码解析

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 探究es如何实现rebalance
  1. rebalance tasks在es集群里面的表现形式:

通过调用 GET _cat/tasks?v API
返回结果中 action 为 internal:index/shard/recovery/start_recovery(不仅仅是rebalance)

  1. 判断shards移动状况:

通过调用 GET _cat/recovery?v API
返回结果中 type 为 peer;source_node 和 target_node 可以看出分片移动的方向;stage可以看出移动进行到哪一步: INIT->......->DONE

  1. 查看分片状态

通过调用 GET _cat/shards?v API
返回结果中 可以看到移动的分片state为RELOCATING状态

  1. 查看每个节点分片数

使用kibana的monitor观测或者通过:GET _nodes/stats/indices?level=shards 统计每个node的shards数组长度(感觉_cat/nodes API有必要添加shards数的监控)

Rebalance相关配置参数有以下3+3个:

cluster.routing.rebalance.enable//谁可以进行rebalance
cluster.routing.allocation.allow_rebalance//什么时候可以rebalance
cluster.routing.allocation.cluster_concurrent_rebalance//rebalance的并行度(shards级别)

cluster.routing.allocation.balance.shard//allocate每个node上shard总数时计算的权重,提高这个值以后会使node上的shard总数基本趋于一致
cluster.routing.allocation.balance.index//allocate每个index在一个node上shard数时计算的权重,提高这个值会使单个index的shard在集群节点中均衡分布
cluster.routing.allocation.balance.threshold//阈值,提高这个值可以提高集群rebalance的惰性

具体分析见下文......

源码解析

抽象基类:AllocationDecider提供两个判断是否需要rebalane的方法

public abstract class AllocationDecider {
    //判断是否可以进行shard routing
    public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
        return Decision.ALWAYS;
    }
    //判断集群是否可以进行rebalance操作(主要研究)
    public Decision canRebalance(RoutingAllocation allocation) {
        return Decision.ALWAYS;
    }
}

AllocationDeciders类继承了基类,用于汇总一组决策者的决定来确定最终决定。

public Decision canRebalance(RoutingAllocation allocation) {
    Decision.Multi ret = new Decision.Multi();
    for (AllocationDecider allocationDecider : allocations) {
        Decision decision = allocationDecider.canRebalance(allocation);
        // short track if a NO is returned.
        if (decision == Decision.NO) {
            if (!allocation.debugDecision()) {
                return decision;
            } else {
                ret.add(decision);
            }
        } else {
            addDecision(ret, decision, allocation);
        }
    }
    return ret;
}

其中判断集群是否可以进行rebalance的决策者们如下:

  • EnableAllocationDecider

针对index.routing.rebalance.enable参数

  • ClusterRebalanceAllocationDecider

针对cluster.routing.allocation.allow_rebalance参数

  • ConcurrentRebalanceAllocationDecider

针对cluster.routing.allocation.cluster_concurrent_rebalance参数

具体的rebalance过程是由BalancedShardsAllocator类中allocate()方法中:调用Balancer的balanceByWeights()方法执行。
BalancedShardsAllocator初始化时会根据上文三个参数设置weightFunction(上文参数4,5)和Threshold(上文参数6)。

public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings) {
    setWeightFunction(INDEX_BALANCE_FACTOR_SETTING.get(settings), SHARD_BALANCE_FACTOR_SETTING.get(settings));
    setThreshold(THRESHOLD_SETTING.get(settings));
    clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, SHARD_BALANCE_FACTOR_SETTING, this::setWeightFunction);
    clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold);
}

private void setWeightFunction(float indexBalance, float shardBalanceFactor) {
    weightFunction = new WeightFunction(indexBalance, shardBalanceFactor);
}

private void setThreshold(float threshold) {
    this.threshold = threshold;
}

WeightFunction权重函数用于均衡计算节点间shards数量平衡节点间每个索引shards数平衡,看注释:

private static class WeightFunction {

    private final float indexBalance;
    private final float shardBalance;
    private final float theta0;
    private final float theta1;
    //默认 0.45 和 0.55 相加等于一
    WeightFunction(float indexBalance, float shardBalance) {
        float sum = indexBalance + shardBalance;
        if (sum <= 0.0f) {
            throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
        }
        //相加等于一则权重保持参数配置
        theta0 = shardBalance / sum;
        theta1 = indexBalance / sum;
        this.indexBalance = indexBalance;
        this.shardBalance = shardBalance;
    }
    //获取权重计算结果,方式为通过Balancer策略和当前节点和当前索引计算
    float weight(Balancer balancer, ModelNode node, String index) {
        //当前节点的shards数减去平均的shards数
        final float weightShard = node.numShards() - balancer.avgShardsPerNode();
        //当前节点当前索引shards数减去平均的shards数
        final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index);
        //乘以系数得出结果
        return theta0 * weightShard + theta1 * weightIndex;
    }
}

再说Balancer:它的具体三个工作如下所示(本文主要想研究balance):

public void allocate(RoutingAllocation allocation) {
    if (allocation.routingNodes().size() == 0) {
        failAllocationOfNewPrimaries(allocation);
        return;
    }
    final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
    //分配未分配的shards
    balancer.allocateUnassigned();
    //重分配需要迁移的shards(一些分配规则的限制)
    balancer.moveShards();
    //尽量平衡分片在节点的数量
    balancer.balance();//最终调用balanceByWeights()
}

接下来看balance():

  • 首先你想看balance过程得开启日log的trace
  • issue 14387,集群OK且shards OK才rebalance,否则可能做无用功
  • 调用上文提到的canRebalance()判断是否可以进行
  • 节点只有一个没必要进行
  • 开始进行rebalance
private void balance() {
    if (logger.isTraceEnabled()) {
        logger.trace("Start balancing cluster");
    }
    if (allocation.hasPendingAsyncFetch()) {
        /*
         * see https://github.com/elastic/elasticsearch/issues/14387
         * if we allow rebalance operations while we are still fetching shard store data
         * we might end up with unnecessary rebalance operations which can be super confusion/frustrating
         * since once the fetches come back we might just move all the shards back again.
         * Therefore we only do a rebalance if we have fetched all information.
         */
        logger.debug("skipping rebalance due to in-flight shard/store fetches");
        return;
    }
    if (allocation.deciders().canRebalance(allocation).type() != Type.YES) {
        logger.trace("skipping rebalance as it is disabled");
        return;
    }
    if (nodes.size() < 2) { /* skip if we only have one node */
        logger.trace("skipping rebalance as single node only");
        return;
    }
    balanceByWeights();//核心方法
}

接下来看balanceByWeights():核心代码在此 内容比较多,英文注释已去除,添加了详细的中文注释,一定要捋一遍......

private void balanceByWeights() {
    //判断是否要rebanlance的决策者
    final AllocationDeciders deciders = allocation.deciders();
    //节点信息:包括节点shards数和节点内每个index的shards数
    final ModelNode[] modelNodes = sorter.modelNodes;
    //节点内每个索引的权重信息
    final float[] weights = sorter.weights;
    //处理每个索引
    for (String index : buildWeightOrderedIndices()) {
        IndexMetadata indexMetadata = metadata.index(index);
        //找到含有索引shards或者索引shards可以移动过去的节点,并将其移动到ModelNode数组靠前的位置
        int relevantNodes = 0;
        for (int i = 0; i < modelNodes.length; i++) {
            ModelNode modelNode = modelNodes[i];
            if (modelNode.getIndex(index) != null
                || deciders.canAllocate(indexMetadata, modelNode.getRoutingNode(), allocation).type() != Type.NO) {
                // swap nodes at position i and relevantNodes
                modelNodes[i] = modelNodes[relevantNodes];
                modelNodes[relevantNodes] = modelNode;
                relevantNodes++;
            }
        }
        //没有或者只有一个相关节点则跳过
        if (relevantNodes < 2) {
            continue;
        }
        //对相关节点重新计算权重并排序
        sorter.reset(index, 0, relevantNodes);
        //准备对相关节点即前relevantNodes个节点下手
        int lowIdx = 0;
        int highIdx = relevantNodes - 1;
        while (true) {
            final ModelNode minNode = modelNodes[lowIdx];
            final ModelNode maxNode = modelNodes[highIdx];
            advance_range:
            if (maxNode.numShards(index) > 0) {
                //计算相关节点的最大权重差值,如果低于参数3配置的值则跳过
                final float delta = absDelta(weights[lowIdx], weights[highIdx]);
                if (lessThan(delta, threshold)) {
                    if (lowIdx > 0 && highIdx-1 > 0 && (absDelta(weights[0], weights[highIdx-1]) > threshold) ) {
                        break advance_range;
                    }
                    if (logger.isTraceEnabled()) {
                        logger.trace("Stop balancing index [{}]  min_node [{}] weight: [{}]" +
                                "  max_node [{}] weight: [{}]  delta: [{}]",
                                index, maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta);
                    }
                    break;
                }
                if (logger.isTraceEnabled()) {
                    logger.trace("Balancing from node [{}] weight: [{}] to node [{}] weight: [{}]  delta: [{}]",
                            maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta);
                }
                //权重差值小于默认值1则跳过?应该写配置参数而不是写死1吧?
                if (delta <= 1.0f) {
                    logger.trace("Couldn't find shard to relocate from node [{}] to node [{}]",
                        maxNode.getNodeId(), minNode.getNodeId());
                    //进行分片们移动,在两个节点间进行全部可能的ShardRouting。
                } else if (tryRelocateShard(minNode, maxNode, index)) {
                    //移动完成后由于节点shards数发生编发,会重新计算他们的权重并重新排序,开启下一轮计算
                    weights[lowIdx] = sorter.weight(modelNodes[lowIdx]);
                    weights[highIdx] = sorter.weight(modelNodes[highIdx]);
                    sorter.sort(0, relevantNodes);
                    lowIdx = 0;
                    highIdx = relevantNodes - 1;
                    continue;
                }
            }
            //如果本轮没有移动情况,节点权重没有发生改变,则继续处理其他的相关节点
            if (lowIdx < highIdx - 1) {
                lowIdx++;
            } else if (lowIdx > 0) {
                lowIdx = 0;
                highIdx--;
            } else {
                //当前索引已经平衡
                break;
            }
        }
    }
}

接下来看tryRelocateShard()方法,在两个节点进行分片们的平衡:
//TODO

目录
相关文章
|
7天前
|
搜索推荐 UED Python
实现一个带有昼夜背景切换的动态时钟:从代码到功能解析
本文介绍了一个使用Python和Tkinter库实现的动态时钟程序,具有昼夜背景切换、指针颜色随机变化及整点和半点报时功能。通过设置不同的背景颜色和随机变换指针颜色,增强视觉吸引力;利用多线程技术确保音频播放不影响主程序运行。该程序结合了Tkinter、Pygame、Pytz等库,提供了一个美观且实用的时间显示工具。欢迎点赞、关注、转发、收藏!
123 94
|
9天前
|
供应链 搜索推荐 API
深度解析1688 API对电商的影响与实战应用
在全球电子商务迅猛发展的背景下,1688作为知名的B2B电商平台,为中小企业提供商品批发、分销、供应链管理等一站式服务,并通过开放的API接口,为开发者和电商企业提供数据资源和功能支持。本文将深入解析1688 API的功能(如商品搜索、详情、订单管理等)、应用场景(如商品展示、搜索优化、交易管理和用户行为分析)、收益分析(如流量增长、销售提升、库存优化和成本降低)及实际案例,帮助电商从业者提升运营效率和商业收益。
82 17
|
7天前
|
人工智能 自然语言处理 搜索推荐
销售易、悟空、神州云动CRM:全方位功能解析与优势特色盘点
销售易CRM、悟空CRM和神州云动CRM各自具备独特的产品功能与优势,适用于不同类型的企业。销售易CRM提供移动化、社交化和AI驱动的全流程管理,适合大型企业及跨国公司;悟空CRM以智能数据分析和移动办公支持见长,适合中大型企业;神州云动CRM则凭借灵活定制和多行业适配能力,特别适合大中型企业。企业在选择时应根据自身需求和发展战略,挑选最适合的CRM系统,以实现客户关系管理的最大化效益。
|
27天前
|
设计模式 XML Java
【23种设计模式·全精解析 | 自定义Spring框架篇】Spring核心源码分析+自定义Spring的IOC功能,依赖注入功能
本文详细介绍了Spring框架的核心功能,并通过手写自定义Spring框架的方式,深入理解了Spring的IOC(控制反转)和DI(依赖注入)功能,并且学会实际运用设计模式到真实开发中。
【23种设计模式·全精解析 | 自定义Spring框架篇】Spring核心源码分析+自定义Spring的IOC功能,依赖注入功能
|
27天前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
27天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
14天前
|
数据采集 XML API
深入解析BeautifulSoup:从sohu.com视频页面提取关键信息的实战技巧
深入解析BeautifulSoup:从sohu.com视频页面提取关键信息的实战技巧
|
20天前
|
存储 数据库 对象存储
新版本发布:查询更快,兼容更强,TDengine 3.3.4.3 功能解析
经过 TDengine 研发团队的精心打磨,TDengine 3.3.4.3 版本正式发布。作为时序数据库领域的领先产品,TDengine 一直致力于为用户提供高效、稳定、易用的解决方案。本次版本更新延续了一贯的高标准,为用户带来了多项实用的新特性,并对系统性能进行了深度优化。
32 3
|
20天前
|
供应链 数据可视化 数据挖掘
企业服务品牌深度解析:销售易、用友、白码功能与特色对比
在企业服务领域,销售易、用友、白码等品牌凭借独特的产品和解决方案占据重要地位。销售易专注于CRM,提供客户管理、销售自动化、市场营销等功能,提升销售效率与客户满意度。用友作为领先的企业服务提供商,涵盖ERP、财务管理、人力资源管理等,助力企业资源优化配置。白码则以低代码开发平台为核心,支持快速构建业务应用,具备高度可定制化和易于维护的特点。三者各具特色,共同推动企业数字化转型。
|
25天前
|
安全 API 数据安全/隐私保护
速卖通AliExpress商品详情API接口深度解析与实战应用
速卖通(AliExpress)作为全球化电商的重要平台,提供了丰富的商品资源和便捷的购物体验。为了提升用户体验和优化商品管理,速卖通开放了API接口,其中商品详情API尤为关键。本文介绍如何获取API密钥、调用商品详情API接口,并处理API响应数据,帮助开发者和商家高效利用这些工具。通过合理规划API调用策略和确保合法合规使用,开发者可以更好地获取商品信息,优化管理和营销策略。

热门文章

最新文章

推荐镜像

更多