#include "shm_socket.h"
#include "usg_common.h"
#include "shm_mm_wraper.h"

/*
 * Request/reply demo built on the shm_socket API.
 *
 * Usage: reqrep <mode> <port>, where <mode> is one of
 *   server   - accept connections and echo every request back with a prefix
 *   client   - interactive client reading one word per request from stdin
 *   mclient  - CLIENT_COUNT threads, each sending REQUESTS_PER_CLIENT requests
 *
 * multyProcessorsClient() is a fork-based variant of mclient; main() does not
 * dispatch to it.
 */

/* Size of the fixed text buffers used for requests and replies. */
enum { MSG_BUF_SIZE = 512 };

/* Number of requests issued by each benchmark client (process or thread). */
enum { REQUESTS_PER_CLIENT = 100000 };

/* Number of concurrent benchmark clients (processes or threads). */
enum { CLIENT_COUNT = 4 };

/* Per-thread argument for threadrun(). */
typedef struct Targ {
    int port; /* port passed to shm_connect() */
    int id;   /* client index, embedded in every request text */
} Targ;

/*
 * Thread entry point serving one accepted connection.
 *
 * Detaches itself, then for every message received replies with
 * "SERVER RECEIVED: <message>" (also echoed to stdout). The loop ends when
 * shm_recv() returns non-zero; the socket is closed before the thread exits.
 *
 * _socket: the shm_socket_t * returned by shm_accept().
 * Returns NULL (the thread is detached, so nobody collects the value).
 */
void *precess_client(void *_socket)
{
    shm_socket_t *socket = (shm_socket_t *)_socket;
    void *recvbuf;
    int size;
    char sendbuf[MSG_BUF_SIZE];

    pthread_detach(pthread_self());

    while (shm_recv(socket, &recvbuf, &size) == 0) {
        /* Bounded formatting: a long request is truncated rather than
         * written past the end of sendbuf. */
        snprintf(sendbuf, sizeof sendbuf, "SERVER RECEIVED: %s", (char *)recvbuf);
        puts(sendbuf);
        shm_send(socket, sendbuf, strlen(sendbuf) + 1);
        free(recvbuf);
    }

    shm_close_socket(socket);
    return NULL;
}

/*
 * Run the server: bind to `port`, listen, and hand every accepted connection
 * to its own precess_client() thread. Never returns.
 */
void server(int port)
{
    shm_socket_t *socket = shm_open_socket();

    shm_socket_bind(socket, port);
    shm_listen(socket);

    for (;;) {
        pthread_t tid;
        shm_socket_t *client_socket = shm_accept(socket);
        int rc = pthread_create(&tid, NULL, precess_client, (void *)client_socket);

        if (rc != 0) {
            /* pthread_create reports failure through its return value.
             * No worker owns the connection, so release it here. */
            fprintf(stderr, "server pthread_create: %s\n", strerror(rc));
            shm_close_socket(client_socket);
        }
    }
}

/*
 * Interactive client: connect to `port`, then repeatedly read one
 * whitespace-delimited word from stdin, send it, and print the reply.
 * Stops on end of input or when shm_recv() reports failure.
 */
void client(int port)
{
    shm_socket_t *socket = shm_open_socket();
    char sendbuf[MSG_BUF_SIZE];

    shm_connect(socket, port);

    for (;;) {
        void *recvbuf;
        int size;

        printf("request: ");
        fflush(stdout);

        /* The field width must stay equal to MSG_BUF_SIZE - 1. */
        if (scanf("%511s", sendbuf) != 1) {
            break; /* EOF or read error: nothing valid to send */
        }

        shm_send(socket, sendbuf, strlen(sendbuf) + 1);

        /* recvbuf is only set by a successful receive (return value 0, as
         * relied on by precess_client). */
        if (shm_recv(socket, &recvbuf, &size) != 0) {
            fprintf(stderr, "client: shm_recv failed\n");
            break;
        }
        printf("reply: %s\n", (char *)recvbuf);
        free(recvbuf);
    }

    shm_close_socket(socket);
}

/*
 * Send one NUL-terminated request on `socket` and print the reply.
 *
 * socket:  connected socket.
 * sendbuf: NUL-terminated request text; the terminator is sent as well.
 */
void client_send(shm_socket_t *socket, char *sendbuf)
{
    void *recvbuf;
    int size;

    printf("requst:%s\n", sendbuf);
    shm_send(socket, sendbuf, strlen(sendbuf) + 1);

    if (shm_recv(socket, &recvbuf, &size) != 0) {
        /* Do not print or free a buffer that was never filled in. */
        fprintf(stderr, "client_send: shm_recv failed\n");
        return;
    }
    printf("reply: %s\n", (char *)recvbuf);
    free(recvbuf);
}

