wangzhengquan
2020-12-23 1b94589dcb8d497d2d8a208efd61a54631f6b84e
update
1个文件已添加
1 文件已重命名
8个文件已修改
278 ■■■■ 已修改文件
src/queue/lock_free_queue.h 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket.c 36 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket_wrapper.c 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_server_socket.c 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_socket.c 60 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.c 45 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/Makefile 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/heart_beat.c 41 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/test_bus_stop.c 54 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/test_net_mod_socket.c 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/lock_free_queue.h
@@ -11,6 +11,7 @@
// default Queue size
#define LOCK_FREE_Q_DEFAULT_SIZE 16
// static Logger *logger = LoggerFactory::getLogger();
// define this macro if calls to "size" must return the real size of the 
// queue. If it is undefined  that function will try to take a snapshot of 
// the queue, but returned value might be bogus
@@ -200,7 +201,7 @@
    template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data)
{
 // printf("==================LockFreeQueue push before\n");
LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n");
    if (SemUtil::dec(slots) == -1) {
        err_msg(errno, "LockFreeQueue push");
        return false;
@@ -209,7 +210,7 @@
    if ( m_qImpl.push(a_data) ) {
        SemUtil::inc(items);   
 // printf("==================LockFreeQueue push after\n");
LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n");
        return true;
    }
    return false;
@@ -247,18 +248,19 @@
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * timeout)
{
LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before\n");
    if (SemUtil::dec_timeout(slots, timeout) == -1) {
        if (errno == EAGAIN)
            return false;
        else {
            // err_msg(errno, "LockFreeQueue push_timeout");
            err_msg(errno, "LockFreeQueue push_timeout");
            return false;
        }
    }
    if (m_qImpl.push(a_data)){
        SemUtil::inc(items);
        SemUtil::inc(items);
LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout after\n");
        return true;
    }
    return false;
@@ -274,7 +276,8 @@
    template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data)
{
 // printf("==================LockFreeQueue pop before\n");
LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n");
    if (SemUtil::dec(items) == -1) {
        err_msg(errno, "LockFreeQueue pop");
        return false;
@@ -282,7 +285,7 @@
    if (m_qImpl.pop(a_data)) {
        SemUtil::inc(slots);
 // printf("==================LockFreeQueue pop after\n");
LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n");
        return true;
    }
    return false;
@@ -319,7 +322,7 @@
    template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * timeout)
{
// printf("==================LockFreeQueue pop_timeout before\n");
LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout before\n");
    if (SemUtil::dec_timeout(items, timeout) == -1) {
        if (errno == EAGAIN)
            return false;
@@ -331,7 +334,7 @@
    if (m_qImpl.pop(a_data)) {
        SemUtil::inc(slots);  
// printf("==================LockFreeQueue pop_timeout after\n");
LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout after\n");
        return true;
    }
    return false;
@@ -346,6 +349,7 @@
    return m_qImpl.operator[](i);
}
template <
    typename ELEM_T, 
    typename Allocator,
src/socket/bus_server_socket.c
@@ -60,15 +60,19 @@
BusServerSocket::BusServerSocket() {
    logger->debug("BusServerSocket Init");
    shm_socket = shm_open_socket(SHM_SOCKET_DGRAM);
    topic_sub_map = NULL;
}
BusServerSocket::~BusServerSocket() {
    SHMKeySet *subscripter_set;
    SHMTopicSubMap::iterator map_iter;
    logger->debug("BusServerSocket destory 1");
    stop();
    logger->debug("BusServerSocket destory 2");
     
    if(topic_sub_map != NULL) {
        for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
@@ -83,6 +87,7 @@
        mem_pool_free_by_key(BUS_MAP_KEY);
    }
    shm_close_socket(shm_socket);
    logger->debug("BusServerSocket destory 3");
}
@@ -109,14 +114,13 @@
 
    run_pubsub_proxy();
    // 进程停止的时候,预留3秒资源回收的时间。否则,会发生调用close的时候,共享内存的资源还没来得及回收进程就退出了
    sleep(3);
    return 0;
}
int  BusServerSocket::stop(){
    int ret;
    logger->debug("====>stopping");
    if( shm_socket->key <= 0) {
        return -1;
    }
@@ -127,15 +131,11 @@
    head.topic_size = 0;
    head.content_size = 0;
    void *recv_buf;
    int recv_size;
    void *buf;
    int size = ShmModSocket::get_bus_sendbuf(head, NULL, 0, NULL,  0, &buf);
    if(size > 0) {
        ret = shm_sendandrecv(shm_socket, buf, size, shm_socket->key, &recv_buf, &recv_size);
        ret = shm_sendandrecv_unsafe(shm_socket, buf, size, shm_socket->key, NULL, NULL);
        free(buf);
        free(recv_buf);
        return ret;
    } else {
        return -1;
@@ -260,7 +260,8 @@
        topic =  strtok(NULL, topic_delim);
          }
        } else if(strcmp(action, "desub") == 0) {
        }
        else if(strcmp(action, "desub") == 0) {
// printf("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), ""));
            if(strcmp(trim(topics, 0), "") == 0) {
                // 取消所有订阅
@@ -274,27 +275,26 @@
              }
            }
            
        } else if(strcmp(action, "pub") == 0) {
        }
        else if(strcmp(action, "pub") == 0) {
             content = topics + head.topic_size;
            _proxy_pub(topics, content, head.content_size, key);
        }  else if(strcmp(action, "stop") == 0) {
            logger->info( "Stopping Bus...");
             // snprintf(resp_buf, 128, "%sstop_finished%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER);
            shm_sendto(shm_socket, "stop_finished", strlen( "stop_finished") +1, key);
        }
        else if(strcmp(action, "stop") == 0) {
            free(buf);
            break;
        } else {
            logger->error( "BusServerSocket::run_pubsub_proxy : unrecognized action %s", action);
        }
        
        // free(action);
        // free(topics);
        // } else {
        //     logger->error( "BusServerSocket::run_pubsub_proxy : incorrect format msg");
        // }
        free(buf);
    }
    logger->info( "Stopping Bus...");
    shm_sendto(shm_socket, "stop_finished", strlen( "stop_finished") +1, key);
    return NULL;
}
src/socket/bus_server_socket_wrapper.c
@@ -7,7 +7,7 @@
 * 创建
 */
