queue/Makefile | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
queue/hashtable.c | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
queue/hashtable.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
queue/include/shm_queue.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
queue/include/shm_queue_wrapper.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
queue/include/socket.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
queue/libshm_queue.a | 补丁 | 查看 | 原始文档 | blame | 历史 | |
queue/libshm_queue.so | 补丁 | 查看 | 原始文档 | blame | 历史 | |
queue/shm_queue_wrapper.c | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
queue/socket.c | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
test2/Makefile | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
test2/pub_sub.c | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
test2/req_rep | 补丁 | 查看 | 原始文档 | blame | 历史 | |
test2/req_rep.c | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
queue/Makefile
@@ -22,6 +22,12 @@ MYLIBS = $(LIBSQUEUE) $(DLIBSQUEUE) ifeq ($(DEBUG),y) MYLIBS = $(LIBSQUEUE) else MYLIBS = $(LIBSQUEUE) $(DLIBSQUEUE) endif all: build queue/hashtable.c
@@ -15,8 +15,6 @@ TAILQ_ENTRY(tailq_entry_t) joint; } tailq_entry_t; static int hashtable_mutex; #define START_KEY 1000 typedef TAILQ_HEAD(tailq_header_t, tailq_entry_t) tailq_header_t; @@ -209,7 +207,7 @@ return res; } void* hashtable_put(hashtable_t *hashtable, int key, void *value) { void hashtable_put(hashtable_t *hashtable, int key, void *value) { SemUtil::dec(hashtable->mutex); while (hashtable->readcnt > 0) { queue/hashtable.h
@@ -19,7 +19,7 @@ void hashtable_init(hashtable_t *hashtable); void *hashtable_get(hashtable_t *hashtable, int key); void* hashtable_put(hashtable_t *hashtable, int key, void *value); void hashtable_put(hashtable_t *hashtable, int key, void *value); void *hashtable_remove(hashtable_t *hashtable, int key); void hashtable_removeall(hashtable_t *hashtable); queue/include/shm_queue.h
@@ -64,7 +64,7 @@ bool found; for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) { found = false; for(int i = 0; i < length; i++) { for(size_t i = 0; i < length; i++) { if(*keyItr == keys[i]) { found = true; break; queue/include/shm_queue_wrapper.h
@@ -9,18 +9,6 @@ #ifdef __cplusplus extern "C" { #endif /** * 初始化共享内存 * @size 共享内存大小 * */ void shm_init(int size); /** * 销毁共享内存 * 整个进程退出时需要执行这个方法,该方法首先会检查是否还有其他进程在使用该共享内存,如果还有其他进程在使用就只是detach,如果没有其他进程在使用则销毁整块内存。 */ void shm_destroy(); queue/include/socket.h
New file @@ -0,0 +1,61 @@ #ifndef __BASIC_SHM_SOCKET_H__ #define __BASIC_SHM_SOCKET_H__ #include "usg_common.h" #ifdef __cplusplus extern "C" { #endif enum shm_mod_t { PULL_PUSH = 1, REQ_REP = 2, PAIR = 3, PUB_SUB = 4, SURVEY = 5, BUS = 6 }; /** * 初始化共享内存 * @size 共享内存大小, 单位M * */ void shm_init(int size); /** * 销毁共享内存 * 整个进程退出时需要执行这个方法,该方法首先会检查是否还有其他进程在使用该共享内存,如果还有其他进程在使用就只是detach,如果没有其他进程在使用则销毁整块内存。 */ void shm_destroy(); /** * 释放recv方法分配的buf */ void shm_free(void *buf); void *shm_open_socket(int mod); int shm_close_socket(void *socket) ; int shm_bind(void* socket, int port) ; int shm_listen(void* socket) ; int shm_connect(void* socket, int port); int shm_send(void *socket, void *buf, int size) ; int shm_recv(void* socket, void **buf, int *size) ; #ifdef __cplusplus } #endif #endif queue/libshm_queue.aBinary files differ
queue/libshm_queue.soBinary files differ
queue/shm_queue_wrapper.c
@@ -13,13 +13,6 @@ } shmqueue_t; void shm_init(int size) { mem_pool_init(size); } void shm_destroy() { mem_pool_destroy(); } //移除不包含在keys中的队列 void shm_remove_queues_exclue(void *keys, int length) { queue/socket.c
New file @@ -0,0 +1,234 @@ #include "usg_common.h" #include "usg_typedef.h" #include "shm_queue.h" #include "shm_allocator.h" #include "mem_pool.h" #include "hashtable.h" #include "sem_util.h" #include "socket.h" #include <map> enum shm_msg_type_t { SHM_SOCKET_OPEN = 1, SHM_SOKET_CLOSE = 2, SHM_COMMON_MSG = 3 }; typedef struct shm_msg_t { int port; shm_msg_type_t type; size_t size; void * buf; } shm_msg_t; typedef struct shm_socket_t { int port; shm_mod_t mod; SHMQueue<shm_msg_t> *queue; std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap; int slots; int items; int is_server; } shm_socket_t; void shm_init(int size) { mem_pool_init(size); } void shm_destroy() { mem_pool_destroy(); } void shm_free(void *buf) { free(buf); } void *shm_open_socket(int mod) { shm_socket_t *socket = (shm_socket_t *)malloc(sizeof(shm_socket_t)); socket->remoteQueueMap = new std::map<int, SHMQueue<shm_msg_t>* >; socket->port = -1; socket->mod = (shm_mod_t)mod; printf("mod===%d\n", socket->mod); socket->is_server = 0; if (mod == REQ_REP) { socket->slots = SemUtil::get(IPC_PRIVATE, 1); socket->items = SemUtil::get(IPC_PRIVATE, 0); } return (void *)socket; } int shm_close_socket(void *socket) { shm_socket_t * _socket = (shm_socket_t *) socket; delete _socket->queue; std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap; for(auto iter = remoteQueueMap->begin(); iter != remoteQueueMap->end(); iter++) { delete iter->second; } delete _socket->remoteQueueMap; if (_socket->mod == REQ_REP) { SemUtil::remove(_socket->slots); SemUtil::remove(_socket->items); } free(socket); return 0; } int shm_bind(void* socket, int port) { shm_socket_t * _socket = (shm_socket_t *) socket; _socket -> port = port; return 0; } int shm_listen(void* socket) { shm_socket_t * _socket = (shm_socket_t *) socket; _socket->is_server = 1; int port; hashtable_t *hashtable = mm_get_hashtable(); if(_socket -> port == -1) { port = hashtable_alloc_key(hashtable); _socket -> port = port; } else { if(hashtable_get(hashtable, _socket->port)!= NULL) { err_exit(0, "key %d has already been in used!", _socket->port); } } _socket->queue = new SHMQueue<shm_msg_t>(_socket->port, 16); return 0; } static int __shm_rev(shm_socket_t* _socket, void **buf, int *size) { shm_msg_t src; std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap; bool rv = _socket->queue->pop(src); if (rv) { if(src.type=="open") if( _socket->is_server == 1 && remoteQueueMap->find(src.port) == remoteQueueMap->end()) { if(_socket->mod == REQ_REP) SemUtil::dec(_socket->slots); remoteQueueMap->insert({src.port, new SHMQueue<shm_msg_t>(src.port, 0)}); if(_socket->mod == REQ_REP) SemUtil::inc(_socket->items); } void * _buf = malloc(src.size); memcpy(_buf, src.buf, src.size); *buf = _buf; *size = src.size; mm_free(src.buf); return 0; } else { return 1; } } int shm_connect(void* socket, int port) { shm_socket_t * _socket = (shm_socket_t *) socket; hashtable_t *hashtable = mm_get_hashtable(); if(hashtable_get(hashtable, port)== NULL) { err_exit(0, "shm_connect:connect at port %d failed!", port); } if(_socket -> port == -1) { _socket -> port = hashtable_alloc_key(hashtable); } else { if(hashtable_get(hashtable, _socket->port)!= NULL) { err_exit(0, "key %d has already been in used!", _socket->port); } } _socket->queue = new SHMQueue<shm_msg_t>(_socket->port, 16); _socket->remoteQueueMap->insert({port, new SHMQueue<shm_msg_t>(port, 0)}); return 0; } int shm_send(void *socket, void *buf, int size) { shm_socket_t * _socket = (shm_socket_t *) socket; hashtable_t *hashtable = mm_get_hashtable(); shm_msg_t dest; dest.port = _socket->port; dest.size = size; dest.buf = mm_malloc(size); memcpy(dest.buf, buf, size); std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap; for(auto iter = remoteQueueMap->begin(); iter != remoteQueueMap->end(); iter++) { if(hashtable_get(hashtable, iter->first)== NULL) { err_msg(0, "shm_send:connect at port %d failed, the other part has been closed!", iter->first); delete iter->second; remoteQueueMap->erase(iter); continue; } if(_socket->mod == REQ_REP && _socket->is_server == 1) SemUtil::dec(_socket->items); iter->second->push(dest); if( _socket->mod == REQ_REP && _socket->is_server == 1) { delete iter->second; remoteQueueMap->erase(iter); SemUtil::inc(_socket->slots); } } return 0; } int shm_recv(void* socket, void **buf, int *size) { shm_socket_t * _socket = (shm_socket_t *) socket; shm_msg_t src; std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap; bool rv = _socket->queue->pop(src); if (rv) { if( _socket->is_server == 1 && remoteQueueMap->find(src.port) == remoteQueueMap->end()) { if(_socket->mod == REQ_REP) SemUtil::dec(_socket->slots); remoteQueueMap->insert({src.port, new SHMQueue<shm_msg_t>(src.port, 0)}); if(_socket->mod == REQ_REP) SemUtil::inc(_socket->items); } void * _buf = malloc(src.size); memcpy(_buf, src.buf, src.size); *buf = _buf; *size = src.size; mm_free(src.buf); return 0; } else { return 1; } } test2/Makefile
@@ -14,7 +14,7 @@ include $(ROOT)/Make.defines.$(PLATFORM) PROGS = test_queue_wrapper server client pub sub PROGS = req_rep build: $(PROGS) test2/pub_sub.c
New file @@ -0,0 +1,60 @@ #include "socket.h" void server(int port) { void *socket = shm_open_socket(PUB_SUB); shm_bind(socket, port); shm_listen(socket); int size; void *recvbuf; char sendbuf[512]; while(true) { shm_recv(socket, &recvbuf, &size); sprintf(sendbuf, "pub: %s", recvbuf); puts(sendbuf); shm_send(socket, sendbuf, strlen(sendbuf)+1) ; shm_free(recvbuf); } shm_close_socket(socket); } void client(int port) { void *socket = shm_open_socket(PUB_SUB); shm_connect(socket, port); int size; void *recvbuf; char sendbuf[512]; sprintf(sendbuf, "sub"); shm_send(socket, sendbuf, strlen(sendbuf)+1) ; while(true) { shm_recv(socket, &recvbuf, &size); printf("received sub message: %s\n", (char *)recvbuf); shm_free(recvbuf); } shm_close_socket(socket); } int main(int argc, char *argv[]) { shm_init(512); int port; if (argc < 3) { fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client"); return 1; } port = atoi(argv[2]); if (strcmp("server", argv[1]) == 0 ) { server(port); } if (strcmp("client", argv[1]) == 0) client(port); shm_destroy(); // fprintf(stderr, "Usage: reqrep %s|%s <URL> ...\n", "server", "client"); return 0; } test2/req_repBinary files differ
test2/req_rep.c
New file @@ -0,0 +1,59 @@ #include "socket.h" void server(int port) { void *socket = shm_open_socket(REQ_REP); shm_bind(socket, port); shm_listen(socket); int size; void *recvbuf; char sendbuf[512]; while(true) { shm_recv(socket, &recvbuf, &size); sprintf(sendbuf, "SERVER RECEIVED: %s", recvbuf); puts(sendbuf); shm_send(socket, sendbuf, strlen(sendbuf)+1) ; shm_free(recvbuf); } shm_close_socket(socket); } void client(int port) { void *socket = shm_open_socket(REQ_REP); shm_connect(socket, port); int size; void *recvbuf; char sendbuf[512]; while(true) { printf("request: "); scanf("%s", sendbuf); shm_send(socket, sendbuf, strlen(sendbuf)+1) ; shm_recv(socket, &recvbuf, &size); printf("reply: %s\n", (char *)recvbuf); shm_free(recvbuf); } shm_close_socket(socket); } int main(int argc, char *argv[]) { shm_init(512); int port; if (argc < 3) { fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client"); return 1; } port = atoi(argv[2]); if (strcmp("server", argv[1]) == 0 ) { server(port); } if (strcmp("client", argv[1]) == 0) client(port); shm_destroy(); // fprintf(stderr, "Usage: reqrep %s|%s <URL> ...\n", "server", "client"); return 0; }