liudong
2023-05-29 340f156319b863525e50e900c58e59b86ecb3d5e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
'use strict'
 
/**
 * Module dependencies
 */
const xtend = require('xtend')
 
const Readable = require('readable-stream').Readable
const streamsOpts = { objectMode: true }
const defaultStoreOptions = {
  clean: true
}
 
/**
 * In-memory implementation of the message store
 * This can actually be saved into files.
 *
 * @param {Object} [options] - store options
 */
function Store (options) {
  if (!(this instanceof Store)) {
    return new Store(options)
  }
 
  this.options = options || {}
 
  // Defaults
  this.options = xtend(defaultStoreOptions, options)
 
  this._inflights = new Map()
}
 
/**
 * Adds a packet to the store, a packet is
 * anything that has a messageId property.
 *
 */
Store.prototype.put = function (packet, cb) {
  this._inflights.set(packet.messageId, packet)
 
  if (cb) {
    cb()
  }
 
  return this
}
 
/**
 * Creates a stream with all the packets in the store
 *
 */
Store.prototype.createStream = function () {
  const stream = new Readable(streamsOpts)
  const values = []
  let destroyed = false
  let i = 0
 
  this._inflights.forEach(function (value, key) {
    values.push(value)
  })
 
  stream._read = function () {
    if (!destroyed && i < values.length) {
      this.push(values[i++])
    } else {
      this.push(null)
    }
  }
 
  stream.destroy = function () {
    if (destroyed) {
      return
    }
 
    const self = this
 
    destroyed = true
 
    setTimeout(function () {
      self.emit('close')
    }, 0)
  }
 
  return stream
}
 
/**
 * deletes a packet from the store.
 */
Store.prototype.del = function (packet, cb) {
  packet = this._inflights.get(packet.messageId)
  if (packet) {
    this._inflights.delete(packet.messageId)
    cb(null, packet)
  } else if (cb) {
    cb(new Error('missing packet'))
  }
 
  return this
}
 
/**
 * get a packet from the store.
 */
Store.prototype.get = function (packet, cb) {
  packet = this._inflights.get(packet.messageId)
  if (packet) {
    cb(null, packet)
  } else if (cb) {
    cb(new Error('missing packet'))
  }
 
  return this
}
 
/**
 * Close the store
 */
Store.prototype.close = function (cb) {
  if (this.options.clean) {
    this._inflights = null
  }
  if (cb) {
    cb()
  }
}
 
module.exports = Store