当前位置: 首页 > news >正文

9.3DDD之集成事件

9.3DDD之集成事件

和领域事件不同,集成事件主要用于在微服务之间进行事件传递,即可能在多个服务器之间进行通信。本文讲解RabbitMQ中间件来完成集成事件的处理。

  1. RabbitMQ的基本概念:
  • 信道(channel),信道是消息的生产者,消费者和服务器之间进行通信的虚拟连接。
  • 队列,队列是用来进行消息收发的地方,生产者将消息放到队列中,消费者从队列中获取消息。
  • 交换机,交换机用于把消息路由到队列中。
  1. RabbitMQ的routing模式:
  • 生产者把消息发布到交换机中,消息会携带routingKey属性,交互机根据routingKey的值吧消息发送到一个或者多个队列,然后消费者从队列中获取消息。这种模式的优点是交换机和队列都位于RabbitMQ服务器的内部,即使消费者不在线,相关消息也会保存在队列中,等消费者上线后就可以获取到消息了。

使用步骤

Nuget安装RabbitMQ.Client

  • 发送消息端
using RabbitMQ.Client;
using System.Text;

var factory = new ConnectionFactory();
factory.HostName = "127.0.0.1";//RabbitMQ服务器地址
factory.DispatchConsumersAsync = true;//兼容消费者异步使用
string exchangeName = "exchange1";//交换机的名字
string eventName = "myEvent";// routingKey的值
using var conn = factory.CreateConnection();//创建一个客户端到RabbitMQ的TCP连接,尽量重复使用
while (true)
{
    string msg = DateTime.Now.TimeOfDay.ToString();//待发送消息
    using (var channel = conn.CreateModel())//创建信道,信道可以关闭,关闭后消息才会发出
    {
        var properties = channel.CreateBasicProperties;//创建一个空的内容头
        properties.DeliveryMode = 2;//1为非持久2为持久
        //声明指定名字的交换机,如果已有同名的交换机则不再创建
        //type:direct表示交换机会根据消息routingKey的值进行相等性匹配,消息会发布到和它的routingKey绑定的队列
        channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
        byte[] body = Encoding.UTF8.GetBytes(msg);//RabbitMQ的消息只能按照byte[]类型传递
        channel.BasicPublish(exchange: exchangeName, routingKey: eventName,
            mandatory: true, basicProperties: properties, body: body);//发布消息        
    }
    Console.WriteLine("发布了消息:" + msg);
    Thread.Sleep(1000);
}
  • 接收消息端:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

var factory = new ConnectionFactory();
factory.HostName = "127.0.0.1";
factory.DispatchConsumersAsync = true;
string exchangeName = "exchange1";
string eventName = "myEvent";
using var conn = factory.CreateConnection();
using var channel = conn.CreateModel();
string queueName = "queue1";
//声明和发送端相同的交换机,如果发送端已经声明了相同的,则这天语句会被忽略
//但是仍然要写,因为不确定是哪端先启动
channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
//声明一个队列来接收交换机转发来的消息,如果已经指定了同名的队列,则自动忽略
//消息从队列中取走后则队列中就没有这个消息了
//如果A、B两程序都想取同一条消息,则需要声明两个不同名字的队列
channel.QueueDeclare(queue: queueName, durable: true,
        exclusive: false, autoDelete: false, arguments: null);
//将队列绑定到交换机上,并设定routingKey参数,这样当交换机收到routingKey的值和设定的值相同时
//会把消息转发到我们指定的队列,一个交换机可绑定多个队列,如果这些队列的routingKey的值相同
//那么交换机收到同一个routingKey的时候,会发送给多个队列
channel.QueueBind(queue: queueName,
    exchange: exchangeName, routingKey: eventName);
//AsyncEventingBasicConsumer用于从队列中接收消息,当一天消息被接收时,Received事件就会被触发
var consumer = new AsyncEventingBasicConsumer(channel);
//Received是阻塞执行的,也就是一条回调方法执行完成后才会触发下一条Received事件
consumer.Received += Consumer_Received;//增加处理事件

