Files
APP/utils/mqtt.js
微微一笑 6257f9d84b 增加mqtt
2025-07-16 11:16:19 +08:00

249 lines
6.8 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Compatibility patch: installs a minimal WebSocket implementation backed by
// uni-app's socket API so that Paho MQTT can run where no WebSocket exists.
// NOTE(review): ES `import` declarations are hoisted, so the imported MQTT
// library is evaluated before this patch runs. This relies on the library
// looking up the global WebSocket lazily (when a client is created) and on the
// same global object being used by both — confirm for the target runtime.
(function (root) {
  // Without a global object there is nowhere to install the patch.
  if (!root) {
    console.error("MQTT Polyfill: 未能找到全局对象 (global/window/self 均未定义)。");
    return;
  }
  // A WebSocket implementation already exists; leave it untouched.
  if (typeof root.WebSocket !== 'undefined') {
    return;
  }
  console.log("MQTT Polyfill: 正在为 uni-app 应用 WebSocket 兼容性补丁。");

  /**
   * Builds the plain event object handed to `onerror`.
   * `message` is kept for existing consumers; `data` carries the same text
   * because Paho MQTT's socket error handler reports `event.data`.
   * @param {Object} err - Error object supplied by the uni-app socket API.
   * @param {String} fallbackMessage - Text used when `err.errMsg` is missing.
   * @returns {{message: String, data: String}}
   */
  function toErrorEvent(err, fallbackMessage) {
    const message = (err && err.errMsg) ? err.errMsg : fallbackMessage;
    return { message: message, data: message };
  }

  /**
   * Normalizes the `protocols` argument to the array that uni.connectSocket
   * expects. A single protocol string is wrapped; a missing value becomes [].
   * @param {String|String[]} [protocols]
   * @returns {String[]}
   */
  function toProtocolList(protocols) {
    if (Array.isArray(protocols)) {
      return protocols;
    }
    return protocols ? [protocols] : [];
  }

  class WebSocket {
    /**
     * Opens a socket through uni.connectSocket and wires its events to the
     * standard `onopen` / `onclose` / `onerror` / `onmessage` handler slots.
     * @param {String} url - WebSocket endpoint.
     * @param {String|String[]} [protocols] - Requested sub-protocol(s).
     */
    constructor(url, protocols) {
      this.binaryType = 'arraybuffer'; // Paho MQTT requires this property.
      this.readyState = WebSocket.CONNECTING;
      const socketTask = uni.connectSocket({
        url: url,
        protocols: toProtocolList(protocols),
        success: () => {},
        fail: (err) => {
          const errText = JSON.stringify(err) || "Empty error object";
          console.error(`uni.connectSocket 失败: ${errText}`);
          // The connection attempt is over; do not stay stuck in CONNECTING.
          this.readyState = WebSocket.CLOSED;
          if (this.onerror) {
            this.onerror(toErrorEvent(err, "uni.connectSocket call failed"));
          }
        }
      });
      this._socketTask = socketTask;
      socketTask.onOpen(() => {
        this.readyState = WebSocket.OPEN;
        if (this.onopen) {
          this.onopen({}); // Pass a placeholder event object.
        }
      });
      socketTask.onClose((res) => {
        this.readyState = WebSocket.CLOSED;
        if (this.onclose) {
          this.onclose(res);
        }
      });
      socketTask.onError((err) => {
        const errText = JSON.stringify(err) || "Empty error object";
        console.error(`WebSocket polyfill 错误: ${errText}`);
        if (this.onerror) {
          // Hand over a plain, stringifiable object rather than a real Error.
          this.onerror(toErrorEvent(err, "WebSocket connection failed in uni-app"));
        }
      });
      socketTask.onMessage((res) => {
        if (this.onmessage) {
          // Consumers expect the payload on the event's `data` property.
          this.onmessage({ data: res.data });
        }
      });
    }

    /**
     * Sends data over the socket.
     * @param {*} data - Payload forwarded unchanged to the socket task.
     * @throws {Error} When the socket is not in the OPEN state.
     */
    send(data) {
      if (this.readyState === WebSocket.OPEN) {
        this._socketTask.send({ data: data });
      } else {
        console.error('WebSocket polyfill: send() 在非 OPEN 状态下被调用。');
        throw new Error('WebSocket is not open');
      }
    }

    /**
     * Starts closing the socket. Does nothing if a close is already in
     * progress or the socket is closed, so repeated calls are harmless.
     */
    close() {
      if (this.readyState === WebSocket.OPEN || this.readyState === WebSocket.CONNECTING) {
        // Enter CLOSING first so a second close() does not hit the task again.
        this.readyState = WebSocket.CLOSING;
        this._socketTask.close();
      }
    }
  }
  WebSocket.CONNECTING = 0;
  WebSocket.OPEN = 1;
  WebSocket.CLOSING = 2;
  WebSocket.CLOSED = 3;
  root.WebSocket = WebSocket;
})(
  typeof self !== 'undefined' ? self :
  typeof window !== 'undefined' ? window :
  typeof global !== 'undefined' ? global :
  this
);
// End of compatibility patch.
/**
* uni-app MQTT 客户端工具封装
*
* 该工具基于 Eclipse Paho MQTT JavaScript 客户端库,
* 并进行了封装以适配 uni-app 原生环境。
*
* 它提供了连接、订阅、发布、断开等核心功能。
*/
import Paho from 'paho-mqtt';
import allConfigs from '../config/index.js';
// Pick the configuration set for the active environment.
// To switch between development and production, change this one value
// ('development' or 'production').
const env = 'development';
const { [env]: config } = allConfigs;
/**
 * Tests whether a concrete topic name is matched by an MQTT topic filter.
 * `+` matches exactly one level, a trailing `#` matches the parent level and
 * any number of child levels. Topics beginning with `$` are not matched by a
 * filter whose first level is a wildcard.
 * @param {String} filter - Topic filter used when subscribing.
 * @param {String} topic - Topic name of a received message.
 * @returns {Boolean} True when the filter matches the topic.
 */
