fujuntang
2021-10-10 d69e2a2ed12d639cca99a4718250aacd6579987c
Add the deadlock detect feature.
9个文件已修改
203 ■■■■ 已修改文件
CMakeLists.txt 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
include/usgcommon/sem_util.h 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/CMakeLists.txt 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.cpp 13 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bus_proxy_start.cpp 9 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/hashtable.cpp 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/mm.cpp 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/svsem.cpp 142 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/svsem.h 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
CMakeLists.txt
@@ -21,6 +21,8 @@
list(APPEND EXTRA_INCLUDES "${PROJECT_SOURCE_DIR}/include/usgcommon")
list(APPEND EXTRA_INCLUDES "${PROJECT_SOURCE_DIR}/proto/source")
list(APPEND EXTRA_INCLUDES "${PROJECT_SOURCE_DIR}/src/msg_trigger")
list(APPEND EXTRA_LIBS ${PROJECT_SOURCE_DIR}/lib/libusgcommon.a  pthread rt protobuf.a protobuf-lite.a)
# build api doc
include/usgcommon/sem_util.h
@@ -9,18 +9,18 @@
    int get(key_t key, unsigned int value);
    int dec(int semId);
    int dec_nowait(int semId);
    int dec_timeout(const int semId, const struct timespec * timeout);
    int dec_timeout(int semId, struct timespec * timeout);
    int dec_uni(int semId);
    int zero(int semId);
    int zero_nowait(int semId);
    int zero_timeout(const int semId, const struct timespec * timeout);
    int zero_timeout(int semId, struct timespec * timeout);
    int inc(int semId);
    int set(int semId, int val);
    void remove(int semid);
}
#define SEM_USG_WT_TIMEOUT  60
#endif
src/CMakeLists.txt
@@ -31,6 +31,7 @@
../proto/source/bhome_msg.pb.cc
../proto/source/error_msg.pb.cc
./shm/shm_mm_wrapper.cpp
./msg_trigger/msg_mgr.cpp
./shm/mm.cpp
./shm/hashtable.cpp
./shm/shm_mm.cpp
@@ -57,6 +58,7 @@
                           ${CMAKE_CURRENT_SOURCE_DIR}/proto
                           ${CMAKE_CURRENT_SOURCE_DIR}/queue
                           ${CMAKE_CURRENT_SOURCE_DIR}/socket
                           ${CMAKE_CURRENT_SOURCE_DIR}/msg_trigger
                           ${CMAKE_CURRENT_SOURCE_DIR}/net
                           )
src/bh_api.cpp
@@ -7,6 +7,7 @@
#include "bh_api.h"
#include <pthread.h>
#include <getopt.h>
#include "msg_mgr.h"
#include "../proto/source/error_msg.pb.h"
#include "../proto/source/bhome_msg.pb.h"
#include "../proto/source/bhome_msg_api.pb.h"
@@ -102,6 +103,10 @@
  memset(&pData, 0x00, sizeof(ProcInfo));
  if (gRun_stat == 0) {
    pthread_mutex_init(&mutex, NULL);
#if defined(MSG_HANDLER)
    msg_init();
#endif
  } else {
    logger->error("the process has already registered!\n");
@@ -1814,9 +1819,13 @@
}
#if defined(MSG_HANDLER)
int inter_key_get(void)
{
  return net_mod_socket_get_key(gNetmod_socket);
}
  if (gNetmod_socket != NULL)
    return net_mod_socket_get_key(gNetmod_socket);
  return 0;
}
#endif
src/bus_proxy_start.cpp
@@ -10,6 +10,7 @@
#include <errno.h>
#include <getopt.h>
#include <stdlib.h>
#include "msg_mgr.h"
using namespace std;
@@ -148,6 +149,10 @@
    }
  }
#if defined(MSG_HANDLER)
  msg_init();
#endif
  if (gShm_size == -1) {
    gShm_size = SHM_RES_SIZE;
  }
