From ddbeeaaffeab5bc997a0b7a7e8dcac863610feee Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期三, 05 八月 2020 20:04:52 +0800 Subject: [PATCH] udpate --- test_socket/dgram_mod_req_rep.c | 144 +++++++++++++++++++++++++++++++++++++++++++---- 1 files changed, 130 insertions(+), 14 deletions(-) diff --git a/test_socket/dgram_mod_req_rep.c b/test_socket/dgram_mod_req_rep.c index a857ce6..d35c6ea 100644 --- a/test_socket/dgram_mod_req_rep.c +++ b/test_socket/dgram_mod_req_rep.c @@ -1,26 +1,64 @@ #include "dgram_mod_socket.h" #include "shm_mm.h" #include "usg_common.h" +#include "lock_free_queue.h" + +#define WORKERS 4 + +typedef struct task_t { + void *buf; + int size; + int port; + +} task_t; + + +typedef struct Targ { + int port; + int id; + +}Targ; + +LockFreeQueue<task_t, DM_Allocator> task_queue(128); + + +void *worker(void *socket) { + pthread_detach(pthread_self()); + char sendbuf[512]; + task_t task; + while(true) { + task_queue.pop(task); + sprintf(sendbuf, "SERVER RECEIVED: %s", task.buf); + // puts(sendbuf); + dgram_mod_sendto(socket, sendbuf, strlen(sendbuf) + 1, task.port); + free(task.buf); + } + return NULL; +} + +void initThreadPool(void *socket) { + + pthread_t tid; + for (int i = 0; i < WORKERS; i++) + pthread_create(&tid, NULL, worker, socket); +} void server(int port) { - void *socket = dgram_mod_open_socket(REQ_REP); + void *socket = dgram_mod_open_socket(); dgram_mod_bind(socket, port); - int size; - void *recvbuf; - char sendbuf[512]; + initThreadPool(socket); + int rv; - int remote_port; - while ( (rv = dgram_mod_recvfrom(socket, &recvbuf, &size, &remote_port) ) == 0) { - sprintf(sendbuf, "SERVER RECEIVED: %s", recvbuf); - puts(sendbuf); - dgram_mod_sendto(socket, sendbuf, strlen(sendbuf) + 1, remote_port); - free(recvbuf); + task_t task; + while ( (rv = dgram_mod_recvfrom(socket, &task.buf, &task.size, &task.port) ) == 0) { + task_queue.push(task); + } dgram_mod_close_socket(socket); } void client(int port) { - void *socket = dgram_mod_open_socket(REQ_REP); + void *socket = dgram_mod_open_socket(); int size; void *recvbuf; char sendbuf[512]; @@ -34,6 +72,81 @@ dgram_mod_close_socket(socket); } +void client2(int port) { + void *socket = dgram_mod_open_socket(); + int size; + void *recvbuf; + char sendbuf[512]; + while (true) { + sprintf(sendbuf, "hello\n" ); + dgram_mod_sendandrecv(socket, sendbuf, strlen(sendbuf) + 1, port, &recvbuf, &size); + printf("reply: %s\n", (char *)recvbuf); + free(recvbuf); + } + dgram_mod_close_socket(socket); +} + + + +void *runclient(void *arg) { + Targ *targ = (Targ *)arg; + int port = targ->port; + char sendbuf[512]; + int scale = 100000; + int i; + void *socket = dgram_mod_open_socket(); + + char filename[512]; + sprintf(filename, "test%d.tmp", targ->id); + FILE *fp = NULL; + fp = fopen(filename, "w+"); + + int recvsize; + void *recvbuf; + for (i = 0; i < scale; i++) { + sprintf(sendbuf, "thread(%d) %d", targ->id, i); + fprintf(fp, "requst:%s\n", sendbuf); + dgram_mod_sendandrecv(socket, sendbuf, strlen(sendbuf) + 1, port, &recvbuf, &recvsize); + fprintf(fp, "reply: %s\n", (char *)recvbuf); + free(recvbuf); + } + fclose(fp); + dgram_mod_close_socket(socket); + return (void *)i; +} + +void startClients(int port) { + + int status, i = 0, processors = 4; + void *res[processors]; + Targ *targs = (Targ *)calloc(processors, sizeof(Targ)); + pthread_t tids[processors]; + char sendbuf[512]; + + struct timeval start; + gettimeofday(&start, NULL); + for (i = 0; i < processors; i++) { + targs[i].port = port; + targs[i].id = i; + pthread_create(&tids[i], NULL, runclient, (void *)&targs[i]); + } + + for (i = 0; i < processors; i++) { + if (pthread_join(tids[i], &res[i]) != 0) { + perror("multyThreadClient pthread_join"); + } else { + fprintf(stderr, "client(%d) 鍐欏叆 %ld 鏉℃暟鎹甛n", i, (long)res[i]); + } + } + + struct timeval end; + gettimeofday(&end, NULL); + + double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec); + long diffsec = (long) (difftime/1000000); + long diffmsec = difftime - diffsec*1000000; + printf("cost: %ld sec: %ld msc\n", diffsec, diffmsec); +} int main(int argc, char *argv[]) { @@ -47,11 +160,14 @@ port = atoi(argv[2]); if (strcmp("server", argv[1]) == 0) { + // int temp = shm_alloc_key(); + // printf("tmp=%d\n", temp); server(port); + } else if (strcmp("client", argv[1]) == 0) { + startClients(port); + } else { + printf("input invalidate arguments\n"); } - - if (strcmp("client", argv[1]) == 0) - client(port); return 0; -- Gitblit v1.8.0