From 4892aa7927086988e96918293272693a65d049b7 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期四, 16 七月 2020 18:31:03 +0800
Subject: [PATCH] update
---
queue/libshm_queue.a | 0
test/Makefile | 2
test/communication.c | 73 +++++++++
queue/include/socket.h | 57 ++++++-
queue/socket.c.bk | 0
queue/socket.c | 327 ++++++++++++++++++++++++++++++++++++++++
test/communication | 0
7 files changed, 449 insertions(+), 10 deletions(-)
diff --git a/queue/include/socket.h b/queue/include/socket.h
index b19a7b4..f51c021 100644
--- a/queue/include/socket.h
+++ b/queue/include/socket.h
@@ -1,7 +1,14 @@
-#ifndef __BASIC_SHM_SOCKET_H__
-#define __BASIC_SHM_SOCKET_H__
+#ifndef __SHM_SOCKET_H__
+#define __SHM_SOCKET_H__
#include "usg_common.h"
+#include "usg_typedef.h"
+#include "shm_queue.h"
+#include "shm_allocator.h"
+
+#include "mem_pool.h"
+#include "hashtable.h"
+#include "sem_util.h"
#ifdef __cplusplus
extern "C" {
#endif
@@ -16,6 +23,36 @@
BUS = 6
};
+
+
+enum shm_msg_type_t
+{
+ SHM_SOCKET_OPEN = 1,
+ SHM_SOCKET_CLOSE = 2,
+ SHM_COMMON_MSG = 3
+
+};
+
+typedef struct shm_msg_t {
+ int port;
+ shm_msg_type_t type;
+ size_t size;
+ void * buf;
+
+} shm_msg_t;
+
+
+typedef struct shm_socket_t {
+ // 鏈湴port
+ int port;
+ SHMQueue<shm_msg_t> *queue;
+ SHMQueue<shm_msg_t> *remoteQueue;
+ LockFreeQueue<shm_msg_t, DM_Allocator> *messageQueue;
+ LockFreeQueue<shm_msg_t, DM_Allocator> *acceptQueue;
+ std::map<int, shm_socket_t* > *clientSocketMap;
+ pthread_t dispatch_thread;
+
+} shm_socket_t;
/**
@@ -36,21 +73,23 @@
*/
void shm_free(void *buf);
-void *shm_open_socket(int mod);
+shm_socket_t *shm_open_socket();
-int shm_close_socket(void *socket) ;
+int shm_close_socket(shm_socket_t * socket) ;
-int shm_bind(void* socket, int port) ;
+int shm_bind(shm_socket_t * socket, int port) ;
-int shm_listen(void* socket) ;
+int shm_listen(shm_socket_t * socket) ;
-int shm_connect(void* socket, int port);
+shm_socket_t* shm_accept(shm_socket_t* socket);
-int shm_send(void *socket, void *buf, int size) ;
+int shm_connect(shm_socket_t * socket, int port);
-int shm_recv(void* socket, void **buf, int *size) ;
+int shm_send(shm_socket_t * socket, void *buf, int size) ;
+
+int shm_recv(shm_socket_t * socket, void **buf, int *size) ;
diff --git a/queue/libshm_queue.a b/queue/libshm_queue.a
index 3fe8324..65416b5 100644
--- a/queue/libshm_queue.a
+++ b/queue/libshm_queue.a
Binary files differ
diff --git a/queue/socket.c b/queue/socket.c
new file mode 100644
index 0000000..1f3ff24
--- /dev/null
+++ b/queue/socket.c
@@ -0,0 +1,327 @@
+
+#include "socket.h"
+#include <map>
+
+
+
+
+void print_msg(char *head, shm_msg_t& msg) {
+ err_msg(0, "%s: port=%d, type=%d\n", head, msg.port, msg.type);
+}
+
+void * _server_run_msg_rev(void* _socket);
+
+void * _client_run_msg_rev(void* _socket);
+
+SHMQueue<shm_msg_t> * _attach_remote_queue(int port) ;
+
+void shm_init(int size) {
+ mem_pool_init(size);
+}
+
+void shm_destroy() {
+ mem_pool_destroy();
+}
+
+void shm_free(void *buf) {
+ free(buf);
+}
+
+shm_socket_t *shm_open_socket() {
+ shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t));
+
+ socket->port = -1;
+ socket->dispatch_thread = 0;
+
+ return socket;
+}
+
+
+int shm_close_socket(shm_socket_t *socket) {
+ //缁欏鏂瑰彂閫佷竴涓叧闂繛鎺ョ殑娑堟伅
+ struct timespec timeout = {1, 0};
+ shm_msg_t close_msg;
+ close_msg.port = socket->port;
+ close_msg.size = 0;
+ close_msg.type=SHM_SOCKET_CLOSE;
+ if(socket->remoteQueue != NULL) {
+ socket->remoteQueue->push_timeout(close_msg, &timeout);
+ }
+
+
+
+ if(socket->queue != NULL)
+ delete socket->queue;
+ if(socket->remoteQueue != NULL)
+ delete socket->remoteQueue;
+
+ if(socket->messageQueue != NULL)
+ delete socket->messageQueue;
+
+ if(socket->acceptQueue != NULL)
+ delete socket->acceptQueue;
+
+ if(socket->clientSocketMap != NULL) {
+ shm_socket_t *client_socket;
+ for(auto iter = socket->clientSocketMap->begin(); iter != socket->clientSocketMap->end(); iter++) {
+ client_socket= iter->second;
+
+ client_socket->remoteQueue->push_timeout(close_msg, &timeout);
+ delete client_socket->remoteQueue;
+ delete client_socket->messageQueue;
+ socket->clientSocketMap->erase(iter);
+ free((void *)client_socket);
+ }
+ delete socket->clientSocketMap;
+ }
+
+
+ if(socket->dispatch_thread != 0)
+ pthread_cancel(socket->dispatch_thread);
+
+
+
+ free(socket);
+ return 0;
+
+}
+
+
+int shm_bind(shm_socket_t * socket, int port) {
+ shm_socket_t * _socket = (shm_socket_t *) socket;
+ _socket -> port = port;
+ return 0;
+}
+
+int shm_listen(shm_socket_t* socket) {
+ int port;
+ hashtable_t *hashtable = mm_get_hashtable();
+ if(socket -> port == -1) {
+ port = hashtable_alloc_key(hashtable);
+ socket -> port = port;
+ } else {
+
+ if(hashtable_get(hashtable, socket->port)!= NULL) {
+ err_exit(0, "key %d has already been in used!", socket->port);
+ }
+ }
+
+ socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
+ socket->acceptQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
+ socket->clientSocketMap = new std::map<int, shm_socket_t* >;
+
+ pthread_create(&(socket->dispatch_thread), NULL, _server_run_msg_rev , (void *)socket);
+
+ return 0;
+}
+
+void _server_close_conn_to_client(shm_socket_t* socket, int port) {
+ shm_socket_t *client_socket;
+ auto iter = socket->clientSocketMap->find(port);
+ if( iter != socket->clientSocketMap->end() ) {
+ client_socket= iter->second;
+ delete client_socket->remoteQueue;
+ delete client_socket->messageQueue;
+ socket->clientSocketMap->erase(iter);
+ }
+ free((void *)client_socket);
+
+}
+
+/**
+ * server绔悇绉嶇被鍨嬫秷鎭紙锛夊湪杩欓噷杩涚▼鍒嗘嫞
+ */
+void * _server_run_msg_rev(void* _socket) {
+ pthread_detach(pthread_self());
+ shm_socket_t* socket = (shm_socket_t*) _socket;
+ struct timespec timeout = {1, 0};
+ shm_msg_t src;
+ shm_socket_t *client_socket;
+ std::map<int, shm_socket_t* >::iterator iter;
+ while(socket->queue->pop(src)) {
+print_msg("=====_server_run_msg_rev:", src);
+ switch (src.type) {
+ case SHM_SOCKET_OPEN :
+ socket->acceptQueue->push_timeout(src, &timeout);
+ break;
+ case SHM_SOCKET_CLOSE :
+ _server_close_conn_to_client(socket, src.port);
+ break;
+ case SHM_COMMON_MSG :
+err_msg(0, "===_server_run_msg_rev 1");
+ iter = socket->clientSocketMap->find(src.port);
+ if( iter != socket->clientSocketMap->end()) {
+ client_socket= iter->second;
+err_msg(0, "===_server_run_msg_rev client_socket->messageQueue=%p", client_socket->messageQueue);
+ client_socket->messageQueue->push_timeout(src, &timeout);
+ }
+
+ break;
+
+ default:
+ err_msg(0, "socket.__shm_rev__: undefined message type.");
+ }
+ }
+
+ return NULL;
+}
+
+
+
+
+
+shm_socket_t* shm_accept(shm_socket_t* socket) {
+ hashtable_t *hashtable = mm_get_hashtable();
+ int client_port;
+ shm_socket_t *client_socket;
+ shm_msg_t src;
+
+ if (socket->acceptQueue->pop(src) ) {
+
+print_msg("===accept:", src);
+ client_port = src.port;
+ client_socket = (shm_socket_t *)malloc(sizeof(shm_socket_t));
+ client_socket->port = socket->port;
+ // client_socket->queue= socket->queue;
+ //鍒濆鍖栨秷鎭痲ueue
+ client_socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
+ //杩炴帴鍒板鏂筿ueue
+ client_socket->remoteQueue = _attach_remote_queue(client_port);
+
+ socket->clientSocketMap->insert({client_port, client_socket});
+
+ return client_socket;
+ } else {
+ err_exit(errno, "shm_accept");
+ }
+
+}
+
+
+int shm_connect(shm_socket_t* socket, int port) {
+ hashtable_t *hashtable = mm_get_hashtable();
+ if(hashtable_get(hashtable, port)== NULL) {
+ err_exit(0, "shm_connect锛歝onnect at port %d failed!", port);
+ }
+ if(socket->port == -1) {
+ socket->port = hashtable_alloc_key(hashtable);
+ } else {
+
+ if(hashtable_get(hashtable, socket->port)!= NULL) {
+ err_exit(0, "key %d has already been in used!", socket->port);
+ }
+ }
+
+ socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
+ socket->remoteQueue = new SHMQueue<shm_msg_t>(port, 0);
+ socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
+ struct timespec timeout = {1, 0};
+
+ shm_msg_t open_msg;
+ open_msg.port = socket->port;
+ open_msg.size = 0;
+ open_msg.type=SHM_SOCKET_OPEN;
+ socket->remoteQueue->push_timeout(open_msg, &timeout);
+
+ pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev , (void *)socket);
+ return 0;
+}
+
+void _client_close_conn_to_server(shm_socket_t* socket) {
+ if(socket->queue != NULL)
+ delete socket->queue;
+ if(socket->remoteQueue != NULL)
+ delete socket->remoteQueue;
+
+ if(socket->messageQueue != NULL)
+ delete socket->messageQueue;
+
+ if(socket->acceptQueue != NULL)
+ delete socket->acceptQueue;
+
+ if(socket->dispatch_thread != 0)
+ pthread_cancel(socket->dispatch_thread);
+
+}
+
+
+/**
+ * client绔殑鍚勭绫诲瀷娑堟伅锛堬級鍦ㄨ繖閲岃繘绋嬪垎鎷�
+ */
+void * _client_run_msg_rev(void* _socket) {
+ pthread_detach(pthread_self());
+ shm_socket_t* socket = (shm_socket_t*) _socket;
+ struct timespec timeout = {1, 0};
+ shm_msg_t src;
+
+ while(socket->queue->pop(src)) {
+ switch (src.type) {
+
+ case SHM_SOCKET_CLOSE :
+ _client_close_conn_to_server(socket);
+ break;
+ case SHM_COMMON_MSG :
+ socket->messageQueue->push_timeout(src, &timeout);
+ break;
+ default:
+ err_msg(0, "socket.__shm_rev__: undefined message type.");
+ }
+ }
+
+ return NULL;
+}
+
+
+
+
+
+int shm_send(shm_socket_t *socket, void *buf, int size) {
+ // hashtable_t *hashtable = mm_get_hashtable();
+ shm_msg_t dest;
+ dest.type=SHM_COMMON_MSG;
+ dest.port = socket->port;
+ dest.size = size;
+ dest.buf = mm_malloc(size);
+ memcpy(dest.buf, buf, size);
+
+ socket->remoteQueue->push(dest);
+ return 0;
+}
+
+int shm_recv(shm_socket_t* socket, void **buf, int *size) {
+ shm_msg_t src;
+err_msg(0, "====shm_recv socket ==%p", socket);
+ bool rv = socket->messageQueue->pop(src);
+ if (rv) {
+ void * _buf = malloc(src.size);
+ memcpy(_buf, src.buf, src.size);
+ *buf = _buf;
+ *size = src.size;
+ mm_free(src.buf);
+ }
+
+ return 0;
+
+}
+
+
+/**
+ * 缁戝畾key鍒伴槦鍒楋紝浣嗘槸骞朵笉浼氬垱寤洪槦鍒椼�傚鏋滄病鏈夊搴旀寚瀹歬ey鐨勯槦鍒楁彁绀洪敊璇苟閫�鍑�
+ */
+SHMQueue<shm_msg_t> * _attach_remote_queue(int port) {
+ hashtable_t *hashtable = mm_get_hashtable();
+ if(hashtable_get(hashtable, port)== NULL) {
+ err_exit(0, "_remote_queue_attach锛歝onnet at port %d failed!", port);
+ return NULL;
+ }
+
+ SHMQueue<shm_msg_t> *queue = new SHMQueue<shm_msg_t>(port, 0);
+ return queue;
+}
+
+
+
+
+
+
+
diff --git a/queue/socket.cb b/queue/socket.c.bk
similarity index 100%
rename from queue/socket.cb
rename to queue/socket.c.bk
diff --git a/test/Makefile b/test/Makefile
index b48f536..eee9f77 100755
--- a/test/Makefile
+++ b/test/Makefile
@@ -14,7 +14,7 @@
include $(ROOT)/Make.defines.$(PLATFORM)
-PROGS = test_queue single_productor single_consumer multiple_queue_productor multiple_queue_consumer test_timeout test_lostdata test_lockfree_queue
+PROGS = communication
build: $(PROGS)
diff --git a/test/communication b/test/communication
new file mode 100755
index 0000000..ca89adb
--- /dev/null
+++ b/test/communication
Binary files differ
diff --git a/test/communication.c b/test/communication.c
new file mode 100644
index 0000000..a314dcb
--- /dev/null
+++ b/test/communication.c
@@ -0,0 +1,73 @@
+#include "socket.h"
+
+
+void * precess_client(void *_socket) {
+ pthread_detach(pthread_self());
+ shm_socket_t *socket = (shm_socket_t *)_socket;
+ int size;
+ void *recvbuf;
+ char sendbuf[512];
+ while(true) {
+ shm_recv(socket, &recvbuf, &size);
+ sprintf(sendbuf, "SERVER RECEIVED: %s", recvbuf);
+ puts(sendbuf);
+ shm_send(socket, sendbuf, strlen(sendbuf)+1) ;
+ shm_free(recvbuf);
+
+ }
+ shm_close_socket(socket);
+}
+
+void server(int port) {
+ pthread_t tid;
+ shm_socket_t *socket = shm_open_socket();
+ shm_bind(socket, port);
+ shm_listen(socket);
+ shm_socket_t *client_socket;
+ while(true) {
+ client_socket = shm_accept(socket);
+printf("server messageQueue = %p\n", client_socket->messageQueue);
+ pthread_create(&tid, NULL, precess_client , (void *)client_socket);
+ }
+
+
+}
+
+void client(int port) {
+ shm_socket_t *socket = shm_open_socket();
+ shm_connect(socket, port);
+ int size;
+ void *recvbuf;
+ char sendbuf[512];
+ while(true) {
+ printf("request: ");
+ scanf("%s", sendbuf);
+ shm_send(socket, sendbuf, strlen(sendbuf)+1) ;
+ shm_recv(socket, &recvbuf, &size);
+ printf("reply: %s\n", (char *)recvbuf);
+ shm_free(recvbuf);
+
+ }
+ shm_close_socket(socket);
+}
+
+int main(int argc, char *argv[]) {
+ shm_init(512);
+ int port;
+ if (argc < 3) {
+ fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client");
+ return 1;
+ }
+
+ port = atoi(argv[2]);
+
+ if (strcmp("server", argv[1]) == 0 ) {
+ server(port);
+ }
+
+ if (strcmp("client", argv[1]) == 0)
+ client(port);
+ shm_destroy();
+ // fprintf(stderr, "Usage: reqrep %s|%s <URL> ...\n", "server", "client");
+ return 0;
+}
\ No newline at end of file
--
Gitblit v1.8.0