From 591aacee97f4a6486631c38a6b418e20b2c4109c Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期四, 10 九月 2020 14:56:47 +0800
Subject: [PATCH] update

---
 service/netdisk_service.c |  190 ++++++++++++++++++++++++++++------------------
 1 files changed, 115 insertions(+), 75 deletions(-)

diff --git a/service/netdisk_service.c b/service/netdisk_service.c
index 8d31587..d6a0b19 100644
--- a/service/netdisk_service.c
+++ b/service/netdisk_service.c
@@ -4,28 +4,26 @@
 #include "login_store.h"
 #include "request_handler.h"
 #include "properties_config.h"
+#include "netdisk_factory.h"
 #include <jsoncpp/json/json.h>
 #include <nng/nng.h>
 #include <nng/protocol/reqrep0/rep.h>
 #include <nng/protocol/reqrep0/req.h>
+#include <nng/protocol/survey0/survey.h>
+#include <nng/protocol/survey0/respond.h>
 
 using namespace std;
 
-
-
-int work(Netdisk_DownloadRequest drequest);
-
-
-PropertiesConfig config("../data/config.txt");
+static int work(Netdisk_DownloadRequest drequest);
+static int connectAndSend(const char *url, char * str);
 
 
 int  WORKERS ;
-std::string localUrl;
 std::string remoteUrl;
 
+PropertiesConfig config("../data/config.txt");
 
-
-SafeQueue<Netdisk_DownloadRequest> task_queue(10);
+SafeQueue<Netdisk_DownloadRequest> task_queue(100);
 
 std::map<std::string, Netdisk *> userDeviceMap;
 
@@ -70,6 +68,54 @@
 
 }
 
+int work(Netdisk_DownloadRequest drequest) {
+  Netdisk *netdisk = NULL;
+  std::vector<std::string> files;
+  int rv;
+  char rmsg[MAXLINE];
+  strcpy(rmsg, "success");
+  Netdisk_LoginInfo loginInfo = loginStore.getLoginInfo(drequest.loginUUID);
+  std::map<std::string, Netdisk *>::iterator userDeviceIter = userDeviceMap.find( drequest.loginUUID);
+  if( userDeviceIter != userDeviceMap.end() ) {
+     netdisk = userDeviceIter->second;
+  }
+
+  if (netdisk == NULL) {
+
+    netdisk = NetdiskFacotory::create(loginInfo.deviceType);
+    if(netdisk != NULL) {
+      userDeviceMap.insert({loginInfo.loginUUID, netdisk});
+    } else {
+      snprintf(rmsg, MAXLINE, "鏃犳硶璇嗗埆鐨勮澶囩被鍨嬶細 %s", loginInfo.deviceType.c_str());
+    }
+  }
+
+  if ( (rv = netdisk->login(loginInfo)) != 0 ) {
+    snprintf(rmsg, MAXLINE, "璇烽噸鏂扮櫥褰�");
+  } else if ( (rv = netdisk->downloadByTime(drequest, &files) ) != 0) {
+    snprintf(rmsg, MAXLINE, "涓嬭浇澶辫触");
+  }
+ 
+
+  Json::Value request;
+  request["method"] = "downloadByTimeCallBack";
+  Json::Value arguments;
+  Json::Value filelist;
+  for(std::string f : files) {
+    filelist.append(f);
+  }
+  arguments["fileList"] = filelist;
+  arguments["loginUUID"] = drequest.loginUUID;
+  
+  request["arguments"] = arguments;
+  std::string str = request.toStyledString();
+
+  std::cout << "SENDING download finished\n" << str << std::endl;
+  connectAndSend(remoteUrl.c_str(), strdup(str.c_str()) );
+  
+  return 0;
+}
+
 void *worker(void *vargp)
 {
   pthread_detach(pthread_self());
@@ -77,58 +123,12 @@
   {
     Netdisk_DownloadRequest  request;
     task_queue.pop(request);
+    err_msg(0, "====take a task");
     work(request);
   }
 }
 
