liuxiaolong
2021-07-20 58d904a328c0d849769b483e901a0be9426b8209
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
/*
 *                Copyright Lingxi Li 2015.
 *             Copyright Andrey Semashev 2016.
 * Distributed under the Boost Software License, Version 1.0.
 *    (See accompanying file LICENSE_1_0.txt or copy at
 *          http://www.boost.org/LICENSE_1_0.txt)
 */
/*!
 * \file   utility/ipc/reliable_message_queue.hpp
 * \author Lingxi Li
 * \author Andrey Semashev
 * \date   01.01.2016
 *
 * The header contains declaration of a reliable interprocess message queue.
 */
 
#ifndef BOOST_LOG_UTILITY_IPC_RELIABLE_MESSAGE_QUEUE_HPP_INCLUDED_
#define BOOST_LOG_UTILITY_IPC_RELIABLE_MESSAGE_QUEUE_HPP_INCLUDED_
 
#include <boost/log/detail/config.hpp>
#include <cstddef>
#include <boost/cstdint.hpp>
#include <boost/move/core.hpp>
#include <boost/log/keywords/open_mode.hpp>
#include <boost/log/keywords/name.hpp>
#include <boost/log/keywords/capacity.hpp>
#include <boost/log/keywords/block_size.hpp>
#include <boost/log/keywords/overflow_policy.hpp>
#include <boost/log/keywords/permissions.hpp>
#include <boost/log/utility/open_mode.hpp>
#include <boost/log/utility/permissions.hpp>
#include <boost/log/utility/ipc/object_name.hpp>
#include <boost/log/detail/parameter_tools.hpp>
#include <boost/log/detail/header.hpp>
 
#ifdef BOOST_HAS_PRAGMA_ONCE
#pragma once
#endif
 
namespace boost {
 
BOOST_LOG_OPEN_NAMESPACE
 
namespace ipc {
 
namespace aux {
 
template< typename T, typename R >
struct enable_if_byte {};
template< typename R >
struct enable_if_byte< char, R > { typedef R type; };
template< typename R >
struct enable_if_byte< signed char, R > { typedef R type; };
template< typename R >
struct enable_if_byte< unsigned char, R > { typedef R type; };
 
} // namespace aux
 
/*!
 * \brief A reliable interprocess message queue
 *
 * The queue implements a reliable one-way channel of passing messages from one or multiple writers to a single reader.
 * The format of the messages is user-defined and must be consistent across all writers and the reader. The queue does
 * not enforce any specific format of the messages, other than they should be supplied as a contiguous array of bytes.
 *
 * The queue internally uses a process-shared storage identified by an \c object_name (the queue name). Refer to \c object_name
 * documentation for details on restrictions imposed on object names.
 *
 * The queue storage is organized as a fixed number of blocks of a fixed size. The block size must be an integer power of 2 and
 * is expressed in bytes. Each written message, together with some metadata added by the queue, consumes an integer number
 * of blocks. Each read message received by the reader releases the blocks allocated for that message. As such the maximum size
 * of a message is slightly less than block size times capacity of the queue. For efficiency, it is recommended to choose
 * block size large enough to accommodate most of the messages to be passed through the queue.
 *
 * The queue is considered empty when no messages are enqueued (all blocks are free). The queue is considered full at the point
 * of enqueueing a message when there is not enough free blocks to accommodate the message.
 *
 * The queue is reliable in that it will not drop successfully sent messages that are not received by the reader, other than the
 * case when a non-empty queue is destroyed by the last user. If a message cannot be enqueued by the writer because the queue is
 * full, the queue can either block the writer or return an error or throw an exception, depending on the policy specified at
 * the queue creation. The policy is object local, i.e. different writers and the reader can have different overflow policies.
 *
 * If the queue is empty and the reader attempts to dequeue a message, it will block until a message is enqueued by a writer.
 *
 * A blocked reader or writer can be unblocked by calling \c stop_local. After this method is called, all threads blocked on
 * this particular object are released and return \c operation_result::aborted. The other instances of the queue (in the current
 * or other processes) are unaffected. In order to restore the normal functioning of the queue instance after the \c stop_local
 * call the user has to invoke \c reset_local.
 *
 * The queue does not guarantee any particular order of received messages from different writer threads. Messages sent by a
 * particular writer thread will be received in the order of sending.
 *
 * Methods of this class are not thread-safe, unless otherwise specified.
 */
class reliable_message_queue
{
public:
    //! Result codes for various operations on the queue
    enum operation_result
    {
        succeeded,    //!< The operation has completed successfully
        no_space,     //!< The message could not be sent because the queue is full
        aborted       //!< The operation has been aborted because the queue method <tt>stop_local()</tt> has been called
    };
 
