wangzhengquan
2021-02-05 607ac3ae8bfc017e10a7907e69dcbc3ab2a0fb63
add stop method
2个文件已添加
20个文件已修改
363 ■■■■ 已修改文件
src/CMakeLists.txt 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bus_error.cpp 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bus_error.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket_wrapper.cpp 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket_wrapper.h 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/psem.cpp 42 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket.cpp 46 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket.h 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket_wrapper.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket_wrapper.h 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.cpp 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.cpp 37 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.h 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/CMakeLists.txt 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/UDPClient.cpp 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/UDPServer.cpp 70 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/net_mod_socket.sh 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/test_bus_stop.cpp 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/test_net_mod_socket.cpp 39 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/CMakeLists.txt
@@ -29,7 +29,8 @@
)
add_library(shm_queue SHARED ${_SOURCES_})
# add_library(shm_queue SHARED ${_SOURCES_})
add_library(shm_queue STATIC ${_SOURCES_})
target_include_directories(shm_queue PUBLIC ${EXTRA_INCLUDES} )
src/bus_error.cpp
@@ -18,7 +18,8 @@
  "Key already in use",
  "Network fault",
  "Send to self error",
  "Receive from wrong end"
  "Receive from wrong end",
  "Service stoped"
};
src/bus_error.h
@@ -11,6 +11,7 @@
#define EBUS_NET 504
#define EBUS_SENDTO_SELF 505
#define EBUS_RECVFROM_WRONG_END 506
#define EBUS_STOPED 507
extern int bus_errno;
src/net/net_mod_socket.cpp
@@ -49,6 +49,10 @@
}
int NetModSocket::stop() {
  return shmModSocket.stop();
}
/**
 * 绑定端口到socket, 如果不绑定则系统自动分配一个
 * @return 0 成功, 其他值 失败的错误码
src/net/net_mod_socket.h
@@ -98,7 +98,7 @@
  NetModSocket();
  ~NetModSocket();
  int stop();
  /**
   * 绑定端口到socket, 如果不绑定则系统自动分配一个
   * @return 0 成功, 其他值 失败的错误码
src/net/net_mod_socket_wrapper.cpp
@@ -20,6 +20,11 @@
    delete sockt;
}
 
int net_mod_socket_stop(void *_socket) {
    NetModSocket *sockt = (NetModSocket *)_socket;
    return sockt->stop();
}
/**
 * 绑定端口到socket, 如果不绑定则系统自动分配一个
 * @return 0 成功, 其他值 失败的错误码
src/net/net_mod_socket_wrapper.h
@@ -29,12 +29,16 @@
 */
void * net_mod_socket_open();
/**
 * @brief 关闭 net_mod_socket
 */
void net_mod_socket_close(void *_sockt);
/**
 * @brief 停止 net_mod_socket
 */
