From 82b028cf63953d8080b63d85468eae488d212194 Mon Sep 17 00:00:00 2001
From: fujuntang <fujuntang@smartai.com>
Date: 星期四, 23 九月 2021 14:30:07 +0800
Subject: [PATCH] Fix the data parsing when in multiple threads.
---
src/socket/bus_server_socket.cpp | 819 +++++++++++++++++++++++++++++++++++++++++++++++++++++++---
1 files changed, 773 insertions(+), 46 deletions(-)
diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp
index b49eeb8..2d552da 100644
--- a/src/socket/bus_server_socket.cpp
+++ b/src/socket/bus_server_socket.cpp
@@ -1,21 +1,23 @@
#include "bus_server_socket.h"
#include "shm_mod_socket.h"
+#include "shm_socket.h"
#include "bus_error.h"
static Logger *logger = LoggerFactory::getLogger();
+list gLinkedList;
void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)> cb) {
- SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
+ SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
SHMKeySet *subscripter_set;
SHMKeySet::iterator set_iter;
SHMTopicSubMap::iterator map_iter;
if(topic_sub_map != NULL) {
- for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
+ for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) {
subscripter_set = map_iter->second;
if(subscripter_set != NULL) {
- for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
+ for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) {
cb(subscripter_set, *set_iter);
}
}
@@ -29,17 +31,16 @@
int key;
for(int i = 0; i < length; i++) {
key = keys[i];
- SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
+ SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
SHMKeySet *subscripter_set;
SHMKeySet::iterator set_iter;
SHMTopicSubMap::iterator map_iter;
if(topic_sub_map != NULL) {
- for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
+ for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) {
subscripter_set = map_iter->second;
if(subscripter_set != NULL && (set_iter = subscripter_set->find(key) ) != subscripter_set->end()) {
subscripter_set->erase(set_iter);
-// printf("remove_subscripter %s, %d\n", map_iter->first, key);
count++;
}
}
@@ -51,8 +52,7 @@
BusServerSocket::BusServerSocket() {
- logger->debug("BusServerSocket Init");
- shm_socket = shm_open_socket(SHM_SOCKET_DGRAM);
+ shm_socket = shm_socket_open(SHM_SOCKET_DGRAM);
topic_sub_map = NULL;
}
@@ -79,12 +79,15 @@
* 鍚姩bus
*
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
-*/
+ */
int BusServerSocket::start(){
- topic_sub_map = mem_pool_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
+ int rv;
+
+ topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
- _run_proxy_();
- return 0;
+ rv = _run_proxy_();
+
+ return rv;
}
@@ -115,7 +118,7 @@
SHMKeySet *subscripter_set;
SHMTopicSubMap::iterator map_iter;
if(topic_sub_map != NULL) {
- for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
+ for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) {
subscripter_set = map_iter->second;
if(subscripter_set != NULL) {
subscripter_set->clear();
@@ -124,11 +127,11 @@
}
topic_sub_map->clear();
- mem_pool_free_by_key(SHM_BUS_MAP_KEY);
+ shm_mm_free_by_key(SHM_BUS_MAP_KEY);
}
- shm_close_socket(shm_socket);
- logger->debug("BusServerSocket destory 3");
- return 0;
+ shm_socket_close(shm_socket);
+
+ return 0;
}
/*
@@ -136,10 +139,10 @@
*/
void BusServerSocket::_proxy_sub( char *topic, int key) {
SHMKeySet *subscripter_set;
-
+
+ struct timespec timeout = {.tv_sec = 1, .tv_nsec = 0};
SHMTopicSubMap::iterator map_iter;
SHMKeySet::iterator set_iter;
-//printf("_proxy_sub topic = %s\n", topic);
if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
subscripter_set = map_iter->second;
} else {
@@ -148,6 +151,7 @@
topic_sub_map->insert({topic, subscripter_set});
}
subscripter_set->insert(key);
+
}
/*
@@ -174,7 +178,7 @@
SHMTopicSubMap::iterator map_iter;
// SHMKeySet::iterator set_iter;
- for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
+ for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) {
subscripter_set = map_iter->second;
subscripter_set->erase(key);
}
@@ -183,7 +187,7 @@
/*
* 澶勭悊鍙戝竷锛屼唬鐞嗚浆鍙�
*/
-void BusServerSocket::_proxy_pub( char *topic, void *buf, size_t size, int key) {
+void BusServerSocket::_proxy_pub( char *topic, char *buf, size_t size, int key) {
SHMKeySet *subscripter_set;
SHMTopicSubMap::iterator map_iter;
@@ -199,9 +203,8 @@
if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
subscripter_set = map_iter->second;
- for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
+ for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) {
send_key = *set_iter;
-// logger->debug("_proxy_pub send before %d \n", send_key);
rv = shm_sendto(shm_socket, buf, size, send_key, &timeout, BUS_TIMEOUT_FLAG);
if(rv == 0) {
continue;
@@ -211,7 +214,7 @@
}
// 鍒犻櫎宸插叧闂殑绔�
- for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); vector_iter++) {
+ for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); ++vector_iter) {
if((set_iter = subscripter_set->find(*vector_iter)) != subscripter_set->end()) {
subscripter_set->erase(set_iter);
logger->debug("remove closed subscripter %d \n", send_key);
@@ -220,30 +223,688 @@
subscripter_to_del.clear();
}
+
}
+ProcInfo_query *Qurey_object(const char *object, int *length) {
+ int flag = 0;
+ int val;
+ int len;
+ int total = 0;
+ ProcInfo *Proc_ptr = NULL;
+ ProcInfo Data_stru;
+ ProcInfo_query *dataBuf = NULL;
+ SvrProc *SvrSub_ele;
+ SvrTcs::iterator svr_tcs_iter;
+ SvrProc::iterator svr_proc_iter;
+ ProcZone::iterator proc_iter;
+ SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
+ ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
+
+ if ((svr_tcs_iter = SvrData->find(object)) != SvrData->end()) {
+ SvrSub_ele = svr_tcs_iter->second;
+ for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) {
+ val = *svr_proc_iter;
+
+ if ((proc_iter = proc->find(val)) != proc->end()) {
+
+ if (dataBuf == NULL) {
+ dataBuf = (ProcInfo_query *)malloc(sizeof(ProcInfo_query));
+ if (dataBuf == NULL) {
+ logger->error("in proxy_reg: Out of memory!\n");
+ exit(1);
+ }
+
+ total = sizeof(ProcInfo_query);
+ }
+
+ if (flag == 0) {
+ memset(dataBuf, 0x00, sizeof(ProcInfo_query));
+
+ dataBuf->num = 1;
+ strncpy(dataBuf->name, object, sizeof(dataBuf->name) - 1);
+
+ flag = 1;
+
+ } else {
+ dataBuf->num++;
+ len = sizeof(ProcInfo_query) + sizeof(ProcInfo) * (dataBuf->num - 1);
+ dataBuf = (ProcInfo_query *)realloc(dataBuf, len);
+ if (dataBuf == NULL) {
+ logger->error("in proxy_reg: Out of memory!\n");
+ exit(1);
+ }
+
+ total += sizeof(ProcInfo);
+ memset((char *)dataBuf + len - sizeof(ProcInfo), 0x00, sizeof(ProcInfo));
+ }
+
+ memset(&Data_stru, 0x00, sizeof(ProcInfo));
+ Data_stru = proc_iter->second;
+
+ Proc_ptr = &(dataBuf->procData) + dataBuf->num - 1;
+ strncpy(Proc_ptr->proc_id, Data_stru.proc_id, strlen(Data_stru.proc_id) + 1);
+ strncpy(Proc_ptr->name, Data_stru.name, strlen(Data_stru.name) + 1);
+ strncpy(Proc_ptr->public_info, Data_stru.public_info, strlen(Data_stru.public_info) + 1);
+ strncpy(Proc_ptr->private_info, Data_stru.private_info, strlen(Data_stru.private_info) + 1);
+
+ if (length != NULL)
+ *length = total;
+ }
+ }
+ }
+
+ return dataBuf;
+}
+
+void list::Insert(int aData, int bData)
+{
+ LinkNode *pHead = NULL;
+ LinkNode *pNew = NULL;
+ LinkNode *pCur = NULL;
+
+ pNew = new(LinkNode);
+ pNew->data = aData;
+ pNew->data_fix = bData;
+ pNew->count = 0;
+
+ pHead = head;
+ pCur = pHead;
+ if(pHead == NULL) {
+ head = pNew;
+
+ pNew->next = NULL;
+
+ } else {
+ while(pCur->next != NULL) {
+ pCur = pCur->next;
+ }
+
+ pCur->next = pNew;
+ pNew->next = NULL;
+ }
+}
+
+void list::Delete(int data)
+{
+ LinkNode *pHead;
+ LinkNode *pCur;
+ LinkNode *pNext;
+
+ pHead = head;
+ pCur = pHead;
+ if(pHead == NULL)
+ return;
+
+ while((pCur != NULL) && (pCur->data == data)) {
+
+ head = pCur->next;
+
+ delete(pCur);
+
+ pCur = head;
+
+ }
+
+ while((pCur != NULL) && (pCur->next != NULL)) {
+ pNext = pCur->next;
+
+ if(pNext->data == data) {
+ pCur->next = pNext->next;
+ pCur = pNext->next;
+
+ delete(pNext);
+ } else {
+
+ pCur = pNext;
+
+ }
+ }
+}
+
+void list::dataSet(int data, int val)
+{
+ LinkNode *pCur;
+
+ pCur = head;
+ if(pCur == NULL)
+ return;
+
+ while(pCur != NULL) {
+
+ if(pCur->data == data) {
+ pCur->count = val;
+ }
+
+ pCur = pCur->next;
+ }
+}
+
+int list::dataGet(int data)
+{
+ LinkNode *pCur;
+
+ pCur = head;
+ if(pCur == NULL)
+ return 0;
+
+ while(pCur != NULL) {
+
+ if(pCur->data == data) {
+ return pCur->count;
+ }
+
+ pCur = pCur->next;
+ }
+
+ return 0;
+}
+
+int list::dataFixGet(int data)
+{
+ LinkNode *pCur;
+
+ pCur = head;
+ if(pCur == NULL)
+ return 0;
+
+ while(pCur != NULL) {
+
+ if(pCur->data == data) {
+ return pCur->data_fix;
+ }
+
+ pCur = pCur->next;
+ }
+
+ return 0;
+}
+
+int list::NodeNum(void)
+{
+ int count = 0;
+ LinkNode *pCur = head;
+
+ if (pCur == NULL) {
+ return 0;
+ }
+
+ while(pCur != NULL) {
+
+ ++count;
+ pCur = pCur->next;
+ }
+
+ return count;
+}
+
+int list::nodeGet(int index)
+{
+ int count = 0;
+ LinkNode *pCur = head;
+
+ if (pCur == NULL) {
+ return 0;
+ }
+
+ while((pCur != NULL) && (count <= index)) {
+
+ if (count == index) {
+ return pCur->data;
+ }
+
+ ++count;
+ pCur = pCur->next;
+ }
+
+ return 0;
+}
+
+void BusServerSocket::_proxy_reg(const char *topic, size_t topic_size, const char *buf, size_t buf_size, int key, int flag)
+{
+ char buf_temp[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
+ int count = 0;
+ int i = 0;
+ int len = 0;
+ int data1, data2;
+ char *data_ptr;
+ ProcInfo Data_stru;
+ ProcZone::iterator proc_iter;
+ TcsZone *TcsSub_ele;
+ ProcDataZone::iterator proc_que_iter;
+ ProcTcsMap::iterator proc_tcs_iter;
+ SvrProc *SvrSub_ele;
+ SvrProc::iterator svr_proc_iter;
+ SvrTcs::iterator svr_tcs_iter;
+ TcsZone::iterator tcssub_iter;
+ ProcPartZone::iterator proc_part_iter;
+
+ struct timespec timeout = {.tv_sec = 1, .tv_nsec = 0};
+
+ if ((flag == PROC_REG) || (flag == PROC_UNREG)) {
+
+ memset(&Data_stru, 0x00, sizeof(ProcInfo));
+
+ if (buf != NULL) {
+
+ memcpy(Data_stru.proc_id, buf, strlen(buf) + 1);
+ count = strlen(buf) + 1;
+
+ memcpy(Data_stru.name, buf + count, strlen(buf + count) + 1);
+ count += strlen(buf + count) + 1;
+
+ memcpy(Data_stru.public_info, buf + count, strlen(buf + count) + 1);
+ count += strlen(buf + count) + 1;
+
+ memcpy(Data_stru.private_info, buf + count, strlen(buf + count) + 1);
+ count += strlen(buf + count) + 1;
+
+ memcpy(Data_stru.int_info, buf + count, strlen(buf + count) + 1);
+ count += strlen(buf + count) + 1;
+
+ memcpy(Data_stru.svr_info, buf + count, strlen(buf + count) + 1);
+ count += strlen(buf + count) + 1;
+
+ if (flag == PROC_REG) {
+ gLinkedList.Insert(key, atoi(Data_stru.int_info));
+ }
+ }
+
+ ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
+ ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET);
+ ProcPartZone *procPart = shm_mm_attach<ProcPartZone>(SHM_BUS_PROC_PART_MAP_KEY);
+ if (flag == PROC_REG) {
+ if ((proc_iter = proc->find(key)) == proc->end()) {
+ proc->insert({key, Data_stru});
+ }
+
+ if ((proc_part_iter = procPart->find(key)) == procPart->end()) {
+ procPart->insert({key, Data_stru.proc_id});
+ }
+
+ if ((proc_que_iter = procQuePart->find(Data_stru.proc_id)) == procQuePart->end()) {
+ procQuePart->insert({Data_stru.proc_id, key});
+ }
+
+ } else {
+ SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
+
+ for (svr_tcs_iter = SvrData->begin(); svr_tcs_iter != SvrData->end(); ++svr_tcs_iter) {
+ SvrSub_ele = svr_tcs_iter->second;
+
+ SvrSub_ele->erase(key);
+ }
+
+ if ((proc_iter = proc->find(key)) != proc->end()) {
+
+ data1 = atoi((proc_iter->second).int_info);
+ data2 = atoi((proc_iter->second).svr_info);
+ BusServerSocket::_data_remove(data1);
+ BusServerSocket::_data_remove(data2);
+ BusServerSocket::_data_remove(key);
+ len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1);
+ strncpy(buf_temp, (proc_iter->second).proc_id, len);
+ proc->erase(proc_iter);
+
+ }
+
+ if ((proc_part_iter = procPart->find(key)) != procPart->end()) {
+
+ procPart->erase(key);
+ }
+
+ if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) {
+
+ procQuePart->erase(buf_temp);
+ }
+
+ }
+ } else if (flag == PROC_REG_TCS) {
+ ProcTcsMap *proc = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY);
+ SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
+
+ if ((proc_tcs_iter = proc->find(key)) != proc->end()) {
+ TcsSub_ele = proc_tcs_iter->second;
+ } else {
+
+ void *ptr_set = mm_malloc(sizeof(TcsZone));
+ TcsSub_ele = new(ptr_set) TcsZone;
+ proc->insert({key, TcsSub_ele});
+ }
+
+ strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
+ data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC);
+ while(data_ptr) {
+ TcsSub_ele->insert(data_ptr);
+ if ((svr_tcs_iter = SvrData->find(data_ptr)) != SvrData->end()) {
+ SvrSub_ele = svr_tcs_iter->second;
+ } else {
+
+ void *ptr_set = mm_malloc(sizeof(SvrProc));
+ SvrSub_ele = new(ptr_set) SvrProc;
+ SvrData->insert({data_ptr, SvrSub_ele});
+ }
+ SvrSub_ele->insert(key);
+ data_ptr = strtok(NULL, STR_MAGIC);
+ }
+
+ } else if (flag == PROC_QUE_TCS) {
+
+ struct _temp_store {
+ void *ptr;
+ int total;
+ } *temp_store = NULL;
+
+ int num = 0;
+ int sum = 0;
+
+ ProcInfo_query *ret = NULL;
+ ProcInfo_query *ret_store = NULL;
+
+ strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
+ data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC);
+ while(data_ptr) {
+ ret = Qurey_object(data_ptr, &len);
+ if (ret != NULL) {
+
+ if (temp_store == NULL) {
+ temp_store = (_temp_store *)malloc(sizeof(_temp_store));
+ if (temp_store == NULL) {
+ logger->error("in proxy_reg: Out of memory!\n");
+ exit(1);
+ }
+
+ temp_store->ptr = ret;
+ temp_store->total = len;
+ num = 1;
+
+ } else {
+ num++;
+ temp_store = (_temp_store *)realloc(temp_store, sizeof(_temp_store) * num);
+ if (temp_store == NULL) {
+ logger->error("in proxy_reg: Out of memory!\n");
+ exit(1);
+ }
+
+ (temp_store + num - 1)->ptr = ret;
+ (temp_store + num - 1)->total = len;
+ }
+
+ }
+ data_ptr = strtok(NULL, STR_MAGIC);
+ }
+
+ if (num > 0) {
+ for (count = 0; count < num; count++) {
+
+ if (ret_store == NULL) {
+ ret_store = (ProcInfo_query *)malloc((temp_store + count)->total);
+ if (ret_store == NULL) {
+ logger->error("in proxy_reg: Out of memory!\n");
+ exit(1);
+ }
+
+ sum = (temp_store + count)->total;
+ memcpy(ret_store, (temp_store + count)->ptr, (temp_store +count)->total);
+
+ } else {
+
+ ret_store = (ProcInfo_query *)realloc(ret_store, sum + (temp_store + count)->total);
+ if (ret_store == NULL) {
+ logger->error("in proxy_reg: Out of memory!\n");
+ exit(1);
+ }
+
+ memcpy((char *)ret_store + sum, (temp_store + count)->ptr, (temp_store + count)->total);
+
+ sum += (temp_store + count)->total;
+
+ }
+
+ free((temp_store + count)->ptr);
+
+ }
+
+ free(temp_store);
+ }
+
+ void *last_buf = malloc(sum + sizeof(int));
+ if (last_buf == NULL) {
+ logger->error("in proxy_reg: Out of memory!\n");
+ exit(1);
+ }
+
+ *(int *)last_buf = num;
+ if (num > 0) {
+ memcpy((char *)last_buf + sizeof(int), (char *)ret_store, sum);
+ free(ret_store);
+ }
+
+ shm_sendto(shm_socket, last_buf, sum + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG);
+
+ free(last_buf);
+ } else if (flag == PROC_QUE_STCS) {
+
+ SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
+ ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
+
+ strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
+ if ((svr_tcs_iter = SvrData->find(buf_temp)) != SvrData->end()) {
+ SvrSub_ele = svr_tcs_iter->second;
+
+ for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) {
+ count = *svr_proc_iter;
+ if ((proc_iter = proc->find(count)) != proc->end()) {
+ count = atoi((proc_iter->second).svr_info);
+ }
+
+ break;
+ }
+ } else {
+ count = 0;
+ }
+
+ memset(buf_temp, 0x00, sizeof(buf_temp));
+ sprintf(buf_temp, "%d", count);
+ shm_sendto(shm_socket, buf_temp, strlen(buf_temp), key, &timeout, BUS_TIMEOUT_FLAG);
+
+ } else {
+
+ int val;
+ int temp = 0;
+ int pos = 0;
+ int size = 0;
+ ProcInfo_sum *Data_sum = NULL;
+ SHMKeySet *subs_proc;
+ SHMKeySet::iterator subs_proc_iter;
+ SHMTopicSubMap::iterator subs_iter;
+
+ ProcTcsMap *procData = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY);
+ ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
+
+ for (proc_iter = proc->begin(); proc_iter != proc->end(); ++proc_iter) {
+
+ memset(&Data_stru, 0x00, sizeof(Data_stru));
+
+ if (count == 0) {
+ Data_sum = (ProcInfo_sum *)malloc(sizeof(ProcInfo_sum));
+ if (Data_sum == NULL) {
+
+ logger->error("in proxy_reg: Out of memory!\n");
+
+ exit(1);
+ }
+
+ count++;
+
+ memset(Data_sum, 0x00, sizeof(ProcInfo_sum));
+
+ } else {
+
+ count++;
+ len = sizeof(ProcInfo_sum) * count;
+ Data_sum = (ProcInfo_sum *)realloc(Data_sum, len);
+ if (Data_sum == NULL) {
+ logger->error("in proxy_reg: Out of memory!\n");
+
+ exit(1);
+ }
+
+ memset(Data_sum + count - 1, 0x00, sizeof(ProcInfo_sum));
+ }
+
+ Data_stru = proc_iter->second;
+
+ memcpy((Data_sum + count - 1)->procData.proc_id, Data_stru.proc_id, strlen(Data_stru.proc_id));
+ memcpy((Data_sum + count - 1)->procData.name, Data_stru.name, strlen(Data_stru.name));
+ memcpy((Data_sum + count - 1)->procData.public_info, Data_stru.public_info, strlen(Data_stru.public_info));
+ memcpy((Data_sum + count - 1)->procData.private_info, Data_stru.private_info, strlen(Data_stru.private_info));
+
+ (Data_sum + count - 1)->stat = 1;
+ (Data_sum + count - 1)->list_num = 3;
+
+ val = proc_iter->first;
+ if ((proc_tcs_iter = procData->find(val)) != procData->end()) {
+ TcsSub_ele = proc_tcs_iter->second;
+
+ temp = 0;
+ pos = 0;
+ len = sizeof(Data_sum->reg_info) - 1;
+ for (tcssub_iter = TcsSub_ele->begin(); tcssub_iter != TcsSub_ele->end(); ++tcssub_iter) {
+
+ if (temp == 0) {
+ strncpy((Data_sum + count - 1)->reg_info, (*tcssub_iter).c_str(), strlen((*tcssub_iter).c_str()) > len ? len : strlen((*tcssub_iter).c_str()));
+ pos += strlen((Data_sum + count - 1)->reg_info);
+ len -= strlen((Data_sum + count - 1)->reg_info);
+
+ temp++;
+ } else {
+
+ if (len > 0) {
+ strcat((Data_sum + count - 1)->reg_info, ",");
+
+ pos += 1;
+ len -= 1;
+ }
+
+ if (len > 0) {
+ size = strlen((*tcssub_iter).c_str()) > len ? len : strlen((*tcssub_iter).c_str());
+ strncpy(&(Data_sum + count - 1)->reg_info[pos], (*tcssub_iter).c_str(), size);
+
+ pos += size;
+ len -= size;
+ }
+ }
+ }
+
+ pos = 0;
+ temp = 0;
+ len = sizeof(Data_sum->local_info) - 1;
+ for (subs_iter = topic_sub_map->begin(); subs_iter != topic_sub_map->end(); ++subs_iter) {
+ subs_proc = subs_iter->second;
+
+ if ((subs_proc_iter = subs_proc->find(val)) != subs_proc->end()) {
+
+ if ((temp == 0)) {
+
+ strncpy((Data_sum + count - 1)->local_info, subs_iter->first.c_str(), strlen(subs_iter->first.c_str()) > len ? len : strlen(subs_iter->first.c_str()));
+ pos += strlen((Data_sum + count - 1)->local_info);
+ len -= strlen((Data_sum + count - 1)->local_info);
+
+ temp++;
+ } else {
+
+ if (len > 0) {
+ strcat((Data_sum + count - 1)->local_info, ",");
+
+ pos += 1;
+ len -= 1;
+ }
+
+ if (len > 0) {
+ size = strlen(subs_iter->first.c_str()) > len ? len : strlen(subs_iter->first.c_str());
+ strncpy(&(Data_sum + count - 1)->local_info[pos], subs_iter->first.c_str(), size);
+
+ pos += size;
+ len -= size;
+ }
+ }
+
+ }
+ }
+
+ }
+ }
+
+ temp = count * sizeof(ProcInfo_sum);
+ void *last_buf = malloc(temp + sizeof(int));
+ if (last_buf == NULL) {
+ logger->error("in proxy_reg: Out of memory!\n");
+ exit(1);
+ }
+
+ *(int *)last_buf = count;
+ if (count > 0) {
+ memcpy((char *)last_buf + sizeof(int), (char *)Data_sum, temp);
+ free(Data_sum);
+ }
+
+ shm_sendto(shm_socket, last_buf, temp + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG);
+
+ free(last_buf);
+ }
+}
+
+int BusServerSocket::get_data(int val) {
+
+ ProcZone::iterator proc_iter;
+ ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
+
+ if ((proc_iter = proc->find(val)) != proc->end()) {
+ return true;
+ }
+
+ return false;
+
+}
+
+int BusServerSocket::check_proc(const int val, const void *buf, int len, void **buf_ret, int *len_ret, \
+ const struct timespec *timeout, const int flag) {
+ int ret;
+
+ ret = shm_sendandrecv(shm_socket, buf, len, val, buf_ret, len_ret, timeout, flag);
+
+ return ret;
+}
+
+void BusServerSocket::remove_proc(int val) {
+ BusServerSocket::_proxy_reg(NULL, 0, NULL, 0, val, PROC_UNREG);
+}
// 杩愯浠g悊
-void * BusServerSocket::_run_proxy_() {
+int BusServerSocket::_run_proxy_() {
int size;
int key;
+ int flag;
+ char buf_temp[MAX_STR_LEN] = { 0x00 };
char * action, *topic, *topics, *buf, *content;
size_t head_len;
- char resp_buf[128];
bus_head_t head;
+ int val;
+ ProcDataZone::iterator proc_que_iter;
+ ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET);
- const char *topic_delim = ",";
-// logger.debug("_run_proxy_ server receive before\n");
- while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) {
-// logger.debug("_run_proxy_ server recvfrom %d after: %s \n", key, buf);
+ int rv;
+ char send_buf[512] = { 0x00 };
+
+ const char *topic_delim = ",";
+ while((rv = shm_recvfrom(shm_socket, (void **)&buf, &size, &key)) == 0) {
head = ShmModSocket::decode_bus_head(buf);
topics = buf + BUS_HEAD_SIZE;
action = head.action;
-// logger.debug("_run_proxy_ : %s\n", action);
if(strcmp(action, "sub") == 0) {
// 璁㈤槄鏀寔澶氫富棰樿闃�
topic = strtok(topics, topic_delim);
-// logger.debug("_run_proxy_ topic = %s\n", topic);
while(topic) {
_proxy_sub(trim(topic, 0), key);
topic = strtok(NULL, topic_delim);
@@ -251,7 +912,6 @@
}
else if(strcmp(action, "desub") == 0) {
-// logger.debug("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), ""));
if(strcmp(trim(topics, 0), "") == 0) {
// 鍙栨秷鎵�鏈夎闃�
_proxy_desub_all(key);
@@ -266,20 +926,87 @@
}
else if(strcmp(action, "pub") == 0) {
- content = topics + head.topic_size;
- _proxy_pub(topics, content, head.content_size, key);
+ topics[head.topic_size - 1] = '\0';
+ content = topics + head.topic_size;
- }
- else if(strcmp(action, "stop") == 0) {
- logger->info( "Stopping Bus...");
- free(buf);
- break;
- } else {
- logger->error( "BusServerSocket::_run_proxy_ : unrecognized action %s", action);
- }
- free(buf);
- }
+ _proxy_pub(topics, topics, head.topic_size + head.content_size, key);
+
+ }
+ else if ((strcmp(action, "reg") == 0) || (strcmp(action, "unreg") == 0) \
+ || (strcmp(action, "tcsreg") == 0) || (strcmp(action, "tcsque") == 0) \
+ || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0)) {
+ content = topics + head.topic_size;
+ if (strcmp(action, "reg") == 0) {
+
+ flag = PROC_REG;
+
+ } else if (strcmp(action, "unreg") == 0) {
+
+ flag = PROC_UNREG;
+
+ } else if (strcmp(action, "tcsreg") == 0) {
+
+ flag = PROC_REG_TCS;
+
+ } else if (strcmp(action, "tcsque") == 0) {
+
+ flag = PROC_QUE_TCS;
+
+ } else if (strcmp(action, "stcsque") == 0) {
+
+ flag = PROC_QUE_STCS;
+
+ } else {
+
+ flag = PROC_QUE_ATCS;
+
+ }
+
+ if (flag == PROC_REG) {
+ memcpy(buf_temp, content, strlen(content) + 1);
+
+ if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) {
+
+ val = proc_que_iter->second;
+ _proxy_reg(topics, head.topic_size, content, head.content_size, val, PROC_UNREG);
+ }
+ }
+
+ _proxy_reg(topics, head.topic_size, content, head.content_size, key, flag);
+
+ }
+ else if(strcmp(action, "stop") == 0) {
+ free(buf);
+ break;
+ } else {
+ logger->error( "BusServerSocket::_run_proxy_ : unrecognized action %s", action);
+ }
+ free(buf);
+ }
- return NULL;
+ return rv;
}
+
+void BusServerSocket::_data_remove(int val) {
+
+ int i;
+ LockFreeQueue<shm_packet_t> *queue = NULL;
+ hashtable_t *hashtable = mm_get_hashtable();
+
+ void *data_ptr = hashtable_get(hashtable, val);
+ if (data_ptr != NULL) {
+ if (data_ptr != (void *)1) {
+ queue = (LockFreeQueue<shm_packet_t> *)data_ptr;
+ queue->close();
+ for (i = 0; i < queue->size(); i++) {
+ mm_free((*queue)[i].buf);
+ }
+ sleep(1);
+ }
+
+ hashtable_remove(hashtable, val);
+ }
+
+}
+
--
Gitblit v1.8.0