Bind MsgI to shm; replace offset_ptr with an absolute offset (relative to the shm base address).
| | |
| | | int center_main(int argc, const char *argv[]) |
| | | { |
| | | auto &shm = BHomeShm(); |
| | | MsgI::BindShm(shm); |
| | | |
| | | AppArg args(argc, argv); |
| | | if (args.Has("remove")) { |
| | |
| | | { |
| | | TopicNode &ProcNode() |
| | | { |
| | | static bool init_bind_msg_shm = MsgI::BindShm(BHomeShm()); |
| | | static TopicNode node(BHomeShm()); |
| | | return node; |
| | | } |
| | |
| | | return [remote, msg](void *valid_sock) mutable { |
| | | assert(valid_sock); |
| | | ShmSocket &sock = *static_cast<ShmSocket *>(valid_sock); |
| | | DEFER1(msg.Release(sock.shm())); // Release() is not const, but it's safe to release. |
| | | DEFER1(msg.Release()); // Release() is not const, but it's safe to release. |
| | | return sock.Send(remote.data(), msg); |
| | | }; |
| | | } |
| | |
| | | |
| | | namespace bhome_msg |
| | | { |
| | | /*TODO change msg format, header has proc info; |
| | | reply has error msg |
| | | center accept request and route.; |
| | | //*/ |
| | | const uint32_t kMsgTag = 0xf1e2d3c4; |
| | | |
| | | void *MsgI::Alloc(SharedMemory &shm, const size_t size) |
| | | { |
| | | void *p = shm.Alloc(sizeof(Meta) + size); |
| | | if (p) { |
| | | auto pmeta = new (p) Meta; |
| | | p = pmeta + 1; |
| | | } |
| | | return p; |
| | | } |
| | | void MsgI::Free(SharedMemory &shm) |
| | | { |
| | | assert(valid()); |
| | | shm.Dealloc(meta()); |
| | | ptr_ = nullptr; |
| | | assert(!valid()); |
| | | } |
| | | |
| | | void *MsgI::Pack(SharedMemory &shm, |
| | | const uint32_t head_len, const ToArray &headToArray, |
| | | const uint32_t body_len, const ToArray &bodyToArray) |
| | | { |
| | | void *addr = Alloc(shm, sizeof(head_len) + head_len + sizeof(body_len) + body_len); |
| | | if (addr) { |
| | | auto p = static_cast<char *>(addr); |
| | | auto Pack1 = [&p](auto len, auto &writer) { |
| | | Put32(p, len); |
| | | p += sizeof(len); |
| | | writer(p, len); |
| | | p += len; |
| | | }; |
| | | Pack1(head_len, headToArray); |
| | | Pack1(body_len, bodyToArray); |
| | | } |
| | | return addr; |
| | | } |
| | | |
| | | bool MsgI::ParseHead(BHMsgHead &head) const |
| | | { |
| | | auto p = get<char>(); |
| | | assert(p); |
| | | uint32_t msg_size = Get32(p); |
| | | p += 4; |
| | | return head.ParseFromArray(p, msg_size); |
| | | } |
| | | |
| | | bool MsgI::Make(SharedMemory &shm, void *p) |
| | | { |
| | | if (!p) { |
| | | return false; |
| | | } |
| | | MsgI(p).swap(*this); |
| | | return true; |
| | | } |
| | | |
| | | int MsgI::Release(SharedMemory &shm) |
| | | { |
| | | if (!valid()) { |
| | | return 0; |
| | | } |
| | | auto n = meta()->count_.Dec(); |
| | | if (n == 0) { |
| | | Free(shm); |
| | | } |
| | | return n; |
| | | } |
| | | |
| | | } // namespace bhome_msg |
| | |
| | | { |
| | | using namespace bhome_shm; |
| | | |
| | | // MsgI is safe to be stored in shared memory, so POD data or offset_ptr is required. |
| | | // message format: header(meta) + body(data). |
| | | // ShmMsg is safe to be stored in shared memory, so POD data or offset_ptr is required. |
| | | // message content layout: (meta) / header_size + header + data_size + data |
| | | |
| | | typedef boost::uuids::uuid MQId; |
| | | |
| | | // store ref count, msgs sharing the same data should also hold a pointer to the same RefCount object. |
| | | class RefCount : private boost::noncopyable |
| | | { |
| | | std::atomic<int> num_; |
| | | |
| | | public: |
| | | RefCount() : |
| | | num_(1) { static_assert(std::is_pod<decltype(num_)>::value); } |
| | | int Inc() { return ++num_; } |
| | | int Dec() { return --num_; } |
| | | int Get() { return num_.load(); } |
| | | }; |
| | | |
| | | // message content layout: header_size + header + data_size + data |
| | | class MsgI |
| | | class ShmMsg |
| | | { |
| | | private: |
| | | // store ref count, msgs sharing the same data should also hold a pointer to the same RefCount object. |
| | | class RefCount : private boost::noncopyable |
| | | { |
| | | std::atomic<int> num_; |
| | | |
| | | public: |
| | | RefCount() : |
| | | num_(1) { static_assert(std::is_pod<decltype(num_)>::value); } |
| | | int Inc() { return ++num_; } |
| | | int Dec() { return --num_; } |
| | | int Get() { return num_.load(); } |
| | | }; |
| | | typedef int64_t Offset; |
| | | static Offset Addr(void *ptr) { return reinterpret_cast<Offset>(ptr); } |
| | | static void *Ptr(const Offset offset) { return reinterpret_cast<void *>(offset); } |
| | | static inline Offset BaseAddr() |
| | | { |
| | | static const Offset base = Addr(shm().get_address()); // cache value. |
| | | return base; |
| | | } |
| | | static inline SharedMemory &shm() |
| | | { |
| | | if (!pshm()) { throw std::string("Must set ShmMsg shm before use!"); } |
| | | return *pshm(); |
| | | } |
| | | static inline SharedMemory *&pshm() |
| | | { |
| | | static SharedMemory *pshm = 0; |
| | | return pshm; |
| | | } |
| | | |
| | | struct Meta { |
| | | RefCount count_; |
| | | }; |
| | | offset_ptr<void> ptr_; |
| | | void *Alloc(SharedMemory &shm, const size_t size); |
| | | void Free(SharedMemory &shm); |
| | | Offset offset_; |
| | | void *Alloc(const size_t size) |
| | | { |
| | | void *p = shm().Alloc(sizeof(Meta) + size); |
| | | if (p) { |
| | | auto pmeta = new (p) Meta; |
| | | p = pmeta + 1; |
| | | } |
| | | return p; |
| | | } |
| | | void Free() |
| | | { |
| | | assert(valid()); |
| | | shm().Dealloc(meta()); |
| | | offset_ = 0; |
| | | assert(!valid()); |
| | | } |
| | | Meta *meta() const { return get<Meta>() - 1; } |
| | | |
| | | typedef std::function<void(void *p, int len)> ToArray; |
| | | void *Pack(SharedMemory &shm, |
| | | const uint32_t head_len, const ToArray &headToArray, |
| | | const uint32_t body_len, const ToArray &bodyToArray); |
| | | void *Pack(const uint32_t head_len, const ToArray &headToArray, |
| | | const uint32_t body_len, const ToArray &bodyToArray) |
| | | { |
| | | void *addr = Alloc(sizeof(head_len) + head_len + sizeof(body_len) + body_len); |
| | | if (addr) { |
| | | auto p = static_cast<char *>(addr); |
| | | auto Pack1 = [&p](auto len, auto &writer) { |
| | | Put32(p, len); |
| | | p += sizeof(len); |
| | | writer(p, len); |
| | | p += len; |
| | | }; |
| | | Pack1(head_len, headToArray); |
| | | Pack1(body_len, bodyToArray); |
| | | } |
| | | return addr; |
| | | } |
| | | |
| | | template <class Body> |
| | | void *Pack(SharedMemory &shm, const BHMsgHead &head, const Body &body) |
| | | void *Pack(const BHMsgHead &head, const Body &body) |
| | | { |
| | | return Pack( |
| | | shm, |
| | | uint32_t(head.ByteSizeLong()), [&](void *p, int len) { head.SerializeToArray(p, len); }, |
| | | uint32_t(body.ByteSizeLong()), [&](void *p, int len) { body.SerializeToArray(p, len); }); |
| | | } |
| | | |
| | | void *Pack(SharedMemory &shm, const std::string &content) |
| | | void *Pack(const std::string &content) |
| | | { |
| | | void *addr = Alloc(shm, content.size()); |
| | | void *addr = Alloc(content.size()); |
| | | if (addr) { |
| | | memcpy(addr, content.data(), content.size()); |
| | | } |
| | | return addr; |
| | | } |
| | | |
| | | bool Make(SharedMemory &shm, void *addr); |
| | | MsgI(void *p) : |
| | | ptr_(p) {} |
| | | bool Make(void *addr) |
| | | { |
| | | if (!addr) { |
| | | return false; |
| | | } |
| | | ShmMsg(addr).swap(*this); |
| | | return true; |
| | | } |
| | | ShmMsg(void *p) : |
| | | offset_(p ? (Addr(p) - BaseAddr()) : 0) {} |
| | | |
| | | template <class T = void> |
| | | T *get() const { return static_cast<T *>(Ptr(offset_ + BaseAddr())); } |
| | | |
| | | public: |
| | | MsgI() : |
| | | MsgI(nullptr) {} |
| | | MsgI(SharedMemory &shm, const size_t size) : |
| | | MsgI(Alloc(shm, size)) {} |
| | | void swap(MsgI &a) { std::swap(ptr_, a.ptr_); } |
| | | bool valid() const { return static_cast<bool>(ptr_); } |
| | | template <class T = void> |
| | | T *get() const { return static_cast<T *>(ptr_.get()); } |
| | | static bool BindShm(SharedMemory &shm) |
| | | { |
| | | assert(!pshm()); |
| | | pshm() = &shm; |
| | | return true; |
| | | } |
| | | |
| | | ShmMsg() : |
| | | ShmMsg(nullptr) {} |
| | | explicit ShmMsg(const size_t size) : |
| | | ShmMsg(Alloc(size)) {} |
| | | void swap(ShmMsg &a) { std::swap(offset_, a.offset_); } |
| | | bool valid() const { return static_cast<bool>(offset_); } |
| | | |
| | | // AddRef and Release work for both counted and not counted msg. |
| | | int AddRef() const { return valid() ? meta()->count_.Inc() : 1; } |
| | | int Release(SharedMemory &shm); |
| | | int Release() |
| | | { |
| | | if (!valid()) { |
| | | return 0; |
| | | } |
| | | auto n = meta()->count_.Dec(); |
| | | if (n == 0) { |
| | | Free(); |
| | | } |
| | | return n; |
| | | } |
| | | int Count() const { return valid() ? meta()->count_.Get() : 1; } |
| | | |
| | | template <class Body> |
| | | inline bool Make(SharedMemory &shm, const BHMsgHead &head, const Body &body) |
| | | { |
| | | return Make(shm, Pack(shm, head, body)); |
| | | } |
| | | inline bool Make(const BHMsgHead &head, const Body &body) { return Make(Pack(head, body)); } |
| | | inline bool Make(const std::string &content) { return Make(Pack(content)); } |
| | | template <class Body> |
| | | static inline std::string Serialize(const BHMsgHead &head, const Body &body) |
| | | { |
| | |
| | | assert(pos == s.size()); |
| | | return s; |
| | | } |
| | | inline bool Make(SharedMemory &shm, const std::string &content) |
| | | { |
| | | void *p = Pack(shm, content); |
| | | return Make(shm, p); |
| | | } |
| | | |
| | | bool ParseHead(BHMsgHead &head) const; |
| | | bool ParseHead(BHMsgHead &head) const |
| | | { |
| | | auto p = get<char>(); |
| | | assert(p); |
| | | uint32_t msg_size = Get32(p); |
| | | p += 4; |
| | | return head.ParseFromArray(p, msg_size); |
| | | } |
| | | template <class Body> |
| | | bool ParseBody(Body &body) const |
| | | { |
| | | auto p = static_cast<char *>(ptr_.get()); |
| | | auto p = get<char>(); |
| | | assert(p); |
| | | uint32_t size = Get32(p); |
| | | p += 4; |
| | |
| | | } |
| | | }; |
| | | |
| | | inline void swap(MsgI &m1, MsgI &m2) { m1.swap(m2); } |
| | | inline void swap(ShmMsg &m1, ShmMsg &m2) { m1.swap(m2); } |
| | | |
| | | typedef ShmMsg MsgI; |
| | | |
| | | } // namespace bhome_msg |
| | | |
| | |
| | | info.on_expire_(info.data_); |
| | | } |
| | | if (info.data_.index() == 0) { |
| | | boost::variant2::get<0>(info.data_).Release(mq.shm()); |
| | | boost::variant2::get<0>(info.data_).Release(); |
| | | } |
| | | } |
| | | |
| | |
| | | auto &msg = boost::variant2::get<0>(pos->data().data_); |
| | | r = mq.TrySend(*(MQId *) remote.data(), msg); |
| | | if (r) { |
| | | msg.Release(mq.shm()); |
| | | msg.Release(); |
| | | } |
| | | } else { |
| | | auto &content = boost::variant2::get<1>(pos->data().data_); |
| | | MsgI msg; |
| | | if (msg.Make(mq.shm(), content)) { |
| | | DEFER1(msg.Release(mq.shm());); |
| | | if (msg.Make(content)) { |
| | | DEFER1(msg.Release();); |
| | | r = mq.TrySend(*(MQId *) remote.data(), msg); |
| | | } |
| | | } |
| | |
| | | return false; |
| | | } |
| | | auto onMsg = [&](MsgI &imsg) { |
| | | DEFER1(imsg.Release(shm())); |
| | | DEFER1(imsg.Release()); |
| | | BHMsgHead head; |
| | | if (imsg.ParseHead(head)) { |
| | | onRecvWithPerMsgCB(*this, imsg, head); |
| | |
| | | if (msg.ParseHead(head)) { |
| | | return true; |
| | | } else { |
| | | msg.Release(shm()); |
| | | msg.Release(); |
| | | } |
| | | } |
| | | return false; |
| | |
| | | return sock.Send(&BHTopicCenterAddress(), head, body, onResult); |
| | | } else { |
| | | MsgI reply; |
| | | DEFER1(reply.Release(shm_);); |
| | | DEFER1(reply.Release();); |
| | | BHMsgHead reply_head; |
| | | bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | if (r) { |
| | |
| | | return sock.Send(&BHTopicCenterAddress(), head, body); |
| | | } else { |
| | | MsgI reply; |
| | | DEFER1(reply.Release(shm_);); |
| | | DEFER1(reply.Release();); |
| | | BHMsgHead reply_head; |
| | | bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body); |
| | |
| | | return sock.Send(&BHTopicCenterAddress(), head, body); |
| | | } else { |
| | | MsgI reply; |
| | | DEFER1(reply.Release(shm_);); |
| | | DEFER1(reply.Release();); |
| | | BHMsgHead reply_head; |
| | | bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | r = r && reply_head.type() == kMsgTypeCommonReply; |
| | |
| | | head.set_topic(request.topic()); |
| | | |
| | | MsgI reply_msg; |
| | | DEFER1(reply_msg.Release(shm_);); |
| | | DEFER1(reply_msg.Release();); |
| | | BHMsgHead reply_head; |
| | | |
| | | if (sock.SendAndRecv(addr.mq_id().data(), head, request, reply_msg, reply_head, timeout_ms) && |
| | |
| | | AddRoute(head, sock.id()); |
| | | |
| | | MsgI reply; |
| | | DEFER1(reply.Release(shm_)); |
| | | DEFER1(reply.Release()); |
| | | BHMsgHead reply_head; |
| | | |
| | | if (sock.SendAndRecv(&BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms)) { |
| | |
| | | return sock.Send(&BHTopicBusAddress(), head, pub); |
| | | } else { |
| | | MsgI reply; |
| | | DEFER1(reply.Release(shm());); |
| | | DEFER1(reply.Release();); |
| | | BHMsgHead reply_head; |
| | | MsgCommonReply reply_body; |
| | | return sock.SendAndRecv(&BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) && |
| | |
| | | return sock.Send(&BHTopicBusAddress(), head, sub); |
| | | } else { |
| | | MsgI reply; |
| | | DEFER1(reply.Release(shm());); |
| | | DEFER1(reply.Release();); |
| | | BHMsgHead reply_head; |
| | | return sock.SendAndRecv(&BHTopicBusAddress(), head, sub, reply, reply_head, timeout_ms) && |
| | | reply_head.type() == kMsgTypeCommonReply && |
| | |
| | | |
| | | auto &sock = SockSub(); |
| | | MsgI msg; |
| | | DEFER1(msg.Release(shm());); |
| | | DEFER1(msg.Release();); |
| | | BHMsgHead head; |
| | | //TODO error msg. |
| | | if (sock.SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) { |
| | |
| | | } |
| | | void unlock() { mutex_.unlock(); } |
| | | }; |
| | | |
| | | namespace |
| | | { |
| | | typedef int64_t Offset; |
| | | Offset Addr(void *ptr) { return reinterpret_cast<Offset>(ptr); } |
| | | void *Ptr(const Offset offset) { return reinterpret_cast<void *>(offset); } |
| | | } // namespace |
| | | |
| | | BOOST_AUTO_TEST_CASE(MutexTest) |
| | | { |
| | | const std::string shm_name("ShmMutex"); |
| | | // ShmRemover auto_remove(shm_name); |
| | | SharedMemory shm(shm_name, 1024 * 1024 * 10); |
| | | SharedMemory &shm = TestShm(); |
| | | MsgI::BindShm(shm); |
| | | |
| | | void *base_ptr = shm.get_address(); |
| | | auto PrintPtr = [&](void *p) { |
| | | printf("addr: %ld, ptr: %p, offset: %ld\n", Addr(p), p, Addr(p) - Addr(base_ptr)); |
| | | }; |
| | | |
| | | printf("base"); |
| | | PrintPtr(base_ptr); |
| | | |
| | | MsgI msg; |
| | | msg.Make("string data"); |
| | | for (int i = 0; i < 10; ++i) { |
| | | int n = msg.AddRef(); |
| | | printf("add %d ref: %d\n", i, n); |
| | | } |
| | | for (int i = 0; i < 10; ++i) { |
| | | int n = msg.Release(); |
| | | printf("release %d, ref : %d\n", i, n); |
| | | } |
| | | std::this_thread::sleep_for(1s); |
| | | msg.Release(); |
| | | |
| | | const std::string mtx_name("test_mutex"); |
| | | const std::string int_name("test_int"); |
| | | auto mtx = shm.find_or_construct<Mutex>(mtx_name.c_str())(3s); |
| | | |
| | | auto mtx = shm.find_or_construct<Mutex>(mtx_name.c_str())(); |
| | | auto pi = shm.find_or_construct<int>(int_name.c_str())(100); |
| | | |
| | | printf("mutetx "); |
| | | PrintPtr(mtx); |
| | | printf("int "); |
| | | PrintPtr(pi); |
| | | |
| | | typedef std::chrono::steady_clock Clock; |
| | | auto Now = []() { return Clock::now().time_since_epoch(); }; |
| | | if (pi) { |
| | |
| | | |
| | | BOOST_AUTO_TEST_CASE(TimedWaitTest) |
| | | { |
| | | const std::string shm_name("shm_wait"); |
| | | ShmRemover auto_remove(shm_name); |
| | | SharedMemory shm(shm_name, 1024 * 1024); |
| | | SharedMemory &shm = TestShm(); |
| | | MsgI::BindShm(shm); |
| | | ShmMsgQueue q(shm, 64); |
| | | for (int i = 0; i < 2; ++i) { |
| | | int ms = i * 100; |
| | |
| | | |
| | | BOOST_AUTO_TEST_CASE(RefCountTest) |
| | | { |
| | | const std::string shm_name("ShmRefCount"); |
| | | ShmRemover auto_remove(shm_name); |
| | | SharedMemory shm(shm_name, 1024 * 1024); |
| | | SharedMemory &shm = TestShm(); |
| | | typedef MsgI Msg; |
| | | Msg::BindShm(shm); |
| | | |
| | | MsgI m0(shm, 1000); |
| | | Msg m0(1000); |
| | | BOOST_CHECK(m0.valid()); |
| | | BOOST_CHECK_EQUAL(m0.Count(), 1); |
| | | MsgI m1 = m0; |
| | | Msg m1 = m0; |
| | | BOOST_CHECK(m1.valid()); |
| | | BOOST_CHECK_EQUAL(m1.AddRef(), 2); |
| | | BOOST_CHECK_EQUAL(m0.AddRef(), 3); |
| | | BOOST_CHECK_EQUAL(m0.Release(shm), 2); |
| | | BOOST_CHECK_EQUAL(m0.Release(shm), 1); |
| | | BOOST_CHECK_EQUAL(m1.Release(shm), 0); |
| | | BOOST_CHECK_EQUAL(m0.Release(), 2); |
| | | BOOST_CHECK_EQUAL(m0.Release(), 1); |
| | | BOOST_CHECK_EQUAL(m1.Release(), 0); |
| | | BOOST_CHECK(!m1.valid()); |
| | | } |
| | |
| | | |
| | | BOOST_AUTO_TEST_CASE(SpeedTest) |
| | | { |
| | | const std::string shm_name("ShmSpeed"); |
| | | ShmRemover auto_remove(shm_name); |
| | | const int mem_size = 1024 * 1024 * 50; |
| | | SharedMemory &shm = TestShm(); |
| | | MsgI::BindShm(shm); |
| | | |
| | | MQId id = boost::uuids::random_generator()(); |
| | | const int timeout = 1000; |
| | | const uint32_t data_size = 4000; |
| | | const std::string proc_id = "demo_proc"; |
| | | |
| | | auto Writer = [&](int writer_id, uint64_t n) { |
| | | SharedMemory shm(shm_name, mem_size); |
| | | ShmMsgQueue mq(shm, 64); |
| | | std::string str(data_size, 'a'); |
| | | MsgI msg; |
| | |
| | | body.set_topic("topic"); |
| | | body.set_data(str); |
| | | auto head(InitMsgHead(GetType(body), proc_id)); |
| | | msg.Make(shm, head, body); |
| | | msg.Make(head, body); |
| | | assert(msg.valid()); |
| | | DEFER1(msg.Release(shm);); |
| | | DEFER1(msg.Release();); |
| | | |
| | | for (uint64_t i = 0; i < n; ++i) { |
| | | while (!mq.TrySend(id, msg)) {} |
| | | } |
| | | }; |
| | | auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork) { |
| | | SharedMemory shm(shm_name, mem_size); |
| | | ShmMsgQueue mq(id, shm, 1000); |
| | | while (*run) { |
| | | MsgI msg; |
| | | BHMsgHead head; |
| | | if (mq.Recv(msg, timeout)) { |
| | | DEFER1(msg.Release(shm)); |
| | | DEFER1(msg.Release()); |
| | | // ok |
| | | } else if (isfork) { |
| | | exit(0); // for forked quit after 1s. |
| | |
| | | } |
| | | }; |
| | | auto State = [&](std::atomic<bool> *run) { |
| | | SharedMemory shm(shm_name, mem_size); |
| | | auto init = shm.get_free_memory(); |
| | | printf("shm init : %ld\n", init); |
| | | while (*run) { |
| | |
| | | // Send Recv Test |
| | | BOOST_AUTO_TEST_CASE(SRTest) |
| | | { |
| | | const std::string shm_name("ShmSendRecv"); |
| | | ShmRemover auto_remove(shm_name); |
| | | const int qlen = 64; |
| | | const size_t msg_length = 100; |
| | | std::string msg_content(msg_length, 'a'); |
| | |
| | | const std::string client_proc_id = "client_proc"; |
| | | const std::string server_proc_id = "server_proc"; |
| | | |
| | | SharedMemory shm(shm_name, 1024 * 1024 * 512); |
| | | SharedMemory &shm = TestShm(); |
| | | MsgI::BindShm(shm); |
| | | |
| | | auto Avail = [&]() { return shm.get_free_memory(); }; |
| | | auto init_avail = Avail(); |
| | | ShmSocket srv(shm, qlen); |
| | |
| | | |
| | | while (!stop) { |
| | | if (srv.SyncRecv(req, req_head, 10)) { |
| | | DEFER1(req.Release(shm)); |
| | | DEFER1(req.Release()); |
| | | |
| | | if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) { |
| | | auto &mqid = req_head.route()[0].mq_id(); |
| | |
| | | #include <thread> |
| | | #include <vector> |
| | | |
| | | using namespace bhome_shm; |
| | | using namespace bhome_msg; |
| | | |
| | | SharedMemory &TestShm() |
| | | { |
| | | static SharedMemory shm("utest_0", 1024 * 1024 * 512); |
| | | return shm; |
| | | } |
| | | |
| | | template <class A, class B> |
| | | struct IsSameType { |
| | |
| | | |
| | | BOOST_AUTO_TEST_CASE(PubSubTest) |
| | | { |
| | | const std::string shm_name("ShmPubSub"); |
| | | ShmRemover auto_remove(shm_name); //remove twice? in case of killed? |
| | | SharedMemory shm(shm_name, 1024 * 1024 * 50); |
| | | DEFER1(shm.Remove()); |
| | | SharedMemory &shm = TestShm(); |
| | | MsgI::BindShm(shm); |
| | | |
| | | auto Avail = [&]() { return shm.get_free_memory(); }; |
| | | auto init_avail = Avail(); |
| | | int *flag = shm.find_or_construct<int>("flag")(123); |
| | |
| | | |
| | | BOOST_AUTO_TEST_CASE(ReqRepTest) |
| | | { |
| | | const std::string shm_name("ShmReqRep"); |
| | | ShmRemover auto_remove(shm_name); |
| | | SharedMemory shm(shm_name, 1024 * 1024 * 512); |
| | | SharedMemory &shm = TestShm(); |
| | | MsgI::BindShm(shm); |
| | | |
| | | auto Avail = [&]() { return shm.get_free_memory(); }; |
| | | auto init_avail = Avail(); |
| | |
| | | #define UTIL_W8A0OA5U |
| | | |
| | | #include "bh_util.h" |
| | | #include "shm.h" |
| | | #include "topic_node.h" |
| | | #include <boost/date_time/posix_time/posix_time.hpp> |
| | | #include <boost/noncopyable.hpp> |
| | |
| | | #include <vector> |
| | | |
| | | using namespace boost::posix_time; |
| | | |
| | | using namespace std::chrono_literals; |
| | | |
| | | template <class D> |
| | |
| | | } |
| | | }; |
| | | |
| | | bhome_shm::SharedMemory &TestShm(); |
| | | |
| | | #endif // end of include guard: UTIL_W8A0OA5U |