Fix socket busy loop; remove the locked ReadAll path; refactor.
| | |
| | | UpdateRegInfo(node); |
| | | nodes_[ssn] = node; |
| | | |
| | | printf("new ssn %ld\n", ssn); |
| | | printf("new node (%s) ssn (%ld)\n", head.proc_id().c_str(), ssn); |
| | | auto old = online_node_addr_map_.find(head.proc_id()); |
| | | if (old != online_node_addr_map_.end()) { // old session |
| | | auto &old_ssn = old->second; |
| | | nodes_[old_ssn]->state_.PutOffline(offline_time_); |
| | | printf("put %s %ld offline\n", nodes_[old_ssn]->proc_.proc_id().c_str(), old->second); |
| | | printf("put node (%s) ssn (%ld) offline\n", nodes_[old_ssn]->proc_.proc_id().c_str(), old->second); |
| | | old_ssn = ssn; |
| | | } else { |
| | | online_node_addr_map_.emplace(head.proc_id(), ssn); |
| | |
| | | for (auto &topic : topics) { |
| | | service_map_[topic].insert(dest); |
| | | } |
| | | printf("node %s ssn %ld serve %d topics:\n", node->proc_.proc_id().c_str(), *node->addrs_.begin(), topics.size()); |
| | | for (auto &topic : topics) { |
| | | printf("\t %s\n", topic.c_str()); |
| | | } |
| | | return MakeReply(eSuccess); |
| | | }); |
| | | } |
| | |
| | | } // namespace |
| | | int center_main(int argc, const char *argv[]) |
| | | { |
| | | auto &shm = BHomeShm(); |
| | | GlobalInit(shm); |
| | | |
| | | AppArg args(argc, argv); |
| | | if (args.Has("remove")) { |
| | | shm.Remove(); |
| | | SharedMemory::Remove(BHomeShmName()); |
| | | return 0; |
| | | } |
| | | |
| | | auto &shm = BHomeShm(); |
| | | GlobalInit(shm); |
| | | |
| | | InstanceFlag inst(shm, kCenterRunningFlag); |
| | | if (!inst.TryStartAsFirstInstance()) { |
| | | printf("another instance is running, exit.\n"); |
| | |
| | | */ |
| | | #include "defs.h" |
| | | #include "msg.h" |
| | | #include "shm_queue.h" |
| | | #include "shm_msg_queue.h" |
| | | |
| | | namespace |
| | | { |
| | |
| | | |
| | | } // namespace |
| | | |
| | | std::string BHomeShmName() |
| | | { |
| | | return "bhome_default_shm_v0"; |
| | | } |
| | | bhome_shm::SharedMemory &BHomeShm() |
| | | { |
| | | static bhome_shm::SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 512); |
| | | static bhome_shm::SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512); |
| | | return shm; |
| | | } |
| | | |
| | |
| | | class SharedMemory; |
| | | } // namespace bhome_shm |
| | | |
| | | std::string BHomeShmName(); |
| | | bhome_shm::SharedMemory &BHomeShm(); |
| | | bool GlobalInit(bhome_shm::SharedMemory &shm); |
| | | typedef std::string Topic; |
| | |
| | | * ===================================================================================== |
| | | */ |
| | | #include "sendq.h" |
| | | #include "shm_queue.h" |
| | | #include "shm_msg_queue.h" |
| | | #include <chrono> |
| | | |
| | | int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, Array &arr) |
| | | using namespace bhome_shm; |
| | | |
| | | int SendQ::DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr) |
| | | { |
| | | auto FirstNotExpired = [](Array &l) { |
| | | auto Less = [](const TimedMsg &msg, const TimePoint &tp) { return msg.expire() < tp; }; |
| | |
| | | return nprocessed; |
| | | } |
| | | |
| | | int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, ArrayList &al) |
| | | int SendQ::DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &al) |
| | | { |
| | | int nsend = 0; |
| | | auto AllSent = [&](Array &arr) { |
| | |
| | | return nsend; |
| | | } |
| | | |
| | | bool SendQ::TrySend(bhome_shm::ShmMsgQueue &mq) |
| | | bool SendQ::TrySend(ShmMsgQueue &mq) |
| | | { |
| | | std::unique_lock<std::mutex> lock(mutex_out_); |
| | | size_t nsend = 0; |
| | |
| | | #include <string> |
| | | #include <unordered_map> |
| | | |
| | | namespace bhome_shm |
| | | { |
| | | class ShmMsgQueue; |
| | | } // namespace bhome_shm |
| | | |
| | | class SendQ |
| | | { |
| | |
| | | { |
| | | AppendData(addr, Data(std::move(content)), DefaultExpire(), onExpire); |
| | | } |
| | | bool TrySend(bhome_shm::ShmMsgQueue &mq); |
| | | bool TrySend(ShmMsgQueue &mq); |
| | | // bool empty() const { return store_.empty(); } |
| | | |
| | | private: |
| | |
| | | typedef std::list<Array> ArrayList; |
| | | typedef std::unordered_map<Remote, ArrayList> Store; |
| | | |
| | | int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, Array &arr); |
| | | int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, ArrayList &arr); |
| | | int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr); |
| | | int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &arr); |
| | | |
| | | std::mutex mutex_in_; |
| | | std::mutex mutex_out_; |
New file |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: shm_alloc_queue.cpp |
| | | * |
| | | * Description: |
| | | * |
| | | * Version: 1.0 |
| | | * Created: 2021年04月26日 16时24分25秒 |
| | | * Revision: none |
| | | * Compiler: gcc |
| | | * |
| | | * Author: Li Chao (), lichao@aiotlink.com |
| | | * Organization: |
| | | * |
| | | * ===================================================================================== |
| | | */ |
| | | #include "shm_alloc_queue.h" |
| | | |
New file |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: shm_alloc_queue.h |
| | | * |
| | | * Description: |
| | | * |
| | | * Version: 1.0 |
| | | * Created: 2021年04月26日 16时24分40秒 |
| | | * Revision: none |
| | | * Compiler: gcc |
| | | * |
| | | * Author: Li Chao (), lichao@aiotlink.com |
| | | * Organization: |
| | | * |
| | | * ===================================================================================== |
| | | */ |
| | | #ifndef SHM_ALLOC_QUEUE_EQBLM9FZ |
| | | #define SHM_ALLOC_QUEUE_EQBLM9FZ |
| | | |
| | | |
| | | |
| | | #endif // end of include guard: SHM_ALLOC_QUEUE_EQBLM9FZ |
New file |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: shm_msg_queue.cpp |
| | | * |
| | | * Description: |
| | | * |
| | | * Version: 1.0 |
| | | * Created: 2021年04月26日 16时25分05秒 |
| | | * Revision: none |
| | | * Compiler: gcc |
| | | * |
| | | * Author: Li Chao (), lichao@aiotlink.com |
| | | * Organization: |
| | | * |
| | | * ===================================================================================== |
| | | */ |
#include "shm_msg_queue.h"

