#include "usg_common.h" #include "netdisk.h" #include "safe_queue.h" #include "login_store.h" #include "request_handler.h" #include "properties_config.h" #include "netdisk_factory.h" #include #include #include #include #include #include using namespace std; static int work(Netdisk_DownloadRequest drequest); static int connectAndSend(const char *url, char * str); int WORKERS ; std::string remoteUrl; PropertiesConfig config("../data/config.txt"); SafeQueue task_queue(100); std::map userDeviceMap; LoginStore loginStore; std::map requestHandleFunMap; static inline void fatal(const char *func, int rv) { fprintf(stderr, "%s: %s\n", func, nng_strerror(rv)); //exit(1); } int connectAndSend(const char *url, char * str) { nng_socket sock; int rv; size_t sz; char *buf; if ((rv = nng_req0_open(&sock)) != 0) { fatal("nng_socket", rv); } if ((rv = nng_dial(sock, url, NULL, 0)) != 0) { fatal("nng_dial", rv); } // printf("CLIENT: SENDING DATE REQUEST\n"); if ((rv = nng_send(sock, str, strlen(str), 0)) != 0) { fatal("nng_send", rv); } if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) { fatal("nng_recv", rv); } std::cout << buf; nng_free(buf, sz); nng_close(sock); return 0; } int work(Netdisk_DownloadRequest drequest) { Netdisk *netdisk = NULL; std::vector files; int rv; char rmsg[MAXLINE]; strcpy(rmsg, "success"); Netdisk_LoginInfo loginInfo = loginStore.getLoginInfo(drequest.loginUUID); std::map::iterator userDeviceIter = userDeviceMap.find( drequest.loginUUID); if( userDeviceIter != userDeviceMap.end() ) { netdisk = userDeviceIter->second; } if (netdisk == NULL) { netdisk = NetdiskFacotory::create(loginInfo.deviceType); if(netdisk != NULL) { userDeviceMap.insert({loginInfo.loginUUID, netdisk}); } else { snprintf(rmsg, MAXLINE, "无法识别的设备类型: %s", loginInfo.deviceType.c_str()); } } if ( (rv = netdisk->login(loginInfo)) != 0 ) { snprintf(rmsg, MAXLINE, "请重新登录"); } else if ( (rv = netdisk->downloadByTime(drequest, &files) ) != 0) { snprintf(rmsg, MAXLINE, "下载失败"); } Json::Value request; request["method"] = "downloadByTimeCallBack"; Json::Value arguments; Json::Value filelist; for(std::string f : files) { filelist.append(f); } arguments["fileList"] = filelist; arguments["loginUUID"] = drequest.loginUUID; request["arguments"] = arguments; std::string str = request.toStyledString(); std::cout << "SENDING download finished\n" << str << std::endl; connectAndSend(remoteUrl.c_str(), strdup(str.c_str()) ); return 0; } void *worker(void *vargp) { pthread_detach(pthread_self()); while (1) { Netdisk_DownloadRequest request; task_queue.pop(request); err_msg(0, "====take a task"); work(request); } } void startServer(const char *url) { Json::Reader jsonreader; Json::Value request; nng_socket sock; int rv; RequestHandleFun handleFun; if ((rv = nng_rep0_open(&sock)) != 0) { fatal("nng_rep0_open", rv); } if ((rv = nng_listen(sock, url, NULL, 0)) != 0) { printf("url=%s\n", url); fatal("nng_listen", rv); } for (;;) { char * buf = NULL; size_t sz; if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) { fatal("nng_recv", rv); } printf("RECEIVED RPC REQUEST:\n %s", buf); jsonreader.parse(buf, request); nng_free(buf, sz); std::string method = request["method"].asString(); std::map::iterator handleFunIter = requestHandleFunMap.find(method); if( handleFunIter != requestHandleFunMap.end() ) { handleFun = handleFunIter->second; handleFun(sock, request); } else { std::cerr << "Don't support " << method << std::endl; } } } /** * 注册请求处理的方法 */ void registRequestHandleFun() { requestHandleFunMap.insert({"login", handleLogin}); requestHandleFunMap.insert({"logout", handleLogout}); requestHandleFunMap.insert({"downloadByTime", handleDownloadByTimeAsync}); requestHandleFunMap.insert({"getDeviceInfo", handleGetDeviceInfo}); } void initThreadPool() { pthread_t tid; for (int i = 0; i < WORKERS; i++) pthread_create(&tid, NULL, worker, NULL); } void heartBeat(const char *url, const char *name) { nng_socket sock; int rv; if ((rv = nng_respondent0_open(&sock)) != 0) { fatal("nng_respondent0_open", rv); } if ((rv = nng_dial(sock, url, NULL, NNG_FLAG_NONBLOCK)) != 0) { fatal("nng_dial", rv); } for (;;) { char *buf = NULL; size_t sz; if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) == 0) { //printf("CLIENT (%s): RECEIVED \"%s\" SURVEY REQUEST\n", name, buf); nng_free(buf, sz); char response[1024]; sprintf(response, "%s-%d", name, getpid()); //printf("CLIENT (%s): SENDING SURVEY RESPONSE:%s\n", name, response); if ((rv = nng_send(sock, response, strlen(response) + 1, 0)) != 0) { fatal("nng_send", rv); } } } } void *heart(void *vargp) { pthread_detach(pthread_self()); heartBeat(config.get("heart_server").c_str(), "netdisk"); return NULL; } // 心跳发送进程 void initHeart() { pthread_t tid; pthread_create(&tid, NULL, heart, NULL); } int main() { //环境变量初始化 WORKERS = config.getInt("workers"); remoteUrl = config.get("client_url"); //海康设备环境初始化 Netdisk_EnvConfig hcEnvConfig; hcEnvConfig.libpath = "../lib/hc"; HCNetdisk::netdisk_init(&hcEnvConfig); registRequestHandleFun(); initThreadPool(); initHeart(); startServer(config.get("server_url").c_str()); HCNetdisk::netdisk_deinit(); }