@@ -166,7 +171,9 @@
  if (gBusServer_stat >= 0) { 
    pthread_create(&tids[1], NULL, svr_start, (void *)&gPort);
    
    pthread_create(&tids[0], NULL, check_start, NULL);
    pthread_create(&tids[2], NULL, check_start, NULL);
    //pthread_create(&tids[3], NULL, sem_msg_handler, NULL);
  }
  for (i = 0; i< TOTAL_THREADS; i++) {
src/shm/hashtable.cpp
@@ -102,7 +102,7 @@
  void *oldvalue;
  int rv;
  if( (rv = svsem_wait(hashtable->mutex)) != 0) {
  if( (rv = svsem_uni_wait(hashtable->mutex)) != 0) {
    LoggerFactory::getLogger()->error(errno, "hashtable_remove\n");
  }
  tailq_header_t *my_tailq_head = hashtable->array[code] ;
@@ -140,7 +140,7 @@
void *hashtable_get(hashtable_t *hashtable, int key) {
  int rv;
  if((rv = svsem_wait(hashtable->mutex)) != 0) {
  if((rv = svsem_uni_wait(hashtable->mutex)) != 0) {
    LoggerFactory::getLogger()->error(errno, "hashtable_get\n");
  }
  void * res = _hashtable_get(hashtable, key);
@@ -154,7 +154,7 @@
void hashtable_put(hashtable_t *hashtable, int key, void *value) {
  int rv;
  if((rv = svsem_wait(hashtable->mutex)) != 0) {
  if((rv = svsem_uni_wait(hashtable->mutex)) != 0) {
    LoggerFactory::getLogger()->error(errno, "hashtable_put\n");
  }
  _hashtable_put(hashtable, key, value); 
@@ -170,7 +170,7 @@
  int rv;
  void * val;
  if(( rv = svsem_wait(hashtable->mutex)) != 0) {
  if(( rv = svsem_uni_wait(hashtable->mutex)) != 0) {
    LoggerFactory::getLogger()->error(errno, "hashtable_put\n");
  }
  if(overwrite) {
@@ -208,7 +208,7 @@
    key = START_KEY;
  }
  rv = svsem_wait(hashtable->mutex);
  rv = svsem_uni_wait(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(errno, "hashtable_alloc_key\n");
  }
@@ -271,7 +271,7 @@
{
  tailq_entry_t *item;
  int rv;
  if( (rv = svsem_wait(hashtable->mutex)) != 0) {
  if( (rv = svsem_uni_wait(hashtable->mutex)) != 0) {
    LoggerFactory::getLogger()->error(errno, "hashtable_removeall\n");
  }
  for (int i = 0; i < MAPSIZE; i++)
src/shm/mm.cpp
@@ -147,7 +147,7 @@
   *}
   */
  SemUtil::dec(mutex);
  SemUtil::dec_uni(mutex);
  size_t size = GET_SIZE(HDRP(ptr));
  PUT(HDRP(ptr), PACK(size, 0));
  PUT(FTRP(ptr), PACK(size, 0));
@@ -237,7 +237,7 @@
{
  
  //同一进程内已经初始化过了
  SemUtil::dec(mutex);
  SemUtil::dec_uni(mutex);
  if (shmid != -1){
    hashtable = (hashtable_t *)shmp;
    SemUtil::inc(mutex);
@@ -311,7 +311,7 @@
bool mm_destroy(void) {
  struct shmid_ds shmid_ds;
  
  SemUtil::dec(mutex);
  SemUtil::dec_uni(mutex);
  
  if(shmctl(shmid, IPC_STAT, &shmid_ds) == 0) {
    //LoggerFactory::getLogger()->debug("shm_nattch=%d\n", shmid_ds.shm_nattch);
@@ -336,8 +336,6 @@
      //remove shared memery
      if (shmctl(shmid, IPC_RMID, 0) == -1)
        err_exit(errno, "mm_destroy shmctl IPC_RMID");
      else
         LoggerFactory::getLogger()->debug("shared memory destroy\n");
      SemUtil::inc(mutex);
      SemUtil::remove(mutex);
src/svsem.cpp
@@ -1,9 +1,11 @@
#include "bh_api.h"
#include "svsem.h"
#include "msg_mgr.h"
int svsem_get(key_t key, unsigned int value) {
  int semid, perms;
  perms = S_IRUSR | S_IWUSR;
  perms = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
  semid = semget(key, 2, IPC_CREAT | IPC_EXCL | perms);
@@ -24,7 +26,7 @@
    sop.sem_num = 0; /* Operate on semaphore 0 */
    sop.sem_op = value;
    sop.sem_flg = 0;
    sop.sem_flg = SEM_UNDO;
    if (semop(semid, &sop, 1) == -1)
      err_exit(errno, "semop");
@@ -71,7 +73,7 @@
  sops.sem_num = 0;
  sops.sem_op = -1;
  sops.sem_flg = 0;
  sops.sem_flg = SEM_UNDO;
  while (semop(semid, &sops, 1) == -1)
    if (errno != EINTR) {
@@ -79,6 +81,15 @@
      
      return -1;
    }
#if defined(MSG_HANDLER)
  Msg_info msg_obj;
  msg_obj.key = inter_key_get();
  msg_obj.id = semid;
  msg_obj.act = SEM_GET;
  msg_distrib(SEM_TYPE_ID, &msg_obj);
#endif
  return 0;
}
@@ -88,18 +99,26 @@
  sops.sem_num = 0;
  sops.sem_op = -1;
  sops.sem_flg = IPC_NOWAIT | 0;
  sops.sem_flg = IPC_NOWAIT | SEM_UNDO;
 
#if defined(MSG_HANDLER)
  Msg_info msg_obj;
  msg_obj.key = inter_key_get();
  msg_obj.id = semid;
  msg_obj.act = SEM_GET;
  msg_distrib(SEM_TYPE_ID, &msg_obj);
#endif
  return semop(semid, &sops, 1) ;
}
int svsem_timedwait(const int semid, const struct timespec *timeout) {
int svsem_timedwait(int semid, struct timespec *timeout) {
  struct sembuf sops;
  sops.sem_num = 0;
  sops.sem_op = -1;
  sops.sem_flg = 0;
  sops.sem_flg = SEM_UNDO;
  while (semtimedop(semid, &sops, 1, timeout) == -1)
    if (errno != EINTR) {
@@ -107,9 +126,40 @@
      return -1;
    }
#if defined(MSG_HANDLER)
  Msg_info msg_obj;
  msg_obj.key = inter_key_get();
  msg_obj.id = semid;
  msg_obj.act = SEM_GET;
  msg_distrib(SEM_TYPE_ID, &msg_obj);
#endif
  return 0;
}
int svsem_uni_wait(int semid) {
  struct timespec res;
  res.tv_sec = SEM_WT_TIMEOUT;
  res.tv_nsec = 0;
  int count = 2;
  while(count > 0) {
    if(svsem_timedwait(semid, &res) != 0) {
      if(svsem_post(semid) != 0) {
        err_msg(errno, "_inc");
      }
      count--;
    }
    break;
  }
  return 0;
}
/* Release semaphore - increment it by 1 */
int svsem_post(int semid) {
@@ -117,18 +167,25 @@
  sops.sem_num = 0;
  sops.sem_op = 1;
  sops.sem_flg = 0;
  sops.sem_flg = SEM_UNDO;
  int rv = semop(semid, &sops, 1);
  if (rv == -1) {
    // err_msg(errno, "svsem_inc");
    return -1;
  }
#if defined(MSG_HANDLER)
  Msg_info msg_obj;
  msg_obj.key = inter_key_get();
  msg_obj.id = semid;
  msg_obj.act = SEM_POST;
  msg_distrib(SEM_TYPE_ID, &msg_obj);
#endif
  return 0;
}
int svsem_cond_wait(int semid ){
@@ -144,12 +201,12 @@
  //释放mutex
  sops[0].sem_num = 0;
  sops[0].sem_op = 1;
  sops[0].sem_flg = 0;
  sops[0].sem_flg = SEM_UNDO;
  // 等待cond
  sops[1].sem_num = 1;
  sops[1].sem_op = 0;
  sops[1].sem_flg = 0;
  sops[1].sem_flg = SEM_UNDO;
  while (semop(semid, sops, 2) == -1)
    if (errno != EINTR) {
@@ -161,13 +218,22 @@
   //重新获取mutex
  sops[0].sem_num = 0;
  sops[0].sem_op = -1;
  sops[0].sem_flg = 0;
  sops[0].sem_flg = SEM_UNDO;
  while (semop(semid, sops, 1) == -1)
    if (errno != EINTR) {
      // err_msg(errno, "Svsvsem_dec");
      return -1;
    }
#if defined(MSG_HANDLER)
  Msg_info msg_obj;
  msg_obj.key = inter_key_get();
  msg_obj.id = semid;
  msg_obj.act = SEM_GET;
  msg_distrib(SEM_TYPE_ID, &msg_obj);
#endif
  return 0;
}
@@ -196,13 +262,22 @@
  sops.sem_num = 0;
  sops.sem_op = 0;
  sops.sem_flg = 0;
  sops.sem_flg = SEM_UNDO;
  while (semop(semid, &sops, 1) == -1)
    if (errno != EINTR) {
      // err_msg(errno, "svsem_zero");
      return -1;
    }
#if defined(MSG_HANDLER)
  Msg_info msg_obj;
  msg_obj.key = inter_key_get();
  msg_obj.id = semid;
  msg_obj.act = SEM_RESET;
  msg_distrib(SEM_TYPE_ID, &msg_obj);
#endif
  return 0;
}
@@ -213,7 +288,7 @@
  sops.sem_num = 0;
  sops.sem_op = 0;
  sops.sem_flg = IPC_NOWAIT;
  sops.sem_flg = IPC_NOWAIT | SEM_UNDO;
  while (semop(semid, &sops, 1) == -1)
    if (errno != EINTR) {
@@ -221,15 +296,23 @@
      return -1;
    }
#if defined(MSG_HANDLER)
  Msg_info msg_obj;
  msg_obj.key = inter_key_get();
  msg_obj.id = semid;
  msg_obj.act = SEM_RESET;
  msg_distrib(SEM_TYPE_ID, &msg_obj);
#endif
  return 0;
}
int svsem_zero_timeout(const int semid, const struct timespec *timeout) {
int svsem_zero_timeout(int semid, struct timespec *timeout) {
  struct sembuf sops;
  sops.sem_num = 0;
  sops.sem_op = 0;
  sops.sem_flg = 0;
  sops.sem_flg = SEM_UNDO;
  while (semtimedop(semid, &sops, 1, timeout) == -1)
    if (errno != EINTR) {
@@ -237,11 +320,16 @@
      return -1;
    }
#if defined(MSG_HANDLER)
  Msg_info msg_obj;
  msg_obj.key = inter_key_get();
  msg_obj.id = semid;
  msg_obj.act = SEM_RESET;
  msg_distrib(SEM_TYPE_ID, &msg_obj);
#endif
  return 0;
}
int svsem_set(int semid, int val) {
  union semun arg;
@@ -250,14 +338,20 @@
  return semctl(semid, 0, SETVAL, arg);
}
void svsem_remove(int semid) {
  union semun dummy;
  if (semctl(semid, 0, IPC_RMID, dummy) == -1)
    err_msg(errno, "svsem_remove");
#if defined(MSG_HANDLER)
  Msg_info msg_obj;
  msg_obj.key = inter_key_get();
  msg_obj.id = semid;
  msg_obj.act = SEM_RM;
  msg_distrib(SEM_TYPE_ID, &msg_obj);
#endif
}
 
src/svsem.h
@@ -12,8 +12,9 @@
int svsem_trywait(int semid)  ;
int svsem_timedwait(const int semid, const struct timespec *timeout) ;
int svsem_timedwait(int semid, struct timespec *timeout) ;
int svsem_uni_wait(int semid) ;
/* Release semaphore - increment it by 1 */
int svsem_post(int semid) ;
@@ -32,7 +33,7 @@
int svsem_zero_nowait(int semid) ;
int svsem_zero_timeout(const int semid, const struct timespec *timeout)  ;
int svsem_zero_timeout(int semid, struct timespec *timeout)  ;