From 94a455aba299af5ffe476560d859dfd007cd5467 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 16 四月 2021 12:13:43 +0800 Subject: [PATCH] fix crash by using normal timeout; add sendq todo. --- src/socket.h | 32 ++++------ utest/api_test.cpp | 82 ++++++++++++++++++++++++-- .gitignore | 1 box/center.cpp | 7 - utest/util.h | 6 + box/center.h | 1 src/sendq.cpp | 5 + src/sendq.h | 2 src/topic_node.cpp | 6 + src/defs.cpp | 4 10 files changed, 106 insertions(+), 40 deletions(-) diff --git a/.gitignore b/.gitignore index d6ac3de..7cf0ce4 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ box/bhshmqbox box/bhshmq_center box/help +utest/bhshmq_center diff --git a/box/center.cpp b/box/center.cpp index 8625f7f..a95e82d 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -417,7 +417,7 @@ bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner) { - auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 5s, 10s); + auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 60s, 60s * 2); auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) { return [&](auto &&rep_body) { auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id())); @@ -496,7 +496,7 @@ SharedMemory &BHomeShm() { - static SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 64); + static SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 512); return shm; } @@ -530,9 +530,6 @@ sockets_[info.name_] = std::make_shared<ShmSocket>(shm, *(MQId *) info.mqid_.data(), info.mq_len_); } } - -BHCenter::BHCenter() : - BHCenter(BHomeShm()) {} bool BHCenter::Start() { diff --git a/box/center.h b/box/center.h index aea0897..60639d5 100644 --- a/box/center.h +++ b/box/center.h @@ -34,7 +34,6 @@ static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId &mqid, const int mq_len); BHCenter(Socket::Shm &shm); - BHCenter(); ~BHCenter() { Stop(); } bool Start(); bool Stop(); diff --git a/src/defs.cpp b/src/defs.cpp index bab2e53..77b0722 100644 --- a/src/defs.cpp +++ b/src/defs.cpp @@ -20,7 +20,7 @@ { const MQId kBHTopicBus = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff"); -const MQId kBHTopicReqRepCenter = boost::uuids::string_generator()("12345670-89ab-cdef-8349-1234567890ff"); +const MQId kBHTopicCenter = boost::uuids::string_generator()("12345670-89ab-cdef-8349-1234567890ff"); const MQId kBHUniCenter = boost::uuids::string_generator()("87654321-89ab-cdef-8349-1234567890ff"); struct LastError { @@ -37,7 +37,7 @@ } // namespace const MQId &BHTopicBusAddress() { return kBHTopicBus; } -const MQId &BHTopicCenterAddress() { return kBHTopicReqRepCenter; } +const MQId &BHTopicCenterAddress() { return kBHTopicCenter; } const MQId &BHUniCenterAddress() { return kBHUniCenter; } void SetLastError(const int ec, const std::string &msg) diff --git a/src/sendq.cpp b/src/sendq.cpp index 242f8de..ad293c3 100644 --- a/src/sendq.cpp +++ b/src/sendq.cpp @@ -19,6 +19,11 @@ #include "shm_queue.h" #include <chrono> +//TODO change to save head, body, instead of MsgI. +// as MsgI which is in shm, but head, body are in current process. +// Then if node crashes, shm will not be affected by msgs in sendq. +// but pulishing ref-counted msg need some work. + int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, Array &arr) { auto FirstNotExpired = [](Array &l) { diff --git a/src/sendq.h b/src/sendq.h index aa8923d..b4f3821 100644 --- a/src/sendq.h +++ b/src/sendq.h @@ -55,7 +55,7 @@ void Append(const Remote &addr, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent()) { using namespace std::chrono_literals; - Append(addr, msg, Now() + 3s, onExpire); + Append(addr, msg, Now() + 60s, onExpire); } bool TrySend(bhome_shm::ShmMsgQueue &mq); // bool empty() const { return store_.empty(); } diff --git a/src/socket.h b/src/socket.h index 96af6e7..66f716e 100644 --- a/src/socket.h +++ b/src/socket.h @@ -66,32 +66,24 @@ size_t Pending() const { return mq().Pending(); } template <class Body> - bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body) + bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const RecvCB &cb = RecvCB()) { - MsgI msg; - if (msg.Make(shm(), head, body)) { - DEFER1(if (msg.IsCounted()) { msg.Release(shm()); }); - return SendImpl(valid_remote, msg); - } - return false; - } - - template <class Body> - bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const RecvCB &cb) - { - //TODO send_buffer_ need flag, and remove callback on expire. MsgI msg; if (msg.Make(shm(), head, body)) { DEFER1(if (msg.IsCounted()) { msg.Release(shm()); }); std::string msg_id(head.msg_id()); - per_msg_cbs_->Add(msg_id, cb); - auto onExpireRemoveCB = [this, msg_id](MsgI const &msg) { - RecvCB cb_no_use; - per_msg_cbs_->Find(msg_id, cb_no_use); - }; - return SendImpl(valid_remote, msg, onExpireRemoveCB); + if (!cb) { + return SendImpl(valid_remote, msg); + } else { + per_msg_cbs_->Add(msg_id, cb); + auto onExpireRemoveCB = [this, msg_id](MsgI const &msg) { + RecvCB cb_no_use; + per_msg_cbs_->Find(msg_id, cb_no_use); + }; + return SendImpl(valid_remote, msg, onExpireRemoveCB); + } } else { - printf("out of mem?, avail: %ld\n", shm().get_free_memory()); + SetLastError(ENOMEM, "Out of mem"); } return false; } diff --git a/src/topic_node.cpp b/src/topic_node.cpp index e9e627f..f947f98 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -245,7 +245,10 @@ bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, std::string &out_msg_id, const RequestResultCB &cb) { - if (!IsRegistered()) { return false; } + if (!IsRegistered()) { + SetLastError(eNotRegistered, "Not Registered."); + return false; + } const std::string &msg_id(NewMsgId()); @@ -298,6 +301,7 @@ return sock.Send(&BHTopicCenterAddress(), head, query, onQueryResult); } catch (...) { + SetLastError(eError, "internal error."); return false; } } diff --git a/utest/api_test.cpp b/utest/api_test.cpp index cff2cc5..58c73c6 100644 --- a/utest/api_test.cpp +++ b/utest/api_test.cpp @@ -25,12 +25,23 @@ { typedef std::atomic<uint64_t> Number; +void Assign(Number &a, const Number &b) { a.store(b.load()); } struct MsgStatus { + Number nrequest_; + Number nfailed_; Number nreply_; Number nserved_; MsgStatus() : nrequest_(0), nreply_(0), nserved_(0) {} + MsgStatus &operator=(const MsgStatus &a) + { + Assign(nrequest_, a.nrequest_); + Assign(nserved_, a.nserved_); + Assign(nreply_, a.nreply_); + Assign(nfailed_, a.nfailed_); + return *this; + } }; MsgStatus &Status() @@ -83,6 +94,48 @@ ++Status().nreply_; } // printf("client Recv reply : %s\n", reply.data().c_str()); +} + +BOOST_AUTO_TEST_CASE(MutexTest) +{ + const std::string shm_name("ShmMutex"); + // ShmRemover auto_remove(shm_name); + SharedMemory shm(shm_name, 1024 * 1024 * 10); + + const std::string mtx_name("test_mutex"); + const std::string int_name("test_int"); + auto mtx = shm.find_or_construct<Mutex>(mtx_name.c_str())(); + auto pi = shm.find_or_construct<int>(int_name.c_str())(100); + if (pi) { + auto old = *pi; + printf("int : %d, add1: %d\n", old, ++*pi); + } + + auto TryLock = [&]() { + if (mtx->try_lock()) { + printf("try_lock ok\n"); + return true; + } else { + printf("try_lock failed\n"); + return false; + } + }; + auto Unlock = [&]() { + mtx->unlock(); + printf("unlocked\n"); + }; + + if (mtx) { + printf("mtx exists\n"); + if (TryLock()) { + if (TryLock()) { + Unlock(); + } + // Unlock(); + } + } else { + printf("mtx not exists\n"); + } } BOOST_AUTO_TEST_CASE(ApiTest) @@ -166,36 +219,49 @@ std::string s(req.SerializeAsString()); void *msg_id = 0; int len = 0; + // Sleep(10ms, false); bool r = BHAsyncRequest(s.data(), s.size(), 0, 0); DEFER1(BHFree(msg_id, len);); if (r) { ++Status().nrequest_; } else { - printf("request topic : %s\n", r ? "ok" : "failed"); + ++Status().nfailed_; + static std::atomic<int64_t> last(0); + auto now = NowSec(); + if (last.exchange(now) < now) { + int ec = 0; + std::string msg; + GetLastError(ec, msg); + printf("request topic error --------- : %s\n", msg.c_str()); + } } } }; auto showStatus = [](std::atomic<bool> *run) { - int64_t last = 0; + MsgStatus last; while (*run) { auto &st = Status(); std::this_thread::sleep_for(1s); - int cur = st.nreply_.load(); - printf("nreq: %8ld, nsrv: %8ld, nreply: %8ld, speed %8ld\n", st.nrequest_.load(), st.nserved_.load(), cur, cur - last); - last = cur; + printf("nreq: %8ld, spd %8ld | failed: %8ld | nsrv: %8ld, spd %8ld | nreply: %8ld, spd %8ld\n", + st.nrequest_.load(), st.nrequest_ - last.nrequest_, + st.nfailed_.load(), + st.nserved_.load(), st.nserved_ - last.nserved_, + st.nreply_.load(), st.nreply_ - last.nreply_); + last = st; } }; auto hb = [](std::atomic<bool> *run) { while (*run) { - BHHeartBeatEasy(0); - std::this_thread::sleep_for(1s); + Sleep(1s, false); + bool r = BHHeartBeatEasy(1000); + printf("heartbeat: %s\n", r ? "ok" : "failed"); } }; std::atomic<bool> run(true); ThreadManager threads; boost::timer::auto_cpu_timer timer; threads.Launch(hb, &run); - // threads.Launch(showStatus, &run); + threads.Launch(showStatus, &run); int ncli = 10; const uint64_t nreq = 1000 * 100; for (int i = 0; i < ncli; ++i) { diff --git a/utest/util.h b/utest/util.h index 7f41da9..f31d63f 100644 --- a/utest/util.h +++ b/utest/util.h @@ -39,9 +39,11 @@ using namespace std::chrono_literals; template <class D> -inline void Sleep(D d) +inline void Sleep(D d, bool print = true) { - printf("sleep for %ld ms\n", std::chrono::duration_cast<std::chrono::milliseconds>(d).count()); + if (print) { + printf("sleep for %ld ms\n", std::chrono::duration_cast<std::chrono::milliseconds>(d).count()); + } std::this_thread::sleep_for(d); } -- Gitblit v1.8.0