From 4885600ecc369aa2e30a65de8dd7a410f13c34df Mon Sep 17 00:00:00 2001
From: heyujie <516346543@qq.com>
Date: 星期一, 24 五月 2021 11:25:04 +0800
Subject: [PATCH] 解决app通信问题

---
 node_modules/mqtt/lib/client.js |  691 ++++++++++++++++++++++++++++++++++++++++++---------------
 1 files changed, 511 insertions(+), 180 deletions(-)

diff --git a/node_modules/mqtt/lib/client.js b/node_modules/mqtt/lib/client.js
index 81ab1ae..ca003bb 100644
--- a/node_modules/mqtt/lib/client.js
+++ b/node_modules/mqtt/lib/client.js
@@ -5,7 +5,6 @@
  */
 var events = require('events')
 var Store = require('./store')
-var eos = require('end-of-stream')
 var mqttPacket = require('mqtt-packet')
 var Writable = require('readable-stream').Writable
 var inherits = require('inherits')
@@ -26,6 +25,51 @@
   clean: true,
   resubscribe: true
 }
+var errors = {
+  0: '',
+  1: 'Unacceptable protocol version',
+  2: 'Identifier rejected',
+  3: 'Server unavailable',
+  4: 'Bad username or password',
+  5: 'Not authorized',
+  16: 'No matching subscribers',
+  17: 'No subscription existed',
+  128: 'Unspecified error',
+  129: 'Malformed Packet',
+  130: 'Protocol Error',
+  131: 'Implementation specific error',
+  132: 'Unsupported Protocol Version',
+  133: 'Client Identifier not valid',
+  134: 'Bad User Name or Password',
+  135: 'Not authorized',
+  136: 'Server unavailable',
+  137: 'Server busy',
+  138: 'Banned',
+  139: 'Server shutting down',
+  140: 'Bad authentication method',
+  141: 'Keep Alive timeout',
+  142: 'Session taken over',
+  143: 'Topic Filter invalid',
+  144: 'Topic Name invalid',
+  145: 'Packet identifier in use',
+  146: 'Packet Identifier not found',
+  147: 'Receive Maximum exceeded',
+  148: 'Topic Alias invalid',
+  149: 'Packet too large',
+  150: 'Message rate too high',
+  151: 'Quota exceeded',
+  152: 'Administrative action',
+  153: 'Payload format invalid',
+  154: 'Retain not supported',
+  155: 'QoS not supported',
+  156: 'Use another server',
+  157: 'Server moved',
+  158: 'Shared Subscriptions not supported',
+  159: 'Connection rate exceeded',
+  160: 'Maximum connect time',
+  161: 'Subscription Identifiers not supported',
+  162: 'Wildcard Subscriptions not supported'
+}
 
 function defaultId () {
   return 'mqttjs_' + Math.random().toString(16).substr(2, 8)
@@ -34,7 +78,7 @@
 function sendPacket (client, packet, cb) {
   client.emit('packetsend', packet)
 
-  var result = mqttPacket.writeToStream(packet, client.stream)
+  var result = mqttPacket.writeToStream(packet, client.stream, client.options)
 
   if (!result && cb) {
     client.stream.once('drain', cb)
@@ -46,19 +90,31 @@
 function flush (queue) {
   if (queue) {
     Object.keys(queue).forEach(function (messageId) {
-      if (typeof queue[messageId] === 'function') {
-        queue[messageId](new Error('Connection closed'))
+      if (typeof queue[messageId].cb === 'function') {
+        queue[messageId].cb(new Error('Connection closed'))
         delete queue[messageId]
       }
     })
   }
 }
 
-function storeAndSend (client, packet, cb) {
+function flushVolatile (queue) {
+  if (queue) {
+    Object.keys(queue).forEach(function (messageId) {
+      if (queue[messageId].volatile && typeof queue[messageId].cb === 'function') {
+        queue[messageId].cb(new Error('Connection closed'))
+        delete queue[messageId]
+      }
+    })
+  }
+}
+
+function storeAndSend (client, packet, cb, cbStorePut) {
   client.outgoingStore.put(packet, function storedPacket (err) {
     if (err) {
       return cb && cb(err)
     }
+    cbStorePut()
     sendPacket(client, packet, cb)
   })
 }
@@ -91,16 +147,18 @@
     }
   }
 
-  this.options.clientId = (typeof this.options.clientId === 'string') ? this.options.clientId : defaultId()
+  this.options.clientId = (typeof options.clientId === 'string') ? options.clientId : defaultId()
+
+  this.options.customHandleAcks = (options.protocolVersion === 5 && options.customHandleAcks) ? options.customHandleAcks : function () { arguments[3](0) }
 
   this.streamBuilder = streamBuilder
 
   // Inflight message storages
-  this.outgoingStore = this.options.outgoingStore || new Store()
-  this.incomingStore = this.options.incomingStore || new Store()
+  this.outgoingStore = options.outgoingStore || new Store()
+  this.incomingStore = options.incomingStore || new Store()
 
   // Should QoS zero messages be queued when the connection is broken?
-  this.queueQoSZero = this.options.queueQoSZero === undefined ? true : this.options.queueQoSZero
+  this.queueQoSZero = options.queueQoSZero === undefined ? true : options.queueQoSZero
 
   // map of subscribed topics to support reconnection
   this._resubscribeTopics = {}
@@ -120,6 +178,10 @@
   this.connackTimer = null
   // Reconnect timer
   this.reconnectTimer = null
+  // Is processing store?
+  this._storeProcessing = false
+  // Packet Ids are put into the store during store processing
+  this._packetIdsDuringStoreProcessing = {}
   /**
    * MessageIDs starting with 1
    * ensure that nextId is min. 1, see https://github.com/mqttjs/MQTT.js/issues/810
@@ -129,73 +191,14 @@
   // Inflight callbacks
   this.outgoing = {}
 
-  // Mark connected on connect
-  this.on('connect', function () {
-    if (this.disconnected) {
-      return
-    }
-
-    this.connected = true
-    var outStore = this.outgoingStore.createStream()
-
-    this.once('close', remove)
-    outStore.on('end', function () {
-      that.removeListener('close', remove)
-    })
-    outStore.on('error', function (err) {
-      that.removeListener('close', remove)
-      that.emit('error', err)
-    })
-
-    function remove () {
-      outStore.destroy()
-      outStore = null
-    }
-
-    function storeDeliver () {
-      // edge case, we wrapped this twice
-      if (!outStore) {
-        return
-      }
-
-      var packet = outStore.read(1)
-      var cb
-
-      if (!packet) {
-        // read when data is available in the future
-        outStore.once('readable', storeDeliver)
-        return
-      }
-
-      // Avoid unnecessary stream read operations when disconnected
-      if (!that.disconnecting && !that.reconnectTimer) {
-        cb = that.outgoing[packet.messageId]
-        that.outgoing[packet.messageId] = function (err, status) {
-          // Ensure that the original callback passed in to publish gets invoked
-          if (cb) {
-            cb(err, status)
-          }
-
-          storeDeliver()
-        }
-        that._sendPacket(packet)
-      } else if (outStore.destroy) {
-        outStore.destroy()
-      }
-    }
-
-    // start flowing
-    storeDeliver()
-  })
+  // True if connection is first time.
+  this._firstConnection = true
 
   // Mark disconnected on stream close
   this.on('close', function () {
     this.connected = false
     clearTimeout(this.connackTimer)
   })
-
-  // Setup ping timer
-  this.on('connect', this._setupPingTimer)
 
   // Send queued packets
   this.on('connect', function () {
@@ -223,23 +226,6 @@
     }
 
     deliver()
-  })
-
-  var firstConnection = true
-  // resubscribe
-  this.on('connect', function () {
-    if (!firstConnection &&
-        this.options.clean &&
-        Object.keys(this._resubscribeTopics).length > 0) {
-      if (this.options.resubscribe) {
-        this._resubscribeTopics.resubscribe = true
-        this.subscribe(this._resubscribeTopics)
-      } else {
-        this._resubscribeTopics = {}
-      }
-    }
-
-    firstConnection = false
   })
 
   // Clear ping timer
@@ -281,18 +267,24 @@
   })
 
   function nextTickWork () {
-    process.nextTick(work)
+    if (packets.length) {
+      process.nextTick(work)
+    } else {
+      var done = completeParse
+      completeParse = null
+      done()
+    }
   }
 
   function work () {
     var packet = packets.shift()
-    var done = completeParse
 
     if (packet) {
       that._handlePacket(packet, nextTickWork)
     } else {
+      var done = completeParse
       completeParse = null
-      done()
+      if (done) done()
     }
   }
 
@@ -308,7 +300,10 @@
   this.stream.on('error', nop)
 
   // Echo stream close
-  eos(this.stream, this.emit.bind(this, 'close'))
+  this.stream.on('close', function () {
+    flushVolatile(that.outgoing)
+    that.emit('close')
+  })
 
   // Send a connect packet
   connectPacket = Object.create(this.options)
@@ -318,6 +313,18 @@
 
   // Echo connection errors
   parser.on('error', this.emit.bind(this, 'error'))
+
+  // auth
+  if (this.options.properties) {
+    if (!this.options.properties.authenticationMethod && this.options.properties.authenticationData) {
+      this.emit('error', new Error('Packet has no Authentication Method'))
+      return this
+    }
+    if (this.options.properties.authenticationMethod && this.options.authPacket && typeof this.options.authPacket === 'object') {
+      var authPacket = xtend({cmd: 'auth', reasonCode: 0}, this.options.authPacket)
+      sendPacket(this, authPacket)
+    }
+  }
 
   // many drain listeners are needed for qos 1 callbacks if the connection is intermittent
   this.stream.setMaxListeners(1000)
@@ -329,6 +336,14 @@
 }
 
 MqttClient.prototype._handlePacket = function (packet, done) {
+  var options = this.options
+
+  if (options.protocolVersion === 5 && options.properties && options.properties.maximumPacketSize && options.properties.maximumPacketSize < packet.length) {
+    this.emit('error', new Error('exceeding packets size ' + packet.cmd))
+    this.end({reasonCode: 149, properties: { reasonString: 'Maximum packet size was exceeded' }})
+    return this
+  }
+
   this.emit('packetreceive', packet)
 
   switch (packet.cmd) {
@@ -352,6 +367,10 @@
       break
     case 'pingresp':
       this._handlePingresp(packet)
+      done()
+      break
+    case 'disconnect':
+      this._handleDisconnect(packet)
       done()
       break
     default:
@@ -382,6 +401,7 @@
  *    {Number} qos - qos level to publish on
  *    {Boolean} retain - whether or not to retain the message
  *    {Boolean} dup - whether or not mark a message as duplicate
+ *    {Function} cbStorePut - function(){} called when message is put into `outgoingStore`
  * @param {Function} [callback] - function(err){}
  *    called when publish succeeds or fails
  * @returns {MqttClient} this - for chaining
@@ -394,6 +414,7 @@
  */
 MqttClient.prototype.publish = function (topic, message, opts, callback) {
   var packet
+  var options = this.options
 
   // .publish(topic, payload, cb);
   if (typeof opts === 'function') {
@@ -419,16 +440,42 @@
     dup: opts.dup
   }
 
+  if (options.protocolVersion === 5) {
+    packet.properties = opts.properties
+    if ((!options.properties && packet.properties && packet.properties.topicAlias) || ((opts.properties && options.properties) &&
+      ((opts.properties.topicAlias && options.properties.topicAliasMaximum && opts.properties.topicAlias > options.properties.topicAliasMaximum) ||
+        (!options.properties.topicAliasMaximum && opts.properties.topicAlias)))) {
+      /*
+      if we are don`t setup topic alias or
+      topic alias maximum less than topic alias or
+      server don`t give topic alias maximum,
+      we are removing topic alias from packet
+      */
+      delete packet.properties.topicAlias
+    }
+  }
+
   switch (opts.qos) {
     case 1:
     case 2:
-
       // Add to callbacks
-      this.outgoing[packet.messageId] = callback || nop
-      this._sendPacket(packet)
+      this.outgoing[packet.messageId] = {
+        volatile: false,
+        cb: callback || nop
+      }
+      if (this._storeProcessing) {
+        this._packetIdsDuringStoreProcessing[packet.messageId] = false
+        this._storePacket(packet, undefined, opts.cbStorePut)
+      } else {
+        this._sendPacket(packet, undefined, opts.cbStorePut)
+      }
       break
     default:
-      this._sendPacket(packet, callback)
+      if (this._storeProcessing) {
+        this._storePacket(packet, callback, opts.cbStorePut)
+      } else {
+        this._sendPacket(packet, callback, opts.cbStorePut)
+      }
       break
   }
 
@@ -448,12 +495,15 @@
  * @api public
  * @example client.subscribe('topic');
  * @example client.subscribe('topic', {qos: 1});
- * @example client.subscribe({'topic': 0, 'topic2': 1}, console.log);
+ * @example client.subscribe({'topic': {qos: 0}, 'topic2': {qos: 1}}, console.log);
  * @example client.subscribe('topic', console.log);
  */
 MqttClient.prototype.subscribe = function () {
   var packet
-  var args = Array.prototype.slice.call(arguments)
+  var args = new Array(arguments.length)
+  for (var i = 0; i < arguments.length; i++) {
+    args[i] = arguments[i]
+  }
   var subs = []
   var obj = args.shift()
   var resubscribe = obj.resubscribe
@@ -461,6 +511,7 @@
   var opts = args.pop()
   var invalidTopic
   var that = this
+  var version = this.options.protocolVersion
 
   delete obj.resubscribe
 
@@ -483,31 +534,52 @@
     return this
   }
 
-  var defaultOpts = { qos: 0 }
+  var defaultOpts = {
+    qos: 0
+  }
+  if (version === 5) {
+    defaultOpts.nl = false
+    defaultOpts.rap = false
+    defaultOpts.rh = 0
+  }
   opts = xtend(defaultOpts, opts)
 
   if (Array.isArray(obj)) {
     obj.forEach(function (topic) {
-      if (that._resubscribeTopics[topic] < opts.qos ||
-          !that._resubscribeTopics.hasOwnProperty(topic) ||
+      if (!that._resubscribeTopics.hasOwnProperty(topic) ||
+        that._resubscribeTopics[topic].qos < opts.qos ||
           resubscribe) {
-        subs.push({
+        var currentOpts = {
           topic: topic,
           qos: opts.qos
-        })
+        }
+        if (version === 5) {
+          currentOpts.nl = opts.nl
+          currentOpts.rap = opts.rap
+          currentOpts.rh = opts.rh
+          currentOpts.properties = opts.properties
+        }
+        subs.push(currentOpts)
       }
     })
   } else {
     Object
       .keys(obj)
       .forEach(function (k) {
-        if (that._resubscribeTopics[k] < obj[k] ||
-            !that._resubscribeTopics.hasOwnProperty(k) ||
+        if (!that._resubscribeTopics.hasOwnProperty(k) ||
+          that._resubscribeTopics[k].qos < obj[k].qos ||
             resubscribe) {
-          subs.push({
+          var currentOpts = {
             topic: k,
-            qos: obj[k]
-          })
+            qos: obj[k].qos
+          }
+          if (version === 5) {
+            currentOpts.nl = obj[k].nl
+            currentOpts.rap = obj[k].rap
+            currentOpts.rh = obj[k].rh
+            currentOpts.properties = opts.properties
+          }
+          subs.push(currentOpts)
         }
       })
   }
@@ -521,6 +593,10 @@
     messageId: this._nextId()
   }
 
+  if (opts.properties) {
+    packet.properties = opts.properties
+  }
+
   if (!subs.length) {
     callback(null, [])
     return
@@ -531,22 +607,32 @@
     var topics = []
     subs.forEach(function (sub) {
       if (that.options.reconnectPeriod > 0) {
-        that._resubscribeTopics[sub.topic] = sub.qos
+        var topic = { qos: sub.qos }
+        if (version === 5) {
+          topic.nl = sub.nl || false
+          topic.rap = sub.rap || false
+          topic.rh = sub.rh || 0
+          topic.properties = sub.properties
+        }
+        that._resubscribeTopics[sub.topic] = topic
         topics.push(sub.topic)
       }
     })
     that.messageIdToTopic[packet.messageId] = topics
   }
 
-  this.outgoing[packet.messageId] = function (err, packet) {
-    if (!err) {
-      var granted = packet.granted
-      for (var i = 0; i < granted.length; i += 1) {
-        subs[i].qos = granted[i]
+  this.outgoing[packet.messageId] = {
+    volatile: true,
+    cb: function (err, packet) {
+      if (!err) {
+        var granted = packet.granted
+        for (var i = 0; i < granted.length; i += 1) {
+          subs[i].qos = granted[i]
+        }
       }
-    }
 
-    callback(err, subs)
+      callback(err, subs)
+    }
   }
 
   this._sendPacket(packet)
@@ -558,21 +644,37 @@
  * unsubscribe - unsubscribe from topic(s)
  *
  * @param {String, Array} topic - topics to unsubscribe from
+ * @param {Object} [opts] - optional subscription options, includes:
+ *    {Object} properties - properties of unsubscribe packet
  * @param {Function} [callback] - callback fired on unsuback
  * @returns {MqttClient} this - for chaining
  * @api public
  * @example client.unsubscribe('topic');
  * @example client.unsubscribe('topic', console.log);
  */
-MqttClient.prototype.unsubscribe = function (topic, callback) {
+MqttClient.prototype.unsubscribe = function () {
   var packet = {
     cmd: 'unsubscribe',
     qos: 1,
     messageId: this._nextId()
   }
   var that = this
+  var args = new Array(arguments.length)
+  for (var i = 0; i < arguments.length; i++) {
+    args[i] = arguments[i]
+  }
+  var topic = args.shift()
+  var callback = args.pop() || nop
+  var opts = args.pop()
 
-  callback = callback || nop
+  if (typeof topic === 'string') {
+    topic = [topic]
+  }
+
+  if (typeof callback !== 'function') {
+    opts = callback
+    callback = nop
+  }
 
   if (this._checkDisconnecting(callback)) {
     return this
@@ -590,7 +692,14 @@
     })
   }
 
-  this.outgoing[packet.messageId] = callback
+  if (typeof opts === 'object' && opts.properties) {
+    packet.properties = opts.properties
+  }
+
+  this.outgoing[packet.messageId] = {
+    volatile: true,
+    cb: callback
+  }
 
   this._sendPacket(packet)
 
@@ -606,13 +715,32 @@
  *
  * @api public
  */
-MqttClient.prototype.end = function (force, cb) {
+MqttClient.prototype.end = function () {
   var that = this
 
-  if (typeof force === 'function') {
-    cb = force
+  var force = arguments[0]
+  var opts = arguments[1]
+  var cb = arguments[2]
+
+  if (force == null || typeof force !== 'boolean') {
+    cb = opts || nop
+    opts = force
     force = false
+    if (typeof opts !== 'object') {
+      cb = opts
+      opts = null
+      if (typeof cb !== 'function') {
+        cb = nop
+      }
+    }
   }
+
+  if (typeof opts !== 'object') {
+    cb = opts
+    opts = null
+  }
+
+  cb = cb || nop
 
   function closeStores () {
     that.disconnected = true
@@ -633,7 +761,7 @@
     // defer closesStores of an I/O cycle,
     // just to make sure things are
     // ok for websockets
-    that._cleanUp(force, setImmediate.bind(null, closeStores))
+    that._cleanUp(force, setImmediate.bind(null, closeStores), opts)
   }
 
   if (this.disconnecting) {
@@ -665,7 +793,7 @@
  * @example client.removeOutgoingMessage(client.getLastMessageId());
  */
 MqttClient.prototype.removeOutgoingMessage = function (mid) {
-  var cb = this.outgoing[mid]
+  var cb = this.outgoing[mid] ? this.outgoing[mid].cb : null
   delete this.outgoing[mid]
   this.outgoingStore.del({messageId: mid}, function () {
     cb(new Error('Message removed'))
@@ -751,6 +879,7 @@
  * @api private
  */
 MqttClient.prototype._cleanUp = function (forced, done) {
+  var opts = arguments[2]
   if (done) {
     this.stream.on('close', done)
   }
@@ -761,8 +890,9 @@
     }
     this.stream.destroy()
   } else {
+    var packet = xtend({ cmd: 'disconnect' }, opts)
     this._sendPacket(
-      { cmd: 'disconnect' },
+      packet,
       setImmediate.bind(
         null,
         this.stream.end.bind(this.stream)
@@ -791,23 +921,14 @@
  * @param {String} type - packet type (see `protocol`)
  * @param {Object} packet - packet options
  * @param {Function} cb - callback when the packet is sent
+ * @param {Function} cbStorePut - called when message is put into outgoingStore
  * @api private
  */
-MqttClient.prototype._sendPacket = function (packet, cb) {
-  if (!this.connected) {
-    if (((packet.qos || 0) === 0 && this.queueQoSZero) || packet.cmd !== 'publish') {
-      this.queue.push({ packet: packet, cb: cb })
-    } else if (packet.qos > 0) {
-      cb = this.outgoing[packet.messageId]
-      this.outgoingStore.put(packet, function (err) {
-        if (err) {
-          return cb && cb(err)
-        }
-      })
-    } else if (cb) {
-      cb(new Error('No connection to broker'))
-    }
+MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut) {
+  cbStorePut = cbStorePut || nop
 
+  if (!this.connected) {
+    this._storePacket(packet, cb, cbStorePut)
     return
   }
 
@@ -818,7 +939,7 @@
     case 'publish':
       break
     case 'pubrel':
-      storeAndSend(this, packet, cb)
+      storeAndSend(this, packet, cb, cbStorePut)
       return
     default:
       sendPacket(this, packet, cb)
@@ -828,7 +949,7 @@
   switch (packet.qos) {
     case 2:
     case 1:
-      storeAndSend(this, packet, cb)
+      storeAndSend(this, packet, cb, cbStorePut)
       break
     /**
      * no need of case here since it will be caught by default
@@ -840,6 +961,32 @@
     default:
       sendPacket(this, packet, cb)
       break
+  }
+}
+
+/**
+ * _storePacket - queue a packet
+ * @param {String} type - packet type (see `protocol`)
+ * @param {Object} packet - packet options
+ * @param {Function} cb - callback when the packet is sent
+ * @param {Function} cbStorePut - called when message is put into outgoingStore
+ * @api private
+ */
+MqttClient.prototype._storePacket = function (packet, cb, cbStorePut) {
+  cbStorePut = cbStorePut || nop
+
+  if (((packet.qos || 0) === 0 && this.queueQoSZero) || packet.cmd !== 'publish') {
+    this.queue.push({ packet: packet, cb: cb })
+  } else if (packet.qos > 0) {
+    cb = this.outgoing[packet.messageId] ? this.outgoing[packet.messageId].cb : null
+    this.outgoingStore.put(packet, function (err) {
+      if (err) {
+        return cb && cb(err)
+      }
+      cbStorePut()
+    })
+  } else if (cb) {
+    cb(new Error('No connection to broker'))
   }
 }
 
@@ -901,21 +1048,30 @@
  */
 
 MqttClient.prototype._handleConnack = function (packet) {
-  var rc = packet.returnCode
-  var errors = [
-    '',
-    'Unacceptable protocol version',
-    'Identifier rejected',
-    'Server unavailable',
-    'Bad username or password',
-    'Not authorized'
-  ]
+  var options = this.options
+  var version = options.protocolVersion
+  var rc = version === 5 ? packet.reasonCode : packet.returnCode
 
   clearTimeout(this.connackTimer)
 
+  if (packet.properties) {
+    if (packet.properties.topicAliasMaximum) {
+      if (!options.properties) { options.properties = {} }
+      options.properties.topicAliasMaximum = packet.properties.topicAliasMaximum
+    }
+    if (packet.properties.serverKeepAlive && options.keepalive) {
+      options.keepalive = packet.properties.serverKeepAlive
+      this._shiftPingInterval()
+    }
+    if (packet.properties.maximumPacketSize) {
+      if (!options.properties) { options.properties = {} }
+      options.properties.maximumPacketSize = packet.properties.maximumPacketSize
+    }
+  }
+
   if (rc === 0) {
     this.reconnecting = false
-    this.emit('connect', packet)
+    this._onConnect(packet)
   } else if (rc > 0) {
     var err = new Error('Connection refused: ' + errors[rc])
     err.code = rc
@@ -960,28 +1116,47 @@
   var qos = packet.qos
   var mid = packet.messageId
   var that = this
+  var options = this.options
+  var validReasonCodes = [0, 16, 128, 131, 135, 144, 145, 151, 153]
 
   switch (qos) {
-    case 2:
-      this.incomingStore.put(packet, function (err) {
-        if (err) {
-          return done(err)
+    case 2: {
+      options.customHandleAcks(topic, message, packet, function (error, code) {
+        if (!(error instanceof Error)) {
+          code = error
+          error = null
         }
-        that._sendPacket({cmd: 'pubrec', messageId: mid}, done)
+        if (error) { return that.emit('error', error) }
+        if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for pubrec')) }
+        if (code) {
+          that._sendPacket({cmd: 'pubrec', messageId: mid, reasonCode: code}, done)
+        } else {
+          that.incomingStore.put(packet, function () {
+            that._sendPacket({cmd: 'pubrec', messageId: mid}, done)
+          })
+        }
       })
       break
-    case 1:
+    }
+    case 1: {
       // emit the message event
-      this.emit('message', topic, message, packet)
-      this.handleMessage(packet, function (err) {
-        if (err) {
-          return done(err)
+      options.customHandleAcks(topic, message, packet, function (error, code) {
+        if (!(error instanceof Error)) {
+          code = error
+          error = null
         }
-        // send 'puback' if the above 'handleMessage' method executed
-        // successfully.
-        that._sendPacket({cmd: 'puback', messageId: mid}, done)
+        if (error) { return that.emit('error', error) }
+        if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for puback')) }
+        if (!code) { that.emit('message', topic, message, packet) }
+        that.handleMessage(packet, function (err) {
+          if (err) {
+            return done && done(err)
+          }
+          that._sendPacket({cmd: 'puback', messageId: mid, reasonCode: code}, done)
+        })
       })
       break
+    }
     case 0:
       // emit the message event
       this.emit('message', topic, message, packet)
@@ -1018,8 +1193,9 @@
   var mid = packet.messageId
   var type = packet.cmd
   var response = null
-  var cb = this.outgoing[mid]
+  var cb = this.outgoing[mid] ? this.outgoing[mid].cb : null
   var that = this
+  var err
 
   if (!cb) {
     // Server sent an ack in error, ignore it.
@@ -1031,7 +1207,13 @@
     case 'pubcomp':
       // same thing as puback for QoS 2
     case 'puback':
+      var pubackRC = packet.reasonCode
       // Callback - we're done
+      if (pubackRC && pubackRC > 0 && pubackRC !== 16) {
+        err = new Error('Publish error: ' + errors[pubackRC])
+        err.code = pubackRC
+        cb(err, packet)
+      }
       delete this.outgoing[mid]
       this.outgoingStore.del(packet, cb)
       break
@@ -1041,18 +1223,27 @@
         qos: 2,
         messageId: mid
       }
+      var pubrecRC = packet.reasonCode
 
-      this._sendPacket(response)
+      if (pubrecRC && pubrecRC > 0 && pubrecRC !== 16) {
+        err = new Error('Publish error: ' + errors[pubrecRC])
+        err.code = pubrecRC
+        cb(err, packet)
+      } else {
+        this._sendPacket(response)
+      }
       break
     case 'suback':
       delete this.outgoing[mid]
-      if (packet.granted.length === 1 && (packet.granted[0] & 0x80) !== 0) {
-        // suback with Failure status
-        var topics = this.messageIdToTopic[mid]
-        if (topics) {
-          topics.forEach(function (topic) {
-            delete that._resubscribeTopics[topic]
-          })
+      for (var grantedI = 0; grantedI < packet.granted.length; grantedI++) {
+        if ((packet.granted[grantedI] & 0x80) !== 0) {
+          // suback with Failure status
+          var topics = this.messageIdToTopic[mid]
+          if (topics) {
+            topics.forEach(function (topic) {
+              delete that._resubscribeTopics[topic]
+            })
+          }
         }
       }
       cb(null, packet)
@@ -1085,23 +1276,29 @@
   var comp = {cmd: 'pubcomp', messageId: mid}
 
   that.incomingStore.get(packet, function (err, pub) {
-    if (!err && pub.cmd !== 'pubrel') {
+    if (!err) {
       that.emit('message', pub.topic, pub.payload, pub)
-      that.incomingStore.put(packet, function (err) {
+      that.handleMessage(pub, function (err) {
         if (err) {
           return callback(err)
         }
-        that.handleMessage(pub, function (err) {
-          if (err) {
-            return callback(err)
-          }
-          that._sendPacket(comp, callback)
-        })
+        that.incomingStore.del(pub, nop)
+        that._sendPacket(comp, callback)
       })
     } else {
       that._sendPacket(comp, callback)
     }
   })
+}
+
+/**
+ * _handleDisconnect
+ *
+ * @param {Object} packet
+ * @api private
+ */
+MqttClient.prototype._handleDisconnect = function (packet) {
+  this.emit('disconnect', packet)
 }
 
 /**
@@ -1126,4 +1323,138 @@
   return (this.nextId === 1) ? 65535 : (this.nextId - 1)
 }
 
+/**
+ * _resubscribe
+ * @api private
+ */
+MqttClient.prototype._resubscribe = function (connack) {
+  var _resubscribeTopicsKeys = Object.keys(this._resubscribeTopics)
+  if (!this._firstConnection &&
+      (this.options.clean || (this.options.protocolVersion === 5 && !connack.sessionPresent)) &&
+      _resubscribeTopicsKeys.length > 0) {
+    if (this.options.resubscribe) {
+      if (this.options.protocolVersion === 5) {
+        for (var topicI = 0; topicI < _resubscribeTopicsKeys.length; topicI++) {
+          var resubscribeTopic = {}
+          resubscribeTopic[_resubscribeTopicsKeys[topicI]] = this._resubscribeTopics[_resubscribeTopicsKeys[topicI]]
+          resubscribeTopic.resubscribe = true
+          this.subscribe(resubscribeTopic, {properties: resubscribeTopic[_resubscribeTopicsKeys[topicI]].properties})
+        }
+      } else {
+        this._resubscribeTopics.resubscribe = true
+        this.subscribe(this._resubscribeTopics)
+      }
+    } else {
+      this._resubscribeTopics = {}
+    }
+  }
+
+  this._firstConnection = false
+}
+
+/**
+ * _onConnect
+ *
+ * @api private
+ */
+MqttClient.prototype._onConnect = function (packet) {
+  if (this.disconnected) {
+    this.emit('connect', packet)
+    return
+  }
+
+  var that = this
+
+  this._setupPingTimer()
+  this._resubscribe(packet)
+
+  this.connected = true
+
+  function startStreamProcess () {
+    var outStore = that.outgoingStore.createStream()
+
+    function clearStoreProcessing () {
+      that._storeProcessing = false
+      that._packetIdsDuringStoreProcessing = {}
+    }
+
+    that.once('close', remove)
+    outStore.on('error', function (err) {
+      clearStoreProcessing()
+      that.removeListener('close', remove)
+      that.emit('error', err)
+    })
+
+    function remove () {
+      outStore.destroy()
+      outStore = null
+      clearStoreProcessing()
+    }
+
+    function storeDeliver () {
+      // edge case, we wrapped this twice
+      if (!outStore) {
+        return
+      }
+      that._storeProcessing = true
+
+      var packet = outStore.read(1)
+
+      var cb
+
+      if (!packet) {
+        // read when data is available in the future
+        outStore.once('readable', storeDeliver)
+        return
+      }
+
+      // Skip already processed store packets
+      if (that._packetIdsDuringStoreProcessing[packet.messageId]) {
+        storeDeliver()
+        return
+      }
+
+      // Avoid unnecessary stream read operations when disconnected
+      if (!that.disconnecting && !that.reconnectTimer) {
+        cb = that.outgoing[packet.messageId] ? that.outgoing[packet.messageId].cb : null
+        that.outgoing[packet.messageId] = {
+          volatile: false,
+          cb: function (err, status) {
+            // Ensure that the original callback passed in to publish gets invoked
+            if (cb) {
+              cb(err, status)
+            }
+
+            storeDeliver()
+          }
+        }
+        that._packetIdsDuringStoreProcessing[packet.messageId] = true
+        that._sendPacket(packet)
+      } else if (outStore.destroy) {
+        outStore.destroy()
+      }
+    }
+
+    outStore.on('end', function () {
+      var allProcessed = true
+      for (var id in that._packetIdsDuringStoreProcessing) {
+        if (!that._packetIdsDuringStoreProcessing[id]) {
+          allProcessed = false
+          break
+        }
+      }
+      if (allProcessed) {
+        clearStoreProcessing()
+        that.removeListener('close', remove)
+        that.emit('connect', packet)
+      } else {
+        startStreamProcess()
+      }
+    })
+    storeDeliver()
+  }
+  // start flowing
+  startStreamProcess()
+}
+
 module.exports = MqttClient

--
Gitblit v1.8.0