wangzhengquan
2021-02-08 18705acc13f78ac65458b3dd832545e62fbc9172
update
1 文件已重命名
8个文件已修改
198 ■■■■ 已修改文件
src/net/net_mod_socket.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pread_write_lock.h 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/lock_free_queue.h 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/shm_mm_wrapper.cpp 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.cpp 134 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.h 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/sv_read_write_lock.h 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket.cpp
@@ -44,7 +44,7 @@
  // delete gpool;
  // s =  pthread_mutex_destroy(&sendMutex);
  if(s != 0) {
    err_exit(s, "shm_close_socket");
    err_exit(s, "shm_socket_close");
  }
}
src/pread_write_lock.h
File was renamed from src/read_write_lock.h
@@ -1,9 +1,9 @@
#ifndef _CLOSE_LOCK_H_
#define _CLOSE_LOCK_H_
#ifndef _PREAD_WRITE_LOCK_H_
#define _PREAD_WRITE_LOCK_H_
#include "usg_common.h"
#include "psem.h"
class ReadWriteLock {
class PReadWriteLock {
private:
    unsigned int readCount = 0;
  sem_t countMutex;
@@ -11,13 +11,17 @@
public:
    ReadWriteLock() {
    PReadWriteLock() {
        if (sem_init(&countMutex, 1, 1) == -1)
        err_exit(errno, "PReadWriteLock sem_init");
       if (sem_init(&writeMutex, 1, 1) == -1)
        err_exit(errno, "PReadWriteLock sem_init");
    }
    void lockRead() {
        //readCount是共享变量,所以需要实现一个锁来控制读写  
 //synchronized(ReadWriteLock.class){}
 //synchronized(PReadWriteLock.class){}
      psem_wait(&countMutex);
    //只有是第一个读者,才将写锁加锁。其他的读者都是进行下一步  
    if(readCount == 0){ 
src/queue/lock_free_queue.h
@@ -13,7 +13,7 @@
#include "psem.h"
#include "bus_error.h"
#include "bus_def.h"
#include "read_write_lock.h"
// default Queue size
#define LOCK_FREE_Q_DEFAULT_SIZE 16
@@ -83,9 +83,10 @@
  sem_t slots;
  sem_t items;
  time_t createTime;
public:
  sem_t mutex;
  // sem_t mutex;
  LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
@@ -95,7 +96,7 @@
  /// template
  ~LockFreeQueue();
  std::atomic_uint reference;
  // std::atomic_uint reference;
  /// @brief constructor of the class
@@ -118,6 +119,10 @@
  inline bool empty();
  inline ELEM_T &operator[](unsigned i);
  time_t getCreateTime() {
    return createTime;
  }
  /// @brief push an element at the tail of the queue
  /// @param the element to insert in the queue
@@ -153,15 +158,14 @@
  typename ELEM_T,
  typename Allocator,
  template<typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize) {
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): m_qImpl(qsize) {
  // std::cout << "LockFreeQueue init reference=" << reference << std::endl;
  if (sem_init(&slots, 1, qsize) == -1)
    err_exit(errno, "LockFreeQueue sem_init");
  if (sem_init(&items, 1, 0) == -1)
    err_exit(errno, "LockFreeQueue sem_init");
  if (sem_init(&mutex, 1, 1) == -1)
    err_exit(errno, "LockFreeQueue sem_init");
  createTime = time(NULL);
}
@@ -178,9 +182,9 @@
  if (sem_destroy(&items) == -1) {
    err_exit(errno, "LockFreeQueue sem_destroy");
  }
  if (sem_destroy(&mutex) == -1) {
    err_exit(errno, "LockFreeQueue sem_destroy");
  }
  // if (sem_destroy(&mutex) == -1) {
  //   err_exit(errno, "LockFreeQueue sem_destroy");
  // }
}
template<
src/shm/shm_mm_wrapper.cpp
@@ -4,6 +4,7 @@
#include "lock_free_queue.h"
#include "shm_socket.h"
#define BUFFER_TIME 10
void shm_mm_wrapper_init(int size) {
    mem_pool_init(size);
}
@@ -16,8 +17,6 @@
    return mm_alloc_key();
}
//删除包含在keys内的queue
int shm_mm_wrapper_remove_keys(int keys[], int length) {
  hashtable_t *hashtable = mm_get_hashtable();
@@ -29,9 +28,12 @@
    if(mqueue == NULL) {
        continue;
    }
    if(difftime(time(NULL), mqueue->getCreateTime()) > BUFFER_TIME ) {
    delete mqueue;
    hashtable_remove(hashtable, keys[i]);
    count++;
    }
  }
  return count;
}
@@ -57,11 +59,13 @@
    if (!found && *keyItr > 100) {
      // 销毁共享内存的queue
      mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, *keyItr);
      if(difftime(time(NULL), mqueue->getCreateTime()) > BUFFER_TIME ) {
      delete mqueue;
      hashtable_remove(hashtable, *keyItr);
      count++;
    }
    }
  }
  delete keyset;
  return count;
