#include "shm_socket.h" #include "hashtable.h" #include "logger_factory.h" #include static Logger logger = LoggerFactory::getLogger(); void print_msg(char *head, shm_msg_t &msg) { // err_msg(0, "%s: port=%d, type=%d\n", head, msg.port, msg.type); } static void *_server_run_msg_rev(void *_socket); static void *_client_run_msg_rev(void *_socket); static int _shm_close_dgram_socket(shm_socket_t *socket); static int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote); static inline int _shm_socket_check_key(shm_socket_t *socket) { void *tmp_ptr = mm_get_by_key(socket->port); if (tmp_ptr!= NULL && tmp_ptr != (void *)1 && !socket->force_bind ) { err_exit(0, "key %d has already been in used!", socket->port); return 0; } return 1; } SHMQueue *_attach_remote_queue(int port); shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) { shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t)); socket->socket_type = socket_type; socket->port = -1; socket->force_bind = false; socket->dispatch_thread = 0; socket->status = SHM_CONN_CLOSED; return socket; } int shm_close_socket(shm_socket_t *socket) { switch (socket->socket_type) { case SHM_SOCKET_STREAM: return _shm_close_stream_socket(socket, true); case SHM_SOCKET_DGRAM: return _shm_close_dgram_socket(socket); default: return -1; } return -1; } int shm_socket_bind(shm_socket_t *socket, int port) { socket->port = port; return 0; } int shm_socket_force_bind(shm_socket_t *socket, int port) { socket->force_bind = true; socket->port = port; return 0; } int shm_listen(shm_socket_t *socket) { if (socket->socket_type != SHM_SOCKET_STREAM) { err_exit(0, "can not invoke shm_listen method with a socket which is not a " "SHM_SOCKET_STREAM socket"); } int port; hashtable_t *hashtable = mm_get_hashtable(); if (socket->port == -1) { port = hashtable_alloc_key(hashtable); socket->port = port; } else { _shm_socket_check_key(socket); } socket->queue = new SHMQueue(socket->port, 16); socket->acceptQueue = new LockFreeQueue(16); socket->clientSocketMap = new std::map; socket->status = SHM_CONN_LISTEN; pthread_create(&(socket->dispatch_thread), NULL, _server_run_msg_rev, (void *)socket); return 0; } /** * 接受客户端建立新连接的请求 * */ shm_socket_t *shm_accept(shm_socket_t *socket) { if (socket->socket_type != SHM_SOCKET_STREAM) { err_exit(0, "can not invoke shm_accept method with a socket which is not a " "SHM_SOCKET_STREAM socket"); } hashtable_t *hashtable = mm_get_hashtable(); int client_port; shm_socket_t *client_socket; shm_msg_t src; if (socket->acceptQueue->pop(src)) { // print_msg("===accept:", src); client_port = src.port; // client_socket = (shm_socket_t *)malloc(sizeof(shm_socket_t)); client_socket = shm_open_socket(socket->socket_type); client_socket->port = socket->port; // client_socket->queue= socket->queue; //初始化消息queue client_socket->messageQueue = new LockFreeQueue(16); //连接到对方queue client_socket->remoteQueue = _attach_remote_queue(client_port); socket->clientSocketMap->insert({client_port, client_socket}); /* * shm_accept 用户执行的方法 * 与_server_run_msg_rev在两个不同的限制工作,accept要保证在客户的发送消息之前完成资源的准备工作,以避免出现竞态问题 */ //发送open_reply,回应客户端的connect请求 struct timespec timeout = {1, 0}; shm_msg_t msg; msg.port = socket->port; msg.size = 0; msg.type = SHM_SOCKET_OPEN_REPLY; if (client_socket->remoteQueue->push_timeout(msg, &timeout)) { client_socket->status = SHM_CONN_ESTABLISHED; return client_socket; } else { err_msg(0, "shm_accept: 发送open_reply失败"); return NULL; } } else { err_exit(errno, "shm_accept"); } return NULL; } int shm_connect(shm_socket_t *socket, int port) { if (socket->socket_type != SHM_SOCKET_STREAM) { err_exit(0, "can not invoke shm_connect method with a socket which is not " "a SHM_SOCKET_STREAM socket"); } hashtable_t *hashtable = mm_get_hashtable(); if (hashtable_get(hashtable, port) == NULL) { err_exit(0, "shm_connect:connect at port %d failed!", port); } if (socket->port == -1) { socket->port = hashtable_alloc_key(hashtable); } else { _shm_socket_check_key(socket); } socket->queue = new SHMQueue(socket->port, 16); if ((socket->remoteQueue = _attach_remote_queue(port)) == NULL) { err_exit(0, "connect to %d failted", port); } socket->messageQueue = new LockFreeQueue(16); //发送open请求 struct timespec timeout = {1, 0}; shm_msg_t msg; msg.port = socket->port; msg.size = 0; msg.type = SHM_SOCKET_OPEN; socket->remoteQueue->push_timeout(msg, &timeout); //接受open reply if (socket->queue->pop(msg)) { // 在这里server端已经准备好接受客户端发送请求了,完成与服务端的连接 if (msg.type == SHM_SOCKET_OPEN_REPLY) { socket->status = SHM_CONN_ESTABLISHED; pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev, (void *)socket); } else { err_exit(0, "shm_connect: 不匹配的应答信息!"); } } else { err_exit(0, "connect failted!"); } return 0; } int shm_send(shm_socket_t *socket, const void *buf, const int size) { if (socket->socket_type != SHM_SOCKET_STREAM) { err_exit(0, "can not invoke shm_send method with a socket which is not a " "SHM_SOCKET_STREAM socket"); } // hashtable_t *hashtable = mm_get_hashtable(); // if(socket->remoteQueue == NULL) { // err_msg(errno, "当前客户端无连接!"); // return -1; // } shm_msg_t dest; dest.type = SHM_COMMON_MSG; dest.port = socket->port; dest.size = size; dest.buf = mm_malloc(size); memcpy(dest.buf, buf, size); if (socket->remoteQueue->push(dest)) { return 0; } else { err_msg(errno, "connection has been closed!"); return -1; } } int shm_recv(shm_socket_t *socket, void **buf, int *size) { if (socket->socket_type != SHM_SOCKET_STREAM) { err_exit(0, "can not invoke shm_recv method in a %d type socket which is " "not a SHM_SOCKET_STREAM socket ", socket->socket_type); } shm_msg_t src; if (socket->messageQueue->pop(src)) { void *_buf = malloc(src.size); memcpy(_buf, src.buf, src.size); *buf = _buf; *size = src.size; mm_free(src.buf); return 0; } else { return -1; } } // 短连接方式发送 int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int port, const struct timespec *timeout, const int flags) { if (socket->socket_type != SHM_SOCKET_DGRAM) { err_exit(0, "Can't invoke shm_sendto method in a %d type socket which is " "not a SHM_SOCKET_DGRAM socket ", socket->socket_type); } hashtable_t *hashtable = mm_get_hashtable(); if (socket->queue == NULL) { if (socket->port == -1) { socket->port = hashtable_alloc_key(hashtable); } else { _shm_socket_check_key(socket); } socket->queue = new SHMQueue(socket->port, 16); } if (port == socket->port) { err_msg(0, "can not send to your self!"); return -1; } SHMQueue *remoteQueue; if ((remoteQueue = _attach_remote_queue(port)) == NULL) { err_msg(0, "shm_sendto failed, the other end has been closed, or has not been opened!"); return SHM_SOCKET_ECONNFAILED; } shm_msg_t dest; dest.type = SHM_COMMON_MSG; dest.port = socket->port; dest.size = size; dest.buf = mm_malloc(size); memcpy(dest.buf, buf, size); // printf("shm_sendto push before\n"); bool rv; if(flags & SHM_MSG_NOWAIT != 0) { rv = remoteQueue->push_nowait(dest); } else if(timeout != NULL) { rv = remoteQueue->push_timeout(dest, timeout); } else { rv = remoteQueue->push(dest); } if (rv) { // printf("shm_sendto push after\n"); delete remoteQueue; return 0; } else { delete remoteQueue; mm_free(dest.buf); err_msg(errno, "sendto port %d failed!", port); return -1; } } // 短连接方式接受 int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port, struct timespec *timeout, int flags) { if (socket->socket_type != SHM_SOCKET_DGRAM) { err_exit(0, "Can't invoke shm_recvfrom method in a %d type socket which " "is not a SHM_SOCKET_DGRAM socket ", socket->socket_type); } hashtable_t *hashtable = mm_get_hashtable(); if (socket->queue == NULL) { if (socket->port == -1) { socket->port = hashtable_alloc_key(hashtable); } else { _shm_socket_check_key(socket); } socket->queue = new SHMQueue(socket->port, 16); } shm_msg_t src; // printf("shm_recvfrom pop before\n"); bool rv; if(flags & SHM_MSG_NOWAIT != 0) { rv = socket->queue->pop_nowait(src); } else if(timeout != NULL) { rv = socket->queue->pop_timeout(src, timeout); } else { rv = socket->queue->pop(src); } if (rv) { void *_buf = malloc(src.size); memcpy(_buf, src.buf, src.size); *buf = _buf; *size = src.size; *port = src.port; mm_free(src.buf); // printf("shm_recvfrom pop after\n"); return 0; } else { return -1; } } int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size, struct timespec *timeout, int flags) { if (socket->socket_type != SHM_SOCKET_DGRAM) { err_exit(0, "Can't invoke shm_sendandrecv method in a %d type socket " "which is not a SHM_SOCKET_DGRAM socket ", socket->socket_type); } int recv_port; int rv; shm_socket_t *tmp_socket = shm_open_socket(SHM_SOCKET_DGRAM); if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_port, timeout, flags)) == 0) { rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_port, timeout, flags); shm_close_socket(tmp_socket); return rv; } else { shm_close_socket(tmp_socket); return rv; } return -1; } // ============================================================================================================ /** * 绑定key到队列,但是并不会创建队列。如果没有对应指定key的队列提示错误并退出 */ SHMQueue *_attach_remote_queue(int port) { hashtable_t *hashtable = mm_get_hashtable(); if (hashtable_get(hashtable, port) == NULL) { err_msg(0, "_remote_queue_attach:connet at port %d failed!", port); return NULL; } SHMQueue *queue = new SHMQueue(port, 0); return queue; } void _server_close_conn_to_client(shm_socket_t *socket, int port) { shm_socket_t *client_socket; std::map::iterator iter = socket->clientSocketMap->find(port); if (iter != socket->clientSocketMap->end()) { client_socket = iter->second; free((void *)client_socket); socket->clientSocketMap->erase(iter); } } /** * server端各种类型消息()在这里进程分拣 */ void *_server_run_msg_rev(void *_socket) { pthread_detach(pthread_self()); shm_socket_t *socket = (shm_socket_t *)_socket; struct timespec timeout = {1, 0}; shm_msg_t src; shm_socket_t *client_socket; std::map::iterator iter; while (socket->queue->pop(src)) { switch (src.type) { case SHM_SOCKET_OPEN: socket->acceptQueue->push_timeout(src, &timeout); break; case SHM_SOCKET_CLOSE: _server_close_conn_to_client(socket, src.port); break; case SHM_COMMON_MSG: iter = socket->clientSocketMap->find(src.port); if (iter != socket->clientSocketMap->end()) { client_socket = iter->second; // print_msg("_server_run_msg_rev push before", src); client_socket->messageQueue->push_timeout(src, &timeout); // print_msg("_server_run_msg_rev push after", src); } break; default: err_msg(0, "socket.__shm_rev__: undefined message type."); } } return NULL; } void _client_close_conn_to_server(shm_socket_t *socket) { _shm_close_stream_socket(socket, false); } /** * client端的各种类型消息()在这里进程分拣 */ void *_client_run_msg_rev(void *_socket) { pthread_detach(pthread_self()); shm_socket_t *socket = (shm_socket_t *)_socket; struct timespec timeout = {1, 0}; shm_msg_t src; while (socket->queue->pop(src)) { switch (src.type) { case SHM_SOCKET_CLOSE: _client_close_conn_to_server(socket); break; case SHM_COMMON_MSG: socket->messageQueue->push_timeout(src, &timeout); break; default: err_msg(0, "socket.__shm_rev__: undefined message type."); } } return NULL; } int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote) { socket->status = SHM_CONN_CLOSED; //给对方发送一个关闭连接的消息 struct timespec timeout = {1, 0}; shm_msg_t close_msg; close_msg.port = socket->port; close_msg.size = 0; close_msg.type = SHM_SOCKET_CLOSE; if (notifyRemote && socket->remoteQueue != NULL) { socket->remoteQueue->push_timeout(close_msg, &timeout); } if (socket->queue != NULL) { delete socket->queue; socket->queue = NULL; } if (socket->remoteQueue != NULL) { delete socket->remoteQueue; socket->remoteQueue = NULL; } if (socket->messageQueue != NULL) { delete socket->messageQueue; socket->messageQueue = NULL; } if (socket->acceptQueue != NULL) { delete socket->acceptQueue; socket->acceptQueue = NULL; } if (socket->clientSocketMap != NULL) { shm_socket_t *client_socket; for (auto iter = socket->clientSocketMap->begin(); iter != socket->clientSocketMap->end(); iter++) { client_socket = iter->second; client_socket->remoteQueue->push_timeout(close_msg, &timeout); delete client_socket->remoteQueue; client_socket->remoteQueue = NULL; delete client_socket->messageQueue; client_socket->messageQueue = NULL; free((void *)client_socket); } delete socket->clientSocketMap; } if (socket->dispatch_thread != 0) pthread_cancel(socket->dispatch_thread); free(socket); return 0; } int _shm_close_dgram_socket(shm_socket_t *socket){ if(socket->queue != NULL) { delete socket->queue; socket->queue = NULL; } free(socket); return 0; }