1
0
forked from dyf/APP
Files
APP/utils/mqtt.js
2025-08-27 09:08:59 +08:00

316 lines
8.3 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Compatibility patch: gives Paho MQTT a WebSocket implementation backed by
// uni-app's `uni.connectSocket` on runtimes that expose no global WebSocket.
(function (root) {
  // Without a global object there is nowhere to install the polyfill.
  if (!root) {
    console.error("MQTT Polyfill: 未能找到全局对象 (global/window/self 均未定义)。");
    return;
  }
  // A WebSocket implementation already exists: leave it untouched.
  if (typeof root.WebSocket !== 'undefined') {
    return;
  }
  console.log("MQTT Polyfill: 正在为 uni-app 应用 WebSocket 兼容性补丁。");

  /**
   * Build the plain, stringifiable error event handed to `onerror`.
   * @param {object} err - Error payload received from uni-app (may be empty).
   * @param {string} fallbackMessage - Message used when `err.errMsg` is absent.
   * @returns {{message: string}} Event-like object carrying the message.
   */
  function toErrorEvent(err, fallbackMessage) {
    return {
      message: (err && err.errMsg) ? err.errMsg : fallbackMessage
    };
  }

  /**
   * Minimal WebSocket look-alike wrapping a uni-app SocketTask.
   * Exposes `readyState`, `binaryType`, the `onopen` / `onclose` / `onerror` /
   * `onmessage` handler slots, `send()` and `close()`.
   */
  class WebSocket {
    /**
     * @param {string} url - Socket URL passed to `uni.connectSocket`.
     * @param {string|string[]} [protocols] - Sub-protocol name(s).
     */
    constructor(url, protocols) {
      this.binaryType = 'arraybuffer'; // Paho MQTT requires this property.
      this.readyState = WebSocket.CONNECTING;
      this.onopen = null;
      this.onclose = null;
      this.onerror = null;
      this.onmessage = null;

      // uni.connectSocket requires `protocols` to be an array, whereas the
      // WebSocket constructor also accepts a single string: normalise it.
      let protocolList = [];
      if (Array.isArray(protocols)) {
        protocolList = protocols;
      } else if (typeof protocols === 'string' && protocols !== '') {
        protocolList = [protocols];
      }

      const socketTask = uni.connectSocket({
        url: url,
        protocols: protocolList,
        success: () => {},
        fail: (err) => {
          const errText = JSON.stringify(err) || "Empty error object";
          console.error(`uni.connectSocket 失败: ${errText}`);
          // The connection was never established, so the socket must not
          // stay in CONNECTING forever.
          this.readyState = WebSocket.CLOSED;
          if (this.onerror) {
            this.onerror(toErrorEvent(err, "uni.connectSocket call failed"));
          }
        }
      });
      this._socketTask = socketTask;

      socketTask.onOpen(() => {
        this.readyState = WebSocket.OPEN;
        if (this.onopen) {
          this.onopen({}); // Pass a dummy event object.
        }
      });
      socketTask.onClose((res) => {
        this.readyState = WebSocket.CLOSED;
        if (this.onclose) {
          this.onclose(res);
        }
      });
      socketTask.onError((err) => {
        const errText = JSON.stringify(err) || "Empty error object";
        console.error(`WebSocket polyfill 错误: ${errText}`);
        if (this.onerror) {
          // Paho expects an object that can be stringified, not a real Error object.
          this.onerror(toErrorEvent(err, "WebSocket connection failed in uni-app"));
        }
      });
      socketTask.onMessage((res) => {
        if (this.onmessage) {
          // Paho expects the event object to carry a 'data' property.
          this.onmessage({
            data: res.data
          });
        }
      });
    }

    /**
     * Send data over the socket.
     * @param {*} data - Payload forwarded unchanged to the SocketTask.
     * @throws {Error} When the socket is not in the OPEN state.
     */
    send(data) {
      if (this.readyState !== WebSocket.OPEN) {
        console.error('WebSocket polyfill: send() 在非 OPEN 状态下被调用。');
        throw new Error('WebSocket is not open');
      }
      this._socketTask.send({
        data: data
      });
    }

    /**
     * Start closing the socket. Does nothing unless the socket is OPEN or
     * CONNECTING, so repeated calls close the underlying task only once.
     */
    close() {
      if (this.readyState === WebSocket.OPEN || this.readyState === WebSocket.CONNECTING) {
        // The onClose handler moves the state on to CLOSED.
        this.readyState = WebSocket.CLOSING;
        this._socketTask.close();
      }
    }
  }
  WebSocket.CONNECTING = 0;
  WebSocket.OPEN = 1;
  WebSocket.CLOSING = 2;
  WebSocket.CLOSED = 3;
  root.WebSocket = WebSocket;
})(
  typeof self !== 'undefined' ? self :
  typeof window !== 'undefined' ? window :
  typeof global !== 'undefined' ? global :
  // `this` is undefined at the top level of an ES module, so try globalThis first.
  typeof globalThis !== 'undefined' ? globalThis :
  this
);
// End of compatibility patch
/**
* uni-app MQTT 客户端工具封装
*
* 该工具基于 Eclipse Paho MQTT JavaScript 客户端库,
* 并进行了封装以适配 uni-app 原生环境。
*
* 它提供了连接、订阅、发布、断开等核心功能。
*/
import Paho from 'paho-mqtt';
import allConfigs from '../config/index.js';
// Pick the configuration set that matches the target environment.
const env = 'production'; // Edit this value to switch between the development and production (live) settings; the key must exist in allConfigs.
const config = allConfigs[env];
/**
 * Split a payload that may contain several JSON objects written back to back
 * (`{...}{...}` or `{...} {...}`) into one string per object.
 *
 * Splitting happens at every `}` + optional whitespace + `{` boundary; the
 * whitespace is dropped. No sentinel character is inserted, so payloads that
 * contain characters such as `|` are left intact.
 *
 * @param {string} payload - Raw message text.
 * @returns {string[]} The pieces, in order (a single-element array when no boundary exists).
 */
