| | |
| | | // Base port number; several tests in this file start their own server on `port + 50` |
| | | var port = 9876 |
| | | |
| | | module.exports = function (server, config) { |
| | | var version = config.protocolVersion || 4 |
| | | function connect (opts) { |
| | | opts = xtend(config, opts) |
| | | return mqtt.connect(opts) |
| | |
| | | }) |
| | | |
| | | it('should provide connack packet with connect event', function (done) { |
| | | var connack = version === 5 ? {reasonCode: 0} : {returnCode: 0} |
| | | server.once('client', function (serverClient) { |
| | | serverClient.connack({returnCode: 0, sessionPresent: true}) |
| | | |
| | | connack.sessionPresent = true |
| | | serverClient.connack(connack) |
| | | server.once('client', function (serverClient) { |
| | | serverClient.connack({returnCode: 0, sessionPresent: false}) |
| | | connack.sessionPresent = false |
| | | serverClient.connack(connack) |
| | | }) |
| | | }) |
| | | |
| | |
| | | done(new Error('Should not emit connect')) |
| | | }) |
| | | client.once('error', function (error) { |
| | | should(error.code).be.equal(2) // code for clientID identifer rejected |
| | | var value = version === 5 ? 128 : 2 |
| | | should(error.code).be.equal(value) // code for clientID identifer rejected |
| | | client.end() |
| | | done() |
| | | }) |
| | |
| | | setTimeout(function () { |
| | | client.end(true, done) |
| | | }, 10) |
| | | }) |
| | | }) |
| | | |
| | | it('should not interrupt messages', function (done) { |
| | | var client = null |
| | | var incomingStore = new mqtt.Store({ clean: false }) |
| | | var outgoingStore = new mqtt.Store({ clean: false }) |
| | | var publishCount = 0 |
| | | var server2 = new Server(function (c) { |
| | | c.on('connect', function () { |
| | | c.connack({returnCode: 0}) |
| | | }) |
| | | c.on('publish', function (packet) { |
| | | if (packet.qos !== 0) { |
| | | c.puback({messageId: packet.messageId}) |
| | | } |
| | | switch (publishCount++) { |
| | | case 0: |
| | | packet.payload.toString().should.equal('payload1') |
| | | break |
| | | case 1: |
| | | packet.payload.toString().should.equal('payload2') |
| | | break |
| | | case 2: |
| | | packet.payload.toString().should.equal('payload3') |
| | | break |
| | | case 3: |
| | | packet.payload.toString().should.equal('payload4') |
| | | server2.close() |
| | | done() |
| | | break |
| | | } |
| | | }) |
| | | }) |
| | | |
| | | server2.listen(port + 50, function () { |
| | | client = mqtt.connect({ |
| | | port: port + 50, |
| | | host: 'localhost', |
| | | clean: false, |
| | | clientId: 'cid1', |
| | | reconnectPeriod: 0, |
| | | incomingStore: incomingStore, |
| | | outgoingStore: outgoingStore, |
| | | queueQoSZero: true |
| | | }) |
| | | client.on('packetreceive', function (packet) { |
| | | if (packet.cmd === 'connack') { |
| | | setImmediate( |
| | | function () { |
| | | client.publish('test', 'payload3', {qos: 1}) |
| | | client.publish('test', 'payload4', {qos: 0}) |
| | | } |
| | | ) |
| | | } |
| | | }) |
| | | client.publish('test', 'payload1', {qos: 2}) |
| | | client.publish('test', 'payload2', {qos: 2}) |
| | | }) |
| | | }) |
| | | |
| | |
| | | return new AsyncStore() |
| | | } |
| | | } |
| | | AsyncStore.prototype.put = function (packet, cb) { |
| | | AsyncStore.prototype.del = function (packet, cb) { |
| | | process.nextTick(function () { |
| | | cb(new Error('Error')) |
| | | }) |
| | |
| | | }) |
| | | |
| | | it('should handle success with async incoming store in QoS 2 `handlePubrel` method', function (done) { |
| | | var putComplete = false |
| | | var delComplete = false |
| | | function AsyncStore () { |
| | | if (!(this instanceof AsyncStore)) { |
| | | return new AsyncStore() |
| | | } |
| | | } |
| | | AsyncStore.prototype.put = function (packet, cb) { |
| | | AsyncStore.prototype.del = function (packet, cb) { |
| | | process.nextTick(function () { |
| | | putComplete = true |
| | | delComplete = true |
| | | cb(null) |
| | | }) |
| | | } |
| | |
| | | messageId: 1, |
| | | qos: 2 |
| | | }, function () { |
| | | putComplete.should.equal(true) |
| | | delComplete.should.equal(true) |
| | | done() |
| | | client.end() |
| | | }) |
| | |
| | | } |
| | | }) |
| | | }) |
| | | }) |
| | | |
| | | it('should keep message order', function (done) { |
| | | var publishCount = 0 |
| | | var reconnect = false |
| | | var client = {} |
| | | var incomingStore = new mqtt.Store({ clean: false }) |
| | | var outgoingStore = new mqtt.Store({ clean: false }) |
| | | var server2 = new Server(function (c) { |
| | | // errors are not interesting for this test |
| | | // but they might happen on some platforms |
| | | c.on('error', function () {}) |
| | | |
| | | c.on('connect', function (packet) { |
| | | c.connack({returnCode: 0}) |
| | | }) |
| | | c.on('publish', function (packet) { |
| | | c.puback({messageId: packet.messageId}) |
| | | if (reconnect) { |
| | | switch (publishCount++) { |
| | | case 0: |
| | | packet.payload.toString().should.equal('payload1') |
| | | break |
| | | case 1: |
| | | packet.payload.toString().should.equal('payload2') |
| | | break |
| | | case 2: |
| | | packet.payload.toString().should.equal('payload3') |
| | | server2.close() |
| | | done() |
| | | break |
| | | } |
| | | } |
| | | }) |
| | | }) |
| | | |
| | | server2.listen(port + 50, function () { |
| | | client = mqtt.connect({ |
| | | port: port + 50, |
| | | host: 'localhost', |
| | | clean: false, |
| | | clientId: 'cid1', |
| | | reconnectPeriod: 0, |
| | | incomingStore: incomingStore, |
| | | outgoingStore: outgoingStore |
| | | }) |
| | | |
| | | client.on('connect', function () { |
| | | if (!reconnect) { |
| | | client.publish('topic', 'payload1', {qos: 1}) |
| | | client.publish('topic', 'payload2', {qos: 1}) |
| | | client.end(true) |
| | | } else { |
| | | client.publish('topic', 'payload3', {qos: 1}) |
| | | } |
| | | }) |
| | | client.on('close', function () { |
| | | if (!reconnect) { |
| | | client.reconnect({ |
| | | clean: false, |
| | | incomingStore: incomingStore, |
| | | outgoingStore: outgoingStore |
| | | }) |
| | | reconnect = true |
| | | } |
| | | }) |
| | | }) |
| | | }) |
| | | |
| | | function testCallbackStorePutByQoS (qos, clean, expected, done) { |
| | | var client = connect({ |
| | | clean: clean, |
| | | clientId: 'testId' |
| | | }) |
| | | |
| | | var callbacks = [] |
| | | |
| | | function cbStorePut () { |
| | | callbacks.push('storeput') |
| | | } |
| | | |
| | | client.on('connect', function () { |
| | | client.publish('test', 'test', {qos: qos, cbStorePut: cbStorePut}, function (err) { |
| | | if (err) done(err) |
| | | callbacks.push('publish') |
| | | should.deepEqual(callbacks, expected) |
| | | done() |
| | | }) |
| | | client.end() |
| | | }) |
| | | } |
| | | |
| | | it('should not call cbStorePut when publishing message with QoS `0` and clean `true`', function (done) { |
| | | testCallbackStorePutByQoS(0, true, ['publish'], done) |
| | | }) |
| | | it('should not call cbStorePut when publishing message with QoS `0` and clean `false`', function (done) { |
| | | testCallbackStorePutByQoS(0, false, ['publish'], done) |
| | | }) |
| | | it('should call cbStorePut before publish completes when publishing message with QoS `1` and clean `true`', function (done) { |
| | | testCallbackStorePutByQoS(1, true, ['storeput', 'publish'], done) |
| | | }) |
| | | it('should call cbStorePut before publish completes when publishing message with QoS `1` and clean `false`', function (done) { |
| | | testCallbackStorePutByQoS(1, false, ['storeput', 'publish'], done) |
| | | }) |
| | | it('should call cbStorePut before publish completes when publishing message with QoS `2` and clean `true`', function (done) { |
| | | testCallbackStorePutByQoS(2, true, ['storeput', 'publish'], done) |
| | | }) |
| | | it('should call cbStorePut before publish completes when publishing message with QoS `2` and clean `false`', function (done) { |
| | | testCallbackStorePutByQoS(2, false, ['storeput', 'publish'], done) |
| | | }) |
| | | }) |
| | | |
| | |
| | | |
| | | server.once('client', function (serverClient) { |
| | | serverClient.once('subscribe', function (packet) { |
| | | packet.subscriptions.should.containEql({ |
| | | var result = { |
| | | topic: topic, |
| | | qos: 0 |
| | | }) |
| | | } |
| | | if (version === 5) { |
| | | result.nl = false |
| | | result.rap = false |
| | | result.rh = 0 |
| | | } |
| | | packet.subscriptions.should.containEql(result) |
| | | done() |
| | | }) |
| | | }) |
| | |
| | | serverClient.once('subscribe', function (packet) { |
| | | // i.e. [{topic: 'a', qos: 0}, {topic: 'b', qos: 0}] |
| | | var expected = subs.map(function (i) { |
| | | return {topic: i, qos: 0} |
| | | var result = {topic: i, qos: 0} |
| | | if (version === 5) { |
| | | result.nl = false |
| | | result.rap = false |
| | | result.rh = 0 |
| | | } |
| | | return result |
| | | }) |
| | | |
| | | packet.subscriptions.should.eql(expected) |
| | |
| | | it('should accept an hash of subscriptions', function (done) { |
| | | var client = connect() |
| | | var topics = { |
| | | test1: 0, |
| | | test2: 1 |
| | | test1: {qos: 0}, |
| | | test2: {qos: 1} |
| | | } |
| | | |
| | | client.once('connect', function () { |
| | |
| | | |
| | | for (k in topics) { |
| | | if (topics.hasOwnProperty(k)) { |
| | | expected.push({ |
| | | var result = { |
| | | topic: k, |
| | | qos: topics[k] |
| | | }) |
| | | qos: topics[k].qos |
| | | } |
| | | if (version === 5) { |
| | | result.nl = false |
| | | result.rap = false |
| | | result.rh = 0 |
| | | } |
| | | expected.push(result) |
| | | } |
| | | } |
| | | |
| | |
| | | qos: 1 |
| | | }] |
| | | |
| | | if (version === 5) { |
| | | expected[0].nl = false |
| | | expected[0].rap = false |
| | | expected[0].rh = 0 |
| | | } |
| | | |
| | | packet.subscriptions.should.eql(expected) |
| | | done() |
| | | }) |
| | |
| | | |
| | | server.once('client', function (serverClient) { |
| | | serverClient.once('subscribe', function (packet) { |
| | | packet.subscriptions.should.containEql({ |
| | | var result = { |
| | | topic: topic, |
| | | qos: defaultOpts.qos |
| | | }) |
| | | } |
| | | if (version === 5) { |
| | | result.nl = false |
| | | result.rap = false |
| | | result.rh = 0 |
| | | } |
| | | packet.subscriptions.should.containEql(result) |
| | | done() |
| | | }) |
| | | }) |
| | |
| | | done(err) |
| | | } else { |
| | | should.exist(granted, 'granted not given') |
| | | granted.should.containEql({topic: 'test', qos: 2}) |
| | | var result = {topic: 'test', qos: 2} |
| | | if (version === 5) { |
| | | result.nl = false |
| | | result.rap = false |
| | | result.rh = 0 |
| | | result.properties = undefined |
| | | } |
| | | granted.should.containEql(result) |
| | | done() |
| | | } |
| | | }) |
| | |
| | | |
| | | server.once('client', function (serverClient) { |
| | | serverClient.once('subscribe', function (packet) { |
| | | packet.subscriptions.should.containEql({ |
| | | var result = { |
| | | topic: topic, |
| | | qos: 0 |
| | | }) |
| | | } |
| | | if (version === 5) { |
| | | result.nl = false |
| | | result.rap = false |
| | | result.rh = 0 |
| | | } |
| | | packet.subscriptions.should.containEql(result) |
| | | done() |
| | | }) |
| | | }) |
| | |
| | | var testTopic = 'test' |
| | | var testMessage = 'message' |
| | | var mid = 253 |
| | | var publishReceived = false |
| | | var pubrecReceived = false |
| | | var pubrelReceived = false |
| | | |
| | | client.once('connect', function () { |
| | | client.subscribe(testTopic, {qos: 2}) |
| | | }) |
| | | |
| | | client.on('packetreceive', (packet) => { |
| | | switch (packet.cmd) { |
| | | case 'connack': |
| | | case 'suback': |
| | | // expected, but not specifically part of QOS 2 semantics |
| | | break |
| | | case 'publish': |
| | | pubrecReceived.should.be.false() |
| | | pubrelReceived.should.be.false() |
| | | publishReceived = true |
| | | break |
| | | case 'pubrel': |
| | | publishReceived.should.be.true() |
| | | pubrecReceived.should.be.true() |
| | | pubrelReceived = true |
| | | break |
| | | default: |
| | | should.fail() |
| | | } |
| | | }) |
| | | |
| | | server.once('client', function (serverClient) { |
| | | serverClient.once('subscribe', function () { |
| | | serverClient.publish({ |
| | | topic: testTopic, |
| | | payload: testMessage, |
| | | qos: 2, |
| | | messageId: mid |
| | | }) |
| | | }) |
| | | |
| | | serverClient.on('pubrec', function () { |
| | | publishReceived.should.be.true() |
| | | pubrelReceived.should.be.false() |
| | | pubrecReceived = true |
| | | }) |
| | | |
| | | serverClient.once('pubcomp', function () { |
| | | client.removeAllListeners() |
| | | serverClient.removeAllListeners() |
| | | publishReceived.should.be.true() |
| | | pubrecReceived.should.be.true() |
| | | pubrelReceived.should.be.true() |
| | | done() |
| | | }) |
| | | }) |
| | | }) |
| | | |
| | | it('should should empty the incoming store after a qos 2 handshake is completed', function (done) { |
| | | var client = connect() |
| | | var testTopic = 'test' |
| | | var testMessage = 'message' |
| | | var mid = 253 |
| | | |
| | | client.once('connect', function () { |
| | | client.subscribe(testTopic, {qos: 2}) |
| | | }) |
| | | |
| | | client.on('packetreceive', (packet) => { |
| | | if (packet.cmd === 'pubrel') { |
| | | should(client.incomingStore._inflights.size).be.equal(1) |
| | | } |
| | | }) |
| | | |
| | | server.once('client', function (serverClient) { |
| | |
| | | }) |
| | | |
| | | serverClient.once('pubcomp', function () { |
| | | should(client.incomingStore._inflights.size).be.equal(0) |
| | | client.removeAllListeners() |
| | | done() |
| | | }) |
| | | }) |
| | | }) |
| | | |
| | | // Feeds a QoS 2 publish and two pubrel packets into the client through a mocked |
| | | // `_sendPacket`, then checks that the message was handled once and emitted once. |
| | | // |
| | | // shouldSendPubcompFail - when truthy, the mocked send reports an error for every pubcomp packet |
| | | // done                  - called once the client has sent its second pubcomp |
| | | function testMultiplePubrel (shouldSendPubcompFail, done) { |
| | | var client = connect() |
| | | var testTopic = 'test' |
| | | var testMessage = 'message' |
| | | var mid = 253 |
| | | var pubcompCount = 0 |
| | | var pubrelCount = 0 |
| | | var handleMessageCount = 0 |
| | | var emitMessageCount = 0 |
| | | var origSendPacket = client._sendPacket |
| | | var shouldSendFail |
| | | |
| | | // count handleMessage invocations; the callback is invoked immediately |
| | | client.handleMessage = function (packet, callback) { |
| | | handleMessageCount++ |
| | | callback() |
| | | } |
| | | |
| | | // count 'message' events emitted by the client |
| | | client.on('message', function () { |
| | | emitMessageCount++ |
| | | }) |
| | | |
| | | // mocked send: packets are not forwarded to origSendPacket; replies are injected |
| | | // straight back into the client via client._handlePacket |
| | | client._sendPacket = function (packet, sendDone) { |
| | | shouldSendFail = packet.cmd === 'pubcomp' && shouldSendPubcompFail |
| | | if (sendDone) { |
| | | sendDone(shouldSendFail ? new Error('testing pubcomp failure') : undefined) |
| | | } |
| | | |
| | | // send the mocked response |
| | | switch (packet.cmd) { |
| | | case 'subscribe': |
| | | const suback = {cmd: 'suback', messageId: packet.messageId, granted: [2]} |
| | | client._handlePacket(suback, function (err) { |
| | | should(err).not.be.ok() |
| | | }) |
| | | break |
| | | case 'pubrec': |
| | | case 'pubcomp': |
| | | // for both pubrec and pubcomp, reply with pubrel, simulating the server not receiving the pubcomp |
| | | if (packet.cmd === 'pubcomp') { |
| | | pubcompCount++ |
| | | if (pubcompCount === 2) { |
| | | // end the test once the client has gone through two rounds of replying to pubrel messages |
| | | pubrelCount.should.be.exactly(2) |
| | | handleMessageCount.should.be.exactly(1) |
| | | emitMessageCount.should.be.exactly(1) |
| | | client._sendPacket = origSendPacket |
| | | done() |
| | | // leaves the switch, so no further pubrel is injected |
| | | break |
| | | } |
| | | } |
| | | |
| | | // simulate the pubrel message, either in response to pubrec or to mock pubcomp failing to be received |
| | | const pubrel = {cmd: 'pubrel', messageId: mid} |
| | | pubrelCount++ |
| | | client._handlePacket(pubrel, function (err) { |
| | | if (shouldSendFail) { |
| | | should(err).be.ok() |
| | | } else { |
| | | should(err).not.be.ok() |
| | | } |
| | | }) |
| | | break |
| | | } |
| | | } |
| | | |
| | | client.once('connect', function () { |
| | | client.subscribe(testTopic, {qos: 2}) |
| | | // inject an incoming QoS 2 publish directly into the client's packet handler |
| | | const publish = {cmd: 'publish', topic: testTopic, payload: testMessage, qos: 2, messageId: mid} |
| | | client._handlePacket(publish, function (err) { |
| | | should(err).not.be.ok() |
| | | }) |
| | | }) |
| | | } |
| | | |
| | | it('handle qos 2 messages exactly once when multiple pubrel received', function (done) { |
| | | testMultiplePubrel(false, done) |
| | | }) |
| | | |
| | | it('handle qos 2 messages exactly once when multiple pubrel received and sending pubcomp fails on client', function (done) { |
| | | testMultiplePubrel(true, done) |
| | | }) |
| | | }) |
| | | |
| | |
| | | }) |
| | | }) |
| | | |
| | | it('should clear outgoing if close from server', function (done) { |
| | | var reconnect = false |
| | | var client = {} |
| | | var server2 = new Server(function (c) { |
| | | c.on('connect', function (packet) { |
| | | c.connack({returnCode: 0}) |
| | | }) |
| | | c.on('subscribe', function (packet) { |
| | | if (reconnect) { |
| | | c.suback({ |
| | | messageId: packet.messageId, |
| | | granted: packet.subscriptions.map(function (e) { |
| | | return e.qos |
| | | }) |
| | | }) |
| | | } else { |
| | | c.destroy() |
| | | } |
| | | }) |
| | | }) |
| | | |
| | | server2.listen(port + 50, function () { |
| | | client = mqtt.connect({ |
| | | port: port + 50, |
| | | host: 'localhost', |
| | | clean: true, |
| | | clientId: 'cid1', |
| | | reconnectPeriod: 0 |
| | | }) |
| | | |
| | | client.on('connect', function () { |
| | | client.subscribe('test', {qos: 2}, function (e) { |
| | | if (!e) { |
| | | client.end() |
| | | } |
| | | }) |
| | | }) |
| | | |
| | | client.on('close', function () { |
| | | if (reconnect) { |
| | | server2.close() |
| | | done() |
| | | } else { |
| | | Object.keys(client.outgoing).length.should.equal(0) |
| | | reconnect = true |
| | | client.reconnect() |
| | | } |
| | | }) |
| | | }) |
| | | }) |
| | | |
| | | it('should resend in-flight QoS 1 publish messages from the client if clean is false', function (done) { |
| | | var reconnect = false |
| | | var client = {} |
| | |
| | | |
| | | context('with alternate server client', function () { |
| | | var cachedClientListeners |
| | | var connack = version === 5 ? { reasonCode: 0 } : { returnCode: 0 } |
| | | |
| | | beforeEach(function () { |
| | | cachedClientListeners = server.listeners('client') |
| | |
| | | server.on('client', function (serverClient) { |
| | | serverClient.on('connect', function () { |
| | | connectCount++ |
| | | serverClient.connack({returnCode: 0}) |
| | | serverClient.connack(connack) |
| | | }) |
| | | |
| | | serverClient.on('subscribe', function () { |
| | |
| | | |
| | | server.on('client', function (serverClient) { |
| | | serverClient.on('connect', function () { |
| | | serverClient.connack({returnCode: 0}) |
| | | serverClient.connack(connack) |
| | | }) |
| | | |
| | | serverClient.on('subscribe', function () { |