记一次引入Elasticsearch的系统架构实战(三)

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 记一次引入Elasticsearch的系统架构实战(三)

异步写入


对于异步写入有两个细节点:

  

1.该数据从RabbtiMQ订阅消费写入到Elasticsearch,从下面代码可以看出,我刻意以月的维度建立Index,格式为 userviewrecord-2021-12,这么做的目的是为了方便管理Index和资源利用,有需要的情况下会删除旧的Index。

  

2.消息订阅与WebAPI暂时集成到同一个进程,这样做主要是开发、部署都方便,如果后续订阅多了,在把消息订阅相关的业务抽离到独立的进程。


按需演变,避免过度设计


订阅消费逻辑


public class UserViewDurationConsumer : BaseConsumer<UserViewDurationMessage>
    {
        private readonly ElasticClient _elasticClient;
        public UserViewDurationConsumer(ElasticClient elasticClient)
        {
            _elasticClient = elasticClient;
        }
        public override void Excute(UserViewDurationMessage msg)
        {
            var document = msg.MapTo<Entity.UserViewDuration>();
            var result = _elasticClient.Create(document, a => a.Index(typeof(Entity.UserViewDuration).GetRelationName() + "-" + msg.CreateDateTime.ToString("yyyy-MM"))).GetApiResult();
            if (result.Failed)
                LoggerHelper.WriteToFile(result.Message);
        }
    }
/// <summary>
    /// 订阅消费
    /// </summary>
    public static class ConsumerExtension
    {
        public static IApplicationBuilder UseSubscribe<T, TConsumer>(this IApplicationBuilder appBuilder, IHostApplicationLifetime lifetime) where T : EasyNetQEntity, new() where TConsumer : BaseConsumer<T>
        {
            var bus = appBuilder.ApplicationServices.GetRequiredService<IBus>();
            var consumer = appBuilder.ApplicationServices.GetRequiredService<TConsumer>();
            lifetime.ApplicationStarted.Register(() =>
            {
                bus.Subscribe<T>(msg => consumer.Excute(msg));
            });
            lifetime.ApplicationStopped.Register(() => bus?.Dispose());
            return appBuilder;
        }
    }
订阅与注入
public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }
        public IConfiguration Configuration { get; }
        public void ConfigureServices(IServiceCollection services)
        {
            ......
        }
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IHostApplicationLifetime lifetime)
        {
            app.UseAllElasticApm(Configuration);
            app.UseHealthChecks("/health");
            app.UseDeveloperExceptionPage();
            app.UseSwagger();
            app.UseSwaggerUI(c =>
            {
                c.SwaggerEndpoint("/swagger/v1/swagger.json", "SF.ES.Api v1");
                c.RoutePrefix = "";
            });
            app.UseRouting();
            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });
            app.UseSubscribe<UserViewDurationMessage, UserViewDurationConsumer>(lifetime);
        }
    }


查询接口


查询接口此处有两个细节点:

  

1.如果不确定月份,则使用通配符查询userviewrecord-*,当然有需要的也可以使用别名处理。

  

2.因为Elasticsearch是记录UTC时间,因此时间查询得指定TimeZone。


