From f52f2c2828047c2f30d30fc1fe2b54d8db146d49 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期四, 25 二月 2021 15:56:35 +0800 Subject: [PATCH] update --- src/socket/shm_mod_socket.cpp | 238 +++++++++++++++++++++-------------------------------------- 1 files changed, 84 insertions(+), 154 deletions(-) diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp index 6e622a8..466d0b5 100644 --- a/src/socket/shm_mod_socket.cpp +++ b/src/socket/shm_mod_socket.cpp @@ -3,14 +3,9 @@ static Logger *logger = LoggerFactory::getLogger(); -size_t ShmModSocket::remove_keys(int keys[], size_t length) { - BusServerSocket::remove_subscripters(keys, length); - return shm_socket_remove_keys(keys, length); -} ShmModSocket::ShmModSocket() { - mod = (socket_mod_t)0; - shm_socket = shm_open_socket(SHM_SOCKET_DGRAM); + shm_socket = shm_socket_open(SHM_SOCKET_DGRAM); bus_set = new std::set<int>; } @@ -19,13 +14,18 @@ struct timespec timeout = {1, 0}; if(bus_set != NULL) { for(auto bus_iter = bus_set->begin(); bus_iter != bus_set->end(); bus_iter++) { - desub_timeout(NULL, 0, *bus_iter, &timeout); + desub(NULL, 0, *bus_iter, &timeout, BUS_TIMEOUT_FLAG); } delete bus_set; } - shm_close_socket(shm_socket); + shm_socket_close(shm_socket); } + +int ShmModSocket::stop() { + return shm_socket_stop(shm_socket); +} + int ShmModSocket::bind(int key) { return shm_socket_bind(shm_socket, key); @@ -38,165 +38,79 @@ int ShmModSocket::force_bind(int key) { return shm_socket_force_bind(shm_socket, key); } + /** * 鍙戦�佷俊鎭� * @key 鍙戦�佺粰璋� * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ -int ShmModSocket::sendto(const void *buf, const int size, const int key) { - return shm_sendto(shm_socket, buf, size, key, NULL, 0); -} -// 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇 -int ShmModSocket::sendto_timeout(const void *buf, const int size, const int key, const struct timespec *timeout) { - return shm_sendto(shm_socket, buf, size, key, timeout, 0); -} -// 鍙戦�佷俊鎭珛鍒昏繑鍥炪�� -int ShmModSocket::sendto_nowait( const void *buf, const int size, const int key){ - return shm_sendto(shm_socket, buf, size, key, NULL, (int)SHM_MSG_NOWAIT); +int ShmModSocket::sendto(const void *buf, const int size, const int key, const struct timespec *timeout, int flag) { + int rv = shm_sendto(shm_socket, buf, size, key, timeout, flag); + if(rv == 0) { + logger->debug("ShmModSocket::sendto: %d sendto %d success.\n", get_key(), key); + return 0; + } + + logger->debug("ShmModSocket::sendto : %d sendto %d failed %s", get_key(), key, bus_strerror(rv)); + return rv; } - -inline int ShmModSocket::_recvfrom_(void **buf, int *size, int *key, struct timespec *timeout, int flags) { - - if(mod == BUS) { - logger->error("Can not use method recvfrom in a Bus"); - exit(1); - } -// printf("dgram_mod_recvfrom before\n"); - int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flags); -// printf("dgram_mod_recvfrom after\n"); - return rv; -} /** * 鎺ユ敹淇℃伅 * @key 浠庤皝鍝噷鏀跺埌鐨勪俊鎭� * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ -int ShmModSocket::recvfrom(void **buf, int *size, int *key) { - - return _recvfrom_( buf, size, key, NULL, 0); -} +int ShmModSocket::recvfrom( void **buf, int *size, int *key, const struct timespec *timeout, int flag) { + int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flag); + if(rv == 0) { + logger->debug("ShmModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key); + return 0; + } -// 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int ShmModSocket::recvfrom_timeout( void **buf, int *size, int *key, struct timespec *timeout) { - return _recvfrom_(buf, size, key, timeout, 0); + logger->debug("ShmModSocket::recvfrom: socket %d recvfrom failed %s", get_key(), bus_strerror(rv)); + return rv; } - -int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *key){ - return _recvfrom_(buf, size, key, NULL, (int)SHM_MSG_NOWAIT); -} + /** * 鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟 * @key 鍙戦�佺粰璋� * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ -int ShmModSocket::sendandrecv( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ - return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0); -} -// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int ShmModSocket::sendandrecv_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, struct timespec *timeout){ - return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0); -} -int ShmModSocket::sendandrecv_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ - return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT); -} +int ShmModSocket::sendandrecv(const void *send_buf, const int send_size, const int send_key, + void **recv_buf, int *recv_size, const struct timespec *timeout, int flag){ + int rv = shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag); -int ShmModSocket::sendandrecv_unsafe( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ - return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0); -} -// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int ShmModSocket::sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, struct timespec *timeout){ - return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0); -} -int ShmModSocket::sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ - return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT); + if(rv == 0) { + logger->debug("ShmModSocket::sendandrecv: sendandrecv to %d success.\n", send_key); + return 0; + } + + logger->debug("ShmModSocket::sendandrecv : sendandrecv to %d failed %s", send_key, bus_strerror(rv)); + return rv; } - - +int ShmModSocket::recvandsend( recvandsend_callback_fn callback, + const struct timespec *timeout , int flag, void * user_data ) { + return shm_recvandsend(shm_socket, callback, timeout, flag, user_data); +} + +// // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 +// int ShmModSocket::sendandrecv_unsafe(const void *send_buf, const int send_size, const int send_key, +// void **recv_buf, int *recv_size, const struct timespec *timeout, int flag){ +// return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag); +// } + /** * 璁㈤槄鎸囧畾涓婚 * @topic 涓婚 * @size 涓婚闀垮害 * @key 鎬荤嚎绔彛 */ -int ShmModSocket::sub(char *topic, int size, int key){ - return _sub_( topic, size, key, NULL, 0); -} -// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int ShmModSocket::sub_timeout(char *topic, int size, int key, struct timespec *timeout){ - return _sub_(topic, size, key, timeout, 0); -} -int ShmModSocket::sub_nowait(char *topic, int size, int key) { - return _sub_(topic, size, key, NULL, (int)SHM_MSG_NOWAIT); -} - - - -/** - * 鍙栨秷璁㈤槄鎸囧畾涓婚 - * @topic 涓婚 - * @size 涓婚闀垮害 - * @key 鎬荤嚎绔彛 - */ -int ShmModSocket::desub(char *topic, int size, int key){ - return _desub_( topic, size, key, NULL, 0); -} -// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int ShmModSocket::desub_timeout(char *topic, int size, int key, struct timespec *timeout){ - return _desub_(topic, size, key, timeout, 0); -} -int ShmModSocket::desub_nowait(char *topic, int size, int key) { - return _desub_(topic, size, key, NULL, (int)SHM_MSG_NOWAIT); -} - - - -/** - * 鍙戝竷涓婚 - * @topic 涓婚 - * @content 涓婚鍐呭 - * @key 鎬荤嚎绔彛 - */ -int ShmModSocket::pub(char *topic, int topic_size, void *content, int content_size, int key){ - return _pub_(topic, topic_size, content, content_size, key, NULL, 0); -} -// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int ShmModSocket::pub_timeout(char *topic, int topic_size, void *content, int content_size, int key, struct timespec * timeout){ - return _pub_( topic, topic_size, content, content_size, key, timeout, 0); -} -int ShmModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int key){ - return _pub_(topic, topic_size, content, content_size, key, NULL, (int)SHM_MSG_NOWAIT); -} - - -/** - * 鑾峰彇soket key - */ -int ShmModSocket::get_key(){ - return shm_socket->key; -} - - - -// ============================================================================= -/** - * @key 鎬荤嚎绔彛 - */ -int ShmModSocket::_sub_(char *topic, int topic_size, int key, - struct timespec *timeout, int flags) { - // char buf[8192]; - // int rv; - // snprintf(buf, 8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER); - // rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags); - // if(rv == 0) { - // bus_set->insert(key); - // } - // return rv; - +int ShmModSocket::sub(const char *topic, int topic_size, int key, + const struct timespec *timeout, int flags) { int ret; bus_head_t head = {}; memcpy(head.action, "sub", sizeof(head.action)); @@ -218,11 +132,15 @@ } + + /** + * 鍙栨秷璁㈤槄鎸囧畾涓婚 + * @topic 涓婚 + * @size 涓婚闀垮害 * @key 鎬荤嚎绔彛 */ -int ShmModSocket::_desub_(char *topic, int topic_size, int key, - struct timespec *timeout, int flags) { +int ShmModSocket::desub(const char *topic, int topic_size, int key, const struct timespec *timeout, int flags) { // char buf[8192]; int ret; if(topic == NULL) { @@ -240,26 +158,27 @@ if(size > 0) { ret = shm_sendto(shm_socket, buf, size, key, timeout, flags); free(buf); - return ret; + if(ret == 0) { + return 0; + } else { + logger->error("ShmModSocket::_desub_ key %d failed, %s", key, bus_strerror(ret)); + return ret; + } } else { return -1; } } -/** - * @key 鎬荤嚎绔彛 - * @str "<**pub**>{缁忔祹}" - */ - -int ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int key, - struct timespec *timeout, int flags) { - // int head_len; - // char buf[8192+content_size]; - // snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER); - // head_len = strlen(buf); - // memcpy(buf+head_len, content, content_size); + +/** + * 鍙戝竷涓婚 + * @topic 涓婚 + * @content 涓婚鍐呭 + * @key 鎬荤嚎绔彛 + */ +int ShmModSocket::pub(const char *topic, int topic_size, const void *content, int content_size, int key, const struct timespec *timeout, int flags) { int ret; bus_head_t head = {}; memcpy(head.action, "pub", sizeof(head.action)); @@ -276,17 +195,28 @@ return -1; } +} + + +/** + * 鑾峰彇soket key + */ +int ShmModSocket::get_key(){ + return shm_socket->key; } + +// ============================================================================= + int ShmModSocket::get_bus_sendbuf(bus_head_t &request_head, - void *topic_buf, int topic_size, void *content_buf, int content_size, void **retbuf) { + const void *topic_buf, int topic_size, const void *content_buf, int content_size, void **retbuf) { int buf_size; char *buf; int max_buf_size; - if((buf = (char *)malloc(MAXBUF)) == NULL) { + if((buf = (char *) malloc(MAXBUF)) == NULL) { LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf malloc"); exit(1); } else { @@ -296,7 +226,7 @@ buf_size = BUS_HEAD_SIZE + content_size + topic_size ; if(max_buf_size < buf_size) { - if((buf = (char *)realloc(buf, buf_size)) == NULL) { + if((buf = (char *) realloc(buf, buf_size)) == NULL) { LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf realloc buf"); exit(1); } else { -- Gitblit v1.8.0