From 4885600ecc369aa2e30a65de8dd7a410f13c34df Mon Sep 17 00:00:00 2001 From: heyujie <516346543@qq.com> Date: 星期一, 24 五月 2021 11:25:04 +0800 Subject: [PATCH] 解决app通信问题 --- node_modules/mqtt/test/abstract_client.js | 465 +++++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 files changed, 441 insertions(+), 24 deletions(-) diff --git a/node_modules/mqtt/test/abstract_client.js b/node_modules/mqtt/test/abstract_client.js index 87c0b98..017a2e3 100644 --- a/node_modules/mqtt/test/abstract_client.js +++ b/node_modules/mqtt/test/abstract_client.js @@ -12,6 +12,7 @@ var port = 9876 module.exports = function (server, config) { + var version = config.protocolVersion || 4 function connect (opts) { opts = xtend(config, opts) return mqtt.connect(opts) @@ -302,11 +303,13 @@ }) it('should provide connack packet with connect event', function (done) { + var connack = version === 5 ? {reasonCode: 0} : {returnCode: 0} server.once('client', function (serverClient) { - serverClient.connack({returnCode: 0, sessionPresent: true}) - + connack.sessionPresent = true + serverClient.connack(connack) server.once('client', function (serverClient) { - serverClient.connack({returnCode: 0, sessionPresent: false}) + connack.sessionPresent = false + serverClient.connack(connack) }) }) @@ -339,7 +342,8 @@ done(new Error('Should not emit connect')) }) client.once('error', function (error) { - should(error.code).be.equal(2) // code for clientID identifer rejected + var value = version === 5 ? 128 : 2 + should(error.code).be.equal(value) // code for clientID identifer rejected client.end() done() }) @@ -529,6 +533,64 @@ setTimeout(function () { client.end(true, done) }, 10) + }) + }) + + it('should not interrupt messages', function (done) { + var client = null + var incomingStore = new mqtt.Store({ clean: false }) + var outgoingStore = new mqtt.Store({ clean: false }) + var publishCount = 0 + var server2 = new Server(function (c) { + c.on('connect', function () { + c.connack({returnCode: 0}) + }) + c.on('publish', function (packet) { + if (packet.qos !== 0) { + c.puback({messageId: packet.messageId}) + } + switch (publishCount++) { + case 0: + packet.payload.toString().should.equal('payload1') + break + case 1: + packet.payload.toString().should.equal('payload2') + break + case 2: + packet.payload.toString().should.equal('payload3') + break + case 3: + packet.payload.toString().should.equal('payload4') + server2.close() + done() + break + } + }) + }) + + server2.listen(port + 50, function () { + client = mqtt.connect({ + port: port + 50, + host: 'localhost', + clean: false, + clientId: 'cid1', + reconnectPeriod: 0, + incomingStore: incomingStore, + outgoingStore: outgoingStore, + queueQoSZero: true + }) + client.on('packetreceive', function (packet) { + if (packet.cmd === 'connack') { + setImmediate( + function () { + client.publish('test', 'payload3', {qos: 1}) + client.publish('test', 'payload4', {qos: 0}) + } + ) + } + }) + client.publish('test', 'payload1', {qos: 2}) + client.publish('test', 'payload2', {qos: 2}) }) }) @@ -1004,7 +1066,7 @@ return new AsyncStore() } } - AsyncStore.prototype.put = function (packet, cb) { + AsyncStore.prototype.del = function (packet, cb) { process.nextTick(function () { cb(new Error('Error')) }) @@ -1027,15 +1089,15 @@ }) it('should handle success with async incoming store in QoS 2 `handlePubrel` method', function (done) { - var putComplete = false + var delComplete = false function AsyncStore () { if (!(this instanceof AsyncStore)) { return new AsyncStore() } } - AsyncStore.prototype.put = function (packet, cb) { + AsyncStore.prototype.del = function (packet, cb) { process.nextTick(function () { - putComplete = true + delComplete = true cb(null) }) } @@ -1051,7 +1113,7 @@ messageId: 1, qos: 2 }, function () { - putComplete.should.equal(true) + delComplete.should.equal(true) done() client.end() }) @@ -1152,6 +1214,115 @@ } }) }) + }) + + it('should keep message order', function (done) { + var publishCount = 0 + var reconnect = false + var client = {} + var incomingStore = new mqtt.Store({ clean: false }) + var outgoingStore = new mqtt.Store({ clean: false }) + var server2 = new Server(function (c) { + // errors are not interesting for this test + // but they might happen on some platforms + c.on('error', function () {}) + + c.on('connect', function (packet) { + c.connack({returnCode: 0}) + }) + c.on('publish', function (packet) { + c.puback({messageId: packet.messageId}) + if (reconnect) { + switch (publishCount++) { + case 0: + packet.payload.toString().should.equal('payload1') + break + case 1: + packet.payload.toString().should.equal('payload2') + break + case 2: + packet.payload.toString().should.equal('payload3') + server2.close() + done() + break + } + } + }) + }) + + server2.listen(port + 50, function () { + client = mqtt.connect({ + port: port + 50, + host: 'localhost', + clean: false, + clientId: 'cid1', + reconnectPeriod: 0, + incomingStore: incomingStore, + outgoingStore: outgoingStore + }) + + client.on('connect', function () { + if (!reconnect) { + client.publish('topic', 'payload1', {qos: 1}) + client.publish('topic', 'payload2', {qos: 1}) + client.end(true) + } else { + client.publish('topic', 'payload3', {qos: 1}) + } + }) + client.on('close', function () { + if (!reconnect) { + client.reconnect({ + clean: false, + incomingStore: incomingStore, + outgoingStore: outgoingStore + }) + reconnect = true + } + }) + }) + }) + + function testCallbackStorePutByQoS (qos, clean, expected, done) { + var client = connect({ + clean: clean, + clientId: 'testId' + }) + + var callbacks = [] + + function cbStorePut () { + callbacks.push('storeput') + } + + client.on('connect', function () { + client.publish('test', 'test', {qos: qos, cbStorePut: cbStorePut}, function (err) { + if (err) done(err) + callbacks.push('publish') + should.deepEqual(callbacks, expected) + done() + }) + client.end() + }) + } + + it('should not call cbStorePut when publishing message with QoS `0` and clean `true`', function (done) { + testCallbackStorePutByQoS(0, true, ['publish'], done) + }) + it('should not call cbStorePut when publishing message with QoS `0` and clean `false`', function (done) { + testCallbackStorePutByQoS(0, false, ['publish'], done) + }) + it('should call cbStorePut before publish completes when publishing message with QoS `1` and clean `true`', function (done) { + testCallbackStorePutByQoS(1, true, ['storeput', 'publish'], done) + }) + it('should call cbStorePut before publish completes when publishing message with QoS `1` and clean `false`', function (done) { + testCallbackStorePutByQoS(1, false, ['storeput', 'publish'], done) + }) + it('should call cbStorePut before publish completes when publishing message with QoS `2` and clean `true`', function (done) { + testCallbackStorePutByQoS(2, true, ['storeput', 'publish'], done) + }) + it('should call cbStorePut before publish completes when publishing message with QoS `2` and clean `false`', function (done) { + testCallbackStorePutByQoS(2, false, ['storeput', 'publish'], done) }) }) @@ -1428,10 +1599,16 @@ server.once('client', function (serverClient) { serverClient.once('subscribe', function (packet) { - packet.subscriptions.should.containEql({ + var result = { topic: topic, qos: 0 - }) + } + if (version === 5) { + result.nl = false + result.rap = false + result.rh = 0 + } + packet.subscriptions.should.containEql(result) done() }) }) @@ -1479,7 +1656,13 @@ serverClient.once('subscribe', function (packet) { // i.e. [{topic: 'a', qos: 0}, {topic: 'b', qos: 0}] var expected = subs.map(function (i) { - return {topic: i, qos: 0} + var result = {topic: i, qos: 0} + if (version === 5) { + result.nl = false + result.rap = false + result.rh = 0 + } + return result }) packet.subscriptions.should.eql(expected) @@ -1491,8 +1674,8 @@ it('should accept an hash of subscriptions', function (done) { var client = connect() var topics = { - test1: 0, - test2: 1 + test1: {qos: 0}, + test2: {qos: 1} } client.once('connect', function () { @@ -1506,10 +1689,16 @@ for (k in topics) { if (topics.hasOwnProperty(k)) { - expected.push({ + var result = { topic: k, - qos: topics[k] - }) + qos: topics[k].qos + } + if (version === 5) { + result.nl = false + result.rap = false + result.rh = 0 + } + expected.push(result) } } @@ -1535,6 +1724,12 @@ qos: 1 }] + if (version === 5) { + expected[0].nl = false + expected[0].rap = false + expected[0].rh = 0 + } + packet.subscriptions.should.eql(expected) done() }) @@ -1552,10 +1747,16 @@ server.once('client', function (serverClient) { serverClient.once('subscribe', function (packet) { - packet.subscriptions.should.containEql({ + var result = { topic: topic, qos: defaultOpts.qos - }) + } + if (version === 5) { + result.nl = false + result.rap = false + result.rh = 0 + } + packet.subscriptions.should.containEql(result) done() }) }) @@ -1571,7 +1772,14 @@ done(err) } else { should.exist(granted, 'granted not given') - granted.should.containEql({topic: 'test', qos: 2}) + var result = {topic: 'test', qos: 2} + if (version === 5) { + result.nl = false + result.rap = false + result.rh = 0 + result.properties = undefined + } + granted.should.containEql(result) done() } }) @@ -1617,10 +1825,16 @@ server.once('client', function (serverClient) { serverClient.once('subscribe', function (packet) { - packet.subscriptions.should.containEql({ + var result = { topic: topic, qos: 0 - }) + } + if (version === 5) { + result.nl = false + result.rap = false + result.rh = 0 + } + packet.subscriptions.should.containEql(result) done() }) }) @@ -1847,9 +2061,76 @@ var testTopic = 'test' var testMessage = 'message' var mid = 253 + var publishReceived = false + var pubrecReceived = false + var pubrelReceived = false client.once('connect', function () { client.subscribe(testTopic, {qos: 2}) + }) + + client.on('packetreceive', (packet) => { + switch (packet.cmd) { + case 'connack': + case 'suback': + // expected, but not specifically part of QOS 2 semantics + break + case 'publish': + pubrecReceived.should.be.false() + pubrelReceived.should.be.false() + publishReceived = true + break + case 'pubrel': + publishReceived.should.be.true() + pubrecReceived.should.be.true() + pubrelReceived = true + break + default: + should.fail() + } + }) + + server.once('client', function (serverClient) { + serverClient.once('subscribe', function () { + serverClient.publish({ + topic: testTopic, + payload: testMessage, + qos: 2, + messageId: mid + }) + }) + + serverClient.on('pubrec', function () { + publishReceived.should.be.true() + pubrelReceived.should.be.false() + pubrecReceived = true + }) + + serverClient.once('pubcomp', function () { + client.removeAllListeners() + serverClient.removeAllListeners() + publishReceived.should.be.true() + pubrecReceived.should.be.true() + pubrelReceived.should.be.true() + done() + }) + }) + }) + + it('should should empty the incoming store after a qos 2 handshake is completed', function (done) { + var client = connect() + var testTopic = 'test' + var testMessage = 'message' + var mid = 253 + + client.once('connect', function () { + client.subscribe(testTopic, {qos: 2}) + }) + + client.on('packetreceive', (packet) => { + if (packet.cmd === 'pubrel') { + should(client.incomingStore._inflights.size).be.equal(1) + } }) server.once('client', function (serverClient) { @@ -1863,9 +2144,93 @@ }) serverClient.once('pubcomp', function () { + should(client.incomingStore._inflights.size).be.equal(0) + client.removeAllListeners() done() }) }) + }) + + function testMultiplePubrel (shouldSendPubcompFail, done) { + var client = connect() + var testTopic = 'test' + var testMessage = 'message' + var mid = 253 + var pubcompCount = 0 + var pubrelCount = 0 + var handleMessageCount = 0 + var emitMessageCount = 0 + var origSendPacket = client._sendPacket + var shouldSendFail + + client.handleMessage = function (packet, callback) { + handleMessageCount++ + callback() + } + + client.on('message', function () { + emitMessageCount++ + }) + + client._sendPacket = function (packet, sendDone) { + shouldSendFail = packet.cmd === 'pubcomp' && shouldSendPubcompFail + if (sendDone) { + sendDone(shouldSendFail ? new Error('testing pubcomp failure') : undefined) + } + + // send the mocked response + switch (packet.cmd) { + case 'subscribe': + const suback = {cmd: 'suback', messageId: packet.messageId, granted: [2]} + client._handlePacket(suback, function (err) { + should(err).not.be.ok() + }) + break + case 'pubrec': + case 'pubcomp': + // for both pubrec and pubcomp, reply with pubrel, simulating the server not receiving the pubcomp + if (packet.cmd === 'pubcomp') { + pubcompCount++ + if (pubcompCount === 2) { + // end the test once the client has gone through two rounds of replying to pubrel messages + pubrelCount.should.be.exactly(2) + handleMessageCount.should.be.exactly(1) + emitMessageCount.should.be.exactly(1) + client._sendPacket = origSendPacket + done() + break + } + } + + // simulate the pubrel message, either in response to pubrec or to mock pubcomp failing to be received + const pubrel = {cmd: 'pubrel', messageId: mid} + pubrelCount++ + client._handlePacket(pubrel, function (err) { + if (shouldSendFail) { + should(err).be.ok() + } else { + should(err).not.be.ok() + } + }) + break + } + } + + client.once('connect', function () { + client.subscribe(testTopic, {qos: 2}) + const publish = {cmd: 'publish', topic: testTopic, payload: testMessage, qos: 2, messageId: mid} + client._handlePacket(publish, function (err) { + should(err).not.be.ok() + }) + }) + } + + it('handle qos 2 messages exactly once when multiple pubrel received', function (done) { + testMultiplePubrel(false, done) + }) + + it('handle qos 2 messages exactly once when multiple pubrel received and sending pubcomp fails on client', function (done) { + testMultiplePubrel(true, done) }) }) @@ -2317,6 +2682,57 @@ }) }) + it('should clear outgoing if close from server', function (done) { + var reconnect = false + var client = {} + var server2 = new Server(function (c) { + c.on('connect', function (packet) { + c.connack({returnCode: 0}) + }) + c.on('subscribe', function (packet) { + if (reconnect) { + c.suback({ + messageId: packet.messageId, + granted: packet.subscriptions.map(function (e) { + return e.qos + }) + }) + } else { + c.destroy() + } + }) + }) + + server2.listen(port + 50, function () { + client = mqtt.connect({ + port: port + 50, + host: 'localhost', + clean: true, + clientId: 'cid1', + reconnectPeriod: 0 + }) + + client.on('connect', function () { + client.subscribe('test', {qos: 2}, function (e) { + if (!e) { + client.end() + } + }) + }) + + client.on('close', function () { + if (reconnect) { + server2.close() + done() + } else { + Object.keys(client.outgoing).length.should.equal(0) + reconnect = true + client.reconnect() + } + }) + }) + }) + it('should resend in-flight QoS 1 publish messages from the client if clean is false', function (done) { var reconnect = false var client = {} @@ -2591,6 +3007,7 @@ context('with alternate server client', function () { var cachedClientListeners + var connack = version === 5 ? { reasonCode: 0 } : { returnCode: 0 } beforeEach(function () { cachedClientListeners = server.listeners('client') @@ -2612,7 +3029,7 @@ server.on('client', function (serverClient) { serverClient.on('connect', function () { connectCount++ - serverClient.connack({returnCode: 0}) + serverClient.connack(connack) }) serverClient.on('subscribe', function () { @@ -2641,7 +3058,7 @@ server.on('client', function (serverClient) { serverClient.on('connect', function () { - serverClient.connack({returnCode: 0}) + serverClient.connack(connack) }) serverClient.on('subscribe', function () { -- Gitblit v1.8.0