change refcount to AddRef,Release interface.
| | |
| | | |
| | | } |
| | | |
| | | void Msg::FreeFrom(SharedMemory &shm) |
| | | int Msg::Release(SharedMemory &shm) const |
| | | { |
| | | if (IsCounted()) { |
| | | const int n = count_->Dec(); |
| | | if (n != 0) { |
| | | return n; |
| | | } |
| | | } |
| | | // free data |
| | | shm.Dealloc(ptr_); |
| | | shm.Delete(count_); |
| | | return 0; |
| | | } |
| | | |
| | | } // namespace bhome_shm |
| | |
| | | public: |
| | | Msg(void *p=0, RefCount *c=0):ptr_(p), count_(c) {} |
| | | void swap(Msg &a) { std::swap(ptr_, a.ptr_); std::swap(count_, a.count_); } |
| | | |
| | | // ~Msg() { RemoveRef(); } |
| | | // Msg(const Msg &a):ptr_(a.ptr_), count_(a.count_) { AddRef(); } |
| | | // Msg(Msg &&a):ptr_(a.ptr_), count_(a.count_) { a.ptr_ = 0; a.count_ = 0; } |
| | | // Msg & operator = (const Msg &a) { Msg(a).swap(*this); } |
| | | // Msg & operator = (Msg &&a) { Msg(std::move(a)).swap(*this); } |
| | | |
| | | template <class T = void> T *get() { return static_cast<T*>(ptr_.get()); } |
| | | |
| | | // AddRef and Release works for both counted and not counted msg. |
| | | int AddRef() const { return IsCounted() ? count_->Inc() : 1; } |
| | | int RemoveRef() const{ return IsCounted() ? count_->Dec() : 0; } |
| | | int Release(SharedMemory &shm) const; |
| | | |
| | | int Count() const{ return IsCounted() ? count_->Get() : 1; } |
| | | bool IsCounted() const { return static_cast<bool>(count_); } |
| | | bool Build(SharedMemory &shm, const MQId &src_id, const void *p, const size_t size, const bool refcount); |
| | | void FreeFrom(SharedMemory &shm); |
| | | }; |
| | | |
| | | inline void swap(Msg &m1, Msg &m2) { m1.swap(m2); } |
| | |
| | | bool ShmMsgQueue::Send(const MQId &remote_id, const Msg &msg, const int timeout_ms) |
| | | { |
| | | Queue *remote = find(MsgQIdToName(remote_id)); |
| | | |
| | | if(!remote) { |
| | | return false; |
| | | } |
| | | msg.AddRef(); |
| | | if (remote->Write(msg, timeout_ms)) { |
| | | return true; |
| | | } else { |
| | | msg.RemoveRef(); |
| | | return false; |
| | | } |
| | | return remote && remote->Write(msg, timeout_ms, [&](){msg.AddRef();}); |
| | | } |
| | | |
| | | bool ShmMsgQueue::Send(const MQId &remote_id, const void *data, const size_t size, const int timeout_ms) |
| | |
| | | // 1 is about 50% faster than 2, maybe cache related. |
| | | |
| | | Msg msg; |
| | | if (msg.Build(shm(), Id(), data, size, false)) { |
| | | if (Send(remote_id, msg, timeout_ms)) { |
| | | if(msg.Build(shm(), Id(), data, size, false)) { |
| | | if(Send(remote_id, msg, timeout_ms)) { |
| | | return true; |
| | | } else { |
| | | if (msg.RemoveRef() == 0) { // works for both refcounted and not counted. |
| | | msg.FreeFrom(shm()); |
| | | } |
| | | msg.Release(shm()); |
| | | } |
| | | } |
| | | return false; |
| | |
| | | { |
| | | Msg msg; |
| | | if (Read(msg, timeout_ms)) { |
| | | DEFER1(if (msg.RemoveRef() == 0) { msg.FreeFrom(shm()); }); |
| | | DEFER1(msg.Release(shm());); |
| | | |
| | | auto ptr = msg.get<char>(); |
| | | if (ptr) { |
| | |
| | | using Super::size; |
| | | using Super::capacity; |
| | | const MQId &Id() const { return id_; } |
| | | bool Write(const D &buf, const int timeout_ms) { |
| | | template <class OnWrite> |
| | | bool Write(const D &buf, const int timeout_ms, const OnWrite &onWrite) { |
| | | Guard lock(mutex()); |
| | | if (cond_write_.timed_wait(lock, MSFromNow(timeout_ms), [&]() { return !this->full(); })) { |
| | | onWrite(); |
| | | this->push_back(buf); |
| | | cond_read_.notify_one(); |
| | | return true; |
| | |
| | | return false; |
| | | } |
| | | } |
| | | bool Write(const D &buf, const int timeout_ms) { return Write(buf, timeout_ms, [](){}); } |
| | | |
| | | bool Read(D &buf, const int timeout_ms){ |
| | | Guard lock(mutex()); |
| | |
| | | bool Recv(MQId &source_id, void *&data, size_t &size, const int timeout_ms); |
| | | const MQId &Id() const { return data()->Id(); } |
| | | bool Send(const MQId &remote_id, const Msg &msg, const int timeout_ms); |
| | | bool Recv(Msg &msg, const int timeout_ms) { return Read(msg, timeout_ms); } |
| | | }; |
| | | |
| | | } // namespace bhome_shm |
| | |
| | | #include <boost/date_time/posix_time/posix_time.hpp> |
| | | #include <boost/date_time/microsec_time_clock.hpp> |
| | | #include <boost/uuid/uuid_generators.hpp> |
| | | #include <boost/uuid/uuid_io.hpp> |
| | | #include "shm_queue.h" |
| | | #include "bh_util.h" |
| | | |
| | |
| | | BOOST_CHECK(m1.IsCounted()); |
| | | BOOST_CHECK_EQUAL(m1.AddRef(), 2); |
| | | BOOST_CHECK_EQUAL(m0.AddRef(), 3); |
| | | BOOST_CHECK_EQUAL(m0.RemoveRef(), 2); |
| | | BOOST_CHECK_EQUAL(m0.RemoveRef(), 1); |
| | | BOOST_CHECK_EQUAL(m1.RemoveRef(), 0); |
| | | BOOST_CHECK_EQUAL(m1.Count(), 0); |
| | | BOOST_CHECK_EQUAL(m0.Release(shm), 2); |
| | | BOOST_CHECK_EQUAL(m0.Release(shm), 1); |
| | | BOOST_CHECK_EQUAL(m1.Release(shm), 0); |
| | | BOOST_CHECK_THROW(m1.Count(), std::exception); |
| | | } |
| | | |
| | | BOOST_AUTO_TEST_CASE(MsgHeaderTest) |
| | |
| | | auto Client = [&](int tid, int nmsg){ |
| | | for (int i = 0; i < nmsg; ++i) { |
| | | auto Send = [&]() { return cli.Send(srv.Id(), msg_content.data(), msg_content.size(), 1000); }; |
| | | auto SendRefCounted = [&]() { return cli.Send(srv.Id(), request, 1000); }; |
| | | // auto SendRefCounted = [&]() { return cli.Send(srv.Id(), request, 1000); }; |
| | | |
| | | if (!Send()) { |
| | | printf("********** client send error.\n"); |
| | |
| | | if (srv.Recv(src_id, data, size, 100)) { |
| | | DEFER1(free(data)); |
| | | auto Send = [&](){ return srv.Send(src_id, data, size, 100); }; |
| | | auto SendRefCounted = [&](){ return srv.Send(src_id, reply, 100); }; |
| | | // auto SendRefCounted = [&](){ return srv.Send(src_id, reply, 100); }; |
| | | |
| | | if (SendRefCounted()) { |
| | | if (Send()) { |
| | | if (size != msg_content.size()) { |
| | | BOOST_TEST(false, "server msg size error"); |
| | | } |
| | |
| | | ThreadManager clients, servers; |
| | | for (int i = 0; i < qlen; ++i) { servers.Launch(Server); } |
| | | int ncli = 100*1; |
| | | uint64_t nmsg = 100*100; |
| | | uint64_t nmsg = 100*100*10; |
| | | printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg); |
| | | for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); } |
| | | clients.WaitAll(); |
| | |
| | | BOOST_CHECK_EQUAL(request.Count(), 1); |
| | | BOOST_CHECK(reply.IsCounted()); |
| | | BOOST_CHECK_EQUAL(reply.Count(), 1); |
| | | if (request.RemoveRef() == 0) { |
| | | BOOST_CHECK_EQUAL(reply.Count(), 0); |
| | | request.FreeFrom(shm); |
| | | } |
| | | request.Release(shm); |
| | | BOOST_CHECK_THROW(request.Count(), std::exception); |
| | | BOOST_CHECK_THROW(reply.Count(), std::exception); |
| | | // BOOST_CHECK_THROW(reply.Count(), int); |
| | | } |
| | | |
| | | inline int MyMin(int a, int b) { |
| | | printf("MyMin\n"); |
| | | return a < b ? a : b; |
| | | } |
| | | int test_main(int argc, char *argv[]) |
| | | { |
| | | printf("test main\n"); |
| | | int a = 0; |
| | | int b = 0; |
| | | BOOST_CHECK_EQUAL(a, b); |
| | | int n = MyMin(4,6); |
| | | for (int i = 0; i < n; ++i) { |
| | | printf("i = %d\n", i); |
| | | } |
| | | |
| | | return 0; |
| | | } |
| | | |