wangzhengquan
2021-02-20 b5ae34d4422399c5d5458d071cca8c9bc89d20bb
status map for close
16个文件已修改
431 ■■■■ 已修改文件
src/CMakeLists.txt 67 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/key_def.h 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/array_lock_free_queue.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/array_lock_free_sem_queue.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/lock_free_queue.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/shm_queue.h 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/shm_allocator.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/shm_mm.cpp 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/shm_mm.h 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/shm_mm_wrapper.cpp 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/shm_mm_wrapper.h 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.cpp 48 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/net_mod_socket.sh 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/test_net_mod_socket.cpp 194 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/CMakeLists.txt
@@ -25,12 +25,14 @@
./shm/shm_mm_wrapper.cpp
./shm/mm.cpp
./shm/hashtable.cpp
./shm/shm_mm.cpp
)
if (BUILD_SHARED_LIBS)
  add_library(shm_queue SHARED ${_SOURCES_})
else()
 add_library(shm_queue SHARED ${_SOURCES_})
 add_library(shm_queue STATIC ${_SOURCES_})
endif()
# STATIC
@@ -53,36 +55,39 @@
# install rules
install(TARGETS shm_queue  DESTINATION lib)
install(FILES 
  ./socket/socket_def.h
  ./socket/bus_server_socket.h
  ./socket/shm_socket.h
  ./socket/shm_mod_socket.h
  ./socket/bus_server_socket_wrapper.h
  ./psem.h
  ./key_def.h
  ./time_util.h
  ./futex_sem.h
  ./bus_error.h
  ./bus_def.h
  ./sole.h
  ./logger_factory.h
  ./queue/linked_lock_free_queue.h
  ./queue/array_lock_free_queue.h
  ./queue/shm_queue.h
  ./queue/array_lock_free_sem_queue.h
  ./queue/lock_free_queue.h
  ./svsem.h
  ./net/net_conn_pool.h
  ./net/net_mod_socket.h
  ./net/net_mod_server_socket_wrapper.h
  ./net/net_mod_socket_io.h
  ./net/net_mod_server_socket.h
  ./net/net_mod_socket_wrapper.h
  ./shm/hashtable.h
  ./shm/mem_pool.h
  ./shm/mm.h
  ./shm/shm_mm_wrapper.h
  ./shm/shm_allocator.h
 ./socket/socket_def.h
./socket/bus_server_socket.h
./socket/shm_socket.h
./socket/shm_mod_socket.h
./socket/bus_server_socket_wrapper.h
./psem.h
./pread_write_lock.h
./key_def.h
./time_util.h
./sv_read_write_lock.h
./futex_sem.h
./bus_error.h
./bus_def.h
./logger_factory.h
./sole.h
./queue/linked_lock_free_queue.h
./queue/array_lock_free_queue.h
./queue/shm_queue.h
./queue/array_lock_free_sem_queue.h
./queue/lock_free_queue.h
./svsem.h
./net/net_conn_pool.h
./net/net_mod_socket.h
./net/net_mod_server_socket_wrapper.h
./net/net_mod_socket_io.h
./net/net_mod_server_socket.h
./net/net_mod_socket_wrapper.h
./shm/hashtable.h
./shm/mm.h
./shm/shm_mm_wrapper.h
./shm/shm_allocator.h
./shm/shm_mm.h
  DESTINATION include)