    //! Interprocess queue overflow policies
    enum overflow_policy
    {
        //! Block the send operation when the queue is full
        block_on_overflow,
        //! Return \c operation_result::no_space when the queue is full
        fail_on_overflow,
        //! Throw \c capacity_limit_reached exception when the queue is full
        throw_on_overflow
    };
 
    //! Queue message size type
    typedef uint32_t size_type;
 
#if !defined(BOOST_LOG_DOXYGEN_PASS)
 
    BOOST_MOVABLE_BUT_NOT_COPYABLE(reliable_message_queue)
 
private:
    typedef void (*receive_handler)(void* state, const void* data, size_type size);
 
    struct fixed_buffer_state
    {
        uint8_t* data;
        size_type size;
    };
 
    struct implementation;
    implementation* m_impl;
 
#endif // !defined(BOOST_LOG_DOXYGEN_PASS)
 
public:
    /*!
     * Default constructor. The method constructs an object that is not associated with any
     * message queue.
     *
     * \post <tt>is_open() == false</tt>
     */
    BOOST_CONSTEXPR reliable_message_queue() BOOST_NOEXCEPT : m_impl(NULL)
    {
    }
 
    /*!
     * Constructor. The method is used to construct an object and create the associated
     * message queue. The constructed object will be in running state if the message queue is
     * successfully created.
     *
     * \post <tt>is_open() == true</tt>
     *
     * \param name Name of the message queue to be associated with.
     * \param capacity Maximum number of allocation blocks the queue can hold.
     * \param block_size Size in bytes of allocation block. Must be a power of 2.
     * \param oflow_policy Queue behavior policy in case of overflow.
     * \param perms Access permissions for the associated message queue.
     */
    reliable_message_queue
    (
        open_mode::create_only_tag,
        object_name const& name,
        uint32_t capacity,
        size_type block_size,
        overflow_policy oflow_policy = block_on_overflow,
        permissions const& perms = permissions()
    ) :
        m_impl(NULL)
    {
        this->create(name, capacity, block_size, oflow_policy, perms);
    }
 
    /*!
     * Constructor. The method is used to construct an object and create or open the associated
     * message queue. The constructed object will be in running state if the message queue is
     * successfully created or opened. If the message queue that is identified by the name already
     * exists then the other queue parameters are ignored. The actual queue parameters can be obtained
     * with accessors from the constructed object.
     *
     * \post <tt>is_open() == true</tt>
     *
     * \param name Name of the message queue to be associated with.
     * \param capacity Maximum number of allocation blocks the queue can hold.
     * \param block_size Size in bytes of allocation block. Must be a power of 2.
     * \param oflow_policy Queue behavior policy in case of overflow.
     * \param perms Access permissions for the associated message queue.
     */
    reliable_message_queue
    (
        open_mode::open_or_create_tag,
        object_name const& name,
        uint32_t capacity,
        size_type block_size,
        overflow_policy oflow_policy = block_on_overflow,
        permissions const& perms = permissions()
    ) :
        m_impl(NULL)
    {
        this->open_or_create(name, capacity, block_size, oflow_policy, perms);
    }
 
