TS MQTT封装导入相关包

npm i mqttnpm i lodash
  • guid 随机生成就行,具体可以参考百度或者随便生成一个随机数*

代码封装

import mqtt from 'mqtt'import type { MqttClient, OnMessageCallback, IClientOptions, IClientPublishOptions, IPublishPacket } from 'mqtt'import { getGuid } from '@/common/basic'import { without, uniq } from 'lodash'export type TPublishFormat = {  topic: string  payload: string | Buffer  opts?: IClientPublishOptions}export type TMessageCallback = (topic: string, payload: T) => voidexport interface IMqClientOptions extends IClientOptions {  connectCb?: () => void  errorCb?: (e: Event) => void  reconnectCb?: () => void}export default class MQTT {  private _type: string  private _url: string  private _opt: IMqClientOptions // mqtt配置  public client!: MqttClient  public topicArr: Array = []  constructor(url: string, opt?: IMqClientOptions, type: string = 'Web') {    this._type = type    this._url = url    this._opt = {      clean: true,      clientId: this._type + '_' + getGuid(), // 客户端分类唯一      connectTimeout: 3000, // 超时时间      reconnectPeriod: 1000, //重连超时      ...(opt && opt),    }    this._init()  }  private _init() {    this.destroy()    this.client = mqtt.connect(this._url, this._opt)    this.client.on('connect', () => {      this._opt.connectCb && this._opt.connectCb()      console.log(this._url + '连接成功...')    })    this.client.on('error', (error: any) => {      this._opt.errorCb && this._opt.errorCb(error)      console.log(this._url + '异常中断...')    })    this.client.on('reconnect', () => {      this._opt.reconnectCb && this._opt.reconnectCb()      console.log(this._url + '重新连接...')    })  }  /**   * 函数“unSubscribe”是一个 TypeScript 函数,用于取消订阅一个或多个主题,并返回一个 Promise,该 Promise 解析为一个布尔值,指示取消订阅是否成功。   * @param {string | string[]} topic - topic 参数可以是字符串或字符串数组。它代表客户端想要取消订阅的主题。   * @returns 正在返回 Promise。   */  public unSubscribe(topic: string | string[]) {    return new Promise((resolve: (isOk: boolean) => void) => {      this.client &&        !this.client.disconnected &&        this.client          .unsubscribeAsync(topic)          .then((result) => {            if (typeof topic === 'string') {              topic = [topic]            }            //去重            this.topicArr = without(this.topicArr, ...topic)            console.log(topic, this.topicArr, '取消订阅成功...')            resolve(true)          })          .catch((err) => {            console.log(topic, '取消订阅失败...')            resolve(false)          })    })  }  /**   * 函数“onSubscribe”是一个 TypeScript 函数,它订阅一个或多个主题并返回一个 Promise,该 Promise 解析为一个布尔值,指示订阅是否成功。   * @param {string | string[]} topic - topic 参数可以是字符串或字符串数组。它代表您要订阅的主题。   * @returns 一个 Promise,解析为布尔值,指示订阅是否成功。   */  public onSubscribe(topic: string | string[]) {    if (typeof topic === 'string') {      topic = [topic]    }    const topicOk: Array = without(topic, ...this.topicArr)    return new Promise((resolve: (isOk: boolean) => void) => {      this.client &&        !this.client.disconnected &&        topicOk.length > 0 &&        this.client          .subscribeAsync(topic)          .then((result) => {            this.topicArr.push(...topicOk)            this.topicArr = uniq(this.topicArr)            console.log(topicOk, this.topicArr, '订阅成功...')            resolve(true)          })          .catch((err) => {            console.log(topicOk, '订阅失败...')            resolve(false)          })    })  }  /**   * 函数“onPublish”使用客户端向主题发布消息,并返回一个解析为布尔值的 Promise,指示发布是否成功。   * @param {TPublishFormat} format - format参数的类型为TPublishFormat,它是一个包含两个属性的对象:topic和message。 topic   * 属性表示消息将发布到的主题,message 属性表示将发布的实际消息。   * @returns 正在返回 Promise。   */  public onPublish(format: TPublishFormat) {    return new Promise((resolve: (isOk: boolean) => void) => {      this.client &&        !this.client.disconnected &&        this.client          .publishAsync(format.topic, format.payload, format.opts)          .then((result) => {            console.log('发布消息成功...')            resolve(true)          })          .catch((err) => {            console.log('发布消息失败...')            resolve(false)          })    })  }  //收到的消息  public onMessage(callback: TMessageCallback) {    this.client &&      !this.client.disconnected &&      this.client.on('message', (topic: string, payload: Buffer) => {        try {          callback && callback(topic, JSON.parse(payload.toString()))        } catch (err) {          console.log('无法执行JSON.parse...')          callback && callback(topic, payload.toString() as T)        }      })  }  //销毁  public destroy() {    console.log('销毁...')    this.client && this.client.end()    this.topicArr = []  }}

使用

//通过开源公共服务器测试,切换成自家服务器就行了const mqtt = new MQTT('mqtt://broker.emqx.io:8083/mqtt', { username: 'emqx_test', password: 'emqx_test' })mqtt.onSubscribe('/test/ss')mqtt.onMessage((topic, message) => {  console.log(topic, message)})setTimeout(() => {  mqtt.onPublish({ topic: '/test/ss', payload: '测试1111' })}, 3000);