From e0aea3742aed09a0a9ed384ccd7db203b6efc650 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期六, 20 二月 2021 14:43:52 +0800
Subject: [PATCH] update
---
src/socket/shm_socket.cpp | 151 ++++++++++++++++++++-----------------------------
1 files changed, 62 insertions(+), 89 deletions(-)
diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index 82eaa1f..3366491 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -5,10 +5,11 @@
#include <cassert>
#include "bus_error.h"
#include "sole.h"
+#include "shm_mm.h"
static Logger *logger = LoggerFactory::getLogger();
-
+ShmQueueStMap * shmQueueStMap ;
static void print_msg(char *head, shm_packet_t &msg) {
// err_msg(0, "%s: key=%d, type=%d\n", head, msg.key, msg.type);
@@ -27,7 +28,7 @@
const int key, const struct timespec *timeout, const int flag);
-static int _shm_sendandrecv_use_uuid(shm_socket_t *sockt, const void *send_buf,
+static int _shm_sendandrecv_uuid(shm_socket_t *sockt, const void *send_buf,
const int send_size, const int key, void **recv_buf,
int *recv_size, const struct timespec *timeout, int flags);
@@ -74,53 +75,6 @@
return queue;
}
-//鍒犻櫎鍖呭惈鍦╧eys鍐呯殑queue
-// size_t shm_socket_remove_keys(int keys[], size_t length) {
-// hashtable_t *hashtable = mm_get_hashtable();
-// LockFreeQueue<shm_packet_t> *mqueue;
-// size_t count = 0;
-// for(int i = 0; i< length; i++) {
-// // 閿�姣佸叡浜唴瀛樼殑queue
-// mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, keys[i]);
-// delete mqueue;
-// hashtable_remove(hashtable, keys[i]);
-// count++;
-// }
-// return count;
-// }
-
-
-// // 鍒犻櫎涓嶅湪keys鍐呯殑queue
-// size_t shm_socket_remove_keys_exclude(int keys[], size_t length) {
-// hashtable_t *hashtable = mm_get_hashtable();
-// std::set<int> *keyset = hashtable_keyset(hashtable);
-// std::set<int>::iterator keyItr;
-// LockFreeQueue<shm_packet_t> *mqueue;
-// bool found;
-// size_t count = 0;
-// for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
-// found = false;
-// for (size_t i = 0; i < length; i++) {
-// if (*keyItr == keys[i]) {
-// found = true;
-// break;
-// }
-// }
-// // 100鍐呯殑鏄痓us鍐呴儴鑷繁鐢ㄧ殑
-// if (!found && *keyItr > 100) {
-// // 閿�姣佸叡浜唴瀛樼殑queue
-// mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, *keyItr);
-// delete mqueue;
-// hashtable_remove(hashtable, *keyItr);
-// count++;
-// }
-// }
-// delete keyset;
-// return count;
-// }
-
-
-
shm_socket_t *shm_socket_open(shm_socket_type_t socket_type) {
int s, type;
pthread_mutexattr_t mtxAttr;
@@ -148,21 +102,21 @@
if (s != 0)
err_exit(s, "pthread_mutexattr_destroy");
+
+ shmQueueStMap = shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY);
+
return sockt;
}
+
int shm_socket_close(shm_socket_t *sockt) {
-
-}
-
-int _shm_socket_close_(shm_socket_t *sockt) {
int rv;
logger->debug("shm_socket_close\n");
- if(sockt->queue != NULL) {
- delete sockt->queue;
- sockt->queue = NULL;
- }
+ // if(sockt->queue != NULL) {
+ // delete sockt->queue;
+ // sockt->queue = NULL;
+ // }
rv = pthread_mutex_destroy(&(sockt->mutex) );
if(rv != 0) {
@@ -170,6 +124,12 @@
}
free(sockt);
+
+ auto it = shmQueueStMap.find(key);
+ if(it != shmQueueStMap.end()) {
+ it->second.status = SHM_QUEUE_ST_CLOSED
+ it->second.closeTime = time(NULL);
+ }
return 0;
}
@@ -221,6 +181,7 @@
int shm_sendandrecv(shm_socket_t *sockt, const void *send_buf,
const int send_size, const int key, void **recv_buf,
int *recv_size, const struct timespec *timeout, int flags) {
+ // return _shm_sendandrecv_uuid(sockt, send_buf, send_size, key, recv_buf, recv_size, timeout, flags);
return _shm_sendandrecv_thread_local(sockt, send_buf, send_size, key, recv_buf, recv_size, timeout, flags);
}
@@ -348,7 +309,7 @@
}
-static int _shm_sendandrecv_use_uuid(shm_socket_t *sockt, const void *send_buf,
+static int _shm_sendandrecv_uuid(shm_socket_t *sockt, const void *send_buf,
const int send_size, const int key, void **recv_buf,
int *recv_size, const struct timespec *timeout, int flags) {
@@ -411,7 +372,6 @@
LABLE_FAIL:
return EBUS_RECVFROM_WRONG_END;
- // return rv;
LABLE_SUC:
if(recv_buf != NULL) {
@@ -424,12 +384,7 @@
if(recv_size != NULL)
*recv_size = recvpak.size;
-
-
return 0;
-
-
-
}
// use thread local
@@ -438,7 +393,7 @@
int *recv_size, const struct timespec *timeout, int flags) {
- int rv, tryn = 3;
+ int rv, tryn = 6;
shm_packet_t sendpak;
shm_packet_t recvpak;
std::map<int, shm_packet_t>::iterator recvbufIter;
@@ -480,18 +435,14 @@
return rv;
}
- if(rv != 0) {
- return rv;
- }
-
while(tryn > 0) {
tryn--;
recvbufIter = tmp_socket->recvbuf2.find(key);
if(recvbufIter != tmp_socket->recvbuf2.end()) {
- // 鍦ㄧ紦瀛橀噷鏌ュ埌浜哢UID鍖归厤鎴愬姛鐨�
-// logger->debug("get from recvbuf: %s", uuid.c_str());
+ // 鍦ㄧ紦瀛橀噷鏌ュ埌浜唊ey鍖归厤鎴愬姛鐨�
+ // logger->info("get from recvbuf: %d", key);
recvpak = recvbufIter->second;
- sockt->recvbuf2.erase(recvbufIter);
+ tmp_socket->recvbuf2.erase(recvbufIter);
goto LABLE_SUC;
}
@@ -582,6 +533,7 @@
const int key, const struct timespec *timeout, const int flag) {
int rv;
+ shm_queue_status_t stRecord;
hashtable_t *hashtable = mm_get_hashtable();
if( sockt->queue != NULL)
@@ -604,6 +556,12 @@
logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key);
return EBUS_KEY_INUSED;
}
+
+ // 鏍囪key瀵瑰簲鐨勭姸鎬� 锛屼负opened
+ stRecord.status = SHM_QUEUE_ST_OPENED;
+ stRecord.createTime = time(NULL);
+ shmQueueStMap.insert({sockt->key, stRecord});
+
}
if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
@@ -618,24 +576,34 @@
return EBUS_SENDTO_SELF;
}
- LockFreeQueue<shm_packet_t> *remoteQueue;
- if ((remoteQueue = shm_socket_attach_queue(key)) == NULL) {
- bus_errno = EBUS_CLOSED;
- logger->error("sendto key %d failed, %s", key, bus_strerror(bus_errno));
- return EBUS_CLOSED;
+ // 妫�鏌ey鏍囪鐨勭姸鎬�
+ auto it = shmQueueStMap.find(key);
+ if(it != shmQueueStMap.end()) {
+ if(it->second.status == SHM_QUEUE_ST_CLOSED) {
+ // key瀵瑰簲鐨勭姸鎬佹槸鍏抽棴鐨�
+ goto ERR_CLOSED;
+ }
}
-
+ LockFreeQueue<shm_packet_t> *remoteQueue = shm_socket_attach_queue(key);
+
+ if (remoteQueue == NULL ) {
+ goto ERR_CLOSED;
+ }
rv = remoteQueue->push(*sendpak, timeout, flag);
-
return rv;
+
+ERR_CLOSED:
+ logger->error("sendto key %d failed, %s", key, bus_strerror(EBUS_CLOSED));
+ return EBUS_CLOSED;
+
}
// 鐭繛鎺ユ柟寮忔帴鍙�
static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *_recvpak , const struct timespec *timeout, int flag) {
int rv;
-
+ shm_queue_status_t stRecord;
hashtable_t *hashtable = mm_get_hashtable();
shm_packet_t recvpak;
@@ -660,6 +628,10 @@
return EBUS_KEY_INUSED;
}
+ // 鏍囪key瀵瑰簲鐨勭姸鎬� 锛屼负opened
+ stRecord.status = SHM_QUEUE_ST_OPENED;
+ stRecord.createTime = time(NULL);
+ shmQueueStMap.insert({sockt->key, stRecord});
if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
err_exit(rv, "shm_recvfrom : pthread_mutex_unlock");
@@ -668,8 +640,15 @@
LABEL_POP:
- //
- // printf("%p start recv.....\n", sockt);
+ // 妫�鏌ey鏍囪鐨勭姸鎬�
+ // auto shmQueueMapIter = shmQueueStMap.find(sockt->key);
+ // if(shmQueueMapIter != shmQueueStMap.end()) {
+ // stRecord = shmQueueMapIter->second;
+ // if(stRecord.status = SHM_QUEUE_ST_CLOSED) {
+ // // key瀵瑰簲鐨勭姸鎬佹槸鍏抽棴鐨�
+ // goto ERR_CLOSED;
+ // }
+ // }
rv = sockt->queue->pop(recvpak, timeout, flag);
if(rv != 0)
@@ -682,10 +661,4 @@
*_recvpak = recvpak;
return rv;
}
-// int shm_sendandrecv(shm_socket_t *sockt, const void *send_buf,
-// const int send_size, const int send_key, void **recv_buf,
-// int *recv_size, const struct timespec *timeout, int flags) {
-
-// struct timespec tm = {10, 0};
-// return _shm_sendandrecv_thread_local(sockt, send_buf, send_size, send_key,recv_buf, recv_size, &tm, flags);
-// }
+
--
Gitblit v1.8.0