function matchesTopicFilter(filter, topic) {
  if (filter === topic) {
    return true;
  }
  const filterLevels = String(filter).split('/');
  const topicLevels = String(topic).split('/');
  const startsWithWildcard = filterLevels[0] === '+' || filterLevels[0] === '#';
  if (startsWithWildcard && String(topic).charAt(0) === '$') {
    return false;
  }
  for (let i = 0; i < filterLevels.length; i += 1) {
    const level = filterLevels[i];
    if (level === '#') {
      // Multi-level wildcard is only valid as the last level of a filter.
      return i === filterLevels.length - 1;
    }
    if (i >= topicLevels.length) {
      return false;
    }
    if (level !== '+' && level !== topicLevels[i]) {
      return false;
    }
  }
  return filterLevels.length === topicLevels.length;
}

/**
 * Small wrapper around a Paho MQTT client offering connect, subscribe,
 * publish, unsubscribe and disconnect, with one message callback per
 * subscribed topic filter.
 */
class MqttClient {
  /**
   * Reads the broker settings from the module-level `config` object and
   * generates a random client id. No network activity happens here.
   */
  constructor() {
    this.client = null;
    this.options = {
      host: config.MQTT_HOST,
      port: config.MQTT_PORT,
      // `slice` replaces the deprecated `substr`; same 8 hex characters.
      clientId: 'mqttjs_' + Math.random().toString(16).slice(2, 10),
      username: config.MQTT_USERNAME,
      password: config.MQTT_PASSWORD,
    };
    this.onConnectCallback = null;
    // Topic filter -> callback invoked with the message payload string.
    this.messageCallbacks = new Map();
  }

