wangzhengquan
2020-09-10 591aacee97f4a6486631c38a6b418e20b2c4109c
service/netdisk_service.c
@@ -9,6 +9,8 @@
#include <nng/nng.h>
#include <nng/protocol/reqrep0/rep.h>
#include <nng/protocol/reqrep0/req.h>
#include <nng/protocol/survey0/survey.h>
#include <nng/protocol/survey0/respond.h>
using namespace std;
@@ -17,7 +19,6 @@
int  WORKERS ;
std::string localUrl;
std::string remoteUrl;
PropertiesConfig config("../data/config.txt");
@@ -69,46 +70,47 @@
int work(Netdisk_DownloadRequest drequest) {
  Netdisk *netdisk = NULL;
  std::vector<std::string> files;
  int rv;
  char rmsg[MAXLINE];
  strcpy(rmsg, "success");
  Netdisk_LoginInfo loginInfo = loginStore.getLoginInfo(drequest.loginUUID);
  std::map<std::string, Netdisk *>::iterator userDeviceIter = userDeviceMap.find( drequest.loginUUID);
  if( userDeviceIter != userDeviceMap.end() ) {
     netdisk = userDeviceIter->second;
  }
  Netdisk_LoginInfo loginInfo = loginStore.getLoginInfo(drequest.loginUUID);
  if (netdisk == NULL) {
    netdisk = NetdiskFacotory::create(loginInfo.deviceType);
    if(netdisk != NULL) {
      userDeviceMap.insert({loginInfo.loginUUID, netdisk});
    } else {
      err_msg(0, "无法识别的设备类型: %s", loginInfo.deviceType.c_str());
      snprintf(rmsg, MAXLINE, "无法识别的设备类型: %s", loginInfo.deviceType.c_str());
    }
  }
  if ( (rv = netdisk->login(loginInfo)) != 0 ) {
    printf("下载登录失败\n");
    snprintf(rmsg, MAXLINE, "请重新登录");
  } else if ( (rv = netdisk->downloadByTime(drequest, &files) ) != 0) {
    snprintf(rmsg, MAXLINE, "下载失败");
  }
  std::vector<std::string> files;
  if ( (rv = netdisk->downloadByTime(drequest, &files) ) != 0) {
    printf("下载失败\n");
  }
  Json::Value response;
  Json::Value payload;
  response["rv"] = rv;
  Json::Value request;
  request["method"] = "downloadByTimeCallBack";
  Json::Value arguments;
  Json::Value filelist;
  for(std::string f : files) {
    filelist.append(f);
  }
  payload["filelist"] = filelist;
  response["payload"] = payload;
  std::string str = response.toStyledString();
  arguments["fileList"] = filelist;
  arguments["loginUUID"] = drequest.loginUUID;
  request["arguments"] = arguments;
  std::string str = request.toStyledString();
  std::cout << "download finished, call back" << std::endl;
  std::cout << str << std::endl;
  std::cout << "SENDING download finished\n" << str << std::endl;
  connectAndSend(remoteUrl.c_str(), strdup(str.c_str()) );
  
  return 0;
@@ -121,6 +123,7 @@
  {
    Netdisk_DownloadRequest  request;
    task_queue.pop(request);
    err_msg(0, "====take a task");
    work(request);
  }
}
@@ -148,8 +151,9 @@
    if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {
      fatal("nng_recv", rv);
    }
printf("RECEIVED RPC REQUEST:\n %s", buf);
    jsonreader.parse(buf, request);
    nng_free(buf, sz);
    std::string method = request["method"].asString();
    std::map<std::string, RequestHandleFun>::iterator handleFunIter = requestHandleFunMap.find(method);
@@ -160,15 +164,20 @@
      std::cerr << "Don't support " << method << std::endl;
    }
    
    nng_free(buf, sz);
  }
}
/**
 * 注册请求处理的方法
 */
void registRequestHandleFun() {
  requestHandleFunMap.insert({"login", handleLogin});
  requestHandleFunMap.insert({"logout", handleLogout});
  requestHandleFunMap.insert({"downloadByTime", handleDownloadByTimeAsync});
  requestHandleFunMap.insert({"getDeviceInfo", handleGetDeviceInfo});
}
void initThreadPool() {
  pthread_t tid;
@@ -176,23 +185,64 @@
    pthread_create(&tid, NULL, worker, NULL);
}
void heartBeat(const char *url, const char *name)
{
  nng_socket sock;
  int rv;
  if ((rv = nng_respondent0_open(&sock)) != 0) {
    fatal("nng_respondent0_open", rv);
  }
  if ((rv = nng_dial(sock, url, NULL, NNG_FLAG_NONBLOCK)) != 0) {
    fatal("nng_dial", rv);
  }
  for (;;) {
    char *buf = NULL;
    size_t sz;
    if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) == 0) {
//printf("CLIENT (%s): RECEIVED \"%s\" SURVEY REQUEST\n", name, buf);
      nng_free(buf, sz);
      char response[1024];
      sprintf(response, "%s-%d", name, getpid());
//printf("CLIENT (%s): SENDING SURVEY RESPONSE:%s\n", name, response);
      if ((rv = nng_send(sock, response, strlen(response) + 1, 0)) != 0) {
        fatal("nng_send", rv);
      }
    }
  }
}
void *heart(void *vargp)
{
  pthread_detach(pthread_self());
  heartBeat(config.get("heart_server").c_str(), "netdisk");
  return NULL;
}
// 心跳发送进程
void initHeart() {
  pthread_t tid;
  pthread_create(&tid, NULL, heart, NULL);
}
int main()
{
  Netdisk_EnvConfig hcEnvConfig;
  hcEnvConfig.libpath = config.get("hclib");
  HCNetdisk::netdisk_init(&hcEnvConfig);
  //环境变量初始化
  WORKERS = config.getInt("workers");
  localUrl = config.get("local_url");
  remoteUrl = config.get("remote_url");
  remoteUrl = config.get("client_url");
  //海康设备环境初始化
  Netdisk_EnvConfig hcEnvConfig;
  hcEnvConfig.libpath = "../lib/hc";
  HCNetdisk::netdisk_init(&hcEnvConfig);
  registRequestHandleFun();
  initThreadPool();
  initHeart();
  startServer(localUrl.c_str());
  startServer(config.get("server_url").c_str());
  HCNetdisk::netdisk_deinit();