From 4885600ecc369aa2e30a65de8dd7a410f13c34df Mon Sep 17 00:00:00 2001
From: heyujie <516346543@qq.com>
Date: 星期一, 24 五月 2021 11:25:04 +0800
Subject: [PATCH] 解决app通信问题

---
 node_modules/mqtt/test/client.js |  603 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 594 insertions(+), 9 deletions(-)

diff --git a/node_modules/mqtt/test/client.js b/node_modules/mqtt/test/client.js
index ff4e8e6..008d037 100644
--- a/node_modules/mqtt/test/client.js
+++ b/node_modules/mqtt/test/client.js
@@ -12,6 +12,7 @@
 var Duplex = require('readable-stream').Duplex
 var Connection = require('mqtt-connection')
 var Server = require('./server')
+var FastServer = require('./server').FastMqttServer
 var port = 9876
 var server
 
@@ -26,13 +27,35 @@
 /**
  * Test server
  */
-function buildServer () {
-  return new Server(function (client) {
+function buildServer (fastFlag) {
+  var handler = function (client) {
+    client.on('auth', function (packet) {
+      var rc = 'reasonCode'
+      var connack = {}
+      connack[rc] = 0
+      client.connack(connack)
+    })
     client.on('connect', function (packet) {
-      if (packet.clientId === 'invalid') {
-        client.connack({returnCode: 2})
+      var rc = 'returnCode'
+      var connack = {}
+      if (client.options && client.options.protocolVersion === 5) {
+        rc = 'reasonCode'
+        if (packet.clientId === 'invalid') {
+          connack[rc] = 128
+        } else {
+          connack[rc] = 0
+        }
       } else {
-        client.connack({returnCode: 0})
+        if (packet.clientId === 'invalid') {
+          connack[rc] = 2
+        } else {
+          connack[rc] = 0
+        }
+      }
+      if (packet.properties && packet.properties.authenticationMethod) {
+        return false
+      } else {
+        client.connack(connack)
       }
     })
 
@@ -73,13 +96,19 @@
     })
 
     client.on('unsubscribe', function (packet) {
+      packet.granted = packet.unsubscriptions.map(function () { return 0 })
       client.unsuback(packet)
     })
 
     client.on('pingreq', function () {
       client.pingresp()
     })
-  })
+  }
+  if (fastFlag) {
+    return new FastServer(handler)
+  } else {
+    return new Server(handler)
+  }
 }
 
 server = buildServer().listen(port)
@@ -263,14 +292,15 @@
     it('should reconnect to multiple host-ports-protocol combinations if servers is passed', function (done) {
       this.timeout(15000)
 
-      var server2 = buildServer().listen(port + 42)
+      var server = buildServer(true).listen(port + 41)
+      var server2 = buildServer(true).listen(port + 42)
 
       server2.on('listening', function () {
         var client = mqtt.connect({
           protocol: 'wss',
           servers: [
             { port: port + 42, host: 'localhost', protocol: 'ws' },
-            { port: port, host: 'localhost' }
+            { port: port + 41, host: 'localhost' }
           ],
           keepalive: 50
         })
@@ -281,7 +311,7 @@
         })
 
         server.once('client', function () {
-          should.equal(client.stream.socket.url, 'wss://localhost:9876/', 'Protocol for second client should use the default protocol: wss, on port: port + 42.')
+          should.equal(client.stream.socket.url, 'wss://localhost:9917/', 'Protocol for second client should use the default protocol: wss, on port: port + 42.')
           client.end()
           done()
         })
@@ -541,4 +571,559 @@
       })
     })
   })
