From b861de29176891657cc96631ddbfb4ea9e114a42 Mon Sep 17 00:00:00 2001 From: Fu Juntang <StrongTiger_001@163.com> Date: 星期一, 30 八月 2021 17:52:23 +0800 Subject: [PATCH] re-structure the communication work flow. --- src/socket/shm_mod_socket.cpp | 150 +++++++++++++++++++++++++++++++++++++++++++++++-- 1 files changed, 143 insertions(+), 7 deletions(-) diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp index abd9477..a94b9c3 100644 --- a/src/socket/shm_mod_socket.cpp +++ b/src/socket/shm_mod_socket.cpp @@ -38,6 +38,129 @@ return shm_socket_force_bind(shm_socket, key); } +int ShmModSocket::bind_proc_id(char *buf, int len) { + return shm_socket_bind_proc_id(shm_socket, buf, len); +} + +int ShmModSocket::reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag) +{ + int ret; + struct timespec ts; + + bus_head_t head = {}; + + if (flag == PROC_REG) { + + memcpy(head.action, "reg", sizeof(head.action)); + + } else if (flag == PROC_UNREG) { + + memcpy(head.action, "unreg", sizeof(head.action)); + + } else if (flag == PROC_REG_TCS) { + + memcpy(head.action, "tcsreg", sizeof(head.action)); + + } else if (flag == PROC_QUE_TCS) { + + memcpy(head.action, "tcsque", sizeof(head.action)); + + } else if (flag == PROC_QUE_STCS) { + + memcpy(head.action, "stcsque", sizeof(head.action)); + + } else if (flag == PROC_QUE_ATCS) { + + memcpy(head.action, "atcsque", sizeof(head.action)); + + } else { + + return -1; + + } + + if ((flag == PROC_REG) || (flag == PROC_UNREG)) { + + head.topic_size = 0; + + if (pData != NULL) { + + head.content_size = sizeof(ProcInfo); + + } else { + + head.content_size = 0; + + } + } else { + + head.topic_size = len; + + head.content_size = 0; + + } + + void *buf_temp; + int buf_size; + + if ((flag == PROC_REG) || (flag == PROC_UNREG)) { + + buf_size = get_bus_sendbuf(head, NULL, 0, pData, head.content_size, &buf_temp); + + } else { + + buf_size = get_bus_sendbuf(head, pData, len, NULL, head.content_size, &buf_temp); + + } + + if (timeout_ms > 0) { + + ts.tv_sec = timeout_ms /1000; + + ts.tv_nsec = (timeout_ms - ts.tv_sec * 1000) * 1000 * 1000; + + if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) { + + ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, BUS_TIMEOUT_FLAG); + + } else { + + ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, BUS_TIMEOUT_FLAG); + + } + + } else if (timeout_ms == 0) { + + if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) { + + ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, BUS_NOWAIT_FLAG); + + } else { + + ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, BUS_NOWAIT_FLAG); + + } + + } else { + + if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) { + + ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, -1); + + } else { + + ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, -1); + + } + + } + + free(buf_temp); + + return ret; + +} + /** * 鍙戦�佷俊鎭� * @key 鍙戦�佺粰璋� @@ -60,7 +183,8 @@ * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ int ShmModSocket::recvfrom( void **buf, int *size, int *key, const struct timespec *timeout, int flag) { - int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flag); + + int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flag); if(rv == 0) { logger->debug("ShmModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key); @@ -77,7 +201,7 @@ * @key 鍙戦�佺粰璋� * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ -int ShmModSocket::sendandrecv(const void *send_buf, const int send_size, const int send_key, +int ShmModSocket::sendandrecv(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, const struct timespec *timeout, int flag){ int rv = shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag); @@ -183,8 +307,8 @@ memcpy(head.action, "pub", sizeof(head.action)); head.topic_size = topic_size = strlen(topic) + 1; head.content_size = content_size; - - void *buf; + + void *buf; int size = get_bus_sendbuf(head, topic, topic_size, content, content_size, &buf); if(size > 0) { ret = shm_sendto(shm_socket, buf, size, key, timeout, flags); @@ -216,6 +340,7 @@ char *buf; int max_buf_size; void *buf_ptr; + int count = 0; if((buf = (char *) malloc(MAXBUF)) == NULL) { LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf malloc"); exit(1); @@ -223,7 +348,7 @@ max_buf_size = MAXBUF; } - buf_size = BUS_HEAD_SIZE + content_size + topic_size ; + buf_size = BUS_HEAD_SIZE + content_size + topic_size; if(max_buf_size < buf_size) { if((buf = (char *) realloc(buf, buf_size)) == NULL) { @@ -238,8 +363,19 @@ memcpy(buf, buf_ptr, BUS_HEAD_SIZE); if(topic_size != 0 ) memcpy(buf + BUS_HEAD_SIZE, topic_buf, topic_size); - if(content_size != 0) - memcpy(buf + BUS_HEAD_SIZE + topic_size, content_buf, content_size); + if ((content_size != 0) && (strncmp(request_head.action, "reg", strlen("reg")) != 0) && \ + (strncmp(request_head.action, "unreg", strlen("unreg")) != 0)) { + memcpy(buf + BUS_HEAD_SIZE + topic_size, content_buf, content_size); + } else { + if (((strncmp(request_head.action, "reg", strlen("reg")) == 0) || (strncmp(request_head.action, "unreg", \ + strlen("unreg")) == 0)) && (content_buf != NULL)) { + proc_copy(buf + BUS_HEAD_SIZE + topic_size, const_cast<void *> (content_buf), &count); + + request_head.content_size = count; + buf_size -= (content_size - count); + + } + } *retbuf = buf; free(buf_ptr); -- Gitblit v1.8.0