From 1b167ec5ad101ac44451381e26cc73ab5d67d2a1 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期一, 26 四月 2021 16:37:52 +0800
Subject: [PATCH] fix socket busy loop; del locked readall; refactor.
---
src/socket.h | 4
src/shm_alloc_queue.cpp | 19 +
box/center.cpp | 8
src/shm_msg_queue.h | 77 +++++++
src/socket.cpp | 5
src/defs.h | 1
src/sendq.cpp | 10
src/shm_queue.cpp | 87 --------
src/topic_node.cpp | 2
src/shm_msg_queue.cpp | 105 ++++++++++
src/defs.cpp | 8
box/center_main.cc | 8
src/shm_alloc_queue.h | 23 ++
utest/api_test.cpp | 102 ++++++---
src/shm_queue.h | 88 +-------
src/sendq.h | 9
16 files changed, 334 insertions(+), 222 deletions(-)
diff --git a/box/center.cpp b/box/center.cpp
index 829a089..5cb9bc3 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -133,12 +133,12 @@
UpdateRegInfo(node);
nodes_[ssn] = node;
- printf("new ssn %ld\n", ssn);
+ printf("new node (%s) ssn (%ld)\n", head.proc_id().c_str(), ssn);
auto old = online_node_addr_map_.find(head.proc_id());
if (old != online_node_addr_map_.end()) { // old session
auto &old_ssn = old->second;
nodes_[old_ssn]->state_.PutOffline(offline_time_);
- printf("put %s %ld offline\n", nodes_[old_ssn]->proc_.proc_id().c_str(), old->second);
+ printf("put node (%s) ssn (%ld) offline\n", nodes_[old_ssn]->proc_.proc_id().c_str(), old->second);
old_ssn = ssn;
} else {
online_node_addr_map_.emplace(head.proc_id(), ssn);
@@ -201,6 +201,10 @@
for (auto &topic : topics) {
service_map_[topic].insert(dest);
}
+ printf("node %s ssn %ld serve %d topics:\n", node->proc_.proc_id().c_str(), *node->addrs_.begin(), topics.size());
+ for (auto &topic : topics) {
+ printf("\t %s\n", topic.c_str());
+ }
return MakeReply(eSuccess);
});
}
diff --git a/box/center_main.cc b/box/center_main.cc
index fdda2cd..79210fc 100644
--- a/box/center_main.cc
+++ b/box/center_main.cc
@@ -85,15 +85,15 @@
} // namespace
int center_main(int argc, const char *argv[])
{
- auto &shm = BHomeShm();
- GlobalInit(shm);
-
AppArg args(argc, argv);
if (args.Has("remove")) {
- shm.Remove();
+ SharedMemory::Remove(BHomeShmName());
return 0;
}
+ auto &shm = BHomeShm();
+ GlobalInit(shm);
+
InstanceFlag inst(shm, kCenterRunningFlag);
if (!inst.TryStartAsFirstInstance()) {
printf("another instance is running, exit.\n");
diff --git a/src/defs.cpp b/src/defs.cpp
index 0ca82bf..cc6f23b 100644
--- a/src/defs.cpp
+++ b/src/defs.cpp
@@ -17,7 +17,7 @@
*/
#include "defs.h"
#include "msg.h"
-#include "shm_queue.h"
+#include "shm_msg_queue.h"
namespace
{
@@ -35,9 +35,13 @@
} // namespace
+std::string BHomeShmName()
+{
+ return "bhome_default_shm_v0";
+}
bhome_shm::SharedMemory &BHomeShm()
{
- static bhome_shm::SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 512);
+ static bhome_shm::SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512);
return shm;
}
diff --git a/src/defs.h b/src/defs.h
index 1c9e663..43375bf 100644
--- a/src/defs.h
+++ b/src/defs.h
@@ -37,6 +37,7 @@
class SharedMemory;
} // namespace bhome_shm
+std::string BHomeShmName();
bhome_shm::SharedMemory &BHomeShm();
bool GlobalInit(bhome_shm::SharedMemory &shm);
typedef std::string Topic;
diff --git a/src/sendq.cpp b/src/sendq.cpp
index 5b57d72..c0d5afd 100644
--- a/src/sendq.cpp
+++ b/src/sendq.cpp
@@ -16,10 +16,12 @@
* =====================================================================================
*/
#include "sendq.h"
-#include "shm_queue.h"
+#include "shm_msg_queue.h"
#include <chrono>
-int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, Array &arr)
+using namespace bhome_shm;
+
+int SendQ::DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr)
{
auto FirstNotExpired = [](Array &l) {
auto Less = [](const TimedMsg &msg, const TimePoint &tp) { return msg.expire() < tp; };
@@ -65,7 +67,7 @@
return nprocessed;
}
-int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, ArrayList &al)
+int SendQ::DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &al)
{
int nsend = 0;
auto AllSent = [&](Array &arr) {
@@ -76,7 +78,7 @@
return nsend;
}
-bool SendQ::TrySend(bhome_shm::ShmMsgQueue &mq)
+bool SendQ::TrySend(ShmMsgQueue &mq)
{
std::unique_lock<std::mutex> lock(mutex_out_);
size_t nsend = 0;
diff --git a/src/sendq.h b/src/sendq.h
index 0699df7..0e565d5 100644
--- a/src/sendq.h
+++ b/src/sendq.h
@@ -29,10 +29,7 @@
#include <string>
#include <unordered_map>
-namespace bhome_shm
-{
class ShmMsgQueue;
-} // namespace bhome_shm
class SendQ
{
@@ -65,7 +62,7 @@
{
AppendData(addr, Data(std::move(content)), DefaultExpire(), onExpire);
}
- bool TrySend(bhome_shm::ShmMsgQueue &mq);
+ bool TrySend(ShmMsgQueue &mq);
// bool empty() const { return store_.empty(); }
private:
@@ -88,8 +85,8 @@
typedef std::list<Array> ArrayList;
typedef std::unordered_map<Remote, ArrayList> Store;
- int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, Array &arr);
- int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, ArrayList &arr);
+ int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr);
+ int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &arr);
std::mutex mutex_in_;
std::mutex mutex_out_;
diff --git a/src/shm_alloc_queue.cpp b/src/shm_alloc_queue.cpp
new file mode 100644
index 0000000..7ea5213
--- /dev/null
+++ b/src/shm_alloc_queue.cpp
@@ -0,0 +1,19 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: shm_alloc_queue.cpp
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�04鏈�26鏃� 16鏃�24鍒�25绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (), lichao@aiotlink.com
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#include "shm_alloc_queue.h"
+
diff --git a/src/shm_alloc_queue.h b/src/shm_alloc_queue.h
new file mode 100644
index 0000000..73a184f
--- /dev/null
+++ b/src/shm_alloc_queue.h
@@ -0,0 +1,23 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: shm_alloc_queue.h
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�04鏈�26鏃� 16鏃�24鍒�40绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (), lichao@aiotlink.com
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#ifndef SHM_ALLOC_QUEUE_EQBLM9FZ
+#define SHM_ALLOC_QUEUE_EQBLM9FZ
+
+
+
+#endif // end of include guard: SHM_ALLOC_QUEUE_EQBLM9FZ
diff --git a/src/shm_msg_queue.cpp b/src/shm_msg_queue.cpp
new file mode 100644
index 0000000..ae019bf
--- /dev/null
+++ b/src/shm_msg_queue.cpp
@@ -0,0 +1,105 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: shm_msg_queue.cpp
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�04鏈�26鏃� 16鏃�25鍒�05绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (), lichao@aiotlink.com
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#include "shm_msg_queue.h"
+
+using namespace bhome_msg;
+using namespace boost::interprocess;
+
+namespace
+{
+std::string MsgQIdToName(const ShmMsgQueue::MQId id)
+{
+ char buf[40] = "mqOx";
+ int n = sprintf(buf + 4, "%lx", id);
+ return std::string(buf, n + 4);
+}
+
+const int AdjustMQLength(const int len)
+{
+ const int kMaxLength = 10000;
+ const int kDefaultLen = 12;
+ if (len <= 0) {
+ return kDefaultLen;
+ } else if (len < kMaxLength) {
+ return len;
+ } else {
+ return kMaxLength;
+ }
+}
+
+} // namespace
+
+ShmMsgQueue::MQId ShmMsgQueue::NewId()
+{
+ static auto &id = GetData();
+ return ++id;
+}
+// ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2
+ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) :
+ id_(id),
+ queue_(segment, MsgQIdToName(id_), AdjustMQLength(len), segment.get_segment_manager())
+{
+}
+
+ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) :
+ id_(NewId()),
+ queue_(segment, true, MsgQIdToName(id_), AdjustMQLength(len), segment.get_segment_manager())
+{
+ if (!queue_.IsOk()) {
+ throw("error create msgq " + std::to_string(id_));
+ }
+}
+
+ShmMsgQueue::~ShmMsgQueue() {}
+
+bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId id)
+{
+ Queue *q = Find(shm, id);
+ if (q) {
+ MsgI msg;
+ while (q->TryRead(msg)) {
+ msg.Release();
+ }
+ }
+ return Shmq::Remove(shm, MsgQIdToName(id));
+}
+
+ShmMsgQueue::Queue *ShmMsgQueue::Find(SharedMemory &shm, const MQId remote_id)
+{
+ return Shmq::Find(shm, MsgQIdToName(remote_id));
+}
+
+bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend)
+{
+ Queue *remote = Find(shm, remote_id);
+ if (remote) {
+ if (onsend) {
+ return remote->TryWrite(msg, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
+ } else {
+ return remote->TryWrite(msg, [](const MsgI &msg) { msg.AddRef(); });
+ }
+ } else {
+ // SetLestError(eNotFound);
+ return false;
+ }
+}
+
+// Test shows that in the 2 cases:
+// 1) build msg first, then find remote queue;
+// 2) find remote queue first, then build msg;
+// 1 is about 50% faster than 2, maybe cache related.
diff --git a/src/shm_msg_queue.h b/src/shm_msg_queue.h
new file mode 100644
index 0000000..d7b33af
--- /dev/null
+++ b/src/shm_msg_queue.h
@@ -0,0 +1,77 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: shm_msg_queue.h
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�04鏈�26鏃� 16鏃�25鍒�21绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (), lichao@aiotlink.com
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#ifndef SHM_MSG_QUEUE_D847TQXH
+#define SHM_MSG_QUEUE_D847TQXH
+
+#include "msg.h"
+#include "shm_queue.h"
+
+using namespace bhome_shm;
+using namespace bhome_msg;
+
+class ShmMsgQueue : public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue>
+{
+ typedef ShmObject<SharedQueue<MsgI>> Shmq;
+ typedef Shmq::ShmType ShmType;
+ typedef Shmq::Data Queue;
+ typedef std::function<void()> OnSend;
+
+public:
+ typedef uint64_t MQId;
+
+ static MQId NewId();
+
+ ShmMsgQueue(const MQId id, ShmType &segment, const int len);
+ ShmMsgQueue(ShmType &segment, const int len);
+ ~ShmMsgQueue();
+ static bool Remove(SharedMemory &shm, const MQId id);
+ MQId Id() const { return id_; }
+ ShmType &shm() const { return queue_.shm(); }
+
+ bool Recv(MsgI &msg, const int timeout_ms) { return queue_.data()->Read(msg, timeout_ms); }
+ bool TryRecv(MsgI &msg) { return queue_.data()->TryRead(msg); }
+ static Queue *Find(SharedMemory &shm, const MQId remote_id);
+ static bool TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend = OnSend());
+ template <class Iter>
+ static int TrySendAll(SharedMemory &shm, const MQId remote_id, const Iter begin, const Iter end, OnSend const &onsend = OnSend())
+ {
+ Queue *remote = Find(shm, remote_id);
+ if (remote) {
+ if (onsend) {
+ return remote->TryWrite(begin, end, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
+ } else {
+ return remote->TryWrite(begin, end, [](const MsgI &msg) { msg.AddRef(); });
+ }
+ } else {
+ // SetLestError(eNotFound);
+ return 0;
+ }
+ }
+
+ template <class... Rest>
+ bool TrySend(const MQId remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); }
+ template <class... Rest>
+ int TrySendAll(const MQId remote_id, Rest const &...rest) { return TrySendAll(shm(), remote_id, rest...); }
+
+private:
+ MQId id_;
+ Shmq &queue() { return queue_; }
+ Shmq queue_;
+};
+
+#endif // end of include guard: SHM_MSG_QUEUE_D847TQXH
diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp
index face18b..86f0d91 100644
--- a/src/shm_queue.cpp
+++ b/src/shm_queue.cpp
@@ -21,91 +21,4 @@
namespace bhome_shm
{
-using namespace bhome_msg;
-using namespace boost::interprocess;
-
-namespace
-{
-std::string MsgQIdToName(const ShmMsgQueue::MQId id)
-{
- char buf[40] = "mqOx";
- int n = sprintf(buf + 4, "%lx", id);
- return std::string(buf, n + 4);
-}
-
-const int AdjustMQLength(const int len)
-{
- const int kMaxLength = 10000;
- const int kDefaultLen = 12;
- if (len <= 0) {
- return kDefaultLen;
- } else if (len < kMaxLength) {
- return len;
- } else {
- return kMaxLength;
- }
-}
-
-} // namespace
-
-ShmMsgQueue::MQId ShmMsgQueue::NewId()
-{
- static auto &id = GetData();
- return ++id;
-}
-// ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2
-ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) :
- id_(id),
- queue_(segment, MsgQIdToName(id_), AdjustMQLength(len), segment.get_segment_manager())
-{
-}
-
-ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) :
- id_(NewId()),
- queue_(segment, true, MsgQIdToName(id_), AdjustMQLength(len), segment.get_segment_manager())
-{
- if (!queue_.IsOk()) {
- throw("error create msgq " + std::to_string(id_));
- }
-}
-
-ShmMsgQueue::~ShmMsgQueue() {}
-
-bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId id)
-{
- Queue *q = Find(shm, id);
- if (q) {
- MsgI msg;
- while (q->TryRead(msg)) {
- msg.Release();
- }
- }
- return Shmq::Remove(shm, MsgQIdToName(id));
-}
-
-ShmMsgQueue::Queue *ShmMsgQueue::Find(SharedMemory &shm, const MQId remote_id)
-{
- return Shmq::Find(shm, MsgQIdToName(remote_id));
-}
-
-bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend)
-{
- Queue *remote = Find(shm, remote_id);
- if (remote) {
- if (onsend) {
- return remote->TryWrite(msg, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
- } else {
- return remote->TryWrite(msg, [](const MsgI &msg) { msg.AddRef(); });
- }
- } else {
- // SetLestError(eNotFound);
- return false;
- }
-}
-
-// Test shows that in the 2 cases:
-// 1) build msg first, then find remote queue;
-// 2) find remote queue first, then build msg;
-// 1 is about 50% faster than 2, maybe cache related.
-
} // namespace bhome_shm
diff --git a/src/shm_queue.h b/src/shm_queue.h
index 4f576a7..5dbda96 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -19,7 +19,6 @@
#ifndef SHM_QUEUE_JE0OEUP3
#define SHM_QUEUE_JE0OEUP3
-#include "msg.h"
#include "shm.h"
#include <atomic>
#include <boost/circular_buffer.hpp>
@@ -59,30 +58,21 @@
return [this](Guard &lock) { return !this->empty(); };
}
- template <class Pred, class OnData>
- int ReadAllOnCond(Pred const &pred, OnData const &onData)
- {
- Guard lock(this->mutex());
- int n = 0;
- while (pred(lock)) {
- ++n;
- onData(this->front());
- this->pop_front();
- this->cond_write_.notify_one();
- }
- return n;
- }
-
template <class Pred>
bool ReadOnCond(D &buf, Pred const &pred)
{
- int flag = 0;
- auto only_once = [&](Guard &lock) { return flag++ == 0 && pred(lock); };
- auto onData = [&buf](D &d) {
- using std::swap;
- swap(buf, d);
+ auto Read = [&]() {
+ Guard lock(this->mutex());
+ if (pred(lock)) {
+ using std::swap;
+ swap(buf, Super::front());
+ Super::pop_front();
+ return true;
+ } else {
+ return false;
+ }
};
- return ReadAllOnCond(only_once, onData);
+ return Read() ? (this->cond_write_.notify_one(), true) : false;
}
template <class Iter, class Pred, class OnWrite>
@@ -94,7 +84,7 @@
Guard lock(mutex());
while (pred(lock)) {
onWrite(*begin);
- this->push_back(*begin);
+ Super::push_back(*begin);
++n;
cond_read_.notify_one();
if (++begin == end) {
@@ -130,60 +120,6 @@
bool Read(D &buf, const int timeout_ms) { return ReadOnCond(buf, TimedReadPred(timeout_ms)); }
bool TryRead(D &buf) { return ReadOnCond(buf, TryReadPred()); }
-};
-
-using namespace bhome_msg;
-
-class ShmMsgQueue : public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue>
-{
- typedef ShmObject<SharedQueue<MsgI>> Shmq;
- typedef Shmq::ShmType ShmType;
- typedef Shmq::Data Queue;
- typedef std::function<void()> OnSend;
-
-public:
- typedef uint64_t MQId;
-
- static MQId NewId();
-
- ShmMsgQueue(const MQId id, ShmType &segment, const int len);
- ShmMsgQueue(ShmType &segment, const int len);
- ~ShmMsgQueue();
- static bool Remove(SharedMemory &shm, const MQId id);
- MQId Id() const { return id_; }
- ShmType &shm() const { return queue_.shm(); }
-
- bool Recv(MsgI &msg, const int timeout_ms) { return queue_.data()->Read(msg, timeout_ms); }
- bool TryRecv(MsgI &msg) { return queue_.data()->TryRead(msg); }
- template <class OnData>
- int TryRecvAll(OnData const &onData) { return queue_.data()->TryReadAll(onData); }
- static Queue *Find(SharedMemory &shm, const MQId remote_id);
- static bool TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend = OnSend());
- template <class Iter>
- static int TrySendAll(SharedMemory &shm, const MQId remote_id, const Iter begin, const Iter end, OnSend const &onsend = OnSend())
- {
- Queue *remote = Find(shm, remote_id);
- if (remote) {
- if (onsend) {
- return remote->TryWrite(begin, end, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
- } else {
- return remote->TryWrite(begin, end, [](const MsgI &msg) { msg.AddRef(); });
- }
- } else {
- // SetLestError(eNotFound);
- return 0;
- }
- }
-
- template <class... Rest>
- bool TrySend(const MQId remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); }
- template <class... Rest>
- int TrySendAll(const MQId remote_id, Rest const &...rest) { return TrySendAll(shm(), remote_id, rest...); }
-
-private:
- MQId id_;
- Shmq &queue() { return queue_; }
- Shmq queue_;
};
} // namespace bhome_shm
diff --git a/src/socket.cpp b/src/socket.cpp
index e471633..2127260 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -65,7 +65,8 @@
onRecvWithPerMsgCB(*this, imsg, head);
}
};
- return mq().TryRecvAll(onMsg) > 0; // this will recv all msgs.
+ MsgI imsg;
+ return mq().TryRecv(imsg) ? (onMsg(imsg), true) : false;
};
try {
@@ -74,6 +75,8 @@
if (onIdle) { onIdle(*this); }
if (!more_to_send && !more_to_recv) {
std::this_thread::yield();
+ using namespace std::chrono_literals;
+ std::this_thread::sleep_for(10000ns);
}
} catch (...) {
}
diff --git a/src/socket.h b/src/socket.h
index cd6bfee..a5dd72c 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -22,7 +22,7 @@
#include "bh_util.h"
#include "defs.h"
#include "sendq.h"
-#include "shm_queue.h"
+#include "shm_msg_queue.h"
#include <atomic>
#include <boost/noncopyable.hpp>
#include <condition_variable>
@@ -37,7 +37,7 @@
{
protected:
- typedef bhome_shm::ShmMsgQueue Queue;
+ typedef ShmMsgQueue Queue;
public:
typedef ShmMsgQueue::MQId MQId;
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 1131816..d274c4b 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -389,7 +389,7 @@
BHAddress addr;
if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) {
- printf("node: %ld, topic dest: %ld\n", SockNode().id(), addr.mq_id());
+ // printf("node: %ld, topic dest: %ld\n", SockNode().id(), addr.mq_id());
BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn()));
AddRoute(head, sock.id());
head.set_topic(request.topic());
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index debe8ad..6682aaf 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -18,6 +18,7 @@
#include "bh_api.h"
#include "util.h"
#include <atomic>
+#include <boost/lockfree/queue.hpp>
using namespace bhome_msg;
@@ -49,7 +50,6 @@
static MsgStatus st;
return st;
}
-} // namespace
void SubRecvProc(const void *proc_id,
const int proc_id_len,
@@ -59,7 +59,7 @@
std::string proc((const char *) proc_id, proc_id_len);
MsgPublish pub;
pub.ParseFromArray(data, data_len);
- // printf("Sub data, %s : %s\n", pub.topic().c_str(), pub.data().c_str());
+ printf("Sub data, %s : %s\n", pub.topic().c_str(), pub.data().c_str());
}
void ServerProc(const void *proc_id,
@@ -98,8 +98,8 @@
class TLMutex
{
- // typedef boost::interprocess::interprocess_mutex MutexT;
- typedef CasMutex MutexT;
+ typedef boost::interprocess::interprocess_mutex MutexT;
+ // typedef CasMutex MutexT;
// typedef std::mutex MutexT;
typedef std::chrono::steady_clock Clock;
typedef Clock::duration Duration;
@@ -108,6 +108,7 @@
const Duration limit_;
std::atomic<Duration> last_lock_time_;
MutexT mutex_;
+ bool Expired(const Duration diff) { return diff > limit_; }
public:
struct Status {
@@ -127,16 +128,18 @@
{
if (mutex_.try_lock()) {
auto old_time = last_lock_time_.load();
- if (Now() - old_time > limit_) {
- return last_lock_time_.compare_exchange_strong(old_time, Now());
+ auto cur = Now();
+ if (Expired(cur - old_time)) {
+ return last_lock_time_.compare_exchange_strong(old_time, cur);
} else {
last_lock_time_.store(Now());
return true;
}
} else {
auto old_time = last_lock_time_.load();
- if (Now() - old_time > limit_) {
- return last_lock_time_.compare_exchange_strong(old_time, Now());
+ auto cur = Now();
+ if (Expired(cur - old_time)) {
+ return last_lock_time_.compare_exchange_strong(old_time, cur);
} else {
return false;
}
@@ -154,55 +157,88 @@
void unlock()
{
auto old_time = last_lock_time_.load();
- if (Now() - old_time > limit_) {
- } else {
- if (last_lock_time_.compare_exchange_strong(old_time, Now())) {
+ auto cur = Now();
+ if (!Expired(cur - old_time)) {
+ if (last_lock_time_.compare_exchange_strong(old_time, cur)) {
mutex_.unlock();
}
}
}
};
-namespace
-{
-typedef int64_t Offset;
-Offset Addr(void *ptr) { return reinterpret_cast<Offset>(ptr); }
-void *Ptr(const Offset offset) { return reinterpret_cast<void *>(offset); }
-} // namespace
-
+//robust attr does NOT work, maybe os does not support it.
class RobustMutex
{
public:
RobustMutex()
{
- pthread_mutexattr_t attr;
- pthread_mutexattr_init(&attr);
- pthread_mutexattr_setrobust(&attr, 1);
- pthread_mutex_init(mtx(), &attr);
- if (!valid()) {
+ pthread_mutexattr_t mutex_attr;
+ auto attr = [&]() { return &mutex_attr; };
+ int r = pthread_mutexattr_init(attr());
+ r |= pthread_mutexattr_setpshared(attr(), PTHREAD_PROCESS_SHARED);
+ r |= pthread_mutexattr_setrobust_np(attr(), PTHREAD_MUTEX_ROBUST_NP);
+ r |= pthread_mutex_init(mtx(), attr());
+ int rob = 0;
+ pthread_mutexattr_getrobust_np(attr(), &rob);
+ int shared = 0;
+ pthread_mutexattr_getpshared(attr(), &shared);
+ printf("robust : %d, shared : %d\n", rob, shared);
+ r |= pthread_mutexattr_destroy(attr());
+ if (r) {
throw("init mutex error.");
}
}
+ ~RobustMutex()
+ {
+ pthread_mutex_destroy(mtx());
+ }
+
+public:
+ void lock() { Lock(); }
+ bool try_lock()
+ {
+ int r = TryLock();
+ printf("TryLock ret: %d\n", r);
+ return r == 0;
+ }
+
+ void unlock() { Unlock(); }
+
+ // private:
int TryLock() { return pthread_mutex_trylock(mtx()); }
int Lock() { return pthread_mutex_lock(mtx()); }
int Unlock() { return pthread_mutex_unlock(mtx()); }
- bool valid() const { return false; }
private:
pthread_mutex_t *mtx() { return &mutex_; }
pthread_mutex_t mutex_;
};
+class LockFreeQueue
+{
+ typedef int64_t Data;
+ typedef boost::lockfree::queue<Data, boost::lockfree::capacity<1024>> LFQueue;
+ void push_back(Data d) { queue_.push(d); }
+
+private:
+ LFQueue queue_;
+};
+
+} // namespace
+
BOOST_AUTO_TEST_CASE(MutexTest)
{
SharedMemory &shm = TestShm();
+ // shm.Remove();
+ // return;
GlobalInit(shm);
const std::string mtx_name("test_mutex");
const std::string int_name("test_int");
- auto mtx = shm.FindOrCreate<Mutex>(mtx_name);
+ auto mtx = shm.FindOrCreate<TLMutex>(mtx_name);
auto pi = shm.FindOrCreate<int>(int_name, 100);
+ std::mutex m;
typedef std::chrono::steady_clock Clock;
auto Now = []() { return Clock::now().time_since_epoch(); };
if (pi) {
@@ -334,7 +370,6 @@
printf("subscribe topic : %s\n", r ? "ok" : "failed");
}
- // BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc);
auto ServerLoop = [&](std::atomic<bool> *run) {
while (*run) {
void *proc_id = 0;
@@ -446,27 +481,20 @@
std::atomic<bool> run(true);
+ BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc);
ThreadManager threads;
boost::timer::auto_cpu_timer timer;
threads.Launch(hb, &run);
- threads.Launch(ServerLoop, &run);
threads.Launch(showStatus, &run);
int ncli = 10;
- const uint64_t nreq = 1000 * 1;
+ const uint64_t nreq = 1000 * 10;
for (int i = 0; i < ncli; ++i) {
- // threads.Launch(asyncRequest, nreq);
+ threads.Launch(asyncRequest, nreq);
}
-
- for (int i = 0; i < 10; ++i) {
- SyncRequest(i);
- }
- // run.store(false);
- // server_thread.join();
- // return;
int same = 0;
int64_t last = 0;
- while (last < nreq * ncli && same < 1) {
+ while (last < nreq * ncli && same < 2) {
Sleep(1s, false);
auto cur = Status().nreply_.load();
if (last == cur) {
--
Gitblit v1.8.0