lichao
2021-04-09 2197cf91e7a3bd5941327ba630a42946b88f069e
src/socket.h
@@ -19,6 +19,7 @@
#ifndef SOCKET_GWTJHBPO
#define SOCKET_GWTJHBPO
#include "bh_util.h"
#include "defs.h"
#include "shm_queue.h"
#include <atomic>
@@ -34,6 +35,15 @@
class ShmSocket : private boost::noncopyable
{
   template <class DoSend>
   inline bool SendImpl(MsgI &msg, const int timeout_ms, const DoSend &doSend)
   {
      bool r = false;
      DEFER1(if (msg.IsCounted() || !r) { msg.Release(shm()); });
      r = doSend(msg);
      return r;
   }
protected:
   typedef bhome_shm::ShmMsgQueue Queue;
@@ -55,30 +65,28 @@
   bool Stop();
   size_t Pending() const { return mq().Pending(); }
   bool Send(const void *id, const MsgI &imsg, const int timeout_ms)
   bool Send(const void *valid_remote, const MsgI &imsg, const int timeout_ms)
   {
      return mq().Send(*static_cast<const MQId *>(id), imsg, timeout_ms);
      assert(valid_remote);
      return mq().Send(*static_cast<const MQId *>(valid_remote), imsg, timeout_ms);
   }
   //TODO reimplment, using async.
   bool SyncRecv(MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms);
   template <class Body>
   bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms, const RecvCB &cb = RecvCB())
   bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms, const RecvCB &cb)
   {
      assert(valid_remote);
      try {
         if (cb) {
            auto RegisterCB = [&]() {
               std::lock_guard<std::mutex> lock(mutex());
               async_cbs_.emplace(head.msg_id(), cb);
            };
            return mq().Send(*static_cast<const MQId *>(valid_remote), head, body, timeout_ms, RegisterCB);
         } else {
            return mq().Send(*static_cast<const MQId *>(valid_remote), head, body, timeout_ms);
         }
      } catch (...) {
         return false;
      }
      auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms, [&]() { async_cbs_->Add(head.msg_id(), cb); }); };
      MsgI msg;
      return msg.Make(shm(), head, body) && SendImpl(msg, timeout_ms, DoSend);
   }
   template <class Body>
   bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms)
   {
      auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms); };
      MsgI msg;
      return msg.Make(shm(), head, body) && SendImpl(msg, timeout_ms, DoSend);
   }
   template <class Body>
@@ -133,7 +141,26 @@
   std::atomic<bool> run_;
   Queue mq_;
   std::unordered_map<std::string, RecvCB> async_cbs_;
   class AsyncCBs
   {
      std::unordered_map<std::string, RecvCB> store_;
   public:
      bool Add(const std::string &id, const RecvCB &cb) { return store_.emplace(id, cb).second; }
      bool Find(const std::string &id, RecvCB &cb)
      {
         auto pos = store_.find(id);
         if (pos != store_.end()) {
            cb.swap(pos->second);
            store_.erase(pos);
            return true;
         } else {
            return false;
         }
      }
   };
   Synced<AsyncCBs> async_cbs_;
};
#endif // end of include guard: SOCKET_GWTJHBPO