#include "usg_common.h"
|
#include "netdisk.h"
|
#include "safe_queue.h"
|
#include "login_store.h"
|
#include "request_handler.h"
|
#include "properties_config.h"
|
#include "netdisk_factory.h"
|
#include <jsoncpp/json/json.h>
|
#include <nng/nng.h>
|
#include <nng/protocol/reqrep0/rep.h>
|
#include <nng/protocol/reqrep0/req.h>
|
|
using namespace std;
|
|
// Handles one queued download request: logs in to the device, downloads the
// requested files and reports the result to remoteUrl. Defined further down.
static int work(Netdisk_DownloadRequest drequest);
|
// Sends `str` to the NNG endpoint `url` on a fresh REQ socket and prints the
// reply to stdout. Defined further down.
static int connectAndSend(const char *url, char * str);
|
|
|
// Number of worker threads created by initThreadPool(); main() reads it from
// the "workers" configuration key.
int WORKERS ;
|
// Address this server listens on; main() reads it from "local_url".
std::string localUrl;
|
// Address that work() sends the download callback to; main() reads it from
// "remote_url".
std::string remoteUrl;
|
|
// Configuration loaded from a path relative to the working directory.
PropertiesConfig config("../data/config.txt");
|
|
// Queue of pending download requests consumed by the worker threads.
// NOTE(review): 100 is presumably the queue capacity - confirm in safe_queue.h.
SafeQueue<Netdisk_DownloadRequest> task_queue(100);
|
|
// Netdisk instance cached per login UUID; work() looks entries up and adds
// them. The pointers are never deleted in this file.
std::map<std::string, Netdisk *> userDeviceMap;
|
|
// Source of the Netdisk_LoginInfo records that work() fetches by login UUID.
LoginStore loginStore;
|
|
// Maps the "method" field of an incoming request to its handler. Filled by
// registRequestHandleFun() and consulted by startServer().
std::map<std::string, RequestHandleFun> requestHandleFunMap;
|
|
|
|
static inline void fatal(const char *func, int rv)
|
{
|
fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
|
//exit(1);
|
}
|
|
|
int connectAndSend(const char *url, char * str) {
|
nng_socket sock;
|
int rv;
|
size_t sz;
|
char *buf;
|
|
if ((rv = nng_req0_open(&sock)) != 0) {
|
fatal("nng_socket", rv);
|
}
|
if ((rv = nng_dial(sock, url, NULL, 0)) != 0) {
|
fatal("nng_dial", rv);
|
}
|
// printf("CLIENT: SENDING DATE REQUEST\n");
|
|
if ((rv = nng_send(sock, str, strlen(str), 0)) != 0) {
|
fatal("nng_send", rv);
|
}
|
|
if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {
|
fatal("nng_recv", rv);
|
}
|
std::cout << buf;
|
nng_free(buf, sz);
|
nng_close(sock);
|
return 0;
|
|
}
|
|
/**
 * Execute one download request and notify the remote side of the result.
 *
 * Steps:
 *   1. Fetch the login record for drequest.loginUUID from loginStore.
 *   2. Reuse the Netdisk cached in userDeviceMap for that UUID, or create one
 *      through NetdiskFacotory for the login's device type and cache it.
 *   3. Log in and download the files selected by the request.
 *   4. Send a "downloadByTimeCallBack" JSON message with the downloaded file
 *      list and the login UUID to remoteUrl. The callback is sent even when
 *      a previous step failed; the file list is then empty.
 *
 * Failures are logged to stderr. The callback message carries no error code.
 *
 * @param drequest  The request to process.
 * @return 0 on success; -1 when the device type is not recognised; otherwise
 *         the non-zero code returned by login/downloadByTime, or by the
 *         callback send when the download itself succeeded.
 */
