heyujie
2021-05-24 4885600ecc369aa2e30a65de8dd7a410f13c34df
node_modules/mqtt/test/abstract_client.js
@@ -12,6 +12,7 @@
var port = 9876
module.exports = function (server, config) {
  var version = config.protocolVersion || 4
  function connect (opts) {
    opts = xtend(config, opts)
    return mqtt.connect(opts)
@@ -302,11 +303,13 @@
    })
    it('should provide connack packet with connect event', function (done) {
      var connack = version === 5 ? {reasonCode: 0} : {returnCode: 0}
      server.once('client', function (serverClient) {
        serverClient.connack({returnCode: 0, sessionPresent: true})
        connack.sessionPresent = true
        serverClient.connack(connack)
        server.once('client', function (serverClient) {
          serverClient.connack({returnCode: 0, sessionPresent: false})
          connack.sessionPresent = false
          serverClient.connack(connack)
        })
      })
@@ -339,7 +342,8 @@
        done(new Error('Should not emit connect'))
      })
      client.once('error', function (error) {
        should(error.code).be.equal(2) // code for clientID identifer rejected
        var value = version === 5 ? 128 : 2
        should(error.code).be.equal(value) // code for clientID identifer rejected
        client.end()
        done()
      })
@@ -529,6 +533,64 @@
        setTimeout(function () {
          client.end(true, done)
        }, 10)
      })
    })
    it('should not interrupt messages', function (done) {
      var client = null
      var incomingStore = new mqtt.Store({ clean: false })
      var outgoingStore = new mqtt.Store({ clean: false })
      var publishCount = 0
      var server2 = new Server(function (c) {
        c.on('connect', function () {
          c.connack({returnCode: 0})
        })
        c.on('publish', function (packet) {
          if (packet.qos !== 0) {
            c.puback({messageId: packet.messageId})
          }
          switch (publishCount++) {
            case 0:
              packet.payload.toString().should.equal('payload1')
              break
            case 1:
              packet.payload.toString().should.equal('payload2')
              break
            case 2:
              packet.payload.toString().should.equal('payload3')
              break
            case 3:
              packet.payload.toString().should.equal('payload4')
              server2.close()
              done()
              break
          }
        })
      })
      server2.listen(port + 50, function () {
        client = mqtt.connect({
          port: port + 50,
          host: 'localhost',
          clean: false,
          clientId: 'cid1',
          reconnectPeriod: 0,
          incomingStore: incomingStore,
          outgoingStore: outgoingStore,
          queueQoSZero: true
        })
        client.on('packetreceive', function (packet) {
          if (packet.cmd === 'connack') {
            setImmediate(
              function () {
                client.publish('test', 'payload3', {qos: 1})
                client.publish('test', 'payload4', {qos: 0})
              }
            )
          }
        })
        client.publish('test', 'payload1', {qos: 2})
        client.publish('test', 'payload2', {qos: 2})
      })
    })