    /*!
     * Constructor. The method is used to construct an object and open the existing
     * message queue. The constructed object will be in running state if the message queue is
     * successfully opened.
     *
     * \post <tt>is_open() == true</tt>
     *
     * \param name Name of the message queue to be associated with.
     * \param oflow_policy Queue behavior policy in case of overflow.
     * \param perms Access permissions for the associated message queue. The permissions will only be used
     *              if the queue implementation has to create system objects while operating.
     *              This parameter is currently not used on POSIX systems.
     */
    reliable_message_queue
    (
        open_mode::open_only_tag,
        object_name const& name,
        overflow_policy oflow_policy = block_on_overflow,
        permissions const& perms = permissions()
    ) :
        m_impl(NULL)
    {
        this->open(name, oflow_policy, perms);
    }
 
    /*!
     * Constructor with named parameters. The method is used to construct an object and create or open
     * the associated message queue. The constructed object will be in running state if the message queue is
     * successfully created.
     *
     * The following named parameters are accepted:
     *
     * * open_mode - One of the open mode tags: \c open_mode::create_only, \c open_mode::open_only or
     *               \c open_mode::open_or_create.
     * * name - Name of the message queue to be associated with.
     * * capacity - Maximum number of allocation blocks the queue can hold. Used only if the queue is created.
     * * block_size - Size in bytes of allocation block. Must be a power of 2. Used only if the queue is created.
     * * overflow_policy - Queue behavior policy in case of overflow, see \c overflow_policy.
     * * permissions - Access permissions for the associated message queue.
     *
     * \post <tt>is_open() == true</tt>
     */
#if !defined(BOOST_LOG_DOXYGEN_PASS)
    BOOST_LOG_PARAMETRIZED_CONSTRUCTORS_CALL(reliable_message_queue, construct)
#else
    template< typename... Args >
    explicit reliable_message_queue(Args const&... args);
#endif
 
    /*!
     * Destructor. Calls <tt>close()</tt>.
     */
    ~reliable_message_queue() BOOST_NOEXCEPT
    {
        this->close();
    }
 
    /*!
     * Move constructor. The method move-constructs an object from \c other. After
     * the call, the constructed object becomes \c other, while \c other is left in
     * default constructed state.
     *
     * \param that The object to be moved.
     */
    reliable_message_queue(BOOST_RV_REF(reliable_message_queue) that) BOOST_NOEXCEPT :
        m_impl(that.m_impl)
    {
        that.m_impl = NULL;
    }
 
    /*!
     * Move assignment operator. If the object is associated with a message queue,
     * <tt>close()</tt> is first called and the precondition to calling <tt>close()</tt>
     * applies. After the call, the object becomes \a that while \a that is left
     * in default constructed state.
     *
     * \param that The object to be moved.
     *
     * \return A reference to the assigned object.
     */
    reliable_message_queue& operator= (BOOST_RV_REF(reliable_message_queue) that) BOOST_NOEXCEPT
    {
        reliable_message_queue other(static_cast< BOOST_RV_REF(reliable_message_queue) >(that));
        this->swap(other);
        return *this;
    }
 
    /*!
     * The method swaps the object with \a that.
     *
     * \param that The other object to swap with.
     */
    void swap(reliable_message_queue& that) BOOST_NOEXCEPT
    {
        implementation* p = m_impl;
        m_impl = that.m_impl;
        that.m_impl = p;
    }
 
    //! Swaps the two \c reliable_message_queue objects.
    friend void swap(reliable_message_queue& a, reliable_message_queue& b) BOOST_NOEXCEPT
    {
        a.swap(b);
    }
 