+
+  it('check emit error on checkDisconnection w/o callback', function (done) {
+    this.timeout(15000)
+    var server118 = new Server(function (client) {
+      client.on('connect', function (packet) {
+        client.connack({
+          reasonCode: 0
+        })
+      })
+      client.on('publish', function (packet) {
+        setImmediate(function () {
+          packet.reasonCode = 0
+          client.puback(packet)
+        })
+      })
+    }).listen(port + 118)
+    var opts = {
+      host: 'localhost',
+      port: port + 118,
+      protocolVersion: 5
+    }
+    var client = mqtt.connect(opts)
+    client.on('error', function (error) {
+      should(error.message).be.equal('client disconnecting')
+      server118.close()
+      done()
+    })
+    client.on('connect', function () {
+      client.end(function () {
+        client._checkDisconnecting()
+      })
+      server118.close()
+    })
+  })
+
+  describe('MQTT 5.0', function () {
+    var server = buildServer().listen(port + 115)
+    var config = { protocol: 'mqtt', port: port + 115, protocolVersion: 5, properties: { maximumPacketSize: 200 } }
+    abstractClientTests(server, config)
+    it('should has Auth method with Auth data', function (done) {
+      this.timeout(5000)
+      var opts = {host: 'localhost', port: port + 115, protocolVersion: 5, properties: { authenticationData: Buffer.from([1, 2, 3, 4]) }}
+      try {
+        mqtt.connect(opts)
+      } catch (error) {
+        should(error.message).be.equal('Packet has no Authentication Method')
+      }
+      done()
+    })
+    it('auth packet', function (done) {
+      this.timeout(15000)
+      server.once('client', function (client) {
+        client.on('auth', function (packet) {
+          done()
+        })
+      })
+      var opts = {host: 'localhost', port: port + 115, protocolVersion: 5, properties: { authenticationMethod: 'json' }, authPacket: {}}
+      mqtt.connect(opts)
+    })
+    it('Maximum Packet Size', function (done) {
+      this.timeout(15000)
+      var opts = {host: 'localhost', port: port + 115, protocolVersion: 5, properties: { maximumPacketSize: 1 }}
+      var client = mqtt.connect(opts)
+      client.on('error', function (error) {
+        should(error.message).be.equal('exceeding packets size connack')
+        done()
+      })
+    })
+    describe('Topic Alias', function () {
+      it('topicAlias > topicAliasMaximum', function (done) {
+        this.timeout(15000)
+        var maximum = 15
+        var current = 22
+        server.once('client', function (client) {
+          client.on('publish', function (packet) {
+            if (packet.properties && packet.properties.topicAlias) {
+              done(new Error('Packet should not have topicAlias'))
+              return false
+            }
+            done()
+          })
+        })
+        var opts = {host: 'localhost', port: port + 115, protocolVersion: 5, properties: { topicAliasMaximum: maximum }}
+        var client = mqtt.connect(opts)
+        client.publish('t/h', 'Message', { properties: { topicAlias: current } })
+      })
+      it('topicAlias w/o topicAliasMaximum in settings', function (done) {
+        this.timeout(15000)
+        server.once('client', function (client) {
+          client.on('publish', function (packet) {
+            if (packet.properties && packet.properties.topicAlias) {
+              done(new Error('Packet should not have topicAlias'))
+              return false
+            }
+            done()
+          })
+        })
+        var opts = {host: 'localhost', port: port + 115, protocolVersion: 5}
+        var client = mqtt.connect(opts)
+        client.publish('t/h', 'Message', { properties: { topicAlias: 22 } })
+      })
+    })
+    it('Change values of some properties by server response', function (done) {
+      this.timeout(15000)
+      var server116 = new Server(function (client) {
+        client.on('connect', function (packet) {
+          client.connack({
+            reasonCode: 0,
+            properties: {
+              topicAliasMaximum: 15,
+              serverKeepAlive: 16,
+              maximumPacketSize: 95
+            }
+          })
+        })
+      }).listen(port + 116)
+      var opts = {
+        host: 'localhost',
+        port: port + 116,
+        protocolVersion: 5,
+        properties: {
+          topicAliasMaximum: 10,
+          serverKeepAlive: 11,
+          maximumPacketSize: 100
+        }
+      }
+      var client = mqtt.connect(opts)
+      client.on('connect', function () {
+        should(client.options.keepalive).be.equal(16)
+        should(client.options.properties.topicAliasMaximum).be.equal(15)
+        should(client.options.properties.maximumPacketSize).be.equal(95)
+        server116.close()
+        done()
+      })
+    })
+
+    it('should resubscribe when reconnecting with protocolVersion 5 and Session Present flag is false', function (done) {
+      this.timeout(15000)
+      var tryReconnect = true
+      var reconnectEvent = false
+      var server316 = new Server(function (client) {
+        client.on('connect', function (packet) {
+          client.connack({
+            reasonCode: 0,
+            sessionPresent: false
+          })
+          client.on('subscribe', function () {
+            if (!tryReconnect) {
+              client.end()
+              server316.close()
+              done()
+            }
+          })
+        })
+      }).listen(port + 316)
+      var opts = {
+        host: 'localhost',
+        port: port + 316,
+        protocolVersion: 5
+      }
+      var client = mqtt.connect(opts)
+
+      client.on('reconnect', function () {
+        reconnectEvent = true
+      })
+
+      client.on('connect', function (connack) {
+        should(connack.sessionPresent).be.equal(false)
+        if (tryReconnect) {
+          client.subscribe('hello', function () {
+            client.stream.end()
+          })
+
+          tryReconnect = false
+        } else {
+          reconnectEvent.should.equal(true)
+        }
+      })
+    })
+
+    it('should resubscribe when reconnecting with protocolVersion 5 and properties', function (done) {
+      this.timeout(15000)
+      var tryReconnect = true
+      var reconnectEvent = false
+      var server326 = new Server(function (client) {
+        client.on('connect', function (packet) {
+          client.on('subscribe', function (packet) {
+            if (!reconnectEvent) {
+              client.suback({
+                messageId: packet.messageId,
+                granted: packet.subscriptions.map(function (e) {
+                  return e.qos
+                })
+              })
+            } else {
+              if (!tryReconnect) {
+                should(packet.properties.userProperties.test).be.equal('test')
+                client.end()
+                server326.close()
+                done()
+              }
+            }
+          })
+          client.connack({
+            reasonCode: 0,
+            sessionPresent: false
+          })
+        })
+      }).listen(port + 326)
+      var opts = {
+        host: 'localhost',
+        port: port + 326,
+        protocolVersion: 5
+      }
+      var client = mqtt.connect(opts)
+
+      client.on('reconnect', function () {
+        reconnectEvent = true
+      })
+
+      client.on('connect', function (connack) {
+        should(connack.sessionPresent).be.equal(false)
+        if (tryReconnect) {
+          client.subscribe('hello', { properties: { userProperties: { test: 'test' } } }, function () {
+            client.stream.end()
+          })
+
+          tryReconnect = false
+        } else {
+          reconnectEvent.should.equal(true)
+        }
+      })
+    })
+
+    var serverErr = new Server(function (client) {
+      client.on('connect', function (packet) {
+        client.connack({
+          reasonCode: 0
+        })
+      })
+      client.on('publish', function (packet) {
+        setImmediate(function () {
+          switch (packet.qos) {
+            case 0:
+              break
+            case 1:
+              packet.reasonCode = 142
+              delete packet.cmd
+              client.puback(packet)
+              break
+            case 2:
+              packet.reasonCode = 142
+              delete packet.cmd
+              client.pubrec(packet)
+              break
+          }
+        })
+      })
+
+      client.on('pubrel', function (packet) {
+        packet.reasonCode = 142
+        delete packet.cmd
+        client.pubcomp(packet)
+      })
+    })
+    it('Subscribe properties', function (done) {
+      this.timeout(15000)
+      var opts = {
+        host: 'localhost',
+        port: port + 119,
+        protocolVersion: 5
+      }
+      var subOptions = { properties: { subscriptionIdentifier: 1234 } }
+      var server119 = new Server(function (client) {
+        client.on('connect', function (packet) {
+          client.connack({
+            reasonCode: 0
+          })
+        })
+        client.on('subscribe', function (packet) {
+          should(packet.properties.subscriptionIdentifier).be.equal(subOptions.properties.subscriptionIdentifier)
+          server119.close()
+          done()
+        })
+      }).listen(port + 119)
+
+      var client = mqtt.connect(opts)
+      client.on('connect', function () {
+        client.subscribe('a/b', subOptions)
+      })
+    })
+
+    it('puback handling errors check', function (done) {
+      this.timeout(15000)
+      serverErr.listen(port + 117)
+      var opts = {
+        host: 'localhost',
+        port: port + 117,
+        protocolVersion: 5
+      }
+      var client = mqtt.connect(opts)
+      client.once('connect', () => {
+        client.publish('a/b', 'message', {qos: 1}, function (err, packet) {
+          should(err.message).be.equal('Publish error: Session taken over')
+          should(err.code).be.equal(142)
+        })
+        serverErr.close()
+        done()
+      })
+    })
+    it('pubrec handling errors check', function (done) {
+      this.timeout(15000)
+      serverErr.listen(port + 118)
+      var opts = {
+        host: 'localhost',
+        port: port + 118,
+        protocolVersion: 5
+      }
+      var client = mqtt.connect(opts)
+      client.once('connect', () => {
+        client.publish('a/b', 'message', {qos: 2}, function (err, packet) {
+          should(err.message).be.equal('Publish error: Session taken over')
+          should(err.code).be.equal(142)
+        })
+        serverErr.close()
+        done()
+      })
+    })
+    it('puback handling custom reason code', function (done) {
+      this.timeout(15000)
+      serverErr.listen(port + 117)
+      var opts = {
+        host: 'localhost',
+        port: port + 117,
+        protocolVersion: 5,
+        customHandleAcks: function (topic, message, packet, cb) {
+          var code = 0
+          if (topic === 'a/b') {
+            code = 128
+          }
+          cb(code)
+        }
+      }
+
+      serverErr.once('client', function (c) {
+        c.once('subscribe', function () {
+          c.publish({ topic: 'a/b', payload: 'payload', qos: 1, messageId: 1 })
+        })
+
+        c.on('puback', function (packet) {
+          should(packet.reasonCode).be.equal(128)
+          client.end()
+          c.destroy()
+          serverErr.close()
+          done()
+        })
+      })
+
+      var client = mqtt.connect(opts)
+      client.once('connect', function () {
+        client.subscribe('a/b', {qos: 1})
+      })
+    })
+    it('server side disconnect', function (done) {
+      this.timeout(15000)
+      var server327 = new Server(function (client) {
+        client.on('connect', function (packet) {
+          client.connack({
+            reasonCode: 0
+          })
+          client.disconnect({reasonCode: 128})
+          server327.close()
+        })
+      })
+      server327.listen(port + 327)
+      var opts = {
+        host: 'localhost',
+        port: port + 327,
+        protocolVersion: 5
+      }
+
+      var client = mqtt.connect(opts)
+      client.once('disconnect', function (disconnectPacket) {
+        should(disconnectPacket.reasonCode).be.equal(128)
+        done()
+      })
+    })
+    it('pubrec handling custom reason code', function (done) {
+      this.timeout(15000)
+      serverErr.listen(port + 117)
+      var opts = {
+        host: 'localhost',
+        port: port + 117,
+        protocolVersion: 5,
+        customHandleAcks: function (topic, message, packet, cb) {
+          var code = 0
+          if (topic === 'a/b') {
+            code = 128
+          }
+          cb(code)
+        }
+      }
+
+      serverErr.once('client', function (c) {
+        c.once('subscribe', function () {
+          c.publish({ topic: 'a/b', payload: 'payload', qos: 2, messageId: 1 })
+        })
+
+        c.on('pubrec', function (packet) {
+          should(packet.reasonCode).be.equal(128)
+          client.end()
+          c.destroy()
+          serverErr.close()
+          done()
+        })
+      })
+
+      var client = mqtt.connect(opts)
+      client.once('connect', function () {
+        client.subscribe('a/b', {qos: 1})
+      })
+    })
+    it('puback handling custom reason code with error', function (done) {
+      this.timeout(15000)
+      serverErr.listen(port + 117)
+      var opts = {
+        host: 'localhost',
+        port: port + 117,
+        protocolVersion: 5,
+        customHandleAcks: function (topic, message, packet, cb) {
+          var code = 0
+          if (topic === 'a/b') {
+            cb(new Error('a/b is not valid'))
+          }
+          cb(code)
+        }
+      }
+
+      serverErr.once('client', function (c) {
+        c.once('subscribe', function () {
+          c.publish({ topic: 'a/b', payload: 'payload', qos: 1, messageId: 1 })
+        })
+      })
+
+      var client = mqtt.connect(opts)
+      client.on('error', function (error) {
+        should(error.message).be.equal('a/b is not valid')
+        client.end()
+        serverErr.close()
+        done()
+      })
+      client.once('connect', function () {
+        client.subscribe('a/b', {qos: 1})
+      })
+    })
+    it('pubrec handling custom reason code with error', function (done) {
+      this.timeout(15000)
+      serverErr.listen(port + 117)
+      var opts = {
+        host: 'localhost',
+        port: port + 117,
+        protocolVersion: 5,
+        customHandleAcks: function (topic, message, packet, cb) {
+          var code = 0
+          if (topic === 'a/b') {
+            cb(new Error('a/b is not valid'))
+          }
+          cb(code)
+        }
+      }
+
+      serverErr.once('client', function (c) {
+        c.once('subscribe', function () {
+          c.publish({ topic: 'a/b', payload: 'payload', qos: 2, messageId: 1 })
+        })
+      })
+
+      var client = mqtt.connect(opts)
+      client.on('error', function (error) {
+        should(error.message).be.equal('a/b is not valid')
+        client.end()
+        serverErr.close()
+        done()
+      })
+      client.once('connect', function () {
+        client.subscribe('a/b', {qos: 1})
+      })
+    })
+    it('puback handling custom invalid reason code', function (done) {
+      this.timeout(15000)
+      serverErr.listen(port + 117)
+      var opts = {
+        host: 'localhost',
+        port: port + 117,
+        protocolVersion: 5,
+        customHandleAcks: function (topic, message, packet, cb) {
+          var code = 0
+          if (topic === 'a/b') {
+            code = 124124
+          }
+          cb(code)
+        }
+      }
+
+      serverErr.once('client', function (c) {
+        c.once('subscribe', function () {
+          c.publish({ topic: 'a/b', payload: 'payload', qos: 1, messageId: 1 })
+        })
+      })
+
+      var client = mqtt.connect(opts)
+      client.on('error', function (error) {
+        should(error.message).be.equal('Wrong reason code for puback')
+        client.end()
+        serverErr.close()
+        done()
+      })
+      client.once('connect', function () {
+        client.subscribe('a/b', {qos: 1})
+      })
+    })
+    it('pubrec handling custom invalid reason code', function (done) {
+      this.timeout(15000)
+      serverErr.listen(port + 117)
+      var opts = {
+        host: 'localhost',
+        port: port + 117,
+        protocolVersion: 5,
+        customHandleAcks: function (topic, message, packet, cb) {
+          var code = 0
+          if (topic === 'a/b') {
+            code = 34535
+          }
+          cb(code)
+        }
+      }
+
+      serverErr.once('client', function (c) {
+        c.once('subscribe', function () {
+          c.publish({ topic: 'a/b', payload: 'payload', qos: 2, messageId: 1 })
+        })
+      })
+
+      var client = mqtt.connect(opts)
+      client.on('error', function (error) {
+        should(error.message).be.equal('Wrong reason code for pubrec')
+        client.end()
+        serverErr.close()
+        done()
+      })
+      client.once('connect', function () {
+        client.subscribe('a/b', {qos: 1})
+      })
+    })
+  })
 })

--
Gitblit v1.8.0