#include "bn_api.h"

// NOTE(review): the original file had a bare `#include` here whose header name
// was lost; <cstdio>/<string> are what this file visibly needs (printf, string).
#include <cstdio>
#include <string>

#include "nng_wrap.h"
using namespace nng_wrap;

#include "common.h"
using namespace std;

#include "bhome_msg.pb.h"
#include "bhome_msg_api.pb.h"
using namespace bhome_msg;

/// Caches the process id used by this process.
///
/// While nothing is cached, the call stores `pid`. Once a non-empty id is
/// cached, every call overwrites `pid` with the cached value instead.
/// No locking is performed here.
static void fetch_set_proc_id(std::string& pid)
{
    static string cur_proc_id{};
    if (cur_proc_id.empty()) {
        cur_proc_id = pid;
    } else {
        pid = cur_proc_id;
    }
}

/// Registers this process with the center and starts the local services.
///
/// @param proc_info      serialized bhome_msg::ProcInfo
/// @param proc_info_len  size of `proc_info` in bytes
/// @param reply          receives a copy of the center's reply (without the
///                       leading port field)
/// @param reply_len      receives the size of `*reply`
/// @param timeout_ms     timeout forwarded to simple_request()
/// @return the (truthy) result of simple_request() on success, false otherwise
///
/// The reply is expected to start with an `unsigned short` port followed by
/// the payload handed back to the caller.
int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
    if (!proc_info || proc_info_len <= 0) return false;

    const auto& url = get_url(URLReg);

    // Remember our proc id so later calls (topics, publish) can reuse it.
    // NOTE(review): when parsing fails the request is still sent and
    // start_reply() below receives an empty proc id - confirm this is intended.
    bhome_msg::ProcInfo pi;
    if (pi.ParseFromArray(proc_info, proc_info_len)) {
        string tmp(pi.proc_id());
        fetch_set_proc_id(tmp);
    }

    void* out{};
    int out_len{};
    auto ret = simple_request(url, proc_info, proc_info_len, &out, &out_len, timeout_ms);
    // printf("======>> BHRegister return val %d\n", ret);
    if (ret) {
        constexpr int port_size = sizeof(unsigned short);

        // Guard against a missing or truncated reply before reading the port.
        if (!out || out_len < port_size) {
            if (out) BHFree(out, out_len);
            set_last_error("BHRegister reply too short");
            return false;
        }

        const unsigned short port = *static_cast<const unsigned short*>(out);
        copy_memory(reply, reply_len, static_cast<char*>(out) + port_size, out_len - port_size);
        BHFree(out, out_len);
        // printf("======>> recv port %d\n", port);

        /////////////////////////////////////////////////////////////////////////
        const auto& url_pub_proxy = get_url(URLPubProxy);
        publish(url_pub_proxy, nullptr, 0);

        const auto& url_sub_queue = get_url(URLSubQueue);
        subscribe_center(url_sub_queue);

        const auto& url_hb = get_url(URLHeartBeat);
        respond_survey(url_hb, string(static_cast<const char*>(proc_info),
                                      static_cast<size_t>(proc_info_len)));

        start_reply(pi.proc_id(), port);
    }
    return ret;
}

/// Sends `proc_info` to the de-registration endpoint (URLDeReg).
/// @return result of simple_request(), or false on invalid arguments
int BHUnregister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
    if (!proc_info || proc_info_len <= 0) return false;
    return simple_request(get_url(URLDeReg), proc_info, proc_info_len, reply, reply_len, timeout_ms);
}

/// Heartbeat convenience wrapper; currently a no-op that always returns 1.
int BHHeartbeatEasy(const int timeout_ms)
{
    return 1;
    // string pid;
    // fetch_set_proc_id(pid);
    // bhome_msg::ProcInfo pi;
    // pi.set_proc_id(pid);
    // string str_pi = pi.SerializeAsString();
    // if (str_pi.empty()) return false;
    // void *reply;
    // int reply_len;
    // auto ret = BHHeartbeat(str_pi.data(), str_pi.size(), &reply, &reply_len, timeout_ms);
    // if (ret){
    //     BHFree(reply, reply_len);
    // }
    // return ret;
}

