presence-manager.js 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363
  /**
   * User presence (online status) manager.
   *
   * Keeps one uni-app WebSocket connection alive with periodic PING messages,
   * caches the ONLINE / OFFLINE / STATUS_UPDATE notifications it receives and
   * fans them out to per-user listeners. Every other message type is handed to
   * the functions stored in `messageCallbacks`.
   */
  class PresenceManager {
    constructor() {
      this.ws = null; // value returned by uni.connectSocket
      this.heartbeatTimer = null; // setInterval handle of the PING loop
      this.reconnectTimer = null; // setTimeout handle of a pending reconnect
      this.heartbeatInterval = 30000; // heartbeat period in ms (30 s)
      this.reconnectInterval = 5000; // reconnect delay in ms (5 s)
      this.isConnected = false;
      this.userId = null;
      this.statusCallbacks = new Map(); // String(userId) -> array of status listeners
      this.onlineStatusCache = new Map(); // String(userId) -> boolean online flag
      this.messageCallbacks = []; // listeners for non-presence messages (e.g. read receipts)
      // The uni.onSocket* listeners are global, so they must be registered only
      // once no matter how many times connect() runs (reconnects included).
      this.listenersBound = false;
      // NOTE(review): hard-coded localhost endpoints — presumably development
      // defaults; confirm how they are overridden for other environments.
      this.wsUrl = 'ws://localhost:8083/ws/chat';
      this.httpUrl = 'http://localhost:8083/api/online'; // HTTP API base URL
    }

    /**
     * Open the WebSocket connection for the given user.
     * Does nothing when already connected or when `userId` is falsy.
     * @param {String} userId ID of the current user
     */
    connect(userId) {
      if (this.isConnected || !userId) {
        return;
      }
      this.userId = userId;
      try {
        // uni-app WebSocket API
        this.ws = uni.connectSocket({
          // Encode the id so characters such as '&', '#' or spaces cannot
          // corrupt the query string.
          url: `${this.wsUrl}?userId=${encodeURIComponent(userId)}`,
          success: () => {
            console.log('🔌 WebSocket连接请求已发送');
          },
          fail: (err) => {
            console.error('❌ WebSocket连接失败:', err);
            this.scheduleReconnect();
          }
        });
        this.bindSocketListeners();
      } catch (error) {
        console.error('❌ WebSocket连接异常:', error);
        this.scheduleReconnect();
      }
    }

    /**
     * Register the global socket listeners exactly once.
     * Re-registering them on every reconnect could make each event run its
     * handler several times (duplicate heartbeats, duplicate notifications).
     */
    bindSocketListeners() {
      if (this.listenersBound) {
        return;
      }
      this.listenersBound = true;

      // Connection opened
      uni.onSocketOpen(() => {
        console.log('✅ WebSocket已连接');
        this.isConnected = true;
        this.startHeartbeat();
        // Re-send SUBSCRIBE for every user that has listeners: requests made
        // while disconnected were dropped by subscribeUser(). The string map
        // key is sent here because the raw id is not retained.
        this.statusCallbacks.forEach((callbacks, targetUserId) => {
          this.subscribeUser(targetUserId);
        });
      });

      // Incoming message
      uni.onSocketMessage((res) => {
        this.handleMessage(res.data);
      });

      // Socket error
      uni.onSocketError((err) => {
        console.error('❌ WebSocket错误:', err);
        this.isConnected = false;
        this.stopHeartbeat();
        this.scheduleReconnect();
      });

      // Socket closed
      uni.onSocketClose(() => {
        console.log('🔌 WebSocket已关闭');
        this.isConnected = false;
        this.stopHeartbeat();
        this.scheduleReconnect();
      });
    }

    /**
     * Handle a message received from the socket.
     * @param {String|Object} data raw JSON text or an already parsed message
     */
    handleMessage(data) {
      try {
        const message = typeof data === 'string' ? JSON.parse(data) : data;
        switch (message.type) {
          case 'PONG':
            // Heartbeat reply
            console.log('💓 收到心跳响应');
            break;
          case 'ONLINE':
            // A user came online
            if (message.fromUserId) {
              this.onlineStatusCache.set(String(message.fromUserId), true);
              this.notifyStatusChange(String(message.fromUserId), 'online');
            }
            break;
          case 'OFFLINE':
            // A user went offline
            if (message.fromUserId) {
              this.onlineStatusCache.set(String(message.fromUserId), false);
              this.notifyStatusChange(String(message.fromUserId), 'offline');
            }
            break;
          case 'STATUS_UPDATE':
            // Explicit status update for a user
            if (message.userId) {
              // Coerce so the cache only ever holds real booleans.
              const isOnline = Boolean(message.online);
              this.onlineStatusCache.set(String(message.userId), isOnline);
              this.notifyStatusChange(String(message.userId), isOnline ? 'online' : 'offline');
            }
            break;
          default:
            // Any other type goes to the registered message callbacks
            if (this.messageCallbacks && this.messageCallbacks.length > 0) {
              this.messageCallbacks.forEach((callback) => {
                try {
                  callback(message);
                } catch (err) {
                  console.error('❌ 消息回调执行失败:', err);
                }
              });
            }
            break;
        }
      } catch (error) {
        console.error('❌ 消息解析失败:', error);
      }
    }

    /**
     * Query a user's online status through the HTTP API and cache the answer.
     * @param {String} userId target user ID
     * @return {Promise<Boolean>} true when the API reports the user online;
     *   false when offline or when the request fails
     */
    async queryOnlineStatus(userId) {
      try {
        const result = await uni.request({
          url: `${this.httpUrl}/checkStatus`,
          method: 'GET',
          data: {
            userId: userId
          }
        });
        // The promise form of uni.request resolves to an [err, res] pair in
        // some uni-app versions and to the response object in others; accept
        // both shapes instead of failing on the destructuring.
        const isTuple = Array.isArray(result);
        const err = isTuple ? result[0] : null;
        const res = isTuple ? result[1] : result;
        if (err) {
          console.error('❌ 查询在线状态失败:', err);
          return false;
        }
        const body = res && res.data;
        if (body && body.code === 200 && body.data) {
          const isOnline = Boolean(body.data.online);
          this.onlineStatusCache.set(String(userId), isOnline);
          return isOnline;
        }
        return false;
      } catch (error) {
        console.error('❌ 查询在线状态异常:', error);
        return false;
      }
    }

    /**
     * Read the cached online status of a user.
     * @param {String} userId target user ID
     * @return {Boolean|null} cached status, or null when nothing is cached
     */
    getCachedStatus(userId) {
      const key = String(userId);
      // `get(key) || null` would turn a cached `false` (offline) into null
      // (unknown), so test for presence explicitly.
      return this.onlineStatusCache.has(key) ? this.onlineStatusCache.get(key) : null;
    }

    /**
     * Start (or restart) the periodic PING heartbeat.
     */
    startHeartbeat() {
      this.stopHeartbeat();
      this.heartbeatTimer = setInterval(() => {
        if (this.isConnected) {
          this.sendMessage({
            type: 'PING',
            fromUserId: this.userId,
            timestamp: Date.now()
          });
        }
      }, this.heartbeatInterval);
    }

    /**
     * Stop the heartbeat timer if it is running.
     */
    stopHeartbeat() {
      if (this.heartbeatTimer) {
        clearInterval(this.heartbeatTimer);
        this.heartbeatTimer = null;
      }
    }

    /**
     * Serialize `data` as JSON and send it over the socket.
     * Logs a warning and sends nothing while disconnected.
     * @param {Object} data message payload
     */
    sendMessage(data) {
      if (!this.isConnected) {
        console.warn('⚠️ WebSocket未连接,无法发送消息');
        return;
      }
      try {
        uni.sendSocketMessage({
          data: JSON.stringify(data),
          fail: (err) => {
            console.error('❌ 发送消息失败:', err);
          }
        });
      } catch (error) {
        console.error('❌ 发送消息异常:', error);
      }
    }

    /**
     * Send a SUBSCRIBE message for a user's status.
     * No-op while disconnected or when the id is falsy.
     * @param {String} targetUserId target user ID
     */
    subscribeUser(targetUserId) {
      if (!this.isConnected || !targetUserId) {
        return;
      }
      this.sendMessage({
        type: 'SUBSCRIBE',
        fromUserId: this.userId,
        toUserId: targetUserId
      });
    }

    /**
     * Send an UNSUBSCRIBE message for a user's status.
     * No-op while disconnected or when the id is falsy.
     * @param {String} targetUserId target user ID
     */
    unsubscribeUser(targetUserId) {
      if (!this.isConnected || !targetUserId) {
        return;
      }
      this.sendMessage({
        type: 'UNSUBSCRIBE',
        fromUserId: this.userId,
        toUserId: targetUserId
      });
    }

    /**
     * Listen for status changes of a user and subscribe to that user.
     * @param {String} targetUserId target user ID
     * @param {Function} callback listener, called as callback(status) with
     *   'online' or 'offline'
     */
    onStatusChange(targetUserId, callback) {
      if (targetUserId === undefined || targetUserId === null || typeof callback !== 'function') {
        return;
      }
      // Notifications are dispatched under String(id); use the same key here
      // so a numeric id still reaches its listeners.
      const key = String(targetUserId);
      if (!this.statusCallbacks.has(key)) {
        this.statusCallbacks.set(key, []);
      }
      this.statusCallbacks.get(key).push(callback);
      // Subscribe now; if disconnected, the open handler subscribes later.
      this.subscribeUser(targetUserId);
    }

    /**
     * Remove a status listener; unsubscribes when it was the last one.
     * @param {String} targetUserId target user ID
     * @param {Function} callback listener previously passed to onStatusChange
     */
    offStatusChange(targetUserId, callback) {
      const key = String(targetUserId);
      if (!this.statusCallbacks.has(key)) {
        return;
      }
      const callbacks = this.statusCallbacks.get(key);
      const index = callbacks.indexOf(callback);
      if (index > -1) {
        callbacks.splice(index, 1);
      }
      // No listeners left: drop the entry and unsubscribe
      if (callbacks.length === 0) {
        this.statusCallbacks.delete(key);
        this.unsubscribeUser(targetUserId);
      }
    }

    /**
     * Call every listener registered for `userId` with the new status.
     * @param {String} userId user whose status changed
     * @param {String} status 'online' or 'offline'
     */
    notifyStatusChange(userId, status) {
      console.log(`👤 用户 ${userId} 状态变更: ${status}`);
      const callbacks = this.statusCallbacks.get(String(userId));
      if (callbacks && callbacks.length > 0) {
        // Iterate over a copy: a listener that removes itself would otherwise
        // shift the array and cause the next listener to be skipped.
        callbacks.slice().forEach((callback) => {
          try {
            callback(status);
          } catch (error) {
            console.error('❌ 状态回调执行失败:', error);
          }
        });
      }
    }

    /**
     * Schedule one reconnect attempt after `reconnectInterval` ms.
     * Skipped when an attempt is already pending, or when there is no user to
     * reconnect as (e.g. the close event that follows disconnect()).
     */
    scheduleReconnect() {
      if (this.reconnectTimer || !this.userId) {
        return;
      }
      console.log('🔄 将在5秒后重连...');
      this.reconnectTimer = setTimeout(() => {
        this.reconnectTimer = null;
        if (!this.isConnected && this.userId) {
          console.log('🔄 尝试重连...');
          this.connect(this.userId);
        }
      }, this.reconnectInterval);
    }

    /**
     * Close the connection and clear timers, listeners and the current user.
     */
    disconnect() {
      this.stopHeartbeat();
      if (this.reconnectTimer) {
        clearTimeout(this.reconnectTimer);
        this.reconnectTimer = null;
      }
      if (this.isConnected) {
        uni.closeSocket();
        this.isConnected = false;
      }
      this.statusCallbacks.clear();
      this.messageCallbacks = []; // drop message callbacks
      // Clearing userId also stops scheduleReconnect() from arming a timer
      // when the resulting close event arrives.
      this.userId = null;
    }

    /**
     * Report whether the socket is currently connected.
     * @return {Boolean} current value of the connected flag
     */
    getConnectionStatus() {
      return this.isConnected;
    }
  }
  // Export a single shared instance (singleton) of the manager.
  const presenceManager = new PresenceManager();
  export default presenceManager;