| | |
| | | |
| | | MYLIBS = $(LIBSQUEUE) $(DLIBSQUEUE) |
| | | |
| | | ifeq ($(DEBUG),y) |
| | | MYLIBS = $(LIBSQUEUE) |
| | | else |
| | | MYLIBS = $(LIBSQUEUE) $(DLIBSQUEUE) |
| | | endif |
| | | |
| | | all: build |
| | | |
| | | |
| | |
| | | TAILQ_ENTRY(tailq_entry_t) joint; |
| | | } tailq_entry_t; |
| | | |
| | | static int hashtable_mutex; |
| | | |
| | | #define START_KEY 1000 |
| | | |
| | | typedef TAILQ_HEAD(tailq_header_t, tailq_entry_t) tailq_header_t; |
| | |
| | | |
| | | } |
| | | |
| | | void* _hashtable_put(hashtable_t *hashtable, int key, void *value) |
| | | void * _hashtable_put(hashtable_t *hashtable, int key, void *value) |
| | | { |
| | | size_t code = hashcode(key); |
| | | void *oldvalue; |
| | |
| | | return res; |
| | | } |
| | | |
| | | void* hashtable_put(hashtable_t *hashtable, int key, void *value) { |
| | | void hashtable_put(hashtable_t *hashtable, int key, void *value) { |
| | | SemUtil::dec(hashtable->mutex); |
| | | while (hashtable->readcnt > 0) |
| | | { |
| | |
| | | } |
| | | } |
| | | return keyset; |
| | | } |
| | | } |
| | |
| | | |
| | | void hashtable_init(hashtable_t *hashtable); |
| | | void *hashtable_get(hashtable_t *hashtable, int key); |
| | | void* hashtable_put(hashtable_t *hashtable, int key, void *value); |
| | | void hashtable_put(hashtable_t *hashtable, int key, void *value); |
| | | void *hashtable_remove(hashtable_t *hashtable, int key); |
| | | void hashtable_removeall(hashtable_t *hashtable); |
| | | |
| | |
| | | bool found; |
| | | for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) { |
| | | found = false; |
| | | for(int i = 0; i < length; i++) { |
| | | for(size_t i = 0; i < length; i++) { |
| | | if(*keyItr == keys[i]) { |
| | | found = true; |
| | | break; |
| | |
| | | #ifdef __cplusplus |
| | | extern "C" { |
| | | #endif |
| | | /** |
| | | * Initialize the shared memory. |
| | | * @size size of the shared memory |
| | | * |
| | | */ |
| | | void shm_init(int size); |
| | | |
| | | /** |
| | | * Destroy the shared memory. |
| | | * Must be called when the whole process exits. It first checks whether any other process is still using the shared memory: if so it only detaches, otherwise it destroys the whole memory block. |
| | | */ |
| | | void shm_destroy(); |
| | | |
| | | |
| | | |
New file |
| | |
| | | #ifndef __BASIC_SHM_SOCKET_H__ |
| | | #define __BASIC_SHM_SOCKET_H__ |
| | | |
| | | #include "usg_common.h" |
| | | #ifdef __cplusplus |
| | | extern "C" { |
| | | #endif |
| | | |
| | | /** |
| | | * Communication modes that can be passed to shm_open_socket(). |
| | | */ |
| | | enum shm_mod_t |
| | | { |
| | | PULL_PUSH = 1, |
| | | REQ_REP = 2, |
| | | PAIR = 3, |
| | | PUB_SUB = 4, |
| | | SURVEY = 5, |
| | | BUS = 6 |
| | | |
| | | }; |
| | | |
| | | |
| | | /** |
| | | * Initialize the shared memory. |
| | | * @size size of the shared memory, unit: M (megabytes) |
| | | * |
| | | */ |
| | | void shm_init(int size); |
| | | |
| | | /** |
| | | * Destroy the shared memory. |
| | | * Must be called when the whole process exits. It first checks whether any other process is still using the shared memory: if so it only detaches, otherwise it destroys the whole memory block. |
| | | */ |
| | | void shm_destroy(); |
| | | |
| | | /** |
| | | * Free the buf allocated by the recv method. |
| | | */ |
| | | void shm_free(void *buf); |
| | | |
| | | /** |
| | | * Create a socket for the given mode (a shm_mod_t value). |
| | | * Returns an opaque handle that is passed to the other shm_* socket calls. |
| | | */ |
| | | void *shm_open_socket(int mod); |
| | | |
| | | |
| | | /** |
| | | * Release the socket together with the queues it holds. Returns 0. |
| | | */ |
| | | int shm_close_socket(void *socket) ; |
| | | |
| | | |
| | | /** |
| | | * Record the port this socket will use. Returns 0. |
| | | */ |
| | | int shm_bind(void* socket, int port) ; |
| | | |
| | | /** |
| | | * Mark the socket as a server and create its receive queue. |
| | | * A port is allocated automatically when none was bound. Returns 0. |
| | | */ |
| | | int shm_listen(void* socket) ; |
| | | |
| | | /** |
| | | * Connect the socket to the peer registered at the given port. Returns 0. |
| | | */ |
| | | int shm_connect(void* socket, int port); |
| | | |
| | | /** |
| | | * Send size bytes from buf to every peer this socket knows. Returns 0 on success. |
| | | */ |
| | | int shm_send(void *socket, void *buf, int size) ; |
| | | |
| | | /** |
| | | * Receive one message. |
| | | * On success returns 0, stores a newly allocated copy of the payload in *buf |
| | | * (release it with shm_free) and its length in *size. |
| | | * Returns 1 when no message was obtained. |
| | | */ |
| | | int shm_recv(void* socket, void **buf, int *size) ; |
| | | |
| | | |
| | | |
| | | #ifdef __cplusplus |
| | | } |
| | | #endif |
| | | |
| | | #endif |
| | |
| | | } shmqueue_t; |
| | | |
| | | |
| | | // Initialize the shared memory by forwarding size to mem_pool_init(). |
| | | void shm_init(int size) { |
| | | mem_pool_init(size); |
| | | } |
| | | |
| | | // Tear down the shared memory by calling mem_pool_destroy(). |
| | | void shm_destroy() { |
| | | mem_pool_destroy(); |
| | | } |
| | | |
| | | // Remove the queues that are not contained in keys |
| | | void shm_remove_queues_exclue(void *keys, int length) { |
New file |
| | |
| | | #include "usg_common.h" |
| | | #include "usg_typedef.h" |
| | | #include "shm_queue.h" |
| | | #include "shm_allocator.h" |
| | | |
| | | #include "mem_pool.h" |
| | | #include "hashtable.h" |
| | | #include "sem_util.h" |
| | | #include "socket.h" |
| | | #include <map> |
| | | |
| | | |
| | | /** |
| | | * Kinds of message stored in shm_msg_t::type. |
| | | * NOTE(review): SHM_SOKET_CLOSE looks like a misspelling of SHM_SOCKET_CLOSE; |
| | | * it is left as is because renaming changes the identifier. |
| | | */ |
| | | enum shm_msg_type_t |
| | | { |
| | | SHM_SOCKET_OPEN = 1, |
| | | SHM_SOKET_CLOSE = 2, |
| | | SHM_COMMON_MSG = 3 |
| | | |
| | | }; |
| | | |
| | | /** |
| | | * One message as it travels through a SHMQueue. |
| | | */ |
| | | typedef struct shm_msg_t { |
| | | // Port of the sending socket (shm_send copies the sender's own port here). |
| | | int port; |
| | | // Message kind, one of shm_msg_type_t. |
| | | shm_msg_type_t type; |
| | | // Payload length in bytes. |
| | | size_t size; |
| | | // Payload; allocated with mm_malloc() by the sender and released with mm_free() by the receiver. |
| | | void * buf; |
| | | |
| | | } shm_msg_t; |
| | | |
| | | /** |
| | | * State behind the opaque handle returned by shm_open_socket(). |
| | | */ |
| | | typedef struct shm_socket_t { |
| | | // This socket's own port; -1 until it is bound or allocated. |
| | | int port; |
| | | // Mode given to shm_open_socket(). |
| | | shm_mod_t mod; |
| | | // Queue this socket receives from; created by shm_listen()/shm_connect(). |
| | | SHMQueue<shm_msg_t> *queue; |
| | | // Peer port -> queue used to push messages to that peer. |
| | | std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap; |
| | | // Semaphore ids obtained from SemUtil::get(); only created in REQ_REP mode. |
| | | int slots; |
| | | int items; |
| | | // 1 once shm_listen() has been called, 0 otherwise. |
| | | int is_server; |
| | | |
| | | } shm_socket_t; |
| | | |
| | | |
| | | |
| | | // Initialize the shared memory by forwarding size to mem_pool_init(). |
| | | void shm_init(int size) { |
| | | mem_pool_init(size); |
| | | } |
| | | |
| | | // Tear down the shared memory by calling mem_pool_destroy(). |
| | | void shm_destroy() { |
| | | mem_pool_destroy(); |
| | | } |
| | | |
| | | // Release a buffer handed out by shm_recv(); those buffers come from malloc(), hence free(). |
| | | void shm_free(void *buf) { |
| | | free(buf); |
| | | } |
| | | |
| | | /** |
| | | * Create a socket object for the given mode. |
| | | * |
| | | * The socket starts without a port (-1), without a receive queue and with an |
| | | * empty peer map. In REQ_REP mode two semaphores (slots, items) are created. |
| | | * |
| | | * @param mod one of the shm_mod_t values |
| | | * @return opaque socket handle, or NULL when the socket could not be allocated |
| | | */ |
| | | void *shm_open_socket(int mod) { |
| | |   shm_socket_t *socket = (shm_socket_t *)malloc(sizeof(shm_socket_t)); |
| | |   if (socket == NULL) { |
| | |     return NULL; |
| | |   } |
| | |   // The receive queue is only created by shm_listen()/shm_connect(); start from |
| | |   // NULL so that closing a socket that never got that far deletes nothing. |
| | |   socket->queue = NULL; |
| | |   socket->remoteQueueMap = new std::map<int, SHMQueue<shm_msg_t>* >; |
| | |   socket->port = -1; |
| | |   socket->mod = (shm_mod_t)mod; |
| | |   printf("mod===%d\n", socket->mod); |
| | |   socket->is_server = 0; |
| | |   // Semaphore ids stay at -1 unless the mode actually needs them. |
| | |   socket->slots = -1; |
| | |   socket->items = -1; |
| | |   if (mod == REQ_REP) { |
| | |     socket->slots = SemUtil::get(IPC_PRIVATE, 1); |
| | |     socket->items = SemUtil::get(IPC_PRIVATE, 0); |
| | |   } |
| | | |
| | |   return (void *)socket; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Release everything owned by a socket: its receive queue, every peer queue, |
| | | * the peer map, the REQ_REP semaphores (when present) and the socket itself. |
| | | * |
| | | * @param socket handle returned by shm_open_socket() |
| | | * @return always 0 |
| | | */ |
| | | int shm_close_socket(void *socket) { |
| | |   shm_socket_t *self = (shm_socket_t *) socket; |
| | |   delete self->queue; |
| | | |
| | |   // Peer queues are owned by the map entries, so free them before the map. |
| | |   for (auto &peer : *self->remoteQueueMap) { |
| | |     delete peer.second; |
| | |   } |
| | |   delete self->remoteQueueMap; |
| | | |
| | |   if (self->mod == REQ_REP) { |
| | |     SemUtil::remove(self->slots); |
| | |     SemUtil::remove(self->items); |
| | |   } |
| | | |
| | |   free(socket); |
| | |   return 0; |
| | | } |
| | | |
| | | |
| | | // Record the port this socket will use; nothing is registered yet. Returns 0. |
| | | int shm_bind(void* socket, int port) { |
| | | shm_socket_t * _socket = (shm_socket_t *) socket; |
| | | _socket -> port = port; |
| | | return 0; |
| | | } |
| | | |
| | | /** |
| | | * Mark the socket as a server and create its receive queue. |
| | | * |
| | | * A socket without a port gets one from hashtable_alloc_key(); a socket with a |
| | | * bound port reports through err_exit() when that key is already present. |
| | | * |
| | | * @param socket handle returned by shm_open_socket() |
| | | * @return always 0 |
| | | */ |
| | | int shm_listen(void* socket) { |
| | |   shm_socket_t *self = (shm_socket_t *) socket; |
| | |   self->is_server = 1; |
| | | |
| | |   hashtable_t *table = mm_get_hashtable(); |
| | |   if (self->port != -1) { |
| | |     if (hashtable_get(table, self->port) != NULL) { |
| | |       err_exit(0, "key %d has already been in used!", self->port); |
| | |     } |
| | |   } else { |
| | |     self->port = hashtable_alloc_key(table); |
| | |   } |
| | | |
| | |   self->queue = new SHMQueue<shm_msg_t>(self->port, 16); |
| | |   return 0; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Pop one message from the socket's queue and hand a malloc'ed copy to the caller. |
| | | * |
| | | * The previous version contained `if(src.type=="open")`, which compares an enum |
| | | * with a string literal and does not compile; it also had no body of its own. |
| | | * It is removed here. |
| | | * TODO(review): if open/close messages need special handling, compare src.type |
| | | * against SHM_SOCKET_OPEN once senders actually fill in the type field. |
| | | * |
| | | * @param _socket receiving socket |
| | | * @param buf     receives a malloc'ed copy of the payload |
| | | * @param size    receives the payload length |
| | | * @return 0 when a message was popped, 1 otherwise |
| | | */ |
| | | static int __shm_rev(shm_socket_t* _socket, void **buf, int *size) { |
| | |   shm_msg_t src; |
| | | |
| | |   std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap; |
| | |   bool rv = _socket->queue->pop(src); |
| | | |
| | |   if (rv) { |
| | |     // A server remembers every new sender so that it can answer it later. |
| | |     if( _socket->is_server == 1 && remoteQueueMap->find(src.port) == remoteQueueMap->end()) { |
| | |       if(_socket->mod == REQ_REP) |
| | |         SemUtil::dec(_socket->slots); |
| | | |
| | |       remoteQueueMap->insert({src.port, new SHMQueue<shm_msg_t>(src.port, 0)}); |
| | | |
| | |       if(_socket->mod == REQ_REP) |
| | |         SemUtil::inc(_socket->items); |
| | |     } |
| | | |
| | |     void * _buf = malloc(src.size); |
| | |     memcpy(_buf, src.buf, src.size); |
| | |     *buf = _buf; |
| | |     *size = src.size; |
| | |     mm_free(src.buf); |
| | |     return 0; |
| | |   } else { |
| | |     return 1; |
| | |   } |
| | | } |
| | | |
| | | /** |
| | | * Connect the socket to the peer registered at the given port. |
| | | * |
| | | * Reports through err_exit() when the target port is not present in the |
| | | * hashtable, or when this socket's own bound port is already present. |
| | | * A socket without a port gets one from hashtable_alloc_key(). Afterwards the |
| | | * socket owns a receive queue and a peer-map entry for the target port. |
| | | * |
| | | * @param socket handle returned by shm_open_socket() |
| | | * @param port   port of the peer to connect to |
| | | * @return always 0 |
| | | */ |
| | | int shm_connect(void* socket, int port) { |
| | |   shm_socket_t *self = (shm_socket_t *) socket; |
| | |   hashtable_t *table = mm_get_hashtable(); |
| | | |
| | |   if (hashtable_get(table, port) == NULL) { |
| | |     err_exit(0, "shm_connect:connect at port %d failed!", port); |
| | |   } |
| | | |
| | |   if (self->port != -1) { |
| | |     if (hashtable_get(table, self->port) != NULL) { |
| | |       err_exit(0, "key %d has already been in used!", self->port); |
| | |     } |
| | |   } else { |
| | |     self->port = hashtable_alloc_key(table); |
| | |   } |
| | | |
| | |   self->queue = new SHMQueue<shm_msg_t>(self->port, 16); |
| | |   SHMQueue<shm_msg_t> *peer_queue = new SHMQueue<shm_msg_t>(port, 0); |
| | |   self->remoteQueueMap->insert({port, peer_queue}); |
| | |   return 0; |
| | | } |
| | | |
| | | /** |
| | | * Send a copy of buf to every peer queue known to this socket. |
| | | * |
| | | * Every peer gets its own shared-memory copy of the payload, because the |
| | | * receiving side releases the buffer with mm_free() after reading it; a single |
| | | * allocation shared by several peers would be freed more than once, and with |
| | | * no peers at all it would never be freed. |
| | | * Peers whose port is no longer present in the hashtable are dropped. |
| | | * In REQ_REP server mode the peer entry is removed once the reply is pushed. |
| | | * |
| | | * NOTE(review): assumes mm_malloc() returns NULL on failure — confirm. |
| | | * |
| | | * @param socket handle returned by shm_open_socket() |
| | | * @param buf    payload to send |
| | | * @param size   payload length in bytes |
| | | * @return 0 on success, -1 when a shared-memory buffer could not be allocated |
| | | */ |
| | | int shm_send(void *socket, void *buf, int size) { |
| | |   shm_socket_t * _socket = (shm_socket_t *) socket; |
| | |   hashtable_t *hashtable = mm_get_hashtable(); |
| | |   const bool is_reply = (_socket->mod == REQ_REP && _socket->is_server == 1); |
| | | |
| | |   std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap; |
| | |   auto iter = remoteQueueMap->begin(); |
| | |   while (iter != remoteQueueMap->end()) { |
| | |     if(hashtable_get(hashtable, iter->first)== NULL) { |
| | |       err_msg(0, "shm_send:connect at port %d failed, the other part has been closed!", iter->first); |
| | |       delete iter->second; |
| | |       // erase() invalidates iter; continue from the iterator it returns. |
| | |       iter = remoteQueueMap->erase(iter); |
| | |       continue; |
| | |     } |
| | | |
| | |     // Build a fresh message per peer so each receiver frees its own buffer. |
| | |     shm_msg_t dest; |
| | |     dest.port = _socket->port; |
| | |     dest.type = SHM_COMMON_MSG; |
| | |     dest.size = size; |
| | |     dest.buf = mm_malloc(size); |
| | |     if (dest.buf == NULL && size > 0) { |
| | |       return -1; |
| | |     } |
| | |     if (size > 0) { |
| | |       memcpy(dest.buf, buf, size); |
| | |     } |
| | | |
| | |     if (is_reply) |
| | |       SemUtil::dec(_socket->items); |
| | | |
| | |     iter->second->push(dest); |
| | | |
| | |     if (is_reply) { |
| | |       delete iter->second; |
| | |       iter = remoteQueueMap->erase(iter); |
| | |       SemUtil::inc(_socket->slots); |
| | |     } else { |
| | |       ++iter; |
| | |     } |
| | |   } |
| | |   return 0; |
| | | } |
| | | |
| | | /** |
| | | * Receive one message from the socket's queue. |
| | | * |
| | | * The payload is copied out of shared memory into a malloc'ed buffer and the |
| | | * shared buffer is released with mm_free(). A server additionally registers a |
| | | * queue for a sender it has not seen before (guarded by the slots/items |
| | | * semaphores in REQ_REP mode). |
| | | * |
| | | * @param socket handle returned by shm_open_socket() |
| | | * @param buf    receives the malloc'ed payload copy; release it with shm_free() |
| | | * @param size   receives the payload length in bytes |
| | | * @return 0 on success; 1 when no message was popped or the copy could not be |
| | | *         allocated (buf and size are left untouched in that case) |
| | | */ |
| | | int shm_recv(void* socket, void **buf, int *size) { |
| | |   shm_socket_t * _socket = (shm_socket_t *) socket; |
| | |   shm_msg_t src; |
| | | |
| | |   std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap; |
| | |   bool rv = _socket->queue->pop(src); |
| | |   if (!rv) { |
| | |     return 1; |
| | |   } |
| | | |
| | |   // Copy first: if the local allocation fails, no peer state has been touched. |
| | |   void * _buf = malloc(src.size); |
| | |   if (_buf == NULL && src.size > 0) { |
| | |     mm_free(src.buf); |
| | |     return 1; |
| | |   } |
| | |   if (src.size > 0) { |
| | |     memcpy(_buf, src.buf, src.size); |
| | |   } |
| | |   mm_free(src.buf); |
| | | |
| | |   if( _socket->is_server == 1 && remoteQueueMap->find(src.port) == remoteQueueMap->end()) { |
| | |     if(_socket->mod == REQ_REP) |
| | |       SemUtil::dec(_socket->slots); |
| | | |
| | |     remoteQueueMap->insert({src.port, new SHMQueue<shm_msg_t>(src.port, 0)}); |
| | | |
| | |     if(_socket->mod == REQ_REP) |
| | |       SemUtil::inc(_socket->items); |
| | |   } |
| | | |
| | |   *buf = _buf; |
| | |   *size = src.size; |
| | |   return 0; |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | |
| | | include $(ROOT)/Make.defines.$(PLATFORM) |
| | | |
| | | |
| | | PROGS = test_queue_wrapper server client pub sub |
| | | PROGS = req_rep |
| | | |
| | | |
| | | build: $(PROGS) |
New file |
| | |
| | | #include "socket.h" |
| | | |
| | | |
| | | /** |
| | | * Publisher loop: listen on port, and send every received message back out |
| | | * to the known peers prefixed with "pub: ". |
| | | */ |
| | | void server(int port) { |
| | |   void *socket = shm_open_socket(PUB_SUB); |
| | |   shm_bind(socket, port); |
| | |   shm_listen(socket); |
| | |   int size; |
| | |   void *recvbuf; |
| | |   char sendbuf[512]; |
| | |   while(true) { |
| | |     // recvbuf and size are only filled in when shm_recv() returns 0. |
| | |     if (shm_recv(socket, &recvbuf, &size) != 0) { |
| | |       continue; |
| | |     } |
| | |     // Bounded formatting: the incoming message may be longer than sendbuf. |
| | |     snprintf(sendbuf, sizeof(sendbuf), "pub: %s", (char *)recvbuf); |
| | |     puts(sendbuf); |
| | |     shm_send(socket, sendbuf, strlen(sendbuf)+1) ; |
| | |     shm_free(recvbuf); |
| | | |
| | |   } |
| | |   shm_close_socket(socket); |
| | | } |
| | | |
| | | /** |
| | | * Subscriber loop: connect to port, announce itself with a "sub" message and |
| | | * then print every message it receives. |
| | | */ |
| | | void client(int port) { |
| | |   void *socket = shm_open_socket(PUB_SUB); |
| | |   shm_connect(socket, port); |
| | |   int size; |
| | |   void *recvbuf; |
| | |   char sendbuf[512]; |
| | | |
| | |   sprintf(sendbuf, "sub"); |
| | |   shm_send(socket, sendbuf, strlen(sendbuf)+1) ; |
| | |   while(true) { |
| | |     // recvbuf and size are only filled in when shm_recv() returns 0. |
| | |     if (shm_recv(socket, &recvbuf, &size) != 0) { |
| | |       continue; |
| | |     } |
| | |     printf("received sub message: %s\n", (char *)recvbuf); |
| | |     shm_free(recvbuf); |
| | | |
| | |   } |
| | |   shm_close_socket(socket); |
| | | } |
| | | |
| | | /** |
| | | * Entry point: `<prog> server|client <PORT>`. |
| | | * |
| | | * Arguments are validated before the shared memory is initialized, so a usage |
| | | * error no longer returns with the memory initialized and never destroyed. |
| | | * An unknown mode prints the usage text and fails instead of silently doing nothing. |
| | | */ |
| | | int main(int argc, char *argv[]) { |
| | |   if (argc < 3) { |
| | |     fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client"); |
| | |     return 1; |
| | |   } |
| | | |
| | |   const bool is_server = strcmp("server", argv[1]) == 0; |
| | |   const bool is_client = strcmp("client", argv[1]) == 0; |
| | |   if (!is_server && !is_client) { |
| | |     fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client"); |
| | |     return 1; |
| | |   } |
| | | |
| | |   int port = atoi(argv[2]); |
| | | |
| | |   shm_init(512); |
| | |   if (is_server) { |
| | |     server(port); |
| | |   } else { |
| | |     client(port); |
| | |   } |
| | |   shm_destroy(); |
| | |   return 0; |
| | | } |
New file |
| | |
| | | #include "socket.h" |
| | | |
| | | |
| | | /** |
| | | * Reply loop: listen on port and answer every request with |
| | | * "SERVER RECEIVED: <request>". |
| | | */ |
| | | void server(int port) { |
| | |   void *socket = shm_open_socket(REQ_REP); |
| | |   shm_bind(socket, port); |
| | |   shm_listen(socket); |
| | |   int size; |
| | |   void *recvbuf; |
| | |   char sendbuf[512]; |
| | |   while(true) { |
| | |     // recvbuf and size are only filled in when shm_recv() returns 0. |
| | |     if (shm_recv(socket, &recvbuf, &size) != 0) { |
| | |       continue; |
| | |     } |
| | |     // Bounded formatting: the request may be longer than sendbuf. |
| | |     snprintf(sendbuf, sizeof(sendbuf), "SERVER RECEIVED: %s", (char *)recvbuf); |
| | |     puts(sendbuf); |
| | |     shm_send(socket, sendbuf, strlen(sendbuf)+1) ; |
| | |     shm_free(recvbuf); |
| | | |
| | |   } |
| | |   shm_close_socket(socket); |
| | | } |
| | | |
| | | /** |
| | | * Request loop: connect to port, read one word per request from stdin, send it |
| | | * and print the reply. The loop ends when stdin is exhausted. |
| | | */ |
| | | void client(int port) { |
| | |   void *socket = shm_open_socket(REQ_REP); |
| | |   shm_connect(socket, port); |
| | |   int size; |
| | |   void *recvbuf; |
| | |   char sendbuf[512]; |
| | |   while(true) { |
| | |     printf("request: "); |
| | |     // Width 511 keeps the word plus its terminator inside sendbuf; a failed |
| | |     // read (EOF) would otherwise resend stale buffer contents forever. |
| | |     if (scanf("%511s", sendbuf) != 1) { |
| | |       break; |
| | |     } |
| | |     shm_send(socket, sendbuf, strlen(sendbuf)+1) ; |
| | |     // recvbuf and size are only filled in when shm_recv() returns 0. |
| | |     if (shm_recv(socket, &recvbuf, &size) != 0) { |
| | |       continue; |
| | |     } |
| | |     printf("reply: %s\n", (char *)recvbuf); |
| | |     shm_free(recvbuf); |
| | | |
| | |   } |
| | |   shm_close_socket(socket); |
| | | } |
| | | |
| | | /** |
| | | * Entry point: `reqrep server|client <PORT>`. |
| | | * |
| | | * Arguments are validated before the shared memory is initialized, so a usage |
| | | * error no longer returns with the memory initialized and never destroyed. |
| | | * An unknown mode prints the usage text and fails instead of silently doing nothing. |
| | | */ |
| | | int main(int argc, char *argv[]) { |
| | |   if (argc < 3) { |
| | |     fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client"); |
| | |     return 1; |
| | |   } |
| | | |
| | |   const bool is_server = strcmp("server", argv[1]) == 0; |
| | |   const bool is_client = strcmp("client", argv[1]) == 0; |
| | |   if (!is_server && !is_client) { |
| | |     fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client"); |
| | |     return 1; |
| | |   } |
| | | |
| | |   int port = atoi(argv[2]); |
| | | |
| | |   shm_init(512); |
| | |   if (is_server) { |
| | |     server(port); |
| | |   } else { |
| | |     client(port); |
| | |   } |
| | |   shm_destroy(); |
| | |   return 0; |
| | | } |