@@ -1004,7 +1066,7 @@
          return new AsyncStore()
        }
      }
      AsyncStore.prototype.put = function (packet, cb) {
      AsyncStore.prototype.del = function (packet, cb) {
        process.nextTick(function () {
          cb(new Error('Error'))
        })
@@ -1027,15 +1089,15 @@
    })
    it('should handle success with async incoming store in QoS 2 `handlePubrel` method', function (done) {
      var putComplete = false
      var delComplete = false
      function AsyncStore () {
        if (!(this instanceof AsyncStore)) {
          return new AsyncStore()
        }
      }
      AsyncStore.prototype.put = function (packet, cb) {
      AsyncStore.prototype.del = function (packet, cb) {
        process.nextTick(function () {
          putComplete = true
          delComplete = true
          cb(null)
        })
      }
@@ -1051,7 +1113,7 @@
        messageId: 1,
        qos: 2
      }, function () {
        putComplete.should.equal(true)
        delComplete.should.equal(true)
        done()
        client.end()
      })
@@ -1152,6 +1214,115 @@
          }
        })
      })
    })
    it('should keep message order', function (done) {
      var publishCount = 0
      var reconnect = false
      var client = {}
      var incomingStore = new mqtt.Store({ clean: false })
      var outgoingStore = new mqtt.Store({ clean: false })
      var server2 = new Server(function (c) {
        // errors are not interesting for this test
        // but they might happen on some platforms
        c.on('error', function () {})
        c.on('connect', function (packet) {
          c.connack({returnCode: 0})
        })
        c.on('publish', function (packet) {
          c.puback({messageId: packet.messageId})
          if (reconnect) {
            switch (publishCount++) {
              case 0:
                packet.payload.toString().should.equal('payload1')
                break
              case 1:
                packet.payload.toString().should.equal('payload2')
                break
              case 2:
                packet.payload.toString().should.equal('payload3')
                server2.close()
                done()
                break
            }
          }
        })
      })
      server2.listen(port + 50, function () {
        client = mqtt.connect({
          port: port + 50,
          host: 'localhost',
          clean: false,
          clientId: 'cid1',
          reconnectPeriod: 0,
          incomingStore: incomingStore,
          outgoingStore: outgoingStore
        })
        client.on('connect', function () {
          if (!reconnect) {
            client.publish('topic', 'payload1', {qos: 1})
            client.publish('topic', 'payload2', {qos: 1})
            client.end(true)
          } else {
            client.publish('topic', 'payload3', {qos: 1})
          }
        })
        client.on('close', function () {
          if (!reconnect) {
            client.reconnect({
              clean: false,
              incomingStore: incomingStore,
              outgoingStore: outgoingStore
            })
            reconnect = true
          }
        })
      })
    })
    function testCallbackStorePutByQoS (qos, clean, expected, done) {
      var client = connect({
        clean: clean,
        clientId: 'testId'
      })
      var callbacks = []
      function cbStorePut () {
        callbacks.push('storeput')
      }
      client.on('connect', function () {
        client.publish('test', 'test', {qos: qos, cbStorePut: cbStorePut}, function (err) {
          if (err) done(err)
          callbacks.push('publish')
          should.deepEqual(callbacks, expected)
          done()
        })
        client.end()
      })
    }
    it('should not call cbStorePut when publishing message with QoS `0` and clean `true`', function (done) {
      testCallbackStorePutByQoS(0, true, ['publish'], done)
    })
    it('should not call cbStorePut when publishing message with QoS `0` and clean `false`', function (done) {
      testCallbackStorePutByQoS(0, false, ['publish'], done)
    })
    it('should call cbStorePut before publish completes when publishing message with QoS `1` and clean `true`', function (done) {
      testCallbackStorePutByQoS(1, true, ['storeput', 'publish'], done)
    })
    it('should call cbStorePut before publish completes when publishing message with QoS `1` and clean `false`', function (done) {
      testCallbackStorePutByQoS(1, false, ['storeput', 'publish'], done)
    })
    it('should call cbStorePut before publish completes when publishing message with QoS `2` and clean `true`', function (done) {
      testCallbackStorePutByQoS(2, true, ['storeput', 'publish'], done)
    })
    it('should call cbStorePut before publish completes when publishing message with QoS `2` and clean `false`', function (done) {
      testCallbackStorePutByQoS(2, false, ['storeput', 'publish'], done)
    })
  })
@@ -1428,10 +1599,16 @@
      server.once('client', function (serverClient) {
        serverClient.once('subscribe', function (packet) {
          packet.subscriptions.should.containEql({
          var result = {
            topic: topic,
            qos: 0
          })
          }
          if (version === 5) {
            result.nl = false
            result.rap = false
            result.rh = 0
          }
          packet.subscriptions.should.containEql(result)
          done()
        })
      })
