/*
 * Request/reply demo built on the dgram_mod socket API.
 *
 *   reqrep server  <port>   bind to <port>; a pool of worker threads echoes
 *                           every request back prefixed with "SERVER RECEIVED: "
 *   reqrep client  <port>   interactive client: reads words from stdin
 *   reqrep mclient <port>   several threads share one socket and each send a
 *                           fixed number of requests, logging to test<id>.tmp
 */
#include "dgram_mod_socket.h"
#include "shm_mm_wraper.h"
#include "usg_common.h"
#include "lock_free_queue.h"

#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>

/* Number of worker threads the server starts to answer requests. */
#define WORKERS 4

/* One received request, queued by server() and consumed by worker(). */
typedef struct task_t {
  void *buf; /* payload pointer filled in by dgram_mod_recvfrom(); the worker free()s it */
  int size;  /* size value filled in by dgram_mod_recvfrom() */
  int port;  /* port value filled in by dgram_mod_recvfrom(); the reply is sent to it */
} task_t;

/* Per-thread argument for runclient(). */
typedef struct Targ {
  int port; /* destination port of the requests */
  int id;   /* thread index; used in the log file name and in each message */
} Targ;

/*
 * Queue of pending requests shared between server() and the workers.
 * NOTE(review): declared without template arguments here; if LockFreeQueue is
 * a class template, the element type (presumably task_t) has to be supplied —
 * confirm against lock_free_queue.h.
 */
LockFreeQueue task_queue(128);

/* Socket shared by all runclient() threads; opened and closed by startClients(). */
void *client;

/*
 * Worker thread body: pops tasks from task_queue forever, sends
 * "SERVER RECEIVED: <payload>" back to the task's port and frees the payload.
 *
 * socket: the server socket handle, passed through pthread_create().
 * Never returns in practice; the thread detaches itself on entry.
 */
void *worker(void *socket)
{
  pthread_detach(pthread_self());

  char sendbuf[512];
  task_t task;

  while (true) {
    task_queue.pop(task);
    /* snprintf bounds the reply: a payload longer than the buffer is
     * truncated instead of overflowing the stack. */
    snprintf(sendbuf, sizeof(sendbuf), "SERVER RECEIVED: %s", (const char *)task.buf);
    // puts(sendbuf);
    dgram_mod_sendto(socket, sendbuf, strlen(sendbuf) + 1, task.port);
    free(task.buf);
  }
  return NULL;
}

/*
 * Starts WORKERS threads running worker() on the given socket.
 * A thread that cannot be created is reported on stderr and skipped.
 */
void initThreadPool(void *socket)
{
  pthread_t tid;

  for (int i = 0; i < WORKERS; i++) {
    /* pthread_create reports failure through its return value, not errno. */
    int rc = pthread_create(&tid, NULL, worker, socket);
    if (rc != 0) {
      fprintf(stderr, "initThreadPool pthread_create: %s\n", strerror(rc));
    }
  }
}

/*
 * Server main loop: opens a socket, binds it to `port`, starts the worker
 * pool and pushes every received request onto task_queue. The loop ends, and
 * the socket is closed, as soon as dgram_mod_recvfrom() returns non-zero.
 */
void server(int port)
{
  void *socket = dgram_mod_open_socket();
  dgram_mod_bind(socket, port);
  initThreadPool(socket);

  int rv;
  task_t task;
  while ((rv = dgram_mod_recvfrom(socket, &task.buf, &task.size, &task.port)) == 0) {
    task_queue.push(task);
  }

  dgram_mod_close_socket(socket);
}

/*
 * Interactive client: prompts on stdout, reads one whitespace-delimited word
 * from stdin, sends it to `port` and prints the reply. Stops and closes the
 * socket when stdin reaches end-of-file or a read error occurs.
 */
void startClient(int port)
{
  void *socket = dgram_mod_open_socket();
  char sendbuf[512];

  while (true) {
    int size;
    void *recvbuf = NULL;

    printf("request: ");
    fflush(stdout);
    /* Field width keeps the word inside sendbuf (511 chars + NUL); a failed
     * read ends the session instead of resending stale data forever. */
    if (scanf("%511s", sendbuf) != 1) {
      break;
    }

    dgram_mod_sendandrecv(socket, sendbuf, strlen(sendbuf) + 1, port, &recvbuf, &size);
    /* recvbuf stays NULL if the call did not hand back a reply buffer. */
    if (recvbuf != NULL) {
      printf("reply: %s\n", (char *)recvbuf);
    }
    free(recvbuf);
  }

  dgram_mod_close_socket(socket);
}

/*
 * Non-interactive client: sends "hello\n" to `port` in an endless loop and
 * prints every reply. The loop has no exit, so the close call below is only
 * reached if one is added.
 */
