/*
 * Source: dyf-vue-ui/src/utils/mqtt.ts (forked from dyf/dyf-vue-ui)
 * 275 lines, 7.5 KiB, TypeScript
 *
 * Note from the repository viewer: this file contains Unicode characters that
 * might be confused with other characters. If this is intentional, the
 * warning can safely be ignored.
 */

import * as Paho from 'paho-mqtt';
/**
 * Shape of an MQTT message as stored in `messages` and handed to the
 * callbacks registered through `onMessage`.
 */
export interface MqttMessage {
// Topic the message arrived on (filled from Paho's `destinationName`).
topic: string;
// Message body: a text string, or the raw bytes.
payload: string | ArrayBuffer;
// Quality-of-service level reported by the client for this message.
qos: number;
// Retained flag as reported by the client for this message.
retained: boolean;
// Local timestamp taken when the message was received by this module.
time: Date;
}
/**
 * Options accepted by `subscribe`.
 */
export interface SubscribeOptions {
// Quality-of-service level requested for the subscription; forwarded to the client's subscribe call.
qos: 0 | 1 | 2;
}
/**
 * Builds the MQTT connection settings for the current page.
 *
 * The transport follows the page protocol: an `https:` page uses `wss` on
 * port 9084, anything else uses `ws` on port 9083. When there is no `window`
 * global (server-side rendering, Node-based unit tests) the plain `ws`
 * settings are returned instead of failing with a ReferenceError.
 *
 * NOTE(review): the broker username and password are hard-coded here and are
 * therefore shipped to every browser; consider supplying them from
 * environment-specific configuration.
 * NOTE(review): a fresh clientId is produced on every call, but MQTT_CONFIG
 * below is evaluated only once, so every client built from MQTT_CONFIG uses
 * the same id. Confirm this is intended.
 */
const getMqttConfig = () => {
  // Page protocol is either "http:" or "https:"; non-browser hosts count as not-https.
  const isHttps =
    typeof window !== 'undefined' && window.location.protocol === 'https:';
  console.log(isHttps, '检测环境');
  return {
    // wss for https pages, ws for http pages
    protocol: isHttps ? 'wss' : 'ws',
    host: 'www.cnxhyc.com',
    // 9084 is paired with https, 9083 with http
    port: isHttps ? 9084 : 9083,
    // Authentication
    username: 'admin',
    password: '#YtvpSfCNG',
    clientId: `vue3-mqtt-client-${Date.now()}-${Math.random().toString(36).substring(2, 10)}`,
    cleanSession: true,
    keepAliveInterval: 60,
    reconnect: true,
  };
};

