From 1b167ec5ad101ac44451381e26cc73ab5d67d2a1 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期一, 26 四月 2021 16:37:52 +0800 Subject: [PATCH] fix socket busy loop; del locked readall; refactor. --- src/socket.h | 4 src/shm_alloc_queue.cpp | 19 + box/center.cpp | 8 src/shm_msg_queue.h | 77 +++++++ src/socket.cpp | 5 src/defs.h | 1 src/sendq.cpp | 10 src/shm_queue.cpp | 87 -------- src/topic_node.cpp | 2 src/shm_msg_queue.cpp | 105 ++++++++++ src/defs.cpp | 8 box/center_main.cc | 8 src/shm_alloc_queue.h | 23 ++ utest/api_test.cpp | 102 ++++++--- src/shm_queue.h | 88 +------- src/sendq.h | 9 16 files changed, 334 insertions(+), 222 deletions(-) diff --git a/box/center.cpp b/box/center.cpp index 829a089..5cb9bc3 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -133,12 +133,12 @@ UpdateRegInfo(node); nodes_[ssn] = node; - printf("new ssn %ld\n", ssn); + printf("new node (%s) ssn (%ld)\n", head.proc_id().c_str(), ssn); auto old = online_node_addr_map_.find(head.proc_id()); if (old != online_node_addr_map_.end()) { // old session auto &old_ssn = old->second; nodes_[old_ssn]->state_.PutOffline(offline_time_); - printf("put %s %ld offline\n", nodes_[old_ssn]->proc_.proc_id().c_str(), old->second); + printf("put node (%s) ssn (%ld) offline\n", nodes_[old_ssn]->proc_.proc_id().c_str(), old->second); old_ssn = ssn; } else { online_node_addr_map_.emplace(head.proc_id(), ssn); @@ -201,6 +201,10 @@ for (auto &topic : topics) { service_map_[topic].insert(dest); } + printf("node %s ssn %ld serve %d topics:\n", node->proc_.proc_id().c_str(), *node->addrs_.begin(), topics.size()); + for (auto &topic : topics) { + printf("\t %s\n", topic.c_str()); + } return MakeReply(eSuccess); }); } diff --git a/box/center_main.cc b/box/center_main.cc index fdda2cd..79210fc 100644 --- a/box/center_main.cc +++ b/box/center_main.cc @@ -85,15 +85,15 @@ } // namespace int center_main(int argc, const char *argv[]) { - auto &shm = BHomeShm(); - GlobalInit(shm); - AppArg args(argc, argv); if (args.Has("remove")) { - shm.Remove(); + SharedMemory::Remove(BHomeShmName()); return 0; } + auto &shm = BHomeShm(); + GlobalInit(shm); + InstanceFlag inst(shm, kCenterRunningFlag); if (!inst.TryStartAsFirstInstance()) { printf("another instance is running, exit.\n"); diff --git a/src/defs.cpp b/src/defs.cpp index 0ca82bf..cc6f23b 100644 --- a/src/defs.cpp +++ b/src/defs.cpp @@ -17,7 +17,7 @@ */ #include "defs.h" #include "msg.h" -#include "shm_queue.h" +#include "shm_msg_queue.h" namespace { @@ -35,9 +35,13 @@ } // namespace +std::string BHomeShmName() +{ + return "bhome_default_shm_v0"; +} bhome_shm::SharedMemory &BHomeShm() { - static bhome_shm::SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 512); + static bhome_shm::SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512); return shm; } diff --git a/src/defs.h b/src/defs.h index 1c9e663..43375bf 100644 --- a/src/defs.h +++ b/src/defs.h @@ -37,6 +37,7 @@ class SharedMemory; } // namespace bhome_shm +std::string BHomeShmName(); bhome_shm::SharedMemory &BHomeShm(); bool GlobalInit(bhome_shm::SharedMemory &shm); typedef std::string Topic; diff --git a/src/sendq.cpp b/src/sendq.cpp index 5b57d72..c0d5afd 100644 --- a/src/sendq.cpp +++ b/src/sendq.cpp @@ -16,10 +16,12 @@ * ===================================================================================== */ #include "sendq.h" -#include "shm_queue.h" +#include "shm_msg_queue.h" #include <chrono> -int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, Array &arr) +using namespace bhome_shm; + +int SendQ::DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr) { auto FirstNotExpired = [](Array &l) { auto Less = [](const TimedMsg &msg, const TimePoint &tp) { return msg.expire() < tp; }; @@ -65,7 +67,7 @@ return nprocessed; } -int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, ArrayList &al) +int SendQ::DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &al) { int nsend = 0; auto AllSent = [&](Array &arr) { @@ -76,7 +78,7 @@ return nsend; } -bool SendQ::TrySend(bhome_shm::ShmMsgQueue &mq) +bool SendQ::TrySend(ShmMsgQueue &mq) { std::unique_lock<std::mutex> lock(mutex_out_); size_t nsend = 0; diff --git a/src/sendq.h b/src/sendq.h index 0699df7..0e565d5 100644 --- a/src/sendq.h +++ b/src/sendq.h @@ -29,10 +29,7 @@ #include <string> #include <unordered_map> -namespace bhome_shm -{ class ShmMsgQueue; -} // namespace bhome_shm class SendQ { @@ -65,7 +62,7 @@ { AppendData(addr, Data(std::move(content)), DefaultExpire(), onExpire); } - bool TrySend(bhome_shm::ShmMsgQueue &mq); + bool TrySend(ShmMsgQueue &mq); // bool empty() const { return store_.empty(); } private: @@ -88,8 +85,8 @@ typedef std::list<Array> ArrayList; typedef std::unordered_map<Remote, ArrayList> Store; - int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, Array &arr); - int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, ArrayList &arr); + int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr); + int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &arr); std::mutex mutex_in_; std::mutex mutex_out_; diff --git a/src/shm_alloc_queue.cpp b/src/shm_alloc_queue.cpp new file mode 100644 index 0000000..7ea5213 --- /dev/null +++ b/src/shm_alloc_queue.cpp @@ -0,0 +1,19 @@ +/* + * ===================================================================================== + * + * Filename: shm_alloc_queue.cpp + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�04鏈�26鏃� 16鏃�24鍒�25绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), lichao@aiotlink.com + * Organization: + * + * ===================================================================================== + */ +#include "shm_alloc_queue.h" + diff --git a/src/shm_alloc_queue.h b/src/shm_alloc_queue.h new file mode 100644 index 0000000..73a184f --- /dev/null +++ b/src/shm_alloc_queue.h @@ -0,0 +1,23 @@ +/* + * ===================================================================================== + * + * Filename: shm_alloc_queue.h + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�04鏈�26鏃� 16鏃�24鍒�40绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), lichao@aiotlink.com + * Organization: + * + * ===================================================================================== + */ +#ifndef SHM_ALLOC_QUEUE_EQBLM9FZ +#define SHM_ALLOC_QUEUE_EQBLM9FZ + + + +#endif // end of include guard: SHM_ALLOC_QUEUE_EQBLM9FZ diff --git a/src/shm_msg_queue.cpp b/src/shm_msg_queue.cpp new file mode 100644 index 0000000..ae019bf --- /dev/null +++ b/src/shm_msg_queue.cpp @@ -0,0 +1,105 @@ +/* + * ===================================================================================== + * + * Filename: shm_msg_queue.cpp + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�04鏈�26鏃� 16鏃�25鍒�05绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), lichao@aiotlink.com + * Organization: + * + * ===================================================================================== + */ +#include "shm_msg_queue.h" + +using namespace bhome_msg; +using namespace boost::interprocess; + +namespace +{ +std::string MsgQIdToName(const ShmMsgQueue::MQId id) +{ + char buf[40] = "mqOx"; + int n = sprintf(buf + 4, "%lx", id); + return std::string(buf, n + 4); +} + +const int AdjustMQLength(const int len) +{ + const int kMaxLength = 10000; + const int kDefaultLen = 12; + if (len <= 0) { + return kDefaultLen; + } else if (len < kMaxLength) { + return len; + } else { + return kMaxLength; + } +} + +} // namespace + +ShmMsgQueue::MQId ShmMsgQueue::NewId() +{ + static auto &id = GetData(); + return ++id; +} +// ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2 +ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) : + id_(id), + queue_(segment, MsgQIdToName(id_), AdjustMQLength(len), segment.get_segment_manager()) +{ +} + +ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) : + id_(NewId()), + queue_(segment, true, MsgQIdToName(id_), AdjustMQLength(len), segment.get_segment_manager()) +{ + if (!queue_.IsOk()) { + throw("error create msgq " + std::to_string(id_)); + } +} + +ShmMsgQueue::~ShmMsgQueue() {} + +bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId id) +{ + Queue *q = Find(shm, id); + if (q) { + MsgI msg; + while (q->TryRead(msg)) { + msg.Release(); + } + } + return Shmq::Remove(shm, MsgQIdToName(id)); +} + +ShmMsgQueue::Queue *ShmMsgQueue::Find(SharedMemory &shm, const MQId remote_id) +{ + return Shmq::Find(shm, MsgQIdToName(remote_id)); +} + +bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend) +{ + Queue *remote = Find(shm, remote_id); + if (remote) { + if (onsend) { + return remote->TryWrite(msg, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); }); + } else { + return remote->TryWrite(msg, [](const MsgI &msg) { msg.AddRef(); }); + } + } else { + // SetLestError(eNotFound); + return false; + } +} + +// Test shows that in the 2 cases: +// 1) build msg first, then find remote queue; +// 2) find remote queue first, then build msg; +// 1 is about 50% faster than 2, maybe cache related. diff --git a/src/shm_msg_queue.h b/src/shm_msg_queue.h new file mode 100644 index 0000000..d7b33af --- /dev/null +++ b/src/shm_msg_queue.h @@ -0,0 +1,77 @@ +/* + * ===================================================================================== + * + * Filename: shm_msg_queue.h + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�04鏈�26鏃� 16鏃�25鍒�21绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), lichao@aiotlink.com + * Organization: + * + * ===================================================================================== + */ +#ifndef SHM_MSG_QUEUE_D847TQXH +#define SHM_MSG_QUEUE_D847TQXH + +#include "msg.h" +#include "shm_queue.h" + +using namespace bhome_shm; +using namespace bhome_msg; + +class ShmMsgQueue : public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue> +{ + typedef ShmObject<SharedQueue<MsgI>> Shmq; + typedef Shmq::ShmType ShmType; + typedef Shmq::Data Queue; + typedef std::function<void()> OnSend; + +public: + typedef uint64_t MQId; + + static MQId NewId(); + + ShmMsgQueue(const MQId id, ShmType &segment, const int len); + ShmMsgQueue(ShmType &segment, const int len); + ~ShmMsgQueue(); + static bool Remove(SharedMemory &shm, const MQId id); + MQId Id() const { return id_; } + ShmType &shm() const { return queue_.shm(); } + + bool Recv(MsgI &msg, const int timeout_ms) { return queue_.data()->Read(msg, timeout_ms); } + bool TryRecv(MsgI &msg) { return queue_.data()->TryRead(msg); } + static Queue *Find(SharedMemory &shm, const MQId remote_id); + static bool TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend = OnSend()); + template <class Iter> + static int TrySendAll(SharedMemory &shm, const MQId remote_id, const Iter begin, const Iter end, OnSend const &onsend = OnSend()) + { + Queue *remote = Find(shm, remote_id); + if (remote) { + if (onsend) { + return remote->TryWrite(begin, end, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); }); + } else { + return remote->TryWrite(begin, end, [](const MsgI &msg) { msg.AddRef(); }); + } + } else { + // SetLestError(eNotFound); + return 0; + } + } + + template <class... Rest> + bool TrySend(const MQId remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); } + template <class... Rest> + int TrySendAll(const MQId remote_id, Rest const &...rest) { return TrySendAll(shm(), remote_id, rest...); } + +private: + MQId id_; + Shmq &queue() { return queue_; } + Shmq queue_; +}; + +#endif // end of include guard: SHM_MSG_QUEUE_D847TQXH diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp index face18b..86f0d91 100644 --- a/src/shm_queue.cpp +++ b/src/shm_queue.cpp @@ -21,91 +21,4 @@ namespace bhome_shm { -using namespace bhome_msg; -using namespace boost::interprocess; - -namespace -{ -std::string MsgQIdToName(const ShmMsgQueue::MQId id) -{ - char buf[40] = "mqOx"; - int n = sprintf(buf + 4, "%lx", id); - return std::string(buf, n + 4); -} - -const int AdjustMQLength(const int len) -{ - const int kMaxLength = 10000; - const int kDefaultLen = 12; - if (len <= 0) { - return kDefaultLen; - } else if (len < kMaxLength) { - return len; - } else { - return kMaxLength; - } -} - -} // namespace - -ShmMsgQueue::MQId ShmMsgQueue::NewId() -{ - static auto &id = GetData(); - return ++id; -} -// ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2 -ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) : - id_(id), - queue_(segment, MsgQIdToName(id_), AdjustMQLength(len), segment.get_segment_manager()) -{ -} - -ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) : - id_(NewId()), - queue_(segment, true, MsgQIdToName(id_), AdjustMQLength(len), segment.get_segment_manager()) -{ - if (!queue_.IsOk()) { - throw("error create msgq " + std::to_string(id_)); - } -} - -ShmMsgQueue::~ShmMsgQueue() {} - -bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId id) -{ - Queue *q = Find(shm, id); - if (q) { - MsgI msg; - while (q->TryRead(msg)) { - msg.Release(); - } - } - return Shmq::Remove(shm, MsgQIdToName(id)); -} - -ShmMsgQueue::Queue *ShmMsgQueue::Find(SharedMemory &shm, const MQId remote_id) -{ - return Shmq::Find(shm, MsgQIdToName(remote_id)); -} - -bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend) -{ - Queue *remote = Find(shm, remote_id); - if (remote) { - if (onsend) { - return remote->TryWrite(msg, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); }); - } else { - return remote->TryWrite(msg, [](const MsgI &msg) { msg.AddRef(); }); - } - } else { - // SetLestError(eNotFound); - return false; - } -} - -// Test shows that in the 2 cases: -// 1) build msg first, then find remote queue; -// 2) find remote queue first, then build msg; -// 1 is about 50% faster than 2, maybe cache related. - } // namespace bhome_shm diff --git a/src/shm_queue.h b/src/shm_queue.h index 4f576a7..5dbda96 100644 --- a/src/shm_queue.h +++ b/src/shm_queue.h @@ -19,7 +19,6 @@ #ifndef SHM_QUEUE_JE0OEUP3 #define SHM_QUEUE_JE0OEUP3 -#include "msg.h" #include "shm.h" #include <atomic> #include <boost/circular_buffer.hpp> @@ -59,30 +58,21 @@ return [this](Guard &lock) { return !this->empty(); }; } - template <class Pred, class OnData> - int ReadAllOnCond(Pred const &pred, OnData const &onData) - { - Guard lock(this->mutex()); - int n = 0; - while (pred(lock)) { - ++n; - onData(this->front()); - this->pop_front(); - this->cond_write_.notify_one(); - } - return n; - } - template <class Pred> bool ReadOnCond(D &buf, Pred const &pred) { - int flag = 0; - auto only_once = [&](Guard &lock) { return flag++ == 0 && pred(lock); }; - auto onData = [&buf](D &d) { - using std::swap; - swap(buf, d); + auto Read = [&]() { + Guard lock(this->mutex()); + if (pred(lock)) { + using std::swap; + swap(buf, Super::front()); + Super::pop_front(); + return true; + } else { + return false; + } }; - return ReadAllOnCond(only_once, onData); + return Read() ? (this->cond_write_.notify_one(), true) : false; } template <class Iter, class Pred, class OnWrite> @@ -94,7 +84,7 @@ Guard lock(mutex()); while (pred(lock)) { onWrite(*begin); - this->push_back(*begin); + Super::push_back(*begin); ++n; cond_read_.notify_one(); if (++begin == end) { @@ -130,60 +120,6 @@ bool Read(D &buf, const int timeout_ms) { return ReadOnCond(buf, TimedReadPred(timeout_ms)); } bool TryRead(D &buf) { return ReadOnCond(buf, TryReadPred()); } -}; - -using namespace bhome_msg; - -class ShmMsgQueue : public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue> -{ - typedef ShmObject<SharedQueue<MsgI>> Shmq; - typedef Shmq::ShmType ShmType; - typedef Shmq::Data Queue; - typedef std::function<void()> OnSend; - -public: - typedef uint64_t MQId; - - static MQId NewId(); - - ShmMsgQueue(const MQId id, ShmType &segment, const int len); - ShmMsgQueue(ShmType &segment, const int len); - ~ShmMsgQueue(); - static bool Remove(SharedMemory &shm, const MQId id); - MQId Id() const { return id_; } - ShmType &shm() const { return queue_.shm(); } - - bool Recv(MsgI &msg, const int timeout_ms) { return queue_.data()->Read(msg, timeout_ms); } - bool TryRecv(MsgI &msg) { return queue_.data()->TryRead(msg); } - template <class OnData> - int TryRecvAll(OnData const &onData) { return queue_.data()->TryReadAll(onData); } - static Queue *Find(SharedMemory &shm, const MQId remote_id); - static bool TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend = OnSend()); - template <class Iter> - static int TrySendAll(SharedMemory &shm, const MQId remote_id, const Iter begin, const Iter end, OnSend const &onsend = OnSend()) - { - Queue *remote = Find(shm, remote_id); - if (remote) { - if (onsend) { - return remote->TryWrite(begin, end, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); }); - } else { - return remote->TryWrite(begin, end, [](const MsgI &msg) { msg.AddRef(); }); - } - } else { - // SetLestError(eNotFound); - return 0; - } - } - - template <class... Rest> - bool TrySend(const MQId remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); } - template <class... Rest> - int TrySendAll(const MQId remote_id, Rest const &...rest) { return TrySendAll(shm(), remote_id, rest...); } - -private: - MQId id_; - Shmq &queue() { return queue_; } - Shmq queue_; }; } // namespace bhome_shm diff --git a/src/socket.cpp b/src/socket.cpp index e471633..2127260 100644 --- a/src/socket.cpp +++ b/src/socket.cpp @@ -65,7 +65,8 @@ onRecvWithPerMsgCB(*this, imsg, head); } }; - return mq().TryRecvAll(onMsg) > 0; // this will recv all msgs. + MsgI imsg; + return mq().TryRecv(imsg) ? (onMsg(imsg), true) : false; }; try { @@ -74,6 +75,8 @@ if (onIdle) { onIdle(*this); } if (!more_to_send && !more_to_recv) { std::this_thread::yield(); + using namespace std::chrono_literals; + std::this_thread::sleep_for(10000ns); } } catch (...) { } diff --git a/src/socket.h b/src/socket.h index cd6bfee..a5dd72c 100644 --- a/src/socket.h +++ b/src/socket.h @@ -22,7 +22,7 @@ #include "bh_util.h" #include "defs.h" #include "sendq.h" -#include "shm_queue.h" +#include "shm_msg_queue.h" #include <atomic> #include <boost/noncopyable.hpp> #include <condition_variable> @@ -37,7 +37,7 @@ { protected: - typedef bhome_shm::ShmMsgQueue Queue; + typedef ShmMsgQueue Queue; public: typedef ShmMsgQueue::MQId MQId; diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 1131816..d274c4b 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -389,7 +389,7 @@ BHAddress addr; if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) { - printf("node: %ld, topic dest: %ld\n", SockNode().id(), addr.mq_id()); + // printf("node: %ld, topic dest: %ld\n", SockNode().id(), addr.mq_id()); BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn())); AddRoute(head, sock.id()); head.set_topic(request.topic()); diff --git a/utest/api_test.cpp b/utest/api_test.cpp index debe8ad..6682aaf 100644 --- a/utest/api_test.cpp +++ b/utest/api_test.cpp @@ -18,6 +18,7 @@ #include "bh_api.h" #include "util.h" #include <atomic> +#include <boost/lockfree/queue.hpp> using namespace bhome_msg; @@ -49,7 +50,6 @@ static MsgStatus st; return st; } -} // namespace void SubRecvProc(const void *proc_id, const int proc_id_len, @@ -59,7 +59,7 @@ std::string proc((const char *) proc_id, proc_id_len); MsgPublish pub; pub.ParseFromArray(data, data_len); - // printf("Sub data, %s : %s\n", pub.topic().c_str(), pub.data().c_str()); + printf("Sub data, %s : %s\n", pub.topic().c_str(), pub.data().c_str()); } void ServerProc(const void *proc_id, @@ -98,8 +98,8 @@ class TLMutex { - // typedef boost::interprocess::interprocess_mutex MutexT; - typedef CasMutex MutexT; + typedef boost::interprocess::interprocess_mutex MutexT; + // typedef CasMutex MutexT; // typedef std::mutex MutexT; typedef std::chrono::steady_clock Clock; typedef Clock::duration Duration; @@ -108,6 +108,7 @@ const Duration limit_; std::atomic<Duration> last_lock_time_; MutexT mutex_; + bool Expired(const Duration diff) { return diff > limit_; } public: struct Status { @@ -127,16 +128,18 @@ { if (mutex_.try_lock()) { auto old_time = last_lock_time_.load(); - if (Now() - old_time > limit_) { - return last_lock_time_.compare_exchange_strong(old_time, Now()); + auto cur = Now(); + if (Expired(cur - old_time)) { + return last_lock_time_.compare_exchange_strong(old_time, cur); } else { last_lock_time_.store(Now()); return true; } } else { auto old_time = last_lock_time_.load(); - if (Now() - old_time > limit_) { - return last_lock_time_.compare_exchange_strong(old_time, Now()); + auto cur = Now(); + if (Expired(cur - old_time)) { + return last_lock_time_.compare_exchange_strong(old_time, cur); } else { return false; } @@ -154,55 +157,88 @@ void unlock() { auto old_time = last_lock_time_.load(); - if (Now() - old_time > limit_) { - } else { - if (last_lock_time_.compare_exchange_strong(old_time, Now())) { + auto cur = Now(); + if (!Expired(cur - old_time)) { + if (last_lock_time_.compare_exchange_strong(old_time, cur)) { mutex_.unlock(); } } } }; -namespace -{ -typedef int64_t Offset; -Offset Addr(void *ptr) { return reinterpret_cast<Offset>(ptr); } -void *Ptr(const Offset offset) { return reinterpret_cast<void *>(offset); } -} // namespace - +//robust attr does NOT work, maybe os does not support it. class RobustMutex { public: RobustMutex() { - pthread_mutexattr_t attr; - pthread_mutexattr_init(&attr); - pthread_mutexattr_setrobust(&attr, 1); - pthread_mutex_init(mtx(), &attr); - if (!valid()) { + pthread_mutexattr_t mutex_attr; + auto attr = [&]() { return &mutex_attr; }; + int r = pthread_mutexattr_init(attr()); + r |= pthread_mutexattr_setpshared(attr(), PTHREAD_PROCESS_SHARED); + r |= pthread_mutexattr_setrobust_np(attr(), PTHREAD_MUTEX_ROBUST_NP); + r |= pthread_mutex_init(mtx(), attr()); + int rob = 0; + pthread_mutexattr_getrobust_np(attr(), &rob); + int shared = 0; + pthread_mutexattr_getpshared(attr(), &shared); + printf("robust : %d, shared : %d\n", rob, shared); + r |= pthread_mutexattr_destroy(attr()); + if (r) { throw("init mutex error."); } } + ~RobustMutex() + { + pthread_mutex_destroy(mtx()); + } + +public: + void lock() { Lock(); } + bool try_lock() + { + int r = TryLock(); + printf("TryLock ret: %d\n", r); + return r == 0; + } + + void unlock() { Unlock(); } + + // private: int TryLock() { return pthread_mutex_trylock(mtx()); } int Lock() { return pthread_mutex_lock(mtx()); } int Unlock() { return pthread_mutex_unlock(mtx()); } - bool valid() const { return false; } private: pthread_mutex_t *mtx() { return &mutex_; } pthread_mutex_t mutex_; }; +class LockFreeQueue +{ + typedef int64_t Data; + typedef boost::lockfree::queue<Data, boost::lockfree::capacity<1024>> LFQueue; + void push_back(Data d) { queue_.push(d); } + +private: + LFQueue queue_; +}; + +} // namespace + BOOST_AUTO_TEST_CASE(MutexTest) { SharedMemory &shm = TestShm(); + // shm.Remove(); + // return; GlobalInit(shm); const std::string mtx_name("test_mutex"); const std::string int_name("test_int"); - auto mtx = shm.FindOrCreate<Mutex>(mtx_name); + auto mtx = shm.FindOrCreate<TLMutex>(mtx_name); auto pi = shm.FindOrCreate<int>(int_name, 100); + std::mutex m; typedef std::chrono::steady_clock Clock; auto Now = []() { return Clock::now().time_since_epoch(); }; if (pi) { @@ -334,7 +370,6 @@ printf("subscribe topic : %s\n", r ? "ok" : "failed"); } - // BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc); auto ServerLoop = [&](std::atomic<bool> *run) { while (*run) { void *proc_id = 0; @@ -446,27 +481,20 @@ std::atomic<bool> run(true); + BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc); ThreadManager threads; boost::timer::auto_cpu_timer timer; threads.Launch(hb, &run); - threads.Launch(ServerLoop, &run); threads.Launch(showStatus, &run); int ncli = 10; - const uint64_t nreq = 1000 * 1; + const uint64_t nreq = 1000 * 10; for (int i = 0; i < ncli; ++i) { - // threads.Launch(asyncRequest, nreq); + threads.Launch(asyncRequest, nreq); } - - for (int i = 0; i < 10; ++i) { - SyncRequest(i); - } - // run.store(false); - // server_thread.join(); - // return; int same = 0; int64_t last = 0; - while (last < nreq * ncli && same < 1) { + while (last < nreq * ncli && same < 2) { Sleep(1s, false); auto cur = Status().nreply_.load(); if (last == cur) { -- Gitblit v1.8.0