heyujie
2021-05-24 4885600ecc369aa2e30a65de8dd7a410f13c34df
node_modules/mqtt/lib/client.js
@@ -5,7 +5,6 @@
 */
var events = require('events')
var Store = require('./store')
var eos = require('end-of-stream')
var mqttPacket = require('mqtt-packet')
var Writable = require('readable-stream').Writable
var inherits = require('inherits')
@@ -26,6 +25,51 @@
  clean: true,
  resubscribe: true
}
var errors = {
  0: '',
  1: 'Unacceptable protocol version',
  2: 'Identifier rejected',
  3: 'Server unavailable',
  4: 'Bad username or password',
  5: 'Not authorized',
  16: 'No matching subscribers',
  17: 'No subscription existed',
  128: 'Unspecified error',
  129: 'Malformed Packet',
  130: 'Protocol Error',
  131: 'Implementation specific error',
  132: 'Unsupported Protocol Version',
  133: 'Client Identifier not valid',
  134: 'Bad User Name or Password',
  135: 'Not authorized',
  136: 'Server unavailable',
  137: 'Server busy',
  138: 'Banned',
  139: 'Server shutting down',
  140: 'Bad authentication method',
  141: 'Keep Alive timeout',
  142: 'Session taken over',
  143: 'Topic Filter invalid',
  144: 'Topic Name invalid',
  145: 'Packet identifier in use',
  146: 'Packet Identifier not found',
  147: 'Receive Maximum exceeded',
  148: 'Topic Alias invalid',
  149: 'Packet too large',
  150: 'Message rate too high',
  151: 'Quota exceeded',
  152: 'Administrative action',
  153: 'Payload format invalid',
  154: 'Retain not supported',
  155: 'QoS not supported',
  156: 'Use another server',
  157: 'Server moved',
  158: 'Shared Subscriptions not supported',
  159: 'Connection rate exceeded',
  160: 'Maximum connect time',
  161: 'Subscription Identifiers not supported',
  162: 'Wildcard Subscriptions not supported'
}
function defaultId () {
  return 'mqttjs_' + Math.random().toString(16).substr(2, 8)
@@ -34,7 +78,7 @@
function sendPacket (client, packet, cb) {
  client.emit('packetsend', packet)
  var result = mqttPacket.writeToStream(packet, client.stream)
  var result = mqttPacket.writeToStream(packet, client.stream, client.options)
  if (!result && cb) {
    client.stream.once('drain', cb)
@@ -46,19 +90,31 @@
function flush (queue) {
  if (queue) {
    Object.keys(queue).forEach(function (messageId) {
      if (typeof queue[messageId] === 'function') {
        queue[messageId](new Error('Connection closed'))
      if (typeof queue[messageId].cb === 'function') {
        queue[messageId].cb(new Error('Connection closed'))
        delete queue[messageId]
      }
    })
  }
}
function storeAndSend (client, packet, cb) {
function flushVolatile (queue) {
  if (queue) {
    Object.keys(queue).forEach(function (messageId) {
      if (queue[messageId].volatile && typeof queue[messageId].cb === 'function') {
        queue[messageId].cb(new Error('Connection closed'))
        delete queue[messageId]
      }
    })
  }
}
function storeAndSend (client, packet, cb, cbStorePut) {
  client.outgoingStore.put(packet, function storedPacket (err) {
    if (err) {
      return cb && cb(err)
    }
    cbStorePut()
    sendPacket(client, packet, cb)
  })
}
@@ -91,16 +147,18 @@
    }
  }
  this.options.clientId = (typeof this.options.clientId === 'string') ? this.options.clientId : defaultId()
  this.options.clientId = (typeof options.clientId === 'string') ? options.clientId : defaultId()
  this.options.customHandleAcks = (options.protocolVersion === 5 && options.customHandleAcks) ? options.customHandleAcks : function () { arguments[3](0) }
  this.streamBuilder = streamBuilder
  // Inflight message storages
  this.outgoingStore = this.options.outgoingStore || new Store()
  this.incomingStore = this.options.incomingStore || new Store()
  this.outgoingStore = options.outgoingStore || new Store()
  this.incomingStore = options.incomingStore || new Store()
  // Should QoS zero messages be queued when the connection is broken?
  this.queueQoSZero = this.options.queueQoSZero === undefined ? true : this.options.queueQoSZero
  this.queueQoSZero = options.queueQoSZero === undefined ? true : options.queueQoSZero
  // map of subscribed topics to support reconnection
  this._resubscribeTopics = {}
@@ -120,6 +178,10 @@
  this.connackTimer = null
  // Reconnect timer
  this.reconnectTimer = null
  // Is processing store?
  this._storeProcessing = false
  // Packet Ids are put into the store during store processing
  this._packetIdsDuringStoreProcessing = {}
  /**
   * MessageIDs starting with 1
   * ensure that nextId is min. 1, see https://github.com/mqttjs/MQTT.js/issues/810
@@ -129,73 +191,14 @@
  // Inflight callbacks
  this.outgoing = {}
  // Mark connected on connect
  this.on('connect', function () {
    if (this.disconnected) {
      return
    }
    this.connected = true
    var outStore = this.outgoingStore.createStream()
    this.once('close', remove)
    outStore.on('end', function () {
      that.removeListener('close', remove)
    })
    outStore.on('error', function (err) {
      that.removeListener('close', remove)
      that.emit('error', err)
    })
    function remove () {
      outStore.destroy()
      outStore = null
    }
    function storeDeliver () {
      // edge case, we wrapped this twice
      if (!outStore) {
        return
      }
      var packet = outStore.read(1)
      var cb
      if (!packet) {
        // read when data is available in the future
        outStore.once('readable', storeDeliver)
        return
      }
      // Avoid unnecessary stream read operations when disconnected
      if (!that.disconnecting && !that.reconnectTimer) {
        cb = that.outgoing[packet.messageId]
        that.outgoing[packet.messageId] = function (err, status) {
          // Ensure that the original callback passed in to publish gets invoked
          if (cb) {
            cb(err, status)
          }
          storeDeliver()
        }
        that._sendPacket(packet)
      } else if (outStore.destroy) {
        outStore.destroy()
      }
    }
    // start flowing
    storeDeliver()
  })
  // True if connection is first time.
  this._firstConnection = true
  // Mark disconnected on stream close
  this.on('close', function () {
    this.connected = false
    clearTimeout(this.connackTimer)
  })
  // Setup ping timer
  this.on('connect', this._setupPingTimer)
  // Send queued packets
  this.on('connect', function () {
@@ -223,23 +226,6 @@
    }
    deliver()
  })
  var firstConnection = true
  // resubscribe
  this.on('connect', function () {
    if (!firstConnection &&
        this.options.clean &&
        Object.keys(this._resubscribeTopics).length > 0) {
      if (this.options.resubscribe) {
        this._resubscribeTopics.resubscribe = true
        this.subscribe(this._resubscribeTopics)
      } else {
        this._resubscribeTopics = {}
      }
    }
    firstConnection = false
  })
  // Clear ping timer
@@ -281,18 +267,24 @@
  })
  function nextTickWork () {
    process.nextTick(work)
    if (packets.length) {
      process.nextTick(work)
    } else {
      var done = completeParse
      completeParse = null
      done()
    }
  }
  function work () {
    var packet = packets.shift()
    var done = completeParse
    if (packet) {
      that._handlePacket(packet, nextTickWork)
    } else {
      var done = completeParse
      completeParse = null
      done()
      if (done) done()
    }
  }
@@ -308,7 +300,10 @@
  this.stream.on('error', nop)
  // Echo stream close
  eos(this.stream, this.emit.bind(this, 'close'))
  this.stream.on('close', function () {
    flushVolatile(that.outgoing)
    that.emit('close')
  })
  // Send a connect packet
  connectPacket = Object.create(this.options)
@@ -318,6 +313,18 @@
  // Echo connection errors
  parser.on('error', this.emit.bind(this, 'error'))
  // auth
  if (this.options.properties) {
    if (!this.options.properties.authenticationMethod && this.options.properties.authenticationData) {
      this.emit('error', new Error('Packet has no Authentication Method'))
      return this
    }
    if (this.options.properties.authenticationMethod && this.options.authPacket && typeof this.options.authPacket === 'object') {
      var authPacket = xtend({cmd: 'auth', reasonCode: 0}, this.options.authPacket)
      sendPacket(this, authPacket)
    }
  }
  // many drain listeners are needed for qos 1 callbacks if the connection is intermittent
  this.stream.setMaxListeners(1000)
@@ -329,6 +336,14 @@
}
MqttClient.prototype._handlePacket = function (packet, done) {
  var options = this.options
  if (options.protocolVersion === 5 && options.properties && options.properties.maximumPacketSize && options.properties.maximumPacketSize < packet.length) {
    this.emit('error', new Error('exceeding packets size ' + packet.cmd))
    this.end({reasonCode: 149, properties: { reasonString: 'Maximum packet size was exceeded' }})
    return this
  }
  this.emit('packetreceive', packet)
  switch (packet.cmd) {
@@ -352,6 +367,10 @@
      break
    case 'pingresp':
      this._handlePingresp(packet)
      done()
      break
    case 'disconnect':
      this._handleDisconnect(packet)
      done()
      break
    default:
@@ -382,6 +401,7 @@
 *    {Number} qos - qos level to publish on
 *    {Boolean} retain - whether or not to retain the message
 *    {Boolean} dup - whether or not mark a message as duplicate
 *    {Function} cbStorePut - function(){} called when message is put into `outgoingStore`
 * @param {Function} [callback] - function(err){}
 *    called when publish succeeds or fails
 * @returns {MqttClient} this - for chaining
@@ -394,6 +414,7 @@
 */
MqttClient.prototype.publish = function (topic, message, opts, callback) {
  var packet
  var options = this.options
  // .publish(topic, payload, cb);
  if (typeof opts === 'function') {
@@ -419,16 +440,42 @@
    dup: opts.dup
  }
  if (options.protocolVersion === 5) {
    packet.properties = opts.properties
    if ((!options.properties && packet.properties && packet.properties.topicAlias) || ((opts.properties && options.properties) &&
      ((opts.properties.topicAlias && options.properties.topicAliasMaximum && opts.properties.topicAlias > options.properties.topicAliasMaximum) ||
        (!options.properties.topicAliasMaximum && opts.properties.topicAlias)))) {
      /*
      if we are don`t setup topic alias or
      topic alias maximum less than topic alias or
      server don`t give topic alias maximum,
      we are removing topic alias from packet
      */
      delete packet.properties.topicAlias
    }
  }
  switch (opts.qos) {
    case 1:
    case 2:
      // Add to callbacks
      this.outgoing[packet.messageId] = callback || nop
      this._sendPacket(packet)
      this.outgoing[packet.messageId] = {
        volatile: false,
        cb: callback || nop
      }
      if (this._storeProcessing) {
        this._packetIdsDuringStoreProcessing[packet.messageId] = false
        this._storePacket(packet, undefined, opts.cbStorePut)
      } else {
        this._sendPacket(packet, undefined, opts.cbStorePut)
      }
      break
    default:
      this._sendPacket(packet, callback)
      if (this._storeProcessing) {
        this._storePacket(packet, callback, opts.cbStorePut)
      } else {
        this._sendPacket(packet, callback, opts.cbStorePut)
      }
      break
  }
@@ -448,12 +495,15 @@
 * @api public
 * @example client.subscribe('topic');
 * @example client.subscribe('topic', {qos: 1});
 * @example client.subscribe({'topic': 0, 'topic2': 1}, console.log);
 * @example client.subscribe({'topic': {qos: 0}, 'topic2': {qos: 1}}, console.log);
 * @example client.subscribe('topic', console.log);
 */
MqttClient.prototype.subscribe = function () {
  var packet
  var args = Array.prototype.slice.call(arguments)
  var args = new Array(arguments.length)
  for (var i = 0; i < arguments.length; i++) {
    args[i] = arguments[i]
  }
  var subs = []
  var obj = args.shift()
  var resubscribe = obj.resubscribe
@@ -461,6 +511,7 @@
  var opts = args.pop()
  var invalidTopic
  var that = this
  var version = this.options.protocolVersion
  delete obj.resubscribe
@@ -483,31 +534,52 @@
    return this
  }
  var defaultOpts = { qos: 0 }
  var defaultOpts = {
    qos: 0
  }
  if (version === 5) {
    defaultOpts.nl = false
    defaultOpts.rap = false
    defaultOpts.rh = 0
  }
  opts = xtend(defaultOpts, opts)
  if (Array.isArray(obj)) {
    obj.forEach(function (topic) {
      if (that._resubscribeTopics[topic] < opts.qos ||
          !that._resubscribeTopics.hasOwnProperty(topic) ||
      if (!that._resubscribeTopics.hasOwnProperty(topic) ||
        that._resubscribeTopics[topic].qos < opts.qos ||
          resubscribe) {
        subs.push({
        var currentOpts = {
          topic: topic,
          qos: opts.qos
        })
        }
        if (version === 5) {
          currentOpts.nl = opts.nl
          currentOpts.rap = opts.rap
          currentOpts.rh = opts.rh
          currentOpts.properties = opts.properties
        }
        subs.push(currentOpts)
      }
    })
  } else {
    Object
      .keys(obj)
      .forEach(function (k) {
        if (that._resubscribeTopics[k] < obj[k] ||
            !that._resubscribeTopics.hasOwnProperty(k) ||
        if (!that._resubscribeTopics.hasOwnProperty(k) ||
          that._resubscribeTopics[k].qos < obj[k].qos ||
            resubscribe) {
          subs.push({
          var currentOpts = {
            topic: k,
            qos: obj[k]
          })
            qos: obj[k].qos
          }
          if (version === 5) {
            currentOpts.nl = obj[k].nl
            currentOpts.rap = obj[k].rap
            currentOpts.rh = obj[k].rh
            currentOpts.properties = opts.properties
          }
          subs.push(currentOpts)
        }
      })
  }
@@ -521,6 +593,10 @@
    messageId: this._nextId()
  }
  if (opts.properties) {
    packet.properties = opts.properties
  }
  if (!subs.length) {
    callback(null, [])
    return
@@ -531,22 +607,32 @@
    var topics = []
    subs.forEach(function (sub) {
      if (that.options.reconnectPeriod > 0) {
        that._resubscribeTopics[sub.topic] = sub.qos
        var topic = { qos: sub.qos }
        if (version === 5) {
          topic.nl = sub.nl || false
          topic.rap = sub.rap || false
          topic.rh = sub.rh || 0
          topic.properties = sub.properties
        }
        that._resubscribeTopics[sub.topic] = topic
        topics.push(sub.topic)
      }
    })
    that.messageIdToTopic[packet.messageId] = topics
  }
  this.outgoing[packet.messageId] = function (err, packet) {
    if (!err) {
      var granted = packet.granted
      for (var i = 0; i < granted.length; i += 1) {
        subs[i].qos = granted[i]
  this.outgoing[packet.messageId] = {
    volatile: true,
    cb: function (err, packet) {
      if (!err) {
        var granted = packet.granted
        for (var i = 0; i < granted.length; i += 1) {
          subs[i].qos = granted[i]
        }
      }
    }
    callback(err, subs)
      callback(err, subs)
    }
  }
  this._sendPacket(packet)
@@ -558,21 +644,37 @@
 * unsubscribe - unsubscribe from topic(s)
 *
 * @param {String, Array} topic - topics to unsubscribe from
 * @param {Object} [opts] - optional subscription options, includes:
 *    {Object} properties - properties of unsubscribe packet
 * @param {Function} [callback] - callback fired on unsuback
 * @returns {MqttClient} this - for chaining
 * @api public
 * @example client.unsubscribe('topic');
 * @example client.unsubscribe('topic', console.log);
 */
MqttClient.prototype.unsubscribe = function (topic, callback) {
MqttClient.prototype.unsubscribe = function () {
  var packet = {
    cmd: 'unsubscribe',
    qos: 1,
    messageId: this._nextId()
  }
  var that = this
  var args = new Array(arguments.length)
  for (var i = 0; i < arguments.length; i++) {
    args[i] = arguments[i]
  }
  var topic = args.shift()
  var callback = args.pop() || nop
  var opts = args.pop()
  callback = callback || nop
  if (typeof topic === 'string') {
    topic = [topic]
  }
  if (typeof callback !== 'function') {
    opts = callback
    callback = nop
  }
  if (this._checkDisconnecting(callback)) {
    return this
@@ -590,7 +692,14 @@
    })
  }
  this.outgoing[packet.messageId] = callback
  if (typeof opts === 'object' && opts.properties) {
    packet.properties = opts.properties
  }
  this.outgoing[packet.messageId] = {
    volatile: true,
    cb: callback
  }
  this._sendPacket(packet)
@@ -606,13 +715,32 @@
 *
 * @api public
 */
MqttClient.prototype.end = function (force, cb) {
MqttClient.prototype.end = function () {
  var that = this
  if (typeof force === 'function') {
    cb = force
  var force = arguments[0]
  var opts = arguments[1]
  var cb = arguments[2]
  if (force == null || typeof force !== 'boolean') {
    cb = opts || nop
    opts = force
    force = false
    if (typeof opts !== 'object') {
      cb = opts
      opts = null
      if (typeof cb !== 'function') {
        cb = nop
      }
    }
  }
  if (typeof opts !== 'object') {
    cb = opts
    opts = null
  }
  cb = cb || nop
  function closeStores () {
    that.disconnected = true
@@ -633,7 +761,7 @@
    // defer closesStores of an I/O cycle,
    // just to make sure things are
    // ok for websockets
    that._cleanUp(force, setImmediate.bind(null, closeStores))
    that._cleanUp(force, setImmediate.bind(null, closeStores), opts)
  }
  if (this.disconnecting) {
@@ -665,7 +793,7 @@
 * @example client.removeOutgoingMessage(client.getLastMessageId());
 */
MqttClient.prototype.removeOutgoingMessage = function (mid) {
  var cb = this.outgoing[mid]
  var cb = this.outgoing[mid] ? this.outgoing[mid].cb : null
  delete this.outgoing[mid]
  this.outgoingStore.del({messageId: mid}, function () {
    cb(new Error('Message removed'))
@@ -751,6 +879,7 @@
 * @api private
 */
MqttClient.prototype._cleanUp = function (forced, done) {
  var opts = arguments[2]
  if (done) {
    this.stream.on('close', done)
  }
@@ -761,8 +890,9 @@
    }
    this.stream.destroy()
  } else {
    var packet = xtend({ cmd: 'disconnect' }, opts)
    this._sendPacket(
      { cmd: 'disconnect' },
      packet,
      setImmediate.bind(
        null,
        this.stream.end.bind(this.stream)
@@ -791,23 +921,14 @@
 * @param {String} type - packet type (see `protocol`)
 * @param {Object} packet - packet options
 * @param {Function} cb - callback when the packet is sent
 * @param {Function} cbStorePut - called when message is put into outgoingStore
 * @api private
 */
MqttClient.prototype._sendPacket = function (packet, cb) {
  if (!this.connected) {
    if (((packet.qos || 0) === 0 && this.queueQoSZero) || packet.cmd !== 'publish') {
      this.queue.push({ packet: packet, cb: cb })
    } else if (packet.qos > 0) {
      cb = this.outgoing[packet.messageId]
      this.outgoingStore.put(packet, function (err) {
        if (err) {
          return cb && cb(err)
        }
      })
    } else if (cb) {
      cb(new Error('No connection to broker'))
    }
MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut) {
  cbStorePut = cbStorePut || nop
  if (!this.connected) {
    this._storePacket(packet, cb, cbStorePut)
    return
  }
@@ -818,7 +939,7 @@
    case 'publish':
      break
    case 'pubrel':
      storeAndSend(this, packet, cb)
      storeAndSend(this, packet, cb, cbStorePut)
      return
    default:
      sendPacket(this, packet, cb)
@@ -828,7 +949,7 @@
  switch (packet.qos) {
    case 2:
    case 1:
      storeAndSend(this, packet, cb)
      storeAndSend(this, packet, cb, cbStorePut)
      break
    /**
     * no need of case here since it will be caught by default
@@ -840,6 +961,32 @@
    default:
      sendPacket(this, packet, cb)
      break
  }
}
/**
 * _storePacket - queue a packet
 * @param {String} type - packet type (see `protocol`)
 * @param {Object} packet - packet options
 * @param {Function} cb - callback when the packet is sent
 * @param {Function} cbStorePut - called when message is put into outgoingStore
 * @api private
 */
MqttClient.prototype._storePacket = function (packet, cb, cbStorePut) {
  cbStorePut = cbStorePut || nop
  if (((packet.qos || 0) === 0 && this.queueQoSZero) || packet.cmd !== 'publish') {
    this.queue.push({ packet: packet, cb: cb })
  } else if (packet.qos > 0) {
    cb = this.outgoing[packet.messageId] ? this.outgoing[packet.messageId].cb : null
    this.outgoingStore.put(packet, function (err) {
      if (err) {
        return cb && cb(err)
      }
      cbStorePut()
    })
  } else if (cb) {
    cb(new Error('No connection to broker'))
  }
}
@@ -901,21 +1048,30 @@
 */
MqttClient.prototype._handleConnack = function (packet) {
  var rc = packet.returnCode
  var errors = [
    '',
    'Unacceptable protocol version',
    'Identifier rejected',
    'Server unavailable',
    'Bad username or password',
    'Not authorized'
  ]
  var options = this.options
  var version = options.protocolVersion
  var rc = version === 5 ? packet.reasonCode : packet.returnCode
  clearTimeout(this.connackTimer)
  if (packet.properties) {
    if (packet.properties.topicAliasMaximum) {
      if (!options.properties) { options.properties = {} }
      options.properties.topicAliasMaximum = packet.properties.topicAliasMaximum
    }
    if (packet.properties.serverKeepAlive && options.keepalive) {
      options.keepalive = packet.properties.serverKeepAlive
      this._shiftPingInterval()
    }
    if (packet.properties.maximumPacketSize) {
      if (!options.properties) { options.properties = {} }
      options.properties.maximumPacketSize = packet.properties.maximumPacketSize
    }
  }
  if (rc === 0) {
    this.reconnecting = false
    this.emit('connect', packet)
    this._onConnect(packet)
  } else if (rc > 0) {
    var err = new Error('Connection refused: ' + errors[rc])
    err.code = rc
@@ -960,28 +1116,47 @@
  var qos = packet.qos
  var mid = packet.messageId
  var that = this
  var options = this.options
  var validReasonCodes = [0, 16, 128, 131, 135, 144, 145, 151, 153]
  switch (qos) {
    case 2:
      this.incomingStore.put(packet, function (err) {
        if (err) {
          return done(err)
    case 2: {
      options.customHandleAcks(topic, message, packet, function (error, code) {
        if (!(error instanceof Error)) {
          code = error
          error = null
        }
        that._sendPacket({cmd: 'pubrec', messageId: mid}, done)
        if (error) { return that.emit('error', error) }
        if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for pubrec')) }
        if (code) {
          that._sendPacket({cmd: 'pubrec', messageId: mid, reasonCode: code}, done)
        } else {
          that.incomingStore.put(packet, function () {
            that._sendPacket({cmd: 'pubrec', messageId: mid}, done)
          })
        }
      })
      break
    case 1:
    }
    case 1: {
      // emit the message event
      this.emit('message', topic, message, packet)
      this.handleMessage(packet, function (err) {
        if (err) {
          return done(err)
      options.customHandleAcks(topic, message, packet, function (error, code) {
        if (!(error instanceof Error)) {
          code = error
          error = null
        }
        // send 'puback' if the above 'handleMessage' method executed
        // successfully.
        that._sendPacket({cmd: 'puback', messageId: mid}, done)
        if (error) { return that.emit('error', error) }
        if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for puback')) }
        if (!code) { that.emit('message', topic, message, packet) }
        that.handleMessage(packet, function (err) {
          if (err) {
            return done && done(err)
          }
          that._sendPacket({cmd: 'puback', messageId: mid, reasonCode: code}, done)
        })
      })
      break
    }
    case 0:
      // emit the message event
      this.emit('message', topic, message, packet)
@@ -1018,8 +1193,9 @@
  var mid = packet.messageId
  var type = packet.cmd
  var response = null
  var cb = this.outgoing[mid]
  var cb = this.outgoing[mid] ? this.outgoing[mid].cb : null
  var that = this
  var err
  if (!cb) {
    // Server sent an ack in error, ignore it.
@@ -1031,7 +1207,13 @@
    case 'pubcomp':
      // same thing as puback for QoS 2
    case 'puback':
      var pubackRC = packet.reasonCode
      // Callback - we're done
      if (pubackRC && pubackRC > 0 && pubackRC !== 16) {
        err = new Error('Publish error: ' + errors[pubackRC])
        err.code = pubackRC
        cb(err, packet)
      }
      delete this.outgoing[mid]
      this.outgoingStore.del(packet, cb)
      break
@@ -1041,18 +1223,27 @@
        qos: 2,
        messageId: mid
      }
      var pubrecRC = packet.reasonCode
      this._sendPacket(response)
      if (pubrecRC && pubrecRC > 0 && pubrecRC !== 16) {
        err = new Error('Publish error: ' + errors[pubrecRC])
        err.code = pubrecRC
        cb(err, packet)
      } else {
        this._sendPacket(response)
      }
      break
    case 'suback':
      delete this.outgoing[mid]
      if (packet.granted.length === 1 && (packet.granted[0] & 0x80) !== 0) {
        // suback with Failure status
        var topics = this.messageIdToTopic[mid]
        if (topics) {
          topics.forEach(function (topic) {
            delete that._resubscribeTopics[topic]
          })
      for (var grantedI = 0; grantedI < packet.granted.length; grantedI++) {
        if ((packet.granted[grantedI] & 0x80) !== 0) {
          // suback with Failure status
          var topics = this.messageIdToTopic[mid]
          if (topics) {
            topics.forEach(function (topic) {
              delete that._resubscribeTopics[topic]
            })
          }
        }
      }
      cb(null, packet)
@@ -1085,23 +1276,29 @@
  var comp = {cmd: 'pubcomp', messageId: mid}
  that.incomingStore.get(packet, function (err, pub) {
    if (!err && pub.cmd !== 'pubrel') {
    if (!err) {
      that.emit('message', pub.topic, pub.payload, pub)
      that.incomingStore.put(packet, function (err) {
      that.handleMessage(pub, function (err) {
        if (err) {
          return callback(err)
        }
        that.handleMessage(pub, function (err) {
          if (err) {
            return callback(err)
          }
          that._sendPacket(comp, callback)
        })
        that.incomingStore.del(pub, nop)
        that._sendPacket(comp, callback)
      })
    } else {
      that._sendPacket(comp, callback)
    }
  })
}
/**
 * _handleDisconnect
 *
 * @param {Object} packet
 * @api private
 */
MqttClient.prototype._handleDisconnect = function (packet) {
  this.emit('disconnect', packet)
}
/**
@@ -1126,4 +1323,138 @@
  return (this.nextId === 1) ? 65535 : (this.nextId - 1)
}
/**
 * _resubscribe
 * @api private
 */
MqttClient.prototype._resubscribe = function (connack) {
  var _resubscribeTopicsKeys = Object.keys(this._resubscribeTopics)
  if (!this._firstConnection &&
      (this.options.clean || (this.options.protocolVersion === 5 && !connack.sessionPresent)) &&
      _resubscribeTopicsKeys.length > 0) {
    if (this.options.resubscribe) {
      if (this.options.protocolVersion === 5) {
        for (var topicI = 0; topicI < _resubscribeTopicsKeys.length; topicI++) {
          var resubscribeTopic = {}
          resubscribeTopic[_resubscribeTopicsKeys[topicI]] = this._resubscribeTopics[_resubscribeTopicsKeys[topicI]]
          resubscribeTopic.resubscribe = true
          this.subscribe(resubscribeTopic, {properties: resubscribeTopic[_resubscribeTopicsKeys[topicI]].properties})
        }
      } else {
        this._resubscribeTopics.resubscribe = true
        this.subscribe(this._resubscribeTopics)
      }
    } else {
      this._resubscribeTopics = {}
    }
  }
  this._firstConnection = false
}
/**
 * _onConnect
 *
 * @api private
 */
MqttClient.prototype._onConnect = function (packet) {
  if (this.disconnected) {
    this.emit('connect', packet)
    return
  }
  var that = this
  this._setupPingTimer()
  this._resubscribe(packet)
  this.connected = true
  function startStreamProcess () {
    var outStore = that.outgoingStore.createStream()
    function clearStoreProcessing () {
      that._storeProcessing = false
      that._packetIdsDuringStoreProcessing = {}
    }
    that.once('close', remove)
    outStore.on('error', function (err) {
      clearStoreProcessing()
      that.removeListener('close', remove)
      that.emit('error', err)
    })
    function remove () {
      outStore.destroy()
      outStore = null
      clearStoreProcessing()
    }
    function storeDeliver () {
      // edge case, we wrapped this twice
      if (!outStore) {
        return
      }
      that._storeProcessing = true
      var packet = outStore.read(1)
      var cb
      if (!packet) {
        // read when data is available in the future
        outStore.once('readable', storeDeliver)
        return
      }
      // Skip already processed store packets
      if (that._packetIdsDuringStoreProcessing[packet.messageId]) {
        storeDeliver()
        return
      }
      // Avoid unnecessary stream read operations when disconnected
      if (!that.disconnecting && !that.reconnectTimer) {
        cb = that.outgoing[packet.messageId] ? that.outgoing[packet.messageId].cb : null
        that.outgoing[packet.messageId] = {
          volatile: false,
          cb: function (err, status) {
            // Ensure that the original callback passed in to publish gets invoked
            if (cb) {
              cb(err, status)
            }
            storeDeliver()
          }
        }
        that._packetIdsDuringStoreProcessing[packet.messageId] = true
        that._sendPacket(packet)
      } else if (outStore.destroy) {
        outStore.destroy()
      }
    }
    outStore.on('end', function () {
      var allProcessed = true
      for (var id in that._packetIdsDuringStoreProcessing) {
        if (!that._packetIdsDuringStoreProcessing[id]) {
          allProcessed = false
          break
        }
      }
      if (allProcessed) {
        clearStoreProcessing()
        that.removeListener('close', remove)
        that.emit('connect', packet)
      } else {
        startStreamProcess()
      }
    })
    storeDeliver()
  }
  // start flowing
  startStreamProcess()
}
module.exports = MqttClient