heyujie
2021-05-24 4885600ecc369aa2e30a65de8dd7a410f13c34df
node_modules/mqtt/test/client.js
@@ -12,6 +12,7 @@
var Duplex = require('readable-stream').Duplex
var Connection = require('mqtt-connection')
var Server = require('./server')
var FastServer = require('./server').FastMqttServer
var port = 9876
var server
@@ -26,13 +27,35 @@
/**
 * Test server
 */
function buildServer () {
  return new Server(function (client) {
function buildServer (fastFlag) {
  var handler = function (client) {
    client.on('auth', function (packet) {
      var rc = 'reasonCode'
      var connack = {}
      connack[rc] = 0
      client.connack(connack)
    })
    client.on('connect', function (packet) {
      if (packet.clientId === 'invalid') {
        client.connack({returnCode: 2})
      var rc = 'returnCode'
      var connack = {}
      if (client.options && client.options.protocolVersion === 5) {
        rc = 'reasonCode'
        if (packet.clientId === 'invalid') {
          connack[rc] = 128
        } else {
          connack[rc] = 0
        }
      } else {
        client.connack({returnCode: 0})
        if (packet.clientId === 'invalid') {
          connack[rc] = 2
        } else {
          connack[rc] = 0
        }
      }
      if (packet.properties && packet.properties.authenticationMethod) {
        return false
      } else {
        client.connack(connack)
      }
    })
@@ -73,13 +96,19 @@
    })
    client.on('unsubscribe', function (packet) {
      packet.granted = packet.unsubscriptions.map(function () { return 0 })
      client.unsuback(packet)
    })
    client.on('pingreq', function () {
      client.pingresp()
    })
  })
  }
  if (fastFlag) {
    return new FastServer(handler)
  } else {
    return new Server(handler)
  }
}
server = buildServer().listen(port)
@@ -263,14 +292,15 @@
    it('should reconnect to multiple host-ports-protocol combinations if servers is passed', function (done) {
      this.timeout(15000)
      var server2 = buildServer().listen(port + 42)
      var server = buildServer(true).listen(port + 41)
      var server2 = buildServer(true).listen(port + 42)
      server2.on('listening', function () {
        var client = mqtt.connect({
          protocol: 'wss',
          servers: [
            { port: port + 42, host: 'localhost', protocol: 'ws' },
            { port: port, host: 'localhost' }
            { port: port + 41, host: 'localhost' }
          ],
          keepalive: 50
        })
@@ -281,7 +311,7 @@
        })
        server.once('client', function () {
          should.equal(client.stream.socket.url, 'wss://localhost:9876/', 'Protocol for second client should use the default protocol: wss, on port: port + 42.')
          should.equal(client.stream.socket.url, 'wss://localhost:9917/', 'Protocol for second client should use the default protocol: wss, on port: port + 42.')
          client.end()
          done()
        })
@@ -541,4 +571,559 @@
      })
    })
  })
  it('check emit error on checkDisconnection w/o callback', function (done) {
    this.timeout(15000)
    var server118 = new Server(function (client) {
      client.on('connect', function (packet) {
        client.connack({
          reasonCode: 0
        })
      })
      client.on('publish', function (packet) {
        setImmediate(function () {
          packet.reasonCode = 0
          client.puback(packet)
        })
      })
    }).listen(port + 118)
    var opts = {
      host: 'localhost',
      port: port + 118,
      protocolVersion: 5
    }
    var client = mqtt.connect(opts)
    client.on('error', function (error) {
      should(error.message).be.equal('client disconnecting')
      server118.close()
      done()
    })
    client.on('connect', function () {
      client.end(function () {
        client._checkDisconnecting()
      })
      server118.close()
    })
  })
  describe('MQTT 5.0', function () {
    var server = buildServer().listen(port + 115)
    var config = { protocol: 'mqtt', port: port + 115, protocolVersion: 5, properties: { maximumPacketSize: 200 } }
    abstractClientTests(server, config)
    it('should has Auth method with Auth data', function (done) {
      this.timeout(5000)
      var opts = {host: 'localhost', port: port + 115, protocolVersion: 5, properties: { authenticationData: Buffer.from([1, 2, 3, 4]) }}
      try {
        mqtt.connect(opts)
      } catch (error) {
        should(error.message).be.equal('Packet has no Authentication Method')
      }
      done()
    })
    it('auth packet', function (done) {
      this.timeout(15000)
      server.once('client', function (client) {
        client.on('auth', function (packet) {
          done()
        })
      })
      var opts = {host: 'localhost', port: port + 115, protocolVersion: 5, properties: { authenticationMethod: 'json' }, authPacket: {}}
      mqtt.connect(opts)
    })
    it('Maximum Packet Size', function (done) {
      this.timeout(15000)
      var opts = {host: 'localhost', port: port + 115, protocolVersion: 5, properties: { maximumPacketSize: 1 }}
      var client = mqtt.connect(opts)
      client.on('error', function (error) {
        should(error.message).be.equal('exceeding packets size connack')
        done()
      })
    })
    describe('Topic Alias', function () {
      it('topicAlias > topicAliasMaximum', function (done) {
        this.timeout(15000)
        var maximum = 15
        var current = 22
        server.once('client', function (client) {
          client.on('publish', function (packet) {
            if (packet.properties && packet.properties.topicAlias) {
              done(new Error('Packet should not have topicAlias'))
              return false
            }
            done()
          })
        })
        var opts = {host: 'localhost', port: port + 115, protocolVersion: 5, properties: { topicAliasMaximum: maximum }}
        var client = mqtt.connect(opts)
        client.publish('t/h', 'Message', { properties: { topicAlias: current } })
      })
      it('topicAlias w/o topicAliasMaximum in settings', function (done) {
        this.timeout(15000)
        server.once('client', function (client) {
          client.on('publish', function (packet) {
            if (packet.properties && packet.properties.topicAlias) {
              done(new Error('Packet should not have topicAlias'))
              return false
            }
            done()
          })
        })
        var opts = {host: 'localhost', port: port + 115, protocolVersion: 5}
        var client = mqtt.connect(opts)
        client.publish('t/h', 'Message', { properties: { topicAlias: 22 } })
      })
    })
    it('Change values of some properties by server response', function (done) {
      this.timeout(15000)
      var server116 = new Server(function (client) {
        client.on('connect', function (packet) {
          client.connack({
            reasonCode: 0,
            properties: {
              topicAliasMaximum: 15,
              serverKeepAlive: 16,
              maximumPacketSize: 95
            }
          })
        })
      }).listen(port + 116)
      var opts = {
        host: 'localhost',
        port: port + 116,
        protocolVersion: 5,
        properties: {
          topicAliasMaximum: 10,
          serverKeepAlive: 11,
          maximumPacketSize: 100
        }
      }
      var client = mqtt.connect(opts)
      client.on('connect', function () {
        should(client.options.keepalive).be.equal(16)
        should(client.options.properties.topicAliasMaximum).be.equal(15)
        should(client.options.properties.maximumPacketSize).be.equal(95)
        server116.close()
        done()
      })
    })
    it('should resubscribe when reconnecting with protocolVersion 5 and Session Present flag is false', function (done) {
      this.timeout(15000)
      var tryReconnect = true
      var reconnectEvent = false
      var server316 = new Server(function (client) {
        client.on('connect', function (packet) {
          client.connack({
            reasonCode: 0,
            sessionPresent: false
          })
          client.on('subscribe', function () {
            if (!tryReconnect) {
              client.end()
              server316.close()
              done()
            }
          })
        })
      }).listen(port + 316)
      var opts = {
        host: 'localhost',
        port: port + 316,
        protocolVersion: 5
      }
      var client = mqtt.connect(opts)
      client.on('reconnect', function () {
        reconnectEvent = true
      })
      client.on('connect', function (connack) {
        should(connack.sessionPresent).be.equal(false)
        if (tryReconnect) {
          client.subscribe('hello', function () {
            client.stream.end()
          })
          tryReconnect = false
        } else {
          reconnectEvent.should.equal(true)
        }
      })
    })
    it('should resubscribe when reconnecting with protocolVersion 5 and properties', function (done) {
      this.timeout(15000)
      var tryReconnect = true
      var reconnectEvent = false
      var server326 = new Server(function (client) {
        client.on('connect', function (packet) {
          client.on('subscribe', function (packet) {
            if (!reconnectEvent) {
              client.suback({
                messageId: packet.messageId,
                granted: packet.subscriptions.map(function (e) {
                  return e.qos
                })
              })
            } else {
              if (!tryReconnect) {
                should(packet.properties.userProperties.test).be.equal('test')
                client.end()
                server326.close()
                done()
              }
            }
          })
          client.connack({
            reasonCode: 0,
            sessionPresent: false
          })
        })
      }).listen(port + 326)
      var opts = {
        host: 'localhost',
        port: port + 326,
        protocolVersion: 5
      }
      var client = mqtt.connect(opts)
      client.on('reconnect', function () {
        reconnectEvent = true
      })
      client.on('connect', function (connack) {
        should(connack.sessionPresent).be.equal(false)
        if (tryReconnect) {
          client.subscribe('hello', { properties: { userProperties: { test: 'test' } } }, function () {
            client.stream.end()
          })
          tryReconnect = false
        } else {
          reconnectEvent.should.equal(true)
        }
      })
    })
    var serverErr = new Server(function (client) {
      client.on('connect', function (packet) {
        client.connack({
          reasonCode: 0
        })
      })
      client.on('publish', function (packet) {
        setImmediate(function () {
          switch (packet.qos) {
            case 0:
              break
            case 1:
              packet.reasonCode = 142
              delete packet.cmd
              client.puback(packet)
              break
            case 2:
              packet.reasonCode = 142
              delete packet.cmd
              client.pubrec(packet)
              break
          }
        })
      })
      client.on('pubrel', function (packet) {
        packet.reasonCode = 142
        delete packet.cmd
        client.pubcomp(packet)
      })
    })
    it('Subscribe properties', function (done) {
      this.timeout(15000)
      var opts = {
        host: 'localhost',
        port: port + 119,
        protocolVersion: 5
      }
      var subOptions = { properties: { subscriptionIdentifier: 1234 } }
      var server119 = new Server(function (client) {
        client.on('connect', function (packet) {
          client.connack({
            reasonCode: 0
          })
        })
        client.on('subscribe', function (packet) {
          should(packet.properties.subscriptionIdentifier).be.equal(subOptions.properties.subscriptionIdentifier)
          server119.close()
          done()
        })
      }).listen(port + 119)
      var client = mqtt.connect(opts)
      client.on('connect', function () {
        client.subscribe('a/b', subOptions)
      })
    })
    it('puback handling errors check', function (done) {
      this.timeout(15000)
      serverErr.listen(port + 117)
      var opts = {
        host: 'localhost',
        port: port + 117,
        protocolVersion: 5
      }
      var client = mqtt.connect(opts)
      client.once('connect', () => {
        client.publish('a/b', 'message', {qos: 1}, function (err, packet) {
          should(err.message).be.equal('Publish error: Session taken over')
          should(err.code).be.equal(142)
        })
        serverErr.close()
        done()
      })
    })
    it('pubrec handling errors check', function (done) {
      this.timeout(15000)
      serverErr.listen(port + 118)
      var opts = {
        host: 'localhost',
        port: port + 118,
        protocolVersion: 5
      }
      var client = mqtt.connect(opts)
      client.once('connect', () => {
        client.publish('a/b', 'message', {qos: 2}, function (err, packet) {
          should(err.message).be.equal('Publish error: Session taken over')
          should(err.code).be.equal(142)
        })
        serverErr.close()
        done()
      })
    })
    it('puback handling custom reason code', function (done) {
      this.timeout(15000)
      serverErr.listen(port + 117)
      var opts = {
        host: 'localhost',
        port: port + 117,
        protocolVersion: 5,
        customHandleAcks: function (topic, message, packet, cb) {
          var code = 0
          if (topic === 'a/b') {
            code = 128
          }
          cb(code)
        }
      }
      serverErr.once('client', function (c) {
        c.once('subscribe', function () {
          c.publish({ topic: 'a/b', payload: 'payload', qos: 1, messageId: 1 })
        })
        c.on('puback', function (packet) {
          should(packet.reasonCode).be.equal(128)
          client.end()
          c.destroy()
          serverErr.close()
          done()
        })
      })
      var client = mqtt.connect(opts)
      client.once('connect', function () {
        client.subscribe('a/b', {qos: 1})
      })
    })
    it('server side disconnect', function (done) {
      this.timeout(15000)
      var server327 = new Server(function (client) {
        client.on('connect', function (packet) {
          client.connack({
            reasonCode: 0
          })
          client.disconnect({reasonCode: 128})
          server327.close()
        })
      })
      server327.listen(port + 327)
      var opts = {
        host: 'localhost',
        port: port + 327,
        protocolVersion: 5
      }
      var client = mqtt.connect(opts)
      client.once('disconnect', function (disconnectPacket) {
        should(disconnectPacket.reasonCode).be.equal(128)
        done()
      })
    })
    it('pubrec handling custom reason code', function (done) {
      this.timeout(15000)
      serverErr.listen(port + 117)
      var opts = {
        host: 'localhost',
        port: port + 117,
        protocolVersion: 5,
        customHandleAcks: function (topic, message, packet, cb) {
          var code = 0
          if (topic === 'a/b') {
            code = 128
          }
          cb(code)
        }
      }
      serverErr.once('client', function (c) {
        c.once('subscribe', function () {
          c.publish({ topic: 'a/b', payload: 'payload', qos: 2, messageId: 1 })
        })
        c.on('pubrec', function (packet) {
          should(packet.reasonCode).be.equal(128)
          client.end()
          c.destroy()
          serverErr.close()
          done()
        })
      })
      var client = mqtt.connect(opts)
      client.once('connect', function () {
        client.subscribe('a/b', {qos: 1})
      })
    })
    it('puback handling custom reason code with error', function (done) {
      this.timeout(15000)
      serverErr.listen(port + 117)
      var opts = {
        host: 'localhost',
        port: port + 117,
        protocolVersion: 5,
        customHandleAcks: function (topic, message, packet, cb) {
          var code = 0
          if (topic === 'a/b') {
            cb(new Error('a/b is not valid'))
          }
          cb(code)
        }
      }
      serverErr.once('client', function (c) {
        c.once('subscribe', function () {
          c.publish({ topic: 'a/b', payload: 'payload', qos: 1, messageId: 1 })
        })
      })
      var client = mqtt.connect(opts)
      client.on('error', function (error) {
        should(error.message).be.equal('a/b is not valid')
        client.end()
        serverErr.close()
        done()
      })
      client.once('connect', function () {
        client.subscribe('a/b', {qos: 1})
      })
    })
    it('pubrec handling custom reason code with error', function (done) {
      this.timeout(15000)
      serverErr.listen(port + 117)
      var opts = {
        host: 'localhost',
        port: port + 117,
        protocolVersion: 5,
        customHandleAcks: function (topic, message, packet, cb) {
          var code = 0
          if (topic === 'a/b') {
            cb(new Error('a/b is not valid'))
          }
          cb(code)
        }
      }
      serverErr.once('client', function (c) {
        c.once('subscribe', function () {
          c.publish({ topic: 'a/b', payload: 'payload', qos: 2, messageId: 1 })
        })
      })
      var client = mqtt.connect(opts)
      client.on('error', function (error) {
        should(error.message).be.equal('a/b is not valid')
        client.end()
        serverErr.close()
        done()
      })
      client.once('connect', function () {
        client.subscribe('a/b', {qos: 1})
      })
    })
    it('puback handling custom invalid reason code', function (done) {
      this.timeout(15000)
      serverErr.listen(port + 117)
      var opts = {
        host: 'localhost',
        port: port + 117,
        protocolVersion: 5,
        customHandleAcks: function (topic, message, packet, cb) {
          var code = 0
          if (topic === 'a/b') {
            code = 124124
          }
          cb(code)
        }
      }
      serverErr.once('client', function (c) {
        c.once('subscribe', function () {
          c.publish({ topic: 'a/b', payload: 'payload', qos: 1, messageId: 1 })
        })
      })
      var client = mqtt.connect(opts)
      client.on('error', function (error) {
        should(error.message).be.equal('Wrong reason code for puback')
        client.end()
        serverErr.close()
        done()
      })
      client.once('connect', function () {
        client.subscribe('a/b', {qos: 1})
      })
    })
    it('pubrec handling custom invalid reason code', function (done) {
      this.timeout(15000)
      serverErr.listen(port + 117)
      var opts = {
        host: 'localhost',
        port: port + 117,
        protocolVersion: 5,
        customHandleAcks: function (topic, message, packet, cb) {
          var code = 0
          if (topic === 'a/b') {
            code = 34535
          }
          cb(code)
        }
      }
      serverErr.once('client', function (c) {
        c.once('subscribe', function () {
          c.publish({ topic: 'a/b', payload: 'payload', qos: 2, messageId: 1 })
        })
      })
      var client = mqtt.connect(opts)
      client.on('error', function (error) {
        should(error.message).be.equal('Wrong reason code for pubrec')
        client.end()
        serverErr.close()
        done()
      })
      client.once('connect', function () {
        client.subscribe('a/b', {qos: 1})
      })
    })
  })
})