| | |
| | | */ |
| | | var events = require('events') |
| | | var Store = require('./store') |
| | | var eos = require('end-of-stream') |
| | | var mqttPacket = require('mqtt-packet') |
| | | var Writable = require('readable-stream').Writable |
| | | var inherits = require('inherits') |
| | |
| | | clean: true, |
| | | resubscribe: true |
| | | } |
/**
 * Text for the numeric result codes carried by acknowledgement packets.
 * Indexed by code when building the 'Connection refused: …' and
 * 'Publish error: …' error messages; codes without an entry yield undefined.
 */
var errors = {
  // 0 - 5
  0: '',
  1: 'Unacceptable protocol version',
  2: 'Identifier rejected',
  3: 'Server unavailable',
  4: 'Bad username or password',
  5: 'Not authorized',

  // 16 - 17
  16: 'No matching subscribers',
  17: 'No subscription existed',

  // 128 - 162
  128: 'Unspecified error',
  129: 'Malformed Packet',
  130: 'Protocol Error',
  131: 'Implementation specific error',
  132: 'Unsupported Protocol Version',
  133: 'Client Identifier not valid',
  134: 'Bad User Name or Password',
  135: 'Not authorized',
  136: 'Server unavailable',
  137: 'Server busy',
  138: 'Banned',
  139: 'Server shutting down',
  140: 'Bad authentication method',
  141: 'Keep Alive timeout',
  142: 'Session taken over',
  143: 'Topic Filter invalid',
  144: 'Topic Name invalid',
  145: 'Packet identifier in use',
  146: 'Packet Identifier not found',
  147: 'Receive Maximum exceeded',
  148: 'Topic Alias invalid',
  149: 'Packet too large',
  150: 'Message rate too high',
  151: 'Quota exceeded',
  152: 'Administrative action',
  153: 'Payload format invalid',
  154: 'Retain not supported',
  155: 'QoS not supported',
  156: 'Use another server',
  157: 'Server moved',
  158: 'Shared Subscriptions not supported',
  159: 'Connection rate exceeded',
  160: 'Maximum connect time',
  161: 'Subscription Identifiers not supported',
  162: 'Wildcard Subscriptions not supported'
}
| | | |
| | | function defaultId () { |
| | | return 'mqttjs_' + Math.random().toString(16).substr(2, 8) |
| | |
| | | function sendPacket (client, packet, cb) { |
| | | client.emit('packetsend', packet) |
| | | |
| | | var result = mqttPacket.writeToStream(packet, client.stream) |
| | | var result = mqttPacket.writeToStream(packet, client.stream, client.options) |
| | | |
| | | if (!result && cb) { |
| | | client.stream.once('drain', cb) |
| | |
/**
 * flush - fail every pending callback in `queue` with a
 * 'Connection closed' error and remove the flushed entries.
 *
 * Entries are expected to look like `{ cb: Function }`; an entry that is a
 * bare function is invoked directly. Entries without a callable are left
 * in place.
 *
 * @param {Object} [queue] - map of messageId -> entry; a falsy value is a no-op
 */
function flush (queue) {
  if (!queue) {
    return
  }

  Object.keys(queue).forEach(function (messageId) {
    var entry = queue[messageId]

    // The key list is a snapshot: a callback run earlier in this pass may
    // already have removed this entry.
    if (!entry) {
      return
    }

    if (typeof entry === 'function') {
      entry(new Error('Connection closed'))
    } else if (typeof entry.cb === 'function') {
      entry.cb(new Error('Connection closed'))
    } else {
      return
    }

    delete queue[messageId]
  })
}
| | | |
| | | function storeAndSend (client, packet, cb) { |
/**
 * flushVolatile - fail and remove only the entries flagged `volatile`.
 *
 * Each volatile entry that carries a callable `cb` receives a
 * 'Connection closed' error and is deleted; every other entry is untouched.
 *
 * @param {Object} [queue] - map of messageId -> { volatile: Boolean, cb: Function };
 *                           a falsy value is a no-op
 */
function flushVolatile (queue) {
  if (!queue) {
    return
  }

  Object.keys(queue).forEach(function (messageId) {
    var entry = queue[messageId]

    // `entry` can be gone already: the key list is a snapshot and a callback
    // invoked earlier in this loop may have deleted it.
    if (entry && entry.volatile && typeof entry.cb === 'function') {
      entry.cb(new Error('Connection closed'))
      delete queue[messageId]
    }
  })
}
| | | |
/**
 * storeAndSend - persist a packet in the client's outgoing store, then
 * write it out.
 *
 * If the store rejects the packet, `cb` (when given) receives the error and
 * nothing is sent. Otherwise `cbStorePut` is notified first and the packet
 * is handed to `sendPacket` together with `cb`.
 *
 * @param {Object} client - object exposing `outgoingStore.put(packet, cb)`
 * @param {Object} packet - packet to store and send
 * @param {Function} [cb] - forwarded to `sendPacket`, or called with the store error
 * @param {Function} [cbStorePut] - called once the packet is in the store
 */
function storeAndSend (client, packet, cb, cbStorePut) {
  client.outgoingStore.put(packet, function storedPacket (err) {
    if (err) {
      return cb && cb(err)
    }
    // Callers using the three-argument form pass no store hook.
    if (typeof cbStorePut === 'function') {
      cbStorePut()
    }
    sendPacket(client, packet, cb)
  })
}
| | |
| | | } |
| | | } |
| | | |
| | | this.options.clientId = (typeof this.options.clientId === 'string') ? this.options.clientId : defaultId() |
| | | this.options.clientId = (typeof options.clientId === 'string') ? options.clientId : defaultId() |
| | | |
| | | this.options.customHandleAcks = (options.protocolVersion === 5 && options.customHandleAcks) ? options.customHandleAcks : function () { arguments[3](0) } |
| | | |
| | | this.streamBuilder = streamBuilder |
| | | |
| | | // Inflight message storages |
| | | this.outgoingStore = this.options.outgoingStore || new Store() |
| | | this.incomingStore = this.options.incomingStore || new Store() |
| | | this.outgoingStore = options.outgoingStore || new Store() |
| | | this.incomingStore = options.incomingStore || new Store() |
| | | |
| | | // Should QoS zero messages be queued when the connection is broken? |
| | | this.queueQoSZero = this.options.queueQoSZero === undefined ? true : this.options.queueQoSZero |
| | | this.queueQoSZero = options.queueQoSZero === undefined ? true : options.queueQoSZero |
| | | |
| | | // map of subscribed topics to support reconnection |
| | | this._resubscribeTopics = {} |
| | |
| | | this.connackTimer = null |
| | | // Reconnect timer |
| | | this.reconnectTimer = null |
| | | // Is processing store? |
| | | this._storeProcessing = false |
| | | // Packet Ids are put into the store during store processing |
| | | this._packetIdsDuringStoreProcessing = {} |
| | | /** |
| | | * MessageIDs starting with 1 |
| | | * ensure that nextId is min. 1, see https://github.com/mqttjs/MQTT.js/issues/810 |
| | |
| | | // Inflight callbacks |
| | | this.outgoing = {} |
| | | |
| | | // Mark connected on connect |
| | | this.on('connect', function () { |
| | | if (this.disconnected) { |
| | | return |
| | | } |
| | | |
| | | this.connected = true |
| | | var outStore = this.outgoingStore.createStream() |
| | | |
| | | this.once('close', remove) |
| | | outStore.on('end', function () { |
| | | that.removeListener('close', remove) |
| | | }) |
| | | outStore.on('error', function (err) { |
| | | that.removeListener('close', remove) |
| | | that.emit('error', err) |
| | | }) |
| | | |
| | | function remove () { |
| | | outStore.destroy() |
| | | outStore = null |
| | | } |
| | | |
| | | function storeDeliver () { |
| | | // edge case, we wrapped this twice |
| | | if (!outStore) { |
| | | return |
| | | } |
| | | |
| | | var packet = outStore.read(1) |
| | | var cb |
| | | |
| | | if (!packet) { |
| | | // read when data is available in the future |
| | | outStore.once('readable', storeDeliver) |
| | | return |
| | | } |
| | | |
| | | // Avoid unnecessary stream read operations when disconnected |
| | | if (!that.disconnecting && !that.reconnectTimer) { |
| | | cb = that.outgoing[packet.messageId] |
| | | that.outgoing[packet.messageId] = function (err, status) { |
| | | // Ensure that the original callback passed in to publish gets invoked |
| | | if (cb) { |
| | | cb(err, status) |
| | | } |
| | | |
| | | storeDeliver() |
| | | } |
| | | that._sendPacket(packet) |
| | | } else if (outStore.destroy) { |
| | | outStore.destroy() |
| | | } |
| | | } |
| | | |
| | | // start flowing |
| | | storeDeliver() |
| | | }) |
| | | // True if connection is first time. |
| | | this._firstConnection = true |
| | | |
| | | // Mark disconnected on stream close |
| | | this.on('close', function () { |
| | | this.connected = false |
| | | clearTimeout(this.connackTimer) |
| | | }) |
| | | |
| | | // Setup ping timer |
| | | this.on('connect', this._setupPingTimer) |
| | | |
| | | // Send queued packets |
| | | this.on('connect', function () { |
| | |
| | | } |
| | | |
| | | deliver() |
| | | }) |
| | | |
| | | var firstConnection = true |
| | | // resubscribe |
| | | this.on('connect', function () { |
| | | if (!firstConnection && |
| | | this.options.clean && |
| | | Object.keys(this._resubscribeTopics).length > 0) { |
| | | if (this.options.resubscribe) { |
| | | this._resubscribeTopics.resubscribe = true |
| | | this.subscribe(this._resubscribeTopics) |
| | | } else { |
| | | this._resubscribeTopics = {} |
| | | } |
| | | } |
| | | |
| | | firstConnection = false |
| | | }) |
| | | |
| | | // Clear ping timer |
| | |
| | | }) |
| | | |
/**
 * nextTickWork - continuation handed to `_handlePacket`.
 *
 * While `packets` still holds entries, `work` is scheduled on the next
 * tick. Once it is empty, the pending `completeParse` callback is cleared
 * and invoked. `packets` and `completeParse` belong to the enclosing scope.
 */
function nextTickWork () {
  if (packets.length) {
    process.nextTick(work)
    return
  }

  var done = completeParse
  completeParse = null
  // Same guard as `work`: a repeated completion finds nothing left to call.
  if (done) {
    done()
  }
}
| | | |
/**
 * work - process the next parsed packet, or finish the current write.
 *
 * Takes one packet off `packets` and passes it to `that._handlePacket`
 * with `nextTickWork` as the continuation. When no packet is left, the
 * pending `completeParse` callback is cleared and invoked exactly once.
 * `packets`, `completeParse` and `that` belong to the enclosing scope.
 */
function work () {
  var packet = packets.shift()

  if (packet) {
    that._handlePacket(packet, nextTickWork)
    return
  }

  var done = completeParse
  completeParse = null
  // `completeParse` may already have been consumed by `nextTickWork`.
  if (done) {
    done()
  }
}
| | | |
| | |
| | | this.stream.on('error', nop) |
| | | |
| | | // Echo stream close |
| | | eos(this.stream, this.emit.bind(this, 'close')) |
| | | this.stream.on('close', function () { |
| | | flushVolatile(that.outgoing) |
| | | that.emit('close') |
| | | }) |
| | | |
| | | // Send a connect packet |
| | | connectPacket = Object.create(this.options) |
| | |
| | | |
| | | // Echo connection errors |
| | | parser.on('error', this.emit.bind(this, 'error')) |
| | | |
| | | // auth |
| | | if (this.options.properties) { |
| | | if (!this.options.properties.authenticationMethod && this.options.properties.authenticationData) { |
| | | this.emit('error', new Error('Packet has no Authentication Method')) |
| | | return this |
| | | } |
| | | if (this.options.properties.authenticationMethod && this.options.authPacket && typeof this.options.authPacket === 'object') { |
| | | var authPacket = xtend({cmd: 'auth', reasonCode: 0}, this.options.authPacket) |
| | | sendPacket(this, authPacket) |
| | | } |
| | | } |
| | | |
| | | // many drain listeners are needed for qos 1 callbacks if the connection is intermittent |
| | | this.stream.setMaxListeners(1000) |
| | |
| | | } |
| | | |
| | | MqttClient.prototype._handlePacket = function (packet, done) { |
| | | var options = this.options |
| | | |
| | | if (options.protocolVersion === 5 && options.properties && options.properties.maximumPacketSize && options.properties.maximumPacketSize < packet.length) { |
| | | this.emit('error', new Error('exceeding packets size ' + packet.cmd)) |
| | | this.end({reasonCode: 149, properties: { reasonString: 'Maximum packet size was exceeded' }}) |
| | | return this |
| | | } |
| | | |
| | | this.emit('packetreceive', packet) |
| | | |
| | | switch (packet.cmd) { |
| | |
| | | break |
| | | case 'pingresp': |
| | | this._handlePingresp(packet) |
| | | done() |
| | | break |
| | | case 'disconnect': |
| | | this._handleDisconnect(packet) |
| | | done() |
| | | break |
| | | default: |
| | |
| | | * {Number} qos - qos level to publish on |
| | | * {Boolean} retain - whether or not to retain the message |
| | | * {Boolean} dup - whether or not mark a message as duplicate |
| | | * {Function} cbStorePut - function(){} called when message is put into `outgoingStore` |
| | | * @param {Function} [callback] - function(err){} |
| | | * called when publish succeeds or fails |
| | | * @returns {MqttClient} this - for chaining |
| | |
| | | */ |
| | | MqttClient.prototype.publish = function (topic, message, opts, callback) { |
| | | var packet |
| | | var options = this.options |
| | | |
| | | // .publish(topic, payload, cb); |
| | | if (typeof opts === 'function') { |
| | |
| | | dup: opts.dup |
| | | } |
| | | |
| | | if (options.protocolVersion === 5) { |
| | | packet.properties = opts.properties |
| | | if ((!options.properties && packet.properties && packet.properties.topicAlias) || ((opts.properties && options.properties) && |
| | | ((opts.properties.topicAlias && options.properties.topicAliasMaximum && opts.properties.topicAlias > options.properties.topicAliasMaximum) || |
| | | (!options.properties.topicAliasMaximum && opts.properties.topicAlias)))) { |
| | | /* |
| | | if we are don`t setup topic alias or |
| | | topic alias maximum less than topic alias or |
| | | server don`t give topic alias maximum, |
| | | we are removing topic alias from packet |
| | | */ |
| | | delete packet.properties.topicAlias |
| | | } |
| | | } |
| | | |
| | | switch (opts.qos) { |
| | | case 1: |
| | | case 2: |
| | | |
| | | // Add to callbacks |
| | | this.outgoing[packet.messageId] = callback || nop |
| | | this._sendPacket(packet) |
| | | this.outgoing[packet.messageId] = { |
| | | volatile: false, |
| | | cb: callback || nop |
| | | } |
| | | if (this._storeProcessing) { |
| | | this._packetIdsDuringStoreProcessing[packet.messageId] = false |
| | | this._storePacket(packet, undefined, opts.cbStorePut) |
| | | } else { |
| | | this._sendPacket(packet, undefined, opts.cbStorePut) |
| | | } |
| | | break |
| | | default: |
| | | this._sendPacket(packet, callback) |
| | | if (this._storeProcessing) { |
| | | this._storePacket(packet, callback, opts.cbStorePut) |
| | | } else { |
| | | this._sendPacket(packet, callback, opts.cbStorePut) |
| | | } |
| | | break |
| | | } |
| | | |
| | |
| | | * @api public |
| | | * @example client.subscribe('topic'); |
| | | * @example client.subscribe('topic', {qos: 1}); |
| | | * @example client.subscribe({'topic': 0, 'topic2': 1}, console.log); |
| | | * @example client.subscribe({'topic': {qos: 0}, 'topic2': {qos: 1}}, console.log); |
| | | * @example client.subscribe('topic', console.log); |
| | | */ |
| | | MqttClient.prototype.subscribe = function () { |
| | | var packet |
| | | var args = Array.prototype.slice.call(arguments) |
| | | var args = new Array(arguments.length) |
| | | for (var i = 0; i < arguments.length; i++) { |
| | | args[i] = arguments[i] |
| | | } |
| | | var subs = [] |
| | | var obj = args.shift() |
| | | var resubscribe = obj.resubscribe |
| | |
| | | var opts = args.pop() |
| | | var invalidTopic |
| | | var that = this |
| | | var version = this.options.protocolVersion |
| | | |
| | | delete obj.resubscribe |
| | | |
| | |
| | | return this |
| | | } |
| | | |
| | | var defaultOpts = { qos: 0 } |
| | | var defaultOpts = { |
| | | qos: 0 |
| | | } |
| | | if (version === 5) { |
| | | defaultOpts.nl = false |
| | | defaultOpts.rap = false |
| | | defaultOpts.rh = 0 |
| | | } |
| | | opts = xtend(defaultOpts, opts) |
| | | |
| | | if (Array.isArray(obj)) { |
| | | obj.forEach(function (topic) { |
| | | if (that._resubscribeTopics[topic] < opts.qos || |
| | | !that._resubscribeTopics.hasOwnProperty(topic) || |
| | | if (!that._resubscribeTopics.hasOwnProperty(topic) || |
| | | that._resubscribeTopics[topic].qos < opts.qos || |
| | | resubscribe) { |
| | | subs.push({ |
| | | var currentOpts = { |
| | | topic: topic, |
| | | qos: opts.qos |
| | | }) |
| | | } |
| | | if (version === 5) { |
| | | currentOpts.nl = opts.nl |
| | | currentOpts.rap = opts.rap |
| | | currentOpts.rh = opts.rh |
| | | currentOpts.properties = opts.properties |
| | | } |
| | | subs.push(currentOpts) |
| | | } |
| | | }) |
| | | } else { |
| | | Object |
| | | .keys(obj) |
| | | .forEach(function (k) { |
| | | if (that._resubscribeTopics[k] < obj[k] || |
| | | !that._resubscribeTopics.hasOwnProperty(k) || |
| | | if (!that._resubscribeTopics.hasOwnProperty(k) || |
| | | that._resubscribeTopics[k].qos < obj[k].qos || |
| | | resubscribe) { |
| | | subs.push({ |
| | | var currentOpts = { |
| | | topic: k, |
| | | qos: obj[k] |
| | | }) |
| | | qos: obj[k].qos |
| | | } |
| | | if (version === 5) { |
| | | currentOpts.nl = obj[k].nl |
| | | currentOpts.rap = obj[k].rap |
| | | currentOpts.rh = obj[k].rh |
| | | currentOpts.properties = opts.properties |
| | | } |
| | | subs.push(currentOpts) |
| | | } |
| | | }) |
| | | } |
| | |
| | | messageId: this._nextId() |
| | | } |
| | | |
| | | if (opts.properties) { |
| | | packet.properties = opts.properties |
| | | } |
| | | |
| | | if (!subs.length) { |
| | | callback(null, []) |
| | | return |
| | |
| | | var topics = [] |
| | | subs.forEach(function (sub) { |
| | | if (that.options.reconnectPeriod > 0) { |
| | | that._resubscribeTopics[sub.topic] = sub.qos |
| | | var topic = { qos: sub.qos } |
| | | if (version === 5) { |
| | | topic.nl = sub.nl || false |
| | | topic.rap = sub.rap || false |
| | | topic.rh = sub.rh || 0 |
| | | topic.properties = sub.properties |
| | | } |
| | | that._resubscribeTopics[sub.topic] = topic |
| | | topics.push(sub.topic) |
| | | } |
| | | }) |
| | | that.messageIdToTopic[packet.messageId] = topics |
| | | } |
| | | |
| | | this.outgoing[packet.messageId] = function (err, packet) { |
| | | this.outgoing[packet.messageId] = { |
| | | volatile: true, |
| | | cb: function (err, packet) { |
| | | if (!err) { |
| | | var granted = packet.granted |
| | | for (var i = 0; i < granted.length; i += 1) { |
| | |
| | | } |
| | | |
| | | callback(err, subs) |
| | | } |
| | | } |
| | | |
| | | this._sendPacket(packet) |
| | |
| | | * unsubscribe - unsubscribe from topic(s) |
| | | * |
| | | * @param {String, Array} topic - topics to unsubscribe from |
| | | * @param {Object} [opts] - optional subscription options, includes: |
| | | * {Object} properties - properties of unsubscribe packet |
| | | * @param {Function} [callback] - callback fired on unsuback |
| | | * @returns {MqttClient} this - for chaining |
| | | * @api public |
| | | * @example client.unsubscribe('topic'); |
| | | * @example client.unsubscribe('topic', console.log); |
| | | */ |
| | | MqttClient.prototype.unsubscribe = function (topic, callback) { |
| | | MqttClient.prototype.unsubscribe = function () { |
| | | var packet = { |
| | | cmd: 'unsubscribe', |
| | | qos: 1, |
| | | messageId: this._nextId() |
| | | } |
| | | var that = this |
| | | var args = new Array(arguments.length) |
| | | for (var i = 0; i < arguments.length; i++) { |
| | | args[i] = arguments[i] |
| | | } |
| | | var topic = args.shift() |
| | | var callback = args.pop() || nop |
| | | var opts = args.pop() |
| | | |
| | | callback = callback || nop |
| | | if (typeof topic === 'string') { |
| | | topic = [topic] |
| | | } |
| | | |
| | | if (typeof callback !== 'function') { |
| | | opts = callback |
| | | callback = nop |
| | | } |
| | | |
| | | if (this._checkDisconnecting(callback)) { |
| | | return this |
| | |
| | | }) |
| | | } |
| | | |
| | | this.outgoing[packet.messageId] = callback |
| | | if (typeof opts === 'object' && opts.properties) { |
| | | packet.properties = opts.properties |
| | | } |
| | | |
| | | this.outgoing[packet.messageId] = { |
| | | volatile: true, |
| | | cb: callback |
| | | } |
| | | |
| | | this._sendPacket(packet) |
| | | |
| | |
| | | * |
| | | * @api public |
| | | */ |
| | | MqttClient.prototype.end = function (force, cb) { |
| | | MqttClient.prototype.end = function () { |
| | | var that = this |
| | | |
| | | if (typeof force === 'function') { |
| | | cb = force |
| | | var force = arguments[0] |
| | | var opts = arguments[1] |
| | | var cb = arguments[2] |
| | | |
| | | if (force == null || typeof force !== 'boolean') { |
| | | cb = opts || nop |
| | | opts = force |
| | | force = false |
| | | if (typeof opts !== 'object') { |
| | | cb = opts |
| | | opts = null |
| | | if (typeof cb !== 'function') { |
| | | cb = nop |
| | | } |
| | | } |
| | | } |
| | | |
| | | if (typeof opts !== 'object') { |
| | | cb = opts |
| | | opts = null |
| | | } |
| | | |
| | | cb = cb || nop |
| | | |
| | | function closeStores () { |
| | | that.disconnected = true |
| | |
| | | // defer closesStores of an I/O cycle, |
| | | // just to make sure things are |
| | | // ok for websockets |
| | | that._cleanUp(force, setImmediate.bind(null, closeStores)) |
| | | that._cleanUp(force, setImmediate.bind(null, closeStores), opts) |
| | | } |
| | | |
| | | if (this.disconnecting) { |
| | |
| | | * @example client.removeOutgoingMessage(client.getLastMessageId()); |
| | | */ |
| | | MqttClient.prototype.removeOutgoingMessage = function (mid) { |
| | | var cb = this.outgoing[mid] |
| | | var cb = this.outgoing[mid] ? this.outgoing[mid].cb : null |
| | | delete this.outgoing[mid] |
| | | this.outgoingStore.del({messageId: mid}, function () { |
| | | cb(new Error('Message removed')) |
| | |
| | | * @api private |
| | | */ |
| | | MqttClient.prototype._cleanUp = function (forced, done) { |
| | | var opts = arguments[2] |
| | | if (done) { |
| | | this.stream.on('close', done) |
| | | } |
| | |
| | | } |
| | | this.stream.destroy() |
| | | } else { |
| | | var packet = xtend({ cmd: 'disconnect' }, opts) |
| | | this._sendPacket( |
| | | { cmd: 'disconnect' }, |
| | | packet, |
| | | setImmediate.bind( |
| | | null, |
| | | this.stream.end.bind(this.stream) |
| | |
| | | * @param {String} type - packet type (see `protocol`) |
| | | * @param {Object} packet - packet options |
| | | * @param {Function} cb - callback when the packet is sent |
| | | * @param {Function} cbStorePut - called when message is put into outgoingStore |
| | | * @api private |
| | | */ |
| | | MqttClient.prototype._sendPacket = function (packet, cb) { |
| | | if (!this.connected) { |
| | | if (((packet.qos || 0) === 0 && this.queueQoSZero) || packet.cmd !== 'publish') { |
| | | this.queue.push({ packet: packet, cb: cb }) |
| | | } else if (packet.qos > 0) { |
| | | cb = this.outgoing[packet.messageId] |
| | | this.outgoingStore.put(packet, function (err) { |
| | | if (err) { |
| | | return cb && cb(err) |
| | | } |
| | | }) |
| | | } else if (cb) { |
| | | cb(new Error('No connection to broker')) |
| | | } |
| | | MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut) { |
| | | cbStorePut = cbStorePut || nop |
| | | |
| | | if (!this.connected) { |
| | | this._storePacket(packet, cb, cbStorePut) |
| | | return |
| | | } |
| | | |
| | |
| | | case 'publish': |
| | | break |
| | | case 'pubrel': |
| | | storeAndSend(this, packet, cb) |
| | | storeAndSend(this, packet, cb, cbStorePut) |
| | | return |
| | | default: |
| | | sendPacket(this, packet, cb) |
| | |
| | | switch (packet.qos) { |
| | | case 2: |
| | | case 1: |
| | | storeAndSend(this, packet, cb) |
| | | storeAndSend(this, packet, cb, cbStorePut) |
| | | break |
| | | /** |
| | | * no need of case here since it will be caught by default |
| | |
| | | default: |
| | | sendPacket(this, packet, cb) |
| | | break |
| | | } |
| | | } |
| | | |
/**
 * _storePacket - keep a packet instead of writing it to the stream
 *
 * - anything that is not a publish, and QoS 0 publishes while
 *   `queueQoSZero` is on, are appended to the in-memory `queue`
 * - publishes with QoS > 0 are put into `outgoingStore`; a store error is
 *   reported to the callback registered in `outgoing` for that messageId
 *   (the `cb` argument is not used on this path)
 * - otherwise `cb`, when given, is called with 'No connection to broker'
 *
 * @param {Object} packet - packet options
 * @param {Function} cb - callback when the packet is sent
 * @param {Function} cbStorePut - called when message is put into outgoingStore
 * @api private
 */
MqttClient.prototype._storePacket = function (packet, cb, cbStorePut) {
  var onStored = cbStorePut || nop
  var level = packet.qos || 0

  if ((level === 0 && this.queueQoSZero) || packet.cmd !== 'publish') {
    this.queue.push({ packet: packet, cb: cb })
    return
  }

  if (packet.qos > 0) {
    var pending = this.outgoing[packet.messageId]
    var onError = pending ? pending.cb : null

    this.outgoingStore.put(packet, function (err) {
      if (err) {
        return onError && onError(err)
      }
      onStored()
    })
    return
  }

  if (cb) {
    cb(new Error('No connection to broker'))
  }
}
| | | |
| | |
| | | */ |
| | | |
| | | MqttClient.prototype._handleConnack = function (packet) { |
| | | var rc = packet.returnCode |
| | | var errors = [ |
| | | '', |
| | | 'Unacceptable protocol version', |
| | | 'Identifier rejected', |
| | | 'Server unavailable', |
| | | 'Bad username or password', |
| | | 'Not authorized' |
| | | ] |
| | | var options = this.options |
| | | var version = options.protocolVersion |
| | | var rc = version === 5 ? packet.reasonCode : packet.returnCode |
| | | |
| | | clearTimeout(this.connackTimer) |
| | | |
| | | if (packet.properties) { |
| | | if (packet.properties.topicAliasMaximum) { |
| | | if (!options.properties) { options.properties = {} } |
| | | options.properties.topicAliasMaximum = packet.properties.topicAliasMaximum |
| | | } |
| | | if (packet.properties.serverKeepAlive && options.keepalive) { |
| | | options.keepalive = packet.properties.serverKeepAlive |
| | | this._shiftPingInterval() |
| | | } |
| | | if (packet.properties.maximumPacketSize) { |
| | | if (!options.properties) { options.properties = {} } |
| | | options.properties.maximumPacketSize = packet.properties.maximumPacketSize |
| | | } |
| | | } |
| | | |
| | | if (rc === 0) { |
| | | this.reconnecting = false |
| | | this.emit('connect', packet) |
| | | this._onConnect(packet) |
| | | } else if (rc > 0) { |
| | | var err = new Error('Connection refused: ' + errors[rc]) |
| | | err.code = rc |
| | |
| | | var qos = packet.qos |
| | | var mid = packet.messageId |
| | | var that = this |
| | | var options = this.options |
| | | var validReasonCodes = [0, 16, 128, 131, 135, 144, 145, 151, 153] |
| | | |
| | | switch (qos) { |
| | | case 2: |
| | | this.incomingStore.put(packet, function (err) { |
| | | if (err) { |
| | | return done(err) |
| | | case 2: { |
| | | options.customHandleAcks(topic, message, packet, function (error, code) { |
| | | if (!(error instanceof Error)) { |
| | | code = error |
| | | error = null |
| | | } |
| | | if (error) { return that.emit('error', error) } |
| | | if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for pubrec')) } |
| | | if (code) { |
| | | that._sendPacket({cmd: 'pubrec', messageId: mid, reasonCode: code}, done) |
| | | } else { |
| | | that.incomingStore.put(packet, function () { |
| | | that._sendPacket({cmd: 'pubrec', messageId: mid}, done) |
| | | }) |
| | | break |
| | | case 1: |
| | | // emit the message event |
| | | this.emit('message', topic, message, packet) |
| | | this.handleMessage(packet, function (err) { |
| | | if (err) { |
| | | return done(err) |
| | | } |
| | | // send 'puback' if the above 'handleMessage' method executed |
| | | // successfully. |
| | | that._sendPacket({cmd: 'puback', messageId: mid}, done) |
| | | }) |
| | | break |
| | | } |
| | | case 1: { |
| | | // emit the message event |
| | | options.customHandleAcks(topic, message, packet, function (error, code) { |
| | | if (!(error instanceof Error)) { |
| | | code = error |
| | | error = null |
| | | } |
| | | if (error) { return that.emit('error', error) } |
| | | if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for puback')) } |
| | | if (!code) { that.emit('message', topic, message, packet) } |
| | | that.handleMessage(packet, function (err) { |
| | | if (err) { |
| | | return done && done(err) |
| | | } |
| | | that._sendPacket({cmd: 'puback', messageId: mid, reasonCode: code}, done) |
| | | }) |
| | | }) |
| | | break |
| | | } |
| | | case 0: |
| | | // emit the message event |
| | | this.emit('message', topic, message, packet) |
| | |
| | | var mid = packet.messageId |
| | | var type = packet.cmd |
| | | var response = null |
| | | var cb = this.outgoing[mid] |
| | | var cb = this.outgoing[mid] ? this.outgoing[mid].cb : null |
| | | var that = this |
| | | var err |
| | | |
| | | if (!cb) { |
| | | // Server sent an ack in error, ignore it. |
| | |
| | | case 'pubcomp': |
| | | // same thing as puback for QoS 2 |
| | | case 'puback': |
| | | var pubackRC = packet.reasonCode |
| | | // Callback - we're done |
| | | if (pubackRC && pubackRC > 0 && pubackRC !== 16) { |
| | | err = new Error('Publish error: ' + errors[pubackRC]) |
| | | err.code = pubackRC |
| | | cb(err, packet) |
| | | } |
| | | delete this.outgoing[mid] |
| | | this.outgoingStore.del(packet, cb) |
| | | break |
| | |
| | | qos: 2, |
| | | messageId: mid |
| | | } |
| | | var pubrecRC = packet.reasonCode |
| | | |
| | | if (pubrecRC && pubrecRC > 0 && pubrecRC !== 16) { |
| | | err = new Error('Publish error: ' + errors[pubrecRC]) |
| | | err.code = pubrecRC |
| | | cb(err, packet) |
| | | } else { |
| | | this._sendPacket(response) |
| | | } |
| | | break |
| | | case 'suback': |
| | | delete this.outgoing[mid] |
| | | if (packet.granted.length === 1 && (packet.granted[0] & 0x80) !== 0) { |
| | | for (var grantedI = 0; grantedI < packet.granted.length; grantedI++) { |
| | | if ((packet.granted[grantedI] & 0x80) !== 0) { |
| | | // suback with Failure status |
| | | var topics = this.messageIdToTopic[mid] |
| | | if (topics) { |
| | | topics.forEach(function (topic) { |
| | | delete that._resubscribeTopics[topic] |
| | | }) |
| | | } |
| | | } |
| | | } |
| | | cb(null, packet) |
| | |
| | | var comp = {cmd: 'pubcomp', messageId: mid} |
| | | |
| | | that.incomingStore.get(packet, function (err, pub) { |
| | | if (!err && pub.cmd !== 'pubrel') { |
| | | if (!err) { |
| | | that.emit('message', pub.topic, pub.payload, pub) |
| | | that.incomingStore.put(packet, function (err) { |
| | | if (err) { |
| | | return callback(err) |
| | | } |
| | | that.handleMessage(pub, function (err) { |
| | | if (err) { |
| | | return callback(err) |
| | | } |
| | | that.incomingStore.del(pub, nop) |
| | | that._sendPacket(comp, callback) |
| | | }) |
| | | }) |
| | | } else { |
| | | that._sendPacket(comp, callback) |
| | | } |
| | | }) |
| | | } |
| | | |
/**
 * _handleDisconnect - announce a disconnect packet to listeners
 *
 * @param {Object} packet - forwarded unchanged as the argument of the
 *                          'disconnect' event
 * @api private
 */
MqttClient.prototype._handleDisconnect = function (packet) {
  var client = this
  client.emit('disconnect', packet)
}
| | | |
| | | /** |
| | |
| | | return (this.nextId === 1) ? 65535 : (this.nextId - 1) |
| | | } |
| | | |
/**
 * _resubscribe - re-issue remembered subscriptions after a reconnect
 *
 * Does nothing on the first connection. On later connections, when the
 * session is clean (or, for protocol version 5, the connack reports no
 * session present) and topics are remembered:
 *  - with `options.resubscribe` off, the remembered topics are discarded
 *  - for version 5, every topic is subscribed in its own call so its
 *    stored `properties` can be passed along
 *  - otherwise all topics are subscribed in a single call
 * Always clears `_firstConnection` before returning.
 *
 * @param {Object} connack - connack packet; only `sessionPresent` is read
 * @api private
 */
MqttClient.prototype._resubscribe = function (connack) {
  var topicNames = Object.keys(this._resubscribeTopics)

  if (!this._firstConnection) {
    var sessionGone = this.options.clean ||
      (this.options.protocolVersion === 5 && !connack.sessionPresent)

    if (sessionGone && topicNames.length > 0) {
      if (!this.options.resubscribe) {
        this._resubscribeTopics = {}
      } else if (this.options.protocolVersion === 5) {
        topicNames.forEach(function (name) {
          var single = {}
          single[name] = this._resubscribeTopics[name]
          // marker consumed by subscribe()
          single.resubscribe = true
          this.subscribe(single, {properties: single[name].properties})
        }, this)
      } else {
        this._resubscribeTopics.resubscribe = true
        this.subscribe(this._resubscribeTopics)
      }
    }
  }

  this._firstConnection = false
}
| | | |
/**
 * _onConnect - bring the client online and replay the outgoing store
 *
 * If the client was already marked `disconnected`, only 'connect' is
 * emitted. Otherwise the ping timer is set up, subscriptions are re-issued
 * through `_resubscribe`, `connected` is set, and the packets held in
 * `outgoingStore` are re-sent one at a time, each waiting for the previous
 * packet's callback. 'connect' is emitted once a pass over the store ends
 * with every id in `_packetIdsDuringStoreProcessing` marked as processed;
 * otherwise another pass is started.
 *
 * @param {Object} packet - passed to `_resubscribe` and emitted with 'connect'
 * @api private
 */
MqttClient.prototype._onConnect = function (packet) {
  if (this.disconnected) {
    this.emit('connect', packet)
    return
  }

  var that = this

  this._setupPingTimer()
  this._resubscribe(packet)

  this.connected = true

  function startStreamProcess () {
    var outStore = that.outgoingStore.createStream()

    function clearStoreProcessing () {
      that._storeProcessing = false
      that._packetIdsDuringStoreProcessing = {}
    }

    // Abort this pass if the connection goes away while it is running.
    that.once('close', remove)
    outStore.on('error', function (err) {
      clearStoreProcessing()
      that.removeListener('close', remove)
      that.emit('error', err)
    })

    function remove () {
      outStore.destroy()
      outStore = null
      clearStoreProcessing()
    }

    function storeDeliver () {
      // edge case, we wrapped this twice
      if (!outStore) {
        return
      }
      that._storeProcessing = true

      var packet = outStore.read(1)

      var cb

      if (!packet) {
        // read when data is available in the future
        outStore.once('readable', storeDeliver)
        return
      }

      // Skip already processed store packets
      if (that._packetIdsDuringStoreProcessing[packet.messageId]) {
        storeDeliver()
        return
      }

      // Avoid unnecessary stream read operations when disconnected
      if (!that.disconnecting && !that.reconnectTimer) {
        cb = that.outgoing[packet.messageId] ? that.outgoing[packet.messageId].cb : null
        that.outgoing[packet.messageId] = {
          volatile: false,
          cb: function (err, status) {
            // Ensure that the original callback passed in to publish gets invoked
            if (cb) {
              cb(err, status)
            }

            // move on to the next stored packet
            storeDeliver()
          }
        }
        that._packetIdsDuringStoreProcessing[packet.messageId] = true
        that._sendPacket(packet)
      } else if (outStore.destroy) {
        outStore.destroy()
      }
    }

    outStore.on('end', function () {
      var allProcessed = true
      for (var id in that._packetIdsDuringStoreProcessing) {
        if (!that._packetIdsDuringStoreProcessing[id]) {
          allProcessed = false
          break
        }
      }

      // This pass is over either way. Drop its 'close' hook so that repeated
      // passes do not accumulate listeners bound to already-ended streams.
      that.removeListener('close', remove)

      if (allProcessed) {
        clearStoreProcessing()
        that.emit('connect', packet)
      } else {
        // ids were added while this pass was running; go over the store again
        startStreamProcess()
      }
    })
    storeDeliver()
  }
  // start flowing
  startStreamProcess()
}
| | | |
| | | module.exports = MqttClient |