src/key_def.h
@@ -1,12 +1,15 @@
#ifndef _KEY_DEF_H_
#define _KEY_DEF_H_
// bus中主题与订阅者对应关系的map
#define SHM_BUS_MAP_KEY 1  
// 队列状态标记的map
#define SHM_QUEUE_ST_KEY 3
// BUS key
#define SHM_BUS_KEY 8
// 网络代理key
#define SHM_NET_PROXY_KEY 99
src/queue/array_lock_free_queue.h
@@ -5,7 +5,7 @@
#include <assert.h> // assert()
#include <sched.h>  // sched_yield()
#include "logger_factory.h"
#include "mem_pool.h"
#include "shm_mm.h"
#include "shm_allocator.h"
/// @brief implementation of an array based lock free queue with support for 
src/queue/array_lock_free_sem_queue.h
@@ -4,7 +4,7 @@
#include <assert.h> // assert()
#include <sched.h>  // sched_yield()
#include "logger_factory.h"
#include "mem_pool.h"
#include "shm_mm.h"
#include "shm_allocator.h"
#include "futex_sem.h"
#include "time_util.h"
src/queue/lock_free_queue.h
@@ -6,7 +6,7 @@
#include <usg_common.h>
#include <assert.h> // assert()
#include "mem_pool.h"
#include "shm_mm.h"
#include "sem_util.h"
#include "logger_factory.h"
#include "shm_allocator.h"
src/queue/shm_queue.h
@@ -79,10 +79,7 @@
template <typename ELEM_T> SHMQueue<ELEM_T>::~SHMQueue() {
  LoggerFactory::getLogger()->debug("SHMQueue destroy");
  if(owner) {
    delete queue;
    hashtable_remove(hashtable, mkey);
  }
  
}
src/shm/shm_allocator.h
@@ -1,7 +1,7 @@
#ifndef __SHM_ALLOCATOR_H__
#define __SHM_ALLOCATOR_H__
#include "usg_common.h"
#include "mem_pool.h"
#include "mm.h"
#include <new>
#include <cstdlib> // for exit()
#include <climits> // for UNIX_MAX
src/shm/shm_mm.cpp
@@ -23,21 +23,6 @@
}
template <typename T>
 T* shm_mm_attach(int key) {
    void *ptr;
    // T* tptr;
    hashtable_t *hashtable = mm_get_hashtable();
  ptr = hashtable_get(hashtable, key);
// printf("shm_mm_malloc_by_key  malloc before %d, %p\n", key, ptr);
  if(ptr == NULL || ptr == (void *)1 ) {
    ptr = mm_malloc(sizeof(T));
    hashtable_put(hashtable, key, ptr);
    new(ptr) T;
// printf("shm_mm_malloc_by_key  use new %d, %p\n", key, ptr);
  }
  return (T*)ptr;
}
void shm_mm_free_by_key(int key) {
    return mm_free_by_key(key);
@@ -52,9 +37,3 @@
     
    return mm_alloc_key();
}
// extern int mm_checkheap(int verbose);
#endif
src/shm/shm_mm.h
@@ -1,9 +1,12 @@
#ifndef __SHM_MM_H__
#define __SHM_MM_H__
#include "usg_common.h"
#include "shm_allocator.h"
#include "key_def.h"
#define SHM_QUEUE_ST_OPENED 0
#define SHM_QUEUE_ST_CLOSED 1
#define SHM_QUEUE_ST_RECYCLED 2
struct shm_queue_status_t {
@@ -24,8 +27,22 @@
void shm_mm_free (void *ptr);
template <typename T>
T* shm_mm_attach(int key) ;
T* shm_mm_attach(int key) {
    void *ptr;
    // T* tptr;
    hashtable_t *hashtable = mm_get_hashtable();
  ptr = hashtable_get(hashtable, key);
// printf("shm_mm_malloc_by_key  malloc before %d, %p\n", key, ptr);
  if(ptr == NULL || ptr == (void *)1 ) {
    ptr = mm_malloc(sizeof(T));
    hashtable_put(hashtable, key, ptr);
    new(ptr) T;
// printf("shm_mm_malloc_by_key  use new %d, %p\n", key, ptr);
  }
  return (T*)ptr;
}
void shm_mm_free_by_key(int key) ;
src/shm/shm_mm_wrapper.cpp
@@ -1,10 +1,10 @@
#include "shm_mm_wrapper.h"
#include "mem_pool.h"
#include "shm_mm.h"
#include "hashtable.h"
#include "lock_free_queue.h"
#include "shm_socket.h"
#define BUFFER_TIME 10
#define BUFFER_TIME 1
void shm_mm_wrapper_init(int size) {
@@ -30,21 +30,24 @@
  LockFreeQueue<shm_packet_t> *mqueue;
  while(true) {
    for(auto it = shmQueueStMap->begin(); it != shmQueueStMap->end(); ++it ) {
      if(it->second.status = SHM_QUEUE_ST_CLOSED && difftime(time(NULL), it->second.closeTime) > 2 ) {
        mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, keys[i]);
        if(mqueue != NULL) {
          delete mqueue;
          hashtable_remove(hashtable, it->first);
          printf("reove queue %d\n", it->first);
          // 不能 erase ,否则会出现多进程之间的同步问题, 而这正是这里要解决的问题
          // it = shmQueueStMap->erase(it);
          // continue;
        }
      if(it->second.status == SHM_QUEUE_ST_CLOSED && difftime(time(NULL), it->second.closeTime) > BUFFER_TIME ) {
        // mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, keys[i]);
        // if(mqueue != NULL) {
        //   delete mqueue;
        // }
        hashtable_remove(hashtable, it->first);
        printf("reomved queue %d\n\n", it->first);
        it->second.status = SHM_QUEUE_ST_RECYCLED;
        // 不能 erase ,否则会出现多进程之间的同步问题, 而这正是这里要解决的问题
        // it = shmQueueStMap->erase(it);
        // continue;
      }
    }
    sleep(1);
  }
  return 0;
}
//删除包含在keys内的queue
@@ -54,16 +57,9 @@
  int count = 0;
  for(int i = 0; i< length; i++) {
    // 销毁共享内存的queue
    mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, keys[i]);
    if(mqueue == NULL) {
        continue;
    }
    if(difftime(time(NULL), mqueue->getCreateTime()) > BUFFER_TIME ) {
      delete mqueue;
      hashtable_remove(hashtable, keys[i]);
      LoggerFactory::getLogger()->debug("remove queue %d",  keys[i]);
      count++;
    }
    hashtable_remove(hashtable, keys[i]);
    LoggerFactory::getLogger()->debug("remove queue %d",  keys[i]);
    count++;
    
  }
  return count;