/** Connection settings, resolved once when this module is first loaded. */
const MQTT_CONFIG = getMqttConfig();
// MQTT客户端组合式API
export function useMqtt() {
// Underlying Paho client; null while no client has been created or after it was released.
let client: Paho.Client | null = null;
// Reactive state exposed to callers.
// NOTE(review): `ref` is not imported in the visible part of this file;
// presumably it is supplied by an auto-import setup — confirm.
const connected = ref(false);
const connecting = ref(false);
const error = ref<Error | null>(null);
const messages = ref<MqttMessage[]>([]);
const subscribedTopics = ref<string[]>([]);
// Listener lists filled through onConnect / onMessage / onError / onDisconnect.
const connectCallbacks: (() => void)[] = [];
const messageCallbacks: ((message: MqttMessage) => void)[] = [];
const errorCallbacks: ((err: Error) => void)[] = [];
const disconnectCallbacks: (() => void)[] = [];
/**
 * Opens a connection to the broker described by MQTT_CONFIG.
 *
 * Takes no callback argument: listeners are registered through onConnect,
 * onError, onDisconnect and onMessage. Calling it while already connected,
 * or while a connection attempt is in flight, does nothing.
 *
 * Outcomes are reported through the reactive state (`connected`,
 * `connecting`, `error`) and the registered callbacks; nothing is thrown to
 * the caller.
 */
const connect = () => {
  if (connected.value || connecting.value) return;
  connecting.value = true;
  error.value = null;
  try {
    // A client left over from a failed or lost connection may still be trying
    // to reconnect on its own (reconnect is enabled). Best-effort: shut it
    // down before creating a new one so two clients do not compete for the
    // same clientId.
    if (client) {
      const staleClient = client;
      client = null;
      try {
        staleClient.disconnect();
      } catch {
        // Paho throws when the client is already disconnected; nothing to close.
      }
    }

    // Log the settings in use to ease debugging (credentials are left out).
    console.log('当前MQTT连接配置:', {
      protocol: MQTT_CONFIG.protocol,
      host: MQTT_CONFIG.host,
      port: MQTT_CONFIG.port,
      clientId: MQTT_CONFIG.clientId
    });

    const activeClient = new Paho.Client(
      MQTT_CONFIG.host,
      MQTT_CONFIG.port,
      MQTT_CONFIG.clientId
    );
    client = activeClient;
    // Events from a client that has since been replaced or released must not
    // touch the shared state.
    const isCurrent = () => client === activeClient;

    const connectOptions: Paho.ConnectOptions = {
      userName: MQTT_CONFIG.username,
      password: MQTT_CONFIG.password,
      cleanSession: MQTT_CONFIG.cleanSession,
      keepAliveInterval: MQTT_CONFIG.keepAliveInterval,
      reconnect: MQTT_CONFIG.reconnect,
      // Key point: TLS is enabled exactly when the wss protocol was selected.
      useSSL: MQTT_CONFIG.protocol === 'wss',
      // Connection established.
      onSuccess: () => {
        if (!isCurrent()) return;
        console.log('MQTT连接成功');
        connected.value = true;
        connecting.value = false;
        connectCallbacks.forEach(cb => cb());
      },
      // Connection attempt failed.
      onFailure: (err) => {
        if (!isCurrent()) return;
        console.error('MQTT连接失败:', err);
        const failure = new Error(err.errorMessage || '连接失败');
        error.value = failure;
        connected.value = false;
        connecting.value = false;
        errorCallbacks.forEach(cb => cb(failure));
      }
    };

    // An errorCode of 0 is treated as a regular close; anything else is
    // surfaced as an error before the disconnect listeners run.
    activeClient.onConnectionLost = (responseObject) => {
      if (!isCurrent()) return;
      if (responseObject.errorCode !== 0) {
        console.error('连接丢失:', responseObject.errorMessage);
        const failure = new Error(responseObject.errorMessage || '连接丢失');
        error.value = failure;
        errorCallbacks.forEach(cb => cb(failure));
      }
      connected.value = false;
      connecting.value = false;
      // With a clean session the broker forgets subscriptions once the
      // connection is gone; drop the local list so a later subscribe() really
      // subscribes again instead of resolving as "already subscribed".
      if (MQTT_CONFIG.cleanSession) {
        subscribedTopics.value = [];
      }
      disconnectCallbacks.forEach(cb => cb());
    };

    // Incoming message: record it and notify listeners.
    activeClient.onMessageArrived = (message) => {
      if (!isCurrent()) return;
      // Reading payloadString throws when the bytes are not valid UTF-8, so
      // fall back to the raw bytes in that case. An empty text payload stays
      // an empty string.
      let payload: MqttMessage['payload'];
      try {
        payload = message.payloadString;
      } catch {
        payload = message.payloadBytes;
      }
      const newMessage: MqttMessage = {
        topic: message.destinationName,
        payload,
        qos: message.qos,
        retained: message.retained,
        time: new Date()
      };
      messages.value.push(newMessage);
      messageCallbacks.forEach(cb => cb(newMessage));
    };

    // Start the connection attempt.
    activeClient.connect(connectOptions);
  } catch (err) {
    console.error('MQTT连接异常:', err);
    // Anything can be thrown; make sure listeners always receive an Error.
    const failure = err instanceof Error ? err : new Error(String(err));
    error.value = failure;
    connected.value = false;
    connecting.value = false;
    errorCallbacks.forEach(cb => cb(failure));
  }
};
/**
 * Closes the connection and releases the client.
 *
 * Works in every client state, not only while connected: a client that is
 * still connecting or retrying after a lost connection is released as well,
 * so nothing keeps running after the owner has gone away. Does nothing when
 * no client exists.
 *
 * Disconnect listeners are notified only when a connection was actually
 * established at the time of the call.
 *
 * NOTE(review): the connection-lost handler installed by connect may also run
 * for an explicit disconnect, which would notify disconnect listeners a
 * second time — confirm against the Paho client behaviour.
 */
const disconnect = () => {
  if (!client) return;
  const activeClient = client;
  const wasConnected = connected.value;
  client = null;
  try {
    activeClient.disconnect();
  } catch {
    // Paho throws when the client is already disconnected; there is nothing
    // left to close, so local state is still reset below.
  }
  connected.value = false;
  connecting.value = false;
  subscribedTopics.value = [];
  if (wasConnected) {
    disconnectCallbacks.forEach(cb => cb());
  }
};
/**
 * Subscribes to a topic.
 *
 * @param topic   Topic filter to subscribe to.
 * @param options Subscription options; `qos` is forwarded to the client.
 * @returns A promise that resolves once the subscription succeeded (or
 *          immediately when the topic is already listed in
 *          `subscribedTopics`) and rejects when there is no connection or the
 *          client reports a failure.
 */
const subscribe = (topic: string, options: SubscribeOptions): Promise<void> => {
  return new Promise((resolve, reject) => {
    if (!client || !connected.value) {
      reject(new Error('未连接到MQTT服务器'));
      return;
    }
    if (subscribedTopics.value.includes(topic)) {
      resolve();
      return;
    }
    client.subscribe(topic, {
      qos: options.qos,
      onSuccess: () => {
        console.log(`订阅主题成功: ${topic}`);
        // Two overlapping subscribe() calls for the same topic both reach
        // this point; record the topic only once.
        if (!subscribedTopics.value.includes(topic)) {
          subscribedTopics.value.push(topic);
        }
        resolve();
      },
      onFailure: (err) => {
        console.error(`订阅主题失败: ${topic}`, err);
        reject(new Error(err.errorMessage || `订阅主题 ${topic} 失败`));
      }
    });
  });
};
/**
 * Removes a subscription.
 *
 * @param topic Topic filter to unsubscribe from.
 * @returns A promise that resolves once the client confirmed the removal (or
 *          immediately when the topic is not listed in `subscribedTopics`)
 *          and rejects when there is no connection or the client reports a
 *          failure.
 */
const unsubscribe = (topic: string): Promise<void> =>
  new Promise<void>((done, fail) => {
    const notConnected = !client || !connected.value;
    if (notConnected) {
      fail(new Error('未连接到MQTT服务器'));
    } else if (subscribedTopics.value.indexOf(topic) === -1) {
      // Nothing recorded for this topic, so there is nothing to undo.
      done();
    } else {
      const handleSuccess = () => {
        console.log(`取消订阅主题成功: ${topic}`);
        const remaining = subscribedTopics.value.filter(entry => entry !== topic);
        subscribedTopics.value = remaining;
        done();
      };
      const handleFailure = (failure: { errorMessage?: string }) => {
        console.error(`取消订阅主题失败: ${topic}`, failure);
        fail(new Error(failure.errorMessage || `取消订阅主题 ${topic} 失败`));
      };
      client!.unsubscribe(topic, {
        onSuccess: handleSuccess,
        onFailure: handleFailure
      });
    }
  });
/**
 * Publishes a message.
 *
 * @param topic   Destination topic; must not be empty.
 * @param payload Message body, either text or raw bytes.
 * @param options `qos` for the delivery and an optional `retained` flag
 *                (defaults to false).
 * @returns A promise that resolves as soon as the message has been handed to
 *          the client; it does not wait for any broker acknowledgement. It
 *          rejects when there is no connection, when the topic is empty, or
 *          when the client throws while sending.
 */
const publish = (
  topic: string,
  payload: string | ArrayBuffer,
  options: { qos: 0 | 1 | 2; retained?: boolean }
): Promise<void> => {
  return new Promise((resolve, reject) => {
    if (!client || !connected.value) {
      reject(new Error('未连接到MQTT服务器'));
      return;
    }
    if (!topic) {
      reject(new Error('主题不能为空'));
      return;
    }
    // Hand the payload over unchanged: Paho accepts strings and binary
    // buffers alike, whereas stringifying an ArrayBuffer would send the
    // literal text "[object ArrayBuffer]".
    const message = new Paho.Message(payload);
    message.destinationName = topic;
    message.qos = options.qos;
    message.retained = options.retained ?? false;
    client.send(message);
    resolve();
  });
};
/** Registers a listener that runs each time a connection is established. */
function onConnect(callback: () => void): void {
  connectCallbacks.push(callback);
}

/** Registers a listener that receives every incoming message. */
function onMessage(callback: (message: MqttMessage) => void): void {
  messageCallbacks.push(callback);
}

/** Registers a listener that receives connection errors. */
function onError(callback: (err: Error) => void): void {
  errorCallbacks.push(callback);
}

/** Registers a listener that runs when the connection ends. */
function onDisconnect(callback: () => void): void {
  disconnectCallbacks.push(callback);
}
// Disconnect automatically when the owning component is unmounted.
// NOTE(review): `onUnmounted` is not imported in the visible part of this
// file; presumably it is supplied by an auto-import setup — confirm.
onUnmounted(() => {
disconnect();
});
// Public surface of the composable: reactive state first, then connection
// control, publish/subscribe helpers and event registration.
return {
connected,
connecting,
error,
messages,
subscribedTopics,
connect,
disconnect,
subscribe,
unsubscribe,
publish,
onConnect,
onMessage,
onError,
onDisconnect
};
}