wangzhengquan
2020-09-10 591aacee97f4a6486631c38a6b418e20b2c4109c
service/netdisk_service.c
@@ -4,28 +4,26 @@
#include "login_store.h"
#include "request_handler.h"
#include "properties_config.h"
#include "netdisk_factory.h"
#include <jsoncpp/json/json.h>
#include <nng/nng.h>
#include <nng/protocol/reqrep0/rep.h>
#include <nng/protocol/reqrep0/req.h>
#include <nng/protocol/survey0/survey.h>
#include <nng/protocol/survey0/respond.h>
using namespace std;
int work(Netdisk_DownloadRequest drequest);
PropertiesConfig config("../data/config.txt");
static int work(Netdisk_DownloadRequest drequest);
static int connectAndSend(const char *url, char * str);
int  WORKERS ;
std::string localUrl;
std::string remoteUrl;
PropertiesConfig config("../data/config.txt");
SafeQueue<Netdisk_DownloadRequest> task_queue(10);
SafeQueue<Netdisk_DownloadRequest> task_queue(100);
std::map<std::string, Netdisk *> userDeviceMap;
@@ -70,6 +68,54 @@
}
int work(Netdisk_DownloadRequest drequest) {
  Netdisk *netdisk = NULL;
  std::vector<std::string> files;
  int rv;
  char rmsg[MAXLINE];
  strcpy(rmsg, "success");
  Netdisk_LoginInfo loginInfo = loginStore.getLoginInfo(drequest.loginUUID);
  std::map<std::string, Netdisk *>::iterator userDeviceIter = userDeviceMap.find( drequest.loginUUID);
  if( userDeviceIter != userDeviceMap.end() ) {
     netdisk = userDeviceIter->second;
  }
  if (netdisk == NULL) {
    netdisk = NetdiskFacotory::create(loginInfo.deviceType);
    if(netdisk != NULL) {
      userDeviceMap.insert({loginInfo.loginUUID, netdisk});
    } else {
      snprintf(rmsg, MAXLINE, "无法识别的设备类型: %s", loginInfo.deviceType.c_str());
    }
  }
  if ( (rv = netdisk->login(loginInfo)) != 0 ) {
    snprintf(rmsg, MAXLINE, "请重新登录");
  } else if ( (rv = netdisk->downloadByTime(drequest, &files) ) != 0) {
    snprintf(rmsg, MAXLINE, "下载失败");
  }
  Json::Value request;
  request["method"] = "downloadByTimeCallBack";
  Json::Value arguments;
  Json::Value filelist;
  for(std::string f : files) {
    filelist.append(f);
  }
  arguments["fileList"] = filelist;
  arguments["loginUUID"] = drequest.loginUUID;
  request["arguments"] = arguments;
  std::string str = request.toStyledString();
  std::cout << "SENDING download finished\n" << str << std::endl;
  connectAndSend(remoteUrl.c_str(), strdup(str.c_str()) );
  return 0;
}
void *worker(void *vargp)
{
  pthread_detach(pthread_self());
@@ -77,58 +123,12 @@
  {
    Netdisk_DownloadRequest  request;
    task_queue.pop(request);
    err_msg(0, "====take a task");
    work(request);
  }
}
int work(Netdisk_DownloadRequest drequest) {
  Netdisk *netdisk = NULL;
  int rv;
  std::map<std::string, Netdisk *>::iterator userDeviceIter = userDeviceMap.find( drequest.loginUUID);
  if( userDeviceIter != userDeviceMap.end() ) {
     netdisk = userDeviceIter->second;
  }
  Netdisk_LoginInfo loginInfo = loginStore.getLoginInfo(drequest.loginUUID);
  if (netdisk == NULL) {
    if(loginInfo.deviceType == "HC") {
      // std::cout << "new HCNetdisk" << std::endl;
      netdisk = new HCNetdisk();
      userDeviceMap.insert({loginInfo.loginUUID, netdisk});
    } else {
      err_msg(0, "无法识别的设备类型: %s", loginInfo.deviceType.c_str());
    }
  }
  if ( (rv = netdisk->login(loginInfo)) != 0 ) {
    printf("下载登录失败\n");
  }
  std::vector<std::string> files;
  if ( (rv = netdisk->downloadByTime(drequest, &files) ) != 0) {
    printf("下载失败\n");
  }
  Json::Value response;
  Json::Value payload;
  response["rv"] = rv;
  Json::Value filelist;
  for(std::string f : files) {
    filelist.append(f);
  }
  payload["filelist"] = filelist;
  response["payload"] = payload;
  std::string str = response.toStyledString();
  std::cout << "download finished, call back" << std::endl;
  std::cout << str << std::endl;
  connectAndSend(remoteUrl.c_str(), strdup(str.c_str()) );
  return 0;
}
void startServer(const char *url) {
  Json::Reader jsonreader;
@@ -151,8 +151,9 @@
    if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {
      fatal("nng_recv", rv);
    }
printf("RECEIVED RPC REQUEST:\n %s", buf);
    jsonreader.parse(buf, request);
    nng_free(buf, sz);
    std::string method = request["method"].asString();
    std::map<std::string, RequestHandleFun>::iterator handleFunIter = requestHandleFunMap.find(method);
@@ -162,23 +163,21 @@
    } else {
      std::cerr << "Don't support " << method << std::endl;
    }
    // if (method == "login") {
    //   handleLogin(sock, request);
    // } else if (method == "downloadByTime") {
    //   handleDownloadByTimeAsync(sock, request);
    // } else {
    //   std::cerr << "Don't support " << method << std::endl;
    // }
    // Unrecognized command, so toss the buffer.
    nng_free(buf, sz);
  }
}
/**
 * 注册请求处理的方法
 */
