From 4885600ecc369aa2e30a65de8dd7a410f13c34df Mon Sep 17 00:00:00 2001
From: heyujie <516346543@qq.com>
Date: 星期一, 24 五月 2021 11:25:04 +0800
Subject: [PATCH] 解决app通信问题

---
 node_modules/mqtt/lib/connect/wx.js |  178 +++++++++++++++++++++++++++++++++-------------------------
 1 files changed, 101 insertions(+), 77 deletions(-)

diff --git a/node_modules/mqtt/lib/connect/wx.js b/node_modules/mqtt/lib/connect/wx.js
index 5178117..c5048b5 100644
--- a/node_modules/mqtt/lib/connect/wx.js
+++ b/node_modules/mqtt/lib/connect/wx.js
@@ -1,76 +1,35 @@
 'use strict'
 
+var Transform = require('readable-stream').Transform
+var duplexify = require('duplexify')
+
 /* global wx */
-var socketOpen = false
-var socketMsgQueue = []
+var socketTask
+var proxy
+var stream
 
-function sendSocketMessage (msg) {
-  if (socketOpen) {
-    wx.sendSocketMessage({
-      data: msg.buffer || msg
+function buildProxy () {
+  var proxy = new Transform()
+  proxy._write = function (chunk, encoding, next) {
+    socketTask.send({
+      data: chunk.buffer,
+      success: function () {
+        next()
+      },
+      fail: function (errMsg) {
+        next(new Error(errMsg))
+      }
     })
-  } else {
-    socketMsgQueue.push(msg)
   }
-}
-
-function WebSocket (url, protocols) {
-  var ws = {
-    OPEN: 1,
-    CLOSING: 2,
-    CLOSED: 3,
-    readyState: socketOpen ? 1 : 0,
-    send: sendSocketMessage,
-    close: wx.closeSocket,
-    onopen: null,
-    onmessage: null,
-    onclose: null,
-    onerror: null
+  proxy._flush = function socketEnd (done) {
+    socketTask.close({
+      success: function () {
+        done()
+      }
+    })
   }
 
-  wx.connectSocket({
-    url: url,
-    protocols: protocols
-  })
-  wx.onSocketOpen(function (res) {
-    ws.readyState = ws.OPEN
-    socketOpen = true
-    for (var i = 0; i < socketMsgQueue.length; i++) {
-      sendSocketMessage(socketMsgQueue[i])
-    }
-    socketMsgQueue = []
-
-    ws.onopen && ws.onopen.apply(ws, arguments)
-  })
-  wx.onSocketMessage(function (res) {
-    ws.onmessage && ws.onmessage.apply(ws, arguments)
-  })
-  wx.onSocketClose(function () {
-    ws.onclose && ws.onclose.apply(ws, arguments)
-    ws.readyState = ws.CLOSED
-    socketOpen = false
-  })
-  wx.onSocketError(function () {
-    ws.onerror && ws.onerror.apply(ws, arguments)
-    ws.readyState = ws.CLOSED
-    socketOpen = false
-  })
-
-  return ws
-}
-
-var websocket = require('websocket-stream')
-
-function buildUrl (opts, client) {
-  var protocol = opts.protocol === 'wxs' ? 'wss' : 'ws'
-  var url = protocol + '://' + opts.hostname + opts.path
-  if (opts.port && opts.port !== 80 && opts.port !== 443) {
-    url = protocol + '://' + opts.hostname + ':' + opts.port + opts.path
-  }
-  if (typeof (opts.transformWsUrl) === 'function') {
-    url = opts.transformWsUrl(url, opts, client)
-  }
-  return url
+  return proxy
 }
 
 function setDefaultOpts (opts) {
@@ -86,25 +45,90 @@
   }
 }
 
-function createWebSocket (client, opts) {
-  var websocketSubProtocol =
-    (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
-      ? 'mqttv3.1'
-      : 'mqtt'
-
-  setDefaultOpts(opts)
-  var url = buildUrl(opts, client)
-  return websocket(WebSocket(url, [websocketSubProtocol]))
+function buildUrl (opts, client) {
+  var protocol = opts.protocol === 'wxs' ? 'wss' : 'ws'
+  var url = protocol + '://' + opts.hostname + opts.path
+  if (opts.port && opts.port !== 80 && opts.port !== 443) {
+    url = protocol + '://' + opts.hostname + ':' + opts.port + opts.path
+  }
+  if (typeof (opts.transformWsUrl) === 'function') {
+    url = opts.transformWsUrl(url, opts, client)
+  }
+  return url
 }
 
-function buildBuilder (client, opts) {
+function bindEventHandler () {
+  socketTask.onOpen(function () {
+    stream.setReadable(proxy)
+    stream.setWritable(proxy)
+    stream.emit('connect')
+  })
+
+  socketTask.onMessage(function (res) {
+    var data = res.data
+
+    if (data instanceof ArrayBuffer) data = Buffer.from(data)
+    else data = Buffer.from(data, 'utf8')
+    proxy.push(data)
+  })
+
+  socketTask.onClose(function () {
+    stream.end()
+    stream.destroy()
+  })
+
+  socketTask.onError(function (res) {
+    stream.destroy(new Error(res.errMsg))
+  })
+}
+
+function buildStream (client, opts) {
   opts.hostname = opts.hostname || opts.host
 
   if (!opts.hostname) {
     throw new Error('Could not determine host. Specify host manually.')
   }
 
-  return createWebSocket(client, opts)
+  var websocketSubProtocol =
+    (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
+      ? 'mqttv3.1'
+      : 'mqtt'
+
+  setDefaultOpts(opts)
+
+  var url = buildUrl(opts, client)
+  socketTask = wx.connectSocket({
+    url: url,
+    protocols: websocketSubProtocol
+  })
+
+  proxy = buildProxy()
+  stream = duplexify.obj()
+  stream._destroy = function (err, cb) {
+    socketTask.close({
+      success: function () {
+        cb && cb(err)
+      }
+    })
+  }
+
+  var destroyRef = stream.destroy
+  stream.destroy = function () {
+    stream.destroy = destroyRef
+
+    var self = this
+    process.nextTick(function () {
+      socketTask.close({
+        fail: function () {
+          self._destroy(new Error())
+        }
+      })
+    })
+  }.bind(stream)
+
+  bindEventHandler()
+
+  return stream
 }
 
-module.exports = buildBuilder
+module.exports = buildStream

--
Gitblit v1.8.0