From 5c9cbf5ea152f501ae976e0e0b4ef5ee98afdfba Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期四, 11 六月 2020 19:20:40 +0800
Subject: [PATCH] pdate

---
 service/netdisk_service.c |  254 ++++++++++++++++++++++----------------------------
 1 files changed, 110 insertions(+), 144 deletions(-)

diff --git a/service/netdisk_service.c b/service/netdisk_service.c
index 57fd496..8d31587 100644
--- a/service/netdisk_service.c
+++ b/service/netdisk_service.c
@@ -1,174 +1,119 @@
 #include "usg_common.h"
 #include "netdisk.h"
-#include "hcnetdisk.h"
+#include "safe_queue.h"
+#include "login_store.h"
+#include "request_handler.h"
+#include "properties_config.h"
 #include <jsoncpp/json/json.h>
 #include <nng/nng.h>
 #include <nng/protocol/reqrep0/rep.h>
 #include <nng/protocol/reqrep0/req.h>
+
 using namespace std;
 
-const char *url = "tcp://127.0.0.1:8899";
-Json::Value loginData;
 
-std::map<std::string, Netdisk *> *userDeviceMap;
 
-std::string login_data_file = "../data/login.dat";
-int saveLoginInfo(Netdisk_LoginInfo &loginInfo) {
-   
-    Json::Value item;
-    item["loginUUID"] = loginInfo.loginUUID;
-    item["deviceType"] = loginInfo.deviceType;
-    item["username"] = loginInfo.username;
-    item["password"] = loginInfo.password;
-    item["host"] = loginInfo.host;
-    item["port"] = loginInfo.port;
- 
-    loginData[loginInfo.loginUUID] = item;
- 
+int work(Netdisk_DownloadRequest drequest);
 
-    auto str = loginData.toStyledString();
-    // std::cout << str << std::endl;
 
-    std::ofstream fout(login_data_file);
-    fout << str;
-    fout.close();
-    return 0;
-}
+PropertiesConfig config("../data/config.txt");
 
-Netdisk_LoginInfo  getLoginInfo(std::string uuid) {
- Json::Value item = loginData[uuid];
- Netdisk_LoginInfo loginInfo;
- loginInfo.loginUUID = item["loginUUID"].asString();
- loginInfo.deviceType = item["deviceType"].asString();
- loginInfo.username = item["username"].asString();
- loginInfo.password = item["password"].asString();
- loginInfo.host = item["host"].asString();
- loginInfo.port = item["port"].asInt();
- return loginInfo;
-}
 
-void
-fatal(const char *func, int rv)
+int  WORKERS ;
+std::string localUrl;
+std::string remoteUrl;
+
+
+
+SafeQueue<Netdisk_DownloadRequest> task_queue(10);
+
+std::map<std::string, Netdisk *> userDeviceMap;
+
+LoginStore loginStore;
+
+std::map<std::string, RequestHandleFun> requestHandleFunMap;
+
+
+
+static inline void fatal(const char *func, int rv)
 {
   fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
-  exit(1);
+  //exit(1);
 }
 
