From 73689afc09ce346f9eb00e02faf7f242e55dc7ee Mon Sep 17 00:00:00 2001
From: fujuntang <fujuntang@smartai.com>
Date: 星期四, 09 十二月 2021 19:33:00 +0800
Subject: [PATCH] Add the sync to fix the resource clear issue.

---
 src/socket/bus_server_socket.cpp | 1042 ++++++++++++++++++++++++++++++++++++++++++++++++---------
 1 files changed, 879 insertions(+), 163 deletions(-)

diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp
index 9bd61b0..fc53b0b 100644
--- a/src/socket/bus_server_socket.cpp
+++ b/src/socket/bus_server_socket.cpp
@@ -1,55 +1,48 @@
 
 #include "bus_server_socket.h"
 #include "shm_mod_socket.h"
+#include "shm_socket.h"
+#include "msg_mgr.h"
 #include "bus_error.h"
 
 static Logger *logger = LoggerFactory::getLogger();
+static pthread_mutex_t gMutex;
 
+list gLinkedList;
 void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)>  cb) {
-	SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY);
+	SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
 	SHMKeySet *subscripter_set;
 	SHMKeySet::iterator set_iter;
 	SHMTopicSubMap::iterator map_iter;
 
 	if(topic_sub_map != NULL) {
-		for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
+		for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) {
 			subscripter_set = map_iter->second;
 			if(subscripter_set != NULL) {
-				for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
+				for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) {
 					cb(subscripter_set, *set_iter);
 				}
 			}
 		}
 	}
 }
-
-// bool BusServerSocket::include_in_keys(int key, int keys[], size_t length) {
-// 	if(length == 0) {
-// 		return false;
-// 	}
-// 	for(int i = 0; i < length; i++) {
-// 		if(keys[i] == key) 
-// 			return true;
-// 	}
-// 	return false;
-// }
+ 
 
 size_t BusServerSocket::remove_subscripters(int keys[], size_t length) {
 	size_t count = 0;
 	int key;
 	for(int i = 0; i < length; i++) {
 		key = keys[i];
-		SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY);
+		SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
 		SHMKeySet *subscripter_set;
 		SHMKeySet::iterator set_iter;
 		SHMTopicSubMap::iterator map_iter;
 
 		if(topic_sub_map != NULL) {
-			for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
+			for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) {
 				subscripter_set = map_iter->second;
 				if(subscripter_set != NULL && (set_iter = subscripter_set->find(key) ) != subscripter_set->end()) {
 					subscripter_set->erase(set_iter);
-// printf("remove_subscripter %s, %d\n", map_iter->first, key);
 					count++;
 				}
 			}
@@ -61,34 +54,13 @@
 
 
 BusServerSocket::BusServerSocket() {
-	logger->debug("BusServerSocket Init");
-	shm_socket = shm_open_socket(SHM_SOCKET_DGRAM);
+	shm_socket = shm_socket_open(SHM_SOCKET_DGRAM);
 	topic_sub_map = NULL;
 
 }
 
 BusServerSocket::~BusServerSocket() {
-	SHMKeySet *subscripter_set;
-	SHMTopicSubMap::iterator map_iter;
-
-	logger->debug("BusServerSocket destory 1");
-	stop();
-	logger->debug("BusServerSocket destory 2");
-	 
-	if(topic_sub_map != NULL) {
-		for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
-			subscripter_set = map_iter->second;
-			if(subscripter_set != NULL) {
-				subscripter_set->clear();
-				mm_free((void *)subscripter_set);
-			}
-
-		}
-		topic_sub_map->clear();
-		mem_pool_free_by_key(BUS_MAP_KEY);
-	}
-	shm_close_socket(shm_socket);
-	logger->debug("BusServerSocket destory 3");
+	destroy();
 }
 
 