void * bus_server_socket_wrapper_open() {
    printf("===bus_server_socket_wrapper_open\n");
    logger->debug("===bus_server_socket_wrapper_open\n");
    BusServerSocket *sockt = new BusServerSocket;
    return (void *)sockt;
}
@@ -16,9 +16,10 @@
 * 关闭
 */
void bus_server_socket_wrapper_close(void *_socket) {
    printf("===bus_server_socket_wrapper_close\n");
    BusServerSocket *sockt = (BusServerSocket *)_socket;
    delete sockt;
    // BusServerSocket *sockt = (BusServerSocket *)_socket;
    //delete sockt;
    logger->debug("===bus_server_socket_wrapper_close\n");
}
/**
src/socket/net_mod_server_socket.c
@@ -168,9 +168,7 @@
    if(request_head.timeout > 0) {
      timeout.tv_sec = request_head.timeout / 1000;
      timeout.tv_nsec = (request_head.timeout - timeout.tv_sec * 1000) * 10e6;
      // printf(" timeout.tv_sec = %d,  timeout.tv_nsec=%ld\n",  timeout.tv_sec,  timeout.tv_nsec );
      ret = shmModSocket.sendandrecv_unsafe_timeout(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size, &timeout);
    }
    else if(request_head.timeout == 0) {
src/socket/net_mod_socket.c
@@ -86,15 +86,15 @@
  int i, n, recv_size, connfd;
  net_node_t *node;
  void *recv_buf = NULL;
  struct timespec timeout;
  int ret;
  int n_req = 0, n_recv_suc = 0, n_resp =0;
  
  net_mod_request_head_t request_head = {};
  int n_req = 0, n_recv_suc = 0, n_resp =0;
   
  net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t));
  int ret;
  NetConnPool *mpool;
  /* Make first caller allocate key for thread-specific data */
