#include "usg_common.h" #include "netdisk.h" #include "safe_queue.h" #include "login_store.h" #include "request_handler.h" #include "properties_config.h" #include #include #include #include using namespace std; static int work(Netdisk_DownloadRequest drequest); static int connectAndSend(const char *url, char * str); int WORKERS ; std::string localUrl; std::string remoteUrl; PropertiesConfig config("../data/config.txt"); SafeQueue task_queue(10); std::map userDeviceMap; LoginStore loginStore; std::map requestHandleFunMap; static inline void fatal(const char *func, int rv) { fprintf(stderr, "%s: %s\n", func, nng_strerror(rv)); //exit(1); } int connectAndSend(const char *url, char * str) { nng_socket sock; int rv; size_t sz; char *buf; if ((rv = nng_req0_open(&sock)) != 0) { fatal("nng_socket", rv); } if ((rv = nng_dial(sock, url, NULL, 0)) != 0) { fatal("nng_dial", rv); } // printf("CLIENT: SENDING DATE REQUEST\n"); if ((rv = nng_send(sock, str, strlen(str), 0)) != 0) { fatal("nng_send", rv); } if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) { fatal("nng_recv", rv); } std::cout << buf; nng_free(buf, sz); nng_close(sock); return 0; } int work(Netdisk_DownloadRequest drequest) { Netdisk *netdisk = NULL; int rv; std::map::iterator userDeviceIter = userDeviceMap.find( drequest.loginUUID); if( userDeviceIter != userDeviceMap.end() ) { netdisk = userDeviceIter->second; } Netdisk_LoginInfo loginInfo = loginStore.getLoginInfo(drequest.loginUUID); if (netdisk == NULL) { if(loginInfo.deviceType == "HC") { // std::cout << "new HCNetdisk" << std::endl; netdisk = new HCNetdisk(); userDeviceMap.insert({loginInfo.loginUUID, netdisk}); } else { err_msg(0, "无法识别的设备类型: %s", loginInfo.deviceType.c_str()); } } if ( (rv = netdisk->login(loginInfo)) != 0 ) { printf("下载登录失败\n"); } std::vector files; if ( (rv = netdisk->downloadByTime(drequest, &files) ) != 0) { printf("下载失败\n"); } Json::Value response; Json::Value payload; response["rv"] = rv; Json::Value filelist; for(std::string f : files) { filelist.append(f); } payload["filelist"] = filelist; response["payload"] = payload; std::string str = response.toStyledString(); std::cout << "download finished, call back" << std::endl; std::cout << str << std::endl; connectAndSend(remoteUrl.c_str(), strdup(str.c_str()) ); return 0; } void *worker(void *vargp) { pthread_detach(pthread_self()); while (1) { Netdisk_DownloadRequest request; task_queue.pop(request); work(request); } } void startServer(const char *url) { Json::Reader jsonreader; Json::Value request; nng_socket sock; int rv; RequestHandleFun handleFun; if ((rv = nng_rep0_open(&sock)) != 0) { fatal("nng_rep0_open", rv); } if ((rv = nng_listen(sock, url, NULL, 0)) != 0) { printf("url=%s\n", url); fatal("nng_listen", rv); } for (;;) { char * buf = NULL; size_t sz; if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) { fatal("nng_recv", rv); } jsonreader.parse(buf, request); std::string method = request["method"].asString(); std::map::iterator handleFunIter = requestHandleFunMap.find(method); if( handleFunIter != requestHandleFunMap.end() ) { handleFun = handleFunIter->second; handleFun(sock, request); } else { std::cerr << "Don't support " << method << std::endl; } // if (method == "login") { // handleLogin(sock, request); // } else if (method == "downloadByTime") { // handleDownloadByTimeAsync(sock, request); // } else { // std::cerr << "Don't support " << method << std::endl; // } // Unrecognized command, so toss the buffer. nng_free(buf, sz); } } void registRequestHandleFun() { requestHandleFunMap.insert({"login", handleLogin}); requestHandleFunMap.insert({"downloadByTime", handleDownloadByTimeAsync}); } void initThreadPool() { pthread_t tid; for (int i = 0; i < WORKERS; i++) pthread_create(&tid, NULL, worker, NULL); } int main() { Netdisk_EnvConfig hcEnvConfig; hcEnvConfig.libpath = config.get("hclib"); HCNetdisk::netdisk_init(&hcEnvConfig); WORKERS = config.getInt("workers"); localUrl = config.get("local_url"); remoteUrl = config.get("remote_url"); registRequestHandleFun(); initThreadPool(); startServer(localUrl.c_str()); HCNetdisk::netdisk_deinit(); }