From acbf282b23b4cbdebca562d67132573de3902f94 Mon Sep 17 00:00:00 2001
From: Fu Juntang <StrongTiger_001@163.com>
Date: 星期五, 17 九月 2021 10:45:43 +0800
Subject: [PATCH] Merge branch 'master' of http://os.smartai.com:9091/valib/c_bhomebus
---
src/socket/bus_server_socket.cpp | 260 ++++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 260 insertions(+), 0 deletions(-)
diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp
index 8b022e1..cfb7419 100644
--- a/src/socket/bus_server_socket.cpp
+++ b/src/socket/bus_server_socket.cpp
@@ -6,6 +6,7 @@
static Logger *logger = LoggerFactory::getLogger();
+list gLinkedList;
void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)> cb) {
SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
SHMKeySet *subscripter_set;
@@ -296,12 +297,176 @@
return dataBuf;
}
+void list::Insert(int aData, int bData)
+{
+ LinkNode *pHead = NULL;
+ LinkNode *pNew = NULL;
+ LinkNode *pCur = NULL;
+
+ pNew = new(LinkNode);
+ pNew->data = aData;
+ pNew->data_fix = bData;
+ pNew->count = 0;
+
+ pHead = head;
+ pCur = pHead;
+ if(pHead == NULL) {
+ head = pNew;
+
+ pNew->next = NULL;
+
+ } else {
+ while(pCur->next != NULL) {
+ pCur = pCur->next;
+ }
+
+ pCur->next = pNew;
+ pNew->next = NULL;
+ }
+}
+
+void list::Delete(int data)
+{
+ LinkNode *pHead;
+ LinkNode *pCur;
+ LinkNode *pNext;
+
+ pHead = head;
+ pCur = pHead;
+ if(pHead == NULL)
+ return;
+
+ while((pCur != NULL) && (pCur->data == data)) {
+
+ head = pCur->next;
+
+ delete(pCur);
+
+ pCur = head;
+
+ }
+
+ while((pCur != NULL) && (pCur->next != NULL)) {
+ pNext = pCur->next;
+
+ if(pNext->data == data) {
+ pCur->next = pNext->next;
+ pCur = pNext->next;
+
+ delete(pNext);
+ } else {
+
+ pCur = pNext;
+
+ }
+ }
+}
+
+void list::dataSet(int data, int val)
+{
+ LinkNode *pCur;
+
+ pCur = head;
+ if(pCur == NULL)
+ return;
+
+ while(pCur != NULL) {
+
+ if(pCur->data == data) {
+ pCur->count = val;
+ }
+
+ pCur = pCur->next;
+ }
+}
+
+int list::dataGet(int data)
+{
+ LinkNode *pCur;
+
+ pCur = head;
+ if(pCur == NULL)
+ return 0;
+
+ while(pCur != NULL) {
+
+ if(pCur->data == data) {
+ return pCur->count;
+ }
+
+ pCur = pCur->next;
+ }
+
+ return 0;
+}
+
+int list::dataFixGet(int data)
+{
+ LinkNode *pCur;
+
+ pCur = head;
+ if(pCur == NULL)
+ return 0;
+
+ while(pCur != NULL) {
+
+ if(pCur->data == data) {
+ return pCur->data_fix;
+ }
+
+ pCur = pCur->next;
+ }
+
+ return 0;
+}
+
+int list::NodeNum(void)
+{
+ int count = 0;
+ LinkNode *pCur = head;
+
+ if (pCur == NULL) {
+ return 0;
+ }
+
+ while(pCur != NULL) {
+
+ ++count;
+ pCur = pCur->next;
+ }
+
+ return count;
+}
+
+int list::nodeGet(int index)
+{
+ int count = 0;
+ LinkNode *pCur = head;
+
+ if (pCur == NULL) {
+ return 0;
+ }
+
+ while((pCur != NULL) && (count <= index)) {
+
+ if (count == index) {
+ return pCur->data;
+ }
+
+ ++count;
+ pCur = pCur->next;
+ }
+
+ return 0;
+}
+
void BusServerSocket::_proxy_reg(const char *topic, size_t topic_size, const char *buf, size_t buf_size, int key, int flag)
{
char buf_temp[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
int count = 0;
int i = 0;
int len = 0;
+ int data1, data2;
char *data_ptr;
ProcInfo Data_stru;
ProcZone::iterator proc_iter;
@@ -333,6 +498,16 @@
memcpy(Data_stru.private_info, buf + count, strlen(buf + count) + 1);
count += strlen(buf + count) + 1;
+
+ memcpy(Data_stru.int_info, buf + count, strlen(buf + count) + 1);
+ count += strlen(buf + count) + 1;
+
+ memcpy(Data_stru.svr_info, buf + count, strlen(buf + count) + 1);
+ count += strlen(buf + count) + 1;
+
+ if (flag == PROC_REG) {
+ gLinkedList.Insert(key, atoi(Data_stru.int_info));
+ }
}
ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
@@ -362,6 +537,9 @@
if ((proc_iter = proc->find(key)) != proc->end()) {
+ data1 = atoi((proc_iter->second).int_info);
+ data2 = atoi((proc_iter->second).svr_info);
+ BusServerSocket::_data_remove(data1, data2);
len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1);
strncpy(buf_temp, (proc_iter->second).proc_id, len);
proc->erase(proc_iter);
@@ -504,7 +682,9 @@
free(last_buf);
} else if (flag == PROC_QUE_STCS) {
+
SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
+ ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
if ((svr_tcs_iter = SvrData->find(buf_temp)) != SvrData->end()) {
@@ -512,6 +692,9 @@
for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) {
count = *svr_proc_iter;
+ if ((proc_iter = proc->find(count)) != proc->end()) {
+ count = atoi((proc_iter->second).svr_info);
+ }
break;
}
@@ -669,14 +852,44 @@
}
}
+int BusServerSocket::get_data(int val) {
+
+ ProcZone::iterator proc_iter;
+ ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
+
+ if ((proc_iter = proc->find(val)) != proc->end()) {
+ return true;
+ }
+
+ return false;
+
+}
+
+int BusServerSocket::check_proc(const int val, const void *buf, int len, void **buf_ret, int *len_ret, \
+ const struct timespec *timeout, const int flag) {
+ int ret;
+
+ ret = shm_sendandrecv(shm_socket, buf, len, val, buf_ret, len_ret, timeout, flag);
+
+ return ret;
+}
+
+void BusServerSocket::remove_proc(int val) {
+ BusServerSocket::_proxy_reg(NULL, 0, NULL, 0, val, PROC_UNREG);
+}
+
// 杩愯浠g悊
int BusServerSocket::_run_proxy_() {
int size;
int key;
int flag;
+ char buf_temp[MAX_STR_LEN] = { 0x00 };
char * action, *topic, *topics, *buf, *content;
size_t head_len;
bus_head_t head;
+ int val;
+ ProcDataZone::iterator proc_que_iter;
+ ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET);
int rv;
char send_buf[512] = { 0x00 };
@@ -746,6 +959,16 @@
}
+ if (flag == PROC_REG) {
+ memcpy(buf_temp, content, strlen(content) + 1);
+
+ if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) {
+
+ val = proc_que_iter->second;
+ _proxy_reg(topics, head.topic_size, content, head.content_size, val, PROC_UNREG);
+ }
+ }
+
_proxy_reg(topics, head.topic_size, content, head.content_size, key, flag);
}
@@ -770,3 +993,40 @@
return rv;
}
+
+void BusServerSocket::_data_remove(int val1, int val2) {
+
+ int i;
+ LockFreeQueue<shm_packet_t> *queue = NULL;
+ hashtable_t *hashtable = mm_get_hashtable();
+
+ void *data_ptr1 = hashtable_get(hashtable, val1);
+ void *data_ptr2 = hashtable_get(hashtable, val2);
+ if (data_ptr1 != NULL) {
+ if (data_ptr1 != (void *)1) {
+ queue = (LockFreeQueue<shm_packet_t> *)data_ptr1;
+ queue->close();
+ for (i = 0; i < queue->size(); i++) {
+ mm_free((*queue)[i].buf);
+ }
+ sleep(1);
+ }
+
+ hashtable_remove(hashtable, val1);
+ }
+
+ if (data_ptr2 != NULL) {
+ if (data_ptr2 != (void *)1) {
+ queue = (LockFreeQueue<shm_packet_t> *)data_ptr2;
+ queue->close();
+ for (i = 0; i < queue->size(); i++) {
+ mm_free((*queue)[i].buf);
+ }
+ sleep(1);
+ }
+
+ hashtable_remove(hashtable, val2);
+ }
+
+}
+
--
Gitblit v1.8.0