function splitConcatenatedJson(payload) {
  const pieces = [];
  const boundary = /}\s*{/g;
  let start = 0;
  let match = boundary.exec(payload);
  while (match !== null) {
    // The closing brace stays with the piece on the left...
    pieces.push(payload.slice(start, match.index + 1));
    // ...and the opening brace begins the piece on the right.
    start = match.index + match[0].length - 1;
    match = boundary.exec(payload);
  }
  pieces.push(payload.slice(start));
  return pieces;
}

/**
 * Thin wrapper around a Paho MQTT client that adds per-topic message
 * callbacks, automatic re-subscription after connecting and reconnection with
 * exponential back-off.
 */
class MqttClient {
  /**
   * Reads the connection settings from `config`, creates the Paho client on
   * the "/mqtt" path and installs the connection-lost / message handlers.
   * No network connection is opened until `connect()` is called.
   */
  constructor() {
    this.options = {
      host: config.MQTT_HOST,
      port: config.MQTT_PORT,
      // Random suffix of up to 8 hex digits.
      clientId: 'mqttjs_' + Math.random().toString(16).slice(2, 10),
      username: config.MQTT_USERNAME,
      password: config.MQTT_PASSWORD,
    };
    this.onConnectCallback = null;
    // topic -> callback(jsonString, rawMessage)
    this.messageCallbacks = new Map();
    // True only while a reconnect timer is pending.
    this.isReconnecting = false;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 10;
    this.reconnectTimeoutId = null;
    // Set by disconnect(); suppresses automatic reconnection.
    this.manualDisconnect = false;
    this.client = new Paho.Client(
      this.options.host,
      Number(this.options.port),
      "/mqtt",
      this.options.clientId
    );
    this.setCallbacks();
  }

  /**
   * Install the Paho `onConnectionLost` and `onMessageArrived` handlers.
   */
  setCallbacks() {
    this.client.onConnectionLost = (responseObject) => {
      // errorCode 0 is ignored (presumably a regular, requested disconnect).
      if (responseObject.errorCode === 0) {
        return;
      }
      console.log("MQTT连接丢失: " + responseObject.errorMessage);
      uni.$emit('mqttConnectionLost', {
        error: responseObject.errorMessage,
        timestamp: Date.now()
      });
      if (!this.manualDisconnect) {
        this.startReconnecting();
      }
    };

    this.client.onMessageArrived = (message) => {
      const topic = message.destinationName;
      // Dispatch is by exact topic string; a callback registered under a
      // wildcard filter is not looked up here.
      const callback = this.messageCallbacks.get(topic);
      if (!this.messageCallbacks.has(topic)) {
        return;
      }
      // One MQTT message may carry several concatenated JSON objects;
      // deliver each of them separately.
      for (const jsonString of splitConcatenatedJson(message.payloadString)) {
        if (jsonString.trim() === '') continue;
        callback(jsonString, message);
      }
    };
  }

