wangzhengquan
2021-01-07 6c32e1a482c14412108675ec78f49ebe4f94a374
update
16个文件已修改
145 ■■■■■ 已修改文件
Make.defines.linux 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
build.sh 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/CMakeLists.txt 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bus_error.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/key_def.h 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/hashtable.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/mm.cpp 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket.cpp 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket_wrapper.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_server_socket.cpp 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_socket_wrapper.cpp 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_socket_wrapper.h 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.cpp 73 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/CMakeLists.txt 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/heart_beat.sh 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
Make.defines.linux
@@ -36,7 +36,7 @@
# preprocessor options 
CPPFLAGS += $(INCLUDES) -std=c++11
# compilation options
CFLAGS += $(DEBUGFLAGS) -Wall -DLINUX -D_GNU_SOURCE  -D_POSIX_C_SOURCE=200112L $(EXTRA)
CFLAGS += $(DEBUGFLAGS) -Wall -DLINUX -D_GNU_SOURCE $(EXTRA)
# linked options
LDFLAGS +=
build.sh
@@ -1,7 +1,7 @@
#! /bin/bash
[ -d build ] || mkdir build
rm -rf build/*
# rm -rf build/*
cd build
 
# -DCMAKE_BUILD_TYPE=Debug  | Release
@@ -12,5 +12,5 @@
cmake --build .
cmake --build . --target install
#cmake --build . --target install
 
src/CMakeLists.txt
@@ -25,6 +25,7 @@
        shm/shm_mm_wrapper.cpp
        shm/mm.cpp
        shm/hashtable.cpp
        px_sem_util.cpp
    )
src/bus_error.h
@@ -7,7 +7,7 @@
#define EBUS_TIMEOUT 1
#define EBUS_CLOSED 2
#define EBUS_KEY_INUSED 3
#define ESHM_BUS_KEY_INUSED 3
extern int bus_errno;
src/key_def.h
@@ -1,7 +1,8 @@
#ifndef _KEY_DEF_H_
#define _KEY_DEF_H_
#define BUS_MAP_KEY 1
#define BUS_KEY 8
#define SHM_BUS_MAP_KEY 1
#define SHM_BUS_KEY 8
#define SHM_NET_PROXY_KEY 99
#endif
src/shm/hashtable.cpp
@@ -23,7 +23,7 @@
static size_t hashcode(int key);
static struct timespec TIMEOUT = {1, 0};
static struct timespec TIMEOUT = {2, 0};
void hashtable_init(hashtable_t *hashtable )
{
src/shm/mm.cpp
@@ -123,7 +123,8 @@
    return aptr;
  } else {
    SemUtil::inc(mutex);
    err_msg(0, "mm_malloc : out of memery\n");
    abort();
    err_exit(0, "mm_malloc : out of memery\n");
    return NULL;
  }
src/socket/bus_server_socket.cpp
@@ -6,7 +6,7 @@
static Logger *logger = LoggerFactory::getLogger();
void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)>  cb) {
    SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY);
    SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
    SHMKeySet *subscripter_set;
    SHMKeySet::iterator set_iter;
    SHMTopicSubMap::iterator map_iter;
@@ -39,7 +39,7 @@
    int key;
    for(int i = 0; i < length; i++) {
        key = keys[i];
        SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY);
        SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
        SHMKeySet *subscripter_set;
        SHMKeySet::iterator set_iter;
        SHMTopicSubMap::iterator map_iter;
@@ -85,7 +85,7 @@
        }
        topic_sub_map->clear();
        mem_pool_free_by_key(BUS_MAP_KEY);
        mem_pool_free_by_key(SHM_BUS_MAP_KEY);
    }
    shm_close_socket(shm_socket);
    logger->debug("BusServerSocket destory 3");
@@ -111,7 +111,7 @@
 * @return 0 成功, 其他值 失败的错误码
*/
int  BusServerSocket::start(){
    topic_sub_map =    mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY);
    topic_sub_map =    mem_pool_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
 
    run_pubsub_proxy();
    // 进程停止的时候,预留3秒资源回收的时间。否则,会发生调用close的时候,共享内存的资源还没来得及回收进程就退出了