channel.BasicConsume(queue: queueName,autoAck: false, consumer: consumer);//执行
Console.ReadLine();


async Task Consumer_Received(object sender, BasicDeliverEventArgs args)
{
    //RabbitMQ支持消息的失败重发
    try
    {
        var bytes = args.Body.ToArray();
        string msg = Encoding.UTF8.GetString(bytes);
        Console.WriteLine(DateTime.Now + "收到了消息" + msg);
        //如果消息处理成功,则调用BasicAck通知队列
        //如果消息没有处理成功,则抵用BasicReject通知队列
        channel.BasicAck(args.DeliveryTag, multiple: false);
        await Task.Delay(800);
    }
    catch (Exception ex)
    {
        channel.BasicReject(args.DeliveryTag, true);
        Console.WriteLine("处理收到的消息出错" + ex);
    }
}

简化框架

  1. Nuget安装Zack.EventBus
  2. 在配置系统下创建EventBus节点
"EventBus": {
    "HostName": "127.0.01",//RabbitMQ服务器地址
    "ExchangeName": "EventBusDemo1"//交换机的名字
  }
  1. 在program.cs中进行配置
  • 发送端
var eventBusSec = builder.Configuration.GetSection("EventBus");
builder.Services.Configure<IntegrationEventRabbitMQOptions>(eventBusSec);
//第一个参数用来设定程序绑定的队列的名字,一般一个微服务使用一个名字
//但是同一个微服务项目的每个集群实例都要收到消息则不能使用一个名字
//第二个参数为含有监听继承事件的处理者代码的程序集
builder.Services.AddEventBus("EventBusDemo1_Q1", Assembly.GetExecutingAssembly());
var app = builder.Build();
app.UseEventBus();
  • 接收端
var eventBusSec = builder.Configuration.GetSection("EventBus");
builder.Services.Configure<IntegrationEventRabbitMQOptions>(eventBusSec);
builder.Services.AddEventBus("EventBusDemo1_Q2", Assembly.GetExecutingAssembly());
var app = builder.Build();
app.UseEventBus();
  1. 在需要发布事件的类中注入IEventBus服务,调用Publish方法
[Route("api/[controller]")]
[ApiController]
public class DemoController : ControllerBase
{
    private IEventBus eventBus;

    public DemoController(IEventBus eventBus)
    {
        this.eventBus = eventBus;
    }

    [HttpPost]
    public string Publish()
    {
        eventBus.Publish("UserAdded", new { UserName = "yzk", Age = 18 });
        return "ok";
    }
}
  1. 编写事件处理者
  • 实现IIntegrationEventHandler接口
[EventName("UserAdded")] //设定监听的事件名称,和publish中的名称一致,可以增加多个[EventName]来监听多个事件
public class UserAddesEventHandler : IIntegrationEventHandler
{
	private readonly ILogger<UserAddesEventHandler> logger;
	public UserAddesEventHandler(ILogger<UserAddesEventHandler> logger)
	{
		this.logger = logger;
	}
    //当收到一个事件后,Handle方法就会被调用,第一个参数为事件的名字,第二个是publish设置的数据,事件数据是以JSON格式传入
	public Task Handle(string eventName, string eventData)
	{
		logger.LogInformation("新建了用户:" + eventData);
		return Task.CompletedTask;
	}
}
  • 事件数据是以JSON格式传入,可以使用JsonIntegrationEventHandler<T>接口来解析成.net对象
public record UserData(string UserName, int Age);

[EventName("UserAdded")]
public class UserAddesEventHandler3 : JsonIntegrationEventHandler<UserData>
{
	private readonly ILogger<UserAddesEventHandler3> logger;
	public UserAddesEventHandler3(ILogger<UserAddesEventHandler3> logger)
	{
		this.logger = logger;
	}
	public override Task HandleJson(string eventName, UserData eventData)
	{
		logger.LogInformation($"Json:{eventData.UserName}");
		return Task.CompletedTask;
	}
}
  • 进行微服务开发时,为了降低耦合,一般不会新建一个UserData类供多个微服务使用。则可以使用DynamicIntegrationEventHandler接口来将JSON解析为dynamic类型
