Files
APP/utils/mqtt.js

249 lines
6.8 KiB
JavaScript
Raw Permalink Normal View History

2025-07-16 11:16:19 +08:00
// 兼容性补丁:为 Paho MQTT 提供 uni-app 环境下的 WebSocket 实现
(function (root) {
// 如果未能找到全局对象,则无法应用补丁。
if (!root) {
console.error("MQTT Polyfill: 未能找到全局对象 (global/window/self 均未定义)。");
return;
}
// 如果 WebSocket 已存在,则不执行任何操作。
if (typeof root.WebSocket !== 'undefined') {
return;
}
console.log("MQTT Polyfill: 正在为 uni-app 应用 WebSocket 兼容性补丁。");
class WebSocket {
constructor(url, protocols) {
this.binaryType = 'arraybuffer'; // Paho MQTT 需要此属性
this.readyState = WebSocket.CONNECTING;
const socketTask = uni.connectSocket({
url: url,
protocols: protocols || [], // uni.connectSocket 要求 protocols 是一个数组
success: () => {},
fail: (err) => {
const errText = JSON.stringify(err) || "Empty error object";
console.error(`uni.connectSocket 失败: ${errText}`);
if (this.onerror) {
const errorMessage = (err && err.errMsg) ? err.errMsg : "uni.connectSocket call failed";
this.onerror({ message: errorMessage });
}
}
});
this._socketTask = socketTask;
socketTask.onOpen(() => {
this.readyState = WebSocket.OPEN;
if (this.onopen) {
this.onopen({}); // 传递一个虚拟的事件对象
}
});
socketTask.onClose((res) => {
this.readyState = WebSocket.CLOSED;
if (this.onclose) {
this.onclose(res);
}
});
socketTask.onError((err) => {
const errText = JSON.stringify(err) || "Empty error object";
console.error(`WebSocket polyfill 错误: ${errText}`);
if (this.onerror) {
// Paho expects an object that can be stringified, not a real Error object.
const errorMessage = (err && err.errMsg) ? err.errMsg : "WebSocket connection failed in uni-app";
this.onerror({ message: errorMessage });
}
});
socketTask.onMessage((res) => {
if (this.onmessage) {
// Paho 期望事件对象有一个 'data' 属性
this.onmessage({ data: res.data });
}
});
}
send(data) {
if (this.readyState === WebSocket.OPEN) {
this._socketTask.send({ data: data });
} else {
console.error('WebSocket polyfill: send() 在非 OPEN 状态下被调用。');
throw new Error('WebSocket is not open');
}
}
close() {
if (this.readyState === WebSocket.OPEN || this.readyState === WebSocket.CONNECTING) {
this._socketTask.close();
}
}
}
WebSocket.CONNECTING = 0;
WebSocket.OPEN = 1;
WebSocket.CLOSING = 2;
WebSocket.CLOSED = 3;
root.WebSocket = WebSocket;
})(
typeof self !== 'undefined' ? self :
typeof window !== 'undefined' ? window :
typeof global !== 'undefined' ? global :
this
);
// 兼容性补丁结束
/**
* uni-app MQTT 客户端工具封装
*
* 该工具基于 Eclipse Paho MQTT JavaScript 客户端库
* 并进行了封装以适配 uni-app 原生环境
*
* 它提供了连接订阅发布断开等核心功能
*/
import Paho from 'paho-mqtt';
import allConfigs from '../config/index.js';
// 根据环境选择正确的配置
const env = 'development';//production //开发of线上 改这里就行
const config = allConfigs[env];
class MqttClient {
constructor() {
this.client = null;
this.options = {
host: config.MQTT_HOST,
port: config.MQTT_PORT,
clientId: 'mqttjs_' + Math.random().toString(16).substr(2, 8),
username: config.MQTT_USERNAME,
password: config.MQTT_PASSWORD,
};
this.onConnectCallback = null;
this.messageCallbacks = new Map(); // 使用 Map 存储不同主题的回调
}
/**
* 连接到 MQTT Broker
* @param {Function} onConnectCallback - 连接成功后的回调函数
*/
connect(onConnectCallback) {
const brokerUrl = this.options.host;
const brokerPort = this.options.port;
console.log(`正在连接MQTT: ${brokerUrl}:${brokerPort}/mqtt`);
try {
// Paho-MQTT 的客户端需要 host, port, path, clientId
this.client = new Paho.Client(brokerUrl, Number(brokerPort), "/mqtt", this.options.clientId);
this.onConnectCallback = onConnectCallback;
// 设置回调
this.client.onConnectionLost = (responseObject) => {
if (responseObject.errorCode !== 0) {
console.log("MQTT连接丢失: " + responseObject.errorMessage);
// 可以在此添加重连逻辑
}
};
this.client.onMessageArrived = (message) => {
const topic = message.destinationName;
const payload = message.payloadString;
console.log(`收到消息, 主题: ${topic}, 内容: ${payload}`);
if (this.messageCallbacks.has(topic)) {
// 调用该主题对应的回调
this.messageCallbacks.get(topic)(payload);
}
};
const connectOptions = {
timeout: 4,
userName: this.options.username,
password: this.options.password,
useSSL: false, // 如果是 wss, 需要设置为 true
cleanSession: true,
onSuccess: () => {
console.log('MQTT连接成功');
if (this.onConnectCallback) {
this.onConnectCallback();
}
},
onFailure: (message) => {
console.error('MQTT连接失败:', message.errorMessage);
this.disconnect();
}
};
this.client.connect(connectOptions);
} catch (error) {
console.error('MQTT客户端初始化失败:', error);
}
}
/**
* 订阅主题
* @param {String} topic - 需要订阅的主题
* @param {Function} onMessageCallback - 收到该主题消息后的回调函数
*/
subscribe(topic, onMessageCallback) {
if (this.client && this.client.isConnected()) {
console.log(`尝试订阅主题: ${topic}`);
this.client.subscribe(topic, { qos: 1 });
// 存储该主题的回调函数
this.messageCallbacks.set(topic, onMessageCallback);
} else {
console.error('MQTT未连接无法订阅');
}
}
/**
* 发布消息
* @param {String} topic - 需要发布的主题
* @param {String} message - 需要发布的消息
*/
publish(topic, message) {
if (this.client && this.client.isConnected()) {
const mqttMessage = new Paho.Message(message);
mqttMessage.destinationName = topic;
mqttMessage.qos = 1;
this.client.send(mqttMessage);
console.log(`成功发布消息到主题 ${topic}: ${message}`);
} else {
console.error('MQTT未连接无法发布');
}
}
/**
* 取消订阅
* @param {String} topic - 需要取消订阅的主题
*/
unsubscribe(topic) {
if (this.client && this.client.isConnected()) {
this.client.unsubscribe(topic);
// 从回调Map中移除
if (this.messageCallbacks.has(topic)) {
this.messageCallbacks.delete(topic);
}
console.log(`成功取消订阅: ${topic}`);
} else {
console.error('MQTT未连接无法取消订阅');
}
}
/**
* 断开连接
*/
disconnect() {
if (this.client && this.client.isConnected()) {
this.client.disconnect();
}
this.client = null;
console.log('MQTT连接已断开');
}
2025-07-05 14:49:26 +08:00
}
2025-07-16 11:16:19 +08:00
export default MqttClient;