From 4885600ecc369aa2e30a65de8dd7a410f13c34df Mon Sep 17 00:00:00 2001
From: heyujie <516346543@qq.com>
Date: 星期一, 24 五月 2021 11:25:04 +0800
Subject: [PATCH] 解决app通信问题

---
 node_modules/mqtt/test/abstract_client.js |  465 +++++++++++++++++++++++++++++++++++++++++++++++++++++++---
 1 files changed, 441 insertions(+), 24 deletions(-)

diff --git a/node_modules/mqtt/test/abstract_client.js b/node_modules/mqtt/test/abstract_client.js
index 87c0b98..017a2e3 100644
--- a/node_modules/mqtt/test/abstract_client.js
+++ b/node_modules/mqtt/test/abstract_client.js
@@ -12,6 +12,7 @@
 var port = 9876
 
 module.exports = function (server, config) {
+  var version = config.protocolVersion || 4
   function connect (opts) {
     opts = xtend(config, opts)
     return mqtt.connect(opts)
@@ -302,11 +303,13 @@
     })
 
     it('should provide connack packet with connect event', function (done) {
+      var connack = version === 5 ? {reasonCode: 0} : {returnCode: 0}
       server.once('client', function (serverClient) {
-        serverClient.connack({returnCode: 0, sessionPresent: true})
-
+        connack.sessionPresent = true
+        serverClient.connack(connack)
         server.once('client', function (serverClient) {
-          serverClient.connack({returnCode: 0, sessionPresent: false})
+          connack.sessionPresent = false
+          serverClient.connack(connack)
         })
       })
 
@@ -339,7 +342,8 @@
         done(new Error('Should not emit connect'))
       })
       client.once('error', function (error) {
-        should(error.code).be.equal(2) // code for clientID identifer rejected
+        var value = version === 5 ? 128 : 2
+        should(error.code).be.equal(value) // code for clientID identifer rejected
         client.end()
         done()
       })
@@ -529,6 +533,64 @@
         setTimeout(function () {
           client.end(true, done)
         }, 10)
+      })
+    })
+
+    it('should not interrupt messages', function (done) {
+      var client = null
+      var incomingStore = new mqtt.Store({ clean: false })
+      var outgoingStore = new mqtt.Store({ clean: false })
+      var publishCount = 0
+      var server2 = new Server(function (c) {
+        c.on('connect', function () {
+          c.connack({returnCode: 0})
+        })
+        c.on('publish', function (packet) {
+          if (packet.qos !== 0) {
+            c.puback({messageId: packet.messageId})
+          }
+          switch (publishCount++) {
+            case 0:
+              packet.payload.toString().should.equal('payload1')
+              break
+            case 1:
+              packet.payload.toString().should.equal('payload2')
+              break
+            case 2:
+              packet.payload.toString().should.equal('payload3')
+              break
+            case 3:
+              packet.payload.toString().should.equal('payload4')
+              server2.close()
+              done()
+              break
+          }
+        })
+      })
+
+      server2.listen(port + 50, function () {
+        client = mqtt.connect({
+          port: port + 50,
+          host: 'localhost',
+          clean: false,
+          clientId: 'cid1',
+          reconnectPeriod: 0,
+          incomingStore: incomingStore,
+          outgoingStore: outgoingStore,
+          queueQoSZero: true
+        })
+        client.on('packetreceive', function (packet) {
+          if (packet.cmd === 'connack') {
+            setImmediate(
+              function () {
+                client.publish('test', 'payload3', {qos: 1})
+                client.publish('test', 'payload4', {qos: 0})
+              }
+            )
+          }
+        })
+        client.publish('test', 'payload1', {qos: 2})
+        client.publish('test', 'payload2', {qos: 2})
       })
     })
 
@@ -1004,7 +1066,7 @@
           return new AsyncStore()
         }
       }
-      AsyncStore.prototype.put = function (packet, cb) {
+      AsyncStore.prototype.del = function (packet, cb) {
         process.nextTick(function () {
           cb(new Error('Error'))
         })
@@ -1027,15 +1089,15 @@
     })
 
     it('should handle success with async incoming store in QoS 2 `handlePubrel` method', function (done) {
-      var putComplete = false
+      var delComplete = false
       function AsyncStore () {
         if (!(this instanceof AsyncStore)) {
           return new AsyncStore()
         }
       }
-      AsyncStore.prototype.put = function (packet, cb) {
+      AsyncStore.prototype.del = function (packet, cb) {
         process.nextTick(function () {
-          putComplete = true
+          delComplete = true
           cb(null)
         })
       }
@@ -1051,7 +1113,7 @@
         messageId: 1,
         qos: 2
       }, function () {
-        putComplete.should.equal(true)
+        delComplete.should.equal(true)
         done()
         client.end()
       })
@@ -1152,6 +1214,115 @@
           }
         })
       })
