From c479ef57baaaa28964fc3ec8d80ff99dffa7d49f Mon Sep 17 00:00:00 2001
From: fujuntang <fujuntang@smartai.com>
Date: 星期三, 10 十一月 2021 09:49:29 +0800
Subject: [PATCH] Fix the system hang issue when the app is killed contantly.
---
src/net/net_mod_server_socket_wrapper.cpp | 2
src/bus_proxy_start.cpp | 3
src/shm/mm.cpp | 49 +++++-
src/shm/hashtable.h | 3
src/shm/mm.h | 5
src/net/net_mod_socket.cpp | 1
src/socket/shm_socket.cpp | 37 ++--
src/msg_trigger/msg_mgr.h | 11 -
src/socket/bus_server_socket.cpp | 75 +++++++++-
src/bh_api.h | 2
src/socket/bus_server_socket.h | 4
src/bh_api.cpp | 130 ++++++++++++------
src/proc_def.h | 3
src/socket/shm_mod_socket.cpp | 13 +
src/shm/hashtable.cpp | 56 +++++--
15 files changed, 271 insertions(+), 123 deletions(-)
diff --git a/src/bh_api.cpp b/src/bh_api.cpp
index 37f8377..c74c80d 100644
--- a/src/bh_api.cpp
+++ b/src/bh_api.cpp
@@ -23,6 +23,7 @@
static pthread_t gTids;
+
static void *client_run_check(void *skptr) {
pthread_detach(pthread_self());
@@ -36,7 +37,7 @@
sec = TIME_WAIT;
nsec = 0;
- sprintf(buf, "%s", "Success");
+ sprintf(buf, "%s", STR_EXEC);
data = net_mod_socket_int_get(gNetmod_socket);
while(true) {
@@ -45,9 +46,13 @@
BHFree(buf_temp, size);
- rv = net_mod_socket_sendto_timeout(gNetmod_socket, buf, strlen(buf), key, sec, nsec, SVR_STR, data);
- if (rv != 0) {
- logger->error("the process check response failed with error: %s!\n", bus_strerror(rv));
+ if ((gNetmod_socket != NULL) && (gRun_stat != 0)) {
+ rv = net_mod_socket_sendto_timeout(gNetmod_socket, buf, strlen(buf), key, sec, nsec, SVR_STR, data);
+ if (rv != 0) {
+ logger->error("the process check response failed with error: %s!\n", bus_strerror(rv));
+ }
+ } else {
+ break;
}
} else {
@@ -123,25 +128,25 @@
shm_mm_wrapper_init(SHM_RES_SIZE);
#if defined(PRO_DE_SERIALIZE)
- if (_input.proc_id != NULL) {
+ if (strlen(_input.proc_id) > 0) {
count = strlen(_input.proc_id) + 1;
min = count > (MAX_STR_LEN - 1) ? (MAX_STR_LEN - 1) : count;
strncpy(pData.proc_id, _input.proc_id, min);
}
- if (_input.name != NULL) {
+ if (strlen(_input.name) > 0) {
count = strlen(_input.name) + 1;
min = count > (MAX_STR_LEN - 1)? (MAX_STR_LEN -1) : count;
strncpy(pData.name, _input.name, min);
}
- if (_input.public_info != NULL) {
+ if (strlen(_input.public_info) > 0) {
count = strlen(_input.public_info) + 1;
min = count > (MAX_STR_LEN - 1)? (MAX_STR_LEN - 1) : count;
strncpy(pData.public_info, _input.public_info, min);
}
- if (_input.private_info != NULL) {
+ if (strlen(_input.private_info) > 0) {
count = strlen(_input.private_info) + 1;
min = count > (MAX_STR_LEN - 1)? (MAX_STR_LEN - 1): count;
strncpy(pData.private_info, _input.private_info, min);
@@ -172,11 +177,12 @@
}
#endif
- if (pData.proc_id == NULL) {
+ if (strlen(pData.proc_id) == 0) {
rv = EBUS_INVALID_PARA;
bus_errorset(rv);
+ gRun_stat = 0;
pthread_mutex_unlock(&mutex);
goto exit_entry;
@@ -185,13 +191,13 @@
gNetmod_socket = net_mod_socket_open();
hashtable_t *hashtable = mm_get_hashtable();
key = hashtable_alloc_key(hashtable);
+ net_mod_socket_bind(gNetmod_socket, key);
count = hashtable_alloc_key(hashtable);
rv = hashtable_alloc_key(hashtable);
net_mod_socket_int_set(gNetmod_socket, count);
net_mod_socket_svr_set(gNetmod_socket, rv);
sprintf(pData.int_info, "%d", count);
sprintf(pData.svr_info, "%d", rv);
- net_mod_socket_bind(gNetmod_socket, key);
rv = net_mod_socket_reg(gNetmod_socket, &pData, sizeof(ProcInfo), NULL, 0, timeout_ms, PROC_REG);
@@ -539,21 +545,28 @@
if (rv == 0) {
- ptr = (ProcInfo_query *)((char *)buf + sizeof(int));
- mtr_list_num = ptr->num;
+ min = *(int *)buf;
+ if (min > 0) {
+ ptr = (ProcInfo_query *)((char *)buf + sizeof(int));
+ mtr_list_num = ptr->num;
+
+ if (mtr_list_num > sizeof(mtr_list) / sizeof(mtr_list[0])) {
+ mtr_list_num = sizeof(mtr_list) / sizeof(mtr_list[0]);
+ }
- if (mtr_list_num > sizeof(mtr_list) / sizeof(mtr_list[0])) {
- mtr_list_num = sizeof(mtr_list) / sizeof(mtr_list[0]);
+ Proc_ptr = &(ptr->procData);
+ for(int i = 0; i < mtr_list_num; i++) {
+ mtr_list[i].proc_id = (Proc_ptr + i)->proc_id;
+ mtr_list[i].mq_id = ID_RSV;
+ mtr_list[i].abs_addr = ABS_ID_RSV;
+ mtr_list[i].ip = "127.0.0.1";
+ mtr_list[i].port = 5000;
+ }
+ } else {
+ mtr_list_num = 0;
}
-
- Proc_ptr = &(ptr->procData);
- for(int i = 0; i < mtr_list_num; i++) {
- mtr_list[i].proc_id = (Proc_ptr + i)->proc_id;
- mtr_list[i].mq_id = ID_RSV;
- mtr_list[i].abs_addr = ABS_ID_RSV;
- mtr_list[i].ip = "127.0.0.1";
- mtr_list[i].port = 5000;
- }
+
+ free(buf);
}
exit_entry:
@@ -684,27 +697,31 @@
if (mpr_list_num > (sizeof(mpr_list) / sizeof(mpr_list[0]))) {
mpr_list_num = sizeof(mpr_list) / sizeof(mpr_list[0]);
}
-
- Proc_ptr = (ProcInfo_sum *)((char *)buf + sizeof(int));
- for(int i = 0; i < mpr_list_num; i++) {
- mpr_list[i].proc_id = (Proc_ptr + i)->procData.proc_id;
- mpr_list[i].name = (Proc_ptr + i)->procData.name;
- mpr_list[i].public_info = (Proc_ptr + i)->procData.public_info;
- mpr_list[i].private_info = (Proc_ptr + i)->procData.private_info;
- mpr_list[i].online = (Proc_ptr + i)->stat;
- mpr_list[i].topic_list_num = (Proc_ptr + i)->list_num;
-
- for(int j = 0; j < mpr_list[i].topic_list_num; j++)
- {
- if (j == 0) {
- mpr_list[i].topic_list[j] = (Proc_ptr + i)->reg_info;
- } else if (j == 1) {
- mpr_list[i].topic_list[j] = (Proc_ptr + i)->local_info;
- } else if (j == 2) {
- mpr_list[i].topic_list[j] = (Proc_ptr + i)->net_info;
+
+ if (mpr_list_num > 0) {
+ Proc_ptr = (ProcInfo_sum *)((char *)buf + sizeof(int));
+ for(int i = 0; i < mpr_list_num; i++) {
+ mpr_list[i].proc_id = (Proc_ptr + i)->procData.proc_id;
+ mpr_list[i].name = (Proc_ptr + i)->procData.name;
+ mpr_list[i].public_info = (Proc_ptr + i)->procData.public_info;
+ mpr_list[i].private_info = (Proc_ptr + i)->procData.private_info;
+ mpr_list[i].online = (Proc_ptr + i)->stat;
+ mpr_list[i].topic_list_num = (Proc_ptr + i)->list_num;
+
+ for(int j = 0; j < mpr_list[i].topic_list_num; j++)
+ {
+ if (j == 0) {
+ mpr_list[i].topic_list[j] = (Proc_ptr + i)->reg_info;
+ } else if (j == 1) {
+ mpr_list[i].topic_list[j] = (Proc_ptr + i)->local_info;
+ } else if (j == 2) {
+ mpr_list[i].topic_list[j] = (Proc_ptr + i)->net_info;
+ }
}
}
}
+
+ free(buf);
}
errString = bus_strerror(0, 1);
@@ -866,8 +883,8 @@
::bhome_msg::MsgCommonReply mcr;
mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
mcr.mutable_errmsg()->set_errstring(errString);
- *reply_len=mcr.ByteSizeLong();
- *reply=malloc(*reply_len);
+ *reply_len = mcr.ByteSizeLong();
+ *reply = malloc(*reply_len);
mcr.SerializePartialToArray(*reply,*reply_len);
#else
len = strlen(errString) + 1;
@@ -1583,6 +1600,13 @@
exit_entry:
errString = bus_strerror(0, 1);
+ if (rv != 0) {
+ if ((proc_id != NULL) && (proc_id_len != NULL)) {
+ *proc_id_len = 0;
+ *proc_id = NULL;
+ }
+ }
+
#if defined(PRO_DE_SERIALIZE)
::bhome_msg::MsgRequestTopicReply mrt;
mrt.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
@@ -1707,6 +1731,12 @@
if (rv != 0) {
rrr.topic = STR_RSV;
rrr.data = STR_RSV;
+
+ if ((proc_id != NULL) && (proc_id_len != NULL)) {
+
+ *proc_id_len = 0;
+ *proc_id = NULL;
+ }
}
#if defined(PRO_DE_SERIALIZE)
@@ -1814,13 +1844,21 @@
}
-#if defined(MSG_HANDLER)
int inter_key_get(void)
{
if (gNetmod_socket != NULL)
return net_mod_socket_get_key(gNetmod_socket);
- return 0;
+ return SHM_BUS_KEY;
}
-#endif
+
+void *socket_data_get(void)
+{
+ return gNetmod_socket;
+}
+
+void inter_key_set(int key)
+{
+ net_mod_socket_bind(gNetmod_socket, key);
+}
diff --git a/src/bh_api.h b/src/bh_api.h
index a231b0c..2a9e39a 100644
--- a/src/bh_api.h
+++ b/src/bh_api.h
@@ -110,6 +110,8 @@
int BHGetLastError(void **msg, int *msg_len);
int inter_key_get(void);
+void inter_key_set(int key);
+void *socket_data_get(void);
#ifdef __cplusplus
}
#endif
diff --git a/src/bus_proxy_start.cpp b/src/bus_proxy_start.cpp
index c5eaaf5..6029e6e 100644
--- a/src/bus_proxy_start.cpp
+++ b/src/bus_proxy_start.cpp
@@ -10,6 +10,7 @@
#include <errno.h>
#include <getopt.h>
#include <stdlib.h>
+#include "proc_def.h"
#include "msg_mgr.h"
using namespace std;
@@ -103,7 +104,7 @@
}
}
- sleep(10);
+ sleep(WT_INT);
}
return NULL;
diff --git a/src/msg_trigger/msg_mgr.h b/src/msg_trigger/msg_mgr.h
index 809808b..686d47c 100644
--- a/src/msg_trigger/msg_mgr.h
+++ b/src/msg_trigger/msg_mgr.h
@@ -1,9 +1,7 @@
#ifndef __MSG_MGR_DEF_
#define __MSG_MGR_DEF_
-#ifdef __cplusplus
-extern "C" {
-#endif
+#include "shm_allocator.h"
#define SEM_TYPE_ID 0
#define RSV_TYPE_ID 1
@@ -37,16 +35,15 @@
} Msg_info;
-#ifdef __cplusplus
-}
-#endif
-
int msg_init(void);
void msg_distrib(int msg_id, Msg_info *message);
int get_msg_info(int msg_id, Msg_info *message);
void *sem_msg_handler(void *skptr);
void msg_info_set(int index, Msg_info msg_obj);
+typedef std::set<int, std::less<int>, SHM_STL_Allocator<int> > recvbuf_val;
+typedef std::map<int, recvbuf_val *, std::less<int>, SHM_STL_Allocator<std::pair<int, recvbuf_val *> > > recvbuf_data;
+
#endif //end of file
diff --git a/src/net/net_mod_server_socket_wrapper.cpp b/src/net/net_mod_server_socket_wrapper.cpp
index e1ad04d..13b7f37 100644
--- a/src/net/net_mod_server_socket_wrapper.cpp
+++ b/src/net/net_mod_server_socket_wrapper.cpp
@@ -2,13 +2,11 @@
#include "net_mod_server_socket_wrapper.h"
void *net_mod_server_socket_open(int port) {
- printf("====net_mod_server_socket_open\n");
NetModServerSocket *sockt = new NetModServerSocket(port);
return (void *)sockt;
}
void net_mod_server_socket_close(void *_sockt) {
- printf("====net_mod_server_socket_close\n");
NetModServerSocket *sockt = (NetModServerSocket *)_sockt;
delete sockt;
diff --git a/src/net/net_mod_socket.cpp b/src/net/net_mod_socket.cpp
index c48c33b..acd9053 100644
--- a/src/net/net_mod_socket.cpp
+++ b/src/net/net_mod_socket.cpp
@@ -121,7 +121,6 @@
if (mpool == NULL)
{
/* If first call from this thread, allocate buffer for thread, and save its location */
- logger->debug("Create connPool");
mpool = new NetConnPool();
if (mpool == NULL) {
LoggerFactory::getLogger()->error(errno, "NetModSocket::_sendandrecv_ malloc");
diff --git a/src/proc_def.h b/src/proc_def.h
index 0fa2fe9..4b6bf41 100644
--- a/src/proc_def.h
+++ b/src/proc_def.h
@@ -16,11 +16,13 @@
#define PROC_QUE_TCS 4
#define PROC_QUE_STCS 5
#define PROC_QUE_ATCS 6
+#define PROC_REG_BUF 7
#define ID_RSV 16
#define ABS_ID_RSV 18
#define STR_MAGIC ","
+#define STR_EXEC "Success"
typedef struct _ProcInfo {
#if 0
@@ -64,6 +66,7 @@
} ProcInfo_query;
#define STR_RSV "empty"
+#define WT_INT 10
#ifdef __cplusplus
}
diff --git a/src/shm/hashtable.cpp b/src/shm/hashtable.cpp
index 62d052e..14daaf0 100755
--- a/src/shm/hashtable.cpp
+++ b/src/shm/hashtable.cpp
@@ -2,6 +2,7 @@
#include "hashtable.h"
#include "mm.h"
#include "svsem.h"
+#include "bh_api.h"
#include "logger_factory.h"
#include <set>
#include <functional>
@@ -52,11 +53,17 @@
}
else
{
-
TAILQ_FOREACH(item, my_tailq_head, joint)
{
- if (key == item->key)
+ if ((item != NULL) && (key == item->key)) {
return item->value;
+ } else {
+ mm_free(my_tailq_head);
+ hashtable->array[code] = NULL;
+ hashtable->queueCount--;
+
+ return NULL;
+ }
}
}
return NULL;
@@ -71,7 +78,12 @@
tailq_header_t *my_tailq_head = hashtable->array[code] ;
if ( my_tailq_head == NULL)
{
+ if (inter_key_get() == 0) {
+ inter_key_set(key);
+ }
+
my_tailq_head = (tailq_header_t*) mm_malloc(sizeof(tailq_header_t ));
+
TAILQ_INIT(my_tailq_head);
hashtable->array[code] = my_tailq_head;
goto putnew;
@@ -79,27 +91,30 @@
TAILQ_FOREACH(item, my_tailq_head, joint)
{
- if (key ==item->key)
+ if ((item != NULL) && (key == item->key))
{
oldvalue = item -> value;
item->key= key;
- item -> value = value;
+ item->value = value;
return oldvalue;
- }
+ }
}
putnew:
+
+ if (inter_key_get() == 0) {
+ inter_key_set(key);
+ }
item = (tailq_entry_t *) mm_malloc(sizeof(tailq_entry_t));
item->key = key;
- item -> value = value;
+ item->value = value;
TAILQ_INSERT_TAIL(my_tailq_head, item, joint);
return NULL;
}
-void *hashtable_remove(hashtable_t *hashtable, int key)
+void hashtable_remove(hashtable_t *hashtable, int key)
{
size_t code = hashcode(key);
tailq_entry_t *item;
- void *oldvalue;
int rv;
if( (rv = svsem_uni_wait(hashtable->mutex)) != 0) {
@@ -111,29 +126,31 @@
if((rv = svsem_post(hashtable->mutex)) != 0) {
LoggerFactory::getLogger()->error(errno, "hashtable_remove\n");
}
- return NULL;
+ return;
} else {
- for (item = TAILQ_FIRST(my_tailq_head); item != NULL; item = TAILQ_NEXT(item, joint))
+ for (item = TAILQ_FIRST(my_tailq_head); item != NULL;)
{
- if (key == item->key)
- {
- oldvalue = item->value;
- /* Remove the item from the tail queue. */
- TAILQ_REMOVE(my_tailq_head, item, joint);
+ /* Remove the item from the tail queue. */
+ TAILQ_REMOVE(my_tailq_head, item, joint);
- /* mm_free the item as we don't need it anymore. */
- mm_free(item);
+ /* mm_free the item as we don't need it anymore. */
+ mm_free(item);
+
+ item = TAILQ_NEXT(item, joint);
+ if (item == NULL) {
+ mm_free(my_tailq_head);
+ hashtable->array[code] = NULL;
hashtable->queueCount--;
svsem_post(hashtable->mutex);
- return oldvalue;
}
+ return;
}
if((rv = svsem_post(hashtable->mutex)) != 0) {
LoggerFactory::getLogger()->error(errno, "hashtable_remove\n");
}
- return NULL;
+ return;
}
}
@@ -217,6 +234,7 @@
key++;
}
// 鍗犵敤key
+
_hashtable_put(hashtable, key, (void *)1);
hashtable->currentKey = key;
diff --git a/src/shm/hashtable.h b/src/shm/hashtable.h
index 7bb6eac..8909ae5 100755
--- a/src/shm/hashtable.h
+++ b/src/shm/hashtable.h
@@ -31,7 +31,8 @@
void hashtable_put(hashtable_t *hashtable, int key, void *value) ;
bool hashtable_check_put(hashtable_t *hashtable, int key, void *value, bool overwrite) ;
-void *hashtable_remove(hashtable_t *hashtable, int key);
+static inline void _hashtable_remove(hashtable_t *hashtable, int key);
+void hashtable_remove(hashtable_t *hashtable, int key);
void hashtable_removeall(hashtable_t *hashtable);
int hashtable_get_queue_count(hashtable_t *hashtable) ;
/**
diff --git a/src/shm/mm.cpp b/src/shm/mm.cpp
index 13ec443..5837906 100644
--- a/src/shm/mm.cpp
+++ b/src/shm/mm.cpp
@@ -4,6 +4,7 @@
#include "mm.h"
#include "sem_util.h"
#include "logger_factory.h"
+#include "bh_api.h"
#include <sys/sem.h>
#include <sys/shm.h>
@@ -135,10 +136,10 @@
/*
* mm_free - Free a block
*/
-void mm_free(void *ptr)
+void mm_free(void *ptr, int enable)
{
- if (ptr == 0)
- return;
+ if ((ptr == 0) || (*(size_t *)(ptr - SIZE_T_SIZE) == 0x00))
+ return;
/*
*if (!is_allocated(ptr) ) {
@@ -147,15 +148,19 @@
*}
*/
- SemUtil::dec_uni(mutex);
+ if (enable == true) {
+ SemUtil::dec_uni(mutex);
+ }
+ ptr -= SIZE_T_SIZE;
size_t size = GET_SIZE(HDRP(ptr));
PUT(HDRP(ptr), PACK(size, 0));
PUT(FTRP(ptr), PACK(size, 0));
+ *(size_t *)ptr = 0x00;
coalesce(ptr);
- SemUtil::inc(mutex);
+ if (enable == true) {
+ SemUtil::inc(mutex);
+ }
}
-
-
/*
* mm_realloc - Naive implementation of realloc
@@ -389,15 +394,13 @@
PUT(HDRP(bp), PACK(size, 0)); /* Free block header */ //line:vm:mm:freeblockhdr
PUT(FTRP(bp), PACK(size, 0)); /* Free block footer */ //line:vm:mm:freeblockftr
PUT(HDRP(NEXT_BLKP(bp)), PACK(0, 1)); /* New epilogue header */ //line:vm:mm:newepihdr
-
+
/* Coalesce if the previous block was free */
return coalesce(bp); //line:vm:mm:returnblock
}
static void insert_fblock (void *bp)
{
- //鍚庤繘鍏堝嚭鐨勬柟寮忔彃鍏ワ紝鍗虫彃鍏ラ摼琛ㄥご浣嶇疆
-
// insert into the header of the free list
PUT_PTR(SUCCRP(bp), NEXT_FBLKP(heap_listp)); //the successor of bp point to the old first free block
PUT_PTR(PREDRP(NEXT_FBLKP(heap_listp)), bp); //the predecessor of the old first free block point to bp
@@ -489,7 +492,10 @@
PUT(FTRP(bp), PACK(csize, 1));
rm_fblock(bp);
}
- return bp;
+
+ *(size_t *)bp = inter_key_get();
+
+ return (bp + SIZE_T_SIZE);
}
static int is_allocated(void *ptr)
@@ -514,6 +520,24 @@
}
+void find_mm_data(int val)
+{
+ void *bp = heap_listp;
+
+ SemUtil::dec(mutex);
+ for (bp = heap_listp; GET_SIZE(HDRP(bp)) > 0; bp = NEXT_BLKP(bp))
+ {
+ if (GET_ALLOC(HDRP(bp))) {
+ if ((*(size_t *)bp) == val) {
+ mm_free(bp + SIZE_T_SIZE, false);
+ }
+ }
+ }
+ SemUtil::inc(mutex);
+
+ return;
+}
+
/*
* find_fit - Find a fit for a block with size bytes
*/
@@ -526,6 +550,9 @@
if (!GET_ALLOC(HDRP(bp)) && (size <= GET_SIZE(HDRP(bp))))
{
return bp;
+ } else if (GET_ALLOC(HDRP(bp)) && (GET_SIZE(HDRP(bp)) == 0))
+ {
+ break;
}
}
return NULL; /* No fit */
diff --git a/src/shm/mm.h b/src/shm/mm.h
index 6dbb979..d7d9a84 100644
--- a/src/shm/mm.h
+++ b/src/shm/mm.h
@@ -8,9 +8,10 @@
extern bool mm_init(size_t heap_size);
extern bool mm_destroy(void);
-extern void *mm_malloc (size_t size);
-extern void mm_free (void *ptr);
+void *mm_malloc (size_t size);
+void mm_free (void *ptr, int enable = true);
extern void *mm_realloc(void *ptr, size_t size);
+extern void find_mm_data(int val);
extern void * mm_get_by_key(int key);
diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp
index 315c356..0a44949 100644
--- a/src/socket/bus_server_socket.cpp
+++ b/src/socket/bus_server_socket.cpp
@@ -2,6 +2,7 @@
#include "bus_server_socket.h"
#include "shm_mod_socket.h"
#include "shm_socket.h"
+#include "msg_mgr.h"
#include "bus_error.h"
static Logger *logger = LoggerFactory::getLogger();
@@ -303,7 +304,7 @@
LinkNode *pNew = NULL;
LinkNode *pCur = NULL;
- pNew = new(LinkNode);
+ pNew = (LinkNode *)malloc(sizeof(LinkNode));
pNew->data = aData;
pNew->data_fix = bData;
pNew->count = 0;
@@ -340,7 +341,7 @@
head = pCur->next;
- delete(pCur);
+ free(pCur);
pCur = head;
@@ -353,7 +354,7 @@
pCur->next = pNext->next;
pCur = pNext->next;
- delete(pNext);
+ free(pNext);
} else {
pCur = pNext;
@@ -559,7 +560,10 @@
procQuePart->erase(buf_temp);
}
+ BusServerSocket::buf_data_remove(key);
+ find_mm_data(key);
}
+
} else if (flag == PROC_REG_TCS) {
ProcTcsMap *proc = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY);
SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
@@ -709,7 +713,7 @@
sprintf(data_buf, "%d", count);
shm_sendto(shm_socket, data_buf, strlen(data_buf), key, &timeout, BUS_TIMEOUT_FLAG);
- } else {
+ } else if (flag == PROC_QUE_ATCS) {
int val;
int temp = 0;
@@ -853,6 +857,17 @@
shm_sendto(shm_socket, last_buf, temp + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG);
free(last_buf);
+ } else {
+
+ char *ptr = NULL;
+ strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
+
+ data1 = atoi(buf_temp);
+ ptr = strstr(buf_temp, STR_MAGIC);
+ if (ptr != NULL) {
+ data2 = atoi(ptr + 1);
+ }
+ BusServerSocket::buf_data_set(data1, data2);
}
}
@@ -888,7 +903,7 @@
int key;
int flag;
char buf_temp[MAX_STR_LEN] = { 0x00 };
- char * action, *topic, *topics, *buf, *content;
+ char *action, *topic, *topics, *buf, *content;
size_t head_len;
bus_head_t head;
int val;
@@ -935,7 +950,8 @@
}
else if ((strcmp(action, "reg") == 0) || (strcmp(action, "unreg") == 0) \
|| (strcmp(action, "tcsreg") == 0) || (strcmp(action, "tcsque") == 0) \
- || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0)) {
+ || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0) \
+ || (strcmp(action, "bufreg") == 0)) {
content = topics + head.topic_size;
if (strcmp(action, "reg") == 0) {
@@ -957,15 +973,19 @@
flag = PROC_QUE_STCS;
- } else {
+ } else if (strcmp(action, "atcsque") == 0) {
flag = PROC_QUE_ATCS;
+
+ } else {
+
+ flag = PROC_REG_BUF;
}
if (flag == PROC_REG) {
memcpy(buf_temp, content, strlen(content) + 1);
-
+
if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) {
val = proc_que_iter->second;
@@ -996,6 +1016,7 @@
hashtable_t *hashtable = mm_get_hashtable();
void *data_ptr = hashtable_get(hashtable, val);
+
if (data_ptr != NULL) {
if (data_ptr != (void *)1) {
queue = (LockFreeQueue<shm_packet_t> *)data_ptr;
@@ -1011,3 +1032,41 @@
}
+void BusServerSocket::buf_data_set(int data, int val) {
+ recvbuf_val *val_buf;
+ recvbuf_data::iterator data_iter;
+ recvbuf_val::iterator val_iter;
+
+ if ((data_iter = recvBuf_data.find(data)) != recvBuf_data.end()) {
+ val_buf = data_iter->second;
+ } else {
+ void *set_ptr = mm_malloc(sizeof(recvbuf_val));
+
+ val_buf = new(set_ptr) recvbuf_val;
+ recvBuf_data.insert({data, val_buf});
+ }
+
+ val_buf->insert(val);
+}
+
+void BusServerSocket::buf_data_remove(int data) {
+
+ int val;
+ recvbuf_val *val_buf;
+ recvbuf_data::iterator data_iter;
+ recvbuf_val::iterator val_iter;
+
+ if ((data_iter = recvBuf_data.find(data)) != recvBuf_data.end()) {
+
+ val_buf = data_iter->second;
+ for(val_iter = val_buf->begin(); val_iter != val_buf->end(); ++val_iter) {
+ val = *val_iter;
+
+ BusServerSocket::_data_remove(val);
+ }
+
+ recvBuf_data.erase(data);
+ }
+}
+
+
diff --git a/src/socket/bus_server_socket.h b/src/socket/bus_server_socket.h
index 7b94ba1..4cd0140 100644
--- a/src/socket/bus_server_socket.h
+++ b/src/socket/bus_server_socket.h
@@ -8,6 +8,7 @@
#include "sem_util.h"
#include "logger_factory.h"
#include "key_def.h"
+#include "msg_mgr.h"
#include "socket_def.h"
#include <set>
@@ -62,6 +63,7 @@
// pthread_t recv_thread;
// <涓婚锛� 璁㈤槄鑰�>
SHMTopicSubMap *topic_sub_map;
+ recvbuf_data recvBuf_data;
private:
int destroy();
@@ -122,6 +124,8 @@
int get_key() ;
void _data_remove(int val);
+ void buf_data_set(int data, int val);
+ void buf_data_remove(int data);
};
diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp
index 6139d34..6058308 100644
--- a/src/socket/shm_mod_socket.cpp
+++ b/src/socket/shm_mod_socket.cpp
@@ -69,6 +69,10 @@
memcpy(head.action, "atcsque", sizeof(head.action));
+ } else if (flag == PROC_REG_BUF) {
+
+ memcpy(head.action, "bufreg", sizeof(head.action));
+
} else {
return -1;
@@ -115,7 +119,7 @@
ts.tv_nsec = (timeout_ms - ts.tv_sec * 1000) * 1000 * 1000;
- if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) {
+ if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS) || (flag == PROC_REG_BUF)) {
ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, BUS_TIMEOUT_FLAG);
@@ -127,7 +131,7 @@
} else if (timeout_ms == 0) {
- if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) {
+ if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS) || (flag == PROC_REG_BUF)) {
ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, BUS_NOWAIT_FLAG);
@@ -139,7 +143,7 @@
} else {
- if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) {
+ if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS) || (flag == PROC_REG_BUF)) {
ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, -1);
@@ -165,7 +169,6 @@
int ShmModSocket::sendto(const void *buf, const int size, const int key, const struct timespec *timeout, int flag, int reset, int data_set) {
int rv = shm_sendto(shm_socket, buf, size, key, timeout, flag, reset, data_set);
if(rv == 0) {
- logger->debug("ShmModSocket::sendto: %d sendto %d success.\n", get_key(), key);
return 0;
}
@@ -183,7 +186,6 @@
int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flag, reset, data_set);
if(rv == 0) {
- logger->debug("ShmModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key);
return 0;
}
@@ -202,7 +204,6 @@
int rv = shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag);
if(rv == 0) {
- logger->debug("ShmModSocket::sendandrecv: sendandrecv to %d success.\n", send_key);
return 0;
}
diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index 709505f..ae9a98b 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -2,10 +2,12 @@
#include "socket_def.h"
#include "hashtable.h"
#include "logger_factory.h"
+#include "net_mod_socket_wrapper.h"
#include <map>
#include <cassert>
#include "bus_error.h"
#include "sole.h"
+#include "bh_api.h"
#include "shm_mm.h"
#include "key_def.h"
@@ -105,9 +107,6 @@
static int _shm_socket_close_(shm_socket_t *sockt) {
-
- int rv, i;
- hashtable_t *hashtable = mm_get_hashtable();
// if(sockt->key != 0) {
// auto it = shmQueueStMap->find(sockt->key);
@@ -117,18 +116,6 @@
// }
// }
- if(sockt->queue != NULL) {
- sockt->queue->close();
- for( i = 0; i < sockt->queue->size(); i++) {
- mm_free((*(sockt->queue))[i].buf);
- logger->info("======= %d free queue element buf\n", sockt->key);
- }
- sleep(1);
-
- hashtable_remove(hashtable, sockt->key);
- // sockt->queue = NULL;
- }
-
pthread_mutex_destroy(&(sockt->mutex) );
free(sockt);
return 0;
@@ -404,13 +391,16 @@
const int send_size, const int key, void **recv_buf,
int *recv_size, const struct timespec *timeout, int flags) {
-
+ int data;
+ int timeout_ms;
+ char data_buf[MAX_STR_LEN] = { 0x00 };
int rv = 0, tryn = 16;
shm_packet_t sendpak;
shm_packet_t recvpak;
std::map<int, shm_packet_t>::iterator recvbufIter;
shm_socket_t *tmp_socket = NULL;
-
+ hashtable_t *hashtable = mm_get_hashtable();
+
rv = pthread_once(&_once_, _create_threadlocal_socket_key_);
if (rv != 0) {
logger->error(rv, "shm_sendandrecv pthread_once");
@@ -421,7 +411,15 @@
if (tmp_socket == NULL)
{
tmp_socket = shm_socket_open(SHM_SOCKET_DGRAM);
-
+
+ tmp_socket->key = hashtable_alloc_key(hashtable);
+ data = inter_key_get();
+ timeout_ms = timeout->tv_sec * 1000 + 3000;
+ sprintf(data_buf, "%d, %d", data, tmp_socket->key);
+ if (socket_data_get() != NULL) {
+ net_mod_socket_reg(socket_data_get(), data_buf, strlen(data_buf), NULL, 0, timeout_ms, PROC_REG_BUF);
+ }
+
rv = pthread_setspecific(_localthread_socket_key_, tmp_socket);
if ( rv != 0) {
logger->error(rv, "shm_sendandrecv : pthread_setspecific");
@@ -564,6 +562,7 @@
if (sockt->key == 0) {
sockt->key = hashtable_alloc_key(hashtable);
}
+
sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind);
if(sockt->queue == NULL ) {
logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key);
@@ -729,7 +728,7 @@
count += strlen(ptr->int_info) + 1;
memcpy(dst + count, ptr->svr_info, strlen(ptr->svr_info) + 1);
count += strlen(ptr->svr_info) + 1;
-
+
*counter = count;
}
--
Gitblit v1.8.0