  /**
   * Connect to the broker. If already connected, only invokes the callback.
   * Failures schedule a reconnect unless `disconnect()` was called.
   * @param {Function} [onConnectCallback] - Invoked on every successful
   *   connection, including automatic reconnections.
   */
  connect(onConnectCallback) {
    if (this.client && this.client.isConnected()) {
      console.log('MQTT客户端已连接。');
      if (onConnectCallback) onConnectCallback();
      return;
    }
    this.manualDisconnect = false;
    this.onConnectCallback = onConnectCallback;
    console.log(`正在连接MQTT: ${this.options.host}:${this.options.port}/mqtt`);
    try {
      const connectOptions = {
        timeout: 10, // Longer connect timeout to ride out network jitter.
        keepAliveInterval: 30, // Explicit 30-second keep-alive.
        userName: this.options.username,
        password: this.options.password,
        useSSL: false,
        cleanSession: true,
        onSuccess: () => {
          console.log('MQTT连接成功');
          this.reconnectAttempts = 0;
          this.isReconnecting = false;
          clearTimeout(this.reconnectTimeoutId);
          this.resubscribe();
          if (this.onConnectCallback) {
            this.onConnectCallback();
          }
        },
        onFailure: (message) => {
          console.error('MQTT连接失败:', message.errorMessage);
          if (!this.manualDisconnect) {
            this.startReconnecting();
          }
        }
      };
      this.client.connect(connectOptions);
    } catch (error) {
      console.error('MQTT客户端连接时发生异常:', error);
      if (!this.manualDisconnect) {
        this.startReconnecting();
      }
    }
  }

  /**
   * Schedule the next reconnection attempt with exponential back-off
   * (1s, 2s, 4s, ...). Does nothing while a reconnect timer is already
   * pending or after a manual disconnect. Gives up, and resets the attempt
   * counter, once `maxReconnectAttempts` attempts have been made.
   */
  startReconnecting() {
    if (this.isReconnecting || this.manualDisconnect) {
      return;
    }
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error('MQTT: 达到最大重连次数,已放弃。');
      this.isReconnecting = false;
      this.reconnectAttempts = 0;
      return;
    }
    this.isReconnecting = true;
    this.reconnectAttempts++;
    const delay = 1000 * Math.pow(2, this.reconnectAttempts - 1);
    console.log(`MQTT: 第 ${this.reconnectAttempts} 次重连将在 ${delay / 1000}s 后开始...`);
    this.reconnectTimeoutId = setTimeout(() => {
      // The timer has fired, so nothing is pending any more. Clearing the
      // flag here is what allows a failure of this attempt to schedule the
      // next one; otherwise the guard above would block every retry after
      // the first.
      this.reconnectTimeoutId = null;
      this.isReconnecting = false;
      if (!this.manualDisconnect) {
        this.connect(this.onConnectCallback);
      }
    }, delay);
  }

  /**
   * Subscribe again (QoS 1) to every topic that has a registered callback.
   * Does nothing when the client is not connected.
   */
  resubscribe() {
    if (!this.client || !this.client.isConnected()) {
      return;
    }
    console.log('MQTT: 重新订阅所有主题...');
    for (const topic of this.messageCallbacks.keys()) {
      console.log(`重新订阅: ${topic}`);
      this.client.subscribe(topic, {
        qos: 1
      });
    }
  }

  /**
   * Register a callback for a topic and subscribe with QoS 1. When not
   * connected the callback is still stored, so the subscription is made by
   * `resubscribe()` after the next successful connect.
   * @param {string} topic - Topic to subscribe to.
   * @param {Function} onMessageCallback - Called as `(jsonString, message)`.
   */
  subscribe(topic, onMessageCallback) {
    this.messageCallbacks.set(topic, onMessageCallback);
    if (this.client && this.client.isConnected()) {
      console.log(`尝试订阅主题: ${topic}`);
      this.client.subscribe(topic, {
        qos: 1
      });
    } else {
      console.error('MQTT未连接订阅请求已缓存连接后将自动订阅。');
    }
  }

  /**
   * Publish a message with QoS 1.
   * @param {string} topic - Destination topic.
   * @param {*} message - Payload passed to `Paho.Message`.
   * @returns {boolean} `true` when handed to the client, `false` when not connected.
   */
  publish(topic, message) {
    if (!this.client || !this.client.isConnected()) {
      console.error('MQTT未连接无法发布');
      return false;
    }
    const mqttMessage = new Paho.Message(message);
    mqttMessage.destinationName = topic;
    mqttMessage.qos = 1;
    this.client.send(mqttMessage);
    console.log(`成功发布消息到主题 ${topic}: ${message}`);
    return true;
  }

  /**
   * Remove the callback for a topic and, when connected, unsubscribe from it.
   * @param {string} topic - Topic to stop listening to.
   */
  unsubscribe(topic) {
    this.messageCallbacks.delete(topic);
    if (this.client && this.client.isConnected()) {
      this.client.unsubscribe(topic);
      console.log(`成功取消订阅: ${topic}`);
    } else {
      console.error('MQTT未连接或已断开无法取消订阅');
    }
  }

  /**
   * Disconnect on purpose: cancels any pending reconnect, disconnects the
   * client when connected and resets the reconnection state.
   */
  disconnect() {
    this.manualDisconnect = true;
    clearTimeout(this.reconnectTimeoutId);
    if (this.client && this.client.isConnected()) {
      this.client.disconnect();
    }
    this.isReconnecting = false;
    this.reconnectAttempts = 0;
    console.log('MQTT连接已断开');
  }
}
export default MqttClient;