@@ -109,33 +81,35 @@
  * 鍚姩bus
  * 
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
-*/
+ */
 int  BusServerSocket::start(){
-	topic_sub_map =	mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY);
- 
-	run_pubsub_proxy();
-	// 杩涚▼鍋滄鐨勬椂鍊欙紝棰勭暀3绉掕祫婧愬洖鏀剁殑鏃堕棿銆傚惁鍒欙紝浼氬彂鐢熻皟鐢╟lose鐨勬椂鍊欙紝鍏变韩鍐呭瓨鐨勮祫婧愯繕娌℃潵寰楀強鍥炴敹杩涚▼灏遍��鍑轰簡
-	return 0;
+  int rv;
+
+  topic_sub_map =	shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
+
+  pthread_mutex_init(&gMutex, NULL);
+
+  rv = _run_proxy_();
+
+  return rv;
 }
 
 
 int  BusServerSocket::stop(){
 	int ret;
-	logger->debug("====>stopping");
 	if( shm_socket->key <= 0) {
 		return -1;
 	}
-	// snprintf(buf, 128, "%sstop%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER);
-	// return shm_sendto(shm_socket, buf, strlen(buf), shm_socket->key, NULL, 0);
 	bus_head_t head = {};
 	memcpy(head.action, "stop", sizeof(head.action));
 	head.topic_size = 0;
 	head.content_size = 0;
 
+  ShmModSocket client;
 	void *buf;
 	int size = ShmModSocket::get_bus_sendbuf(head, NULL, 0, NULL,  0, &buf);
 	if(size > 0) {
-		ret = shm_sendandrecv_unsafe(shm_socket, buf, size, shm_socket->key, NULL, NULL);
+		ret = client.sendto( buf, size, shm_socket->key);
 		free(buf);
 		return ret;
 	} else {
@@ -144,15 +118,35 @@
 
 }
 
+int  BusServerSocket::destroy() {
+	SHMKeySet *subscripter_set;
+	SHMTopicSubMap::iterator map_iter;
+	if(topic_sub_map != NULL) {
+		for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) {
+			subscripter_set = map_iter->second;
+			if(subscripter_set != NULL) {
+				subscripter_set->clear();
+				mm_free((void *)subscripter_set);
+			}
+
+		}
+		topic_sub_map->clear();
+		shm_mm_free_by_key(SHM_BUS_MAP_KEY);
+	}
+	shm_socket_close(shm_socket);
+	
+  return 0;
+}
+
 /*
  * 澶勭悊璁㈤槄
 */
 void BusServerSocket::_proxy_sub( char *topic, int key) {
 	SHMKeySet *subscripter_set;
-
+ 
+  struct timespec timeout = {.tv_sec = 1, .tv_nsec = 0};
 	SHMTopicSubMap::iterator map_iter;
 	SHMKeySet::iterator set_iter;
-//printf("_proxy_sub topic = %s\n", topic);
 	if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
 		subscripter_set = map_iter->second;
 	} else {
@@ -161,6 +155,7 @@
 		topic_sub_map->insert({topic, subscripter_set});
 	}
 	subscripter_set->insert(key);
+
 }
 
 /*
@@ -187,7 +182,7 @@
 
 	SHMTopicSubMap::iterator map_iter;
 	// SHMKeySet::iterator set_iter;
-	for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
+	for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) {
 			subscripter_set = map_iter->second;
 			subscripter_set->erase(key);
 	}
@@ -196,7 +191,7 @@
 /*
  * 澶勭悊鍙戝竷锛屼唬鐞嗚浆鍙�
 */
-void BusServerSocket::_proxy_pub( char *topic, void *buf, size_t size, int key) {
+void BusServerSocket::_proxy_pub( char *topic, char *buf, size_t size, int key) {
 	SHMKeySet *subscripter_set;
 
 	SHMTopicSubMap::iterator map_iter;
@@ -206,25 +201,24 @@
 	std::vector<int>::iterator vector_iter;
 
 	int send_key;
+	int rv;
 	struct timespec timeout = {1,0};
 
-	if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
-		subscripter_set = map_iter->second;
-		for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
-			send_key = *set_iter;
- // printf("_proxy_pub send before %d \n", send_key);
-			if (shm_sendto(shm_socket, buf, size, send_key, &timeout) == EBUS_CLOSED ) {
-				//瀵规柟宸插叧闂殑杩炴帴鏀惧埌寰呭垹闄ら槦鍒楅噷銆傚鏋滅洿鎺ュ垹闄や細璁﹊ter鎸囬拡鍑虹幇閿欎贡
-				subscripter_to_del.push_back(send_key);
-			} else {
-// printf("_proxy_pub send after: %d \n", send_key);
-			}
+    if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
 
-			
+		subscripter_set = map_iter->second;
+		for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) {
+			send_key = *set_iter;
+			rv = shm_sendto(shm_socket, buf, size, send_key, &timeout, BUS_TIMEOUT_FLAG);
+			if(rv == 0) {
+				continue;
+			}
+			//瀵规柟宸插叧闂殑鎴栬�呭搴旂殑杩涚▼琚玨ill鎺夌殑杩炴帴鏀惧埌寰呭垹闄ら槦鍒楅噷銆傚鏋滅洿鎺ュ垹闄や細璁﹊ter鎸囬拡鍑虹幇閿欎贡
+			subscripter_to_del.push_back(send_key);
 		}
 
 		// 鍒犻櫎宸插叧闂殑绔�
-		for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); vector_iter++) {
+		for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); ++vector_iter) {
 			if((set_iter = subscripter_set->find(*vector_iter)) != subscripter_set->end()) {
 				subscripter_set->erase(set_iter);
 				logger->debug("remove closed subscripter %d \n", send_key);
@@ -233,29 +227,715 @@
 		subscripter_to_del.clear();
 
 	}
+
 }
 
