Files
APP/utils/mqtt.js
2025-07-05 14:49:26 +08:00

204 lines
7.3 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import mqtt from 'mqtt/dist/mqtt.js'; // 根据典型的 uniapp 结构调整路径
// 请替换为您的 MQTT代理broker详细信息
const MQTT_BROKER_URL = 'ws://your-mqtt-broker-address:your-port/mqtt'; // 使用 ws:// 或 wss://
const MQTT_OPTIONS = {
connectTimeout: 4000, // 连接超时时间 (毫秒)
clientId: `mqtt_client_${Math.random().toString(16).substr(2, 8)}`, // 唯一的客户端ID
username: 'your-username', // 用户名 (可选)
password: 'your-password', // 密码 (可选)
clean: true, // 对于大多数移动应用场景,建议为 true
// 更多选项请参考 MQTT.js 文档
// Keepalive (心跳包间隔) 通常为 30-60 秒。
// reconnectPeriod: 1000, // 每次重新连接尝试之间的间隔时间 (毫秒)
};
let client = null; // MQTT 客户端实例
let connectPromise = null; // 用于跟踪连接过程的 Promise
// 连接到 MQTT 代理
export function connect() {
// 如果客户端已存在并且已连接,则直接返回
if (client && client.connected) {
return Promise.resolve(client);
}
// 如果正在连接中,则返回当前的连接 Promise
if (connectPromise) {
return connectPromise;
}
let url = MQTT_BROKER_URL;
// #ifdef MP-WEIXIN || APP-PLUS
// 对于微信小程序和 App某些代理可能需要 'wx://' 或 'wxs://'
// if (MQTT_BROKER_URL.startsWith('ws://')) {
// url = MQTT_BROKER_URL.replace('ws://', 'wx://');
// } else if (MQTT_BROKER_URL.startsWith('wss://')) {
// url = MQTT_BROKER_URL.replace('wss://', 'wxs://');
// }
// #endif
console.log(`正在连接到 MQTT 代理: ${url}`);
client = mqtt.connect(url, MQTT_OPTIONS);
connectPromise = new Promise((resolve, reject) => {
client.on('connect', () => {
console.log('MQTT 已连接');
connectPromise = null; // 连接成功后清除 Promise
resolve(client);
});
client.on('reconnect', () => {
console.log('MQTT 正在重新连接...');
});
client.on('error', (err) => {
console.error('MQTT 连接错误:', err);
if (connectPromise) { // 仅当是初始连接尝试时才 reject
reject(err);
}
connectPromise = null; // 出错时清除 Promise
// 如果合适,可以考虑在此处尝试断开连接或清理客户端
// client.end(true, () => console.log('客户端因错误被强制结束.'));
});
client.on('close', () => {
console.log('MQTT 连接已关闭');
// 如果连接意外关闭,您可能需要将客户端置为 null
// 或设置一个标志,以便 `connect()` 可以尝试建立新连接。
// client = null; // 或者管理重连逻辑
connectPromise = null;
});
client.on('offline', () => {
console.log('MQTT 客户端离线');
connectPromise = null;
});
});
return connectPromise;
}
// 断开 MQTT 连接
export function disconnect() {
return new Promise((resolve) => {
if (client) {
console.log('正在断开 MQTT 客户端...');
client.end(true, (err) => { // true 表示强制关闭,丢弃未发送的消息
if (err) {
console.error('MQTT 断开连接时出错:', err);
} else {
console.log('MQTT 已成功断开连接');
}
client = null;
connectPromise = null;
resolve();
});
} else {
resolve();
}
});
}
// 发布消息
export async function publish(topic, message, options = { qos: 0, retain: false }) {
if (!client || !client.connected) {
console.warn('MQTT 客户端未连接。正在尝试连接...');
try {
await connect(); // 发布前确保已连接
} catch (error) {
console.error('发布前连接失败:', error);
uni.showToast({ title: 'MQTT 连接失败', icon: 'none' });
return Promise.reject('MQTT 客户端未连接');
}
}
return new Promise((resolve, reject) => {
client.publish(topic, message, options, (err) => {
if (err) {
console.error('MQTT 发布错误:', err);
reject(err);
} else {
console.log(`MQTT 消息已发布到主题 ${topic}:`, message);
resolve();
}
});
});
}
// 订阅主题
export async function subscribe(topic, options = { qos: 0 }) {
if (!client || !client.connected) {
console.warn('MQTT 客户端未连接。正在尝试连接...');
try {
await connect(); // 订阅前确保已连接
} catch (error) {
console.error('订阅前连接失败:', error);
uni.showToast({ title: 'MQTT 连接失败', icon: 'none' });
return Promise.reject('MQTT 客户端未连接');
}
}
return new Promise((resolve, reject) => {
client.subscribe(topic, options, (err, granted) => {
if (err) {
console.error('MQTT 订阅错误:', err);
reject(err);
} else {
console.log(`MQTT 已订阅主题: ${granted[0].topic} 服务质量 (QoS): ${granted[0].qos}`);
resolve(granted);
}
});
});
}
// 取消订阅主题
export function unsubscribe(topic) {
return new Promise((resolve, reject) => {
if (client && client.connected) {
client.unsubscribe(topic, (err) => {
if (err) {
console.error('MQTT 取消订阅错误:', err);
reject(err);
} else {
console.log(`MQTT 已取消订阅主题: ${topic}`);
resolve();
}
});
} else {
console.warn('MQTT 客户端未连接,无法取消订阅。');
resolve(); // 或 reject取决于期望的行为
}
});
}
// 处理传入的消息
// 连接后设置一次此回调至关重要。
export function onMessage(callback) {
if (client) {
client.on('message', callback); // 回调格式: (topic, message, packet) => { ... }
} else {
// 可能是 connect() 初始化 'client' 之前调用了此函数
// 延迟附加监听器,直到客户端可用。
const checkClientInterval = setInterval(() => {
if (client) {
client.on('message', callback);
clearInterval(checkClientInterval);
} else if (connectPromise === null && client === null) {
// 如果 connectPromise 为 null且 client 为 null表示 connect() 未被调用或已永久失败。
console.warn('MQTT 客户端未初始化以处理 onMessage且没有连接尝试正在进行中。');
clearInterval(checkClientInterval);
}
}, 100); // 每 100 毫秒检查一次
}
}
// 您可能希望直接公开客户端以用于高级用例或特定的事件处理
export function getClient() {
return client;
}
// 可选:添加一个函数来检查连接状态
export function isConnected() {
return client && client.connected;
}