From 5c9cbf5ea152f501ae976e0e0b4ef5ee98afdfba Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期四, 11 六月 2020 19:20:40 +0800 Subject: [PATCH] pdate --- service/Makefile | 6 service/request_handler.h | 15 service/login_store.c | 45 ++ service/login_store.h | 16 service/netdisk_service | 0 device/hcnetdisk.c | 1 service/request_handler.c | 193 ++++++++++ device/include/netdisk.h | 2 service/netdisk_service.c | 254 +++++------- service/test_queue | 0 data/config.txt | 11 service/client.c | 55 ++ device/include/hcnetdisk.h | 1 service/test_properties | 0 service/test_properties.c | 8 /dev/null | 0 service/test_queue.c | 18 service/safe_queue.h | 120 ++++++ common/include/usg_common.h | 31 + common/usg_common.c | 38 ++ service/safe_queue_impl.h | 258 +++++++++++++ device/test.c | 1 service/properties_config.h | 11 service/client | 0 service/properties_config.c | 42 ++ service/test | 0 26 files changed, 973 insertions(+), 153 deletions(-) diff --git a/common/include/usg_common.h b/common/include/usg_common.h index bf70221..358fbb3 100644 --- a/common/include/usg_common.h +++ b/common/include/usg_common.h @@ -79,6 +79,10 @@ void err_exit(int error, const char *fmt, ...); void err_msg(int error, const char *fmt, ...); +char *ltrim(char *str, const char *seps); +char *rtrim(char *str, const char *seps); +char *trim(char *str, const char *seps); + static inline int itoa(int num, char *str) { @@ -93,7 +97,34 @@ } + + #ifdef __cplusplus } #endif + + + +#ifdef __cplusplus + +// static inline std::string& ltrim(std::string& str, const std::string& chars = "\t\n\v\f\r ") +// { +// str.erase(0, str.find_first_not_of(chars)); +// return str; +// } + +// static inline std::string& rtrim(std::string& str, const std::string& chars = "\t\n\v\f\r ") +// { +// str.erase(str.find_last_not_of(chars) + 1); +// return str; +// } + +// static inline std::string& trim(std::string& str, const std::string& chars = "\t\n\v\f\r ") +// { +// return ltrim(rtrim(str, chars), chars); +// } + + +#endif + #endif diff --git a/common/usg_common.c b/common/usg_common.c index 4b0f76e..6ba65ae 100644 --- a/common/usg_common.c +++ b/common/usg_common.c @@ -76,3 +76,41 @@ fputs(buf, stderr); fflush(NULL); /* flushes all stdio output streams */ } + +char *ltrim(char *str, const char *seps) +{ + size_t totrim; + if (seps == NULL) { + seps = "\t\n\v\f\r "; + } + totrim = strspn(str, seps); + if (totrim > 0) { + size_t len = strlen(str); + if (totrim == len) { + str[0] = '\0'; + } + else { + memmove(str, str + totrim, len + 1 - totrim); + } + } + return str; +} + +char *rtrim(char *str, const char *seps) +{ + int i; + if (seps == NULL) { + seps = "\t\n\v\f\r "; + } + i = strlen(str) - 1; + while (i >= 0 && strchr(seps, str[i]) != NULL) { + str[i] = '\0'; + i--; + } + return str; +} + +char *trim(char *str, const char *seps) +{ + return ltrim(rtrim(str, seps), seps); +} \ No newline at end of file diff --git a/data/config.txt b/data/config.txt new file mode 100644 index 0000000..87c54c2 --- /dev/null +++ b/data/config.txt @@ -0,0 +1,11 @@ +# nng鏈湴鏈嶅姟鍦板潃 +local_url=tcp://127.0.0.1:8899 + +# nng杩滅▼璋冪敤鍦板潃鍦板潃 +remote_url=tcp://127.0.0.1:9988 + +# 娴峰悍鍖呰矾寰� +hclib=../hclib/ + +# 璐熻矗涓嬭浇浠诲姟鐨勭嚎绋嬫睜鐨勬暟閲� +workers=4 \ No newline at end of file diff --git a/device/hcnetdisk.c b/device/hcnetdisk.c index b62aecc..a4fc6e0 100644 --- a/device/hcnetdisk.c +++ b/device/hcnetdisk.c @@ -1,5 +1,4 @@ #include "netdisk.h" -#include "hcnetdisk.h" bool HCNetdisk::envInited = false; diff --git a/device/include/hcnetdisk.h b/device/include/hcnetdisk.h index 85058c6..eeee33d 100644 --- a/device/include/hcnetdisk.h +++ b/device/include/hcnetdisk.h @@ -4,7 +4,6 @@ #include "usg_common.h" #include "usg_typedef.h" #include "HCNetSDK.h" -#include "netdisk.h" //娴峰悍缃戠粶纭洏 class HCNetdisk : public Netdisk{ diff --git a/device/include/netdisk.h b/device/include/netdisk.h index 3695ac6..f92eeb4 100644 --- a/device/include/netdisk.h +++ b/device/include/netdisk.h @@ -73,5 +73,5 @@ }; - +#include "hcnetdisk.h" #endif \ No newline at end of file diff --git a/device/test.c b/device/test.c index d8a777c..0b99d8f 100644 --- a/device/test.c +++ b/device/test.c @@ -1,6 +1,5 @@ #include "usg_common.h" #include "netdisk.h" -#include "hcnetdisk.h" diff --git a/service/Makefile b/service/Makefile index 01f98fd..af5b61d 100644 --- a/service/Makefile +++ b/service/Makefile @@ -14,9 +14,11 @@ PLATFORM=$(shell $(ROOT)/systype.sh) include $(ROOT)/Make.defines.$(PLATFORM) -all: netdisk_service client test +all: netdisk_service client test test_queue test_properties -netdisk_service: $(ROOT)/device/hcnetdisk.c +test_properties: properties_config.c + +netdisk_service: $(ROOT)/device/hcnetdisk.c login_store.c request_handler.c properties_config.c test: $(ROOT)/device/hcnetdisk.c diff --git a/service/client b/service/client index f0b2e36..a792d7f 100755 --- a/service/client +++ b/service/client Binary files differ diff --git a/service/client.c b/service/client.c index 49d379d..170e788 100644 --- a/service/client.c +++ b/service/client.c @@ -6,6 +6,7 @@ #include <nng/protocol/reqrep0/rep.h> #include <nng/protocol/reqrep0/req.h> const char *url = "tcp://127.0.0.1:8899"; +const char *localUrl = "tcp://127.0.0.1:9988"; void fatal(const char *func, int rv) { @@ -30,7 +31,7 @@ std::string str = request.toStyledString(); - if ((rv = nng_send(sock, strdupa(str.c_str()), str.length(), 0)) != 0) { + if ((rv = nng_send(sock, strdup(str.c_str()), str.length(), 0)) != 0) { fatal("nng_send", rv); } if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) { @@ -119,10 +120,58 @@ } int +server() +{ + nng_socket sock; + int rv; + + if ((rv = nng_rep0_open(&sock)) != 0) { + fatal("nng_rep0_open", rv); + } + if ((rv = nng_listen(sock, localUrl, NULL, 0)) != 0) { + fatal("nng_listen", rv); + } + for (;;) { + char * buf = NULL; + size_t sz; + if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) { + fatal("nng_recv", rv); + } + std::cout << buf << std::endl; + + Json::Value response; + response["code"] = 0; + std::string str = response.toStyledString(); + rv = nng_send(sock, strdup(str.c_str()), str.length(), NNG_FLAG_ALLOC); + if (rv != 0) { + fatal("nng_send", rv); + } + // if ((sz == sizeof(uint64_t)) && + // ((GET64(buf, val)) == DATECMD)) { + // time_t now; + // printf("SERVER: RECEIVED DATE REQUEST\n"); + // now = time(&now); + // printf("SERVER: SENDING DATE: "); + // showdate(now); + + // // Reuse the buffer. We know it is big enough. + // PUT64(buf, (uint64_t) now); + + // continue; + // } + // Unrecognized command, so toss the buffer. + //nng_free(buf, sz); + } +} + + +int main(const int argc, const char **argv) { - if ((argc > 1)) - return (client(argv[1])); + if ((argc > 1)) { + client(argv[1]); + server(); + } // std::string str("123"); // char *str2="123"; // printf("str length %d, %d\n", str.length(), strlen(str2)); diff --git a/service/core b/service/core deleted file mode 100644 index 118c81c..0000000 --- a/service/core +++ /dev/null Binary files differ diff --git a/service/login_store.c b/service/login_store.c new file mode 100644 index 0000000..0d113f6 --- /dev/null +++ b/service/login_store.c @@ -0,0 +1,45 @@ +#include "login_store.h" + +LoginStore::LoginStore() : login_data_file("../data/login.dat") { + Json::Reader jsonreader; + std::ifstream fin(login_data_file); + jsonreader.parse(fin, loginData); + fin.close(); +} + + +int LoginStore::saveLoginInfo(Netdisk_LoginInfo &loginInfo) { + + Json::Value item; + item["loginUUID"] = loginInfo.loginUUID; + item["deviceType"] = loginInfo.deviceType; + item["username"] = loginInfo.username; + item["password"] = loginInfo.password; + item["host"] = loginInfo.host; + item["port"] = loginInfo.port; + + loginData[loginInfo.loginUUID] = item; + + + auto str = loginData.toStyledString(); + // std::cout << str << std::endl; + + std::ofstream fout(login_data_file); + fout << str; + fout.close(); + return 0; +} + + +Netdisk_LoginInfo LoginStore::getLoginInfo(std::string uuid) { + Json::Value item = loginData[uuid]; + Netdisk_LoginInfo loginInfo; + loginInfo.loginUUID = item["loginUUID"].asString(); + loginInfo.deviceType = item["deviceType"].asString(); + loginInfo.username = item["username"].asString(); + loginInfo.password = item["password"].asString(); + loginInfo.host = item["host"].asString(); + loginInfo.port = item["port"].asInt(); + return loginInfo; +} + \ No newline at end of file diff --git a/service/login_store.h b/service/login_store.h new file mode 100644 index 0000000..8f24876 --- /dev/null +++ b/service/login_store.h @@ -0,0 +1,16 @@ +#ifndef _LOGINSTORE_H_ +#define _LOGINSTORE_H_ +#include "usg_common.h" +#include "netdisk.h" +#include <jsoncpp/json/json.h> + +class LoginStore { + Json::Value loginData; + std::string login_data_file; +public: + LoginStore(); + int saveLoginInfo(Netdisk_LoginInfo &loginInfo); + Netdisk_LoginInfo getLoginInfo(std::string uuid); +}; + +#endif \ No newline at end of file diff --git a/service/netdisk_service b/service/netdisk_service index 196531c..ac87c6f 100755 --- a/service/netdisk_service +++ b/service/netdisk_service Binary files differ diff --git a/service/netdisk_service.c b/service/netdisk_service.c index 57fd496..8d31587 100644 --- a/service/netdisk_service.c +++ b/service/netdisk_service.c @@ -1,174 +1,119 @@ #include "usg_common.h" #include "netdisk.h" -#include "hcnetdisk.h" +#include "safe_queue.h" +#include "login_store.h" +#include "request_handler.h" +#include "properties_config.h" #include <jsoncpp/json/json.h> #include <nng/nng.h> #include <nng/protocol/reqrep0/rep.h> #include <nng/protocol/reqrep0/req.h> + using namespace std; -const char *url = "tcp://127.0.0.1:8899"; -Json::Value loginData; -std::map<std::string, Netdisk *> *userDeviceMap; -std::string login_data_file = "../data/login.dat"; -int saveLoginInfo(Netdisk_LoginInfo &loginInfo) { - - Json::Value item; - item["loginUUID"] = loginInfo.loginUUID; - item["deviceType"] = loginInfo.deviceType; - item["username"] = loginInfo.username; - item["password"] = loginInfo.password; - item["host"] = loginInfo.host; - item["port"] = loginInfo.port; - - loginData[loginInfo.loginUUID] = item; - +int work(Netdisk_DownloadRequest drequest); - auto str = loginData.toStyledString(); - // std::cout << str << std::endl; - std::ofstream fout(login_data_file); - fout << str; - fout.close(); - return 0; -} +PropertiesConfig config("../data/config.txt"); -Netdisk_LoginInfo getLoginInfo(std::string uuid) { - Json::Value item = loginData[uuid]; - Netdisk_LoginInfo loginInfo; - loginInfo.loginUUID = item["loginUUID"].asString(); - loginInfo.deviceType = item["deviceType"].asString(); - loginInfo.username = item["username"].asString(); - loginInfo.password = item["password"].asString(); - loginInfo.host = item["host"].asString(); - loginInfo.port = item["port"].asInt(); - return loginInfo; -} -void -fatal(const char *func, int rv) +int WORKERS ; +std::string localUrl; +std::string remoteUrl; + + + +SafeQueue<Netdisk_DownloadRequest> task_queue(10); + +std::map<std::string, Netdisk *> userDeviceMap; + +LoginStore loginStore; + +std::map<std::string, RequestHandleFun> requestHandleFunMap; + + + +static inline void fatal(const char *func, int rv) { fprintf(stderr, "%s: %s\n", func, nng_strerror(rv)); - exit(1); + //exit(1); } - -int handleLogin(nng_socket sock, Json::Value request) { - int rv, code; - // char *buf; - Netdisk *netdisk = NULL; - Json::Value arguments = request["arguments"]; +int connectAndSend(const char *url, char * str) { + nng_socket sock; + int rv; + size_t sz; + char *buf; - //鐧诲綍 - Netdisk_LoginInfo loginInfo; - loginInfo.loginUUID=arguments["loginUUID"].asString(); - loginInfo.deviceType = arguments["deviceType"].asString(); - loginInfo.host = arguments["host"].asString(); - loginInfo.port = arguments["port"].asInt(); - loginInfo.username = arguments["username"].asString(); - loginInfo.password = arguments["password"].asString(); - - - std::map<std::string, Netdisk *>::iterator userDeviceIter = userDeviceMap->find(loginInfo.loginUUID); - if( userDeviceIter != userDeviceMap->end() ) { - netdisk = userDeviceIter->second; + if ((rv = nng_req0_open(&sock)) != 0) { + fatal("nng_socket", rv); } - if (netdisk == NULL) { - printf("=========null\n"); - if(loginInfo.deviceType == "HC") { - // std::cout << "new HCNetdisk" << std::endl; - netdisk = new HCNetdisk(); - userDeviceMap->insert({loginInfo.loginUUID, netdisk}); - } else { - err_msg(0, "鏃犳硶璇嗗埆鐨勮澶囩被鍨嬶細 %s", loginInfo.deviceType.c_str()); - } + if ((rv = nng_dial(sock, url, NULL, 0)) != 0) { + fatal("nng_dial", rv); } + // printf("CLIENT: SENDING DATE REQUEST\n"); - code = netdisk->login(loginInfo); - if (code == 0) { - saveLoginInfo(loginInfo); - std::cout << "璧峰閫氶亾:" << netdisk->getStartChannel() << ", 鏈�澶ч�氶亾鍙凤細" << netdisk->getMaxChannels() << std::endl; - } - - - Json::Value response; - Json::Value payload; - payload["loginUUID"] = loginInfo.loginUUID; - response["code"] = code; - response["payload"] = payload; - const std::string str = response.toStyledString(); - // nng鍐呴儴浼氶噴鏀綽uf - rv = nng_send(sock, strdup(str.c_str()), str.length(), NNG_FLAG_ALLOC); - //free(buf); - if (rv != 0) { + if ((rv = nng_send(sock, str, strlen(str), 0)) != 0) { fatal("nng_send", rv); - return rv; } + + if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) { + fatal("nng_recv", rv); + } + std::cout << buf; + nng_free(buf, sz); + nng_close(sock); return 0; + } - -int handleDownloadByTime(nng_socket sock, Json::Value request) { - int rv, code; - // char *buf; + +void *worker(void *vargp) +{ + pthread_detach(pthread_self()); + while (1) + { + Netdisk_DownloadRequest request; + task_queue.pop(request); + work(request); + } +} + +int work(Netdisk_DownloadRequest drequest) { Netdisk *netdisk = NULL; - Json::Value arguments = request["arguments"]; - Netdisk_DownloadRequest drequest; - drequest.loginUUID = arguments["loginUUID"].asString(); - Json::Value start = arguments["start"]; - drequest.start.tm_year = start["year"].asInt()-1900; // 杩欎釜鏃堕棿绫诲瀷浠�1900寮�濮嬬畻浣滅涓�骞� - drequest.start.tm_mon = start["mon"].asInt()-1; // 0鏄涓�涓湀 - drequest.start.tm_mday = start["day"].asInt(); - drequest.start.tm_hour = start["hour"].asInt(); - drequest.start.tm_min = start["min"].asInt(); - drequest.start.tm_sec = start["sec"].asInt(); - - - Json::Value end = arguments["end"]; - drequest.end.tm_year = end["year"].asInt()-1900; // 杩欎釜鏃堕棿绫诲瀷浠�1900寮�濮嬬畻浣滅涓�骞� - drequest.end.tm_mon = end["mon"].asInt()-1; // 0鏄涓�涓湀 - drequest.end.tm_mday = end["day"].asInt(); - drequest.end.tm_hour = end["hour"].asInt(); - drequest.end.tm_min = end["min"].asInt(); - drequest.end.tm_sec = end["sec"].asInt(); - - drequest.channel = arguments["channel"].asInt(); - drequest.destpath = arguments["destpath"].asString(); - - - std::map<string, Netdisk *>::iterator userDeviceIter = userDeviceMap->find( drequest.loginUUID); - if( userDeviceIter != userDeviceMap->end() ) { + int rv; + std::map<std::string, Netdisk *>::iterator userDeviceIter = userDeviceMap.find( drequest.loginUUID); + if( userDeviceIter != userDeviceMap.end() ) { netdisk = userDeviceIter->second; } - Netdisk_LoginInfo loginInfo = getLoginInfo(drequest.loginUUID); + Netdisk_LoginInfo loginInfo = loginStore.getLoginInfo(drequest.loginUUID); if (netdisk == NULL) { if(loginInfo.deviceType == "HC") { // std::cout << "new HCNetdisk" << std::endl; netdisk = new HCNetdisk(); - userDeviceMap->insert({loginInfo.loginUUID, netdisk}); + userDeviceMap.insert({loginInfo.loginUUID, netdisk}); } else { err_msg(0, "鏃犳硶璇嗗埆鐨勮澶囩被鍨嬶細 %s", loginInfo.deviceType.c_str()); } } - if ( (code = netdisk->login(loginInfo)) != 0 ) { + if ( (rv = netdisk->login(loginInfo)) != 0 ) { printf("涓嬭浇鐧诲綍澶辫触\n"); } std::vector<std::string> files; - if ( (code = netdisk->downloadByTime(drequest, &files) ) != 0) { + if ( (rv = netdisk->downloadByTime(drequest, &files) ) != 0) { printf("涓嬭浇澶辫触\n"); } - Json::Value response; Json::Value payload; - response["code"] = code; + response["rv"] = rv; Json::Value filelist; for(std::string f : files) { @@ -177,31 +122,29 @@ payload["filelist"] = filelist; response["payload"] = payload; std::string str = response.toStyledString(); + + std::cout << "download finished, call back" << std::endl; std::cout << str << std::endl; - rv = nng_send(sock, strdup(str.c_str()), str.length(), NNG_FLAG_ALLOC); - if (rv != 0) { - fatal("nng_send", rv); - return rv; - } + connectAndSend(remoteUrl.c_str(), strdup(str.c_str()) ); + return 0; - - } -int -server() -{ +void startServer(const char *url) { Json::Reader jsonreader; Json::Value request; nng_socket sock; int rv; - + + RequestHandleFun handleFun; if ((rv = nng_rep0_open(&sock)) != 0) { fatal("nng_rep0_open", rv); } if ((rv = nng_listen(sock, url, NULL, 0)) != 0) { + printf("url=%s\n", url); fatal("nng_listen", rv); } + for (;;) { char * buf = NULL; size_t sz; @@ -211,32 +154,55 @@ jsonreader.parse(buf, request); std::string method = request["method"].asString(); - if (method == "login") { - handleLogin(sock, request); - } else if (method == "downloadByTime") { - handleDownloadByTime(sock, request); + + std::map<std::string, RequestHandleFun>::iterator handleFunIter = requestHandleFunMap.find(method); + if( handleFunIter != requestHandleFunMap.end() ) { + handleFun = handleFunIter->second; + handleFun(sock, request); } else { std::cerr << "Don't support " << method << std::endl; } + // if (method == "login") { + // handleLogin(sock, request); + // } else if (method == "downloadByTime") { + // handleDownloadByTimeAsync(sock, request); + // } else { + // std::cerr << "Don't support " << method << std::endl; + // } // Unrecognized command, so toss the buffer. nng_free(buf, sz); } } + +void registRequestHandleFun() { + requestHandleFunMap.insert({"login", handleLogin}); + requestHandleFunMap.insert({"downloadByTime", handleDownloadByTimeAsync}); +} + +void initThreadPool() { + pthread_t tid; + for (int i = 0; i < WORKERS; i++) + pthread_create(&tid, NULL, worker, NULL); +} + int main() { - Netdisk_EnvConfig config; - config.libpath = "../hclib/"; - HCNetdisk::netdisk_init(&config); - Json::Reader jsonreader; - ifstream fin(login_data_file); - jsonreader.parse(fin, loginData); - fin.close(); + Netdisk_EnvConfig hcEnvConfig; + hcEnvConfig.libpath = config.get("hclib"); + HCNetdisk::netdisk_init(&hcEnvConfig); + + WORKERS = config.getInt("workers"); + localUrl = config.get("local_url"); + remoteUrl = config.get("remote_url"); - userDeviceMap = new std::map<std::string, Netdisk *>(); - server(); + registRequestHandleFun(); + + initThreadPool(); + + startServer(localUrl.c_str()); HCNetdisk::netdisk_deinit(); diff --git a/service/properties_config.c b/service/properties_config.c new file mode 100644 index 0000000..af88536 --- /dev/null +++ b/service/properties_config.c @@ -0,0 +1,42 @@ +#include "properties_config.h" + +PropertiesConfig::PropertiesConfig(std::string __propertiesFile) : propertiesFile(__propertiesFile) { + + std::ifstream fin(propertiesFile); + char line[1024]; + //std::string line; + char *key, *value; + const char *delim = "="; + while(fin.getline(line, 1024)) { + // printf("line=%s\n", line); + if(strlen(trim(line, NULL))== 0) + continue; + if(*line == '#') { + continue; + } + + key = trim(strtok(line, delim), 0); + value = trim(strtok(NULL, delim), 0); + propertiesMap.insert({key, value}); + // printf("key = %s, value=%s\n", key, value); + } + fin.close(); + +} + + +std::string PropertiesConfig::get(std::string name) { + std::map<std::string, std::string>::iterator propertiesIter = propertiesMap.find(name); + if( propertiesIter != propertiesMap.end() ) { + return propertiesIter->second; + } + return ""; +} + +int PropertiesConfig::getInt(std::string name) { + std::map<std::string, std::string>::iterator propertiesIter = propertiesMap.find(name); + if( propertiesIter != propertiesMap.end() ) { + return std::stoi(propertiesIter->second); + } + return 0; +} \ No newline at end of file diff --git a/service/properties_config.h b/service/properties_config.h new file mode 100644 index 0000000..8b4fbfb --- /dev/null +++ b/service/properties_config.h @@ -0,0 +1,11 @@ +#include "usg_common.h" + +class PropertiesConfig { + std::string propertiesFile; + std::map<std::string, std::string> propertiesMap; +public: + + PropertiesConfig(std::string _propertiesFile="./config.properties"); + std::string get(std::string name); + int getInt(std::string name); +}; \ No newline at end of file diff --git a/service/request_handler.c b/service/request_handler.c new file mode 100644 index 0000000..27f2132 --- /dev/null +++ b/service/request_handler.c @@ -0,0 +1,193 @@ +#include "request_handler.h" + +extern SafeQueue<Netdisk_DownloadRequest> task_queue; + +extern std::map<std::string, Netdisk *> userDeviceMap; + +extern LoginStore loginStore; + +static inline void fatal(const char *func, int rv) +{ + fprintf(stderr, "%s: %s\n", func, nng_strerror(rv)); + //exit(1); +} + +int handleLogin(nng_socket sock, Json::Value request) { +std::cout << "accepted login request" << std::endl; + int rv, code; + // char *buf; + Netdisk *netdisk = NULL; + + Json::Value arguments = request["arguments"]; + + //鐧诲綍 + Netdisk_LoginInfo loginInfo; + loginInfo.loginUUID=arguments["loginUUID"].asString(); + loginInfo.deviceType = arguments["deviceType"].asString(); + loginInfo.host = arguments["host"].asString(); + loginInfo.port = arguments["port"].asInt(); + loginInfo.username = arguments["username"].asString(); + loginInfo.password = arguments["password"].asString(); + + + std::map<std::string, Netdisk *>::iterator userDeviceIter = userDeviceMap.find(loginInfo.loginUUID); + if( userDeviceIter != userDeviceMap.end() ) { + netdisk = userDeviceIter->second; + } + if (netdisk == NULL) { + if(loginInfo.deviceType == "HC") { + // std::cout << "new HCNetdisk" << std::endl; + netdisk = new HCNetdisk(); + userDeviceMap.insert({loginInfo.loginUUID, netdisk}); + } else { + err_msg(0, "鏃犳硶璇嗗埆鐨勮澶囩被鍨嬶細 %s", loginInfo.deviceType.c_str()); + } + } + + code = netdisk->login(loginInfo); + if (code == 0) { + loginStore.saveLoginInfo(loginInfo); + std::cout << "璧峰閫氶亾:" << netdisk->getStartChannel() << ", 鏈�澶ч�氶亾鍙凤細" << netdisk->getMaxChannels() << std::endl; + } + + + Json::Value response; + Json::Value payload; + payload["loginUUID"] = loginInfo.loginUUID; + response["code"] = code; + response["payload"] = payload; + const std::string str = response.toStyledString(); + // nng鍐呴儴浼氶噴鏀綽uf + rv = nng_send(sock, strdup(str.c_str()), str.length(), NNG_FLAG_ALLOC); + //free(buf); + if (rv != 0) { + fatal("nng_send", rv); + return rv; + } + return 0; +} + + + +int handleDownloadByTimeAsync(nng_socket sock, Json::Value request) { +std::cout << "accepted handleDownloadByTime request" << std::endl; + int rv; + // char *buf; + + Json::Value arguments = request["arguments"]; + Netdisk_DownloadRequest drequest; + drequest.loginUUID = arguments["loginUUID"].asString(); + Json::Value start = arguments["start"]; + drequest.start.tm_year = start["year"].asInt()-1900; // 杩欎釜鏃堕棿绫诲瀷浠�1900寮�濮嬬畻浣滅涓�骞� + drequest.start.tm_mon = start["mon"].asInt()-1; // 0鏄涓�涓湀 + drequest.start.tm_mday = start["day"].asInt(); + drequest.start.tm_hour = start["hour"].asInt(); + drequest.start.tm_min = start["min"].asInt(); + drequest.start.tm_sec = start["sec"].asInt(); + + + Json::Value end = arguments["end"]; + drequest.end.tm_year = end["year"].asInt()-1900; // 杩欎釜鏃堕棿绫诲瀷浠�1900寮�濮嬬畻浣滅涓�骞� + drequest.end.tm_mon = end["mon"].asInt()-1; // 0鏄涓�涓湀 + drequest.end.tm_mday = end["day"].asInt(); + drequest.end.tm_hour = end["hour"].asInt(); + drequest.end.tm_min = end["min"].asInt(); + drequest.end.tm_sec = end["sec"].asInt(); + + drequest.channel = arguments["channel"].asInt(); + drequest.destpath = arguments["destpath"].asString(); + + task_queue.push(drequest); + + + Json::Value response; + response["code"] = 0; + std::string str = response.toStyledString(); + std::cout << str << std::endl; + if ((rv = nng_send(sock, strdup(str.c_str()), str.length(), NNG_FLAG_ALLOC)) != 0) { + fatal("nng_send", rv); + return rv; + } + return 0; + + +} + +int handleDownloadByTime(nng_socket sock, Json::Value request) { +std::cout << "accepted handleDownloadByTime request" << std::endl; + int rv, code; + // char *buf; + Netdisk *netdisk = NULL; + Json::Value arguments = request["arguments"]; + Netdisk_DownloadRequest drequest; + drequest.loginUUID = arguments["loginUUID"].asString(); + Json::Value start = arguments["start"]; + drequest.start.tm_year = start["year"].asInt()-1900; // 杩欎釜鏃堕棿绫诲瀷浠�1900寮�濮嬬畻浣滅涓�骞� + drequest.start.tm_mon = start["mon"].asInt()-1; // 0鏄涓�涓湀 + drequest.start.tm_mday = start["day"].asInt(); + drequest.start.tm_hour = start["hour"].asInt(); + drequest.start.tm_min = start["min"].asInt(); + drequest.start.tm_sec = start["sec"].asInt(); + + + Json::Value end = arguments["end"]; + drequest.end.tm_year = end["year"].asInt()-1900; // 杩欎釜鏃堕棿绫诲瀷浠�1900寮�濮嬬畻浣滅涓�骞� + drequest.end.tm_mon = end["mon"].asInt()-1; // 0鏄涓�涓湀 + drequest.end.tm_mday = end["day"].asInt(); + drequest.end.tm_hour = end["hour"].asInt(); + drequest.end.tm_min = end["min"].asInt(); + drequest.end.tm_sec = end["sec"].asInt(); + + drequest.channel = arguments["channel"].asInt(); + drequest.destpath = arguments["destpath"].asString(); + + + std::map<std::string, Netdisk *>::iterator userDeviceIter = userDeviceMap.find( drequest.loginUUID); + if( userDeviceIter != userDeviceMap.end() ) { + netdisk = userDeviceIter->second; + } + + Netdisk_LoginInfo loginInfo = loginStore.getLoginInfo(drequest.loginUUID); + if (netdisk == NULL) { + + if(loginInfo.deviceType == "HC") { + // std::cout << "new HCNetdisk" << std::endl; + netdisk = new HCNetdisk(); + userDeviceMap.insert({loginInfo.loginUUID, netdisk}); + } else { + err_msg(0, "鏃犳硶璇嗗埆鐨勮澶囩被鍨嬶細 %s", loginInfo.deviceType.c_str()); + + } + } + + if ( (code = netdisk->login(loginInfo)) != 0 ) { + printf("涓嬭浇鐧诲綍澶辫触\n"); + } + + std::vector<std::string> files; + if ( (code = netdisk->downloadByTime(drequest, &files) ) != 0) { + printf("涓嬭浇澶辫触\n"); + } + + + Json::Value response; + Json::Value payload; + response["code"] = code; + + Json::Value filelist; + for(std::string f : files) { + filelist.append(f); + } + payload["filelist"] = filelist; + response["payload"] = payload; + std::string str = response.toStyledString(); + std::cout << str << std::endl; + rv = nng_send(sock, strdup(str.c_str()), str.length(), NNG_FLAG_ALLOC); + if (rv != 0) { + fatal("nng_send", rv); + return rv; + } + return 0; + + +} diff --git a/service/request_handler.h b/service/request_handler.h new file mode 100644 index 0000000..da51eaf --- /dev/null +++ b/service/request_handler.h @@ -0,0 +1,15 @@ +#include "usg_common.h" +#include "netdisk.h" +#include "safe_queue.h" +#include "login_store.h" +#include <jsoncpp/json/json.h> +#include <nng/nng.h> +#include <nng/protocol/reqrep0/rep.h> +#include <nng/protocol/reqrep0/req.h> + + +typedef int (*RequestHandleFun)(nng_socket sock, Json::Value request); + +int handleLogin(nng_socket sock, Json::Value request); +int handleDownloadByTime(nng_socket sock, Json::Value request); +int handleDownloadByTimeAsync(nng_socket sock, Json::Value request); \ No newline at end of file diff --git a/service/safe_queue.h b/service/safe_queue.h new file mode 100644 index 0000000..1db299b --- /dev/null +++ b/service/safe_queue.h @@ -0,0 +1,120 @@ +#ifndef _SAFEQUEUE_H_ +#define _SAFEQUEUE_H_ + +#include <queue> +#include <condition_variable> +#include <mutex> +#include <chrono> +#include <limits> // std::numeric_limits<>::max + +#define SAFE_QUEUE_DEFAULT_MAX_SIZE std::numeric_limits<std::size_t >::max() + +/// @brief thread-safe queue +/// It uses a mutex+condition variables to protect the internal queue +/// implementation. Inserting or reading elements use the same mutex +template <typename T> +class SafeQueue +{ +public: + /// @brief constructor + /// @param a_maxSize optional parameter with the maximum size of the queue + SafeQueue(std::size_t a_maxSize = SAFE_QUEUE_DEFAULT_MAX_SIZE); + + /// @brief destructor + ~SafeQueue(); + + /// @brief copy contructor + /// WARNING: Use with great care, this function call can take a long time + /// and block other threads from pushing/popping elements into the source + /// queue + SafeQueue(const SafeQueue<T>& a_src); + + /// @brief operator= overloading + /// This function blocks the a_src and "this" SafeQueues and copies the + /// contents of a_src into "this" SafeQueue. + /// WARNING: Use with great care, this function call can take a long time + /// and block other threads from pushing/popping elements into the queues + /// @param a_src the "right" side of the operator= + /// @return a const reference to this object + const SafeQueue<T>& operator=(const SafeQueue<T> &a_src); + + /// @brief move contructor + SafeQueue(SafeQueue<T>&& a_src); + + /// @brief move assignment + SafeQueue<T>& operator=(SafeQueue<T>&& a_src); + + /// @brief Check if the queue is empty + /// This call can block if another thread owns the lock that protects the + /// queue + /// @return true if the queue is empty. False otherwise + bool isEmpty() const; + + /// @brief inserts an element into queue queue + /// This call can block if another thread owns the lock that protects the + /// queue. If the queue is full The thread will be blocked in this queue + /// until someone else gets an element from the queue + /// @param element to insert into the queue + void push(const T &a_elem); + + /// @brief inserts an element into queue queue + /// This call can block if another thread owns the lock that protects the + /// queue. If the queue is full The call will return false and the element + /// won't be inserted + /// @param element to insert into the queue + /// @return True if the elem was successfully inserted into the queue. + /// False otherwise + bool tryPush(const T &a_elem); + + /// @brief extracts an element from the queue (and deletes it from the q) + /// If the queue is empty this call will block the thread until there is + /// something in the queue to be extracted + /// @param a reference where the element from the queue will be saved to + void pop(T &out_data); + + /// @brief extracts an element from the queue (and deletes it from the q) + /// This call gets the block that protects the queue. It will extract the + /// element from the queue only if there are elements in it + /// @param reference to the variable where the result will be saved + /// @return True if the element was retrieved from the queue. + /// False if the queue was empty + bool tryPop(T &out_data); + + /// @brief extracts an element from the queue (and deletes it from the q) + /// If the queue is empty this call will block the thread until there + /// is something in the queue to be extracted or until the timer + /// (2nd parameter) expires + /// @param reference to the variable where the result will be saved + /// @param duration to wait before returning if the queue was empty + /// you may also pass into this a std::seconds or std::milliseconds + /// (defined in std::chrono) + /// @return True if the element was retrieved from the queue. + /// False if the timeout was hit and nothing could be extracted + /// from the queue + bool timedWaitPop(T &data, std::chrono::microseconds a_microsecs); + +protected: + /// the actual queue data structure protected by this SafeQueue wrapper + std::queue<T> m_theQueue; + /// maximum number of elements for the queue + std::size_t m_maximumSize; + /// Mutex to protect the queue + mutable std::mutex m_mutex; + /// Conditional variable to wake up threads + mutable std::condition_variable m_cond; + + /// @brief calculate if copying a_src into this instance will need to + /// wake up potential threads waiting to perform push or pop ops. + /// WARNING: It assumes the caller holds all mutexes required to access + /// to the data of this and a_src SafeQueues + /// @param a_src const reference to the SafeQueue that will be copied + /// into this object + /// @return true if threads will need to be waken up. False otherwise + inline bool wakeUpSignalNeeded(const SafeQueue<T> &a_src) const; +}; + +// include the implementation file +#include "safe_queue_impl.h" + +#endif /* _SAFEQUEUE_H_ */ + diff --git a/service/safe_queue_impl.h b/service/safe_queue_impl.h new file mode 100644 index 0000000..9d1d442 --- /dev/null +++ b/service/safe_queue_impl.h @@ -0,0 +1,258 @@ +#ifndef _SAFEQUEUEIMPL_H_ +#define _SAFEQUEUEIMPL_H_ + +template <typename T> +SafeQueue<T>::SafeQueue(std::size_t a_maxSize): + m_theQueue(), + m_maximumSize(a_maxSize), + m_mutex(), + m_cond() +{ +} + +template <typename T> +SafeQueue<T>::~SafeQueue() +{ +} + +template <typename T> +SafeQueue<T>::SafeQueue(const SafeQueue<T>& a_src): + m_theQueue(), + m_maximumSize(0), + m_mutex(), + m_cond() +{ + // copying a safe queue involves only copying the data (m_theQueue and + // m_maximumSize). This object has not been instantiated yet so nobody can + // be trying to perform push or pop operations on it, but we need to + // acquire a_src.m_mutex before copying its data into m_theQueue and + // m_maximumSize + std::unique_lock<std::mutex> lk(a_src.m_mutex); + + this->m_maximumSize = a_src.m_maximumSize; + this->m_theQueue = a_src.m_theQueue; +} + +template <typename T> +const SafeQueue<T>& SafeQueue<T>::operator=(const SafeQueue<T> &a_src) +{ + if (this != &a_src) + { + // lock both mutexes at the same time to avoid deadlocks + std::unique_lock<std::mutex> this_lk(this->m_mutex, std::defer_lock); + std::unique_lock<std::mutex> src_lk (a_src.m_mutex, std::defer_lock); + std::lock(this_lk, src_lk); + + // will we need to wake up waiting threads after copying the source + // queue? + bool wakeUpWaitingThreads = WakeUpSignalNeeded(a_src); + + // copy data from the left side of the operator= into this intance + this->m_maximumSize = a_src.m_maximumSize; + this->m_theQueue = a_src.m_theQueue; + + // time now to wake up threads waiting for data to be inserted + // or extracted + if (wakeUpWaitingThreads) + { + this->m_cond.notify_all(); + } + } + + return *this; +} + +template <typename T> +SafeQueue<T>::SafeQueue(SafeQueue<T>&& a_src): + m_theQueue(a_src.m_theQueue), // implicit std::move(a_src.m_theQueue) + m_maximumSize(a_src.m_maximumSize), // move constructor called implicitly + m_mutex(), // instantiate a new mutex + m_cond() // instantiate a new conditional variable +{ + // This object has not been instantiated yet. We can assume no one is using + // its mutex. + // Also, a_src is a temporary object so there is no need to acquire + // its mutex. + // Things can therefore be safely moved without the need for any mutex or + // conditional variable +} + +template <typename T> +SafeQueue<T>& SafeQueue<T>::operator=(SafeQueue<T> &&a_src) +{ + if (this != &a_src) + { + // make sure we hold this mutex before moving things around. a_src is + // a temporary object so no need to hold its mutex + std::unique_lock<std::mutex> lk(this->m_mutex); + + // will we need to wake up waiting threads after copying the source + // queue? + bool wakeUpWaitingThreads = WakeUpSignalNeeded(a_src); + + // process data from the temporary copy into this intance + this->m_maximumSize = std::move(a_src.m_maximumSize); + this->m_theQueue = std::move(a_src.m_theQueue); + + // time now to wake up threads waiting for data to be inserted + // or extracted + if (wakeUpWaitingThreads) + { + this->m_cond.notify_all(); + } + } + + return *this; +} + +template <typename T> +bool SafeQueue<T>::wakeUpSignalNeeded(const SafeQueue<T> &a_src) const +{ + if (this->m_theQueue.empty() && (!a_src.m_theQueue.empty())) + { + // threads waiting for stuff to be popped off the queue + return true; + } + else if ((this->m_theQueue.size() >= this->m_maximumSize) && + (a_src.m_theQueue.size() < a_src.m_maximumSize)) + { + // threads waiting for stuff to be pushed into the queue + return true; + } + + return false; +} + +template <typename T> +bool SafeQueue<T>::isEmpty() const +{ + std::lock_guard<std::mutex> lk(m_mutex); + return m_theQueue.empty(); +} + +template <typename T> +void SafeQueue<T>::push(const T &a_elem) +{ + std::unique_lock<std::mutex> lk(m_mutex); + + while (m_theQueue.size() >= m_maximumSize) + { + m_cond.wait(lk); + } + + bool queueEmpty = m_theQueue.empty(); + + m_theQueue.push(a_elem); + + if (queueEmpty) + { + // wake up threads waiting for stuff + m_cond.notify_all(); + } +} + +template <typename T> +bool SafeQueue<T>::tryPush(const T &a_elem) +{ + std::lock_guard<std::mutex> lk(m_mutex); + + bool rv = false; + bool queueEmpty = m_theQueue.empty(); + + if (m_theQueue.size() < m_maximumSize) + { + m_theQueue.push(a_elem); + rv = true; + } + + if (queueEmpty) + { + // wake up threads waiting for stuff + m_cond.notify_all(); + } + + return rv; +} + +template <typename T> +void SafeQueue<T>::pop(T &out_data) +{ + std::unique_lock<std::mutex> lk(m_mutex); + + while (m_theQueue.empty()) + { + m_cond.wait(lk); + } + + bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false; + + out_data = m_theQueue.front(); + m_theQueue.pop(); + + if (queueFull) + { + // wake up threads waiting for stuff + m_cond.notify_all(); + } +} + +template <typename T> +bool SafeQueue<T>::tryPop(T &out_data) +{ + std::lock_guard<std::mutex> lk(m_mutex); + + bool rv = false; + if (!m_theQueue.empty()) + { + bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false; + + out_data = m_theQueue.front(); + m_theQueue.pop(); + + if (queueFull) + { + // wake up threads waiting for stuff + m_cond.notify_all(); + } + + rv = true; + } + + return rv; +} + +template <typename T> +bool SafeQueue<T>::timedWaitPop(T &data, std::chrono::microseconds a_microsecs) +{ + std::unique_lock<std::mutex> lk(m_mutex); + + auto wakeUpTime = std::chrono::steady_clock::now() + a_microsecs; + if (m_cond.wait_until(lk, wakeUpTime, + [this](){return (m_theQueue.size() > 0);})) + { + // wait_until returns false if the predicate (3rd parameter) still + // evaluates to false after the rel_time timeout expired + // we are in this side of the if-clause because the queue is not empty + // (so the 3rd parameter evaluated to true) + bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false; + + data = m_theQueue.front(); + m_theQueue.pop(); + + if (queueFull) + { + // wake up threads waiting to insert things into the queue. + // The queue used to be full, now it's not. + m_cond.notify_all(); + } + + return true; + } + else + { + // timed-out and the queue is still empty + return false; + } +} + +#endif /* _SAFEQUEUEIMPL_H_ */ diff --git a/service/test b/service/test index 498a736..6a3c133 100755 --- a/service/test +++ b/service/test Binary files differ diff --git a/service/test_properties b/service/test_properties new file mode 100755 index 0000000..6b1a2fb --- /dev/null +++ b/service/test_properties Binary files differ diff --git a/service/test_properties.c b/service/test_properties.c new file mode 100644 index 0000000..a86c503 --- /dev/null +++ b/service/test_properties.c @@ -0,0 +1,8 @@ +#include "usg_common.h" +#include "properties_config.h" + +int main() { + PropertiesConfig config("../data/config.txt"); + + std::cout << config.get("local_url") << std::endl; +} \ No newline at end of file diff --git a/service/test_queue b/service/test_queue new file mode 100755 index 0000000..d11beb0 --- /dev/null +++ b/service/test_queue Binary files differ diff --git a/service/test_queue.c b/service/test_queue.c new file mode 100644 index 0000000..4f66425 --- /dev/null +++ b/service/test_queue.c @@ -0,0 +1,18 @@ +#include "usg_common.h" +#include "safe_queue.h" + +#define QUEUE_SIZE 10 + + +using namespace std; + +int main() { + SafeQueue<int> m_queue(10); + m_queue.push(1); + m_queue.push(2); + int a; + m_queue.pop(a); + cout << a << endl; + m_queue.pop(a); + cout << a << endl; +} -- Gitblit v1.8.0