lichao
2021-04-09 2197cf91e7a3bd5941327ba630a42946b88f069e
src/socket.cpp
@@ -43,51 +43,37 @@
bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle)
{
   auto onRecv = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
      auto Find = [&](RecvCB &cb) {
         std::lock_guard<std::mutex> lock(mutex());
         const std::string &msgid = head.msg_id();
         auto pos = async_cbs_.find(msgid);
         if (pos != async_cbs_.end()) {
            cb.swap(pos->second);
            async_cbs_.erase(pos);
            return true;
         } else {
            return false;
         }
      };
   auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
      RecvCB cb;
      if (Find(cb)) {
      if (async_cbs_->Find(head.msg_id(), cb)) {
         cb(socket, imsg, head);
      } else if (onData) {
         onData(socket, imsg, head);
      } // else ignored, or dropped
   };
   std::lock_guard<std::mutex> lock(mutex_);
   StopNoLock();
   auto RecvProc = [this, onRecv, onIdle]() {
      while (run_) {
         try {
            MsgI imsg;
            if (mq().Recv(imsg, 10)) {
               DEFER1(imsg.Release(shm()));
               BHMsgHead head;
               if (imsg.ParseHead(head)) {
                  onRecv(*this, imsg, head);
               }
            } else if (onIdle) {
               onIdle(*this);
   auto recvLoopBody = [this, onRecvWithPerMsgCB, onIdle]() {
      try {
         MsgI imsg;
         if (mq().Recv(imsg, 10)) {
            DEFER1(imsg.Release(shm()));
            BHMsgHead head;
            if (imsg.ParseHead(head)) {
               onRecvWithPerMsgCB(*this, imsg, head);
            }
         } catch (...) {
         } else if (onIdle) {
            onIdle(*this);
         }
      } catch (...) {
      }
   };
   std::lock_guard<std::mutex> lock(mutex_);
   StopNoLock();
   run_.store(true);
   for (int i = 0; i < nworker; ++i) {
      workers_.emplace_back(RecvProc);
      workers_.emplace_back([this, recvLoopBody]() { while (run_) { recvLoopBody(); } });
   }
   return true;
}