From 82841486c36288d73e95f3316e91dd7a522d8602 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期一, 08 二月 2021 09:39:57 +0800
Subject: [PATCH] update
---
test_net_socket/CMakeLists.txt | 6
/dev/null | 42 -------
.gitignore | 1
src/psem.h | 40 ++++++
src/queue/lock_free_queue.h | 7
test_net_socket/test_net_mod_socket.cpp | 3
src/read_write_lock.h | 53 ++++++++
src/socket/shm_socket.h | 4
src/CMakeLists.txt | 1
src/sv_read_write_lock.h | 54 +++++++++
src/socket/shm_socket.cpp | 123 ++++++++++++++++----
11 files changed, 253 insertions(+), 81 deletions(-)
diff --git a/.gitignore b/.gitignore
index 60e4956..4064938 100644
--- a/.gitignore
+++ b/.gitignore
@@ -46,3 +46,4 @@
cmake-build-debug/
*.tmp
core
+tags
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 506e5ed..c47e209 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -13,7 +13,6 @@
./socket/shm_socket.cpp
./socket/shm_mod_socket.cpp
./time_util.cpp
-./psem.cpp
./bus_error.cpp
./futex_sem.cpp
./svsem.cpp
diff --git a/src/psem.cpp b/src/psem.cpp
deleted file mode 100644
index 2ace11f..0000000
--- a/src/psem.cpp
+++ /dev/null
@@ -1,42 +0,0 @@
-#include "psem.h"
-#include <semaphore.h>
-#include "time_util.h"
-
-
-
-int psem_timedwait(sem_t *sem, const struct timespec *ts) {
- struct timespec abs_timeout = TimeUtil::calc_abs_time(ts);
- return sem_timedwait(sem, &abs_timeout);
- // int rv ;
- // while ( (rv = sem_timedwait(sem, &abs_timeout)) == -1) {
- // if(errno == EINTR)
- // continue;
- // else {
- // // LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout");
- // return -1;
- // }
- // }
- // return 0;
-}
-
-
-int psem_wait(sem_t *sem) {
- return sem_wait(sem);
- // int rv;
- // while ( (rv = sem_wait(sem)) == -1) {
- // if(errno == EINTR)
- // continue;
- // else {
- // return -1;
- // }
- // }
- // return 0;
-}
-
-int psem_trywait(sem_t *sem) {
- return sem_trywait(sem);
-}
-
-int psem_post(sem_t *sem) {
- return sem_post(sem);
-}
diff --git a/src/psem.h b/src/psem.h
index 30c6670..fb64dba 100644
--- a/src/psem.h
+++ b/src/psem.h
@@ -2,13 +2,45 @@
#define _PSEM_H_
#include "usg_common.h"
+#include <semaphore.h>
+#include "time_util.h"
-int psem_wait(sem_t *sem) ;
+inline int psem_timedwait(sem_t *sem, const struct timespec *ts) {
+ struct timespec abs_timeout = TimeUtil::calc_abs_time(ts);
+ return sem_timedwait(sem, &abs_timeout);
+ // int rv ;
+ // while ( (rv = sem_timedwait(sem, &abs_timeout)) == -1) {
+ // if(errno == EINTR)
+ // continue;
+ // else {
+ // // LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout");
+ // return -1;
+ // }
+ // }
+ // return 0;
+}
-int psem_timedwait(sem_t *sem, const struct timespec *ts);
-int psem_trywait(sem_t *sem) ;
+inline int psem_wait(sem_t *sem) {
+ return sem_wait(sem);
+ // int rv;
+ // while ( (rv = sem_wait(sem)) == -1) {
+ // if(errno == EINTR)
+ // continue;
+ // else {
+ // return -1;
+ // }
+ // }
+ // return 0;
+}
-int psem_post(sem_t *sem);
+inline int psem_trywait(sem_t *sem) {
+ return sem_trywait(sem);
+}
+
+inline int psem_post(sem_t *sem) {
+ return sem_post(sem);
+}
+
#endif
diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h
index 6554b9a..354c6a4 100644
--- a/src/queue/lock_free_queue.h
+++ b/src/queue/lock_free_queue.h
@@ -13,6 +13,7 @@
#include "psem.h"
#include "bus_error.h"
#include "bus_def.h"
+#include "read_write_lock.h"
// default Queue size
#define LOCK_FREE_Q_DEFAULT_SIZE 16
@@ -85,6 +86,7 @@
public:
sem_t mutex;
+
LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
@@ -269,13 +271,13 @@
}
}
-
if (m_qImpl.pop(a_data)) {
psem_post(&slots);
// sigprocmask(SIG_SETMASK, &pre, NULL);
// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n");
return 0;
}
+
LABEL_FAILTURE:
// sigprocmask(SIG_SETMASK, &pre, NULL);
@@ -301,7 +303,8 @@
typename Allocator,
template<typename T, typename AT> class Q_TYPE>
void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator delete(void *p) {
- return Allocator::deallocate(p);
+ LockFreeQueue<ELEM_T, Allocator, Q_TYPE> * _que = (LockFreeQueue<ELEM_T, Allocator, Q_TYPE> * )p;
+ Allocator::deallocate(p);
}
// include implementation files
diff --git a/src/read_write_lock.h b/src/read_write_lock.h
new file mode 100644
index 0000000..c899115
--- /dev/null
+++ b/src/read_write_lock.h
@@ -0,0 +1,53 @@
+#ifndef _CLOSE_LOCK_H_
+#define _CLOSE_LOCK_H_
+
+#include "usg_common.h"
+#include "psem.h"
+class ReadWriteLock {
+private:
+ unsigned int readCount = 0;
+ sem_t countMutex;
+ sem_t writeMutex;
+
+
+public:
+ ReadWriteLock() {
+
+ }
+
+ void lockRead() {
+ //readCount鏄叡浜彉閲忥紝鎵�浠ラ渶瑕佸疄鐜颁竴涓攣鏉ユ帶鍒惰鍐�
+ //synchronized(ReadWriteLock.class){}
+ psem_wait(&countMutex);
+ //鍙湁鏄涓�涓鑰咃紝鎵嶅皢鍐欓攣鍔犻攣銆傚叾浠栫殑璇昏�呴兘鏄繘琛屼笅涓�姝�
+ if(readCount == 0){
+ psem_wait(&writeMutex);
+
+ }
+ ++readCount;
+ psem_post(&countMutex);
+ }
+
+
+ void unlockRead(){
+
+ psem_wait(&countMutex);
+ readCount--;
+ //鍙湁褰撹鑰呴兘璇诲畬浜嗭紝鎵嶄細杩涜鍐欐搷浣�
+ if(readCount == 0){
+ psem_post(&writeMutex);
+ }
+ psem_post(&countMutex);
+ }
+
+
+ void lockWrite(){
+ psem_wait(&writeMutex);
+ }
+
+ void unlockWrite(){
+ psem_post(&writeMutex);
+ }
+};
+
+#endif
diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index bd6473c..f55a111 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -429,12 +429,14 @@
// use thread local
static int _shm_sendandrecv_thread_local(shm_socket_t *sockt, const void *send_buf,
- const int send_size, const int send_key, void **recv_buf,
+ const int send_size, const int key, void **recv_buf,
int *recv_size, const struct timespec *timeout, int flags) {
- int recv_key;
- int rv;
- int tryn = 0;
-
+
+
+ int rv, tryn = 3;
+ shm_packet_t sendpak;
+ shm_packet_t recvpak;
+ std::map<int, shm_packet_t>::iterator recvbufIter;
// 鐢╰hread local 淇濊瘉姣忎釜绾跨▼鐢ㄤ竴涓嫭鍗犵殑socket鎺ュ彈瀵规柟杩斿洖鐨勪俊鎭�
shm_socket_t *tmp_socket;
@@ -459,33 +461,102 @@
exit(1);
}
}
+ // int rv;
+ // int tryn = 0;
+ // int recv_key;
+ // rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags);
- if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags)) == 0) {
+ // if (() == 0) {
- while(tryn < 3) {
- tryn++;
- rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags);
- if(rv != 0) {
- logger->error("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv));
- return rv;
- }
+ // while(tryn < 3) {
+ // tryn++;
+ // rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags);
+ // if(rv != 0) {
+ // logger->error("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv));
+ // return rv;
+ // }
- // 瓒呮椂瀵艰嚧鎺ュ彂閫佸璞★紝涓庤繑鍥炲璞′笉瀵瑰簲鐨勬儏鍐�
- if(send_key != recv_key) {
- logger->debug("======%d use tmp_socket %d, send to %d, receive from %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key);
- // logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key);
- // exit(1);
- continue;
- // return EBUS_RECVFROM_WRONG_END;
+ // // 瓒呮椂瀵艰嚧鎺ュ彂閫佸璞★紝涓庤繑鍥炲璞′笉瀵瑰簲鐨勬儏鍐�
+ // if(send_key != recv_key) {
+ // logger->debug("======%d use tmp_socket %d, send to %d, receive from %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key);
+ // // logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key);
+ // // exit(1);
+ // continue;
+ // // return EBUS_RECVFROM_WRONG_END;
+ // }
+
+ // return 0;
+ // }
+
+ // return EBUS_RECVFROM_WRONG_END;
+ // }
+
+
+
+ sendpak.key = tmp_socket->key;
+ sendpak.size = send_size;
+ if(send_buf != NULL) {
+ sendpak.buf = mm_malloc(send_size);
+ memcpy(sendpak.buf, send_buf, send_size);
+ }
+ rv = shm_sendpakto(tmp_socket, &sendpak, key, timeout, flags);
+
+ if(rv != 0) {
+ return rv;
+ }
+
+ if(rv != 0) {
+ return rv;
+ }
+
+ while(tryn > 0) {
+ tryn--;
+ recvbufIter = tmp_socket->recvbuf2.find(key);
+ if(recvbufIter != tmp_socket->recvbuf2.end()) {
+ // 鍦ㄧ紦瀛橀噷鏌ュ埌浜哢UID鍖归厤鎴愬姛鐨�
+// logger->debug("get from recvbuf: %s", uuid.c_str());
+ recvpak = recvbufIter->second;
+ sockt->recvbuf2.erase(recvbufIter);
+ goto LABLE_SUC;
+ }
+
+ rv = shm_recvpakfrom(tmp_socket, &recvpak, timeout, flags);
+
+ if (rv != 0) {
+
+ if(rv == ETIMEDOUT) {
+ return EBUS_TIMEOUT;
}
- return 0;
- }
-
- return EBUS_RECVFROM_WRONG_END;
- }
+ logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(tmp_socket), bus_strerror(rv));
+ return rv;
+ }
- return rv;
+ if (key == recvpak.key) {
+ // 鍙戦�佷笌鎺ュ彈鐨刄UID鍖归厤鎴愬姛
+ goto LABLE_SUC;
+ } else {
+ // 绛旈潪鎵�闂紝鏀惧埌缂撳瓨閲�
+ tmp_socket->recvbuf2.insert({recvpak.key, recvpak});
+ continue;
+ }
+ }
+
+LABLE_FAIL:
+ return EBUS_RECVFROM_WRONG_END;
+
+LABLE_SUC:
+ if(recv_buf != NULL) {
+ void *_buf = malloc(recvpak.size);
+ memcpy(_buf, recvpak.buf, recvpak.size);
+ *recv_buf = _buf;
+ }
+
+ if(recv_size != NULL)
+ *recv_size = recvpak.size;
+
+ mm_free(recvpak.buf);
+ return 0;
}
static int _shm_sendandrecv_alloc_new(shm_socket_t *sockt, const void *send_buf,
diff --git a/src/socket/shm_socket.h b/src/socket/shm_socket.h
index 516a343..ffbd425 100644
--- a/src/socket/shm_socket.h
+++ b/src/socket/shm_socket.h
@@ -38,8 +38,8 @@
LockFreeQueue<shm_packet_t> *queue; //self queue
LockFreeQueue<shm_packet_t> *remoteQueue; // peer queue
- std::map<std::string, shm_packet_t> recvbuf;
-
+ std::map<std::string, shm_packet_t> recvbuf; // for uuid
+ std::map<int, shm_packet_t> recvbuf2; //for thread local
} shm_socket_t;
diff --git a/src/sv_read_write_lock.h b/src/sv_read_write_lock.h
new file mode 100644
index 0000000..6d47080
--- /dev/null
+++ b/src/sv_read_write_lock.h
@@ -0,0 +1,54 @@
+#ifndef _CLOSE_LOCK_H_
+#define _CLOSE_LOCK_H_
+
+#include "usg_common.h"
+#include "svsem.h"
+class SvReadWriteLock {
+private:
+ unsigned int readCount = 0;
+ int countMutex;
+ int writeMutex;
+
+
+public:
+ SvReadWriteLock() {
+ countMutex = svsem_get(IPC_PRIVATE, 1);
+ writeMutex = svsem_get(IPC_PRIVATE, 1);
+ }
+
+ void lockRead() {
+ //readCount鏄叡浜彉閲忥紝鎵�浠ラ渶瑕佸疄鐜颁竴涓攣鏉ユ帶鍒惰鍐�
+ //synchronized(ReadWriteLock.class){}
+ svsem_wait(&countMutex);
+ //鍙湁鏄涓�涓鑰咃紝鎵嶅皢鍐欓攣鍔犻攣銆傚叾浠栫殑璇昏�呴兘鏄繘琛屼笅涓�姝�
+ if(readCount == 0){
+ svsem_wait(&writeMutex);
+
+ }
+ ++readCount;
+ svsem_post(&countMutex);
+ }
+
+
+ void unlockRead(){
+
+ svsem_wait(&countMutex);
+ readCount--;
+ //鍙湁褰撹鑰呴兘璇诲畬浜嗭紝鎵嶄細杩涜鍐欐搷浣�
+ if(readCount == 0){
+ svsem_post(&writeMutex);
+ }
+ svsem_post(&countMutex);
+ }
+
+
+ void lockWrite(){
+ svsem_wait(&writeMutex);
+ }
+
+ void unlockWrite(){
+ svsem_post(&writeMutex);
+ }
+};
+
+#endif
diff --git a/test_net_socket/CMakeLists.txt b/test_net_socket/CMakeLists.txt
index d11a1c9..0b85d7d 100644
--- a/test_net_socket/CMakeLists.txt
+++ b/test_net_socket/CMakeLists.txt
@@ -1,13 +1,13 @@
# add command
add_custom_command(
- OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/net_mod_socket.sh
- COMMAND cp ${CMAKE_CURRENT_SOURCE_DIR}/net_mod_socket.sh ${CMAKE_CURRENT_BINARY_DIR}/net_mod_socket.sh
+ OUTPUT ${PROJECT_BINARY_DIR}/bin/net_mod_socket.sh
+ COMMAND cp ${CMAKE_CURRENT_SOURCE_DIR}/net_mod_socket.sh ${PROJECT_BINARY_DIR}/bin/net_mod_socket.sh
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/net_mod_socket.sh
)
# add the executable
-add_executable(test_net_mod_socket test_net_mod_socket.cpp ${CMAKE_CURRENT_BINARY_DIR}/net_mod_socket.sh)
+add_executable(test_net_mod_socket test_net_mod_socket.cpp ${PROJECT_BINARY_DIR}/bin/net_mod_socket.sh)
target_link_libraries(test_net_mod_socket PRIVATE shm_queue ${EXTRA_LIBS} )
diff --git a/test_net_socket/test_net_mod_socket.cpp b/test_net_socket/test_net_mod_socket.cpp
index e036682..05827a1 100644
--- a/test_net_socket/test_net_mod_socket.cpp
+++ b/test_net_socket/test_net_mod_socket.cpp
@@ -370,7 +370,8 @@
double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec);
long diffsec = (long) (difftime/1000000);
long diffusec = difftime - diffsec*1000000;
- fprintf(stderr,"鍙戦�佹暟鐩�: %ld, 鐢ㄦ椂: (%ld sec %ld usec), 骞冲潎: %f\n", total, diffsec, diffusec, difftime/total );
+ fprintf(stderr,"鍙戦�佹暟鐩�:%ld, 鎴愬姛鏁扮洰: %ld, 鐢ㄦ椂: (%ld sec %ld usec), 骞冲潎: %f\n",
+ SCALE*node_arr_size, total, diffsec, diffusec, difftime/total );
// fflush(stdout);
}
--
Gitblit v1.8.0