记一次引入Elasticsearch的系统架构实战(四)

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 记一次引入Elasticsearch的系统架构实战(四)

作品搜索实现细节


实体定义


SearchKey是原有SQL Server的数据,现需要同步到Elasticsearch,仍是继承抽象类ElasticsearchEntity实体定义,同时这里有三个细节点:

 

1. public string KeyName,我定义的是Text类型,在Elasticsearch使用Text类型才会分词。

 

2.在实体定义我没有给KeyName指定分词器,因为我会使用两个分词器:拼音和默认分词,而我会在批量写入数据创建Mapping时定义。

  

3.实体里的 public List<int> SysTagId 与SearchKey在SQL Server是两张不同的物理表是一对多的关系,在代码表示如下,但是在关系型数据库是无法与之对应和体现的,这就是咱们所说的“阻抗失配”,但是能在以文档型存储系统(MongoDB、Elasticsearch)里很好的解决这个问题,可以以一个聚合的方式写入,避免多次查询关联。


[ElasticsearchType(RelationName = "search_key")]
    public class SearchKey : ElasticsearchEntity
    {
        [Number(NumberType.Integer, Name = "key_id")]
        public int KeyId { get; set; }
        [Number(NumberType.Integer, Name = "entity_id")]
        public int EntityId { get; set; }
        [Number(NumberType.Integer, Name = "entity_type")]
        public int EntityType { get; set; }
        [Text(Name = "key_name")]
        public string KeyName { get; set; }
        [Number(NumberType.Integer, Name = "weight")]
        public int Weight { get; set; }
        [Boolean(Name = "is_subsidiary")]
        public bool IsSubsidiary { get; set; }
        [Date(Name = "active_date")]
        public DateTimeOffset? ActiveDate { get; set; }
        [Number(NumberType.Integer, Name = "sys_tag_id")]
        public List<int> SysTagId { get; set; }
    }

 

数据同步

  

数据同步我采用了Quartz.Net定时调度任务框架,因此时效不高,所以每4小时同步一次即可,有42W多的数据,分批进行同步,每次查询1000条数据同时进行一次批量写入。全量同步一次的时间大概2分钟。因此使用RPC调用[ES业务API服务]。

  

因为具体业务逻辑已经封装在[ES业务API服务],因此同步逻辑也相对简单,查询出SQL Server数据源、聚合整理、调用[ES业务API服务]的批量写入接口、重新绑定别名到新的Index。


[DisallowConcurrentExecution]
    public class SearchKeySynchronousJob : BaseJob
    {
        public override void Execute()
        {
            var rm = SFNovelReadManager.Instance();
            var maxId = 0;
            var size = 1000;
            string indexName = "";
            while (true)
            {
                //避免一次性全部查询出来,每1000条一次写入。
                var searchKey = sm.searchKey.GetList(size, maxId);
                if (!searchKey.Any())
                    break;
                var entityIds = searchKey.Select(a => a.EntityID).Distinct().ToList();
                var sysTagRecord = rm.Novel.GetSysTagRecord(entityIds);
                var items = searchKey.Select(a => new SearchKeyPostItem
                {
                    Weight = a.Weight,
                    EntityType = a.EntityType,
                    EntityId = a.EntityID,
                    IsSubsidiary = a.IsSubsidiary ?? false,
                    KeyName = a.KeyName,
                    ActiveDate = a.ActiveDate,
                    SysTagId = sysTagRecord.Where(c => c.EntityID == a.EntityID).Select(c => c.SysTagID).ToList(),
                    KeyID = a.KeyID
                }).ToList();
                //以一个聚合写入到ES
                var postResult = new SearchKeyPostRequest
                {
                    IndexName = indexName,
                    Items = items
                }.Excute();
                if (postResult.Success)
                {
                    indexName = (string)postResult.Data;
                    maxId = searchKey.Max(a => a.KeyID);
                }
            }
            //别名从旧Index指向新的Index,最后删除旧Index
            var renameResult = new SearchKeyRenameRequest
            {
                IndexName = indexName
            }.Excute();
        }
    }
}