+    })
+
+    it('should keep message order', function (done) {
+      var publishCount = 0
+      var reconnect = false
+      var client = {}
+      var incomingStore = new mqtt.Store({ clean: false })
+      var outgoingStore = new mqtt.Store({ clean: false })
+      var server2 = new Server(function (c) {
+        // errors are not interesting for this test
+        // but they might happen on some platforms
+        c.on('error', function () {})
+
+        c.on('connect', function (packet) {
+          c.connack({returnCode: 0})
+        })
+        c.on('publish', function (packet) {
+          c.puback({messageId: packet.messageId})
+          if (reconnect) {
+            switch (publishCount++) {
+              case 0:
+                packet.payload.toString().should.equal('payload1')
+                break
+              case 1:
+                packet.payload.toString().should.equal('payload2')
+                break
+              case 2:
+                packet.payload.toString().should.equal('payload3')
+                server2.close()
+                done()
+                break
+            }
+          }
+        })
+      })
+
+      server2.listen(port + 50, function () {
+        client = mqtt.connect({
+          port: port + 50,
+          host: 'localhost',
+          clean: false,
+          clientId: 'cid1',
+          reconnectPeriod: 0,
+          incomingStore: incomingStore,
+          outgoingStore: outgoingStore
+        })
+
+        client.on('connect', function () {
+          if (!reconnect) {
+            client.publish('topic', 'payload1', {qos: 1})
+            client.publish('topic', 'payload2', {qos: 1})
+            client.end(true)
+          } else {
+            client.publish('topic', 'payload3', {qos: 1})
+          }
+        })
+        client.on('close', function () {
+          if (!reconnect) {
+            client.reconnect({
+              clean: false,
+              incomingStore: incomingStore,
+              outgoingStore: outgoingStore
+            })
+            reconnect = true
+          }
+        })
+      })
+    })
+
+    function testCallbackStorePutByQoS (qos, clean, expected, done) {
+      var client = connect({
+        clean: clean,
+        clientId: 'testId'
+      })
+
+      var callbacks = []
+
+      function cbStorePut () {
+        callbacks.push('storeput')
+      }
+
+      client.on('connect', function () {
+        client.publish('test', 'test', {qos: qos, cbStorePut: cbStorePut}, function (err) {
+          if (err) done(err)
+          callbacks.push('publish')
+          should.deepEqual(callbacks, expected)
+          done()
+        })
+        client.end()
+      })
+    }
+
+    it('should not call cbStorePut when publishing message with QoS `0` and clean `true`', function (done) {
+      testCallbackStorePutByQoS(0, true, ['publish'], done)
+    })
+    it('should not call cbStorePut when publishing message with QoS `0` and clean `false`', function (done) {
+      testCallbackStorePutByQoS(0, false, ['publish'], done)
+    })
+    it('should call cbStorePut before publish completes when publishing message with QoS `1` and clean `true`', function (done) {
+      testCallbackStorePutByQoS(1, true, ['storeput', 'publish'], done)
+    })
+    it('should call cbStorePut before publish completes when publishing message with QoS `1` and clean `false`', function (done) {
+      testCallbackStorePutByQoS(1, false, ['storeput', 'publish'], done)
+    })
+    it('should call cbStorePut before publish completes when publishing message with QoS `2` and clean `true`', function (done) {
+      testCallbackStorePutByQoS(2, true, ['storeput', 'publish'], done)
+    })
+    it('should call cbStorePut before publish completes when publishing message with QoS `2` and clean `false`', function (done) {
+      testCallbackStorePutByQoS(2, false, ['storeput', 'publish'], done)
     })
   })
 
@@ -1428,10 +1599,16 @@
 
       server.once('client', function (serverClient) {
         serverClient.once('subscribe', function (packet) {
-          packet.subscriptions.should.containEql({
+          var result = {
             topic: topic,
             qos: 0
-          })
+          }
+          if (version === 5) {
+            result.nl = false
+            result.rap = false
+            result.rh = 0
+          }
+          packet.subscriptions.should.containEql(result)
           done()
         })
       })
