common/include/usg_common.h
@@ -79,6 +79,10 @@ void err_exit(int error, const char *fmt, ...); void err_msg(int error, const char *fmt, ...); char *ltrim(char *str, const char *seps); char *rtrim(char *str, const char *seps); char *trim(char *str, const char *seps); static inline int itoa(int num, char *str) { @@ -93,7 +97,34 @@ } #ifdef __cplusplus } #endif #ifdef __cplusplus // static inline std::string& ltrim(std::string& str, const std::string& chars = "\t\n\v\f\r ") // { // str.erase(0, str.find_first_not_of(chars)); // return str; // } // static inline std::string& rtrim(std::string& str, const std::string& chars = "\t\n\v\f\r ") // { // str.erase(str.find_last_not_of(chars) + 1); // return str; // } // static inline std::string& trim(std::string& str, const std::string& chars = "\t\n\v\f\r ") // { // return ltrim(rtrim(str, chars), chars); // } #endif #endif common/usg_common.c
@@ -76,3 +76,41 @@ fputs(buf, stderr); fflush(NULL); /* flushes all stdio output streams */ } char *ltrim(char *str, const char *seps) { size_t totrim; if (seps == NULL) { seps = "\t\n\v\f\r "; } totrim = strspn(str, seps); if (totrim > 0) { size_t len = strlen(str); if (totrim == len) { str[0] = '\0'; } else { memmove(str, str + totrim, len + 1 - totrim); } } return str; } char *rtrim(char *str, const char *seps) { int i; if (seps == NULL) { seps = "\t\n\v\f\r "; } i = strlen(str) - 1; while (i >= 0 && strchr(seps, str[i]) != NULL) { str[i] = '\0'; i--; } return str; } char *trim(char *str, const char *seps) { return ltrim(rtrim(str, seps), seps); } data/config.txt
New file @@ -0,0 +1,11 @@ # nng本地服务地址 local_url=tcp://127.0.0.1:8899 # nng远程调用地址地址 remote_url=tcp://127.0.0.1:9988 # 海康包路径 hclib=../hclib/ # 负责下载任务的线程池的数量 workers=4 device/hcnetdisk.c
@@ -1,5 +1,4 @@ #include "netdisk.h" #include "hcnetdisk.h" bool HCNetdisk::envInited = false; device/include/hcnetdisk.h
@@ -4,7 +4,6 @@ #include "usg_common.h" #include "usg_typedef.h" #include "HCNetSDK.h" #include "netdisk.h" //海康网络硬盘 class HCNetdisk : public Netdisk{ device/include/netdisk.h
@@ -73,5 +73,5 @@ }; #include "hcnetdisk.h" #endif device/test.c
@@ -1,6 +1,5 @@ #include "usg_common.h" #include "netdisk.h" #include "hcnetdisk.h" service/Makefile
@@ -14,9 +14,11 @@ PLATFORM=$(shell $(ROOT)/systype.sh) include $(ROOT)/Make.defines.$(PLATFORM) all: netdisk_service client test all: netdisk_service client test test_queue test_properties netdisk_service: $(ROOT)/device/hcnetdisk.c test_properties: properties_config.c netdisk_service: $(ROOT)/device/hcnetdisk.c login_store.c request_handler.c properties_config.c test: $(ROOT)/device/hcnetdisk.c service/clientBinary files differ
service/client.c
@@ -6,6 +6,7 @@ #include <nng/protocol/reqrep0/rep.h> #include <nng/protocol/reqrep0/req.h> const char *url = "tcp://127.0.0.1:8899"; const char *localUrl = "tcp://127.0.0.1:9988"; void fatal(const char *func, int rv) { @@ -30,7 +31,7 @@ std::string str = request.toStyledString(); if ((rv = nng_send(sock, strdupa(str.c_str()), str.length(), 0)) != 0) { if ((rv = nng_send(sock, strdup(str.c_str()), str.length(), 0)) != 0) { fatal("nng_send", rv); } if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) { @@ -119,10 +120,58 @@ } int server() { nng_socket sock; int rv; if ((rv = nng_rep0_open(&sock)) != 0) { fatal("nng_rep0_open", rv); } if ((rv = nng_listen(sock, localUrl, NULL, 0)) != 0) { fatal("nng_listen", rv); } for (;;) { char * buf = NULL; size_t sz; if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) { fatal("nng_recv", rv); } std::cout << buf << std::endl; Json::Value response; response["code"] = 0; std::string str = response.toStyledString(); rv = nng_send(sock, strdup(str.c_str()), str.length(), NNG_FLAG_ALLOC); if (rv != 0) { fatal("nng_send", rv); } // if ((sz == sizeof(uint64_t)) && // ((GET64(buf, val)) == DATECMD)) { // time_t now; // printf("SERVER: RECEIVED DATE REQUEST\n"); // now = time(&now); // printf("SERVER: SENDING DATE: "); // showdate(now); // // Reuse the buffer. We know it is big enough. // PUT64(buf, (uint64_t) now); // continue; // } // Unrecognized command, so toss the buffer. //nng_free(buf, sz); } } int main(const int argc, const char **argv) { if ((argc > 1)) return (client(argv[1])); if ((argc > 1)) { client(argv[1]); server(); } // std::string str("123"); // char *str2="123"; // printf("str length %d, %d\n", str.length(), strlen(str2)); service/coreBinary files differ
service/login_store.c
New file @@ -0,0 +1,45 @@ #include "login_store.h" LoginStore::LoginStore() : login_data_file("../data/login.dat") { Json::Reader jsonreader; std::ifstream fin(login_data_file); jsonreader.parse(fin, loginData); fin.close(); } int LoginStore::saveLoginInfo(Netdisk_LoginInfo &loginInfo) { Json::Value item; item["loginUUID"] = loginInfo.loginUUID; item["deviceType"] = loginInfo.deviceType; item["username"] = loginInfo.username; item["password"] = loginInfo.password; item["host"] = loginInfo.host; item["port"] = loginInfo.port; loginData[loginInfo.loginUUID] = item; auto str = loginData.toStyledString(); // std::cout << str << std::endl; std::ofstream fout(login_data_file); fout << str; fout.close(); return 0; } Netdisk_LoginInfo LoginStore::getLoginInfo(std::string uuid) { Json::Value item = loginData[uuid]; Netdisk_LoginInfo loginInfo; loginInfo.loginUUID = item["loginUUID"].asString(); loginInfo.deviceType = item["deviceType"].asString(); loginInfo.username = item["username"].asString(); loginInfo.password = item["password"].asString(); loginInfo.host = item["host"].asString(); loginInfo.port = item["port"].asInt(); return loginInfo; } service/login_store.h
New file @@ -0,0 +1,16 @@ #ifndef _LOGINSTORE_H_ #define _LOGINSTORE_H_ #include "usg_common.h" #include "netdisk.h" #include <jsoncpp/json/json.h> class LoginStore { Json::Value loginData; std::string login_data_file; public: LoginStore(); int saveLoginInfo(Netdisk_LoginInfo &loginInfo); Netdisk_LoginInfo getLoginInfo(std::string uuid); }; #endif service/netdisk_serviceBinary files differ
service/netdisk_service.c
@@ -1,174 +1,119 @@ #include "usg_common.h" #include "netdisk.h" #include "hcnetdisk.h" #include "safe_queue.h" #include "login_store.h" #include "request_handler.h" #include "properties_config.h" #include <jsoncpp/json/json.h> #include <nng/nng.h> #include <nng/protocol/reqrep0/rep.h> #include <nng/protocol/reqrep0/req.h> using namespace std; const char *url = "tcp://127.0.0.1:8899"; Json::Value loginData; std::map<std::string, Netdisk *> *userDeviceMap; std::string login_data_file = "../data/login.dat"; int saveLoginInfo(Netdisk_LoginInfo &loginInfo) { Json::Value item; item["loginUUID"] = loginInfo.loginUUID; item["deviceType"] = loginInfo.deviceType; item["username"] = loginInfo.username; item["password"] = loginInfo.password; item["host"] = loginInfo.host; item["port"] = loginInfo.port; loginData[loginInfo.loginUUID] = item; auto str = loginData.toStyledString(); // std::cout << str << std::endl; int work(Netdisk_DownloadRequest drequest); std::ofstream fout(login_data_file); fout << str; fout.close(); return 0; } Netdisk_LoginInfo getLoginInfo(std::string uuid) { Json::Value item = loginData[uuid]; Netdisk_LoginInfo loginInfo; loginInfo.loginUUID = item["loginUUID"].asString(); loginInfo.deviceType = item["deviceType"].asString(); loginInfo.username = item["username"].asString(); loginInfo.password = item["password"].asString(); loginInfo.host = item["host"].asString(); loginInfo.port = item["port"].asInt(); return loginInfo; } PropertiesConfig config("../data/config.txt"); void fatal(const char *func, int rv) int WORKERS ; std::string localUrl; std::string remoteUrl; SafeQueue<Netdisk_DownloadRequest> task_queue(10); std::map<std::string, Netdisk *> userDeviceMap; LoginStore loginStore; std::map<std::string, RequestHandleFun> requestHandleFunMap; static inline void fatal(const char *func, int rv) { fprintf(stderr, "%s: %s\n", func, nng_strerror(rv)); exit(1); //exit(1); } int handleLogin(nng_socket sock, Json::Value request) { int rv, code; // char *buf; Netdisk *netdisk = NULL; int connectAndSend(const char *url, char * str) { nng_socket sock; int rv; size_t sz; char *buf; Json::Value arguments = request["arguments"]; //登录 Netdisk_LoginInfo loginInfo; loginInfo.loginUUID=arguments["loginUUID"].asString(); loginInfo.deviceType = arguments["deviceType"].asString(); loginInfo.host = arguments["host"].asString(); loginInfo.port = arguments["port"].asInt(); loginInfo.username = arguments["username"].asString(); loginInfo.password = arguments["password"].asString(); std::map<std::string, Netdisk *>::iterator userDeviceIter = userDeviceMap->find(loginInfo.loginUUID); if( userDeviceIter != userDeviceMap->end() ) { netdisk = userDeviceIter->second; if ((rv = nng_req0_open(&sock)) != 0) { fatal("nng_socket", rv); } if (netdisk == NULL) { printf("=========null\n"); if(loginInfo.deviceType == "HC") { // std::cout << "new HCNetdisk" << std::endl; netdisk = new HCNetdisk(); userDeviceMap->insert({loginInfo.loginUUID, netdisk}); } else { err_msg(0, "无法识别的设备类型: %s", loginInfo.deviceType.c_str()); if ((rv = nng_dial(sock, url, NULL, 0)) != 0) { fatal("nng_dial", rv); } } // printf("CLIENT: SENDING DATE REQUEST\n"); code = netdisk->login(loginInfo); if (code == 0) { saveLoginInfo(loginInfo); std::cout << "起始通道:" << netdisk->getStartChannel() << ", 最大通道号:" << netdisk->getMaxChannels() << std::endl; } Json::Value response; Json::Value payload; payload["loginUUID"] = loginInfo.loginUUID; response["code"] = code; response["payload"] = payload; const std::string str = response.toStyledString(); // nng内部会释放buf rv = nng_send(sock, strdup(str.c_str()), str.length(), NNG_FLAG_ALLOC); //free(buf); if (rv != 0) { if ((rv = nng_send(sock, str, strlen(str), 0)) != 0) { fatal("nng_send", rv); return rv; } if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) { fatal("nng_recv", rv); } std::cout << buf; nng_free(buf, sz); nng_close(sock); return 0; } int handleDownloadByTime(nng_socket sock, Json::Value request) { int rv, code; // char *buf; void *worker(void *vargp) { pthread_detach(pthread_self()); while (1) { Netdisk_DownloadRequest request; task_queue.pop(request); work(request); } } int work(Netdisk_DownloadRequest drequest) { Netdisk *netdisk = NULL; Json::Value arguments = request["arguments"]; Netdisk_DownloadRequest drequest; drequest.loginUUID = arguments["loginUUID"].asString(); Json::Value start = arguments["start"]; drequest.start.tm_year = start["year"].asInt()-1900; // 这个时间类型从1900开始算作第一年 drequest.start.tm_mon = start["mon"].asInt()-1; // 0是第一个月 drequest.start.tm_mday = start["day"].asInt(); drequest.start.tm_hour = start["hour"].asInt(); drequest.start.tm_min = start["min"].asInt(); drequest.start.tm_sec = start["sec"].asInt(); Json::Value end = arguments["end"]; drequest.end.tm_year = end["year"].asInt()-1900; // 这个时间类型从1900开始算作第一年 drequest.end.tm_mon = end["mon"].asInt()-1; // 0是第一个月 drequest.end.tm_mday = end["day"].asInt(); drequest.end.tm_hour = end["hour"].asInt(); drequest.end.tm_min = end["min"].asInt(); drequest.end.tm_sec = end["sec"].asInt(); drequest.channel = arguments["channel"].asInt(); drequest.destpath = arguments["destpath"].asString(); std::map<string, Netdisk *>::iterator userDeviceIter = userDeviceMap->find( drequest.loginUUID); if( userDeviceIter != userDeviceMap->end() ) { int rv; std::map<std::string, Netdisk *>::iterator userDeviceIter = userDeviceMap.find( drequest.loginUUID); if( userDeviceIter != userDeviceMap.end() ) { netdisk = userDeviceIter->second; } Netdisk_LoginInfo loginInfo = getLoginInfo(drequest.loginUUID); Netdisk_LoginInfo loginInfo = loginStore.getLoginInfo(drequest.loginUUID); if (netdisk == NULL) { if(loginInfo.deviceType == "HC") { // std::cout << "new HCNetdisk" << std::endl; netdisk = new HCNetdisk(); userDeviceMap->insert({loginInfo.loginUUID, netdisk}); userDeviceMap.insert({loginInfo.loginUUID, netdisk}); } else { err_msg(0, "无法识别的设备类型: %s", loginInfo.deviceType.c_str()); } } if ( (code = netdisk->login(loginInfo)) != 0 ) { if ( (rv = netdisk->login(loginInfo)) != 0 ) { printf("下载登录失败\n"); } std::vector<std::string> files; if ( (code = netdisk->downloadByTime(drequest, &files) ) != 0) { if ( (rv = netdisk->downloadByTime(drequest, &files) ) != 0) { printf("下载失败\n"); } Json::Value response; Json::Value payload; response["code"] = code; response["rv"] = rv; Json::Value filelist; for(std::string f : files) { @@ -177,31 +122,29 @@ payload["filelist"] = filelist; response["payload"] = payload; std::string str = response.toStyledString(); std::cout << "download finished, call back" << std::endl; std::cout << str << std::endl; rv = nng_send(sock, strdup(str.c_str()), str.length(), NNG_FLAG_ALLOC); if (rv != 0) { fatal("nng_send", rv); return rv; } connectAndSend(remoteUrl.c_str(), strdup(str.c_str()) ); return 0; } int server() { void startServer(const char *url) { Json::Reader jsonreader; Json::Value request; nng_socket sock; int rv; RequestHandleFun handleFun; if ((rv = nng_rep0_open(&sock)) != 0) { fatal("nng_rep0_open", rv); } if ((rv = nng_listen(sock, url, NULL, 0)) != 0) { printf("url=%s\n", url); fatal("nng_listen", rv); } for (;;) { char * buf = NULL; size_t sz; @@ -211,32 +154,55 @@ jsonreader.parse(buf, request); std::string method = request["method"].asString(); if (method == "login") { handleLogin(sock, request); } else if (method == "downloadByTime") { handleDownloadByTime(sock, request); std::map<std::string, RequestHandleFun>::iterator handleFunIter = requestHandleFunMap.find(method); if( handleFunIter != requestHandleFunMap.end() ) { handleFun = handleFunIter->second; handleFun(sock, request); } else { std::cerr << "Don't support " << method << std::endl; } // if (method == "login") { // handleLogin(sock, request); // } else if (method == "downloadByTime") { // handleDownloadByTimeAsync(sock, request); // } else { // std::cerr << "Don't support " << method << std::endl; // } // Unrecognized command, so toss the buffer. nng_free(buf, sz); } } void registRequestHandleFun() { requestHandleFunMap.insert({"login", handleLogin}); requestHandleFunMap.insert({"downloadByTime", handleDownloadByTimeAsync}); } void initThreadPool() { pthread_t tid; for (int i = 0; i < WORKERS; i++) pthread_create(&tid, NULL, worker, NULL); } int main() { Netdisk_EnvConfig config; config.libpath = "../hclib/"; HCNetdisk::netdisk_init(&config); Json::Reader jsonreader; ifstream fin(login_data_file); jsonreader.parse(fin, loginData); fin.close(); Netdisk_EnvConfig hcEnvConfig; hcEnvConfig.libpath = config.get("hclib"); HCNetdisk::netdisk_init(&hcEnvConfig); WORKERS = config.getInt("workers"); localUrl = config.get("local_url"); remoteUrl = config.get("remote_url"); userDeviceMap = new std::map<std::string, Netdisk *>(); server(); registRequestHandleFun(); initThreadPool(); startServer(localUrl.c_str()); HCNetdisk::netdisk_deinit(); service/properties_config.c
New file @@ -0,0 +1,42 @@ #include "properties_config.h" PropertiesConfig::PropertiesConfig(std::string __propertiesFile) : propertiesFile(__propertiesFile) { std::ifstream fin(propertiesFile); char line[1024]; //std::string line; char *key, *value; const char *delim = "="; while(fin.getline(line, 1024)) { // printf("line=%s\n", line); if(strlen(trim(line, NULL))== 0) continue; if(*line == '#') { continue; } key = trim(strtok(line, delim), 0); value = trim(strtok(NULL, delim), 0); propertiesMap.insert({key, value}); // printf("key = %s, value=%s\n", key, value); } fin.close(); } std::string PropertiesConfig::get(std::string name) { std::map<std::string, std::string>::iterator propertiesIter = propertiesMap.find(name); if( propertiesIter != propertiesMap.end() ) { return propertiesIter->second; } return ""; } int PropertiesConfig::getInt(std::string name) { std::map<std::string, std::string>::iterator propertiesIter = propertiesMap.find(name); if( propertiesIter != propertiesMap.end() ) { return std::stoi(propertiesIter->second); } return 0; } service/properties_config.h
New file @@ -0,0 +1,11 @@ #include "usg_common.h" class PropertiesConfig { std::string propertiesFile; std::map<std::string, std::string> propertiesMap; public: PropertiesConfig(std::string _propertiesFile="./config.properties"); std::string get(std::string name); int getInt(std::string name); }; service/request_handler.c
New file @@ -0,0 +1,193 @@ #include "request_handler.h" extern SafeQueue<Netdisk_DownloadRequest> task_queue; extern std::map<std::string, Netdisk *> userDeviceMap; extern LoginStore loginStore; static inline void fatal(const char *func, int rv) { fprintf(stderr, "%s: %s\n", func, nng_strerror(rv)); //exit(1); } int handleLogin(nng_socket sock, Json::Value request) { std::cout << "accepted login request" << std::endl; int rv, code; // char *buf; Netdisk *netdisk = NULL; Json::Value arguments = request["arguments"]; //登录 Netdisk_LoginInfo loginInfo; loginInfo.loginUUID=arguments["loginUUID"].asString(); loginInfo.deviceType = arguments["deviceType"].asString(); loginInfo.host = arguments["host"].asString(); loginInfo.port = arguments["port"].asInt(); loginInfo.username = arguments["username"].asString(); loginInfo.password = arguments["password"].asString(); std::map<std::string, Netdisk *>::iterator userDeviceIter = userDeviceMap.find(loginInfo.loginUUID); if( userDeviceIter != userDeviceMap.end() ) { netdisk = userDeviceIter->second; } if (netdisk == NULL) { if(loginInfo.deviceType == "HC") { // std::cout << "new HCNetdisk" << std::endl; netdisk = new HCNetdisk(); userDeviceMap.insert({loginInfo.loginUUID, netdisk}); } else { err_msg(0, "无法识别的设备类型: %s", loginInfo.deviceType.c_str()); } } code = netdisk->login(loginInfo); if (code == 0) { loginStore.saveLoginInfo(loginInfo); std::cout << "起始通道:" << netdisk->getStartChannel() << ", 最大通道号:" << netdisk->getMaxChannels() << std::endl; } Json::Value response; Json::Value payload; payload["loginUUID"] = loginInfo.loginUUID; response["code"] = code; response["payload"] = payload; const std::string str = response.toStyledString(); // nng内部会释放buf rv = nng_send(sock, strdup(str.c_str()), str.length(), NNG_FLAG_ALLOC); //free(buf); if (rv != 0) { fatal("nng_send", rv); return rv; } return 0; } int handleDownloadByTimeAsync(nng_socket sock, Json::Value request) { std::cout << "accepted handleDownloadByTime request" << std::endl; int rv; // char *buf; Json::Value arguments = request["arguments"]; Netdisk_DownloadRequest drequest; drequest.loginUUID = arguments["loginUUID"].asString(); Json::Value start = arguments["start"]; drequest.start.tm_year = start["year"].asInt()-1900; // 这个时间类型从1900开始算作第一年 drequest.start.tm_mon = start["mon"].asInt()-1; // 0是第一个月 drequest.start.tm_mday = start["day"].asInt(); drequest.start.tm_hour = start["hour"].asInt(); drequest.start.tm_min = start["min"].asInt(); drequest.start.tm_sec = start["sec"].asInt(); Json::Value end = arguments["end"]; drequest.end.tm_year = end["year"].asInt()-1900; // 这个时间类型从1900开始算作第一年 drequest.end.tm_mon = end["mon"].asInt()-1; // 0是第一个月 drequest.end.tm_mday = end["day"].asInt(); drequest.end.tm_hour = end["hour"].asInt(); drequest.end.tm_min = end["min"].asInt(); drequest.end.tm_sec = end["sec"].asInt(); drequest.channel = arguments["channel"].asInt(); drequest.destpath = arguments["destpath"].asString(); task_queue.push(drequest); Json::Value response; response["code"] = 0; std::string str = response.toStyledString(); std::cout << str << std::endl; if ((rv = nng_send(sock, strdup(str.c_str()), str.length(), NNG_FLAG_ALLOC)) != 0) { fatal("nng_send", rv); return rv; } return 0; } int handleDownloadByTime(nng_socket sock, Json::Value request) { std::cout << "accepted handleDownloadByTime request" << std::endl; int rv, code; // char *buf; Netdisk *netdisk = NULL; Json::Value arguments = request["arguments"]; Netdisk_DownloadRequest drequest; drequest.loginUUID = arguments["loginUUID"].asString(); Json::Value start = arguments["start"]; drequest.start.tm_year = start["year"].asInt()-1900; // 这个时间类型从1900开始算作第一年 drequest.start.tm_mon = start["mon"].asInt()-1; // 0是第一个月 drequest.start.tm_mday = start["day"].asInt(); drequest.start.tm_hour = start["hour"].asInt(); drequest.start.tm_min = start["min"].asInt(); drequest.start.tm_sec = start["sec"].asInt(); Json::Value end = arguments["end"]; drequest.end.tm_year = end["year"].asInt()-1900; // 这个时间类型从1900开始算作第一年 drequest.end.tm_mon = end["mon"].asInt()-1; // 0是第一个月 drequest.end.tm_mday = end["day"].asInt(); drequest.end.tm_hour = end["hour"].asInt(); drequest.end.tm_min = end["min"].asInt(); drequest.end.tm_sec = end["sec"].asInt(); drequest.channel = arguments["channel"].asInt(); drequest.destpath = arguments["destpath"].asString(); std::map<std::string, Netdisk *>::iterator userDeviceIter = userDeviceMap.find( drequest.loginUUID); if( userDeviceIter != userDeviceMap.end() ) { netdisk = userDeviceIter->second; } Netdisk_LoginInfo loginInfo = loginStore.getLoginInfo(drequest.loginUUID); if (netdisk == NULL) { if(loginInfo.deviceType == "HC") { // std::cout << "new HCNetdisk" << std::endl; netdisk = new HCNetdisk(); userDeviceMap.insert({loginInfo.loginUUID, netdisk}); } else { err_msg(0, "无法识别的设备类型: %s", loginInfo.deviceType.c_str()); } } if ( (code = netdisk->login(loginInfo)) != 0 ) { printf("下载登录失败\n"); } std::vector<std::string> files; if ( (code = netdisk->downloadByTime(drequest, &files) ) != 0) { printf("下载失败\n"); } Json::Value response; Json::Value payload; response["code"] = code; Json::Value filelist; for(std::string f : files) { filelist.append(f); } payload["filelist"] = filelist; response["payload"] = payload; std::string str = response.toStyledString(); std::cout << str << std::endl; rv = nng_send(sock, strdup(str.c_str()), str.length(), NNG_FLAG_ALLOC); if (rv != 0) { fatal("nng_send", rv); return rv; } return 0; } service/request_handler.h
New file @@ -0,0 +1,15 @@ #include "usg_common.h" #include "netdisk.h" #include "safe_queue.h" #include "login_store.h" #include <jsoncpp/json/json.h> #include <nng/nng.h> #include <nng/protocol/reqrep0/rep.h> #include <nng/protocol/reqrep0/req.h> typedef int (*RequestHandleFun)(nng_socket sock, Json::Value request); int handleLogin(nng_socket sock, Json::Value request); int handleDownloadByTime(nng_socket sock, Json::Value request); int handleDownloadByTimeAsync(nng_socket sock, Json::Value request); service/safe_queue.h
New file @@ -0,0 +1,120 @@ #ifndef _SAFEQUEUE_H_ #define _SAFEQUEUE_H_ #include <queue> #include <condition_variable> #include <mutex> #include <chrono> #include <limits> // std::numeric_limits<>::max #define SAFE_QUEUE_DEFAULT_MAX_SIZE std::numeric_limits<std::size_t >::max() /// @brief thread-safe queue /// It uses a mutex+condition variables to protect the internal queue /// implementation. Inserting or reading elements use the same mutex template <typename T> class SafeQueue { public: /// @brief constructor /// @param a_maxSize optional parameter with the maximum size of the queue SafeQueue(std::size_t a_maxSize = SAFE_QUEUE_DEFAULT_MAX_SIZE); /// @brief destructor ~SafeQueue(); /// @brief copy contructor /// WARNING: Use with great care, this function call can take a long time /// and block other threads from pushing/popping elements into the source /// queue SafeQueue(const SafeQueue<T>& a_src); /// @brief operator= overloading /// This function blocks the a_src and "this" SafeQueues and copies the /// contents of a_src into "this" SafeQueue. /// WARNING: Use with great care, this function call can take a long time /// and block other threads from pushing/popping elements into the queues /// @param a_src the "right" side of the operator= /// @return a const reference to this object const SafeQueue<T>& operator=(const SafeQueue<T> &a_src); /// @brief move contructor SafeQueue(SafeQueue<T>&& a_src); /// @brief move assignment SafeQueue<T>& operator=(SafeQueue<T>&& a_src); /// @brief Check if the queue is empty /// This call can block if another thread owns the lock that protects the /// queue /// @return true if the queue is empty. False otherwise bool isEmpty() const; /// @brief inserts an element into queue queue /// This call can block if another thread owns the lock that protects the /// queue. If the queue is full The thread will be blocked in this queue /// until someone else gets an element from the queue /// @param element to insert into the queue void push(const T &a_elem); /// @brief inserts an element into queue queue /// This call can block if another thread owns the lock that protects the /// queue. If the queue is full The call will return false and the element /// won't be inserted /// @param element to insert into the queue /// @return True if the elem was successfully inserted into the queue. /// False otherwise bool tryPush(const T &a_elem); /// @brief extracts an element from the queue (and deletes it from the q) /// If the queue is empty this call will block the thread until there is /// something in the queue to be extracted /// @param a reference where the element from the queue will be saved to void pop(T &out_data); /// @brief extracts an element from the queue (and deletes it from the q) /// This call gets the block that protects the queue. It will extract the /// element from the queue only if there are elements in it /// @param reference to the variable where the result will be saved /// @return True if the element was retrieved from the queue. /// False if the queue was empty bool tryPop(T &out_data); /// @brief extracts an element from the queue (and deletes it from the q) /// If the queue is empty this call will block the thread until there /// is something in the queue to be extracted or until the timer /// (2nd parameter) expires /// @param reference to the variable where the result will be saved /// @param duration to wait before returning if the queue was empty /// you may also pass into this a std::seconds or std::milliseconds /// (defined in std::chrono) /// @return True if the element was retrieved from the queue. /// False if the timeout was hit and nothing could be extracted /// from the queue bool timedWaitPop(T &data, std::chrono::microseconds a_microsecs); protected: /// the actual queue data structure protected by this SafeQueue wrapper std::queue<T> m_theQueue; /// maximum number of elements for the queue std::size_t m_maximumSize; /// Mutex to protect the queue mutable std::mutex m_mutex; /// Conditional variable to wake up threads mutable std::condition_variable m_cond; /// @brief calculate if copying a_src into this instance will need to /// wake up potential threads waiting to perform push or pop ops. /// WARNING: It assumes the caller holds all mutexes required to access /// to the data of this and a_src SafeQueues /// @param a_src const reference to the SafeQueue that will be copied /// into this object /// @return true if threads will need to be waken up. False otherwise inline bool wakeUpSignalNeeded(const SafeQueue<T> &a_src) const; }; // include the implementation file #include "safe_queue_impl.h" #endif /* _SAFEQUEUE_H_ */ service/safe_queue_impl.h
New file @@ -0,0 +1,258 @@ #ifndef _SAFEQUEUEIMPL_H_ #define _SAFEQUEUEIMPL_H_ template <typename T> SafeQueue<T>::SafeQueue(std::size_t a_maxSize): m_theQueue(), m_maximumSize(a_maxSize), m_mutex(), m_cond() { } template <typename T> SafeQueue<T>::~SafeQueue() { } template <typename T> SafeQueue<T>::SafeQueue(const SafeQueue<T>& a_src): m_theQueue(), m_maximumSize(0), m_mutex(), m_cond() { // copying a safe queue involves only copying the data (m_theQueue and // m_maximumSize). This object has not been instantiated yet so nobody can // be trying to perform push or pop operations on it, but we need to // acquire a_src.m_mutex before copying its data into m_theQueue and // m_maximumSize std::unique_lock<std::mutex> lk(a_src.m_mutex); this->m_maximumSize = a_src.m_maximumSize; this->m_theQueue = a_src.m_theQueue; } template <typename T> const SafeQueue<T>& SafeQueue<T>::operator=(const SafeQueue<T> &a_src) { if (this != &a_src) { // lock both mutexes at the same time to avoid deadlocks std::unique_lock<std::mutex> this_lk(this->m_mutex, std::defer_lock); std::unique_lock<std::mutex> src_lk (a_src.m_mutex, std::defer_lock); std::lock(this_lk, src_lk); // will we need to wake up waiting threads after copying the source // queue? bool wakeUpWaitingThreads = WakeUpSignalNeeded(a_src); // copy data from the left side of the operator= into this intance this->m_maximumSize = a_src.m_maximumSize; this->m_theQueue = a_src.m_theQueue; // time now to wake up threads waiting for data to be inserted // or extracted if (wakeUpWaitingThreads) { this->m_cond.notify_all(); } } return *this; } template <typename T> SafeQueue<T>::SafeQueue(SafeQueue<T>&& a_src): m_theQueue(a_src.m_theQueue), // implicit std::move(a_src.m_theQueue) m_maximumSize(a_src.m_maximumSize), // move constructor called implicitly m_mutex(), // instantiate a new mutex m_cond() // instantiate a new conditional variable { // This object has not been instantiated yet. We can assume no one is using // its mutex. // Also, a_src is a temporary object so there is no need to acquire // its mutex. // Things can therefore be safely moved without the need for any mutex or // conditional variable } template <typename T> SafeQueue<T>& SafeQueue<T>::operator=(SafeQueue<T> &&a_src) { if (this != &a_src) { // make sure we hold this mutex before moving things around. a_src is // a temporary object so no need to hold its mutex std::unique_lock<std::mutex> lk(this->m_mutex); // will we need to wake up waiting threads after copying the source // queue? bool wakeUpWaitingThreads = WakeUpSignalNeeded(a_src); // process data from the temporary copy into this intance this->m_maximumSize = std::move(a_src.m_maximumSize); this->m_theQueue = std::move(a_src.m_theQueue); // time now to wake up threads waiting for data to be inserted // or extracted if (wakeUpWaitingThreads) { this->m_cond.notify_all(); } } return *this; } template <typename T> bool SafeQueue<T>::wakeUpSignalNeeded(const SafeQueue<T> &a_src) const { if (this->m_theQueue.empty() && (!a_src.m_theQueue.empty())) { // threads waiting for stuff to be popped off the queue return true; } else if ((this->m_theQueue.size() >= this->m_maximumSize) && (a_src.m_theQueue.size() < a_src.m_maximumSize)) { // threads waiting for stuff to be pushed into the queue return true; } return false; } template <typename T> bool SafeQueue<T>::isEmpty() const { std::lock_guard<std::mutex> lk(m_mutex); return m_theQueue.empty(); } template <typename T> void SafeQueue<T>::push(const T &a_elem) { std::unique_lock<std::mutex> lk(m_mutex); while (m_theQueue.size() >= m_maximumSize) { m_cond.wait(lk); } bool queueEmpty = m_theQueue.empty(); m_theQueue.push(a_elem); if (queueEmpty) { // wake up threads waiting for stuff m_cond.notify_all(); } } template <typename T> bool SafeQueue<T>::tryPush(const T &a_elem) { std::lock_guard<std::mutex> lk(m_mutex); bool rv = false; bool queueEmpty = m_theQueue.empty(); if (m_theQueue.size() < m_maximumSize) { m_theQueue.push(a_elem); rv = true; } if (queueEmpty) { // wake up threads waiting for stuff m_cond.notify_all(); } return rv; } template <typename T> void SafeQueue<T>::pop(T &out_data) { std::unique_lock<std::mutex> lk(m_mutex); while (m_theQueue.empty()) { m_cond.wait(lk); } bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false; out_data = m_theQueue.front(); m_theQueue.pop(); if (queueFull) { // wake up threads waiting for stuff m_cond.notify_all(); } } template <typename T> bool SafeQueue<T>::tryPop(T &out_data) { std::lock_guard<std::mutex> lk(m_mutex); bool rv = false; if (!m_theQueue.empty()) { bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false; out_data = m_theQueue.front(); m_theQueue.pop(); if (queueFull) { // wake up threads waiting for stuff m_cond.notify_all(); } rv = true; } return rv; } template <typename T> bool SafeQueue<T>::timedWaitPop(T &data, std::chrono::microseconds a_microsecs) { std::unique_lock<std::mutex> lk(m_mutex); auto wakeUpTime = std::chrono::steady_clock::now() + a_microsecs; if (m_cond.wait_until(lk, wakeUpTime, [this](){return (m_theQueue.size() > 0);})) { // wait_until returns false if the predicate (3rd parameter) still // evaluates to false after the rel_time timeout expired // we are in this side of the if-clause because the queue is not empty // (so the 3rd parameter evaluated to true) bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false; data = m_theQueue.front(); m_theQueue.pop(); if (queueFull) { // wake up threads waiting to insert things into the queue. // The queue used to be full, now it's not. m_cond.notify_all(); } return true; } else { // timed-out and the queue is still empty return false; } } #endif /* _SAFEQUEUEIMPL_H_ */ service/testBinary files differ
service/test_propertiesBinary files differ
service/test_properties.c
New file @@ -0,0 +1,8 @@ #include "usg_common.h" #include "properties_config.h" int main() { PropertiesConfig config("../data/config.txt"); std::cout << config.get("local_url") << std::endl; } service/test_queueBinary files differ
service/test_queue.c
New file @@ -0,0 +1,18 @@ #include "usg_common.h" #include "safe_queue.h" #define QUEUE_SIZE 10 using namespace std; int main() { SafeQueue<int> m_queue(10); m_queue.push(1); m_queue.push(2); int a; m_queue.pop(a); cout << a << endl; m_queue.pop(a); cout << a << endl; }