| | |
| | | var Duplex = require('readable-stream').Duplex |
| | | var Connection = require('mqtt-connection') |
| | | var Server = require('./server') |
| | | var FastServer = require('./server').FastMqttServer |
| | | var port = 9876 |
| | | var server |
| | | |
| | |
| | | /** |
| | | * Test server |
| | | */ |
| | | function buildServer () { |
| | | return new Server(function (client) { |
| | | function buildServer (fastFlag) { |
| | | var handler = function (client) { |
| | | client.on('auth', function (packet) { |
| | | var rc = 'reasonCode' |
| | | var connack = {} |
| | | connack[rc] = 0 |
| | | client.connack(connack) |
| | | }) |
| | | client.on('connect', function (packet) { |
| | | if (packet.clientId === 'invalid') { |
| | | client.connack({returnCode: 2}) |
| | | var rc = 'returnCode' |
| | | var connack = {} |
| | | if (client.options && client.options.protocolVersion === 5) { |
| | | rc = 'reasonCode' |
| | | if (packet.clientId === 'invalid') { |
| | | connack[rc] = 128 |
| | | } else { |
| | | connack[rc] = 0 |
| | | } |
| | | } else { |
| | | client.connack({returnCode: 0}) |
| | | if (packet.clientId === 'invalid') { |
| | | connack[rc] = 2 |
| | | } else { |
| | | connack[rc] = 0 |
| | | } |
| | | } |
| | | if (packet.properties && packet.properties.authenticationMethod) { |
| | | return false |
| | | } else { |
| | | client.connack(connack) |
| | | } |
| | | }) |
| | | |
| | |
| | | }) |
| | | |
| | | client.on('unsubscribe', function (packet) { |
| | | packet.granted = packet.unsubscriptions.map(function () { return 0 }) |
| | | client.unsuback(packet) |
| | | }) |
| | | |
| | | client.on('pingreq', function () { |
| | | client.pingresp() |
| | | }) |
| | | }) |
| | | } |
| | | if (fastFlag) { |
| | | return new FastServer(handler) |
| | | } else { |
| | | return new Server(handler) |
| | | } |
| | | } |
| | | |
| | | server = buildServer().listen(port) |
| | |
| | | it('should reconnect to multiple host-ports-protocol combinations if servers is passed', function (done) { |
| | | this.timeout(15000) |
| | | |
| | | var server2 = buildServer().listen(port + 42) |
| | | var server = buildServer(true).listen(port + 41) |
| | | var server2 = buildServer(true).listen(port + 42) |
| | | |
| | | server2.on('listening', function () { |
| | | var client = mqtt.connect({ |
| | | protocol: 'wss', |
| | | servers: [ |
| | | { port: port + 42, host: 'localhost', protocol: 'ws' }, |
| | | { port: port, host: 'localhost' } |
| | | { port: port + 41, host: 'localhost' } |
| | | ], |
| | | keepalive: 50 |
| | | }) |
| | |
| | | }) |
| | | |
| | | server.once('client', function () { |
| | | should.equal(client.stream.socket.url, 'wss://localhost:9876/', 'Protocol for second client should use the default protocol: wss, on port: port + 42.') |
| | | should.equal(client.stream.socket.url, 'wss://localhost:9917/', 'Protocol for second client should use the default protocol: wss, on port: port + 42.') |
| | | client.end() |
| | | done() |
| | | }) |
| | |
| | | }) |
| | | }) |
| | | }) |
| | | |
// Calls client._checkDisconnecting() without a callback from inside the
// client.end() callback and expects an 'error' event whose message is
// 'client disconnecting'.
it('check emit error on checkDisconnection w/o callback', function (done) {
  this.timeout(15000)

  // Minimal server: accepts every connection and acknowledges publishes.
  var server118 = new Server((serverClient) => {
    serverClient.on('connect', () => {
      serverClient.connack({ reasonCode: 0 })
    })
    serverClient.on('publish', (publishPacket) => {
      setImmediate(() => {
        publishPacket.reasonCode = 0
        serverClient.puback(publishPacket)
      })
    })
  }).listen(port + 118)

  var client = mqtt.connect({
    host: 'localhost',
    port: port + 118,
    protocolVersion: 5
  })

  client.on('error', (error) => {
    should(error.message).be.equal('client disconnecting')
    server118.close()
    done()
  })

  client.on('connect', () => {
    client.end(() => {
      client._checkDisconnecting()
    })
    // Closed here as well, right after end() is requested; the error
    // handler above calls close() again.
    server118.close()
  })
})
| | | |
| | | describe('MQTT 5.0', function () { |
// Shared server for the MQTT 5.0 tests below, listening on port + 115.
var server = buildServer().listen(port + 115)
// Client configuration handed to the shared abstract client test-suite.
var config = {
  protocol: 'mqtt',
  port: port + 115,
  protocolVersion: 5,
  properties: {
    maximumPacketSize: 200
  }
}
abstractClientTests(server, config)
// Connects with authenticationData but no authenticationMethod. If
// mqtt.connect() throws, the message must be
// 'Packet has no Authentication Method'. done() is called either way.
it('should has Auth method with Auth data', function (done) {
  this.timeout(5000)
  var connectOptions = {
    host: 'localhost',
    port: port + 115,
    protocolVersion: 5,
    properties: {
      authenticationData: Buffer.from([1, 2, 3, 4])
    }
  }
  try {
    mqtt.connect(connectOptions)
  } catch (connectError) {
    should(connectError.message).be.equal('Packet has no Authentication Method')
  }
  done()
})
// Connects with an authenticationMethod property and an authPacket option;
// the test completes once the server-side client emits 'auth'.
it('auth packet', function (done) {
  this.timeout(15000)
  server.once('client', (serverClient) => {
    serverClient.on('auth', () => {
      done()
    })
  })
  var connectOptions = {
    host: 'localhost',
    port: port + 115,
    protocolVersion: 5,
    properties: { authenticationMethod: 'json' },
    authPacket: {}
  }
  mqtt.connect(connectOptions)
})
// Connects with maximumPacketSize set to 1 and expects the client to emit
// an error with the message 'exceeding packets size connack'.
it('Maximum Packet Size', function (done) {
  this.timeout(15000)
  var client = mqtt.connect({
    host: 'localhost',
    port: port + 115,
    protocolVersion: 5,
    properties: { maximumPacketSize: 1 }
  })
  client.on('error', (error) => {
    should(error.message).be.equal('exceeding packets size connack')
    done()
  })
})
// Both tests publish with a topicAlias property and fail if the packet
// received by the server still carries a topicAlias.
describe('Topic Alias', function () {
  it('topicAlias > topicAliasMaximum', function (done) {
    this.timeout(15000)
    var aliasLimit = 15
    var requestedAlias = 22
    server.once('client', (serverClient) => {
      serverClient.on('publish', (publishPacket) => {
        var props = publishPacket.properties
        if (props && props.topicAlias) {
          done(new Error('Packet should not have topicAlias'))
          return false
        }
        done()
      })
    })
    var client = mqtt.connect({
      host: 'localhost',
      port: port + 115,
      protocolVersion: 5,
      properties: { topicAliasMaximum: aliasLimit }
    })
    client.publish('t/h', 'Message', { properties: { topicAlias: requestedAlias } })
  })

  it('topicAlias w/o topicAliasMaximum in settings', function (done) {
    this.timeout(15000)
    server.once('client', (serverClient) => {
      serverClient.on('publish', (publishPacket) => {
        var props = publishPacket.properties
        if (props && props.topicAlias) {
          done(new Error('Packet should not have topicAlias'))
          return false
        }
        done()
      })
    })
    var client = mqtt.connect({
      host: 'localhost',
      port: port + 115,
      protocolVersion: 5
    })
    client.publish('t/h', 'Message', { properties: { topicAlias: 22 } })
  })
})
// The server's CONNACK carries topicAliasMaximum, serverKeepAlive and
// maximumPacketSize values that differ from the ones the client asked for.
// After connecting, the client options must hold the server's values.
it('Change values of some properties by server response', function (done) {
  this.timeout(15000)

  var serverProperties = {
    topicAliasMaximum: 15,
    serverKeepAlive: 16,
    maximumPacketSize: 95
  }
  var server116 = new Server((serverClient) => {
    serverClient.on('connect', () => {
      serverClient.connack({
        reasonCode: 0,
        properties: serverProperties
      })
    })
  }).listen(port + 116)

  var client = mqtt.connect({
    host: 'localhost',
    port: port + 116,
    protocolVersion: 5,
    properties: {
      topicAliasMaximum: 10,
      serverKeepAlive: 11,
      maximumPacketSize: 100
    }
  })

  client.on('connect', () => {
    var effective = client.options
    should(effective.keepalive).be.equal(16)
    should(effective.properties.topicAliasMaximum).be.equal(15)
    should(effective.properties.maximumPacketSize).be.equal(95)
    server116.close()
    done()
  })
})
| | | |
// The server always answers CONNECT with sessionPresent: false. The client
// subscribes, then its stream is ended; the test finishes when the server
// sees a SUBSCRIBE after the first connection round has been marked done.
it('should resubscribe when reconnecting with protocolVersion 5 and Session Present flag is false', function (done) {
  this.timeout(15000)
  var tryReconnect = true // true until the first 'connect' has been handled
  var reconnectEvent = false // set once the client emits 'reconnect'

  var server316 = new Server((serverClient) => {
    serverClient.on('connect', () => {
      serverClient.connack({
        reasonCode: 0,
        sessionPresent: false
      })
      serverClient.on('subscribe', () => {
        if (tryReconnect) {
          return
        }
        serverClient.end()
        server316.close()
        done()
      })
    })
  }).listen(port + 316)

  var client = mqtt.connect({
    host: 'localhost',
    port: port + 316,
    protocolVersion: 5
  })

  client.on('reconnect', () => {
    reconnectEvent = true
  })

  client.on('connect', (connack) => {
    should(connack.sessionPresent).be.equal(false)
    if (!tryReconnect) {
      reconnectEvent.should.equal(true)
      return
    }
    // First connection: subscribe, and end the stream from the subscribe
    // callback.
    client.subscribe('hello', () => {
      client.stream.end()
    })

    tryReconnect = false
  })
})
| | | |
// Subscribes with userProperties, ends the stream, and checks that the
// SUBSCRIBE seen by the server after 'reconnect' still carries
// userProperties.test === 'test'.
it('should resubscribe when reconnecting with protocolVersion 5 and properties', function (done) {
  this.timeout(15000)
  var tryReconnect = true // true until the first 'connect' has been handled
  var reconnectEvent = false // set once the client emits 'reconnect'

  var server326 = new Server((serverClient) => {
    serverClient.on('connect', () => {
      // The subscribe handler is registered before the CONNACK is sent.
      serverClient.on('subscribe', (subscribePacket) => {
        if (!reconnectEvent) {
          // Before any reconnect: grant each subscription its requested QoS.
          var grantedQos = subscribePacket.subscriptions.map((subscription) => subscription.qos)
          serverClient.suback({
            messageId: subscribePacket.messageId,
            granted: grantedQos
          })
          return
        }
        if (tryReconnect) {
          return
        }
        should(subscribePacket.properties.userProperties.test).be.equal('test')
        serverClient.end()
        server326.close()
        done()
      })
      serverClient.connack({
        reasonCode: 0,
        sessionPresent: false
      })
    })
  }).listen(port + 326)

  var client = mqtt.connect({
    host: 'localhost',
    port: port + 326,
    protocolVersion: 5
  })

  client.on('reconnect', () => {
    reconnectEvent = true
  })

  client.on('connect', (connack) => {
    should(connack.sessionPresent).be.equal(false)
    if (!tryReconnect) {
      reconnectEvent.should.equal(true)
      return
    }
    var subscribeOptions = { properties: { userProperties: { test: 'test' } } }
    client.subscribe('hello', subscribeOptions, () => {
      client.stream.end()
    })

    tryReconnect = false
  })
})
| | | |
// Shared server for the ack-handling tests: it accepts every connection and
// answers QoS 1 / QoS 2 publishes and PUBREL with reason code 142.
var serverErr = new Server((serverClient) => {
  serverClient.on('connect', () => {
    serverClient.connack({ reasonCode: 0 })
  })

  serverClient.on('publish', (publishPacket) => {
    setImmediate(() => {
      var qos = publishPacket.qos
      // QoS 0 (or any other value) gets no acknowledgement.
      if (qos !== 1 && qos !== 2) {
        return
      }
      publishPacket.reasonCode = 142
      delete publishPacket.cmd
      if (qos === 1) {
        serverClient.puback(publishPacket)
      } else {
        serverClient.pubrec(publishPacket)
      }
    })
  })

  serverClient.on('pubrel', (pubrelPacket) => {
    pubrelPacket.reasonCode = 142
    delete pubrelPacket.cmd
    serverClient.pubcomp(pubrelPacket)
  })
})
// The subscriptionIdentifier passed to client.subscribe() must appear in
// the properties of the SUBSCRIBE packet received by the server.
it('Subscribe properties', function (done) {
  this.timeout(15000)
  var opts = {
    host: 'localhost',
    port: port + 119,
    protocolVersion: 5
  }
  var subOptions = { properties: { subscriptionIdentifier: 1234 } }

  var server119 = new Server((serverClient) => {
    serverClient.on('connect', () => {
      serverClient.connack({ reasonCode: 0 })
    })
    serverClient.on('subscribe', (subscribePacket) => {
      var expectedId = subOptions.properties.subscriptionIdentifier
      should(subscribePacket.properties.subscriptionIdentifier).be.equal(expectedId)
      server119.close()
      done()
    })
  }).listen(port + 119)

  var client = mqtt.connect(opts)
  client.on('connect', () => {
    client.subscribe('a/b', subOptions)
  })
})
| | | |
// serverErr answers a QoS 1 publish with a PUBACK carrying reason code 142.
// The publish callback must receive an error with that code and the message
// 'Publish error: Session taken over'.
it('puback handling errors check', function (done) {
  this.timeout(15000)
  serverErr.listen(port + 117)
  var opts = {
    host: 'localhost',
    port: port + 117,
    protocolVersion: 5
  }
  var client = mqtt.connect(opts)
  client.once('connect', () => {
    client.publish('a/b', 'message', {qos: 1}, function (err, packet) {
      // Tear down before asserting so a failing assertion does not leave the
      // client connected or the shared server listening for later tests.
      client.end()
      serverErr.close()
      should.exist(err)
      should(err.message).be.equal('Publish error: Session taken over')
      should(err.code).be.equal(142)
      // done() is called only here. Previously it ran right after publish()
      // was called, so the test finished before these assertions executed.
      done()
    })
  })
})
// serverErr answers a QoS 2 publish with a PUBREC carrying reason code 142.
// The publish callback must receive an error with that code and the message
// 'Publish error: Session taken over'.
it('pubrec handling errors check', function (done) {
  this.timeout(15000)
  serverErr.listen(port + 118)
  var opts = {
    host: 'localhost',
    port: port + 118,
    protocolVersion: 5
  }
  var client = mqtt.connect(opts)
  client.once('connect', () => {
    client.publish('a/b', 'message', {qos: 2}, function (err, packet) {
      // Tear down before asserting so a failing assertion does not leave the
      // client connected or the shared server listening for later tests.
      client.end()
      serverErr.close()
      should.exist(err)
      should(err.message).be.equal('Publish error: Session taken over')
      should(err.code).be.equal(142)
      // done() is called only here. Previously it ran right after publish()
      // was called, so the test finished before these assertions executed.
      done()
    })
  })
})
// customHandleAcks returns reason code 128 for topic 'a/b'; the PUBACK that
// the server receives for its QoS 1 publish must carry that reason code.
it('puback handling custom reason code', function (done) {
  this.timeout(15000)
  serverErr.listen(port + 117)
  var opts = {
    host: 'localhost',
    port: port + 117,
    protocolVersion: 5,
    customHandleAcks: function (topic, message, packet, cb) {
      cb(topic === 'a/b' ? 128 : 0)
    }
  }

  serverErr.once('client', (serverClient) => {
    // Publish to the client as soon as it has subscribed.
    serverClient.once('subscribe', () => {
      serverClient.publish({ topic: 'a/b', payload: 'payload', qos: 1, messageId: 1 })
    })

    serverClient.on('puback', (pubackPacket) => {
      should(pubackPacket.reasonCode).be.equal(128)
      client.end()
      serverClient.destroy()
      serverErr.close()
      done()
    })
  })

  var client = mqtt.connect(opts)
  client.once('connect', () => {
    client.subscribe('a/b', {qos: 1})
  })
})
// The server sends a DISCONNECT with reason code 128 right after the
// CONNACK; the client must emit 'disconnect' with that packet.
it('server side disconnect', function (done) {
  this.timeout(15000)
  var server327 = new Server((serverClient) => {
    serverClient.on('connect', () => {
      serverClient.connack({ reasonCode: 0 })
      serverClient.disconnect({reasonCode: 128})
      server327.close()
    })
  })
  server327.listen(port + 327)

  var client = mqtt.connect({
    host: 'localhost',
    port: port + 327,
    protocolVersion: 5
  })
  client.once('disconnect', (disconnectPacket) => {
    should(disconnectPacket.reasonCode).be.equal(128)
    done()
  })
})
// customHandleAcks returns reason code 128 for topic 'a/b'; the PUBREC that
// the server receives for its QoS 2 publish must carry that reason code.
it('pubrec handling custom reason code', function (done) {
  this.timeout(15000)
  serverErr.listen(port + 117)
  var opts = {
    host: 'localhost',
    port: port + 117,
    protocolVersion: 5,
    customHandleAcks: function (topic, message, packet, cb) {
      cb(topic === 'a/b' ? 128 : 0)
    }
  }

  serverErr.once('client', (serverClient) => {
    // Publish to the client as soon as it has subscribed.
    serverClient.once('subscribe', () => {
      serverClient.publish({ topic: 'a/b', payload: 'payload', qos: 2, messageId: 1 })
    })

    serverClient.on('pubrec', (pubrecPacket) => {
      should(pubrecPacket.reasonCode).be.equal(128)
      client.end()
      serverClient.destroy()
      serverErr.close()
      done()
    })
  })

  var client = mqtt.connect(opts)
  client.once('connect', () => {
    client.subscribe('a/b', {qos: 1})
  })
})
// customHandleAcks reports an Error for topic 'a/b' (QoS 1 publish from the
// server); the client must emit that error.
it('puback handling custom reason code with error', function (done) {
  this.timeout(15000)
  serverErr.listen(port + 117)
  var opts = {
    host: 'localhost',
    port: port + 117,
    protocolVersion: 5,
    customHandleAcks: function (topic, message, packet, cb) {
      if (topic === 'a/b') {
        // Return here: without it the callback was invoked a second time
        // with code 0 after the error had already been reported.
        return cb(new Error('a/b is not valid'))
      }
      cb(0)
    }
  }

  serverErr.once('client', function (c) {
    // Publish to the client as soon as it has subscribed.
    c.once('subscribe', function () {
      c.publish({ topic: 'a/b', payload: 'payload', qos: 1, messageId: 1 })
    })
  })

  var client = mqtt.connect(opts)
  client.on('error', function (error) {
    should(error.message).be.equal('a/b is not valid')
    client.end()
    serverErr.close()
    done()
  })
  client.once('connect', function () {
    client.subscribe('a/b', {qos: 1})
  })
})
// customHandleAcks reports an Error for topic 'a/b' (QoS 2 publish from the
// server); the client must emit that error.
it('pubrec handling custom reason code with error', function (done) {
  this.timeout(15000)
  serverErr.listen(port + 117)
  var opts = {
    host: 'localhost',
    port: port + 117,
    protocolVersion: 5,
    customHandleAcks: function (topic, message, packet, cb) {
      if (topic === 'a/b') {
        // Return here: without it the callback was invoked a second time
        // with code 0 after the error had already been reported.
        return cb(new Error('a/b is not valid'))
      }
      cb(0)
    }
  }

  serverErr.once('client', function (c) {
    // Publish to the client as soon as it has subscribed.
    c.once('subscribe', function () {
      c.publish({ topic: 'a/b', payload: 'payload', qos: 2, messageId: 1 })
    })
  })

  var client = mqtt.connect(opts)
  client.on('error', function (error) {
    should(error.message).be.equal('a/b is not valid')
    client.end()
    serverErr.close()
    done()
  })
  client.once('connect', function () {
    client.subscribe('a/b', {qos: 1})
  })
})
// customHandleAcks returns 124124 for topic 'a/b' (QoS 1 publish from the
// server); the client must emit 'Wrong reason code for puback'.
it('puback handling custom invalid reason code', function (done) {
  this.timeout(15000)
  serverErr.listen(port + 117)
  var opts = {
    host: 'localhost',
    port: port + 117,
    protocolVersion: 5,
    customHandleAcks: function (topic, message, packet, cb) {
      cb(topic === 'a/b' ? 124124 : 0)
    }
  }

  serverErr.once('client', (serverClient) => {
    // Publish to the client as soon as it has subscribed.
    serverClient.once('subscribe', () => {
      serverClient.publish({ topic: 'a/b', payload: 'payload', qos: 1, messageId: 1 })
    })
  })

  var client = mqtt.connect(opts)
  client.on('error', (error) => {
    should(error.message).be.equal('Wrong reason code for puback')
    client.end()
    serverErr.close()
    done()
  })
  client.once('connect', () => {
    client.subscribe('a/b', {qos: 1})
  })
})
// customHandleAcks returns 34535 for topic 'a/b' (QoS 2 publish from the
// server); the client must emit 'Wrong reason code for pubrec'.
it('pubrec handling custom invalid reason code', function (done) {
  this.timeout(15000)
  serverErr.listen(port + 117)
  var opts = {
    host: 'localhost',
    port: port + 117,
    protocolVersion: 5,
    customHandleAcks: function (topic, message, packet, cb) {
      cb(topic === 'a/b' ? 34535 : 0)
    }
  }

  serverErr.once('client', (serverClient) => {
    // Publish to the client as soon as it has subscribed.
    serverClient.once('subscribe', () => {
      serverClient.publish({ topic: 'a/b', payload: 'payload', qos: 2, messageId: 1 })
    })
  })

  var client = mqtt.connect(opts)
  client.on('error', (error) => {
    should(error.message).be.equal('Wrong reason code for pubrec')
    client.end()
    serverErr.close()
    done()
  })
  client.once('connect', () => {
    client.subscribe('a/b', {qos: 1})
  })
})
| | | }) |
| | | }) |