EventBus In eShop -- 解析微软微服务架构Demo(四)

简介: 引言 大家好像对分析源码厌倦了,说实在我也会厌倦,不过不看是无法分析其后面的东西,从易到难是一个必要的过程。 今天说下EventBus,前几天园里的大神已经把其解刨,我今天就借着大神的肩膀,分析下在eShop项目中EventBus的实现。

引言

大家好像对分析源码厌倦了,说实在我也会厌倦,不过不看是无法分析其后面的东西,从易到难是一个必要的过程。

今天说下EventBus,前几天园里的大神已经把其解刨,我今天就借着大神的肩膀,分析下在eShop项目中EventBus的实现。

最近发觉转发文章不写出处的,特此加上链接:http://inday.cnblogs.com

解析源码

我们知道使用EventBus是为了解除Publisher和Subscriber之间的依赖性,这样我们的Publisher就不需要知道有多少Subscribers,只需要通过EventBus进行注册管理就好了,在eShop项目中,有一个这样的接口IEventBus(eShopOnContainers\src\BuildingBlocks\EventBus\EventBus\Abstractions)

public interface IEventBus
{
    void Subscribe<T, TH>(Func<TH> handler)
        where T : IntegrationEvent
        where TH : IIntegrationEventHandler<T>;
    void Unsubscribe<T, TH>()
        where TH : IIntegrationEventHandler<T>
        where T : IntegrationEvent;

    void Publish(IntegrationEvent @event);
}

我们可以看到这个接口定义了EventBus所需的一些操作, 对比大神的EventBus,相关功能都是一致的,我们看下它的实现类:EventBusRabbitMQ,从名字上可以看出,这是一个通过RabbitMQ来进行管理的EventBus,我们可以看到它使用了IEventBusSubscriptionsManager进行订阅存储,也就是大神文中的:

private readonly ConcurrentDictionary<Type, List<Type>> _eventAndHandlerMapping;

微软在Demo中把其提取出了接口,把一些常用方法给提炼了出来,但是核心还是Dictionary<string, List<Delegate>>, 使用Dictionary进行Map映射。通过Subscribe和UnSubscribe进行订阅和取消,使用Publish方法进行发布操作。

public void Subscribe<T, TH>(Func<TH> handler)
    where T : IntegrationEvent
    where TH : IIntegrationEventHandler<T>
{
    var eventName = typeof(T).Name;
    var containsKey = _subsManager.HasSubscriptionsForEvent<T>();
    if (!containsKey)
    {
        if (!_persistentConnection.IsConnected)
        {
            _persistentConnection.TryConnect();
        }

        using (var channel = _persistentConnection.CreateModel())
        {
            channel.QueueBind(queue: _queueName,
                                exchange: BROKER_NAME,
                                routingKey: eventName);
        }
    }

    _subsManager.AddSubscription<T, TH>(handler);

}

我们看到在订阅的时候,EventBus会检查下在Map中是否有相应的注册,如果没有的话首先回去RabbitMQ中创建一个新的channel进行绑定,随后在Map中进行注册映射。

UnSubscribe则直接从Map中取消映射,通过OnEventRemoved事件判断Map下此映射的subscriber是否为空,为空则从RabbitMQ中关闭channel。

在RabbitMQ的构造方法中,我们看到这样一个创建:CreateConsumerChannel(),这里创建了一个EventingBasicConsumer,当Queue中有新的消息时会通过ProcessEvent执行Map中注册的handler(subscribers),看图可能更清晰些:

 

Event In eShop

在ProcessEvent方法中,回去Map中找寻subscribers,然后通过动态反射进行执行:

private async Task ProcessEvent(string eventName, string message)
{

    if (_subsManager.HasSubscriptionsForEvent(eventName))
    { 
        var eventType = _subsManager.GetEventTypeByName(eventName);
        var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
        var handlers = _subsManager.GetHandlersForEvent(eventName);

        foreach (var handlerfactory in handlers)
        {
            var handler = handlerfactory.DynamicInvoke();
            var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
            await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
        }
    }
}

微软通过简单的代码解耦了Publisher和Subscribers之间的依赖关系,我们引用大神的总结:

image

应用

在catalog.api中,微软出现了EventBus,我在上一篇中也提到了,这是我的一个疑惑,因为在catalog中并没有订阅操作,直接执行了Publish操作,原先以为是一个空操作,后来看了Basket.Api我才知道为何微软要用RabbitMQ。

使用RabbitMQ,我们不仅是从类之间的解耦,更可以跨项目,跨语言,跨平台的解耦,publisher仅仅需要把消息体(IntegrationEvent)传送到RabbitMQ,Consumer从Queue中获取消息体,然后推送到Subscribers执行相应的操作。我们看下Basket.Api.Startup.cs:

