wangzhengquan
2021-01-26 0cb4f2b1acb16c1ee1bd86a40116300ea2e2cdaa
src/socket/shm_mod_socket.cpp
@@ -9,7 +9,6 @@
}
ShmModSocket::ShmModSocket() {
   mod = (socket_mod_t)0;
   shm_socket = shm_open_socket(SHM_SOCKET_DGRAM);
   bus_set = new std::set<int>;
}
@@ -48,43 +47,36 @@
}
// 发送信息超时返回。 @sec 秒 , @nsec 纳秒
int ShmModSocket::sendto_timeout(const void *buf, const int size, const int key, const struct timespec *timeout) {
   return shm_sendto(shm_socket, buf, size, key, timeout, 0);
   return shm_sendto(shm_socket, buf, size, key, timeout, BUS_TIMEOUT_FLAG);
}
// 发送信息立刻返回。
int ShmModSocket::sendto_nowait( const void *buf, const int size, const int key){
   return shm_sendto(shm_socket, buf, size, key, NULL, (int)SHM_MSG_NOWAIT);
   return shm_sendto(shm_socket, buf, size, key, NULL, BUS_NOWAIT_FLAG);
}
inline int ShmModSocket::_recvfrom_(void **buf, int *size, int *key,  struct timespec *timeout, int flags) {
   if(mod == BUS) {
      logger->error("Can not use method recvfrom in a Bus");
      exit(1);
   }
// printf("dgram_mod_recvfrom  before\n");
   int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flags);
// printf("dgram_mod_recvfrom  after\n");
   return rv;
}
/**
 * 接收信息
 * @key 从谁哪里收到的信息
 * @return 0 成功, 其他值 失败的错误码
*/
int ShmModSocket::recvfrom(void **buf, int *size, int *key) {
      return  _recvfrom_( buf, size, key, NULL, 0);
   int rv =  shm_recvfrom(shm_socket, buf, size, key, NULL, 0);
  return rv;
}
// 接受信息超时返回。 @sec 秒 , @nsec 纳秒
int ShmModSocket::recvfrom_timeout( void **buf, int *size, int *key, struct timespec *timeout) {
   return _recvfrom_(buf, size, key, timeout, 0);
int ShmModSocket::recvfrom_timeout( void **buf, int *size, int *key, const struct timespec *timeout) {
   int rv =  shm_recvfrom(shm_socket, buf, size, key, timeout, BUS_TIMEOUT_FLAG);
    return rv;
}
int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *key){
   return _recvfrom_(buf, size, key, NULL, (int)SHM_MSG_NOWAIT);
   int rv =  shm_recvfrom(shm_socket, buf, size, key, NULL, BUS_NOWAIT_FLAG);
   // logger->error(rv, "ShmModSocket::recvfrom_nowait failed!");
  return rv;
}
/**
@@ -96,22 +88,25 @@
   return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0);
}
// 超时返回。 @sec 秒 , @nsec 纳秒
int ShmModSocket::sendandrecv_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, struct timespec *timeout){
   return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0);
int ShmModSocket::sendandrecv_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, const struct timespec *timeout){
   return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, BUS_TIMEOUT_FLAG);
}
int ShmModSocket::sendandrecv_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
   return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT);
   return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, BUS_NOWAIT_FLAG);
}
int ShmModSocket::sendandrecv_unsafe( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
   return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0);
}
// 超时返回。 @sec 秒 , @nsec 纳秒
int ShmModSocket::sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, struct timespec *timeout){
   return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0);
int ShmModSocket::sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, const struct timespec *timeout){
   return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, BUS_TIMEOUT_FLAG);
}
int ShmModSocket::sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
   return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT);
   return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, BUS_NOWAIT_FLAG);
}
@@ -127,11 +122,11 @@
   return _sub_( topic, size, key, NULL, 0);
}
// 超时返回。 @sec 秒 , @nsec 纳秒
int  ShmModSocket::sub_timeout(char *topic, int size, int key, struct timespec *timeout){
   return _sub_(topic, size, key, timeout, 0);
int  ShmModSocket::sub_timeout(char *topic, int size, int key, const struct timespec *timeout){
   return _sub_(topic, size, key, timeout, BUS_TIMEOUT_FLAG);
}
int  ShmModSocket::sub_nowait(char *topic, int size, int key) {
   return _sub_(topic, size, key, NULL,  (int)SHM_MSG_NOWAIT);
   return _sub_(topic, size, key, NULL,  BUS_NOWAIT_FLAG);
}
@@ -146,11 +141,11 @@
   return _desub_( topic, size, key, NULL, 0);
}
// 超时返回。 @sec 秒 , @nsec 纳秒
int  ShmModSocket::desub_timeout(char *topic, int size, int key, struct timespec *timeout){
   return _desub_(topic, size, key, timeout, 0);
int  ShmModSocket::desub_timeout(char *topic, int size, int key, const struct timespec *timeout){
   return _desub_(topic, size, key, timeout, BUS_TIMEOUT_FLAG);
}
int  ShmModSocket::desub_nowait(char *topic, int size, int key) {
   return _desub_(topic, size, key, NULL,  (int)SHM_MSG_NOWAIT);
   return _desub_(topic, size, key, NULL,  BUS_NOWAIT_FLAG);
}
@@ -165,11 +160,11 @@
      return _pub_(topic, topic_size, content, content_size, key, NULL, 0);
}
//  超时返回。 @sec 秒 , @nsec 纳秒
int  ShmModSocket::pub_timeout(char *topic, int topic_size, void *content, int content_size, int key, struct timespec * timeout){
   return _pub_( topic, topic_size, content, content_size, key, timeout, 0);
int  ShmModSocket::pub_timeout(char *topic, int topic_size, void *content, int content_size, int key, const struct timespec * timeout){
   return _pub_( topic, topic_size, content, content_size, key, timeout, BUS_TIMEOUT_FLAG);
}
int  ShmModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int key){
   return _pub_(topic, topic_size, content, content_size, key, NULL, (int)SHM_MSG_NOWAIT);
   return _pub_(topic, topic_size, content, content_size, key, NULL, BUS_NOWAIT_FLAG);
}
@@ -187,15 +182,8 @@
 * @key 总线端口
 */
int  ShmModSocket::_sub_(char *topic, int topic_size, int key,  
   struct timespec *timeout, int flags) {
   // char buf[8192];
   // int rv;
   // snprintf(buf,  8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
   // rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags);
   // if(rv == 0) {
   //    bus_set->insert(key);
   // }
   // return rv;
   const struct timespec *timeout, int flags) {
   int ret;
   bus_head_t head = {};
@@ -221,8 +209,7 @@
/**
 * @key 总线端口
 */
int  ShmModSocket::_desub_(char *topic, int topic_size, int key,
   struct timespec *timeout, int flags) {
int  ShmModSocket::_desub_(char *topic, int topic_size, int key, const struct timespec *timeout, int flags) {
   // char buf[8192];
   int ret;
   if(topic == NULL) {
@@ -240,7 +227,12 @@
   if(size > 0) {
      ret = shm_sendto(shm_socket, buf, size, key, timeout, flags);
      free(buf);
      return ret;
      if(ret == 0) {
         return 0;
      } else {
         logger->error("ShmModSocket::_desub_ key %d failed, %s", key, bus_strerror(ret));
         return ret;
      }
   } else {
      return -1;
   }
@@ -252,8 +244,7 @@
 * @str "<**pub**>{经济}"
 */
 
int  ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int key,
   struct timespec *timeout, int flags) {
int  ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int key, const struct timespec *timeout, int flags) {
   // int head_len;
   // char buf[8192+content_size];
   // snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);