前言
这里以开源组件DotnetCore.CAP为例,简单聊一下不同服务模块之间的消息通信,本篇为入门级别,欢迎大家指点或者飘过~
一点概念
事件总线(EventBus),通常作为多个模块间的通信机制,相当于一个事件管理中心,一个模块发送消息,其它模块接受消息,就达到了通信的作用。
本质应该还是消息队列里发布订阅的机制,当然,事件总线中最重要的消息传递,还是要依赖消息队列组件,比如 rabbitmq,kafka,甚至服务器内存等
为什么需要它
至于为什么需要事件总线这个话题,我个人觉得可以从两个角度来聊
第一个角度是,你所做的项目,本身是依赖了一些框架的能力,而框架提供的能力里,事件总线是作为基础设施来服务的,所以,你将不得不需要事件总线,来完成一些业务。比如 ABP,Masa Framework 等,这个就是硬性的,方便更好的展现框架能力,简化后续多个模块间通信的复杂程度。
第二个角度是,我们所做的项目,本身没有那么的前瞻,就是基于一些传统的项目,一点点累计堆到了现在,而当我们仍然要往上堆代码的时候,发现如果要新增或者修改某些功能的时候,变得非常困难,甚至还不如另起炉灶单独为某个功能新写一段程序,从而摆脱老项目的限制。这时,你应该会需要事件总线。这也是我今天想主要聊的一个角度。
接触并实际转入微服务架构
当不了解微服务的时候,我们面对这眼前复杂的项目发愁。这么一大摊子东西,怎么能说推到重来就推倒重来?
实际上,如果我们的心思稍微细腻一点,动手的欲望稍微强一点,就会发现,除非是极端情况,在大多数场景中,我们是不需要重写整个项目的。
微服务架构,非常核心的一个点就是多个服务模块各自提供能力,却又互补干涉内政,同时又彼此消息互通,配合默契。
那单就这一点,我们就可以对老项目进行拆解了。拆解后依赖事件总线的手段,就可以完成化整为零的效果了。调整完成后,一个项目,就被拆成了多个项目,起码结构上,我们已经开始转向微服务了,而至于其余的方面,在慢慢调整就好了。
一个小小的例子
我这里有一个实际的场景,整体的流程是,我们有一个教育类的视频网站,站点上由于很多名师的视频讲座,对应的,有一个管理系统来管理这些视频数据,当有新的课程需要发布的时候,需要到管理人员登录到后台发布信息,上传视频。
这是常规的流程。
有时候,我们会发布一系列课程,这个系列下又数十个甚至上百个小的视频文件需要上传,那这个时候如果还是在后台人工操作的话,工作量不可想象。
我的做法是,单独提供了一个上传的客户端,把需要上传的视频和数据资料按一定格式在本地整理好,然后执行上传程序,就可以了(放个效果图撑下场面,因为这篇口嗨的地方很多,图比较少)
那这个客户端主要实现的功能其实就是上传文件,但实际情况又是,上传完成后还需要完成数据的一些管理操作,比如更新课程信息等。
那如果我还想再客户端完成这些操作,就要不得不引入数据管理的一些依赖。这就会造成本来是专门用来提供上传服务的程序又集成了很多非必要的东西。
当然我也可以在服务端提供一个数据接口,等客户端上传的动作完成后,发送 http 请求,完成数据更新。但当我们的多个服务都是在服务器集群内部,发送网络请求并不划算,而且编写接口也需要在原有的项目中新增一些独立的东西,并不是最佳的解决方案。
这时候,就可以利用消息总线的方式,在服务端订阅客户端的行为,当发现上传事件完成后,立刻执行数据更新动作,高效,而且简单。
这里我就不把整个流程都贴出来了,只把这个消息传递的例子简单贴一下。
小例子的代码
引入中间件,这里提供事件总线能力的组件很多,我这里用的是 DotnetCore.CAP,还有一个MassTransit也非常不错,推荐一下
# 安装组件,在程序包管理控制台里,直接安装以下2个包,或者在Nuget里搜索安装,或者cli直接安装# 总之装上就得了,形式不重要PM> Install-Package DotNetCore.CAP.SqlServerPM> Install-Package DotNetCore.CAP.RabbitMQ
这两个包一个是存储用的,一个是传递用的,具体的功能就不多说了,大家可以取 DotnetCore.CAP 的官网去了解一下。
对了,多说一句,因为我这里是两个不同的服务间通信,所以不能用内存的形式来传递和存储消息事件,所以用的 RabbitMQ 和 Sqlserver。
再多说一句,RabbitMQ 虽然问世时间有点长,在消息队列行列,性能和社区活跃度不是最突出的,但是也都不差,而且,就稳定性来讲,什么 RocketMQ,Kafka,都是弟弟~~,所以中小规模的业务系统,可以无脑选 RabbitMQ(当然这也是个人意见啦~,对了,推两个裸机安装教程,1 是官网的,2 是国内有人写的,实际也是把官网的安装流程简化梳理了一下,如果是用开发环境,直接 docker 就好了)。
配置服务(生产者和消费者都要配置)
//我这里是新建了个文件单独配置CAP服务public static void ConfigureCap(this IServiceCollection services, IConfiguration configuration){ services.AddCap(x => { x.UseSqlServer(x => x.ConnectionString = configuration.GetConnectionString("MatchAIConnString")); x.UseRabbitMQ(opt => { opt.HostName = configuration.GetSection("RabbitMQ")["HostName"]; opt.UserName = configuration.GetSection("RabbitMQ")["UserName"]; opt.Password = configuration.GetSection("RabbitMQ")["Password"]; opt.VirtualHost = configuration.GetSection("RabbitMQ")["VirtualHost"]; opt.Port = Convert.ToInt32(configuration.GetSection("RabbitMQ")["Port"]); //指定Topic exchange名称,不指定的话会用默认的 opt.ExchangeName = configuration.GetSection("RabbitMQ")["ExchangeName"]; }); //设置处理成功的数据在数据库中保存的时间(秒),为保证系统新能,数据会定期清理。 x.SucceedMessageExpiredAfter = 24 * 3600; //设置失败重试次数 x.FailedRetryCount = 5; });}//回到program或者startup里注入配置,我这利用的是.net 6的miniapi,所以直接就在program里配置了builder.Services.ConfigureCap(builder.Configuration);
生产者代码
public class AreaController: Controller{private readonly IResponseHelper _resp;private readonly ICapPublisher _capBus; ///
/// 构造函数 /// /// /// public AreaController(IResponseHelper resp, ICapPublisher capBus) { _resp = resp; _capBus = capBus; } public void PublishMsg(DateTime time) { _capBus.Publish("space.service.test", DateTime.Now); }}
解释一下,这段代码就是注入了 capbus 服务,然后再控制器里,直接 publish 消息即可。
消费者代码
app.MapGet("/cappublish", ([FromServices] ICapPublisher _capBus, IResponseHelper resp) =>{ _capBus.Publish("space.service.test", DateTime.Now); return resp.ok(DateTime.Now);});
到此,不同项目间消息的传递就完成了~
好了,就这样了,CAP 的官方文档里还有更多案例,大家可以去看一下。
文章同步发表于Info:构建微服务的基建——事件总线_RabbitMQ_为自己带盐_InfoQ写作社区