/// Heartbeat; currently a no-op that always returns 1 and leaves the output
/// parameters untouched.
int BHHeartbeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
    return 1;
    // return simple_request(get_url(URLHeartBeat), proc_info, proc_info_len, reply, reply_len, timeout_ms);
}

/// Registers request/reply topics with the center.
///
/// `topics` is a serialized bhome_msg::MsgTopicList. The list actually sent
/// has the cached proc id as its first entry, followed by the caller's topics.
/// @return result of simple_request(), or false on invalid input / empty url
int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
    if (!topics || topics_len <= 0) return false;

    const auto& url = get_url(URLRegTopic);
    if (url.empty()) {
        set_last_error("BHRegisterTopics url empty");
        return false;
    }

    bhome_msg::MsgTopicList mtl;
    if (!mtl.ParseFromArray(topics, topics_len)) {
        // printf("======>> BHRegisterTopics url %s ParseFromArray failed\n", url.c_str());
        return false;
    }

    string pid;
    fetch_set_proc_id(pid);
    // printf("======>> BHRegisterTopics procid %s\n", pid.c_str());

    // Prefix the topic list with our proc id.
    bhome_msg::MsgTopicList mtl2;
    mtl2.add_topic_list(pid);
    for (const auto& topic : mtl.topic_list()) {
        // printf("======>> BHRegisterTopics topic %s\n", topic.c_str());
        mtl2.add_topic_list(topic);
    }

    string msg(mtl2.SerializeAsString());
    auto ret = simple_request(url, msg.data(), msg.size(), reply, reply_len, timeout_ms);
    // printf("======>> BHRegisterTopics return value %d msg size %lu\n", ret, msg.size());
    return ret;
}

// Communication with the center.
/// Looks up the proc info for a topic; the answer is returned in `reply`.
/// BHRequest uses this (indirectly) to obtain the target proc id.
/// `remote`/`remote_len` are currently ignored.
int BHQueryTopicAddress(const void *remote, const int remote_len, const void *topic, const int topic_len,
                        void **reply, int *reply_len, const int timeout_ms)
{
    if (!topic || topic_len <= 0) return false;

    auto url(get_url(URLQueryTopic));
    if (url.empty()) {
        set_last_error("BHQueryTopicAddress url empty");
        return false;
    }
    // if (remote && remote_len > 0){
    //     BHAddress addr;
    //     if (addr.ParseFromArray(remote, remote_len)){
    //         if (!addr.ip().empty() && addr.port() > 0){
    //             // url = "tcp://" + addr.ip() + ":" + to_string(addr.port());
    //             printf("======>> BHQueryTopicAddress use remote address %s\n", url.c_str());
    //         }
    //     }
    // }
    return simple_request(url, topic, topic_len, reply, reply_len, timeout_ms);
}

/// Queries the online processes (request to the center).
/// `remote`/`remote_len` are currently ignored; `query` is forwarded as-is
/// (an empty query is allowed).
int BHQueryProcs(const void *remote, const int remote_len, const void *query, const int query_len,
                 void **reply, int *reply_len, const int timeout_ms)
{
    // if (!query || query_len <= 0) return false;
    auto url(get_url(URLQueryProcs));
    if (url.empty()) {
        set_last_error("BHQueryProcs url empty");
        return false;
    }
    // if (remote && remote_len > 0){
    //     BHAddress addr;
    //     if (addr.ParseFromArray(remote, remote_len)){
    //         if (!addr.ip().empty() && addr.port() > 0){
    //             // url = "tcp://" + addr.ip() + ":" + to_string(addr.port());
    //             printf("======>> BHQueryProcs use remote address %s\n", url.c_str());
    //         }
    //     }
    // }
    return simple_request(url, query, query_len, reply, reply_len, timeout_ms);
}

// above communicate with center
//////////////////////////////////////////////////////////////
// below communicate with peer