@@ -131,7 +131,17 @@
    node = &node_arr[i];
    if(node->host == NULL || strcmp(node->host, "") == 0 ) {
      // 本地发送
      if(shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size) == 0) {
      if(msec == 0) {
        ret = shmModSocket.sendandrecv_nowait(send_buf, send_size, node->key, &recv_buf, &recv_size);
      } else if(msec > 0){
        timeout.tv_sec = msec / 1000;
        timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
        ret = shmModSocket.sendandrecv_timeout(send_buf, send_size, node->key, &recv_buf, &recv_size, &timeout);
      } else {
        ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size);
      }
      if( ret == 0) {
        strcpy( ret_arr[n_recv_suc].host,"");
        ret_arr[n_recv_suc].port = 0;
        ret_arr[n_recv_suc].key = node->key;
@@ -229,7 +239,12 @@
  mpool->maxi = -1;
  *recv_arr = ret_arr;
  if(recv_arr != NULL) {
    *recv_arr = ret_arr;
  } else {
    free_recv_msg_arr(ret_arr, n_recv_suc);
  }
  if(recv_arr_size != NULL) {
    *recv_arr_size = n_recv_suc;
  }
@@ -264,9 +279,10 @@
// int  pub(char *topic, int topic_size, void *content, int content_size, int port);
int NetModSocket::_pub_(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content,
 int content_size, int  timeout) {
 int content_size, int  msec) {
  int i, connfd;
  net_node_t *node;
  struct timespec timeout;
 
  net_mod_request_head_t request_head;
  net_mod_recv_msg_t recv_msg;
@@ -302,7 +318,16 @@
  // 本地发送
  if(node_arr == NULL || arrlen == 0) {
    if(shmModSocket.pub(topic, topic_size, content, content_size, node->key) == 0 ) {
    if(msec == 0) {
      ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, node->key);
    } else if(msec > 0) {
      timeout.tv_sec = msec / 1000;
      timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
      ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, node->key, &timeout);
    } else {
      ret = shmModSocket.pub(topic, topic_size, content, content_size, node->key);
    }
    if(ret == 0 ) {
      n_pub_suc++;
    }
  }
@@ -312,9 +337,20 @@
    node = &node_arr[i];
    if(node->host == NULL) {
      // 本地发送
      if(shmModSocket.pub(topic, topic_size, content, content_size, node->key) == 0 ) {
         n_pub_suc++;
      if(msec == 0) {
        ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, node->key);
      } else if(msec > 0) {
        timeout.tv_sec = msec / 1000;
        timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
        ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, node->key, &timeout);
      } else {
        ret = shmModSocket.pub(topic, topic_size, content, content_size, node->key);
      }
      if(ret == 0 ) {
        n_pub_suc++;
      }
     
    } else {
      sprintf(portstr, "%d", node->port);
@@ -326,7 +362,7 @@
      request_head.key = node->key;
      request_head.content_length = content_size;
      request_head.topic_length = strlen(topic) + 1;
      request_head.timeout = timeout;
      request_head.timeout = msec;
      if(write_request(connfd, request_head, content, content_size, topic, request_head.topic_length) != 0) {
        LoggerFactory::getLogger()->error(" NetModSocket::_pub_ write_request failture %s:%d\n", node->host, node->port);
@@ -341,7 +377,7 @@
  while(n_resp < n_req)
  {
    /* Wait for listening/connected descriptor(s) to become ready */
    if( (mpool->nready = poll(mpool->conns, mpool->maxi + 1, timeout) ) <= 0) {
    if( (mpool->nready = poll(mpool->conns, mpool->maxi + 1, msec) ) <= 0) {
       // wirite_set 和 read_set 在指定时间内都没准备好
      break;
    }
src/socket/shm_socket.c
@@ -45,6 +45,7 @@
shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) {
  logger->debug("shm_open_socket\n");
  shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t));
  socket->socket_type = socket_type;
  socket->key = -1;
@@ -52,11 +53,11 @@
  socket->dispatch_thread = 0;
  socket->status = SHM_CONN_CLOSED;
  socket->mutex = SemUtil::get(IPC_PRIVATE, 1);
  logger->debug("shm_open_socket\n");
  return socket;
}
static int _shm_close_socket(shm_socket_t *socket) {
int shm_close_socket(shm_socket_t *socket) {
  
  int ret;
@@ -76,12 +77,12 @@
  return ret;
}
int shm_close_socket(shm_socket_t *socket) {
// int shm_close_socket(shm_socket_t *socket) {
  
  // _destrory_tmp_recv_socket_((shm_socket_t *)pthread_getspecific(_tmp_recv_socket_key_));
//   // _destrory_tmp_recv_socket_((shm_socket_t *)pthread_getspecific(_tmp_recv_socket_key_));
 
  return _shm_close_socket(socket);;
}
//   return shm_close_socket(socket);;
// }
int shm_socket_bind(shm_socket_t *socket, int key) {
  socket->key = key;
@@ -391,11 +392,18 @@
  }
  if (rv) {
    void *_buf = malloc(src.size);
    memcpy(_buf, src.buf, src.size);
    *buf = _buf;
    *size = src.size;
    *key = src.key;
    if(buf != NULL) {
      void *_buf = malloc(src.size);
      memcpy(_buf, src.buf, src.size);
      *buf = _buf;
    }
    if(size != NULL)
      *size = src.size;
    if(key != NULL)
      *key = src.key;
    mm_free(src.buf);
    // printf("shm_recvfrom pop after\n");
    return 0;
@@ -411,12 +419,13 @@
  int rv;
  if(tmp_socket == NULL)
    return;
  logger->debug("%d destroy tmp socket\n", pthread_self()); 
  _shm_close_socket((shm_socket_t *)tmp_socket);
  shm_close_socket((shm_socket_t *)tmp_socket);
  rv =  pthread_setspecific(_tmp_recv_socket_key_, NULL);
  if ( rv != 0) {
      logger->error(rv, "shm_sendandrecv : pthread_setspecific");
      exit(1);
    logger->error(rv, "shm_sendandrecv : pthread_setspecific");
    exit(1);
  }
}
@@ -438,7 +447,7 @@
int shm_sendandrecv(shm_socket_t *socket, const void *send_buf,
int shm_sendandrecv_safe(shm_socket_t *socket, const void *send_buf,
                    const int send_size, const int send_key, void **recv_buf,
                    int *recv_size,  struct timespec *timeout,  int flags) {
  int recv_key;
@@ -508,6 +517,12 @@
  return -1;
}
int shm_sendandrecv(shm_socket_t *socket, const void *send_buf,
                    const int send_size, const int send_key, void **recv_buf,
                    int *recv_size,  struct timespec *timeout,  int flags) {
  return  shm_sendandrecv_unsafe(socket, send_buf, send_size, send_key,recv_buf, recv_size, timeout,  flags);
}
// ============================================================================================================
/**
test_net_socket/Makefile
@@ -14,8 +14,7 @@
#-I$(ROOT)/include/usgcommon
INCLUDES += -I${ROOT}/src -I${ROOT}/src/queue -I${ROOT}/src/socket -I${ROOT}/src/common/include -I${ROOT}/include/usgcommon
PROGS = ${DEST}/test_net_mod_socket
PROGS = ${DEST}/test_net_mod_socket ${DEST}/test_bus_stop ${DEST}/heart_beat
DEPENDENCES = $(patsubst %, %.d, $(PROGS)) 
test_net_socket/heart_beat.c
File was renamed from test_socket/dgram_mod_survey.c
@@ -1,6 +1,10 @@
#include "dgram_mod_socket.h"
#include "net_mod_server_socket_wrapper.h"
#include "net_mod_socket_wrapper.h"
#include "bus_server_socket_wrapper.h"
#include "shm_mm_wraper.h"
#include "usg_common.h"
#include <getopt.h>
typedef struct Targ {
@@ -10,43 +14,50 @@
}Targ;
void sigint_handler(int sig) {
   //dgram_mod_close_socket(server_socket);
   // net_mod_socket_close(server_socket);
  printf("===Catch sigint======================\n");
  shm_mm_wrapper_destroy();
  exit(0);
}
void server(int port) {
  void *socket = dgram_mod_open_socket();
  dgram_mod_bind(socket, port);
  void *serv = net_mod_socket_open();
  net_mod_socket_bind(serv, port);
  int size;
  void *recvbuf;
  char sendbuf[512];
  int rv;
  int remote_port;
  while (true) {
    if ((rv = dgram_mod_recvfrom_timeout(socket, &recvbuf, &size, &remote_port, 15, 0) ) == 0) {
    if ((rv =  net_mod_socket_recvfrom(serv, &recvbuf, &size, &remote_port) ) == 0) {
      printf( "RECEIVED HREARTBEAT FROM %d: %s\n", remote_port, recvbuf);
      net_mod_socket_sendto(serv, "suc", strlen("suc")+1, remote_port);
      free(recvbuf);
    }
    
  }
  dgram_mod_close_socket(socket);
  net_mod_socket_close(serv);
}
void client(int port) {
  void *socket = dgram_mod_open_socket();
  int rv;
  void *client = net_mod_socket_open();
  int size;
  char sendbuf[512];
  long i = 0;
  net_node_t node_arr[] = {"", 0, 100};
  int node_arr_size = 1;
  int recv_arr_size;
  net_mod_recv_msg_t *recv_arr;
  while (true) {
    sprintf(sendbuf, "%d", i);
    printf("SEND HEART:%s\n", sendbuf);
    dgram_mod_sendto(socket, sendbuf, strlen(sendbuf) + 1, port);
    rv = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL);
   // sleep(1);
    i++;
  }
  dgram_mod_close_socket(socket);
   net_mod_socket_close(client);
}
@@ -54,20 +65,26 @@
  signal(SIGINT,  sigint_handler);
  Targ *targ = (Targ *)arg;
  int port = targ->port;
  void *socket = dgram_mod_open_socket();
  void *socket = net_mod_socket_open();
  int size;
  char sendbuf[512];
  long scale = 10;
  long i = 0;
  net_node_t node_arr[] = {"", 0, 100};
  int node_arr_size = 1;
  int recv_arr_size;
  net_mod_recv_msg_t *recv_arr;
  while (i < scale) {
    sprintf(sendbuf, "%d", i);
    printf("%d SEND HEART:%s\n", targ->id, sendbuf);
    dgram_mod_sendto(socket, sendbuf, strlen(sendbuf) + 1, port);
    net_mod_socket_sendto(socket, sendbuf, strlen(sendbuf) + 1, port);
    sleep(1);
    i++;
  }
  
  dgram_mod_close_socket(socket);
   net_mod_socket_close(socket);
  return (void *)i;
}
test_net_socket/test_bus_stop.c
New file
@@ -0,0 +1,54 @@
#include "net_mod_server_socket_wrapper.h"
#include "net_mod_socket_wrapper.h"
#include "bus_server_socket_wrapper.h"
#include "shm_mm_wraper.h"
#include "usg_common.h"
#include <getopt.h>
static void * server_sockt;
static void *_start_bus_(void *arg) {
 // pthread_detach(pthread_self());
    printf("Start bus server\n");
  pthread_t tid;
  server_sockt = bus_server_socket_wrapper_open();
  if(bus_server_socket_wrapper_start_bus(server_sockt) != 0) {
    printf("start bus failed\n");
  }
}
int main() {
 pthread_t tid;
 char action[512];
 shm_mm_wrapper_init(512);
 pthread_create(&tid, NULL, _start_bus_,  NULL);
 while (true) {
    printf("Input action: Close?\n");
    if(scanf("%s", action) < 1) {
      printf("Invalide action\n");
      continue;
    }
    if(strcmp(action, "close") == 0) {
      bus_server_socket_wrapper_close(server_sockt);
      break;
    } else {
      printf("Invalide action\n");
    }
 }
 if (pthread_join(tid, NULL) != 0) {
    perror(" pthread_join");
 }
 shm_mm_wrapper_destroy();
}
test_net_socket/test_net_mod_socket.c
@@ -137,6 +137,7 @@
    sprintf(sendbuf, "RECEIVED  PORT %d NAME %s", remote_port, recvbuf);
    net_mod_socket_sendto(client, sendbuf, strlen(sendbuf) + 1, remote_port);
    free(recvbuf);
    sleep(1000);
  }
}
@@ -259,8 +260,9 @@
  for (i = 0; i < SCALE; i++) {
    sprintf(sendbuf, "thread(%d) %d", targ->id, i);
    fprintf(fp, "requst:%s\n", sendbuf);
    n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size);
    //printf("send %d nodes\n", n);
    // n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size);
     n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1000);
    printf("send %d nodes\n", n);
    for(j=0; j < recv_arr_size; j++) {
        fprintf(fp, "reply: host:%s, port: %d, key:%d, content: %s\n", 
            recv_arr[j].host,