int net_mod_socket_stop(void *_sockt);
/**
 * @brief 绑定端口到socket, 如果不绑定则系统自动分配一个
src/psem.cpp
@@ -6,31 +6,31 @@
int psem_timedwait(sem_t *sem, const struct timespec *ts) {
    struct timespec abs_timeout = TimeUtil::calc_abs_time(ts);
  int rv ;
  while ( (rv = sem_timedwait(sem, &abs_timeout)) == -1) {
      if(errno == EINTR)
          continue;
      else {
         // LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout");
         return -1;
      }
  }
  return 0;
  return sem_timedwait(sem, &abs_timeout);
  // int rv ;
  // while ( (rv = sem_timedwait(sem, &abs_timeout)) == -1) {
  //     if(errno == EINTR)
  //         continue;
  //     else {
  //        // LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout");
  //        return -1;
  //     }
  // }
  // return 0;
}
int psem_wait(sem_t *sem) {
  int rv;
  while ( (rv = sem_wait(sem)) == -1) {
      if(errno == EINTR)
          continue;
      else {
         // LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout");
         return -1;
      }
  }
  return 0;
  return sem_wait(sem);
  // int rv;
  // while ( (rv = sem_wait(sem)) == -1) {
  //     if(errno == EINTR)
  //         continue;
  //     else {
  //        return -1;
  //     }
  // }
  // return 0;
}
int psem_trywait(sem_t *sem) {
src/socket/bus_server_socket.cpp
@@ -58,25 +58,7 @@
}
BusServerSocket::~BusServerSocket() {
    SHMKeySet *subscripter_set;
    SHMTopicSubMap::iterator map_iter;
    stop();
    if(topic_sub_map != NULL) {
        for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
            subscripter_set = map_iter->second;
            if(subscripter_set != NULL) {
                subscripter_set->clear();
                mm_free((void *)subscripter_set);
            }
        }
        topic_sub_map->clear();
        mem_pool_free_by_key(SHM_BUS_MAP_KEY);
    }
    shm_close_socket(shm_socket);
    logger->debug("BusServerSocket destory 3");
    destroy();
}
@@ -111,8 +93,6 @@
    if( shm_socket->key <= 0) {
        return -1;
    }
    // snprintf(buf, 128, "%sstop%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER);
    // return shm_sendto(shm_socket, buf, strlen(buf), shm_socket->key, NULL, 0);
    bus_head_t head = {};
    memcpy(head.action, "stop", sizeof(head.action));
    head.topic_size = 0;
@@ -122,13 +102,33 @@
    void *buf;
    int size = ShmModSocket::get_bus_sendbuf(head, NULL, 0, NULL,  0, &buf);
    if(size > 0) {
        ret = client.sendandrecv( buf, size, shm_socket->key, NULL, NULL);
        ret = client.sendto( buf, size, shm_socket->key);
        free(buf);
        return ret;
    } else {
        return -1;
    }
}
int  BusServerSocket::destroy() {
    SHMKeySet *subscripter_set;
    SHMTopicSubMap::iterator map_iter;
    if(topic_sub_map != NULL) {
        for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
            subscripter_set = map_iter->second;
            if(subscripter_set != NULL) {
                subscripter_set->clear();
                mm_free((void *)subscripter_set);
            }
        }
        topic_sub_map->clear();
        mem_pool_free_by_key(SHM_BUS_MAP_KEY);
    }
    shm_close_socket(shm_socket);
    logger->debug("BusServerSocket destory 3");
    return 0;
}
/*
@@ -280,8 +280,6 @@
        free(buf);
    }
    shm_sendto(shm_socket, "stop_finished", strlen( "stop_finished") +1, key);
    return NULL;
}
src/socket/bus_server_socket.h
@@ -27,11 +27,12 @@
    SHMTopicSubMap *topic_sub_map;
private:
    int  destroy();
    void _proxy_sub( char *topic, int key);
    void _proxy_pub( char *topic, void *buf, size_t size, int key);
    void *_run_proxy_();
    // int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len );
    void _proxy_desub( char *topic, int key);
    void _proxy_desub_all(int key);
src/socket/bus_server_socket_wrapper.cpp
@@ -22,6 +22,10 @@
    logger->debug("===bus_server_socket_wrapper_close\n");
}
int bus_server_socket_wrapper_stop(void *_socket) {
    BusServerSocket *sockt = (BusServerSocket *)_socket;
    return sockt->stop();
}
/**
 * 启动bus
 * 
src/socket/bus_server_socket_wrapper.h
@@ -29,6 +29,11 @@
void bus_server_socket_wrapper_close(void *_sockt);
/**
 * @brief 停止 bus_server_socket
 */