    /*!
     * The method creates the message queue to be associated with the object. After the call,
     * the object will be in running state if a message queue is successfully created.
     *
     * \pre <tt>is_open() == false</tt>
     * \post <tt>is_open() == true</tt>
     *
     * \param name Name of the message queue to be associated with.
     * \param capacity Maximum number of allocation blocks the queue can hold.
     * \param block_size Size in bytes of allocation block. Must be a power of 2.
     * \param oflow_policy Queue behavior policy in case of overflow.
     * \param perms Access permissions for the associated message queue.
     */
    BOOST_LOG_API void create
    (
        object_name const& name,
        uint32_t capacity,
        size_type block_size,
        overflow_policy oflow_policy = block_on_overflow,
        permissions const& perms = permissions()
    );
 
    /*!
     * The method creates or opens the message queue to be associated with the object.
     * After the call, the object will be in running state if a message queue is successfully
     * created or opened. If the message queue that is identified by the name already exists then
     * the other queue parameters are ignored. The actual queue parameters can be obtained
     * with accessors from this object after this method returns.
     *
     * \pre <tt>is_open() == false</tt>
     * \post <tt>is_open() == true</tt>
     *
     * \param name Name of the message queue to be associated with.
     * \param capacity Maximum number of allocation blocks the queue can hold.
     * \param block_size Size in bytes of allocation block. Must be a power of 2.
     * \param oflow_policy Queue behavior policy in case of overflow.
     * \param perms Access permissions for the associated message queue.
     */
    BOOST_LOG_API void open_or_create
    (
        object_name const& name,
        uint32_t capacity,
        size_type block_size,
        overflow_policy oflow_policy = block_on_overflow,
        permissions const& perms = permissions()
    );
 
    /*!
     * The method opens the existing message queue to be associated with the object.
     * After the call, the object will be in running state if a message queue is successfully
     * opened.
     *
     * \pre <tt>is_open() == false</tt>
     * \post <tt>is_open() == true</tt>
     *
     * \param name Name of the message queue to be associated with.
     * \param oflow_policy Queue behavior policy in case of overflow.
     * \param perms Access permissions for the associated message queue. The permissions will only be used
     *              if the queue implementation has to create system objects while operating.
     *              This parameter is currently not used on POSIX systems.
     */
    BOOST_LOG_API void open
    (
        object_name const& name,
        overflow_policy oflow_policy = block_on_overflow,
        permissions const& perms = permissions()
    );
 
    /*!
     * Tests whether the object is associated with any message queue.
     *
     * \return \c true if the object is associated with a message queue, and \c false otherwise.
     */
    bool is_open() const BOOST_NOEXCEPT
    {
        return m_impl != NULL;
    }
 
    /*!
     * This method empties the associated message queue. Concurrent calls to this method, <tt>send()</tt>,
     * <tt>try_send()</tt>, <tt>receive()</tt>, <tt>try_receive()</tt>, and <tt>stop_local()</tt> are allowed.
     *
     * \pre <tt>is_open() == true</tt>
     */
    BOOST_LOG_API void clear();
 
    /*!
     * The method returns the name of the associated message queue.
     *
     * \pre <tt>is_open() == true</tt>
     *
     * \return Name of the associated message queue
     */
    BOOST_LOG_API object_name const& name() const;
 
    /*!
     * The method returns the maximum number of allocation blocks the associated message queue
     * can hold. Note that the returned value may be different from the corresponding
     * value passed to the constructor or <tt>open_or_create()</tt>, for the message queue may
     * not have been created by this object.
     *
     * \pre <tt>is_open() == true</tt>
     *
     * \return Maximum number of allocation blocks the associated message queue can hold.
     */
    BOOST_LOG_API uint32_t capacity() const;
 
    /*!
     * The method returns the allocation block size, in bytes. Each message in the
     * associated message queue consumes an integer number of allocation blocks.
     * Note that the returned value may be different from the corresponding value passed
     * to the constructor or <tt>open_or_create()</tt>, for the message queue may not
     * have been created by this object.
     *
     * \pre <tt>is_open() == true</tt>
     *
     * \return Allocation block size, in bytes.
     */
    BOOST_LOG_API size_type block_size() const;
 
