From 4885600ecc369aa2e30a65de8dd7a410f13c34df Mon Sep 17 00:00:00 2001 From: heyujie <516346543@qq.com> Date: 星期一, 24 五月 2021 11:25:04 +0800 Subject: [PATCH] 解决app通信问题 --- node_modules/mqtt/lib/connect/wx.js | 178 +++++++++++++++++++++++++++++++++------------------------- 1 files changed, 101 insertions(+), 77 deletions(-) diff --git a/node_modules/mqtt/lib/connect/wx.js b/node_modules/mqtt/lib/connect/wx.js index 5178117..c5048b5 100644 --- a/node_modules/mqtt/lib/connect/wx.js +++ b/node_modules/mqtt/lib/connect/wx.js @@ -1,76 +1,35 @@ 'use strict' +var Transform = require('readable-stream').Transform +var duplexify = require('duplexify') + /* global wx */ -var socketOpen = false -var socketMsgQueue = [] +var socketTask +var proxy +var stream -function sendSocketMessage (msg) { - if (socketOpen) { - wx.sendSocketMessage({ - data: msg.buffer || msg +function buildProxy () { + var proxy = new Transform() + proxy._write = function (chunk, encoding, next) { + socketTask.send({ + data: chunk.buffer, + success: function () { + next() + }, + fail: function (errMsg) { + next(new Error(errMsg)) + } }) - } else { - socketMsgQueue.push(msg) } -} - -function WebSocket (url, protocols) { - var ws = { - OPEN: 1, - CLOSING: 2, - CLOSED: 3, - readyState: socketOpen ? 1 : 0, - send: sendSocketMessage, - close: wx.closeSocket, - onopen: null, - onmessage: null, - onclose: null, - onerror: null + proxy._flush = function socketEnd (done) { + socketTask.close({ + success: function () { + done() + } + }) } - wx.connectSocket({ - url: url, - protocols: protocols - }) - wx.onSocketOpen(function (res) { - ws.readyState = ws.OPEN - socketOpen = true - for (var i = 0; i < socketMsgQueue.length; i++) { - sendSocketMessage(socketMsgQueue[i]) - } - socketMsgQueue = [] - - ws.onopen && ws.onopen.apply(ws, arguments) - }) - wx.onSocketMessage(function (res) { - ws.onmessage && ws.onmessage.apply(ws, arguments) - }) - wx.onSocketClose(function () { - ws.onclose && ws.onclose.apply(ws, arguments) - ws.readyState = ws.CLOSED - socketOpen = false - }) - wx.onSocketError(function () { - ws.onerror && ws.onerror.apply(ws, arguments) - ws.readyState = ws.CLOSED - socketOpen = false - }) - - return ws -} - -var websocket = require('websocket-stream') - -function buildUrl (opts, client) { - var protocol = opts.protocol === 'wxs' ? 'wss' : 'ws' - var url = protocol + '://' + opts.hostname + opts.path - if (opts.port && opts.port !== 80 && opts.port !== 443) { - url = protocol + '://' + opts.hostname + ':' + opts.port + opts.path - } - if (typeof (opts.transformWsUrl) === 'function') { - url = opts.transformWsUrl(url, opts, client) - } - return url + return proxy } function setDefaultOpts (opts) { @@ -86,25 +45,90 @@ } } -function createWebSocket (client, opts) { - var websocketSubProtocol = - (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3) - ? 'mqttv3.1' - : 'mqtt' - - setDefaultOpts(opts) - var url = buildUrl(opts, client) - return websocket(WebSocket(url, [websocketSubProtocol])) +function buildUrl (opts, client) { + var protocol = opts.protocol === 'wxs' ? 'wss' : 'ws' + var url = protocol + '://' + opts.hostname + opts.path + if (opts.port && opts.port !== 80 && opts.port !== 443) { + url = protocol + '://' + opts.hostname + ':' + opts.port + opts.path + } + if (typeof (opts.transformWsUrl) === 'function') { + url = opts.transformWsUrl(url, opts, client) + } + return url } -function buildBuilder (client, opts) { +function bindEventHandler () { + socketTask.onOpen(function () { + stream.setReadable(proxy) + stream.setWritable(proxy) + stream.emit('connect') + }) + + socketTask.onMessage(function (res) { + var data = res.data + + if (data instanceof ArrayBuffer) data = Buffer.from(data) + else data = Buffer.from(data, 'utf8') + proxy.push(data) + }) + + socketTask.onClose(function () { + stream.end() + stream.destroy() + }) + + socketTask.onError(function (res) { + stream.destroy(new Error(res.errMsg)) + }) +} + +function buildStream (client, opts) { opts.hostname = opts.hostname || opts.host if (!opts.hostname) { throw new Error('Could not determine host. Specify host manually.') } - return createWebSocket(client, opts) + var websocketSubProtocol = + (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3) + ? 'mqttv3.1' + : 'mqtt' + + setDefaultOpts(opts) + + var url = buildUrl(opts, client) + socketTask = wx.connectSocket({ + url: url, + protocols: websocketSubProtocol + }) + + proxy = buildProxy() + stream = duplexify.obj() + stream._destroy = function (err, cb) { + socketTask.close({ + success: function () { + cb && cb(err) + } + }) + } + + var destroyRef = stream.destroy + stream.destroy = function () { + stream.destroy = destroyRef + + var self = this + process.nextTick(function () { + socketTask.close({ + fail: function () { + self._destroy(new Error()) + } + }) + }) + }.bind(stream) + + bindEventHandler() + + return stream } -module.exports = buildBuilder +module.exports = buildStream -- Gitblit v1.8.0