@@ -1479,7 +1656,13 @@
         serverClient.once('subscribe', function (packet) {
           // i.e. [{topic: 'a', qos: 0}, {topic: 'b', qos: 0}]
           var expected = subs.map(function (i) {
-            return {topic: i, qos: 0}
+            var result = {topic: i, qos: 0}
+            if (version === 5) {
+              result.nl = false
+              result.rap = false
+              result.rh = 0
+            }
+            return result
           })
 
           packet.subscriptions.should.eql(expected)
@@ -1491,8 +1674,8 @@
     it('should accept an hash of subscriptions', function (done) {
       var client = connect()
       var topics = {
-        test1: 0,
-        test2: 1
+        test1: {qos: 0},
+        test2: {qos: 1}
       }
 
       client.once('connect', function () {
@@ -1506,10 +1689,16 @@
 
           for (k in topics) {
             if (topics.hasOwnProperty(k)) {
-              expected.push({
+              var result = {
                 topic: k,
-                qos: topics[k]
-              })
+                qos: topics[k].qos
+              }
+              if (version === 5) {
+                result.nl = false
+                result.rap = false
+                result.rh = 0
+              }
+              expected.push(result)
             }
           }
 
@@ -1535,6 +1724,12 @@
             qos: 1
           }]
 
+          if (version === 5) {
+            expected[0].nl = false
+            expected[0].rap = false
+            expected[0].rh = 0
+          }
+
           packet.subscriptions.should.eql(expected)
           done()
         })
@@ -1552,10 +1747,16 @@
 
       server.once('client', function (serverClient) {
         serverClient.once('subscribe', function (packet) {
-          packet.subscriptions.should.containEql({
+          var result = {
             topic: topic,
             qos: defaultOpts.qos
-          })
+          }
+          if (version === 5) {
+            result.nl = false
+            result.rap = false
+            result.rh = 0
+          }
+          packet.subscriptions.should.containEql(result)
           done()
         })
       })
@@ -1571,7 +1772,14 @@
             done(err)
           } else {
             should.exist(granted, 'granted not given')
-            granted.should.containEql({topic: 'test', qos: 2})
+            var result = {topic: 'test', qos: 2}
+            if (version === 5) {
+              result.nl = false
+              result.rap = false
+              result.rh = 0
+              result.properties = undefined
+            }
+            granted.should.containEql(result)
             done()
           }
         })
@@ -1617,10 +1825,16 @@
 
       server.once('client', function (serverClient) {
         serverClient.once('subscribe', function (packet) {
-          packet.subscriptions.should.containEql({
+          var result = {
             topic: topic,
             qos: 0
-          })
+          }
+          if (version === 5) {
+            result.nl = false
+            result.rap = false
+            result.rh = 0
+          }
+          packet.subscriptions.should.containEql(result)
           done()
         })
       })
@@ -1847,9 +2061,76 @@
       var testTopic = 'test'
       var testMessage = 'message'
       var mid = 253
+      var publishReceived = false
+      var pubrecReceived = false
+      var pubrelReceived = false
 
       client.once('connect', function () {
         client.subscribe(testTopic, {qos: 2})
+      })
+
+      client.on('packetreceive', (packet) => {
+        switch (packet.cmd) {
+          case 'connack':
+          case 'suback':
+            // expected, but not specifically part of QOS 2 semantics
+            break
+          case 'publish':
+            pubrecReceived.should.be.false()
+            pubrelReceived.should.be.false()
+            publishReceived = true
+            break
+          case 'pubrel':
+            publishReceived.should.be.true()
+            pubrecReceived.should.be.true()
+            pubrelReceived = true
+            break
+          default:
+            should.fail()
+        }
+      })
+
+      server.once('client', function (serverClient) {
+        serverClient.once('subscribe', function () {
+          serverClient.publish({
+            topic: testTopic,
+            payload: testMessage,
+            qos: 2,
+            messageId: mid
+          })
+        })
+
+        serverClient.on('pubrec', function () {
+          publishReceived.should.be.true()
+          pubrelReceived.should.be.false()
+          pubrecReceived = true
+        })
+
+        serverClient.once('pubcomp', function () {
+          client.removeAllListeners()
+          serverClient.removeAllListeners()
+          publishReceived.should.be.true()
+          pubrecReceived.should.be.true()
+          pubrelReceived.should.be.true()
+          done()
+        })
+      })
+    })
+
+    it('should should empty the incoming store after a qos 2 handshake is completed', function (done) {
+      var client = connect()
+      var testTopic = 'test'
+      var testMessage = 'message'
+      var mid = 253
+
+      client.once('connect', function () {
+        client.subscribe(testTopic, {qos: 2})
+      })
+
+      client.on('packetreceive', (packet) => {
+        if (packet.cmd === 'pubrel') {
+          should(client.incomingStore._inflights.size).be.equal(1)
+        }
       })
 
       server.once('client', function (serverClient) {
@@ -1863,9 +2144,93 @@
         })
 
         serverClient.once('pubcomp', function () {
+          should(client.incomingStore._inflights.size).be.equal(0)
+          client.removeAllListeners()
           done()
         })
       })