void registRequestHandleFun() {
  requestHandleFunMap.insert({"login", handleLogin});
  requestHandleFunMap.insert({"logout", handleLogout});
  requestHandleFunMap.insert({"downloadByTime", handleDownloadByTimeAsync});
  requestHandleFunMap.insert({"getDeviceInfo", handleGetDeviceInfo});
}
void initThreadPool() {
  pthread_t tid;
@@ -186,23 +185,64 @@
    pthread_create(&tid, NULL, worker, NULL);
}
void heartBeat(const char *url, const char *name)
{
  nng_socket sock;
  int rv;
  if ((rv = nng_respondent0_open(&sock)) != 0) {
    fatal("nng_respondent0_open", rv);
  }
  if ((rv = nng_dial(sock, url, NULL, NNG_FLAG_NONBLOCK)) != 0) {
    fatal("nng_dial", rv);
  }
  for (;;) {
    char *buf = NULL;
    size_t sz;
    if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) == 0) {
//printf("CLIENT (%s): RECEIVED \"%s\" SURVEY REQUEST\n", name, buf);
      nng_free(buf, sz);
      char response[1024];
      sprintf(response, "%s-%d", name, getpid());
//printf("CLIENT (%s): SENDING SURVEY RESPONSE:%s\n", name, response);
      if ((rv = nng_send(sock, response, strlen(response) + 1, 0)) != 0) {
        fatal("nng_send", rv);
      }
    }
  }
}
void *heart(void *vargp)
{
  pthread_detach(pthread_self());
  heartBeat(config.get("heart_server").c_str(), "netdisk");
  return NULL;
}
// 心跳发送进程
void initHeart() {
  pthread_t tid;
  pthread_create(&tid, NULL, heart, NULL);
}
int main()
{
  Netdisk_EnvConfig hcEnvConfig;
  hcEnvConfig.libpath = config.get("hclib");
  HCNetdisk::netdisk_init(&hcEnvConfig);
  //环境变量初始化
  WORKERS = config.getInt("workers");
  localUrl = config.get("local_url");
  remoteUrl = config.get("remote_url");
  remoteUrl = config.get("client_url");
  //海康设备环境初始化
  Netdisk_EnvConfig hcEnvConfig;
  hcEnvConfig.libpath = "../lib/hc";
  HCNetdisk::netdisk_init(&hcEnvConfig);
  registRequestHandleFun();
  initThreadPool();
  initHeart();
  startServer(localUrl.c_str());
  startServer(config.get("server_url").c_str());
  HCNetdisk::netdisk_deinit();