-void * BusServerSocket::run_pubsub_proxy() {
-	// pthread_detach(pthread_self());
+ProcInfo_query *Qurey_object(const char *object, int *length) {
+  int flag = 0;
+  int val;
+  int len;
+  int total = 0;
+  ProcInfo *Proc_ptr = NULL;
+  ProcInfo Data_stru;
+  ProcInfo_query *dataBuf = NULL;
+  SvrProc *SvrSub_ele;
+  SvrTcs::iterator svr_tcs_iter;
+  SvrProc::iterator svr_proc_iter;
+  ProcZone::iterator proc_iter;
+  SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
+  ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
+    
+  if ((svr_tcs_iter = SvrData->find(object)) != SvrData->end()) {
+    SvrSub_ele = svr_tcs_iter->second;
+    for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) {
+      val = *svr_proc_iter; 
+    
+      if ((proc_iter = proc->find(val)) != proc->end()) {
+
+        if (dataBuf == NULL) {
+          dataBuf = (ProcInfo_query *)malloc(sizeof(ProcInfo_query));
+          if (dataBuf == NULL) {
+            logger->error("in proxy_reg: Out of memory!\n");
+            exit(1);
+          }
+
+          total = sizeof(ProcInfo_query);
+        }
+
+        if (flag == 0) {
+          memset(dataBuf, 0x00, sizeof(ProcInfo_query));
+
+          dataBuf->num = 1;
+          strncpy(dataBuf->name, object, sizeof(dataBuf->name) - 1);
+          
+          flag = 1;
+
+        } else {
+          dataBuf->num++;
+          len = sizeof(ProcInfo_query) + sizeof(ProcInfo) * (dataBuf->num - 1);
+          dataBuf = (ProcInfo_query *)realloc(dataBuf, len);
+          if (dataBuf == NULL) {
+            logger->error("in proxy_reg: Out of memory!\n");
+            exit(1);
+          }
+          
+          total += sizeof(ProcInfo);
+          memset((char *)dataBuf + len - sizeof(ProcInfo), 0x00, sizeof(ProcInfo));
+        }
+
+        memset(&Data_stru, 0x00, sizeof(ProcInfo));
+        Data_stru = proc_iter->second;  
+
+        Proc_ptr = &(dataBuf->procData) + dataBuf->num - 1;
+        strncpy(Proc_ptr->proc_id, Data_stru.proc_id, strlen(Data_stru.proc_id) + 1);
+        strncpy(Proc_ptr->name, Data_stru.name, strlen(Data_stru.name) + 1);
+        strncpy(Proc_ptr->public_info, Data_stru.public_info, strlen(Data_stru.public_info) + 1);
+        strncpy(Proc_ptr->private_info, Data_stru.private_info, strlen(Data_stru.private_info) + 1);
+
+        if (length != NULL)
+          *length = total;
+      }
+    }
+  }
+
+  return dataBuf;
+}
+
+void list::Insert(int aData, int bData) 
+{
+  LinkNode *pHead = NULL; 
+  LinkNode *pNew = NULL;
+  LinkNode *pCur = NULL;
+ 
+  pNew = (LinkNode *)malloc(sizeof(LinkNode));
+  pNew->data = aData;
+  pNew->data_fix = bData;
+  pNew->count = 0;
+  
+  pHead = head;
+  pCur = pHead;
+  if(pHead == NULL) {
+    head = pNew;
+  
+    pNew->next = NULL;
+ 
+  } else {
+    while(pCur->next != NULL) {
+      pCur = pCur->next;
+    }
+    
+    pCur->next = pNew;
+    pNew->next = NULL;
+  }
+}
+
+void list::Delete(int data)  
+{
+  LinkNode *pHead;
+  LinkNode *pCur;
+  LinkNode *pNext;
+  
+  pHead = head;
+  pCur = pHead;
+  if(pHead == NULL)
+    return;
+  
+  while((pCur != NULL) && (pCur->data == data)) {
+    
+    head = pCur->next;
+    
+    free(pCur);
+    
+    pCur = head;
+    
+  }
+
+  while((pCur != NULL) && (pCur->next != NULL)) {
+    pNext = pCur->next;
+
+    if(pNext->data == data) {
+      pCur->next = pNext->next;
+      pCur = pNext->next;
+
+      free(pNext);
+    } else {
+    
+      pCur = pNext;
+      
+    }
+  }
+}
+
+void list::dataSet(int data, int val)  
+{
+  LinkNode *pCur;
+  
+  pCur = head;
+  if(pCur == NULL)
+    return;
+  
+  while(pCur != NULL) {
+    
+    if(pCur->data == data) {
+      pCur->count = val;
+    }
+    
+    pCur = pCur->next;
+  }
+}
+
+int list::dataGet(int data)  
+{
+  LinkNode *pCur;
+  
+  pCur = head;
+  if(pCur == NULL)
+    return 0;
+  
+  while(pCur != NULL) {
+    
+    if(pCur->data == data) {
+      return pCur->count;
+    }
+    
+    pCur = pCur->next;
+  }
+  
+  return 0;
+}
+
+int list::dataFixGet(int data)  
+{
+  LinkNode *pCur;
+  
+  pCur = head;
+  if(pCur == NULL)
+    return 0;
+  
+  while(pCur != NULL) {
+    
+    if(pCur->data == data) {
+      return pCur->data_fix;
+    }
+    
+    pCur = pCur->next;
+  }
+  
+  return 0;
+}
+
+int list::NodeNum(void)
+{
+  int count = 0;
+  LinkNode *pCur = head;
+  
+  if (pCur == NULL) {
+    return 0;
+  }
+  
+  while(pCur != NULL) {
+   
+    ++count;
+    pCur = pCur->next;
+  }
+  
+  return count;
+}
+
+int list::nodeGet(int index)
+{
+  int count = 0;
+  LinkNode *pCur = head;
+  
+  if (pCur == NULL) {
+    return 0;
+  }
+  
+  while((pCur != NULL) && (count <= index)) {
+   
+    if (count == index) {
+      return pCur->data;
+    }
+    
+    ++count;
+    pCur = pCur->next;
+  }
+  
+  return 0;
+}
+
+void BusServerSocket::_proxy_reg(const char *topic, size_t topic_size, const char *buf, size_t buf_size, int key, int flag)
+{
+  char data_buf[MAX_STR_LEN] = { 0x00 };
+  char buf_temp[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
+  int count = 0;
+  int i = 0;
+  int len = 0;
+  int data1, data2;
+  char *data_ptr;
+  ProcInfo Data_stru;
+  ProcZone::iterator proc_iter;
+  TcsZone *TcsSub_ele; 
+  ProcDataZone::iterator proc_que_iter;
+  ProcTcsMap::iterator proc_tcs_iter;
+  SvrProc *SvrSub_ele;
+  SvrProc::iterator svr_proc_iter;
+  SvrTcs::iterator svr_tcs_iter;
+  TcsZone::iterator tcssub_iter;
+  ProcPartZone::iterator proc_part_iter;
+
+  struct timespec timeout = {.tv_sec = 1, .tv_nsec = 0};
+
+  if ((flag == PROC_REG) || (flag == PROC_UNREG)) {
+
+    memset(&Data_stru, 0x00, sizeof(ProcInfo));
+
+    if (buf != NULL) {
+    
+      memcpy(Data_stru.proc_id, buf, strlen(buf) + 1); 
+      count = strlen(buf) + 1;
+      
+      memcpy(Data_stru.name, buf + count, strlen(buf + count) + 1);
+      count += strlen(buf + count) + 1;
+      
+      memcpy(Data_stru.public_info, buf + count, strlen(buf + count) + 1);
+      count += strlen(buf + count) + 1;
+      
+      memcpy(Data_stru.private_info, buf + count, strlen(buf + count) + 1);
+      count += strlen(buf + count) + 1;
+
+      memcpy(Data_stru.int_info, buf + count, strlen(buf + count) + 1); 
+      count += strlen(buf + count) + 1;
+    
+      memcpy(Data_stru.svr_info, buf + count, strlen(buf + count) + 1); 
+      count += strlen(buf + count) + 1;
+      
+      if (flag == PROC_REG) {
+        gLinkedList.Insert(key, atoi(Data_stru.int_info));
+      }
+    }
+
+    ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
+    ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET);
+    ProcPartZone *procPart = shm_mm_attach<ProcPartZone>(SHM_BUS_PROC_PART_MAP_KEY);
+    if (flag == PROC_REG) {
+      pthread_mutex_lock(&gMutex);
+      if ((proc_iter = proc->find(key)) == proc->end()) {
+        proc->insert({key, Data_stru});
+      }
+      pthread_mutex_unlock(&gMutex);
+
+      if ((proc_part_iter = procPart->find(key)) == procPart->end()) {
+        procPart->insert({key, Data_stru.proc_id});
+      }
+
+      if ((proc_que_iter = procQuePart->find(Data_stru.proc_id)) == procQuePart->end()) {
+        procQuePart->insert({Data_stru.proc_id, key});
+      }
+
+    } else {
+      SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY); 
+
+      for (svr_tcs_iter = SvrData->begin(); svr_tcs_iter != SvrData->end(); ++svr_tcs_iter) {
+        SvrSub_ele = svr_tcs_iter->second;
+
+        SvrSub_ele->erase(key);
+      }
+
+      pthread_mutex_lock(&gMutex);
+      if ((proc_iter = proc->find(key)) != proc->end()) {
+
+        data1 = atoi((proc_iter->second).int_info);
+        data2 = atoi((proc_iter->second).svr_info);
+        BusServerSocket::_data_remove(data1);
+        BusServerSocket::_data_remove(data2);
+        BusServerSocket::_data_remove(key);
+        len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1);
+        strncpy(buf_temp, (proc_iter->second).proc_id, len);
+        proc->erase(key);
+
+      }
+      pthread_mutex_unlock(&gMutex);
+
+      if ((proc_part_iter = procPart->find(key)) != procPart->end()) {
+
+        procPart->erase(key);
+      }
+
+      if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) {
+
+        procQuePart->erase(buf_temp);
+      }
+
+      pthread_mutex_lock(&gMutex);
+      BusServerSocket::buf_data_remove(key);
+      pthread_mutex_unlock(&gMutex);
+
+      find_mm_data(key);
+    }
+
+  } else if (flag == PROC_REG_TCS) {
+    ProcTcsMap *proc = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY);
+    SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
+
+    if ((proc_tcs_iter = proc->find(key)) != proc->end()) {
+      TcsSub_ele = proc_tcs_iter->second;
+    } else {
+
+      void *ptr_set = mm_malloc(sizeof(TcsZone));
+      TcsSub_ele = new(ptr_set) TcsZone;
+      proc->insert({key, TcsSub_ele});
+    }
+
+    strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); 
+    data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC);
+    while(data_ptr) {
+      data_ptr = trim(data_ptr, 0);
+      TcsSub_ele->insert(data_ptr);
+      if ((svr_tcs_iter = SvrData->find(data_ptr)) != SvrData->end()) {
+        SvrSub_ele = svr_tcs_iter->second;
+      } else {
+
+        void *ptr_set = mm_malloc(sizeof(SvrProc));
+        SvrSub_ele = new(ptr_set) SvrProc;
+        SvrData->insert({data_ptr, SvrSub_ele});
+      }
+      SvrSub_ele->insert(key);
+      data_ptr = strtok(NULL, STR_MAGIC);
+    }
+
+  } else if (flag == PROC_QUE_TCS) {
+
+    struct _temp_store {
+      void *ptr;
+      int total;
+    } *temp_store = NULL;
+    
+    int num = 0;
+    int sum = 0;
+
+    ProcInfo_query *ret = NULL;
+    ProcInfo_query *ret_store = NULL;
+  
+    strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
+    data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC);
+    while(data_ptr) {
+      data_ptr = trim(data_ptr, 0);
+      ret = Qurey_object(data_ptr, &len);
+      if (ret != NULL) {
+    
+        if (temp_store == NULL) {
+          temp_store = (_temp_store *)malloc(sizeof(_temp_store));
+          if (temp_store == NULL) {
+            logger->error("in proxy_reg: Out of memory!\n");
+            exit(1);
+          }
+
+          temp_store->ptr = ret;
+          temp_store->total = len;
+          num = 1;
+
+        } else {
+          num++;
+          temp_store = (_temp_store *)realloc(temp_store, sizeof(_temp_store) * num);
+          if (temp_store == NULL) {
+            logger->error("in proxy_reg: Out of memory!\n");
+            exit(1);
+          }
+
+          (temp_store + num - 1)->ptr = ret;
+          (temp_store + num - 1)->total = len;
+        }
+
+      }
+      data_ptr = strtok(NULL, STR_MAGIC);
+    }
+
+    if (num > 0) {
+      for (count = 0; count < num; count++) {
+
+        if (ret_store == NULL) {
+          ret_store = (ProcInfo_query *)malloc((temp_store + count)->total);
+          if (ret_store == NULL) {
+            logger->error("in proxy_reg: Out of memory!\n");
+            exit(1);
+          }
+
+          sum = (temp_store + count)->total;
+          memcpy(ret_store, (temp_store + count)->ptr, (temp_store +count)->total);
+
+        } else {
+        
+          ret_store = (ProcInfo_query *)realloc(ret_store, sum + (temp_store + count)->total);
+          if (ret_store == NULL) {
+            logger->error("in proxy_reg: Out of memory!\n");
+            exit(1);
+          }
+
+          memcpy((char *)ret_store + sum, (temp_store + count)->ptr, (temp_store + count)->total);
+
+          sum += (temp_store + count)->total;
+
+        }
+
+        free((temp_store + count)->ptr);
+
+      }
+
+      free(temp_store);
+    }
+
+    void *last_buf = malloc(sum + sizeof(int));
+    if (last_buf == NULL) {
+      logger->error("in proxy_reg: Out of memory!\n");
+      exit(1);
+    }   
+
+    *(int *)last_buf = num;
+    if (num > 0) {
+      memcpy((char *)last_buf + sizeof(int), (char *)ret_store, sum);
+      free(ret_store);
+    }
+
+    shm_sendto(shm_socket, last_buf, sum + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG);
+
+    free(last_buf);
+  } else if (flag == PROC_QUE_STCS) {
+
+    SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
+    ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
+
+    strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
+    if ((svr_tcs_iter = SvrData->find(trim(buf_temp, 0))) != SvrData->end()) {
+      SvrSub_ele = svr_tcs_iter->second;
+    
+      for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) { 
+        count = *svr_proc_iter;
+        if ((proc_iter = proc->find(count)) != proc->end()) {
+          count = atoi((proc_iter->second).svr_info);
+        }
+
+        break;
+      }
+    } else {
+      count = 0;
+    }
+
+    memset(data_buf, 0x00, sizeof(data_buf));
+    sprintf(data_buf, "%d", count);
+    shm_sendto(shm_socket, data_buf, strlen(data_buf), key, &timeout, BUS_TIMEOUT_FLAG);
+
+  } else if (flag == PROC_QUE_ATCS) {
+
+    int val;
+    int temp = 0;
+    int pos = 0;
+    int size = 0;
+    ProcInfo_sum *Data_sum = NULL;
+    SHMKeySet *subs_proc;
+    SHMKeySet::iterator subs_proc_iter;
+    SHMTopicSubMap::iterator subs_iter;
+
+    ProcTcsMap *procData = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY); 
+    ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
+
+    for (proc_iter = proc->begin(); proc_iter != proc->end(); ++proc_iter) {
+
+      memset(&Data_stru, 0x00, sizeof(Data_stru));
+
+      if (count == 0) {
+        Data_sum = (ProcInfo_sum *)malloc(sizeof(ProcInfo_sum));
+        if (Data_sum == NULL) {
+
+          logger->error("in proxy_reg: Out of memory!\n");
+            
+          exit(1);
+        }
+
+        count++;
+
+        memset(Data_sum, 0x00, sizeof(ProcInfo_sum));
+
+      } else {
+
+        count++;
+        len = sizeof(ProcInfo_sum) * count;
+        Data_sum = (ProcInfo_sum *)realloc(Data_sum, len);
+        if (Data_sum == NULL) {
+          logger->error("in proxy_reg: Out of memory!\n");
+            
+          exit(1);
+        }
+
+        memset(Data_sum + count - 1, 0x00, sizeof(ProcInfo_sum));
+      }
+
+      Data_stru = proc_iter->second;
+
+      memcpy((Data_sum + count - 1)->procData.proc_id, Data_stru.proc_id, strlen(Data_stru.proc_id));
+      memcpy((Data_sum + count - 1)->procData.name, Data_stru.name, strlen(Data_stru.name));
+      memcpy((Data_sum + count - 1)->procData.public_info, Data_stru.public_info, strlen(Data_stru.public_info));
+      memcpy((Data_sum + count - 1)->procData.private_info, Data_stru.private_info, strlen(Data_stru.private_info));
+
+      (Data_sum + count - 1)->stat = 1;
+      (Data_sum + count - 1)->list_num = 3;
+
+      val = proc_iter->first;
+      if ((proc_tcs_iter = procData->find(val)) != procData->end()) {
+        TcsSub_ele = proc_tcs_iter->second;
+
+        temp = 0;
+        pos = 0;
+        len = sizeof(Data_sum->reg_info) - 1;
+        for (tcssub_iter = TcsSub_ele->begin(); tcssub_iter != TcsSub_ele->end(); ++tcssub_iter) {
+
+          if (temp == 0) {
+            strncpy((Data_sum + count - 1)->reg_info, (*tcssub_iter).c_str(), strlen((*tcssub_iter).c_str()) > len ? len : strlen((*tcssub_iter).c_str()));
+            pos += strlen((Data_sum + count - 1)->reg_info);
+            len -= strlen((Data_sum + count - 1)->reg_info);
+
+            temp++;
+          } else {
+
+            if (len > 0) {
+              strcat((Data_sum + count - 1)->reg_info, ",");
+
+              pos += 1;
+              len -= 1;
+            }
+
+            if (len > 0) {
+              size = strlen((*tcssub_iter).c_str()) > len ? len : strlen((*tcssub_iter).c_str());
+              strncpy(&(Data_sum + count - 1)->reg_info[pos], (*tcssub_iter).c_str(), size);
+
+              pos += size;
+              len -= size;
+            }
+          }
+        }
+
+        pos = 0;
+        temp = 0;
+        len = sizeof(Data_sum->local_info) - 1;
+        for (subs_iter = topic_sub_map->begin(); subs_iter != topic_sub_map->end(); ++subs_iter) {
+          subs_proc = subs_iter->second;
+          
+          if ((subs_proc_iter = subs_proc->find(val)) != subs_proc->end()) {
+
+            if ((temp == 0)) {
+
+              strncpy((Data_sum + count - 1)->local_info, subs_iter->first.c_str(), strlen(subs_iter->first.c_str()) > len ? len : strlen(subs_iter->first.c_str()));
+              pos += strlen((Data_sum + count - 1)->local_info);
+              len -= strlen((Data_sum + count - 1)->local_info);
+
+              temp++;
+            } else {
+
+              if (len > 0) {
+                strcat((Data_sum + count - 1)->local_info, ",");
+
+                pos += 1;
+                len -= 1;
+              }
+
+              if (len > 0) {
+                size = strlen(subs_iter->first.c_str()) > len ? len : strlen(subs_iter->first.c_str());
+                strncpy(&(Data_sum + count - 1)->local_info[pos], subs_iter->first.c_str(), size);
+
+                pos += size;
+                len -= size;
+              }
+            }
+
+          }
+        }
+
+      }
+    }
+
+    temp = count * sizeof(ProcInfo_sum);
+    void *last_buf = malloc(temp + sizeof(int));
+    if (last_buf == NULL) {
+      logger->error("in proxy_reg: Out of memory!\n");
+      exit(1);
+    }
+
+    *(int *)last_buf = count;
+    if (count > 0) {
+      memcpy((char *)last_buf + sizeof(int), (char *)Data_sum, temp);
+      free(Data_sum);
+    }
+
+    shm_sendto(shm_socket, last_buf, temp + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG);
+
+    free(last_buf);
+  } else {
+
+    char *ptr = NULL;
+    strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
+
+    data1 = atoi(buf_temp);
+    ptr = strstr(buf_temp, STR_MAGIC);
+    if (ptr != NULL) {
+      data2 = atoi(ptr + 1);
+    }
+    BusServerSocket::buf_data_set(data1, data2); 
+  }
+}
+
+int BusServerSocket::get_data(int val) {
+  
+  ProcZone::iterator proc_iter;
+  ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
+
+  pthread_mutex_lock(&gMutex);
+  if ((proc_iter = proc->find(val)) != proc->end()) {
+    pthread_mutex_unlock(&gMutex);
+    return true;
+  }
+  pthread_mutex_unlock(&gMutex);
+  
+  return false;
+  
+}
+
+int BusServerSocket::check_proc(const int val, const void *buf, int len, void **buf_ret, int *len_ret, \
+                          const struct timespec *timeout, const int flag) {
+  int ret;
+
+  ret = shm_sendandrecv(shm_socket, buf, len, val, buf_ret, len_ret, timeout, flag);
+
+  return ret;
+}
+
+void BusServerSocket::remove_proc(int val) {
+  BusServerSocket::_proxy_reg(NULL, 0, NULL, 0, val, PROC_UNREG);
+}
+
+// 杩愯浠g悊
+int BusServerSocket::_run_proxy_() {
 	int size;
 	int key;
-	char * action, *topic, *topics, *buf, *content;
+  int flag;
+  char buf_temp[MAX_STR_LEN] = { 0x00 };
+	char *action, *topic, *topics, *buf, *content;
 	size_t head_len;
-	char resp_buf[128];
 	bus_head_t head;
+    int val;
+    ProcDataZone::iterator proc_que_iter;
+    ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET);
 
