lichao
2021-04-28 a6f67b4249525089fb97eb9418c7014f66c2a000
src/socket.cpp
@@ -24,12 +24,7 @@
using namespace bhome_msg;
using namespace bhome_shm;
namespace
{
} // namespace
ShmSocket::ShmSocket(Shm &shm, const MQId &id, const int len) :
ShmSocket::ShmSocket(Shm &shm, const MQId id, const int len) :
    run_(false), mq_(id, shm, len)
{
   Start();
@@ -42,7 +37,7 @@
ShmSocket::~ShmSocket()
{
   Stop(); //TODO should stop in sub class, incase thread access sub class data.
   Stop();
}
bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle)
@@ -64,13 +59,14 @@
            return false;
         }
         auto onMsg = [&](MsgI &imsg) {
            DEFER1(imsg.Release(shm()));
            DEFER1(imsg.Release());
            BHMsgHead head;
            if (imsg.ParseHead(head)) {
               onRecvWithPerMsgCB(*this, imsg, head);
            }
         };
         return mq().TryRecvAll(onMsg) > 0; // this will recv all msgs.
         MsgI imsg;
         return mq().TryRecv(imsg) ? (onMsg(imsg), true) : false;
      };
      try {
@@ -79,6 +75,8 @@
         if (onIdle) { onIdle(*this); }
         if (!more_to_send && !more_to_recv) {
            std::this_thread::yield();
            using namespace std::chrono_literals;
            std::this_thread::sleep_for(10000ns);
         }
      } catch (...) {
      }
@@ -123,7 +121,7 @@
      if (msg.ParseHead(head)) {
         return true;
      } else {
         msg.Release(shm());
         msg.Release();
      }
   }
   return false;