From 047f801078a52042ef02750b577233d115ed0f57 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 13 四月 2021 17:12:25 +0800
Subject: [PATCH] rename library, add box.
---
src/socket.cpp | 87 +++++++++++++++++++++----------------------
1 files changed, 43 insertions(+), 44 deletions(-)
diff --git a/src/socket.cpp b/src/socket.cpp
index b9def0c..2c55665 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -29,55 +29,53 @@
} // namespace
-ShmSocket::ShmSocket(Shm &shm, const void *id, const int len) :
- shm_(shm), run_(false)
+ShmSocket::ShmSocket(Shm &shm, const MQId &id, const int len) :
+ shm_(shm), run_(false), mq_(id, shm, len)
{
- if (id && len > 0) {
- mq_.reset(new Queue(*static_cast<const MQId *>(id), shm, len));
- }
}
ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) :
- shm_(shm), run_(false)
-{
- if (len > 0) {
- mq_.reset(new Queue(shm_, len));
- }
-}
+ shm_(shm), run_(false), mq_(shm, len) {}
ShmSocket::~ShmSocket()
{
Stop(); //TODO should stop in sub class, incase thread access sub class data.
}
-bool ShmSocket::Start(const RecvCB &onData, const IdleCB &onIdle, int nworker)
+bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle)
{
- if (!mq_ || !onData) {
- return false; // TODO error code.
- }
-
- std::lock_guard<std::mutex> lock(mutex_);
- StopNoLock();
- auto RecvProc = [this, onData, onIdle]() {
- while (run_) {
- try {
- MsgI imsg;
- DEFER1(imsg.Release(shm_));
- if (mq_->Recv(imsg, 100)) {
- BHMsg msg;
- if (imsg.Unpack(msg)) {
- onData(*this, imsg, msg);
- }
- } else if (onIdle) {
- onIdle(*this);
- }
- } catch (...) {
- }
+ auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
+ RecvCB cb;
+ if (per_msg_cbs_->Find(head.msg_id(), cb)) {
+ cb(socket, imsg, head);
+ } else if (onData) {
+ onData(socket, imsg, head);
+ } else { // else ignored, or dropped
}
};
+ auto recvLoopBody = [this, onRecvWithPerMsgCB, onIdle]() {
+ try {
+ MsgI imsg;
+ if (mq().Recv(imsg, 10)) {
+ DEFER1(imsg.Release(shm()));
+ BHMsgHead head;
+ if (imsg.ParseHead(head)) {
+ onRecvWithPerMsgCB(*this, imsg, head);
+ }
+ }
+ if (onIdle) {
+ onIdle(*this);
+ }
+ } catch (...) {
+ }
+ };
+
+ std::lock_guard<std::mutex> lock(mutex_);
+ StopNoLock();
+
run_.store(true);
for (int i = 0; i < nworker; ++i) {
- workers_.emplace_back(RecvProc);
+ workers_.emplace_back([this, recvLoopBody]() { while (run_) { recvLoopBody(); } });
}
return true;
}
@@ -102,17 +100,18 @@
return false;
}
-bool ShmSocket::SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms)
-{
- return mq_->Send(*static_cast<const MQId *>(id), msg, timeout_ms);
-}
-
-bool ShmSocket::SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms)
+bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms)
{
std::lock_guard<std::mutex> lock(mutex_);
- if (!mq_ || RunningNoLock()) {
+ auto Recv = [&]() {
+ if (mq().Recv(msg, timeout_ms)) {
+ if (msg.ParseHead(head)) {
+ return true;
+ } else {
+ msg.Release(shm());
+ }
+ }
return false;
- } else {
- return mq_->Recv(msg, timeout_ms);
- }
+ };
+ return !RunningNoLock() && Recv();
}
--
Gitblit v1.8.0