-	const char *topic_delim = ",";
-// printf("run_pubsub_proxy server receive before\n");
-	while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) {
-//printf("run_pubsub_proxy server recv after: %s \n", buf);
+    int rv;
+    char send_buf[512] = { 0x00 };
+
+    const char *topic_delim = ",";
+	while((rv = shm_recvfrom(shm_socket, (void **)&buf, &size, &key)) == 0) {
 		head = ShmModSocket::decode_bus_head(buf);
 		topics = buf + BUS_HEAD_SIZE;
 		action = head.action;
-  // printf("run_pubsub_proxy : %s, %s \n", action, topics);
 		if(strcmp(action, "sub") == 0) {
 			// 璁㈤槄鏀寔澶氫富棰樿闃�
 			topic = strtok(topics, topic_delim);
-//printf("run_pubsub_proxy topic = %s\n", topic);
 		  while(topic) {
        _proxy_sub(trim(topic, 0), key);
         topic =  strtok(NULL, topic_delim);
@@ -263,7 +943,6 @@
 
 		} 
 		else if(strcmp(action, "desub") == 0) {
-// printf("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), ""));
 			if(strcmp(trim(topics, 0), "") == 0) {
 				// 鍙栨秷鎵�鏈夎闃�
 				_proxy_desub_all(key);
@@ -278,94 +957,131 @@
 			
 		} 
 		else if(strcmp(action, "pub") == 0) {
-			 content = topics + head.topic_size;
-			_proxy_pub(topics, content, head.content_size, key);
+      topics[head.topic_size - 1] = '\0';
+		  content = topics + head.topic_size;
 
-		}  
-		else if(strcmp(action, "stop") == 0) {
-			
-			free(buf);
-			break;
-		} else {
-			logger->error( "BusServerSocket::run_pubsub_proxy : unrecognized action %s", action);
-		}
-		
-		free(buf);
-	}
+			_proxy_pub(topics, topics, head.topic_size + head.content_size, key);
 
-	logger->info( "Stopping Bus...");
-	shm_sendto(shm_socket, "stop_finished", strlen( "stop_finished") +1, key);
+		} 
+    else if ((strcmp(action, "reg") == 0) || (strcmp(action, "unreg") == 0) \
+            || (strcmp(action, "tcsreg") == 0) || (strcmp(action, "tcsque") == 0) \
+            || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0) \
+            || (strcmp(action, "bufreg") == 0)) {
+      content = topics + head.topic_size;
+      if (strcmp(action, "reg") == 0) {
+        
+        flag = PROC_REG;
 
-	return NULL;
+      } else if (strcmp(action, "unreg") == 0) {
+        
+        flag = PROC_UNREG;
+
+      } else if (strcmp(action, "tcsreg") == 0) {
+        
+        flag = PROC_REG_TCS;
+
+      } else if (strcmp(action, "tcsque") == 0) {
+        
+        flag = PROC_QUE_TCS;
+
+      } else if (strcmp(action, "stcsque") == 0) {
+        
+        flag = PROC_QUE_STCS; 
+
+      } else if (strcmp(action, "atcsque") == 0) {
+        
+        flag = PROC_QUE_ATCS;
+
+      } else {
+        
+        flag = PROC_REG_BUF;
+
+      }
+        
+      if (flag == PROC_REG) {
+        memcpy(buf_temp, content, strlen(content) + 1);
+        
+        if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) {
+
+          val = proc_que_iter->second;
+          _proxy_reg(topics, head.topic_size, content, head.content_size, val, PROC_UNREG);
+        }
+      }
+
+      _proxy_reg(topics, head.topic_size, content, head.content_size, key, flag);
+
+    }
+    else if(strcmp(action, "stop") == 0) {
+      free(buf);
+      break;
+    } else {
+      logger->error( "BusServerSocket::_run_proxy_ : unrecognized action %s", action);
+    }
+    free(buf);
+  }
+
+
+  return rv;
+}
+
+void BusServerSocket::_data_remove(int val) {
+
+  int i;
+  LockFreeQueue<shm_packet_t> *queue = NULL;
+  hashtable_t *hashtable = mm_get_hashtable();
+
+  void *data_ptr = hashtable_get(hashtable, val);
+
+  if (data_ptr != NULL) {
+    if (data_ptr != (void *)1) {
+      queue = (LockFreeQueue<shm_packet_t> *)data_ptr;
+      queue->close();
+      for (i = 0; i < queue->size(); i++) {
+        mm_free((*queue)[i].buf);
+      }
+      sleep(1);
+    }
+
+    hashtable_remove(hashtable, val);
+  }
+
+}
+
+void BusServerSocket::buf_data_set(int data, int val) {
+  recvbuf_val *val_buf;
+  recvbuf_data::iterator data_iter;
+  recvbuf_val::iterator val_iter;
+
+  if ((data_iter = recvBuf_data.find(data)) != recvBuf_data.end()) {
+    val_buf = data_iter->second;
+  } else {
+    void *set_ptr = mm_malloc(sizeof(recvbuf_val));
+
+    val_buf = new(set_ptr) recvbuf_val;
+    recvBuf_data.insert({data, val_buf}); 
+  }
+
+  val_buf->insert(val);
+}
+
+void BusServerSocket::buf_data_remove(int data) {
+
+  int val;
+  recvbuf_val *val_buf;
+  recvbuf_data::iterator data_iter;
+  recvbuf_val::iterator val_iter;
+
+  if ((data_iter = recvBuf_data.find(data)) != recvBuf_data.end()) {
+    
+    val_buf = data_iter->second;
+    for(val_iter = val_buf->begin(); val_iter != val_buf->end(); ++val_iter) {
+      val = *val_iter;
+
+      BusServerSocket::_data_remove(val);
+    }
+
+    recvBuf_data.erase(data);
+  }
 }
 
 