/*
 * Benchmark client using processes: fork CLIENT_COUNT children, each of which
 * opens its own connection to `port` and sends REQUESTS_PER_CLIENT requests
 * of the form "processor(<child index>) <countdown>". The parent waits for
 * all children and reports any that did not exit normally.
 */
void multyProcessorsClient(int port)
{
    pid_t pid;
    int status;
    int i;

    for (i = 0; i < CLIENT_COUNT; i++) {
        pid_t child = fork();

        if (child < 0) {
            perror("fork");
            continue;
        }
        if (child == 0) {
            /* Child: run the request loop, then exit without returning to
             * the parent's code path. */
            char sendbuf[MSG_BUF_SIZE];
            int scale = REQUESTS_PER_CLIENT;
            shm_socket_t *socket = shm_open_socket();

            shm_connect(socket, port);
            while (scale-- > 0) {
                snprintf(sendbuf, sizeof sendbuf, "processor(%d) %d", i, scale);
                client_send(socket, sendbuf);
            }
            shm_close_socket(socket);
            exit(0);
        }
    }

    /* Reap every child; waitpid fails with ECHILD once none are left. */
    while ((pid = waitpid(-1, &status, 0)) > 0) {
        if (!WIFEXITED(status)) {
            fprintf(stderr, "child %d terminated abnormally\n", (int)pid);
        }
    }
    if (errno != ECHILD) {
        perror("waitpid error");
    }
}

/*
 * Thread entry point for multyThreadClient(): open a connection to
 * targ->port and send REQUESTS_PER_CLIENT requests.
 *
 * arg: pointer to this thread's Targ.
 * Returns the number of requests sent, carried in the pointer value.
 */
void *threadrun(void *arg)
{
    Targ *targ = (Targ *)arg;
    char sendbuf[MSG_BUF_SIZE];
    int i;
    shm_socket_t *socket = shm_open_socket();

    shm_connect(socket, targ->port);

    /* NOTE(review): the loop header and format string were damaged in the
     * source text ("for( i = 0; iid, i);"). They are reconstructed here by
     * analogy with multyProcessorsClient; confirm the exact message format
     * against the original file. */
    for (i = 0; i < REQUESTS_PER_CLIENT; i++) {
        snprintf(sendbuf, sizeof sendbuf, "thread(%d) %d", targ->id, i);
        client_send(socket, sendbuf);
    }

    shm_close_socket(socket);

    /* Go through long so the integer-to-pointer conversion matches the
     * (long) cast applied by the joining thread. */
    return (void *)(long)i;
}

/*
 * Benchmark client using threads: start CLIENT_COUNT threadrun() threads
 * against `port`, join them, and report how many requests each one sent.
 */
void multyThreadClient(int port)
{
    pthread_t tids[CLIENT_COUNT];
    int started = 0;
    int i;
    Targ *targs = calloc(CLIENT_COUNT, sizeof *targs);

    if (targs == NULL) {
        perror("multyThreadClient calloc");
        return;
    }

    for (i = 0; i < CLIENT_COUNT; i++) {
        int rc;

        targs[i].port = port;
        targs[i].id = i;
        rc = pthread_create(&tids[i], NULL, threadrun, (void *)&targs[i]);
        if (rc != 0) {
            fprintf(stderr, "multyThreadClient pthread_create: %s\n", strerror(rc));
            break; /* tids[i] is not valid; join only the threads started */
        }
        started++;
    }

    for (i = 0; i < started; i++) {
        void *res = NULL;
        /* pthread_join returns the error number; it does not set errno. */
        int rc = pthread_join(tids[i], &res);

        if (rc != 0) {
            fprintf(stderr, "multyThreadClient pthread_join: %s\n", strerror(rc));
        } else {
            fprintf(stderr, "client(%d) 写入 %ld 条数据\n", i, (long)res);
        }
    }

    /* Safe to release: every thread that reads targs has been joined. */
    free(targs);
}

/*
 * Entry point. Expects: <mode> <port>; see the file header for the modes.
 * Returns 0 on success, 1 on a usage error.
 */
int main(int argc, char *argv[])
{
    int port;
    int rc = 0;

    /* Validate arguments before initialising shared memory, so the usage
     * error path has nothing to tear down. */
    if (argc < 3) {
        fprintf(stderr, "Usage: reqrep %s|%s ...\n", "server", "client");
        return 1;
    }
    port = atoi(argv[2]);

    /* NOTE(review): the meaning of the 512 argument is defined by
     * shm_mm_wraper.h and is not visible from this file. */
    shm_mm_wrapper_init(512);

    if (strcmp("server", argv[1]) == 0) {
        server(port); /* does not return */
    } else if (strcmp("client", argv[1]) == 0) {
        client(port);
    } else if (strcmp("mclient", argv[1]) == 0) {
        multyThreadClient(port);
    } else {
        fprintf(stderr, "Usage: reqrep %s|%s ...\n", "server", "client");
        rc = 1;
    }

    shm_mm_wrapper_destroy();
    return rc;
}