From acbf282b23b4cbdebca562d67132573de3902f94 Mon Sep 17 00:00:00 2001
From: Fu Juntang <StrongTiger_001@163.com>
Date: 星期五, 17 九月 2021 10:45:43 +0800
Subject: [PATCH] Merge branch 'master' of http://os.smartai.com:9091/valib/c_bhomebus
---
src/socket/bus_server_socket.cpp | 209 ++++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 208 insertions(+), 1 deletions(-)
diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp
index d5e757d..cfb7419 100644
--- a/src/socket/bus_server_socket.cpp
+++ b/src/socket/bus_server_socket.cpp
@@ -6,6 +6,7 @@
static Logger *logger = LoggerFactory::getLogger();
+list gLinkedList;
void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)> cb) {
SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
SHMKeySet *subscripter_set;
@@ -296,6 +297,169 @@
return dataBuf;
}
+void list::Insert(int aData, int bData)
+{
+ LinkNode *pHead = NULL;
+ LinkNode *pNew = NULL;
+ LinkNode *pCur = NULL;
+
+ pNew = new(LinkNode);
+ pNew->data = aData;
+ pNew->data_fix = bData;
+ pNew->count = 0;
+
+ pHead = head;
+ pCur = pHead;
+ if(pHead == NULL) {
+ head = pNew;
+
+ pNew->next = NULL;
+
+ } else {
+ while(pCur->next != NULL) {
+ pCur = pCur->next;
+ }
+
+ pCur->next = pNew;
+ pNew->next = NULL;
+ }
+}
+
+void list::Delete(int data)
+{
+ LinkNode *pHead;
+ LinkNode *pCur;
+ LinkNode *pNext;
+
+ pHead = head;
+ pCur = pHead;
+ if(pHead == NULL)
+ return;
+
+ while((pCur != NULL) && (pCur->data == data)) {
+
+ head = pCur->next;
+
+ delete(pCur);
+
+ pCur = head;
+
+ }
+
+ while((pCur != NULL) && (pCur->next != NULL)) {
+ pNext = pCur->next;
+
+ if(pNext->data == data) {
+ pCur->next = pNext->next;
+ pCur = pNext->next;
+
+ delete(pNext);
+ } else {
+
+ pCur = pNext;
+
+ }
+ }
+}
+
+void list::dataSet(int data, int val)
+{
+ LinkNode *pCur;
+
+ pCur = head;
+ if(pCur == NULL)
+ return;
+
+ while(pCur != NULL) {
+
+ if(pCur->data == data) {
+ pCur->count = val;
+ }
+
+ pCur = pCur->next;
+ }
+}
+
+int list::dataGet(int data)
+{
+ LinkNode *pCur;
+
+ pCur = head;
+ if(pCur == NULL)
+ return 0;
+
+ while(pCur != NULL) {
+
+ if(pCur->data == data) {
+ return pCur->count;
+ }
+
+ pCur = pCur->next;
+ }
+
+ return 0;
+}
+
+int list::dataFixGet(int data)
+{
+ LinkNode *pCur;
+
+ pCur = head;
+ if(pCur == NULL)
+ return 0;
+
+ while(pCur != NULL) {
+
+ if(pCur->data == data) {
+ return pCur->data_fix;
+ }
+
+ pCur = pCur->next;
+ }
+
+ return 0;
+}
+
+int list::NodeNum(void)
+{
+ int count = 0;
+ LinkNode *pCur = head;
+
+ if (pCur == NULL) {
+ return 0;
+ }
+
+ while(pCur != NULL) {
+
+ ++count;
+ pCur = pCur->next;
+ }
+
+ return count;
+}
+
+int list::nodeGet(int index)
+{
+ int count = 0;
+ LinkNode *pCur = head;
+
+ if (pCur == NULL) {
+ return 0;
+ }
+
+ while((pCur != NULL) && (count <= index)) {
+
+ if (count == index) {
+ return pCur->data;
+ }
+
+ ++count;
+ pCur = pCur->next;
+ }
+
+ return 0;
+}
+
void BusServerSocket::_proxy_reg(const char *topic, size_t topic_size, const char *buf, size_t buf_size, int key, int flag)
{
char buf_temp[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
@@ -340,7 +504,10 @@
memcpy(Data_stru.svr_info, buf + count, strlen(buf + count) + 1);
count += strlen(buf + count) + 1;
-
+
+ if (flag == PROC_REG) {
+ gLinkedList.Insert(key, atoi(Data_stru.int_info));
+ }
}
ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
@@ -685,14 +852,44 @@
}
}
+int BusServerSocket::get_data(int val) {
+
+ ProcZone::iterator proc_iter;
+ ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
+
+ if ((proc_iter = proc->find(val)) != proc->end()) {
+ return true;
+ }
+
+ return false;
+
+}
+
+int BusServerSocket::check_proc(const int val, const void *buf, int len, void **buf_ret, int *len_ret, \
+ const struct timespec *timeout, const int flag) {
+ int ret;
+
+ ret = shm_sendandrecv(shm_socket, buf, len, val, buf_ret, len_ret, timeout, flag);
+
+ return ret;
+}
+
+void BusServerSocket::remove_proc(int val) {
+ BusServerSocket::_proxy_reg(NULL, 0, NULL, 0, val, PROC_UNREG);
+}
+
// 杩愯浠g悊
int BusServerSocket::_run_proxy_() {
int size;
int key;
int flag;
+ char buf_temp[MAX_STR_LEN] = { 0x00 };
char * action, *topic, *topics, *buf, *content;
size_t head_len;
bus_head_t head;
+ int val;
+ ProcDataZone::iterator proc_que_iter;
+ ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET);
int rv;
char send_buf[512] = { 0x00 };
@@ -762,6 +959,16 @@
}
+ if (flag == PROC_REG) {
+ memcpy(buf_temp, content, strlen(content) + 1);
+
+ if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) {
+
+ val = proc_que_iter->second;
+ _proxy_reg(topics, head.topic_size, content, head.content_size, val, PROC_UNREG);
+ }
+ }
+
_proxy_reg(topics, head.topic_size, content, head.content_size, key, flag);
}
--
Gitblit v1.8.0