业务API接口


批量新增接口这里有2个细节点:

 

1.在第一次有数据进来的时候需要创建Mapping,因为得对KeyName字段定义分词器,其余字段都可以使用AutoMap即可。

  

2.新创建的Index名称是精确到秒的 SearchKey-202112261121


/// <summary>
        /// 批量新增作品搜索列表(返回创建的indexName)
        /// </summary>
        /// <param name="request"></param>
        /// <returns></returns>
        [HttpPost]
        public ApiResult Post(SearchKeyPostRequest request)
        {
            if (!request.Items.Any())
                return ApiResult.IsFailed("无传入数据");
            var date = DateTime.Now;
            var relationName = typeof(SearchKey).GetRelationName();
            var indexName = request.IndexName.IsNullOrWhiteSpace() ? (relationName + "-" + date.ToString("yyyyMMddHHmmss")) : request.IndexName;
            if (request.IndexName.IsNullOrWhiteSpace())
            {
                var createResult = _elasticClient.Indices.Create(indexName,
                    a =>
                        a.Map<SearchKey>(m => m.AutoMap().Properties(p =>
                            p.Custom(new TextProperty
                            {
                                Name = "key_name",
                                Analyzer = "standard",
                                Fields = new Properties(new Dictionary<PropertyName, IProperty>
                                {
                                    { new PropertyName("pinyin"),new TextProperty{ Analyzer = "pinyin"} },
                                    { new PropertyName("standard"),new TextProperty{ Analyzer = "standard"} }
                                })
                            }))));
                if (!createResult.IsValid && request.IndexName.IsNullOrWhiteSpace())
                    return ApiResult.IsFailed("创建索引失败");
            }
            var document = request.Items.MapTo<List<SearchKey>>();
            var result = _elasticClient.BulkAll(indexName, document);
            return result ? ApiResult.IsSuccess(data: indexName) : ApiResult.IsFailed();
        }

 

重新绑定别名接口这里有4个细节点:


1.别名使用searchkey,只会有一个Index[searchkey-yyyyMMddHHmmss]会跟searchkey绑定.

2.优先把已绑定的Index查询出来,方便解绑与删除。


3.别名绑定在Elasticsearch虽然是原子性的,但是不是数据一致性的,因此得先Add后Remove。


4.删除旧得Index免得占用过多资源。


/// <summary>
        /// 重新绑定别名
        /// </summary>
        /// <returns></returns>
        [HttpPut]
        public ApiResult Rename(SearchKeyRanameRequest request)
        {
            var aliasName = typeof(SearchKey).GetRelationName();
            var getAliasResult = _elasticClient.Indices.GetAlias(aliasName);
            //给新index指定别名
            var bulkAliasRequest = new BulkAliasRequest
            {
                Actions = new List<IAliasAction>
                {
                    new AliasAddDescriptor().Index(request.IndexName).Alias(aliasName)
                }
            };
            //移除别名里旧的索引
            if (getAliasResult.IsValid)
            {
                var indeNameList = getAliasResult.Indices.Keys;
                foreach (var indexName in indeNameList)
                {
                    bulkAliasRequest.Actions.Add(new AliasRemoveDescriptor().Index(indexName.Name).Alias(aliasName));
                }
            }
            var result = _elasticClient.Indices.BulkAlias(bulkAliasRequest);
            //删除旧的index
            if (getAliasResult.IsValid)
            {
                var indeNameList = getAliasResult.Indices.Keys;
                foreach (var indexName in indeNameList)
                {
                    _elasticClient.Indices.Delete(indexName);
                }
            }
            return result != null && result.ApiCall.Success ? ApiResult.IsSuccess() : ApiResult.IsFailed();
        }

   

查询接口这里跟前面细节得差不多:

  

但是这里有一个得特别注意的点,可以看到这个查询接口同时使用了should和must,这里得设置minimumShouldMatch才能正常像SQL过滤。

  

