From 9e6ceaad059b2aec84df92c8750f6d87eab708c2 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期四, 16 七月 2020 20:46:31 +0800 Subject: [PATCH] udpate --- queue/libshm_queue.a | 0 test/communication.c | 100 +++++++++++++++++++++++- queue/include/lock_free_queue.h | 18 +++- queue/socket.c | 89 ++++++++++++--------- test/communication | 0 5 files changed, 158 insertions(+), 49 deletions(-) diff --git a/queue/include/lock_free_queue.h b/queue/include/lock_free_queue.h index 566c7ed..f9a4667 100644 --- a/queue/include/lock_free_queue.h +++ b/queue/include/lock_free_queue.h @@ -78,6 +78,7 @@ int items; public: + int mutex; LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); /// @brief destructor of the class. @@ -150,6 +151,7 @@ // std::cout << "LockFreeQueue init reference=" << reference << std::endl; slots = SemUtil::get(IPC_PRIVATE, qsize); items = SemUtil::get(IPC_PRIVATE, 0); + mutex = SemUtil::get(IPC_PRIVATE, 1); } template < @@ -198,7 +200,8 @@ bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data) { if (SemUtil::dec(slots) == -1) { - err_exit(errno, "push"); + err_msg(errno, "LockFreeQueue push"); + return false; } if ( m_qImpl.push(a_data) ) { @@ -218,8 +221,11 @@ if (SemUtil::dec_nowait(slots) == -1) { if (errno == EAGAIN) return false; - else - err_exit(errno, "push_nowait"); + else { + err_msg(errno, "LockFreeQueue push_nowait"); + return false; + } + } if ( m_qImpl.push(a_data)) { @@ -240,8 +246,10 @@ if (SemUtil::dec_timeout(slots, timeout) == -1) { if (errno == EAGAIN) return false; - else - err_exit(errno, "push_timeout"); + else { + err_msg(errno, "LockFreeQueue push_timeout"); + return false; + } } if (m_qImpl.push(a_data)){ diff --git a/queue/libshm_queue.a b/queue/libshm_queue.a index 65416b5..553db5a 100644 --- a/queue/libshm_queue.a +++ b/queue/libshm_queue.a Binary files differ diff --git a/queue/socket.c b/queue/socket.c index 1f3ff24..a52d532 100644 --- a/queue/socket.c +++ b/queue/socket.c @@ -37,29 +37,37 @@ } -int shm_close_socket(shm_socket_t *socket) { +int _shm_close_socket(shm_socket_t *socket, bool notifyRemote) { //缁欏鏂瑰彂閫佷竴涓叧闂繛鎺ョ殑娑堟伅 struct timespec timeout = {1, 0}; shm_msg_t close_msg; + close_msg.port = socket->port; close_msg.size = 0; close_msg.type=SHM_SOCKET_CLOSE; - if(socket->remoteQueue != NULL) { + if(notifyRemote && socket->remoteQueue != NULL) { socket->remoteQueue->push_timeout(close_msg, &timeout); } - - - if(socket->queue != NULL) + if(socket->queue != NULL) { delete socket->queue; - if(socket->remoteQueue != NULL) + socket->queue = NULL; + } + + if(socket->remoteQueue != NULL) { delete socket->remoteQueue; + socket->remoteQueue = NULL; + } - if(socket->messageQueue != NULL) + if(socket->messageQueue != NULL) { delete socket->messageQueue; + socket->messageQueue = NULL; + } - if(socket->acceptQueue != NULL) + if(socket->acceptQueue != NULL) { delete socket->acceptQueue; + socket->acceptQueue = NULL; + } if(socket->clientSocketMap != NULL) { shm_socket_t *client_socket; @@ -68,7 +76,9 @@ client_socket->remoteQueue->push_timeout(close_msg, &timeout); delete client_socket->remoteQueue; + client_socket->remoteQueue=NULL; delete client_socket->messageQueue; + client_socket->messageQueue=NULL; socket->clientSocketMap->erase(iter); free((void *)client_socket); } @@ -78,13 +88,16 @@ if(socket->dispatch_thread != 0) pthread_cancel(socket->dispatch_thread); - - free(socket); return 0; } + +int shm_close_socket(shm_socket_t *socket) { + return _shm_close_socket(socket, true); +} + int shm_bind(shm_socket_t * socket, int port) { @@ -119,12 +132,19 @@ shm_socket_t *client_socket; auto iter = socket->clientSocketMap->find(port); if( iter != socket->clientSocketMap->end() ) { - client_socket= iter->second; - delete client_socket->remoteQueue; - delete client_socket->messageQueue; + // client_socket= iter->second; + // if(client_socket->remoteQueue != NULL) { + // delete client_socket->remoteQueue; + // client_socket->remoteQueue = NULL; + // } + // if(client_socket->messageQueue != NULL) { + // delete client_socket->messageQueue; + // client_socket->messageQueue = NULL; + // } + socket->clientSocketMap->erase(iter); } - free((void *)client_socket); + //free((void *)client_socket); } @@ -139,7 +159,6 @@ shm_socket_t *client_socket; std::map<int, shm_socket_t* >::iterator iter; while(socket->queue->pop(src)) { -print_msg("=====_server_run_msg_rev:", src); switch (src.type) { case SHM_SOCKET_OPEN : socket->acceptQueue->push_timeout(src, &timeout); @@ -148,11 +167,9 @@ _server_close_conn_to_client(socket, src.port); break; case SHM_COMMON_MSG : -err_msg(0, "===_server_run_msg_rev 1"); iter = socket->clientSocketMap->find(src.port); if( iter != socket->clientSocketMap->end()) { client_socket= iter->second; -err_msg(0, "===_server_run_msg_rev client_socket->messageQueue=%p", client_socket->messageQueue); client_socket->messageQueue->push_timeout(src, &timeout); } @@ -178,7 +195,7 @@ if (socket->acceptQueue->pop(src) ) { -print_msg("===accept:", src); +// print_msg("===accept:", src); client_port = src.port; client_socket = (shm_socket_t *)malloc(sizeof(shm_socket_t)); client_socket->port = socket->port; @@ -228,20 +245,8 @@ } void _client_close_conn_to_server(shm_socket_t* socket) { - if(socket->queue != NULL) - delete socket->queue; - if(socket->remoteQueue != NULL) - delete socket->remoteQueue; - - if(socket->messageQueue != NULL) - delete socket->messageQueue; - - if(socket->acceptQueue != NULL) - delete socket->acceptQueue; - - if(socket->dispatch_thread != 0) - pthread_cancel(socket->dispatch_thread); - + + _shm_close_socket(socket, false); } @@ -283,24 +288,30 @@ dest.size = size; dest.buf = mm_malloc(size); memcpy(dest.buf, buf, size); - - socket->remoteQueue->push(dest); - return 0; + if(socket->remoteQueue->push(dest)) { + return 0; + } else { + err_msg(errno, "connection has been closed!"); + return -1; + } + + } int shm_recv(shm_socket_t* socket, void **buf, int *size) { shm_msg_t src; -err_msg(0, "====shm_recv socket ==%p", socket); - bool rv = socket->messageQueue->pop(src); - if (rv) { + if (socket->messageQueue->pop(src)) { void * _buf = malloc(src.size); memcpy(_buf, src.buf, src.size); *buf = _buf; *size = src.size; mm_free(src.buf); + return 0; + } else { + return -1; } - return 0; + } diff --git a/test/communication b/test/communication index ca89adb..3a0e6b1 100755 --- a/test/communication +++ b/test/communication Binary files differ diff --git a/test/communication.c b/test/communication.c index a314dcb..16a9b05 100644 --- a/test/communication.c +++ b/test/communication.c @@ -1,5 +1,8 @@ #include "socket.h" - +typedef struct Targ { + int port; + int id; +}Targ; void * precess_client(void *_socket) { pthread_detach(pthread_self()); @@ -7,13 +10,11 @@ int size; void *recvbuf; char sendbuf[512]; - while(true) { - shm_recv(socket, &recvbuf, &size); + while (shm_recv(socket, &recvbuf, &size) == 0 ) { sprintf(sendbuf, "SERVER RECEIVED: %s", recvbuf); puts(sendbuf); - shm_send(socket, sendbuf, strlen(sendbuf)+1) ; + shm_send(socket, sendbuf, strlen(sendbuf)+1); shm_free(recvbuf); - } shm_close_socket(socket); } @@ -51,6 +52,92 @@ shm_close_socket(socket); } +void client_send(shm_socket_t *socket, char *sendbuf) { + + int size; + void *recvbuf; + shm_send(socket, sendbuf, strlen(sendbuf)+1) ; + shm_recv(socket, &recvbuf, &size); + printf("reply: %s\n", (char *)recvbuf); + shm_free(recvbuf); + + +} + + +void multyProcessorsClient(int port) { + + int status, i = 0, processors = 4, scale = 100000; + pid_t productors[processors]; + pid_t pid; + char sendbuf[512]; + for ( i = 0; i < processors; i++) { + if ((productors[i] = fork()) == 0) /* Child runs user job */ + { + shm_socket_t *socket = shm_open_socket(); + shm_connect(socket, port); + while( scale-- > 0) { + sprintf(sendbuf, "processor(%d) %d", i, scale); + client_send(socket, sendbuf); + } + shm_close_socket(socket); + exit(0); + } + } + + while ((pid = waitpid(-1, &status, 0)) > 0) { + if(WIFEXITED(status)) { + //fprintf(stderr, "child %d terminated normally with exit status=%d\n", pid, WEXITSTATUS(status)); + }else + fprintf(stderr, "child %d terminated abnormally\n", pid); + } + + if (errno != ECHILD) + perror("waitpid error"); +} + +void *threadrun(void *arg) { + Targ * targ = ( Targ * )arg; + int port = targ->port; + char sendbuf[512]; + int scale = 100000; + int i; + shm_socket_t *socket = shm_open_socket(); + + shm_connect(socket, port); + for( i = 0; i<scale; i++) { + sprintf(sendbuf, "processor(%d) %d", targ->id, i); + client_send(socket, sendbuf); + } + shm_close_socket(socket); + return (void*)i; +} + +void multyThreadClient(int port) { + + int status, i = 0, processors = 2; + void *res[processors]; + Targ *targs= (Targ*)calloc(processors, sizeof(Targ)); + pthread_t tids[processors]; + char sendbuf[512]; + for ( i = 0; i < processors; i++) { + targs[i].port = port; + targs[i].id = i; + pthread_create(&tids[i], NULL, threadrun, (void *)&targs[i]); + + } + + for (i = 0; i< processors; i++) { + if(pthread_join(tids[i], &res[i])!=0) { + perror("multyThreadClient pthread_join"); + } else { + fprintf(stderr, "client(%d) 鍐欏叆 %ld 鏉℃暟鎹甛n", i, (long)res[i]); + } + } + + +} + int main(int argc, char *argv[]) { shm_init(512); int port; @@ -67,6 +154,9 @@ if (strcmp("client", argv[1]) == 0) client(port); + + if (strcmp("mclient", argv[1]) == 0) + multyThreadClient(port); shm_destroy(); // fprintf(stderr, "Usage: reqrep %s|%s <URL> ...\n", "server", "client"); return 0; -- Gitblit v1.8.0