From 7fa99e0efe07a120af719da2cae5c1151f024403 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期三, 03 四月 2024 10:17:25 +0800
Subject: [PATCH] remove some defer
---
src/socket/shm_mod_socket.cpp | 215 +++++++++++++++++++++++++++++++++++++++++++++--------
1 files changed, 183 insertions(+), 32 deletions(-)
diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp
index a012e80..ababb7d 100644
--- a/src/socket/shm_mod_socket.cpp
+++ b/src/socket/shm_mod_socket.cpp
@@ -3,18 +3,13 @@
static Logger *logger = LoggerFactory::getLogger();
-size_t ShmModSocket::remove_keys(int keys[], size_t length) {
- BusServerSocket::remove_subscripters(keys, length);
- return shm_socket_remove_keys(keys, length);
-}
ShmModSocket::ShmModSocket() {
- shm_socket = shm_open_socket(SHM_SOCKET_DGRAM);
+ shm_socket = shm_socket_open(SHM_SOCKET_DGRAM);
bus_set = new std::set<int>;
}
ShmModSocket::~ShmModSocket() {
- // logger->debug("Close ShmModSocket...\n");
struct timespec timeout = {1, 0};
if(bus_set != NULL) {
for(auto bus_iter = bus_set->begin(); bus_iter != bus_set->end(); bus_iter++) {
@@ -23,8 +18,13 @@
delete bus_set;
}
- shm_close_socket(shm_socket);
+ shm_socket_close(shm_socket);
}
+
+int ShmModSocket::stop() {
+ return shm_socket_stop(shm_socket);
+}
+
int ShmModSocket::bind(int key) {
return shm_socket_bind(shm_socket, key);
@@ -38,15 +38,137 @@
return shm_socket_force_bind(shm_socket, key);
}
+int ShmModSocket::reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag)
+{
+ int ret;
+ struct timespec ts;
+
+ bus_head_t head = {};
+
+ if (flag == PROC_REG) {
+
+ memcpy(head.action, "reg", sizeof(head.action));
+
+ } else if (flag == PROC_UNREG) {
+
+ memcpy(head.action, "unreg", sizeof(head.action));
+
+ } else if (flag == PROC_REG_TCS) {
+
+ memcpy(head.action, "tcsreg", sizeof(head.action));
+
+ } else if (flag == PROC_QUE_TCS) {
+
+ memcpy(head.action, "tcsque", sizeof(head.action));
+
+ } else if (flag == PROC_QUE_STCS) {
+
+ memcpy(head.action, "stcsque", sizeof(head.action));
+
+ } else if (flag == PROC_QUE_ATCS) {
+
+ memcpy(head.action, "atcsque", sizeof(head.action));
+
+ } else if (flag == PROC_REG_BUF) {
+
+ memcpy(head.action, "bufreg", sizeof(head.action));
+
+ } else {
+
+ return -1;
+
+ }
+
+ if ((flag == PROC_REG) || (flag == PROC_UNREG)) {
+
+ head.topic_size = 0;
+
+ if (pData != NULL) {
+
+ head.content_size = sizeof(ProcInfo);
+
+ } else {
+
+ head.content_size = 0;
+
+ }
+ } else {
+
+ head.topic_size = len;
+
+ head.content_size = 0;
+
+ }
+
+ void *buf_temp;
+ int buf_size;
+
+ if ((flag == PROC_REG) || (flag == PROC_UNREG)) {
+
+ buf_size = get_bus_sendbuf(head, NULL, 0, pData, head.content_size, &buf_temp);
+
+ } else {
+
+ buf_size = get_bus_sendbuf(head, pData, len, NULL, head.content_size, &buf_temp);
+
+ }
+
+ if (timeout_ms > 0) {
+
+ ts.tv_sec = timeout_ms /1000;
+
+ ts.tv_nsec = (timeout_ms - ts.tv_sec * 1000) * 1000 * 1000;
+
+ if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS) || (flag == PROC_REG_BUF)) {
+
+ ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, BUS_TIMEOUT_FLAG);
+
+ } else {
+
+ ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, BUS_TIMEOUT_FLAG);
+
+ }
+
+ } else if (timeout_ms == 0) {
+
+ if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS) || (flag == PROC_REG_BUF)) {
+
+ ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, BUS_NOWAIT_FLAG);
+
+ } else {
+
+ ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, BUS_NOWAIT_FLAG);
+
+ }
+
+ } else {
+
+ if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS) || (flag == PROC_REG_BUF)) {
+
+ ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, -1);
+
+ } else {
+
+ ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, -1);
+
+ }
+
+ }
+
+ free(buf_temp);
+
+ return ret;
+
+}
+
/**
* 鍙戦�佷俊鎭�
* @key 鍙戦�佺粰璋�
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
-int ShmModSocket::sendto(const void *buf, const int size, const int key, const struct timespec *timeout, int flag) {
- int rv = shm_sendto(shm_socket, buf, size, key, timeout, flag);
+int ShmModSocket::sendto(const void *buf, const int size, const int key, const struct timespec *timeout, int flag, int reset, int data_set) {
+ int rv = shm_sendto(shm_socket, buf, size, key, timeout, flag, reset, data_set);
if(rv == 0) {
- logger->debug("ShmModSocket::sendto: %d sendto %d success.\n", get_key(), key);
return 0;
}
@@ -59,11 +181,11 @@
* @key 浠庤皝鍝噷鏀跺埌鐨勪俊鎭�
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
-int ShmModSocket::recvfrom( void **buf, int *size, int *key, const struct timespec *timeout, int flag) {
- int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flag);
+int ShmModSocket::recvfrom( void **buf, int *size, int *key, const struct timespec *timeout, int flag, int reset, int data_set) {
+
+ int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flag, reset, data_set);
if(rv == 0) {
- logger->debug("ShmModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key);
return 0;
}
@@ -77,16 +199,29 @@
* @key 鍙戦�佺粰璋�
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
-int ShmModSocket::sendandrecv(const void *send_buf, const int send_size, const int send_key,
+int ShmModSocket::sendandrecv(const void *send_buf, const int send_size, const int send_key,
void **recv_buf, int *recv_size, const struct timespec *timeout, int flag){
- return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag);
+ int rv = shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag);
+
+ if(rv == 0) {
+ return 0;
+ }
+
+ logger->debug("ShmModSocket::sendandrecv : sendandrecv to %d failed %s", send_key, bus_strerror(rv));
+ return rv;
+}
+
+
+int ShmModSocket::recvandsend( recvandsend_callback_fn callback,
+ const struct timespec *timeout , int flag, void * user_data ) {
+ return shm_recvandsend(shm_socket, callback, timeout, flag, user_data);
}
-// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int ShmModSocket::sendandrecv_unsafe(const void *send_buf, const int send_size, const int send_key,
- void **recv_buf, int *recv_size, const struct timespec *timeout, int flag){
- return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag);
-}
+// // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
+// int ShmModSocket::sendandrecv_unsafe(const void *send_buf, const int send_size, const int send_key,
+// void **recv_buf, int *recv_size, const struct timespec *timeout, int flag){
+// return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag);
+// }
/**
* 璁㈤槄鎸囧畾涓婚
@@ -94,7 +229,7 @@
* @size 涓婚闀垮害
* @key 鎬荤嚎绔彛
*/
-int ShmModSocket::sub(char *topic, int topic_size, int key,
+int ShmModSocket::sub(const char *topic, int topic_size, int key,
const struct timespec *timeout, int flags) {
int ret;
bus_head_t head = {};
@@ -125,7 +260,7 @@
* @size 涓婚闀垮害
* @key 鎬荤嚎绔彛
*/
-int ShmModSocket::desub(char *topic, int topic_size, int key, const struct timespec *timeout, int flags) {
+int ShmModSocket::desub(const char *topic, int topic_size, int key, const struct timespec *timeout, int flags) {
// char buf[8192];
int ret;
if(topic == NULL) {
@@ -163,14 +298,15 @@
* @content 涓婚鍐呭
* @key 鎬荤嚎绔彛
*/
-int ShmModSocket::pub(char *topic, int topic_size, void *content, int content_size, int key, const struct timespec *timeout, int flags) {
+int ShmModSocket::pub(const char *topic, int topic_size, const void *content, int content_size, int key, const struct timespec *timeout, int flags) {
int ret;
bus_head_t head = {};
+ topic = trim(const_cast<char *>(topic), 0);
memcpy(head.action, "pub", sizeof(head.action));
head.topic_size = topic_size = strlen(topic) + 1;
head.content_size = content_size;
-
- void *buf;
+
+ void *buf;
int size = get_bus_sendbuf(head, topic, topic_size, content, content_size, &buf);
if(size > 0) {
ret = shm_sendto(shm_socket, buf, size, key, timeout, flags);
@@ -196,11 +332,13 @@
// =============================================================================
int ShmModSocket::get_bus_sendbuf(bus_head_t &request_head,
- void *topic_buf, int topic_size, void *content_buf, int content_size, void **retbuf) {
+ const void *topic_buf, int topic_size, const void *content_buf, int content_size, void **retbuf) {
int buf_size;
char *buf;
int max_buf_size;
+ void *buf_ptr;
+ int count = 0;
if((buf = (char *) malloc(MAXBUF)) == NULL) {
LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf malloc");
exit(1);
@@ -208,7 +346,7 @@
max_buf_size = MAXBUF;
}
- buf_size = BUS_HEAD_SIZE + content_size + topic_size ;
+ buf_size = BUS_HEAD_SIZE + content_size + topic_size;
if(max_buf_size < buf_size) {
if((buf = (char *) realloc(buf, buf_size)) == NULL) {
@@ -219,13 +357,26 @@
}
}
- memcpy(buf, ShmModSocket::encode_bus_head(request_head), BUS_HEAD_SIZE);
+ buf_ptr = ShmModSocket::encode_bus_head(request_head);
+ memcpy(buf, buf_ptr, BUS_HEAD_SIZE);
if(topic_size != 0 )
memcpy(buf + BUS_HEAD_SIZE, topic_buf, topic_size);
- if(content_size != 0)
- memcpy(buf + BUS_HEAD_SIZE + topic_size, content_buf, content_size);
+ if ((content_size != 0) && (strncmp(request_head.action, "reg", strlen("reg")) != 0) && \
+ (strncmp(request_head.action, "unreg", strlen("unreg")) != 0)) {
+ memcpy(buf + BUS_HEAD_SIZE + topic_size, content_buf, content_size);
+ } else {
+ if (((strncmp(request_head.action, "reg", strlen("reg")) == 0) || (strncmp(request_head.action, "unreg", \
+ strlen("unreg")) == 0)) && (content_buf != NULL)) {
+ proc_copy(buf + BUS_HEAD_SIZE + topic_size, const_cast<void *> (content_buf), &count);
+
+ request_head.content_size = count;
+ buf_size -= (content_size - count);
+
+ }
+ }
*retbuf = buf;
+ free(buf_ptr);
return buf_size;
}
@@ -244,7 +395,7 @@
tmp_ptr += sizeof(head.action);
PUT(tmp_ptr, htonl(head.topic_size));
- tmp_ptr += 4;
+ tmp_ptr += sizeof(head.topic_size);
PUT(tmp_ptr, htonl(head.content_size));
return headbs;
@@ -259,7 +410,7 @@
tmp_ptr += sizeof(head.action);
head.topic_size = ntohl(GET(tmp_ptr));
- tmp_ptr += 4;
+ tmp_ptr += sizeof(head.topic_size);
head.content_size = ntohl(GET(tmp_ptr));
return head;
--
Gitblit v1.8.0