序言
在技术领域,不断学习和探索是保持竞争力的关键。最近,我重新开始了对RabbitMQ的研究,这个过程让我又收获了许多新的知识和技能,我觉得有必要记录下来,以便将来回顾和分享。
问题引出
RabbitMQ是一款开源的消息队列中间件,它提供了高效、可靠、灵活的通信机制,广泛应用于分布式系统中。然而,有时候在使用RabbitMQ时会遇到连接断开的问题,这可能会导致消息传递中断和应用程序的不可用性。
问题分析
当使用RabbitMQ时,可能会遇到以下几种情况导致连接断开的问题:
1.网络问题:网络中断、防火墙设置等问题可能导致RabbitMQ连接断开。
2.长时间空闲:如果连接在一段时间内没有进行任何通信,RabbitMQ可能会自动关闭连接。
3.RabbitMQ服务器问题:RabbitMQ服务器可能会因为负载过高或其他原因主动关闭连接。
解决方案
虽然RabbitMQ.Client库,有心跳机制,有断线重连机制,但是在网络断开的时候,并不能重连。
下面这段代码经过本人验证有效,可以解决上面的问题。
using RabbitMQ.Client.Events;using RabbitMQ.Client;using System.Text;using RabbitMQDemo.Shared;using System.Collections.Concurrent;using RabbitMQ.Client.Exceptions;namespace RabbitMQConsumerDemo{public class RabbitMQRpcClientHandler{/// /// 定义一个静态变量来保存类的实列/// private static RabbitMQRpcClientHandler? _uniqueInstance;/// /// 定义一个标识确保线程同步/// private static readonly object _locker = new();/// /// Main entry point to the RabbitMQ .NET AMQP client API. Constructs RabbitMQ.Client.IConnection instances./// private static IConnectionFactory? _factory;/// /// Main interface to an AMQP connection./// private IConnection? _connection;/// /// 发送通道(发送通道及接收通道分开,避免互相影响,导致整个服务不可用)/// private IModel? _sendChannel;/// /// 接收通道(发送通道及接收通道分开,避免互相影响,导致整个服务不可用)/// private IModel? _listenChannel;/// /// 监听消费者/// private EventingBasicConsumer? _listenConsumer;/// /// 响应消费者/// private EventingBasicConsumer? _replyConsumer;/// /// RabbitMQ 主机域名/// private readonly string _defaultRabbitMQHostName = "127.0.0.1";/// /// RabbitMQ 服务器端口, 默认 5672, 网页监控页面是 15672/// private readonly int _defaultRabbitMQPort = 5672;/// /// RabbitMQ 用户名, 默认 guest/// private readonly string _defaultRabbitMQUserName = "guest";/// /// RabbitMQ 密码, 默认 guest/// private readonly string _defaultRabbitMQPassword = "guest!";/// /// 虚拟主机/// private readonly string _defaultRabbitMQVirtualHost = "/";/// /// 交换机/// private readonly string _exchangeName = "";/// /// 数据监控队列/// private readonly string _listenQueueName = "queue.listen.test";/// /// 指令响应队列/// private string _replyQueueName = string.Empty;/// /// 注册-路由键/// private readonly string _routingKeyRegister = "queue.register";/// /// 心跳-路由键/// private readonly string _routingKeyHeart = "queue.heart";/// /// 取消信号/// private readonly CancellationTokenSource _cts = new();/// /// 回调函数映射器/// private readonly ConcurrentDictionary<string, TaskCompletionSource> _callbackMapper = new();private bool _connectionState;private bool _sendChannelState;private bool _listenChannelState;/// /// 连接状态/// public bool ConnectionState{get { return _connectionState; }set {if (_connectionState == value){return;}_connectionState = value;if (_connectionState){Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 连接已打开");}else{Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 连接已关闭");}}}/// /// 发送通道状态/// public bool SendChannelState{get { return _sendChannelState; }set {if (_sendChannelState == value){return;}_sendChannelState = value;if (_sendChannelState){Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 发送通道已打开");}else{Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 发送通道已关闭");}}}/// /// 接收通道状态/// public bool ListenChannelState{get { return _listenChannelState; }set {if (_listenChannelState == value){return;}_listenChannelState = value;if (_listenChannelState){Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 接收通道已打开");}else{Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 接收通道已关闭");}}}/// /// 定义私有构造函数,使外界不能创建该类实例 /// private RabbitMQRpcClientHandler(){// 创建连接工厂_factory = new ConnectionFactory(){HostName = _defaultRabbitMQHostName,//MQ服务器地址Port = _defaultRabbitMQPort,//MQ服务端口号UserName = _defaultRabbitMQUserName,//账号Password = _defaultRabbitMQPassword,//密码VirtualHost = _defaultRabbitMQVirtualHost,RequestedHeartbeat = TimeSpan.FromSeconds(2),AutomaticRecoveryEnabled = true,//自动重连TopologyRecoveryEnabled = true,//拓扑重连NetworkRecoveryInterval = TimeSpan.FromSeconds(10)};}/// /// 定义公有方法提供一个全局访问点,同时你也可以定义公有属性来提供全局访问点/// /// public static RabbitMQRpcClientHandler GetInstance(){/* ********************* * 当第一个线程运行到这里时,此时会对_locker对象“加锁” * 当第二个线程运行该方法时,首先检测到_locker对象为“加锁”状态,该线程就会挂起等待第一个线程解锁 * lock语句运行完之后(即线程运行完之后)会对该对象“解锁” * 双重锁定只需要一句判断就可以了 * *********************/if (_uniqueInstance == null){lock (_locker){// 如果类的实例不存在则创建,否则直接返回_uniqueInstance ??= new RabbitMQRpcClientHandler();}}return _uniqueInstance;}/// /// 异步调用/// /// /// /// /// public Task CallAsync(string message, EnumMsgType msgType = EnumMsgType.Register, CancellationToken cancellationToken = default){if (_connection?.IsOpen != true){Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 连接为空或已经关闭(生产者)");TaskCompletionSource taskCompletionSource = new();taskCompletionSource.TrySetResult(Newtonsoft.Json.JsonConvert.SerializeObject(new ReplyDataEntity(){Successed = false,Message = "连接为空或已经关闭"}, Newtonsoft.Json.Formatting.None));return taskCompletionSource.Task;}if (_sendChannel?.IsOpen != true){Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 发送通道为空或已经关闭(生产者)");TaskCompletionSource taskCompletionSource = new();taskCompletionSource.TrySetResult(Newtonsoft.Json.JsonConvert.SerializeObject(new ReplyDataEntity(){Successed = false,Message = "发送通道为空或已经关闭"}, Newtonsoft.Json.Formatting.None));return taskCompletionSource.Task;}// 设置消息ID、类型、非持久性等IBasicProperties props = _sendChannel.CreateBasicProperties();var correlationId = Guid.NewGuid().ToString();props.CorrelationId = correlationId;props.ReplyTo = _replyQueueName;props.ContentType = "application/json";props.DeliveryMode = 1;//非持久性var messageBytes = Encoding.UTF8.GetBytes(message);var tcs = new TaskCompletionSource();_callbackMapper.TryAdd(correlationId, tcs);switch (msgType){case EnumMsgType.Register:{/* ********************* * 作用:向默认交换机的指定队列中发送注册消息 * 说明:生产者 * 参数: * 1、exchange:交换机名称。如果不指定将使用RabbitMQ的默认交换机(设置为"") * 2、routingKey:路由键。交换机根据路由键来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称 * 3、basicProperties:消息的属性。 * 4、body:发送消息的内容 * *********************/_sendChannel.BasicPublish(exchange: string.Empty, routingKey: _routingKeyRegister, basicProperties: props, body: messageBytes);}break;default:{/* ********************* * 作用:向默认交换机的指定队列中发送心跳消息 * 说明:生产者 * 参数: * 1、exchange:交换机名称。如果不指定将使用RabbitMQ的默认交换机(设置为"") * 2、routingKey:路由键。交换机根据路由键来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称 * 3、basicProperties:消息的属性。 * 4、body:发送消息的内容 * *********************/_sendChannel.BasicPublish(exchange: string.Empty, routingKey: _routingKeyHeart, basicProperties: props, body: messageBytes);}break;}// 通知任务已经取消,处理取消后的回调操作cancellationToken.Register(() => _callbackMapper.TryRemove(correlationId, out _));// 请求超时检测if (tcs.Task.Wait(TimeSpan.FromSeconds(10)) == false){_callbackMapper.TryRemove(correlationId, out _);tcs.TrySetResult(Newtonsoft.Json.JsonConvert.SerializeObject(new ReplyDataEntity(){Successed = false,Message = $"{(msgType == EnumMsgType.Register ? "注册" : "心跳")}请求超时"}, Newtonsoft.Json.Formatting.None));}return tcs.Task;}/// /// 开始/// public void Start(){new Thread(new ThreadStart(Checking)){IsBackground = true}.Start();Reconnect();}/// /// 停止/// public void Stop(){Cleanup();}/// /// 取消状态监测/// public void CancelChecking(){_cts.Cancel();}/// /// 状态监测/// private void Checking(){while (_cts.IsCancellationRequested == false){Thread.Sleep(1000);ConnectionState = _connection?.IsOpen == true;if (_connection?.IsOpen != true){SendChannelState = false;ListenChannelState = false;continue;}SendChannelState = _sendChannel?.IsOpen == true;ListenChannelState = _listenChannel?.IsOpen == true;}}/// /// 连接/// private void Connect(){if (_factory == null){return;}// 创建连接_connection = _factory.CreateConnection();_connection.ConnectionShutdown += Connection_ConnectionShutdown;// 创建发送通道_sendChannel = _connection.CreateModel();// 创建接收通道_listenChannel = _connection.CreateModel();Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 尝试连接至RabbitMQ服务器[{_defaultRabbitMQHostName}]");// 监控消息ListenMessageConsume();// 响应消息ReplyMessageConsume();}/// /// 重连/// private void Reconnect(){Cleanup();// state is initially falsevar mres = new ManualResetEventSlim(false);// loop until state is true, checking every 3swhile (!mres.Wait(3 * 1000)){try{// 连接RabbitMQ服务器Connect();// state set to true - breaks out of loopmres.Set();}catch (Exception ex){Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 尝试连接RabbitMQ服务器出现错误【{ex.Message}】");}}}/// /// 清理/// private void Cleanup(){if (_replyConsumer != null){if (_replyConsumer.IsRunning){try{_replyConsumer.OnCancel();}catch (Exception ex){Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: RabbitMQ重新连接,正在尝试关闭之前的响应消费者,不再执行任何操作,但遇到错误【{ex.Message}】");}}_replyConsumer.Received -= ReplyConsumer_Received;_replyConsumer.Registered -= ReplyConsumer_Registered;_replyConsumer.Shutdown -= ReplyConsumer_Shutdown;_replyConsumer.Unregistered -= ReplyConsumer_Unregistered;_replyConsumer = null;}if (_listenConsumer != null){if (_listenConsumer.IsRunning){try{_listenConsumer.OnCancel();}catch (Exception ex){Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: RabbitMQ重新连接,正在尝试关闭之前的监听消费者,不再执行任何操作,但遇到错误【{ex.Message}】");}}_listenConsumer.Received -= ListenConsumer_Received;_listenConsumer.Registered -= ListenConsumer_Registered;_listenConsumer.Shutdown -= ListenConsumer_Shutdown;_listenConsumer.Unregistered -= ListenConsumer_Unregistered;_listenConsumer = null;}if (_sendChannel != null){if (_sendChannel.IsOpen){try{_sendChannel.Close();}catch (Exception ex){Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: RabbitMQ重新连接,正在尝试关闭之前的发送通道,但遇到错误【{ex.Message}】");}}_sendChannel.Dispose();_sendChannel = null;}if (_listenChannel != null){if (_listenChannel.IsOpen){try{_listenChannel.Close();}catch (Exception ex){Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: RabbitMQ重新连接,正在尝试关闭之前的接收通道,但遇到错误【{ex.Message}】");}}_listenChannel.Dispose();_listenChannel = null;}if (_connection != null){_connection.ConnectionShutdown -= Connection_ConnectionShutdown;if (_connection.IsOpen){try{_connection.Close();}catch (Exception ex){Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: RabbitMQ重新连接,正在尝试关闭之前的连接,但遇到错误【{ex.Message}】");}}_connection.Dispose();_connection = null;}}/// /// 响应消息消费/// private void ReplyMessageConsume(){try{if (_connection?.IsOpen != true){throw new Exception($"{DateTime.Now:HH:mm:ss.fff}: 连接为空或已经关闭(响应消费者)");}if (_sendChannel?.IsOpen != true){throw new Exception($"{DateTime.Now:HH:mm:ss.fff}: 发送通道为空或已经关闭(响应消费者)");}// 声明一个服务器命名的队列_replyQueueName = _sendChannel.QueueDeclare().QueueName;/* ********************* * 作用:定义消息分发机制 * 说明:Qos可以设置消费者一次接收消息的最大条数,能够解决消息拥堵时造成的消费者内存爆满问题。 *Qos也比较适用于耗时任务队列,当任务队列中的任务很多时,使用Qos后我们可以随时添加新的消费者来提高任务的处理效率。 *prefetchCount=1,RabbitMQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),再从队列中获取一个新的。 * 参数: * 1、prefetchSize:是可接收消息的大小。但是似乎在客户端2.8.6版本中它必须为0,即使:不受限制。如果不输0,程序会在运行到这一行的时候报错,说还没有实现不为0的情况 * 2、prefetchCount:处理消息最大的数量。即在下一次发送应答消息前,客户端可以收到的消息最大数量 * 3、global:则设置了是不是针对整个Connection的,因为一个Connection可以有多个Channel,如果是false则说明只是针对于这个Channel的 * *********************/_sendChannel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);/* ********************* * 作用:创建基于该队列的消费者 * 优点: * 1、基于长连接 * 2、消费方式为发布订阅模式 * 3、节省资源且实时性好 * *********************/_replyConsumer = new EventingBasicConsumer(_sendChannel);_replyConsumer.Received += ReplyConsumer_Received;_replyConsumer.Registered += ReplyConsumer_Registered;_replyConsumer.Shutdown += ReplyConsumer_Shutdown;_replyConsumer.Unregistered += ReplyConsumer_Unregistered;Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 消费者准备完毕(响应消费者)");/* ********************* * 作用:监听队列(绑定消费者) * 说明:消费者 * 参数: * 1、queue:队列名称 * 2、autoAck:自动回复,当消费者接收到消息后要告诉RabbitMQ消息已接收,如果将此参数设置为true表示会自动回复RabbitMQ,如果设置为false要通过编程实现回复 * 3、consumer:消费方法,当消费者接收到消息要执行的方法 * *********************/_sendChannel.BasicConsume(queue: _replyQueueName, autoAck: false, consumer: _replyConsumer);Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 开始监控RabbitMQ服务器,队列{_replyQueueName}(响应消费者)");}catch (AggregateException aex){// 错误信息去重var errorList = (from error in aex.InnerExceptions select error.Message).Distinct().ToList();// 打印所有错误信息foreach (var error in errorList){Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 总异常之【{error}】(响应消费者)");}}catch (Exception ex){throw ex;}}/// /// 监听消息消费/// private void ListenMessageConsume(){try{if (_connection?.IsOpen != true){throw new Exception($"{DateTime.Now:HH:mm:ss.fff}: 连接为空或已经关闭(监听消费者)");}if (_listenChannel?.IsOpen != true){throw new Exception($"{DateTime.Now:HH:mm:ss.fff}: 接收通道为空或已经关闭(监听消费者)");}/* ********************* * 作用:声明(创建)队列--RabbitMQ持久化机制(队列持久化) * 说明:生产者、消费者都有 * 参数: * 1、queue:队列名称。 * 2、durable:是否持久化。true持久化,队列会保存磁盘,服务器重启时可以保证不丢失相关信息 * 3、exclusive:是否独占队列。队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建 * 4、autoDelete:是否自动删除。队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除) * 5、arguments:其他参数。可以设置一个队列的扩展参数,比如:可设置存活时间 * *********************/_listenChannel.QueueDeclare(queue: _listenQueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);/* ********************* * 作用:定义消息分发机制 * 说明:Qos可以设置消费者一次接收消息的最大条数,能够解决消息拥堵时造成的消费者内存爆满问题。 *Qos也比较适用于耗时任务队列,当任务队列中的任务很多时,使用Qos后我们可以随时添加新的消费者来提高任务的处理效率。 *prefetchCount=1,RabbitMQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),再从队列中获取一个新的。 * 参数: * 1、prefetchSize:是可接收消息的大小。但是似乎在客户端2.8.6版本中它必须为0,即使:不受限制。如果不输0,程序会在运行到这一行的时候报错,说还没有实现不为0的情况 * 2、prefetchCount:处理消息最大的数量。即在下一次发送应答消息前,客户端可以收到的消息最大数量 * 3、global:则设置了是不是针对整个Connection的,因为一个Connection可以有多个Channel,如果是false则说明只是针对于这个Channel的 * *********************/_listenChannel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);/* ********************* * 作用:创建基于该队列的消费者 * 优点: * 1、基于长连接 * 2、消费方式为发布订阅模式 * 3、节省资源且实时性好 * *********************/_listenConsumer = new EventingBasicConsumer(_listenChannel);_listenConsumer.Received += ListenConsumer_Received;_listenConsumer.Registered += ListenConsumer_Registered;_listenConsumer.Shutdown += ListenConsumer_Shutdown;_listenConsumer.Unregistered += ListenConsumer_Unregistered;Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 消费者准备完毕(监听消费者)");/* ********************* * 作用:监听队列(绑定消费者) * 说明:消费者 * 参数: * 1、queue:队列名称 * 2、autoAck:自动回复,当消费者接收到消息后要告诉RabbitMQ消息已接收,如果将此参数设置为true表示会自动回复RabbitMQ,如果设置为false要通过编程实现回复 * 3、consumer:消费方法,当消费者接收到消息要执行的方法 * *********************/_listenChannel.BasicConsume(queue: _listenQueueName, autoAck: false, consumer: _listenConsumer);Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 开始监控RabbitMQ服务器,队列{_listenQueueName}(监听消费者)");}catch (AggregateException aex){// 错误信息去重var errorList = (from error in aex.InnerExceptions select error.Message).Distinct().ToList();// 打印所有错误信息foreach (var error in errorList){Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 总异常之【{error}】(监听消费者)");}}catch (Exception ex){throw ex;}}/// /// 绑定事件:消息监控(响应消费者)/// /// /// private void ReplyConsumer_Received(object? sender, BasicDeliverEventArgs e){if (_sendChannel == null){return;}/* ********************* * 作用:手动签收消息 * 说明:消费者 * 参数: * 1、deliveryTag:消息投递标签 * 2、multiple:是否批量签收。设置为true,一次性签收所有,设置为false,只签收当前消息 * *********************/_sendChannel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false);if (!_callbackMapper.TryRemove(e.BasicProperties.CorrelationId, out var tcs)){Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 响应消费者收到新消息【{Encoding.UTF8.GetString(e.Body.ToArray())}】");return;}var body = e.Body.ToArray();var response = Encoding.UTF8.GetString(body);tcs.TrySetResult(response);}/// /// 绑定事件:订阅成功(响应消费者)/// /// /// private void ReplyConsumer_Registered(object? sender, ConsumerEventArgs e){Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 已经注册上队列(响应消费者)");}/// /// 绑定事件:通道被关闭(响应消费者)/// /// /// private void ReplyConsumer_Shutdown(object? sender, ShutdownEventArgs e){Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 队列已经关闭(响应消费者)");}/// /// 绑定事件:取消订阅(响应消费者)/// /// /// private void ReplyConsumer_Unregistered(object? sender, ConsumerEventArgs e){Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 已经取消注册队列(响应消费者)");}/// /// 绑定事件:消息监控(监听消费者)/// /// /// private void ListenConsumer_Received(object? sender, BasicDeliverEventArgs e){if (_listenChannel == null){return;}try{var body = e.Body.ToArray();var message = Encoding.UTF8.GetString(body);var props = e.BasicProperties;if (props != null && string.IsNullOrWhiteSpace(props.CorrelationId) == false && string.IsNullOrWhiteSpace(props.ReplyTo) == false){var replyProps = _listenChannel.CreateBasicProperties();replyProps.CorrelationId = props.CorrelationId;replyProps.Persistent = true;var responseBytes = Encoding.UTF8.GetBytes(message);/* ********************* * 作用:向指定的队列中发送消息--RabbitMQ持久化机制(消息持久化) * 说明:生产者 * 参数: * 1、exchange:交换机名称。如果不指定将使用RabbitMQ的默认交换机(设置为"") * 2、routingKey:路由键。交换机根据路由键来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称 * 3、basicProperties:消息的属性。 * 4、body:发送消息的内容 * *********************/_listenChannel.BasicPublish(exchange: _exchangeName, routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes);Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 签收响应【{message}】(监听消费者)");}// TODO: 正常处理回复BasicAck,未正常处理回复BasicReject/* ********************* * 作用:手动签收消息 * 说明:消费者 * 参数: * 1、deliveryTag:消息投递标签 * 2、multiple:是否批量签收。设置为true,一次性签收所有,设置为false,只签收当前消息 * *********************/_listenChannel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false);// 未正常处理的消息,重新放回队列//_listenChannel.BasicReject(e.DeliveryTag, true);Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 手动签收【{message}】(监听消费者)");}catch (OperationInterruptedException oiex){/* ********************* * 作用:手动拒绝签收,返回消息到Broke * 说明:消费者,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息 * 参数: * 1、deliveryTag:当前消息的投递标签 * 2、multiple:是否批量签收。设置为true,一次性签收所有,设置为false,只签收当前消息 * 3、requeue:是否重回队列。设置为true,重回队列,设置为false,不重回 * *********************/_listenChannel.BasicNack(deliveryTag: e.DeliveryTag, multiple: false, requeue: true);Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 手动拒绝签收【{oiex.Message}】,返回消息到Broke(监听消费者)");}catch (Exception ex){/* ********************* * 作用:手动拒绝签收,返回消息到Broke * 说明:消费者,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息 * 参数: * 1、deliveryTag:当前消息的投递标签 * 2、multiple:是否批量签收。设置为true,一次性签收所有,设置为false,只签收当前消息 * 3、requeue:是否重回队列。设置为true,重回队列,设置为false,不重回 * *********************/_listenChannel.BasicNack(deliveryTag: e.DeliveryTag, multiple: false, requeue: true);Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 手动拒绝签收【{ex.Message}】,返回消息到Broke(监听消费者)");}}/// /// 绑定事件:订阅成功(监听消费者)/// /// /// private void ListenConsumer_Registered(object? sender, ConsumerEventArgs e){Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 已经注册上队列(监听消费者)");}/// /// 绑定事件:通道被关闭(监听消费者)/// /// /// private void ListenConsumer_Shutdown(object? sender, ShutdownEventArgs e){Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 队列已经关闭(监听消费者)");}/// /// 绑定事件:取消订阅(监听消费者)/// /// /// private void ListenConsumer_Unregistered(object? sender, ConsumerEventArgs e){Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 已经取消注册队列(监听消费者)");}/// /// 绑定事件:断开连接时,调用方法自动重连/// /// /// private void Connection_ConnectionShutdown(object? sender, ShutdownEventArgs e){Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 已经断开连接,正在尝试重新连接至RabbitMQ服务器");Reconnect();}}}
方法调用如下:
using RabbitMQConsumerDemo;RabbitMQRpcClientHandler.GetInstance().Start();while (true){var readLine = Console.ReadLine();if (string.IsNullOrWhiteSpace(readLine)){}else if (readLine.Equals("exit", StringComparison.OrdinalIgnoreCase)){break;}else if (readLine.StartsWith("register=", StringComparison.OrdinalIgnoreCase)){var response = await RabbitMQRpcClientHandler.GetInstance().CallAsync(readLine, RabbitMQDemo.Shared.EnumMsgType.Register);Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 注册指令响应【{response}】");}else if (readLine.StartsWith("heart=", StringComparison.OrdinalIgnoreCase)){var response = await RabbitMQRpcClientHandler.GetInstance().CallAsync(readLine, RabbitMQDemo.Shared.EnumMsgType.Heart);Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 心跳指令响应【{response}】");}else{}}RabbitMQRpcClientHandler.GetInstance().Stop();Thread.Sleep(2 * 1000);// 退出程序int count = 8;while (true){if (count == 0){Console.WriteLine($"Exit in {count} seconds");Thread.Sleep(1000);break;}else if (count == 5){RabbitMQRpcClientHandler.GetInstance().CancelChecking();//RabbitMQClientHandler.GetInstance().CancelChecking();}Console.WriteLine($"Exit in {count} seconds");Thread.Sleep(1000);count--;}
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END