.gitignore | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
demo/Makefile | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/queue/hashtable.c | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/queue/include/hashtable.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/queue/include/shm_mm.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/queue/shm_mm.c | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/socket/dgram_mod_socket.c | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/socket/include/dgram_mod_socket.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
test/test.c | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
test_socket/dgram_mod_bus.c | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
.gitignore
@@ -38,7 +38,7 @@ *.tar *.tar.gz .vscode/ .idea .idea/ build/ *.tmp demo/Makefile
@@ -15,6 +15,7 @@ PROGS = dgram_mod_req_rep dgram_mod_survey dgram_mod_bus build: $(PROGS) src/queue/hashtable.c
@@ -231,7 +231,7 @@ static inline void _hashtable_foreach(hashtable_t *hashtable, std::function<void(int, void *)> cb) { static inline void _hashtable_foreach(hashtable_t *hashtable, std::function<void(int, void *)> cb) { tailq_entry_t *item; for (int i = 0; i < MAPSIZE; i++) { tailq_header_t *my_tailq_head = hashtable->array[i] ; @@ -247,7 +247,7 @@ } void hashtable_foreach(hashtable_t *hashtable, hashtable_foreach_cb cb) { void hashtable_foreach(hashtable_t *hashtable, std::function<void(int, void *)> cb) { SemUtil::dec(hashtable->mutex); hashtable->readcnt++; if (hashtable->readcnt == 1) { @@ -273,7 +273,6 @@ SemUtil::inc(hashtable->mutex); } std::set<int> * hashtable_keyset(hashtable_t *hashtable) { @@ -306,5 +305,3 @@ SemUtil::inc(hashtable->wlock); return key; } src/queue/include/hashtable.h
@@ -31,7 +31,7 @@ * }); * */ void hashtable_foreach(hashtable_t *hashtable, hashtable_foreach_cb cb); void hashtable_foreach(hashtable_t *hashtable, std::function<void(int, void *)> cb); // void hashtable_printall(hashtable_t *hashtable); src/queue/include/shm_mm.h
@@ -18,6 +18,8 @@ */ void shm_destroy(); int shm_alloc_key(); #ifdef __cplusplus } src/queue/shm_mm.c
@@ -10,4 +10,7 @@ mem_pool_destroy(); } int shm_alloc_key() { hashtable_t *hashtable = mm_get_hashtable(); return hashtable_alloc_key(hashtable); } src/socket/dgram_mod_socket.c
@@ -58,12 +58,15 @@ if(topic_sub_map != NULL) { for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { subscripter_set = map_iter->second; delete subscripter_set; subscripter_set->clear(); mm_free((void *)subscripter_set); //delete subscripter_set; // printf("=============delete subscripter_set\n"); } topic_sub_map->clear(); mem_pool_free_by_key(BUS_MAP_KEY); } // printf("=============close socket\n"); shm_close_socket(socket->shm_socket); free(_socket); } @@ -121,8 +124,8 @@ int dgram_mod_start_bus(void * _socket) { dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; socket->mod = BUS; // printf("mem_pool_malloc_by_key before\n"); socket->topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY); run_pubsub_proxy(socket); // pthread_t tid; // pthread_create(&tid, NULL, run_accept_sub_request, _socket); @@ -198,12 +201,12 @@ subscripter_set = map_iter->second; for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) { send_port = *set_iter; printf("_proxy_pub send before %d \n", send_port); // printf("_proxy_pub send before %d \n", send_port); if (shm_sendto(socket->shm_socket, buf+head_len, size-head_len, send_port, &timeout) !=0 ) { //对方已关闭的连接放到待删除队列里。如果直接删除会让iter指针出现错乱 subscripter_to_del.push_back(send_port); } else { printf("_proxy_pub send after: %d \n", send_port); // printf("_proxy_pub send after: %d \n", send_port); } @@ -229,9 +232,9 @@ size_t head_len; const char *topic_delim = ","; printf("run_pubsub_proxy server receive before\n"); // printf("run_pubsub_proxy server receive before\n"); while(shm_recvfrom(socket->shm_socket, (void **)&buf, &size, &port) == 0) { printf("run_pubsub_proxy server recv after: %s \n", buf); // printf("run_pubsub_proxy server recv after: %s \n", buf); if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) { if(strcmp(action, "sub") == 0) { // 订阅支持多主题订阅 src/socket/include/dgram_mod_socket.h
@@ -25,7 +25,10 @@ */ int dgram_mod_bind(void * _socket, int port); /** * 强制绑定端口到socket, 适用于程序非正常关闭的情况下,重启程序绑定原来还没释放的key * @return 0 成功, 其他值 失败的错误码 */ int dgram_mod_force_bind(void * _socket, int port); /** * 发送信息 test/test.c
@@ -1,15 +1,5 @@ #include "usg_common.h" #include "usg_typedef.h" int test(char *src, int size) { int i = strlen(src); char dest[size]; strncpy(dest, src, size); puts(dest); return i; } int main() { char *str = "hello"; int r = test(str, strlen(str)); printf("%d\n", r); } test_socket/dgram_mod_bus.c
@@ -10,15 +10,15 @@ } void server(int port, bool restart) { // signal(SIGINT, sigint_handler); signal(SIGINT, sigint_handler); server_socket = dgram_mod_open_socket(); dgram_mod_force_bind(server_socket, port); // if(restart) { // } else { // // dgram_mod_bind(server_socket, port); // } if(restart) { dgram_mod_force_bind(server_socket, port); } else { dgram_mod_bind(server_socket, port); } dgram_mod_start_bus(server_socket);