Files
APP/utils/mqtt.js
2025-07-07 11:27:24 +08:00

179 lines
5.4 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import mqtt from 'mqtt/dist/mqtt.js';
const MQTT_BROKER_URL = 'ws://your-mqtt-broker-address:your-port/mqtt'; // 使用 ws:// 或 wss://
const MQTT_OPTIONS = {
connectTimeout: 4000,
clientId: `mqtt_client_${Math.random().toString(16).substr(2, 8)}`,
username: 'your-username',
password: 'your-password',
clean: true,
};
let client = null;
let connectPromise = null;
// 连接到 MQTT 代理
export function connect() {
if (client && client.connected) {
return Promise.resolve(client);
}
if (connectPromise) {
return connectPromise;
}
let url = MQTT_BROKER_URL;
// #ifdef MP-WEIXIN || APP-PLUS
// #endif
console.log(`正在连接到 MQTT 代理: ${url}`);
client = mqtt.connect(url, MQTT_OPTIONS);
connectPromise = new Promise((resolve, reject) => {
client.on('connect', () => {
console.log('MQTT 已连接');
connectPromise = null;
resolve(client);
});
client.on('reconnect', () => {
console.log('MQTT 正在重新连接...');
});
client.on('error', (err) => {
console.error('MQTT 连接错误:', err);
if (connectPromise) {
reject(err);
}
connectPromise = null;
});
client.on('close', () => {
console.log('MQTT 连接已关闭');
connectPromise = null;
});
client.on('offline', () => {
console.log('MQTT 客户端离线');
connectPromise = null;
});
});
return connectPromise;
}
// 断开 MQTT 连接
export function disconnect() {
return new Promise((resolve) => {
if (client) {
console.log('正在断开 MQTT 客户端...');
client.end(true, (err) => { // true 表示强制关闭,丢弃未发送的消息
if (err) {
console.error('MQTT 断开连接时出错:', err);
} else {
console.log('MQTT 已成功断开连接');
}
client = null;
connectPromise = null;
resolve();
});
} else {
resolve();
}
});
}
// 发布消息
export async function publish(topic, message, options = { qos: 0, retain: false }) {
if (!client || !client.connected) {
console.warn('MQTT 客户端未连接。正在尝试连接...');
try {
await connect(); // 发布前确保已连接
} catch (error) {
console.error('发布前连接失败:', error);
uni.showToast({ title: 'MQTT 连接失败', icon: 'none' });
return Promise.reject('MQTT 客户端未连接');
}
}
return new Promise((resolve, reject) => {
client.publish(topic, message, options, (err) => {
if (err) {
console.error('MQTT 发布错误:', err);
reject(err);
} else {
console.log(`MQTT 消息已发布到主题 ${topic}:`, message);
resolve();
}
});
});
}
// 订阅主题
export async function subscribe(topic, options = { qos: 0 }) {
if (!client || !client.connected) {
console.warn('MQTT 客户端未连接。正在尝试连接...');
try {
await connect(); // 订阅前确保已连接
} catch (error) {
console.error('订阅前连接失败:', error);
uni.showToast({ title: 'MQTT 连接失败', icon: 'none' });
return Promise.reject('MQTT 客户端未连接');
}
}
return new Promise((resolve, reject) => {
client.subscribe(topic, options, (err, granted) => {
if (err) {
console.error('MQTT 订阅错误:', err);
reject(err);
} else {
console.log(`MQTT 已订阅主题: ${granted[0].topic} 服务质量 (QoS): ${granted[0].qos}`);
resolve(granted);
}
});
});
}
// 取消订阅主题
export function unsubscribe(topic) {
return new Promise((resolve, reject) => {
if (client && client.connected) {
client.unsubscribe(topic, (err) => {
if (err) {
console.error('MQTT 取消订阅错误:', err);
reject(err);
} else {
console.log(`MQTT 已取消订阅主题: ${topic}`);
resolve();
}
});
} else {
console.warn('MQTT 客户端未连接,无法取消订阅。');
resolve();
}
});
}
// 处理传入的消息
export function onMessage(callback) {
if (client) {
client.on('message', callback); // 回调格式: (topic, message, packet) => { ... }
} else {
const checkClientInterval = setInterval(() => {
if (client) {
client.on('message', callback);
clearInterval(checkClientInterval);
} else if (connectPromise === null && client === null) {
console.warn('MQTT 客户端未初始化以处理 onMessage且没有连接尝试正在进行中。');
clearInterval(checkClientInterval);
}
}, 100);
}
}
export function getClient() {
return client;
}
export function isConnected() {
return client && client.connected;
}