| | |
| | | 'use strict' |
| | | |
| | | var Transform = require('readable-stream').Transform |
| | | var duplexify = require('duplexify') |
| | | |
| | | /* global wx */ |
| | | var socketOpen = false |
| | | var socketMsgQueue = [] |
| | | var socketTask |
| | | var proxy |
| | | var stream |
| | | |
| | | function sendSocketMessage (msg) { |
| | | if (socketOpen) { |
| | | wx.sendSocketMessage({ |
| | | data: msg.buffer || msg |
| | | function buildProxy () { |
| | | var proxy = new Transform() |
| | | proxy._write = function (chunk, encoding, next) { |
| | | socketTask.send({ |
| | | data: chunk.buffer, |
| | | success: function () { |
| | | next() |
| | | }, |
| | | fail: function (errMsg) { |
| | | next(new Error(errMsg)) |
| | | } |
| | | }) |
| | | } else { |
| | | socketMsgQueue.push(msg) |
| | | } |
| | | } |
| | | |
| | | function WebSocket (url, protocols) { |
| | | var ws = { |
| | | OPEN: 1, |
| | | CLOSING: 2, |
| | | CLOSED: 3, |
| | | readyState: socketOpen ? 1 : 0, |
| | | send: sendSocketMessage, |
| | | close: wx.closeSocket, |
| | | onopen: null, |
| | | onmessage: null, |
| | | onclose: null, |
| | | onerror: null |
| | | proxy._flush = function socketEnd (done) { |
| | | socketTask.close({ |
| | | success: function () { |
| | | done() |
| | | } |
| | | }) |
| | | } |
| | | |
| | | wx.connectSocket({ |
| | | url: url, |
| | | protocols: protocols |
| | | }) |
| | | wx.onSocketOpen(function (res) { |
| | | ws.readyState = ws.OPEN |
| | | socketOpen = true |
| | | for (var i = 0; i < socketMsgQueue.length; i++) { |
| | | sendSocketMessage(socketMsgQueue[i]) |
| | | } |
| | | socketMsgQueue = [] |
| | | |
| | | ws.onopen && ws.onopen.apply(ws, arguments) |
| | | }) |
| | | wx.onSocketMessage(function (res) { |
| | | ws.onmessage && ws.onmessage.apply(ws, arguments) |
| | | }) |
| | | wx.onSocketClose(function () { |
| | | ws.onclose && ws.onclose.apply(ws, arguments) |
| | | ws.readyState = ws.CLOSED |
| | | socketOpen = false |
| | | }) |
| | | wx.onSocketError(function () { |
| | | ws.onerror && ws.onerror.apply(ws, arguments) |
| | | ws.readyState = ws.CLOSED |
| | | socketOpen = false |
| | | }) |
| | | |
| | | return ws |
| | | } |
| | | |
| | | var websocket = require('websocket-stream') |
| | | |
| | | function buildUrl (opts, client) { |
| | | var protocol = opts.protocol === 'wxs' ? 'wss' : 'ws' |
| | | var url = protocol + '://' + opts.hostname + opts.path |
| | | if (opts.port && opts.port !== 80 && opts.port !== 443) { |
| | | url = protocol + '://' + opts.hostname + ':' + opts.port + opts.path |
| | | } |
| | | if (typeof (opts.transformWsUrl) === 'function') { |
| | | url = opts.transformWsUrl(url, opts, client) |
| | | } |
| | | return url |
| | | return proxy |
| | | } |
| | | |
| | | function setDefaultOpts (opts) { |
| | |
| | | } |
| | | } |
| | | |
| | | function createWebSocket (client, opts) { |
| | | var websocketSubProtocol = |
| | | (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3) |
| | | ? 'mqttv3.1' |
| | | : 'mqtt' |
| | | |
| | | setDefaultOpts(opts) |
| | | var url = buildUrl(opts, client) |
| | | return websocket(WebSocket(url, [websocketSubProtocol])) |
/**
 * Build the WebSocket URL used to reach the broker.
 *
 * The `wxs` protocol maps to the `wss` scheme; any other value maps to `ws`.
 * The port is left out of the URL only when it is the default port of the
 * chosen scheme (80 for `ws`, 443 for `wss`). Omitting 443 on a `ws` URL, or
 * 80 on a `wss` URL, would make the connection target the wrong port.
 *
 * @param {Object} opts - connection options; reads `protocol`, `hostname`,
 *   `port`, `path` and the optional `transformWsUrl` hook.
 * @param {Object} client - client instance, only forwarded to `transformWsUrl`.
 * @returns {string} the URL, after `opts.transformWsUrl(url, opts, client)`
 *   has been applied when that option is a function.
 */
function buildUrl (opts, client) {
  var isSecure = opts.protocol === 'wxs'
  var protocol = isSecure ? 'wss' : 'ws'
  var defaultPort = isSecure ? 443 : 80

  var authority = opts.hostname
  // Only an explicit, non-default port for this scheme needs to be spelled out.
  if (opts.port && opts.port !== defaultPort) {
    authority += ':' + opts.port
  }

  var url = protocol + '://' + authority + opts.path
  if (typeof opts.transformWsUrl === 'function') {
    url = opts.transformWsUrl(url, opts, client)
  }
  return url
}
| | | |
| | | function buildBuilder (client, opts) { |
/**
 * Attach the socket task's lifecycle callbacks to the shared stream objects.
 *
 * Uses the module-level `socketTask`, `stream` and `proxy` variables, so they
 * must be assigned before this is called.
 *
 * - open:    plugs `proxy` in as both sides of `stream` and emits 'connect'.
 * - message: converts the payload to a Buffer and pushes it into `proxy`.
 * - close:   ends, then destroys, `stream`.
 * - error:   destroys `stream` with an Error built from `res.errMsg`.
 */
function bindEventHandler () {
  socketTask.onOpen(function handleOpen () {
    stream.setReadable(proxy)
    stream.setWritable(proxy)
    stream.emit('connect')
  })

  socketTask.onMessage(function handleMessage (res) {
    var payload = res.data
    // Binary frames arrive as ArrayBuffer; anything else is decoded as UTF-8.
    var chunk = payload instanceof ArrayBuffer
      ? Buffer.from(payload)
      : Buffer.from(payload, 'utf8')
    proxy.push(chunk)
  })

  socketTask.onClose(function handleClose () {
    stream.end()
    stream.destroy()
  })

  socketTask.onError(function handleError (res) {
    stream.destroy(new Error(res.errMsg))
  })
}
| | | |
/**
 * Open a socket through `wx.connectSocket` and expose it as a duplex stream.
 *
 * Assigns the module-level `socketTask`, `proxy` and `stream` variables as a
 * side effect; `bindEventHandler` reads them afterwards.
 *
 * @param {Object} client - client instance, forwarded to `buildUrl`.
 * @param {Object} opts - connection options. `opts.hostname` is filled in from
 *   `opts.host` when missing, and the object is passed to `setDefaultOpts`,
 *   so it is mutated.
 * @returns {Object} the duplexify object stream backing the connection.
 * @throws {Error} when neither `opts.hostname` nor `opts.host` is set.
 */
function buildStream (client, opts) {
  opts.hostname = opts.hostname || opts.host

  if (!opts.hostname) {
    throw new Error('Could not determine host. Specify host manually.')
  }

  var websocketSubProtocol =
    (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
      ? 'mqttv3.1'
      : 'mqtt'

  setDefaultOpts(opts)

  var url = buildUrl(opts, client)
  socketTask = wx.connectSocket({
    url: url,
    // The sub-protocol list is an array of strings, not a bare string.
    protocols: [websocketSubProtocol]
  })

  proxy = buildProxy()
  stream = duplexify.obj()
  stream._destroy = function (err, cb) {
    socketTask.close({
      success: function () {
        cb && cb(err)
      }
    })
  }

  // The first destroy() call only asks the socket to close and restores the
  // original destroy, which the close handler is then expected to invoke.
  // NOTE(review): an error passed to this first destroy() call is not
  // forwarded anywhere - confirm that is intended.
  var destroyRef = stream.destroy
  stream.destroy = function () {
    stream.destroy = destroyRef

    var self = this
    process.nextTick(function () {
      socketTask.close({
        fail: function () {
          self._destroy(new Error())
        }
      })
    })
  }.bind(stream)

  bindEventHandler()

  return stream
}
| | | |
| | | module.exports = buildBuilder |
| | | module.exports = buildStream |