heyujie
2021-05-24 4885600ecc369aa2e30a65de8dd7a410f13c34df
node_modules/mqtt/lib/connect/wx.js
@@ -1,76 +1,35 @@
'use strict'
var Transform = require('readable-stream').Transform
var duplexify = require('duplexify')
/* global wx */
var socketOpen = false
var socketMsgQueue = []
var socketTask
var proxy
var stream
function sendSocketMessage (msg) {
  if (socketOpen) {
    wx.sendSocketMessage({
      data: msg.buffer || msg
function buildProxy () {
  var proxy = new Transform()
  proxy._write = function (chunk, encoding, next) {
    socketTask.send({
      data: chunk.buffer,
      success: function () {
        next()
      },
      fail: function (errMsg) {
        next(new Error(errMsg))
      }
    })
  } else {
    socketMsgQueue.push(msg)
  }
}
function WebSocket (url, protocols) {
  var ws = {
    OPEN: 1,
    CLOSING: 2,
    CLOSED: 3,
    readyState: socketOpen ? 1 : 0,
    send: sendSocketMessage,
    close: wx.closeSocket,
    onopen: null,
    onmessage: null,
    onclose: null,
    onerror: null
  proxy._flush = function socketEnd (done) {
    socketTask.close({
      success: function () {
        done()
      }
    })
  }
  wx.connectSocket({
    url: url,
    protocols: protocols
  })
  wx.onSocketOpen(function (res) {
    ws.readyState = ws.OPEN
    socketOpen = true
    for (var i = 0; i < socketMsgQueue.length; i++) {
      sendSocketMessage(socketMsgQueue[i])
    }
    socketMsgQueue = []
    ws.onopen && ws.onopen.apply(ws, arguments)
  })
  wx.onSocketMessage(function (res) {
    ws.onmessage && ws.onmessage.apply(ws, arguments)
  })
  wx.onSocketClose(function () {
    ws.onclose && ws.onclose.apply(ws, arguments)
    ws.readyState = ws.CLOSED
    socketOpen = false
  })
  wx.onSocketError(function () {
    ws.onerror && ws.onerror.apply(ws, arguments)
    ws.readyState = ws.CLOSED
    socketOpen = false
  })
  return ws
}
var websocket = require('websocket-stream')
function buildUrl (opts, client) {
  var protocol = opts.protocol === 'wxs' ? 'wss' : 'ws'
  var url = protocol + '://' + opts.hostname + opts.path
  if (opts.port && opts.port !== 80 && opts.port !== 443) {
    url = protocol + '://' + opts.hostname + ':' + opts.port + opts.path
  }
  if (typeof (opts.transformWsUrl) === 'function') {
    url = opts.transformWsUrl(url, opts, client)
  }
  return url
  return proxy
}
function setDefaultOpts (opts) {
@@ -86,25 +45,90 @@
  }
}
function createWebSocket (client, opts) {
  var websocketSubProtocol =
    (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
      ? 'mqttv3.1'
      : 'mqtt'
  setDefaultOpts(opts)
  var url = buildUrl(opts, client)
  return websocket(WebSocket(url, [websocketSubProtocol]))
function buildUrl (opts, client) {
  var protocol = opts.protocol === 'wxs' ? 'wss' : 'ws'
  var url = protocol + '://' + opts.hostname + opts.path
  if (opts.port && opts.port !== 80 && opts.port !== 443) {
    url = protocol + '://' + opts.hostname + ':' + opts.port + opts.path
  }
  if (typeof (opts.transformWsUrl) === 'function') {
    url = opts.transformWsUrl(url, opts, client)
  }
  return url
}
function buildBuilder (client, opts) {
function bindEventHandler () {
  socketTask.onOpen(function () {
    stream.setReadable(proxy)
    stream.setWritable(proxy)
    stream.emit('connect')
  })
  socketTask.onMessage(function (res) {
    var data = res.data
    if (data instanceof ArrayBuffer) data = Buffer.from(data)
    else data = Buffer.from(data, 'utf8')
    proxy.push(data)
  })
  socketTask.onClose(function () {
    stream.end()
    stream.destroy()
  })
  socketTask.onError(function (res) {
    stream.destroy(new Error(res.errMsg))
  })
}
function buildStream (client, opts) {
  opts.hostname = opts.hostname || opts.host
  if (!opts.hostname) {
    throw new Error('Could not determine host. Specify host manually.')
  }
  return createWebSocket(client, opts)
  var websocketSubProtocol =
    (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
      ? 'mqttv3.1'
      : 'mqtt'
  setDefaultOpts(opts)
  var url = buildUrl(opts, client)
  socketTask = wx.connectSocket({
    url: url,
    protocols: websocketSubProtocol
  })
  proxy = buildProxy()
  stream = duplexify.obj()
  stream._destroy = function (err, cb) {
    socketTask.close({
      success: function () {
        cb && cb(err)
      }
    })
  }
  var destroyRef = stream.destroy
  stream.destroy = function () {
    stream.destroy = destroyRef
    var self = this
    process.nextTick(function () {
      socketTask.close({
        fail: function () {
          self._destroy(new Error())
        }
      })
    })
  }.bind(stream)
  bindEventHandler()
  return stream
}
module.exports = buildBuilder
module.exports = buildStream