@@ -1479,7 +1656,13 @@
        serverClient.once('subscribe', function (packet) {
          // i.e. [{topic: 'a', qos: 0}, {topic: 'b', qos: 0}]
          var expected = subs.map(function (i) {
            return {topic: i, qos: 0}
            var result = {topic: i, qos: 0}
            if (version === 5) {
              result.nl = false
              result.rap = false
              result.rh = 0
            }
            return result
          })
          packet.subscriptions.should.eql(expected)
@@ -1491,8 +1674,8 @@
    it('should accept an hash of subscriptions', function (done) {
      var client = connect()
      var topics = {
        test1: 0,
        test2: 1
        test1: {qos: 0},
        test2: {qos: 1}
      }
      client.once('connect', function () {
@@ -1506,10 +1689,16 @@
          for (k in topics) {
            if (topics.hasOwnProperty(k)) {
              expected.push({
              var result = {
                topic: k,
                qos: topics[k]
              })
                qos: topics[k].qos
              }
              if (version === 5) {
                result.nl = false
                result.rap = false
                result.rh = 0
              }
              expected.push(result)
            }
          }
@@ -1535,6 +1724,12 @@
            qos: 1
          }]
          if (version === 5) {
            expected[0].nl = false
            expected[0].rap = false
            expected[0].rh = 0
          }
          packet.subscriptions.should.eql(expected)
          done()
        })
@@ -1552,10 +1747,16 @@
      server.once('client', function (serverClient) {
        serverClient.once('subscribe', function (packet) {
          packet.subscriptions.should.containEql({
          var result = {
            topic: topic,
            qos: defaultOpts.qos
          })
          }
          if (version === 5) {
            result.nl = false
            result.rap = false
            result.rh = 0
          }
          packet.subscriptions.should.containEql(result)
          done()
        })
      })
@@ -1571,7 +1772,14 @@
            done(err)
          } else {
            should.exist(granted, 'granted not given')
            granted.should.containEql({topic: 'test', qos: 2})
            var result = {topic: 'test', qos: 2}
            if (version === 5) {
              result.nl = false
              result.rap = false
              result.rh = 0
              result.properties = undefined
            }
            granted.should.containEql(result)
            done()
          }
        })
@@ -1617,10 +1825,16 @@
      server.once('client', function (serverClient) {
        serverClient.once('subscribe', function (packet) {
          packet.subscriptions.should.containEql({
          var result = {
            topic: topic,
            qos: 0
          })
          }
          if (version === 5) {
            result.nl = false
            result.rap = false
            result.rh = 0
          }
          packet.subscriptions.should.containEql(result)
          done()
        })
      })
@@ -1847,9 +2061,76 @@
      var testTopic = 'test'
      var testMessage = 'message'
      var mid = 253
      var publishReceived = false
      var pubrecReceived = false
      var pubrelReceived = false
      client.once('connect', function () {
        client.subscribe(testTopic, {qos: 2})
      })
      client.on('packetreceive', (packet) => {
        switch (packet.cmd) {
          case 'connack':
          case 'suback':
            // expected, but not specifically part of QOS 2 semantics
            break
          case 'publish':
            pubrecReceived.should.be.false()
            pubrelReceived.should.be.false()
            publishReceived = true
            break
          case 'pubrel':
            publishReceived.should.be.true()
            pubrecReceived.should.be.true()
            pubrelReceived = true
            break
          default:
            should.fail()
        }
      })
      server.once('client', function (serverClient) {
        serverClient.once('subscribe', function () {
          serverClient.publish({
            topic: testTopic,
            payload: testMessage,
            qos: 2,
            messageId: mid
          })
        })
        serverClient.on('pubrec', function () {
          publishReceived.should.be.true()
          pubrelReceived.should.be.false()
          pubrecReceived = true
        })
        serverClient.once('pubcomp', function () {
          client.removeAllListeners()
          serverClient.removeAllListeners()
          publishReceived.should.be.true()
          pubrecReceived.should.be.true()
          pubrelReceived.should.be.true()
          done()
        })
      })
    })
    it('should should empty the incoming store after a qos 2 handshake is completed', function (done) {
      var client = connect()
      var testTopic = 'test'
      var testMessage = 'message'
      var mid = 253
      client.once('connect', function () {
        client.subscribe(testTopic, {qos: 2})
      })
      client.on('packetreceive', (packet) => {
        if (packet.cmd === 'pubrel') {
          should(client.incomingStore._inflights.size).be.equal(1)
        }
      })
      server.once('client', function (serverClient) {
@@ -1863,9 +2144,93 @@
        })
        serverClient.once('pubcomp', function () {
          should(client.incomingStore._inflights.size).be.equal(0)
          client.removeAllListeners()
          done()
        })
      })
    })
    function testMultiplePubrel (shouldSendPubcompFail, done) {
      var client = connect()
      var testTopic = 'test'
      var testMessage = 'message'
      var mid = 253
      var pubcompCount = 0
      var pubrelCount = 0
      var handleMessageCount = 0
      var emitMessageCount = 0
      var origSendPacket = client._sendPacket
      var shouldSendFail
      client.handleMessage = function (packet, callback) {
        handleMessageCount++
        callback()
      }
      client.on('message', function () {
        emitMessageCount++
      })
      client._sendPacket = function (packet, sendDone) {
        shouldSendFail = packet.cmd === 'pubcomp' && shouldSendPubcompFail
        if (sendDone) {
          sendDone(shouldSendFail ? new Error('testing pubcomp failure') : undefined)
        }
        // send the mocked response
        switch (packet.cmd) {
          case 'subscribe':
            const suback = {cmd: 'suback', messageId: packet.messageId, granted: [2]}
            client._handlePacket(suback, function (err) {
              should(err).not.be.ok()
            })
            break
          case 'pubrec':
          case 'pubcomp':
            // for both pubrec and pubcomp, reply with pubrel, simulating the server not receiving the pubcomp
            if (packet.cmd === 'pubcomp') {
              pubcompCount++
              if (pubcompCount === 2) {
                // end the test once the client has gone through two rounds of replying to pubrel messages
                pubrelCount.should.be.exactly(2)
                handleMessageCount.should.be.exactly(1)
                emitMessageCount.should.be.exactly(1)
                client._sendPacket = origSendPacket
                done()
                break
              }
            }
            // simulate the pubrel message, either in response to pubrec or to mock pubcomp failing to be received
            const pubrel = {cmd: 'pubrel', messageId: mid}
            pubrelCount++
            client._handlePacket(pubrel, function (err) {
              if (shouldSendFail) {
                should(err).be.ok()
              } else {
                should(err).not.be.ok()
              }
            })
            break
        }
      }
      client.once('connect', function () {
        client.subscribe(testTopic, {qos: 2})
        const publish = {cmd: 'publish', topic: testTopic, payload: testMessage, qos: 2, messageId: mid}
        client._handlePacket(publish, function (err) {
          should(err).not.be.ok()
        })
      })
    }
    it('handle qos 2 messages exactly once when multiple pubrel received', function (done) {
      testMultiplePubrel(false, done)
    })
    it('handle qos 2 messages exactly once when multiple pubrel received and sending pubcomp fails on client', function (done) {
      testMultiplePubrel(true, done)
    })
  })
