.Net Core/.net 6/.Net 8 实现Mqtt客户端
- 客户端代码
- 调用
直接上代码
nuget引用
MQTTnet
客户端代码
using MQTTnet;using MQTTnet.Client;using MQTTnet.Packets;using System.Text;namespace Code.Mqtt{/// /// Mqtt客户端/// public class MqttClientBase{/// /// 客户端/// public IMqttClient client;/// /// 订阅主题列表/// public List<string> Topics=new List<string>();public MqttClientOptions options;public MqttClientBaseOptions _opt;/// /// 主动断开连接/// public bool off = false;public bool isconn = false;/// /// 创建mqtt客户端,并值接传入初始参数/// /// public MqttClientBase(MqttClientBaseOptions opt){this._opt = opt;//创建客户端client = new MqttFactory().CreateMqttClient();options =new MqttClientOptions() { ClientId=_opt.clientId,ChannelOptions=new MqttClientTcpOptions(){Server=_opt.server,Port=_opt.port,},Credentials=new MqttClientCredentials(_opt.username,Encoding.UTF8.GetBytes(_opt.password)),//清理会话CleanSession=false,//设置心跳KeepAlivePeriod = TimeSpan.FromSeconds(30)};}/// /// 创建mqtt客户端,不传参数,/// 必须在调用 Connect之前调用过SetOption方法/// public MqttClientBase(){//创建客户端client = new MqttFactory().CreateMqttClient();}/// /// 设置参数/// /// public void SetOption(MqttClientBaseOptions opt){options = new MqttClientOptions(){ClientId = _opt.clientId,ChannelOptions = new MqttClientTcpOptions(){Server = _opt.server,Port = _opt.port,},Credentials = new MqttClientCredentials(_opt.username, Encoding.UTF8.GetBytes(_opt.password)),//清理会话CleanSession = false,//设置心跳KeepAlivePeriod = TimeSpan.FromSeconds(30)};}/// /// 连接服务器/// /// 连接成功后执行/// 连接成功事件public void Connect(Action<MqttClientConnectedEventArgs> ConnectedAsync=null){client.ConnectAsync(options);if(ConnectedAsync != null){//连接成功事件client.ConnectedAsync += (args) =>{ConnectedAsync(args);return Task.CompletedTask;};}}/// /// 重连服务器/// 在连接断开事件中调用,即可实现无限轮询/// /// 是否重复尝试重连/// 尝试次数public void ReConnect(){try{client.ConnectAsync(options).Wait();}catch (Exception ex){Console.WriteLine(ex.Message);}}public async Task AddTopic(string topic){//更新订阅client.SubscribeAsync(new MqttClientSubscribeOptions(){TopicFilters = new List<MqttTopicFilter>() {new MqttTopicFilter { Topic = topic }}});//将主题名称加入列表Topics.Add(topic);}/// /// 取消订阅/// /// /// public async Task DeleteTopic(string topic){client.UnsubscribeAsync(new MqttClientUnsubscribeOptions(){TopicFilters = new List<string> { topic }});Topics.Remove(topic);}/// /// 发布消息/// /// 主题/// 内容/// public async Task Publish(string topic, string content){if(client.IsConnected){client.PublishAsync(new MqttApplicationMessage(){Topic = topic,Payload = Encoding.UTF8.GetBytes(content)});}}/// /// 主动断开连接/// public void Disconnect(){off = true;client.DisconnectAsync();}/// /// 断开连接事件/// /// /// public async Task DisconnectedAsync(Action<MqttClientDisconnectedEventArgs> action){client.DisconnectedAsync += (args) => {action(args);return Task.CompletedTask;};}/// /// 接收消息事件/// /// /// public async Task Message(Action<string,string> action) {client.ApplicationMessageReceivedAsync += (args) =>{var topic = args.ApplicationMessage.Topic;var msg = args.ApplicationMessage.Payload.BToString();action(topic, msg);return Task.CompletedTask;};}}}
调用
我这里是控制台项目
//初始化var mqtt = new MqttClientBase(new MqttClientBaseOptions() { clientId="client-1",username="username",password="password",server="127.0.0.1",port=10883});//断开连接事件mqtt.DisconnectedAsync((e) => {Console.WriteLine("连接断开");//重连服务器mqtt.ReConnect();});//连接服务器mqtt.Connect((args) => {/* 连接成功事件 */Console.WriteLine("连接成功");// 添加主题订阅,建议写到 连接成功事件 里面,这样重连后可以重新订阅主题mqtt.AddTopic("topic-1").Wait();mqtt.AddTopic("topic-2").Wait();mqtt.AddTopic("topic-3").Wait();// 取消主题订阅mqtt.DeleteTopic("topic-3").Wait();// 向指定主题推送消息mqtt.Publish("topic-1", "666666666").Wait();});// 收到来自服务器的消息 topic:主题msg:消息内容mqtt.Message((topic,msg) => { Console.WriteLine($"收到消息:{topic}:{msg}");});// 这里暂停三秒,看三秒后主动断开连接效果// Task.Delay(3000).Wait();// 主动断开连接//mqtt.Disconnect();while (true){// 向指定主题推送消息mqtt.Publish("topic-1", Console.ReadLine());}