src/socket/bus_server_socket.cpp
@@ -52,7 +52,7 @@
BusServerSocket::BusServerSocket() {
    logger->debug("BusServerSocket Init");
    shm_socket = shm_open_socket(SHM_SOCKET_DGRAM);
    shm_socket = shm_socket_open(SHM_SOCKET_DGRAM);
    topic_sub_map = NULL;
}
@@ -126,7 +126,7 @@
        topic_sub_map->clear();
        mem_pool_free_by_key(SHM_BUS_MAP_KEY);
    }
    shm_close_socket(shm_socket);
    shm_socket_close(shm_socket);
    logger->debug("BusServerSocket destory 3");
    return 0;
}
src/socket/shm_mod_socket.cpp
@@ -14,7 +14,7 @@
// }
ShmModSocket::ShmModSocket() {
    shm_socket = shm_open_socket(SHM_SOCKET_DGRAM);
    shm_socket = shm_socket_open(SHM_SOCKET_DGRAM);
    bus_set = new std::set<int>;
}
@@ -28,7 +28,7 @@
        delete bus_set;
    }
    shm_close_socket(shm_socket);
    shm_socket_close(shm_socket);
}
int ShmModSocket::stop() {
src/socket/shm_socket.cpp
@@ -75,57 +75,57 @@
}
//删除包含在keys内的queue
size_t shm_socket_remove_keys(int keys[], size_t length) {
  hashtable_t *hashtable = mm_get_hashtable();
  LockFreeQueue<shm_packet_t> *mqueue;
  size_t count = 0;
  for(int i = 0; i< length; i++) {
    // 销毁共享内存的queue
    mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, keys[i]);
    delete mqueue;
    hashtable_remove(hashtable, keys[i]);
    count++;
  }
  return count;
}
// size_t shm_socket_remove_keys(int keys[], size_t length) {
//   hashtable_t *hashtable = mm_get_hashtable();
//   LockFreeQueue<shm_packet_t> *mqueue;
//   size_t count = 0;
//   for(int i = 0; i< length; i++) {
//     // 销毁共享内存的queue
//     mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, keys[i]);
//     delete mqueue;
//     hashtable_remove(hashtable, keys[i]);
//     count++;
//   }
//   return count;
// }
 
