From 5c9cbf5ea152f501ae976e0e0b4ef5ee98afdfba Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期四, 11 六月 2020 19:20:40 +0800
Subject: [PATCH] pdate
---
service/Makefile | 6
service/request_handler.h | 15
service/login_store.c | 45 ++
service/login_store.h | 16
service/netdisk_service | 0
device/hcnetdisk.c | 1
service/request_handler.c | 193 ++++++++++
device/include/netdisk.h | 2
service/netdisk_service.c | 254 +++++-------
service/test_queue | 0
data/config.txt | 11
service/client.c | 55 ++
device/include/hcnetdisk.h | 1
service/test_properties | 0
service/test_properties.c | 8
/dev/null | 0
service/test_queue.c | 18
service/safe_queue.h | 120 ++++++
common/include/usg_common.h | 31 +
common/usg_common.c | 38 ++
service/safe_queue_impl.h | 258 +++++++++++++
device/test.c | 1
service/properties_config.h | 11
service/client | 0
service/properties_config.c | 42 ++
service/test | 0
26 files changed, 973 insertions(+), 153 deletions(-)
diff --git a/common/include/usg_common.h b/common/include/usg_common.h
index bf70221..358fbb3 100644
--- a/common/include/usg_common.h
+++ b/common/include/usg_common.h
@@ -79,6 +79,10 @@
void err_exit(int error, const char *fmt, ...);
void err_msg(int error, const char *fmt, ...);
+char *ltrim(char *str, const char *seps);
+char *rtrim(char *str, const char *seps);
+char *trim(char *str, const char *seps);
+
static inline int
itoa(int num, char *str)
{
@@ -93,7 +97,34 @@
}
+
+
#ifdef __cplusplus
}
#endif
+
+
+
+#ifdef __cplusplus
+
+// static inline std::string& ltrim(std::string& str, const std::string& chars = "\t\n\v\f\r ")
+// {
+// str.erase(0, str.find_first_not_of(chars));
+// return str;
+// }
+
+// static inline std::string& rtrim(std::string& str, const std::string& chars = "\t\n\v\f\r ")
+// {
+// str.erase(str.find_last_not_of(chars) + 1);
+// return str;
+// }
+
+// static inline std::string& trim(std::string& str, const std::string& chars = "\t\n\v\f\r ")
+// {
+// return ltrim(rtrim(str, chars), chars);
+// }
+
+
+#endif
+
#endif
diff --git a/common/usg_common.c b/common/usg_common.c
index 4b0f76e..6ba65ae 100644
--- a/common/usg_common.c
+++ b/common/usg_common.c
@@ -76,3 +76,41 @@
fputs(buf, stderr);
fflush(NULL); /* flushes all stdio output streams */
}
+
+char *ltrim(char *str, const char *seps)
+{
+ size_t totrim;
+ if (seps == NULL) {
+ seps = "\t\n\v\f\r ";
+ }
+ totrim = strspn(str, seps);
+ if (totrim > 0) {
+ size_t len = strlen(str);
+ if (totrim == len) {
+ str[0] = '\0';
+ }
+ else {
+ memmove(str, str + totrim, len + 1 - totrim);
+ }
+ }
+ return str;
+}
+
+char *rtrim(char *str, const char *seps)
+{
+ int i;
+ if (seps == NULL) {
+ seps = "\t\n\v\f\r ";
+ }
+ i = strlen(str) - 1;
+ while (i >= 0 && strchr(seps, str[i]) != NULL) {
+ str[i] = '\0';
+ i--;
+ }
+ return str;
+}
+
+char *trim(char *str, const char *seps)
+{
+ return ltrim(rtrim(str, seps), seps);
+}
\ No newline at end of file
diff --git a/data/config.txt b/data/config.txt
new file mode 100644
index 0000000..87c54c2
--- /dev/null
+++ b/data/config.txt
@@ -0,0 +1,11 @@
+# nng鏈湴鏈嶅姟鍦板潃
+local_url=tcp://127.0.0.1:8899
+
+# nng杩滅▼璋冪敤鍦板潃鍦板潃
+remote_url=tcp://127.0.0.1:9988
+
+# 娴峰悍鍖呰矾寰�
+hclib=../hclib/
+
+# 璐熻矗涓嬭浇浠诲姟鐨勭嚎绋嬫睜鐨勬暟閲�
+workers=4
\ No newline at end of file
diff --git a/device/hcnetdisk.c b/device/hcnetdisk.c
index b62aecc..a4fc6e0 100644
--- a/device/hcnetdisk.c
+++ b/device/hcnetdisk.c
@@ -1,5 +1,4 @@
#include "netdisk.h"
-#include "hcnetdisk.h"
bool HCNetdisk::envInited = false;
diff --git a/device/include/hcnetdisk.h b/device/include/hcnetdisk.h
index 85058c6..eeee33d 100644
--- a/device/include/hcnetdisk.h
+++ b/device/include/hcnetdisk.h
@@ -4,7 +4,6 @@
#include "usg_common.h"
#include "usg_typedef.h"
#include "HCNetSDK.h"
-#include "netdisk.h"
//娴峰悍缃戠粶纭洏
class HCNetdisk : public Netdisk{
diff --git a/device/include/netdisk.h b/device/include/netdisk.h
index 3695ac6..f92eeb4 100644
--- a/device/include/netdisk.h
+++ b/device/include/netdisk.h
@@ -73,5 +73,5 @@
};
-
+#include "hcnetdisk.h"
#endif
\ No newline at end of file
diff --git a/device/test.c b/device/test.c
index d8a777c..0b99d8f 100644
--- a/device/test.c
+++ b/device/test.c
@@ -1,6 +1,5 @@
#include "usg_common.h"
#include "netdisk.h"
-#include "hcnetdisk.h"
diff --git a/service/Makefile b/service/Makefile
index 01f98fd..af5b61d 100644
--- a/service/Makefile
+++ b/service/Makefile
@@ -14,9 +14,11 @@
PLATFORM=$(shell $(ROOT)/systype.sh)
include $(ROOT)/Make.defines.$(PLATFORM)
-all: netdisk_service client test
+all: netdisk_service client test test_queue test_properties
-netdisk_service: $(ROOT)/device/hcnetdisk.c
+test_properties: properties_config.c
+
+netdisk_service: $(ROOT)/device/hcnetdisk.c login_store.c request_handler.c properties_config.c
test: $(ROOT)/device/hcnetdisk.c
diff --git a/service/client b/service/client
index f0b2e36..a792d7f 100755
--- a/service/client
+++ b/service/client
Binary files differ
diff --git a/service/client.c b/service/client.c
index 49d379d..170e788 100644
--- a/service/client.c
+++ b/service/client.c
@@ -6,6 +6,7 @@
#include <nng/protocol/reqrep0/rep.h>
#include <nng/protocol/reqrep0/req.h>
const char *url = "tcp://127.0.0.1:8899";
+const char *localUrl = "tcp://127.0.0.1:9988";
void
fatal(const char *func, int rv)
{
@@ -30,7 +31,7 @@
std::string str = request.toStyledString();
- if ((rv = nng_send(sock, strdupa(str.c_str()), str.length(), 0)) != 0) {
+ if ((rv = nng_send(sock, strdup(str.c_str()), str.length(), 0)) != 0) {
fatal("nng_send", rv);
}
if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {
@@ -119,10 +120,58 @@
}
int
+server()
+{
+ nng_socket sock;
+ int rv;
+
+ if ((rv = nng_rep0_open(&sock)) != 0) {
+ fatal("nng_rep0_open", rv);
+ }
+ if ((rv = nng_listen(sock, localUrl, NULL, 0)) != 0) {
+ fatal("nng_listen", rv);
+ }
+ for (;;) {
+ char * buf = NULL;
+ size_t sz;
+ if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {
+ fatal("nng_recv", rv);
+ }
+ std::cout << buf << std::endl;
+
+ Json::Value response;
+ response["code"] = 0;
+ std::string str = response.toStyledString();
+ rv = nng_send(sock, strdup(str.c_str()), str.length(), NNG_FLAG_ALLOC);
+ if (rv != 0) {
+ fatal("nng_send", rv);
+ }
+ // if ((sz == sizeof(uint64_t)) &&
+ // ((GET64(buf, val)) == DATECMD)) {
+ // time_t now;
+ // printf("SERVER: RECEIVED DATE REQUEST\n");
+ // now = time(&now);
+ // printf("SERVER: SENDING DATE: ");
+ // showdate(now);
+
+ // // Reuse the buffer. We know it is big enough.
+ // PUT64(buf, (uint64_t) now);
+
+ // continue;
+ // }
+ // Unrecognized command, so toss the buffer.
+ //nng_free(buf, sz);
+ }
+}
+
+
+int
main(const int argc, const char **argv)
{
- if ((argc > 1))
- return (client(argv[1]));
+ if ((argc > 1)) {
+ client(argv[1]);
+ server();
+ }
// std::string str("123");
// char *str2="123";
// printf("str length %d, %d\n", str.length(), strlen(str2));
diff --git a/service/core b/service/core
deleted file mode 100644
index 118c81c..0000000
--- a/service/core
+++ /dev/null
Binary files differ
diff --git a/service/login_store.c b/service/login_store.c
new file mode 100644
index 0000000..0d113f6
--- /dev/null
+++ b/service/login_store.c
@@ -0,0 +1,45 @@
+#include "login_store.h"
+
+LoginStore::LoginStore() : login_data_file("../data/login.dat") {
+ Json::Reader jsonreader;
+ std::ifstream fin(login_data_file);
+ jsonreader.parse(fin, loginData);
+ fin.close();
+}
+
+
+int LoginStore::saveLoginInfo(Netdisk_LoginInfo &loginInfo) {
+
+ Json::Value item;
+ item["loginUUID"] = loginInfo.loginUUID;
+ item["deviceType"] = loginInfo.deviceType;
+ item["username"] = loginInfo.username;
+ item["password"] = loginInfo.password;
+ item["host"] = loginInfo.host;
+ item["port"] = loginInfo.port;
+
+ loginData[loginInfo.loginUUID] = item;
+
+
+ auto str = loginData.toStyledString();
+ // std::cout << str << std::endl;
+
+ std::ofstream fout(login_data_file);
+ fout << str;
+ fout.close();
+ return 0;
+}
+
+
+Netdisk_LoginInfo LoginStore::getLoginInfo(std::string uuid) {
+ Json::Value item = loginData[uuid];
+ Netdisk_LoginInfo loginInfo;
+ loginInfo.loginUUID = item["loginUUID"].asString();
+ loginInfo.deviceType = item["deviceType"].asString();
+ loginInfo.username = item["username"].asString();
+ loginInfo.password = item["password"].asString();
+ loginInfo.host = item["host"].asString();
+ loginInfo.port = item["port"].asInt();
+ return loginInfo;
+}
+
\ No newline at end of file
diff --git a/service/login_store.h b/service/login_store.h
new file mode 100644
index 0000000..8f24876
--- /dev/null
+++ b/service/login_store.h
@@ -0,0 +1,16 @@
+#ifndef _LOGINSTORE_H_
+#define _LOGINSTORE_H_
+#include "usg_common.h"
+#include "netdisk.h"
+#include <jsoncpp/json/json.h>
+
+class LoginStore {
+ Json::Value loginData;
+ std::string login_data_file;
+public:
+ LoginStore();
+ int saveLoginInfo(Netdisk_LoginInfo &loginInfo);
+ Netdisk_LoginInfo getLoginInfo(std::string uuid);
+};
+
+#endif
\ No newline at end of file
diff --git a/service/netdisk_service b/service/netdisk_service
index 196531c..ac87c6f 100755
--- a/service/netdisk_service
+++ b/service/netdisk_service
Binary files differ
diff --git a/service/netdisk_service.c b/service/netdisk_service.c
index 57fd496..8d31587 100644
--- a/service/netdisk_service.c
+++ b/service/netdisk_service.c
@@ -1,174 +1,119 @@
#include "usg_common.h"
#include "netdisk.h"
-#include "hcnetdisk.h"
+#include "safe_queue.h"
+#include "login_store.h"
+#include "request_handler.h"
+#include "properties_config.h"
#include <jsoncpp/json/json.h>
#include <nng/nng.h>
#include <nng/protocol/reqrep0/rep.h>
#include <nng/protocol/reqrep0/req.h>
+
using namespace std;
-const char *url = "tcp://127.0.0.1:8899";
-Json::Value loginData;
-std::map<std::string, Netdisk *> *userDeviceMap;
-std::string login_data_file = "../data/login.dat";
-int saveLoginInfo(Netdisk_LoginInfo &loginInfo) {
-
- Json::Value item;
- item["loginUUID"] = loginInfo.loginUUID;
- item["deviceType"] = loginInfo.deviceType;
- item["username"] = loginInfo.username;
- item["password"] = loginInfo.password;
- item["host"] = loginInfo.host;
- item["port"] = loginInfo.port;
-
- loginData[loginInfo.loginUUID] = item;
-
+int work(Netdisk_DownloadRequest drequest);
- auto str = loginData.toStyledString();
- // std::cout << str << std::endl;
- std::ofstream fout(login_data_file);
- fout << str;
- fout.close();
- return 0;
-}
+PropertiesConfig config("../data/config.txt");
-Netdisk_LoginInfo getLoginInfo(std::string uuid) {
- Json::Value item = loginData[uuid];
- Netdisk_LoginInfo loginInfo;
- loginInfo.loginUUID = item["loginUUID"].asString();
- loginInfo.deviceType = item["deviceType"].asString();
- loginInfo.username = item["username"].asString();
- loginInfo.password = item["password"].asString();
- loginInfo.host = item["host"].asString();
- loginInfo.port = item["port"].asInt();
- return loginInfo;
-}
-void
-fatal(const char *func, int rv)
+int WORKERS ;
+std::string localUrl;
+std::string remoteUrl;
+
+
+
+SafeQueue<Netdisk_DownloadRequest> task_queue(10);
+
+std::map<std::string, Netdisk *> userDeviceMap;
+
+LoginStore loginStore;
+
+std::map<std::string, RequestHandleFun> requestHandleFunMap;
+
+
+
+static inline void fatal(const char *func, int rv)
{
fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
- exit(1);
+ //exit(1);
}
-
-int handleLogin(nng_socket sock, Json::Value request) {
- int rv, code;
- // char *buf;
- Netdisk *netdisk = NULL;
- Json::Value arguments = request["arguments"];
+int connectAndSend(const char *url, char * str) {
+ nng_socket sock;
+ int rv;
+ size_t sz;
+ char *buf;
- //鐧诲綍
- Netdisk_LoginInfo loginInfo;
- loginInfo.loginUUID=arguments["loginUUID"].asString();
- loginInfo.deviceType = arguments["deviceType"].asString();
- loginInfo.host = arguments["host"].asString();
- loginInfo.port = arguments["port"].asInt();
- loginInfo.username = arguments["username"].asString();
- loginInfo.password = arguments["password"].asString();
-
-
- std::map<std::string, Netdisk *>::iterator userDeviceIter = userDeviceMap->find(loginInfo.loginUUID);
- if( userDeviceIter != userDeviceMap->end() ) {
- netdisk = userDeviceIter->second;
+ if ((rv = nng_req0_open(&sock)) != 0) {
+ fatal("nng_socket", rv);
}
- if (netdisk == NULL) {
- printf("=========null\n");
- if(loginInfo.deviceType == "HC") {
- // std::cout << "new HCNetdisk" << std::endl;
- netdisk = new HCNetdisk();
- userDeviceMap->insert({loginInfo.loginUUID, netdisk});
- } else {
- err_msg(0, "鏃犳硶璇嗗埆鐨勮澶囩被鍨嬶細 %s", loginInfo.deviceType.c_str());
- }
+ if ((rv = nng_dial(sock, url, NULL, 0)) != 0) {
+ fatal("nng_dial", rv);
}
+ // printf("CLIENT: SENDING DATE REQUEST\n");
- code = netdisk->login(loginInfo);
- if (code == 0) {
- saveLoginInfo(loginInfo);
- std::cout << "璧峰閫氶亾:" << netdisk->getStartChannel() << ", 鏈�澶ч�氶亾鍙凤細" << netdisk->getMaxChannels() << std::endl;
- }
-
-
- Json::Value response;
- Json::Value payload;
- payload["loginUUID"] = loginInfo.loginUUID;
- response["code"] = code;
- response["payload"] = payload;
- const std::string str = response.toStyledString();
- // nng鍐呴儴浼氶噴鏀綽uf
- rv = nng_send(sock, strdup(str.c_str()), str.length(), NNG_FLAG_ALLOC);
- //free(buf);
- if (rv != 0) {
+ if ((rv = nng_send(sock, str, strlen(str), 0)) != 0) {
fatal("nng_send", rv);
- return rv;
}
+
+ if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {
+ fatal("nng_recv", rv);
+ }
+ std::cout << buf;
+ nng_free(buf, sz);
+ nng_close(sock);
return 0;
+
}
-
-int handleDownloadByTime(nng_socket sock, Json::Value request) {
- int rv, code;
- // char *buf;
+
+void *worker(void *vargp)
+{
+ pthread_detach(pthread_self());
+ while (1)
+ {
+ Netdisk_DownloadRequest request;
+ task_queue.pop(request);
+ work(request);
+ }
+}
+
+int work(Netdisk_DownloadRequest drequest) {
Netdisk *netdisk = NULL;
- Json::Value arguments = request["arguments"];
- Netdisk_DownloadRequest drequest;
- drequest.loginUUID = arguments["loginUUID"].asString();
- Json::Value start = arguments["start"];
- drequest.start.tm_year = start["year"].asInt()-1900; // 杩欎釜鏃堕棿绫诲瀷浠�1900寮�濮嬬畻浣滅涓�骞�
- drequest.start.tm_mon = start["mon"].asInt()-1; // 0鏄涓�涓湀
- drequest.start.tm_mday = start["day"].asInt();
- drequest.start.tm_hour = start["hour"].asInt();
- drequest.start.tm_min = start["min"].asInt();
- drequest.start.tm_sec = start["sec"].asInt();
-
-
- Json::Value end = arguments["end"];
- drequest.end.tm_year = end["year"].asInt()-1900; // 杩欎釜鏃堕棿绫诲瀷浠�1900寮�濮嬬畻浣滅涓�骞�
- drequest.end.tm_mon = end["mon"].asInt()-1; // 0鏄涓�涓湀
- drequest.end.tm_mday = end["day"].asInt();
- drequest.end.tm_hour = end["hour"].asInt();
- drequest.end.tm_min = end["min"].asInt();
- drequest.end.tm_sec = end["sec"].asInt();
-
- drequest.channel = arguments["channel"].asInt();
- drequest.destpath = arguments["destpath"].asString();
-
-
- std::map<string, Netdisk *>::iterator userDeviceIter = userDeviceMap->find( drequest.loginUUID);
- if( userDeviceIter != userDeviceMap->end() ) {
+ int rv;
+ std::map<std::string, Netdisk *>::iterator userDeviceIter = userDeviceMap.find( drequest.loginUUID);
+ if( userDeviceIter != userDeviceMap.end() ) {
netdisk = userDeviceIter->second;
}
- Netdisk_LoginInfo loginInfo = getLoginInfo(drequest.loginUUID);
+ Netdisk_LoginInfo loginInfo = loginStore.getLoginInfo(drequest.loginUUID);
if (netdisk == NULL) {
if(loginInfo.deviceType == "HC") {
// std::cout << "new HCNetdisk" << std::endl;
netdisk = new HCNetdisk();
- userDeviceMap->insert({loginInfo.loginUUID, netdisk});
+ userDeviceMap.insert({loginInfo.loginUUID, netdisk});
} else {
err_msg(0, "鏃犳硶璇嗗埆鐨勮澶囩被鍨嬶細 %s", loginInfo.deviceType.c_str());
}
}
- if ( (code = netdisk->login(loginInfo)) != 0 ) {
+ if ( (rv = netdisk->login(loginInfo)) != 0 ) {
printf("涓嬭浇鐧诲綍澶辫触\n");
}
std::vector<std::string> files;
- if ( (code = netdisk->downloadByTime(drequest, &files) ) != 0) {
+ if ( (rv = netdisk->downloadByTime(drequest, &files) ) != 0) {
printf("涓嬭浇澶辫触\n");
}
-
Json::Value response;
Json::Value payload;
- response["code"] = code;
+ response["rv"] = rv;
Json::Value filelist;
for(std::string f : files) {
@@ -177,31 +122,29 @@
payload["filelist"] = filelist;
response["payload"] = payload;
std::string str = response.toStyledString();
+
+ std::cout << "download finished, call back" << std::endl;
std::cout << str << std::endl;
- rv = nng_send(sock, strdup(str.c_str()), str.length(), NNG_FLAG_ALLOC);
- if (rv != 0) {
- fatal("nng_send", rv);
- return rv;
- }
+ connectAndSend(remoteUrl.c_str(), strdup(str.c_str()) );
+
return 0;
-
-
}
-int
-server()
-{
+void startServer(const char *url) {
Json::Reader jsonreader;
Json::Value request;
nng_socket sock;
int rv;
-
+
+ RequestHandleFun handleFun;
if ((rv = nng_rep0_open(&sock)) != 0) {
fatal("nng_rep0_open", rv);
}
if ((rv = nng_listen(sock, url, NULL, 0)) != 0) {
+ printf("url=%s\n", url);
fatal("nng_listen", rv);
}
+
for (;;) {
char * buf = NULL;
size_t sz;
@@ -211,32 +154,55 @@
jsonreader.parse(buf, request);
std::string method = request["method"].asString();
- if (method == "login") {
- handleLogin(sock, request);
- } else if (method == "downloadByTime") {
- handleDownloadByTime(sock, request);
+
+ std::map<std::string, RequestHandleFun>::iterator handleFunIter = requestHandleFunMap.find(method);
+ if( handleFunIter != requestHandleFunMap.end() ) {
+ handleFun = handleFunIter->second;
+ handleFun(sock, request);
} else {
std::cerr << "Don't support " << method << std::endl;
}
+ // if (method == "login") {
+ // handleLogin(sock, request);
+ // } else if (method == "downloadByTime") {
+ // handleDownloadByTimeAsync(sock, request);
+ // } else {
+ // std::cerr << "Don't support " << method << std::endl;
+ // }
// Unrecognized command, so toss the buffer.
nng_free(buf, sz);
}
}
+
+void registRequestHandleFun() {
+ requestHandleFunMap.insert({"login", handleLogin});
+ requestHandleFunMap.insert({"downloadByTime", handleDownloadByTimeAsync});
+}
+
+void initThreadPool() {
+ pthread_t tid;
+ for (int i = 0; i < WORKERS; i++)
+ pthread_create(&tid, NULL, worker, NULL);
+}
+
int main()
{
- Netdisk_EnvConfig config;
- config.libpath = "../hclib/";
- HCNetdisk::netdisk_init(&config);
- Json::Reader jsonreader;
- ifstream fin(login_data_file);
- jsonreader.parse(fin, loginData);
- fin.close();
+ Netdisk_EnvConfig hcEnvConfig;
+ hcEnvConfig.libpath = config.get("hclib");
+ HCNetdisk::netdisk_init(&hcEnvConfig);
+
+ WORKERS = config.getInt("workers");
+ localUrl = config.get("local_url");
+ remoteUrl = config.get("remote_url");
- userDeviceMap = new std::map<std::string, Netdisk *>();
- server();
+ registRequestHandleFun();
+
+ initThreadPool();
+
+ startServer(localUrl.c_str());
HCNetdisk::netdisk_deinit();
diff --git a/service/properties_config.c b/service/properties_config.c
new file mode 100644
index 0000000..af88536
--- /dev/null
+++ b/service/properties_config.c
@@ -0,0 +1,42 @@
+#include "properties_config.h"
+
+PropertiesConfig::PropertiesConfig(std::string __propertiesFile) : propertiesFile(__propertiesFile) {
+
+ std::ifstream fin(propertiesFile);
+ char line[1024];
+ //std::string line;
+ char *key, *value;
+ const char *delim = "=";
+ while(fin.getline(line, 1024)) {
+ // printf("line=%s\n", line);
+ if(strlen(trim(line, NULL))== 0)
+ continue;
+ if(*line == '#') {
+ continue;
+ }
+
+ key = trim(strtok(line, delim), 0);
+ value = trim(strtok(NULL, delim), 0);
+ propertiesMap.insert({key, value});
+ // printf("key = %s, value=%s\n", key, value);
+ }
+ fin.close();
+
+}
+
+
+std::string PropertiesConfig::get(std::string name) {
+ std::map<std::string, std::string>::iterator propertiesIter = propertiesMap.find(name);
+ if( propertiesIter != propertiesMap.end() ) {
+ return propertiesIter->second;
+ }
+ return "";
+}
+
+int PropertiesConfig::getInt(std::string name) {
+ std::map<std::string, std::string>::iterator propertiesIter = propertiesMap.find(name);
+ if( propertiesIter != propertiesMap.end() ) {
+ return std::stoi(propertiesIter->second);
+ }
+ return 0;
+}
\ No newline at end of file
diff --git a/service/properties_config.h b/service/properties_config.h
new file mode 100644
index 0000000..8b4fbfb
--- /dev/null
+++ b/service/properties_config.h
@@ -0,0 +1,11 @@
+#include "usg_common.h"
+
+class PropertiesConfig {
+ std::string propertiesFile;
+ std::map<std::string, std::string> propertiesMap;
+public:
+
+ PropertiesConfig(std::string _propertiesFile="./config.properties");
+ std::string get(std::string name);
+ int getInt(std::string name);
+};
\ No newline at end of file
diff --git a/service/request_handler.c b/service/request_handler.c
new file mode 100644
index 0000000..27f2132
--- /dev/null
+++ b/service/request_handler.c
@@ -0,0 +1,193 @@
+#include "request_handler.h"
+
+extern SafeQueue<Netdisk_DownloadRequest> task_queue;
+
+extern std::map<std::string, Netdisk *> userDeviceMap;
+
+extern LoginStore loginStore;
+
+static inline void fatal(const char *func, int rv)
+{
+ fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
+ //exit(1);
+}
+
+int handleLogin(nng_socket sock, Json::Value request) {
+std::cout << "accepted login request" << std::endl;
+ int rv, code;
+ // char *buf;
+ Netdisk *netdisk = NULL;
+
+ Json::Value arguments = request["arguments"];
+
+ //鐧诲綍
+ Netdisk_LoginInfo loginInfo;
+ loginInfo.loginUUID=arguments["loginUUID"].asString();
+ loginInfo.deviceType = arguments["deviceType"].asString();
+ loginInfo.host = arguments["host"].asString();
+ loginInfo.port = arguments["port"].asInt();
+ loginInfo.username = arguments["username"].asString();
+ loginInfo.password = arguments["password"].asString();
+
+
+ std::map<std::string, Netdisk *>::iterator userDeviceIter = userDeviceMap.find(loginInfo.loginUUID);
+ if( userDeviceIter != userDeviceMap.end() ) {
+ netdisk = userDeviceIter->second;
+ }
+ if (netdisk == NULL) {
+ if(loginInfo.deviceType == "HC") {
+ // std::cout << "new HCNetdisk" << std::endl;
+ netdisk = new HCNetdisk();
+ userDeviceMap.insert({loginInfo.loginUUID, netdisk});
+ } else {
+ err_msg(0, "鏃犳硶璇嗗埆鐨勮澶囩被鍨嬶細 %s", loginInfo.deviceType.c_str());
+ }
+ }
+
+ code = netdisk->login(loginInfo);
+ if (code == 0) {
+ loginStore.saveLoginInfo(loginInfo);
+ std::cout << "璧峰閫氶亾:" << netdisk->getStartChannel() << ", 鏈�澶ч�氶亾鍙凤細" << netdisk->getMaxChannels() << std::endl;
+ }
+
+
+ Json::Value response;
+ Json::Value payload;
+ payload["loginUUID"] = loginInfo.loginUUID;
+ response["code"] = code;
+ response["payload"] = payload;
+ const std::string str = response.toStyledString();
+ // nng鍐呴儴浼氶噴鏀綽uf
+ rv = nng_send(sock, strdup(str.c_str()), str.length(), NNG_FLAG_ALLOC);
+ //free(buf);
+ if (rv != 0) {
+ fatal("nng_send", rv);
+ return rv;
+ }
+ return 0;
+}
+
+
+
+int handleDownloadByTimeAsync(nng_socket sock, Json::Value request) {
+std::cout << "accepted handleDownloadByTime request" << std::endl;
+ int rv;
+ // char *buf;
+
+ Json::Value arguments = request["arguments"];
+ Netdisk_DownloadRequest drequest;
+ drequest.loginUUID = arguments["loginUUID"].asString();
+ Json::Value start = arguments["start"];
+ drequest.start.tm_year = start["year"].asInt()-1900; // 杩欎釜鏃堕棿绫诲瀷浠�1900寮�濮嬬畻浣滅涓�骞�
+ drequest.start.tm_mon = start["mon"].asInt()-1; // 0鏄涓�涓湀
+ drequest.start.tm_mday = start["day"].asInt();
+ drequest.start.tm_hour = start["hour"].asInt();
+ drequest.start.tm_min = start["min"].asInt();
+ drequest.start.tm_sec = start["sec"].asInt();
+
+
+ Json::Value end = arguments["end"];
+ drequest.end.tm_year = end["year"].asInt()-1900; // 杩欎釜鏃堕棿绫诲瀷浠�1900寮�濮嬬畻浣滅涓�骞�
+ drequest.end.tm_mon = end["mon"].asInt()-1; // 0鏄涓�涓湀
+ drequest.end.tm_mday = end["day"].asInt();
+ drequest.end.tm_hour = end["hour"].asInt();
+ drequest.end.tm_min = end["min"].asInt();
+ drequest.end.tm_sec = end["sec"].asInt();
+
+ drequest.channel = arguments["channel"].asInt();
+ drequest.destpath = arguments["destpath"].asString();
+
+ task_queue.push(drequest);
+
+
+ Json::Value response;
+ response["code"] = 0;
+ std::string str = response.toStyledString();
+ std::cout << str << std::endl;
+ if ((rv = nng_send(sock, strdup(str.c_str()), str.length(), NNG_FLAG_ALLOC)) != 0) {
+ fatal("nng_send", rv);
+ return rv;
+ }
+ return 0;
+
+
+}
+
+int handleDownloadByTime(nng_socket sock, Json::Value request) {
+std::cout << "accepted handleDownloadByTime request" << std::endl;
+ int rv, code;
+ // char *buf;
+ Netdisk *netdisk = NULL;
+ Json::Value arguments = request["arguments"];
+ Netdisk_DownloadRequest drequest;
+ drequest.loginUUID = arguments["loginUUID"].asString();
+ Json::Value start = arguments["start"];
+ drequest.start.tm_year = start["year"].asInt()-1900; // 杩欎釜鏃堕棿绫诲瀷浠�1900寮�濮嬬畻浣滅涓�骞�
+ drequest.start.tm_mon = start["mon"].asInt()-1; // 0鏄涓�涓湀
+ drequest.start.tm_mday = start["day"].asInt();
+ drequest.start.tm_hour = start["hour"].asInt();
+ drequest.start.tm_min = start["min"].asInt();
+ drequest.start.tm_sec = start["sec"].asInt();
+
+
+ Json::Value end = arguments["end"];
+ drequest.end.tm_year = end["year"].asInt()-1900; // 杩欎釜鏃堕棿绫诲瀷浠�1900寮�濮嬬畻浣滅涓�骞�
+ drequest.end.tm_mon = end["mon"].asInt()-1; // 0鏄涓�涓湀
+ drequest.end.tm_mday = end["day"].asInt();
+ drequest.end.tm_hour = end["hour"].asInt();
+ drequest.end.tm_min = end["min"].asInt();
+ drequest.end.tm_sec = end["sec"].asInt();
+
+ drequest.channel = arguments["channel"].asInt();
+ drequest.destpath = arguments["destpath"].asString();
+
+
+ std::map<std::string, Netdisk *>::iterator userDeviceIter = userDeviceMap.find( drequest.loginUUID);
+ if( userDeviceIter != userDeviceMap.end() ) {
+ netdisk = userDeviceIter->second;
+ }
+
+ Netdisk_LoginInfo loginInfo = loginStore.getLoginInfo(drequest.loginUUID);
+ if (netdisk == NULL) {
+
+ if(loginInfo.deviceType == "HC") {
+ // std::cout << "new HCNetdisk" << std::endl;
+ netdisk = new HCNetdisk();
+ userDeviceMap.insert({loginInfo.loginUUID, netdisk});
+ } else {
+ err_msg(0, "鏃犳硶璇嗗埆鐨勮澶囩被鍨嬶細 %s", loginInfo.deviceType.c_str());
+
+ }
+ }
+
+ if ( (code = netdisk->login(loginInfo)) != 0 ) {
+ printf("涓嬭浇鐧诲綍澶辫触\n");
+ }
+
+ std::vector<std::string> files;
+ if ( (code = netdisk->downloadByTime(drequest, &files) ) != 0) {
+ printf("涓嬭浇澶辫触\n");
+ }
+
+
+ Json::Value response;
+ Json::Value payload;
+ response["code"] = code;
+
+ Json::Value filelist;
+ for(std::string f : files) {
+ filelist.append(f);
+ }
+ payload["filelist"] = filelist;
+ response["payload"] = payload;
+ std::string str = response.toStyledString();
+ std::cout << str << std::endl;
+ rv = nng_send(sock, strdup(str.c_str()), str.length(), NNG_FLAG_ALLOC);
+ if (rv != 0) {
+ fatal("nng_send", rv);
+ return rv;
+ }
+ return 0;
+
+
+}
diff --git a/service/request_handler.h b/service/request_handler.h
new file mode 100644
index 0000000..da51eaf
--- /dev/null
+++ b/service/request_handler.h
@@ -0,0 +1,15 @@
+#include "usg_common.h"
+#include "netdisk.h"
+#include "safe_queue.h"
+#include "login_store.h"
+#include <jsoncpp/json/json.h>
+#include <nng/nng.h>
+#include <nng/protocol/reqrep0/rep.h>
+#include <nng/protocol/reqrep0/req.h>
+
+
+typedef int (*RequestHandleFun)(nng_socket sock, Json::Value request);
+
+int handleLogin(nng_socket sock, Json::Value request);
+int handleDownloadByTime(nng_socket sock, Json::Value request);
+int handleDownloadByTimeAsync(nng_socket sock, Json::Value request);
\ No newline at end of file
diff --git a/service/safe_queue.h b/service/safe_queue.h
new file mode 100644
index 0000000..1db299b
--- /dev/null
+++ b/service/safe_queue.h
@@ -0,0 +1,120 @@
+#ifndef _SAFEQUEUE_H_
+#define _SAFEQUEUE_H_
+
+#include <queue>
+#include <condition_variable>
+#include <mutex>
+#include <chrono>
+#include <limits> // std::numeric_limits<>::max
+
+#define SAFE_QUEUE_DEFAULT_MAX_SIZE std::numeric_limits<std::size_t >::max()
+
+/// @brief thread-safe queue
+/// It uses a mutex+condition variables to protect the internal queue
+/// implementation. Inserting or reading elements use the same mutex
+template <typename T>
+class SafeQueue
+{
+public:
+ /// @brief constructor
+ /// @param a_maxSize optional parameter with the maximum size of the queue
+ SafeQueue(std::size_t a_maxSize = SAFE_QUEUE_DEFAULT_MAX_SIZE);
+
+ /// @brief destructor
+ ~SafeQueue();
+
+ /// @brief copy contructor
+ /// WARNING: Use with great care, this function call can take a long time
+ /// and block other threads from pushing/popping elements into the source
+ /// queue
+ SafeQueue(const SafeQueue<T>& a_src);
+
+ /// @brief operator= overloading
+ /// This function blocks the a_src and "this" SafeQueues and copies the
+ /// contents of a_src into "this" SafeQueue.
+ /// WARNING: Use with great care, this function call can take a long time
+ /// and block other threads from pushing/popping elements into the queues
+ /// @param a_src the "right" side of the operator=
+ /// @return a const reference to this object
+ const SafeQueue<T>& operator=(const SafeQueue<T> &a_src);
+
+ /// @brief move contructor
+ SafeQueue(SafeQueue<T>&& a_src);
+
+ /// @brief move assignment
+ SafeQueue<T>& operator=(SafeQueue<T>&& a_src);
+
+ /// @brief Check if the queue is empty
+ /// This call can block if another thread owns the lock that protects the
+ /// queue
+ /// @return true if the queue is empty. False otherwise
+ bool isEmpty() const;
+
+ /// @brief inserts an element into queue queue
+ /// This call can block if another thread owns the lock that protects the
+ /// queue. If the queue is full The thread will be blocked in this queue
+ /// until someone else gets an element from the queue
+ /// @param element to insert into the queue
+ void push(const T &a_elem);
+
+ /// @brief inserts an element into queue queue
+ /// This call can block if another thread owns the lock that protects the
+ /// queue. If the queue is full The call will return false and the element
+ /// won't be inserted
+ /// @param element to insert into the queue
+ /// @return True if the elem was successfully inserted into the queue.
+ /// False otherwise
+ bool tryPush(const T &a_elem);
+
+ /// @brief extracts an element from the queue (and deletes it from the q)
+ /// If the queue is empty this call will block the thread until there is
+ /// something in the queue to be extracted
+ /// @param a reference where the element from the queue will be saved to
+ void pop(T &out_data);
+
+ /// @brief extracts an element from the queue (and deletes it from the q)
+ /// This call gets the block that protects the queue. It will extract the
+ /// element from the queue only if there are elements in it
+ /// @param reference to the variable where the result will be saved
+ /// @return True if the element was retrieved from the queue.
+ /// False if the queue was empty
+ bool tryPop(T &out_data);
+
+ /// @brief extracts an element from the queue (and deletes it from the q)
+ /// If the queue is empty this call will block the thread until there
+ /// is something in the queue to be extracted or until the timer
+ /// (2nd parameter) expires
+ /// @param reference to the variable where the result will be saved
+ /// @param duration to wait before returning if the queue was empty
+ /// you may also pass into this a std::seconds or std::milliseconds
+ /// (defined in std::chrono)
+ /// @return True if the element was retrieved from the queue.
+ /// False if the timeout was hit and nothing could be extracted
+ /// from the queue
+ bool timedWaitPop(T &data, std::chrono::microseconds a_microsecs);
+
+protected:
+ /// the actual queue data structure protected by this SafeQueue wrapper
+ std::queue<T> m_theQueue;
+ /// maximum number of elements for the queue
+ std::size_t m_maximumSize;
+ /// Mutex to protect the queue
+ mutable std::mutex m_mutex;
+ /// Conditional variable to wake up threads
+ mutable std::condition_variable m_cond;
+
+ /// @brief calculate if copying a_src into this instance will need to
+ /// wake up potential threads waiting to perform push or pop ops.
+ /// WARNING: It assumes the caller holds all mutexes required to access
+ /// to the data of this and a_src SafeQueues
+ /// @param a_src const reference to the SafeQueue that will be copied
+ /// into this object
+ /// @return true if threads will need to be waken up. False otherwise
+ inline bool wakeUpSignalNeeded(const SafeQueue<T> &a_src) const;
+};
+
+// include the implementation file
+#include "safe_queue_impl.h"
+
+#endif /* _SAFEQUEUE_H_ */
+
diff --git a/service/safe_queue_impl.h b/service/safe_queue_impl.h
new file mode 100644
index 0000000..9d1d442
--- /dev/null
+++ b/service/safe_queue_impl.h
@@ -0,0 +1,258 @@
+#ifndef _SAFEQUEUEIMPL_H_
+#define _SAFEQUEUEIMPL_H_
+
+template <typename T>
+SafeQueue<T>::SafeQueue(std::size_t a_maxSize):
+ m_theQueue(),
+ m_maximumSize(a_maxSize),
+ m_mutex(),
+ m_cond()
+{
+}
+
+template <typename T>
+SafeQueue<T>::~SafeQueue()
+{
+}
+
+template <typename T>
+SafeQueue<T>::SafeQueue(const SafeQueue<T>& a_src):
+ m_theQueue(),
+ m_maximumSize(0),
+ m_mutex(),
+ m_cond()
+{
+ // copying a safe queue involves only copying the data (m_theQueue and
+ // m_maximumSize). This object has not been instantiated yet so nobody can
+ // be trying to perform push or pop operations on it, but we need to
+ // acquire a_src.m_mutex before copying its data into m_theQueue and
+ // m_maximumSize
+ std::unique_lock<std::mutex> lk(a_src.m_mutex);
+
+ this->m_maximumSize = a_src.m_maximumSize;
+ this->m_theQueue = a_src.m_theQueue;
+}
+
+template <typename T>
+const SafeQueue<T>& SafeQueue<T>::operator=(const SafeQueue<T> &a_src)
+{
+ if (this != &a_src)
+ {
+ // lock both mutexes at the same time to avoid deadlocks
+ std::unique_lock<std::mutex> this_lk(this->m_mutex, std::defer_lock);
+ std::unique_lock<std::mutex> src_lk (a_src.m_mutex, std::defer_lock);
+ std::lock(this_lk, src_lk);
+
+ // will we need to wake up waiting threads after copying the source
+ // queue?
+ bool wakeUpWaitingThreads = WakeUpSignalNeeded(a_src);
+
+ // copy data from the left side of the operator= into this intance
+ this->m_maximumSize = a_src.m_maximumSize;
+ this->m_theQueue = a_src.m_theQueue;
+
+ // time now to wake up threads waiting for data to be inserted
+ // or extracted
+ if (wakeUpWaitingThreads)
+ {
+ this->m_cond.notify_all();
+ }
+ }
+
+ return *this;
+}
+
+template <typename T>
+SafeQueue<T>::SafeQueue(SafeQueue<T>&& a_src):
+ m_theQueue(a_src.m_theQueue), // implicit std::move(a_src.m_theQueue)
+ m_maximumSize(a_src.m_maximumSize), // move constructor called implicitly
+ m_mutex(), // instantiate a new mutex
+ m_cond() // instantiate a new conditional variable
+{
+ // This object has not been instantiated yet. We can assume no one is using
+ // its mutex.
+ // Also, a_src is a temporary object so there is no need to acquire
+ // its mutex.
+ // Things can therefore be safely moved without the need for any mutex or
+ // conditional variable
+}
+
+template <typename T>
+SafeQueue<T>& SafeQueue<T>::operator=(SafeQueue<T> &&a_src)
+{
+ if (this != &a_src)
+ {
+ // make sure we hold this mutex before moving things around. a_src is
+ // a temporary object so no need to hold its mutex
+ std::unique_lock<std::mutex> lk(this->m_mutex);
+
+ // will we need to wake up waiting threads after copying the source
+ // queue?
+ bool wakeUpWaitingThreads = WakeUpSignalNeeded(a_src);
+
+ // process data from the temporary copy into this intance
+ this->m_maximumSize = std::move(a_src.m_maximumSize);
+ this->m_theQueue = std::move(a_src.m_theQueue);
+
+ // time now to wake up threads waiting for data to be inserted
+ // or extracted
+ if (wakeUpWaitingThreads)
+ {
+ this->m_cond.notify_all();
+ }
+ }
+
+ return *this;
+}
+
+template <typename T>
+bool SafeQueue<T>::wakeUpSignalNeeded(const SafeQueue<T> &a_src) const
+{
+ if (this->m_theQueue.empty() && (!a_src.m_theQueue.empty()))
+ {
+ // threads waiting for stuff to be popped off the queue
+ return true;
+ }
+ else if ((this->m_theQueue.size() >= this->m_maximumSize) &&
+ (a_src.m_theQueue.size() < a_src.m_maximumSize))
+ {
+ // threads waiting for stuff to be pushed into the queue
+ return true;
+ }
+
+ return false;
+}
+
+template <typename T>
+bool SafeQueue<T>::isEmpty() const
+{
+ std::lock_guard<std::mutex> lk(m_mutex);
+ return m_theQueue.empty();
+}
+
+template <typename T>
+void SafeQueue<T>::push(const T &a_elem)
+{
+ std::unique_lock<std::mutex> lk(m_mutex);
+
+ while (m_theQueue.size() >= m_maximumSize)
+ {
+ m_cond.wait(lk);
+ }
+
+ bool queueEmpty = m_theQueue.empty();
+
+ m_theQueue.push(a_elem);
+
+ if (queueEmpty)
+ {
+ // wake up threads waiting for stuff
+ m_cond.notify_all();
+ }
+}
+
+template <typename T>
+bool SafeQueue<T>::tryPush(const T &a_elem)
+{
+ std::lock_guard<std::mutex> lk(m_mutex);
+
+ bool rv = false;
+ bool queueEmpty = m_theQueue.empty();
+
+ if (m_theQueue.size() < m_maximumSize)
+ {
+ m_theQueue.push(a_elem);
+ rv = true;
+ }
+
+ if (queueEmpty)
+ {
+ // wake up threads waiting for stuff
+ m_cond.notify_all();
+ }
+
+ return rv;
+}
+
+template <typename T>
+void SafeQueue<T>::pop(T &out_data)
+{
+ std::unique_lock<std::mutex> lk(m_mutex);
+
+ while (m_theQueue.empty())
+ {
+ m_cond.wait(lk);
+ }
+
+ bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false;
+
+ out_data = m_theQueue.front();
+ m_theQueue.pop();
+
+ if (queueFull)
+ {
+ // wake up threads waiting for stuff
+ m_cond.notify_all();
+ }
+}
+
+template <typename T>
+bool SafeQueue<T>::tryPop(T &out_data)
+{
+ std::lock_guard<std::mutex> lk(m_mutex);
+
+ bool rv = false;
+ if (!m_theQueue.empty())
+ {
+ bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false;
+
+ out_data = m_theQueue.front();
+ m_theQueue.pop();
+
+ if (queueFull)
+ {
+ // wake up threads waiting for stuff
+ m_cond.notify_all();
+ }
+
+ rv = true;
+ }
+
+ return rv;
+}
+
+template <typename T>
+bool SafeQueue<T>::timedWaitPop(T &data, std::chrono::microseconds a_microsecs)
+{
+ std::unique_lock<std::mutex> lk(m_mutex);
+
+ auto wakeUpTime = std::chrono::steady_clock::now() + a_microsecs;
+ if (m_cond.wait_until(lk, wakeUpTime,
+ [this](){return (m_theQueue.size() > 0);}))
+ {
+ // wait_until returns false if the predicate (3rd parameter) still
+ // evaluates to false after the rel_time timeout expired
+ // we are in this side of the if-clause because the queue is not empty
+ // (so the 3rd parameter evaluated to true)
+ bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false;
+
+ data = m_theQueue.front();
+ m_theQueue.pop();
+
+ if (queueFull)
+ {
+ // wake up threads waiting to insert things into the queue.
+ // The queue used to be full, now it's not.
+ m_cond.notify_all();
+ }
+
+ return true;
+ }
+ else
+ {
+ // timed-out and the queue is still empty
+ return false;
+ }
+}
+
+#endif /* _SAFEQUEUEIMPL_H_ */
diff --git a/service/test b/service/test
index 498a736..6a3c133 100755
--- a/service/test
+++ b/service/test
Binary files differ
diff --git a/service/test_properties b/service/test_properties
new file mode 100755
index 0000000..6b1a2fb
--- /dev/null
+++ b/service/test_properties
Binary files differ
diff --git a/service/test_properties.c b/service/test_properties.c
new file mode 100644
index 0000000..a86c503
--- /dev/null
+++ b/service/test_properties.c
@@ -0,0 +1,8 @@
+#include "usg_common.h"
+#include "properties_config.h"
+
+int main() {
+ PropertiesConfig config("../data/config.txt");
+
+ std::cout << config.get("local_url") << std::endl;
+}
\ No newline at end of file
diff --git a/service/test_queue b/service/test_queue
new file mode 100755
index 0000000..d11beb0
--- /dev/null
+++ b/service/test_queue
Binary files differ
diff --git a/service/test_queue.c b/service/test_queue.c
new file mode 100644
index 0000000..4f66425
--- /dev/null
+++ b/service/test_queue.c
@@ -0,0 +1,18 @@
+#include "usg_common.h"
+#include "safe_queue.h"
+
+#define QUEUE_SIZE 10
+
+
+using namespace std;
+
+int main() {
+ SafeQueue<int> m_queue(10);
+ m_queue.push(1);
+ m_queue.push(2);
+ int a;
+ m_queue.pop(a);
+ cout << a << endl;
+ m_queue.pop(a);
+ cout << a << endl;
+}
--
Gitblit v1.8.0