/// Publishes a message.
///
/// `msgpub` is a serialized bhome_msg::MsgPublish. It is published under its
/// own topic, wrapped in a second MsgPublish whose `topic` field carries the
/// cached proc id and whose `data` field carries the original bytes.
/// @return true if publish() returned non-zero, false otherwise
int BHPublish(const void *msgpub, const int msgpub_len, const int timeout_ms)
{
    if (!msgpub || msgpub_len <= 0) return false;

    bhome_msg::MsgPublish pub;
    if (!pub.ParseFromArray(msgpub, msgpub_len)) {
        return false;
    }
    // Reject an empty topic before doing any serialization work.
    if (pub.topic().empty()) {
        set_last_error("BHPublish topic empty");
        return false;
    }

    string pid;
    fetch_set_proc_id(pid);

    MsgPublish newPub;
    newPub.set_topic(pid);
    newPub.set_data(string(static_cast<const char*>(msgpub), static_cast<size_t>(msgpub_len)));
    string msg(newPub.SerializeAsString());

    auto ret = publish(pub.topic(), msg.data(), msg.size());
    if (ret == 0) return false;
    return true;
}

/// Shared implementation of the subscribe calls: subscribes locally to every
/// topic in the serialized MsgTopicList, then forwards the list to `url`.
static int sub(const string& url, const void *topics, const int topics_len,
               void **reply, int *reply_len, const int timeout_ms)
{
    if (!topics || topics_len <= 0) return false;

    bhome_msg::MsgTopicList mtl;
    if (!mtl.ParseFromArray(topics, topics_len)) {
        printf("BHSubscribeTopics ParseFromArray failed\n");
        return false;
    }
    if (mtl.topic_list_size() == 0) {
        printf("BHSubscribeTopics topic_list_size zero\n");
        return false;
    }

    for (const auto& topic : mtl.topic_list()) {
        // printf("BHSubscribeTopics %s\n", topic.c_str());
        subscribe_topic(topic);
    }

    auto ret = simple_request(url, topics, topics_len, reply, reply_len, timeout_ms);
    if (!ret) {
        printf("BHSubscribeTopics simple_request failed\n");
    }
    return ret;
}

/// Subscribes to the given topics via the URLSubLocal endpoint.
int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
    const auto& url = get_url(URLSubLocal);
    if (url.empty()) {
        set_last_error("BHSubscribeTopics url empty");
        return false;
    }
    return sub(url, topics, topics_len, reply, reply_len, timeout_ms);
}

/// Network subscription. The original note marks this as "not implemented";
/// the code forwards to sub() with the URLSubNet endpoint.
int BHSubscribeNetTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
    const auto& url = get_url(URLSubNet);
    if (url.empty()) {
        set_last_error("BHSubscribeNetTopics url empty");
        return false;
    }
    return sub(url, topics, topics_len, reply, reply_len, timeout_ms);
}

/// Reads one subscribed message.
///
/// The received payload is parsed as a MsgPublish; its `topic` field is copied
/// to `proc_id` and its `data` field to `msgpub` (BHPublish in this file puts
/// the publisher's proc id into `topic`).
/// @return true on success, false if reading or parsing failed
int BHReadSub(void **proc_id, int *proc_id_len, void **msgpub, int *msgpub_len, const int timeout_ms)
{
    string topic, msg;
    auto ret = subscribe_read(&topic, &msg, timeout_ms);
    // printf("BHReadSub msg topic %s length %lu\n", topic.c_str(), msg.length());
    if (ret < 0) return false;

    // bhome_msg::MsgPublish pub;
    // if (!pub.ParseFromArray(msg.data(), msg.size())){
    //     printf("MsgPublish parse failed size %lu\n", msg.size());
    //     return false;
    // }
    // printf("MsgPublish topic %s data %s\n", pub.topic().c_str(), pub.data().c_str());

    // fake proc id
    MsgPublish newPub;
    if (newPub.ParseFromString(msg)) {
        copy_memory(proc_id, proc_id_len, newPub.topic().data(), newPub.topic().size());
        copy_memory(msgpub, msgpub_len, newPub.data().data(), newPub.data().size());
        return true;
    }
    return false;

    // const auto& fakeid = GetMeaninglessString();
    // copy_memory(proc_id, proc_id_len, fakeid.data(), fakeid.size());
    // copy_memory(msgpub, msgpub_len, msg.data(), msg.size());
    // return true;
}

