From b6043642f60ef23a7a100418cd4fec1251a98ad9 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期四, 23 七月 2020 14:47:50 +0800 Subject: [PATCH] update --- test_socket/dgram_mod_req_rep.c | 119 ++++++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 files changed, 109 insertions(+), 10 deletions(-) diff --git a/test_socket/dgram_mod_req_rep.c b/test_socket/dgram_mod_req_rep.c index a857ce6..f4d2918 100644 --- a/test_socket/dgram_mod_req_rep.c +++ b/test_socket/dgram_mod_req_rep.c @@ -1,20 +1,58 @@ #include "dgram_mod_socket.h" #include "shm_mm.h" #include "usg_common.h" +#include "lock_free_queue.h" + +#define WORKERS 4 + +typedef struct task_t { + void *buf; + int size; + int port; + +} task_t; + + +typedef struct Targ { + int port; + int id; + +}Targ; + +LockFreeQueue<task_t, DM_Allocator> task_queue(100); + + +void *worker(void *socket) { + pthread_detach(pthread_self()); + char sendbuf[512]; + task_t task; + while(true) { + task_queue.pop(task); + sprintf(sendbuf, "SERVER RECEIVED: %s", task.buf); + // puts(sendbuf); + dgram_mod_sendto(socket, sendbuf, strlen(sendbuf) + 1, task.port); + free(task.buf); + } + return NULL; +} + +void initThreadPool(void *socket) { + + pthread_t tid; + for (int i = 0; i < WORKERS; i++) + pthread_create(&tid, NULL, worker, socket); +} void server(int port) { void *socket = dgram_mod_open_socket(REQ_REP); dgram_mod_bind(socket, port); - int size; - void *recvbuf; - char sendbuf[512]; + initThreadPool(socket); + int rv; - int remote_port; - while ( (rv = dgram_mod_recvfrom(socket, &recvbuf, &size, &remote_port) ) == 0) { - sprintf(sendbuf, "SERVER RECEIVED: %s", recvbuf); - puts(sendbuf); - dgram_mod_sendto(socket, sendbuf, strlen(sendbuf) + 1, remote_port); - free(recvbuf); + task_t task; + while ( (rv = dgram_mod_recvfrom(socket, &task.buf, &task.size, &task.port) ) == 0) { + task_queue.push(task); + } dgram_mod_close_socket(socket); } @@ -34,6 +72,67 @@ dgram_mod_close_socket(socket); } + + +void *runclient(void *arg) { + Targ *targ = (Targ *)arg; + int port = targ->port; + char sendbuf[512]; + int scale = 100000; + int i; + void *socket = dgram_mod_open_socket(REQ_REP); + + char filename[512]; + sprintf(filename, "test%d.txt", targ->id); + FILE *fp = NULL; + fp = fopen(filename, "w+"); + + int recvsize; + void *recvbuf; + for (i = 0; i < scale; i++) { + sprintf(sendbuf, "thread(%d) %d", targ->id, i); + fprintf(fp, "requst:%s\n", sendbuf); + dgram_mod_sendandrecv(socket, sendbuf, strlen(sendbuf) + 1, port, &recvbuf, &recvsize); + fprintf(fp, "reply: %s\n", (char *)recvbuf); + free(recvbuf); + } + fclose(fp); + dgram_mod_close_socket(socket); + return (void *)i; +} + +void startClients(int port) { + + int status, i = 0, processors = 4; + void *res[processors]; + Targ *targs = (Targ *)calloc(processors, sizeof(Targ)); + pthread_t tids[processors]; + char sendbuf[512]; + + struct timeval start; + gettimeofday(&start, NULL); + for (i = 0; i < processors; i++) { + targs[i].port = port; + targs[i].id = i; + pthread_create(&tids[i], NULL, runclient, (void *)&targs[i]); + } + + for (i = 0; i < processors; i++) { + if (pthread_join(tids[i], &res[i]) != 0) { + perror("multyThreadClient pthread_join"); + } else { + fprintf(stderr, "client(%d) 鍐欏叆 %ld 鏉℃暟鎹甛n", i, (long)res[i]); + } + } + + struct timeval end; + gettimeofday(&end, NULL); + + double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec); + long diffsec = (long) (difftime/1000000); + long diffmsec = difftime - diffsec; + printf("cost: %ld sec: %ld msc\n", diffsec, diffmsec); +} int main(int argc, char *argv[]) { @@ -51,7 +150,7 @@ } if (strcmp("client", argv[1]) == 0) - client(port); + startClients(port); return 0; -- Gitblit v1.8.0