- 
-int handleLogin(nng_socket sock, Json::Value request) {
-  int rv, code;
-  // char *buf;
-  Netdisk *netdisk = NULL;
 
-  Json::Value arguments = request["arguments"];
+int connectAndSend(const char *url, char * str) {
+  nng_socket sock;
+  int        rv;
+  size_t     sz;
+  char *buf;
 
-  //鐧诲綍
-  Netdisk_LoginInfo loginInfo;
-  loginInfo.loginUUID=arguments["loginUUID"].asString();
-  loginInfo.deviceType = arguments["deviceType"].asString();
-  loginInfo.host = arguments["host"].asString();
-  loginInfo.port = arguments["port"].asInt();
-  loginInfo.username = arguments["username"].asString();
-  loginInfo.password = arguments["password"].asString();
-  
-
-  std::map<std::string, Netdisk *>::iterator userDeviceIter = userDeviceMap->find(loginInfo.loginUUID);
-  if( userDeviceIter != userDeviceMap->end() ) {
-     netdisk = userDeviceIter->second;
+  if ((rv = nng_req0_open(&sock)) != 0) {
+    fatal("nng_socket", rv);
   }
-  if (netdisk == NULL) {
-    printf("=========null\n");
-    if(loginInfo.deviceType == "HC") {
-      // std::cout << "new HCNetdisk" << std::endl;
-      netdisk = new HCNetdisk();
-      userDeviceMap->insert({loginInfo.loginUUID, netdisk});
-    } else {
-      err_msg(0, "鏃犳硶璇嗗埆鐨勮澶囩被鍨嬶細 %s", loginInfo.deviceType.c_str());
-    }
+  if ((rv = nng_dial(sock, url, NULL, 0)) != 0) {
+    fatal("nng_dial", rv);
   }
+ // printf("CLIENT: SENDING DATE REQUEST\n");
 
-  code = netdisk->login(loginInfo);
-  if (code == 0) {
-    saveLoginInfo(loginInfo);
-    std::cout << "璧峰閫氶亾:" << netdisk->getStartChannel() << ", 鏈�澶ч�氶亾鍙凤細" << netdisk->getMaxChannels() << std::endl;
-  }
- 
-
-  Json::Value response;
-  Json::Value payload;
-  payload["loginUUID"] = loginInfo.loginUUID;
-  response["code"] = code;
-  response["payload"]  = payload;
-  const std::string str = response.toStyledString();
-  // nng鍐呴儴浼氶噴鏀綽uf
-  rv = nng_send(sock, strdup(str.c_str()), str.length(), NNG_FLAG_ALLOC);
-  //free(buf);
-  if (rv != 0) {
+  if ((rv = nng_send(sock, str, strlen(str), 0)) != 0) {
     fatal("nng_send", rv);
-    return rv;
   }
+    
+  if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {
+    fatal("nng_recv", rv);
+  }
+  std::cout << buf;
+  nng_free(buf, sz);
+  nng_close(sock);
   return 0;
+
 }
-    
-int handleDownloadByTime(nng_socket sock, Json::Value request) {
-  int rv, code;
-  // char *buf;
+
+void *worker(void *vargp)
+{
+  pthread_detach(pthread_self());
+  while (1)
+  {
+    Netdisk_DownloadRequest  request;
+    task_queue.pop(request);
+    work(request);
+  }
+}
+
+int work(Netdisk_DownloadRequest drequest) {
   Netdisk *netdisk = NULL;
-  Json::Value arguments = request["arguments"];
-  Netdisk_DownloadRequest drequest;
-  drequest.loginUUID = arguments["loginUUID"].asString();
-  Json::Value start = arguments["start"];
-  drequest.start.tm_year = start["year"].asInt()-1900; // 杩欎釜鏃堕棿绫诲瀷浠�1900寮�濮嬬畻浣滅涓�骞�
-  drequest.start.tm_mon = start["mon"].asInt()-1; // 0鏄涓�涓湀
-  drequest.start.tm_mday = start["day"].asInt();
-  drequest.start.tm_hour = start["hour"].asInt();
-  drequest.start.tm_min = start["min"].asInt();
-  drequest.start.tm_sec = start["sec"].asInt();
-
-    
-  Json::Value end = arguments["end"];
-  drequest.end.tm_year = end["year"].asInt()-1900; // 杩欎釜鏃堕棿绫诲瀷浠�1900寮�濮嬬畻浣滅涓�骞�
-  drequest.end.tm_mon = end["mon"].asInt()-1; // 0鏄涓�涓湀
-  drequest.end.tm_mday = end["day"].asInt();
-  drequest.end.tm_hour = end["hour"].asInt();
-  drequest.end.tm_min = end["min"].asInt();
-  drequest.end.tm_sec = end["sec"].asInt();
-
-  drequest.channel = arguments["channel"].asInt();
-  drequest.destpath = arguments["destpath"].asString();
-
-
-  std::map<string, Netdisk *>::iterator userDeviceIter = userDeviceMap->find( drequest.loginUUID);
-  if( userDeviceIter != userDeviceMap->end() ) {
+  int rv;
+  std::map<std::string, Netdisk *>::iterator userDeviceIter = userDeviceMap.find( drequest.loginUUID);
+  if( userDeviceIter != userDeviceMap.end() ) {
      netdisk = userDeviceIter->second;
   }
 
-  Netdisk_LoginInfo loginInfo = getLoginInfo(drequest.loginUUID);
+  Netdisk_LoginInfo loginInfo = loginStore.getLoginInfo(drequest.loginUUID);
   if (netdisk == NULL) {
    
     if(loginInfo.deviceType == "HC") {
       // std::cout << "new HCNetdisk" << std::endl;
       netdisk = new HCNetdisk();
-      userDeviceMap->insert({loginInfo.loginUUID, netdisk});
+      userDeviceMap.insert({loginInfo.loginUUID, netdisk});
     } else {
       err_msg(0, "鏃犳硶璇嗗埆鐨勮澶囩被鍨嬶細 %s", loginInfo.deviceType.c_str());
 
     }
   }
 
-  if ( (code = netdisk->login(loginInfo)) != 0 ) {
+  if ( (rv = netdisk->login(loginInfo)) != 0 ) {
     printf("涓嬭浇鐧诲綍澶辫触\n");
   }
 
   std::vector<std::string> files;
-  if ( (code = netdisk->downloadByTime(drequest, &files) ) != 0) {
+  if ( (rv = netdisk->downloadByTime(drequest, &files) ) != 0) {
     printf("涓嬭浇澶辫触\n");
   }
 
-
   Json::Value response;
   Json::Value payload;
-  response["code"] = code;
+  response["rv"] = rv;
 
   Json::Value filelist;
   for(std::string f : files) {
@@ -177,31 +122,29 @@
   payload["filelist"] = filelist;
   response["payload"] = payload;
   std::string str = response.toStyledString();
+
+  std::cout << "download finished, call back" << std::endl;
   std::cout << str << std::endl;
-  rv = nng_send(sock, strdup(str.c_str()), str.length(), NNG_FLAG_ALLOC);
-  if (rv != 0) {
-    fatal("nng_send", rv);
-    return rv;
-  }
+  connectAndSend(remoteUrl.c_str(), strdup(str.c_str()) );
+  
   return 0;
-
-
 }
 
-int
-server()
-{
+void startServer(const char *url) {
   Json::Reader jsonreader;
   Json::Value request;
   nng_socket sock;
   int rv;
-
+ 
+  RequestHandleFun handleFun;
   if ((rv = nng_rep0_open(&sock)) != 0) {
     fatal("nng_rep0_open", rv);
   }
   if ((rv = nng_listen(sock, url, NULL, 0)) != 0) {
+    printf("url=%s\n", url);
     fatal("nng_listen", rv);
   }
+
   for (;;) {
     char *   buf = NULL;
     size_t   sz;
@@ -211,32 +154,55 @@
 
     jsonreader.parse(buf, request);
     std::string method = request["method"].asString();
-    if (method == "login") {
-      handleLogin(sock, request);
-    } else if (method == "downloadByTime") {
-      handleDownloadByTime(sock, request);
+
+    std::map<std::string, RequestHandleFun>::iterator handleFunIter = requestHandleFunMap.find(method);
+    if( handleFunIter != requestHandleFunMap.end() ) {
+      handleFun = handleFunIter->second;
+      handleFun(sock, request);
     } else {
       std::cerr << "Don't support " << method << std::endl;
     }
+    // if (method == "login") {
+    //   handleLogin(sock, request);
+    // } else if (method == "downloadByTime") {
+    //   handleDownloadByTimeAsync(sock, request);
+    // } else {
+    //   std::cerr << "Don't support " << method << std::endl;
+    // }
     // Unrecognized command, so toss the buffer.
     nng_free(buf, sz);
   }
 }
 
+
+void registRequestHandleFun() {
+  requestHandleFunMap.insert({"login", handleLogin});
+  requestHandleFunMap.insert({"downloadByTime", handleDownloadByTimeAsync});
+}
+
+void initThreadPool() {
+  pthread_t tid;
+  for (int i = 0; i < WORKERS; i++)
+    pthread_create(&tid, NULL, worker, NULL);
+}
+
 int main()
 {
-  Netdisk_EnvConfig config;
-  config.libpath = "../hclib/";
-  HCNetdisk::netdisk_init(&config);
 
-  Json::Reader jsonreader;
-  ifstream fin(login_data_file); 
-  jsonreader.parse(fin, loginData);
-  fin.close();
+  Netdisk_EnvConfig hcEnvConfig;
+  hcEnvConfig.libpath = config.get("hclib");
+  HCNetdisk::netdisk_init(&hcEnvConfig);
+
+  WORKERS = config.getInt("workers");
+  localUrl = config.get("local_url");
+  remoteUrl = config.get("remote_url");
 
 
-  userDeviceMap = new std::map<std::string, Netdisk *>();
-  server();
+  registRequestHandleFun();
+
+  initThreadPool();
+
+  startServer(localUrl.c_str());
 
 
   HCNetdisk::netdisk_deinit();

--
Gitblit v1.8.0