should可以理解成SQL的Or,Must可以理解成SQL的And。

  

默认情况下minimumShouldMatch是等于0的,等于0的意思是,should不命中任何的数据仍然会返回must命中的数据,也就是你们可能想搜索(keyname.pinyin=’chengong‘ or keyname.standard=’chengong‘) and id > 0,但是es里没有存keyname='chengong'的数据,会把id> 0 而且 keyname != 'chengong' 数据给查询出来。

  

因此我们得对minimumShouldMatch=1,就是should条件必须得任意命中一个才能返回结果。

  

在should和must混用的情况下必须得注意minimumShouldMatch的设置!!!


/// <summary>
        /// 作品搜索列表
        /// </summary>
        /// <param name="request"></param>
        /// <returns></returns>
        [HttpPost]
        [Route("search")]
        public ApiResult<List<SearchKeyGetResponse>> Get(SearchKeyGetRequest request)
        {
            var shouldQuerys = new List<Func<QueryContainerDescriptor<SearchKey>, QueryContainer>>();
            int minimumShouldMatch = 0;
            if (!request.KeyName.IsNullOrWhiteSpace())
            {
                shouldQuerys.Add(a => a.MatchPhrase(m => m.Field("key_name.pinyin").Query(request.KeyName)));
                shouldQuerys.Add(a => a.MatchPhrase(m => m.Field("key_name.standard").Query(request.KeyName)));
                minimumShouldMatch = 1;
            }
            var mustQuerys = new List<Func<QueryContainerDescriptor<SearchKey>, QueryContainer>>
            {
                a => a.Range(t => t.Field(f => f.Weight).GreaterThanOrEquals(0))
            };
            if (request.IsSubsidiary.HasValue)
                mustQuerys.Add(a => a.Term(t => t.Field(f => f.IsSubsidiary).Value(request.IsSubsidiary.Value)));
            if (request.SysTagIds != null && request.SysTagIds.Any())
                mustQuerys.Add(a => a.Terms(t => t.Field(f => f.SysTagId).Terms(request.SysTagIds)));
            if (request.EntityType.HasValue)
            {
                if (request.EntityType.Value == ESearchKey.EntityType.AllNovel)
                {
                    mustQuerys.Add(a => a.Terms(t => t.Field(f => f.EntityType).Terms(ESearchKey.EntityType.Novel, ESearchKey.EntityType.ChatNovel, ESearchKey.EntityType.FanNovel)));
                }
                else
                    mustQuerys.Add(a => a.Term(t => t.Field(f => f.EntityType).Value((int)request.EntityType.Value)));
            }
            var sortDescriptor = new SortDescriptor<SearchKey>();
            sortDescriptor = request.Sort == ESearchKey.Sort.Weight
                ? sortDescriptor.Field(f => f.Weight, SortOrder.Descending)
                : sortDescriptor.Field(f => f.ActiveDate, SortOrder.Descending);
            var searchResult = _elasticClient.Search<SearchKey>(a =>
                a.Index(typeof(SearchKey).GetRelationName())
                    .From(request.Size * request.Page)
                    .Size(request.Size)
                    .Query(q => q.Bool(b => b.Should(shouldQuerys).Must(mustQuerys).MinimumShouldMatch(minimumShouldMatch)))
                    .Sort(s => sortDescriptor));
            var apiResult = searchResult.GetApiResult<SearchKey, List<SearchKeyGetResponse>>();
            if (apiResult.Success)
                return apiResult;
            return ApiResult<List<SearchKeyGetResponse>>.IsSuccess("空集合数据");
        }


APM监控

  

虽然在上面我做了足够的实现准备,但是对于上生产后的实际使用效果我还是希望有一个直观的体现。我之前写了一篇文章《.Net微服务实战之可观测性》很好叙述了该种情况,有兴趣的可以移步去看看。

  

