change shm socket msg queue to atomic queue.
| | |
| | | int Dec() { return --num_; } |
| | | int Get() { return num_.load(); } |
| | | }; |
| | | typedef int64_t Offset; |
| | | static Offset Addr(void *ptr) { return reinterpret_cast<Offset>(ptr); } |
| | | static void *Ptr(const Offset offset) { return reinterpret_cast<void *>(offset); } |
| | | static inline Offset BaseAddr() |
| | | typedef int64_t OffsetType; |
| | | static OffsetType Addr(void *ptr) { return reinterpret_cast<OffsetType>(ptr); } |
| | | static void *Ptr(const OffsetType offset) { return reinterpret_cast<void *>(offset); } |
| | | static inline OffsetType BaseAddr() |
| | | { |
| | | static const Offset base = Addr(shm().get_address()); // cache value. |
| | | static const OffsetType base = Addr(shm().get_address()); // cache value. |
| | | return base; |
| | | } |
| | | |
| | | static const uint32_t kMsgTag = 0xf1e2d3c4; |
| | | typedef struct { |
| | | struct Meta { |
| | | RefCount count_; |
| | | const uint32_t tag_ = kMsgTag; |
| | | } Meta; |
| | | Offset offset_; |
| | | const uint32_t size_ = 0; |
| | | Meta(uint32_t size) : |
| | | size_(size) {} |
| | | }; |
| | | OffsetType offset_; |
| | | void *Alloc(const size_t size) |
| | | { |
| | | void *p = shm().Alloc(sizeof(Meta) + size); |
| | | if (p) { |
| | | auto pmeta = new (p) Meta; |
| | | auto pmeta = new (p) Meta(size); |
| | | p = pmeta + 1; |
| | | } |
| | | return p; |
| | |
| | | static bool BindShm(SharedMemory &shm) { return SetData(shm); } |
| | | ShmMsg() : |
| | | ShmMsg(nullptr) {} |
| | | explicit ShmMsg(const size_t size) : |
| | | ShmMsg(Alloc(size)) {} |
| | | explicit ShmMsg(const OffsetType offset) : |
| | | offset_(offset) {} |
| | | OffsetType Offset() const { return offset_; } |
| | | OffsetType &OffsetRef() { return offset_; } |
| | | void swap(ShmMsg &a) { std::swap(offset_, a.offset_); } |
| | | bool valid() const { return static_cast<bool>(offset_) && meta()->tag_ == kMsgTag; } |
| | | |
| | |
| | | return std::string(buf, n + 4); |
| | | } |
| | | |
// Normalize a requested message-queue length.
//   len <= 0          -> default length (12)
//   0 < len < 10000   -> len, unchanged
//   len >= 10000      -> capped at 10000
// Returns plain `int`: a top-level `const` on a by-value return is ignored by
// the language and only produces -Wignored-qualifiers noise.
constexpr int AdjustMQLength(const int len)
{
	constexpr int kMaxLength = 10000;
	constexpr int kDefaultLen = 12;
	if (len <= 0) {
		return kDefaultLen; // nothing usable was requested
	}
	return len < kMaxLength ? len : kMaxLength;
}
| | | |
| | | } // namespace |
| | | |
| | | ShmMsgQueue::MQId ShmMsgQueue::NewId() |
| | |
| | | // ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2 |
| | | ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) : |
| | | id_(id), |
| | | queue_(segment, MsgQIdToName(id_), AdjustMQLength(len), segment.get_segment_manager()) |
| | | queue_(segment, MsgQIdToName(id_)) //, AdjustMQLength(len), segment.get_segment_manager()) |
| | | { |
| | | } |
| | | |
| | | ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) : |
| | | id_(NewId()), |
| | | queue_(segment, true, MsgQIdToName(id_), AdjustMQLength(len), segment.get_segment_manager()) |
| | | queue_(segment, true, MsgQIdToName(id_)) //, AdjustMQLength(len), segment.get_segment_manager()) |
| | | { |
| | | if (!queue_.IsOk()) { |
| | | throw("error create msgq " + std::to_string(id_)); |
| | |
| | | Queue *q = Find(shm, id); |
| | | if (q) { |
| | | MsgI msg; |
| | | while (q->TryRead(msg)) { |
| | | while (q->TryRead(msg.OffsetRef())) { |
| | | msg.Release(); |
| | | } |
| | | } |
| | |
| | | bool r = false; |
| | | if (remote) { |
| | | msg.AddRef(); |
| | | r = remote->TryWrite(msg); |
| | | r = remote->TryWrite(msg.Offset()); |
| | | if (!r) { |
| | | msg.Release(); |
| | | } |
| | |
| | | |
| | | class ShmMsgQueue : public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue> |
| | | { |
| | | typedef ShmObject<SharedQueue<MsgI>> Shmq; |
| | | typedef ShmObject<SharedQ63<4>> Shmq; |
| | | // typedef ShmObject<SharedQueue<int64_t>> Shmq; |
| | | typedef Shmq::ShmType ShmType; |
| | | typedef Shmq::Data Queue; |
| | | typedef std::function<void()> OnSend; |
| | |
| | | MQId Id() const { return id_; } |
| | | ShmType &shm() const { return queue_.shm(); } |
| | | |
| | | bool Recv(MsgI &msg, const int timeout_ms) { return queue_.data()->Read(msg, timeout_ms); } |
| | | bool TryRecv(MsgI &msg) { return queue_.data()->TryRead(msg); } |
| | | bool Recv(MsgI &msg, const int timeout_ms) { return queue().Read(msg.OffsetRef(), timeout_ms); } |
| | | bool TryRecv(MsgI &msg) { return queue().TryRead(msg.OffsetRef()); } |
| | | static Queue *Find(SharedMemory &shm, const MQId remote_id); |
| | | static bool TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg); |
| | | bool TrySend(const MQId remote_id, const MsgI &msg) { return TrySend(shm(), remote_id, msg); } |
| | | |
| | | private: |
| | | MQId id_; |
| | | Shmq &queue() { return queue_; } |
| | | Queue &queue() { return *queue_.data(); } |
| | | Shmq queue_; |
| | | }; |
| | | |
| | |
| | | bool TryWrite(const D &d) { return queue_.push_back(d); } |
| | | |
| | | private: |
| | | typedef Circular<D> Queue; |
| | | Queue queue_; |
| | | Circular<D> queue_; |
| | | }; |
| | | |
| | | template <int Power = 4> |
| | | class SharedQ63 |
| | | { |
| | | public: |
| | | typedef int64_t Data; |
| | | bool Read(Data &d, const int timeout_ms) |
| | | { |
| | | using namespace std::chrono; |
| | | auto end_time = steady_clock::now() + milliseconds(timeout_ms); |
| | | do { |
| | | if (TryRead(d)) { |
| | | return true; |
| | | } else { |
| | | robust::QuickSleep(); |
| | | } |
| | | } while (steady_clock::now() < end_time); |
| | | return false; |
| | | } |
| | | bool TryRead(Data &d, const bool try_more = true) { return queue_.pop_front(d, try_more); } |
| | | bool TryWrite(const Data d, const bool try_more = true) { return queue_.push_back(d, try_more); } |
| | | |
| | | private: |
| | | robust::AtomicQueue<Power, Data> queue_; |
| | | }; |
| | | |
| | | } // namespace bhome_shm |
| | |
| | | bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000); |
| | | BHFree(reply, reply_len); |
| | | // printf("register topic : %s\n", r ? "ok" : "failed"); |
| | | Sleep(1s); |
| | | // Sleep(1s); |
| | | } |
| | | |
| | | { // Subscribe |
| | |
| | | * ===================================================================================== |
| | | */ |
| | | #include "util.h" |
| | | #include <boost/date_time/posix_time/posix_time.hpp> |
| | | |
| | | using namespace boost::posix_time; |
| | | |
| | | BOOST_AUTO_TEST_CASE(SpeedTest) |
| | | { |
| | |
| | | }; |
| | | auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork) { |
| | | ShmMsgQueue mq(id, shm, 1000); |
| | | auto now = []() { return steady_clock::now(); }; |
| | | auto tm = now(); |
| | | while (*run) { |
| | | MsgI msg; |
| | | BHMsgHead head; |
| | | if (mq.Recv(msg, timeout)) { |
| | | if (mq.TryRecv(msg)) { |
| | | DEFER1(msg.Release()); |
| | | // ok |
| | | tm = now(); |
| | | } else if (isfork) { |
| | | exit(0); // for forked quit after 1s. |
| | | if (now() > tm + 1s) { |
| | | exit(0); // for forked quit after 1s. |
| | | } |
| | | } |
| | | } |
| | | }; |
| | |
| | | } |
| | | }; |
| | | |
| | | int nwriters[] = {1, 2, 4}; |
| | | int nreaders[] = {1, 2}; |
| | | int nwriters[] = {1, 4, 16}; |
| | | int nreaders[] = {1, 4}; |
| | | |
| | | auto Test = [&](auto &www, auto &rrr, bool isfork) { |
| | | for (auto nreader : nreaders) { |
| | |
| | | #include "bh_util.h" |
| | | #include "shm.h" |
| | | #include "topic_node.h" |
| | | #include <boost/date_time/posix_time/posix_time.hpp> |
| | | #include <boost/noncopyable.hpp> |
| | | #include <boost/test/unit_test.hpp> |
| | | #include <boost/timer/timer.hpp> |
| | |
| | | #include <thread> |
| | | #include <vector> |
| | | |
| | | using namespace boost::posix_time; |
| | | using namespace std::chrono_literals; |
| | | using namespace std::chrono; |
| | | |