  /**
   * Connects to the MQTT broker.
   * A client that is still connected from an earlier call is disconnected
   * first so that it is not leaked. Errors raised while creating or starting
   * the client are logged, not thrown.
   * @param {Function} onConnectCallback - Called once the connection succeeds.
   */
  connect(onConnectCallback) {
    const brokerUrl = this.options.host;
    const brokerPort = this.options.port;
    console.log(`正在连接MQTT: ${brokerUrl}:${brokerPort}/mqtt`);
    try {
      if (this.client && this.client.isConnected()) {
        this.client.disconnect();
      }
      // The Paho client takes host, port, path and client id.
      const client = new Paho.Client(brokerUrl, Number(brokerPort), "/mqtt", this.options.clientId);
      this.client = client;
      this.onConnectCallback = onConnectCallback;
      client.onConnectionLost = (responseObject) => {
        if (responseObject.errorCode !== 0) {
          console.log("MQTT连接丢失: " + responseObject.errorMessage);
          // Reconnect logic could be added here.
        }
      };
      client.onMessageArrived = (message) => this._dispatchMessage(message);
      const connectOptions = {
        timeout: 4,
        userName: this.options.username,
        password: this.options.password,
        useSSL: false, // Must be true when connecting over wss.
        cleanSession: true,
        onSuccess: () => {
          console.log('MQTT连接成功');
          if (this.onConnectCallback) {
            this.onConnectCallback();
          }
        },
        onFailure: (message) => {
          console.error('MQTT连接失败:', message.errorMessage);
          // Ignore a late failure from a client that has since been replaced.
          if (this.client === client) {
            this.disconnect();
          }
        }
      };
      client.connect(connectOptions);
    } catch (error) {
      console.error('MQTT客户端初始化失败:', error);
    }
  }

  /**
   * Delivers a received message to every registered callback whose topic
   * filter matches the message topic (exact names and `+` / `#` wildcards).
   * Exceptions thrown by a callback are logged and do not propagate to the
   * MQTT library or prevent the remaining callbacks from running.
   * @param {Object} message - Message object with `destinationName` and
   *   `payloadString`.
   */
  _dispatchMessage(message) {
    const topic = message.destinationName;
    const payload = message.payloadString;
    console.log(`收到消息, 主题: ${topic}, 内容: ${payload}`);
    // Iterate over a snapshot so callbacks may subscribe/unsubscribe safely.
    Array.from(this.messageCallbacks).forEach(([filter, callback]) => {
      if (typeof callback !== 'function' || !matchesTopicFilter(filter, topic)) {
        return;
      }
      try {
        callback(payload);
      } catch (error) {
        console.error(`MQTT消息回调执行出错, 主题: ${topic}`, error);
      }
    });
  }

  /**
   * Subscribes to a topic (QoS 1) and registers the callback for it.
   * Logs an error and does nothing when the client is not connected.
   * @param {String} topic - Topic or topic filter to subscribe to.
   * @param {Function} onMessageCallback - Receives the payload string of each
   *   matching message.
   */
  subscribe(topic, onMessageCallback) {
    if (this.client && this.client.isConnected()) {
      console.log(`尝试订阅主题: ${topic}`);
      this.client.subscribe(topic, { qos: 1 });
      this.messageCallbacks.set(topic, onMessageCallback);
    } else {
      console.error('MQTT未连接无法订阅');
    }
  }

  /**
   * Publishes a message with QoS 1.
   * Logs an error and does nothing when the client is not connected.
   * @param {String} topic - Destination topic.
   * @param {String} message - Message payload.
   */
  publish(topic, message) {
    if (this.client && this.client.isConnected()) {
      const mqttMessage = new Paho.Message(message);
      mqttMessage.destinationName = topic;
      mqttMessage.qos = 1;
      this.client.send(mqttMessage);
      console.log(`成功发布消息到主题 ${topic}: ${message}`);
    } else {
      console.error('MQTT未连接无法发布');
    }
  }

  /**
   * Unsubscribes from a topic and removes its callback.
   * Logs an error and does nothing when the client is not connected.
   * @param {String} topic - Topic or topic filter to unsubscribe from.
   */
  unsubscribe(topic) {
    if (this.client && this.client.isConnected()) {
      this.client.unsubscribe(topic);
      this.messageCallbacks.delete(topic);
      console.log(`成功取消订阅: ${topic}`);
    } else {
      console.error('MQTT未连接无法取消订阅');
    }
  }

  /**
   * Disconnects from the broker (if connected), drops the client and clears
   * all registered message callbacks. Connections are made with
   * `cleanSession: true`, so callers subscribe again after reconnecting.
   */
  disconnect() {
    if (this.client && this.client.isConnected()) {
      this.client.disconnect();
    }
    this.client = null;
    this.messageCallbacks.clear();
    console.log('MQTT连接已断开');
  }
}
export default MqttClient;