wangzhengquan
2020-06-11 5c9cbf5ea152f501ae976e0e0b4ef5ee98afdfba
service/netdisk_service.c
@@ -1,174 +1,119 @@
#include "usg_common.h"
#include "netdisk.h"
#include "hcnetdisk.h"
#include "safe_queue.h"
#include "login_store.h"
#include "request_handler.h"
#include "properties_config.h"
#include <jsoncpp/json/json.h>
#include <nng/nng.h>
#include <nng/protocol/reqrep0/rep.h>
#include <nng/protocol/reqrep0/req.h>
using namespace std;
const char *url = "tcp://127.0.0.1:8899";
Json::Value loginData;
std::map<std::string, Netdisk *> *userDeviceMap;
std::string login_data_file = "../data/login.dat";
int saveLoginInfo(Netdisk_LoginInfo &loginInfo) {
    Json::Value item;
    item["loginUUID"] = loginInfo.loginUUID;
    item["deviceType"] = loginInfo.deviceType;
    item["username"] = loginInfo.username;
    item["password"] = loginInfo.password;
    item["host"] = loginInfo.host;
    item["port"] = loginInfo.port;
    loginData[loginInfo.loginUUID] = item;
int work(Netdisk_DownloadRequest drequest);
    auto str = loginData.toStyledString();
    // std::cout << str << std::endl;
    std::ofstream fout(login_data_file);
    fout << str;
    fout.close();
    return 0;
}
PropertiesConfig config("../data/config.txt");
Netdisk_LoginInfo  getLoginInfo(std::string uuid) {
 Json::Value item = loginData[uuid];
 Netdisk_LoginInfo loginInfo;
 loginInfo.loginUUID = item["loginUUID"].asString();
 loginInfo.deviceType = item["deviceType"].asString();
 loginInfo.username = item["username"].asString();
 loginInfo.password = item["password"].asString();
 loginInfo.host = item["host"].asString();
 loginInfo.port = item["port"].asInt();
 return loginInfo;
}
void
fatal(const char *func, int rv)
int  WORKERS ;
std::string localUrl;
std::string remoteUrl;
SafeQueue<Netdisk_DownloadRequest> task_queue(10);
std::map<std::string, Netdisk *> userDeviceMap;
LoginStore loginStore;
std::map<std::string, RequestHandleFun> requestHandleFunMap;
static inline void fatal(const char *func, int rv)
{
  fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
  exit(1);
  //exit(1);
}
int handleLogin(nng_socket sock, Json::Value request) {
  int rv, code;
  // char *buf;
  Netdisk *netdisk = NULL;
  Json::Value arguments = request["arguments"];
int connectAndSend(const char *url, char * str) {
  nng_socket sock;
  int        rv;
  size_t     sz;
  char *buf;
  //登录
  Netdisk_LoginInfo loginInfo;
  loginInfo.loginUUID=arguments["loginUUID"].asString();
  loginInfo.deviceType = arguments["deviceType"].asString();
  loginInfo.host = arguments["host"].asString();
  loginInfo.port = arguments["port"].asInt();
  loginInfo.username = arguments["username"].asString();
  loginInfo.password = arguments["password"].asString();
  std::map<std::string, Netdisk *>::iterator userDeviceIter = userDeviceMap->find(loginInfo.loginUUID);
  if( userDeviceIter != userDeviceMap->end() ) {
     netdisk = userDeviceIter->second;
  if ((rv = nng_req0_open(&sock)) != 0) {
    fatal("nng_socket", rv);
  }
  if (netdisk == NULL) {
    printf("=========null\n");
    if(loginInfo.deviceType == "HC") {
      // std::cout << "new HCNetdisk" << std::endl;
      netdisk = new HCNetdisk();
      userDeviceMap->insert({loginInfo.loginUUID, netdisk});
    } else {
      err_msg(0, "无法识别的设备类型: %s", loginInfo.deviceType.c_str());
    }
  if ((rv = nng_dial(sock, url, NULL, 0)) != 0) {
    fatal("nng_dial", rv);
  }
 // printf("CLIENT: SENDING DATE REQUEST\n");
  code = netdisk->login(loginInfo);
  if (code == 0) {
    saveLoginInfo(loginInfo);
    std::cout << "起始通道:" << netdisk->getStartChannel() << ", 最大通道号:" << netdisk->getMaxChannels() << std::endl;
  }
  Json::Value response;
  Json::Value payload;
  payload["loginUUID"] = loginInfo.loginUUID;
  response["code"] = code;
  response["payload"]  = payload;
  const std::string str = response.toStyledString();
  // nng内部会释放buf
  rv = nng_send(sock, strdup(str.c_str()), str.length(), NNG_FLAG_ALLOC);
  //free(buf);
  if (rv != 0) {
  if ((rv = nng_send(sock, str, strlen(str), 0)) != 0) {
    fatal("nng_send", rv);
    return rv;
  }
  if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {
    fatal("nng_recv", rv);
  }
  std::cout << buf;
  nng_free(buf, sz);
  nng_close(sock);
  return 0;
}
int handleDownloadByTime(nng_socket sock, Json::Value request) {
  int rv, code;
  // char *buf;
void *worker(void *vargp)
{
  pthread_detach(pthread_self());
  while (1)
  {
    Netdisk_DownloadRequest  request;
    task_queue.pop(request);
    work(request);
  }
}
int work(Netdisk_DownloadRequest drequest) {
  Netdisk *netdisk = NULL;
  Json::Value arguments = request["arguments"];
  Netdisk_DownloadRequest drequest;
  drequest.loginUUID = arguments["loginUUID"].asString();
  Json::Value start = arguments["start"];
  drequest.start.tm_year = start["year"].asInt()-1900; // 这个时间类型从1900开始算作第一年
  drequest.start.tm_mon = start["mon"].asInt()-1; // 0是第一个月
  drequest.start.tm_mday = start["day"].asInt();
  drequest.start.tm_hour = start["hour"].asInt();
  drequest.start.tm_min = start["min"].asInt();
  drequest.start.tm_sec = start["sec"].asInt();
  Json::Value end = arguments["end"];
  drequest.end.tm_year = end["year"].asInt()-1900; // 这个时间类型从1900开始算作第一年
  drequest.end.tm_mon = end["mon"].asInt()-1; // 0是第一个月
  drequest.end.tm_mday = end["day"].asInt();
  drequest.end.tm_hour = end["hour"].asInt();
  drequest.end.tm_min = end["min"].asInt();
  drequest.end.tm_sec = end["sec"].asInt();
  drequest.channel = arguments["channel"].asInt();
  drequest.destpath = arguments["destpath"].asString();
  std::map<string, Netdisk *>::iterator userDeviceIter = userDeviceMap->find( drequest.loginUUID);
  if( userDeviceIter != userDeviceMap->end() ) {
  int rv;
  std::map<std::string, Netdisk *>::iterator userDeviceIter = userDeviceMap.find( drequest.loginUUID);
  if( userDeviceIter != userDeviceMap.end() ) {
     netdisk = userDeviceIter->second;
  }
  Netdisk_LoginInfo loginInfo = getLoginInfo(drequest.loginUUID);
  Netdisk_LoginInfo loginInfo = loginStore.getLoginInfo(drequest.loginUUID);
  if (netdisk == NULL) {
   
    if(loginInfo.deviceType == "HC") {
      // std::cout << "new HCNetdisk" << std::endl;
      netdisk = new HCNetdisk();
      userDeviceMap->insert({loginInfo.loginUUID, netdisk});
      userDeviceMap.insert({loginInfo.loginUUID, netdisk});
    } else {
      err_msg(0, "无法识别的设备类型: %s", loginInfo.deviceType.c_str());
    }
  }
  if ( (code = netdisk->login(loginInfo)) != 0 ) {
  if ( (rv = netdisk->login(loginInfo)) != 0 ) {
    printf("下载登录失败\n");
  }
  std::vector<std::string> files;
  if ( (code = netdisk->downloadByTime(drequest, &files) ) != 0) {
  if ( (rv = netdisk->downloadByTime(drequest, &files) ) != 0) {
    printf("下载失败\n");
  }
  Json::Value response;
  Json::Value payload;
  response["code"] = code;
  response["rv"] = rv;
  Json::Value filelist;
  for(std::string f : files) {
@@ -177,31 +122,29 @@
  payload["filelist"] = filelist;
  response["payload"] = payload;
  std::string str = response.toStyledString();
  std::cout << "download finished, call back" << std::endl;
  std::cout << str << std::endl;
  rv = nng_send(sock, strdup(str.c_str()), str.length(), NNG_FLAG_ALLOC);
  if (rv != 0) {
    fatal("nng_send", rv);
    return rv;
  }
  connectAndSend(remoteUrl.c_str(), strdup(str.c_str()) );
  return 0;
}
int
server()
{
void startServer(const char *url) {
  Json::Reader jsonreader;
  Json::Value request;
  nng_socket sock;
  int rv;
  RequestHandleFun handleFun;
  if ((rv = nng_rep0_open(&sock)) != 0) {
    fatal("nng_rep0_open", rv);
  }
  if ((rv = nng_listen(sock, url, NULL, 0)) != 0) {
    printf("url=%s\n", url);
    fatal("nng_listen", rv);
  }
  for (;;) {
    char *   buf = NULL;
    size_t   sz;
@@ -211,32 +154,55 @@
    jsonreader.parse(buf, request);
    std::string method = request["method"].asString();
    if (method == "login") {
      handleLogin(sock, request);
    } else if (method == "downloadByTime") {
      handleDownloadByTime(sock, request);
    std::map<std::string, RequestHandleFun>::iterator handleFunIter = requestHandleFunMap.find(method);
    if( handleFunIter != requestHandleFunMap.end() ) {
      handleFun = handleFunIter->second;
      handleFun(sock, request);
    } else {
      std::cerr << "Don't support " << method << std::endl;
    }
    // if (method == "login") {
    //   handleLogin(sock, request);
    // } else if (method == "downloadByTime") {
    //   handleDownloadByTimeAsync(sock, request);
    // } else {
    //   std::cerr << "Don't support " << method << std::endl;
    // }
    // Unrecognized command, so toss the buffer.
    nng_free(buf, sz);
  }
}
void registRequestHandleFun() {
  requestHandleFunMap.insert({"login", handleLogin});
  requestHandleFunMap.insert({"downloadByTime", handleDownloadByTimeAsync});
}
void initThreadPool() {
  pthread_t tid;
  for (int i = 0; i < WORKERS; i++)
    pthread_create(&tid, NULL, worker, NULL);
}
int main()
{
  Netdisk_EnvConfig config;
  config.libpath = "../hclib/";
  HCNetdisk::netdisk_init(&config);
  Json::Reader jsonreader;
  ifstream fin(login_data_file);
  jsonreader.parse(fin, loginData);
  fin.close();
  Netdisk_EnvConfig hcEnvConfig;
  hcEnvConfig.libpath = config.get("hclib");
  HCNetdisk::netdisk_init(&hcEnvConfig);
  WORKERS = config.getInt("workers");
  localUrl = config.get("local_url");
  remoteUrl = config.get("remote_url");
  userDeviceMap = new std::map<std::string, Netdisk *>();
  server();
  registRequestHandleFun();
  initThreadPool();
  startServer(localUrl.c_str());
  HCNetdisk::netdisk_deinit();