[HttpGet]
        [Route("record")]
        public ApiResult<List<UserMarkRecordGetRecordResponse>> GetRecord([FromQuery] UserViewDurationRecordGetRequest request)
        {
            var dataList = new List<UserMarkRecordGetRecordResponse>();
            string dateTime;
            if (request.BeginDateTime.HasValue && request.EndDateTime.HasValue)
            {
                var month = request.EndDateTime.Value.DifferMonth(request.BeginDateTime.Value);
                if(month <= 0 )
                    dateTime = request.BeginDateTime.Value.ToString("yyyy-MM");
                else
                    dateTime = "*";
            }
            else
                dateTime = "*";
            var mustQuerys = new List<Func<QueryContainerDescriptor<UserViewDuration>, QueryContainer>>();
            if (request.UserId.HasValue)
                mustQuerys.Add(a => a.Term(t => t.Field(f => f.UserId).Value(request.UserId.Value)));
            if (request.EntityType.HasValue)
                mustQuerys.Add(a => a.Term(t => t.Field(f => f.EntityType).Value(request.EntityType)));
            if (request.EntityId.HasValue)
                mustQuerys.Add(a => a.Term(t => t.Field(f => f.EntityId).Value(request.EntityId.Value)));
            if (request.CharpterId.HasValue)
                mustQuerys.Add(a => a.Term(t => t.Field(f => f.CharpterId).Value(request.CharpterId.Value)));
            if (request.BeginDateTime.HasValue)
                mustQuerys.Add(a => a.DateRange(dr =>
                    dr.Field(f => f.CreateDateTime).GreaterThanOrEquals(request.BeginDateTime.Value).TimeZone(EsConst.TimeZone)));
            if (request.EndDateTime.HasValue)
                mustQuerys.Add(a =>
                    a.DateRange(dr => dr.Field(f => f.CreateDateTime).LessThanOrEquals(request.EndDateTime.Value).TimeZone(EsConst.TimeZone)));
            var searchResult = _elasticClient.Search<UserViewDuration>(a =>
                a.Index(typeof(UserViewDuration).GetRelationName() + "-" + dateTime)
                    .Size(request.Size)
                    .Query(q => q.Bool(b => b.Must(mustQuerys)))
                    .SearchAfterTimestamp(request.Timestamp)
                    .Sort(s => s.Field(f => f.Timestamp, SortOrder.Descending)));
            var apiResult = searchResult.GetApiResult<UserViewDuration, List<UserMarkRecordGetRecordResponse>>();
            if (apiResult.Success)
                dataList.AddRange(apiResult.Data);
            return ApiResult<List<UserMarkRecordGetReco

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
相关文章
|
2月前
|
存储 分布式计算 大数据
大数据-169 Elasticsearch 索引使用 与 架构概念 增删改查
大数据-169 Elasticsearch 索引使用 与 架构概念 增删改查
63 3
|
3月前
|
存储 JSON 数据库
Elasticsearch 分布式架构解析
【9月更文第2天】Elasticsearch 是一个分布式的搜索和分析引擎,以其高可扩展性和实时性著称。它基于 Lucene 开发,但提供了更高级别的抽象,使得开发者能够轻松地构建复杂的搜索应用。本文将深入探讨 Elasticsearch 的分布式存储和检索机制,解释其背后的原理及其优势。
220 5
|
18天前
|
消息中间件 Java Kafka
实时数仓Kappa架构:从入门到实战
【11月更文挑战第24天】随着大数据技术的不断发展,企业对实时数据处理和分析的需求日益增长。实时数仓(Real-Time Data Warehouse, RTDW)应运而生,其中Kappa架构作为一种简化的数据处理架构,通过统一的流处理框架,解决了传统Lambda架构中批处理和实时处理的复杂性。本文将深入探讨Kappa架构的历史背景、业务场景、功能点、优缺点、解决的问题以及底层原理,并详细介绍如何使用Java语言快速搭建一套实时数仓。
92 4
|
29天前
|
存储 索引
Elasticsearch分布式架构
【11月更文挑战第2天】
26 1
|
1月前
|
运维 NoSQL Java
后端架构演进:微服务架构的优缺点与实战案例分析
【10月更文挑战第28天】本文探讨了微服务架构与单体架构的优缺点,并通过实战案例分析了微服务架构在实际应用中的表现。微服务架构具有高内聚、低耦合、独立部署等优势,但也面临分布式系统的复杂性和较高的运维成本。通过某电商平台的实际案例,展示了微服务架构在提升系统性能和团队协作效率方面的显著效果,同时也指出了其带来的挑战。
71 4
|
2月前
|
存储 监控 分布式数据库
百亿级存储架构: ElasticSearch+HBase 海量存储架构与实现
本文介绍了百亿级数据存储架构的设计与实现,重点探讨了ElasticSearch和HBase的结合使用。通过ElasticSearch实现快速检索,HBase实现海量数据存储,解决了大规模数据的高效存储与查询问题。文章详细讲解了数据统一接入、元数据管理、数据一致性及平台监控等关键模块的设计思路和技术细节,帮助读者理解和掌握构建高性能数据存储系统的方法。
百亿级存储架构: ElasticSearch+HBase 海量存储架构与实现
|
2月前
|
存储 前端开发 API
DDD领域驱动设计实战-分层架构
DDD分层架构通过明确各层职责及交互规则,有效降低了层间依赖。其基本原则是每层仅与下方层耦合,分为严格和松散两种形式。架构演进包括传统四层架构与改良版四层架构,后者采用依赖反转设计原则优化基础设施层位置。各层职责分明:用户接口层处理显示与请求;应用层负责服务编排与组合;领域层实现业务逻辑;基础层提供技术基础服务。通过合理设计聚合与依赖关系,DDD支持微服务架构灵活演进,提升系统适应性和可维护性。
|
3月前
|
运维 持续交付 API
深入理解并实践微服务架构:从理论到实战
深入理解并实践微服务架构:从理论到实战
143 3
|
3月前
|
存储 缓存 负载均衡
亿级流量架构理论+秒杀实战系列(二)
亿级流量架构理论+秒杀实战系列(二)
|
3月前
|
运维 监控 持续交付
深入浅出:微服务架构的设计与实战
微服务,一个在软件开发领域如雷贯耳的名词,它代表着一种现代软件架构的风格。本文将通过浅显易懂的语言,带领读者从零开始了解微服务的概念、设计原则及其在实际项目中的运用。我们将一起探讨如何将一个庞大的单体应用拆分为灵活、独立、可扩展的微服务,并分享一些实践中的经验和技巧。无论你是初学者还是有一定经验的开发者,这篇文章都将为你提供新的视角和深入的理解。
82 3