src/socket/bus_server_socket_wrapper.cpp
@@ -31,7 +31,7 @@
    int ret;
    BusServerSocket *sockt = (BusServerSocket *)_socket;
    if( (ret = sockt->force_bind(BUS_KEY)) == 0) {
    if( (ret = sockt->force_bind(SHM_BUS_KEY)) == 0) {
        return sockt->start();
    } else {
        logger->error("start bus failed");
src/socket/net_mod_server_socket.cpp
@@ -25,6 +25,8 @@
  if(response_buf == NULL) {
    err_exit(errno, "NetModServerSocket::NetModServerSocket malloc");
  }
  shmModSocket.force_bind(SHM_NET_PROXY_KEY);
}
@@ -234,13 +236,13 @@
    if(request_head.timeout > 0) {
      timeout.tv_sec = request_head.timeout / 1000;
      timeout.tv_nsec = (request_head.timeout - timeout.tv_sec * 1000) * 10e6;
      ret = shmModSocket.pub_timeout((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, BUS_KEY, &timeout);
      ret = shmModSocket.pub_timeout((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, SHM_BUS_KEY, &timeout);
    }
    else if(request_head.timeout == 0) {
      ret = shmModSocket.pub_nowait((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, BUS_KEY);
      ret = shmModSocket.pub_nowait((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, SHM_BUS_KEY);
    }
    else if(request_head.timeout == -1) {
      ret = shmModSocket.pub((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, BUS_KEY);
      ret = shmModSocket.pub((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, SHM_BUS_KEY);
    }
    response_head.code = ret;
    response_head.content_length = 0;
src/socket/net_mod_socket_wrapper.cpp
@@ -148,16 +148,16 @@
 */
int  net_mod_socket_sub(void * _socket, void *topic, int size) {
    net_mod_socket_t *sockt = (net_mod_socket_t *)_socket;
    return sockt->sockt->sub((char *)topic,  size,  BUS_KEY);
    return sockt->sockt->sub((char *)topic,  size,  SHM_BUS_KEY);
}
// 超时返回。 @sec 秒 , @nsec 纳秒
int  net_mod_socket_sub_timeout(void * _socket, void *topic, int size, int sec, int nsec){
    net_mod_socket_t *sockt = (net_mod_socket_t *)_socket;
    return sockt->sockt->sub_timeout((char *)topic,  size,  BUS_KEY, sec, nsec);
    return sockt->sockt->sub_timeout((char *)topic,  size,  SHM_BUS_KEY, sec, nsec);
}
int  net_mod_socket_sub_nowait(void * _socket, void *topic, int size){
    net_mod_socket_t *sockt = (net_mod_socket_t *)_socket;
    return sockt->sockt->sub_nowait((char *)topic,  size,  BUS_KEY);
    return sockt->sockt->sub_nowait((char *)topic,  size,  SHM_BUS_KEY);
}
@@ -169,16 +169,16 @@
 */
int  net_mod_socket_desub(void * _socket, void *topic, int size) {
    net_mod_socket_t *sockt = (net_mod_socket_t *)_socket;
    return sockt->sockt->desub((char *)topic,  size,  BUS_KEY);
    return sockt->sockt->desub((char *)topic,  size,  SHM_BUS_KEY);
}
// 超时返回。 @sec 秒 , @nsec 纳秒
int  net_mod_socket_desub_timeout(void * _socket, void *topic, int size, int sec, int nsec) {
    net_mod_socket_t *sockt = (net_mod_socket_t *)_socket;
    return sockt->sockt->desub_timeout((char *)topic,  size,  BUS_KEY, sec, nsec);
    return sockt->sockt->desub_timeout((char *)topic,  size,  SHM_BUS_KEY, sec, nsec);
}
int  net_mod_socket_desub_nowait(void * _socket, void *topic, int size){
    net_mod_socket_t *sockt = (net_mod_socket_t *)_socket;
    return sockt->sockt->desub_nowait((char *)topic,  size,  BUS_KEY);
    return sockt->sockt->desub_nowait((char *)topic,  size,  SHM_BUS_KEY);
}
src/socket/net_mod_socket_wrapper.h
@@ -39,12 +39,12 @@
int net_mod_socket_force_bind(void * _socket, int key);
/**
 * 发送信息
 * 发送信息,发送完成才返回
 * @key 发送给谁
 * @return 0 成功, 其他值 失败的错误码
 */
int net_mod_socket_sendto(void *_socket, const void *buf, const int size, const int key);
// 发送信息超时返回。 @sec 秒 , @nsec 纳秒
// 发送信息, 超时返回。 @sec 秒 , @nsec 纳秒
int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec);
// 发送信息立刻返回。
int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key);
src/socket/shm_socket.cpp
@@ -30,7 +30,7 @@
static inline int  _shm_socket_check_key(shm_socket_t *socket) {
   void *tmp_ptr = mm_get_by_key(socket->key);
    if (tmp_ptr!= NULL && tmp_ptr != (void *)1 && !socket->force_bind ) {
      bus_errno = EBUS_KEY_INUSED;
      bus_errno = ESHM_BUS_KEY_INUSED;
      logger->error("%s. key = %d ", bus_strerror(bus_errno), socket->key);
      return 0;
    }
@@ -46,6 +46,8 @@
}
shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) {
  int s, type;
  pthread_mutexattr_t mtxAttr;
  logger->debug("shm_open_socket\n");
  shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t));
@@ -54,14 +56,27 @@
  socket->force_bind = false;
  socket->dispatch_thread = 0;
  socket->status = SHM_CONN_CLOSED;
  socket->mutex = SemUtil::get(IPC_PRIVATE, 1);
  s = pthread_mutexattr_init(&mtxAttr);
  if (s != 0)
    err_exit(s, "pthread_mutexattr_init");
  s = pthread_mutexattr_settype(&mtxAttr, PTHREAD_MUTEX_ERRORCHECK);
  if (s != 0)
    err_exit(s, "pthread_mutexattr_settype");
  s = pthread_mutex_init(&(socket->mutex), &mtxAttr);
  if (s != 0)
    err_exit(s, "pthread_mutex_init");
  s = pthread_mutexattr_destroy(&mtxAttr);
  if (s != 0)
    err_exit(s, "pthread_mutexattr_destroy");
  
  return socket;
}
int shm_close_socket(shm_socket_t *socket) {
  
  int ret;
  int ret, s;
  
  logger->debug("shm_close_socket\n");
  switch (socket->socket_type) {
@@ -74,8 +89,13 @@
    default:
      break;
  }
  s =  pthread_mutex_destroy(&(socket->mutex) );
  if(s != 0) {
    err_exit(s, "shm_close_socket");
  }
  free(socket);
  SemUtil::remove(socket->mutex);
  return ret;
}
@@ -113,8 +133,8 @@
  } else {
   if(!_shm_socket_check_key(socket)) {
     bus_errno = EBUS_KEY_INUSED;
     return EBUS_KEY_INUSED;
     bus_errno = ESHM_BUS_KEY_INUSED;
     return ESHM_BUS_KEY_INUSED;
   }
  }
@@ -204,8 +224,8 @@
    socket->key = hashtable_alloc_key(hashtable);
  } else {
    if(!_shm_socket_check_key(socket)) {
      bus_errno = EBUS_KEY_INUSED;
      return EBUS_KEY_INUSED;
      bus_errno = ESHM_BUS_KEY_INUSED;
      return ESHM_BUS_KEY_INUSED;
    }
  }
