From 591aacee97f4a6486631c38a6b418e20b2c4109c Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期四, 10 九月 2020 14:56:47 +0800 Subject: [PATCH] update --- service/netdisk_service.c | 334 ++++++++++++++++++++++++++++--------------------------- 1 files changed, 170 insertions(+), 164 deletions(-) diff --git a/service/netdisk_service.c b/service/netdisk_service.c index 57fd496..d6a0b19 100644 --- a/service/netdisk_service.c +++ b/service/netdisk_service.c @@ -1,242 +1,248 @@ #include "usg_common.h" #include "netdisk.h" -#include "hcnetdisk.h" +#include "safe_queue.h" +#include "login_store.h" +#include "request_handler.h" +#include "properties_config.h" +#include "netdisk_factory.h" #include <jsoncpp/json/json.h> #include <nng/nng.h> #include <nng/protocol/reqrep0/rep.h> #include <nng/protocol/reqrep0/req.h> +#include <nng/protocol/survey0/survey.h> +#include <nng/protocol/survey0/respond.h> + using namespace std; -const char *url = "tcp://127.0.0.1:8899"; -Json::Value loginData; +static int work(Netdisk_DownloadRequest drequest); +static int connectAndSend(const char *url, char * str); -std::map<std::string, Netdisk *> *userDeviceMap; -std::string login_data_file = "../data/login.dat"; -int saveLoginInfo(Netdisk_LoginInfo &loginInfo) { - - Json::Value item; - item["loginUUID"] = loginInfo.loginUUID; - item["deviceType"] = loginInfo.deviceType; - item["username"] = loginInfo.username; - item["password"] = loginInfo.password; - item["host"] = loginInfo.host; - item["port"] = loginInfo.port; - - loginData[loginInfo.loginUUID] = item; - +int WORKERS ; +std::string remoteUrl; - auto str = loginData.toStyledString(); - // std::cout << str << std::endl; +PropertiesConfig config("../data/config.txt"); - std::ofstream fout(login_data_file); - fout << str; - fout.close(); - return 0; -} +SafeQueue<Netdisk_DownloadRequest> task_queue(100); -Netdisk_LoginInfo getLoginInfo(std::string uuid) { - Json::Value item = loginData[uuid]; - Netdisk_LoginInfo loginInfo; - loginInfo.loginUUID = item["loginUUID"].asString(); - loginInfo.deviceType = item["deviceType"].asString(); - loginInfo.username = item["username"].asString(); - loginInfo.password = item["password"].asString(); - loginInfo.host = item["host"].asString(); - loginInfo.port = item["port"].asInt(); - return loginInfo; -} +std::map<std::string, Netdisk *> userDeviceMap; -void -fatal(const char *func, int rv) +LoginStore loginStore; + +std::map<std::string, RequestHandleFun> requestHandleFunMap; + + + +static inline void fatal(const char *func, int rv) { fprintf(stderr, "%s: %s\n", func, nng_strerror(rv)); - exit(1); + //exit(1); } - -int handleLogin(nng_socket sock, Json::Value request) { - int rv, code; - // char *buf; - Netdisk *netdisk = NULL; - Json::Value arguments = request["arguments"]; +int connectAndSend(const char *url, char * str) { + nng_socket sock; + int rv; + size_t sz; + char *buf; - //鐧诲綍 - Netdisk_LoginInfo loginInfo; - loginInfo.loginUUID=arguments["loginUUID"].asString(); - loginInfo.deviceType = arguments["deviceType"].asString(); - loginInfo.host = arguments["host"].asString(); - loginInfo.port = arguments["port"].asInt(); - loginInfo.username = arguments["username"].asString(); - loginInfo.password = arguments["password"].asString(); - - - std::map<std::string, Netdisk *>::iterator userDeviceIter = userDeviceMap->find(loginInfo.loginUUID); - if( userDeviceIter != userDeviceMap->end() ) { - netdisk = userDeviceIter->second; + if ((rv = nng_req0_open(&sock)) != 0) { + fatal("nng_socket", rv); } - if (netdisk == NULL) { - printf("=========null\n"); - if(loginInfo.deviceType == "HC") { - // std::cout << "new HCNetdisk" << std::endl; - netdisk = new HCNetdisk(); - userDeviceMap->insert({loginInfo.loginUUID, netdisk}); - } else { - err_msg(0, "鏃犳硶璇嗗埆鐨勮澶囩被鍨嬶細 %s", loginInfo.deviceType.c_str()); - } + if ((rv = nng_dial(sock, url, NULL, 0)) != 0) { + fatal("nng_dial", rv); } + // printf("CLIENT: SENDING DATE REQUEST\n"); - code = netdisk->login(loginInfo); - if (code == 0) { - saveLoginInfo(loginInfo); - std::cout << "璧峰閫氶亾:" << netdisk->getStartChannel() << ", 鏈�澶ч�氶亾鍙凤細" << netdisk->getMaxChannels() << std::endl; - } - - - Json::Value response; - Json::Value payload; - payload["loginUUID"] = loginInfo.loginUUID; - response["code"] = code; - response["payload"] = payload; - const std::string str = response.toStyledString(); - // nng鍐呴儴浼氶噴鏀綽uf - rv = nng_send(sock, strdup(str.c_str()), str.length(), NNG_FLAG_ALLOC); - //free(buf); - if (rv != 0) { + if ((rv = nng_send(sock, str, strlen(str), 0)) != 0) { fatal("nng_send", rv); - return rv; } + + if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) { + fatal("nng_recv", rv); + } + std::cout << buf; + nng_free(buf, sz); + nng_close(sock); return 0; + } - -int handleDownloadByTime(nng_socket sock, Json::Value request) { - int rv, code; - // char *buf; + +int work(Netdisk_DownloadRequest drequest) { Netdisk *netdisk = NULL; - Json::Value arguments = request["arguments"]; - Netdisk_DownloadRequest drequest; - drequest.loginUUID = arguments["loginUUID"].asString(); - Json::Value start = arguments["start"]; - drequest.start.tm_year = start["year"].asInt()-1900; // 杩欎釜鏃堕棿绫诲瀷浠�1900寮�濮嬬畻浣滅涓�骞� - drequest.start.tm_mon = start["mon"].asInt()-1; // 0鏄涓�涓湀 - drequest.start.tm_mday = start["day"].asInt(); - drequest.start.tm_hour = start["hour"].asInt(); - drequest.start.tm_min = start["min"].asInt(); - drequest.start.tm_sec = start["sec"].asInt(); - - - Json::Value end = arguments["end"]; - drequest.end.tm_year = end["year"].asInt()-1900; // 杩欎釜鏃堕棿绫诲瀷浠�1900寮�濮嬬畻浣滅涓�骞� - drequest.end.tm_mon = end["mon"].asInt()-1; // 0鏄涓�涓湀 - drequest.end.tm_mday = end["day"].asInt(); - drequest.end.tm_hour = end["hour"].asInt(); - drequest.end.tm_min = end["min"].asInt(); - drequest.end.tm_sec = end["sec"].asInt(); - - drequest.channel = arguments["channel"].asInt(); - drequest.destpath = arguments["destpath"].asString(); - - - std::map<string, Netdisk *>::iterator userDeviceIter = userDeviceMap->find( drequest.loginUUID); - if( userDeviceIter != userDeviceMap->end() ) { + std::vector<std::string> files; + int rv; + char rmsg[MAXLINE]; + strcpy(rmsg, "success"); + Netdisk_LoginInfo loginInfo = loginStore.getLoginInfo(drequest.loginUUID); + std::map<std::string, Netdisk *>::iterator userDeviceIter = userDeviceMap.find( drequest.loginUUID); + if( userDeviceIter != userDeviceMap.end() ) { netdisk = userDeviceIter->second; } - Netdisk_LoginInfo loginInfo = getLoginInfo(drequest.loginUUID); if (netdisk == NULL) { - - if(loginInfo.deviceType == "HC") { - // std::cout << "new HCNetdisk" << std::endl; - netdisk = new HCNetdisk(); - userDeviceMap->insert({loginInfo.loginUUID, netdisk}); - } else { - err_msg(0, "鏃犳硶璇嗗埆鐨勮澶囩被鍨嬶細 %s", loginInfo.deviceType.c_str()); + netdisk = NetdiskFacotory::create(loginInfo.deviceType); + if(netdisk != NULL) { + userDeviceMap.insert({loginInfo.loginUUID, netdisk}); + } else { + snprintf(rmsg, MAXLINE, "鏃犳硶璇嗗埆鐨勮澶囩被鍨嬶細 %s", loginInfo.deviceType.c_str()); } } - if ( (code = netdisk->login(loginInfo)) != 0 ) { - printf("涓嬭浇鐧诲綍澶辫触\n"); + if ( (rv = netdisk->login(loginInfo)) != 0 ) { + snprintf(rmsg, MAXLINE, "璇烽噸鏂扮櫥褰�"); + } else if ( (rv = netdisk->downloadByTime(drequest, &files) ) != 0) { + snprintf(rmsg, MAXLINE, "涓嬭浇澶辫触"); } + - std::vector<std::string> files; - if ( (code = netdisk->downloadByTime(drequest, &files) ) != 0) { - printf("涓嬭浇澶辫触\n"); - } - - - Json::Value response; - Json::Value payload; - response["code"] = code; - + Json::Value request; + request["method"] = "downloadByTimeCallBack"; + Json::Value arguments; Json::Value filelist; for(std::string f : files) { filelist.append(f); } - payload["filelist"] = filelist; - response["payload"] = payload; - std::string str = response.toStyledString(); - std::cout << str << std::endl; - rv = nng_send(sock, strdup(str.c_str()), str.length(), NNG_FLAG_ALLOC); - if (rv != 0) { - fatal("nng_send", rv); - return rv; - } + arguments["fileList"] = filelist; + arguments["loginUUID"] = drequest.loginUUID; + + request["arguments"] = arguments; + std::string str = request.toStyledString(); + + std::cout << "SENDING download finished\n" << str << std::endl; + connectAndSend(remoteUrl.c_str(), strdup(str.c_str()) ); + return 0; - - } -int -server() +void *worker(void *vargp) { + pthread_detach(pthread_self()); + while (1) + { + Netdisk_DownloadRequest request; + task_queue.pop(request); + err_msg(0, "====take a task"); + work(request); + } +} + + + +void startServer(const char *url) { Json::Reader jsonreader; Json::Value request; nng_socket sock; int rv; - + + RequestHandleFun handleFun; if ((rv = nng_rep0_open(&sock)) != 0) { fatal("nng_rep0_open", rv); } if ((rv = nng_listen(sock, url, NULL, 0)) != 0) { + printf("url=%s\n", url); fatal("nng_listen", rv); } + for (;;) { char * buf = NULL; size_t sz; if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) { fatal("nng_recv", rv); } - +printf("RECEIVED RPC REQUEST:\n %s", buf); jsonreader.parse(buf, request); + nng_free(buf, sz); std::string method = request["method"].asString(); - if (method == "login") { - handleLogin(sock, request); - } else if (method == "downloadByTime") { - handleDownloadByTime(sock, request); + + std::map<std::string, RequestHandleFun>::iterator handleFunIter = requestHandleFunMap.find(method); + if( handleFunIter != requestHandleFunMap.end() ) { + handleFun = handleFunIter->second; + handleFun(sock, request); } else { std::cerr << "Don't support " << method << std::endl; } - // Unrecognized command, so toss the buffer. - nng_free(buf, sz); + + } +} + +/** + * 娉ㄥ唽璇锋眰澶勭悊鐨勬柟娉� + */ +void registRequestHandleFun() { + requestHandleFunMap.insert({"login", handleLogin}); + requestHandleFunMap.insert({"logout", handleLogout}); + requestHandleFunMap.insert({"downloadByTime", handleDownloadByTimeAsync}); + requestHandleFunMap.insert({"getDeviceInfo", handleGetDeviceInfo}); +} + + +void initThreadPool() { + pthread_t tid; + for (int i = 0; i < WORKERS; i++) + pthread_create(&tid, NULL, worker, NULL); +} + + +void heartBeat(const char *url, const char *name) +{ + nng_socket sock; + int rv; + + if ((rv = nng_respondent0_open(&sock)) != 0) { + fatal("nng_respondent0_open", rv); + } + if ((rv = nng_dial(sock, url, NULL, NNG_FLAG_NONBLOCK)) != 0) { + fatal("nng_dial", rv); + } + for (;;) { + char *buf = NULL; + size_t sz; + if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) == 0) { +//printf("CLIENT (%s): RECEIVED \"%s\" SURVEY REQUEST\n", name, buf); + nng_free(buf, sz); + char response[1024]; + sprintf(response, "%s-%d", name, getpid()); +//printf("CLIENT (%s): SENDING SURVEY RESPONSE:%s\n", name, response); + if ((rv = nng_send(sock, response, strlen(response) + 1, 0)) != 0) { + fatal("nng_send", rv); + } + } + } +} + +void *heart(void *vargp) +{ + pthread_detach(pthread_self()); + heartBeat(config.get("heart_server").c_str(), "netdisk"); + return NULL; +} + +// 蹇冭烦鍙戦�佽繘绋� +void initHeart() { + pthread_t tid; + pthread_create(&tid, NULL, heart, NULL); } int main() { - Netdisk_EnvConfig config; - config.libpath = "../hclib/"; - HCNetdisk::netdisk_init(&config); + //鐜鍙橀噺鍒濆鍖� + WORKERS = config.getInt("workers"); + remoteUrl = config.get("client_url"); - Json::Reader jsonreader; - ifstream fin(login_data_file); - jsonreader.parse(fin, loginData); - fin.close(); + //娴峰悍璁惧鐜鍒濆鍖� + Netdisk_EnvConfig hcEnvConfig; + hcEnvConfig.libpath = "../lib/hc"; + HCNetdisk::netdisk_init(&hcEnvConfig); + registRequestHandleFun(); - userDeviceMap = new std::map<std::string, Netdisk *>(); - server(); + initThreadPool(); + initHeart(); + + startServer(config.get("server_url").c_str()); HCNetdisk::netdisk_deinit(); -- Gitblit v1.8.0