fix crash by using normal timeout; add sendq todo.
| | |
| | | box/bhshmqbox |
| | | box/bhshmq_center |
| | | box/help |
| | | utest/bhshmq_center |
| | |
| | | |
| | | bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner) |
| | | { |
| | | auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 5s, 10s); |
| | | auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 60s, 60s * 2); |
| | | auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) { |
| | | return [&](auto &&rep_body) { |
| | | auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id())); |
| | |
| | | |
| | | SharedMemory &BHomeShm() |
| | | { |
| | | static SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 64); |
| | | static SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 512); |
| | | return shm; |
| | | } |
| | | |
| | |
| | | sockets_[info.name_] = std::make_shared<ShmSocket>(shm, *(MQId *) info.mqid_.data(), info.mq_len_); |
| | | } |
| | | } |
| | | |
| | | BHCenter::BHCenter() : |
| | | BHCenter(BHomeShm()) {} |
| | | |
| | | bool BHCenter::Start() |
| | | { |
| | |
| | | static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId &mqid, const int mq_len); |
| | | |
| | | BHCenter(Socket::Shm &shm); |
| | | BHCenter(); |
| | | ~BHCenter() { Stop(); } |
| | | bool Start(); |
| | | bool Stop(); |
| | |
| | | { |
| | | |
| | | const MQId kBHTopicBus = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff"); |
| | | const MQId kBHTopicReqRepCenter = boost::uuids::string_generator()("12345670-89ab-cdef-8349-1234567890ff"); |
| | | const MQId kBHTopicCenter = boost::uuids::string_generator()("12345670-89ab-cdef-8349-1234567890ff"); |
| | | const MQId kBHUniCenter = boost::uuids::string_generator()("87654321-89ab-cdef-8349-1234567890ff"); |
| | | |
| | | struct LastError { |
| | |
| | | } // namespace |
| | | |
| | | const MQId &BHTopicBusAddress() { return kBHTopicBus; } |
| | | const MQId &BHTopicCenterAddress() { return kBHTopicReqRepCenter; } |
| | | const MQId &BHTopicCenterAddress() { return kBHTopicCenter; } |
| | | const MQId &BHUniCenterAddress() { return kBHUniCenter; } |
| | | |
| | | void SetLastError(const int ec, const std::string &msg) |
| | |
| | | #include "shm_queue.h" |
| | | #include <chrono> |
| | | |
| | | //TODO change sendq to store head and body instead of MsgI. |
| | | // MsgI lives in shm, while head and body live in the current process, |
| | | // so if a node crashes, shm will not be affected by msgs left in sendq. |
| | | // However, publishing ref-counted msgs will need some extra work. |
| | | |
| | | int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, Array &arr) |
| | | { |
| | | auto FirstNotExpired = [](Array &l) { |
| | |
| | | void Append(const Remote &addr, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent()) |
| | | { |
| | | using namespace std::chrono_literals; |
| | | Append(addr, msg, Now() + 3s, onExpire); |
| | | Append(addr, msg, Now() + 60s, onExpire); |
| | | } |
| | | bool TrySend(bhome_shm::ShmMsgQueue &mq); |
| | | // bool empty() const { return store_.empty(); } |
| | |
| | | size_t Pending() const { return mq().Pending(); } |
| | | |
| | | template <class Body> |
| | | bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body) |
| | | bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const RecvCB &cb = RecvCB()) |
| | | { |
| | | MsgI msg; |
| | | if (msg.Make(shm(), head, body)) { |
| | | DEFER1(if (msg.IsCounted()) { msg.Release(shm()); }); |
| | | return SendImpl(valid_remote, msg); |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | template <class Body> |
| | | bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const RecvCB &cb) |
| | | { |
| | | //TODO send_buffer_ need flag, and remove callback on expire. |
| | | MsgI msg; |
| | | if (msg.Make(shm(), head, body)) { |
| | | DEFER1(if (msg.IsCounted()) { msg.Release(shm()); }); |
| | | std::string msg_id(head.msg_id()); |
| | | per_msg_cbs_->Add(msg_id, cb); |
| | | auto onExpireRemoveCB = [this, msg_id](MsgI const &msg) { |
| | | RecvCB cb_no_use; |
| | | per_msg_cbs_->Find(msg_id, cb_no_use); |
| | | }; |
| | | return SendImpl(valid_remote, msg, onExpireRemoveCB); |
| | | if (!cb) { |
| | | return SendImpl(valid_remote, msg); |
| | | } else { |
| | | per_msg_cbs_->Add(msg_id, cb); |
| | | auto onExpireRemoveCB = [this, msg_id](MsgI const &msg) { |
| | | RecvCB cb_no_use; |
| | | per_msg_cbs_->Find(msg_id, cb_no_use); |
| | | }; |
| | | return SendImpl(valid_remote, msg, onExpireRemoveCB); |
| | | } |
| | | } else { |
| | | printf("out of mem?, avail: %ld\n", shm().get_free_memory()); |
| | | SetLastError(ENOMEM, "Out of mem"); |
| | | } |
| | | return false; |
| | | } |
| | |
| | | |
| | | bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, std::string &out_msg_id, const RequestResultCB &cb) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | if (!IsRegistered()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | return false; |
| | | } |
| | | |
| | | const std::string &msg_id(NewMsgId()); |
| | | |
| | |
| | | return sock.Send(&BHTopicCenterAddress(), head, query, onQueryResult); |
| | | |
| | | } catch (...) { |
| | | SetLastError(eError, "internal error."); |
| | | return false; |
| | | } |
| | | } |
| | |
| | | { |
/// Counter type used by the status bookkeeping below.
typedef std::atomic<uint64_t> Number;

/// Copies the current value of `b` into `a` (std::atomic has no copy assignment).
/// The load and the store are two separate atomic operations; the copy as a
/// whole is not atomic.
void Assign(Number &a, const Number &b) { a.store(b.load()); }

/// Running message counters for the request/reply test.
struct MsgStatus {

	Number nrequest_; // async requests that were sent successfully
	Number nfailed_;  // async requests that failed to send
	Number nreply_;   // replies received by the client
	Number nserved_;  // presumably requests handled on the server side -- TODO confirm

	/// Starts every counter at zero.
	/// All four members must be listed: before C++20 a default-constructed
	/// std::atomic holds an indeterminate value, so leaving one out
	/// (previously nfailed_) makes every later read of it garbage.
	MsgStatus() :
	    nrequest_(0), nfailed_(0), nreply_(0), nserved_(0) {}

	/// Snapshots the counters of `a` into this object, one member at a time.
	/// Each member copy is atomic on its own; the snapshot as a whole is not.
	MsgStatus &operator=(const MsgStatus &a)
	{
		Assign(nrequest_, a.nrequest_);
		Assign(nserved_, a.nserved_);
		Assign(nreply_, a.nreply_);
		Assign(nfailed_, a.nfailed_);
		return *this;
	}
};
| | | |
| | | MsgStatus &Status() |
| | |
| | | ++Status().nreply_; |
| | | } |
| | | // printf("client Recv reply : %s\n", reply.data().c_str()); |
| | | } |
| | | |
| | | BOOST_AUTO_TEST_CASE(MutexTest) |
| | | { |
| | | const std::string shm_name("ShmMutex"); |
| | | // ShmRemover auto_remove(shm_name); |
| | | SharedMemory shm(shm_name, 1024 * 1024 * 10); |
| | | |
| | | const std::string mtx_name("test_mutex"); |
| | | const std::string int_name("test_int"); |
| | | auto mtx = shm.find_or_construct<Mutex>(mtx_name.c_str())(); |
| | | auto pi = shm.find_or_construct<int>(int_name.c_str())(100); |
| | | if (pi) { |
| | | auto old = *pi; |
| | | printf("int : %d, add1: %d\n", old, ++*pi); |
| | | } |
| | | |
| | | auto TryLock = [&]() { |
| | | if (mtx->try_lock()) { |
| | | printf("try_lock ok\n"); |
| | | return true; |
| | | } else { |
| | | printf("try_lock failed\n"); |
| | | return false; |
| | | } |
| | | }; |
| | | auto Unlock = [&]() { |
| | | mtx->unlock(); |
| | | printf("unlocked\n"); |
| | | }; |
| | | |
| | | if (mtx) { |
| | | printf("mtx exists\n"); |
| | | if (TryLock()) { |
| | | if (TryLock()) { |
| | | Unlock(); |
| | | } |
| | | // Unlock(); |
| | | } |
| | | } else { |
| | | printf("mtx not exists\n"); |
| | | } |
| | | } |
| | | |
| | | BOOST_AUTO_TEST_CASE(ApiTest) |
| | |
| | | std::string s(req.SerializeAsString()); |
| | | void *msg_id = 0; |
| | | int len = 0; |
| | | // Sleep(10ms, false); |
| | | bool r = BHAsyncRequest(s.data(), s.size(), 0, 0); |
| | | DEFER1(BHFree(msg_id, len);); |
| | | if (r) { |
| | | ++Status().nrequest_; |
| | | } else { |
| | | printf("request topic : %s\n", r ? "ok" : "failed"); |
| | | ++Status().nfailed_; |
| | | static std::atomic<int64_t> last(0); |
| | | auto now = NowSec(); |
| | | if (last.exchange(now) < now) { |
| | | int ec = 0; |
| | | std::string msg; |
| | | GetLastError(ec, msg); |
| | | printf("request topic error --------- : %s\n", msg.c_str()); |
| | | } |
| | | } |
| | | } |
| | | }; |
| | | auto showStatus = [](std::atomic<bool> *run) { |
| | | int64_t last = 0; |
| | | MsgStatus last; |
| | | while (*run) { |
| | | auto &st = Status(); |
| | | std::this_thread::sleep_for(1s); |
| | | int cur = st.nreply_.load(); |
| | | printf("nreq: %8ld, nsrv: %8ld, nreply: %8ld, speed %8ld\n", st.nrequest_.load(), st.nserved_.load(), cur, cur - last); |
| | | last = cur; |
| | | printf("nreq: %8ld, spd %8ld | failed: %8ld | nsrv: %8ld, spd %8ld | nreply: %8ld, spd %8ld\n", |
| | | st.nrequest_.load(), st.nrequest_ - last.nrequest_, |
| | | st.nfailed_.load(), |
| | | st.nserved_.load(), st.nserved_ - last.nserved_, |
| | | st.nreply_.load(), st.nreply_ - last.nreply_); |
| | | last = st; |
| | | } |
| | | }; |
| | | auto hb = [](std::atomic<bool> *run) { |
| | | while (*run) { |
| | | BHHeartBeatEasy(0); |
| | | std::this_thread::sleep_for(1s); |
| | | Sleep(1s, false); |
| | | bool r = BHHeartBeatEasy(1000); |
| | | printf("heartbeat: %s\n", r ? "ok" : "failed"); |
| | | } |
| | | }; |
| | | std::atomic<bool> run(true); |
| | | ThreadManager threads; |
| | | boost::timer::auto_cpu_timer timer; |
| | | threads.Launch(hb, &run); |
| | | // threads.Launch(showStatus, &run); |
| | | threads.Launch(showStatus, &run); |
| | | int ncli = 10; |
| | | const uint64_t nreq = 1000 * 100; |
| | | for (int i = 0; i < ncli; ++i) { |
| | |
| | | using namespace std::chrono_literals; |
| | | |
| | | template <class D> |
| | | inline void Sleep(D d) |
| | | inline void Sleep(D d, bool print = true) |
| | | { |
| | | printf("sleep for %ld ms\n", std::chrono::duration_cast<std::chrono::milliseconds>(d).count()); |
| | | if (print) { |
| | | printf("sleep for %ld ms\n", std::chrono::duration_cast<std::chrono::milliseconds>(d).count()); |
| | | } |
| | | std::this_thread::sleep_for(d); |
| | | } |
| | | |