void client2(int port)
{
  void *socket = dgram_mod_open_socket();
  char sendbuf[512];

  while (true) {
    int size;
    void *recvbuf = NULL;

    snprintf(sendbuf, sizeof(sendbuf), "hello\n");
    dgram_mod_sendandrecv(socket, sendbuf, strlen(sendbuf) + 1, port, &recvbuf, &size);
    if (recvbuf != NULL) {
      printf("reply: %s\n", (char *)recvbuf);
    }
    free(recvbuf);
  }

  dgram_mod_close_socket(socket);
}

/*
 * Client thread body used by startClients().
 *
 * arg: pointer to a Targ carrying the destination port and this thread's id.
 * Sends 100000 requests of the form "thread(<id>) <n>" over the shared
 * `client` socket and logs each request and reply to "test<id>.tmp".
 *
 * Returns the number of requests sent, packed into the void * result
 * (0 if the log file could not be opened).
 */
void *runclient(void *arg)
{
  Targ *targ = (Targ *)arg;
  int port = targ->port;
  const int scale = 100000;
  char sendbuf[512];
  char filename[512];
  int i;

  snprintf(filename, sizeof(filename), "test%d.tmp", targ->id);
  FILE *fp = fopen(filename, "w+");
  if (fp == NULL) {
    perror(filename);
    return (void *)(intptr_t)0;
  }

  for (i = 0; i < scale; i++) {
    int recvsize;
    void *recvbuf = NULL;

    snprintf(sendbuf, sizeof(sendbuf), "thread(%d) %d", targ->id, i);
    fprintf(fp, "requst:%s\n", sendbuf);
    dgram_mod_sendandrecv(client, sendbuf, strlen(sendbuf) + 1, port, &recvbuf, &recvsize);
    if (recvbuf != NULL) {
      fprintf(fp, "reply: %s\n", (char *)recvbuf);
    }
    free(recvbuf);
  }

  fclose(fp);
  /* Go through intptr_t so the int -> pointer conversion is well-defined in size. */
  return (void *)(intptr_t)i;
}

/*
 * Multi-threaded client benchmark: opens the shared `client` socket, runs
 * four runclient() threads against `port`, reports on stderr how many
 * requests each thread sent, and prints the elapsed wall-clock time.
 */
void startClients(int port)
{
  const int processors = 4;
  void *res[processors];
  pthread_t tids[processors];
  int started = 0;
  struct timeval start, end;

  Targ *targs = (Targ *)calloc(processors, sizeof(*targs));
  if (targs == NULL) {
    perror("startClients calloc");
    return;
  }

  client = dgram_mod_open_socket();
  gettimeofday(&start, NULL);

  for (int i = 0; i < processors; i++) {
    targs[i].port = port;
    targs[i].id = i;
    int rc = pthread_create(&tids[i], NULL, runclient, (void *)&targs[i]);
    if (rc != 0) {
      fprintf(stderr, "startClients pthread_create: %s\n", strerror(rc));
      break;
    }
    started++;
  }

  /* Join only the threads that were actually created. */
  for (int i = 0; i < started; i++) {
    /* pthread_join returns the error code; it does not set errno. */
    int rc = pthread_join(tids[i], &res[i]);
    if (rc != 0) {
      fprintf(stderr, "multyThreadClient pthread_join: %s\n", strerror(rc));
    } else {
      /* Message text means "client(<i>) wrote <n> records". */
      fprintf(stderr, "client(%d) 写入 %ld 条数据\n", i, (long)(intptr_t)res[i]);
    }
  }

  gettimeofday(&end, NULL);

  /* Elapsed time in microseconds, split into whole seconds and the remaining
   * microseconds (printed under the existing "msc" label). */
  long long elapsed_us = (long long)(end.tv_sec - start.tv_sec) * 1000000LL
                       + (long long)(end.tv_usec - start.tv_usec);
  long diffsec = (long)(elapsed_us / 1000000);
  long diffmsec = (long)(elapsed_us % 1000000);
  printf("cost: %ld sec: %ld msc\n", diffsec, diffmsec);

  dgram_mod_close_socket(client);
  free(targs);
}

/*
 * Entry point. argv[1] selects the mode ("server", "mclient" or "client")
 * and argv[2] is the port number. Returns 1 when too few arguments are
 * given, 0 otherwise.
 */
int main(int argc, char *argv[])
{
  shm_mm_wrapper_init(512);

  int port;
  if (argc < 3) {
    fprintf(stderr, "Usage: reqrep %s|%s ...\n", "server", "client");
    return 1;
  }

  port = atoi(argv[2]);

  if (strcmp("server", argv[1]) == 0) {
    // int temp = shm_mm_wrapper_alloc_key();
    // printf("tmp=%d\n", temp);
    server(port);
  } else if (strcmp("mclient", argv[1]) == 0) {
    startClients(port);
  } else if (strcmp("client", argv[1]) == 0) {
    startClient(port);
  } else {
    printf("input invalidate arguments\n");
  }

  return 0;
}