wangzhengquan
2020-06-11 5c9cbf5ea152f501ae976e0e0b4ef5ee98afdfba
pdate
1个文件已删除
13个文件已添加
12个文件已修改
1124 ■■■■ 已修改文件
common/include/usg_common.h 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
common/usg_common.c 38 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data/config.txt 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
device/hcnetdisk.c 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
device/include/hcnetdisk.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
device/include/netdisk.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
device/test.c 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/Makefile 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/client 补丁 | 查看 | 原始文档 | blame | 历史
service/client.c 55 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/core 补丁 | 查看 | 原始文档 | blame | 历史
service/login_store.c 45 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/login_store.h 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/netdisk_service 补丁 | 查看 | 原始文档 | blame | 历史
service/netdisk_service.c 252 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/properties_config.c 42 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/properties_config.h 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/request_handler.c 193 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/request_handler.h 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/safe_queue.h 120 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/safe_queue_impl.h 258 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/test 补丁 | 查看 | 原始文档 | blame | 历史
service/test_properties 补丁 | 查看 | 原始文档 | blame | 历史
service/test_properties.c 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/test_queue 补丁 | 查看 | 原始文档 | blame | 历史
service/test_queue.c 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
common/include/usg_common.h
@@ -79,6 +79,10 @@
void err_exit(int error, const char *fmt, ...);
void err_msg(int error, const char *fmt, ...);
char *ltrim(char *str, const char *seps);
char *rtrim(char *str, const char *seps);
char *trim(char *str, const char *seps);
static inline int 
itoa(int num, char *str) 
{
@@ -93,7 +97,34 @@
}
#ifdef __cplusplus
}
#endif
#ifdef __cplusplus
// static inline std::string& ltrim(std::string& str, const std::string& chars = "\t\n\v\f\r ")
// {
//     str.erase(0, str.find_first_not_of(chars));
//     return str;
// }
// static inline std::string& rtrim(std::string& str, const std::string& chars = "\t\n\v\f\r ")
// {
//     str.erase(str.find_last_not_of(chars) + 1);
//     return str;
// }
// static inline std::string& trim(std::string& str, const std::string& chars = "\t\n\v\f\r ")
// {
//     return ltrim(rtrim(str, chars), chars);
// }
#endif
#endif
common/usg_common.c
@@ -76,3 +76,41 @@
    fputs(buf, stderr);
    fflush(NULL);        /* flushes all stdio output streams */
}
char *ltrim(char *str, const char *seps)
{
    size_t totrim;
    if (seps == NULL) {
        seps = "\t\n\v\f\r ";
    }
    totrim = strspn(str, seps);
    if (totrim > 0) {
        size_t len = strlen(str);
        if (totrim == len) {
            str[0] = '\0';
        }
        else {
            memmove(str, str + totrim, len + 1 - totrim);
        }
    }
    return str;
}
char *rtrim(char *str, const char *seps)
{
    int i;
    if (seps == NULL) {
        seps = "\t\n\v\f\r ";
    }
    i = strlen(str) - 1;
    while (i >= 0 && strchr(seps, str[i]) != NULL) {
        str[i] = '\0';
        i--;
    }
    return str;
}
char *trim(char *str, const char *seps)
{
    return ltrim(rtrim(str, seps), seps);
}
data/config.txt
New file
@@ -0,0 +1,11 @@
# nng本地服务地址
local_url=tcp://127.0.0.1:8899
# nng远程调用地址地址
remote_url=tcp://127.0.0.1:9988
# 海康包路径
hclib=../hclib/
# 负责下载任务的线程池的数量
workers=4
device/hcnetdisk.c
@@ -1,5 +1,4 @@
#include "netdisk.h"
#include "hcnetdisk.h"
 
