From 4885600ecc369aa2e30a65de8dd7a410f13c34df Mon Sep 17 00:00:00 2001 From: heyujie <516346543@qq.com> Date: 星期一, 24 五月 2021 11:25:04 +0800 Subject: [PATCH] 解决app通信问题 --- node_modules/mqtt/lib/client.js | 691 ++++++++++++++++++++++++++++++++++++++++++--------------- 1 files changed, 511 insertions(+), 180 deletions(-) diff --git a/node_modules/mqtt/lib/client.js b/node_modules/mqtt/lib/client.js index 81ab1ae..ca003bb 100644 --- a/node_modules/mqtt/lib/client.js +++ b/node_modules/mqtt/lib/client.js @@ -5,7 +5,6 @@ */ var events = require('events') var Store = require('./store') -var eos = require('end-of-stream') var mqttPacket = require('mqtt-packet') var Writable = require('readable-stream').Writable var inherits = require('inherits') @@ -26,6 +25,51 @@ clean: true, resubscribe: true } +var errors = { + 0: '', + 1: 'Unacceptable protocol version', + 2: 'Identifier rejected', + 3: 'Server unavailable', + 4: 'Bad username or password', + 5: 'Not authorized', + 16: 'No matching subscribers', + 17: 'No subscription existed', + 128: 'Unspecified error', + 129: 'Malformed Packet', + 130: 'Protocol Error', + 131: 'Implementation specific error', + 132: 'Unsupported Protocol Version', + 133: 'Client Identifier not valid', + 134: 'Bad User Name or Password', + 135: 'Not authorized', + 136: 'Server unavailable', + 137: 'Server busy', + 138: 'Banned', + 139: 'Server shutting down', + 140: 'Bad authentication method', + 141: 'Keep Alive timeout', + 142: 'Session taken over', + 143: 'Topic Filter invalid', + 144: 'Topic Name invalid', + 145: 'Packet identifier in use', + 146: 'Packet Identifier not found', + 147: 'Receive Maximum exceeded', + 148: 'Topic Alias invalid', + 149: 'Packet too large', + 150: 'Message rate too high', + 151: 'Quota exceeded', + 152: 'Administrative action', + 153: 'Payload format invalid', + 154: 'Retain not supported', + 155: 'QoS not supported', + 156: 'Use another server', + 157: 'Server moved', + 158: 'Shared Subscriptions not supported', + 159: 'Connection rate exceeded', + 160: 'Maximum connect time', + 161: 'Subscription Identifiers not supported', + 162: 'Wildcard Subscriptions not supported' +} function defaultId () { return 'mqttjs_' + Math.random().toString(16).substr(2, 8) @@ -34,7 +78,7 @@ function sendPacket (client, packet, cb) { client.emit('packetsend', packet) - var result = mqttPacket.writeToStream(packet, client.stream) + var result = mqttPacket.writeToStream(packet, client.stream, client.options) if (!result && cb) { client.stream.once('drain', cb) @@ -46,19 +90,31 @@ function flush (queue) { if (queue) { Object.keys(queue).forEach(function (messageId) { - if (typeof queue[messageId] === 'function') { - queue[messageId](new Error('Connection closed')) + if (typeof queue[messageId].cb === 'function') { + queue[messageId].cb(new Error('Connection closed')) delete queue[messageId] } }) } } -function storeAndSend (client, packet, cb) { +function flushVolatile (queue) { + if (queue) { + Object.keys(queue).forEach(function (messageId) { + if (queue[messageId].volatile && typeof queue[messageId].cb === 'function') { + queue[messageId].cb(new Error('Connection closed')) + delete queue[messageId] + } + }) + } +} + +function storeAndSend (client, packet, cb, cbStorePut) { client.outgoingStore.put(packet, function storedPacket (err) { if (err) { return cb && cb(err) } + cbStorePut() sendPacket(client, packet, cb) }) } @@ -91,16 +147,18 @@ } } - this.options.clientId = (typeof this.options.clientId === 'string') ? this.options.clientId : defaultId() + this.options.clientId = (typeof options.clientId === 'string') ? options.clientId : defaultId() + + this.options.customHandleAcks = (options.protocolVersion === 5 && options.customHandleAcks) ? options.customHandleAcks : function () { arguments[3](0) } this.streamBuilder = streamBuilder // Inflight message storages - this.outgoingStore = this.options.outgoingStore || new Store() - this.incomingStore = this.options.incomingStore || new Store() + this.outgoingStore = options.outgoingStore || new Store() + this.incomingStore = options.incomingStore || new Store() // Should QoS zero messages be queued when the connection is broken? - this.queueQoSZero = this.options.queueQoSZero === undefined ? true : this.options.queueQoSZero + this.queueQoSZero = options.queueQoSZero === undefined ? true : options.queueQoSZero // map of subscribed topics to support reconnection this._resubscribeTopics = {} @@ -120,6 +178,10 @@ this.connackTimer = null // Reconnect timer this.reconnectTimer = null + // Is processing store? + this._storeProcessing = false + // Packet Ids are put into the store during store processing + this._packetIdsDuringStoreProcessing = {} /** * MessageIDs starting with 1 * ensure that nextId is min. 1, see https://github.com/mqttjs/MQTT.js/issues/810 @@ -129,73 +191,14 @@ // Inflight callbacks this.outgoing = {} - // Mark connected on connect - this.on('connect', function () { - if (this.disconnected) { - return - } - - this.connected = true - var outStore = this.outgoingStore.createStream() - - this.once('close', remove) - outStore.on('end', function () { - that.removeListener('close', remove) - }) - outStore.on('error', function (err) { - that.removeListener('close', remove) - that.emit('error', err) - }) - - function remove () { - outStore.destroy() - outStore = null - } - - function storeDeliver () { - // edge case, we wrapped this twice - if (!outStore) { - return - } - - var packet = outStore.read(1) - var cb - - if (!packet) { - // read when data is available in the future - outStore.once('readable', storeDeliver) - return - } - - // Avoid unnecessary stream read operations when disconnected - if (!that.disconnecting && !that.reconnectTimer) { - cb = that.outgoing[packet.messageId] - that.outgoing[packet.messageId] = function (err, status) { - // Ensure that the original callback passed in to publish gets invoked - if (cb) { - cb(err, status) - } - - storeDeliver() - } - that._sendPacket(packet) - } else if (outStore.destroy) { - outStore.destroy() - } - } - - // start flowing - storeDeliver() - }) + // True if connection is first time. + this._firstConnection = true // Mark disconnected on stream close this.on('close', function () { this.connected = false clearTimeout(this.connackTimer) }) - - // Setup ping timer - this.on('connect', this._setupPingTimer) // Send queued packets this.on('connect', function () { @@ -223,23 +226,6 @@ } deliver() - }) - - var firstConnection = true - // resubscribe - this.on('connect', function () { - if (!firstConnection && - this.options.clean && - Object.keys(this._resubscribeTopics).length > 0) { - if (this.options.resubscribe) { - this._resubscribeTopics.resubscribe = true - this.subscribe(this._resubscribeTopics) - } else { - this._resubscribeTopics = {} - } - } - - firstConnection = false }) // Clear ping timer @@ -281,18 +267,24 @@ }) function nextTickWork () { - process.nextTick(work) + if (packets.length) { + process.nextTick(work) + } else { + var done = completeParse + completeParse = null + done() + } } function work () { var packet = packets.shift() - var done = completeParse if (packet) { that._handlePacket(packet, nextTickWork) } else { + var done = completeParse completeParse = null - done() + if (done) done() } } @@ -308,7 +300,10 @@ this.stream.on('error', nop) // Echo stream close - eos(this.stream, this.emit.bind(this, 'close')) + this.stream.on('close', function () { + flushVolatile(that.outgoing) + that.emit('close') + }) // Send a connect packet connectPacket = Object.create(this.options) @@ -318,6 +313,18 @@ // Echo connection errors parser.on('error', this.emit.bind(this, 'error')) + + // auth + if (this.options.properties) { + if (!this.options.properties.authenticationMethod && this.options.properties.authenticationData) { + this.emit('error', new Error('Packet has no Authentication Method')) + return this + } + if (this.options.properties.authenticationMethod && this.options.authPacket && typeof this.options.authPacket === 'object') { + var authPacket = xtend({cmd: 'auth', reasonCode: 0}, this.options.authPacket) + sendPacket(this, authPacket) + } + } // many drain listeners are needed for qos 1 callbacks if the connection is intermittent this.stream.setMaxListeners(1000) @@ -329,6 +336,14 @@ } MqttClient.prototype._handlePacket = function (packet, done) { + var options = this.options + + if (options.protocolVersion === 5 && options.properties && options.properties.maximumPacketSize && options.properties.maximumPacketSize < packet.length) { + this.emit('error', new Error('exceeding packets size ' + packet.cmd)) + this.end({reasonCode: 149, properties: { reasonString: 'Maximum packet size was exceeded' }}) + return this + } + this.emit('packetreceive', packet) switch (packet.cmd) { @@ -352,6 +367,10 @@ break case 'pingresp': this._handlePingresp(packet) + done() + break + case 'disconnect': + this._handleDisconnect(packet) done() break default: @@ -382,6 +401,7 @@ * {Number} qos - qos level to publish on * {Boolean} retain - whether or not to retain the message * {Boolean} dup - whether or not mark a message as duplicate + * {Function} cbStorePut - function(){} called when message is put into `outgoingStore` * @param {Function} [callback] - function(err){} * called when publish succeeds or fails * @returns {MqttClient} this - for chaining @@ -394,6 +414,7 @@ */ MqttClient.prototype.publish = function (topic, message, opts, callback) { var packet + var options = this.options // .publish(topic, payload, cb); if (typeof opts === 'function') { @@ -419,16 +440,42 @@ dup: opts.dup } + if (options.protocolVersion === 5) { + packet.properties = opts.properties + if ((!options.properties && packet.properties && packet.properties.topicAlias) || ((opts.properties && options.properties) && + ((opts.properties.topicAlias && options.properties.topicAliasMaximum && opts.properties.topicAlias > options.properties.topicAliasMaximum) || + (!options.properties.topicAliasMaximum && opts.properties.topicAlias)))) { + /* + if we are don`t setup topic alias or + topic alias maximum less than topic alias or + server don`t give topic alias maximum, + we are removing topic alias from packet + */ + delete packet.properties.topicAlias + } + } + switch (opts.qos) { case 1: case 2: - // Add to callbacks - this.outgoing[packet.messageId] = callback || nop - this._sendPacket(packet) + this.outgoing[packet.messageId] = { + volatile: false, + cb: callback || nop + } + if (this._storeProcessing) { + this._packetIdsDuringStoreProcessing[packet.messageId] = false + this._storePacket(packet, undefined, opts.cbStorePut) + } else { + this._sendPacket(packet, undefined, opts.cbStorePut) + } break default: - this._sendPacket(packet, callback) + if (this._storeProcessing) { + this._storePacket(packet, callback, opts.cbStorePut) + } else { + this._sendPacket(packet, callback, opts.cbStorePut) + } break } @@ -448,12 +495,15 @@ * @api public * @example client.subscribe('topic'); * @example client.subscribe('topic', {qos: 1}); - * @example client.subscribe({'topic': 0, 'topic2': 1}, console.log); + * @example client.subscribe({'topic': {qos: 0}, 'topic2': {qos: 1}}, console.log); * @example client.subscribe('topic', console.log); */ MqttClient.prototype.subscribe = function () { var packet - var args = Array.prototype.slice.call(arguments) + var args = new Array(arguments.length) + for (var i = 0; i < arguments.length; i++) { + args[i] = arguments[i] + } var subs = [] var obj = args.shift() var resubscribe = obj.resubscribe @@ -461,6 +511,7 @@ var opts = args.pop() var invalidTopic var that = this + var version = this.options.protocolVersion delete obj.resubscribe @@ -483,31 +534,52 @@ return this } - var defaultOpts = { qos: 0 } + var defaultOpts = { + qos: 0 + } + if (version === 5) { + defaultOpts.nl = false + defaultOpts.rap = false + defaultOpts.rh = 0 + } opts = xtend(defaultOpts, opts) if (Array.isArray(obj)) { obj.forEach(function (topic) { - if (that._resubscribeTopics[topic] < opts.qos || - !that._resubscribeTopics.hasOwnProperty(topic) || + if (!that._resubscribeTopics.hasOwnProperty(topic) || + that._resubscribeTopics[topic].qos < opts.qos || resubscribe) { - subs.push({ + var currentOpts = { topic: topic, qos: opts.qos - }) + } + if (version === 5) { + currentOpts.nl = opts.nl + currentOpts.rap = opts.rap + currentOpts.rh = opts.rh + currentOpts.properties = opts.properties + } + subs.push(currentOpts) } }) } else { Object .keys(obj) .forEach(function (k) { - if (that._resubscribeTopics[k] < obj[k] || - !that._resubscribeTopics.hasOwnProperty(k) || + if (!that._resubscribeTopics.hasOwnProperty(k) || + that._resubscribeTopics[k].qos < obj[k].qos || resubscribe) { - subs.push({ + var currentOpts = { topic: k, - qos: obj[k] - }) + qos: obj[k].qos + } + if (version === 5) { + currentOpts.nl = obj[k].nl + currentOpts.rap = obj[k].rap + currentOpts.rh = obj[k].rh + currentOpts.properties = opts.properties + } + subs.push(currentOpts) } }) } @@ -521,6 +593,10 @@ messageId: this._nextId() } + if (opts.properties) { + packet.properties = opts.properties + } + if (!subs.length) { callback(null, []) return @@ -531,22 +607,32 @@ var topics = [] subs.forEach(function (sub) { if (that.options.reconnectPeriod > 0) { - that._resubscribeTopics[sub.topic] = sub.qos + var topic = { qos: sub.qos } + if (version === 5) { + topic.nl = sub.nl || false + topic.rap = sub.rap || false + topic.rh = sub.rh || 0 + topic.properties = sub.properties + } + that._resubscribeTopics[sub.topic] = topic topics.push(sub.topic) } }) that.messageIdToTopic[packet.messageId] = topics } - this.outgoing[packet.messageId] = function (err, packet) { - if (!err) { - var granted = packet.granted - for (var i = 0; i < granted.length; i += 1) { - subs[i].qos = granted[i] + this.outgoing[packet.messageId] = { + volatile: true, + cb: function (err, packet) { + if (!err) { + var granted = packet.granted + for (var i = 0; i < granted.length; i += 1) { + subs[i].qos = granted[i] + } } - } - callback(err, subs) + callback(err, subs) + } } this._sendPacket(packet) @@ -558,21 +644,37 @@ * unsubscribe - unsubscribe from topic(s) * * @param {String, Array} topic - topics to unsubscribe from + * @param {Object} [opts] - optional subscription options, includes: + * {Object} properties - properties of unsubscribe packet * @param {Function} [callback] - callback fired on unsuback * @returns {MqttClient} this - for chaining * @api public * @example client.unsubscribe('topic'); * @example client.unsubscribe('topic', console.log); */ -MqttClient.prototype.unsubscribe = function (topic, callback) { +MqttClient.prototype.unsubscribe = function () { var packet = { cmd: 'unsubscribe', qos: 1, messageId: this._nextId() } var that = this + var args = new Array(arguments.length) + for (var i = 0; i < arguments.length; i++) { + args[i] = arguments[i] + } + var topic = args.shift() + var callback = args.pop() || nop + var opts = args.pop() - callback = callback || nop + if (typeof topic === 'string') { + topic = [topic] + } + + if (typeof callback !== 'function') { + opts = callback + callback = nop + } if (this._checkDisconnecting(callback)) { return this @@ -590,7 +692,14 @@ }) } - this.outgoing[packet.messageId] = callback + if (typeof opts === 'object' && opts.properties) { + packet.properties = opts.properties + } + + this.outgoing[packet.messageId] = { + volatile: true, + cb: callback + } this._sendPacket(packet) @@ -606,13 +715,32 @@ * * @api public */ -MqttClient.prototype.end = function (force, cb) { +MqttClient.prototype.end = function () { var that = this - if (typeof force === 'function') { - cb = force + var force = arguments[0] + var opts = arguments[1] + var cb = arguments[2] + + if (force == null || typeof force !== 'boolean') { + cb = opts || nop + opts = force force = false + if (typeof opts !== 'object') { + cb = opts + opts = null + if (typeof cb !== 'function') { + cb = nop + } + } } + + if (typeof opts !== 'object') { + cb = opts + opts = null + } + + cb = cb || nop function closeStores () { that.disconnected = true @@ -633,7 +761,7 @@ // defer closesStores of an I/O cycle, // just to make sure things are // ok for websockets - that._cleanUp(force, setImmediate.bind(null, closeStores)) + that._cleanUp(force, setImmediate.bind(null, closeStores), opts) } if (this.disconnecting) { @@ -665,7 +793,7 @@ * @example client.removeOutgoingMessage(client.getLastMessageId()); */ MqttClient.prototype.removeOutgoingMessage = function (mid) { - var cb = this.outgoing[mid] + var cb = this.outgoing[mid] ? this.outgoing[mid].cb : null delete this.outgoing[mid] this.outgoingStore.del({messageId: mid}, function () { cb(new Error('Message removed')) @@ -751,6 +879,7 @@ * @api private */ MqttClient.prototype._cleanUp = function (forced, done) { + var opts = arguments[2] if (done) { this.stream.on('close', done) } @@ -761,8 +890,9 @@ } this.stream.destroy() } else { + var packet = xtend({ cmd: 'disconnect' }, opts) this._sendPacket( - { cmd: 'disconnect' }, + packet, setImmediate.bind( null, this.stream.end.bind(this.stream) @@ -791,23 +921,14 @@ * @param {String} type - packet type (see `protocol`) * @param {Object} packet - packet options * @param {Function} cb - callback when the packet is sent + * @param {Function} cbStorePut - called when message is put into outgoingStore * @api private */ -MqttClient.prototype._sendPacket = function (packet, cb) { - if (!this.connected) { - if (((packet.qos || 0) === 0 && this.queueQoSZero) || packet.cmd !== 'publish') { - this.queue.push({ packet: packet, cb: cb }) - } else if (packet.qos > 0) { - cb = this.outgoing[packet.messageId] - this.outgoingStore.put(packet, function (err) { - if (err) { - return cb && cb(err) - } - }) - } else if (cb) { - cb(new Error('No connection to broker')) - } +MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut) { + cbStorePut = cbStorePut || nop + if (!this.connected) { + this._storePacket(packet, cb, cbStorePut) return } @@ -818,7 +939,7 @@ case 'publish': break case 'pubrel': - storeAndSend(this, packet, cb) + storeAndSend(this, packet, cb, cbStorePut) return default: sendPacket(this, packet, cb) @@ -828,7 +949,7 @@ switch (packet.qos) { case 2: case 1: - storeAndSend(this, packet, cb) + storeAndSend(this, packet, cb, cbStorePut) break /** * no need of case here since it will be caught by default @@ -840,6 +961,32 @@ default: sendPacket(this, packet, cb) break + } +} + +/** + * _storePacket - queue a packet + * @param {String} type - packet type (see `protocol`) + * @param {Object} packet - packet options + * @param {Function} cb - callback when the packet is sent + * @param {Function} cbStorePut - called when message is put into outgoingStore + * @api private + */ +MqttClient.prototype._storePacket = function (packet, cb, cbStorePut) { + cbStorePut = cbStorePut || nop + + if (((packet.qos || 0) === 0 && this.queueQoSZero) || packet.cmd !== 'publish') { + this.queue.push({ packet: packet, cb: cb }) + } else if (packet.qos > 0) { + cb = this.outgoing[packet.messageId] ? this.outgoing[packet.messageId].cb : null + this.outgoingStore.put(packet, function (err) { + if (err) { + return cb && cb(err) + } + cbStorePut() + }) + } else if (cb) { + cb(new Error('No connection to broker')) } } @@ -901,21 +1048,30 @@ */ MqttClient.prototype._handleConnack = function (packet) { - var rc = packet.returnCode - var errors = [ - '', - 'Unacceptable protocol version', - 'Identifier rejected', - 'Server unavailable', - 'Bad username or password', - 'Not authorized' - ] + var options = this.options + var version = options.protocolVersion + var rc = version === 5 ? packet.reasonCode : packet.returnCode clearTimeout(this.connackTimer) + if (packet.properties) { + if (packet.properties.topicAliasMaximum) { + if (!options.properties) { options.properties = {} } + options.properties.topicAliasMaximum = packet.properties.topicAliasMaximum + } + if (packet.properties.serverKeepAlive && options.keepalive) { + options.keepalive = packet.properties.serverKeepAlive + this._shiftPingInterval() + } + if (packet.properties.maximumPacketSize) { + if (!options.properties) { options.properties = {} } + options.properties.maximumPacketSize = packet.properties.maximumPacketSize + } + } + if (rc === 0) { this.reconnecting = false - this.emit('connect', packet) + this._onConnect(packet) } else if (rc > 0) { var err = new Error('Connection refused: ' + errors[rc]) err.code = rc @@ -960,28 +1116,47 @@ var qos = packet.qos var mid = packet.messageId var that = this + var options = this.options + var validReasonCodes = [0, 16, 128, 131, 135, 144, 145, 151, 153] switch (qos) { - case 2: - this.incomingStore.put(packet, function (err) { - if (err) { - return done(err) + case 2: { + options.customHandleAcks(topic, message, packet, function (error, code) { + if (!(error instanceof Error)) { + code = error + error = null } - that._sendPacket({cmd: 'pubrec', messageId: mid}, done) + if (error) { return that.emit('error', error) } + if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for pubrec')) } + if (code) { + that._sendPacket({cmd: 'pubrec', messageId: mid, reasonCode: code}, done) + } else { + that.incomingStore.put(packet, function () { + that._sendPacket({cmd: 'pubrec', messageId: mid}, done) + }) + } }) break - case 1: + } + case 1: { // emit the message event - this.emit('message', topic, message, packet) - this.handleMessage(packet, function (err) { - if (err) { - return done(err) + options.customHandleAcks(topic, message, packet, function (error, code) { + if (!(error instanceof Error)) { + code = error + error = null } - // send 'puback' if the above 'handleMessage' method executed - // successfully. - that._sendPacket({cmd: 'puback', messageId: mid}, done) + if (error) { return that.emit('error', error) } + if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for puback')) } + if (!code) { that.emit('message', topic, message, packet) } + that.handleMessage(packet, function (err) { + if (err) { + return done && done(err) + } + that._sendPacket({cmd: 'puback', messageId: mid, reasonCode: code}, done) + }) }) break + } case 0: // emit the message event this.emit('message', topic, message, packet) @@ -1018,8 +1193,9 @@ var mid = packet.messageId var type = packet.cmd var response = null - var cb = this.outgoing[mid] + var cb = this.outgoing[mid] ? this.outgoing[mid].cb : null var that = this + var err if (!cb) { // Server sent an ack in error, ignore it. @@ -1031,7 +1207,13 @@ case 'pubcomp': // same thing as puback for QoS 2 case 'puback': + var pubackRC = packet.reasonCode // Callback - we're done + if (pubackRC && pubackRC > 0 && pubackRC !== 16) { + err = new Error('Publish error: ' + errors[pubackRC]) + err.code = pubackRC + cb(err, packet) + } delete this.outgoing[mid] this.outgoingStore.del(packet, cb) break @@ -1041,18 +1223,27 @@ qos: 2, messageId: mid } + var pubrecRC = packet.reasonCode - this._sendPacket(response) + if (pubrecRC && pubrecRC > 0 && pubrecRC !== 16) { + err = new Error('Publish error: ' + errors[pubrecRC]) + err.code = pubrecRC + cb(err, packet) + } else { + this._sendPacket(response) + } break case 'suback': delete this.outgoing[mid] - if (packet.granted.length === 1 && (packet.granted[0] & 0x80) !== 0) { - // suback with Failure status - var topics = this.messageIdToTopic[mid] - if (topics) { - topics.forEach(function (topic) { - delete that._resubscribeTopics[topic] - }) + for (var grantedI = 0; grantedI < packet.granted.length; grantedI++) { + if ((packet.granted[grantedI] & 0x80) !== 0) { + // suback with Failure status + var topics = this.messageIdToTopic[mid] + if (topics) { + topics.forEach(function (topic) { + delete that._resubscribeTopics[topic] + }) + } } } cb(null, packet) @@ -1085,23 +1276,29 @@ var comp = {cmd: 'pubcomp', messageId: mid} that.incomingStore.get(packet, function (err, pub) { - if (!err && pub.cmd !== 'pubrel') { + if (!err) { that.emit('message', pub.topic, pub.payload, pub) - that.incomingStore.put(packet, function (err) { + that.handleMessage(pub, function (err) { if (err) { return callback(err) } - that.handleMessage(pub, function (err) { - if (err) { - return callback(err) - } - that._sendPacket(comp, callback) - }) + that.incomingStore.del(pub, nop) + that._sendPacket(comp, callback) }) } else { that._sendPacket(comp, callback) } }) +} + +/** + * _handleDisconnect + * + * @param {Object} packet + * @api private + */ +MqttClient.prototype._handleDisconnect = function (packet) { + this.emit('disconnect', packet) } /** @@ -1126,4 +1323,138 @@ return (this.nextId === 1) ? 65535 : (this.nextId - 1) } +/** + * _resubscribe + * @api private + */ +MqttClient.prototype._resubscribe = function (connack) { + var _resubscribeTopicsKeys = Object.keys(this._resubscribeTopics) + if (!this._firstConnection && + (this.options.clean || (this.options.protocolVersion === 5 && !connack.sessionPresent)) && + _resubscribeTopicsKeys.length > 0) { + if (this.options.resubscribe) { + if (this.options.protocolVersion === 5) { + for (var topicI = 0; topicI < _resubscribeTopicsKeys.length; topicI++) { + var resubscribeTopic = {} + resubscribeTopic[_resubscribeTopicsKeys[topicI]] = this._resubscribeTopics[_resubscribeTopicsKeys[topicI]] + resubscribeTopic.resubscribe = true + this.subscribe(resubscribeTopic, {properties: resubscribeTopic[_resubscribeTopicsKeys[topicI]].properties}) + } + } else { + this._resubscribeTopics.resubscribe = true + this.subscribe(this._resubscribeTopics) + } + } else { + this._resubscribeTopics = {} + } + } + + this._firstConnection = false +} + +/** + * _onConnect + * + * @api private + */ +MqttClient.prototype._onConnect = function (packet) { + if (this.disconnected) { + this.emit('connect', packet) + return + } + + var that = this + + this._setupPingTimer() + this._resubscribe(packet) + + this.connected = true + + function startStreamProcess () { + var outStore = that.outgoingStore.createStream() + + function clearStoreProcessing () { + that._storeProcessing = false + that._packetIdsDuringStoreProcessing = {} + } + + that.once('close', remove) + outStore.on('error', function (err) { + clearStoreProcessing() + that.removeListener('close', remove) + that.emit('error', err) + }) + + function remove () { + outStore.destroy() + outStore = null + clearStoreProcessing() + } + + function storeDeliver () { + // edge case, we wrapped this twice + if (!outStore) { + return + } + that._storeProcessing = true + + var packet = outStore.read(1) + + var cb + + if (!packet) { + // read when data is available in the future + outStore.once('readable', storeDeliver) + return + } + + // Skip already processed store packets + if (that._packetIdsDuringStoreProcessing[packet.messageId]) { + storeDeliver() + return + } + + // Avoid unnecessary stream read operations when disconnected + if (!that.disconnecting && !that.reconnectTimer) { + cb = that.outgoing[packet.messageId] ? that.outgoing[packet.messageId].cb : null + that.outgoing[packet.messageId] = { + volatile: false, + cb: function (err, status) { + // Ensure that the original callback passed in to publish gets invoked + if (cb) { + cb(err, status) + } + + storeDeliver() + } + } + that._packetIdsDuringStoreProcessing[packet.messageId] = true + that._sendPacket(packet) + } else if (outStore.destroy) { + outStore.destroy() + } + } + + outStore.on('end', function () { + var allProcessed = true + for (var id in that._packetIdsDuringStoreProcessing) { + if (!that._packetIdsDuringStoreProcessing[id]) { + allProcessed = false + break + } + } + if (allProcessed) { + clearStoreProcessing() + that.removeListener('close', remove) + that.emit('connect', packet) + } else { + startStreamProcess() + } + }) + storeDeliver() + } + // start flowing + startStreamProcess() +} + module.exports = MqttClient -- Gitblit v1.8.0