From 554529bb69cd610e83db2c9a80b4f36f5225d80f Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期一, 27 七月 2020 17:56:34 +0800
Subject: [PATCH] restart bus
---
demo/dgram_mod_req_rep | 0
src/socket/include/shm_socket.h | 4 +
src/libshm_queue.a | 0
src/socket/shm_socket.c | 17 +++-
src/queue/include/shm_mm.h | 3
src/queue/include/shm_queue.h | 25 ++++--
src/socket/dgram_mod_socket.c | 56 +++++++++----
test_socket/dgram_mod_bus | 0
test_socket/dgram_mod_survey | 0
src/queue/shm_mm.c | 1
src/queue/include/mem_pool.h | 24 +++++
src/queue/include/shm_allocator.h | 8 +
test_socket/dgram_mod_bus.c | 37 +++++++-
src/queue/hashtable.c | 26 +++--
test_socket/dgram_mod_req_rep | 0
demo/dgram_mod_survey | 0
src/socket/include/dgram_mod_socket.h | 2
17 files changed, 149 insertions(+), 54 deletions(-)
diff --git a/demo/dgram_mod_req_rep b/demo/dgram_mod_req_rep
index 87ad5b5..6b03fdf 100755
--- a/demo/dgram_mod_req_rep
+++ b/demo/dgram_mod_req_rep
Binary files differ
diff --git a/demo/dgram_mod_survey b/demo/dgram_mod_survey
index 7fc3de3..a3765fc 100755
--- a/demo/dgram_mod_survey
+++ b/demo/dgram_mod_survey
Binary files differ
diff --git a/src/libshm_queue.a b/src/libshm_queue.a
index bc22bb2..56e099f 100644
--- a/src/libshm_queue.a
+++ b/src/libshm_queue.a
Binary files differ
diff --git a/src/queue/hashtable.c b/src/queue/hashtable.c
index 21700ac..1fa6266 100755
--- a/src/queue/hashtable.c
+++ b/src/queue/hashtable.c
@@ -177,18 +177,7 @@
}
-int hashtable_alloc_key(hashtable_t *hashtable) {
- int key = START_KEY;
- SemUtil::dec(hashtable->wlock);
- while(_hashtable_get(hashtable, key) != NULL) {
- key++;
- }
-
- _hashtable_put(hashtable, key, (void *)1);
- SemUtil::inc(hashtable->wlock);
- return key;
-}
void *hashtable_get(hashtable_t *hashtable, int key) {
SemUtil::dec(hashtable->mutex);
@@ -251,6 +240,19 @@
}
}
+int hashtable_alloc_key(hashtable_t *hashtable) {
+ int key = START_KEY;
+ SemUtil::dec(hashtable->wlock);
+
+ while(_hashtable_get(hashtable, key) != NULL) {
+ key++;
+ }
+
+ _hashtable_put(hashtable, key, (void *)1);
+ SemUtil::inc(hashtable->wlock);
+ return key;
+}
+
std::set<int> * hashtable_keyset(hashtable_t *hashtable) {
std::set<int> *keyset = new std::set<int>;
tailq_entry_t *item;
@@ -267,3 +269,5 @@
}
return keyset;
}
+
+
diff --git a/src/queue/include/mem_pool.h b/src/queue/include/mem_pool.h
index 17a7c5c..d5a4110 100644
--- a/src/queue/include/mem_pool.h
+++ b/src/queue/include/mem_pool.h
@@ -34,6 +34,22 @@
return ptr;
}
+template <typename T>
+static inline T* mem_pool_attach(int key) {
+ void *ptr;
+ // T* tptr;
+ hashtable_t *hashtable = mm_get_hashtable();
+ ptr = hashtable_get(hashtable, key);
+printf("mem_pool_malloc_by_key malloc before %d, %p\n", key, ptr);
+ if(ptr == NULL || ptr == (void *)1 ) {
+ ptr = mm_malloc(sizeof(T));
+ hashtable_put(hashtable, key, ptr);
+ new(ptr) T;
+printf("mem_pool_malloc_by_key use new %d, %p\n", key, ptr);
+ }
+ return (T*)ptr;
+}
+
static inline void mem_pool_free (void *ptr) {
mm_free(ptr);
// notify malloc
@@ -45,10 +61,12 @@
return mm_realloc(ptr, size);
}
-static inline hashtable_t * mem_pool_get_hashtable() {
- return mm_get_hashtable();
-
+static inline int mem_pool_alloc_key() {
+ hashtable_t *hashtable = mm_get_hashtable();
+ return hashtable_alloc_key(hashtable);
}
+
+
// extern int mm_checkheap(int verbose);
diff --git a/src/queue/include/shm_allocator.h b/src/queue/include/shm_allocator.h
index ae94a9c..084a678 100644
--- a/src/queue/include/shm_allocator.h
+++ b/src/queue/include/shm_allocator.h
@@ -66,11 +66,13 @@
class SHM_Allocator {
public:
static void *allocate (size_t size) {
- return mem_pool_malloc(size);
+ return mm_malloc(size);
+ // return mem_pool_malloc(size);
}
static void deallocate (void *ptr) {
- return mem_pool_free(ptr);
+ return mm_free(ptr);
+ // return mem_pool_free(ptr);
}
};
@@ -93,6 +95,6 @@
-typedef std::basic_string<char, std::char_traits<char>, SHM_STL_Allocator<char> > shmstring;
+typedef std::basic_string<char, std::char_traits<char>, SHM_STL_Allocator<char> > SHMString;
#endif
\ No newline at end of file
diff --git a/src/queue/include/shm_mm.h b/src/queue/include/shm_mm.h
index b32568e..6f28f13 100644
--- a/src/queue/include/shm_mm.h
+++ b/src/queue/include/shm_mm.h
@@ -18,6 +18,9 @@
*/
void shm_destroy();
+void* shm_malloc_by_key(int key, int size);
+
+
#ifdef __cplusplus
}
#endif
diff --git a/src/queue/include/shm_queue.h b/src/queue/include/shm_queue.h
index 99f4802..e64f15a 100644
--- a/src/queue/include/shm_queue.h
+++ b/src/queue/include/shm_queue.h
@@ -38,7 +38,8 @@
inline ELEM_T &operator[](unsigned i);
- static void remove_queues_exclude(int *keys, size_t length);
+ static void remove_queues_exclude(int keys[], size_t length);
+ static void remove_queues_include(int keys[], size_t length);
private:
protected:
@@ -52,7 +53,7 @@
};
template <typename ELEM_T>
-void SHMQueue<ELEM_T>::remove_queues_exclude(int *keys, size_t length) {
+void SHMQueue<ELEM_T>::remove_queues_exclude(int keys[], size_t length) {
hashtable_t *hashtable = mm_get_hashtable();
std::set<int> *keyset = hashtable_keyset(hashtable);
std::set<int>::iterator keyItr;
@@ -67,12 +68,21 @@
}
}
if (!found) {
- mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable,
- *keyItr);
- delete mqueue;
+ // mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
+ // delete mqueue;
+ hashtable_remove(hashtable, *keyItr);
}
}
delete keyset;
+}
+
+
+template <typename ELEM_T>
+void SHMQueue<ELEM_T>::remove_queues_include(int keys[], size_t length) {
+ hashtable_t *hashtable = mm_get_hashtable();
+ for(int i = 0; i< length; i++) {
+ hashtable_remove(hashtable, keys[i]);
+ }
}
template <typename ELEM_T>
@@ -86,8 +96,7 @@
hashtable_put(hashtable, key, (void *)queue);
}
queue->reference++;
- LoggerFactory::getLogger().debug("SHMQueue constructor reference===%d",
- queue->reference.load());
+ LoggerFactory::getLogger().debug("SHMQueue constructor reference===%d", queue->reference.load());
}
template <typename ELEM_T> SHMQueue<ELEM_T>::~SHMQueue() {
@@ -96,7 +105,7 @@
// LoggerFactory::getLogger().debug("SHMQueue destructor reference===%d",
// queue->reference.load());
if (queue->reference.load() == 0) {
- delete queue;
+ // delete queue;
hashtable_t *hashtable = mm_get_hashtable();
hashtable_remove(hashtable, KEY);
// LoggerFactory::getLogger().debug("SHMQueue destructor delete queue\n");
diff --git a/src/queue/shm_mm.c b/src/queue/shm_mm.c
index 8b43316..6eeae1a 100644
--- a/src/queue/shm_mm.c
+++ b/src/queue/shm_mm.c
@@ -1,5 +1,6 @@
#include "shm_mm.h"
#include "mem_pool.h"
+#include "hashtable.h"
void shm_init(int size) {
mem_pool_init(size);
diff --git a/src/socket/dgram_mod_socket.c b/src/socket/dgram_mod_socket.c
index 62ae7ac..937d8f0 100644
--- a/src/socket/dgram_mod_socket.c
+++ b/src/socket/dgram_mod_socket.c
@@ -16,11 +16,15 @@
static Logger logger = LoggerFactory::getLogger();
+//typedef std::basic_string<char, std::char_traits<char>, SHM_STL_Allocator<char> > SHMString;
+typedef std::set<int, std::less<int>, SHM_STL_Allocator<int> > SHMKeySet;
+typedef std::map<SHMString, SHMKeySet *, std::less<SHMString>, SHM_STL_Allocator<std::pair<SHMString, SHMKeySet *> > > SHMTopicSubMap;
+
typedef struct dgram_mod_socket_t {
shm_socket_t *shm_socket;
// pthread_t recv_thread;
// <涓婚锛� 璁㈤槄鑰�>
- std::map<std::string, std::set<int> *> *topic_sub_map;
+ SHMTopicSubMap *topic_sub_map;
} dgram_mod_socket_t;
static int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len );
@@ -36,9 +40,9 @@
int dgram_mod_close_socket(void * _socket) {
dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
- std::map<std::string, std::set<int> *> *topic_sub_map = socket->topic_sub_map;
- std::set<int> *subscripter_set;
- std::map<std::string, std::set<int> *>::iterator map_iter;
+ SHMTopicSubMap *topic_sub_map = socket->topic_sub_map;
+ SHMKeySet *subscripter_set;
+ SHMTopicSubMap::iterator map_iter;
if(topic_sub_map != NULL) {
for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
@@ -57,6 +61,12 @@
int dgram_mod_bind(void * _socket, int port){
dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
return shm_socket_bind(socket->shm_socket, port);
+}
+
+
+int dgram_mod_force_bind(void * _socket, int port) {
+ dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
+ return shm_socket_force_bind(socket->shm_socket, port);
}
int dgram_mod_sendto(void *_socket, const void *buf, const int size, const int port) {
@@ -96,7 +106,14 @@
int dgram_mod_start_bus(void * _socket) {
dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
- socket->topic_sub_map = new std::map<std::string, std::set<int> *>;
+printf("mem_pool_malloc_by_key before\n");
+ // void *map_ptr = mem_pool_malloc_by_key(1, sizeof(SHMTopicSubMap));
+ socket->topic_sub_map = mem_pool_attach<SHMTopicSubMap>(1);
+printf("mem_pool_malloc_by_key after\n");
+
+ // socket->topic_sub_map = new(map_ptr) SHMTopicSubMap;
+
+ //socket->topic_sub_map = new SHMTopicSubMap;
run_pubsub_proxy(socket);
// pthread_t tid;
// pthread_create(&tid, NULL, run_accept_sub_request, _socket);
@@ -136,16 +153,17 @@
* 澶勭悊璁㈤槄
*/
void _proxy_sub(dgram_mod_socket_t *socket, char *topic, int port) {
- std::map<std::string, std::set<int> *> *topic_sub_map = socket->topic_sub_map;
- std::set<int> *subscripter_set;
+ SHMTopicSubMap *topic_sub_map = socket->topic_sub_map;
+ SHMKeySet *subscripter_set;
- std::map<std::string, std::set<int> *>::iterator map_iter;
- std::set<int>::iterator set_iter;
+ SHMTopicSubMap::iterator map_iter;
+ SHMKeySet::iterator set_iter;
if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
subscripter_set = map_iter->second;
} else {
- subscripter_set = new std::set<int>;
+ void *set_ptr = mm_malloc(sizeof(SHMKeySet));
+ subscripter_set = new(set_ptr) SHMKeySet;
topic_sub_map->insert({topic, subscripter_set});
}
subscripter_set->insert(port);
@@ -155,11 +173,11 @@
* 澶勭悊鍙戝竷锛屼唬鐞嗚浆鍙�
*/
void _proxy_pub(dgram_mod_socket_t * socket, char *topic, size_t head_len, void *buf, size_t size, int port) {
- std::map<std::string, std::set<int> *> *topic_sub_map = socket->topic_sub_map;
- std::set<int> *subscripter_set;
+ SHMTopicSubMap *topic_sub_map = socket->topic_sub_map;
+ SHMKeySet *subscripter_set;
- std::map<std::string, std::set<int> *>::iterator map_iter;
- std::set<int>::iterator set_iter;
+ SHMTopicSubMap::iterator map_iter;
+ SHMKeySet::iterator set_iter;
std::vector<int> subscripter_to_del;
std::vector<int>::iterator vector_iter;
@@ -171,12 +189,14 @@
subscripter_set = map_iter->second;
for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
send_port = *set_iter;
-// printf("run_accept_sub_request send before %d \n", send_port);
+ printf("_proxy_pub send before %d \n", send_port);
if (shm_sendto(socket->shm_socket, buf+head_len, size-head_len, send_port, &timeout) !=0 ) {
//瀵规柟宸插叧闂殑杩炴帴鏀惧埌寰呭垹闄ら槦鍒楅噷銆傚鏋滅洿鎺ュ垹闄や細璁﹊ter鎸囬拡鍑虹幇閿欎贡
subscripter_to_del.push_back(send_port);
+ } else {
+printf("_proxy_pub send after: %d \n", send_port);
}
-// printf("run_accept_sub_request send after: %d \n", send_port);
+
}
@@ -200,9 +220,9 @@
size_t head_len;
const char *topic_delim = ",";
-//printf("server receive before\n");
+printf("run_pubsub_proxy server receive before\n");
while(shm_recvfrom(socket->shm_socket, (void **)&buf, &size, &port) == 0) {
-//printf("server recv after: %s \n", buf);
+printf("run_pubsub_proxy server recv after: %s \n", buf);
if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) {
if(strcmp(action, "sub") == 0) {
// 璁㈤槄鏀寔澶氫富棰樿闃�
diff --git a/src/socket/include/dgram_mod_socket.h b/src/socket/include/dgram_mod_socket.h
index 1c2ad64..1c12e53 100644
--- a/src/socket/include/dgram_mod_socket.h
+++ b/src/socket/include/dgram_mod_socket.h
@@ -25,6 +25,8 @@
*/
int dgram_mod_bind(void * _socket, int port);
+
+int dgram_mod_force_bind(void * _socket, int port);
/**
* 鍙戦�佷俊鎭�
* @port 鍙戦�佺粰璋�
diff --git a/src/socket/include/shm_socket.h b/src/socket/include/shm_socket.h
index 7025f9e..0ca4cfd 100644
--- a/src/socket/include/shm_socket.h
+++ b/src/socket/include/shm_socket.h
@@ -44,6 +44,7 @@
shm_socket_type_t socket_type;
// 鏈湴port
int port;
+ bool force_bind;
shm_connection_status_t status;
SHMQueue<shm_msg_t> *queue;
SHMQueue<shm_msg_t> *remoteQueue;
@@ -65,6 +66,9 @@
int shm_socket_bind(shm_socket_t * socket, int port) ;
+int shm_socket_force_bind(shm_socket_t * socket, int port) ;
+
+
int shm_listen(shm_socket_t * socket) ;
shm_socket_t* shm_accept(shm_socket_t* socket);
diff --git a/src/socket/shm_socket.c b/src/socket/shm_socket.c
index 52251e6..168eb65 100644
--- a/src/socket/shm_socket.c
+++ b/src/socket/shm_socket.c
@@ -23,6 +23,7 @@
shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t));
socket->socket_type = socket_type;
socket->port = -1;
+ socket->force_bind = false;
socket->dispatch_thread = 0;
socket->status = SHM_CONN_CLOSED;
@@ -46,6 +47,12 @@
return 0;
}
+int shm_socket_force_bind(shm_socket_t *socket, int port) {
+ socket->force_bind = true;
+ socket->port = port;
+ return 0;
+}
+
int shm_listen(shm_socket_t *socket) {
if (socket->socket_type != SHM_SOCKET_STREAM) {
@@ -60,7 +67,7 @@
socket->port = port;
} else {
- if (hashtable_get(hashtable, socket->port) != NULL) {
+ if (hashtable_get(hashtable, socket->port) != NULL && !socket->force_bind) {
err_exit(0, "key %d has already been in used!", socket->port);
}
}
@@ -144,7 +151,7 @@
socket->port = hashtable_alloc_key(hashtable);
} else {
- if (hashtable_get(hashtable, socket->port) != NULL) {
+ if (hashtable_get(hashtable, socket->port) != NULL && !socket->force_bind ) {
err_exit(0, "key %d has already been in used!", socket->port);
}
}
@@ -243,7 +250,8 @@
} else {
if (hashtable_get(hashtable, socket->port) != NULL) {
- err_exit(0, "key %d has already been in used!", socket->port);
+ if(!socket->force_bind)
+ err_exit(0, "key %d has already been in used!", socket->port);
}
}
@@ -299,7 +307,8 @@
} else {
if (hashtable_get(hashtable, socket->port) != NULL) {
- err_exit(0, "key %d has already been in used!", socket->port);
+ if(!socket->force_bind)
+ err_exit(0, "key %d has already been in used!", socket->port);
}
}
diff --git a/test_socket/dgram_mod_bus b/test_socket/dgram_mod_bus
index 1b3c3ef..82d76a1 100755
--- a/test_socket/dgram_mod_bus
+++ b/test_socket/dgram_mod_bus
Binary files differ
diff --git a/test_socket/dgram_mod_bus.c b/test_socket/dgram_mod_bus.c
index bddc7d5..3cb6d9f 100644
--- a/test_socket/dgram_mod_bus.c
+++ b/test_socket/dgram_mod_bus.c
@@ -1,13 +1,30 @@
#include "dgram_mod_socket.h"
#include "shm_mm.h"
#include "usg_common.h"
+#include "mm.h"
-void server(int port) {
- void *socket = dgram_mod_open_socket();
- dgram_mod_bind(socket, port);
+void sigint_handler(int sig) {
+ printf("sigint_handler\n");
+ hashtable_t *hashtable = mm_get_hashtable();
+ //hashtable_remove(hashtable, 8);
+ // dgram_mod_close_socket(server_socket);
+ //SHMQueue<ELEM_T>::remove_queues_include
+ exit(0);
+}
+
+
+void server(int port, bool restart) {
+ //signal(SIGINT, sigint_handler);
+ void * server_socket = dgram_mod_open_socket();
+
+ if(restart) {
+ dgram_mod_force_bind(server_socket, port);
+ } else {
+ dgram_mod_bind(server_socket, port);
+ }
+
- dgram_mod_start_bus(socket);
-
+ dgram_mod_start_bus(server_socket);
}
@@ -68,14 +85,20 @@
shm_init(512);
int port;
if (argc < 3) {
- fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client");
+ fprintf(stderr, "Usage: %s %s|%s <PORT> ...\n", argv[0], "server", "client");
return 1;
}
port = atoi(argv[2]);
if (strcmp("server", argv[1]) == 0) {
- server(port);
+ if(argc >= 4 && strcmp("restart", argv[3]) == 0) {
+ server(port, true);
+ }
+ else{
+ server(port, false);
+ }
+
}
if (strcmp("client", argv[1]) == 0)
diff --git a/test_socket/dgram_mod_req_rep b/test_socket/dgram_mod_req_rep
index 5c2a703..bbe11f8 100755
--- a/test_socket/dgram_mod_req_rep
+++ b/test_socket/dgram_mod_req_rep
Binary files differ
diff --git a/test_socket/dgram_mod_survey b/test_socket/dgram_mod_survey
index f9dbccb..03db8ff 100755
--- a/test_socket/dgram_mod_survey
+++ b/test_socket/dgram_mod_survey
Binary files differ
--
Gitblit v1.8.0