liudong
2023-05-29 340f156319b863525e50e900c58e59b86ecb3d5e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
'use strict'
 
const { Buffer } = require('buffer')
const WS = require('ws')
const debug = require('debug')('mqttjs:ws')
const duplexify = require('duplexify')
const Transform = require('readable-stream').Transform
 
const WSS_OPTIONS = [
  'rejectUnauthorized',
  'ca',
  'cert',
  'key',
  'pfx',
  'passphrase'
]
// eslint-disable-next-line camelcase
const IS_BROWSER = (typeof process !== 'undefined' && process.title === 'browser') || typeof __webpack_require__ === 'function'
function buildUrl (opts, client) {
  let url = opts.protocol + '://' + opts.hostname + ':' + opts.port + opts.path
  if (typeof (opts.transformWsUrl) === 'function') {
    url = opts.transformWsUrl(url, opts, client)
  }
  return url
}
 
function setDefaultOpts (opts) {
  const options = opts
  if (!opts.hostname) {
    options.hostname = 'localhost'
  }
  if (!opts.port) {
    if (opts.protocol === 'wss') {
      options.port = 443
    } else {
      options.port = 80
    }
  }
  if (!opts.path) {
    options.path = '/'
  }
 
  if (!opts.wsOptions) {
    options.wsOptions = {}
  }
  if (!IS_BROWSER && opts.protocol === 'wss') {
    // Add cert/key/ca etc options
    WSS_OPTIONS.forEach(function (prop) {
      if (Object.prototype.hasOwnProperty.call(opts, prop) && !Object.prototype.hasOwnProperty.call(opts.wsOptions, prop)) {
        options.wsOptions[prop] = opts[prop]
      }
    })
  }
 
  return options
}
 
function setDefaultBrowserOpts (opts) {
  const options = setDefaultOpts(opts)
 
  if (!options.hostname) {
    options.hostname = options.host
  }
 
  if (!options.hostname) {
    // Throwing an error in a Web Worker if no `hostname` is given, because we
    // can not determine the `hostname` automatically.  If connecting to
    // localhost, please supply the `hostname` as an argument.
    if (typeof (document) === 'undefined') {
      throw new Error('Could not determine host. Specify host manually.')
    }
    const parsed = new URL(document.URL)
    options.hostname = parsed.hostname
 
    if (!options.port) {
      options.port = parsed.port
    }
  }
 
  // objectMode should be defined for logic
  if (options.objectMode === undefined) {
    options.objectMode = !(options.binary === true || options.binary === undefined)
  }
 
  return options
}
 
function createWebSocket (client, url, opts) {
  debug('createWebSocket')
  debug('protocol: ' + opts.protocolId + ' ' + opts.protocolVersion)
  const websocketSubProtocol =
    (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
      ? 'mqttv3.1'
      : 'mqtt'
 
  debug('creating new Websocket for url: ' + url + ' and protocol: ' + websocketSubProtocol)
  const socket = new WS(url, [websocketSubProtocol], opts.wsOptions)
  return socket
}
 
function createBrowserWebSocket (client, opts) {
  const websocketSubProtocol =
  (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
    ? 'mqttv3.1'
    : 'mqtt'
 
  const url = buildUrl(opts, client)
  /* global WebSocket */
  const socket = new WebSocket(url, [websocketSubProtocol])
  socket.binaryType = 'arraybuffer'
  return socket
}
 
function streamBuilder (client, opts) {
  debug('streamBuilder')
  const options = setDefaultOpts(opts)
  const url = buildUrl(options, client)
  const socket = createWebSocket(client, url, options)
  const webSocketStream = WS.createWebSocketStream(socket, options.wsOptions)
  webSocketStream.url = url
  socket.on('close', () => { webSocketStream.destroy() })
  return webSocketStream
}
 
function browserStreamBuilder (client, opts) {
  debug('browserStreamBuilder')
  let stream
  const options = setDefaultBrowserOpts(opts)
  // sets the maximum socket buffer size before throttling
  const bufferSize = options.browserBufferSize || 1024 * 512
 
  const bufferTimeout = opts.browserBufferTimeout || 1000
 
  const coerceToBuffer = !opts.objectMode
 
  const socket = createBrowserWebSocket(client, opts)
 
  const proxy = buildProxy(opts, socketWriteBrowser, socketEndBrowser)
 
  if (!opts.objectMode) {
    proxy._writev = writev
  }
  proxy.on('close', () => { socket.close() })
 
  const eventListenerSupport = (typeof socket.addEventListener !== 'undefined')
 
  // was already open when passed in
  if (socket.readyState === socket.OPEN) {
    stream = proxy
  } else {
    stream = stream = duplexify(undefined, undefined, opts)
    if (!opts.objectMode) {
      stream._writev = writev
    }
 
    if (eventListenerSupport) {
      socket.addEventListener('open', onopen)
    } else {
      socket.onopen = onopen
    }
  }
 
  stream.socket = socket
 
  if (eventListenerSupport) {
    socket.addEventListener('close', onclose)
    socket.addEventListener('error', onerror)
    socket.addEventListener('message', onmessage)
  } else {
    socket.onclose = onclose
    socket.onerror = onerror
    socket.onmessage = onmessage
  }
 
  // methods for browserStreamBuilder
 
  function buildProxy (options, socketWrite, socketEnd) {
    const proxy = new Transform({
      objectModeMode: options.objectMode
    })
 
    proxy._write = socketWrite
    proxy._flush = socketEnd
 
    return proxy
  }
 
  function onopen () {
    stream.setReadable(proxy)
    stream.setWritable(proxy)
    stream.emit('connect')
  }
 
  function onclose () {
    stream.end()
    stream.destroy()
  }
 
  function onerror (err) {
    stream.destroy(err)
  }
 
  function onmessage (event) {
    let data = event.data
    if (data instanceof ArrayBuffer) data = Buffer.from(data)
    else data = Buffer.from(data, 'utf8')
    proxy.push(data)
  }
 
  // this is to be enabled only if objectMode is false
  function writev (chunks, cb) {
    const buffers = new Array(chunks.length)
    for (let i = 0; i < chunks.length; i++) {
      if (typeof chunks[i].chunk === 'string') {
        buffers[i] = Buffer.from(chunks[i], 'utf8')
      } else {
        buffers[i] = chunks[i].chunk
      }
    }
 
    this._write(Buffer.concat(buffers), 'binary', cb)
  }
 
  function socketWriteBrowser (chunk, enc, next) {
    if (socket.bufferedAmount > bufferSize) {
      // throttle data until buffered amount is reduced.
      setTimeout(socketWriteBrowser, bufferTimeout, chunk, enc, next)
    }
 
    if (coerceToBuffer && typeof chunk === 'string') {
      chunk = Buffer.from(chunk, 'utf8')
    }
 
    try {
      socket.send(chunk)
    } catch (err) {
      return next(err)
    }
 
    next()
  }
 
  function socketEndBrowser (done) {
    socket.close()
    done()
  }
 
  // end methods for browserStreamBuilder
 
  return stream
}
 
if (IS_BROWSER) {
  module.exports = browserStreamBuilder
} else {
  module.exports = streamBuilder
}