/// Resolves the proc id that serves the topic of a serialized MsgRequestTopic.
///
/// Asks the center through BHQueryTopicAddress and takes the proc id of the
/// first node address in the reply.
/// @return true and fills `*proc_id` on success, false otherwise
int get_proc_id_from_MsgRequestTopic(const void* request, const int request_len,
                                     const int timeout_ms, string* proc_id)
{
    bhome_msg::MsgRequestTopic req;
    if (!req.ParseFromArray(request, request_len)) {
        return false;
    }

    bhome_msg::MsgQueryTopic msg;
    msg.set_topic(req.topic());
    string s(msg.SerializeAsString());

    void* reply2 = nullptr;
    int reply_len2 = 0;
    // The remote argument is ignored by BHQueryTopicAddress; a placeholder is passed.
    string remote("fake_remote");
    int ret = BHQueryTopicAddress(remote.c_str(), remote.length(), s.data(), s.size(),
                                  &reply2, &reply_len2, timeout_ms);
    if (!ret) return false;

    // Parse first, then release the reply buffer exactly once on every path.
    bhome_msg::MsgQueryTopicReply mr;
    const bool parsed = mr.ParseFromArray(reply2, reply_len2);
    BHFree(reply2, reply_len2);
    if (!parsed) return false;

    if (mr.node_address_size() == 0) {
        return false;
    }
    *proc_id = mr.node_address(0).proc_id();
    return true;
}

/// Sends a request for a topic (request/reply topics are the ones registered
/// through BHRegisterTopics).
///
/// The target url is "ipc:///tmp/<proc id>" when the proc id can be resolved
/// from the center; a valid `remote` BHAddress (non-empty ip, port > 0)
/// overrides it with "tcp://ip:port".
/// @param proc_id/proc_id_len  receive a copy of the resolved proc id (may be
///                             empty when only the remote address was usable)
/// @return true on success, false otherwise
int BHRequest(const void *remote, const int remote_len, const void *request, const int request_len,
              void **proc_id, int *proc_id_len, void **reply, int *reply_len, const int timeout_ms)
{
    if (!request || request_len <= 0) return false;

    string url{};
    // Obtain the proc id via BHQueryTopicAddress.
    string procid{};
    if (get_proc_id_from_MsgRequestTopic(request, request_len, timeout_ms, &procid)) {
        // PRNTVITAG("get_proc_id_from_MsgRequestTopic failed");
        // return false;
        url = "ipc:///tmp/" + procid;
    }

    if (remote && remote_len > 0) {
        BHAddress addr;
        if (addr.ParseFromArray(remote, remote_len)) {
            if (!addr.ip().empty() && addr.port() > 0) {
                url = "tcp://" + addr.ip() + ":" + to_string(addr.port());
                // printf("======>> BHRequest use remote address %s\n", url.c_str());
            }
        }
    }

    if (url.empty()) {
        set_last_error("BHRequest url empty");
        return false;
    }

    int rc = request2(url, request, request_len, reply, reply_len, timeout_ms);
    if (rc < 0) return false;

    copy_memory(proc_id, proc_id_len, procid.data(), procid.size());
    return true;
}

/// Reads one incoming request.
///
/// The requester's own proc id is not available; `src` receives the
/// requester's key (to be passed to BHSendReply). `proc_id` receives the proc
/// id registered for the request's topic when it can be resolved, and is
/// reset to null/0 otherwise.
/// @return true if a request was read, false otherwise
int BHReadRequest(void **proc_id, int *proc_id_len, void **request, int *request_len,
                  void **src, const int timeout_ms)
{
    string msg;
    auto ret = read_request(src, &msg, timeout_ms);
    if (ret != 0) return false;

    string procid{};
    if (get_proc_id_from_MsgRequestTopic(msg.data(), msg.size(), timeout_ms, &procid)) {
        copy_memory(proc_id, proc_id_len, procid.data(), procid.size());
    } else {
        // Do not hand back uninitialized outputs on a successful return.
        if (proc_id) *proc_id = nullptr;
        if (proc_id_len) *proc_id_len = 0;
    }
    copy_memory(request, request_len, msg.data(), msg.size());
    return true;
}

/// Sends `reply` back to the requester identified by `src`.
/// @return true on success, false on invalid arguments or send failure
int BHSendReply(void *src, const void *reply, const int reply_len)
{
    if (!src || !reply || reply_len <= 0) return false;

    auto ret = send_reply(src, reply, reply_len);
    if (ret < 0) return false;
    return true;
}

