lichao
2021-05-21 11f6c600e55ca5677f93624efe44d2605cdd908d
src/shm_socket.cpp
@@ -20,7 +20,9 @@
#include "bh_util.h"
#include "defs.h"
#include "msg.h"
#include "sleeper.h"
#include <chrono>
using namespace std::chrono;
using namespace std::chrono_literals;
@@ -28,18 +30,18 @@
using namespace bhome_shm;
ShmSocket::ShmSocket(Shm &shm, const MQId id, const int len) :
    run_(false), mq_(shm, id, len), alloc_id_(0) { Start(); }
    run_(false), mq_(shm, id, len), alloc_id_(0), send_buffer_(shm) { Start(); }
ShmSocket::ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len) :
    run_(false), mq_(shm, create_or_else_find, id, len), alloc_id_(0) { Start(); }
    run_(false), mq_(shm, create_or_else_find, id, len), alloc_id_(0), send_buffer_(shm) { Start(); }
ShmSocket::ShmSocket(int64_t abs_addr, Shm &shm, const MQId id) :
    run_(false), mq_(abs_addr, shm, id), alloc_id_(0) { Start(); }
    run_(false), mq_(abs_addr, shm, id), alloc_id_(0), send_buffer_(shm) { Start(); }
ShmSocket::~ShmSocket() { Stop(); }
bool ShmSocket::Start(int nworker, const RecvCB &onData, const RawRecvCB &onRaw, const IdleCB &onIdle)
{
   auto ioProc = [this, onData, onRaw, onIdle]() {
      auto DoSend = [this]() { return send_buffer_.TrySend(mq()); };
      auto DoSend = [this]() { return send_buffer_.TrySend(); };
      auto DoRecv = [=] {
         // do not recv if no cb is set.
         if (!onData && per_msg_cbs_->empty() && !onRaw && alloc_cbs_->empty()) { return false; }
@@ -71,7 +73,7 @@
            if (IsCmd(val)) {
               onCmdCB(*this, val);
            } else {
               MsgI imsg(val);
               MsgI imsg(val, shm());
               DEFER1(imsg.Release());
               BHMsgHead head;
               if (imsg.ParseHead(head)) {
@@ -90,11 +92,15 @@
      };
      try {
         thread_local FibUSleeper sleeper(1000 * 10);
         bool more_to_send = DoSend();
         bool more_to_recv = DoRecv();
         if (onIdle) { onIdle(*this); }
         if (!more_to_send && !more_to_recv) {
            robust::QuickSleep();
            sleeper.Sleep();
         } else {
            sleeper.Reset();
         }
      } catch (...) {
      }
@@ -107,7 +113,7 @@
      while (run_) { ioProc(); }
      // try send pending msgs.
      auto end_time = steady_clock::now() + 3s;
      while (send_buffer_.TrySend(mq()) && steady_clock::now() < end_time) {
      while (send_buffer_.TrySend() && steady_clock::now() < end_time) {
         // LOG_DEBUG() << "try send pending msgs.";
      }
   };
@@ -164,7 +170,7 @@
   };
#if 0
   // self alloc
   MsgI msg;
   MsgI msg(shm());
   if (msg.Make(size)) {
      DEFER1(msg.Release());
      return OnResult(msg);
@@ -188,7 +194,7 @@
                 (id << 4) |
                 EncodeCmd(eCmdAllocRequest0);
   auto rawCB = [onResult](ShmSocket &sock, int64_t &val) {
      MsgI msg((val >> 32) & MaskBits(31));
      MsgI msg(((val >> 32) & MaskBits(31)), sock.shm());
      DEFER1(msg.Release());
      onResult(msg);
      return true;
@@ -200,5 +206,5 @@
      alloc_cbs_->Pick(id, cb_no_use);
   };
   return Send(BHTopicCenterAddress(), cmd, onExpireRemoveCB);
   return Send(BHTopicCenterAddress(shm()), cmd, onExpireRemoveCB);
}