From 8df2b63c21d0aabaa894930e3ab1ea63c49d47ff Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期三, 13 一月 2021 17:54:51 +0800
Subject: [PATCH] fix bug invalid argument
---
src/queue/lock_free_queue.h | 12 +++--
src/px_sem_util.cpp | 15 ++++---
src/socket/shm_mod_socket.cpp | 32 +++++++++------
src/socket/shm_socket.cpp | 17 ++++++--
src/socket/net_mod_socket.cpp | 18 ++++----
test_net_socket/heart_beat.cpp | 11 +++--
6 files changed, 64 insertions(+), 41 deletions(-)
diff --git a/src/px_sem_util.cpp b/src/px_sem_util.cpp
index 80008fb..9de9c38 100644
--- a/src/px_sem_util.cpp
+++ b/src/px_sem_util.cpp
@@ -1,13 +1,16 @@
#include "px_sem_util.h"
+#define NANO 1000000000
struct timespec PXSemUtil::calc_sem_timeout(const struct timespec *ts) {
- int tmp_sec;
+
+ struct timespec res;
struct timespec timeout;
if (clock_gettime(CLOCK_REALTIME, &timeout) == -1)
err_exit(errno, "clock_gettime");
- timeout.tv_nsec += ts->tv_nsec;
- tmp_sec = timeout.tv_nsec / 10e9;
- timeout.tv_nsec = timeout.tv_nsec - tmp_sec * 10e9;
- timeout.tv_sec += ts->tv_sec + tmp_sec;
- return timeout;
+
+ res.tv_sec = timeout.tv_sec + ts->tv_sec;
+ res.tv_nsec = timeout.tv_nsec + ts->tv_nsec;
+ res.tv_sec = res.tv_sec + floor(res.tv_nsec / NANO);
+ res.tv_nsec = res.tv_nsec % NANO;
+ return res;
}
\ No newline at end of file
diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h
index 3914b85..e1429eb 100644
--- a/src/queue/lock_free_queue.h
+++ b/src/queue/lock_free_queue.h
@@ -262,12 +262,12 @@
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * ts)
{
-
+ int rv;
struct timespec timeout = PXSemUtil::calc_sem_timeout(ts);
// LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before tv_sec=%d, tv_nsec=%ld",
// timeout.tv_sec, timeout.tv_nsec);
- while (sem_timedwait(&slots, &timeout) == -1) {
+ while ( sem_timedwait(&slots, &timeout) == -1) {
// LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before tv_sec=%d, tv_nsec=%ld, ETIMEDOUT=%d, errno=%d\n",
// timeout.tv_sec, timeout.tv_nsec, ETIMEDOUT, errno);
@@ -343,9 +343,11 @@
template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts)
{
-// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout before\n");
+// LoggerFactory::getLogger()->debug("=================ts sec = %d, nsec = %ld \n", ts->tv_sec, ts->tv_nsec );
+ // struct timespec timeout_tmp = {1, 0};
struct timespec timeout = PXSemUtil::calc_sem_timeout(ts);
+// LoggerFactory::getLogger()->debug("================== timeout before sec = %d, nsec = %ld \n", timeout.tv_sec, timeout.tv_nsec );
while (sem_timedwait(&items, &timeout) == -1) {
if (errno == ETIMEDOUT)
@@ -353,8 +355,8 @@
else if(errno == EINTR)
continue;
else {
- LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_timeout");
- return -1;
+ // LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_timeout %d", errno);
+ return errno;
}
}
diff --git a/src/socket/net_mod_socket.cpp b/src/socket/net_mod_socket.cpp
index f7c1242..7eaa906 100644
--- a/src/socket/net_mod_socket.cpp
+++ b/src/socket/net_mod_socket.cpp
@@ -67,15 +67,15 @@
int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
- _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, -1);
+ return _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, -1);
}
int NetModSocket::sendandrecv_timeout(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int msec) {
- _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, msec);
+ return _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, msec);
}
int NetModSocket::sendandrecv_nowait(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
- _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, 0);
+ return _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, 0);
}
@@ -327,13 +327,13 @@
// 鏈湴鍙戦��
if(node_arr == NULL || arrlen == 0) {
if(msec == 0) {
- ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, node->key);
+ ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, SHM_BUS_KEY);
} else if(msec > 0) {
timeout.tv_sec = msec / 1000;
timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
- ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, node->key, &timeout);
+ ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout);
} else {
- ret = shmModSocket.pub(topic, topic_size, content, content_size, node->key);
+ ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY);
}
if(ret == 0 ) {
n_pub_suc++;
@@ -346,13 +346,13 @@
if(node->host == NULL) {
// 鏈湴鍙戦��
if(msec == 0) {
- ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, node->key);
+ ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, SHM_BUS_KEY);
} else if(msec > 0) {
timeout.tv_sec = msec / 1000;
timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
- ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, node->key, &timeout);
+ ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout);
} else {
- ret = shmModSocket.pub(topic, topic_size, content, content_size, node->key);
+ ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY);
}
if(ret == 0 ) {
diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp
index 6e622a8..8a9133d 100644
--- a/src/socket/shm_mod_socket.cpp
+++ b/src/socket/shm_mod_socket.cpp
@@ -65,26 +65,32 @@
// printf("dgram_mod_recvfrom before\n");
int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flags);
// printf("dgram_mod_recvfrom after\n");
+
return rv;
}
+
/**
* 鎺ユ敹淇℃伅
* @key 浠庤皝鍝噷鏀跺埌鐨勪俊鎭�
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
int ShmModSocket::recvfrom(void **buf, int *size, int *key) {
-
- return _recvfrom_( buf, size, key, NULL, 0);
+ int rv = _recvfrom_( buf, size, key, NULL, 0);
+ // logger->error(rv, "ShmModSocket::recvfrom failed!");
+ return rv;
}
// 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
int ShmModSocket::recvfrom_timeout( void **buf, int *size, int *key, struct timespec *timeout) {
- return _recvfrom_(buf, size, key, timeout, 0);
+ int rv = _recvfrom_(buf, size, key, timeout, 0);
+ return rv;
}
int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *key){
- return _recvfrom_(buf, size, key, NULL, (int)SHM_MSG_NOWAIT);
+ int rv = _recvfrom_(buf, size, key, NULL, (int)SHM_MSG_NOWAIT);
+ // logger->error(rv, "ShmModSocket::recvfrom_nowait failed!");
+ return rv;
}
/**
@@ -188,14 +194,7 @@
*/
int ShmModSocket::_sub_(char *topic, int topic_size, int key,
struct timespec *timeout, int flags) {
- // char buf[8192];
- // int rv;
- // snprintf(buf, 8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
- // rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags);
- // if(rv == 0) {
- // bus_set->insert(key);
- // }
- // return rv;
+
int ret;
bus_head_t head = {};
@@ -240,7 +239,14 @@
if(size > 0) {
ret = shm_sendto(shm_socket, buf, size, key, timeout, flags);
free(buf);
- return ret;
+ if(ret == EBUS_TIMEOUT) {
+ logger->error(ret, "ShmModSocket::_desub_ key %d failed, %s", key, bus_strerror(EBUS_TIMEOUT));
+ return EBUS_TIMEOUT;
+ } else {
+ logger->error(ret, "ShmModSocket::_desub_ key %d failed!", key);
+ return ret;
+ }
+
} else {
return -1;
}
diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index c0f4c44..d094753 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -386,12 +386,12 @@
delete remoteQueue;
mm_free(dest.buf);
if(rv == EBUS_TIMEOUT) {
- bus_errno = EBUS_TIMEOUT;
- logger->error(errno, "sendto key %d failed, %s", key, bus_strerror(EBUS_TIMEOUT));
+ // bus_errno = EBUS_TIMEOUT;
+ logger->error(rv, "sendto key %d failed, %s", key, bus_strerror(EBUS_TIMEOUT));
return EBUS_TIMEOUT;
} else {
//logger->error(errno, "sendto key %d failed!", key);
- return -1;
+ return rv;
}
@@ -457,7 +457,16 @@
mm_free(src.buf);
return 0;
} else {
- return -1;
+
+ if(rv == EBUS_TIMEOUT) {
+ // bus_errno = EBUS_TIMEOUT;
+ logger->error("shm_recvfrom failed, %s", bus_strerror(EBUS_TIMEOUT));
+ return EBUS_TIMEOUT;
+ } else {
+ logger->error(rv, "shm_recvfrom failed!");
+ return rv;
+ }
+
}
}
diff --git a/test_net_socket/heart_beat.cpp b/test_net_socket/heart_beat.cpp
index 554df68..40ea621 100644
--- a/test_net_socket/heart_beat.cpp
+++ b/test_net_socket/heart_beat.cpp
@@ -28,10 +28,13 @@
char sendbuf[512];
int rv;
int remote_port;
- while (net_mod_socket_recvfrom(serv, &recvbuf, &size, &remote_port) == 0) {
- printf( "RECEIVED HREARTBEAT FROM %d: %s\n", remote_port, recvbuf);
- net_mod_socket_sendto(serv, "suc", strlen("suc")+1, remote_port);
- free(recvbuf);
+ while (true) {
+ if(net_mod_socket_recvfrom_timeout(serv, &recvbuf, &size, &remote_port, 0, 2000000000)==0) {
+ printf( "RECEIVED HREARTBEAT FROM %d: %s\n", remote_port, recvbuf);
+ net_mod_socket_sendto(serv, "suc", strlen("suc")+1, remote_port);
+ free(recvbuf);
+ }
+
}
// sleep(1000);
net_mod_socket_close(serv);
--
Gitblit v1.8.0