TS MQTT封装导入相关包
npm i mqttnpm i lodash
- guid 随机生成就行,具体可以参考百度或者随便生成一个随机数*
代码封装
import mqtt from 'mqtt'import type { MqttClient, OnMessageCallback, IClientOptions, IClientPublishOptions, IPublishPacket } from 'mqtt'import { getGuid } from '@/common/basic'import { without, uniq } from 'lodash'export type TPublishFormat = { topic: string payload: string | Buffer opts?: IClientPublishOptions}export type TMessageCallback = (topic: string, payload: T) => voidexport interface IMqClientOptions extends IClientOptions { connectCb?: () => void errorCb?: (e: Event) => void reconnectCb?: () => void}export default class MQTT { private _type: string private _url: string private _opt: IMqClientOptions // mqtt配置 public client!: MqttClient public topicArr: Array = [] constructor(url: string, opt?: IMqClientOptions, type: string = 'Web') { this._type = type this._url = url this._opt = { clean: true, clientId: this._type + '_' + getGuid(), // 客户端分类唯一 connectTimeout: 3000, // 超时时间 reconnectPeriod: 1000, //重连超时 ...(opt && opt), } this._init() } private _init() { this.destroy() this.client = mqtt.connect(this._url, this._opt) this.client.on('connect', () => { this._opt.connectCb && this._opt.connectCb() console.log(this._url + '连接成功...') }) this.client.on('error', (error: any) => { this._opt.errorCb && this._opt.errorCb(error) console.log(this._url + '异常中断...') }) this.client.on('reconnect', () => { this._opt.reconnectCb && this._opt.reconnectCb() console.log(this._url + '重新连接...') }) } /** * 函数“unSubscribe”是一个 TypeScript 函数,用于取消订阅一个或多个主题,并返回一个 Promise,该 Promise 解析为一个布尔值,指示取消订阅是否成功。 * @param {string | string[]} topic - topic 参数可以是字符串或字符串数组。它代表客户端想要取消订阅的主题。 * @returns 正在返回 Promise。 */ public unSubscribe(topic: string | string[]) { return new Promise((resolve: (isOk: boolean) => void) => { this.client && !this.client.disconnected && this.client .unsubscribeAsync(topic) .then((result) => { if (typeof topic === 'string') { topic = [topic] } //去重 this.topicArr = without(this.topicArr, ...topic) console.log(topic, this.topicArr, '取消订阅成功...') resolve(true) }) .catch((err) => { console.log(topic, '取消订阅失败...') resolve(false) }) }) } /** * 函数“onSubscribe”是一个 TypeScript 函数,它订阅一个或多个主题并返回一个 Promise,该 Promise 解析为一个布尔值,指示订阅是否成功。 * @param {string | string[]} topic - topic 参数可以是字符串或字符串数组。它代表您要订阅的主题。 * @returns 一个 Promise,解析为布尔值,指示订阅是否成功。 */ public onSubscribe(topic: string | string[]) { if (typeof topic === 'string') { topic = [topic] } const topicOk: Array = without(topic, ...this.topicArr) return new Promise((resolve: (isOk: boolean) => void) => { this.client && !this.client.disconnected && topicOk.length > 0 && this.client .subscribeAsync(topic) .then((result) => { this.topicArr.push(...topicOk) this.topicArr = uniq(this.topicArr) console.log(topicOk, this.topicArr, '订阅成功...') resolve(true) }) .catch((err) => { console.log(topicOk, '订阅失败...') resolve(false) }) }) } /** * 函数“onPublish”使用客户端向主题发布消息,并返回一个解析为布尔值的 Promise,指示发布是否成功。 * @param {TPublishFormat} format - format参数的类型为TPublishFormat,它是一个包含两个属性的对象:topic和message。 topic * 属性表示消息将发布到的主题,message 属性表示将发布的实际消息。 * @returns 正在返回 Promise。 */ public onPublish(format: TPublishFormat) { return new Promise((resolve: (isOk: boolean) => void) => { this.client && !this.client.disconnected && this.client .publishAsync(format.topic, format.payload, format.opts) .then((result) => { console.log('发布消息成功...') resolve(true) }) .catch((err) => { console.log('发布消息失败...') resolve(false) }) }) } //收到的消息 public onMessage(callback: TMessageCallback) { this.client && !this.client.disconnected && this.client.on('message', (topic: string, payload: Buffer) => { try { callback && callback(topic, JSON.parse(payload.toString())) } catch (err) { console.log('无法执行JSON.parse...') callback && callback(topic, payload.toString() as T) } }) } //销毁 public destroy() { console.log('销毁...') this.client && this.client.end() this.topicArr = [] }}
使用
//通过开源公共服务器测试,切换成自家服务器就行了const mqtt = new MQTT('mqtt://broker.emqx.io:8083/mqtt', { username: 'emqx_test', password: 'emqx_test' })mqtt.onSubscribe('/test/ss')mqtt.onMessage((topic, message) => { console.log(topic, message)})setTimeout(() => { mqtt.onPublish({ topic: '/test/ss', payload: '测试1111' })}, 3000);