int bus_server_socket_wrapper_stop(void *_socket);
/**
 * @brief 启动bus
 * 
 * @return 0 成功, 其他值 失败的错误码
src/socket/shm_mod_socket.cpp
@@ -26,6 +26,11 @@
    shm_close_socket(shm_socket);
}
int ShmModSocket::stop() {
    return shm_socket_stop(shm_socket);
}
int ShmModSocket::bind(int key) {
    return  shm_socket_bind(shm_socket, key);
src/socket/shm_mod_socket.h
@@ -47,6 +47,7 @@
    ShmModSocket();
    ~ShmModSocket();
     
    int stop();
    /**
     * 绑定端口到socket, 如果不绑定则系统自动分配一个
     * @return 0 成功, 其他值 失败的错误码
src/socket/shm_socket.cpp
@@ -112,23 +112,31 @@
int shm_close_socket(shm_socket_t *sockt) {
  
  int s;
  int rv;
  logger->debug("shm_close_socket\n");
  if(sockt->queue != NULL) {
    delete sockt->queue;
    sockt->queue = NULL;
  }
  s =  pthread_mutex_destroy(&(sockt->mutex) );
  if(s != 0) {
    err_exit(s, "shm_close_socket");
  rv =  pthread_mutex_destroy(&(sockt->mutex) );
  if(rv != 0) {
    err_exit(rv, "shm_close_socket");
  }
  free(sockt);
  return 0;
}
int shm_socket_stop(shm_socket_t *sockt) {
  struct timespec timeout = {5, 0};
  shm_packet_t sendpak = {0};
  sendpak.key = sockt->key;
  sendpak.action = BUS_ACTION_STOP;
  sendpak.size = 0;
  return shm_sendpakto(sockt, &sendpak, sockt->key, &timeout, BUS_TIMEOUT_FLAG);
}
int shm_socket_bind(shm_socket_t *sockt, int key) {
  sockt->key = key;
@@ -175,6 +183,7 @@
  shm_packet_t sendpak;
  shm_packet_t recvpak;
  std::map<std::string, shm_packet_t>::iterator recvbufIter;
  std::string uuid = sole::uuid4().str();
  
  sendpak.key = sockt->key;
@@ -507,7 +516,7 @@
  
 
 LABEL_PUSH: 
  if (key == sockt->key) {
  if (sendpak->action != BUS_ACTION_STOP && key == sockt->key) {
    logger->error( "can not send to your self!");
    return EBUS_SENDTO_SELF;
  }
@@ -527,10 +536,11 @@
}
// 短连接方式接受
static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *recvpak ,  const struct timespec *timeout,  int flag) {
static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *_recvpak ,  const struct timespec *timeout,  int flag) {
  int rv;
  
  hashtable_t *hashtable = mm_get_hashtable();
  shm_packet_t recvpak;
  if( sockt->queue != NULL) 
    goto LABEL_POP;
@@ -557,11 +567,18 @@
  
LABEL_POP:
 //
  // printf("%p start recv.....\n", sockt);
 
  printf("%p start recv.....\n", sockt);
  rv = sockt->queue->pop(*recvpak, timeout, flag);
  rv = sockt->queue->pop(recvpak, timeout, flag);
  if(rv != 0)
    return rv;
  if(recvpak.action == BUS_ACTION_STOP) {
    return EBUS_STOPED;
  }
  *_recvpak = recvpak;
  return rv;
}
// int shm_sendandrecv(shm_socket_t *sockt, const void *send_buf,
src/socket/shm_socket.h
@@ -14,13 +14,15 @@
    
};
 
#define BUS_ACTION_STOP 1
typedef struct shm_packet_t {
    int key;
    size_t size;
    void * buf;
    char uuid[64];
    int action;
} shm_packet_t;
@@ -50,6 +52,8 @@
int shm_close_socket(shm_socket_t * socket) ;
int shm_socket_stop(shm_socket_t *sockt);
int shm_socket_bind(shm_socket_t * socket, int key) ;
test/CMakeLists.txt
@@ -49,4 +49,19 @@
add_executable(UDPServer UDPServer.cpp )
target_link_libraries(UDPServer PRIVATE  ${EXTRA_LIBS} )
target_include_directories(UDPServer PRIVATE
                            "${PROJECT_BINARY_DIR}"
                             ${EXTRA_INCLUDES}
                            )
add_executable(UDPClient UDPClient.cpp )
target_link_libraries(UDPClient PRIVATE  ${EXTRA_LIBS} )
target_include_directories(UDPClient PRIVATE
                            "${PROJECT_BINARY_DIR}"
                             ${EXTRA_INCLUDES}
                            )
test/UDPClient.cpp
New file
@@ -0,0 +1,50 @@
// Client side implementation of UDP client-server model
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#define PORT     8080
#define MAXLINE 1024
// Driver code
int main() {
    int sockfd;
    char buffer[MAXLINE];
    char *hello = "Hello from client";
    struct sockaddr_in     servaddr;
    // Creating socket file descriptor
    if ( (sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0 ) {
        perror("socket creation failed");
        exit(EXIT_FAILURE);
    }
    memset(&servaddr, 0, sizeof(servaddr));
    // Filling server information
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(PORT);
    servaddr.sin_addr.s_addr = INADDR_ANY;
    int n;
    socklen_t len;
    sendto(sockfd, (const char *)hello, strlen(hello),
        MSG_CONFIRM, (const struct sockaddr *) &servaddr,
            sizeof(servaddr));
    printf("Hello message sent.\n");
    n = recvfrom(sockfd, (char *)buffer, MAXLINE,
                MSG_WAITALL, (struct sockaddr *) &servaddr,
                &len);
    buffer[n] = '\0';
    printf("Server : %s\n", buffer);
    close(sockfd);
    return 0;
}
test/UDPServer.cpp
New file
@@ -0,0 +1,70 @@
// Server side implementation of UDP client-server model
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <signal.h>
#define PORT     8080
#define MAXLINE 1024
bool stop = false;
static void stop_handler(int sig) {
  printf("stop_handler\n");
  stop = true;
}
// Driver code
int main() {
    int sockfd;
    char buffer[MAXLINE];
    char *hello = "Hello from server";
    struct sockaddr_in servaddr, cliaddr;
    signal(SIGINT,  stop_handler);
    // Creating socket file descriptor
    if ( (sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0 ) {
        perror("socket creation failed");
        exit(EXIT_FAILURE);
    }
    memset(&servaddr, 0, sizeof(servaddr));
    memset(&cliaddr, 0, sizeof(cliaddr));
    // Filling server information
    servaddr.sin_family = AF_INET; // IPv4
    servaddr.sin_addr.s_addr = INADDR_ANY;
    servaddr.sin_port = htons(PORT);
    // Bind the socket with the server address
    if ( bind(sockfd, (const struct sockaddr *)&servaddr,
            sizeof(servaddr)) < 0 )
    {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }
    int n;
    socklen_t len = sizeof(cliaddr); //len is value/resuslt
    while(!stop) {
        n = recvfrom(sockfd, (char *)buffer, MAXLINE,
                MSG_WAITALL, ( struct sockaddr *) &cliaddr,
                &len);
        buffer[n] = '\0';
        printf("Client : %s\n", buffer);
        sendto(sockfd, (const char *)hello, strlen(hello),
            MSG_CONFIRM, (const struct sockaddr *) &cliaddr,
                len);
        printf("Hello message sent.\n");
    }
    printf("===stopted.\n");
    return 0;
}
test_net_socket/net_mod_socket.sh
@@ -53,6 +53,11 @@
     
}
function stop() {
    ps -ef | grep -e "test_net_mod_socket" -e "heart_beat"| awk  '{print $2}' | xargs -i kill -15 {}
}
function close() {
    ps -ef | grep -e "test_net_mod_socket" -e "heart_beat"| awk  '{print $2}' | xargs -i kill -9 {}
    ipcrm -a
test_net_socket/test_bus_stop.cpp
@@ -8,8 +8,8 @@
static void * server_sockt;
static void sigint_handler(int sig) {
  bus_server_socket_wrapper_close(server_sockt);
static void stop_bus_handler(int sig) {
  bus_server_socket_wrapper_stop(server_sockt);
}
static void *_start_bus_(void *arg) {
@@ -20,14 +20,17 @@
  if(bus_server_socket_wrapper_start_bus(server_sockt) != 0) {
    printf("start bus failed\n");
  }
  printf("============_start_bus_ end\n" );
  bus_server_socket_wrapper_close(server_sockt);
  printf("============bus stopted\n" );
}
int main() {
 pthread_t tid;
 char action[512];
 signal(SIGINT,  sigint_handler);
 signal(SIGINT,  stop_bus_handler);
 signal(SIGTERM,  stop_bus_handler);
 shm_mm_wrapper_init(512);
 server_sockt = bus_server_socket_wrapper_open();
 pthread_create(&tid, NULL, _start_bus_,  NULL);
test_net_socket/test_net_mod_socket.cpp
@@ -10,6 +10,8 @@
#define  SCALE  100000
static Logger *logger = LoggerFactory::getLogger();
typedef struct Targ {
  net_node_t *node;
    char *nodelist;
@@ -133,6 +135,7 @@
void *serverSockt;
static void _recvandsend_callback_(void *recvbuf, int recvsize, int key, void **sendbuf_ptr, int *sendsize_ptr, void * user_data) {
  char sendbuf[512];
  printf( "server: RECEIVED REQUEST FROM  %d : %s\n", key, (char *)recvbuf);
@@ -145,16 +148,44 @@
  return;
bool stop = false;
static void stop_replyserver_handler(int sig) {
  printf("stop_handler\n");
  int rv = net_mod_socket_stop(serverSockt);
  if(rv ==0) {
    logger->debug("send stop suc");
    return;
  } else {
    logger->debug("send stop fail.%s\n", bus_strerror(rv));
  }
}
void start_reply(int mkey) {
  printf("start reply\n");
  logger->debug("start reply\n");
  signal(SIGINT,  stop_replyserver_handler);
  signal(SIGTERM,  stop_replyserver_handler);
  serverSockt = net_mod_socket_open();
  net_mod_socket_bind(serverSockt, mkey);
 
  int rv;
  while(true) {
    rv = net_mod_socket_recvandsend_timeout(serverSockt, _recvandsend_callback_ , 0, 2000000, NULL );
  int rv = 0 ;
  while(  true) {
    rv = net_mod_socket_recvandsend(serverSockt, _recvandsend_callback_ , NULL );
    if (rv == 0)
      continue;
    if(rv == EBUS_STOPED) {
      logger->debug("Stopping\n");
      break;
    }
    logger->debug("net_mod_socket_recvandsend error.%s\n", bus_strerror(rv));
  }
    //rv = net_mod_socket_recvandsend_timeout(serverSockt, _recvandsend_callback_ , 0, 2000000, NULL );
  net_mod_socket_close(serverSockt);
  logger->debug("stopted\n");
  // while ( (rv = net_mod_socket_recvfrom(ser, &recvbuf, &size, &key) ) == 0) {
  //  // printf( "server: RECEIVED REQUEST FROM  %d NAME %s\n", key, recvbuf);
  //   sprintf(sendbuf, "%d RECEIVED %s", net_mod_socket_get_key(ser), (char *)recvbuf);