#include "shm_socket.h" #include "hashtable.h" #include "logger_factory.h" #include static Logger logger = LoggerFactory::getLogger(); void print_msg(char *head, shm_msg_t& msg) { //err_msg(0, "%s: port=%d, type=%d\n", head, msg.port, msg.type); } void * _server_run_msg_rev(void* _socket); void * _client_run_msg_rev(void* _socket); int _shm_close_dgram_socket(shm_socket_t *socket); int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote); SHMQueue * _attach_remote_queue(int port) ; shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) { shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t)); socket->socket_type = socket_type; socket->port = -1; socket->dispatch_thread = 0; socket->status=SHM_CONN_CLOSED; return socket; } int shm_close_socket(shm_socket_t *socket) { switch(socket->socket_type) { case SHM_SOCKET_STREAM: return _shm_close_stream_socket(socket, true); case SHM_SOCKET_DGRAM: return _shm_close_dgram_socket(socket); default: return -1; } return -1; } int shm_socket_bind(shm_socket_t * socket, int port) { socket -> port = port; return 0; } int shm_listen(shm_socket_t* socket) { if(socket->socket_type != SHM_SOCKET_STREAM) { err_exit(0, "can not invoke shm_listen method with a socket which is not a SHM_SOCKET_STREAM socket"); } int port; hashtable_t *hashtable = mm_get_hashtable(); if(socket -> port == -1) { port = hashtable_alloc_key(hashtable); socket -> port = port; } else { if(hashtable_get(hashtable, socket->port)!= NULL) { err_exit(0, "key %d has already been in used!", socket->port); } } socket->queue = new SHMQueue(socket->port, 16); socket->acceptQueue = new LockFreeQueue(16); socket->clientSocketMap = new std::map; socket->status = SHM_CONN_LISTEN; pthread_create(&(socket->dispatch_thread), NULL, _server_run_msg_rev , (void *)socket); return 0; } /** * 接受客户端建立新连接的请求 * */ shm_socket_t* shm_accept(shm_socket_t* socket) { if(socket->socket_type != SHM_SOCKET_STREAM) { err_exit(0, "can not invoke shm_accept method with a socket which is not a SHM_SOCKET_STREAM socket"); } hashtable_t *hashtable = mm_get_hashtable(); int client_port; shm_socket_t *client_socket; shm_msg_t src; if (socket->acceptQueue->pop(src) ) { // print_msg("===accept:", src); client_port = src.port; client_socket = (shm_socket_t *)malloc(sizeof(shm_socket_t)); client_socket->port = socket->port; // client_socket->queue= socket->queue; //初始化消息queue client_socket->messageQueue = new LockFreeQueue(16); //连接到对方queue client_socket->remoteQueue = _attach_remote_queue(client_port); socket->clientSocketMap->insert({client_port, client_socket}); /* * shm_accept 用户执行的方法 与_server_run_msg_rev在两个不同的限制工作,accept要保证在客户的发送消息之前完成资源的准备工作,以避免出现竞态问题 */ //发送open_reply,回应客户端的connect请求 struct timespec timeout = {1, 0}; shm_msg_t msg; msg.port = socket->port; msg.size = 0; msg.type = SHM_SOCKET_OPEN_REPLY; if (client_socket->remoteQueue->push_timeout(msg, &timeout) ) { client_socket->status = SHM_CONN_ESTABLISHED; return client_socket; } else { err_msg(0, "shm_accept: 发送open_reply失败"); return NULL; } } else { err_exit(errno, "shm_accept"); } return NULL; } int shm_connect(shm_socket_t* socket, int port) { if(socket->socket_type != SHM_SOCKET_STREAM) { err_exit(0, "can not invoke shm_connect method with a socket which is not a SHM_SOCKET_STREAM socket"); } hashtable_t *hashtable = mm_get_hashtable(); if(hashtable_get(hashtable, port)== NULL) { err_exit(0, "shm_connect:connect at port %d failed!", port); } if(socket->port == -1) { socket->port = hashtable_alloc_key(hashtable); } else { if(hashtable_get(hashtable, socket->port)!= NULL) { err_exit(0, "key %d has already been in used!", socket->port); } } socket->queue = new SHMQueue(socket->port, 16); socket->remoteQueue = _attach_remote_queue(port); socket->messageQueue = new LockFreeQueue(16); //发送open请求 struct timespec timeout = {1, 0}; shm_msg_t msg; msg.port = socket->port; msg.size = 0; msg.type=SHM_SOCKET_OPEN; socket->remoteQueue->push_timeout(msg, &timeout); //接受open reply if(socket->queue->pop(msg)) { // 在这里server端已经准备好接受客户端发送请求了,完成与服务端的连接 if(msg.type == SHM_SOCKET_OPEN_REPLY) { socket->status = SHM_CONN_ESTABLISHED; pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev , (void *)socket); } else { err_exit(0, "shm_connect: 不匹配的应答信息!"); } } else { err_exit(0, "connect failted!"); } return 0; } int shm_send(shm_socket_t *socket, const void *buf, const int size) { if(socket->socket_type != SHM_SOCKET_STREAM) { err_exit(0, "can not invoke shm_send method with a socket which is not a SHM_SOCKET_STREAM socket"); } // hashtable_t *hashtable = mm_get_hashtable(); // if(socket->remoteQueue == NULL) { // err_msg(errno, "当前客户端无连接!"); // return -1; // } shm_msg_t dest; dest.type=SHM_COMMON_MSG; dest.port = socket->port; dest.size = size; dest.buf = mm_malloc(size); memcpy(dest.buf, buf, size); if(socket->remoteQueue->push(dest)) { return 0; } else { err_msg(errno, "connection has been closed!"); return -1; } } int shm_recv(shm_socket_t* socket, void **buf, int *size) { if(socket->socket_type != SHM_SOCKET_STREAM) { err_exit(0, "can not invoke shm_recv method with a socket which is not a SHM_SOCKET_STREAM socket"); } shm_msg_t src; if (socket->messageQueue->pop(src)) { void * _buf = malloc(src.size); memcpy(_buf, src.buf, src.size); *buf = _buf; *size = src.size; mm_free(src.buf); return 0; } else { return -1; } } // 短连接方式发送 int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int port) { hashtable_t *hashtable = mm_get_hashtable(); if(socket->queue == NULL) { if(socket->port == -1) { socket->port = hashtable_alloc_key(hashtable); } else { if(hashtable_get(hashtable, socket->port)!= NULL) { err_exit(0, "key %d has already been in used!", socket->port); } } socket->queue = new SHMQueue(socket->port, 16); } if (port == socket->port) { err_msg(0, "can not send to your self!"); return -1; } shm_msg_t dest; dest.type=SHM_COMMON_MSG; dest.port = socket->port; dest.size = size; dest.buf = mm_malloc(size); memcpy(dest.buf, buf, size); SHMQueue *remoteQueue = _attach_remote_queue(port); if(remoteQueue->push(dest)) { delete remoteQueue; return 0; } else { delete remoteQueue; err_msg(errno, "sendto port %d failed!", port); return -1; } } // 短连接方式接受 int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port){ hashtable_t *hashtable = mm_get_hashtable(); if(socket->queue == NULL) { if(socket->port == -1) { socket->port = hashtable_alloc_key(hashtable); } else { if(hashtable_get(hashtable, socket->port)!= NULL) { err_exit(0, "key %d has already been in used!", socket->port); } } socket->queue = new SHMQueue(socket->port, 16); } shm_msg_t src; printf("shm_recvfrom pop before"); if (socket->queue->pop(src)) { void * _buf = malloc(src.size); memcpy(_buf, src.buf, src.size); *buf = _buf; *size = src.size; *port = src.port; mm_free(src.buf); printf("shm_recvfrom pop after"); return 0; } else { return -1; } } /** * 绑定key到队列,但是并不会创建队列。如果没有对应指定key的队列提示错误并退出 */ SHMQueue * _attach_remote_queue(int port) { hashtable_t *hashtable = mm_get_hashtable(); if(hashtable_get(hashtable, port)== NULL) { err_exit(0, "_remote_queue_attach:connet at port %d failed!", port); return NULL; } SHMQueue *queue = new SHMQueue(port, 0); return queue; } void _server_close_conn_to_client(shm_socket_t* socket, int port) { shm_socket_t *client_socket; auto iter = socket->clientSocketMap->find(port); if( iter != socket->clientSocketMap->end() ) { socket->clientSocketMap->erase(iter); } //free((void *)client_socket); } /** * server端各种类型消息()在这里进程分拣 */ void * _server_run_msg_rev(void* _socket) { pthread_detach(pthread_self()); shm_socket_t* socket = (shm_socket_t*) _socket; struct timespec timeout = {1, 0}; shm_msg_t src; shm_socket_t *client_socket; std::map::iterator iter; while(socket->queue->pop(src)) { switch (src.type) { case SHM_SOCKET_OPEN : socket->acceptQueue->push_timeout(src, &timeout); break; case SHM_SOCKET_CLOSE : _server_close_conn_to_client(socket, src.port); break; case SHM_COMMON_MSG : iter = socket->clientSocketMap->find(src.port); if( iter != socket->clientSocketMap->end()) { client_socket= iter->second; // print_msg("_server_run_msg_rev push before", src); client_socket->messageQueue->push_timeout(src, &timeout); // print_msg("_server_run_msg_rev push after", src); } break; default: err_msg(0, "socket.__shm_rev__: undefined message type."); } } return NULL; } void _client_close_conn_to_server(shm_socket_t* socket) { _shm_close_stream_socket(socket, false); } /** * client端的各种类型消息()在这里进程分拣 */ void * _client_run_msg_rev(void* _socket) { pthread_detach(pthread_self()); shm_socket_t* socket = (shm_socket_t*) _socket; struct timespec timeout = {1, 0}; shm_msg_t src; while(socket->queue->pop(src)) { switch (src.type) { case SHM_SOCKET_CLOSE : _client_close_conn_to_server(socket); break; case SHM_COMMON_MSG : socket->messageQueue->push_timeout(src, &timeout); break; default: err_msg(0, "socket.__shm_rev__: undefined message type."); } } return NULL; } int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote) { socket->status = SHM_CONN_CLOSED; //给对方发送一个关闭连接的消息 struct timespec timeout = {1, 0}; shm_msg_t close_msg; close_msg.port = socket->port; close_msg.size = 0; close_msg.type=SHM_SOCKET_CLOSE; if(notifyRemote && socket->remoteQueue != NULL) { socket->remoteQueue->push_timeout(close_msg, &timeout); } if(socket->queue != NULL) { delete socket->queue; socket->queue = NULL; } if(socket->remoteQueue != NULL) { delete socket->remoteQueue; socket->remoteQueue = NULL; } if(socket->messageQueue != NULL) { delete socket->messageQueue; socket->messageQueue = NULL; } if(socket->acceptQueue != NULL) { delete socket->acceptQueue; socket->acceptQueue = NULL; } if(socket->clientSocketMap != NULL) { shm_socket_t *client_socket; for(auto iter = socket->clientSocketMap->begin(); iter != socket->clientSocketMap->end(); iter++) { client_socket= iter->second; client_socket->remoteQueue->push_timeout(close_msg, &timeout); delete client_socket->remoteQueue; client_socket->remoteQueue=NULL; delete client_socket->messageQueue; client_socket->messageQueue=NULL; socket->clientSocketMap->erase(iter); free((void *)client_socket); } delete socket->clientSocketMap; } if(socket->dispatch_thread != 0) pthread_cancel(socket->dispatch_thread); free(socket); return 0; } int _shm_close_dgram_socket(shm_socket_t *socket){ if(socket->queue != NULL) { delete socket->queue; socket->queue = NULL; } free(socket); return 0; }