From 4885600ecc369aa2e30a65de8dd7a410f13c34df Mon Sep 17 00:00:00 2001 From: heyujie <516346543@qq.com> Date: 星期一, 24 五月 2021 11:25:04 +0800 Subject: [PATCH] 解决app通信问题 --- node_modules/mqtt/test/client.js | 603 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 594 insertions(+), 9 deletions(-) diff --git a/node_modules/mqtt/test/client.js b/node_modules/mqtt/test/client.js index ff4e8e6..008d037 100644 --- a/node_modules/mqtt/test/client.js +++ b/node_modules/mqtt/test/client.js @@ -12,6 +12,7 @@ var Duplex = require('readable-stream').Duplex var Connection = require('mqtt-connection') var Server = require('./server') +var FastServer = require('./server').FastMqttServer var port = 9876 var server @@ -26,13 +27,35 @@ /** * Test server */ -function buildServer () { - return new Server(function (client) { +function buildServer (fastFlag) { + var handler = function (client) { + client.on('auth', function (packet) { + var rc = 'reasonCode' + var connack = {} + connack[rc] = 0 + client.connack(connack) + }) client.on('connect', function (packet) { - if (packet.clientId === 'invalid') { - client.connack({returnCode: 2}) + var rc = 'returnCode' + var connack = {} + if (client.options && client.options.protocolVersion === 5) { + rc = 'reasonCode' + if (packet.clientId === 'invalid') { + connack[rc] = 128 + } else { + connack[rc] = 0 + } } else { - client.connack({returnCode: 0}) + if (packet.clientId === 'invalid') { + connack[rc] = 2 + } else { + connack[rc] = 0 + } + } + if (packet.properties && packet.properties.authenticationMethod) { + return false + } else { + client.connack(connack) } }) @@ -73,13 +96,19 @@ }) client.on('unsubscribe', function (packet) { + packet.granted = packet.unsubscriptions.map(function () { return 0 }) client.unsuback(packet) }) client.on('pingreq', function () { client.pingresp() }) - }) + } + if (fastFlag) { + return new FastServer(handler) + } else { + return new Server(handler) + } } server = buildServer().listen(port) @@ -263,14 +292,15 @@ it('should reconnect to multiple host-ports-protocol combinations if servers is passed', function (done) { this.timeout(15000) - var server2 = buildServer().listen(port + 42) + var server = buildServer(true).listen(port + 41) + var server2 = buildServer(true).listen(port + 42) server2.on('listening', function () { var client = mqtt.connect({ protocol: 'wss', servers: [ { port: port + 42, host: 'localhost', protocol: 'ws' }, - { port: port, host: 'localhost' } + { port: port + 41, host: 'localhost' } ], keepalive: 50 }) @@ -281,7 +311,7 @@ }) server.once('client', function () { - should.equal(client.stream.socket.url, 'wss://localhost:9876/', 'Protocol for second client should use the default protocol: wss, on port: port + 42.') + should.equal(client.stream.socket.url, 'wss://localhost:9917/', 'Protocol for second client should use the default protocol: wss, on port: port + 42.') client.end() done() }) @@ -541,4 +571,559 @@ }) }) }) + + it('check emit error on checkDisconnection w/o callback', function (done) { + this.timeout(15000) + var server118 = new Server(function (client) { + client.on('connect', function (packet) { + client.connack({ + reasonCode: 0 + }) + }) + client.on('publish', function (packet) { + setImmediate(function () { + packet.reasonCode = 0 + client.puback(packet) + }) + }) + }).listen(port + 118) + var opts = { + host: 'localhost', + port: port + 118, + protocolVersion: 5 + } + var client = mqtt.connect(opts) + client.on('error', function (error) { + should(error.message).be.equal('client disconnecting') + server118.close() + done() + }) + client.on('connect', function () { + client.end(function () { + client._checkDisconnecting() + }) + server118.close() + }) + }) + + describe('MQTT 5.0', function () { + var server = buildServer().listen(port + 115) + var config = { protocol: 'mqtt', port: port + 115, protocolVersion: 5, properties: { maximumPacketSize: 200 } } + abstractClientTests(server, config) + it('should has Auth method with Auth data', function (done) { + this.timeout(5000) + var opts = {host: 'localhost', port: port + 115, protocolVersion: 5, properties: { authenticationData: Buffer.from([1, 2, 3, 4]) }} + try { + mqtt.connect(opts) + } catch (error) { + should(error.message).be.equal('Packet has no Authentication Method') + } + done() + }) + it('auth packet', function (done) { + this.timeout(15000) + server.once('client', function (client) { + client.on('auth', function (packet) { + done() + }) + }) + var opts = {host: 'localhost', port: port + 115, protocolVersion: 5, properties: { authenticationMethod: 'json' }, authPacket: {}} + mqtt.connect(opts) + }) + it('Maximum Packet Size', function (done) { + this.timeout(15000) + var opts = {host: 'localhost', port: port + 115, protocolVersion: 5, properties: { maximumPacketSize: 1 }} + var client = mqtt.connect(opts) + client.on('error', function (error) { + should(error.message).be.equal('exceeding packets size connack') + done() + }) + }) + describe('Topic Alias', function () { + it('topicAlias > topicAliasMaximum', function (done) { + this.timeout(15000) + var maximum = 15 + var current = 22 + server.once('client', function (client) { + client.on('publish', function (packet) { + if (packet.properties && packet.properties.topicAlias) { + done(new Error('Packet should not have topicAlias')) + return false + } + done() + }) + }) + var opts = {host: 'localhost', port: port + 115, protocolVersion: 5, properties: { topicAliasMaximum: maximum }} + var client = mqtt.connect(opts) + client.publish('t/h', 'Message', { properties: { topicAlias: current } }) + }) + it('topicAlias w/o topicAliasMaximum in settings', function (done) { + this.timeout(15000) + server.once('client', function (client) { + client.on('publish', function (packet) { + if (packet.properties && packet.properties.topicAlias) { + done(new Error('Packet should not have topicAlias')) + return false + } + done() + }) + }) + var opts = {host: 'localhost', port: port + 115, protocolVersion: 5} + var client = mqtt.connect(opts) + client.publish('t/h', 'Message', { properties: { topicAlias: 22 } }) + }) + }) + it('Change values of some properties by server response', function (done) { + this.timeout(15000) + var server116 = new Server(function (client) { + client.on('connect', function (packet) { + client.connack({ + reasonCode: 0, + properties: { + topicAliasMaximum: 15, + serverKeepAlive: 16, + maximumPacketSize: 95 + } + }) + }) + }).listen(port + 116) + var opts = { + host: 'localhost', + port: port + 116, + protocolVersion: 5, + properties: { + topicAliasMaximum: 10, + serverKeepAlive: 11, + maximumPacketSize: 100 + } + } + var client = mqtt.connect(opts) + client.on('connect', function () { + should(client.options.keepalive).be.equal(16) + should(client.options.properties.topicAliasMaximum).be.equal(15) + should(client.options.properties.maximumPacketSize).be.equal(95) + server116.close() + done() + }) + }) + + it('should resubscribe when reconnecting with protocolVersion 5 and Session Present flag is false', function (done) { + this.timeout(15000) + var tryReconnect = true + var reconnectEvent = false + var server316 = new Server(function (client) { + client.on('connect', function (packet) { + client.connack({ + reasonCode: 0, + sessionPresent: false + }) + client.on('subscribe', function () { + if (!tryReconnect) { + client.end() + server316.close() + done() + } + }) + }) + }).listen(port + 316) + var opts = { + host: 'localhost', + port: port + 316, + protocolVersion: 5 + } + var client = mqtt.connect(opts) + + client.on('reconnect', function () { + reconnectEvent = true + }) + + client.on('connect', function (connack) { + should(connack.sessionPresent).be.equal(false) + if (tryReconnect) { + client.subscribe('hello', function () { + client.stream.end() + }) + + tryReconnect = false + } else { + reconnectEvent.should.equal(true) + } + }) + }) + + it('should resubscribe when reconnecting with protocolVersion 5 and properties', function (done) { + this.timeout(15000) + var tryReconnect = true + var reconnectEvent = false + var server326 = new Server(function (client) { + client.on('connect', function (packet) { + client.on('subscribe', function (packet) { + if (!reconnectEvent) { + client.suback({ + messageId: packet.messageId, + granted: packet.subscriptions.map(function (e) { + return e.qos + }) + }) + } else { + if (!tryReconnect) { + should(packet.properties.userProperties.test).be.equal('test') + client.end() + server326.close() + done() + } + } + }) + client.connack({ + reasonCode: 0, + sessionPresent: false + }) + }) + }).listen(port + 326) + var opts = { + host: 'localhost', + port: port + 326, + protocolVersion: 5 + } + var client = mqtt.connect(opts) + + client.on('reconnect', function () { + reconnectEvent = true + }) + + client.on('connect', function (connack) { + should(connack.sessionPresent).be.equal(false) + if (tryReconnect) { + client.subscribe('hello', { properties: { userProperties: { test: 'test' } } }, function () { + client.stream.end() + }) + + tryReconnect = false + } else { + reconnectEvent.should.equal(true) + } + }) + }) + + var serverErr = new Server(function (client) { + client.on('connect', function (packet) { + client.connack({ + reasonCode: 0 + }) + }) + client.on('publish', function (packet) { + setImmediate(function () { + switch (packet.qos) { + case 0: + break + case 1: + packet.reasonCode = 142 + delete packet.cmd + client.puback(packet) + break + case 2: + packet.reasonCode = 142 + delete packet.cmd + client.pubrec(packet) + break + } + }) + }) + + client.on('pubrel', function (packet) { + packet.reasonCode = 142 + delete packet.cmd + client.pubcomp(packet) + }) + }) + it('Subscribe properties', function (done) { + this.timeout(15000) + var opts = { + host: 'localhost', + port: port + 119, + protocolVersion: 5 + } + var subOptions = { properties: { subscriptionIdentifier: 1234 } } + var server119 = new Server(function (client) { + client.on('connect', function (packet) { + client.connack({ + reasonCode: 0 + }) + }) + client.on('subscribe', function (packet) { + should(packet.properties.subscriptionIdentifier).be.equal(subOptions.properties.subscriptionIdentifier) + server119.close() + done() + }) + }).listen(port + 119) + + var client = mqtt.connect(opts) + client.on('connect', function () { + client.subscribe('a/b', subOptions) + }) + }) + + it('puback handling errors check', function (done) { + this.timeout(15000) + serverErr.listen(port + 117) + var opts = { + host: 'localhost', + port: port + 117, + protocolVersion: 5 + } + var client = mqtt.connect(opts) + client.once('connect', () => { + client.publish('a/b', 'message', {qos: 1}, function (err, packet) { + should(err.message).be.equal('Publish error: Session taken over') + should(err.code).be.equal(142) + }) + serverErr.close() + done() + }) + }) + it('pubrec handling errors check', function (done) { + this.timeout(15000) + serverErr.listen(port + 118) + var opts = { + host: 'localhost', + port: port + 118, + protocolVersion: 5 + } + var client = mqtt.connect(opts) + client.once('connect', () => { + client.publish('a/b', 'message', {qos: 2}, function (err, packet) { + should(err.message).be.equal('Publish error: Session taken over') + should(err.code).be.equal(142) + }) + serverErr.close() + done() + }) + }) + it('puback handling custom reason code', function (done) { + this.timeout(15000) + serverErr.listen(port + 117) + var opts = { + host: 'localhost', + port: port + 117, + protocolVersion: 5, + customHandleAcks: function (topic, message, packet, cb) { + var code = 0 + if (topic === 'a/b') { + code = 128 + } + cb(code) + } + } + + serverErr.once('client', function (c) { + c.once('subscribe', function () { + c.publish({ topic: 'a/b', payload: 'payload', qos: 1, messageId: 1 }) + }) + + c.on('puback', function (packet) { + should(packet.reasonCode).be.equal(128) + client.end() + c.destroy() + serverErr.close() + done() + }) + }) + + var client = mqtt.connect(opts) + client.once('connect', function () { + client.subscribe('a/b', {qos: 1}) + }) + }) + it('server side disconnect', function (done) { + this.timeout(15000) + var server327 = new Server(function (client) { + client.on('connect', function (packet) { + client.connack({ + reasonCode: 0 + }) + client.disconnect({reasonCode: 128}) + server327.close() + }) + }) + server327.listen(port + 327) + var opts = { + host: 'localhost', + port: port + 327, + protocolVersion: 5 + } + + var client = mqtt.connect(opts) + client.once('disconnect', function (disconnectPacket) { + should(disconnectPacket.reasonCode).be.equal(128) + done() + }) + }) + it('pubrec handling custom reason code', function (done) { + this.timeout(15000) + serverErr.listen(port + 117) + var opts = { + host: 'localhost', + port: port + 117, + protocolVersion: 5, + customHandleAcks: function (topic, message, packet, cb) { + var code = 0 + if (topic === 'a/b') { + code = 128 + } + cb(code) + } + } + + serverErr.once('client', function (c) { + c.once('subscribe', function () { + c.publish({ topic: 'a/b', payload: 'payload', qos: 2, messageId: 1 }) + }) + + c.on('pubrec', function (packet) { + should(packet.reasonCode).be.equal(128) + client.end() + c.destroy() + serverErr.close() + done() + }) + }) + + var client = mqtt.connect(opts) + client.once('connect', function () { + client.subscribe('a/b', {qos: 1}) + }) + }) + it('puback handling custom reason code with error', function (done) { + this.timeout(15000) + serverErr.listen(port + 117) + var opts = { + host: 'localhost', + port: port + 117, + protocolVersion: 5, + customHandleAcks: function (topic, message, packet, cb) { + var code = 0 + if (topic === 'a/b') { + cb(new Error('a/b is not valid')) + } + cb(code) + } + } + + serverErr.once('client', function (c) { + c.once('subscribe', function () { + c.publish({ topic: 'a/b', payload: 'payload', qos: 1, messageId: 1 }) + }) + }) + + var client = mqtt.connect(opts) + client.on('error', function (error) { + should(error.message).be.equal('a/b is not valid') + client.end() + serverErr.close() + done() + }) + client.once('connect', function () { + client.subscribe('a/b', {qos: 1}) + }) + }) + it('pubrec handling custom reason code with error', function (done) { + this.timeout(15000) + serverErr.listen(port + 117) + var opts = { + host: 'localhost', + port: port + 117, + protocolVersion: 5, + customHandleAcks: function (topic, message, packet, cb) { + var code = 0 + if (topic === 'a/b') { + cb(new Error('a/b is not valid')) + } + cb(code) + } + } + + serverErr.once('client', function (c) { + c.once('subscribe', function () { + c.publish({ topic: 'a/b', payload: 'payload', qos: 2, messageId: 1 }) + }) + }) + + var client = mqtt.connect(opts) + client.on('error', function (error) { + should(error.message).be.equal('a/b is not valid') + client.end() + serverErr.close() + done() + }) + client.once('connect', function () { + client.subscribe('a/b', {qos: 1}) + }) + }) + it('puback handling custom invalid reason code', function (done) { + this.timeout(15000) + serverErr.listen(port + 117) + var opts = { + host: 'localhost', + port: port + 117, + protocolVersion: 5, + customHandleAcks: function (topic, message, packet, cb) { + var code = 0 + if (topic === 'a/b') { + code = 124124 + } + cb(code) + } + } + + serverErr.once('client', function (c) { + c.once('subscribe', function () { + c.publish({ topic: 'a/b', payload: 'payload', qos: 1, messageId: 1 }) + }) + }) + + var client = mqtt.connect(opts) + client.on('error', function (error) { + should(error.message).be.equal('Wrong reason code for puback') + client.end() + serverErr.close() + done() + }) + client.once('connect', function () { + client.subscribe('a/b', {qos: 1}) + }) + }) + it('pubrec handling custom invalid reason code', function (done) { + this.timeout(15000) + serverErr.listen(port + 117) + var opts = { + host: 'localhost', + port: port + 117, + protocolVersion: 5, + customHandleAcks: function (topic, message, packet, cb) { + var code = 0 + if (topic === 'a/b') { + code = 34535 + } + cb(code) + } + } + + serverErr.once('client', function (c) { + c.once('subscribe', function () { + c.publish({ topic: 'a/b', payload: 'payload', qos: 2, messageId: 1 }) + }) + }) + + var client = mqtt.connect(opts) + client.on('error', function (error) { + should(error.message).be.equal('Wrong reason code for pubrec') + client.end() + serverErr.close() + done() + }) + client.once('connect', function () { + client.subscribe('a/b', {qos: 1}) + }) + }) + }) }) -- Gitblit v1.8.0