From 0261ffd75d0f04e018adc9781cef4ad2c8b8d813 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 21 四月 2021 16:53:59 +0800
Subject: [PATCH] Merge branch 'master' of http://192.168.5.5:10010/r/valib/bhshmq
---
src/socket.cpp | 101 +++++++++++++++++++++++++++-----------------------
1 files changed, 54 insertions(+), 47 deletions(-)
diff --git a/src/socket.cpp b/src/socket.cpp
index b9def0c..c664982 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -24,60 +24,67 @@
using namespace bhome_msg;
using namespace bhome_shm;
-namespace
+ShmSocket::ShmSocket(Shm &shm, const MQId &id, const int len) :
+ run_(false), mq_(id, shm, len)
{
-
-} // namespace
-
-ShmSocket::ShmSocket(Shm &shm, const void *id, const int len) :
- shm_(shm), run_(false)
-{
- if (id && len > 0) {
- mq_.reset(new Queue(*static_cast<const MQId *>(id), shm, len));
- }
+ Start();
}
ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) :
- shm_(shm), run_(false)
+ run_(false), mq_(shm, len)
{
- if (len > 0) {
- mq_.reset(new Queue(shm_, len));
- }
+ Start();
}
ShmSocket::~ShmSocket()
{
- Stop(); //TODO should stop in sub class, incase thread access sub class data.
+ Stop();
}
-bool ShmSocket::Start(const RecvCB &onData, const IdleCB &onIdle, int nworker)
+bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle)
{
- if (!mq_ || !onData) {
- return false; // TODO error code.
- }
-
- std::lock_guard<std::mutex> lock(mutex_);
- StopNoLock();
- auto RecvProc = [this, onData, onIdle]() {
- while (run_) {
- try {
- MsgI imsg;
- DEFER1(imsg.Release(shm_));
- if (mq_->Recv(imsg, 100)) {
- BHMsg msg;
- if (imsg.Unpack(msg)) {
- onData(*this, imsg, msg);
- }
- } else if (onIdle) {
- onIdle(*this);
+ auto ioProc = [this, onData, onIdle]() {
+ auto DoSend = [this]() { return send_buffer_.TrySend(mq()); };
+ auto DoRecv = [=] {
+ auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
+ RecvCB cb;
+ if (per_msg_cbs_->Pick(head.msg_id(), cb)) {
+ cb(socket, imsg, head);
+ } else if (onData) {
+ onData(socket, imsg, head);
}
- } catch (...) {
+ };
+
+ // do not recv if no cb is set.
+ if (!onData && per_msg_cbs_->empty()) {
+ return false;
}
+ auto onMsg = [&](MsgI &imsg) {
+ DEFER1(imsg.Release(shm()));
+ BHMsgHead head;
+ if (imsg.ParseHead(head)) {
+ onRecvWithPerMsgCB(*this, imsg, head);
+ }
+ };
+ return mq().TryRecvAll(onMsg) > 0; // this will recv all msgs.
+ };
+
+ try {
+ bool more_to_send = DoSend();
+ bool more_to_recv = DoRecv();
+ if (onIdle) { onIdle(*this); }
+ if (!more_to_send && !more_to_recv) {
+ std::this_thread::yield();
+ }
+ } catch (...) {
}
};
+ std::lock_guard<std::mutex> lock(mutex_);
+ StopNoLock();
+
run_.store(true);
for (int i = 0; i < nworker; ++i) {
- workers_.emplace_back(RecvProc);
+ workers_.emplace_back([this, ioProc]() { while (run_) { ioProc(); } });
}
return true;
}
@@ -102,17 +109,17 @@
return false;
}
-bool ShmSocket::SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms)
+//maybe reimplment, using async cbs?
+bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms)
{
- return mq_->Send(*static_cast<const MQId *>(id), msg, timeout_ms);
-}
-
-bool ShmSocket::SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms)
-{
- std::lock_guard<std::mutex> lock(mutex_);
- if (!mq_ || RunningNoLock()) {
- return false;
- } else {
- return mq_->Recv(msg, timeout_ms);
+ // std::lock_guard<std::mutex> lock(mutex_); // seems no need to lock mutex_.
+ bool got = (timeout_ms == 0) ? mq().TryRecv(msg) : mq().Recv(msg, timeout_ms);
+ if (got) {
+ if (msg.ParseHead(head)) {
+ return true;
+ } else {
+ msg.Release(shm());
+ }
}
+ return false;
}
--
Gitblit v1.8.0