From f85c9b875b060681b51f57b15074ba1c7c9f5636 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期一, 20 七月 2020 11:10:02 +0800
Subject: [PATCH] update
---
queue/mod_socket.c | 200 ++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 200 insertions(+), 0 deletions(-)
diff --git a/queue/mod_socket.c b/queue/mod_socket.c
index f73dcba..cc358f6 100644
--- a/queue/mod_socket.c
+++ b/queue/mod_socket.c
@@ -1,2 +1,202 @@
#include "mod_socket.h"
+#include "shm_socket.h"
+#include "usg_common.h"
+#include "logger_factory.h"
+static Logger logger = LoggerFactory::getLogger();
+
+typedef struct mod_entry_t
+{
+ int size;
+ void *buf;
+ shm_socket_t *client_socket;
+}mod_entry_t;
+
+typedef struct mod_socket_t {
+ socket_mod_t mod;
+ shm_socket_t *shm_socket;
+ shm_socket_t *client_socket;
+ int is_server;
+ LockFreeQueue<mod_entry_t, DM_Allocator> *recvQueue;
+ int slots;
+ int items;
+
+
+} mod_socket_t;
+
+/**
+ *
+ */
+void *mod_open_socket(int mod) {
+ mod_socket_t *socket = (mod_socket_t *)malloc(sizeof(mod_socket_t));
+ socket->shm_socket=shm_open_socket();
+ socket->is_server = 0;
+ socket->mod = (socket_mod_t)mod;
+ socket->recvQueue = new LockFreeQueue<mod_entry_t, DM_Allocator>(16);
+ if (mod == REQ_REP) {
+ socket->slots = SemUtil::get(IPC_PRIVATE, 1);
+ socket->items = SemUtil::get(IPC_PRIVATE, 0);
+ }
+
+ return (void *)socket;
+}
+
+
+
+int mod_close_socket(void * _socket){
+ mod_socket_t * socket = (mod_socket_t *) _socket;
+
+ if (socket->mod == REQ_REP) {
+ SemUtil::remove(socket->slots);
+ SemUtil::remove(socket->items);
+ }
+
+ int rv = shm_close_socket(socket->shm_socket);
+ free(_socket);
+ return rv;
+}
+
+int mod_get_socket_port(void * _socket) {
+ mod_socket_t * socket = (mod_socket_t *) _socket;
+ return socket->shm_socket->port;
+}
+
+
+int mod_socket_bind(void * _socket, int port){
+ mod_socket_t * socket = (mod_socket_t *) _socket;
+ return shm_socket_bind(socket->shm_socket, port);
+}
+
+void * run_server_recv_client_msg(void *_socket) {
+ pthread_detach(pthread_self());
+ mod_socket_t * socket = (mod_socket_t *) _socket;
+ shm_socket_t * client_socket = socket->client_socket;
+
+ mod_entry_t entry;
+ entry.client_socket = client_socket;
+ while (socket->shm_socket->status == SHM_CONN_LISTEN &&
+ client_socket->status == SHM_CONN_ESTABLISHED && shm_recv(client_socket, &entry.buf, &entry.size) == 0 ) {
+
+ socket->recvQueue->push(entry);
+ // shm_free(recvbuf);
+ }
+
+ free(_socket);
+ shm_close_socket(client_socket);
+ return NULL;
+}
+
+void *run_accept_connection(void * _socket) {
+ mod_socket_t * socket = (mod_socket_t *) _socket;
+ shm_socket_t *client_socket;
+ pthread_t tid;
+ while(socket->shm_socket->status == SHM_CONN_LISTEN) {
+ //鎺ュ彈瀹㈡埛绔殑杩炴帴璇锋眰
+ client_socket = shm_accept(socket->shm_socket);
+
+ mod_socket_t *arg = (mod_socket_t *)malloc(sizeof(mod_socket_t));
+ memcpy(arg, _socket, sizeof(mod_socket_t));
+ arg->client_socket = client_socket;
+ pthread_create(&tid, NULL, run_server_recv_client_msg , (void *)arg);
+ }
+ return NULL;
+}
+
+int mod_listen(void * _socket) {
+ mod_socket_t * socket = (mod_socket_t *) _socket;
+ pthread_t tid;
+ socket->is_server = 1;
+ int rv = shm_listen(socket->shm_socket);
+ if(rv == 0) {
+ pthread_create(&tid, NULL, run_accept_connection, _socket);
+ return 0;
+ }
+ return -1;
+}
+
+
+int mod_connect(void * _socket, int port) {
+ mod_socket_t * socket = (mod_socket_t *) _socket;
+ return shm_connect(socket->shm_socket, port);
+
+}
+
+int mod_send(void * _socket, const void *buf, const int size) {
+ mod_socket_t * socket = (mod_socket_t *) _socket;
+ std::map<int, shm_socket_t* > *clientSocketMap = socket->shm_socket->clientSocketMap;
+ std::map<int, shm_socket_t* >::iterator iter;
+ int rv;
+ if(socket->is_server ) {
+ switch(socket->mod) {
+ case REQ_REP:
+logger.debug("mod_send before");
+ SemUtil::dec(socket->items);
+ rv = shm_send(socket->client_socket, buf, size);
+ SemUtil::inc(socket->slots);
+logger.debug("mod_send after");
+ break;
+ case SURVEY:
+ case PUB_SUB:
+ for(iter = clientSocketMap->begin(); iter != clientSocketMap->end(); iter++) {
+ rv = shm_send(iter->second, buf, size);
+ }
+ break;
+ default:
+ rv = shm_send(socket->client_socket, buf, size);
+ }
+ return rv;
+
+ }
+ else {
+logger.debug("mod_send before");
+ rv = shm_send(socket->shm_socket, buf, size);
+logger.debug("mod_send after");
+ return rv;
+ }
+ return -1;
+
+}
+
+int mod_recv(void * _socket, void **buf, int *size) {
+ mod_socket_t * socket = (mod_socket_t *) _socket;
+ mod_entry_t entry;
+
+ if(socket->is_server ) {
+ switch(socket->mod) {
+ case REQ_REP:
+logger.debug("REQ_REP mod_recv before");
+ SemUtil::dec(socket->slots);
+ socket->recvQueue->pop(entry);
+ *buf = entry.buf;
+ *size = entry.size;
+ socket->client_socket = entry.client_socket;
+ SemUtil::inc(socket->items);
+
+logger.debug("REQ_REP mod_recv after");
+ break;
+ case PUB_SUB:
+ break;
+ case SURVEY:
+ default:
+ socket->recvQueue->pop(entry);
+ *buf = entry.buf;
+ *size = entry.size;
+
+ }
+
+ return 0;
+ }
+ else {
+logger.debug("mod_recv before");
+ shm_recv(socket->shm_socket, buf, size);
+logger.debug("mod_recv after");
+ return 0;
+ }
+
+ return -1;
+}
+
+
+void mod_free(void *buf) {
+ free(buf);
+}
--
Gitblit v1.8.0