int work(Netdisk_DownloadRequest drequest) {
    std::vector<std::string> files;
    std::string errmsg;
    int rv = 0;

    Netdisk_LoginInfo loginInfo = loginStore.getLoginInfo(drequest.loginUUID);

    // NOTE(review): userDeviceMap is a plain std::map that every worker thread
    // reads and writes here without a lock visible in this file - confirm
    // whether callers serialise access or a mutex is needed.
    Netdisk *netdisk = NULL;
    std::map<std::string, Netdisk *>::iterator cached = userDeviceMap.find(drequest.loginUUID);
    if (cached != userDeviceMap.end()) {
        netdisk = cached->second;
    }

    if (netdisk == NULL) {
        netdisk = NetdiskFacotory::create(loginInfo.deviceType);
        if (netdisk != NULL) {
            // Store under the same key that is used for the lookup above so
            // that the next request for this login finds the cached device.
            userDeviceMap[drequest.loginUUID] = netdisk;
        }
    }

    if (netdisk == NULL) {
        // Unknown device type: there is no device object to call into.
        rv = -1;
        errmsg = std::string("无法识别的设备类型: ") + loginInfo.deviceType.c_str();
    } else if ((rv = netdisk->login(loginInfo)) != 0) {
        errmsg = "请重新登录";
    } else if ((rv = netdisk->downloadByTime(drequest, &files)) != 0) {
        errmsg = "下载失败";
    }

    if (rv != 0) {
        std::cerr << "work: " << errmsg << " (rv=" << rv << ")" << std::endl;
    }

    // Build the callback message.
    Json::Value filelist;
    for (const std::string &f : files) {
        filelist.append(f);
    }

    Json::Value arguments;
    arguments["fileList"] = filelist;
    arguments["loginUUID"] = drequest.loginUUID;

    Json::Value request;
    request["method"] = "downloadByTimeCallBack";
    request["arguments"] = arguments;

    std::string str = request.toStyledString();

    std::cout << "download finished, call back" << std::endl;
    std::cout << str << std::endl;

    // connectAndSend only reads the payload, so pass the string's own buffer
    // instead of a strdup() copy that was never freed.
    const int sendRv = connectAndSend(remoteUrl.c_str(), &str[0]);

    return (rv != 0) ? rv : sendRv;
}
|
|
void *worker(void *vargp)
|
{
|
pthread_detach(pthread_self());
|
while (1)
|
{
|
Netdisk_DownloadRequest request;
|
task_queue.pop(request);
|
err_msg(0, "====take a task");
|
work(request);
|
}
|
}
|
|
|
|
void startServer(const char *url) {
|
Json::Reader jsonreader;
|
Json::Value request;
|
nng_socket sock;
|
int rv;
|
|
RequestHandleFun handleFun;
|
if ((rv = nng_rep0_open(&sock)) != 0) {
|
fatal("nng_rep0_open", rv);
|
}
|
if ((rv = nng_listen(sock, url, NULL, 0)) != 0) {
|
printf("url=%s\n", url);
|
fatal("nng_listen", rv);
|
}
|
|
for (;;) {
|
char * buf = NULL;
|
size_t sz;
|
if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {
|
fatal("nng_recv", rv);
|
}
|
|
jsonreader.parse(buf, request);
|
nng_free(buf, sz);
|
std::string method = request["method"].asString();
|
|
std::map<std::string, RequestHandleFun>::iterator handleFunIter = requestHandleFunMap.find(method);
|
if( handleFunIter != requestHandleFunMap.end() ) {
|
handleFun = handleFunIter->second;
|
handleFun(sock, request);
|
} else {
|
std::cerr << "Don't support " << method << std::endl;
|
}
|
|
|
}
|
}
|
|
/**
|
* 注册请求处理的方法
|
*/
|
void registRequestHandleFun() {
|
requestHandleFunMap.insert({"login", handleLogin});
|
requestHandleFunMap.insert({"logout", handleLogout});
|
requestHandleFunMap.insert({"downloadByTime", handleDownloadByTimeAsync});
|
requestHandleFunMap.insert({"getDeviceInfo", handleGetDeviceInfo});
|
}
|
|
|
void initThreadPool() {
|
pthread_t tid;
|
for (int i = 0; i < WORKERS; i++)
|
pthread_create(&tid, NULL, worker, NULL);
|
}
|
|
int main()
|
{
|
//环境变量初始化
|
WORKERS = config.getInt("workers");
|
localUrl = config.get("local_url");
|
remoteUrl = config.get("remote_url");
|
|
//海康设备环境初始化
|
Netdisk_EnvConfig hcEnvConfig;
|
hcEnvConfig.libpath = config.get("hclib");
|
HCNetdisk::netdisk_init(&hcEnvConfig);
|
|
|
|
registRequestHandleFun();
|
|
initThreadPool();
|
|
startServer(localUrl.c_str());
|
|
|
HCNetdisk::netdisk_deinit();
|
|
|
}
|