bool HCNetdisk::envInited = false;
device/include/hcnetdisk.h
@@ -4,7 +4,6 @@
#include "usg_common.h" 
#include "usg_typedef.h" 
#include "HCNetSDK.h"
#include "netdisk.h"
//海康网络硬盘
class HCNetdisk : public Netdisk{
device/include/netdisk.h
@@ -73,5 +73,5 @@
};
#include "hcnetdisk.h"
#endif
device/test.c
@@ -1,6 +1,5 @@
#include "usg_common.h"
#include "netdisk.h"
#include "hcnetdisk.h"
service/Makefile
@@ -14,9 +14,11 @@
PLATFORM=$(shell $(ROOT)/systype.sh)
include $(ROOT)/Make.defines.$(PLATFORM)
all: netdisk_service client test
all: netdisk_service client test test_queue test_properties
netdisk_service: $(ROOT)/device/hcnetdisk.c
test_properties: properties_config.c
netdisk_service: $(ROOT)/device/hcnetdisk.c login_store.c request_handler.c properties_config.c
test: $(ROOT)/device/hcnetdisk.c
service/client
Binary files differ
service/client.c
@@ -6,6 +6,7 @@
#include <nng/protocol/reqrep0/rep.h>
#include <nng/protocol/reqrep0/req.h>
const char *url = "tcp://127.0.0.1:8899";
const char *localUrl = "tcp://127.0.0.1:9988";
void
fatal(const char *func, int rv)
{
@@ -30,7 +31,7 @@
    std::string str = request.toStyledString();
    if ((rv = nng_send(sock, strdupa(str.c_str()), str.length(), 0)) != 0) {
    if ((rv = nng_send(sock, strdup(str.c_str()), str.length(), 0)) != 0) {
        fatal("nng_send", rv);
    }
    if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {
@@ -119,10 +120,58 @@
}
int
server()
{
    nng_socket sock;
    int        rv;
    if ((rv = nng_rep0_open(&sock)) != 0) {
        fatal("nng_rep0_open", rv);
    }
    if ((rv = nng_listen(sock, localUrl, NULL, 0)) != 0) {
        fatal("nng_listen", rv);
    }
    for (;;) {
        char *   buf = NULL;
        size_t   sz;
        if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {
            fatal("nng_recv", rv);
        }
        std::cout << buf << std::endl;
        Json::Value response;
        response["code"] = 0;
        std::string str = response.toStyledString();
        rv = nng_send(sock, strdup(str.c_str()), str.length(), NNG_FLAG_ALLOC);
        if (rv != 0) {
            fatal("nng_send", rv);
        }
        // if ((sz == sizeof(uint64_t)) &&
        //     ((GET64(buf, val)) == DATECMD)) {
        //     time_t now;
        //     printf("SERVER: RECEIVED DATE REQUEST\n");
        //     now = time(&now);
        //     printf("SERVER: SENDING DATE: ");
        //     showdate(now);
        //     // Reuse the buffer.  We know it is big enough.
        //     PUT64(buf, (uint64_t) now);
        //     continue;
        // }
        // Unrecognized command, so toss the buffer.
        //nng_free(buf, sz);
    }
}
int
main(const int argc, const char **argv)
{
    if ((argc > 1))
        return (client(argv[1]));
    if ((argc > 1)) {
      client(argv[1]);
      server();
    }
    // std::string str("123");
    // char *str2="123";
    // printf("str length  %d, %d\n", str.length(), strlen(str2));
service/core
Binary files differ
service/login_store.c
New file
@@ -0,0 +1,45 @@
#include "login_store.h"
LoginStore::LoginStore() : login_data_file("../data/login.dat") {
  Json::Reader jsonreader;
  std::ifstream fin(login_data_file);
  jsonreader.parse(fin, loginData);
  fin.close();
}
int LoginStore::saveLoginInfo(Netdisk_LoginInfo &loginInfo) {
    Json::Value item;
    item["loginUUID"] = loginInfo.loginUUID;
    item["deviceType"] = loginInfo.deviceType;
    item["username"] = loginInfo.username;
    item["password"] = loginInfo.password;
    item["host"] = loginInfo.host;
    item["port"] = loginInfo.port;
    loginData[loginInfo.loginUUID] = item;
    auto str = loginData.toStyledString();
    // std::cout << str << std::endl;
    std::ofstream fout(login_data_file);
    fout << str;
    fout.close();
    return 0;
}
Netdisk_LoginInfo  LoginStore::getLoginInfo(std::string uuid) {
     Json::Value item = loginData[uuid];
     Netdisk_LoginInfo loginInfo;
     loginInfo.loginUUID = item["loginUUID"].asString();
     loginInfo.deviceType = item["deviceType"].asString();
     loginInfo.username = item["username"].asString();
     loginInfo.password = item["password"].asString();
     loginInfo.host = item["host"].asString();
     loginInfo.port = item["port"].asInt();
     return loginInfo;
}
service/login_store.h
New file
@@ -0,0 +1,16 @@
#ifndef _LOGINSTORE_H_
#define _LOGINSTORE_H_
#include "usg_common.h"
#include "netdisk.h"
#include <jsoncpp/json/json.h>
class LoginStore {
    Json::Value loginData;
    std::string login_data_file;
public:
    LoginStore();
    int saveLoginInfo(Netdisk_LoginInfo &loginInfo);
    Netdisk_LoginInfo  getLoginInfo(std::string uuid);
};
#endif
service/netdisk_service
Binary files differ
service/netdisk_service.c
@@ -1,174 +1,119 @@
#include "usg_common.h"
#include "netdisk.h"
#include "hcnetdisk.h"
#include "safe_queue.h"
#include "login_store.h"
#include "request_handler.h"
#include "properties_config.h"
#include <jsoncpp/json/json.h>
#include <nng/nng.h>
#include <nng/protocol/reqrep0/rep.h>
#include <nng/protocol/reqrep0/req.h>
using namespace std;
const char *url = "tcp://127.0.0.1:8899";
Json::Value loginData;
std::map<std::string, Netdisk *> *userDeviceMap;
std::string login_data_file = "../data/login.dat";
int saveLoginInfo(Netdisk_LoginInfo &loginInfo) {
    Json::Value item;
    item["loginUUID"] = loginInfo.loginUUID;
    item["deviceType"] = loginInfo.deviceType;
    item["username"] = loginInfo.username;
    item["password"] = loginInfo.password;
    item["host"] = loginInfo.host;
    item["port"] = loginInfo.port;
    loginData[loginInfo.loginUUID] = item;
 
    auto str = loginData.toStyledString();
    // std::cout << str << std::endl;
int work(Netdisk_DownloadRequest drequest);
    std::ofstream fout(login_data_file);
    fout << str;
    fout.close();
    return 0;
}
Netdisk_LoginInfo  getLoginInfo(std::string uuid) {
 Json::Value item = loginData[uuid];
 Netdisk_LoginInfo loginInfo;
 loginInfo.loginUUID = item["loginUUID"].asString();
 loginInfo.deviceType = item["deviceType"].asString();
 loginInfo.username = item["username"].asString();
 loginInfo.password = item["password"].asString();
 loginInfo.host = item["host"].asString();
 loginInfo.port = item["port"].asInt();
 return loginInfo;
}
PropertiesConfig config("../data/config.txt");
void
fatal(const char *func, int rv)
int  WORKERS ;
std::string localUrl;
std::string remoteUrl;
SafeQueue<Netdisk_DownloadRequest> task_queue(10);
std::map<std::string, Netdisk *> userDeviceMap;
LoginStore loginStore;
std::map<std::string, RequestHandleFun> requestHandleFunMap;
static inline void fatal(const char *func, int rv)
{
  fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
  exit(1);
  //exit(1);
}
 
int handleLogin(nng_socket sock, Json::Value request) {
  int rv, code;
  // char *buf;
  Netdisk *netdisk = NULL;
int connectAndSend(const char *url, char * str) {
  nng_socket sock;
  int        rv;
  size_t     sz;
  char *buf;
  Json::Value arguments = request["arguments"];
  //登录
  Netdisk_LoginInfo loginInfo;
  loginInfo.loginUUID=arguments["loginUUID"].asString();
  loginInfo.deviceType = arguments["deviceType"].asString();
  loginInfo.host = arguments["host"].asString();
  loginInfo.port = arguments["port"].asInt();
  loginInfo.username = arguments["username"].asString();
  loginInfo.password = arguments["password"].asString();
  std::map<std::string, Netdisk *>::iterator userDeviceIter = userDeviceMap->find(loginInfo.loginUUID);
  if( userDeviceIter != userDeviceMap->end() ) {
     netdisk = userDeviceIter->second;
  if ((rv = nng_req0_open(&sock)) != 0) {
    fatal("nng_socket", rv);
  }
  if (netdisk == NULL) {
    printf("=========null\n");
    if(loginInfo.deviceType == "HC") {
      // std::cout << "new HCNetdisk" << std::endl;
      netdisk = new HCNetdisk();
      userDeviceMap->insert({loginInfo.loginUUID, netdisk});
    } else {
      err_msg(0, "无法识别的设备类型: %s", loginInfo.deviceType.c_str());
  if ((rv = nng_dial(sock, url, NULL, 0)) != 0) {
    fatal("nng_dial", rv);
    }
  }
 // printf("CLIENT: SENDING DATE REQUEST\n");
  code = netdisk->login(loginInfo);
  if (code == 0) {
    saveLoginInfo(loginInfo);
    std::cout << "起始通道:" << netdisk->getStartChannel() << ", 最大通道号:" << netdisk->getMaxChannels() << std::endl;
  }
  Json::Value response;
  Json::Value payload;
  payload["loginUUID"] = loginInfo.loginUUID;
  response["code"] = code;
  response["payload"]  = payload;
  const std::string str = response.toStyledString();
  // nng内部会释放buf
  rv = nng_send(sock, strdup(str.c_str()), str.length(), NNG_FLAG_ALLOC);
  //free(buf);
  if (rv != 0) {
  if ((rv = nng_send(sock, str, strlen(str), 0)) != 0) {
    fatal("nng_send", rv);
    return rv;
  }
  if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {
    fatal("nng_recv", rv);
  }
  std::cout << buf;
  nng_free(buf, sz);
  nng_close(sock);
  return 0;
}
    
int handleDownloadByTime(nng_socket sock, Json::Value request) {
  int rv, code;
  // char *buf;
void *worker(void *vargp)
{
  pthread_detach(pthread_self());
  while (1)
  {
    Netdisk_DownloadRequest  request;
    task_queue.pop(request);
    work(request);
  }
}
int work(Netdisk_DownloadRequest drequest) {
  Netdisk *netdisk = NULL;
  Json::Value arguments = request["arguments"];
  Netdisk_DownloadRequest drequest;
  drequest.loginUUID = arguments["loginUUID"].asString();
  Json::Value start = arguments["start"];
  drequest.start.tm_year = start["year"].asInt()-1900; // 这个时间类型从1900开始算作第一年
  drequest.start.tm_mon = start["mon"].asInt()-1; // 0是第一个月
  drequest.start.tm_mday = start["day"].asInt();
  drequest.start.tm_hour = start["hour"].asInt();
  drequest.start.tm_min = start["min"].asInt();
  drequest.start.tm_sec = start["sec"].asInt();
  Json::Value end = arguments["end"];
  drequest.end.tm_year = end["year"].asInt()-1900; // 这个时间类型从1900开始算作第一年
  drequest.end.tm_mon = end["mon"].asInt()-1; // 0是第一个月
  drequest.end.tm_mday = end["day"].asInt();
  drequest.end.tm_hour = end["hour"].asInt();
  drequest.end.tm_min = end["min"].asInt();
  drequest.end.tm_sec = end["sec"].asInt();
  drequest.channel = arguments["channel"].asInt();
  drequest.destpath = arguments["destpath"].asString();
  std::map<string, Netdisk *>::iterator userDeviceIter = userDeviceMap->find( drequest.loginUUID);
  if( userDeviceIter != userDeviceMap->end() ) {
  int rv;
  std::map<std::string, Netdisk *>::iterator userDeviceIter = userDeviceMap.find( drequest.loginUUID);
  if( userDeviceIter != userDeviceMap.end() ) {
     netdisk = userDeviceIter->second;
  }
  Netdisk_LoginInfo loginInfo = getLoginInfo(drequest.loginUUID);
  Netdisk_LoginInfo loginInfo = loginStore.getLoginInfo(drequest.loginUUID);
  if (netdisk == NULL) {
   
    if(loginInfo.deviceType == "HC") {
      // std::cout << "new HCNetdisk" << std::endl;
      netdisk = new HCNetdisk();
      userDeviceMap->insert({loginInfo.loginUUID, netdisk});
      userDeviceMap.insert({loginInfo.loginUUID, netdisk});
    } else {
      err_msg(0, "无法识别的设备类型: %s", loginInfo.deviceType.c_str());
    }
  }
  if ( (code = netdisk->login(loginInfo)) != 0 ) {
  if ( (rv = netdisk->login(loginInfo)) != 0 ) {
    printf("下载登录失败\n");
  }
  std::vector<std::string> files;
  if ( (code = netdisk->downloadByTime(drequest, &files) ) != 0) {
  if ( (rv = netdisk->downloadByTime(drequest, &files) ) != 0) {
    printf("下载失败\n");
  }
  Json::Value response;
  Json::Value payload;
  response["code"] = code;
  response["rv"] = rv;
  Json::Value filelist;
  for(std::string f : files) {
@@ -177,31 +122,29 @@
  payload["filelist"] = filelist;
  response["payload"] = payload;
  std::string str = response.toStyledString();
  std::cout << "download finished, call back" << std::endl;
  std::cout << str << std::endl;
  rv = nng_send(sock, strdup(str.c_str()), str.length(), NNG_FLAG_ALLOC);
  if (rv != 0) {
    fatal("nng_send", rv);
    return rv;
  }
  connectAndSend(remoteUrl.c_str(), strdup(str.c_str()) );
  return 0;
}
int
server()
{
void startServer(const char *url) {
  Json::Reader jsonreader;
  Json::Value request;
  nng_socket sock;
  int rv;
  RequestHandleFun handleFun;
  if ((rv = nng_rep0_open(&sock)) != 0) {
    fatal("nng_rep0_open", rv);
  }
  if ((rv = nng_listen(sock, url, NULL, 0)) != 0) {
    printf("url=%s\n", url);
    fatal("nng_listen", rv);
  }
  for (;;) {
    char *   buf = NULL;
    size_t   sz;
@@ -211,32 +154,55 @@
    jsonreader.parse(buf, request);
    std::string method = request["method"].asString();
    if (method == "login") {
      handleLogin(sock, request);
    } else if (method == "downloadByTime") {
      handleDownloadByTime(sock, request);
    std::map<std::string, RequestHandleFun>::iterator handleFunIter = requestHandleFunMap.find(method);
    if( handleFunIter != requestHandleFunMap.end() ) {
      handleFun = handleFunIter->second;
      handleFun(sock, request);
    } else {
      std::cerr << "Don't support " << method << std::endl;
    }
    // if (method == "login") {
    //   handleLogin(sock, request);
    // } else if (method == "downloadByTime") {
    //   handleDownloadByTimeAsync(sock, request);
    // } else {
    //   std::cerr << "Don't support " << method << std::endl;
    // }
    // Unrecognized command, so toss the buffer.
    nng_free(buf, sz);
  }
}
void registRequestHandleFun() {
  requestHandleFunMap.insert({"login", handleLogin});
  requestHandleFunMap.insert({"downloadByTime", handleDownloadByTimeAsync});
}
void initThreadPool() {
  pthread_t tid;
  for (int i = 0; i < WORKERS; i++)
    pthread_create(&tid, NULL, worker, NULL);
}
int main()
{
  Netdisk_EnvConfig config;
  config.libpath = "../hclib/";
  HCNetdisk::netdisk_init(&config);
  Json::Reader jsonreader;
  ifstream fin(login_data_file);
  jsonreader.parse(fin, loginData);
  fin.close();
  Netdisk_EnvConfig hcEnvConfig;
  hcEnvConfig.libpath = config.get("hclib");
  HCNetdisk::netdisk_init(&hcEnvConfig);
  WORKERS = config.getInt("workers");
  localUrl = config.get("local_url");
  remoteUrl = config.get("remote_url");
  userDeviceMap = new std::map<std::string, Netdisk *>();
  server();
  registRequestHandleFun();
  initThreadPool();
  startServer(localUrl.c_str());
  HCNetdisk::netdisk_deinit();
service/properties_config.c
New file
@@ -0,0 +1,42 @@
#include "properties_config.h"
PropertiesConfig::PropertiesConfig(std::string __propertiesFile) : propertiesFile(__propertiesFile) {
    std::ifstream fin(propertiesFile);
    char line[1024];
    //std::string line;
    char *key, *value;
    const char *delim = "=";
    while(fin.getline(line, 1024)) {
        // printf("line=%s\n", line);
        if(strlen(trim(line, NULL))== 0)
            continue;
        if(*line == '#') {
            continue;
        }
        key = trim(strtok(line, delim), 0);
        value = trim(strtok(NULL, delim), 0);
        propertiesMap.insert({key, value});
        // printf("key = %s, value=%s\n", key, value);
    }
    fin.close();
}
std::string PropertiesConfig::get(std::string name) {
  std::map<std::string, std::string>::iterator propertiesIter = propertiesMap.find(name);
  if( propertiesIter != propertiesMap.end() ) {
     return propertiesIter->second;
  }
  return "";
}
int PropertiesConfig::getInt(std::string name) {
  std::map<std::string, std::string>::iterator propertiesIter = propertiesMap.find(name);
  if( propertiesIter != propertiesMap.end() ) {
     return  std::stoi(propertiesIter->second);
  }
  return 0;
}
service/properties_config.h
New file
@@ -0,0 +1,11 @@
#include "usg_common.h"
class PropertiesConfig {
    std::string propertiesFile;
    std::map<std::string, std::string> propertiesMap;
public:
    PropertiesConfig(std::string _propertiesFile="./config.properties");
    std::string get(std::string name);
    int getInt(std::string name);
};
service/request_handler.c
New file
@@ -0,0 +1,193 @@
#include "request_handler.h"
extern SafeQueue<Netdisk_DownloadRequest> task_queue;
extern std::map<std::string, Netdisk *> userDeviceMap;
extern LoginStore loginStore;
static inline void fatal(const char *func, int rv)
{
  fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
  //exit(1);
}
int handleLogin(nng_socket sock, Json::Value request) {
std::cout << "accepted login request" << std::endl;
  int rv, code;
  // char *buf;
  Netdisk *netdisk = NULL;
  Json::Value arguments = request["arguments"];
  //登录
  Netdisk_LoginInfo loginInfo;
  loginInfo.loginUUID=arguments["loginUUID"].asString();
  loginInfo.deviceType = arguments["deviceType"].asString();
  loginInfo.host = arguments["host"].asString();
  loginInfo.port = arguments["port"].asInt();
  loginInfo.username = arguments["username"].asString();
  loginInfo.password = arguments["password"].asString();
  std::map<std::string, Netdisk *>::iterator userDeviceIter = userDeviceMap.find(loginInfo.loginUUID);
  if( userDeviceIter != userDeviceMap.end() ) {
     netdisk = userDeviceIter->second;
  }
  if (netdisk == NULL) {
    if(loginInfo.deviceType == "HC") {
      // std::cout << "new HCNetdisk" << std::endl;
      netdisk = new HCNetdisk();
      userDeviceMap.insert({loginInfo.loginUUID, netdisk});
    } else {
      err_msg(0, "无法识别的设备类型: %s", loginInfo.deviceType.c_str());
    }
  }
  code = netdisk->login(loginInfo);
  if (code == 0) {
    loginStore.saveLoginInfo(loginInfo);
    std::cout << "起始通道:" << netdisk->getStartChannel() << ", 最大通道号:" << netdisk->getMaxChannels() << std::endl;
  }
  Json::Value response;
  Json::Value payload;
  payload["loginUUID"] = loginInfo.loginUUID;
  response["code"] = code;
  response["payload"]  = payload;
  const std::string str = response.toStyledString();
  // nng内部会释放buf
  rv = nng_send(sock, strdup(str.c_str()), str.length(), NNG_FLAG_ALLOC);
  //free(buf);
  if (rv != 0) {
    fatal("nng_send", rv);
    return rv;
  }
  return 0;
}
int handleDownloadByTimeAsync(nng_socket sock, Json::Value request) {
std::cout << "accepted handleDownloadByTime request" << std::endl;
  int rv;
  // char *buf;
  Json::Value arguments = request["arguments"];
  Netdisk_DownloadRequest drequest;
  drequest.loginUUID = arguments["loginUUID"].asString();
  Json::Value start = arguments["start"];
  drequest.start.tm_year = start["year"].asInt()-1900; // 这个时间类型从1900开始算作第一年
  drequest.start.tm_mon = start["mon"].asInt()-1; // 0是第一个月
  drequest.start.tm_mday = start["day"].asInt();
  drequest.start.tm_hour = start["hour"].asInt();
  drequest.start.tm_min = start["min"].asInt();
  drequest.start.tm_sec = start["sec"].asInt();
  Json::Value end = arguments["end"];
  drequest.end.tm_year = end["year"].asInt()-1900; // 这个时间类型从1900开始算作第一年
  drequest.end.tm_mon = end["mon"].asInt()-1; // 0是第一个月
  drequest.end.tm_mday = end["day"].asInt();
  drequest.end.tm_hour = end["hour"].asInt();
  drequest.end.tm_min = end["min"].asInt();
  drequest.end.tm_sec = end["sec"].asInt();
  drequest.channel = arguments["channel"].asInt();
  drequest.destpath = arguments["destpath"].asString();
  task_queue.push(drequest);
  Json::Value response;
  response["code"] = 0;
  std::string str = response.toStyledString();
  std::cout << str << std::endl;
  if ((rv = nng_send(sock, strdup(str.c_str()), str.length(), NNG_FLAG_ALLOC)) != 0) {
    fatal("nng_send", rv);
    return rv;
  }
  return 0;
}
int handleDownloadByTime(nng_socket sock, Json::Value request) {
std::cout << "accepted handleDownloadByTime request" << std::endl;
  int rv, code;
  // char *buf;
  Netdisk *netdisk = NULL;
  Json::Value arguments = request["arguments"];
  Netdisk_DownloadRequest drequest;
  drequest.loginUUID = arguments["loginUUID"].asString();
  Json::Value start = arguments["start"];
  drequest.start.tm_year = start["year"].asInt()-1900; // 这个时间类型从1900开始算作第一年
  drequest.start.tm_mon = start["mon"].asInt()-1; // 0是第一个月
  drequest.start.tm_mday = start["day"].asInt();
  drequest.start.tm_hour = start["hour"].asInt();
  drequest.start.tm_min = start["min"].asInt();
  drequest.start.tm_sec = start["sec"].asInt();
  Json::Value end = arguments["end"];
  drequest.end.tm_year = end["year"].asInt()-1900; // 这个时间类型从1900开始算作第一年
  drequest.end.tm_mon = end["mon"].asInt()-1; // 0是第一个月
  drequest.end.tm_mday = end["day"].asInt();
  drequest.end.tm_hour = end["hour"].asInt();
  drequest.end.tm_min = end["min"].asInt();
  drequest.end.tm_sec = end["sec"].asInt();
  drequest.channel = arguments["channel"].asInt();
  drequest.destpath = arguments["destpath"].asString();
  std::map<std::string, Netdisk *>::iterator userDeviceIter = userDeviceMap.find( drequest.loginUUID);
  if( userDeviceIter != userDeviceMap.end() ) {
     netdisk = userDeviceIter->second;
  }
  Netdisk_LoginInfo loginInfo = loginStore.getLoginInfo(drequest.loginUUID);
  if (netdisk == NULL) {
    if(loginInfo.deviceType == "HC") {
      // std::cout << "new HCNetdisk" << std::endl;
      netdisk = new HCNetdisk();
      userDeviceMap.insert({loginInfo.loginUUID, netdisk});
    } else {
      err_msg(0, "无法识别的设备类型: %s", loginInfo.deviceType.c_str());
    }
  }
  if ( (code = netdisk->login(loginInfo)) != 0 ) {
    printf("下载登录失败\n");
  }
  std::vector<std::string> files;
  if ( (code = netdisk->downloadByTime(drequest, &files) ) != 0) {
    printf("下载失败\n");
  }
  Json::Value response;
  Json::Value payload;
  response["code"] = code;
  Json::Value filelist;
  for(std::string f : files) {
    filelist.append(f);
  }
  payload["filelist"] = filelist;
  response["payload"] = payload;
  std::string str = response.toStyledString();
  std::cout << str << std::endl;
  rv = nng_send(sock, strdup(str.c_str()), str.length(), NNG_FLAG_ALLOC);
  if (rv != 0) {
    fatal("nng_send", rv);
    return rv;
  }
  return 0;
}
service/request_handler.h
New file
@@ -0,0 +1,15 @@
#include "usg_common.h"
#include "netdisk.h"
#include "safe_queue.h"
#include "login_store.h"
#include <jsoncpp/json/json.h>
#include <nng/nng.h>
#include <nng/protocol/reqrep0/rep.h>
#include <nng/protocol/reqrep0/req.h>
typedef int (*RequestHandleFun)(nng_socket sock, Json::Value request);
int handleLogin(nng_socket sock, Json::Value request);
int handleDownloadByTime(nng_socket sock, Json::Value request);
int handleDownloadByTimeAsync(nng_socket sock, Json::Value request);
service/safe_queue.h
New file
@@ -0,0 +1,120 @@
#ifndef _SAFEQUEUE_H_
#define _SAFEQUEUE_H_
#include <queue>
#include <condition_variable>
#include <mutex>
#include <chrono>
#include <limits> // std::numeric_limits<>::max
#define SAFE_QUEUE_DEFAULT_MAX_SIZE std::numeric_limits<std::size_t >::max()
/// @brief thread-safe queue
/// It uses a mutex+condition variables to protect the internal queue
/// implementation. Inserting or reading elements use the same mutex
template <typename T>
class SafeQueue
{
public:
    /// @brief constructor
    /// @param a_maxSize optional parameter with the maximum size of the queue
    SafeQueue(std::size_t a_maxSize = SAFE_QUEUE_DEFAULT_MAX_SIZE);
    /// @brief destructor
    ~SafeQueue();
    /// @brief copy contructor
    /// WARNING: Use with great care, this function call can take a long time
    /// and block other threads from pushing/popping elements into the source
    /// queue
    SafeQueue(const SafeQueue<T>& a_src);
    /// @brief operator= overloading
    /// This function blocks the a_src and "this" SafeQueues and copies the
    /// contents of a_src into "this" SafeQueue.
    /// WARNING: Use with great care, this function call can take a long time
    /// and block other threads from pushing/popping elements into the queues
    /// @param a_src the "right" side of the operator=
    /// @return a const reference to this object
    const SafeQueue<T>& operator=(const SafeQueue<T> &a_src);
    /// @brief move contructor
    SafeQueue(SafeQueue<T>&& a_src);
    /// @brief move assignment
    SafeQueue<T>& operator=(SafeQueue<T>&& a_src);
    /// @brief Check if the queue is empty
    /// This call can block if another thread owns the lock that protects the
    /// queue
    /// @return true if the queue is empty. False otherwise
    bool isEmpty() const;
    /// @brief inserts an element into queue queue
    /// This call can block if another thread owns the lock that protects the
    /// queue. If the queue is full The thread will be blocked in this queue
    /// until someone else gets an element from the queue
    /// @param element to insert into the queue
    void push(const T &a_elem);
    /// @brief inserts an element into queue queue
    /// This call can block if another thread owns the lock that protects the
    /// queue. If the queue is full The call will return false and the element
    /// won't be inserted
    /// @param element to insert into the queue
    /// @return True if the elem was successfully inserted into the queue.
    ///         False otherwise
    bool tryPush(const T &a_elem);
    /// @brief extracts an element from the queue (and deletes it from the q)
    /// If the queue is empty this call will block the thread until there is
    /// something in the queue to be extracted
    /// @param a reference where the element from the queue will be saved to
    void pop(T &out_data);
    /// @brief extracts an element from the queue (and deletes it from the q)
    /// This call gets the block that protects the queue. It will extract the
    /// element from the queue only if there are elements in it
    /// @param reference to the variable where the result will be saved
    /// @return True if the element was retrieved from the queue.
    ///         False if the queue was empty
    bool tryPop(T &out_data);
    /// @brief extracts an element from the queue (and deletes it from the q)
    /// If the queue is empty this call will block the thread until there
    /// is something in the queue to be extracted or until the timer
    /// (2nd parameter) expires
    /// @param reference to the variable where the result will be saved
    /// @param duration to wait before returning if the queue was empty
    ///        you may also pass into this a std::seconds or std::milliseconds
    ///        (defined in std::chrono)
    /// @return True if the element was retrieved from the queue.
    ///         False if the timeout was hit and nothing could be extracted
    ///         from the queue
    bool timedWaitPop(T &data, std::chrono::microseconds a_microsecs);
protected:
    /// the actual queue data structure protected by this SafeQueue wrapper
    std::queue<T> m_theQueue;
    /// maximum number of elements for the queue
    std::size_t m_maximumSize;
    /// Mutex to protect the queue
    mutable std::mutex m_mutex;
    /// Conditional variable to wake up threads
    mutable std::condition_variable m_cond;
    /// @brief calculate if copying a_src into this instance will need to
    ///        wake up potential threads waiting to perform push or pop ops.
    /// WARNING: It assumes the caller holds all mutexes required to access
    /// to the data of this and a_src SafeQueues
    /// @param a_src const reference to the SafeQueue that will be copied
    ///        into this object
    /// @return true if threads will need to be waken up. False otherwise
    inline bool wakeUpSignalNeeded(const SafeQueue<T> &a_src) const;
};
// include the implementation file
#include "safe_queue_impl.h"
#endif /* _SAFEQUEUE_H_ */
service/safe_queue_impl.h
New file
@@ -0,0 +1,258 @@
#ifndef _SAFEQUEUEIMPL_H_
#define _SAFEQUEUEIMPL_H_
template <typename T>
SafeQueue<T>::SafeQueue(std::size_t a_maxSize):
    m_theQueue(),
    m_maximumSize(a_maxSize),
    m_mutex(),
    m_cond()
{
}
template <typename T>
SafeQueue<T>::~SafeQueue()
{
}
template <typename T>
SafeQueue<T>::SafeQueue(const SafeQueue<T>& a_src):
    m_theQueue(),
    m_maximumSize(0),
    m_mutex(),
    m_cond()
{
    // copying a safe queue involves only copying the data (m_theQueue and
    // m_maximumSize). This object has not been instantiated yet so nobody can
    // be trying to perform push or pop operations on it, but we need to
    // acquire a_src.m_mutex before copying its data into m_theQueue and
    // m_maximumSize
    std::unique_lock<std::mutex> lk(a_src.m_mutex);
    this->m_maximumSize = a_src.m_maximumSize;
    this->m_theQueue = a_src.m_theQueue;
}
template <typename T>
const SafeQueue<T>& SafeQueue<T>::operator=(const SafeQueue<T> &a_src)
{
    if (this != &a_src)
    {
        // lock both mutexes at the same time to avoid deadlocks
        std::unique_lock<std::mutex> this_lk(this->m_mutex, std::defer_lock);
        std::unique_lock<std::mutex> src_lk (a_src.m_mutex, std::defer_lock);
        std::lock(this_lk, src_lk);
        // will we need to wake up waiting threads after copying the source
        // queue?
        bool wakeUpWaitingThreads = WakeUpSignalNeeded(a_src);
        // copy data from the left side of the operator= into this intance
        this->m_maximumSize = a_src.m_maximumSize;
        this->m_theQueue = a_src.m_theQueue;
        // time now to wake up threads waiting for data to be inserted
        // or extracted
        if (wakeUpWaitingThreads)
        {
            this->m_cond.notify_all();
        }
    }
    return *this;
}
template <typename T>
SafeQueue<T>::SafeQueue(SafeQueue<T>&& a_src):
    m_theQueue(a_src.m_theQueue),       // implicit std::move(a_src.m_theQueue)
    m_maximumSize(a_src.m_maximumSize), // move constructor called implicitly
    m_mutex(), // instantiate a new mutex
    m_cond()   // instantiate a new conditional variable
{
    // This object has not been instantiated yet. We can assume no one is using
    // its mutex.
    // Also, a_src is a temporary object so there is no need to acquire
    // its mutex.
    // Things can therefore be safely moved without the need for any mutex or
    // conditional variable
}
template <typename T>
SafeQueue<T>& SafeQueue<T>::operator=(SafeQueue<T> &&a_src)
{
    if (this != &a_src)
    {
        // make sure we hold this mutex before moving things around. a_src is
        // a temporary object so no need to hold its mutex
        std::unique_lock<std::mutex> lk(this->m_mutex);
        // will we need to wake up waiting threads after copying the source
        // queue?
        bool wakeUpWaitingThreads = WakeUpSignalNeeded(a_src);
        // process data from the temporary copy into this intance
        this->m_maximumSize = std::move(a_src.m_maximumSize);
        this->m_theQueue = std::move(a_src.m_theQueue);
        // time now to wake up threads waiting for data to be inserted
        // or extracted
        if (wakeUpWaitingThreads)
        {
            this->m_cond.notify_all();
        }
    }
    return *this;
}
template <typename T>
bool SafeQueue<T>::wakeUpSignalNeeded(const SafeQueue<T> &a_src) const
{
    if (this->m_theQueue.empty() && (!a_src.m_theQueue.empty()))
    {
        // threads waiting for stuff to be popped off the queue
        return true;
    }
    else if ((this->m_theQueue.size() >= this->m_maximumSize) &&
             (a_src.m_theQueue.size() < a_src.m_maximumSize))
    {
        // threads waiting for stuff to be pushed into the queue
        return true;
    }
    return false;
}
template <typename T>
bool SafeQueue<T>::isEmpty() const
{
    std::lock_guard<std::mutex> lk(m_mutex);
    return m_theQueue.empty();
}
template <typename T>
void SafeQueue<T>::push(const T &a_elem)
{
    std::unique_lock<std::mutex> lk(m_mutex);
    while (m_theQueue.size() >= m_maximumSize)
    {
        m_cond.wait(lk);
    }
    bool queueEmpty = m_theQueue.empty();
    m_theQueue.push(a_elem);
    if (queueEmpty)
    {
        // wake up threads waiting for stuff
        m_cond.notify_all();
    }
}
template <typename T>
bool SafeQueue<T>::tryPush(const T &a_elem)
{
    std::lock_guard<std::mutex> lk(m_mutex);
    bool rv = false;
    bool queueEmpty = m_theQueue.empty();
    if (m_theQueue.size() < m_maximumSize)
    {
        m_theQueue.push(a_elem);
        rv = true;
    }
    if (queueEmpty)
    {
        // wake up threads waiting for stuff
        m_cond.notify_all();
    }
    return rv;
}
template <typename T>
void SafeQueue<T>::pop(T &out_data)
{
    std::unique_lock<std::mutex> lk(m_mutex);
    while (m_theQueue.empty())
    {
        m_cond.wait(lk);
    }
    bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false;
    out_data = m_theQueue.front();
    m_theQueue.pop();
    if (queueFull)
    {
        // wake up threads waiting for stuff
        m_cond.notify_all();
    }
}
template <typename T>
bool SafeQueue<T>::tryPop(T &out_data)
{
    std::lock_guard<std::mutex> lk(m_mutex);
    bool rv = false;
    if (!m_theQueue.empty())
    {
        bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false;
        out_data = m_theQueue.front();
        m_theQueue.pop();
        if (queueFull)
        {
            // wake up threads waiting for stuff
            m_cond.notify_all();
        }
        rv = true;
    }
    return rv;
}
template <typename T>
bool SafeQueue<T>::timedWaitPop(T &data, std::chrono::microseconds a_microsecs)
{
    std::unique_lock<std::mutex> lk(m_mutex);
    auto wakeUpTime = std::chrono::steady_clock::now() + a_microsecs;
    if (m_cond.wait_until(lk, wakeUpTime,
        [this](){return (m_theQueue.size() > 0);}))
    {
        // wait_until returns false if the predicate (3rd parameter) still
        // evaluates to false after the rel_time timeout expired
        // we are in this side of the if-clause because the queue is not empty
        // (so the 3rd parameter evaluated to true)
        bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false;
        data = m_theQueue.front();
        m_theQueue.pop();
        if (queueFull)
        {
            // wake up threads waiting to insert things into the queue.
            // The queue used to be full, now it's not.
            m_cond.notify_all();
        }
        return true;
    }
    else
    {
        // timed-out and the queue is still empty
        return false;
    }
}
#endif /* _SAFEQUEUEIMPL_H_ */
service/test
Binary files differ
service/test_properties
Binary files differ
service/test_properties.c
New file
@@ -0,0 +1,8 @@
#include "usg_common.h"
#include "properties_config.h"
int main() {
    PropertiesConfig config("../data/config.txt");
    std::cout << config.get("local_url") << std::endl;
}
service/test_queue
Binary files differ
service/test_queue.c
New file
@@ -0,0 +1,18 @@
#include "usg_common.h"
#include "safe_queue.h"
#define QUEUE_SIZE 10
using namespace std;
int main() {
    SafeQueue<int> m_queue(10);
    m_queue.push(1);
    m_queue.push(2);
    int a;
    m_queue.pop(a);
    cout << a << endl;
    m_queue.pop(a);
    cout << a << endl;
}