queue/include/lock_free_queue.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
queue/libshm_queue.a | 补丁 | 查看 | 原始文档 | blame | 历史 | |
queue/socket.c | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
test/communication | 补丁 | 查看 | 原始文档 | blame | 历史 | |
test/communication.c | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
queue/include/lock_free_queue.h
@@ -78,6 +78,7 @@ int items; public: int mutex; LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); /// @brief destructor of the class. @@ -150,6 +151,7 @@ // std::cout << "LockFreeQueue init reference=" << reference << std::endl; slots = SemUtil::get(IPC_PRIVATE, qsize); items = SemUtil::get(IPC_PRIVATE, 0); mutex = SemUtil::get(IPC_PRIVATE, 1); } template < @@ -198,7 +200,8 @@ bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data) { if (SemUtil::dec(slots) == -1) { err_exit(errno, "push"); err_msg(errno, "LockFreeQueue push"); return false; } if ( m_qImpl.push(a_data) ) { @@ -218,8 +221,11 @@ if (SemUtil::dec_nowait(slots) == -1) { if (errno == EAGAIN) return false; else err_exit(errno, "push_nowait"); else { err_msg(errno, "LockFreeQueue push_nowait"); return false; } } if ( m_qImpl.push(a_data)) { @@ -240,8 +246,10 @@ if (SemUtil::dec_timeout(slots, timeout) == -1) { if (errno == EAGAIN) return false; else err_exit(errno, "push_timeout"); else { err_msg(errno, "LockFreeQueue push_timeout"); return false; } } if (m_qImpl.push(a_data)){ queue/libshm_queue.aBinary files differ
queue/socket.c
@@ -37,29 +37,37 @@ } int shm_close_socket(shm_socket_t *socket) { int _shm_close_socket(shm_socket_t *socket, bool notifyRemote) { //给对方发送一个关闭连接的消息 struct timespec timeout = {1, 0}; shm_msg_t close_msg; close_msg.port = socket->port; close_msg.size = 0; close_msg.type=SHM_SOCKET_CLOSE; if(socket->remoteQueue != NULL) { if(notifyRemote && socket->remoteQueue != NULL) { socket->remoteQueue->push_timeout(close_msg, &timeout); } if(socket->queue != NULL) if(socket->queue != NULL) { delete socket->queue; if(socket->remoteQueue != NULL) socket->queue = NULL; } if(socket->remoteQueue != NULL) { delete socket->remoteQueue; socket->remoteQueue = NULL; } if(socket->messageQueue != NULL) if(socket->messageQueue != NULL) { delete socket->messageQueue; socket->messageQueue = NULL; } if(socket->acceptQueue != NULL) if(socket->acceptQueue != NULL) { delete socket->acceptQueue; socket->acceptQueue = NULL; } if(socket->clientSocketMap != NULL) { shm_socket_t *client_socket; @@ -68,7 +76,9 @@ client_socket->remoteQueue->push_timeout(close_msg, &timeout); delete client_socket->remoteQueue; client_socket->remoteQueue=NULL; delete client_socket->messageQueue; client_socket->messageQueue=NULL; socket->clientSocketMap->erase(iter); free((void *)client_socket); } @@ -78,13 +88,16 @@ if(socket->dispatch_thread != 0) pthread_cancel(socket->dispatch_thread); free(socket); return 0; } int shm_close_socket(shm_socket_t *socket) { return _shm_close_socket(socket, true); } int shm_bind(shm_socket_t * socket, int port) { @@ -119,12 +132,19 @@ shm_socket_t *client_socket; auto iter = socket->clientSocketMap->find(port); if( iter != socket->clientSocketMap->end() ) { client_socket= iter->second; delete client_socket->remoteQueue; delete client_socket->messageQueue; // client_socket= iter->second; // if(client_socket->remoteQueue != NULL) { // delete client_socket->remoteQueue; // client_socket->remoteQueue = NULL; // } // if(client_socket->messageQueue != NULL) { // delete client_socket->messageQueue; // client_socket->messageQueue = NULL; // } socket->clientSocketMap->erase(iter); } free((void *)client_socket); //free((void *)client_socket); } @@ -139,7 +159,6 @@ shm_socket_t *client_socket; std::map<int, shm_socket_t* >::iterator iter; while(socket->queue->pop(src)) { print_msg("=====_server_run_msg_rev:", src); switch (src.type) { case SHM_SOCKET_OPEN : socket->acceptQueue->push_timeout(src, &timeout); @@ -148,11 +167,9 @@ _server_close_conn_to_client(socket, src.port); break; case SHM_COMMON_MSG : err_msg(0, "===_server_run_msg_rev 1"); iter = socket->clientSocketMap->find(src.port); if( iter != socket->clientSocketMap->end()) { client_socket= iter->second; err_msg(0, "===_server_run_msg_rev client_socket->messageQueue=%p", client_socket->messageQueue); client_socket->messageQueue->push_timeout(src, &timeout); } @@ -178,7 +195,7 @@ if (socket->acceptQueue->pop(src) ) { print_msg("===accept:", src); // print_msg("===accept:", src); client_port = src.port; client_socket = (shm_socket_t *)malloc(sizeof(shm_socket_t)); client_socket->port = socket->port; @@ -228,20 +245,8 @@ } void _client_close_conn_to_server(shm_socket_t* socket) { if(socket->queue != NULL) delete socket->queue; if(socket->remoteQueue != NULL) delete socket->remoteQueue; if(socket->messageQueue != NULL) delete socket->messageQueue; if(socket->acceptQueue != NULL) delete socket->acceptQueue; if(socket->dispatch_thread != 0) pthread_cancel(socket->dispatch_thread); _shm_close_socket(socket, false); } @@ -283,24 +288,30 @@ dest.size = size; dest.buf = mm_malloc(size); memcpy(dest.buf, buf, size); socket->remoteQueue->push(dest); return 0; if(socket->remoteQueue->push(dest)) { return 0; } else { err_msg(errno, "connection has been closed!"); return -1; } } int shm_recv(shm_socket_t* socket, void **buf, int *size) { shm_msg_t src; err_msg(0, "====shm_recv socket ==%p", socket); bool rv = socket->messageQueue->pop(src); if (rv) { if (socket->messageQueue->pop(src)) { void * _buf = malloc(src.size); memcpy(_buf, src.buf, src.size); *buf = _buf; *size = src.size; mm_free(src.buf); return 0; } else { return -1; } return 0; } test/communicationBinary files differ
test/communication.c
@@ -1,5 +1,8 @@ #include "socket.h" typedef struct Targ { int port; int id; }Targ; void * precess_client(void *_socket) { pthread_detach(pthread_self()); @@ -7,13 +10,11 @@ int size; void *recvbuf; char sendbuf[512]; while(true) { shm_recv(socket, &recvbuf, &size); while (shm_recv(socket, &recvbuf, &size) == 0 ) { sprintf(sendbuf, "SERVER RECEIVED: %s", recvbuf); puts(sendbuf); shm_send(socket, sendbuf, strlen(sendbuf)+1) ; shm_send(socket, sendbuf, strlen(sendbuf)+1); shm_free(recvbuf); } shm_close_socket(socket); } @@ -51,6 +52,92 @@ shm_close_socket(socket); } void client_send(shm_socket_t *socket, char *sendbuf) { int size; void *recvbuf; shm_send(socket, sendbuf, strlen(sendbuf)+1) ; shm_recv(socket, &recvbuf, &size); printf("reply: %s\n", (char *)recvbuf); shm_free(recvbuf); } void multyProcessorsClient(int port) { int status, i = 0, processors = 4, scale = 100000; pid_t productors[processors]; pid_t pid; char sendbuf[512]; for ( i = 0; i < processors; i++) { if ((productors[i] = fork()) == 0) /* Child runs user job */ { shm_socket_t *socket = shm_open_socket(); shm_connect(socket, port); while( scale-- > 0) { sprintf(sendbuf, "processor(%d) %d", i, scale); client_send(socket, sendbuf); } shm_close_socket(socket); exit(0); } } while ((pid = waitpid(-1, &status, 0)) > 0) { if(WIFEXITED(status)) { //fprintf(stderr, "child %d terminated normally with exit status=%d\n", pid, WEXITSTATUS(status)); }else fprintf(stderr, "child %d terminated abnormally\n", pid); } if (errno != ECHILD) perror("waitpid error"); } void *threadrun(void *arg) { Targ * targ = ( Targ * )arg; int port = targ->port; char sendbuf[512]; int scale = 100000; int i; shm_socket_t *socket = shm_open_socket(); shm_connect(socket, port); for( i = 0; i<scale; i++) { sprintf(sendbuf, "processor(%d) %d", targ->id, i); client_send(socket, sendbuf); } shm_close_socket(socket); return (void*)i; } void multyThreadClient(int port) { int status, i = 0, processors = 2; void *res[processors]; Targ *targs= (Targ*)calloc(processors, sizeof(Targ)); pthread_t tids[processors]; char sendbuf[512]; for ( i = 0; i < processors; i++) { targs[i].port = port; targs[i].id = i; pthread_create(&tids[i], NULL, threadrun, (void *)&targs[i]); } for (i = 0; i< processors; i++) { if(pthread_join(tids[i], &res[i])!=0) { perror("multyThreadClient pthread_join"); } else { fprintf(stderr, "client(%d) 写入 %ld 条数据\n", i, (long)res[i]); } } } int main(int argc, char *argv[]) { shm_init(512); int port; @@ -67,6 +154,9 @@ if (strcmp("client", argv[1]) == 0) client(port); if (strcmp("mclient", argv[1]) == 0) multyThreadClient(port); shm_destroy(); // fprintf(stderr, "Usage: reqrep %s|%s <URL> ...\n", "server", "client"); return 0;