| | |
| | | using namespace bhome_msg; |
| | | using namespace bhome_shm; |
| | | |
| | | namespace |
| | | { |
| | | |
| | | } // namespace |
| | | |
| | | ShmSocket::ShmSocket(Shm &shm, const MQId &id, const int len) : |
| | | ShmSocket::ShmSocket(Shm &shm, const MQId id, const int len) : |
| | | run_(false), mq_(id, shm, len) |
| | | { |
| | | Start(); |
| | |
| | | |
| | | ShmSocket::~ShmSocket() |
| | | { |
| | | Stop(); //TODO should stop in sub class, incase thread access sub class data. |
| | | Stop(); |
| | | } |
| | | |
| | | bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle) |
| | |
| | | auto DoRecv = [=] { |
| | | auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) { |
| | | RecvCB cb; |
| | | if (per_msg_cbs_->Find(head.msg_id(), cb)) { |
| | | if (per_msg_cbs_->Pick(head.msg_id(), cb)) { |
| | | cb(socket, imsg, head); |
| | | } else if (onData) { |
| | | onData(socket, imsg, head); |
| | |
| | | return false; |
| | | } |
| | | auto onMsg = [&](MsgI &imsg) { |
| | | DEFER1(imsg.Release(shm())); |
| | | DEFER1(imsg.Release()); |
| | | BHMsgHead head; |
| | | if (imsg.ParseHead(head)) { |
| | | onRecvWithPerMsgCB(*this, imsg, head); |
| | |
| | | if (msg.ParseHead(head)) { |
| | | return true; |
| | | } else { |
| | | msg.Release(shm()); |
| | | msg.Release(); |
| | | } |
| | | } |
| | | return false; |