From 3a89a77e79407d0d638ddf983ee580410cf807c5 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期二, 04 八月 2020 15:56:12 +0800
Subject: [PATCH] fix sendto
---
src/socket/shm_socket.c | 788 +++++++++++++++++++++++++++++--------------------------
1 files changed, 416 insertions(+), 372 deletions(-)
diff --git a/src/socket/shm_socket.c b/src/socket/shm_socket.c
index 6708469..c6d9dcb 100644
--- a/src/socket/shm_socket.c
+++ b/src/socket/shm_socket.c
@@ -3,478 +3,522 @@
#include "logger_factory.h"
#include <map>
-
-
static Logger logger = LoggerFactory::getLogger();
-void print_msg(char *head, shm_msg_t& msg) {
- //err_msg(0, "%s: port=%d, type=%d\n", head, msg.port, msg.type);
+
+
+void print_msg(char *head, shm_msg_t &msg) {
+ // err_msg(0, "%s: port=%d, type=%d\n", head, msg.port, msg.type);
}
-void * _server_run_msg_rev(void* _socket);
+static void *_server_run_msg_rev(void *_socket);
-void * _client_run_msg_rev(void* _socket);
+static void *_client_run_msg_rev(void *_socket);
-int _shm_close_dgram_socket(shm_socket_t *socket);
+static int _shm_close_dgram_socket(shm_socket_t *socket);
+static int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote);
-int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote);
+static inline int _shm_socket_check_key(shm_socket_t *socket) {
+ void *tmp_ptr = mm_get_by_key(socket->port);
+ if (tmp_ptr!= NULL && tmp_ptr != (void *)1 && !socket->force_bind ) {
+ err_exit(0, "key %d has already been in used!", socket->port);
+ return 0;
+ }
+ return 1;
+}
-SHMQueue<shm_msg_t> * _attach_remote_queue(int port) ;
+SHMQueue<shm_msg_t> *_attach_remote_queue(int port);
shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) {
- shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t));
- socket->socket_type = socket_type;
- socket->port = -1;
- socket->dispatch_thread = 0;
- socket->status=SHM_CONN_CLOSED;
-
- return socket;
+ shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t));
+ socket->socket_type = socket_type;
+ socket->port = -1;
+ socket->force_bind = false;
+ socket->dispatch_thread = 0;
+ socket->status = SHM_CONN_CLOSED;
+
+ return socket;
}
int shm_close_socket(shm_socket_t *socket) {
- switch(socket->socket_type) {
- case SHM_SOCKET_STREAM:
- return _shm_close_stream_socket(socket, true);
- case SHM_SOCKET_DGRAM:
- return _shm_close_dgram_socket(socket);
- default:
- return -1;
- }
- return -1;
-
+ switch (socket->socket_type) {
+ case SHM_SOCKET_STREAM:
+ return _shm_close_stream_socket(socket, true);
+ case SHM_SOCKET_DGRAM:
+ return _shm_close_dgram_socket(socket);
+ default:
+ return -1;
+ }
+ return -1;
}
-int shm_socket_bind(shm_socket_t * socket, int port) {
- socket -> port = port;
- return 0;
+int shm_socket_bind(shm_socket_t *socket, int port) {
+ socket->port = port;
+ return 0;
}
-int shm_listen(shm_socket_t* socket) {
-
- if(socket->socket_type != SHM_SOCKET_STREAM) {
- err_exit(0, "can not invoke shm_listen method with a socket which is not a SHM_SOCKET_STREAM socket");
- }
-
- int port;
- hashtable_t *hashtable = mm_get_hashtable();
- if(socket -> port == -1) {
- port = hashtable_alloc_key(hashtable);
- socket -> port = port;
- } else {
-
- if(hashtable_get(hashtable, socket->port)!= NULL) {
- err_exit(0, "key %d has already been in used!", socket->port);
- }
- }
-
- socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
- socket->acceptQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
- socket->clientSocketMap = new std::map<int, shm_socket_t* >;
- socket->status = SHM_CONN_LISTEN;
- pthread_create(&(socket->dispatch_thread), NULL, _server_run_msg_rev , (void *)socket);
-
-
- return 0;
+int shm_socket_force_bind(shm_socket_t *socket, int port) {
+ socket->force_bind = true;
+ socket->port = port;
+ return 0;
}
+int shm_listen(shm_socket_t *socket) {
+
+ if (socket->socket_type != SHM_SOCKET_STREAM) {
+ err_exit(0, "can not invoke shm_listen method with a socket which is not a "
+ "SHM_SOCKET_STREAM socket");
+ }
+
+ int port;
+ hashtable_t *hashtable = mm_get_hashtable();
+ if (socket->port == -1) {
+ port = hashtable_alloc_key(hashtable);
+ socket->port = port;
+ } else {
+
+ _shm_socket_check_key(socket);
+ }
+
+ socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
+ socket->acceptQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
+ socket->clientSocketMap = new std::map<int, shm_socket_t *>;
+ socket->status = SHM_CONN_LISTEN;
+ pthread_create(&(socket->dispatch_thread), NULL, _server_run_msg_rev,
+ (void *)socket);
+
+ return 0;
+}
/**
* 鎺ュ彈瀹㈡埛绔缓绔嬫柊杩炴帴鐨勮姹�
*
*/
-shm_socket_t* shm_accept(shm_socket_t* socket) {
- if(socket->socket_type != SHM_SOCKET_STREAM) {
- err_exit(0, "can not invoke shm_accept method with a socket which is not a SHM_SOCKET_STREAM socket");
- }
- hashtable_t *hashtable = mm_get_hashtable();
- int client_port;
- shm_socket_t *client_socket;
- shm_msg_t src;
+shm_socket_t *shm_accept(shm_socket_t *socket) {
+ if (socket->socket_type != SHM_SOCKET_STREAM) {
+ err_exit(0, "can not invoke shm_accept method with a socket which is not a "
+ "SHM_SOCKET_STREAM socket");
+ }
+ hashtable_t *hashtable = mm_get_hashtable();
+ int client_port;
+ shm_socket_t *client_socket;
+ shm_msg_t src;
- if (socket->acceptQueue->pop(src) ) {
+ if (socket->acceptQueue->pop(src)) {
-// print_msg("===accept:", src);
- client_port = src.port;
- client_socket = (shm_socket_t *)malloc(sizeof(shm_socket_t));
- client_socket->port = socket->port;
- // client_socket->queue= socket->queue;
- //鍒濆鍖栨秷鎭痲ueue
- client_socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
- //杩炴帴鍒板鏂筿ueue
- client_socket->remoteQueue = _attach_remote_queue(client_port);
+ // print_msg("===accept:", src);
+ client_port = src.port;
+ // client_socket = (shm_socket_t *)malloc(sizeof(shm_socket_t));
+ client_socket = shm_open_socket(socket->socket_type);
+ client_socket->port = socket->port;
+ // client_socket->queue= socket->queue;
+ //鍒濆鍖栨秷鎭痲ueue
+ client_socket->messageQueue =
+ new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
+ //杩炴帴鍒板鏂筿ueue
+ client_socket->remoteQueue = _attach_remote_queue(client_port);
- socket->clientSocketMap->insert({client_port, client_socket});
+ socket->clientSocketMap->insert({client_port, client_socket});
- /*
- * shm_accept 鐢ㄦ埛鎵ц鐨勬柟娉� 涓巁server_run_msg_rev鍦ㄤ袱涓笉鍚岀殑闄愬埗宸ヤ綔,accept瑕佷繚璇佸湪瀹㈡埛鐨勫彂閫佹秷鎭箣鍓嶅畬鎴愯祫婧愮殑鍑嗗宸ヤ綔锛屼互閬垮厤鍑虹幇绔炴�侀棶棰�
- */
- //鍙戦�乷pen_reply,鍥炲簲瀹㈡埛绔殑connect璇锋眰
- struct timespec timeout = {1, 0};
- shm_msg_t msg;
- msg.port = socket->port;
- msg.size = 0;
- msg.type = SHM_SOCKET_OPEN_REPLY;
+ /*
+* shm_accept 鐢ㄦ埛鎵ц鐨勬柟娉�
+* 涓巁server_run_msg_rev鍦ㄤ袱涓笉鍚岀殑闄愬埗宸ヤ綔,accept瑕佷繚璇佸湪瀹㈡埛鐨勫彂閫佹秷鎭箣鍓嶅畬鎴愯祫婧愮殑鍑嗗宸ヤ綔锛屼互閬垮厤鍑虹幇绔炴�侀棶棰�
+ */
+ //鍙戦�乷pen_reply,鍥炲簲瀹㈡埛绔殑connect璇锋眰
+ struct timespec timeout = {1, 0};
+ shm_msg_t msg;
+ msg.port = socket->port;
+ msg.size = 0;
+ msg.type = SHM_SOCKET_OPEN_REPLY;
- if (client_socket->remoteQueue->push_timeout(msg, &timeout) )
- {
- client_socket->status = SHM_CONN_ESTABLISHED;
- return client_socket;
- } else {
- err_msg(0, "shm_accept: 鍙戦�乷pen_reply澶辫触");
- return NULL;
- }
+ if (client_socket->remoteQueue->push_timeout(msg, &timeout)) {
+ client_socket->status = SHM_CONN_ESTABLISHED;
+ return client_socket;
+ } else {
+ err_msg(0, "shm_accept: 鍙戦�乷pen_reply澶辫触");
+ return NULL;
+ }
-
- } else {
- err_exit(errno, "shm_accept");
- }
- return NULL;
-
+ } else {
+ err_exit(errno, "shm_accept");
+ }
+ return NULL;
}
-int shm_connect(shm_socket_t* socket, int port) {
- if(socket->socket_type != SHM_SOCKET_STREAM) {
- err_exit(0, "can not invoke shm_connect method with a socket which is not a SHM_SOCKET_STREAM socket");
- }
- hashtable_t *hashtable = mm_get_hashtable();
- if(hashtable_get(hashtable, port)== NULL) {
- err_exit(0, "shm_connect锛歝onnect at port %d failed!", port);
- }
+int shm_connect(shm_socket_t *socket, int port) {
+ if (socket->socket_type != SHM_SOCKET_STREAM) {
+ err_exit(0, "can not invoke shm_connect method with a socket which is not "
+ "a SHM_SOCKET_STREAM socket");
+ }
+ hashtable_t *hashtable = mm_get_hashtable();
+ if (hashtable_get(hashtable, port) == NULL) {
+ err_exit(0, "shm_connect锛歝onnect at port %d failed!", port);
+ }
- if(socket->port == -1) {
- socket->port = hashtable_alloc_key(hashtable);
- } else {
-
- if(hashtable_get(hashtable, socket->port)!= NULL) {
- err_exit(0, "key %d has already been in used!", socket->port);
- }
- }
+ if (socket->port == -1) {
+ socket->port = hashtable_alloc_key(hashtable);
+ } else {
+ _shm_socket_check_key(socket);
+ }
- socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
- socket->remoteQueue = _attach_remote_queue(port);
- socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
-
+ socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
- //鍙戦�乷pen璇锋眰
- struct timespec timeout = {1, 0};
- shm_msg_t msg;
- msg.port = socket->port;
- msg.size = 0;
- msg.type=SHM_SOCKET_OPEN;
- socket->remoteQueue->push_timeout(msg, &timeout);
+ if ((socket->remoteQueue = _attach_remote_queue(port)) == NULL) {
+ err_exit(0, "connect to %d failted", port);
+ }
+ socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
- //鎺ュ彈open reply
- if(socket->queue->pop(msg)) {
- // 鍦ㄨ繖閲宻erver绔凡缁忓噯澶囧ソ鎺ュ彈瀹㈡埛绔彂閫佽姹備簡,瀹屾垚涓庢湇鍔$鐨勮繛鎺�
- if(msg.type == SHM_SOCKET_OPEN_REPLY) {
- socket->status = SHM_CONN_ESTABLISHED;
- pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev , (void *)socket);
- } else {
- err_exit(0, "shm_connect: 涓嶅尮閰嶇殑搴旂瓟淇℃伅!");
- }
-
- } else {
- err_exit(0, "connect failted!");
- }
-
- return 0;
+ //鍙戦�乷pen璇锋眰
+ struct timespec timeout = {1, 0};
+ shm_msg_t msg;
+ msg.port = socket->port;
+ msg.size = 0;
+ msg.type = SHM_SOCKET_OPEN;
+ socket->remoteQueue->push_timeout(msg, &timeout);
+
+ //鎺ュ彈open reply
+ if (socket->queue->pop(msg)) {
+ // 鍦ㄨ繖閲宻erver绔凡缁忓噯澶囧ソ鎺ュ彈瀹㈡埛绔彂閫佽姹備簡,瀹屾垚涓庢湇鍔$鐨勮繛鎺�
+ if (msg.type == SHM_SOCKET_OPEN_REPLY) {
+ socket->status = SHM_CONN_ESTABLISHED;
+ pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev,
+ (void *)socket);
+ } else {
+ err_exit(0, "shm_connect: 涓嶅尮閰嶇殑搴旂瓟淇℃伅!");
+ }
+
+ } else {
+ err_exit(0, "connect failted!");
+ }
+
+ return 0;
}
-
int shm_send(shm_socket_t *socket, const void *buf, const int size) {
- if(socket->socket_type != SHM_SOCKET_STREAM) {
- err_exit(0, "can not invoke shm_send method with a socket which is not a SHM_SOCKET_STREAM socket");
- }
- // hashtable_t *hashtable = mm_get_hashtable();
- // if(socket->remoteQueue == NULL) {
- // err_msg(errno, "褰撳墠瀹㈡埛绔棤杩炴帴!");
- // return -1;
- // }
- shm_msg_t dest;
- dest.type=SHM_COMMON_MSG;
- dest.port = socket->port;
- dest.size = size;
- dest.buf = mm_malloc(size);
- memcpy(dest.buf, buf, size);
+ if (socket->socket_type != SHM_SOCKET_STREAM) {
+ err_exit(0, "can not invoke shm_send method with a socket which is not a "
+ "SHM_SOCKET_STREAM socket");
+ }
+ // hashtable_t *hashtable = mm_get_hashtable();
+ // if(socket->remoteQueue == NULL) {
+ // err_msg(errno, "褰撳墠瀹㈡埛绔棤杩炴帴!");
+ // return -1;
+ // }
+ shm_msg_t dest;
+ dest.type = SHM_COMMON_MSG;
+ dest.port = socket->port;
+ dest.size = size;
+ dest.buf = mm_malloc(size);
+ memcpy(dest.buf, buf, size);
-
- if(socket->remoteQueue->push(dest)) {
- return 0;
- } else {
- err_msg(errno, "connection has been closed!");
- return -1;
- }
+ if (socket->remoteQueue->push(dest)) {
+ return 0;
+ } else {
+ err_msg(errno, "connection has been closed!");
+ return -1;
+ }
}
+int shm_recv(shm_socket_t *socket, void **buf, int *size) {
+ if (socket->socket_type != SHM_SOCKET_STREAM) {
+ err_exit(0, "can not invoke shm_recv method in a %d type socket which is "
+ "not a SHM_SOCKET_STREAM socket ",
+ socket->socket_type);
+ }
+ shm_msg_t src;
-int shm_recv(shm_socket_t* socket, void **buf, int *size) {
- if(socket->socket_type != SHM_SOCKET_STREAM) {
- err_exit(0, "can not invoke shm_recv method with a socket which is not a SHM_SOCKET_STREAM socket");
- }
- shm_msg_t src;
-
- if (socket->messageQueue->pop(src)) {
- void * _buf = malloc(src.size);
- memcpy(_buf, src.buf, src.size);
- *buf = _buf;
- *size = src.size;
- mm_free(src.buf);
- return 0;
- } else {
- return -1;
- }
-
+ if (socket->messageQueue->pop(src)) {
+ void *_buf = malloc(src.size);
+ memcpy(_buf, src.buf, src.size);
+ *buf = _buf;
+ *size = src.size;
+ mm_free(src.buf);
+ return 0;
+ } else {
+ return -1;
+ }
}
-
// 鐭繛鎺ユ柟寮忓彂閫�
-int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int port) {
- hashtable_t *hashtable = mm_get_hashtable();
+int shm_sendto(shm_socket_t *socket, const void *buf, const int size,
+ const int port, const struct timespec *timeout, const int flags) {
+ if (socket->socket_type != SHM_SOCKET_DGRAM) {
+ err_exit(0, "Can't invoke shm_sendto method in a %d type socket which is "
+ "not a SHM_SOCKET_DGRAM socket ",
+ socket->socket_type);
+ }
+ hashtable_t *hashtable = mm_get_hashtable();
- if(socket->queue == NULL) {
- if(socket->port == -1) {
- socket->port = hashtable_alloc_key(hashtable);
- } else {
-
- if(hashtable_get(hashtable, socket->port)!= NULL) {
- err_exit(0, "key %d has already been in used!", socket->port);
- }
- }
+ if (socket->queue == NULL) {
+ if (socket->port == -1) {
+ socket->port = hashtable_alloc_key(hashtable);
+ } else {
- socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
- }
- if (port == socket->port) {
- err_msg(0, "can not send to your self!");
- return -1;
- }
+ _shm_socket_check_key(socket);
+ }
- shm_msg_t dest;
- dest.type=SHM_COMMON_MSG;
- dest.port = socket->port;
- dest.size = size;
- dest.buf = mm_malloc(size);
- memcpy(dest.buf, buf, size);
+ socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
+ }
+ if (port == socket->port) {
+ err_msg(0, "can not send to your self!");
+ return -1;
+ }
- SHMQueue<shm_msg_t> *remoteQueue = _attach_remote_queue(port);
- if(remoteQueue->push(dest)) {
- delete remoteQueue;
- return 0;
- } else {
- delete remoteQueue;
- err_msg(errno, "sendto port %d failed!", port);
- return -1;
- }
+ SHMQueue<shm_msg_t> *remoteQueue;
+ if ((remoteQueue = _attach_remote_queue(port)) == NULL) {
+ err_msg(0, "shm_sendto failed, the other end has been closed, or has not been opened!");
+ return SHM_SOCKET_CONN_FAILED;
+ }
+
+ shm_msg_t dest;
+ dest.type = SHM_COMMON_MSG;
+ dest.port = socket->port;
+ dest.size = size;
+ dest.buf = mm_malloc(size);
+ memcpy(dest.buf, buf, size);
+
+ // printf("shm_sendto push before\n");
+ bool rv;
+ if(flags & SHM_MSG_NOWAIT != 0) {
+ rv = remoteQueue->push_nowait(dest);
+ } else if(timeout != NULL) {
+ rv = remoteQueue->push_timeout(dest, timeout);
+ } else {
+ rv = remoteQueue->push(dest);
+ }
+
+ if (rv) {
+ // printf("shm_sendto push after\n");
+ delete remoteQueue;
+ return 0;
+ } else {
+ delete remoteQueue;
+ mm_free(dest.buf);
+ err_msg(errno, "sendto port %d failed!", port);
+ return -1;
+ }
}
-
// 鐭繛鎺ユ柟寮忔帴鍙�
-int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port){
- hashtable_t *hashtable = mm_get_hashtable();
- if(socket->queue == NULL) {
- if(socket->port == -1) {
- socket->port = hashtable_alloc_key(hashtable);
- } else {
-
- if(hashtable_get(hashtable, socket->port)!= NULL) {
- err_exit(0, "key %d has already been in used!", socket->port);
- }
- }
+int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port, struct timespec *timeout, int flags) {
+ if (socket->socket_type != SHM_SOCKET_DGRAM) {
+ err_exit(0, "Can't invoke shm_recvfrom method in a %d type socket which "
+ "is not a SHM_SOCKET_DGRAM socket ",
+ socket->socket_type);
+ }
+ hashtable_t *hashtable = mm_get_hashtable();
+ if (socket->queue == NULL) {
+ if (socket->port == -1) {
+ socket->port = hashtable_alloc_key(hashtable);
+ } else {
- socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
- }
+ _shm_socket_check_key(socket);
+ }
- shm_msg_t src;
-// printf("shm_recvfrom pop before");
- if (socket->queue->pop(src)) {
- void * _buf = malloc(src.size);
- memcpy(_buf, src.buf, src.size);
- *buf = _buf;
- *size = src.size;
- *port = src.port;
- mm_free(src.buf);
-// printf("shm_recvfrom pop after");
- return 0;
- } else {
- return -1;
- }
+ socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
+ }
+
+ shm_msg_t src;
+ // printf("shm_recvfrom pop before\n");
+ bool rv;
+ if(flags & SHM_MSG_NOWAIT != 0) {
+ rv = socket->queue->pop_nowait(src);
+ } else if(timeout != NULL) {
+ rv = socket->queue->pop_timeout(src, timeout);
+ } else {
+ rv = socket->queue->pop(src);
+ }
+
+ if (rv) {
+ void *_buf = malloc(src.size);
+ memcpy(_buf, src.buf, src.size);
+ *buf = _buf;
+ *size = src.size;
+ *port = src.port;
+ mm_free(src.buf);
+ // printf("shm_recvfrom pop after\n");
+ return 0;
+ } else {
+ return -1;
+ }
}
-int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size) {
- int recv_port;
- int rv;
+int shm_sendandrecv(shm_socket_t *socket, const void *send_buf,
+ const int send_size, const int send_port, void **recv_buf,
+ int *recv_size, struct timespec *timeout, int flags) {
+ if (socket->socket_type != SHM_SOCKET_DGRAM) {
+ err_exit(0, "Can't invoke shm_sendandrecv method in a %d type socket "
+ "which is not a SHM_SOCKET_DGRAM socket ",
+ socket->socket_type);
+ }
+ int recv_port;
+ int rv;
- shm_socket_t *tmp_socket = shm_open_socket(SHM_SOCKET_DGRAM);
- if (shm_sendto(tmp_socket, send_buf, send_size, send_port) == 0) {
- rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_port);
- shm_close_socket(tmp_socket);
- return rv;
- }
- return -1;
+ shm_socket_t *tmp_socket = shm_open_socket(SHM_SOCKET_DGRAM);
+ if (shm_sendto(tmp_socket, send_buf, send_size, send_port, timeout, flags) == 0) {
+ rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_port, timeout, flags);
+ shm_close_socket(tmp_socket);
+ return rv;
+ }
+ return -1;
}
+// ============================================================================================================
/**
* 缁戝畾key鍒伴槦鍒楋紝浣嗘槸骞朵笉浼氬垱寤洪槦鍒椼�傚鏋滄病鏈夊搴旀寚瀹歬ey鐨勯槦鍒楁彁绀洪敊璇苟閫�鍑�
*/
-SHMQueue<shm_msg_t> * _attach_remote_queue(int port) {
- hashtable_t *hashtable = mm_get_hashtable();
- if(hashtable_get(hashtable, port)== NULL) {
- err_exit(0, "_remote_queue_attach锛歝onnet at port %d failed!", port);
- return NULL;
- }
-
- SHMQueue<shm_msg_t> *queue = new SHMQueue<shm_msg_t>(port, 0);
- return queue;
+SHMQueue<shm_msg_t> *_attach_remote_queue(int port) {
+ hashtable_t *hashtable = mm_get_hashtable();
+ if (hashtable_get(hashtable, port) == NULL) {
+ err_msg(0, "_remote_queue_attach锛歝onnet at port %d failed!", port);
+ return NULL;
+ }
+
+ SHMQueue<shm_msg_t> *queue = new SHMQueue<shm_msg_t>(port, 0);
+ return queue;
}
-
-
-
-
-void _server_close_conn_to_client(shm_socket_t* socket, int port) {
- shm_socket_t *client_socket;
- auto iter = socket->clientSocketMap->find(port);
- if( iter != socket->clientSocketMap->end() ) {
- socket->clientSocketMap->erase(iter);
- }
- free((void *)client_socket);
-
+void _server_close_conn_to_client(shm_socket_t *socket, int port) {
+ shm_socket_t *client_socket;
+ std::map<int, shm_socket_t *>::iterator iter =
+ socket->clientSocketMap->find(port);
+ if (iter != socket->clientSocketMap->end()) {
+ client_socket = iter->second;
+ free((void *)client_socket);
+ socket->clientSocketMap->erase(iter);
+ }
}
/**
* server绔悇绉嶇被鍨嬫秷鎭紙锛夊湪杩欓噷杩涚▼鍒嗘嫞
*/
-void * _server_run_msg_rev(void* _socket) {
- pthread_detach(pthread_self());
- shm_socket_t* socket = (shm_socket_t*) _socket;
- struct timespec timeout = {1, 0};
- shm_msg_t src;
- shm_socket_t *client_socket;
- std::map<int, shm_socket_t* >::iterator iter;
+void *_server_run_msg_rev(void *_socket) {
+ pthread_detach(pthread_self());
+ shm_socket_t *socket = (shm_socket_t *)_socket;
+ struct timespec timeout = {1, 0};
+ shm_msg_t src;
+ shm_socket_t *client_socket;
+ std::map<int, shm_socket_t *>::iterator iter;
- while(socket->queue->pop(src)) {
+ while (socket->queue->pop(src)) {
- switch (src.type) {
- case SHM_SOCKET_OPEN :
- socket->acceptQueue->push_timeout(src, &timeout);
- break;
- case SHM_SOCKET_CLOSE :
- _server_close_conn_to_client(socket, src.port);
- break;
- case SHM_COMMON_MSG :
+ switch (src.type) {
+ case SHM_SOCKET_OPEN:
+ socket->acceptQueue->push_timeout(src, &timeout);
+ break;
+ case SHM_SOCKET_CLOSE:
+ _server_close_conn_to_client(socket, src.port);
+ break;
+ case SHM_COMMON_MSG:
- iter = socket->clientSocketMap->find(src.port);
- if( iter != socket->clientSocketMap->end()) {
- client_socket= iter->second;
- // print_msg("_server_run_msg_rev push before", src);
- client_socket->messageQueue->push_timeout(src, &timeout);
- // print_msg("_server_run_msg_rev push after", src);
- }
-
- break;
+ iter = socket->clientSocketMap->find(src.port);
+ if (iter != socket->clientSocketMap->end()) {
+ client_socket = iter->second;
+ // print_msg("_server_run_msg_rev push before", src);
+ client_socket->messageQueue->push_timeout(src, &timeout);
+ // print_msg("_server_run_msg_rev push after", src);
+ }
- default:
- err_msg(0, "socket.__shm_rev__: undefined message type.");
- }
+ break;
+
+ default:
+ err_msg(0, "socket.__shm_rev__: undefined message type.");
}
-
- return NULL;
+ }
+
+ return NULL;
}
+void _client_close_conn_to_server(shm_socket_t *socket) {
-
-void _client_close_conn_to_server(shm_socket_t* socket) {
-
- _shm_close_stream_socket(socket, false);
+ _shm_close_stream_socket(socket, false);
}
-
/**
* client绔殑鍚勭绫诲瀷娑堟伅锛堬級鍦ㄨ繖閲岃繘绋嬪垎鎷�
*/
-void * _client_run_msg_rev(void* _socket) {
- pthread_detach(pthread_self());
- shm_socket_t* socket = (shm_socket_t*) _socket;
- struct timespec timeout = {1, 0};
- shm_msg_t src;
-
- while(socket->queue->pop(src)) {
- switch (src.type) {
-
- case SHM_SOCKET_CLOSE :
- _client_close_conn_to_server(socket);
- break;
- case SHM_COMMON_MSG :
- socket->messageQueue->push_timeout(src, &timeout);
- break;
- default:
- err_msg(0, "socket.__shm_rev__: undefined message type.");
- }
+void *_client_run_msg_rev(void *_socket) {
+ pthread_detach(pthread_self());
+ shm_socket_t *socket = (shm_socket_t *)_socket;
+ struct timespec timeout = {1, 0};
+ shm_msg_t src;
+
+ while (socket->queue->pop(src)) {
+ switch (src.type) {
+
+ case SHM_SOCKET_CLOSE:
+ _client_close_conn_to_server(socket);
+ break;
+ case SHM_COMMON_MSG:
+ socket->messageQueue->push_timeout(src, &timeout);
+ break;
+ default:
+ err_msg(0, "socket.__shm_rev__: undefined message type.");
}
-
- return NULL;
+ }
+
+ return NULL;
}
-
int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote) {
- socket->status = SHM_CONN_CLOSED;
- //缁欏鏂瑰彂閫佷竴涓叧闂繛鎺ョ殑娑堟伅
- struct timespec timeout = {1, 0};
- shm_msg_t close_msg;
+ socket->status = SHM_CONN_CLOSED;
+ //缁欏鏂瑰彂閫佷竴涓叧闂繛鎺ョ殑娑堟伅
+ struct timespec timeout = {1, 0};
+ shm_msg_t close_msg;
- close_msg.port = socket->port;
- close_msg.size = 0;
- close_msg.type=SHM_SOCKET_CLOSE;
- if(notifyRemote && socket->remoteQueue != NULL) {
- socket->remoteQueue->push_timeout(close_msg, &timeout);
- }
-
- if(socket->queue != NULL) {
- delete socket->queue;
- socket->queue = NULL;
- }
+ close_msg.port = socket->port;
+ close_msg.size = 0;
+ close_msg.type = SHM_SOCKET_CLOSE;
+ if (notifyRemote && socket->remoteQueue != NULL) {
+ socket->remoteQueue->push_timeout(close_msg, &timeout);
+ }
- if(socket->remoteQueue != NULL) {
- delete socket->remoteQueue;
- socket->remoteQueue = NULL;
- }
+ if (socket->queue != NULL) {
+ delete socket->queue;
+ socket->queue = NULL;
+ }
- if(socket->messageQueue != NULL) {
- delete socket->messageQueue;
- socket->messageQueue = NULL;
- }
+ if (socket->remoteQueue != NULL) {
+ delete socket->remoteQueue;
+ socket->remoteQueue = NULL;
+ }
- if(socket->acceptQueue != NULL) {
- delete socket->acceptQueue;
- socket->acceptQueue = NULL;
- }
+ if (socket->messageQueue != NULL) {
+ delete socket->messageQueue;
+ socket->messageQueue = NULL;
+ }
- if(socket->clientSocketMap != NULL) {
- shm_socket_t *client_socket;
- for(auto iter = socket->clientSocketMap->begin(); iter != socket->clientSocketMap->end(); iter++) {
- client_socket= iter->second;
+ if (socket->acceptQueue != NULL) {
+ delete socket->acceptQueue;
+ socket->acceptQueue = NULL;
+ }
- client_socket->remoteQueue->push_timeout(close_msg, &timeout);
- delete client_socket->remoteQueue;
- client_socket->remoteQueue=NULL;
+ if (socket->clientSocketMap != NULL) {
+ shm_socket_t *client_socket;
+ for (auto iter = socket->clientSocketMap->begin();
+ iter != socket->clientSocketMap->end(); iter++) {
+ client_socket = iter->second;
- delete client_socket->messageQueue;
- client_socket->messageQueue=NULL;
- socket->clientSocketMap->erase(iter);
- free((void *)client_socket);
- }
- delete socket->clientSocketMap;
- }
+ client_socket->remoteQueue->push_timeout(close_msg, &timeout);
+ delete client_socket->remoteQueue;
+ client_socket->remoteQueue = NULL;
+ delete client_socket->messageQueue;
+ client_socket->messageQueue = NULL;
+ free((void *)client_socket);
+ }
+ delete socket->clientSocketMap;
+ }
- if(socket->dispatch_thread != 0)
- pthread_cancel(socket->dispatch_thread);
-
- free(socket);
- return 0;
+ if (socket->dispatch_thread != 0)
+ pthread_cancel(socket->dispatch_thread);
+
+ free(socket);
+ return 0;
}
--
Gitblit v1.8.0