在之前公司做微服务的时候的APM选型我们使用了Skywalking,但是现在这家公司的运维没有接触过,但是对于Elastic Stack他相对比较熟悉,如同上文所说架构设计的输入核心为两点满足需求组织架构,秉着我的技术选型原则是基于团队架构,我们采用了Elastic APM + Kibana(7.4版本),如下图所示:


image.png

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
相关文章
|
2月前
|
存储 分布式计算 大数据
大数据-169 Elasticsearch 索引使用 与 架构概念 增删改查
大数据-169 Elasticsearch 索引使用 与 架构概念 增删改查
63 3
|
3月前
|
存储 JSON 数据库
Elasticsearch 分布式架构解析
【9月更文第2天】Elasticsearch 是一个分布式的搜索和分析引擎,以其高可扩展性和实时性著称。它基于 Lucene 开发,但提供了更高级别的抽象,使得开发者能够轻松地构建复杂的搜索应用。本文将深入探讨 Elasticsearch 的分布式存储和检索机制,解释其背后的原理及其优势。
220 5
|
29天前
|
存储 索引
Elasticsearch分布式架构
【11月更文挑战第2天】
26 1
|
1月前
|
运维 NoSQL Java
后端架构演进:微服务架构的优缺点与实战案例分析
【10月更文挑战第28天】本文探讨了微服务架构与单体架构的优缺点,并通过实战案例分析了微服务架构在实际应用中的表现。微服务架构具有高内聚、低耦合、独立部署等优势,但也面临分布式系统的复杂性和较高的运维成本。通过某电商平台的实际案例,展示了微服务架构在提升系统性能和团队协作效率方面的显著效果,同时也指出了其带来的挑战。
71 4
|
2月前
|
存储 监控 分布式数据库
百亿级存储架构: ElasticSearch+HBase 海量存储架构与实现
本文介绍了百亿级数据存储架构的设计与实现,重点探讨了ElasticSearch和HBase的结合使用。通过ElasticSearch实现快速检索,HBase实现海量数据存储,解决了大规模数据的高效存储与查询问题。文章详细讲解了数据统一接入、元数据管理、数据一致性及平台监控等关键模块的设计思路和技术细节,帮助读者理解和掌握构建高性能数据存储系统的方法。
百亿级存储架构: ElasticSearch+HBase 海量存储架构与实现
|
2月前
|
存储 前端开发 API
DDD领域驱动设计实战-分层架构
DDD分层架构通过明确各层职责及交互规则,有效降低了层间依赖。其基本原则是每层仅与下方层耦合,分为严格和松散两种形式。架构演进包括传统四层架构与改良版四层架构,后者采用依赖反转设计原则优化基础设施层位置。各层职责分明:用户接口层处理显示与请求;应用层负责服务编排与组合;领域层实现业务逻辑;基础层提供技术基础服务。通过合理设计聚合与依赖关系,DDD支持微服务架构灵活演进,提升系统适应性和可维护性。
|
3月前
|
运维 持续交付 API
深入理解并实践微服务架构:从理论到实战
深入理解并实践微服务架构:从理论到实战
143 3
|
3月前
|
存储 缓存 负载均衡
亿级流量架构理论+秒杀实战系列(二)
亿级流量架构理论+秒杀实战系列(二)
|
3月前
|
运维 监控 持续交付
深入浅出:微服务架构的设计与实战
微服务,一个在软件开发领域如雷贯耳的名词,它代表着一种现代软件架构的风格。本文将通过浅显易懂的语言,带领读者从零开始了解微服务的概念、设计原则及其在实际项目中的运用。我们将一起探讨如何将一个庞大的单体应用拆分为灵活、独立、可扩展的微服务,并分享一些实践中的经验和技巧。无论你是初学者还是有一定经验的开发者,这篇文章都将为你提供新的视角和深入的理解。
82 3
|
3月前
|
SQL 缓存 运维
亿级流量架构理论+秒杀实战系列(一)
亿级流量架构理论+秒杀实战系列(一)