#include #include #include #include #include #include #include #include #include #define NODE0 "node0" #define NODE1 "node1" #define DATE "DATE" typedef struct Targ { const char* url; int id; }Targ; void fatal(const char *func, int rv) { fprintf(stderr, "%s: %s\n", func, nng_strerror(rv)); exit(1); } char * date(void) { time_t now = time(&now); struct tm *info = localtime(&now); char *text = asctime(info); text[strlen(text) - 1] = '\0'; // remove '\n' return (text); } int server(const char *url) { nng_socket sock; int rv; char sendbuf[512]; if ((rv = nng_rep0_open(&sock)) != 0) { fatal("nng_rep0_open", rv); } if ((rv = nng_listen(sock, url, NULL, 0)) != 0) { fatal("nng_listen", rv); } for (;;) { char *buf = NULL; size_t sz; if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) { fatal("nng_recv", rv); } // printf("RECEIVED:%s\n", buf); char *d = date(); sprintf(sendbuf, "%s: %s", d, buf); // printf("SENDING DATE %s\n", d); if ((rv = nng_send(sock, sendbuf, strlen(sendbuf) + 1, 0)) != 0) { fatal("nng_send", rv); } nng_free(buf, sz); } } int client(const char *url) { nng_socket sock; int rv; size_t sz; char * buf; char sendbuf[512]; if ((rv = nng_req0_open(&sock)) != 0) { fatal("nng_socket", rv); } if ((rv = nng_dial(sock, url, NULL, 0)) != 0) { fatal("nng_dial", rv); } //printf("NODE1: SENDING DATE REQUEST %s\n", DATE); while (true) { printf("say something:\n"); scanf("%s", sendbuf); if ((rv = nng_send(sock, sendbuf, strlen(sendbuf) + 1, 0)) != 0) { fatal("nng_send", rv); } if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) { fatal("nng_recv", rv); } printf("RECEIVED DATE %s\n", buf); nng_free(buf, sz); } nng_close(sock); return (0); } #define SCALE 100000 void *runclient(void *arg) { Targ *targ = (Targ *)arg; const char* url = targ->url; char sendbuf[512]; size_t sz; void *buf; int i,j, n; nng_socket sock; int rv; char filename[512]; if ((rv = nng_req0_open(&sock)) != 0) { fatal("nng_socket", rv); } if ((rv = nng_dial(sock, url, NULL, 0)) != 0) { fatal("nng_dial", rv); } sprintf(filename, "test%d.tmp", targ->id); FILE *fp = NULL; fp = fopen(filename, "w+"); // fp = stdout; for (i = 0; i < SCALE; i++) { sprintf(sendbuf, "thread(%d) %d", targ->id, i); fprintf(fp, "requst:%s\n", sendbuf); if ((rv = nng_send(sock, sendbuf, strlen(sendbuf) + 1, 0)) != 0) { fatal("nng_send", rv); } if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) { fatal("nng_recv", rv); } fprintf(fp, "NODE1: RECEIVED DATE %s\n", buf); nng_free(buf, sz); } fclose(fp); nng_close(sock); return (void *)i; } void mclient(const char *url) { int status, i = 0, processors = 1; void *res[processors]; // Targ *targs = (Targ *)calloc(processors, sizeof(Targ)); Targ targs[processors]; pthread_t tids[processors]; char sendbuf[512]; struct timeval start, end; long total = 0; printf("开始测试...\n"); gettimeofday(&start, NULL); for (i = 0; i < processors; i++) { targs[i].url = url; targs[i].id = i; pthread_create(&tids[i], NULL, runclient, (void *)&targs[i]); } for (i = 0; i < processors; i++) { if (pthread_join(tids[i], &res[i]) != 0) { perror("multyThreadClient pthread_join"); } else { total += (long)res[i]; //fprintf(stderr, "client(%d) 写入 %ld 条数据\n", i, (long)res[i]); } } gettimeofday(&end, NULL); double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec); long diffsec = (long) (difftime/1000000); long diffusec = difftime - diffsec * 1000000; fprintf(stderr,"发送数目: %ld, 用时: (%ld sec %ld usec), 平均: %f\n", total, diffsec, diffusec, difftime/total ); // fflush(stdout); } int main(const int argc, const char **argv) { const char *url = "tcp://192.168.20.10:5001"; //const char *url = "ipc:///tmp/reqrep.ipc"; if (argc < 2) { fprintf(stderr, "Usage: %s %s|%s \n", argv[0], "server", "client"); return 1; } // url = atoi(argv[2]); if (strcmp("server", argv[1]) == 0 ) { server(url); } if (strcmp("client", argv[1]) == 0) client(url); if (strcmp("mclient", argv[1]) == 0) mclient(url); }