    /*!
     * The method wakes up all threads that are blocked in calls to <tt>send()</tt> or
     * <tt>receive()</tt>. Those calls would then return <tt>operation_result::aborted</tt>.
     * Note that, the method does not block until the woken-up threads have actually
     * returned from <tt>send()</tt> or <tt>receive()</tt>. Other means is needed to ensure
     * that calls to <tt>send()</tt> or <tt>receive()</tt> have returned, e.g., joining the
     * threads that might be blocking on the calls.
     *
     * The method also puts the object in stopped state. When in stopped state, calls to
     * <tt>send()</tt> or <tt>receive()</tt> will return immediately with return value
     * <tt>operation_result::aborted</tt> when they would otherwise block in running state.
     *
     * Concurrent calls to this method, <tt>send()</tt>, <tt>try_send()</tt>, <tt>receive()</tt>,
     * <tt>try_receive()</tt>, and <tt>clear()</tt> are allowed.
     *
     * \pre <tt>is_open() == true</tt>
     */
    BOOST_LOG_API void stop_local();
 
    /*!
     * The method puts the object in running state where calls to <tt>send()</tt> or
     * <tt>receive()</tt> may block. This method is not thread-safe.
     *
     * \pre <tt>is_open() == true</tt>
     */
    BOOST_LOG_API void reset_local();
 
    /*!
     * The method disassociates the associated message queue, if any. No other threads
     * should be using this object before calling this method. The <tt>stop_local()</tt> method
     * can be used to have any threads currently blocked in <tt>send()</tt> or
     * <tt>receive()</tt> return, and prevent further calls to them from blocking. Typically,
     * before calling this method, one would first call <tt>stop_local()</tt> and then join all
     * threads that might be blocking on <tt>send()</tt> or <tt>receive()</tt> to ensure that
     * they have returned from the calls. The associated message queue is destroyed if the
     * object represents the last outstanding reference to it.
     *
     * \post <tt>is_open() == false</tt>
     */
    void close() BOOST_NOEXCEPT
    {
        if (is_open())
            do_close();
    }
 
    /*!
     * The method sends a message to the associated message queue. When the object is in
     * running state and the queue has no free space for the message, the method either blocks
     * or throws an exception, depending on the overflow policy that was specified on the queue
     * opening/creation. If blocking policy is in effect, the blocking can be interrupted by
     * calling <tt>stop_local()</tt>, in which case the method returns \c operation_result::aborted.
     * When the object is already in the stopped state, the method does not block but returns
     * immediately with return value \c operation_result::aborted.
     *
     * It is possible to send an empty message by passing \c 0 to the parameter \c message_size.
     *
     * Concurrent calls to <tt>send()</tt>, <tt>try_send()</tt>, <tt>receive()</tt>, <tt>try_receive()</tt>,
     * <tt>stop_local()</tt>, and <tt>clear()</tt> are allowed.
     *
     * \pre <tt>is_open() == true</tt>
     *
     * \param message_data The message data to send. Ignored when \c message_size is \c 0.
     * \param message_size Size of the message data in bytes. If the size is larger than
     *                     the associated message queue capacity, an <tt>std::logic_error</tt> exception is thrown.
     *
     * \retval operation_result::succeeded if the operation is successful
     * \retval operation_result::no_space if \c overflow_policy::fail_on_overflow is in effect and the queue is full
     * \retval operation_result::aborted if the call was interrupted by <tt>stop_local()</tt>
     *
     * <b>Throws:</b> <tt>std::logic_error</tt> in case if the message size exceeds the queue
     *                capacity, <tt>system_error</tt> in case if a native OS method fails.
     */
    BOOST_LOG_API operation_result send(void const* message_data, size_type message_size);
 