@@ -296,6 +316,10 @@
// 短连接方式发送
int shm_sendto(shm_socket_t *socket, const void *buf, const int size,
               const int key, const struct timespec *timeout, const int flags) {
  int s;
  bool rv;
  if (socket->socket_type != SHM_SOCKET_DGRAM) {
    logger->error( "shm_socket.shm_sendto: Can't invoke shm_sendto method in a %d type socket  which is "
                "not a SHM_SOCKET_DGRAM socket ",
@@ -304,22 +328,27 @@
  }
  hashtable_t *hashtable = mm_get_hashtable();
  SemUtil::dec(socket->mutex);
  if ((s = pthread_mutex_lock(&(socket->mutex))) != 0)
    err_exit(s, "shm_sendto : pthread_mutex_lock");
  if (socket->queue == NULL) {
    if (socket->key == -1) {
      socket->key = hashtable_alloc_key(hashtable);
    } else {
     if(!_shm_socket_check_key(socket)) {
        bus_errno = EBUS_KEY_INUSED;
        return EBUS_KEY_INUSED;
        bus_errno = ESHM_BUS_KEY_INUSED;
        return ESHM_BUS_KEY_INUSED;
     }
    }
    socket->queue = new SHMQueue<shm_msg_t>(socket->key, 16);
  }
  SemUtil::inc(socket->mutex);
  if ((s = pthread_mutex_unlock(&(socket->mutex))) != 0)
    err_exit(s, "shm_sendto : pthread_mutex_unlock");
  
  // if (key == socket->key) {
  //   logger->error( "can not send to your self!");
@@ -341,7 +370,7 @@
  memcpy(dest.buf, buf, size);
  // printf("shm_sendto push before\n");
  bool rv;
  if(flags & SHM_MSG_NOWAIT != 0) {
    rv = remoteQueue->push_nowait(dest);
  } else if(timeout != NULL) {
@@ -372,6 +401,9 @@
// 短连接方式接受
int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *key,  struct timespec *timeout,  int flags) {
  int s;
  bool rv;
  if (socket->socket_type != SHM_SOCKET_DGRAM) {
    logger->error("shm_socket.shm_recvfrom: Can't invoke shm_recvfrom method in a %d type socket  which "
                "is not a SHM_SOCKET_DGRAM socket ",
@@ -379,25 +411,30 @@
    exit(1);
  }
  hashtable_t *hashtable = mm_get_hashtable();
  SemUtil::dec(socket->mutex);
  if ((s = pthread_mutex_lock(&(socket->mutex))) != 0)
    err_exit(s, "shm_recvfrom : pthread_mutex_lock");
  if (socket->queue == NULL) {
    if (socket->key == -1) {
      socket->key = hashtable_alloc_key(hashtable);
    } else {
      if(!_shm_socket_check_key(socket)) {
        bus_errno = EBUS_KEY_INUSED;
        return EBUS_KEY_INUSED;
        bus_errno = ESHM_BUS_KEY_INUSED;
        return ESHM_BUS_KEY_INUSED;
      }
    }
    socket->queue = new SHMQueue<shm_msg_t>(socket->key, 16);
  }
  SemUtil::inc(socket->mutex);
  if ((s = pthread_mutex_unlock(&(socket->mutex))) != 0)
    err_exit(s, "shm_recvfrom : pthread_mutex_unlock");
  shm_msg_t src;
  // printf("shm_recvfrom pop before\n");
  bool rv;
   if(flags & SHM_MSG_NOWAIT != 0) {
    rv = socket->queue->pop_nowait(src);
  } else if(timeout != NULL) {
src/socket/shm_socket.h
@@ -49,7 +49,7 @@
    // 本地key
    int key;
    bool force_bind;
    int mutex;
    pthread_mutex_t mutex;
    shm_connection_status_t status;
    SHMQueue<shm_msg_t> *queue;
    SHMQueue<shm_msg_t> *remoteQueue;
test_net_socket/CMakeLists.txt
@@ -5,11 +5,17 @@
  DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/net_mod_socket.sh
  )
add_custom_command(
  OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/heart_beat.sh
  COMMAND cp  ${CMAKE_CURRENT_SOURCE_DIR}/heart_beat.sh ${CMAKE_CURRENT_BINARY_DIR}/heart_beat.sh
  DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/heart_beat.sh
  )
# add the executable
add_executable(test_net_mod_socket test_net_mod_socket.cpp  ${CMAKE_CURRENT_BINARY_DIR}/net_mod_socket.sh)
target_link_libraries(test_net_mod_socket PUBLIC shm_queue  ${EXTRA_LIBS} )
add_executable(heart_beat heart_beat.cpp)
add_executable(heart_beat heart_beat.cpp ${CMAKE_CURRENT_BINARY_DIR}/heart_beat.sh)
target_link_libraries(heart_beat PUBLIC shm_queue )
# target_link_libraries(heart_beat PUBLIC shm_queue  ${EXTRA_LIBS} )
test_net_socket/heart_beat.sh
@@ -7,8 +7,8 @@
}
function start_server() {
    clean
    ./heart_beat server 8 & server_pid=$!
    ./heart_beat server 101 & server_pid=$!
    echo "start server pid ${server_pid}"
}
@@ -16,7 +16,7 @@
    for (( i=0; i<$PROCESSES; i++ ))
    do
        # pid_arr[$i]=$i
        ./heart_beat client 8 & pid_arr[$i]=$!
        ./heart_beat client 101 & pid_arr[$i]=$!
        echo "start ${pid_arr[$i]}" 
    done
}
@@ -26,8 +26,9 @@
    do
        echo "kill ${pid_arr[$i]}" 
        kill -9 ${pid_arr[$i]}
        #./heart_beat client 8 & ${pid_arr[$i]}=$!
        #./heart_beat client 101 & ${pid_arr[$i]}=$!
    done
    ipcrm -a
}