From 0261ffd75d0f04e018adc9781cef4ad2c8b8d813 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 21 四月 2021 16:53:59 +0800
Subject: [PATCH] Merge branch 'master' of http://192.168.5.5:10010/r/valib/bhshmq
---
src/socket.cpp | 104 +++++++++++++++++++++++++---------------------------
1 files changed, 50 insertions(+), 54 deletions(-)
diff --git a/src/socket.cpp b/src/socket.cpp
index f2b29f4..c664982 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -24,70 +24,67 @@
using namespace bhome_msg;
using namespace bhome_shm;
-namespace
-{
-
-} // namespace
-
ShmSocket::ShmSocket(Shm &shm, const MQId &id, const int len) :
- shm_(shm), run_(false), mq_(id, shm, len)
+ run_(false), mq_(id, shm, len)
{
+ Start();
}
ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) :
- shm_(shm), run_(false), mq_(shm, len) {}
+ run_(false), mq_(shm, len)
+{
+ Start();
+}
ShmSocket::~ShmSocket()
{
- Stop(); //TODO should stop in sub class, incase thread access sub class data.
+ Stop();
}
bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle)
{
- auto onRecv = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
- auto Find = [&](RecvCB &cb) {
- std::lock_guard<std::mutex> lock(mutex());
- const std::string &msgid = head.msg_id();
- auto pos = async_cbs_.find(msgid);
- if (pos != async_cbs_.end()) {
- cb.swap(pos->second);
- async_cbs_.erase(pos);
- return true;
- } else {
+ auto ioProc = [this, onData, onIdle]() {
+ auto DoSend = [this]() { return send_buffer_.TrySend(mq()); };
+ auto DoRecv = [=] {
+ auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
+ RecvCB cb;
+ if (per_msg_cbs_->Pick(head.msg_id(), cb)) {
+ cb(socket, imsg, head);
+ } else if (onData) {
+ onData(socket, imsg, head);
+ }
+ };
+
+ // do not recv if no cb is set.
+ if (!onData && per_msg_cbs_->empty()) {
return false;
}
+ auto onMsg = [&](MsgI &imsg) {
+ DEFER1(imsg.Release(shm()));
+ BHMsgHead head;
+ if (imsg.ParseHead(head)) {
+ onRecvWithPerMsgCB(*this, imsg, head);
+ }
+ };
+ return mq().TryRecvAll(onMsg) > 0; // this will recv all msgs.
};
- RecvCB cb;
- if (Find(cb)) {
- cb(socket, imsg, head);
- } else if (onData) {
- onData(socket, imsg, head);
- } // else ignored, or dropped
+ try {
+ bool more_to_send = DoSend();
+ bool more_to_recv = DoRecv();
+ if (onIdle) { onIdle(*this); }
+ if (!more_to_send && !more_to_recv) {
+ std::this_thread::yield();
+ }
+ } catch (...) {
+ }
};
std::lock_guard<std::mutex> lock(mutex_);
StopNoLock();
- auto RecvProc = [this, onRecv, onIdle]() {
- while (run_) {
- try {
- MsgI imsg;
- if (mq().Recv(imsg, 10)) {
- DEFER1(imsg.Release(shm()));
- BHMsgHead head;
- if (imsg.ParseHead(head)) {
- onRecv(*this, imsg, head);
- }
- } else if (onIdle) {
- onIdle(*this);
- }
- } catch (...) {
- }
- }
- };
run_.store(true);
for (int i = 0; i < nworker; ++i) {
- workers_.emplace_back(RecvProc);
+ workers_.emplace_back([this, ioProc]() { while (run_) { ioProc(); } });
}
return true;
}
@@ -112,18 +109,17 @@
return false;
}
-bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms)
+//maybe reimplment, using async cbs?
+bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms)
{
- std::lock_guard<std::mutex> lock(mutex_);
- auto Recv = [&]() {
- if (mq().Recv(msg, timeout_ms)) {
- if (msg.ParseHead(head)) {
- return true;
- } else {
- msg.Release(shm());
- }
+ // std::lock_guard<std::mutex> lock(mutex_); // seems no need to lock mutex_.
+ bool got = (timeout_ms == 0) ? mq().TryRecv(msg) : mq().Recv(msg, timeout_ms);
+ if (got) {
+ if (msg.ParseHead(head)) {
+ return true;
+ } else {
+ msg.Release(shm());
}
- return false;
- };
- return !RunningNoLock() && Recv();
+ }
+ return false;
}
--
Gitblit v1.8.0