-int work(Netdisk_DownloadRequest drequest) {
-  Netdisk *netdisk = NULL;
-  int rv;
-  std::map<std::string, Netdisk *>::iterator userDeviceIter = userDeviceMap.find( drequest.loginUUID);
-  if( userDeviceIter != userDeviceMap.end() ) {
-     netdisk = userDeviceIter->second;
-  }
 
-  Netdisk_LoginInfo loginInfo = loginStore.getLoginInfo(drequest.loginUUID);
-  if (netdisk == NULL) {
-   
-    if(loginInfo.deviceType == "HC") {
-      // std::cout << "new HCNetdisk" << std::endl;
-      netdisk = new HCNetdisk();
-      userDeviceMap.insert({loginInfo.loginUUID, netdisk});
-    } else {
-      err_msg(0, "鏃犳硶璇嗗埆鐨勮澶囩被鍨嬶細 %s", loginInfo.deviceType.c_str());
-
-    }
-  }
-
-  if ( (rv = netdisk->login(loginInfo)) != 0 ) {
-    printf("涓嬭浇鐧诲綍澶辫触\n");
-  }
-
-  std::vector<std::string> files;
-  if ( (rv = netdisk->downloadByTime(drequest, &files) ) != 0) {
-    printf("涓嬭浇澶辫触\n");
-  }
-
-  Json::Value response;
-  Json::Value payload;
-  response["rv"] = rv;
-
-  Json::Value filelist;
-  for(std::string f : files) {
-    filelist.append(f);
-  }
-  payload["filelist"] = filelist;
-  response["payload"] = payload;
-  std::string str = response.toStyledString();
-
-  std::cout << "download finished, call back" << std::endl;
-  std::cout << str << std::endl;
-  connectAndSend(remoteUrl.c_str(), strdup(str.c_str()) );
-  
-  return 0;
-}
 
 void startServer(const char *url) {
   Json::Reader jsonreader;
@@ -151,8 +151,9 @@
     if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {
       fatal("nng_recv", rv);
     }
-
+printf("RECEIVED RPC REQUEST:\n %s", buf);
     jsonreader.parse(buf, request);
+    nng_free(buf, sz);
     std::string method = request["method"].asString();
 
     std::map<std::string, RequestHandleFun>::iterator handleFunIter = requestHandleFunMap.find(method);
@@ -162,23 +163,21 @@
     } else {
       std::cerr << "Don't support " << method << std::endl;
     }
-    // if (method == "login") {
-    //   handleLogin(sock, request);
-    // } else if (method == "downloadByTime") {
-    //   handleDownloadByTimeAsync(sock, request);
-    // } else {
-    //   std::cerr << "Don't support " << method << std::endl;
-    // }
-    // Unrecognized command, so toss the buffer.
-    nng_free(buf, sz);
+    
+    
   }
 }
 
-
+/**
+ * 娉ㄥ唽璇锋眰澶勭悊鐨勬柟娉�
+ */
 void registRequestHandleFun() {
   requestHandleFunMap.insert({"login", handleLogin});
+  requestHandleFunMap.insert({"logout", handleLogout});
   requestHandleFunMap.insert({"downloadByTime", handleDownloadByTimeAsync});
+  requestHandleFunMap.insert({"getDeviceInfo", handleGetDeviceInfo});
 }
+
 
 void initThreadPool() {
   pthread_t tid;
@@ -186,23 +185,64 @@
     pthread_create(&tid, NULL, worker, NULL);
 }
 
+
+void heartBeat(const char *url, const char *name)
+{
+  nng_socket sock;
+  int rv;
+
+  if ((rv = nng_respondent0_open(&sock)) != 0) {
+    fatal("nng_respondent0_open", rv);
+  }
+  if ((rv = nng_dial(sock, url, NULL, NNG_FLAG_NONBLOCK)) != 0) {
+    fatal("nng_dial", rv);
+  }
+  for (;;) {
+    char *buf = NULL;
+    size_t sz;
+    if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) == 0) {
+//printf("CLIENT (%s): RECEIVED \"%s\" SURVEY REQUEST\n", name, buf);
+      nng_free(buf, sz);
+      char response[1024];
+      sprintf(response, "%s-%d", name, getpid());
+//printf("CLIENT (%s): SENDING SURVEY RESPONSE:%s\n", name, response);
+      if ((rv = nng_send(sock, response, strlen(response) + 1, 0)) != 0) {
+        fatal("nng_send", rv);
+      }
+    }
+  }
+}
+
+void *heart(void *vargp)
+{
+  pthread_detach(pthread_self());
+  heartBeat(config.get("heart_server").c_str(), "netdisk");
+  return NULL;
+}
+
+// 蹇冭烦鍙戦�佽繘绋�
+void initHeart() {
+  pthread_t tid;
+  pthread_create(&tid, NULL, heart, NULL);
+}
+
 int main()
 {
-
-  Netdisk_EnvConfig hcEnvConfig;
-  hcEnvConfig.libpath = config.get("hclib");
-  HCNetdisk::netdisk_init(&hcEnvConfig);
-
+  //鐜鍙橀噺鍒濆鍖�
   WORKERS = config.getInt("workers");
-  localUrl = config.get("local_url");
-  remoteUrl = config.get("remote_url");
+  remoteUrl = config.get("client_url");
 
+  //娴峰悍璁惧鐜鍒濆鍖�
+  Netdisk_EnvConfig hcEnvConfig;
+  hcEnvConfig.libpath = "../lib/hc";
+  HCNetdisk::netdisk_init(&hcEnvConfig);
 
   registRequestHandleFun();
 
   initThreadPool();
+  initHeart();
 
-  startServer(localUrl.c_str());
+  startServer(config.get("server_url").c_str());
 
 
   HCNetdisk::netdisk_deinit();

--
Gitblit v1.8.0