From 94a455aba299af5ffe476560d859dfd007cd5467 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 16 四月 2021 12:13:43 +0800
Subject: [PATCH] fix crash by using normal timeout; add sendq todo.
---
src/socket.h | 32 ++++------
utest/api_test.cpp | 82 ++++++++++++++++++++++++--
.gitignore | 1
box/center.cpp | 7 -
utest/util.h | 6 +
box/center.h | 1
src/sendq.cpp | 5 +
src/sendq.h | 2
src/topic_node.cpp | 6 +
src/defs.cpp | 4
10 files changed, 106 insertions(+), 40 deletions(-)
diff --git a/.gitignore b/.gitignore
index d6ac3de..7cf0ce4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,3 +9,4 @@
box/bhshmqbox
box/bhshmq_center
box/help
+utest/bhshmq_center
diff --git a/box/center.cpp b/box/center.cpp
index 8625f7f..a95e82d 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -417,7 +417,7 @@
bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner)
{
- auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 5s, 10s);
+ auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 60s, 60s * 2);
auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
return [&](auto &&rep_body) {
auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
@@ -496,7 +496,7 @@
SharedMemory &BHomeShm()
{
- static SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 64);
+ static SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 512);
return shm;
}
@@ -530,9 +530,6 @@
sockets_[info.name_] = std::make_shared<ShmSocket>(shm, *(MQId *) info.mqid_.data(), info.mq_len_);
}
}
-
-BHCenter::BHCenter() :
- BHCenter(BHomeShm()) {}
bool BHCenter::Start()
{
diff --git a/box/center.h b/box/center.h
index aea0897..60639d5 100644
--- a/box/center.h
+++ b/box/center.h
@@ -34,7 +34,6 @@
static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId &mqid, const int mq_len);
BHCenter(Socket::Shm &shm);
- BHCenter();
~BHCenter() { Stop(); }
bool Start();
bool Stop();
diff --git a/src/defs.cpp b/src/defs.cpp
index bab2e53..77b0722 100644
--- a/src/defs.cpp
+++ b/src/defs.cpp
@@ -20,7 +20,7 @@
{
const MQId kBHTopicBus = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff");
-const MQId kBHTopicReqRepCenter = boost::uuids::string_generator()("12345670-89ab-cdef-8349-1234567890ff");
+const MQId kBHTopicCenter = boost::uuids::string_generator()("12345670-89ab-cdef-8349-1234567890ff");
const MQId kBHUniCenter = boost::uuids::string_generator()("87654321-89ab-cdef-8349-1234567890ff");
struct LastError {
@@ -37,7 +37,7 @@
} // namespace
const MQId &BHTopicBusAddress() { return kBHTopicBus; }
-const MQId &BHTopicCenterAddress() { return kBHTopicReqRepCenter; }
+const MQId &BHTopicCenterAddress() { return kBHTopicCenter; }
const MQId &BHUniCenterAddress() { return kBHUniCenter; }
void SetLastError(const int ec, const std::string &msg)
diff --git a/src/sendq.cpp b/src/sendq.cpp
index 242f8de..ad293c3 100644
--- a/src/sendq.cpp
+++ b/src/sendq.cpp
@@ -19,6 +19,11 @@
#include "shm_queue.h"
#include <chrono>
+//TODO change to save head, body, instead of MsgI.
+// as MsgI which is in shm, but head, body are in current process.
+// Then if node crashes, shm will not be affected by msgs in sendq.
+// but pulishing ref-counted msg need some work.
+
int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, Array &arr)
{
auto FirstNotExpired = [](Array &l) {
diff --git a/src/sendq.h b/src/sendq.h
index aa8923d..b4f3821 100644
--- a/src/sendq.h
+++ b/src/sendq.h
@@ -55,7 +55,7 @@
void Append(const Remote &addr, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent())
{
using namespace std::chrono_literals;
- Append(addr, msg, Now() + 3s, onExpire);
+ Append(addr, msg, Now() + 60s, onExpire);
}
bool TrySend(bhome_shm::ShmMsgQueue &mq);
// bool empty() const { return store_.empty(); }
diff --git a/src/socket.h b/src/socket.h
index 96af6e7..66f716e 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -66,32 +66,24 @@
size_t Pending() const { return mq().Pending(); }
template <class Body>
- bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body)
+ bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const RecvCB &cb = RecvCB())
{
- MsgI msg;
- if (msg.Make(shm(), head, body)) {
- DEFER1(if (msg.IsCounted()) { msg.Release(shm()); });
- return SendImpl(valid_remote, msg);
- }
- return false;
- }
-
- template <class Body>
- bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const RecvCB &cb)
- {
- //TODO send_buffer_ need flag, and remove callback on expire.
MsgI msg;
if (msg.Make(shm(), head, body)) {
DEFER1(if (msg.IsCounted()) { msg.Release(shm()); });
std::string msg_id(head.msg_id());
- per_msg_cbs_->Add(msg_id, cb);
- auto onExpireRemoveCB = [this, msg_id](MsgI const &msg) {
- RecvCB cb_no_use;
- per_msg_cbs_->Find(msg_id, cb_no_use);
- };
- return SendImpl(valid_remote, msg, onExpireRemoveCB);
+ if (!cb) {
+ return SendImpl(valid_remote, msg);
+ } else {
+ per_msg_cbs_->Add(msg_id, cb);
+ auto onExpireRemoveCB = [this, msg_id](MsgI const &msg) {
+ RecvCB cb_no_use;
+ per_msg_cbs_->Find(msg_id, cb_no_use);
+ };
+ return SendImpl(valid_remote, msg, onExpireRemoveCB);
+ }
} else {
- printf("out of mem?, avail: %ld\n", shm().get_free_memory());
+ SetLastError(ENOMEM, "Out of mem");
}
return false;
}
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index e9e627f..f947f98 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -245,7 +245,10 @@
bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, std::string &out_msg_id, const RequestResultCB &cb)
{
- if (!IsRegistered()) { return false; }
+ if (!IsRegistered()) {
+ SetLastError(eNotRegistered, "Not Registered.");
+ return false;
+ }
const std::string &msg_id(NewMsgId());
@@ -298,6 +301,7 @@
return sock.Send(&BHTopicCenterAddress(), head, query, onQueryResult);
} catch (...) {
+ SetLastError(eError, "internal error.");
return false;
}
}
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index cff2cc5..58c73c6 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -25,12 +25,23 @@
{
typedef std::atomic<uint64_t> Number;
+void Assign(Number &a, const Number &b) { a.store(b.load()); }
struct MsgStatus {
+
Number nrequest_;
+ Number nfailed_;
Number nreply_;
Number nserved_;
MsgStatus() :
nrequest_(0), nreply_(0), nserved_(0) {}
+ MsgStatus &operator=(const MsgStatus &a)
+ {
+ Assign(nrequest_, a.nrequest_);
+ Assign(nserved_, a.nserved_);
+ Assign(nreply_, a.nreply_);
+ Assign(nfailed_, a.nfailed_);
+ return *this;
+ }
};
MsgStatus &Status()
@@ -83,6 +94,48 @@
++Status().nreply_;
}
// printf("client Recv reply : %s\n", reply.data().c_str());
+}
+
+BOOST_AUTO_TEST_CASE(MutexTest)
+{
+ const std::string shm_name("ShmMutex");
+ // ShmRemover auto_remove(shm_name);
+ SharedMemory shm(shm_name, 1024 * 1024 * 10);
+
+ const std::string mtx_name("test_mutex");
+ const std::string int_name("test_int");
+ auto mtx = shm.find_or_construct<Mutex>(mtx_name.c_str())();
+ auto pi = shm.find_or_construct<int>(int_name.c_str())(100);
+ if (pi) {
+ auto old = *pi;
+ printf("int : %d, add1: %d\n", old, ++*pi);
+ }
+
+ auto TryLock = [&]() {
+ if (mtx->try_lock()) {
+ printf("try_lock ok\n");
+ return true;
+ } else {
+ printf("try_lock failed\n");
+ return false;
+ }
+ };
+ auto Unlock = [&]() {
+ mtx->unlock();
+ printf("unlocked\n");
+ };
+
+ if (mtx) {
+ printf("mtx exists\n");
+ if (TryLock()) {
+ if (TryLock()) {
+ Unlock();
+ }
+ // Unlock();
+ }
+ } else {
+ printf("mtx not exists\n");
+ }
}
BOOST_AUTO_TEST_CASE(ApiTest)
@@ -166,36 +219,49 @@
std::string s(req.SerializeAsString());
void *msg_id = 0;
int len = 0;
+ // Sleep(10ms, false);
bool r = BHAsyncRequest(s.data(), s.size(), 0, 0);
DEFER1(BHFree(msg_id, len););
if (r) {
++Status().nrequest_;
} else {
- printf("request topic : %s\n", r ? "ok" : "failed");
+ ++Status().nfailed_;
+ static std::atomic<int64_t> last(0);
+ auto now = NowSec();
+ if (last.exchange(now) < now) {
+ int ec = 0;
+ std::string msg;
+ GetLastError(ec, msg);
+ printf("request topic error --------- : %s\n", msg.c_str());
+ }
}
}
};
auto showStatus = [](std::atomic<bool> *run) {
- int64_t last = 0;
+ MsgStatus last;
while (*run) {
auto &st = Status();
std::this_thread::sleep_for(1s);
- int cur = st.nreply_.load();
- printf("nreq: %8ld, nsrv: %8ld, nreply: %8ld, speed %8ld\n", st.nrequest_.load(), st.nserved_.load(), cur, cur - last);
- last = cur;
+ printf("nreq: %8ld, spd %8ld | failed: %8ld | nsrv: %8ld, spd %8ld | nreply: %8ld, spd %8ld\n",
+ st.nrequest_.load(), st.nrequest_ - last.nrequest_,
+ st.nfailed_.load(),
+ st.nserved_.load(), st.nserved_ - last.nserved_,
+ st.nreply_.load(), st.nreply_ - last.nreply_);
+ last = st;
}
};
auto hb = [](std::atomic<bool> *run) {
while (*run) {
- BHHeartBeatEasy(0);
- std::this_thread::sleep_for(1s);
+ Sleep(1s, false);
+ bool r = BHHeartBeatEasy(1000);
+ printf("heartbeat: %s\n", r ? "ok" : "failed");
}
};
std::atomic<bool> run(true);
ThreadManager threads;
boost::timer::auto_cpu_timer timer;
threads.Launch(hb, &run);
- // threads.Launch(showStatus, &run);
+ threads.Launch(showStatus, &run);
int ncli = 10;
const uint64_t nreq = 1000 * 100;
for (int i = 0; i < ncli; ++i) {
diff --git a/utest/util.h b/utest/util.h
index 7f41da9..f31d63f 100644
--- a/utest/util.h
+++ b/utest/util.h
@@ -39,9 +39,11 @@
using namespace std::chrono_literals;
template <class D>
-inline void Sleep(D d)
+inline void Sleep(D d, bool print = true)
{
- printf("sleep for %ld ms\n", std::chrono::duration_cast<std::chrono::milliseconds>(d).count());
+ if (print) {
+ printf("sleep for %ld ms\n", std::chrono::duration_cast<std::chrono::milliseconds>(d).count());
+ }
std::this_thread::sleep_for(d);
}
--
Gitblit v1.8.0