// 删除不在keys内的queue
size_t shm_socket_remove_keys_exclude(int keys[], size_t length) {
  hashtable_t *hashtable = mm_get_hashtable();
  std::set<int> *keyset = hashtable_keyset(hashtable);
  std::set<int>::iterator keyItr;
  LockFreeQueue<shm_packet_t> *mqueue;
  bool found;
  size_t count = 0;
  for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
    found = false;
    for (size_t i = 0; i < length; i++) {
      if (*keyItr == keys[i]) {
        found = true;
        break;
      }
    }
    // 100内的是bus内部自己用的
    if (!found && *keyItr > 100) {
      // 销毁共享内存的queue
      mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, *keyItr);
      delete mqueue;
      hashtable_remove(hashtable, *keyItr);
      count++;
    }
  }
  delete keyset;
  return count;
}
// // 删除不在keys内的queue
// size_t shm_socket_remove_keys_exclude(int keys[], size_t length) {
//   hashtable_t *hashtable = mm_get_hashtable();
//   std::set<int> *keyset = hashtable_keyset(hashtable);
//   std::set<int>::iterator keyItr;
//   LockFreeQueue<shm_packet_t> *mqueue;
//   bool found;
//   size_t count = 0;
//   for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
//     found = false;
//     for (size_t i = 0; i < length; i++) {
//       if (*keyItr == keys[i]) {
//         found = true;
//         break;
//       }
//     }
//     // 100内的是bus内部自己用的
//     if (!found && *keyItr > 100) {
//       // 销毁共享内存的queue
//       mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, *keyItr);
//       delete mqueue;
//       hashtable_remove(hashtable, *keyItr);
//       count++;
//     }
//   }
//   delete keyset;
//   return count;
// }
shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) {
shm_socket_t *shm_socket_open(shm_socket_type_t socket_type) {
  int s, type;
  pthread_mutexattr_t mtxAttr;
  logger->debug("shm_open_socket\n");
  logger->debug("shm_socket_open\n");
  // shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t));
  shm_socket_t *sockt = new shm_socket_t;
  sockt->socket_type = socket_type;
@@ -151,10 +151,14 @@
  return sockt;
}
int shm_close_socket(shm_socket_t *sockt) {
int shm_socket_close(shm_socket_t *sockt) {
}
int _shm_socket_close_(shm_socket_t *sockt) {
  
  int rv;
  logger->debug("shm_close_socket\n");
  logger->debug("shm_socket_close\n");
  if(sockt->queue != NULL) {
    delete sockt->queue;
    sockt->queue = NULL;
@@ -162,7 +166,7 @@
  rv =  pthread_mutex_destroy(&(sockt->mutex) );
  if(rv != 0) {
    err_exit(rv, "shm_close_socket");
    err_exit(rv, "shm_socket_close");
  }
  free(sockt);
@@ -320,7 +324,7 @@
    return;
  logger->debug("%d destroy tmp socket\n", pthread_self()); 
  shm_close_socket((shm_socket_t *)tmp_socket);
  shm_socket_close((shm_socket_t *)tmp_socket);
  rv =  pthread_setspecific(_perthread_socket_key_, NULL);
  if ( rv != 0) {
    logger->error(rv, "shm_sendandrecv : pthread_setspecific");
@@ -453,7 +457,7 @@
  {
    /* If first call from this thread, allocate buffer for thread, and save its location */
    logger->debug("%ld create tmp socket\n", (long)pthread_self() );
    tmp_socket = shm_open_socket(SHM_SOCKET_DGRAM);
    tmp_socket = shm_socket_open(SHM_SOCKET_DGRAM);
    rv =  pthread_setspecific(_perthread_socket_key_, tmp_socket);
    if ( rv != 0) {
@@ -461,36 +465,6 @@
      exit(1);
    }
  }
   // int rv;
  // int tryn = 0;
  // int recv_key;
  // rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags);
  // if (() == 0) {
  //   while(tryn < 3) {
  //     tryn++;
  //     rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags);
  //     if(rv != 0) {
  //       logger->error("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv));
  //       return rv;
  //     }
  //      // 超时导致接发送对象,与返回对象不对应的情况
  //     if(send_key != recv_key) {
  //       logger->debug("======%d use tmp_socket %d, send to  %d, receive from  %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key);
  //       // logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key);
  //       // exit(1);
  //       continue;
  //       // return EBUS_RECVFROM_WRONG_END;
  //     }
  //     return 0;
  //   }
  //   return EBUS_RECVFROM_WRONG_END;
  // }
 
  sendpak.key = tmp_socket->key;
@@ -569,7 +543,7 @@
  shm_socket_t *tmp_socket;
 
 
  tmp_socket = shm_open_socket(SHM_SOCKET_DGRAM);
  tmp_socket = shm_socket_open(SHM_SOCKET_DGRAM);
  if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags)) == 0) {
    rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags);
@@ -595,7 +569,7 @@
    return EBUS_RECVFROM_WRONG_END;
  } 
   
  shm_close_socket(tmp_socket);
  shm_socket_close(tmp_socket);
  return rv;
 
}
src/socket/shm_socket.h
@@ -48,10 +48,10 @@
size_t shm_socket_remove_keys(int keys[], size_t length);
size_t shm_socket_remove_keys_exclude(int keys[], size_t length);
shm_socket_t *shm_open_socket(shm_socket_type_t socket_type);
shm_socket_t *shm_socket_open(shm_socket_type_t socket_type);
int shm_close_socket(shm_socket_t * socket) ;
int shm_socket_close(shm_socket_t * socket) ;
int shm_socket_stop(shm_socket_t *sockt);
src/sv_read_write_lock.h
@@ -1,5 +1,5 @@
#ifndef _CLOSE_LOCK_H_
#define _CLOSE_LOCK_H_
#ifndef _SV_READ_WRITE_LOCK_H_
#define _SV_READ_WRITE_LOCK_H_
#include "usg_common.h"
#include "svsem.h"