From 591aacee97f4a6486631c38a6b418e20b2c4109c Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期四, 10 九月 2020 14:56:47 +0800 Subject: [PATCH] update --- service/netdisk_service.c | 190 ++++++++++++++++++++++++++++------------------ 1 files changed, 115 insertions(+), 75 deletions(-) diff --git a/service/netdisk_service.c b/service/netdisk_service.c index 8d31587..d6a0b19 100644 --- a/service/netdisk_service.c +++ b/service/netdisk_service.c @@ -4,28 +4,26 @@ #include "login_store.h" #include "request_handler.h" #include "properties_config.h" +#include "netdisk_factory.h" #include <jsoncpp/json/json.h> #include <nng/nng.h> #include <nng/protocol/reqrep0/rep.h> #include <nng/protocol/reqrep0/req.h> +#include <nng/protocol/survey0/survey.h> +#include <nng/protocol/survey0/respond.h> using namespace std; - - -int work(Netdisk_DownloadRequest drequest); - - -PropertiesConfig config("../data/config.txt"); +static int work(Netdisk_DownloadRequest drequest); +static int connectAndSend(const char *url, char * str); int WORKERS ; -std::string localUrl; std::string remoteUrl; +PropertiesConfig config("../data/config.txt"); - -SafeQueue<Netdisk_DownloadRequest> task_queue(10); +SafeQueue<Netdisk_DownloadRequest> task_queue(100); std::map<std::string, Netdisk *> userDeviceMap; @@ -70,6 +68,54 @@ } +int work(Netdisk_DownloadRequest drequest) { + Netdisk *netdisk = NULL; + std::vector<std::string> files; + int rv; + char rmsg[MAXLINE]; + strcpy(rmsg, "success"); + Netdisk_LoginInfo loginInfo = loginStore.getLoginInfo(drequest.loginUUID); + std::map<std::string, Netdisk *>::iterator userDeviceIter = userDeviceMap.find( drequest.loginUUID); + if( userDeviceIter != userDeviceMap.end() ) { + netdisk = userDeviceIter->second; + } + + if (netdisk == NULL) { + + netdisk = NetdiskFacotory::create(loginInfo.deviceType); + if(netdisk != NULL) { + userDeviceMap.insert({loginInfo.loginUUID, netdisk}); + } else { + snprintf(rmsg, MAXLINE, "鏃犳硶璇嗗埆鐨勮澶囩被鍨嬶細 %s", loginInfo.deviceType.c_str()); + } + } + + if ( (rv = netdisk->login(loginInfo)) != 0 ) { + snprintf(rmsg, MAXLINE, "璇烽噸鏂扮櫥褰�"); + } else if ( (rv = netdisk->downloadByTime(drequest, &files) ) != 0) { + snprintf(rmsg, MAXLINE, "涓嬭浇澶辫触"); + } + + + Json::Value request; + request["method"] = "downloadByTimeCallBack"; + Json::Value arguments; + Json::Value filelist; + for(std::string f : files) { + filelist.append(f); + } + arguments["fileList"] = filelist; + arguments["loginUUID"] = drequest.loginUUID; + + request["arguments"] = arguments; + std::string str = request.toStyledString(); + + std::cout << "SENDING download finished\n" << str << std::endl; + connectAndSend(remoteUrl.c_str(), strdup(str.c_str()) ); + + return 0; +} + void *worker(void *vargp) { pthread_detach(pthread_self()); @@ -77,58 +123,12 @@ { Netdisk_DownloadRequest request; task_queue.pop(request); + err_msg(0, "====take a task"); work(request); } } -int work(Netdisk_DownloadRequest drequest) { - Netdisk *netdisk = NULL; - int rv; - std::map<std::string, Netdisk *>::iterator userDeviceIter = userDeviceMap.find( drequest.loginUUID); - if( userDeviceIter != userDeviceMap.end() ) { - netdisk = userDeviceIter->second; - } - Netdisk_LoginInfo loginInfo = loginStore.getLoginInfo(drequest.loginUUID); - if (netdisk == NULL) { - - if(loginInfo.deviceType == "HC") { - // std::cout << "new HCNetdisk" << std::endl; - netdisk = new HCNetdisk(); - userDeviceMap.insert({loginInfo.loginUUID, netdisk}); - } else { - err_msg(0, "鏃犳硶璇嗗埆鐨勮澶囩被鍨嬶細 %s", loginInfo.deviceType.c_str()); - - } - } - - if ( (rv = netdisk->login(loginInfo)) != 0 ) { - printf("涓嬭浇鐧诲綍澶辫触\n"); - } - - std::vector<std::string> files; - if ( (rv = netdisk->downloadByTime(drequest, &files) ) != 0) { - printf("涓嬭浇澶辫触\n"); - } - - Json::Value response; - Json::Value payload; - response["rv"] = rv; - - Json::Value filelist; - for(std::string f : files) { - filelist.append(f); - } - payload["filelist"] = filelist; - response["payload"] = payload; - std::string str = response.toStyledString(); - - std::cout << "download finished, call back" << std::endl; - std::cout << str << std::endl; - connectAndSend(remoteUrl.c_str(), strdup(str.c_str()) ); - - return 0; -} void startServer(const char *url) { Json::Reader jsonreader; @@ -151,8 +151,9 @@ if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) { fatal("nng_recv", rv); } - +printf("RECEIVED RPC REQUEST:\n %s", buf); jsonreader.parse(buf, request); + nng_free(buf, sz); std::string method = request["method"].asString(); std::map<std::string, RequestHandleFun>::iterator handleFunIter = requestHandleFunMap.find(method); @@ -162,23 +163,21 @@ } else { std::cerr << "Don't support " << method << std::endl; } - // if (method == "login") { - // handleLogin(sock, request); - // } else if (method == "downloadByTime") { - // handleDownloadByTimeAsync(sock, request); - // } else { - // std::cerr << "Don't support " << method << std::endl; - // } - // Unrecognized command, so toss the buffer. - nng_free(buf, sz); + + } } - +/** + * 娉ㄥ唽璇锋眰澶勭悊鐨勬柟娉� + */ void registRequestHandleFun() { requestHandleFunMap.insert({"login", handleLogin}); + requestHandleFunMap.insert({"logout", handleLogout}); requestHandleFunMap.insert({"downloadByTime", handleDownloadByTimeAsync}); + requestHandleFunMap.insert({"getDeviceInfo", handleGetDeviceInfo}); } + void initThreadPool() { pthread_t tid; @@ -186,23 +185,64 @@ pthread_create(&tid, NULL, worker, NULL); } + +void heartBeat(const char *url, const char *name) +{ + nng_socket sock; + int rv; + + if ((rv = nng_respondent0_open(&sock)) != 0) { + fatal("nng_respondent0_open", rv); + } + if ((rv = nng_dial(sock, url, NULL, NNG_FLAG_NONBLOCK)) != 0) { + fatal("nng_dial", rv); + } + for (;;) { + char *buf = NULL; + size_t sz; + if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) == 0) { +//printf("CLIENT (%s): RECEIVED \"%s\" SURVEY REQUEST\n", name, buf); + nng_free(buf, sz); + char response[1024]; + sprintf(response, "%s-%d", name, getpid()); +//printf("CLIENT (%s): SENDING SURVEY RESPONSE:%s\n", name, response); + if ((rv = nng_send(sock, response, strlen(response) + 1, 0)) != 0) { + fatal("nng_send", rv); + } + } + } +} + +void *heart(void *vargp) +{ + pthread_detach(pthread_self()); + heartBeat(config.get("heart_server").c_str(), "netdisk"); + return NULL; +} + +// 蹇冭烦鍙戦�佽繘绋� +void initHeart() { + pthread_t tid; + pthread_create(&tid, NULL, heart, NULL); +} + int main() { - - Netdisk_EnvConfig hcEnvConfig; - hcEnvConfig.libpath = config.get("hclib"); - HCNetdisk::netdisk_init(&hcEnvConfig); - + //鐜鍙橀噺鍒濆鍖� WORKERS = config.getInt("workers"); - localUrl = config.get("local_url"); - remoteUrl = config.get("remote_url"); + remoteUrl = config.get("client_url"); + //娴峰悍璁惧鐜鍒濆鍖� + Netdisk_EnvConfig hcEnvConfig; + hcEnvConfig.libpath = "../lib/hc"; + HCNetdisk::netdisk_init(&hcEnvConfig); registRequestHandleFun(); initThreadPool(); + initHeart(); - startServer(localUrl.c_str()); + startServer(config.get("server_url").c_str()); HCNetdisk::netdisk_deinit(); -- Gitblit v1.8.0