    /*!
     * The method performs an attempt to send a message to the associated message queue.
     * The method is non-blocking, and always returns immediately.
     * <tt>boost::system::system_error</tt> is thrown for errors resulting from native
     * operating system calls. Note that it is possible to send an empty message by passing
     * \c 0 to the parameter \c message_size. Concurrent calls to <tt>send()</tt>,
     * <tt>try_send()</tt>, <tt>receive()</tt>, <tt>try_receive()</tt>, <tt>stop_local()</tt>,
     * and <tt>clear()</tt> are allowed.
     *
     * \pre <tt>is_open() == true</tt>
     *
     * \param message_data The message data to send. Ignored when \c message_size is \c 0.
     * \param message_size Size of the message data in bytes. If the size is larger than the
     *                     maximum size allowed by the associated message queue, an
     *                     <tt>std::logic_error</tt> exception is thrown.
     *
     * \return \c true if the message is successfully sent, and \c false otherwise (e.g.,
     *         when the queue is full).
     *
     * <b>Throws:</b> <tt>std::logic_error</tt> in case if the message size exceeds the queue
     *                capacity, <tt>system_error</tt> in case if a native OS method fails.
     */
    BOOST_LOG_API bool try_send(void const* message_data, size_type message_size);
 
    /*!
     * The method takes a message from the associated message queue. When the object is in
     * running state and the queue is empty, the method blocks. The blocking is interrupted
     * when <tt>stop_local()</tt> is called, in which case the method returns \c operation_result::aborted.
     * When the object is already in the stopped state and the queue is empty, the method
     * does not block but returns immediately with return value \c operation_result::aborted.
     *
     * Concurrent calls to <tt>send()</tt>, <tt>try_send()</tt>, <tt>receive()</tt>,
     * <tt>try_receive()</tt>, <tt>stop_local()</tt>, and <tt>clear()</tt> are allowed.
     *
     * \pre <tt>is_open() == true</tt>
     *
     * \param buffer The memory buffer to store the received message in.
     * \param buffer_size The size of the buffer, in bytes.
     * \param message_size Receives the size of the received message, in bytes.
     *
     * \retval operation_result::succeeded if the operation is successful
     * \retval operation_result::aborted if the call was interrupted by <tt>stop_local()</tt>
     */
    operation_result receive(void* buffer, size_type buffer_size, size_type& message_size)
    {
        fixed_buffer_state state = { static_cast< uint8_t* >(buffer), buffer_size };
        operation_result result = do_receive(&reliable_message_queue::fixed_buffer_receive_handler, &state);
        message_size = buffer_size - state.size;
        return result;
    }
 
    /*!
     * The method takes a message from the associated message queue. When the object is in
     * running state and the queue is empty, the method blocks. The blocking is interrupted
     * when <tt>stop_local()</tt> is called, in which case the method returns \c operation_result::aborted.
     * When the object is already in the stopped state and the queue is empty, the method
     * does not block but returns immediately with return value \c operation_result::aborted.
     *
     * Concurrent calls to <tt>send()</tt>, <tt>try_send()</tt>, <tt>receive()</tt>,
     * <tt>try_receive()</tt>, <tt>stop_local()</tt>, and <tt>clear()</tt> are allowed.
     *
     * \pre <tt>is_open() == true</tt>
     *
     * \param buffer The memory buffer to store the received message in.
     * \param message_size Receives the size of the received message, in bytes.
     *
     * \retval operation_result::succeeded if the operation is successful
     * \retval operation_result::aborted if the call was interrupted by <tt>stop_local()</tt>
     */
    template< typename ElementT, size_type SizeV >
#if !defined(BOOST_LOG_DOXYGEN_PASS)
    typename aux::enable_if_byte< ElementT, operation_result >::type
#else
    operation_result
#endif
    receive(ElementT (&buffer)[SizeV], size_type& message_size)
    {
        return receive(buffer, SizeV, message_size);
    }
 
