From 18705acc13f78ac65458b3dd832545e62fbc9172 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期一, 08 二月 2021 10:44:29 +0800
Subject: [PATCH] update
---
src/socket/bus_server_socket.cpp | 4
src/queue/lock_free_queue.h | 24 +++--
src/socket/shm_socket.h | 4
src/pread_write_lock.h | 14 ++-
src/sv_read_write_lock.h | 4
src/shm/shm_mm_wrapper.cpp | 22 +++--
src/socket/shm_mod_socket.cpp | 4
src/net/net_mod_socket.cpp | 2
src/socket/shm_socket.cpp | 134 +++++++++++++--------------------
9 files changed, 99 insertions(+), 113 deletions(-)
diff --git a/src/net/net_mod_socket.cpp b/src/net/net_mod_socket.cpp
index 09cde6c..2a23bd2 100644
--- a/src/net/net_mod_socket.cpp
+++ b/src/net/net_mod_socket.cpp
@@ -44,7 +44,7 @@
// delete gpool;
// s = pthread_mutex_destroy(&sendMutex);
if(s != 0) {
- err_exit(s, "shm_close_socket");
+ err_exit(s, "shm_socket_close");
}
}
diff --git a/src/read_write_lock.h b/src/pread_write_lock.h
similarity index 72%
rename from src/read_write_lock.h
rename to src/pread_write_lock.h
index c899115..ea89cce 100644
--- a/src/read_write_lock.h
+++ b/src/pread_write_lock.h
@@ -1,9 +1,9 @@
-#ifndef _CLOSE_LOCK_H_
-#define _CLOSE_LOCK_H_
+#ifndef _PREAD_WRITE_LOCK_H_
+#define _PREAD_WRITE_LOCK_H_
#include "usg_common.h"
#include "psem.h"
-class ReadWriteLock {
+class PReadWriteLock {
private:
unsigned int readCount = 0;
sem_t countMutex;
@@ -11,13 +11,17 @@
public:
- ReadWriteLock() {
+ PReadWriteLock() {
+ if (sem_init(&countMutex, 1, 1) == -1)
+ err_exit(errno, "PReadWriteLock sem_init");
+ if (sem_init(&writeMutex, 1, 1) == -1)
+ err_exit(errno, "PReadWriteLock sem_init");
}
void lockRead() {
//readCount鏄叡浜彉閲忥紝鎵�浠ラ渶瑕佸疄鐜颁竴涓攣鏉ユ帶鍒惰鍐�
- //synchronized(ReadWriteLock.class){}
+ //synchronized(PReadWriteLock.class){}
psem_wait(&countMutex);
//鍙湁鏄涓�涓鑰咃紝鎵嶅皢鍐欓攣鍔犻攣銆傚叾浠栫殑璇昏�呴兘鏄繘琛屼笅涓�姝�
if(readCount == 0){
diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h
index 354c6a4..425d9f8 100644
--- a/src/queue/lock_free_queue.h
+++ b/src/queue/lock_free_queue.h
@@ -13,7 +13,7 @@
#include "psem.h"
#include "bus_error.h"
#include "bus_def.h"
-#include "read_write_lock.h"
+
// default Queue size
#define LOCK_FREE_Q_DEFAULT_SIZE 16
@@ -83,9 +83,10 @@
sem_t slots;
sem_t items;
+ time_t createTime;
public:
- sem_t mutex;
+ // sem_t mutex;
LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
@@ -95,7 +96,7 @@
/// template
~LockFreeQueue();
- std::atomic_uint reference;
+ // std::atomic_uint reference;
/// @brief constructor of the class
@@ -118,6 +119,10 @@
inline bool empty();
inline ELEM_T &operator[](unsigned i);
+
+ time_t getCreateTime() {
+ return createTime;
+ }
/// @brief push an element at the tail of the queue
/// @param the element to insert in the queue
@@ -153,15 +158,14 @@
typename ELEM_T,
typename Allocator,
template<typename T, typename AT> class Q_TYPE>
-LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize) {
+LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): m_qImpl(qsize) {
// std::cout << "LockFreeQueue init reference=" << reference << std::endl;
if (sem_init(&slots, 1, qsize) == -1)
err_exit(errno, "LockFreeQueue sem_init");
if (sem_init(&items, 1, 0) == -1)
err_exit(errno, "LockFreeQueue sem_init");
- if (sem_init(&mutex, 1, 1) == -1)
- err_exit(errno, "LockFreeQueue sem_init");
-
+
+ createTime = time(NULL);
}
@@ -178,9 +182,9 @@
if (sem_destroy(&items) == -1) {
err_exit(errno, "LockFreeQueue sem_destroy");
}
- if (sem_destroy(&mutex) == -1) {
- err_exit(errno, "LockFreeQueue sem_destroy");
- }
+ // if (sem_destroy(&mutex) == -1) {
+ // err_exit(errno, "LockFreeQueue sem_destroy");
+ // }
}
template<
diff --git a/src/shm/shm_mm_wrapper.cpp b/src/shm/shm_mm_wrapper.cpp
index 92350c8..6560f83 100644
--- a/src/shm/shm_mm_wrapper.cpp
+++ b/src/shm/shm_mm_wrapper.cpp
@@ -4,6 +4,7 @@
#include "lock_free_queue.h"
#include "shm_socket.h"
+#define BUFFER_TIME 10
void shm_mm_wrapper_init(int size) {
mem_pool_init(size);
}
@@ -16,8 +17,6 @@
return mm_alloc_key();
}
-
-
//鍒犻櫎鍖呭惈鍦╧eys鍐呯殑queue
int shm_mm_wrapper_remove_keys(int keys[], int length) {
hashtable_t *hashtable = mm_get_hashtable();
@@ -29,9 +28,12 @@
if(mqueue == NULL) {
continue;
}
- delete mqueue;
- hashtable_remove(hashtable, keys[i]);
- count++;
+ if(difftime(time(NULL), mqueue->getCreateTime()) > BUFFER_TIME ) {
+ delete mqueue;
+ hashtable_remove(hashtable, keys[i]);
+ count++;
+ }
+
}
return count;
}
@@ -57,10 +59,12 @@
if (!found && *keyItr > 100) {
// 閿�姣佸叡浜唴瀛樼殑queue
mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, *keyItr);
-
- delete mqueue;
- hashtable_remove(hashtable, *keyItr);
- count++;
+ if(difftime(time(NULL), mqueue->getCreateTime()) > BUFFER_TIME ) {
+ delete mqueue;
+ hashtable_remove(hashtable, *keyItr);
+ count++;
+ }
+
}
}
delete keyset;
diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp
index b49eeb8..6aa6a95 100644
--- a/src/socket/bus_server_socket.cpp
+++ b/src/socket/bus_server_socket.cpp
@@ -52,7 +52,7 @@
BusServerSocket::BusServerSocket() {
logger->debug("BusServerSocket Init");
- shm_socket = shm_open_socket(SHM_SOCKET_DGRAM);
+ shm_socket = shm_socket_open(SHM_SOCKET_DGRAM);
topic_sub_map = NULL;
}
@@ -126,7 +126,7 @@
topic_sub_map->clear();
mem_pool_free_by_key(SHM_BUS_MAP_KEY);
}
- shm_close_socket(shm_socket);
+ shm_socket_close(shm_socket);
logger->debug("BusServerSocket destory 3");
return 0;
}
diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp
index 47f8e14..15d4072 100644
--- a/src/socket/shm_mod_socket.cpp
+++ b/src/socket/shm_mod_socket.cpp
@@ -14,7 +14,7 @@
// }
ShmModSocket::ShmModSocket() {
- shm_socket = shm_open_socket(SHM_SOCKET_DGRAM);
+ shm_socket = shm_socket_open(SHM_SOCKET_DGRAM);
bus_set = new std::set<int>;
}
@@ -28,7 +28,7 @@
delete bus_set;
}
- shm_close_socket(shm_socket);
+ shm_socket_close(shm_socket);
}
int ShmModSocket::stop() {
diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index f55a111..c0ddeee 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -75,57 +75,57 @@
}
//鍒犻櫎鍖呭惈鍦╧eys鍐呯殑queue
-size_t shm_socket_remove_keys(int keys[], size_t length) {
- hashtable_t *hashtable = mm_get_hashtable();
- LockFreeQueue<shm_packet_t> *mqueue;
- size_t count = 0;
- for(int i = 0; i< length; i++) {
- // 閿�姣佸叡浜唴瀛樼殑queue
- mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, keys[i]);
- delete mqueue;
- hashtable_remove(hashtable, keys[i]);
- count++;
- }
- return count;
-}
+// size_t shm_socket_remove_keys(int keys[], size_t length) {
+// hashtable_t *hashtable = mm_get_hashtable();
+// LockFreeQueue<shm_packet_t> *mqueue;
+// size_t count = 0;
+// for(int i = 0; i< length; i++) {
+// // 閿�姣佸叡浜唴瀛樼殑queue
+// mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, keys[i]);
+// delete mqueue;
+// hashtable_remove(hashtable, keys[i]);
+// count++;
+// }
+// return count;
+// }
-// 鍒犻櫎涓嶅湪keys鍐呯殑queue
-size_t shm_socket_remove_keys_exclude(int keys[], size_t length) {
- hashtable_t *hashtable = mm_get_hashtable();
- std::set<int> *keyset = hashtable_keyset(hashtable);
- std::set<int>::iterator keyItr;
- LockFreeQueue<shm_packet_t> *mqueue;
- bool found;
- size_t count = 0;
- for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
- found = false;
- for (size_t i = 0; i < length; i++) {
- if (*keyItr == keys[i]) {
- found = true;
- break;
- }
- }
- // 100鍐呯殑鏄痓us鍐呴儴鑷繁鐢ㄧ殑
- if (!found && *keyItr > 100) {
- // 閿�姣佸叡浜唴瀛樼殑queue
- mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, *keyItr);
- delete mqueue;
- hashtable_remove(hashtable, *keyItr);
- count++;
- }
- }
- delete keyset;
- return count;
-}
+// // 鍒犻櫎涓嶅湪keys鍐呯殑queue
+// size_t shm_socket_remove_keys_exclude(int keys[], size_t length) {
+// hashtable_t *hashtable = mm_get_hashtable();
+// std::set<int> *keyset = hashtable_keyset(hashtable);
+// std::set<int>::iterator keyItr;
+// LockFreeQueue<shm_packet_t> *mqueue;
+// bool found;
+// size_t count = 0;
+// for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
+// found = false;
+// for (size_t i = 0; i < length; i++) {
+// if (*keyItr == keys[i]) {
+// found = true;
+// break;
+// }
+// }
+// // 100鍐呯殑鏄痓us鍐呴儴鑷繁鐢ㄧ殑
+// if (!found && *keyItr > 100) {
+// // 閿�姣佸叡浜唴瀛樼殑queue
+// mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, *keyItr);
+// delete mqueue;
+// hashtable_remove(hashtable, *keyItr);
+// count++;
+// }
+// }
+// delete keyset;
+// return count;
+// }
-shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) {
+shm_socket_t *shm_socket_open(shm_socket_type_t socket_type) {
int s, type;
pthread_mutexattr_t mtxAttr;
- logger->debug("shm_open_socket\n");
+ logger->debug("shm_socket_open\n");
// shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t));
shm_socket_t *sockt = new shm_socket_t;
sockt->socket_type = socket_type;
@@ -151,10 +151,14 @@
return sockt;
}
-int shm_close_socket(shm_socket_t *sockt) {
+int shm_socket_close(shm_socket_t *sockt) {
+
+}
+
+int _shm_socket_close_(shm_socket_t *sockt) {
int rv;
- logger->debug("shm_close_socket\n");
+ logger->debug("shm_socket_close\n");
if(sockt->queue != NULL) {
delete sockt->queue;
sockt->queue = NULL;
@@ -162,7 +166,7 @@
rv = pthread_mutex_destroy(&(sockt->mutex) );
if(rv != 0) {
- err_exit(rv, "shm_close_socket");
+ err_exit(rv, "shm_socket_close");
}
free(sockt);
@@ -320,7 +324,7 @@
return;
logger->debug("%d destroy tmp socket\n", pthread_self());
- shm_close_socket((shm_socket_t *)tmp_socket);
+ shm_socket_close((shm_socket_t *)tmp_socket);
rv = pthread_setspecific(_perthread_socket_key_, NULL);
if ( rv != 0) {
logger->error(rv, "shm_sendandrecv : pthread_setspecific");
@@ -453,7 +457,7 @@
{
/* If first call from this thread, allocate buffer for thread, and save its location */
logger->debug("%ld create tmp socket\n", (long)pthread_self() );
- tmp_socket = shm_open_socket(SHM_SOCKET_DGRAM);
+ tmp_socket = shm_socket_open(SHM_SOCKET_DGRAM);
rv = pthread_setspecific(_perthread_socket_key_, tmp_socket);
if ( rv != 0) {
@@ -461,36 +465,6 @@
exit(1);
}
}
- // int rv;
- // int tryn = 0;
- // int recv_key;
- // rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags);
-
- // if (() == 0) {
-
- // while(tryn < 3) {
- // tryn++;
- // rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags);
- // if(rv != 0) {
- // logger->error("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv));
- // return rv;
- // }
-
- // // 瓒呮椂瀵艰嚧鎺ュ彂閫佸璞★紝涓庤繑鍥炲璞′笉瀵瑰簲鐨勬儏鍐�
- // if(send_key != recv_key) {
- // logger->debug("======%d use tmp_socket %d, send to %d, receive from %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key);
- // // logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key);
- // // exit(1);
- // continue;
- // // return EBUS_RECVFROM_WRONG_END;
- // }
-
- // return 0;
- // }
-
- // return EBUS_RECVFROM_WRONG_END;
- // }
-
sendpak.key = tmp_socket->key;
@@ -569,7 +543,7 @@
shm_socket_t *tmp_socket;
- tmp_socket = shm_open_socket(SHM_SOCKET_DGRAM);
+ tmp_socket = shm_socket_open(SHM_SOCKET_DGRAM);
if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags)) == 0) {
rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags);
@@ -595,7 +569,7 @@
return EBUS_RECVFROM_WRONG_END;
}
- shm_close_socket(tmp_socket);
+ shm_socket_close(tmp_socket);
return rv;
}
diff --git a/src/socket/shm_socket.h b/src/socket/shm_socket.h
index ffbd425..4407237 100644
--- a/src/socket/shm_socket.h
+++ b/src/socket/shm_socket.h
@@ -48,10 +48,10 @@
size_t shm_socket_remove_keys(int keys[], size_t length);
size_t shm_socket_remove_keys_exclude(int keys[], size_t length);
-shm_socket_t *shm_open_socket(shm_socket_type_t socket_type);
+shm_socket_t *shm_socket_open(shm_socket_type_t socket_type);
-int shm_close_socket(shm_socket_t * socket) ;
+int shm_socket_close(shm_socket_t * socket) ;
int shm_socket_stop(shm_socket_t *sockt);
diff --git a/src/sv_read_write_lock.h b/src/sv_read_write_lock.h
index 6d47080..bbb67c0 100644
--- a/src/sv_read_write_lock.h
+++ b/src/sv_read_write_lock.h
@@ -1,5 +1,5 @@
-#ifndef _CLOSE_LOCK_H_
-#define _CLOSE_LOCK_H_
+#ifndef _SV_READ_WRITE_LOCK_H_
+#define _SV_READ_WRITE_LOCK_H_
#include "usg_common.h"
#include "svsem.h"
--
Gitblit v1.8.0