增加mqtt
This commit is contained in:
401
utils/mqtt.js
401
utils/mqtt.js
@ -1,178 +1,249 @@
|
||||
import mqtt from 'mqtt/dist/mqtt.js';
|
||||
// 兼容性补丁:为 Paho MQTT 提供 uni-app 环境下的 WebSocket 实现
|
||||
(function (root) {
|
||||
// 如果未能找到全局对象,则无法应用补丁。
|
||||
if (!root) {
|
||||
console.error("MQTT Polyfill: 未能找到全局对象 (global/window/self 均未定义)。");
|
||||
return;
|
||||
}
|
||||
|
||||
const MQTT_BROKER_URL = 'ws://your-mqtt-broker-address:your-port/mqtt'; // 使用 ws:// 或 wss://
|
||||
const MQTT_OPTIONS = {
|
||||
connectTimeout: 4000,
|
||||
clientId: `mqtt_client_${Math.random().toString(16).substr(2, 8)}`,
|
||||
username: 'your-username',
|
||||
password: 'your-password',
|
||||
clean: true,
|
||||
};
|
||||
// 如果 WebSocket 已存在,则不执行任何操作。
|
||||
if (typeof root.WebSocket !== 'undefined') {
|
||||
return;
|
||||
}
|
||||
|
||||
console.log("MQTT Polyfill: 正在为 uni-app 应用 WebSocket 兼容性补丁。");
|
||||
|
||||
let client = null;
|
||||
let connectPromise = null;
|
||||
class WebSocket {
|
||||
constructor(url, protocols) {
|
||||
this.binaryType = 'arraybuffer'; // Paho MQTT 需要此属性
|
||||
this.readyState = WebSocket.CONNECTING;
|
||||
|
||||
// 连接到 MQTT 代理
|
||||
export function connect() {
|
||||
if (client && client.connected) {
|
||||
return Promise.resolve(client);
|
||||
}
|
||||
if (connectPromise) {
|
||||
return connectPromise;
|
||||
}
|
||||
const socketTask = uni.connectSocket({
|
||||
url: url,
|
||||
protocols: protocols || [], // uni.connectSocket 要求 protocols 是一个数组
|
||||
success: () => {},
|
||||
fail: (err) => {
|
||||
const errText = JSON.stringify(err) || "Empty error object";
|
||||
console.error(`uni.connectSocket 失败: ${errText}`);
|
||||
if (this.onerror) {
|
||||
const errorMessage = (err && err.errMsg) ? err.errMsg : "uni.connectSocket call failed";
|
||||
this.onerror({ message: errorMessage });
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let url = MQTT_BROKER_URL;
|
||||
// #ifdef MP-WEIXIN || APP-PLUS
|
||||
// #endif
|
||||
this._socketTask = socketTask;
|
||||
|
||||
console.log(`正在连接到 MQTT 代理: ${url}`);
|
||||
client = mqtt.connect(url, MQTT_OPTIONS);
|
||||
socketTask.onOpen(() => {
|
||||
this.readyState = WebSocket.OPEN;
|
||||
if (this.onopen) {
|
||||
this.onopen({}); // 传递一个虚拟的事件对象
|
||||
}
|
||||
});
|
||||
|
||||
connectPromise = new Promise((resolve, reject) => {
|
||||
client.on('connect', () => {
|
||||
console.log('MQTT 已连接');
|
||||
connectPromise = null;
|
||||
resolve(client);
|
||||
});
|
||||
socketTask.onClose((res) => {
|
||||
this.readyState = WebSocket.CLOSED;
|
||||
if (this.onclose) {
|
||||
this.onclose(res);
|
||||
}
|
||||
});
|
||||
|
||||
client.on('reconnect', () => {
|
||||
console.log('MQTT 正在重新连接...');
|
||||
});
|
||||
socketTask.onError((err) => {
|
||||
const errText = JSON.stringify(err) || "Empty error object";
|
||||
console.error(`WebSocket polyfill 错误: ${errText}`);
|
||||
if (this.onerror) {
|
||||
// Paho expects an object that can be stringified, not a real Error object.
|
||||
const errorMessage = (err && err.errMsg) ? err.errMsg : "WebSocket connection failed in uni-app";
|
||||
this.onerror({ message: errorMessage });
|
||||
}
|
||||
});
|
||||
|
||||
client.on('error', (err) => {
|
||||
console.error('MQTT 连接错误:', err);
|
||||
if (connectPromise) {
|
||||
reject(err);
|
||||
}
|
||||
connectPromise = null;
|
||||
});
|
||||
socketTask.onMessage((res) => {
|
||||
if (this.onmessage) {
|
||||
// Paho 期望事件对象有一个 'data' 属性
|
||||
this.onmessage({ data: res.data });
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
client.on('close', () => {
|
||||
console.log('MQTT 连接已关闭');
|
||||
connectPromise = null;
|
||||
});
|
||||
send(data) {
|
||||
if (this.readyState === WebSocket.OPEN) {
|
||||
this._socketTask.send({ data: data });
|
||||
} else {
|
||||
console.error('WebSocket polyfill: send() 在非 OPEN 状态下被调用。');
|
||||
throw new Error('WebSocket is not open');
|
||||
}
|
||||
}
|
||||
|
||||
client.on('offline', () => {
|
||||
console.log('MQTT 客户端离线');
|
||||
connectPromise = null;
|
||||
});
|
||||
});
|
||||
close() {
|
||||
if (this.readyState === WebSocket.OPEN || this.readyState === WebSocket.CONNECTING) {
|
||||
this._socketTask.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return connectPromise;
|
||||
WebSocket.CONNECTING = 0;
|
||||
WebSocket.OPEN = 1;
|
||||
WebSocket.CLOSING = 2;
|
||||
WebSocket.CLOSED = 3;
|
||||
|
||||
root.WebSocket = WebSocket;
|
||||
|
||||
})(
|
||||
typeof self !== 'undefined' ? self :
|
||||
typeof window !== 'undefined' ? window :
|
||||
typeof global !== 'undefined' ? global :
|
||||
this
|
||||
);
|
||||
// 兼容性补丁结束
|
||||
|
||||
/**
|
||||
* uni-app MQTT 客户端工具封装
|
||||
*
|
||||
* 该工具基于 Eclipse Paho MQTT JavaScript 客户端库,
|
||||
* 并进行了封装以适配 uni-app 原生环境。
|
||||
*
|
||||
* 它提供了连接、订阅、发布、断开等核心功能。
|
||||
*/
|
||||
import Paho from 'paho-mqtt';
|
||||
import allConfigs from '../config/index.js';
|
||||
|
||||
// 根据环境选择正确的配置
|
||||
const env = 'development';//production //开发of线上 改这里就行
|
||||
const config = allConfigs[env];
|
||||
|
||||
class MqttClient {
|
||||
constructor() {
|
||||
this.client = null;
|
||||
this.options = {
|
||||
host: config.MQTT_HOST,
|
||||
port: config.MQTT_PORT,
|
||||
clientId: 'mqttjs_' + Math.random().toString(16).substr(2, 8),
|
||||
username: config.MQTT_USERNAME,
|
||||
password: config.MQTT_PASSWORD,
|
||||
};
|
||||
this.onConnectCallback = null;
|
||||
this.messageCallbacks = new Map(); // 使用 Map 存储不同主题的回调
|
||||
}
|
||||
|
||||
/**
|
||||
* 连接到 MQTT Broker
|
||||
* @param {Function} onConnectCallback - 连接成功后的回调函数
|
||||
*/
|
||||
connect(onConnectCallback) {
|
||||
const brokerUrl = this.options.host;
|
||||
const brokerPort = this.options.port;
|
||||
console.log(`正在连接MQTT: ${brokerUrl}:${brokerPort}/mqtt`);
|
||||
|
||||
try {
|
||||
// Paho-MQTT 的客户端需要 host, port, path, clientId
|
||||
this.client = new Paho.Client(brokerUrl, Number(brokerPort), "/mqtt", this.options.clientId);
|
||||
|
||||
this.onConnectCallback = onConnectCallback;
|
||||
|
||||
// 设置回调
|
||||
this.client.onConnectionLost = (responseObject) => {
|
||||
if (responseObject.errorCode !== 0) {
|
||||
console.log("MQTT连接丢失: " + responseObject.errorMessage);
|
||||
// 可以在此添加重连逻辑
|
||||
}
|
||||
};
|
||||
|
||||
this.client.onMessageArrived = (message) => {
|
||||
const topic = message.destinationName;
|
||||
const payload = message.payloadString;
|
||||
console.log(`收到消息, 主题: ${topic}, 内容: ${payload}`);
|
||||
if (this.messageCallbacks.has(topic)) {
|
||||
// 调用该主题对应的回调
|
||||
this.messageCallbacks.get(topic)(payload);
|
||||
}
|
||||
};
|
||||
|
||||
const connectOptions = {
|
||||
timeout: 4,
|
||||
userName: this.options.username,
|
||||
password: this.options.password,
|
||||
useSSL: false, // 如果是 wss, 需要设置为 true
|
||||
cleanSession: true,
|
||||
onSuccess: () => {
|
||||
console.log('MQTT连接成功');
|
||||
if (this.onConnectCallback) {
|
||||
this.onConnectCallback();
|
||||
}
|
||||
},
|
||||
onFailure: (message) => {
|
||||
console.error('MQTT连接失败:', message.errorMessage);
|
||||
this.disconnect();
|
||||
}
|
||||
};
|
||||
|
||||
this.client.connect(connectOptions);
|
||||
|
||||
} catch (error) {
|
||||
console.error('MQTT客户端初始化失败:', error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 订阅主题
|
||||
* @param {String} topic - 需要订阅的主题
|
||||
* @param {Function} onMessageCallback - 收到该主题消息后的回调函数
|
||||
*/
|
||||
subscribe(topic, onMessageCallback) {
|
||||
if (this.client && this.client.isConnected()) {
|
||||
console.log(`尝试订阅主题: ${topic}`);
|
||||
this.client.subscribe(topic, { qos: 1 });
|
||||
// 存储该主题的回调函数
|
||||
this.messageCallbacks.set(topic, onMessageCallback);
|
||||
} else {
|
||||
console.error('MQTT未连接,无法订阅');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布消息
|
||||
* @param {String} topic - 需要发布的主题
|
||||
* @param {String} message - 需要发布的消息
|
||||
*/
|
||||
publish(topic, message) {
|
||||
if (this.client && this.client.isConnected()) {
|
||||
const mqttMessage = new Paho.Message(message);
|
||||
mqttMessage.destinationName = topic;
|
||||
mqttMessage.qos = 1;
|
||||
this.client.send(mqttMessage);
|
||||
console.log(`成功发布消息到主题 ${topic}: ${message}`);
|
||||
} else {
|
||||
console.error('MQTT未连接,无法发布');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 取消订阅
|
||||
* @param {String} topic - 需要取消订阅的主题
|
||||
*/
|
||||
unsubscribe(topic) {
|
||||
if (this.client && this.client.isConnected()) {
|
||||
this.client.unsubscribe(topic);
|
||||
// 从回调Map中移除
|
||||
if (this.messageCallbacks.has(topic)) {
|
||||
this.messageCallbacks.delete(topic);
|
||||
}
|
||||
console.log(`成功取消订阅: ${topic}`);
|
||||
} else {
|
||||
console.error('MQTT未连接,无法取消订阅');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 断开连接
|
||||
*/
|
||||
disconnect() {
|
||||
if (this.client && this.client.isConnected()) {
|
||||
this.client.disconnect();
|
||||
}
|
||||
this.client = null;
|
||||
console.log('MQTT连接已断开');
|
||||
}
|
||||
}
|
||||
|
||||
// 断开 MQTT 连接
|
||||
export function disconnect() {
|
||||
return new Promise((resolve) => {
|
||||
if (client) {
|
||||
console.log('正在断开 MQTT 客户端...');
|
||||
client.end(true, (err) => { // true 表示强制关闭,丢弃未发送的消息
|
||||
if (err) {
|
||||
console.error('MQTT 断开连接时出错:', err);
|
||||
} else {
|
||||
console.log('MQTT 已成功断开连接');
|
||||
}
|
||||
client = null;
|
||||
connectPromise = null;
|
||||
resolve();
|
||||
});
|
||||
} else {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// 发布消息
|
||||
export async function publish(topic, message, options = { qos: 0, retain: false }) {
|
||||
if (!client || !client.connected) {
|
||||
console.warn('MQTT 客户端未连接。正在尝试连接...');
|
||||
try {
|
||||
await connect(); // 发布前确保已连接
|
||||
} catch (error) {
|
||||
console.error('发布前连接失败:', error);
|
||||
uni.showToast({ title: 'MQTT 连接失败', icon: 'none' });
|
||||
return Promise.reject('MQTT 客户端未连接');
|
||||
}
|
||||
}
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
client.publish(topic, message, options, (err) => {
|
||||
if (err) {
|
||||
console.error('MQTT 发布错误:', err);
|
||||
reject(err);
|
||||
} else {
|
||||
console.log(`MQTT 消息已发布到主题 ${topic}:`, message);
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// 订阅主题
|
||||
export async function subscribe(topic, options = { qos: 0 }) {
|
||||
if (!client || !client.connected) {
|
||||
console.warn('MQTT 客户端未连接。正在尝试连接...');
|
||||
try {
|
||||
await connect(); // 订阅前确保已连接
|
||||
} catch (error) {
|
||||
console.error('订阅前连接失败:', error);
|
||||
uni.showToast({ title: 'MQTT 连接失败', icon: 'none' });
|
||||
return Promise.reject('MQTT 客户端未连接');
|
||||
}
|
||||
}
|
||||
return new Promise((resolve, reject) => {
|
||||
client.subscribe(topic, options, (err, granted) => {
|
||||
if (err) {
|
||||
console.error('MQTT 订阅错误:', err);
|
||||
reject(err);
|
||||
} else {
|
||||
console.log(`MQTT 已订阅主题: ${granted[0].topic} 服务质量 (QoS): ${granted[0].qos}`);
|
||||
resolve(granted);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// 取消订阅主题
|
||||
export function unsubscribe(topic) {
|
||||
return new Promise((resolve, reject) => {
|
||||
if (client && client.connected) {
|
||||
client.unsubscribe(topic, (err) => {
|
||||
if (err) {
|
||||
console.error('MQTT 取消订阅错误:', err);
|
||||
reject(err);
|
||||
} else {
|
||||
console.log(`MQTT 已取消订阅主题: ${topic}`);
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
} else {
|
||||
console.warn('MQTT 客户端未连接,无法取消订阅。');
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// 处理传入的消息
|
||||
export function onMessage(callback) {
|
||||
if (client) {
|
||||
client.on('message', callback); // 回调格式: (topic, message, packet) => { ... }
|
||||
} else {
|
||||
const checkClientInterval = setInterval(() => {
|
||||
if (client) {
|
||||
client.on('message', callback);
|
||||
clearInterval(checkClientInterval);
|
||||
} else if (connectPromise === null && client === null) {
|
||||
console.warn('MQTT 客户端未初始化以处理 onMessage,且没有连接尝试正在进行中。');
|
||||
clearInterval(checkClientInterval);
|
||||
}
|
||||
}, 100);
|
||||
}
|
||||
}
|
||||
export function getClient() {
|
||||
return client;
|
||||
}
|
||||
|
||||
export function isConnected() {
|
||||
return client && client.connected;
|
||||
}
|
||||
export default MqttClient;
|
Reference in New Issue
Block a user