From c479ef57baaaa28964fc3ec8d80ff99dffa7d49f Mon Sep 17 00:00:00 2001 From: fujuntang <fujuntang@smartai.com> Date: 星期三, 10 十一月 2021 09:49:29 +0800 Subject: [PATCH] Fix the system hang issue when the app is killed contantly. --- src/net/net_mod_server_socket_wrapper.cpp | 2 src/bus_proxy_start.cpp | 3 src/shm/mm.cpp | 49 +++++- src/shm/hashtable.h | 3 src/shm/mm.h | 5 src/net/net_mod_socket.cpp | 1 src/socket/shm_socket.cpp | 37 ++-- src/msg_trigger/msg_mgr.h | 11 - src/socket/bus_server_socket.cpp | 75 +++++++++- src/bh_api.h | 2 src/socket/bus_server_socket.h | 4 src/bh_api.cpp | 130 ++++++++++++------ src/proc_def.h | 3 src/socket/shm_mod_socket.cpp | 13 + src/shm/hashtable.cpp | 56 +++++-- 15 files changed, 271 insertions(+), 123 deletions(-) diff --git a/src/bh_api.cpp b/src/bh_api.cpp index 37f8377..c74c80d 100644 --- a/src/bh_api.cpp +++ b/src/bh_api.cpp @@ -23,6 +23,7 @@ static pthread_t gTids; + static void *client_run_check(void *skptr) { pthread_detach(pthread_self()); @@ -36,7 +37,7 @@ sec = TIME_WAIT; nsec = 0; - sprintf(buf, "%s", "Success"); + sprintf(buf, "%s", STR_EXEC); data = net_mod_socket_int_get(gNetmod_socket); while(true) { @@ -45,9 +46,13 @@ BHFree(buf_temp, size); - rv = net_mod_socket_sendto_timeout(gNetmod_socket, buf, strlen(buf), key, sec, nsec, SVR_STR, data); - if (rv != 0) { - logger->error("the process check response failed with error: %s!\n", bus_strerror(rv)); + if ((gNetmod_socket != NULL) && (gRun_stat != 0)) { + rv = net_mod_socket_sendto_timeout(gNetmod_socket, buf, strlen(buf), key, sec, nsec, SVR_STR, data); + if (rv != 0) { + logger->error("the process check response failed with error: %s!\n", bus_strerror(rv)); + } + } else { + break; } } else { @@ -123,25 +128,25 @@ shm_mm_wrapper_init(SHM_RES_SIZE); #if defined(PRO_DE_SERIALIZE) - if (_input.proc_id != NULL) { + if (strlen(_input.proc_id) > 0) { count = strlen(_input.proc_id) + 1; min = count > (MAX_STR_LEN - 1) ? (MAX_STR_LEN - 1) : count; strncpy(pData.proc_id, _input.proc_id, min); } - if (_input.name != NULL) { + if (strlen(_input.name) > 0) { count = strlen(_input.name) + 1; min = count > (MAX_STR_LEN - 1)? (MAX_STR_LEN -1) : count; strncpy(pData.name, _input.name, min); } - if (_input.public_info != NULL) { + if (strlen(_input.public_info) > 0) { count = strlen(_input.public_info) + 1; min = count > (MAX_STR_LEN - 1)? (MAX_STR_LEN - 1) : count; strncpy(pData.public_info, _input.public_info, min); } - if (_input.private_info != NULL) { + if (strlen(_input.private_info) > 0) { count = strlen(_input.private_info) + 1; min = count > (MAX_STR_LEN - 1)? (MAX_STR_LEN - 1): count; strncpy(pData.private_info, _input.private_info, min); @@ -172,11 +177,12 @@ } #endif - if (pData.proc_id == NULL) { + if (strlen(pData.proc_id) == 0) { rv = EBUS_INVALID_PARA; bus_errorset(rv); + gRun_stat = 0; pthread_mutex_unlock(&mutex); goto exit_entry; @@ -185,13 +191,13 @@ gNetmod_socket = net_mod_socket_open(); hashtable_t *hashtable = mm_get_hashtable(); key = hashtable_alloc_key(hashtable); + net_mod_socket_bind(gNetmod_socket, key); count = hashtable_alloc_key(hashtable); rv = hashtable_alloc_key(hashtable); net_mod_socket_int_set(gNetmod_socket, count); net_mod_socket_svr_set(gNetmod_socket, rv); sprintf(pData.int_info, "%d", count); sprintf(pData.svr_info, "%d", rv); - net_mod_socket_bind(gNetmod_socket, key); rv = net_mod_socket_reg(gNetmod_socket, &pData, sizeof(ProcInfo), NULL, 0, timeout_ms, PROC_REG); @@ -539,21 +545,28 @@ if (rv == 0) { - ptr = (ProcInfo_query *)((char *)buf + sizeof(int)); - mtr_list_num = ptr->num; + min = *(int *)buf; + if (min > 0) { + ptr = (ProcInfo_query *)((char *)buf + sizeof(int)); + mtr_list_num = ptr->num; + + if (mtr_list_num > sizeof(mtr_list) / sizeof(mtr_list[0])) { + mtr_list_num = sizeof(mtr_list) / sizeof(mtr_list[0]); + } - if (mtr_list_num > sizeof(mtr_list) / sizeof(mtr_list[0])) { - mtr_list_num = sizeof(mtr_list) / sizeof(mtr_list[0]); + Proc_ptr = &(ptr->procData); + for(int i = 0; i < mtr_list_num; i++) { + mtr_list[i].proc_id = (Proc_ptr + i)->proc_id; + mtr_list[i].mq_id = ID_RSV; + mtr_list[i].abs_addr = ABS_ID_RSV; + mtr_list[i].ip = "127.0.0.1"; + mtr_list[i].port = 5000; + } + } else { + mtr_list_num = 0; } - - Proc_ptr = &(ptr->procData); - for(int i = 0; i < mtr_list_num; i++) { - mtr_list[i].proc_id = (Proc_ptr + i)->proc_id; - mtr_list[i].mq_id = ID_RSV; - mtr_list[i].abs_addr = ABS_ID_RSV; - mtr_list[i].ip = "127.0.0.1"; - mtr_list[i].port = 5000; - } + + free(buf); } exit_entry: @@ -684,27 +697,31 @@ if (mpr_list_num > (sizeof(mpr_list) / sizeof(mpr_list[0]))) { mpr_list_num = sizeof(mpr_list) / sizeof(mpr_list[0]); } - - Proc_ptr = (ProcInfo_sum *)((char *)buf + sizeof(int)); - for(int i = 0; i < mpr_list_num; i++) { - mpr_list[i].proc_id = (Proc_ptr + i)->procData.proc_id; - mpr_list[i].name = (Proc_ptr + i)->procData.name; - mpr_list[i].public_info = (Proc_ptr + i)->procData.public_info; - mpr_list[i].private_info = (Proc_ptr + i)->procData.private_info; - mpr_list[i].online = (Proc_ptr + i)->stat; - mpr_list[i].topic_list_num = (Proc_ptr + i)->list_num; - - for(int j = 0; j < mpr_list[i].topic_list_num; j++) - { - if (j == 0) { - mpr_list[i].topic_list[j] = (Proc_ptr + i)->reg_info; - } else if (j == 1) { - mpr_list[i].topic_list[j] = (Proc_ptr + i)->local_info; - } else if (j == 2) { - mpr_list[i].topic_list[j] = (Proc_ptr + i)->net_info; + + if (mpr_list_num > 0) { + Proc_ptr = (ProcInfo_sum *)((char *)buf + sizeof(int)); + for(int i = 0; i < mpr_list_num; i++) { + mpr_list[i].proc_id = (Proc_ptr + i)->procData.proc_id; + mpr_list[i].name = (Proc_ptr + i)->procData.name; + mpr_list[i].public_info = (Proc_ptr + i)->procData.public_info; + mpr_list[i].private_info = (Proc_ptr + i)->procData.private_info; + mpr_list[i].online = (Proc_ptr + i)->stat; + mpr_list[i].topic_list_num = (Proc_ptr + i)->list_num; + + for(int j = 0; j < mpr_list[i].topic_list_num; j++) + { + if (j == 0) { + mpr_list[i].topic_list[j] = (Proc_ptr + i)->reg_info; + } else if (j == 1) { + mpr_list[i].topic_list[j] = (Proc_ptr + i)->local_info; + } else if (j == 2) { + mpr_list[i].topic_list[j] = (Proc_ptr + i)->net_info; + } } } } + + free(buf); } errString = bus_strerror(0, 1); @@ -866,8 +883,8 @@ ::bhome_msg::MsgCommonReply mcr; mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv)); mcr.mutable_errmsg()->set_errstring(errString); - *reply_len=mcr.ByteSizeLong(); - *reply=malloc(*reply_len); + *reply_len = mcr.ByteSizeLong(); + *reply = malloc(*reply_len); mcr.SerializePartialToArray(*reply,*reply_len); #else len = strlen(errString) + 1; @@ -1583,6 +1600,13 @@ exit_entry: errString = bus_strerror(0, 1); + if (rv != 0) { + if ((proc_id != NULL) && (proc_id_len != NULL)) { + *proc_id_len = 0; + *proc_id = NULL; + } + } + #if defined(PRO_DE_SERIALIZE) ::bhome_msg::MsgRequestTopicReply mrt; mrt.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv)); @@ -1707,6 +1731,12 @@ if (rv != 0) { rrr.topic = STR_RSV; rrr.data = STR_RSV; + + if ((proc_id != NULL) && (proc_id_len != NULL)) { + + *proc_id_len = 0; + *proc_id = NULL; + } } #if defined(PRO_DE_SERIALIZE) @@ -1814,13 +1844,21 @@ } -#if defined(MSG_HANDLER) int inter_key_get(void) { if (gNetmod_socket != NULL) return net_mod_socket_get_key(gNetmod_socket); - return 0; + return SHM_BUS_KEY; } -#endif + +void *socket_data_get(void) +{ + return gNetmod_socket; +} + +void inter_key_set(int key) +{ + net_mod_socket_bind(gNetmod_socket, key); +} diff --git a/src/bh_api.h b/src/bh_api.h index a231b0c..2a9e39a 100644 --- a/src/bh_api.h +++ b/src/bh_api.h @@ -110,6 +110,8 @@ int BHGetLastError(void **msg, int *msg_len); int inter_key_get(void); +void inter_key_set(int key); +void *socket_data_get(void); #ifdef __cplusplus } #endif diff --git a/src/bus_proxy_start.cpp b/src/bus_proxy_start.cpp index c5eaaf5..6029e6e 100644 --- a/src/bus_proxy_start.cpp +++ b/src/bus_proxy_start.cpp @@ -10,6 +10,7 @@ #include <errno.h> #include <getopt.h> #include <stdlib.h> +#include "proc_def.h" #include "msg_mgr.h" using namespace std; @@ -103,7 +104,7 @@ } } - sleep(10); + sleep(WT_INT); } return NULL; diff --git a/src/msg_trigger/msg_mgr.h b/src/msg_trigger/msg_mgr.h index 809808b..686d47c 100644 --- a/src/msg_trigger/msg_mgr.h +++ b/src/msg_trigger/msg_mgr.h @@ -1,9 +1,7 @@ #ifndef __MSG_MGR_DEF_ #define __MSG_MGR_DEF_ -#ifdef __cplusplus -extern "C" { -#endif +#include "shm_allocator.h" #define SEM_TYPE_ID 0 #define RSV_TYPE_ID 1 @@ -37,16 +35,15 @@ } Msg_info; -#ifdef __cplusplus -} -#endif - int msg_init(void); void msg_distrib(int msg_id, Msg_info *message); int get_msg_info(int msg_id, Msg_info *message); void *sem_msg_handler(void *skptr); void msg_info_set(int index, Msg_info msg_obj); +typedef std::set<int, std::less<int>, SHM_STL_Allocator<int> > recvbuf_val; +typedef std::map<int, recvbuf_val *, std::less<int>, SHM_STL_Allocator<std::pair<int, recvbuf_val *> > > recvbuf_data; + #endif //end of file diff --git a/src/net/net_mod_server_socket_wrapper.cpp b/src/net/net_mod_server_socket_wrapper.cpp index e1ad04d..13b7f37 100644 --- a/src/net/net_mod_server_socket_wrapper.cpp +++ b/src/net/net_mod_server_socket_wrapper.cpp @@ -2,13 +2,11 @@ #include "net_mod_server_socket_wrapper.h" void *net_mod_server_socket_open(int port) { - printf("====net_mod_server_socket_open\n"); NetModServerSocket *sockt = new NetModServerSocket(port); return (void *)sockt; } void net_mod_server_socket_close(void *_sockt) { - printf("====net_mod_server_socket_close\n"); NetModServerSocket *sockt = (NetModServerSocket *)_sockt; delete sockt; diff --git a/src/net/net_mod_socket.cpp b/src/net/net_mod_socket.cpp index c48c33b..acd9053 100644 --- a/src/net/net_mod_socket.cpp +++ b/src/net/net_mod_socket.cpp @@ -121,7 +121,6 @@ if (mpool == NULL) { /* If first call from this thread, allocate buffer for thread, and save its location */ - logger->debug("Create connPool"); mpool = new NetConnPool(); if (mpool == NULL) { LoggerFactory::getLogger()->error(errno, "NetModSocket::_sendandrecv_ malloc"); diff --git a/src/proc_def.h b/src/proc_def.h index 0fa2fe9..4b6bf41 100644 --- a/src/proc_def.h +++ b/src/proc_def.h @@ -16,11 +16,13 @@ #define PROC_QUE_TCS 4 #define PROC_QUE_STCS 5 #define PROC_QUE_ATCS 6 +#define PROC_REG_BUF 7 #define ID_RSV 16 #define ABS_ID_RSV 18 #define STR_MAGIC "," +#define STR_EXEC "Success" typedef struct _ProcInfo { #if 0 @@ -64,6 +66,7 @@ } ProcInfo_query; #define STR_RSV "empty" +#define WT_INT 10 #ifdef __cplusplus } diff --git a/src/shm/hashtable.cpp b/src/shm/hashtable.cpp index 62d052e..14daaf0 100755 --- a/src/shm/hashtable.cpp +++ b/src/shm/hashtable.cpp @@ -2,6 +2,7 @@ #include "hashtable.h" #include "mm.h" #include "svsem.h" +#include "bh_api.h" #include "logger_factory.h" #include <set> #include <functional> @@ -52,11 +53,17 @@ } else { - TAILQ_FOREACH(item, my_tailq_head, joint) { - if (key == item->key) + if ((item != NULL) && (key == item->key)) { return item->value; + } else { + mm_free(my_tailq_head); + hashtable->array[code] = NULL; + hashtable->queueCount--; + + return NULL; + } } } return NULL; @@ -71,7 +78,12 @@ tailq_header_t *my_tailq_head = hashtable->array[code] ; if ( my_tailq_head == NULL) { + if (inter_key_get() == 0) { + inter_key_set(key); + } + my_tailq_head = (tailq_header_t*) mm_malloc(sizeof(tailq_header_t )); + TAILQ_INIT(my_tailq_head); hashtable->array[code] = my_tailq_head; goto putnew; @@ -79,27 +91,30 @@ TAILQ_FOREACH(item, my_tailq_head, joint) { - if (key ==item->key) + if ((item != NULL) && (key == item->key)) { oldvalue = item -> value; item->key= key; - item -> value = value; + item->value = value; return oldvalue; - } + } } putnew: + + if (inter_key_get() == 0) { + inter_key_set(key); + } item = (tailq_entry_t *) mm_malloc(sizeof(tailq_entry_t)); item->key = key; - item -> value = value; + item->value = value; TAILQ_INSERT_TAIL(my_tailq_head, item, joint); return NULL; } -void *hashtable_remove(hashtable_t *hashtable, int key) +void hashtable_remove(hashtable_t *hashtable, int key) { size_t code = hashcode(key); tailq_entry_t *item; - void *oldvalue; int rv; if( (rv = svsem_uni_wait(hashtable->mutex)) != 0) { @@ -111,29 +126,31 @@ if((rv = svsem_post(hashtable->mutex)) != 0) { LoggerFactory::getLogger()->error(errno, "hashtable_remove\n"); } - return NULL; + return; } else { - for (item = TAILQ_FIRST(my_tailq_head); item != NULL; item = TAILQ_NEXT(item, joint)) + for (item = TAILQ_FIRST(my_tailq_head); item != NULL;) { - if (key == item->key) - { - oldvalue = item->value; - /* Remove the item from the tail queue. */ - TAILQ_REMOVE(my_tailq_head, item, joint); + /* Remove the item from the tail queue. */ + TAILQ_REMOVE(my_tailq_head, item, joint); - /* mm_free the item as we don't need it anymore. */ - mm_free(item); + /* mm_free the item as we don't need it anymore. */ + mm_free(item); + + item = TAILQ_NEXT(item, joint); + if (item == NULL) { + mm_free(my_tailq_head); + hashtable->array[code] = NULL; hashtable->queueCount--; svsem_post(hashtable->mutex); - return oldvalue; } + return; } if((rv = svsem_post(hashtable->mutex)) != 0) { LoggerFactory::getLogger()->error(errno, "hashtable_remove\n"); } - return NULL; + return; } } @@ -217,6 +234,7 @@ key++; } // 鍗犵敤key + _hashtable_put(hashtable, key, (void *)1); hashtable->currentKey = key; diff --git a/src/shm/hashtable.h b/src/shm/hashtable.h index 7bb6eac..8909ae5 100755 --- a/src/shm/hashtable.h +++ b/src/shm/hashtable.h @@ -31,7 +31,8 @@ void hashtable_put(hashtable_t *hashtable, int key, void *value) ; bool hashtable_check_put(hashtable_t *hashtable, int key, void *value, bool overwrite) ; -void *hashtable_remove(hashtable_t *hashtable, int key); +static inline void _hashtable_remove(hashtable_t *hashtable, int key); +void hashtable_remove(hashtable_t *hashtable, int key); void hashtable_removeall(hashtable_t *hashtable); int hashtable_get_queue_count(hashtable_t *hashtable) ; /** diff --git a/src/shm/mm.cpp b/src/shm/mm.cpp index 13ec443..5837906 100644 --- a/src/shm/mm.cpp +++ b/src/shm/mm.cpp @@ -4,6 +4,7 @@ #include "mm.h" #include "sem_util.h" #include "logger_factory.h" +#include "bh_api.h" #include <sys/sem.h> #include <sys/shm.h> @@ -135,10 +136,10 @@ /* * mm_free - Free a block */ -void mm_free(void *ptr) +void mm_free(void *ptr, int enable) { - if (ptr == 0) - return; + if ((ptr == 0) || (*(size_t *)(ptr - SIZE_T_SIZE) == 0x00)) + return; /* *if (!is_allocated(ptr) ) { @@ -147,15 +148,19 @@ *} */ - SemUtil::dec_uni(mutex); + if (enable == true) { + SemUtil::dec_uni(mutex); + } + ptr -= SIZE_T_SIZE; size_t size = GET_SIZE(HDRP(ptr)); PUT(HDRP(ptr), PACK(size, 0)); PUT(FTRP(ptr), PACK(size, 0)); + *(size_t *)ptr = 0x00; coalesce(ptr); - SemUtil::inc(mutex); + if (enable == true) { + SemUtil::inc(mutex); + } } - - /* * mm_realloc - Naive implementation of realloc @@ -389,15 +394,13 @@ PUT(HDRP(bp), PACK(size, 0)); /* Free block header */ //line:vm:mm:freeblockhdr PUT(FTRP(bp), PACK(size, 0)); /* Free block footer */ //line:vm:mm:freeblockftr PUT(HDRP(NEXT_BLKP(bp)), PACK(0, 1)); /* New epilogue header */ //line:vm:mm:newepihdr - + /* Coalesce if the previous block was free */ return coalesce(bp); //line:vm:mm:returnblock } static void insert_fblock (void *bp) { - //鍚庤繘鍏堝嚭鐨勬柟寮忔彃鍏ワ紝鍗虫彃鍏ラ摼琛ㄥご浣嶇疆 - // insert into the header of the free list PUT_PTR(SUCCRP(bp), NEXT_FBLKP(heap_listp)); //the successor of bp point to the old first free block PUT_PTR(PREDRP(NEXT_FBLKP(heap_listp)), bp); //the predecessor of the old first free block point to bp @@ -489,7 +492,10 @@ PUT(FTRP(bp), PACK(csize, 1)); rm_fblock(bp); } - return bp; + + *(size_t *)bp = inter_key_get(); + + return (bp + SIZE_T_SIZE); } static int is_allocated(void *ptr) @@ -514,6 +520,24 @@ } +void find_mm_data(int val) +{ + void *bp = heap_listp; + + SemUtil::dec(mutex); + for (bp = heap_listp; GET_SIZE(HDRP(bp)) > 0; bp = NEXT_BLKP(bp)) + { + if (GET_ALLOC(HDRP(bp))) { + if ((*(size_t *)bp) == val) { + mm_free(bp + SIZE_T_SIZE, false); + } + } + } + SemUtil::inc(mutex); + + return; +} + /* * find_fit - Find a fit for a block with size bytes */ @@ -526,6 +550,9 @@ if (!GET_ALLOC(HDRP(bp)) && (size <= GET_SIZE(HDRP(bp)))) { return bp; + } else if (GET_ALLOC(HDRP(bp)) && (GET_SIZE(HDRP(bp)) == 0)) + { + break; } } return NULL; /* No fit */ diff --git a/src/shm/mm.h b/src/shm/mm.h index 6dbb979..d7d9a84 100644 --- a/src/shm/mm.h +++ b/src/shm/mm.h @@ -8,9 +8,10 @@ extern bool mm_init(size_t heap_size); extern bool mm_destroy(void); -extern void *mm_malloc (size_t size); -extern void mm_free (void *ptr); +void *mm_malloc (size_t size); +void mm_free (void *ptr, int enable = true); extern void *mm_realloc(void *ptr, size_t size); +extern void find_mm_data(int val); extern void * mm_get_by_key(int key); diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp index 315c356..0a44949 100644 --- a/src/socket/bus_server_socket.cpp +++ b/src/socket/bus_server_socket.cpp @@ -2,6 +2,7 @@ #include "bus_server_socket.h" #include "shm_mod_socket.h" #include "shm_socket.h" +#include "msg_mgr.h" #include "bus_error.h" static Logger *logger = LoggerFactory::getLogger(); @@ -303,7 +304,7 @@ LinkNode *pNew = NULL; LinkNode *pCur = NULL; - pNew = new(LinkNode); + pNew = (LinkNode *)malloc(sizeof(LinkNode)); pNew->data = aData; pNew->data_fix = bData; pNew->count = 0; @@ -340,7 +341,7 @@ head = pCur->next; - delete(pCur); + free(pCur); pCur = head; @@ -353,7 +354,7 @@ pCur->next = pNext->next; pCur = pNext->next; - delete(pNext); + free(pNext); } else { pCur = pNext; @@ -559,7 +560,10 @@ procQuePart->erase(buf_temp); } + BusServerSocket::buf_data_remove(key); + find_mm_data(key); } + } else if (flag == PROC_REG_TCS) { ProcTcsMap *proc = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY); SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY); @@ -709,7 +713,7 @@ sprintf(data_buf, "%d", count); shm_sendto(shm_socket, data_buf, strlen(data_buf), key, &timeout, BUS_TIMEOUT_FLAG); - } else { + } else if (flag == PROC_QUE_ATCS) { int val; int temp = 0; @@ -853,6 +857,17 @@ shm_sendto(shm_socket, last_buf, temp + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG); free(last_buf); + } else { + + char *ptr = NULL; + strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); + + data1 = atoi(buf_temp); + ptr = strstr(buf_temp, STR_MAGIC); + if (ptr != NULL) { + data2 = atoi(ptr + 1); + } + BusServerSocket::buf_data_set(data1, data2); } } @@ -888,7 +903,7 @@ int key; int flag; char buf_temp[MAX_STR_LEN] = { 0x00 }; - char * action, *topic, *topics, *buf, *content; + char *action, *topic, *topics, *buf, *content; size_t head_len; bus_head_t head; int val; @@ -935,7 +950,8 @@ } else if ((strcmp(action, "reg") == 0) || (strcmp(action, "unreg") == 0) \ || (strcmp(action, "tcsreg") == 0) || (strcmp(action, "tcsque") == 0) \ - || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0)) { + || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0) \ + || (strcmp(action, "bufreg") == 0)) { content = topics + head.topic_size; if (strcmp(action, "reg") == 0) { @@ -957,15 +973,19 @@ flag = PROC_QUE_STCS; - } else { + } else if (strcmp(action, "atcsque") == 0) { flag = PROC_QUE_ATCS; + + } else { + + flag = PROC_REG_BUF; } if (flag == PROC_REG) { memcpy(buf_temp, content, strlen(content) + 1); - + if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) { val = proc_que_iter->second; @@ -996,6 +1016,7 @@ hashtable_t *hashtable = mm_get_hashtable(); void *data_ptr = hashtable_get(hashtable, val); + if (data_ptr != NULL) { if (data_ptr != (void *)1) { queue = (LockFreeQueue<shm_packet_t> *)data_ptr; @@ -1011,3 +1032,41 @@ } +void BusServerSocket::buf_data_set(int data, int val) { + recvbuf_val *val_buf; + recvbuf_data::iterator data_iter; + recvbuf_val::iterator val_iter; + + if ((data_iter = recvBuf_data.find(data)) != recvBuf_data.end()) { + val_buf = data_iter->second; + } else { + void *set_ptr = mm_malloc(sizeof(recvbuf_val)); + + val_buf = new(set_ptr) recvbuf_val; + recvBuf_data.insert({data, val_buf}); + } + + val_buf->insert(val); +} + +void BusServerSocket::buf_data_remove(int data) { + + int val; + recvbuf_val *val_buf; + recvbuf_data::iterator data_iter; + recvbuf_val::iterator val_iter; + + if ((data_iter = recvBuf_data.find(data)) != recvBuf_data.end()) { + + val_buf = data_iter->second; + for(val_iter = val_buf->begin(); val_iter != val_buf->end(); ++val_iter) { + val = *val_iter; + + BusServerSocket::_data_remove(val); + } + + recvBuf_data.erase(data); + } +} + + diff --git a/src/socket/bus_server_socket.h b/src/socket/bus_server_socket.h index 7b94ba1..4cd0140 100644 --- a/src/socket/bus_server_socket.h +++ b/src/socket/bus_server_socket.h @@ -8,6 +8,7 @@ #include "sem_util.h" #include "logger_factory.h" #include "key_def.h" +#include "msg_mgr.h" #include "socket_def.h" #include <set> @@ -62,6 +63,7 @@ // pthread_t recv_thread; // <涓婚锛� 璁㈤槄鑰�> SHMTopicSubMap *topic_sub_map; + recvbuf_data recvBuf_data; private: int destroy(); @@ -122,6 +124,8 @@ int get_key() ; void _data_remove(int val); + void buf_data_set(int data, int val); + void buf_data_remove(int data); }; diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp index 6139d34..6058308 100644 --- a/src/socket/shm_mod_socket.cpp +++ b/src/socket/shm_mod_socket.cpp @@ -69,6 +69,10 @@ memcpy(head.action, "atcsque", sizeof(head.action)); + } else if (flag == PROC_REG_BUF) { + + memcpy(head.action, "bufreg", sizeof(head.action)); + } else { return -1; @@ -115,7 +119,7 @@ ts.tv_nsec = (timeout_ms - ts.tv_sec * 1000) * 1000 * 1000; - if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) { + if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS) || (flag == PROC_REG_BUF)) { ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, BUS_TIMEOUT_FLAG); @@ -127,7 +131,7 @@ } else if (timeout_ms == 0) { - if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) { + if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS) || (flag == PROC_REG_BUF)) { ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, BUS_NOWAIT_FLAG); @@ -139,7 +143,7 @@ } else { - if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) { + if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS) || (flag == PROC_REG_BUF)) { ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, -1); @@ -165,7 +169,6 @@ int ShmModSocket::sendto(const void *buf, const int size, const int key, const struct timespec *timeout, int flag, int reset, int data_set) { int rv = shm_sendto(shm_socket, buf, size, key, timeout, flag, reset, data_set); if(rv == 0) { - logger->debug("ShmModSocket::sendto: %d sendto %d success.\n", get_key(), key); return 0; } @@ -183,7 +186,6 @@ int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flag, reset, data_set); if(rv == 0) { - logger->debug("ShmModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key); return 0; } @@ -202,7 +204,6 @@ int rv = shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag); if(rv == 0) { - logger->debug("ShmModSocket::sendandrecv: sendandrecv to %d success.\n", send_key); return 0; } diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp index 709505f..ae9a98b 100644 --- a/src/socket/shm_socket.cpp +++ b/src/socket/shm_socket.cpp @@ -2,10 +2,12 @@ #include "socket_def.h" #include "hashtable.h" #include "logger_factory.h" +#include "net_mod_socket_wrapper.h" #include <map> #include <cassert> #include "bus_error.h" #include "sole.h" +#include "bh_api.h" #include "shm_mm.h" #include "key_def.h" @@ -105,9 +107,6 @@ static int _shm_socket_close_(shm_socket_t *sockt) { - - int rv, i; - hashtable_t *hashtable = mm_get_hashtable(); // if(sockt->key != 0) { // auto it = shmQueueStMap->find(sockt->key); @@ -117,18 +116,6 @@ // } // } - if(sockt->queue != NULL) { - sockt->queue->close(); - for( i = 0; i < sockt->queue->size(); i++) { - mm_free((*(sockt->queue))[i].buf); - logger->info("======= %d free queue element buf\n", sockt->key); - } - sleep(1); - - hashtable_remove(hashtable, sockt->key); - // sockt->queue = NULL; - } - pthread_mutex_destroy(&(sockt->mutex) ); free(sockt); return 0; @@ -404,13 +391,16 @@ const int send_size, const int key, void **recv_buf, int *recv_size, const struct timespec *timeout, int flags) { - + int data; + int timeout_ms; + char data_buf[MAX_STR_LEN] = { 0x00 }; int rv = 0, tryn = 16; shm_packet_t sendpak; shm_packet_t recvpak; std::map<int, shm_packet_t>::iterator recvbufIter; shm_socket_t *tmp_socket = NULL; - + hashtable_t *hashtable = mm_get_hashtable(); + rv = pthread_once(&_once_, _create_threadlocal_socket_key_); if (rv != 0) { logger->error(rv, "shm_sendandrecv pthread_once"); @@ -421,7 +411,15 @@ if (tmp_socket == NULL) { tmp_socket = shm_socket_open(SHM_SOCKET_DGRAM); - + + tmp_socket->key = hashtable_alloc_key(hashtable); + data = inter_key_get(); + timeout_ms = timeout->tv_sec * 1000 + 3000; + sprintf(data_buf, "%d, %d", data, tmp_socket->key); + if (socket_data_get() != NULL) { + net_mod_socket_reg(socket_data_get(), data_buf, strlen(data_buf), NULL, 0, timeout_ms, PROC_REG_BUF); + } + rv = pthread_setspecific(_localthread_socket_key_, tmp_socket); if ( rv != 0) { logger->error(rv, "shm_sendandrecv : pthread_setspecific"); @@ -564,6 +562,7 @@ if (sockt->key == 0) { sockt->key = hashtable_alloc_key(hashtable); } + sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind); if(sockt->queue == NULL ) { logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key); @@ -729,7 +728,7 @@ count += strlen(ptr->int_info) + 1; memcpy(dst + count, ptr->svr_info, strlen(ptr->svr_info) + 1); count += strlen(ptr->svr_info) + 1; - + *counter = count; } -- Gitblit v1.8.0