@@ -2317,6 +2682,57 @@
      })
    })
    it('should clear outgoing if close from server', function (done) {
      var reconnect = false
      var client = {}
      var server2 = new Server(function (c) {
        c.on('connect', function (packet) {
          c.connack({returnCode: 0})
        })
        c.on('subscribe', function (packet) {
          if (reconnect) {
            c.suback({
              messageId: packet.messageId,
              granted: packet.subscriptions.map(function (e) {
                return e.qos
              })
            })
          } else {
            c.destroy()
          }
        })
      })
      server2.listen(port + 50, function () {
        client = mqtt.connect({
          port: port + 50,
          host: 'localhost',
          clean: true,
          clientId: 'cid1',
          reconnectPeriod: 0
        })
        client.on('connect', function () {
          client.subscribe('test', {qos: 2}, function (e) {
            if (!e) {
              client.end()
            }
          })
        })
        client.on('close', function () {
          if (reconnect) {
            server2.close()
            done()
          } else {
            Object.keys(client.outgoing).length.should.equal(0)
            reconnect = true
            client.reconnect()
          }
        })
      })
    })
    it('should resend in-flight QoS 1 publish messages from the client if clean is false', function (done) {
      var reconnect = false
      var client = {}
@@ -2591,6 +3007,7 @@
    context('with alternate server client', function () {
      var cachedClientListeners
      var connack = version === 5 ? { reasonCode: 0 } : { returnCode: 0 }
      beforeEach(function () {
        cachedClientListeners = server.listeners('client')
@@ -2612,7 +3029,7 @@
        server.on('client', function (serverClient) {
          serverClient.on('connect', function () {
            connectCount++
            serverClient.connack({returnCode: 0})
            serverClient.connack(connack)
          })
          serverClient.on('subscribe', function () {
@@ -2641,7 +3058,7 @@
        server.on('client', function (serverClient) {
          serverClient.on('connect', function () {
            serverClient.connack({returnCode: 0})
            serverClient.connack(connack)
          })
          serverClient.on('subscribe', function () {