From 68d23225a38a35f1325eb39fa4ed5a005d5de473 Mon Sep 17 00:00:00 2001
From: fujuntang <fujuntang@aiot.com>
Date: 星期三, 11 八月 2021 09:50:20 +0800
Subject: [PATCH] fix from 3.1 first commit
---
src/socket/shm_mod_socket.cpp | 229 +++++++++++++++++++++------------------------------------
1 files changed, 84 insertions(+), 145 deletions(-)
diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp
index d090f0e..abd9477 100644
--- a/src/socket/shm_mod_socket.cpp
+++ b/src/socket/shm_mod_socket.cpp
@@ -3,28 +3,28 @@
static Logger *logger = LoggerFactory::getLogger();
-size_t ShmModSocket::remove_keys(int keys[], size_t length) {
- BusServerSocket::remove_subscripters(keys, length);
- return shm_socket_remove_keys(keys, length);
-}
ShmModSocket::ShmModSocket() {
- shm_socket = shm_open_socket(SHM_SOCKET_DGRAM);
+ shm_socket = shm_socket_open(SHM_SOCKET_DGRAM);
bus_set = new std::set<int>;
}
ShmModSocket::~ShmModSocket() {
- // logger->debug("Close ShmModSocket...\n");
struct timespec timeout = {1, 0};
if(bus_set != NULL) {
for(auto bus_iter = bus_set->begin(); bus_iter != bus_set->end(); bus_iter++) {
- desub_timeout(NULL, 0, *bus_iter, &timeout);
+ desub(NULL, 0, *bus_iter, &timeout, BUS_TIMEOUT_FLAG);
}
delete bus_set;
}
- shm_close_socket(shm_socket);
+ shm_socket_close(shm_socket);
}
+
+int ShmModSocket::stop() {
+ return shm_socket_stop(shm_socket);
+}
+
int ShmModSocket::bind(int key) {
return shm_socket_bind(shm_socket, key);
@@ -37,156 +37,79 @@
int ShmModSocket::force_bind(int key) {
return shm_socket_force_bind(shm_socket, key);
}
+
/**
* 鍙戦�佷俊鎭�
* @key 鍙戦�佺粰璋�
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
-int ShmModSocket::sendto(const void *buf, const int size, const int key) {
- return shm_sendto(shm_socket, buf, size, key, NULL, 0);
+int ShmModSocket::sendto(const void *buf, const int size, const int key, const struct timespec *timeout, int flag) {
+ int rv = shm_sendto(shm_socket, buf, size, key, timeout, flag);
+ if(rv == 0) {
+ logger->debug("ShmModSocket::sendto: %d sendto %d success.\n", get_key(), key);
+ return 0;
+ }
+
+ logger->debug("ShmModSocket::sendto : %d sendto %d failed %s", get_key(), key, bus_strerror(rv));
+ return rv;
}
-// 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇
-int ShmModSocket::sendto_timeout(const void *buf, const int size, const int key, const struct timespec *timeout) {
- return shm_sendto(shm_socket, buf, size, key, timeout, BUS_TIMEOUT_FLAG);
-}
-// 鍙戦�佷俊鎭珛鍒昏繑鍥炪��
-int ShmModSocket::sendto_nowait( const void *buf, const int size, const int key){
- return shm_sendto(shm_socket, buf, size, key, NULL, BUS_NOWAIT_FLAG);
-}
-
/**
* 鎺ユ敹淇℃伅
* @key 浠庤皝鍝噷鏀跺埌鐨勪俊鎭�
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
-int ShmModSocket::recvfrom(void **buf, int *size, int *key) {
- int rv = shm_recvfrom(shm_socket, buf, size, key, NULL, 0);
-
+int ShmModSocket::recvfrom( void **buf, int *size, int *key, const struct timespec *timeout, int flag) {
+ int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flag);
+
+ if(rv == 0) {
+ logger->debug("ShmModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key);
+ return 0;
+ }
+
+ logger->debug("ShmModSocket::recvfrom: socket %d recvfrom failed %s", get_key(), bus_strerror(rv));
return rv;
}
-
-
-int ShmModSocket::recvfrom_timeout( void **buf, int *size, int *key, const struct timespec *timeout) {
- int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, BUS_TIMEOUT_FLAG);
-
- return rv;
- // printf("ShmModSocket::recvfrom_timeout\n");
- // return 501;
-}
-
-int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *key){
- int rv = shm_recvfrom(shm_socket, buf, size, key, NULL, BUS_NOWAIT_FLAG);
- // logger->error(rv, "ShmModSocket::recvfrom_nowait failed!");
- return rv;
-}
+
/**
* 鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟
* @key 鍙戦�佺粰璋�
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
-int ShmModSocket::sendandrecv( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
- return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0);
-}
-// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int ShmModSocket::sendandrecv_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, const struct timespec *timeout){
- return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, BUS_TIMEOUT_FLAG);
-}
-int ShmModSocket::sendandrecv_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
- return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, BUS_NOWAIT_FLAG);
+int ShmModSocket::sendandrecv(const void *send_buf, const int send_size, const int send_key,
+ void **recv_buf, int *recv_size, const struct timespec *timeout, int flag){
+ int rv = shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag);
+
+ if(rv == 0) {
+ logger->debug("ShmModSocket::sendandrecv: sendandrecv to %d success.\n", send_key);
+ return 0;
+ }
+
+ logger->debug("ShmModSocket::sendandrecv : sendandrecv to %d failed %s", send_key, bus_strerror(rv));
+ return rv;
}
-
-
-int ShmModSocket::sendandrecv_unsafe( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
- return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0);
+int ShmModSocket::recvandsend( recvandsend_callback_fn callback,
+ const struct timespec *timeout , int flag, void * user_data ) {
+ return shm_recvandsend(shm_socket, callback, timeout, flag, user_data);
}
-// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int ShmModSocket::sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, const struct timespec *timeout){
- return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, BUS_TIMEOUT_FLAG);
-}
-int ShmModSocket::sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
- return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, BUS_NOWAIT_FLAG);
-}
-
-
-
-
+
+// // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
+// int ShmModSocket::sendandrecv_unsafe(const void *send_buf, const int send_size, const int send_key,
+// void **recv_buf, int *recv_size, const struct timespec *timeout, int flag){
+// return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag);
+// }
+
/**
* 璁㈤槄鎸囧畾涓婚
* @topic 涓婚
* @size 涓婚闀垮害
* @key 鎬荤嚎绔彛
*/
-int ShmModSocket::sub(char *topic, int size, int key){
- return _sub_( topic, size, key, NULL, 0);
-}
-// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int ShmModSocket::sub_timeout(char *topic, int size, int key, const struct timespec *timeout){
- return _sub_(topic, size, key, timeout, BUS_TIMEOUT_FLAG);
-}
-int ShmModSocket::sub_nowait(char *topic, int size, int key) {
- return _sub_(topic, size, key, NULL, BUS_NOWAIT_FLAG);
-}
-
-
-
-/**
- * 鍙栨秷璁㈤槄鎸囧畾涓婚
- * @topic 涓婚
- * @size 涓婚闀垮害
- * @key 鎬荤嚎绔彛
- */
-int ShmModSocket::desub(char *topic, int size, int key){
- return _desub_( topic, size, key, NULL, 0);
-}
-// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int ShmModSocket::desub_timeout(char *topic, int size, int key, const struct timespec *timeout){
- return _desub_(topic, size, key, timeout, BUS_TIMEOUT_FLAG);
-}
-int ShmModSocket::desub_nowait(char *topic, int size, int key) {
- return _desub_(topic, size, key, NULL, BUS_NOWAIT_FLAG);
-}
-
-
-
-/**
- * 鍙戝竷涓婚
- * @topic 涓婚
- * @content 涓婚鍐呭
- * @key 鎬荤嚎绔彛
- */
-int ShmModSocket::pub(char *topic, int topic_size, void *content, int content_size, int key){
- return _pub_(topic, topic_size, content, content_size, key, NULL, 0);
-}
-// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int ShmModSocket::pub_timeout(char *topic, int topic_size, void *content, int content_size, int key, const struct timespec * timeout){
- return _pub_( topic, topic_size, content, content_size, key, timeout, BUS_TIMEOUT_FLAG);
-}
-int ShmModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int key){
- return _pub_(topic, topic_size, content, content_size, key, NULL, BUS_NOWAIT_FLAG);
-}
-
-
-/**
- * 鑾峰彇soket key
- */
-int ShmModSocket::get_key(){
- return shm_socket->key;
-}
-
-
-
-// =============================================================================
-/**
- * @key 鎬荤嚎绔彛
- */
-int ShmModSocket::_sub_(char *topic, int topic_size, int key,
+int ShmModSocket::sub(const char *topic, int topic_size, int key,
const struct timespec *timeout, int flags) {
-
-
int ret;
bus_head_t head = {};
memcpy(head.action, "sub", sizeof(head.action));
@@ -208,10 +131,15 @@
}
+
+
/**
+ * 鍙栨秷璁㈤槄鎸囧畾涓婚
+ * @topic 涓婚
+ * @size 涓婚闀垮害
* @key 鎬荤嚎绔彛
*/
-int ShmModSocket::_desub_(char *topic, int topic_size, int key, const struct timespec *timeout, int flags) {
+int ShmModSocket::desub(const char *topic, int topic_size, int key, const struct timespec *timeout, int flags) {
// char buf[8192];
int ret;
if(topic == NULL) {
@@ -241,18 +169,15 @@
}
-/**
- * @key 鎬荤嚎绔彛
- * @str "<**pub**>{缁忔祹}"
- */
-
-int ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int key, const struct timespec *timeout, int flags) {
- // int head_len;
- // char buf[8192+content_size];
- // snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
- // head_len = strlen(buf);
- // memcpy(buf+head_len, content, content_size);
+
+/**
+ * 鍙戝竷涓婚
+ * @topic 涓婚
+ * @content 涓婚鍐呭
+ * @key 鎬荤嚎绔彛
+ */
+int ShmModSocket::pub(const char *topic, int topic_size, const void *content, int content_size, int key, const struct timespec *timeout, int flags) {
int ret;
bus_head_t head = {};
memcpy(head.action, "pub", sizeof(head.action));
@@ -269,17 +194,29 @@
return -1;
}
+}
+
+
+/**
+ * 鑾峰彇soket key
+ */
+int ShmModSocket::get_key(){
+ return shm_socket->key;
}
+
+// =============================================================================
+
int ShmModSocket::get_bus_sendbuf(bus_head_t &request_head,
- void *topic_buf, int topic_size, void *content_buf, int content_size, void **retbuf) {
+ const void *topic_buf, int topic_size, const void *content_buf, int content_size, void **retbuf) {
int buf_size;
char *buf;
int max_buf_size;
- if((buf = (char *)malloc(MAXBUF)) == NULL) {
+ void *buf_ptr;
+ if((buf = (char *) malloc(MAXBUF)) == NULL) {
LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf malloc");
exit(1);
} else {
@@ -289,7 +226,7 @@
buf_size = BUS_HEAD_SIZE + content_size + topic_size ;
if(max_buf_size < buf_size) {
- if((buf = (char *)realloc(buf, buf_size)) == NULL) {
+ if((buf = (char *) realloc(buf, buf_size)) == NULL) {
LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf realloc buf");
exit(1);
} else {
@@ -297,13 +234,15 @@
}
}
- memcpy(buf, ShmModSocket::encode_bus_head(request_head), BUS_HEAD_SIZE);
+ buf_ptr = ShmModSocket::encode_bus_head(request_head);
+ memcpy(buf, buf_ptr, BUS_HEAD_SIZE);
if(topic_size != 0 )
memcpy(buf + BUS_HEAD_SIZE, topic_buf, topic_size);
if(content_size != 0)
memcpy(buf + BUS_HEAD_SIZE + topic_size, content_buf, content_size);
*retbuf = buf;
+ free(buf_ptr);
return buf_size;
}
@@ -322,7 +261,7 @@
tmp_ptr += sizeof(head.action);
PUT(tmp_ptr, htonl(head.topic_size));
- tmp_ptr += 4;
+ tmp_ptr += sizeof(head.topic_size);
PUT(tmp_ptr, htonl(head.content_size));
return headbs;
@@ -337,7 +276,7 @@
tmp_ptr += sizeof(head.action);
head.topic_size = ntohl(GET(tmp_ptr));
- tmp_ptr += 4;
+ tmp_ptr += sizeof(head.topic_size);
head.content_size = ntohl(GET(tmp_ptr));
return head;
--
Gitblit v1.8.0