import MqttClient from '@/utils/mqtt.js';

/**
 * Thin promise-based wrapper around the project's MqttClient: opens a
 * connection, tracks the subscribed topics, and publishes messages.
 */
class MqHelper {
  constructor() {
    // Active MqttClient, or null when init has not run or has failed.
    this.instance = null;
    // Topic descriptors ({ topic, callback }) registered through subscribes().
    this.subTopics = [];
  }

  /**
   * Build a promise that never fulfils and rejects once the delay elapses.
   *
   * @param {number} [ms] - Delay in milliseconds; any falsy value falls back to 200.
   * @returns {Promise<never>} Rejects with `{ code: -1, errMsg: '超时了' }`.
   */
  timeout(ms) {
    const delay = ms ? ms : 200;
    return new Promise((resolve, reject) => {
      setTimeout(() => {
        reject({
          code: -1,
          errMsg: '超时了'
        });
      }, delay);
    });
  }

  /**
   * Open the MQTT connection and optionally subscribe to topics once connected.
   *
   * @param {Object|Object[]} [topics] - A `{ topic, callback }` descriptor or an
   *   array of them; subscribed 50 ms after the connect callback fires.
   * @returns {Promise<void>} Fulfils when the connect callback fires; rejects
   *   (and resets `instance` to null) if that does not happen within 500 ms or
   *   if creating/connecting the client throws.
   */
  init(topics) {
    const connected = new Promise((resolve) => {
      const client = new MqttClient();
      this.instance = client;
      client.connect(() => {
        // Skip the subscription when this client has already been discarded
        // (e.g. the connect callback arrived after the timeout below).
        if (topics && this.instance === client) {
          console.error("连接成功开始订阅消息", topics);
          // Call through an arrow function: handing `this.subscribes` straight
          // to setTimeout detaches it from the helper, so `this.instance` would
          // be undefined and nothing would ever be subscribed.
          setTimeout(() => {
            this.subscribes(topics).catch((err) => {
              console.error("订阅消息失败,", err);
            });
          }, 50);
        }
        resolve();
      });
    });

    return Promise.race([connected, this.timeout(500)]).catch((ex) => {
      this.instance = null;
      throw ex;
    });
  }

  /**
   * Subscribe to one or more topics on the active connection.
   *
   * Each descriptor is remembered in `subTopics` (once per topic name). The
   * descriptor's callback is invoked with a single `{ payload, receive }`
   * object; exceptions thrown by the callback are logged, not propagated.
   *
   * @param {Object|Object[]} topics - A `{ topic, callback }` descriptor or an array of them.
   * @returns {Promise<void>} Rejects when there is no connection or no topics.
   */
  async subscribes(topics) {
    if (!this.instance || !topics) {
      throw new Error("MQTT未连接或无订阅主题");
    }
    const list = Array.isArray(topics) ? topics : [topics];
    for (const item of list) {
      const known = this.subTopics.some((entry) => entry.topic === item.topic);
      if (!known) {
        this.subTopics.push(item);
      }
      this.instance.subscribe(item.topic, (payload, receive) => {
        try {
          console.log("开始处理回调")
          item.callback({
            payload: payload,
            receive: receive
          });
          console.log("处理回调成功")
        } catch (err) {
          console.error("订阅消息回调出现异常,", err);
        }
      });
      console.error("订阅消息成功");
    }
  }

  /**
   * Unsubscribe from one or more topics and forget them in `subTopics`.
   * Does nothing when there is no connection or no topics.
   *
   * @param {Object|Object[]} topics - A descriptor with a `topic` property, or an array of them.
   */
  unSubscribes(topics) {
    if (!this.instance || !topics) {
      return;
    }
    const list = Array.isArray(topics) ? topics : [topics];
    for (const item of list) {
      this.instance.unsubscribe(item.topic);
      const position = this.subTopics.findIndex((entry) => entry.topic === item.topic);
      if (position !== -1) {
        this.subTopics.splice(position, 1);
      }
    }
  }

  /**
   * Publish a message.
   *
   * @param {string} topic - Topic to publish to.
   * @param {*} msg - Message content; must not be null or undefined.
   * @param {boolean} [retained] - Coerced to a boolean and passed to `publish`
   *   as the retained flag.
   * @returns {Promise<{code: number, msg: string}>} Fulfils when `publish`
   *   returns a truthy value; rejects with a message string when there is no
   *   connection/topic/content or `publish` returns a falsy value, or with the
   *   thrown error when `publish` throws.
   */
  sendData(topic, msg, retained) {
    if (!this.instance || !topic) {
      return Promise.reject("MQTT未连接或无主题无法发布消息");
    }
    if (msg === null || msg === undefined) {
      return Promise.reject("msg无内容");
    }
    try {
      const sent = this.instance.publish(topic, msg, Boolean(retained));
      if (sent) {
        return Promise.resolve({
          code: 200,
          msg: 'MQTT直连发送成功'
        });
      }
      return Promise.reject("MQTT未连接,无法发布消息");
    } catch (error) {
      return Promise.reject(error);
    }
  }

  /**
   * Disconnect the active client, if any. The `instance` reference is kept.
   */
  disconnect() {
    if (!this.instance) {
      return;
    }
    this.instance.disconnect();
  }
}

export default {
  /**
   * Create a fresh, unconnected MqHelper.
   *
   * @param {*} [found] - Unused; kept for backward compatibility.
   * @param {*} [receive] - Unused; kept for backward compatibility.
   * @returns {MqHelper} A new helper instance.
   */
  getMqTool(found, receive) {
    return new MqHelper();
  }
}