From 82841486c36288d73e95f3316e91dd7a522d8602 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期一, 08 二月 2021 09:39:57 +0800 Subject: [PATCH] update --- test_net_socket/CMakeLists.txt | 6 /dev/null | 42 ------- .gitignore | 1 src/psem.h | 40 ++++++ src/queue/lock_free_queue.h | 7 test_net_socket/test_net_mod_socket.cpp | 3 src/read_write_lock.h | 53 ++++++++ src/socket/shm_socket.h | 4 src/CMakeLists.txt | 1 src/sv_read_write_lock.h | 54 +++++++++ src/socket/shm_socket.cpp | 123 ++++++++++++++++---- 11 files changed, 253 insertions(+), 81 deletions(-) diff --git a/.gitignore b/.gitignore index 60e4956..4064938 100644 --- a/.gitignore +++ b/.gitignore @@ -46,3 +46,4 @@ cmake-build-debug/ *.tmp core +tags diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 506e5ed..c47e209 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -13,7 +13,6 @@ ./socket/shm_socket.cpp ./socket/shm_mod_socket.cpp ./time_util.cpp -./psem.cpp ./bus_error.cpp ./futex_sem.cpp ./svsem.cpp diff --git a/src/psem.cpp b/src/psem.cpp deleted file mode 100644 index 2ace11f..0000000 --- a/src/psem.cpp +++ /dev/null @@ -1,42 +0,0 @@ -#include "psem.h" -#include <semaphore.h> -#include "time_util.h" - - - -int psem_timedwait(sem_t *sem, const struct timespec *ts) { - struct timespec abs_timeout = TimeUtil::calc_abs_time(ts); - return sem_timedwait(sem, &abs_timeout); - // int rv ; - // while ( (rv = sem_timedwait(sem, &abs_timeout)) == -1) { - // if(errno == EINTR) - // continue; - // else { - // // LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout"); - // return -1; - // } - // } - // return 0; -} - - -int psem_wait(sem_t *sem) { - return sem_wait(sem); - // int rv; - // while ( (rv = sem_wait(sem)) == -1) { - // if(errno == EINTR) - // continue; - // else { - // return -1; - // } - // } - // return 0; -} - -int psem_trywait(sem_t *sem) { - return sem_trywait(sem); -} - -int psem_post(sem_t *sem) { - return sem_post(sem); -} diff --git a/src/psem.h b/src/psem.h index 30c6670..fb64dba 100644 --- a/src/psem.h +++ b/src/psem.h @@ -2,13 +2,45 @@ #define _PSEM_H_ #include "usg_common.h" +#include <semaphore.h> +#include "time_util.h" -int psem_wait(sem_t *sem) ; +inline int psem_timedwait(sem_t *sem, const struct timespec *ts) { + struct timespec abs_timeout = TimeUtil::calc_abs_time(ts); + return sem_timedwait(sem, &abs_timeout); + // int rv ; + // while ( (rv = sem_timedwait(sem, &abs_timeout)) == -1) { + // if(errno == EINTR) + // continue; + // else { + // // LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout"); + // return -1; + // } + // } + // return 0; +} -int psem_timedwait(sem_t *sem, const struct timespec *ts); -int psem_trywait(sem_t *sem) ; +inline int psem_wait(sem_t *sem) { + return sem_wait(sem); + // int rv; + // while ( (rv = sem_wait(sem)) == -1) { + // if(errno == EINTR) + // continue; + // else { + // return -1; + // } + // } + // return 0; +} -int psem_post(sem_t *sem); +inline int psem_trywait(sem_t *sem) { + return sem_trywait(sem); +} + +inline int psem_post(sem_t *sem) { + return sem_post(sem); +} + #endif diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h index 6554b9a..354c6a4 100644 --- a/src/queue/lock_free_queue.h +++ b/src/queue/lock_free_queue.h @@ -13,6 +13,7 @@ #include "psem.h" #include "bus_error.h" #include "bus_def.h" +#include "read_write_lock.h" // default Queue size #define LOCK_FREE_Q_DEFAULT_SIZE 16 @@ -85,6 +86,7 @@ public: sem_t mutex; + LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); @@ -269,13 +271,13 @@ } } - if (m_qImpl.pop(a_data)) { psem_post(&slots); // sigprocmask(SIG_SETMASK, &pre, NULL); // LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n"); return 0; } + LABEL_FAILTURE: // sigprocmask(SIG_SETMASK, &pre, NULL); @@ -301,7 +303,8 @@ typename Allocator, template<typename T, typename AT> class Q_TYPE> void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator delete(void *p) { - return Allocator::deallocate(p); + LockFreeQueue<ELEM_T, Allocator, Q_TYPE> * _que = (LockFreeQueue<ELEM_T, Allocator, Q_TYPE> * )p; + Allocator::deallocate(p); } // include implementation files diff --git a/src/read_write_lock.h b/src/read_write_lock.h new file mode 100644 index 0000000..c899115 --- /dev/null +++ b/src/read_write_lock.h @@ -0,0 +1,53 @@ +#ifndef _CLOSE_LOCK_H_ +#define _CLOSE_LOCK_H_ + +#include "usg_common.h" +#include "psem.h" +class ReadWriteLock { +private: + unsigned int readCount = 0; + sem_t countMutex; + sem_t writeMutex; + + +public: + ReadWriteLock() { + + } + + void lockRead() { + //readCount鏄叡浜彉閲忥紝鎵�浠ラ渶瑕佸疄鐜颁竴涓攣鏉ユ帶鍒惰鍐� + //synchronized(ReadWriteLock.class){} + psem_wait(&countMutex); + //鍙湁鏄涓�涓鑰咃紝鎵嶅皢鍐欓攣鍔犻攣銆傚叾浠栫殑璇昏�呴兘鏄繘琛屼笅涓�姝� + if(readCount == 0){ + psem_wait(&writeMutex); + + } + ++readCount; + psem_post(&countMutex); + } + + + void unlockRead(){ + + psem_wait(&countMutex); + readCount--; + //鍙湁褰撹鑰呴兘璇诲畬浜嗭紝鎵嶄細杩涜鍐欐搷浣� + if(readCount == 0){ + psem_post(&writeMutex); + } + psem_post(&countMutex); + } + + + void lockWrite(){ + psem_wait(&writeMutex); + } + + void unlockWrite(){ + psem_post(&writeMutex); + } +}; + +#endif diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp index bd6473c..f55a111 100644 --- a/src/socket/shm_socket.cpp +++ b/src/socket/shm_socket.cpp @@ -429,12 +429,14 @@ // use thread local static int _shm_sendandrecv_thread_local(shm_socket_t *sockt, const void *send_buf, - const int send_size, const int send_key, void **recv_buf, + const int send_size, const int key, void **recv_buf, int *recv_size, const struct timespec *timeout, int flags) { - int recv_key; - int rv; - int tryn = 0; - + + + int rv, tryn = 3; + shm_packet_t sendpak; + shm_packet_t recvpak; + std::map<int, shm_packet_t>::iterator recvbufIter; // 鐢╰hread local 淇濊瘉姣忎釜绾跨▼鐢ㄤ竴涓嫭鍗犵殑socket鎺ュ彈瀵规柟杩斿洖鐨勪俊鎭� shm_socket_t *tmp_socket; @@ -459,33 +461,102 @@ exit(1); } } + // int rv; + // int tryn = 0; + // int recv_key; + // rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags); - if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags)) == 0) { + // if (() == 0) { - while(tryn < 3) { - tryn++; - rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags); - if(rv != 0) { - logger->error("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv)); - return rv; - } + // while(tryn < 3) { + // tryn++; + // rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags); + // if(rv != 0) { + // logger->error("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv)); + // return rv; + // } - // 瓒呮椂瀵艰嚧鎺ュ彂閫佸璞★紝涓庤繑鍥炲璞′笉瀵瑰簲鐨勬儏鍐� - if(send_key != recv_key) { - logger->debug("======%d use tmp_socket %d, send to %d, receive from %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key); - // logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key); - // exit(1); - continue; - // return EBUS_RECVFROM_WRONG_END; + // // 瓒呮椂瀵艰嚧鎺ュ彂閫佸璞★紝涓庤繑鍥炲璞′笉瀵瑰簲鐨勬儏鍐� + // if(send_key != recv_key) { + // logger->debug("======%d use tmp_socket %d, send to %d, receive from %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key); + // // logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key); + // // exit(1); + // continue; + // // return EBUS_RECVFROM_WRONG_END; + // } + + // return 0; + // } + + // return EBUS_RECVFROM_WRONG_END; + // } + + + + sendpak.key = tmp_socket->key; + sendpak.size = send_size; + if(send_buf != NULL) { + sendpak.buf = mm_malloc(send_size); + memcpy(sendpak.buf, send_buf, send_size); + } + rv = shm_sendpakto(tmp_socket, &sendpak, key, timeout, flags); + + if(rv != 0) { + return rv; + } + + if(rv != 0) { + return rv; + } + + while(tryn > 0) { + tryn--; + recvbufIter = tmp_socket->recvbuf2.find(key); + if(recvbufIter != tmp_socket->recvbuf2.end()) { + // 鍦ㄧ紦瀛橀噷鏌ュ埌浜哢UID鍖归厤鎴愬姛鐨� +// logger->debug("get from recvbuf: %s", uuid.c_str()); + recvpak = recvbufIter->second; + sockt->recvbuf2.erase(recvbufIter); + goto LABLE_SUC; + } + + rv = shm_recvpakfrom(tmp_socket, &recvpak, timeout, flags); + + if (rv != 0) { + + if(rv == ETIMEDOUT) { + return EBUS_TIMEOUT; } - return 0; - } - - return EBUS_RECVFROM_WRONG_END; - } + logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(tmp_socket), bus_strerror(rv)); + return rv; + } - return rv; + if (key == recvpak.key) { + // 鍙戦�佷笌鎺ュ彈鐨刄UID鍖归厤鎴愬姛 + goto LABLE_SUC; + } else { + // 绛旈潪鎵�闂紝鏀惧埌缂撳瓨閲� + tmp_socket->recvbuf2.insert({recvpak.key, recvpak}); + continue; + } + } + +LABLE_FAIL: + return EBUS_RECVFROM_WRONG_END; + +LABLE_SUC: + if(recv_buf != NULL) { + void *_buf = malloc(recvpak.size); + memcpy(_buf, recvpak.buf, recvpak.size); + *recv_buf = _buf; + } + + if(recv_size != NULL) + *recv_size = recvpak.size; + + mm_free(recvpak.buf); + return 0; } static int _shm_sendandrecv_alloc_new(shm_socket_t *sockt, const void *send_buf, diff --git a/src/socket/shm_socket.h b/src/socket/shm_socket.h index 516a343..ffbd425 100644 --- a/src/socket/shm_socket.h +++ b/src/socket/shm_socket.h @@ -38,8 +38,8 @@ LockFreeQueue<shm_packet_t> *queue; //self queue LockFreeQueue<shm_packet_t> *remoteQueue; // peer queue - std::map<std::string, shm_packet_t> recvbuf; - + std::map<std::string, shm_packet_t> recvbuf; // for uuid + std::map<int, shm_packet_t> recvbuf2; //for thread local } shm_socket_t; diff --git a/src/sv_read_write_lock.h b/src/sv_read_write_lock.h new file mode 100644 index 0000000..6d47080 --- /dev/null +++ b/src/sv_read_write_lock.h @@ -0,0 +1,54 @@ +#ifndef _CLOSE_LOCK_H_ +#define _CLOSE_LOCK_H_ + +#include "usg_common.h" +#include "svsem.h" +class SvReadWriteLock { +private: + unsigned int readCount = 0; + int countMutex; + int writeMutex; + + +public: + SvReadWriteLock() { + countMutex = svsem_get(IPC_PRIVATE, 1); + writeMutex = svsem_get(IPC_PRIVATE, 1); + } + + void lockRead() { + //readCount鏄叡浜彉閲忥紝鎵�浠ラ渶瑕佸疄鐜颁竴涓攣鏉ユ帶鍒惰鍐� + //synchronized(ReadWriteLock.class){} + svsem_wait(&countMutex); + //鍙湁鏄涓�涓鑰咃紝鎵嶅皢鍐欓攣鍔犻攣銆傚叾浠栫殑璇昏�呴兘鏄繘琛屼笅涓�姝� + if(readCount == 0){ + svsem_wait(&writeMutex); + + } + ++readCount; + svsem_post(&countMutex); + } + + + void unlockRead(){ + + svsem_wait(&countMutex); + readCount--; + //鍙湁褰撹鑰呴兘璇诲畬浜嗭紝鎵嶄細杩涜鍐欐搷浣� + if(readCount == 0){ + svsem_post(&writeMutex); + } + svsem_post(&countMutex); + } + + + void lockWrite(){ + svsem_wait(&writeMutex); + } + + void unlockWrite(){ + svsem_post(&writeMutex); + } +}; + +#endif diff --git a/test_net_socket/CMakeLists.txt b/test_net_socket/CMakeLists.txt index d11a1c9..0b85d7d 100644 --- a/test_net_socket/CMakeLists.txt +++ b/test_net_socket/CMakeLists.txt @@ -1,13 +1,13 @@ # add command add_custom_command( - OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/net_mod_socket.sh - COMMAND cp ${CMAKE_CURRENT_SOURCE_DIR}/net_mod_socket.sh ${CMAKE_CURRENT_BINARY_DIR}/net_mod_socket.sh + OUTPUT ${PROJECT_BINARY_DIR}/bin/net_mod_socket.sh + COMMAND cp ${CMAKE_CURRENT_SOURCE_DIR}/net_mod_socket.sh ${PROJECT_BINARY_DIR}/bin/net_mod_socket.sh DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/net_mod_socket.sh ) # add the executable -add_executable(test_net_mod_socket test_net_mod_socket.cpp ${CMAKE_CURRENT_BINARY_DIR}/net_mod_socket.sh) +add_executable(test_net_mod_socket test_net_mod_socket.cpp ${PROJECT_BINARY_DIR}/bin/net_mod_socket.sh) target_link_libraries(test_net_mod_socket PRIVATE shm_queue ${EXTRA_LIBS} ) diff --git a/test_net_socket/test_net_mod_socket.cpp b/test_net_socket/test_net_mod_socket.cpp index e036682..05827a1 100644 --- a/test_net_socket/test_net_mod_socket.cpp +++ b/test_net_socket/test_net_mod_socket.cpp @@ -370,7 +370,8 @@ double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec); long diffsec = (long) (difftime/1000000); long diffusec = difftime - diffsec*1000000; - fprintf(stderr,"鍙戦�佹暟鐩�: %ld, 鐢ㄦ椂: (%ld sec %ld usec), 骞冲潎: %f\n", total, diffsec, diffusec, difftime/total ); + fprintf(stderr,"鍙戦�佹暟鐩�:%ld, 鎴愬姛鏁扮洰: %ld, 鐢ㄦ椂: (%ld sec %ld usec), 骞冲潎: %f\n", + SCALE*node_arr_size, total, diffsec, diffusec, difftime/total ); // fflush(stdout); } -- Gitblit v1.8.0