@@ -89,13 +85,9 @@
    // 100内的是bus内部自己用的
    if (!found && *keyItr > 100) {
      // 销毁共享内存的queue
      mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, *keyItr);
      if(difftime(time(NULL), mqueue->getCreateTime()) > BUFFER_TIME ) {
        delete mqueue;
        hashtable_remove(hashtable, *keyItr);
        LoggerFactory::getLogger()->debug("remove queue %d",  *keyItr);
        count++;
      }
      hashtable_remove(hashtable, *keyItr);
      LoggerFactory::getLogger()->debug("remove queue %d",  *keyItr);
      count++;
     
    }
  }
src/shm/shm_mm_wrapper.h
@@ -25,6 +25,11 @@
 */
void shm_mm_wrapper_destroy();
/**
 * @brief 回收标记为删除的队列
 * @return 错误码
 */
int shm_mm_wrapper_start_resycle() ;
/**
 * @brief 分配一个key给申请者
src/socket/bus_server_socket.h
@@ -3,7 +3,7 @@
#include "usg_common.h"
#include "shm_socket.h"
#include "shm_allocator.h"
#include "mem_pool.h"
#include "shm_mm.h"
#include "hashtable.h"
#include "sem_util.h"
#include "logger_factory.h"
src/socket/shm_mod_socket.h
@@ -3,7 +3,7 @@
#include "usg_common.h"
#include "shm_socket.h"
#include "shm_allocator.h"
#include "mem_pool.h"
#include "shm_mm.h"
#include "hashtable.h"
#include "sem_util.h"
#include "logger_factory.h"
src/socket/shm_socket.cpp
@@ -6,6 +6,7 @@
#include "bus_error.h"
#include "sole.h"
#include "shm_mm.h"
#include "key_def.h"
static Logger *logger = LoggerFactory::getLogger();
@@ -109,7 +110,7 @@
}
int shm_socket_close(shm_socket_t *sockt) {
static int _shm_socket_close_(shm_socket_t *sockt) {
  
  int rv;
  logger->debug("shm_socket_close\n");
@@ -118,17 +119,27 @@
  //   sockt->queue = NULL;
  // }
  pthread_mutex_destroy(&(sockt->mutex) );
  // hashtable_remove(hashtable, mkey);
  free(sockt);
  auto it =  shmQueueStMap.find(key);
  if(it != shmQueueStMap.end()) {
    it->second.status = SHM_QUEUE_ST_CLOSED
    it->second.closeTime = time(NULL);
  if(sockt->key != 0) {
    auto it =  shmQueueStMap->find(sockt->key);
    if(it != shmQueueStMap->end()) {
      it->second.status = SHM_QUEUE_ST_CLOSED;
      it->second.closeTime = time(NULL);
    }
  }
  pthread_mutex_destroy(&(sockt->mutex) );
  free(sockt);
  return 0;
}
int shm_socket_close(shm_socket_t *sockt) {
  return _shm_socket_close_(sockt);
}
@@ -283,7 +294,7 @@
    return;
  logger->debug("%d destroy tmp socket\n", pthread_self()); 
  shm_socket_close((shm_socket_t *)tmp_socket);
  _shm_socket_close_((shm_socket_t *)tmp_socket);
  rv =  pthread_setspecific(_perthread_socket_key_, NULL);
  if ( rv != 0) {
    logger->error(rv, "shm_sendandrecv : pthread_setspecific");
@@ -520,7 +531,7 @@
    return EBUS_RECVFROM_WRONG_END;
  } 
   
  shm_socket_close(tmp_socket);
  _shm_socket_close_(tmp_socket);
  return rv;
 
}
@@ -532,6 +543,7 @@
  int rv;
  shm_queue_status_t stRecord;
  LockFreeQueue<shm_packet_t> *remoteQueue;
  hashtable_t *hashtable = mm_get_hashtable();
  if( sockt->queue != NULL) 
@@ -558,7 +570,7 @@
      // 标记key对应的状态 ,为opened
      stRecord.status = SHM_QUEUE_ST_OPENED;
      stRecord.createTime = time(NULL);
      shmQueueStMap.insert({sockt->key, stRecord});
      shmQueueStMap->insert({sockt->key, stRecord});
      
    }
@@ -575,15 +587,15 @@
  }
  // 检查key标记的状态
  auto it =  shmQueueStMap.find(key);
  if(it != shmQueueStMap.end()) {
  auto it =  shmQueueStMap->find(key);
  if(it != shmQueueStMap->end()) {
    if(it->second.status == SHM_QUEUE_ST_CLOSED) {
      // key对应的状态是关闭的
      goto ERR_CLOSED;
    }
  }
  LockFreeQueue<shm_packet_t> *remoteQueue = shm_socket_attach_queue(key);
  remoteQueue = shm_socket_attach_queue(key);
  if (remoteQueue == NULL ) {
    goto ERR_CLOSED;
@@ -629,7 +641,7 @@
    // 标记key对应的状态 ,为opened
    stRecord.status = SHM_QUEUE_ST_OPENED;
    stRecord.createTime = time(NULL);
    shmQueueStMap.insert({sockt->key, stRecord});
    shmQueueStMap->insert({sockt->key, stRecord});
    
    if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
      err_exit(rv, "shm_recvfrom : pthread_mutex_unlock");
@@ -639,8 +651,8 @@
LABEL_POP:
  // 检查key标记的状态
  // auto shmQueueMapIter =  shmQueueStMap.find(sockt->key);
  // if(shmQueueMapIter != shmQueueStMap.end()) {
  // auto shmQueueMapIter =  shmQueueStMap->find(sockt->key);
  // if(shmQueueMapIter != shmQueueStMap->end()) {
  //   stRecord = shmQueueMapIter->second;
  //   if(stRecord.status = SHM_QUEUE_ST_CLOSED) {
  //     // key对应的状态是关闭的
test_net_socket/net_mod_socket.sh
@@ -9,6 +9,9 @@
    ./test_net_mod_socket --fun="start_reply" --key=100 & server_pid=$! &&  echo "pid: ${server_pid}" 
    ./test_net_mod_socket --fun="start_reply" --key=101 & server_pid=$! &&  echo "pid: ${server_pid}" 
    ./test_net_mod_socket --fun="start_reply" --key=102 & server_pid=$! &&  echo "pid: ${server_pid}" 
    # 打开回队列收进程
    ./test_net_mod_socket --fun="start_resycle" & server_pid=$! &&  echo "pid: ${server_pid}"
}
# 交互式客户端
test_net_socket/test_net_mod_socket.cpp
@@ -527,102 +527,6 @@
}
int main(int argc, char *argv[]) {
    shm_mm_wrapper_init(512);
  argument_t opt = parse_args(argc, argv);
 // port = atoi(argv[2]);
  if(opt.fun == NULL) {
    usage(argv[0]);
    exit(1);
  }
    if (strcmp("start_net_proxy", opt.fun) == 0 ) {
    if(opt.port == 0) {
      usage(argv[0]);
      exit(1);
    }
    start_net_proxy(opt);
  }
  else if (strcmp("start_bus_server", opt.fun) == 0) {
    start_bus_server(opt);
  }
  else if (strcmp("start_reply", opt.fun) == 0) {
    if(opt.key == 0) {
      usage(argv[0]);
      exit(1);
    }
    start_reply(opt.key);
  }
  else if (strcmp("start_net_client", opt.fun) == 0) {
    if(opt.sendlist == 0) {
      fprintf(stderr, "Missing sendlist .\n");
      usage(argv[0]);
      exit(1);
    }
    if(opt.publist == 0) {
      fprintf(stderr, "Missing publist.\n");
      usage(argv[0]);
      exit(1);
    }
    start_net_client(opt.sendlist, opt.publist);
  }
  else if (strcmp("one_sendto_many", opt.fun) == 0) {
    if(opt.sendlist == 0) {
      fprintf(stderr, "Missing sendlist .\n");
      usage(argv[0]);
      exit(1);
    }
    one_sendto_many(opt.sendlist);
  }
  else if (strcmp("test_net_sendandrecv", opt.fun) == 0) {
    if(opt.sendlist == 0) {
      fprintf(stderr, "Missing sendlist .\n");
      usage(argv[0]);
      exit(1);
    }
    test_net_sendandrecv(opt.sendlist);
  }
  else if (strcmp("test_net_pub_threads", opt.fun) == 0) {
    if(opt.publist == 0) {
      fprintf(stderr, "Missing publist .\n");
      usage(argv[0]);
      exit(1);
    }
    test_net_pub_threads(opt.publist);
  }
  else if (strcmp("test_net_pub", opt.fun) == 0) {
    if(opt.publist == 0) {
      fprintf(stderr, "Missing publist .\n");
      usage(argv[0]);
      exit(1);
    }
    test_net_pub(opt.publist);
  }
  else if (strcmp("start_resycle", opt.fun) == 0) {
    start_resycle();
  }
  else {
    usage(argv[0]);
    exit(1);
  }
  printf("==========end========\n");
  // shm_mm_wrapper_destroy();
}
void usage(char *name)
{
@@ -799,3 +703,101 @@
  }
  printf("============node list end==========\n");
}
int main(int argc, char *argv[]) {
  shm_mm_wrapper_init(512);
  argument_t opt = parse_args(argc, argv);
 // port = atoi(argv[2]);
  if(opt.fun == NULL) {
    usage(argv[0]);
    exit(1);
  }
  if (strcmp("start_net_proxy", opt.fun) == 0 ) {
    if(opt.port == 0) {
      usage(argv[0]);
      exit(1);
    }
    start_net_proxy(opt);
  }
  else if (strcmp("start_bus_server", opt.fun) == 0) {
    start_bus_server(opt);
  }
  else if (strcmp("start_reply", opt.fun) == 0) {
    if(opt.key == 0) {
      usage(argv[0]);
      exit(1);
    }
    start_reply(opt.key);
  }
  else if (strcmp("start_net_client", opt.fun) == 0) {
    if(opt.sendlist == 0) {
      fprintf(stderr, "Missing sendlist .\n");
      usage(argv[0]);
      exit(1);
    }
    if(opt.publist == 0) {
      fprintf(stderr, "Missing publist.\n");
      usage(argv[0]);
      exit(1);
    }
    start_net_client(opt.sendlist, opt.publist);
  }
  else if (strcmp("one_sendto_many", opt.fun) == 0) {
    if(opt.sendlist == 0) {
      fprintf(stderr, "Missing sendlist .\n");
      usage(argv[0]);
      exit(1);
    }
    one_sendto_many(opt.sendlist);
  }
  else if (strcmp("test_net_sendandrecv", opt.fun) == 0) {
    if(opt.sendlist == 0) {
      fprintf(stderr, "Missing sendlist .\n");
      usage(argv[0]);
      exit(1);
    }
    test_net_sendandrecv(opt.sendlist);
  }
  else if (strcmp("test_net_pub_threads", opt.fun) == 0) {
    if(opt.publist == 0) {
      fprintf(stderr, "Missing publist .\n");
      usage(argv[0]);
      exit(1);
    }
    test_net_pub_threads(opt.publist);
  }
  else if (strcmp("test_net_pub", opt.fun) == 0) {
    if(opt.publist == 0) {
      fprintf(stderr, "Missing publist .\n");
      usage(argv[0]);
      exit(1);
    }
    test_net_pub(opt.publist);
  }
  else if (strcmp("start_resycle", opt.fun) == 0) {
    start_resycle();
  }
  else {
    usage(argv[0]);
    exit(1);
  }
  printf("==========end========\n");
  // shm_mm_wrapper_destroy();
}