    /*!
     * The method takes a message from the associated message queue. When the object is in
     * running state and the queue is empty, the method blocks. The blocking is interrupted
     * when <tt>stop_local()</tt> is called, in which case the method returns \c operation_result::aborted.
     * When the object is already in the stopped state and the queue is empty, the method
     * does not block but returns immediately with return value \c operation_result::aborted.
     *
     * Concurrent calls to <tt>send()</tt>, <tt>try_send()</tt>, <tt>receive()</tt>,
     * <tt>try_receive()</tt>, <tt>stop_local()</tt>, and <tt>clear()</tt> are allowed.
     *
     * \pre <tt>is_open() == true</tt>
     *
     * \param container The container to store the received message in. The container should have
     *                  value type of <tt>char</tt>, <tt>signed char</tt> or <tt>unsigned char</tt>
     *                  and support inserting elements at the end.
     *
     * \retval operation_result::succeeded if the operation is successful
     * \retval operation_result::aborted if the call was interrupted by <tt>stop_local()</tt>
     */
    template< typename ContainerT >
#if !defined(BOOST_LOG_DOXYGEN_PASS)
    typename aux::enable_if_byte< typename ContainerT::value_type, operation_result >::type
#else
    operation_result
#endif
    receive(ContainerT& container)
    {
        return do_receive(&reliable_message_queue::container_receive_handler< ContainerT >, &container);
    }
 
    /*!
     * The method performs an attempt to take a message from the associated message queue. The
     * method is non-blocking, and always returns immediately.
     *
     * Concurrent calls to <tt>send()</tt>, <tt>try_send()</tt>, <tt>receive()</tt>,
     * <tt>try_receive()</tt>, <tt>stop_local()</tt>, and <tt>clear()</tt> are allowed.
     *
     * \pre <tt>is_open() == true</tt>
     *
     * \param buffer The memory buffer to store the received message in.
     * \param buffer_size The size of the buffer, in bytes.
     * \param message_size Receives the size of the received message, in bytes.
     *
     * \return \c true if a message is successfully received, and \c false otherwise (e.g.,
     *         when the queue is empty).
     */
    bool try_receive(void* buffer, size_type buffer_size, size_type& message_size)
    {
        fixed_buffer_state state = { static_cast< uint8_t* >(buffer), buffer_size };
        bool result = do_try_receive(&reliable_message_queue::fixed_buffer_receive_handler, &state);
        message_size = buffer_size - state.size;
        return result;
    }
 
    /*!
     * The method performs an attempt to take a message from the associated message queue. The
     * method is non-blocking, and always returns immediately.
     *
     * Concurrent calls to <tt>send()</tt>, <tt>try_send()</tt>, <tt>receive()</tt>,
     * <tt>try_receive()</tt>, <tt>stop_local()</tt>, and <tt>clear()</tt> are allowed.
     *
     * \pre <tt>is_open() == true</tt>
     *
     * \param buffer The memory buffer to store the received message in.
     * \param message_size Receives the size of the received message, in bytes.
     *
     * \return \c true if a message is successfully received, and \c false otherwise (e.g.,
     *         when the queue is empty).
     */
    template< typename ElementT, size_type SizeV >
#if !defined(BOOST_LOG_DOXYGEN_PASS)
    typename aux::enable_if_byte< ElementT, bool >::type
#else
    bool
#endif
    try_receive(ElementT (&buffer)[SizeV], size_type& message_size)
    {
        return try_receive(buffer, SizeV, message_size);
    }
 
    /*!
     * The method performs an attempt to take a message from the associated message queue. The
     * method is non-blocking, and always returns immediately.
     *
     * Concurrent calls to <tt>send()</tt>, <tt>try_send()</tt>, <tt>receive()</tt>,
     * <tt>try_receive()</tt>, <tt>stop_local()</tt>, and <tt>clear()</tt> are allowed.
     *
     * \pre <tt>is_open() == true</tt>
     *
     * \param container The container to store the received message in. The container should have
     *                  value type of <tt>char</tt>, <tt>signed char</tt> or <tt>unsigned char</tt>
     *                  and support inserting elements at the end.
     *
     * \return \c true if a message is successfully received, and \c false otherwise (e.g.,
     *         when the queue is empty).
     */
    template< typename ContainerT >
#if !defined(BOOST_LOG_DOXYGEN_PASS)
    typename aux::enable_if_byte< typename ContainerT::value_type, bool >::type
#else
    bool
#endif
    try_receive(ContainerT& container)
    {
        return do_try_receive(&reliable_message_queue::container_receive_handler< ContainerT >, &container);
    }
 
