From 9e6ceaad059b2aec84df92c8750f6d87eab708c2 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期四, 16 七月 2020 20:46:31 +0800
Subject: [PATCH] udpate
---
queue/libshm_queue.a | 0
test/communication.c | 100 +++++++++++++++++++++++-
queue/include/lock_free_queue.h | 18 +++-
queue/socket.c | 89 ++++++++++++---------
test/communication | 0
5 files changed, 158 insertions(+), 49 deletions(-)
diff --git a/queue/include/lock_free_queue.h b/queue/include/lock_free_queue.h
index 566c7ed..f9a4667 100644
--- a/queue/include/lock_free_queue.h
+++ b/queue/include/lock_free_queue.h
@@ -78,6 +78,7 @@
int items;
public:
+ int mutex;
LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
/// @brief destructor of the class.
@@ -150,6 +151,7 @@
// std::cout << "LockFreeQueue init reference=" << reference << std::endl;
slots = SemUtil::get(IPC_PRIVATE, qsize);
items = SemUtil::get(IPC_PRIVATE, 0);
+ mutex = SemUtil::get(IPC_PRIVATE, 1);
}
template <
@@ -198,7 +200,8 @@
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data)
{
if (SemUtil::dec(slots) == -1) {
- err_exit(errno, "push");
+ err_msg(errno, "LockFreeQueue push");
+ return false;
}
if ( m_qImpl.push(a_data) ) {
@@ -218,8 +221,11 @@
if (SemUtil::dec_nowait(slots) == -1) {
if (errno == EAGAIN)
return false;
- else
- err_exit(errno, "push_nowait");
+ else {
+ err_msg(errno, "LockFreeQueue push_nowait");
+ return false;
+ }
+
}
if ( m_qImpl.push(a_data)) {
@@ -240,8 +246,10 @@
if (SemUtil::dec_timeout(slots, timeout) == -1) {
if (errno == EAGAIN)
return false;
- else
- err_exit(errno, "push_timeout");
+ else {
+ err_msg(errno, "LockFreeQueue push_timeout");
+ return false;
+ }
}
if (m_qImpl.push(a_data)){
diff --git a/queue/libshm_queue.a b/queue/libshm_queue.a
index 65416b5..553db5a 100644
--- a/queue/libshm_queue.a
+++ b/queue/libshm_queue.a
Binary files differ
diff --git a/queue/socket.c b/queue/socket.c
index 1f3ff24..a52d532 100644
--- a/queue/socket.c
+++ b/queue/socket.c
@@ -37,29 +37,37 @@
}
-int shm_close_socket(shm_socket_t *socket) {
+int _shm_close_socket(shm_socket_t *socket, bool notifyRemote) {
//缁欏鏂瑰彂閫佷竴涓叧闂繛鎺ョ殑娑堟伅
struct timespec timeout = {1, 0};
shm_msg_t close_msg;
+
close_msg.port = socket->port;
close_msg.size = 0;
close_msg.type=SHM_SOCKET_CLOSE;
- if(socket->remoteQueue != NULL) {
+ if(notifyRemote && socket->remoteQueue != NULL) {
socket->remoteQueue->push_timeout(close_msg, &timeout);
}
-
-
- if(socket->queue != NULL)
+ if(socket->queue != NULL) {
delete socket->queue;
- if(socket->remoteQueue != NULL)
+ socket->queue = NULL;
+ }
+
+ if(socket->remoteQueue != NULL) {
delete socket->remoteQueue;
+ socket->remoteQueue = NULL;
+ }
- if(socket->messageQueue != NULL)
+ if(socket->messageQueue != NULL) {
delete socket->messageQueue;
+ socket->messageQueue = NULL;
+ }
- if(socket->acceptQueue != NULL)
+ if(socket->acceptQueue != NULL) {
delete socket->acceptQueue;
+ socket->acceptQueue = NULL;
+ }
if(socket->clientSocketMap != NULL) {
shm_socket_t *client_socket;
@@ -68,7 +76,9 @@
client_socket->remoteQueue->push_timeout(close_msg, &timeout);
delete client_socket->remoteQueue;
+ client_socket->remoteQueue=NULL;
delete client_socket->messageQueue;
+ client_socket->messageQueue=NULL;
socket->clientSocketMap->erase(iter);
free((void *)client_socket);
}
@@ -78,13 +88,16 @@
if(socket->dispatch_thread != 0)
pthread_cancel(socket->dispatch_thread);
-
-
free(socket);
return 0;
}
+
+int shm_close_socket(shm_socket_t *socket) {
+ return _shm_close_socket(socket, true);
+}
+
int shm_bind(shm_socket_t * socket, int port) {
@@ -119,12 +132,19 @@
shm_socket_t *client_socket;
auto iter = socket->clientSocketMap->find(port);
if( iter != socket->clientSocketMap->end() ) {
- client_socket= iter->second;
- delete client_socket->remoteQueue;
- delete client_socket->messageQueue;
+ // client_socket= iter->second;
+ // if(client_socket->remoteQueue != NULL) {
+ // delete client_socket->remoteQueue;
+ // client_socket->remoteQueue = NULL;
+ // }
+ // if(client_socket->messageQueue != NULL) {
+ // delete client_socket->messageQueue;
+ // client_socket->messageQueue = NULL;
+ // }
+
socket->clientSocketMap->erase(iter);
}
- free((void *)client_socket);
+ //free((void *)client_socket);
}
@@ -139,7 +159,6 @@
shm_socket_t *client_socket;
std::map<int, shm_socket_t* >::iterator iter;
while(socket->queue->pop(src)) {
-print_msg("=====_server_run_msg_rev:", src);
switch (src.type) {
case SHM_SOCKET_OPEN :
socket->acceptQueue->push_timeout(src, &timeout);
@@ -148,11 +167,9 @@
_server_close_conn_to_client(socket, src.port);
break;
case SHM_COMMON_MSG :
-err_msg(0, "===_server_run_msg_rev 1");
iter = socket->clientSocketMap->find(src.port);
if( iter != socket->clientSocketMap->end()) {
client_socket= iter->second;
-err_msg(0, "===_server_run_msg_rev client_socket->messageQueue=%p", client_socket->messageQueue);
client_socket->messageQueue->push_timeout(src, &timeout);
}
@@ -178,7 +195,7 @@
if (socket->acceptQueue->pop(src) ) {
-print_msg("===accept:", src);
+// print_msg("===accept:", src);
client_port = src.port;
client_socket = (shm_socket_t *)malloc(sizeof(shm_socket_t));
client_socket->port = socket->port;
@@ -228,20 +245,8 @@
}
void _client_close_conn_to_server(shm_socket_t* socket) {
- if(socket->queue != NULL)
- delete socket->queue;
- if(socket->remoteQueue != NULL)
- delete socket->remoteQueue;
-
- if(socket->messageQueue != NULL)
- delete socket->messageQueue;
-
- if(socket->acceptQueue != NULL)
- delete socket->acceptQueue;
-
- if(socket->dispatch_thread != 0)
- pthread_cancel(socket->dispatch_thread);
-
+
+ _shm_close_socket(socket, false);
}
@@ -283,24 +288,30 @@
dest.size = size;
dest.buf = mm_malloc(size);
memcpy(dest.buf, buf, size);
-
- socket->remoteQueue->push(dest);
- return 0;
+ if(socket->remoteQueue->push(dest)) {
+ return 0;
+ } else {
+ err_msg(errno, "connection has been closed!");
+ return -1;
+ }
+
+
}
int shm_recv(shm_socket_t* socket, void **buf, int *size) {
shm_msg_t src;
-err_msg(0, "====shm_recv socket ==%p", socket);
- bool rv = socket->messageQueue->pop(src);
- if (rv) {
+ if (socket->messageQueue->pop(src)) {
void * _buf = malloc(src.size);
memcpy(_buf, src.buf, src.size);
*buf = _buf;
*size = src.size;
mm_free(src.buf);
+ return 0;
+ } else {
+ return -1;
}
- return 0;
+
}
diff --git a/test/communication b/test/communication
index ca89adb..3a0e6b1 100755
--- a/test/communication
+++ b/test/communication
Binary files differ
diff --git a/test/communication.c b/test/communication.c
index a314dcb..16a9b05 100644
--- a/test/communication.c
+++ b/test/communication.c
@@ -1,5 +1,8 @@
#include "socket.h"
-
+typedef struct Targ {
+ int port;
+ int id;
+}Targ;
void * precess_client(void *_socket) {
pthread_detach(pthread_self());
@@ -7,13 +10,11 @@
int size;
void *recvbuf;
char sendbuf[512];
- while(true) {
- shm_recv(socket, &recvbuf, &size);
+ while (shm_recv(socket, &recvbuf, &size) == 0 ) {
sprintf(sendbuf, "SERVER RECEIVED: %s", recvbuf);
puts(sendbuf);
- shm_send(socket, sendbuf, strlen(sendbuf)+1) ;
+ shm_send(socket, sendbuf, strlen(sendbuf)+1);
shm_free(recvbuf);
-
}
shm_close_socket(socket);
}
@@ -51,6 +52,92 @@
shm_close_socket(socket);
}
+void client_send(shm_socket_t *socket, char *sendbuf) {
+
+ int size;
+ void *recvbuf;
+ shm_send(socket, sendbuf, strlen(sendbuf)+1) ;
+ shm_recv(socket, &recvbuf, &size);
+ printf("reply: %s\n", (char *)recvbuf);
+ shm_free(recvbuf);
+
+
+}
+
+
+void multyProcessorsClient(int port) {
+
+ int status, i = 0, processors = 4, scale = 100000;
+ pid_t productors[processors];
+ pid_t pid;
+ char sendbuf[512];
+ for ( i = 0; i < processors; i++) {
+ if ((productors[i] = fork()) == 0) /* Child runs user job */
+ {
+ shm_socket_t *socket = shm_open_socket();
+ shm_connect(socket, port);
+ while( scale-- > 0) {
+ sprintf(sendbuf, "processor(%d) %d", i, scale);
+ client_send(socket, sendbuf);
+ }
+ shm_close_socket(socket);
+ exit(0);
+ }
+ }
+
+ while ((pid = waitpid(-1, &status, 0)) > 0) {
+ if(WIFEXITED(status)) {
+ //fprintf(stderr, "child %d terminated normally with exit status=%d\n", pid, WEXITSTATUS(status));
+ }else
+ fprintf(stderr, "child %d terminated abnormally\n", pid);
+ }
+
+ if (errno != ECHILD)
+ perror("waitpid error");
+}
+
+void *threadrun(void *arg) {
+ Targ * targ = ( Targ * )arg;
+ int port = targ->port;
+ char sendbuf[512];
+ int scale = 100000;
+ int i;
+ shm_socket_t *socket = shm_open_socket();
+
+ shm_connect(socket, port);
+ for( i = 0; i<scale; i++) {
+ sprintf(sendbuf, "processor(%d) %d", targ->id, i);
+ client_send(socket, sendbuf);
+ }
+ shm_close_socket(socket);
+ return (void*)i;
+}
+
+void multyThreadClient(int port) {
+
+ int status, i = 0, processors = 2;
+ void *res[processors];
+ Targ *targs= (Targ*)calloc(processors, sizeof(Targ));
+ pthread_t tids[processors];
+ char sendbuf[512];
+ for ( i = 0; i < processors; i++) {
+ targs[i].port = port;
+ targs[i].id = i;
+ pthread_create(&tids[i], NULL, threadrun, (void *)&targs[i]);
+
+ }
+
+ for (i = 0; i< processors; i++) {
+ if(pthread_join(tids[i], &res[i])!=0) {
+ perror("multyThreadClient pthread_join");
+ } else {
+ fprintf(stderr, "client(%d) 鍐欏叆 %ld 鏉℃暟鎹甛n", i, (long)res[i]);
+ }
+ }
+
+
+}
+
int main(int argc, char *argv[]) {
shm_init(512);
int port;
@@ -67,6 +154,9 @@
if (strcmp("client", argv[1]) == 0)
client(port);
+
+ if (strcmp("mclient", argv[1]) == 0)
+ multyThreadClient(port);
shm_destroy();
// fprintf(stderr, "Usage: reqrep %s|%s <URL> ...\n", "server", "client");
return 0;
--
Gitblit v1.8.0