/// Releases a buffer handed out by this API (forwards to free_nng()).
void BHFree(void *data, int size)
{
    // printf("======>> BHFree %p\n", data);
    free_nng(data, size);
}

/// Cleanup hook; currently does nothing and returns 1.
int BHCleanup()
{
    return 1;
}

/// Retrieves the last error message through get_last_error().
/// @return true on success, false if an output pointer is null
int BHGetLastError(void **msg, int *msg_len)
{
    if (!msg || !msg_len) return false;

    *msg = nullptr;
    *msg_len = 0;
    int ec = 0;
    get_last_error(&ec, msg, msg_len);
    return true;
}

//////////////////////////////////////////////////////////////
// Unused.
/// Asynchronous request; not implemented, always returns 0.
int BHAsyncRequest(const void *remote, const int remote_len, const void *request, const int request_len,
                   void **msg_id, int *msg_id_len)
{
    return 0;
}

/// Worker start hook; not implemented.
void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb)
{
}

//////////////////////////////////////////////////////////////
/// Test helper: sends `msg` to the url selected by `type` and prints the reply.
void TestRequest(int type, const char* msg, const int len)
{
    const auto& topic = get_url(type);

    void *reply = nullptr;
    int reply_len = 0;
    int rc = request2(topic, msg, len, &reply, &reply_len, 10000);
    if (rc < 0) {
        printf("TestRequest failed\n");
        return;
    }

    // Neither buffer is guaranteed to be NUL-terminated, so bound both prints.
    printf("------>>TestRequest send %.*s reply %.*s len %d\n",
           len, msg,
           reply_len, reply ? static_cast<const char*>(reply) : "",
           reply_len);
    BHFree(reply, reply_len);
}

/// Test helper: reads one raw request and copies it to `request`.
static int test_read_request(void** request, int* request_len, void** src, const int to_ms)
{
    string msg;
    auto ret = read_request(src, &msg, to_ms);
    if (ret != 0) return false;

    copy_memory(request, request_len, msg.data(), msg.size());
    return true;
}

/// Test helper: starts a reply service on the url selected by `type`, reads
/// one request and answers it with "back:" + request. `count` is unused.
void TestReply(int type, int count)
{
    const auto& in = get_url(type);
    // printf("url %s\n", (char*)in.data);
    start_reply(in, 0);
    // printf("start read %s\n", (char*)in.data);

    void* request = nullptr;
    void* src = nullptr;
    int request_len = 0;
    if (!test_read_request(&request, &request_len, &src, 10000)) {
        printf("test_read_request failed\n");
        return;
    }

    string msg(static_cast<const char*>(request), static_cast<size_t>(request_len));
    BHFree(request, request_len);
    // printf("------>>TestReply recv msg %s src %lu\n", msg.c_str(), *(uint64_t*)src);

    auto back_msg = "back:" + msg;
    BHSendReply(src, back_msg.c_str(), back_msg.size());
    BHFree(src, 0);
}

/// Test helper: publishes `d` under topic `t`. `t_l` is unused.
void TestPub(const char* t, const int t_l, const char* d, const int d_l)
{
    publish(t, d, d_l);
}

/// Test helper: with both `d` and `d_l` null it subscribes to topic `t`;
/// otherwise it blocks for one subscribed message and prints it.
/// `t_l` is unused and nothing is written through `d`/`d_l`.
void TestSub(const char* t, const int t_l, void** d, int* d_l)
{
    if (!d && !d_l) {
        subscribe_topic(t);
        return;
    }

    void* proc_id = nullptr;
    void* msgpub = nullptr;
    int proc_id_len = 0;
    int msgpub_len = 0;
    const int timeout_ms = -1;

    // Read exactly once: a second, discarded read would drop a message and
    // leak the buffers it returned.
    if (BHReadSub(&proc_id, &proc_id_len, &msgpub, &msgpub_len, timeout_ms)) {
        string sender(static_cast<const char*>(proc_id), proc_id_len);
        BHFree(proc_id, proc_id_len);
        string payload(static_cast<const char*>(msgpub), msgpub_len);
        BHFree(msgpub, msgpub_len);
        printf("readsubmsg topic %s data %s\n", sender.c_str(), payload.c_str());
    } else {
        printf("readsubmsg failed\n");
    }
}