protected virtual void ConfigureEventBus(IApplicationBuilder app)
{
    var catalogPriceHandler = app.ApplicationServices
        .GetService<IIntegrationEventHandler<ProductPriceChangedIntegrationEvent>>();

    var orderStartedHandler = app.ApplicationServices
        .GetService<IIntegrationEventHandler<OrderStartedIntegrationEvent>>();

    var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();

    eventBus.Subscribe<ProductPriceChangedIntegrationEvent, ProductPriceChangedIntegrationEventHandler>
        (() => app.ApplicationServices.GetRequiredService<ProductPriceChangedIntegrationEventHandler>());

    eventBus.Subscribe<OrderStartedIntegrationEvent, OrderStartedIntegrationEventHandler>
        (() => app.ApplicationServices.GetRequiredService<OrderStartedIntegrationEventHandler>());
}

在这个方法里,我们看到了Subscribe操作,想想之前的提问有点搞笑,不过研究明白了也不错,对吧!

总结

今天我们看了EventBus在Demo中的应用,总结一下。

1、EventBus可以很好的解耦订阅者和发布者之间的依赖

2、使用RabbitMQ能够跨项目、跨平台、跨语言的解耦订阅者和发布者

虽然在Demo中我们看到对订阅者的管理是通过Dictionary内存的方式,所以我们的Subscribe仅仅只在Basket.Api中看到,但微软是通过IEventBusSubscriptionsManager接口定义的,我们可以通过自己的需求来进行定制,可以做成分布式的,比如使用memcached。

写在最后

每个月到下旬就会比较忙,所以文章发布会比较慢,但我也会坚持学习完eShop的,为了学习,我建了个群,大家可以进来一起学习,有什么建议和问题都可以进来哦。

eShop虽好,但不建议大家放到生产环境,毕竟是一个Demo,而且目前还是ALPHA版本,用来学习是一个很好的教材,这就是一个大杂烩,学习中你会学到很多新的东西,大家如果看好core的发展,可以一起研究下。

QQ群:376248054

相关文章
|
3月前
|
数据采集 机器学习/深度学习 人工智能
YOLOv11浅浅解析:架构创新
YOLOv11是YOLO系列最新升级版,通过C3k2模块、SPPF优化和解耦检测头等创新,显著提升检测精度与速度,mAP提高2-5%,推理更快,支持多平台部署,适用于工业、安防、自动驾驶等场景。
|
2月前
|
机器学习/深度学习 人工智能 自然语言处理
34_GPT系列:从1到5的架构升级_深度解析
大型语言模型(LLM)的发展历程中,OpenAI的GPT系列无疑扮演着至关重要的角色。自2018年GPT-1问世以来,每一代GPT模型都在架构设计、预训练策略和性能表现上实现了质的飞跃。本专题将深入剖析GPT系列从1.17亿参数到能够处理百万级token上下文的技术演进,特别关注2025年8月8日发布的GPT-5如何引领大模型技术迈向通用人工智能(AGI)的重要一步。
|
3月前
|
机器学习/深度学习 人工智能 搜索推荐
从零构建短视频推荐系统:双塔算法架构解析与代码实现
短视频推荐看似“读心”,实则依赖双塔推荐系统:用户塔与物品塔分别将行为与内容编码为向量,通过相似度匹配实现精准推送。本文解析其架构原理、技术实现与工程挑战,揭秘抖音等平台如何用AI抓住你的注意力。
801 7
从零构建短视频推荐系统:双塔算法架构解析与代码实现
|
2月前
|
存储 监控 安全
132_API部署:FastAPI与现代安全架构深度解析与LLM服务化最佳实践
在大语言模型(LLM)部署的最后一公里,API接口的设计与安全性直接决定了模型服务的可用性、稳定性与用户信任度。随着2025年LLM应用的爆炸式增长,如何构建高性能、高安全性的REST API成为开发者面临的核心挑战。FastAPI作为Python生态中最受青睐的Web框架之一,凭借其卓越的性能、强大的类型安全支持和完善的文档生成能力,已成为LLM服务化部署的首选方案。
|
3月前
|
存储 监控 NoSQL
Redis高可用架构全解析:从主从复制到集群方案
Redis高可用确保服务持续稳定,避免单点故障导致数据丢失或业务中断。通过主从复制实现数据冗余,哨兵模式支持自动故障转移,Cluster集群则提供分布式数据分片与水平扩展,三者层层递进,保障读写分离、容灾切换与大规模数据存储,构建高性能、高可靠的Redis架构体系。
|
2月前
|
Cloud Native Serverless API
微服务架构实战指南:从单体应用到云原生的蜕变之路
🌟蒋星熠Jaxonic,代码为舟的星际旅人。深耕微服务架构,擅以DDD拆分服务、构建高可用通信与治理体系。分享从单体到云原生的实战经验,探索技术演进的无限可能。
微服务架构实战指南:从单体应用到云原生的蜕变之路
|
弹性计算 API 持续交付
后端服务架构的微服务化转型
本文旨在探讨后端服务从单体架构向微服务架构转型的过程,分析微服务架构的优势和面临的挑战。文章首先介绍单体架构的局限性,然后详细阐述微服务架构的核心概念及其在现代软件开发中的应用。通过对比两种架构,指出微服务化转型的必要性和实施策略。最后,讨论了微服务架构实施过程中可能遇到的问题及解决方案。

热门文章

最新文章

推荐镜像

更多
  • DNS