| | |
| | | unsigned short port = *(unsigned short*)(out); |
| | | copy_memory(reply, reply_len, (char*)out + port_size, out_len - port_size); |
| | | BHFree(out, out_len); |
| | | printf("======>> recv port %d\n", port); |
| | | ///////////////////////////////////////////////////////////////////////// |
| | | |
| | | const auto& url_pub_proxy = get_url(URLPubProxy); |
| | |
| | | const auto& url_hb = get_url(URLHeartBeat); |
| | | respond_survey(url_hb, string{(const char*)proc_info, (const size_t)proc_info_len}); |
| | | |
| | | start_reply(pi.proc_id(), 0); |
| | | start_reply(pi.proc_id(), port); |
| | | } |
| | | return ret; |
| | | } |
| | |
| | | if (!topics || topics_len <= 0) return false; |
| | | |
| | | const auto& url = get_url(URLRegTopic); |
| | | if (!url) { |
| | | set_last_error("BHRegisterTopics url empty"); |
| | | return false; |
| | | } |
| | | |
| | | bhome_msg::MsgTopicList mtl; |
| | | if (!mtl.ParseFromArray(topics, topics_len)){ |
| | |
| | | // printf("======>> BHRegisterTopics topic %s\n", mtl.topic_list(i).c_str()); |
| | | mtl2.add_topic_list(mtl.topic_list(i)); |
| | | } |
| | | std::move(mtl); |
| | | |
| | | string msg(mtl2.SerializeAsString()); |
| | | std::move(mtl2); |
| | | |
| | | auto ret = simple_request(url, msg.data(), msg.size(), reply, reply_len, timeout_ms); |
| | | // printf("======>> BHRegisterTopics return value %d msg size %lu\n", ret, msg.size()); |
| | |
| | | { |
| | | if (!topic || topic_len <= 0) return false; |
| | | |
| | | auto url(get_url(URLQueryTopic)); |
| | | |
| | | if (remote && remote_len > 0){ |
| | | BHAddress addr; |
| | | if (addr.ParseFromArray(remote, remote_len)){ |
| | | if (!addr.ip().empty() && addr.port() > 0){ |
| | | // url = "tcp://" + addr.ip() + ":" + to_string(addr.port()); |
| | | printf("======>> BHQueryTopicAddress use remote address %s\n", url.c_str()); |
| | | } |
| | | } |
| | | const auto& url = get_url(URLQueryTopic); |
| | | if (!url) { |
| | | set_last_error("BHQueryTopicAddress url empty"); |
| | | return false; |
| | | } |
| | | // if (remote && remote_len > 0){ |
| | | // BHAddress addr; |
| | | // if (addr.ParseFromArray(remote, remote_len)){ |
| | | // if (!addr.ip().empty() && addr.port() > 0){ |
| | | // // url = "tcp://" + addr.ip() + ":" + to_string(addr.port()); |
| | | // printf("======>> BHQueryTopicAddress use remote address %s\n", url.c_str()); |
| | | // } |
| | | // } |
| | | // } |
| | | |
| | | return simple_request(url, topic, topic_len, reply, reply_len, timeout_ms); |
| | | } |
| | |
| | | { |
| | | // if (!query || query_len <= 0) return false; |
| | | |
| | | auto url(get_url(URLQueryProcs)); |
| | | |
| | | if (remote && remote_len > 0){ |
| | | BHAddress addr; |
| | | if (addr.ParseFromArray(remote, remote_len)){ |
| | | if (!addr.ip().empty() && addr.port() > 0){ |
| | | // url = "tcp://" + addr.ip() + ":" + to_string(addr.port()); |
| | | printf("======>> BHQueryProcs use remote address %s\n", url.c_str()); |
| | | } |
| | | } |
| | | const auto& url = get_url(URLQueryProcs); |
| | | if (!url) { |
| | | set_last_error("BHQueryProcs url empty"); |
| | | return false; |
| | | } |
| | | |
| | | // if (remote && remote_len > 0){ |
| | | // BHAddress addr; |
| | | // if (addr.ParseFromArray(remote, remote_len)){ |
| | | // if (!addr.ip().empty() && addr.port() > 0){ |
| | | // // url = "tcp://" + addr.ip() + ":" + to_string(addr.port()); |
| | | // printf("======>> BHQueryProcs use remote address %s\n", url.c_str()); |
| | | // } |
| | | // } |
| | | // } |
| | | |
| | | return simple_request(url, query, query_len, reply, reply_len, timeout_ms); |
| | | } |
| | |
| | | newPub.set_data(string{(const char*)msgpub, (const size_t)msgpub_len}); |
| | | |
| | | string msg(newPub.SerializeAsString()); |
| | | std::move(newPub); |
| | | |
| | | if (pub.topic().empty()) { |
| | | set_last_error("BHPublish topic empty"); |
| | | return false; |
| | | } |
| | | auto ret = publish(pub.topic(), msg.data(), msg.size()); |
| | | if (ret == 0) return false; |
| | | return true; |
| | | } |
| | | |
| | | // 订阅 |
| | | int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | static int sub(const string& url, const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms){ |
| | | if (!topics || topics_len <= 0) return false; |
| | | |
| | | bhome_msg::MsgTopicList mtl; |
| | |
| | | // printf("BHSubscribeTopics %s\n", t.c_str()); |
| | | subscribe_topic(t); |
| | | } |
| | | return true; |
| | | auto ret = simple_request(get_url(URLSubLocal), topics, topics_len, reply, reply_len, timeout_ms); |
| | | std::move(mtl); |
| | | |
| | | auto ret = simple_request(url, topics, topics_len, reply, reply_len, timeout_ms); |
| | | if (!ret){ |
| | | printf("BHSubscribeTopics simple_request failed\n"); |
| | | } |
| | | return ret; |
| | | } |
| | | // 订阅 |
| | | int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | const auto& url = get_url(URLSubLocal); |
| | | if (!url) { |
| | | set_last_error("BHSubscribeTopics url empty"); |
| | | return false; |
| | | } |
| | | return sub(url, topics, topics_len, reply, reply_len, timeout_ms); |
| | | } |
| | | // 订阅网络,不实现 |
| | | int BHSubscribeNetTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | return BHSubscribeTopics(topics, topics_len, reply, reply_len, timeout_ms); |
| | | const auto& url = get_url(URLSubNet); |
| | | if (!url) { |
| | | set_last_error("BHSubscribeNetTopics url empty"); |
| | | return false; |
| | | } |
| | | |
| | | return sub(url, topics, topics_len, reply, reply_len, timeout_ms); |
| | | } |
| | | |
| | | // 读取订阅消息,proc_id暂时没用,返回fake msg |
| | |
| | | int *msgpub_len, |
| | | const int timeout_ms) |
| | | { |
| | | if (!proc_id && !proc_id_len && !msgpub && !msgpub_len) |
| | | return subscribe_read(NULL, NULL, timeout_ms) == 0; |
| | | |
| | | string topic, msg; |
| | | auto ret = subscribe_read(&topic, &msg, timeout_ms); |
| | | // printf("BHReadSub msg topic %s length %lu\n", topic.c_str(), msg.length()); |
| | |
| | | |
| | | bhome_msg::MsgQueryTopic msg; |
| | | msg.set_topic(req.topic()); |
| | | std::move(req); |
| | | |
| | | string s(msg.SerializeAsString()); |
| | | std::move(msg); |
| | | |
| | | void* reply2; |
| | | int reply_len2; |
| | |
| | | int ret = BHQueryTopicAddress(remote.c_str(), remote.length(), s.data(), s.size(), |
| | | &reply2, &reply_len2, timeout_ms); |
| | | if (!ret) return false; |
| | | |
| | | |
| | | bhome_msg::MsgQueryTopicReply mr; |
| | | if (!mr.ParseFromArray(reply2, reply_len2)){ |
| | |
| | | } |
| | | |
| | | auto& node_addr = mr.node_address(0); |
| | | auto& procid = node_addr.proc_id(); |
| | | *proc_id = node_addr.proc_id(); |
| | | |
| | | *proc_id = procid; |
| | | return true; |
| | | } |
| | | |
| | |
| | | { |
| | | if (!request || request_len <= 0) return false; |
| | | |
| | | string url{}; |
| | | // BHQueryTopicAddress获取proc_id |
| | | string procid{}; |
| | | if (!get_proc_id_from_MsgRequestTopic(request, request_len, timeout_ms, &procid)){ |
| | | return false; |
| | | } |
| | | |
| | | auto url("ipc:///tmp/" + procid); |
| | | |
| | | if (remote && remote_len > 0){ |
| | | BHAddress addr; |
| | | if (addr.ParseFromArray(remote, remote_len)){ |
| | | if (!addr.ip().empty() && addr.port() > 0){ |
| | | // url = "tcp://" + addr.ip() + ":" + to_string(addr.port()); |
| | | printf("======>> BHRequest use remote address %s\n", url.c_str()); |
| | | url = "tcp://" + addr.ip() + ":" + to_string(addr.port()); |
| | | // printf("======>> BHRequest use remote address %s\n", url.c_str()); |
| | | } |
| | | } |
| | | } else if (get_proc_id_from_MsgRequestTopic(request, request_len, timeout_ms, &procid)){ |
| | | // PRNTVITAG("get_proc_id_from_MsgRequestTopic failed"); |
| | | // return false; |
| | | url = "ipc:///tmp/" + procid; |
| | | } |
| | | // 使用procid作为ipc通信 |
| | | // printf("BHRequest procid %s\n", procid.c_str()); |
| | | |
| | | if (url.empty()) { |
| | | set_last_error("BHRequest url empty"); |
| | | return false; |
| | | } |
| | | |
| | | int rc = request2(url, request, request_len, reply, reply_len, timeout_ms); |
| | | if (rc < 0) return false; |
| | |
| | | void **src, |
| | | const int timeout_ms) |
| | | { |
| | | if (!proc_id && !proc_id_len && !request && !request_len && !src) |
| | | return read_request(NULL, NULL, timeout_ms) == 0; |
| | | |
| | | string msg; |
| | | auto ret = read_request(src, &msg, timeout_ms); |
| | | if (ret != 0) return false; |
| | | if (ret < 0) return false; |
| | | |
| | | string procid{}; |
| | | if (!get_proc_id_from_MsgRequestTopic(msg.data(), msg.size(), timeout_ms, &procid)){ |
| | | return false; |
| | | if (ret == REPLY_IPC){ |
| | | string procid{}; |
| | | if (get_proc_id_from_MsgRequestTopic(msg.data(), msg.size(), timeout_ms, &procid)){ |
| | | copy_memory(proc_id, proc_id_len, procid.data(), procid.size()); |
| | | } |
| | | } |
| | | |
| | | copy_memory(proc_id, proc_id_len, procid.data(), procid.size()); |
| | | copy_memory(request, request_len, msg.data(), msg.size()); |
| | | |
| | | return true; |
| | |
| | | |
| | | auto back_msg = "back:" + msg; |
| | | BHSendReply(src, back_msg.c_str(), back_msg.size()); |
| | | BHFree(src, 0); |
| | | } |
| | | |
| | | void TestPub(const char* t, const int t_l, const char* d, const int d_l){ |