+    })
+
+    function testMultiplePubrel (shouldSendPubcompFail, done) {
+      var client = connect()
+      var testTopic = 'test'
+      var testMessage = 'message'
+      var mid = 253
+      var pubcompCount = 0
+      var pubrelCount = 0
+      var handleMessageCount = 0
+      var emitMessageCount = 0
+      var origSendPacket = client._sendPacket
+      var shouldSendFail
+
+      client.handleMessage = function (packet, callback) {
+        handleMessageCount++
+        callback()
+      }
+
+      client.on('message', function () {
+        emitMessageCount++
+      })
+
+      client._sendPacket = function (packet, sendDone) {
+        shouldSendFail = packet.cmd === 'pubcomp' && shouldSendPubcompFail
+        if (sendDone) {
+          sendDone(shouldSendFail ? new Error('testing pubcomp failure') : undefined)
+        }
+
+        // send the mocked response
+        switch (packet.cmd) {
+          case 'subscribe':
+            const suback = {cmd: 'suback', messageId: packet.messageId, granted: [2]}
+            client._handlePacket(suback, function (err) {
+              should(err).not.be.ok()
+            })
+            break
+          case 'pubrec':
+          case 'pubcomp':
+            // for both pubrec and pubcomp, reply with pubrel, simulating the server not receiving the pubcomp
+            if (packet.cmd === 'pubcomp') {
+              pubcompCount++
+              if (pubcompCount === 2) {
+                // end the test once the client has gone through two rounds of replying to pubrel messages
+                pubrelCount.should.be.exactly(2)
+                handleMessageCount.should.be.exactly(1)
+                emitMessageCount.should.be.exactly(1)
+                client._sendPacket = origSendPacket
+                done()
+                break
+              }
+            }
+
+            // simulate the pubrel message, either in response to pubrec or to mock pubcomp failing to be received
+            const pubrel = {cmd: 'pubrel', messageId: mid}
+            pubrelCount++
+            client._handlePacket(pubrel, function (err) {
+              if (shouldSendFail) {
+                should(err).be.ok()
+              } else {
+                should(err).not.be.ok()
+              }
+            })
+            break
+        }
+      }
+
+      client.once('connect', function () {
+        client.subscribe(testTopic, {qos: 2})
+        const publish = {cmd: 'publish', topic: testTopic, payload: testMessage, qos: 2, messageId: mid}
+        client._handlePacket(publish, function (err) {
+          should(err).not.be.ok()
+        })
+      })
+    }
+
+    it('handle qos 2 messages exactly once when multiple pubrel received', function (done) {
+      testMultiplePubrel(false, done)
+    })
+
+    it('handle qos 2 messages exactly once when multiple pubrel received and sending pubcomp fails on client', function (done) {
+      testMultiplePubrel(true, done)
     })
   })
 
@@ -2317,6 +2682,57 @@
       })
     })
 
+    it('should clear outgoing if close from server', function (done) {
+      var reconnect = false
+      var client = {}
+      var server2 = new Server(function (c) {
+        c.on('connect', function (packet) {
+          c.connack({returnCode: 0})
+        })
+        c.on('subscribe', function (packet) {
+          if (reconnect) {
+            c.suback({
+              messageId: packet.messageId,
+              granted: packet.subscriptions.map(function (e) {
+                return e.qos
+              })
+            })
+          } else {
+            c.destroy()
+          }
+        })
+      })
+
+      server2.listen(port + 50, function () {
+        client = mqtt.connect({
+          port: port + 50,
+          host: 'localhost',
+          clean: true,
+          clientId: 'cid1',
+          reconnectPeriod: 0
+        })
+
+        client.on('connect', function () {
+          client.subscribe('test', {qos: 2}, function (e) {
+            if (!e) {
+              client.end()
+            }
+          })
+        })
+
+        client.on('close', function () {
+          if (reconnect) {
+            server2.close()
+            done()
+          } else {
+            Object.keys(client.outgoing).length.should.equal(0)
+            reconnect = true
+            client.reconnect()
+          }
+        })
+      })
+    })
+
     it('should resend in-flight QoS 1 publish messages from the client if clean is false', function (done) {
       var reconnect = false
       var client = {}
@@ -2591,6 +3007,7 @@
 
     context('with alternate server client', function () {
       var cachedClientListeners
+      var connack = version === 5 ? { reasonCode: 0 } : { returnCode: 0 }
 
       beforeEach(function () {
         cachedClientListeners = server.listeners('client')
@@ -2612,7 +3029,7 @@
         server.on('client', function (serverClient) {
           serverClient.on('connect', function () {
             connectCount++
-            serverClient.connack({returnCode: 0})
+            serverClient.connack(connack)
           })
 
           serverClient.on('subscribe', function () {
@@ -2641,7 +3058,7 @@
 
         server.on('client', function (serverClient) {
           serverClient.on('connect', function () {
-            serverClient.connack({returnCode: 0})
+            serverClient.connack(connack)
           })
 
           serverClient.on('subscribe', function () {

--
Gitblit v1.8.0