-
-/**
- * deprecate
- * @str "<**sub**>{缁忔祹}"
- */
-
-int BusServerSocket::parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len ) {
- char *ptr = str;
- char *str_end_ptr = str + size;
- char *action_start_ptr;
- char *action_end_ptr;
- size_t action_len = 0;
-
- char *topic_start_ptr;
- char *topic_end_ptr;
- size_t topic_len = 0;
-
- // if (strlen(identifier) > strlen(str)) {
- //  return 0;
- // }
-
- if (strncmp(ptr, ACTION_LIDENTIFIER, strlen(ACTION_LIDENTIFIER)) == 0) {
-  ptr += strlen(ACTION_LIDENTIFIER);
-  action_start_ptr = ptr;
-  while(strncmp(++ptr, ACTION_RIDENTIFIER, strlen(ACTION_RIDENTIFIER)) != 0) {
-    if(ptr >= str_end_ptr) {
-      return 0;
-    }
-  }
-// printf("%s\n", ptr);
-  action_end_ptr = ptr;
-  action_len = action_end_ptr - action_start_ptr;
-  ptr += strlen(ACTION_RIDENTIFIER);
-// printf("%s\n", ptr);
-// printf("%s\n", str_end_ptr-1);
-  if(strncmp(ptr, TOPIC_LIDENTIFIER, strlen(TOPIC_LIDENTIFIER)) == 0 ) {
-    topic_start_ptr = ptr+strlen(TOPIC_LIDENTIFIER);
-   
-
-    while(strncmp(++ptr, TOPIC_RIDENTIFIER, strlen(TOPIC_RIDENTIFIER)) != 0) {
-      if(ptr >= str_end_ptr) {
-        return 0;
-      }
-    }
-    topic_end_ptr = ptr;
-    topic_len = topic_end_ptr - topic_start_ptr;
-    
-    ptr += strlen(TOPIC_RIDENTIFIER);
-   
-  } else {
-    return 0;
-  }
- } else {
-  return 0;
- }
-
- char *topic = (char *)malloc(topic_len+1);
- strncpy(topic, topic_start_ptr, topic_len);
- *(topic+topic_len) = '\0';
- *_topic = topic;
-
- char *action = (char *)malloc(action_len+1);
- strncpy(action, action_start_ptr, action_len);
- *(action+action_len) = '\0';
- *_action = action;
- *head_len = ptr-str;
-
- return 1;
-}
\ No newline at end of file

--
Gitblit v1.8.0