From 5410446ade40493d17f7e2d7f0d687b0998acc6a Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期三, 27 一月 2021 11:56:54 +0800
Subject: [PATCH] timeout wait 合一
---
src/socket/shm_mod_socket.h | 52 ++---
src/net/net_mod_socket_wrapper.h | 1
test_net_socket/test_net_mod_socket.cpp | 93 +++++++++-
test_net_socket/net_mod_socket.sh | 22 ++
src/socket/shm_mod_socket.cpp | 190 ++++++--------------
src/net/net_mod_server_socket.cpp | 8
src/net/net_mod_socket.cpp | 120 ++----------
7 files changed, 210 insertions(+), 276 deletions(-)
diff --git a/src/net/net_mod_server_socket.cpp b/src/net/net_mod_server_socket.cpp
index 432defb..ede0e67 100644
--- a/src/net/net_mod_server_socket.cpp
+++ b/src/net/net_mod_server_socket.cpp
@@ -171,10 +171,10 @@
timeout.tv_sec = request_head.timeout / 1000;
timeout.tv_nsec = (request_head.timeout - timeout.tv_sec * 1000) * 10e6;
// printf(" timeout.tv_sec = %d, timeout.tv_nsec=%ld\n", timeout.tv_sec, timeout.tv_nsec );
- ret = shmModSocket.sendandrecv_unsafe_timeout(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size, &timeout);
+ ret = shmModSocket.sendandrecv_unsafe(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size, &timeout, BUS_TIMEOUT_FLAG);
}
else if(request_head.timeout == 0) {
- ret = shmModSocket.sendandrecv_unsafe_nowait(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size);
+ ret = shmModSocket.sendandrecv_unsafe(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size, NULL, BUS_NOWAIT_FLAG);
}
else if(request_head.timeout == -1) {
ret = shmModSocket.sendandrecv_unsafe(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size);
@@ -236,10 +236,10 @@
if(request_head.timeout > 0) {
timeout.tv_sec = request_head.timeout / 1000;
timeout.tv_nsec = (request_head.timeout - timeout.tv_sec * 1000) * 10e6;
- ret = shmModSocket.pub_timeout((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, SHM_BUS_KEY, &timeout);
+ ret = shmModSocket.pub((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, SHM_BUS_KEY, &timeout, BUS_TIMEOUT_FLAG);
}
else if(request_head.timeout == 0) {
- ret = shmModSocket.pub_nowait((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, SHM_BUS_KEY);
+ ret = shmModSocket.pub((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, SHM_BUS_KEY, NULL, BUS_NOWAIT_FLAG);
}
else if(request_head.timeout == -1) {
ret = shmModSocket.pub((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, SHM_BUS_KEY);
diff --git a/src/net/net_mod_socket.cpp b/src/net/net_mod_socket.cpp
index baeef65..3668b14 100644
--- a/src/net/net_mod_socket.cpp
+++ b/src/net/net_mod_socket.cpp
@@ -17,7 +17,7 @@
{
int s;
if (Signal(SIGPIPE, SIG_IGN) == SIG_ERR)
- logger->error(errno, "NetModSocket::NetModSocket signal");
+ logger->error(errno, "NetModSocket::NetModSocket signal");
gpool = new NetConnPool();
@@ -165,11 +165,11 @@
// 鏈湴鍙戦��
if(msec == 0) {
- ret = shmModSocket.sendandrecv_nowait(send_buf, send_size, node->key, &recv_buf, &recv_size);
+ ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size, NULL, BUS_NOWAIT_FLAG);
} else if(msec > 0){
timeout.tv_sec = msec / 1000;
timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
- ret = shmModSocket.sendandrecv_timeout(send_buf, send_size, node->key, &recv_buf, &recv_size, &timeout);
+ ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size, &timeout, BUS_TIMEOUT_FLAG);
} else {
ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size);
}
@@ -334,11 +334,11 @@
// 鏈湴鍙戦��
if(node_arr == NULL || arrlen == 0) {
if(msec == 0) {
- ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, SHM_BUS_KEY);
+ ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, NULL, BUS_NOWAIT_FLAG);
} else if(msec > 0) {
timeout.tv_sec = msec / 1000;
timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
- ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout);
+ ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout, BUS_TIMEOUT_FLAG);
} else {
ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY);
}
@@ -354,11 +354,11 @@
if(node->host == NULL) {
// 鏈湴鍙戦��
if(msec == 0) {
- ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, SHM_BUS_KEY);
+ ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, NULL, BUS_NOWAIT_FLAG);
} else if(msec > 0) {
timeout.tv_sec = msec / 1000;
timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
- ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout);
+ ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout, BUS_TIMEOUT_FLAG);
} else {
ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY);
}
@@ -457,55 +457,20 @@
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
int NetModSocket::sendto(const void *buf, const int size, const int key){
- int rv = shmModSocket.sendto(buf, size, key);
- if(rv == 0) {
- logger->debug("NetModSocket::sendto: %d sendto %d success.\n", get_key(), key);
- return 0;
- }
-
- if(rv > EBUS_BASE) {
- // bus_errno = EBUS_TIMEOUT;
- logger->debug("NetModSocket::sendto: %d sendto %d failed %s", get_key(), key, bus_strerror(rv));
- } else {
- logger->error(rv, "NetModSocket::sendto : %d sendto %d failed", get_key(), key);
- }
- return rv;
+ return shmModSocket.sendto(buf, size, key);
}
// 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇
int NetModSocket::sendto_timeout(const void *buf, const int size, const int key, int sec, int nsec){
struct timespec timeout = {sec, nsec};
- int rv = shmModSocket.sendto_timeout(buf, size, key, &timeout);
- if(rv == 0) {
- logger->debug("NetModSocket::sendto_timeout: %d sendto %d success.\n", get_key(), key);
- return 0;
- }
-
- if(rv > EBUS_BASE) {
- // bus_errno = EBUS_TIMEOUT;
- logger->debug("NetModSocket::sendto_timeout : %d sendto %d failed %s", get_key(), key, bus_strerror(rv));
- } else {
- logger->error(rv, "NetModSocket::sendto_timeout: %d sendto %d failed", get_key(), key);
- }
- return rv;
+ return shmModSocket.sendto(buf, size, key, &timeout, BUS_TIMEOUT_FLAG);
+
}
// 鍙戦�佷俊鎭珛鍒昏繑鍥炪��
int NetModSocket::sendto_nowait(const void *buf, const int size, const int key){
- int rv = shmModSocket.sendto_nowait(buf, size, key);
- if(rv == 0) {
- logger->debug("NetModSocket::sendto_nowait: %d sendto %d success.\n", get_key(), key);
- return 0;
- }
-
- if(rv > EBUS_BASE) {
- // bus_errno = EBUS_TIMEOUT;
- logger->debug("NetModSocket::sendto_nowait %d sendto %d failed %s", get_key(), key, bus_strerror(rv));
-
- } else {
- logger->error(rv, "NetModSocket::sendto_nowait %d sendto %d failed", get_key(), key);
- }
- return rv;
+ return shmModSocket.sendto(buf, size, key, NULL, BUS_NOWAIT_FLAG);
+
}
/**
@@ -515,54 +480,19 @@
*/
int NetModSocket::recvfrom(void **buf, int *size, int *key) {
- logger->debug(" %d NetModSocket::recvfrom before", get_key());
- int rv = shmModSocket.recvfrom(buf, size, key);
-
- if(rv == 0) {
- logger->debug("NetModSocket::recvfrom: <<<< %d recvfrom %d success.\n", get_key(), *key);
- return 0;
- }
-
- if(rv > EBUS_BASE) {
- logger->debug("NetModSocket::recvfrom: socket %d recvfrom failed %s", get_key(), bus_strerror(rv));
- } else {
- logger->error(rv, "NetModSocket::recvfrom: socket %d recvfrom failed", get_key());
- }
- return rv;
+ return shmModSocket.recvfrom(buf, size, key);
+
}
// 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
int NetModSocket::recvfrom_timeout(void **buf, int *size, int *key, int sec, int nsec){
struct timespec timeout = {sec, nsec};
- int rv = shmModSocket.recvfrom_timeout(buf, size, key, &timeout);
- if(rv == 0) {
- logger->debug("NetModSocket::recvfrom_timeout: %d recvfrom %d success.\n", get_key(), *key);
- return 0;
- }
-
- if(rv > EBUS_BASE) {
- // bus_errno = EBUS_TIMEOUT;
- logger->debug("NetModSocket::recvfrom_timeout: %d recvfrom failed %s", get_key(), bus_strerror(rv));
- } else {
- logger->error(rv, "NetModSocket::recvfrom_timeout: %d recvfrom failed", get_key());
- }
- return rv;
+ return shmModSocket.recvfrom(buf, size, key, &timeout, BUS_TIMEOUT_FLAG);
+
}
int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key){
- int rv = shmModSocket.recvfrom_nowait(buf, size, key);
- if(rv == 0) {
- logger->debug("NetModSocket::recvfrom_nowait: %d recvfrom %d success.\n", get_key(), *key);
- return 0;
- }
-
- if(rv > EBUS_BASE) {
- // bus_errno = EBUS_TIMEOUT;
- logger->debug("NetModSocket::recvfrom_nowait: %d recvfrom failed %s", get_key(), bus_strerror(rv));
- } else {
- logger->error(rv, "NetModSocket::recvfrom_nowait: %d recvfrom failed", get_key());
- }
- return rv;
+ return shmModSocket.recvfrom(buf, size, key, NULL, BUS_NOWAIT_FLAG);
}
/**
@@ -576,10 +506,10 @@
// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
int NetModSocket::sendandrecv_timeout( const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, int sec, int nsec){
struct timespec timeout = {sec, nsec};
- return shmModSocket.sendandrecv_timeout(send_buf, send_size, key, recv_buf, recv_size, &timeout);
+ return shmModSocket.sendandrecv(send_buf, send_size, key, recv_buf, recv_size, &timeout, BUS_TIMEOUT_FLAG);
}
int NetModSocket::sendandrecv_nowait( const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) {
- return shmModSocket.sendandrecv_nowait(send_buf, send_size, key, recv_buf, recv_size);
+ return shmModSocket.sendandrecv(send_buf, send_size, key, recv_buf, recv_size, NULL, BUS_NOWAIT_FLAG);
}
@@ -595,10 +525,10 @@
// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
int NetModSocket::sub_timeout( void *topic, int size, int key, int sec, int nsec){
struct timespec timeout = {sec, nsec};
- return shmModSocket.sub_timeout((char *)topic, size, key, &timeout);
+ return shmModSocket.sub((char *)topic, size, key, &timeout, BUS_TIMEOUT_FLAG);
}
int NetModSocket::sub_nowait( void *topic, int size, int key){
- return shmModSocket.sub_nowait((char *)topic, size, key);
+ return shmModSocket.sub((char *)topic, size, key, NULL, BUS_NOWAIT_FLAG);
}
@@ -615,10 +545,10 @@
// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
int NetModSocket::desub_timeout( void *topic, int size, int key, int sec, int nsec){
struct timespec timeout = {sec, nsec};
- return shmModSocket.desub_timeout((char *)topic, size, key, &timeout);
+ return shmModSocket.desub((char *)topic, size, key, &timeout, BUS_TIMEOUT_FLAG);
}
int NetModSocket::desub_nowait( void *topic, int size, int key){
- return shmModSocket.desub_nowait((char *)topic, size, key);
+ return shmModSocket.desub((char *)topic, size, key, NULL, BUS_NOWAIT_FLAG);
}
@@ -635,10 +565,10 @@
// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
int NetModSocket::pub_timeout( char *topic, int topic_size, void *content, int content_size, int key, int sec, int nsec){
struct timespec timeout = {sec, nsec};
- return shmModSocket.pub_timeout(topic, topic_size, content, content_size, key, &timeout);
+ return shmModSocket.pub(topic, topic_size, content, content_size, key, &timeout, BUS_TIMEOUT_FLAG);
}
int NetModSocket::pub_nowait( char *topic, int topic_size, void *content, int content_size, int key){
- return shmModSocket.pub_nowait(topic, topic_size, content, content_size, key);
+ return shmModSocket.pub(topic, topic_size, content, content_size, key, NULL, BUS_NOWAIT_FLAG);
}
diff --git a/src/net/net_mod_socket_wrapper.h b/src/net/net_mod_socket_wrapper.h
index 171a3a8..b794545 100644
--- a/src/net/net_mod_socket_wrapper.h
+++ b/src/net/net_mod_socket_wrapper.h
@@ -95,6 +95,7 @@
* @return 0鏄垚鍔燂紝 鍏朵粬鍊兼槸澶辫触鐨勯敊璇爜
*/
int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key);
+
/**
* @brief 绛夊緟鎺ユ敹淇℃伅锛屽湪鎸囧畾鐨勬椂闂村唴鍗充娇娌℃湁鎺ュ彈鍒版秷鎭篃瑕佽繑鍥�
*
diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp
index 058c081..a012e80 100644
--- a/src/socket/shm_mod_socket.cpp
+++ b/src/socket/shm_mod_socket.cpp
@@ -18,7 +18,7 @@
struct timespec timeout = {1, 0};
if(bus_set != NULL) {
for(auto bus_iter = bus_set->begin(); bus_iter != bus_set->end(); bus_iter++) {
- desub_timeout(NULL, 0, *bus_iter, &timeout);
+ desub(NULL, 0, *bus_iter, &timeout, BUS_TIMEOUT_FLAG);
}
delete bus_set;
}
@@ -37,154 +37,65 @@
int ShmModSocket::force_bind(int key) {
return shm_socket_force_bind(shm_socket, key);
}
+
/**
* 鍙戦�佷俊鎭�
* @key 鍙戦�佺粰璋�
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
-int ShmModSocket::sendto(const void *buf, const int size, const int key) {
- return shm_sendto(shm_socket, buf, size, key, NULL, 0);
+int ShmModSocket::sendto(const void *buf, const int size, const int key, const struct timespec *timeout, int flag) {
+ int rv = shm_sendto(shm_socket, buf, size, key, timeout, flag);
+ if(rv == 0) {
+ logger->debug("ShmModSocket::sendto: %d sendto %d success.\n", get_key(), key);
+ return 0;
+ }
+
+ logger->debug("ShmModSocket::sendto : %d sendto %d failed %s", get_key(), key, bus_strerror(rv));
+ return rv;
}
-// 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇
-int ShmModSocket::sendto_timeout(const void *buf, const int size, const int key, const struct timespec *timeout) {
- return shm_sendto(shm_socket, buf, size, key, timeout, BUS_TIMEOUT_FLAG);
-}
-// 鍙戦�佷俊鎭珛鍒昏繑鍥炪��
-int ShmModSocket::sendto_nowait( const void *buf, const int size, const int key){
- return shm_sendto(shm_socket, buf, size, key, NULL, BUS_NOWAIT_FLAG);
-}
-
/**
* 鎺ユ敹淇℃伅
* @key 浠庤皝鍝噷鏀跺埌鐨勪俊鎭�
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
-int ShmModSocket::recvfrom(void **buf, int *size, int *key) {
- int rv = shm_recvfrom(shm_socket, buf, size, key, NULL, 0);
-
+int ShmModSocket::recvfrom( void **buf, int *size, int *key, const struct timespec *timeout, int flag) {
+ int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flag);
+
+ if(rv == 0) {
+ logger->debug("ShmModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key);
+ return 0;
+ }
+
+ logger->debug("ShmModSocket::recvfrom: socket %d recvfrom failed %s", get_key(), bus_strerror(rv));
return rv;
}
-
-
-int ShmModSocket::recvfrom_timeout( void **buf, int *size, int *key, const struct timespec *timeout) {
- int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, BUS_TIMEOUT_FLAG);
-
- return rv;
-}
-
-int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *key){
- int rv = shm_recvfrom(shm_socket, buf, size, key, NULL, BUS_NOWAIT_FLAG);
- // logger->error(rv, "ShmModSocket::recvfrom_nowait failed!");
- return rv;
-}
+
/**
* 鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟
* @key 鍙戦�佺粰璋�
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
-int ShmModSocket::sendandrecv( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
- return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0);
+int ShmModSocket::sendandrecv(const void *send_buf, const int send_size, const int send_key,
+ void **recv_buf, int *recv_size, const struct timespec *timeout, int flag){
+ return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag);
}
+
// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int ShmModSocket::sendandrecv_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, const struct timespec *timeout){
- return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, BUS_TIMEOUT_FLAG);
+int ShmModSocket::sendandrecv_unsafe(const void *send_buf, const int send_size, const int send_key,
+ void **recv_buf, int *recv_size, const struct timespec *timeout, int flag){
+ return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag);
}
-int ShmModSocket::sendandrecv_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
- return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, BUS_NOWAIT_FLAG);
-}
-
-
-
-
-int ShmModSocket::sendandrecv_unsafe( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
- return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0);
-}
-// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int ShmModSocket::sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, const struct timespec *timeout){
- return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, BUS_TIMEOUT_FLAG);
-}
-int ShmModSocket::sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
- return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, BUS_NOWAIT_FLAG);
-}
-
-
-
-
+
/**
* 璁㈤槄鎸囧畾涓婚
* @topic 涓婚
* @size 涓婚闀垮害
* @key 鎬荤嚎绔彛
*/
-int ShmModSocket::sub(char *topic, int size, int key){
- return _sub_( topic, size, key, NULL, 0);
-}
-// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int ShmModSocket::sub_timeout(char *topic, int size, int key, const struct timespec *timeout){
- return _sub_(topic, size, key, timeout, BUS_TIMEOUT_FLAG);
-}
-int ShmModSocket::sub_nowait(char *topic, int size, int key) {
- return _sub_(topic, size, key, NULL, BUS_NOWAIT_FLAG);
-}
-
-
-
-/**
- * 鍙栨秷璁㈤槄鎸囧畾涓婚
- * @topic 涓婚
- * @size 涓婚闀垮害
- * @key 鎬荤嚎绔彛
- */
-int ShmModSocket::desub(char *topic, int size, int key){
- return _desub_( topic, size, key, NULL, 0);
-}
-// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int ShmModSocket::desub_timeout(char *topic, int size, int key, const struct timespec *timeout){
- return _desub_(topic, size, key, timeout, BUS_TIMEOUT_FLAG);
-}
-int ShmModSocket::desub_nowait(char *topic, int size, int key) {
- return _desub_(topic, size, key, NULL, BUS_NOWAIT_FLAG);
-}
-
-
-
-/**
- * 鍙戝竷涓婚
- * @topic 涓婚
- * @content 涓婚鍐呭
- * @key 鎬荤嚎绔彛
- */
-int ShmModSocket::pub(char *topic, int topic_size, void *content, int content_size, int key){
- return _pub_(topic, topic_size, content, content_size, key, NULL, 0);
-}
-// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int ShmModSocket::pub_timeout(char *topic, int topic_size, void *content, int content_size, int key, const struct timespec * timeout){
- return _pub_( topic, topic_size, content, content_size, key, timeout, BUS_TIMEOUT_FLAG);
-}
-int ShmModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int key){
- return _pub_(topic, topic_size, content, content_size, key, NULL, BUS_NOWAIT_FLAG);
-}
-
-
-/**
- * 鑾峰彇soket key
- */
-int ShmModSocket::get_key(){
- return shm_socket->key;
-}
-
-
-
-// =============================================================================
-/**
- * @key 鎬荤嚎绔彛
- */
-int ShmModSocket::_sub_(char *topic, int topic_size, int key,
+int ShmModSocket::sub(char *topic, int topic_size, int key,
const struct timespec *timeout, int flags) {
-
-
int ret;
bus_head_t head = {};
memcpy(head.action, "sub", sizeof(head.action));
@@ -206,10 +117,15 @@
}
+
+
/**
+ * 鍙栨秷璁㈤槄鎸囧畾涓婚
+ * @topic 涓婚
+ * @size 涓婚闀垮害
* @key 鎬荤嚎绔彛
*/
-int ShmModSocket::_desub_(char *topic, int topic_size, int key, const struct timespec *timeout, int flags) {
+int ShmModSocket::desub(char *topic, int topic_size, int key, const struct timespec *timeout, int flags) {
// char buf[8192];
int ret;
if(topic == NULL) {
@@ -239,18 +155,15 @@
}
-/**
- * @key 鎬荤嚎绔彛
- * @str "<**pub**>{缁忔祹}"
- */
-
-int ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int key, const struct timespec *timeout, int flags) {
- // int head_len;
- // char buf[8192+content_size];
- // snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
- // head_len = strlen(buf);
- // memcpy(buf+head_len, content, content_size);
+
+/**
+ * 鍙戝竷涓婚
+ * @topic 涓婚
+ * @content 涓婚鍐呭
+ * @key 鎬荤嚎绔彛
+ */
+int ShmModSocket::pub(char *topic, int topic_size, void *content, int content_size, int key, const struct timespec *timeout, int flags) {
int ret;
bus_head_t head = {};
memcpy(head.action, "pub", sizeof(head.action));
@@ -267,9 +180,20 @@
return -1;
}
+}
+
+
+/**
+ * 鑾峰彇soket key
+ */
+int ShmModSocket::get_key(){
+ return shm_socket->key;
}
+
+
+// =============================================================================
int ShmModSocket::get_bus_sendbuf(bus_head_t &request_head,
void *topic_buf, int topic_size, void *content_buf, int content_size, void **retbuf) {
@@ -277,7 +201,7 @@
int buf_size;
char *buf;
int max_buf_size;
- if((buf = (char *)malloc(MAXBUF)) == NULL) {
+ if((buf = (char *) malloc(MAXBUF)) == NULL) {
LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf malloc");
exit(1);
} else {
@@ -287,7 +211,7 @@
buf_size = BUS_HEAD_SIZE + content_size + topic_size ;
if(max_buf_size < buf_size) {
- if((buf = (char *)realloc(buf, buf_size)) == NULL) {
+ if((buf = (char *) realloc(buf, buf_size)) == NULL) {
LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf realloc buf");
exit(1);
} else {
diff --git a/src/socket/shm_mod_socket.h b/src/socket/shm_mod_socket.h
index 79b3247..f5441ce 100644
--- a/src/socket/shm_mod_socket.h
+++ b/src/socket/shm_mod_socket.h
@@ -31,11 +31,7 @@
private:
- int _sub_( char *topic, int size, int key, const struct timespec *timeouts, int flags);
- int _pub_( char *topic, int topic_size, void *content, int content_size, int key, const struct timespec *timeouts, int flags);
-
- int _desub_( char *topic, int size, int key, const struct timespec *timeouts, int flags);
-
+
static int get_bus_sendbuf(bus_head_t &request_head, void *topic_buf, int topic_size, void *content_buf, int content_size, void **retbuf);
@@ -65,50 +61,44 @@
/**
* 鍙戦�佷俊鎭�
* @key 鍙戦�佺粰璋�
+ * @flag BUS_TIMEOUT_FLAG BUS_NOWAIT_FLAG
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
- int sendto(const void *buf, const int size, const int key);
- // 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇
- int sendto_timeout(const void *buf, const int size, const int key, const struct timespec *timeout);
- // 鍙戦�佷俊鎭珛鍒昏繑鍥炪��
- int sendto_nowait(const void *buf, const int size, const int key);
+
+ int sendto(const void *buf, const int size, const int key, const struct timespec *timeout = NULL, int flag = 0);
+
/**
* 鎺ユ敹淇℃伅
* @key 浠庤皝鍝噷鏀跺埌鐨勪俊鎭�
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
- int recvfrom(void **buf, int *size, int *key);
- // 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
- int recvfrom_timeout(void **buf, int *size, int *key, const struct timespec *timeout);
- int recvfrom_nowait(void **buf, int *size, int *key);
+
+ int recvfrom(void **buf, int *size, int *key, const struct timespec *timeout = NULL, int flag = 0);
/**
* 鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟
* @key 鍙戦�佺粰璋�
+ * @flag BUS_TIMEOUT_FLAG BUS_NOWAIT_FLAG
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
- int sendandrecv(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) ;
- // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
- int sendandrecv_timeout(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, const struct timespec *timeout) ;
- int sendandrecv_nowait(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) ;
+
+ int sendandrecv(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size,
+ const struct timespec *timeout = NULL, int flag = 0);
- int sendandrecv_unsafe(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) ;
// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
- int sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, const struct timespec *timeout) ;
- int sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) ;
+ int sendandrecv_unsafe(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size,
+ const struct timespec *timeout = NULL, int flag = 0) ;
/**
* 璁㈤槄鎸囧畾涓婚
* @topic 涓婚
* @size 涓婚闀垮害
* @key 鎬荤嚎绔彛
+ * @flag BUS_TIMEOUT_FLAG BUS_NOWAIT_FLAG
*/
- int sub(char *topic, int size, int key);
- // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
- int sub_timeout(char *topic, int size, int key, const struct timespec *timeout);
- int sub_nowait(char *topic, int size, int key);
+ int sub(char *topic, int size, int key, const struct timespec *timeout = NULL, int flag = 0);
/**
@@ -116,22 +106,18 @@
* @topic 涓婚,涓婚涓虹┖鏃跺彇娑堝叏閮ㄨ闃�
* @size 涓婚闀垮害
* @key 鎬荤嚎绔彛
+ * @flag BUS_TIMEOUT_FLAG BUS_NOWAIT_FLAG
*/
- int desub( char *topic, int size, int key);
- // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
- int desub_timeout(char *topic, int size, int key, const struct timespec *timeout);
- int desub_nowait(char *topic, int size, int key) ;
+ int desub(char *topic, int size, int key, const struct timespec *timeout = NULL, int flag = 0);
/**
* 鍙戝竷涓婚
* @topic 涓婚
* @content 涓婚鍐呭
* @key 鎬荤嚎绔彛
+ * @flag BUS_TIMEOUT_FLAG BUS_NOWAIT_FLAG
*/
- int pub(char *topic, int topic_size, void *content, int content_size, int key);
- // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
- int pub_timeout(char *topic, int topic_size, void *content, int content_size, int key, const struct timespec *timeout);
- int pub_nowait(char *topic, int topic_size, void *content, int content_size, int key);
+ int pub(char *topic, int topic_size, void *content, int content_size, int key, const struct timespec *timeout = NULL, int flag = 0);
/**
diff --git a/test_net_socket/net_mod_socket.sh b/test_net_socket/net_mod_socket.sh
index fc639ca..dd566b2 100755
--- a/test_net_socket/net_mod_socket.sh
+++ b/test_net_socket/net_mod_socket.sh
@@ -9,6 +9,7 @@
./test_net_mod_socket --fun="start_reply" --key=100 & server_pid=$! && echo "pid: ${server_pid}"
}
+# 浜や簰寮忓鎴风
function client() {
# ./test_net_mod_socket --fun="start_net_client" \
@@ -23,12 +24,25 @@
}
+# 鏃犻檺寰幆send
+function send() {
+ ./test_net_mod_socket --fun="test_net_sendandrecv" \
+ --sendlist="localhost:5000:100, localhost:5000:100"
+
+}
+# 澶氱嚎绋媠end
function msend() {
./test_net_mod_socket --fun="test_net_sendandrecv_threads" \
--sendlist="localhost:5000:100, localhost:5000:100"
}
-
+# 鏃犻檺寰幆 pub
+function pub() {
+ ./test_net_mod_socket --fun="test_net_pub" \
+ --publist="localhost:5000, localhost:5000"
+
+}
+# 澶氱嚎绋媝ub
function mpub() {
./test_net_mod_socket --fun="test_net_pub_threads" \
--publist="localhost:5000, localhost:5000"
@@ -56,9 +70,15 @@
"msend")
msend
;;
+ "send")
+ send
+ ;;
"mpub")
mpub
;;
+ "pub")
+ pub
+ ;;
"close")
close
;;
diff --git a/test_net_socket/test_net_mod_socket.cpp b/test_net_socket/test_net_mod_socket.cpp
index a1a47ec..46b8be2 100644
--- a/test_net_socket/test_net_mod_socket.cpp
+++ b/test_net_socket/test_net_mod_socket.cpp
@@ -5,6 +5,7 @@
#include "shm_mm_wrapper.h"
#include "usg_common.h"
#include <getopt.h>
+#include "logger_factory.h"
#define SCALE 100000
@@ -141,7 +142,7 @@
int remote_port;
while ( (rv = net_mod_socket_recvfrom(client, &recvbuf, &size, &remote_port) ) == 0) {
// printf( "server: RECEIVED REQUEST FROM PORT %d NAME %s\n", remote_port, recvbuf);
- sprintf(sendbuf, "RECEIVED PORT %d NAME %s", remote_port, recvbuf);
+ sprintf(sendbuf, "RECEIVED锛� %s", recvbuf);
net_mod_socket_sendto(client, sendbuf, strlen(sendbuf) + 1, remote_port);
free(recvbuf);
}
@@ -194,7 +195,7 @@
n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size, 1000);
printf(" %d nodes reply\n", n);
for(i=0; i<recv_arr_size; i++) {
- printf("host:%s, port: %d, key:%d, content: %s\n",
+ printf("reply from (host:%s, port: %d, key:%d) >> %s\n",
recv_arr[i].host,
recv_arr[i].port,
recv_arr[i].key,
@@ -247,7 +248,8 @@
Targ *targ = (Targ *)arg;
char sendbuf[512];
- int i,j, n, recv_arr_size;
+ int i,j, n;
+ int recv_arr_size;
net_mod_recv_msg_t *recv_arr;
int total = 0;
@@ -271,7 +273,7 @@
n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1000);
printf("%d: send %d nodes\n", i, n);
for(j=0; j < recv_arr_size; j++) {
- fprintf(fp, "reply: host:%s, port: %d, key:%d, content: %s\n",
+ fprintf(fp, "reply from (host:%s, port: %d, key:%d) >> %s\n",
recv_arr[j].host,
recv_arr[j].port,
recv_arr[j].key,
@@ -287,9 +289,10 @@
return (void *)total;
}
+//澶氱嚎绋媠end
void test_net_sendandrecv_threads(char *nodelist) {
- int status, i = 0, processors = 1;
+ int status, i = 0, processors = 4;
void *res[processors];
// Targ *targs = (Targ *)calloc(processors, sizeof(Targ));
Targ targs[processors];
@@ -326,10 +329,42 @@
}
+// 鏃犻檺寰幆send
+void test_net_sendandrecv(char *nodelist) {
+
+ int n, i;
+ void * client;
+ int recv_arr_size;
+ net_mod_recv_msg_t *recv_arr;
+ net_node_t *node_arr;
+ int node_arr_size = parse_node_list(nodelist, &node_arr);
+ char content[128];
+
+ sprintf(content, "pid:%ld say Hello!!", (long)getpid());
+ client = net_mod_socket_open();
+ while(true) {
+ n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size, 1000);
+ printf(" %d nodes reply\n", n);
+ for(i=0; i<recv_arr_size; i++) {
+ LoggerFactory::getLogger()->debug("reply from (host:%s, port: %d, key:%d) >> %s\n",
+ recv_arr[i].host,
+ recv_arr[i].port,
+ recv_arr[i].key,
+ recv_arr[i].content
+ );
+ }
+
+ // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀
+ net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
+ }
+
+ net_mod_socket_close(client);
+
+}
void *_run_pub_(void *arg) {
Targ *targ = (Targ *)arg;
- char sendbuf[512];
+ char sendbuf[128];
int i,j, n;
int total = 0;
@@ -338,9 +373,6 @@
int node_arr_size = parse_node_list(targ->nodelist, &node_arr);
char *topic = "news";
-
-
-
// char filename[512];
// sprintf(filename, "test%d.tmp", targ->id);
// FILE *fp = NULL;
@@ -353,7 +385,7 @@
n = net_mod_socket_pub(client, node_arr, node_arr_size, topic, strlen(topic)+1, sendbuf, strlen(sendbuf)+1);
// n = net_mod_socket_pub(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size);
- printf( "pub:%s to %d nodes\n", sendbuf, n);
+ LoggerFactory::getLogger()->debug( "pub:%s to %d nodes\n", sendbuf, n);
total += n;
}
// fclose(fp);
@@ -361,6 +393,7 @@
return (void *)total;
}
+//澶氱嚎绋媝ub
void test_net_pub_threads(char *nodelist) {
int status, i = 0, processors = 4;
@@ -399,6 +432,28 @@
// fflush(stdout);
net_mod_socket_close(client);
}
+
+// 鏃犻檺寰幆pub
+void test_net_pub(char *nodelist) {
+
+ int n;
+ char sendbuf[512];
+ net_node_t *node_arr;
+ int node_arr_size = parse_node_list(nodelist, &node_arr);
+
+ char *topic = "news";
+ sprintf(sendbuf, "pid:%ld: Good news!!", (long)getpid());
+
+ void * client = net_mod_socket_open();
+ while (true) {
+ n = net_mod_socket_pub(client, node_arr, node_arr_size, topic, strlen(topic)+1, sendbuf, strlen(sendbuf)+1);
+ // n = net_mod_socket_pub(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size);
+ LoggerFactory::getLogger()->debug( "pub to %d nodes\n", n);
+ }
+ net_mod_socket_close(client);
+}
+
+
@@ -456,6 +511,15 @@
test_net_sendandrecv_threads(opt.sendlist);
}
+ else if (strcmp("test_net_sendandrecv", opt.fun) == 0) {
+ if(opt.sendlist == 0) {
+ fprintf(stderr, "Missing sendlist .\n");
+ usage(argv[0]);
+ exit(1);
+ }
+
+ test_net_sendandrecv(opt.sendlist);
+ }
else if (strcmp("test_net_pub_threads", opt.fun) == 0) {
if(opt.publist == 0) {
fprintf(stderr, "Missing publist .\n");
@@ -465,6 +529,15 @@
test_net_pub_threads(opt.publist);
}
+ else if (strcmp("test_net_pub", opt.fun) == 0) {
+ if(opt.publist == 0) {
+ fprintf(stderr, "Missing publist .\n");
+ usage(argv[0]);
+ exit(1);
+ }
+
+ test_net_pub(opt.publist);
+ }
else {
usage(argv[0]);
--
Gitblit v1.8.0