From ae17d1439b35b55212c3a30712e0a60b1d6a99c0 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 30 六月 2021 11:15:53 +0800 Subject: [PATCH] support tcp pub/sub. --- src/bh_api.cc | 41 ++++++++++++++++++++++++++++++----------- 1 files changed, 30 insertions(+), 11 deletions(-) diff --git a/src/bh_api.cc b/src/bh_api.cc index ca69b6e..6bcf45c 100644 --- a/src/bh_api.cc +++ b/src/bh_api.cc @@ -1,9 +1,11 @@ #include "bh_api.h" #include "defs.h" #include "topic_node.h" +#include <chrono> #include <cstdio> #include <memory> +using namespace std::chrono_literals; using namespace bhome_shm; using namespace bhome_msg; @@ -30,20 +32,33 @@ } std::unique_ptr<TopicNode> &ProcNodePtr() { + // client side init here. static std::mutex mtx; - std::lock_guard<std::mutex> lk(mtx); + auto InitLog = []() { + ns_log::AddLog(BHLogDir() + "bhshmq_node_" + GetProcExe() + ".log", true); + return true; + }; + static bool init_log = InitLog(); + static std::string shm_name; static std::unique_ptr<TopicNode> ptr; - if (!ptr && GlobalInit(BHomeShm())) { - auto InitLog = []() { - auto id = GetProcExe(); - char path[200] = {0}; - sprintf(path, "/opt/vasystem/valog/bhshmq_node_%s.log", id.c_str()); - ns_log::AddLog(path); - return true; - }; - static bool init_log = InitLog(); - ptr.reset(new TopicNode(BHomeShm())); + + std::lock_guard<std::mutex> lk(mtx); + if (shm_name != BHomeShmName()) { + shm_name = BHomeShmName(); + LOG_INFO() << "using shm " << shm_name; + + ptr.reset(); + // must reset/stop node before call BHomeShm() which resets shm. + + auto &shm = BHomeShm(); + for (int i = 0; !ptr && i < 3; ++i) { + if (GlobalInit(shm)) { + ptr.reset(new TopicNode(shm)); + } else { + std::this_thread::sleep_for(1s); // make sure shm init done. + } + } } return ptr; } @@ -211,6 +226,10 @@ { return BHApi_In1_Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms); } +int BHSubscribeNetTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) +{ + return BHApi_In1_Out1<MsgTopicList>(&TopicNode::SubscribeNet, topics, topics_len, reply, reply_len, timeout_ms); +} int BHPublish(const void *msgpub, const int msgpub_len, -- Gitblit v1.8.0