api server callback change tag to src; refactor.
| | |
| | | }, |
| | | "files.exclude": { |
| | | "**/*.un~": true, |
| | | "**/bhshmq_center": true, |
| | | "**/bhshmq_status": true, |
| | | "**/bhshmqbox": true, |
| | | "**/gmon.out": true, |
| | | "api/go/bhome_msg": true, |
| | | "build/": true, |
| | | "debug/": true |
| | | "debug/": true, |
| | | "utest/utest": true |
| | | }, |
| | | "cmake.configureOnOpen": false, |
| | | "C_Cpp.default.includePath": [ |
| | |
| | | |
| | | } |
| | | |
| | | func ServerCallbackReply(tag unsafe.Pointer, rep *bh.MsgRequestTopicReply) bool { |
| | | data, _ := rep.Marshal() |
| | | return C.BHServerCallbackReply(tag, unsafe.Pointer(&data[0]), C.int(len(data))) > 0 |
| | | } |
| | | |
| | | type ServecCB func(proc_id *string, req *bh.MsgRequestTopic, reply *bh.MsgRequestTopicReply) bool |
| | | type ServecCB func(src unsafe.Pointer, proc_id *string, req *bh.MsgRequestTopic) |
| | | type SubDataCB func(proc_id *string, pub *bh.MsgPublish) |
| | | type ClientCB func(proc_id *string, msg_id *[]byte, reply *bh.MsgRequestTopicReply) |
| | | |
| | | func cserver_callback(cpid *unsafe.Pointer, cpid_len unsafe.Pointer) { |
| | | func cserver_callback(cpid unsafe.Pointer, pid_len C.int, src unsafe.Pointer) { |
| | | |
| | | } |
| | | func StartWorker(server_cb ServecCB, sub_cb SubDataCB, client_cb ClientCB) { |
| | |
| | | return ProcNode().ServerSendReply(src, rep); |
| | | } |
| | | |
| | | int BHCleanUp() |
| | | { |
| | | return 0; |
| | | } |
| | | |
| | | namespace |
| | | { |
| | | typedef std::function<bool(const void *, const int)> ServerSender; |
| | | } // namespace |
| | | |
| | | void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb) |
| | | { |
| | | TopicNode::ServerCB on_req; |
| | | TopicNode::ServerAsyncCB on_req; |
| | | TopicNode::SubDataCB on_sub; |
| | | TopicNode::RequestResultCB on_reply; |
| | | if (server_cb) { |
| | | on_req = [server_cb](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) { |
| | | on_req = [server_cb](void *src, std::string &proc_id, const MsgRequestTopic &request) { |
| | | std::string sreq(request.SerializeAsString()); |
| | | bool r = false; |
| | | ServerSender sender = [&](const void *p, const int len) { |
| | | r = reply.ParseFromArray(p, len); |
| | | return r; |
| | | }; |
| | | server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), &sender); |
| | | return r; |
| | | server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), src); |
| | | }; |
| | | } |
| | | if (sub_cb) { |
| | |
| | | } |
| | | |
| | | ProcNode().Start(on_req, on_sub, on_reply); |
| | | } |
| | | int BHServerCallbackReply(const void *tag, |
| | | const void *data, |
| | | const int data_len) |
| | | { |
| | | auto &sender = *(const ServerSender *) (tag); |
| | | return sender(data, data_len); |
| | | } |
| | | |
| | | void BHFree(void *data, int size) |
| | |
| | | const int proc_id_len, |
| | | const void *data, |
| | | const int data_len, |
| | | const void *tag); |
| | | void *src); |
| | | |
| | | typedef void (*FClientCallback)(const void *proc_id, |
| | | const int proc_id_len, |
| | |
| | | const int data_len); |
| | | |
| | | void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb); |
| | | |
| | | int BHServerCallbackReply(const void *tag, |
| | | const void *data, |
| | | const int data_len); |
| | | |
| | | int BHHeartbeatEasy(const int timeout_ms); |
| | | int BHHeartbeat(const void *proc_info, |
| | |
| | | auto DoRecv = [=] { |
| | | auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) { |
| | | RecvCB cb; |
| | | if (per_msg_cbs_->Find(head.msg_id(), cb)) { |
| | | if (per_msg_cbs_->Pick(head.msg_id(), cb)) { |
| | | cb(socket, imsg, head); |
| | | } else if (onData) { |
| | | onData(socket, imsg, head); |
| | |
| | | size_t Pending() const { return mq().Pending(); } |
| | | |
| | | template <class Body> |
| | | bool Send(const void *valid_remote, BHMsgHead &head, Body &body, const RecvCB &cb = RecvCB()) |
| | | bool Send(const void *valid_remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB()) |
| | | { |
| | | try { |
| | | if (!cb) { |
| | | return SendImpl(valid_remote, MsgI::Serialize(head, body)); |
| | | } else { |
| | | std::string msg_id(head.msg_id()); |
| | | per_msg_cbs_->Add(msg_id, cb); |
| | | per_msg_cbs_->Store(msg_id, std::move(cb)); |
| | | auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) { |
| | | RecvCB cb_no_use; |
| | | per_msg_cbs_->Find(msg_id, cb_no_use); |
| | | per_msg_cbs_->Pick(msg_id, cb_no_use); |
| | | }; |
| | | return SendImpl(valid_remote, MsgI::Serialize(head, body), onExpireRemoveCB); |
| | | } |
| | |
| | | }; |
| | | |
| | | std::unique_lock<std::mutex> lk(st->mutex); |
| | | bool sendok = Send(remote, head, body, OnRecv); |
| | | bool sendok = Send(remote, head, body, std::move(OnRecv)); |
| | | if (!sendok) { |
| | | printf("send timeout\n"); |
| | | } |
| | |
| | | |
| | | public: |
| | | bool empty() const { return store_.empty(); } |
| | | bool Add(const std::string &id, const RecvCB &cb) { return store_.emplace(id, cb).second; } |
| | | bool Find(const std::string &id, RecvCB &cb) |
| | | bool Store(const std::string &id, RecvCB &&cb) { return store_.emplace(id, std::move(cb)).second; } |
| | | bool Pick(const std::string &id, RecvCB &cb) |
| | | { |
| | | auto pos = store_.find(id); |
| | | if (pos != store_.end()) { |
| | |
| | | } |
| | | } |
| | | |
| | | void TopicNode::Start(ServerCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker) |
| | | void TopicNode::Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker) |
| | | { |
| | | if (nworker < 1) { |
| | | nworker = 1; |
| | |
| | | } |
| | | } |
| | | |
| | | bool TopicNode::ServerStart(const ServerCB &rcb, int nworker) |
| | | bool TopicNode::ServerStart(const ServerSyncCB &rcb, int nworker) |
| | | { |
| | | auto onRecv = [this, rcb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { |
| | | if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; } |
| | |
| | | |
| | | auto &sock = SockServer(); |
| | | return rcb && sock.Start(onRecv, nworker); |
| | | } |
| | | |
| | | bool TopicNode::ServerStart(const ServerAsyncCB &acb, int nworker) |
| | | { |
| | | auto onRecv = [this, acb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { |
| | | if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; } |
| | | MsgRequestTopic req; |
| | | if (!imsg.ParseBody(req)) { return; } |
| | | |
| | | SrcInfo *p = new SrcInfo; |
| | | p->route.assign(head.route().begin(), head.route().end()); |
| | | p->msg_id = head.msg_id(); |
| | | acb(p, *head.mutable_proc_id(), req); |
| | | }; |
| | | |
| | | auto &sock = SockServer(); |
| | | return acb && sock.Start(onRecv, nworker); |
| | | } |
| | | |
| | | bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms) |
| | |
| | | }; |
| | | |
| | | try { |
| | | auto &sock = SockClient(); |
| | | BHAddress addr; |
| | | |
| | | if (topic_query_cache_.Find(req.topic(), addr)) { |
| | | #if 1 |
| | | return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(addr, req, cb); |
| | | #else |
| | | if (topic_query_cache_.Pick(req.topic(), addr)) { |
| | | return SendTo(addr, req, cb); |
| | | } |
| | | |
| | | auto &sock = SockClient(); |
| | | MsgQueryTopic query; |
| | | query.set_topic(req.topic()); |
| | | BHMsgHead head(InitMsgHead(GetType(query), proc_id())); |
| | |
| | | if (head.type() == kMsgTypeQueryTopicReply && imsg.ParseBody(rep)) { |
| | | auto &addr = rep.address(); |
| | | if (!addr.mq_id().empty()) { |
| | | topic_query_cache_.Update(req.topic(), addr); |
| | | topic_query_cache_.Store(req.topic(), addr); |
| | | SendTo(addr, req, cb); |
| | | } |
| | | } |
| | | }; |
| | | return sock.Send(&BHTopicCenterAddress(), head, query, onQueryResult); |
| | | return sock.Send(&BHTopicCenterAddress(), head, query, std::move(onQueryResult)); |
| | | #endif |
| | | |
| | | } catch (...) { |
| | | SetLastError(eError, "internal error."); |
| | |
| | | if (addr.mq_id().empty()) { |
| | | return false; |
| | | } else { |
| | | topic_query_cache_.Update(topic, addr); |
| | | topic_query_cache_.Store(topic, addr); |
| | | return true; |
| | | } |
| | | } |
| | |
| | | SharedMemory &shm() { return shm_; } |
| | | |
| | | public: |
| | | typedef std::function<void(std::string &proc_id, const void *data, const int len)> DataCB; |
| | | TopicNode(SharedMemory &shm); |
| | | ~TopicNode(); |
| | | |
| | |
| | | bool Heartbeat(const int timeout_ms); |
| | | |
| | | // topic rpc server |
| | | typedef std::function<bool(const std::string &client_proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply)> ServerCB; |
| | | bool ServerStart(ServerCB const &cb, const int nworker = 2); |
| | | typedef std::function<bool(const std::string &client_proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply)> ServerSyncCB; |
| | | typedef std::function<void(void *src_info, std::string &client_proc_id, MsgRequestTopic &request)> ServerAsyncCB; |
| | | bool ServerStart(ServerSyncCB const &cb, const int nworker = 2); |
| | | bool ServerStart(ServerAsyncCB const &cb, const int nworker = 2); |
| | | bool ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply, const int timeout_ms); |
| | | bool ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms); |
| | | bool ServerSendReply(void *src_info, const MsgRequestTopicReply &reply); |
| | |
| | | bool Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms); |
| | | bool RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms); |
| | | |
| | | void Start(ServerCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker = 2); |
| | | void Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker = 2); |
| | | void Stop(); |
| | | |
| | | private: |
| | |
| | | { |
| | | class Impl |
| | | { |
| | | typedef std::unordered_map<Topic, Address> Store; |
| | | Store store_; |
| | | typedef std::unordered_map<Topic, Address> Records; |
| | | Records records_; |
| | | |
| | | public: |
| | | bool Find(const Topic &topic, Address &addr) |
| | | { |
| | | auto pos = store_.find(topic); |
| | | if (pos != store_.end()) { |
| | | auto pos = records_.find(topic); |
| | | if (pos != records_.end()) { |
| | | addr = pos->second; |
| | | return true; |
| | | } else { |
| | | return false; |
| | | } |
| | | } |
| | | bool Update(const Topic &topic, const Address &addr) |
| | | bool Store(const Topic &topic, const Address &addr) |
| | | { |
| | | store_[topic] = addr; |
| | | records_[topic] = addr; |
| | | return true; |
| | | } |
| | | }; |
| | | Synced<Impl> impl_; |
| | | // Impl &impl() |
| | | // { |
| | | // thread_local Impl impl; |
| | | // return impl; |
| | | // } |
| | | |
| | | public: |
| | | bool Find(const Topic &topic, Address &addr) { return impl_->Find(topic, addr); } |
| | | bool Update(const Topic &topic, const Address &addr) { return impl_->Update(topic, addr); } |
| | | bool Store(const Topic &topic, const Address &addr) { return impl_->Store(topic, addr); } |
| | | }; |
| | | |
| | | // some sockets may be the same one, using functions make it easy to change. |
| | | |
| | | auto &SockNode() { return sock_node_; } |
| | | auto &SockPub() { return SockNode(); } |
| | | auto &SockSub() { return sock_sub_; } |
| | | auto &SockClient() { return sock_client_; } |
| | | auto &SockServer() { return sock_server_; } |
| | | ShmSocket &SockNode() { return sock_node_; } |
| | | ShmSocket &SockPub() { return SockNode(); } |
| | | ShmSocket &SockSub() { return sock_sub_; } |
| | | ShmSocket &SockClient() { return sock_client_; } |
| | | ShmSocket &SockServer() { return sock_server_; } |
| | | bool IsRegistered() const { return registered_.load(); } |
| | | |
| | | ShmSocket sock_node_; |
| | |
| | | const int proc_id_len, |
| | | const void *data, |
| | | const int data_len, |
| | | const void *tag) |
| | | void *src) |
| | | { |
| | | // printf("ServerProc: "); |
| | | // DEFER1(printf("\n");); |
| | |
| | | reply.set_data(" reply: " + request.data()); |
| | | std::string s(reply.SerializeAsString()); |
| | | // printf("%s", reply.data().c_str()); |
| | | BHServerCallbackReply(tag, s.data(), s.size()); |
| | | BHSendReply(src, s.data(), s.size()); |
| | | ++Status().nserved_; |
| | | } |
| | | } |
| | |
| | | const std::string shm_name("ShmSendRecv"); |
| | | ShmRemover auto_remove(shm_name); |
| | | const int qlen = 64; |
| | | const size_t msg_length = 1000; |
| | | const size_t msg_length = 100; |
| | | std::string msg_content(msg_length, 'a'); |
| | | msg_content[20] = '\0'; |
| | | const std::string client_proc_id = "client_proc"; |
| | | const std::string server_proc_id = "server_proc"; |
| | | |
| | | SharedMemory shm(shm_name, 1024 * 1024 * 50); |
| | | SharedMemory shm(shm_name, 1024 * 1024 * 512); |
| | | auto Avail = [&]() { return shm.get_free_memory(); }; |
| | | auto init_avail = Avail(); |
| | | ShmSocket srv(shm, qlen); |
| | | ShmSocket cli(shm, qlen); |
| | | |
| | | MsgI request_rc; |
| | | MsgRequestTopic req_body; |
| | | req_body.set_topic("topic"); |
| | | req_body.set_data(msg_content); |
| | | auto req_head(InitMsgHead(GetType(req_body), client_proc_id)); |
| | | req_head.add_route()->set_mq_id(&cli.id(), cli.id().size()); |
| | | request_rc.MakeRC(shm, req_head, req_body); |
| | | DEFER1(request_rc.Release(shm)); |
| | | |
| | | MsgRequestTopic reply_body; |
| | | reply_body.set_topic("topic"); |
| | | reply_body.set_data(msg_content); |
| | | auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id)); |
| | | reply_head.add_route()->set_mq_id(&srv.id(), srv.id().size()); |
| | | MsgI reply_rc; |
| | | reply_rc.MakeRC(shm, reply_head, reply_body); |
| | | DEFER1(reply_rc.Release(shm)); |
| | | |
| | | int ncli = 1; |
| | | uint64_t nmsg = 1000 * 1000 * 1; |
| | | std::atomic<uint64_t> count(0); |
| | | |
| | | std::atomic<ptime> last_time(Now() - seconds(1)); |
| | | std::atomic<int64_t> last_time(NowSec() - 1); |
| | | std::atomic<uint64_t> last_count(0); |
| | | |
| | | auto PrintStatus = [&](int64_t cur) { |
| | | std::cout << "time: " << cur; |
| | | printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld\n", |
| | | count.load(), count - last_count.exchange(count), init_avail - Avail()); |
| | | }; |
| | | auto onRecv = [&](ShmSocket &sock, MsgI &msg, BHMsgHead &head) { |
| | | ++count; |
| | | auto cur = NowSec(); |
| | | if (last_time.exchange(cur) < cur) { |
| | | PrintStatus(cur); |
| | | } |
| | | }; |
| | | cli.Start(onRecv, 2); |
| | | |
| | | auto Client = [&](int cli_id, int nmsg) { |
| | | for (int i = 0; i < nmsg; ++i) { |
| | |
| | | req_body.set_topic("topic"); |
| | | req_body.set_data(msg_content); |
| | | auto req_head(InitMsgHead(GetType(req_body), client_proc_id)); |
| | | req_head.add_route()->set_mq_id(&cli.id(), cli.id().size()); |
| | | return cli.Send(&srv.id(), req_head, req_body); |
| | | }; |
| | | auto ReqRC = [&]() { return cli.Send(&srv.id(), request_rc); }; |
| | | |
| | | if (!ReqRC()) { |
| | | printf("********** client send error.\n"); |
| | | continue; |
| | | } |
| | | MsgI msg; |
| | | BHMsgHead head; |
| | | if (!cli.SyncRecv(msg, head, 1000)) { |
| | | printf("********** client recv error.\n"); |
| | | } else { |
| | | DEFER1(msg.Release(shm)); |
| | | ++count; |
| | | auto cur = Now(); |
| | | if (last_time.exchange(cur) < cur) { |
| | | std::cout << "time: " << cur; |
| | | printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld, refcount:%d\n", |
| | | count.load(), count - last_count.exchange(count), init_avail - Avail(), request_rc.Count()); |
| | | } |
| | | } |
| | | Req(); |
| | | } |
| | | }; |
| | | |
| | |
| | | auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, req_head.msg_id())); |
| | | return srv.Send(&src_id, reply_head, reply_body); |
| | | }; |
| | | auto ReplyRC = [&]() { return srv.Send(&src_id, reply_rc); }; |
| | | |
| | | if (ReplyRC()) { |
| | | } |
| | | Reply(); |
| | | } |
| | | } |
| | | } |
| | |
| | | DEFER1(printf("Request Reply Test:");); |
| | | |
| | | ThreadManager clients, servers; |
| | | for (int i = 0; i < qlen; ++i) { servers.Launch(Server); } |
| | | int ncli = 100 * 1; |
| | | uint64_t nmsg = 100 * 100 * 2; |
| | | for (int i = 0; i < 2; ++i) { servers.Launch(Server); } |
| | | printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg); |
| | | for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); } |
| | | clients.WaitAll(); |
| | | printf("request ok: %ld\n", count.load()); |
| | | do { |
| | | std::this_thread::sleep_for(100ms); |
| | | } while (count.load() < ncli * nmsg); |
| | | PrintStatus(NowSec()); |
| | | stop = true; |
| | | servers.WaitAll(); |
| | | // BOOST_CHECK_THROW(reply.Count(), int); |
| | |
| | | Sleep(100ms); |
| | | |
| | | std::atomic<uint64_t> total_count(0); |
| | | std::atomic<ptime> last_time(Now() - seconds(1)); |
| | | std::atomic<int64_t> last_time(NowSec() - 1); |
| | | std::atomic<uint64_t> last_count(0); |
| | | |
| | | const uint64_t nmsg = 100 * 2; |
| | |
| | | auto OnTopicData = [&](const std::string &proc_id, const MsgPublish &pub) { |
| | | ++total_count; |
| | | |
| | | auto cur = Now(); |
| | | auto cur = NowSec(); |
| | | if (last_time.exchange(cur) < cur) { |
| | | std::cout << "time: " << cur; |
| | | printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n", |
| | |
| | | threads.Launch(Pub, "some_else"); |
| | | |
| | | threads.WaitAll(); |
| | | std::cout << "end : " << Now(); |
| | | |
| | | printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n", |
| | | total_count.load(), total_count - last_count.exchange(total_count), init_avail - Avail()); |
| | | } |
| | |
| | | MsgRequestTopic req; |
| | | req.set_topic(topic); |
| | | req.set_data("data " + std::string(100, 'a')); |
| | | |
| | | client.ClientStartWorker(onRecv, 2); |
| | | |
| | | boost::timer::auto_cpu_timer timer; |
| | | for (int i = 0; i < nreq; ++i) { |
| | | std::string msg_id; |
| | |
| | | // ++count; |
| | | } |
| | | do { |
| | | std::this_thread::yield(); |
| | | std::this_thread::sleep_for(100ms); |
| | | } while (count.load() < nreq); |
| | | client.Stop(); |
| | | printf("request %s %d done ", topic.c_str(), count.load()); |
| | |
| | | auto Server = [&](const std::string &name, const std::vector<std::string> &topics) { |
| | | DemoNode server(name, shm); |
| | | |
| | | auto onData = [&](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) { |
| | | auto onDataSync = [&](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) { |
| | | ++server_msg_count; |
| | | reply.set_data(request.topic() + ':' + request.data()); |
| | | return true; |
| | | }; |
| | | server.ServerStart(onData); |
| | | auto onDataAsync = [&](void *src, std::string &proc_id, MsgRequestTopic &request) { |
| | | ++server_msg_count; |
| | | MsgRequestTopicReply reply; |
| | | reply.set_data(request.topic() + ':' + request.data()); |
| | | server.ServerSendReply(src, reply); |
| | | }; |
| | | server.ServerStart(onDataAsync); |
| | | |
| | | MsgTopicList rpc; |
| | | for (auto &topic : topics) { |
| | |
| | | } |
| | | |
| | | while (run) { |
| | | std::this_thread::yield(); |
| | | std::this_thread::sleep_for(100ms); |
| | | } |
| | | }; |
| | | ThreadManager clients, servers; |
| | |
| | | #include <vector> |
| | | |
| | | using namespace boost::posix_time; |
| | | inline ptime Now() { return second_clock::universal_time(); }; |
| | | |
| | | using namespace std::chrono_literals; |
| | | |