From 578e15c276d72bfbdd707c6c948824daa43d3780 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期四, 18 一月 2024 14:18:35 +0800
Subject: [PATCH] less memory
---
src/socket/bus_server_socket.cpp | 383 +++++++++++++++++++++++++++++++++++++++++++++++++----
1 files changed, 349 insertions(+), 34 deletions(-)
diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp
index 8b022e1..fc53b0b 100644
--- a/src/socket/bus_server_socket.cpp
+++ b/src/socket/bus_server_socket.cpp
@@ -2,10 +2,13 @@
#include "bus_server_socket.h"
#include "shm_mod_socket.h"
#include "shm_socket.h"
+#include "msg_mgr.h"
#include "bus_error.h"
static Logger *logger = LoggerFactory::getLogger();
+static pthread_mutex_t gMutex;
+list gLinkedList;
void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)> cb) {
SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
SHMKeySet *subscripter_set;
@@ -82,11 +85,13 @@
int BusServerSocket::start(){
int rv;
- topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
-
- rv = _run_proxy_();
+ topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
- return rv;
+ pthread_mutex_init(&gMutex, NULL);
+
+ rv = _run_proxy_();
+
+ return rv;
}
@@ -199,7 +204,7 @@
int rv;
struct timespec timeout = {1,0};
- if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
+ if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
subscripter_set = map_iter->second;
for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) {
@@ -296,12 +301,177 @@
return dataBuf;
}
+void list::Insert(int aData, int bData)
+{
+ LinkNode *pHead = NULL;
+ LinkNode *pNew = NULL;
+ LinkNode *pCur = NULL;
+
+ pNew = (LinkNode *)malloc(sizeof(LinkNode));
+ pNew->data = aData;
+ pNew->data_fix = bData;
+ pNew->count = 0;
+
+ pHead = head;
+ pCur = pHead;
+ if(pHead == NULL) {
+ head = pNew;
+
+ pNew->next = NULL;
+
+ } else {
+ while(pCur->next != NULL) {
+ pCur = pCur->next;
+ }
+
+ pCur->next = pNew;
+ pNew->next = NULL;
+ }
+}
+
+void list::Delete(int data)
+{
+ LinkNode *pHead;
+ LinkNode *pCur;
+ LinkNode *pNext;
+
+ pHead = head;
+ pCur = pHead;
+ if(pHead == NULL)
+ return;
+
+ while((pCur != NULL) && (pCur->data == data)) {
+
+ head = pCur->next;
+
+ free(pCur);
+
+ pCur = head;
+
+ }
+
+ while((pCur != NULL) && (pCur->next != NULL)) {
+ pNext = pCur->next;
+
+ if(pNext->data == data) {
+ pCur->next = pNext->next;
+ pCur = pNext->next;
+
+ free(pNext);
+ } else {
+
+ pCur = pNext;
+
+ }
+ }
+}
+
+void list::dataSet(int data, int val)
+{
+ LinkNode *pCur;
+
+ pCur = head;
+ if(pCur == NULL)
+ return;
+
+ while(pCur != NULL) {
+
+ if(pCur->data == data) {
+ pCur->count = val;
+ }
+
+ pCur = pCur->next;
+ }
+}
+
+int list::dataGet(int data)
+{
+ LinkNode *pCur;
+
+ pCur = head;
+ if(pCur == NULL)
+ return 0;
+
+ while(pCur != NULL) {
+
+ if(pCur->data == data) {
+ return pCur->count;
+ }
+
+ pCur = pCur->next;
+ }
+
+ return 0;
+}
+
+int list::dataFixGet(int data)
+{
+ LinkNode *pCur;
+
+ pCur = head;
+ if(pCur == NULL)
+ return 0;
+
+ while(pCur != NULL) {
+
+ if(pCur->data == data) {
+ return pCur->data_fix;
+ }
+
+ pCur = pCur->next;
+ }
+
+ return 0;
+}
+
+int list::NodeNum(void)
+{
+ int count = 0;
+ LinkNode *pCur = head;
+
+ if (pCur == NULL) {
+ return 0;
+ }
+
+ while(pCur != NULL) {
+
+ ++count;
+ pCur = pCur->next;
+ }
+
+ return count;
+}
+
+int list::nodeGet(int index)
+{
+ int count = 0;
+ LinkNode *pCur = head;
+
+ if (pCur == NULL) {
+ return 0;
+ }
+
+ while((pCur != NULL) && (count <= index)) {
+
+ if (count == index) {
+ return pCur->data;
+ }
+
+ ++count;
+ pCur = pCur->next;
+ }
+
+ return 0;
+}
+
void BusServerSocket::_proxy_reg(const char *topic, size_t topic_size, const char *buf, size_t buf_size, int key, int flag)
{
+ char data_buf[MAX_STR_LEN] = { 0x00 };
char buf_temp[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
int count = 0;
int i = 0;
int len = 0;
+ int data1, data2;
char *data_ptr;
ProcInfo Data_stru;
ProcZone::iterator proc_iter;
@@ -333,15 +503,27 @@
memcpy(Data_stru.private_info, buf + count, strlen(buf + count) + 1);
count += strlen(buf + count) + 1;
+
+ memcpy(Data_stru.int_info, buf + count, strlen(buf + count) + 1);
+ count += strlen(buf + count) + 1;
+
+ memcpy(Data_stru.svr_info, buf + count, strlen(buf + count) + 1);
+ count += strlen(buf + count) + 1;
+
+ if (flag == PROC_REG) {
+ gLinkedList.Insert(key, atoi(Data_stru.int_info));
+ }
}
ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET);
ProcPartZone *procPart = shm_mm_attach<ProcPartZone>(SHM_BUS_PROC_PART_MAP_KEY);
if (flag == PROC_REG) {
+ pthread_mutex_lock(&gMutex);
if ((proc_iter = proc->find(key)) == proc->end()) {
proc->insert({key, Data_stru});
}
+ pthread_mutex_unlock(&gMutex);
if ((proc_part_iter = procPart->find(key)) == procPart->end()) {
procPart->insert({key, Data_stru.proc_id});
@@ -360,13 +542,20 @@
SvrSub_ele->erase(key);
}
+ pthread_mutex_lock(&gMutex);
if ((proc_iter = proc->find(key)) != proc->end()) {
+ data1 = atoi((proc_iter->second).int_info);
+ data2 = atoi((proc_iter->second).svr_info);
+ BusServerSocket::_data_remove(data1);
+ BusServerSocket::_data_remove(data2);
+ BusServerSocket::_data_remove(key);
len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1);
strncpy(buf_temp, (proc_iter->second).proc_id, len);
- proc->erase(proc_iter);
+ proc->erase(key);
}
+ pthread_mutex_unlock(&gMutex);
if ((proc_part_iter = procPart->find(key)) != procPart->end()) {
@@ -378,7 +567,13 @@
procQuePart->erase(buf_temp);
}
+ pthread_mutex_lock(&gMutex);
+ BusServerSocket::buf_data_remove(key);
+ pthread_mutex_unlock(&gMutex);
+
+ find_mm_data(key);
}
+
} else if (flag == PROC_REG_TCS) {
ProcTcsMap *proc = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY);
SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
@@ -395,6 +590,7 @@
strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC);
while(data_ptr) {
+ data_ptr = trim(data_ptr, 0);
TcsSub_ele->insert(data_ptr);
if ((svr_tcs_iter = SvrData->find(data_ptr)) != SvrData->end()) {
SvrSub_ele = svr_tcs_iter->second;
@@ -424,6 +620,7 @@
strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC);
while(data_ptr) {
+ data_ptr = trim(data_ptr, 0);
ret = Qurey_object(data_ptr, &len);
if (ret != NULL) {
@@ -504,14 +701,19 @@
free(last_buf);
} else if (flag == PROC_QUE_STCS) {
+
SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
+ ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
- if ((svr_tcs_iter = SvrData->find(buf_temp)) != SvrData->end()) {
+ if ((svr_tcs_iter = SvrData->find(trim(buf_temp, 0))) != SvrData->end()) {
SvrSub_ele = svr_tcs_iter->second;
for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) {
count = *svr_proc_iter;
+ if ((proc_iter = proc->find(count)) != proc->end()) {
+ count = atoi((proc_iter->second).svr_info);
+ }
break;
}
@@ -519,11 +721,11 @@
count = 0;
}
- memset(buf_temp, 0x00, sizeof(buf_temp));
- sprintf(buf_temp, "%d", count);
- shm_sendto(shm_socket, buf_temp, strlen(buf_temp), key, &timeout, BUS_TIMEOUT_FLAG);
+ memset(data_buf, 0x00, sizeof(data_buf));
+ sprintf(data_buf, "%d", count);
+ shm_sendto(shm_socket, data_buf, strlen(data_buf), key, &timeout, BUS_TIMEOUT_FLAG);
- } else {
+ } else if (flag == PROC_QUE_ATCS) {
int val;
int temp = 0;
@@ -666,7 +868,48 @@
shm_sendto(shm_socket, last_buf, temp + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG);
+ free(last_buf);
+ } else {
+
+ char *ptr = NULL;
+ strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
+
+ data1 = atoi(buf_temp);
+ ptr = strstr(buf_temp, STR_MAGIC);
+ if (ptr != NULL) {
+ data2 = atoi(ptr + 1);
+ }
+ BusServerSocket::buf_data_set(data1, data2);
}
+}
+
+int BusServerSocket::get_data(int val) {
+
+ ProcZone::iterator proc_iter;
+ ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
+
+ pthread_mutex_lock(&gMutex);
+ if ((proc_iter = proc->find(val)) != proc->end()) {
+ pthread_mutex_unlock(&gMutex);
+ return true;
+ }
+ pthread_mutex_unlock(&gMutex);
+
+ return false;
+
+}
+
+int BusServerSocket::check_proc(const int val, const void *buf, int len, void **buf_ret, int *len_ret, \
+ const struct timespec *timeout, const int flag) {
+ int ret;
+
+ ret = shm_sendandrecv(shm_socket, buf, len, val, buf_ret, len_ret, timeout, flag);
+
+ return ret;
+}
+
+void BusServerSocket::remove_proc(int val) {
+ BusServerSocket::_proxy_reg(NULL, 0, NULL, 0, val, PROC_UNREG);
}
// 杩愯浠g悊
@@ -674,14 +917,18 @@
int size;
int key;
int flag;
- char * action, *topic, *topics, *buf, *content;
+ char buf_temp[MAX_STR_LEN] = { 0x00 };
+ char *action, *topic, *topics, *buf, *content;
size_t head_len;
bus_head_t head;
+ int val;
+ ProcDataZone::iterator proc_que_iter;
+ ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET);
- int rv;
- char send_buf[512] = { 0x00 };
+ int rv;
+ char send_buf[512] = { 0x00 };
- const char *topic_delim = ",";
+ const char *topic_delim = ",";
while((rv = shm_recvfrom(shm_socket, (void **)&buf, &size, &key)) == 0) {
head = ShmModSocket::decode_bus_head(buf);
topics = buf + BUS_HEAD_SIZE;
@@ -718,7 +965,8 @@
}
else if ((strcmp(action, "reg") == 0) || (strcmp(action, "unreg") == 0) \
|| (strcmp(action, "tcsreg") == 0) || (strcmp(action, "tcsque") == 0) \
- || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0)) {
+ || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0) \
+ || (strcmp(action, "bufreg") == 0)) {
content = topics + head.topic_size;
if (strcmp(action, "reg") == 0) {
@@ -740,33 +988,100 @@
flag = PROC_QUE_STCS;
- } else {
+ } else if (strcmp(action, "atcsque") == 0) {
flag = PROC_QUE_ATCS;
+ } else {
+
+ flag = PROC_REG_BUF;
+
}
+ if (flag == PROC_REG) {
+ memcpy(buf_temp, content, strlen(content) + 1);
+
+ if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) {
+
+ val = proc_que_iter->second;
+ _proxy_reg(topics, head.topic_size, content, head.content_size, val, PROC_UNREG);
+ }
+ }
+
_proxy_reg(topics, head.topic_size, content, head.content_size, key, flag);
}
- else if (strncmp(buf, "request", strlen("request")) == 0) {
- sprintf(send_buf, "%4d", key);
- strncpy(send_buf + 4, buf, (sizeof(send_buf) - 4) >= (strlen(buf) + 1) ? strlen(buf) : (sizeof(send_buf) - 4));
-
- rv = shm_sendto(shm_socket, send_buf, strlen(send_buf) + 1, key);
- if(rv != 0) {
- logger->error( "BusServerSocket::_run_proxy_ : requst answer fail!\n");
- }
- }
else if(strcmp(action, "stop") == 0) {
- free(buf);
- break;
- } else {
- logger->error( "BusServerSocket::_run_proxy_ : unrecognized action %s", action);
- }
- free(buf);
- }
+ free(buf);
+ break;
+ } else {
+ logger->error( "BusServerSocket::_run_proxy_ : unrecognized action %s", action);
+ }
+ free(buf);
+ }
- return rv;
+ return rv;
}
+
+void BusServerSocket::_data_remove(int val) {
+
+ int i;
+ LockFreeQueue<shm_packet_t> *queue = NULL;
+ hashtable_t *hashtable = mm_get_hashtable();
+
+ void *data_ptr = hashtable_get(hashtable, val);
+
+ if (data_ptr != NULL) {
+ if (data_ptr != (void *)1) {
+ queue = (LockFreeQueue<shm_packet_t> *)data_ptr;
+ queue->close();
+ for (i = 0; i < queue->size(); i++) {
+ mm_free((*queue)[i].buf);
+ }
+ sleep(1);
+ }
+
+ hashtable_remove(hashtable, val);
+ }
+
+}
+
+void BusServerSocket::buf_data_set(int data, int val) {
+ recvbuf_val *val_buf;
+ recvbuf_data::iterator data_iter;
+ recvbuf_val::iterator val_iter;
+
+ if ((data_iter = recvBuf_data.find(data)) != recvBuf_data.end()) {
+ val_buf = data_iter->second;
+ } else {
+ void *set_ptr = mm_malloc(sizeof(recvbuf_val));
+
+ val_buf = new(set_ptr) recvbuf_val;
+ recvBuf_data.insert({data, val_buf});
+ }
+
+ val_buf->insert(val);
+}
+
+void BusServerSocket::buf_data_remove(int data) {
+
+ int val;
+ recvbuf_val *val_buf;
+ recvbuf_data::iterator data_iter;
+ recvbuf_val::iterator val_iter;
+
+ if ((data_iter = recvBuf_data.find(data)) != recvBuf_data.end()) {
+
+ val_buf = data_iter->second;
+ for(val_iter = val_buf->begin(); val_iter != val_buf->end(); ++val_iter) {
+ val = *val_iter;
+
+ BusServerSocket::_data_remove(val);
+ }
+
+ recvBuf_data.erase(data);
+ }
+}
+
+
--
Gitblit v1.8.0