From 7032fedd41386f8a0b779d234620b473d978f889 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期五, 17 七月 2020 17:43:18 +0800
Subject: [PATCH] req_rep finished

---
 queue/mod_socket.c |  151 ++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 151 insertions(+), 0 deletions(-)

diff --git a/queue/mod_socket.c b/queue/mod_socket.c
index f73dcba..fe098f5 100644
--- a/queue/mod_socket.c
+++ b/queue/mod_socket.c
@@ -1,2 +1,153 @@
 #include "mod_socket.h"
+#include "shm_socket.h"
+#include "usg_common.h"
+typedef struct mod_entry_t
+{
+	int size;
+	void *buf;
+	shm_socket_t *client_socket;
+}mod_entry_t;
+
+typedef struct mod_socket_t {
+  socket_mod_t mod;
+  shm_socket_t *shm_socket;
+  shm_socket_t *client_socket;
+  int is_server;
+  LockFreeQueue<mod_entry_t, DM_Allocator> *recvQueue;
+  int slots;
+  int items;
+  
+
+} mod_socket_t;
+
+/**
+ * 
+ */
+void *mod_open_socket(int mod) {
+  mod_socket_t *socket = (mod_socket_t *)malloc(sizeof(mod_socket_t));
+  socket->shm_socket=shm_open_socket();
+  socket->is_server = 0;
+  socket->mod = (socket_mod_t)mod;
+  socket->recvQueue = new LockFreeQueue<mod_entry_t, DM_Allocator>(16);
+  if (mod == REQ_REP) {
+    socket->slots = SemUtil::get(IPC_PRIVATE, 1);
+    socket->items = SemUtil::get(IPC_PRIVATE, 0);
+  }
+
+  return (void *)socket;
+}
+
+
+
+int mod_close_socket(void * _socket){
+	mod_socket_t * socket = (mod_socket_t *) _socket;
+
+	if (socket->mod == REQ_REP) {
+		SemUtil::remove(socket->slots);
+    SemUtil::remove(socket->items);
+	}
+	
+	int rv = shm_close_socket(socket->shm_socket);
+	free(_socket);
+	return rv;
+}
+
+
+int mod_socket_bind(void * _socket, int port){
+	mod_socket_t * socket = (mod_socket_t *) _socket;
+	return  shm_soket_bind(socket->shm_socket, port);
+}
+
+void * run_server_recv_client_msg(void *_socket) {
+	pthread_detach(pthread_self());
+	mod_socket_t * socket = (mod_socket_t *) _socket;
+	shm_socket_t * client_socket = socket->client_socket;
+
+	mod_entry_t entry;
+	entry.client_socket = client_socket;
+	while (socket->shm_socket->status == SHM_CONN_LISTEN &&
+		client_socket->status == SHM_CONN_ESTABLISHED && shm_recv(client_socket, &entry.buf, &entry.size) == 0 ) {
+
+		socket->recvQueue->push(entry);
+		// shm_free(recvbuf);
+	}
+	free(_socket);
+	shm_close_socket(client_socket);
+	return NULL;
+}
+
+void *run_accept_connection(void * _socket) {
+	mod_socket_t * socket = (mod_socket_t *) _socket;
+	shm_socket_t *client_socket;
+	pthread_t tid;
+	while(socket->shm_socket->status == SHM_CONN_LISTEN) {
+		client_socket = shm_accept(socket->shm_socket);
+		
+		mod_socket_t *arg = (mod_socket_t *)malloc(sizeof(mod_socket_t));
+		memcpy(arg, _socket, sizeof(mod_socket_t));
+		arg->client_socket = client_socket;
+		pthread_create(&tid, NULL, run_server_recv_client_msg , (void *)arg);
+	}
+	return NULL;
+}
+
+int mod_listen(void * _socket) {
+	mod_socket_t * socket = (mod_socket_t *) _socket;
+	pthread_t tid;
+	socket->is_server = 1;
+	int rv = shm_listen(socket->shm_socket);
+	if(rv == 0) {
+		pthread_create(&tid, NULL, run_accept_connection, _socket);
+		return 0;
+	}
+	return -1;
+}
+
+
+int mod_connect(void * _socket, int port) {
+	mod_socket_t * socket = (mod_socket_t *) _socket;
+	return shm_connect(socket->shm_socket, port);
+
+}
+
+int mod_send(void * _socket, void *buf, int size) {
+	mod_socket_t * socket = (mod_socket_t *) _socket;
+	if(!socket->is_server ) {
+		return shm_send(socket->shm_socket, buf, size);
+	}
+	else if(socket->mod == REQ_REP) {
+		SemUtil::dec(socket->items);
+		shm_send(socket->client_socket, buf, size);
+		SemUtil::inc(socket->slots);
+		return 0;
+	}
+	return -1;
+	
+}
+
+int mod_recv(void * _socket, void **buf, int *size) {
+	mod_socket_t * socket = (mod_socket_t *) _socket;
+	mod_entry_t entry;
+
+	if(!socket->is_server ) {
+		return shm_recv(socket->shm_socket, buf, size);
+	}
+	else if(socket->mod == REQ_REP) {
+		SemUtil::dec(socket->slots);
+		socket->recvQueue->pop(entry);
+		*buf = entry.buf;
+		*size = entry.size;
+		socket->client_socket = entry.client_socket;
+		SemUtil::inc(socket->items);
+		return 0;
+	}
+	return -1;
+
+
+}
+
+
+void mod_free(void *buf) {
+	free(buf);
+}
 

--
Gitblit v1.8.0