From fb3d4aaf08075f84ed7718d2b570e6ca9e8b4a70 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期一, 01 二月 2021 18:31:16 +0800
Subject: [PATCH] update
---
src/socket/shm_socket.cpp | 646 +++++++++++++---------------------------------------------
1 files changed, 148 insertions(+), 498 deletions(-)
diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index 785eb4d..31598d0 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -14,36 +14,63 @@
}
static pthread_once_t _once_ = PTHREAD_ONCE_INIT;
-static pthread_key_t _tmp_recv_socket_key_;
+static pthread_key_t _perthread_socket_key_;
-static void *_server_run_msg_rev(void *_socket);
+static void _destrory_socket_perthread(void *tmp_socket);
+static void _create_socket_key_perthread(void);
-static void *_client_run_msg_rev(void *_socket);
+// 妫�鏌ey鏄惁宸茬粡琚娇鐢紝 鏈浣跨敤鍒欑粦瀹歬ey
+static LockFreeQueue<shm_msg_t> * shm_socket_bind_queue(int key, bool force) {
+ hashtable_t *hashtable = mm_get_hashtable();
+ LockFreeQueue<shm_msg_t> *queue;
+ hashtable_lock(hashtable);
+ void *tmp_ptr = hashtable_get(hashtable, key);
-static int _shm_close_dgram_socket(shm_socket_t *socket);
-static int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote);
-
-static void _destrory_tmp_recv_socket_(void *tmp_socket);
-static void _create_tmp_recv_socket_key(void);
-
-// 妫�鏌ey鏄惁宸茬粡琚娇鐢紝鏄繑鍥�0, 鍚﹁繑鍥�1
-static inline int _shm_socket_check_key(shm_socket_t *socket) {
- void *tmp_ptr = mm_get_by_key(socket->key);
- if (tmp_ptr!= NULL && tmp_ptr != (void *)1 && !socket->force_bind ) {
- bus_errno = EBUS_KEY_INUSED;
- logger->error("%s. key = %d ", bus_strerror(EBUS_KEY_INUSED), socket->key);
- return 0;
- }
- return 1;
+ if (tmp_ptr == NULL || tmp_ptr == (void *)1 ) {
+ queue = new LockFreeQueue<shm_msg_t>(16);
+ hashtable_put(hashtable, key, (void *)queue);
+ hashtable_unlock(hashtable);
+ return queue;
+ } else if(force) {
+ hashtable_unlock(hashtable);
+ return (LockFreeQueue<shm_msg_t> *) queue;
+ }
+
+ hashtable_unlock(hashtable);
+ return NULL;
}
-SHMQueue<shm_msg_t> *_attach_remote_queue(int key);
+/**
+ * 缁戝畾key鍒伴槦鍒楋紝浣嗘槸骞朵笉浼氬垱寤洪槦鍒椼��
+ */
+static LockFreeQueue<shm_msg_t> * shm_socket_attach_queue(int key) {
+ LockFreeQueue<shm_msg_t> * queue;
+ hashtable_t *hashtable = mm_get_hashtable();
+ void *tmp_ptr = hashtable_get(hashtable, key);
+ if (tmp_ptr == NULL || tmp_ptr == (void *)1) {
+ //logger->error("shm_socket._remote_queue_attach锛歝onnet at key %d failed!", key);
+ return NULL;
+ }
+ queue = ( LockFreeQueue<shm_msg_t> *)tmp_ptr;
+ // hashtable_unlock(hashtable);
+ return queue;
+}
size_t shm_socket_remove_keys(int keys[], size_t length) {
- return SHMQueue<shm_msg_t>::remove_queues(keys, length);
+ hashtable_t *hashtable = mm_get_hashtable();
+ LockFreeQueue<shm_msg_t> *mqueue;
+ size_t count = 0;
+ for(int i = 0; i< length; i++) {
+ // 閿�姣佸叡浜唴瀛樼殑queue
+ mqueue = (LockFreeQueue<shm_msg_t> *)hashtable_get(hashtable, keys[i]);
+ delete mqueue;
+ hashtable_remove(hashtable, keys[i]);
+ count++;
+ }
+ return count;
}
shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) {
@@ -55,8 +82,8 @@
socket->socket_type = socket_type;
socket->key = 0;
socket->force_bind = false;
- socket->dispatch_thread = 0;
- socket->status = SHM_CONN_CLOSED;
+ socket->queue = NULL;
+
s = pthread_mutexattr_init(&mtxAttr);
if (s != 0)
@@ -77,18 +104,12 @@
int shm_close_socket(shm_socket_t *socket) {
- int ret, s;
+ int s;
logger->debug("shm_close_socket\n");
- switch (socket->socket_type) {
- case SHM_SOCKET_STREAM:
- ret = _shm_close_stream_socket(socket, true);
- break;
- case SHM_SOCKET_DGRAM:
- ret = _shm_close_dgram_socket(socket);
- break;
- default:
- break;
+ if(socket->queue != NULL) {
+ delete socket->queue;
+ socket->queue = NULL;
}
s = pthread_mutex_destroy(&(socket->mutex) );
@@ -97,15 +118,9 @@
}
free(socket);
- return ret;
+ return 0;
}
-// int shm_close_socket(shm_socket_t *socket) {
-
-// // _destrory_tmp_recv_socket_((shm_socket_t *)pthread_getspecific(_tmp_recv_socket_key_));
-
-// return shm_close_socket(socket);;
-// }
int shm_socket_bind(shm_socket_t *socket, int key) {
socket->key = key;
@@ -118,249 +133,49 @@
return 0;
}
-int shm_listen(shm_socket_t *socket) {
-
- if (socket->socket_type != SHM_SOCKET_STREAM) {
- logger->error("can not invoke shm_listen method with a socket which is not a "
- "SHM_SOCKET_STREAM socket");
- exit(1);
- }
-
- int key;
- hashtable_t *hashtable = mm_get_hashtable();
- if (socket->key == 0) {
- key = hashtable_alloc_key(hashtable);
- socket->key = key;
- } else {
-
- if(!_shm_socket_check_key(socket)) {
- bus_errno = EBUS_KEY_INUSED;
- return EBUS_KEY_INUSED;
- }
- }
-
- socket->queue = new SHMQueue<shm_msg_t>(socket->key, 16);
- socket->acceptQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
- socket->clientSocketMap = new std::map<int, shm_socket_t *>;
- socket->status = SHM_CONN_LISTEN;
- pthread_create(&(socket->dispatch_thread), NULL, _server_run_msg_rev,
- (void *)socket);
-
- return 0;
+int shm_socket_get_key(shm_socket_t *sk){
+ return sk->key;
}
-/**
- * 鎺ュ彈瀹㈡埛绔缓绔嬫柊杩炴帴鐨勮姹�
- *
-*/
-shm_socket_t *shm_accept(shm_socket_t *socket) {
- if (socket->socket_type != SHM_SOCKET_STREAM) {
- logger->error("can not invoke shm_accept method with a socket which is not a "
- "SHM_SOCKET_STREAM socket");
- exit(1);
- }
+
+
+
+// 鐭繛鎺ユ柟寮忓彂閫�
+int shm_sendto(shm_socket_t *sockt, const void *buf, const int size,
+ const int key, const struct timespec *timeout, const int flag) {
+
+ int rv;
+
hashtable_t *hashtable = mm_get_hashtable();
- int client_key;
- shm_socket_t *client_socket;
- shm_msg_t src;
- if (socket->acceptQueue->pop(src) == 0) {
+
+ if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0)
+ err_exit(rv, "shm_sendto : pthread_mutex_lock");
- // print_msg("===accept:", src);
- client_key = src.key;
- // client_socket = (shm_socket_t *)malloc(sizeof(shm_socket_t));
- client_socket = shm_open_socket(socket->socket_type);
- client_socket->key = socket->key;
- // client_socket->queue= socket->queue;
- //鍒濆鍖栨秷鎭痲ueue
- client_socket->messageQueue =
- new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
- //杩炴帴鍒板鏂筿ueue
- client_socket->remoteQueue = _attach_remote_queue(client_key);
-
- socket->clientSocketMap->insert({client_key, client_socket});
-
- /*
-* shm_accept 鐢ㄦ埛鎵ц鐨勬柟娉�
-* 涓巁server_run_msg_rev鍦ㄤ袱涓笉鍚岀殑闄愬埗宸ヤ綔,accept瑕佷繚璇佸湪瀹㈡埛鐨勫彂閫佹秷鎭箣鍓嶅畬鎴愯祫婧愮殑鍑嗗宸ヤ綔锛屼互閬垮厤鍑虹幇绔炴�侀棶棰�
- */
- //鍙戦�乷pen_reply,鍥炲簲瀹㈡埛绔殑connect璇锋眰
- struct timespec timeout = {1, 0};
- shm_msg_t msg;
- msg.key = socket->key;
- msg.size = 0;
- msg.type = SHM_SOCKET_OPEN_REPLY;
-
- if (client_socket->remoteQueue->push(msg, &timeout, BUS_TIMEOUT_FLAG) == 0) {
- client_socket->status = SHM_CONN_ESTABLISHED;
- return client_socket;
- } else {
- logger->error( "shm_accept: 鍙戦�乷pen_reply澶辫触");
- return NULL;
+ if (sockt->queue == NULL) {
+ if (sockt->key == 0) {
+ sockt->key = hashtable_alloc_key(hashtable);
}
-
- } else {
- err_exit(errno, "shm_accept");
- }
- return NULL;
-}
-
-
-/**
- * @return 0鎴愬姛. 鍏朵粬鍊煎け璐�
- */
-int shm_connect(shm_socket_t *socket, int key) {
- if (socket->socket_type != SHM_SOCKET_STREAM) {
- logger->error( "can not invoke shm_connect method with a socket which is not "
- "a SHM_SOCKET_STREAM socket");
- exit(1);
- }
- hashtable_t *hashtable = mm_get_hashtable();
- if (hashtable_get(hashtable, key) == NULL) {
- logger->error("shm_connect锛歝onnect at key %d failed!", key);
- return -1;
- }
-
- if (socket->key == 0) {
- socket->key = hashtable_alloc_key(hashtable);
- } else {
- if(!_shm_socket_check_key(socket)) {
- bus_errno = EBUS_KEY_INUSED;
+ sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind);
+ if(sockt->queue == NULL ) {
+ logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key);
return EBUS_KEY_INUSED;
}
}
- socket->queue = new SHMQueue<shm_msg_t>(socket->key, 16);
-
- if ((socket->remoteQueue = _attach_remote_queue(key)) == NULL) {
- logger->error("connect to %d failted", key);
- return -1;
- }
- socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
-
- //鍙戦�乷pen璇锋眰
- struct timespec timeout = {1, 0};
- shm_msg_t msg;
- msg.key = socket->key;
- msg.size = 0;
- msg.type = SHM_SOCKET_OPEN;
- socket->remoteQueue->push(msg, &timeout, BUS_TIMEOUT_FLAG);
-
- //鎺ュ彈open reply
- if (socket->queue->pop(msg) == 0) {
- // 鍦ㄨ繖閲宻erver绔凡缁忓噯澶囧ソ鎺ュ彈瀹㈡埛绔彂閫佽姹備簡,瀹屾垚涓庢湇鍔$鐨勮繛鎺�
- if (msg.type == SHM_SOCKET_OPEN_REPLY) {
- socket->status = SHM_CONN_ESTABLISHED;
- pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev,
- (void *)socket);
- } else {
- logger->error( "shm_connect: 涓嶅尮閰嶇殑搴旂瓟淇℃伅!");
- exit(1);
- }
-
- } else {
- logger->error( "connect failted!");
- return -1;
- }
-
- return 0;
-}
-
-int shm_send(shm_socket_t *socket, const void *buf, const int size) {
- if (socket->socket_type != SHM_SOCKET_STREAM) {
- logger->error("shm_socket.shm_send: can not invoke shm_send method with a socket which is not a "
- "SHM_SOCKET_STREAM socket");
- exit(1);
- }
- hashtable_t *hashtable = mm_get_hashtable();
- if(socket->remoteQueue == NULL) {
- err_msg(errno, "褰撳墠瀹㈡埛绔棤杩炴帴!");
- return -1;
- }
- shm_msg_t dest;
- dest.type = SHM_COMMON_MSG;
- dest.key = socket->key;
- dest.size = size;
- dest.buf = mm_malloc(size);
- memcpy(dest.buf, buf, size);
-
- if (socket->remoteQueue->push(dest) == 0) {
- return 0;
- } else {
- logger->error(errno, "connection has been closed!");
- return -1;
- }
-}
-
-int shm_recv(shm_socket_t *socket, void **buf, int *size) {
- if (socket->socket_type != SHM_SOCKET_STREAM) {
- logger->error( "shm_socket.shm_recv: can not invoke shm_recv method in a %d type socket which is "
- "not a SHM_SOCKET_STREAM socket ",
- socket->socket_type);
- exit(1);
- }
- shm_msg_t src;
-
- if (socket->messageQueue->pop(src) == 0) {
- void *_buf = malloc(src.size);
- memcpy(_buf, src.buf, src.size);
- *buf = _buf;
- *size = src.size;
- mm_free(src.buf);
- return 0;
- } else {
- return -1;
- }
-}
-
-
-// 鐭繛鎺ユ柟寮忓彂閫�
-int shm_sendto(shm_socket_t *socket, const void *buf, const int size,
- const int key, const struct timespec *timeout, const int flag) {
-
- int s;
- int rv;
-
- if (socket->socket_type != SHM_SOCKET_DGRAM) {
- logger->error( "shm_socket.shm_sendto: Can't invoke shm_sendto method in a %d type socket which is "
- "not a SHM_SOCKET_DGRAM socket ",
- socket->socket_type);
- exit(0);
- }
- hashtable_t *hashtable = mm_get_hashtable();
-
-
- if ((s = pthread_mutex_lock(&(socket->mutex))) != 0)
- err_exit(s, "shm_sendto : pthread_mutex_lock");
-
- if (socket->queue == NULL) {
- if (socket->key == 0) {
- socket->key = hashtable_alloc_key(hashtable);
- } else {
-
- if(!_shm_socket_check_key(socket)) {
- bus_errno = EBUS_KEY_INUSED;
- return EBUS_KEY_INUSED;
- }
-
- }
-
- socket->queue = new SHMQueue<shm_msg_t>(socket->key, 16);
- }
-
- if ((s = pthread_mutex_unlock(&(socket->mutex))) != 0)
- err_exit(s, "shm_sendto : pthread_mutex_unlock");
+ if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
+ err_exit(rv, "shm_sendto : pthread_mutex_unlock");
- // There is some case where a socket need to send to himeself, for example when bus server need to stop, he need to send himself
+ // There is some case where a sockt need to send to himeself, for example when bus server need to stop, he need to send himself
// a top message.
- // if (key == socket->key) {
- // logger->error( "can not send to your self!");
- // return -1;
- // }
+ if (key == sockt->key) {
+ logger->error( "can not send to your self!");
+ return EBUS_SENDTO_SELF;
+ }
- SHMQueue<shm_msg_t> *remoteQueue;
- if ((remoteQueue = _attach_remote_queue(key)) == NULL) {
+ LockFreeQueue<shm_msg_t> *remoteQueue;
+ if ((remoteQueue = shm_socket_attach_queue(key)) == NULL) {
bus_errno = EBUS_CLOSED;
logger->error("sendto key %d failed, %s", key, bus_strerror(bus_errno));
return EBUS_CLOSED;
@@ -368,7 +183,7 @@
shm_msg_t dest;
dest.type = SHM_COMMON_MSG;
- dest.key = socket->key;
+ dest.key = sockt->key;
dest.size = size;
dest.buf = mm_malloc(size);
memcpy(dest.buf, buf, size);
@@ -376,52 +191,46 @@
rv = remoteQueue->push(dest, timeout, flag);
if (rv == 0) {
- // printf("shm_sendto push after\n");
+ printf("%d sendto %d suc.\n", shm_socket_get_key(sockt), key);
return 0;
} else {
mm_free(dest.buf);
- logger->debug("sendto key %d failed %s", key, bus_strerror(rv));
- return rv;
+ if(rv == ETIMEDOUT)
+ return EBUS_TIMEOUT;
+ else {
+ logger->debug("====%d sendto key %d failed %s", shm_socket_get_key(sockt), key, bus_strerror(rv));
+ return rv;
+ }
}
}
// 鐭繛鎺ユ柟寮忔帴鍙�
-int shm_recvfrom(shm_socket_t *sokt, void **buf, int *size, int *key, const struct timespec *timeout, int flag) {
- int s;
+int shm_recvfrom(shm_socket_t *sockt, void **buf, int *size, int *key, const struct timespec *timeout, int flag) {
int rv;
-
- if (sokt->socket_type != SHM_SOCKET_DGRAM) {
- logger->error("shm_socket.shm_recvfrom: Can't invoke shm_recvfrom method in a %d type socket which "
- "is not a SHM_SOCKET_DGRAM socket ",
- sokt->socket_type);
- exit(1);
- }
+
hashtable_t *hashtable = mm_get_hashtable();
- if ((s = pthread_mutex_lock(&(sokt->mutex))) != 0)
- err_exit(s, "shm_recvfrom : pthread_mutex_lock");
+ if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0)
+ err_exit(rv, "shm_recvfrom : pthread_mutex_lock");
- if (sokt->queue == NULL) {
- if (sokt->key == 0) {
- sokt->key = hashtable_alloc_key(hashtable);
- } else {
-
- if(!_shm_socket_check_key(sokt)) {
- bus_errno = EBUS_KEY_INUSED;
- return EBUS_KEY_INUSED;
- }
+ if (sockt->queue == NULL) {
+ if (sockt->key == 0) {
+ sockt->key = hashtable_alloc_key(hashtable);
+ }
+ sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind);
+ if(sockt->queue == NULL ) {
+ logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key);
+ return EBUS_KEY_INUSED;
}
-
- sokt->queue = new SHMQueue<shm_msg_t>(sokt->key, 16);
}
- if ((s = pthread_mutex_unlock(&(sokt->mutex))) != 0)
- err_exit(s, "shm_recvfrom : pthread_mutex_unlock");
+ if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
+ err_exit(rv, "shm_recvfrom : pthread_mutex_unlock");
shm_msg_t src;
- rv = sokt->queue->pop(src, timeout, flag);
+ rv = sockt->queue->pop(src, timeout, flag);
if (rv == 0) {
if(buf != NULL) {
@@ -439,15 +248,20 @@
mm_free(src.buf);
return 0;
} else {
- logger->debug("shm_recvfrom failed %s", bus_strerror(rv));
- return rv;
+ if(rv == ETIMEDOUT)
+ return EBUS_TIMEOUT;
+ else {
+ logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv));
+ return rv;
+ }
+
}
}
/* Free thread-specific data buffer */
-static void _destrory_tmp_recv_socket_(void *tmp_socket)
+static void _destrory_socket_perthread(void *tmp_socket)
{
int rv;
if(tmp_socket == NULL)
@@ -455,7 +269,7 @@
logger->debug("%d destroy tmp socket\n", pthread_self());
shm_close_socket((shm_socket_t *)tmp_socket);
- rv = pthread_setspecific(_tmp_recv_socket_key_, NULL);
+ rv = pthread_setspecific(_perthread_socket_key_, NULL);
if ( rv != 0) {
logger->error(rv, "shm_sendandrecv : pthread_setspecific");
exit(1);
@@ -463,14 +277,14 @@
}
/* One-time key creation function */
-static void _create_tmp_recv_socket_key(void)
+static void _create_socket_key_perthread(void)
{
int s;
/* Allocate a unique thread-specific data key and save the address
of the destructor for thread-specific data buffers */
- s = pthread_key_create(&_tmp_recv_socket_key_, _destrory_tmp_recv_socket_);
- //s = pthread_key_create(&_tmp_recv_socket_key_, NULL);
+ s = pthread_key_create(&_perthread_socket_key_, _destrory_socket_perthread);
+ //s = pthread_key_create(&_perthread_socket_key_, NULL);
if (s != 0) {
logger->error(s, "pthread_key_create");
abort(); /* dump core and terminate */
@@ -480,7 +294,7 @@
// use thread local
-int _shm_sendandrecv_thread_local(shm_socket_t *socket, const void *send_buf,
+int _shm_sendandrecv_thread_local(shm_socket_t *sockt, const void *send_buf,
const int send_size, const int send_key, void **recv_buf,
int *recv_size, const struct timespec *timeout, int flags) {
int recv_key;
@@ -488,29 +302,22 @@
// 鐢╰hread local 淇濊瘉姣忎釜绾跨▼鐢ㄤ竴涓嫭鍗犵殑socket鎺ュ彈瀵规柟杩斿洖鐨勪俊鎭�
shm_socket_t *tmp_socket;
-
- if (socket->socket_type != SHM_SOCKET_DGRAM) {
- logger->error( "shm_socket.shm_sendandrecv: Can't invoke shm_sendandrecv method in a %d type socket "
- "which is not a SHM_SOCKET_DGRAM socket ",
- socket->socket_type);
- exit(1);
- }
- rv = pthread_once(&_once_, _create_tmp_recv_socket_key);
+ rv = pthread_once(&_once_, _create_socket_key_perthread);
if (rv != 0) {
logger->error(rv, "shm_sendandrecv pthread_once");
exit(1);
}
- tmp_socket = (shm_socket_t *)pthread_getspecific(_tmp_recv_socket_key_);
+ tmp_socket = (shm_socket_t *)pthread_getspecific(_perthread_socket_key_);
if (tmp_socket == NULL)
{
/* If first call from this thread, allocate buffer for thread, and save its location */
logger->debug("%ld create tmp socket\n", (long)pthread_self() );
tmp_socket = shm_open_socket(SHM_SOCKET_DGRAM);
- rv = pthread_setspecific(_tmp_recv_socket_key_, tmp_socket);
+ rv = pthread_setspecific(_perthread_socket_key_, tmp_socket);
if ( rv != 0) {
logger->error(rv, "shm_sendandrecv : pthread_setspecific");
exit(1);
@@ -519,17 +326,28 @@
if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags)) == 0) {
rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags);
-printf("======send key =%d , recv key=%d\n", send_key, recv_key);
- assert( send_key == recv_key);
- if(send_key != recv_key)
- err_exit(0, "send key need to equal to recv key! send key =%d , recv key=%d", send_key, recv_key);
+ if(rv != 0) {
+ logger->error("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv));
+ }
+ else if(rv == 0 ) {
+ logger->debug("======%d use tmp_socket %d, send to %d, receive from %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key);
+
+ if(recv_key == shm_socket_get_key(sockt)) {
+ logger->debug("=====鏀跺埌浜嗚嚜宸卞彂缁欒嚜宸辩殑娑堟伅\n");
+ }
+ assert( send_key == recv_key);
+ if(send_key != recv_key) {
+ logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key);
+ exit(1);
+ }
+ }
return rv;
} else {
return rv;
}
}
-int _shm_sendandrecv_alloc_new(shm_socket_t *socket, const void *send_buf,
+int _shm_sendandrecv_alloc_new(shm_socket_t *sockt, const void *send_buf,
const int send_size, const int send_key, void **recv_buf,
int *recv_size, const struct timespec *timeout, int flags) {
int recv_key;
@@ -537,13 +355,6 @@
// 鐢╰hread local 淇濊瘉姣忎釜绾跨▼鐢ㄤ竴涓嫭鍗犵殑socket鎺ュ彈瀵规柟杩斿洖鐨勪俊鎭�
shm_socket_t *tmp_socket;
-
- if (socket->socket_type != SHM_SOCKET_DGRAM) {
- logger->error( "shm_socket.shm_sendandrecv: Can't invoke shm_sendandrecv method in a %d type socket "
- "which is not a SHM_SOCKET_DGRAM socket ",
- socket->socket_type);
- exit(1);
- }
/* If first call from this thread, allocate buffer for thread, and save its location */
// logger->debug("%d create tmp socket\n", pthread_self() );
@@ -551,15 +362,19 @@
if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags)) == 0) {
rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags);
- printf("======send key =%d , recv key=%d\n", send_key, recv_key);
-
+
if(rv != 0) {
- printf("_shm_sendandrecv_alloc_new shm_recvfrom : %s\n", bus_strerror(rv));
+ printf("_shm_sendandrecv_alloc_new : %s\n", bus_strerror(rv));
}
else if(rv == 0 ) {
+ printf("======%d use tmp_socket %d, send to %d, receive from %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key);
+
+ if(recv_key == shm_socket_get_key(sockt)) {
+ printf("=====鏀跺埌浜嗚嚜宸卞彂缁欒嚜宸辩殑娑堟伅\n");
+ }
assert( send_key == recv_key);
if(send_key != recv_key) {
- err_exit(0, "send key need to equal to recv key! send key =%d , recv key=%d", send_key, recv_key);
+ err_exit(0, "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key);
}
}
@@ -573,171 +388,6 @@
int shm_sendandrecv(shm_socket_t *socket, const void *send_buf,
const int send_size, const int send_key, void **recv_buf,
int *recv_size, const struct timespec *timeout, int flags) {
- return _shm_sendandrecv_alloc_new(socket, send_buf, send_size, send_key,recv_buf, recv_size, timeout, flags);
+
+ return _shm_sendandrecv_thread_local(socket, send_buf, send_size, send_key,recv_buf, recv_size, timeout, flags);
}
-
-
-// ============================================================================================================
-
-/**
- * 缁戝畾key鍒伴槦鍒楋紝浣嗘槸骞朵笉浼氬垱寤洪槦鍒椼�傚鏋滄病鏈夊搴旀寚瀹歬ey鐨勯槦鍒楁彁绀洪敊璇苟閫�鍑�
- */
-SHMQueue<shm_msg_t> *_attach_remote_queue(int key) {
-
- hashtable_t *hashtable = mm_get_hashtable();
- if (hashtable_get(hashtable, key) == NULL) {
- //logger->error("shm_socket._remote_queue_attach锛歝onnet at key %d failed!", key);
- return NULL;
- }
-
- SHMQueue<shm_msg_t> *queue = new SHMQueue<shm_msg_t>(key, 0);
- return queue;
-}
-
-void _server_close_conn_to_client(shm_socket_t *socket, int key) {
- shm_socket_t *client_socket;
- std::map<int, shm_socket_t *>::iterator iter =
- socket->clientSocketMap->find(key);
- if (iter != socket->clientSocketMap->end()) {
- client_socket = iter->second;
- free((void *)client_socket);
- socket->clientSocketMap->erase(iter);
- }
-}
-
-/**
- * server绔悇绉嶇被鍨嬫秷鎭紙锛夊湪杩欓噷杩涚▼鍒嗘嫞
- */
-void *_server_run_msg_rev(void *_socket) {
- pthread_detach(pthread_self());
- shm_socket_t *socket = (shm_socket_t *)_socket;
- struct timespec timeout = {1, 0};
- shm_msg_t src;
- shm_socket_t *client_socket;
- std::map<int, shm_socket_t *>::iterator iter;
-
- while (socket->queue->pop(src) == 0) {
-
- switch (src.type) {
- case SHM_SOCKET_OPEN:
- socket->acceptQueue->push(src, &timeout, BUS_TIMEOUT_FLAG);
- break;
- case SHM_SOCKET_CLOSE:
- _server_close_conn_to_client(socket, src.key);
- break;
- case SHM_COMMON_MSG:
-
- iter = socket->clientSocketMap->find(src.key);
- if (iter != socket->clientSocketMap->end()) {
- client_socket = iter->second;
- // print_msg("_server_run_msg_rev push before", src);
- client_socket->messageQueue->push(src, &timeout, BUS_TIMEOUT_FLAG);
- // print_msg("_server_run_msg_rev push after", src);
- }
-
- break;
-
- default:
- logger->error("shm_socket._server_run_msg_rev: undefined message type.");
- }
- }
-
- return NULL;
-}
-
-void _client_close_conn_to_server(shm_socket_t *socket) {
-
- _shm_close_stream_socket(socket, false);
-}
-
-/**
- * client绔殑鍚勭绫诲瀷娑堟伅锛堬級鍦ㄨ繖閲岃繘绋嬪垎鎷�
- */
-void *_client_run_msg_rev(void *_socket) {
- pthread_detach(pthread_self());
- shm_socket_t *socket = (shm_socket_t *)_socket;
- struct timespec timeout = {1, 0};
- shm_msg_t src;
-
- while (socket->queue->pop(src) == 0) {
- switch (src.type) {
-
- case SHM_SOCKET_CLOSE:
- _client_close_conn_to_server(socket);
- break;
- case SHM_COMMON_MSG:
- socket->messageQueue->push(src, &timeout, BUS_TIMEOUT_FLAG);
- break;
- default:
- logger->error( "shm_socket._client_run_msg_rev: undefined message type.");
- }
- }
-
- return NULL;
-}
-
-int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote) {
- socket->status = SHM_CONN_CLOSED;
- //缁欏鏂瑰彂閫佷竴涓叧闂繛鎺ョ殑娑堟伅
- struct timespec timeout = {1, 0};
- shm_msg_t close_msg;
-
- close_msg.key = socket->key;
- close_msg.size = 0;
- close_msg.type = SHM_SOCKET_CLOSE;
- if (notifyRemote && socket->remoteQueue != NULL) {
- socket->remoteQueue->push(close_msg, &timeout, BUS_TIMEOUT_FLAG);
- }
-
- if (socket->queue != NULL) {
- delete socket->queue;
- socket->queue = NULL;
- }
-
-
-
- if (socket->messageQueue != NULL) {
- delete socket->messageQueue;
- socket->messageQueue = NULL;
- }
-
- if (socket->acceptQueue != NULL) {
- delete socket->acceptQueue;
- socket->acceptQueue = NULL;
- }
-
- if (socket->clientSocketMap != NULL) {
- shm_socket_t *client_socket;
- for (auto iter = socket->clientSocketMap->begin();
- iter != socket->clientSocketMap->end(); iter++) {
- client_socket = iter->second;
-
- client_socket->remoteQueue->push(close_msg, &timeout, BUS_TIMEOUT_FLAG);
- client_socket->remoteQueue = NULL;
-
- delete client_socket->messageQueue;
- client_socket->messageQueue = NULL;
- free((void *)client_socket);
- }
- delete socket->clientSocketMap;
- }
-
- if (socket->dispatch_thread != 0)
- pthread_cancel(socket->dispatch_thread);
-
-
- return 0;
-
-}
-
-int _shm_close_dgram_socket(shm_socket_t *socket){
- if(socket->queue != NULL) {
- delete socket->queue;
- socket->queue = NULL;
- }
-
- return 0;
-}
-
-
-
--
Gitblit v1.8.0