[EventName("UserAdded")]
public class UserAddesEventHandler2 : DynamicIntegrationEventHandler
{
    private readonly ILogger<UserAddesEventHandler2> logger;
    public UserAddesEventHandler2(ILogger<UserAddesEventHandler2> logger)
    {
        this.logger = logger;
    }
    public override Task HandleDynamic(string eventName, dynamic eventData)
    {
        logger.LogInformation($"Dynamic:{eventData.UserName}");
        return Task.CompletedTask;
    }
}

相关文章:

  • 抖音获取商品原数据API接口展示
  • C++ 删除链表的倒数第N个结点
  • Pinia的使用
  • java — 认识String类的常用方法(上)
  • 高速专线不打烊 DPDK Hotplug助你实现设备动态管理
  • 【数据结构 二叉树 递归与非递归遍历】
  • 微信公众号消息推送教程
  • MiddleWare ❀ Zookeeper基础概述
  • 多任务学习算法在推荐系统中的应用
  • webpack5入门教程
  • Python解决多个服务线程/进程重复运行定时任务的问题
  • webpack学习笔记
  • 01人机交互/打开CMD/常见CMD命令/CMD打开QQ并设置环境变量
  • QT汽车客运公司售票系统(改良版)
  • 初始Cpp之 六、内存分配
  • 【翻译】babel对TC39装饰器草案的实现
  • 【划重点】MySQL技术内幕:InnoDB存储引擎
  • DOM的那些事
  • IDEA常用插件整理
  • javascript数组去重/查找/插入/删除
  • JavaScript学习总结——原型
  • Redis中的lru算法实现
  • vue:响应原理
  • 推荐一个React的管理后台框架
  • 学习Vue.js的五个小例子
  • 小白应该如何快速入门阿里云服务器,新手使用ECS的方法 ...
  • ​configparser --- 配置文件解析器​
  • (AngularJS)Angular 控制器之间通信初探
  • (PWM呼吸灯)合泰开发板HT66F2390-----点灯大师
  • (第8天)保姆级 PL/SQL Developer 安装与配置
  • (多级缓存)多级缓存
  • (附源码)php新闻发布平台 毕业设计 141646
  • (学习日记)2024.04.04:UCOSIII第三十二节:计数信号量实验
  • (转)真正的中国天气api接口xml,json(求加精) ...
  • ******IT公司面试题汇总+优秀技术博客汇总
  • .bat批处理(十):从路径字符串中截取盘符、文件名、后缀名等信息
  • .naturalWidth 和naturalHeight属性,
  • .net php 通信,flash与asp/php/asp.net通信的方法
  • .NET Project Open Day(2011.11.13)
  • /dev下添加设备节点的方法步骤(通过device_create)
  • @GlobalLock注解作用与原理解析
  • @serverendpoint注解_SpringBoot 使用WebSocket打造在线聊天室(基于注解)
  • [ Linux ] Linux信号概述 信号的产生
  • [2013][note]通过石墨烯调谐用于开关、传感的动态可重构Fano超——
  • [383] 赎金信 js
  • [ASP.NET 控件实作 Day7] 设定工具箱的控件图标
  • [autojs]逍遥模拟器和vscode对接
  • [BT]BUUCTF刷题第9天(3.27)
  • [BZOJ1053][HAOI2007]反素数ant
  • [C#基础知识]专题十三:全面解析对象集合初始化器、匿名类型和隐式类型
  • [C++打怪升级]--学习总目录
  • [CC-FNCS]Chef and Churu
  • [DM复习]关联规则挖掘(下)
  • [ExtJS5学习笔记]第三十节 sencha extjs 5表格gridpanel分组汇总
  • [HTML API]HTMLCollection