    /*!
     * The method frees system-wide resources, associated with the interprocess queue with the supplied name.
     * The queue referred to by the specified name must not be opened in any process at the point of this call.
     * After this call succeeds a new queue with the specified name can be created.
     *
     * This call can be useful to recover from an earlier process misbehavior (e.g. a crash without properly
     * closing the message queue). In this case resources allocated for the interprocess queue may remain
     * allocated after the last process closed the queue, which in turn may prevent creating a new queue with
     * the same name. By calling this method before creating a queue the application can attempt to ensure
     * it starts with a clean slate.
     *
     * On some platforms resources associated with the queue are automatically reclaimed by the operating system
     * when the last process using those resources terminates (even if it terminates abnormally). On these
     * platforms this call may be a no-op. However, portable code should still call this method at appropriate
     * places to ensure compatibility with other platforms and future library versions, which may change implementation
     * of the queue.
     *
     * \param name Name of the message queue to be removed.
     */
    static BOOST_LOG_API void remove(object_name const& name);
 
#if !defined(BOOST_LOG_DOXYGEN_PASS)
private:
    //! Implementation of the constructor with named arguments
    template< typename ArgsT >
    void construct(ArgsT const& args)
    {
        m_impl = NULL;
        construct_dispatch(args[keywords::open_mode], args);
    }
 
    //! Implementation of the constructor with named arguments
    template< typename ArgsT >
    void construct_dispatch(open_mode::create_only_tag, ArgsT const& args)
    {
        this->create(args[keywords::name], args[keywords::capacity], args[keywords::block_size], args[keywords::overflow_policy | block_on_overflow], args[keywords::permissions | permissions()]);
    }
 
    //! Implementation of the constructor with named arguments
    template< typename ArgsT >
    void construct_dispatch(open_mode::open_or_create_tag, ArgsT const& args)
    {
        this->open_or_create(args[keywords::name], args[keywords::capacity], args[keywords::block_size], args[keywords::overflow_policy | block_on_overflow], args[keywords::permissions | permissions()]);
    }
 
    //! Implementation of the constructor with named arguments
    template< typename ArgsT >
    void construct_dispatch(open_mode::open_only_tag, ArgsT const& args)
    {
        this->open(args[keywords::name], args[keywords::overflow_policy | block_on_overflow], args[keywords::permissions | permissions()]);
    }
 
    //! Closes the message queue, if it's open
    BOOST_LOG_API void do_close() BOOST_NOEXCEPT;
 
    //! Receives the message from the queue and calls the handler to place the data in the user's storage
    BOOST_LOG_API operation_result do_receive(receive_handler handler, void* state);
    //! Attempts to receives the message from the queue and calls the handler to place the data in the user's storage
    BOOST_LOG_API bool do_try_receive(receive_handler handler, void* state);
 
    //! Fixed buffer receive handler
    static BOOST_LOG_API void fixed_buffer_receive_handler(void* state, const void* data, size_type size);
    //! Receive handler for a container
    template< typename ContainerT >
    static void container_receive_handler(void* state, const void* data, size_type size)
    {
        ContainerT* const container = static_cast< ContainerT* >(state);
        container->insert
        (
            container->end(),
            static_cast< typename ContainerT::value_type const* >(data),
            static_cast< typename ContainerT::value_type const* >(data) + size
        );
    }
#endif
};
 
} // namespace ipc
 
BOOST_LOG_CLOSE_NAMESPACE // namespace log
 
} // namespace boost
 
#include <boost/log/detail/footer.hpp>
 
#endif // BOOST_LOG_UTILITY_IPC_RELIABLE_MESSAGE_QUEUE_HPP_INCLUDED_