#include <algorithm>
#include <cstdio>
| | | |
| | | using namespace bhome_msg; |
| | | using namespace boost::interprocess; |
| | | |
| | | namespace |
| | | { |
| | | std::string MsgQIdToName(const ShmMsgQueue::MQId id) |
| | | { |
| | | char buf[40] = "mqOx"; |
| | | int n = sprintf(buf + 4, "%lx", id); |
| | | return std::string(buf, n + 4); |
| | | } |
| | | |
// Sanitize a requested message-queue length.
// Returns kDefaultLen (12) for non-positive requests, otherwise the request
// clamped to at most kMaxLength (10000).
// (Dropped the meaningless top-level const on the return type and collapsed
// the else-if chain into std::min.)
int AdjustMQLength(const int len)
{
    const int kMaxLength = 10000;
    const int kDefaultLen = 12;
    if (len <= 0) {
        return kDefaultLen;
    }
    return std::min(len, kMaxLength);
}
| | | |
| | | } // namespace |
| | | |
// Allocate a fresh queue id by incrementing a shared atomic counter.
// NOTE(review): uniqueness across processes assumes GetData() (from
// StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue>) resolves to a counter
// in shared memory -- confirm against StaticDataRef's definition.
ShmMsgQueue::MQId ShmMsgQueue::NewId()
{
    static auto &id = GetData();
    return ++id;
}
// ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2
// Open-or-create a queue with a caller-supplied id.
// NOTE(review): unlike the NewId() constructor below, this one does not check
// queue_.IsOk() -- presumably callers verify the queue themselves; confirm.
ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) :
    id_(id),
    queue_(segment, MsgQIdToName(id_), AdjustMQLength(len), segment.get_segment_manager())
{
}
| | | |
// Create a brand-new queue under a freshly allocated id (see NewId()).
// The `true` argument distinguishes this Shmq constructor overload from the
// attach form used above.
ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) :
    id_(NewId()),
    queue_(segment, true, MsgQIdToName(id_), AdjustMQLength(len), segment.get_segment_manager())
{
    if (!queue_.IsOk()) {
        // NOTE(review): this throws a std::string, not a std::exception
        // subclass; `catch (const std::exception&)` will NOT catch it.
        // Consider std::runtime_error (kept as-is: callers may catch the
        // current type).
        throw("error create msgq " + std::to_string(id_));
    }
}
| | | |
| | | ShmMsgQueue::~ShmMsgQueue() {} |
| | | |
// Destroy the queue named by `id` inside `shm`.
// Pending messages are drained and Release()d first so their refcounted
// shared-memory payloads are not leaked when the queue object disappears.
// Returns the result of removing the underlying shm object.
bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId id)
{
    Queue *q = Find(shm, id);
    if (q) {
        MsgI msg;
        while (q->TryRead(msg)) {
            msg.Release(); // drop this queue's reference on the payload
        }
    }
    return Shmq::Remove(shm, MsgQIdToName(id));
}
| | | |
// Look up an existing queue in `shm` by id.
// Returns a non-owning pointer, or nullptr when no such queue exists.
ShmMsgQueue::Queue *ShmMsgQueue::Find(SharedMemory &shm, const MQId remote_id)
{
    return Shmq::Find(shm, MsgQIdToName(remote_id));
}
| | | |
| | | bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend) |
| | | { |
| | | Queue *remote = Find(shm, remote_id); |
| | | if (remote) { |
| | | if (onsend) { |
| | | return remote->TryWrite(msg, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); }); |
| | | } else { |
| | | return remote->TryWrite(msg, [](const MsgI &msg) { msg.AddRef(); }); |
| | | } |
| | | } else { |
| | | // SetLestError(eNotFound); |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | // Test shows that in the 2 cases: |
| | | // 1) build msg first, then find remote queue; |
| | | // 2) find remote queue first, then build msg; |
| | | // 1 is about 50% faster than 2, maybe cache related. |
New file |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: shm_msg_queue.h |
| | | * |
| | | * Description: |
| | | * |
| | | * Version: 1.0 |
| | | * Created: 2021年04月26日 16时25分21秒 |
| | | * Revision: none |
| | | * Compiler: gcc |
| | | * |
| | | * Author: Li Chao (), lichao@aiotlink.com |
| | | * Organization: |
| | | * |
| | | * ===================================================================================== |
| | | */ |
| | | #ifndef SHM_MSG_QUEUE_D847TQXH |
| | | #define SHM_MSG_QUEUE_D847TQXH |
| | | |
| | | #include "msg.h" |
| | | #include "shm_queue.h" |
| | | |
| | | using namespace bhome_shm; |
| | | using namespace bhome_msg; |
| | | |
// A message queue living in boost.interprocess shared memory, addressed by a
// 64-bit id. Receiving reads this process's own queue; sending locates the
// remote queue by id inside the same shm segment. The StaticDataRef base
// supplies the shared atomic counter used by NewId().
class ShmMsgQueue : public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue>
{
    typedef ShmObject<SharedQueue<MsgI>> Shmq; // shm-resident queue of MsgI
    typedef Shmq::ShmType ShmType;
    typedef Shmq::Data Queue;
    typedef std::function<void()> OnSend; // hook run per message just before send

public:
    typedef uint64_t MQId;

    // Allocate a fresh, unused queue id (shared atomic increment).
    static MQId NewId();

    // Open-or-create a queue with a known id / create a new queue with a
    // fresh id. `len` is sanitized by the .cpp-side AdjustMQLength().
    ShmMsgQueue(const MQId id, ShmType &segment, const int len);
    ShmMsgQueue(ShmType &segment, const int len);
    ~ShmMsgQueue();
    // Drain and destroy the queue named by `id` (releases pending messages).
    static bool Remove(SharedMemory &shm, const MQId id);
    MQId Id() const { return id_; }
    ShmType &shm() const { return queue_.shm(); }

    // Blocking (timeout) / non-blocking read from this queue.
    bool Recv(MsgI &msg, const int timeout_ms) { return queue_.data()->Read(msg, timeout_ms); }
    bool TryRecv(MsgI &msg) { return queue_.data()->TryRead(msg); }
    // Non-owning lookup of a remote queue; nullptr when absent.
    static Queue *Find(SharedMemory &shm, const MQId remote_id);
    // Non-blocking single send; bumps the message refcount on success.
    static bool TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend = OnSend());
    // Non-blocking batch send of [begin, end); returns the number of messages
    // actually written. Each written message gets onsend() (if set) + AddRef().
    template <class Iter>
    static int TrySendAll(SharedMemory &shm, const MQId remote_id, const Iter begin, const Iter end, OnSend const &onsend = OnSend())
    {
        Queue *remote = Find(shm, remote_id);
        if (remote) {
            if (onsend) {
                return remote->TryWrite(begin, end, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
            } else {
                return remote->TryWrite(begin, end, [](const MsgI &msg) { msg.AddRef(); });
            }
        } else {
            // SetLestError(eNotFound);
            return 0;
        }
    }

    // Convenience overloads that reuse this queue's own shm segment.
    template <class... Rest>
    bool TrySend(const MQId remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); }
    template <class... Rest>
    int TrySendAll(const MQId remote_id, Rest const &...rest) { return TrySendAll(shm(), remote_id, rest...); }

private:
    MQId id_;
    Shmq &queue() { return queue_; }
    Shmq queue_;
};
| | | |
| | | #endif // end of include guard: SHM_MSG_QUEUE_D847TQXH |
| | |
| | | |
| | | namespace bhome_shm |
| | | { |
| | | using namespace bhome_msg; |
| | | using namespace boost::interprocess; |
| | | |
| | | namespace |
| | | { |
| | | std::string MsgQIdToName(const ShmMsgQueue::MQId id) |
| | | { |
| | | char buf[40] = "mqOx"; |
| | | int n = sprintf(buf + 4, "%lx", id); |
| | | return std::string(buf, n + 4); |
| | | } |
| | | |
| | | const int AdjustMQLength(const int len) |
| | | { |
| | | const int kMaxLength = 10000; |
| | | const int kDefaultLen = 12; |
| | | if (len <= 0) { |
| | | return kDefaultLen; |
| | | } else if (len < kMaxLength) { |
| | | return len; |
| | | } else { |
| | | return kMaxLength; |
| | | } |
| | | } |
| | | |
| | | } // namespace |
| | | |
| | | ShmMsgQueue::MQId ShmMsgQueue::NewId() |
| | | { |
| | | static auto &id = GetData(); |
| | | return ++id; |
| | | } |
| | | // ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2 |
| | | ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) : |
| | | id_(id), |
| | | queue_(segment, MsgQIdToName(id_), AdjustMQLength(len), segment.get_segment_manager()) |
| | | { |
| | | } |
| | | |
| | | ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) : |
| | | id_(NewId()), |
| | | queue_(segment, true, MsgQIdToName(id_), AdjustMQLength(len), segment.get_segment_manager()) |
| | | { |
| | | if (!queue_.IsOk()) { |
| | | throw("error create msgq " + std::to_string(id_)); |
| | | } |
| | | } |
| | | |
| | | ShmMsgQueue::~ShmMsgQueue() {} |
| | | |
| | | bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId id) |
| | | { |
| | | Queue *q = Find(shm, id); |
| | | if (q) { |
| | | MsgI msg; |
| | | while (q->TryRead(msg)) { |
| | | msg.Release(); |
| | | } |
| | | } |
| | | return Shmq::Remove(shm, MsgQIdToName(id)); |
| | | } |
| | | |
| | | ShmMsgQueue::Queue *ShmMsgQueue::Find(SharedMemory &shm, const MQId remote_id) |
| | | { |
| | | return Shmq::Find(shm, MsgQIdToName(remote_id)); |
| | | } |
| | | |
| | | bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend) |
| | | { |
| | | Queue *remote = Find(shm, remote_id); |
| | | if (remote) { |
| | | if (onsend) { |
| | | return remote->TryWrite(msg, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); }); |
| | | } else { |
| | | return remote->TryWrite(msg, [](const MsgI &msg) { msg.AddRef(); }); |
| | | } |
| | | } else { |
| | | // SetLestError(eNotFound); |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | // Test shows that in the 2 cases: |
| | | // 1) build msg first, then find remote queue; |
| | | // 2) find remote queue first, then build msg; |
| | | // 1 is about 50% faster than 2, maybe cache related. |
| | | |
| | | } // namespace bhome_shm |
| | |
| | | #ifndef SHM_QUEUE_JE0OEUP3 |
| | | #define SHM_QUEUE_JE0OEUP3 |
| | | |
| | | #include "msg.h" |
| | | #include "shm.h" |
| | | #include <atomic> |
| | | #include <boost/circular_buffer.hpp> |
| | |
| | | return [this](Guard &lock) { return !this->empty(); }; |
| | | } |
| | | |
| | | template <class Pred, class OnData> |
| | | int ReadAllOnCond(Pred const &pred, OnData const &onData) |
| | | { |
| | | Guard lock(this->mutex()); |
| | | int n = 0; |
| | | while (pred(lock)) { |
| | | ++n; |
| | | onData(this->front()); |
| | | this->pop_front(); |
| | | this->cond_write_.notify_one(); |
| | | } |
| | | return n; |
| | | } |
| | | |
| | | template <class Pred> |
| | | bool ReadOnCond(D &buf, Pred const &pred) |
| | | { |
| | | int flag = 0; |
| | | auto only_once = [&](Guard &lock) { return flag++ == 0 && pred(lock); }; |
| | | auto onData = [&buf](D &d) { |
| | | auto Read = [&]() { |
| | | Guard lock(this->mutex()); |
| | | if (pred(lock)) { |
| | | using std::swap; |
| | | swap(buf, d); |
| | | swap(buf, Super::front()); |
| | | Super::pop_front(); |
| | | return true; |
| | | } else { |
| | | return false; |
| | | } |
| | | }; |
| | | return ReadAllOnCond(only_once, onData); |
| | | return Read() ? (this->cond_write_.notify_one(), true) : false; |
| | | } |
| | | |
| | | template <class Iter, class Pred, class OnWrite> |
| | |
| | | Guard lock(mutex()); |
| | | while (pred(lock)) { |
| | | onWrite(*begin); |
| | | this->push_back(*begin); |
| | | Super::push_back(*begin); |
| | | ++n; |
| | | cond_read_.notify_one(); |
| | | if (++begin == end) { |
| | |
| | | |
| | | bool Read(D &buf, const int timeout_ms) { return ReadOnCond(buf, TimedReadPred(timeout_ms)); } |
| | | bool TryRead(D &buf) { return ReadOnCond(buf, TryReadPred()); } |
| | | }; |
| | | |
| | | using namespace bhome_msg; |
| | | |
| | | class ShmMsgQueue : public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue> |
| | | { |
| | | typedef ShmObject<SharedQueue<MsgI>> Shmq; |
| | | typedef Shmq::ShmType ShmType; |
| | | typedef Shmq::Data Queue; |
| | | typedef std::function<void()> OnSend; |
| | | |
| | | public: |
| | | typedef uint64_t MQId; |
| | | |
| | | static MQId NewId(); |
| | | |
| | | ShmMsgQueue(const MQId id, ShmType &segment, const int len); |
| | | ShmMsgQueue(ShmType &segment, const int len); |
| | | ~ShmMsgQueue(); |
| | | static bool Remove(SharedMemory &shm, const MQId id); |
| | | MQId Id() const { return id_; } |
| | | ShmType &shm() const { return queue_.shm(); } |
| | | |
| | | bool Recv(MsgI &msg, const int timeout_ms) { return queue_.data()->Read(msg, timeout_ms); } |
| | | bool TryRecv(MsgI &msg) { return queue_.data()->TryRead(msg); } |
| | | template <class OnData> |
| | | int TryRecvAll(OnData const &onData) { return queue_.data()->TryReadAll(onData); } |
| | | static Queue *Find(SharedMemory &shm, const MQId remote_id); |
| | | static bool TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend = OnSend()); |
| | | template <class Iter> |
| | | static int TrySendAll(SharedMemory &shm, const MQId remote_id, const Iter begin, const Iter end, OnSend const &onsend = OnSend()) |
| | | { |
| | | Queue *remote = Find(shm, remote_id); |
| | | if (remote) { |
| | | if (onsend) { |
| | | return remote->TryWrite(begin, end, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); }); |
| | | } else { |
| | | return remote->TryWrite(begin, end, [](const MsgI &msg) { msg.AddRef(); }); |
| | | } |
| | | } else { |
| | | // SetLestError(eNotFound); |
| | | return 0; |
| | | } |
| | | } |
| | | |
| | | template <class... Rest> |
| | | bool TrySend(const MQId remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); } |
| | | template <class... Rest> |
| | | int TrySendAll(const MQId remote_id, Rest const &...rest) { return TrySendAll(shm(), remote_id, rest...); } |
| | | |
| | | private: |
| | | MQId id_; |
| | | Shmq &queue() { return queue_; } |
| | | Shmq queue_; |
| | | }; |
| | | |
| | | } // namespace bhome_shm |
| | |
| | | onRecvWithPerMsgCB(*this, imsg, head); |
| | | } |
| | | }; |
| | | return mq().TryRecvAll(onMsg) > 0; // this will recv all msgs. |
| | | MsgI imsg; |
| | | return mq().TryRecv(imsg) ? (onMsg(imsg), true) : false; |
| | | }; |
| | | |
| | | try { |
| | |
| | | if (onIdle) { onIdle(*this); } |
| | | if (!more_to_send && !more_to_recv) { |
| | | std::this_thread::yield(); |
| | | using namespace std::chrono_literals; |
| | | std::this_thread::sleep_for(10000ns); |
| | | } |
| | | } catch (...) { |
| | | } |
| | |
| | | #include "bh_util.h" |
| | | #include "defs.h" |
| | | #include "sendq.h" |
| | | #include "shm_queue.h" |
| | | #include "shm_msg_queue.h" |
| | | #include <atomic> |
| | | #include <boost/noncopyable.hpp> |
| | | #include <condition_variable> |
| | |
| | | { |
| | | |
| | | protected: |
| | | typedef bhome_shm::ShmMsgQueue Queue; |
| | | typedef ShmMsgQueue Queue; |
| | | |
| | | public: |
| | | typedef ShmMsgQueue::MQId MQId; |
| | |
| | | |
| | | BHAddress addr; |
| | | if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) { |
| | | printf("node: %ld, topic dest: %ld\n", SockNode().id(), addr.mq_id()); |
| | | // printf("node: %ld, topic dest: %ld\n", SockNode().id(), addr.mq_id()); |
| | | BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn())); |
| | | AddRoute(head, sock.id()); |
| | | head.set_topic(request.topic()); |
| | |
| | | #include "bh_api.h" |
| | | #include "util.h" |
| | | #include <atomic> |
| | | #include <boost/lockfree/queue.hpp> |
| | | |
| | | using namespace bhome_msg; |
| | | |
| | |
| | | static MsgStatus st; |
| | | return st; |
| | | } |
| | | } // namespace |
| | | |
| | | void SubRecvProc(const void *proc_id, |
| | | const int proc_id_len, |
| | |
| | | std::string proc((const char *) proc_id, proc_id_len); |
| | | MsgPublish pub; |
| | | pub.ParseFromArray(data, data_len); |
| | | // printf("Sub data, %s : %s\n", pub.topic().c_str(), pub.data().c_str()); |
| | | printf("Sub data, %s : %s\n", pub.topic().c_str(), pub.data().c_str()); |
| | | } |
| | | |
| | | void ServerProc(const void *proc_id, |
| | |
| | | |
| | | class TLMutex |
| | | { |
| | | // typedef boost::interprocess::interprocess_mutex MutexT; |
| | | typedef CasMutex MutexT; |
| | | typedef boost::interprocess::interprocess_mutex MutexT; |
| | | // typedef CasMutex MutexT; |
| | | // typedef std::mutex MutexT; |
| | | typedef std::chrono::steady_clock Clock; |
| | | typedef Clock::duration Duration; |
| | |
| | | const Duration limit_; |
| | | std::atomic<Duration> last_lock_time_; |
| | | MutexT mutex_; |
| | | bool Expired(const Duration diff) { return diff > limit_; } |
| | | |
| | | public: |
| | | struct Status { |
| | |
| | | { |
| | | if (mutex_.try_lock()) { |
| | | auto old_time = last_lock_time_.load(); |
| | | if (Now() - old_time > limit_) { |
| | | return last_lock_time_.compare_exchange_strong(old_time, Now()); |
| | | auto cur = Now(); |
| | | if (Expired(cur - old_time)) { |
| | | return last_lock_time_.compare_exchange_strong(old_time, cur); |
| | | } else { |
| | | last_lock_time_.store(Now()); |
| | | return true; |
| | | } |
| | | } else { |
| | | auto old_time = last_lock_time_.load(); |
| | | if (Now() - old_time > limit_) { |
| | | return last_lock_time_.compare_exchange_strong(old_time, Now()); |
| | | auto cur = Now(); |
| | | if (Expired(cur - old_time)) { |
| | | return last_lock_time_.compare_exchange_strong(old_time, cur); |
| | | } else { |
| | | return false; |
| | | } |
| | |
| | | void unlock() |
| | | { |
| | | auto old_time = last_lock_time_.load(); |
| | | if (Now() - old_time > limit_) { |
| | | } else { |
| | | if (last_lock_time_.compare_exchange_strong(old_time, Now())) { |
| | | auto cur = Now(); |
| | | if (!Expired(cur - old_time)) { |
| | | if (last_lock_time_.compare_exchange_strong(old_time, cur)) { |
| | | mutex_.unlock(); |
| | | } |
| | | } |
| | | } |
| | | }; |
| | | |
| | | namespace |
| | | { |
| | | typedef int64_t Offset; |
| | | Offset Addr(void *ptr) { return reinterpret_cast<Offset>(ptr); } |
| | | void *Ptr(const Offset offset) { return reinterpret_cast<void *>(offset); } |
| | | } // namespace |
| | | |
| | | //robust attr does NOT work, maybe os does not support it. |
| | | class RobustMutex |
| | | { |
| | | public: |
| | | RobustMutex() |
| | | { |
| | | pthread_mutexattr_t attr; |
| | | pthread_mutexattr_init(&attr); |
| | | pthread_mutexattr_setrobust(&attr, 1); |
| | | pthread_mutex_init(mtx(), &attr); |
| | | if (!valid()) { |
| | | pthread_mutexattr_t mutex_attr; |
| | | auto attr = [&]() { return &mutex_attr; }; |
| | | int r = pthread_mutexattr_init(attr()); |
| | | r |= pthread_mutexattr_setpshared(attr(), PTHREAD_PROCESS_SHARED); |
| | | r |= pthread_mutexattr_setrobust_np(attr(), PTHREAD_MUTEX_ROBUST_NP); |
| | | r |= pthread_mutex_init(mtx(), attr()); |
| | | int rob = 0; |
| | | pthread_mutexattr_getrobust_np(attr(), &rob); |
| | | int shared = 0; |
| | | pthread_mutexattr_getpshared(attr(), &shared); |
| | | printf("robust : %d, shared : %d\n", rob, shared); |
| | | r |= pthread_mutexattr_destroy(attr()); |
| | | if (r) { |
| | | throw("init mutex error."); |
| | | } |
| | | } |
| | | ~RobustMutex() |
| | | { |
| | | pthread_mutex_destroy(mtx()); |
| | | } |
| | | |
| | | public: |
| | | void lock() { Lock(); } |
| | | bool try_lock() |
| | | { |
| | | int r = TryLock(); |
| | | printf("TryLock ret: %d\n", r); |
| | | return r == 0; |
| | | } |
| | | |
| | | void unlock() { Unlock(); } |
| | | |
| | | // private: |
| | | int TryLock() { return pthread_mutex_trylock(mtx()); } |
| | | int Lock() { return pthread_mutex_lock(mtx()); } |
| | | int Unlock() { return pthread_mutex_unlock(mtx()); } |
| | | bool valid() const { return false; } |
| | | |
| | | private: |
| | | pthread_mutex_t *mtx() { return &mutex_; } |
| | | pthread_mutex_t mutex_; |
| | | }; |
| | | |
| | | class LockFreeQueue |
| | | { |
| | | typedef int64_t Data; |
| | | typedef boost::lockfree::queue<Data, boost::lockfree::capacity<1024>> LFQueue; |
| | | void push_back(Data d) { queue_.push(d); } |
| | | |
| | | private: |
| | | LFQueue queue_; |
| | | }; |
| | | |
| | | } // namespace |
| | | |
| | | BOOST_AUTO_TEST_CASE(MutexTest) |
| | | { |
| | | SharedMemory &shm = TestShm(); |
| | | // shm.Remove(); |
| | | // return; |
| | | GlobalInit(shm); |
| | | |
| | | const std::string mtx_name("test_mutex"); |
| | | const std::string int_name("test_int"); |
| | | auto mtx = shm.FindOrCreate<Mutex>(mtx_name); |
| | | auto mtx = shm.FindOrCreate<TLMutex>(mtx_name); |
| | | auto pi = shm.FindOrCreate<int>(int_name, 100); |
| | | |
| | | std::mutex m; |
| | | typedef std::chrono::steady_clock Clock; |
| | | auto Now = []() { return Clock::now().time_since_epoch(); }; |
| | | if (pi) { |
| | |
| | | printf("subscribe topic : %s\n", r ? "ok" : "failed"); |
| | | } |
| | | |
| | | // BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc); |
| | | auto ServerLoop = [&](std::atomic<bool> *run) { |
| | | while (*run) { |
| | | void *proc_id = 0; |
| | |
| | | |
| | | std::atomic<bool> run(true); |
| | | |
| | | BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc); |
| | | ThreadManager threads; |
| | | boost::timer::auto_cpu_timer timer; |
| | | threads.Launch(hb, &run); |
| | | threads.Launch(ServerLoop, &run); |
| | | threads.Launch(showStatus, &run); |
| | | int ncli = 10; |
| | | const uint64_t nreq = 1000 * 1; |
| | | const uint64_t nreq = 1000 * 10; |
| | | for (int i = 0; i < ncli; ++i) { |
| | | // threads.Launch(asyncRequest, nreq); |
| | | threads.Launch(asyncRequest, nreq); |
| | | } |
| | | |
| | | for (int i = 0; i < 10; ++i) { |
| | | SyncRequest(i); |
| | | } |
| | | // run.store(false); |
| | | // server_thread.join(); |
| | | // return; |
| | | |
| | | int same = 0; |
| | | int64_t last = 0; |
| | | while (last < nreq * ncli && same < 1) { |
| | | while (last < nreq * ncli && same < 2) { |
| | | Sleep(1s, false); |
| | | auto cur = Status().nreply_.load(); |
| | | if (last == cur) { |