cbhomeclient.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
cbhomeclient.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
exported_symbols | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
fixed_q.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
cbhomeclient.cpp
@@ -82,7 +82,7 @@ Msg to_bus(client* cli, F&& f, Args&&... args){ Msg mesg; if (std::forward<F>(f)(cli->bus, std::forward<Args>(args)...)) mesg = std::move(msg(crop<Is...>(tuple<Args...>(std::forward<Args>(args)...)))); mesg = msg(crop<Is...>(tuple<Args...>(std::forward<Args>(args)...))); return mesg; } @@ -96,7 +96,7 @@ tie(d, s) = tp; if (m.ParseFromArray(d, s)) { bus_free(d, s); msg = std::move(make_tuple(true, std::move(m))); msg = make_tuple(true, std::move(m)); break; } m.Clear(); @@ -110,8 +110,8 @@ template <size_t... Is, class F, class... Args> MsgCR to_center(client* cli, F&& f, Args&&... args){ MsgCR msg(dummy()); auto vmsg = std::move(to_bus<Is...>(cli, std::forward<F>(f), std::forward<Args>(args)...)); if (!vmsg.empty()) msg = std::move(parse(cli, vmsg.at(0))); auto vmsg = to_bus<Is...>(cli, std::forward<F>(f), std::forward<Args>(args)...); if (!vmsg.empty()) msg = parse(cli, vmsg.at(0)); return msg; } @@ -124,8 +124,8 @@ const auto& tpc = tlist.SerializeAsString(); void* replymsg = NULL; int replysize = 0; msg = std::move(to_center<2,3>(cli, std::forward<F>(f), tpc.data(), tpc.size(), &replymsg, &replysize, sndto)); msg = to_center<2,3>(cli, std::forward<F>(f), tpc.data(), tpc.size(), &replymsg, &replysize, sndto); return msg; } @@ -248,11 +248,7 @@ delete cli; } struct csubmsg* bus_client_get_submsg(void* handle){ client* cli = ptr(handle); Msg msg = std::move(cli->sub_q->pop()); if (msg.empty()) return NULL; static struct csubmsg* parse_submsg(const Msg& msg){ void* procid = NULL, *data = NULL; int pids = 0, size = 0; @@ -265,12 +261,23 @@ return pmsg; } struct creqmsg* bus_client_get_reqmsg(void* handle, void** src){ struct csubmsg* bus_client_get_submsg_intime(void* handle, const size_t ms){ client* cli = ptr(handle); Msg msg = std::move(cli->readreq_q->pop()); Msg msg = cli->sub_q->pop(ms); if (msg.empty()) return NULL; return parse_submsg(msg); } struct csubmsg* bus_client_get_submsg(void* handle){ client* cli = ptr(handle); Msg msg = cli->sub_q->pop(); if (msg.empty()) return NULL; return parse_submsg(msg); } static struct creqmsg* parse_reqmsg(const Msg& msg, void** src){ void* procid = NULL, *data = NULL; int pids = 0, size = 0; @@ -284,14 +291,28 @@ return pmsg; } struct creqmsg* bus_client_get_reqmsg_intime(void* handle, void** src, const size_t ms){ client* cli = ptr(handle); Msg msg = cli->readreq_q->pop(ms); if (msg.empty()) return NULL; return parse_reqmsg(msg, src); } struct creqmsg* bus_client_get_reqmsg(void* handle, void** src){ client* cli = ptr(handle); Msg msg = cli->readreq_q->pop(); if (msg.empty()) return NULL; return parse_reqmsg(msg, src); } int bus_client_request(void* handle, struct creqmsg* msg, struct crepmsg** repmsg){ void* procid = NULL, *reply = NULL; int pids = 0, replys = 0; auto vmsg = std::move(to_bus<4,5,6,7>(ptr(handle), bus_request, (void*)NULL, 0, auto vmsg = to_bus<4,5,6,7>(ptr(handle), bus_request, (void*)NULL, 0, msg->msg, msg->msgl, &procid, &pids, &reply, &replys, sndto)); &reply, &replys, sndto); if (!vmsg.empty()){ void* procid = NULL, *data = NULL; int pids = 0, size = 0; cbhomeclient.h
@@ -24,11 +24,15 @@ void bus_client_free(void* handle); /* 获取订阅的消息,订阅消息通过线程不停读取,此处从缓存中读取 获取订阅的消息,订阅消息通过线程不停读取,此处从缓存中读取,读不到不返回 必须通过 message.h 的 free_reqmsg 释放 通过 get_submsg_db get_submsg_proclist 获取对应的消息 */ struct csubmsg* bus_client_get_submsg(void* handle); /* 与 bus_client_get_submsg 相同,但支持超时返回,ms 参数表明超时时长,单位毫秒 */ struct csubmsg* bus_client_get_submsg_intime(void* handle, const size_t ms); /* 发布消息,data 是 MsgPublish protobuffer序列化后的数据 */ @@ -39,7 +43,7 @@ int bus_client_publish(void* handle, const char* topic, const size_t topicl, const char* data, const size_t size); /* 获取 request 消息,通过线程读取,此处从缓存中读取 获取 request 消息,通过线程读取,此处从缓存中读取, 读不到不返回 必须调用 free_reqmsg 释放 可通过 message.h 的 get_reqmsg_stackerr get_reqmsg_stack 获取对应的消息 src 是哪一个进程请求的标识符 @@ -48,6 +52,10 @@ */ struct creqmsg* bus_client_get_reqmsg(void* handle, void** src); /* 与 bus_client_get_reqmsg 相同,但支持超时返回,ms 参数表明超时时长,单位毫秒 */ struct creqmsg* bus_client_get_reqmsg_intime(void* handle, void** src, const size_t ms); /* 响应消息回复,src是连接标识符,msg是需要回复的消息 通过 message.h 的 make_reply_msg 创建时,有内存拷贝,必须通过 free_reply_msg 释放 或者通过填充 crepmsg 结构体构造,由调用者控制变量的内存和生命周期,可能不会拷贝内存,效率更高 exported_symbols
@@ -3,9 +3,11 @@ bus_client_init; bus_client_free; bus_client_get_submsg; bus_client_get_submsg_intime; bus_client_pubmsg; bus_client_publish; bus_client_get_reqmsg; bus_client_get_reqmsg_intime; bus_client_reply_msg; bus_client_request; fixed_q.h
@@ -46,6 +46,17 @@ cond_node.notify(); return q.size(); } int pop(T& m, const size_t ms){ std::unique_lock<std::mutex> l{mtx_q}; auto d = std::chrono::milliseconds{ms}; if(!cond_node.wait_for(l, d, [this]{ return !q.empty(); })){ return -1; } m = q.front(); q.pop_front(); cond_spare.notify(); return q.size(); } int pop(T& m){ std::unique_lock<std::mutex> l{mtx_q}; auto d = std::chrono::milliseconds{du}; @@ -57,16 +68,29 @@ cond_spare.notify(); return q.size(); } T pop(const size_t ms){ std::unique_lock<std::mutex> l{mtx_q}; auto d = std::chrono::milliseconds{ms}; T t{}; if(!cond_node.wait_for(l, d, [this]{ return !q.empty(); })){ return t; } t = q.front(); q.pop_front(); cond_spare.notify(); return t; } T pop(){ std::unique_lock<std::mutex> l{mtx_q}; auto d = std::chrono::milliseconds{du}; T t{}; while(!cond_node.wait_for(l, d, [this]{ return !q.empty(); })){ if (pred()) return T{}; if (pred()) return t; } auto m(move(q.front())); t = q.front(); q.pop_front(); cond_spare.notify(); return m; return t; } template<typename F> void clear(F&& f){