From 5e3e6719f7d7922decdc16d2313baf2e94210750 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期五, 17 七月 2020 18:29:11 +0800
Subject: [PATCH] pub_sub finished
---
queue/libshm_queue.a | 0
queue/shm_socket.c | 4 ++
test2/pub_sub.c | 40 +++++++++-----------
queue/mod_socket.c | 63 ++++++++++++++++++++++---------
test2/Makefile | 2
test2/pub_sub | 0
test2/req_rep | 0
7 files changed, 67 insertions(+), 42 deletions(-)
diff --git a/queue/libshm_queue.a b/queue/libshm_queue.a
index ad2d091..f082fa6 100644
--- a/queue/libshm_queue.a
+++ b/queue/libshm_queue.a
Binary files differ
diff --git a/queue/mod_socket.c b/queue/mod_socket.c
index fe098f5..80b5525 100644
--- a/queue/mod_socket.c
+++ b/queue/mod_socket.c
@@ -112,14 +112,29 @@
int mod_send(void * _socket, void *buf, int size) {
mod_socket_t * socket = (mod_socket_t *) _socket;
- if(!socket->is_server ) {
- return shm_send(socket->shm_socket, buf, size);
+ std::map<int, shm_socket_t* > *clientSocketMap = socket->shm_socket->clientSocketMap;
+ std::map<int, shm_socket_t* >::iterator iter;
+ int rv;
+ if(socket->is_server ) {
+ switch(socket->mod) {
+ case REQ_REP:
+ SemUtil::dec(socket->items);
+ rv = shm_send(socket->client_socket, buf, size);
+ SemUtil::inc(socket->slots);
+ break;
+ case PUB_SUB:
+ for(iter = clientSocketMap->begin(); iter != clientSocketMap->end(); iter++) {
+ rv = shm_send(iter->second, buf, size);
+ }
+ break;
+ default:
+ err_exit(0, "涓嶆敮鎸佺殑妯″紡%d", socket->mod);
+ }
+ return rv;
+
}
- else if(socket->mod == REQ_REP) {
- SemUtil::dec(socket->items);
- shm_send(socket->client_socket, buf, size);
- SemUtil::inc(socket->slots);
- return 0;
+ else {
+ return shm_send(socket->shm_socket, buf, size);
}
return -1;
@@ -128,22 +143,32 @@
int mod_recv(void * _socket, void **buf, int *size) {
mod_socket_t * socket = (mod_socket_t *) _socket;
mod_entry_t entry;
+ int rv;
- if(!socket->is_server ) {
+ if(socket->is_server ) {
+ switch(socket->mod) {
+ case REQ_REP:
+ SemUtil::dec(socket->slots);
+ rv = socket->recvQueue->pop(entry);
+ *buf = entry.buf;
+ *size = entry.size;
+ socket->client_socket = entry.client_socket;
+ SemUtil::inc(socket->items);
+ break;
+ case PUB_SUB:
+ rv = 0;
+ break;
+ default:
+ err_exit(0, "涓嶆敮鎸佺殑妯″紡%d", socket->mod);
+ }
+
+ return rv;
+ }
+ else {
return shm_recv(socket->shm_socket, buf, size);
}
- else if(socket->mod == REQ_REP) {
- SemUtil::dec(socket->slots);
- socket->recvQueue->pop(entry);
- *buf = entry.buf;
- *size = entry.size;
- socket->client_socket = entry.client_socket;
- SemUtil::inc(socket->items);
- return 0;
- }
+
return -1;
-
-
}
diff --git a/queue/shm_socket.c b/queue/shm_socket.c
index e0bdac6..27c5b0a 100644
--- a/queue/shm_socket.c
+++ b/queue/shm_socket.c
@@ -308,6 +308,10 @@
int shm_send(shm_socket_t *socket, void *buf, int size) {
// hashtable_t *hashtable = mm_get_hashtable();
+ if(socket->remoteQueue == NULL) {
+ err_msg(errno, "褰撳墠瀹㈡埛绔棤杩炴帴!");
+ return -1;
+ }
shm_msg_t dest;
dest.type=SHM_COMMON_MSG;
dest.port = socket->port;
diff --git a/test2/Makefile b/test2/Makefile
index e8c4421..5828f23 100644
--- a/test2/Makefile
+++ b/test2/Makefile
@@ -14,7 +14,7 @@
include $(ROOT)/Make.defines.$(PLATFORM)
-PROGS = req_rep
+PROGS = req_rep pub_sub
build: $(PROGS)
diff --git a/test2/pub_sub b/test2/pub_sub
new file mode 100755
index 0000000..19773d9
--- /dev/null
+++ b/test2/pub_sub
Binary files differ
diff --git a/test2/pub_sub.c b/test2/pub_sub.c
index 669783f..cf2b507 100644
--- a/test2/pub_sub.c
+++ b/test2/pub_sub.c
@@ -1,41 +1,37 @@
-#include "socket.h"
+#include "mod_socket.h"
+#include "shm_mm.h"
+#include "usg_common.h"
void server(int port) {
- void *socket = shm_open_socket(PUB_SUB);
- shm_bind(socket, port);
- shm_listen(socket);
+ void *socket = mod_open_socket(PUB_SUB);
+ mod_socket_bind(socket, port);
+ mod_listen(socket);
int size;
void *recvbuf;
char sendbuf[512];
while(true) {
- shm_recv(socket, &recvbuf, &size);
- sprintf(sendbuf, "pub: %s", recvbuf);
- puts(sendbuf);
- shm_send(socket, sendbuf, strlen(sendbuf)+1) ;
- shm_free(recvbuf);
+ printf("璇疯緭鍏ュ彂甯冩秷鎭�:");
+ scanf("%s", sendbuf);
+ mod_send(socket, sendbuf, strlen(sendbuf)+1) ;
+ free(recvbuf);
}
- shm_close_socket(socket);
+ mod_close_socket(socket);
}
void client(int port) {
- void *socket = shm_open_socket(PUB_SUB);
- shm_connect(socket, port);
+ void *socket = mod_open_socket(PUB_SUB);
+ mod_connect(socket, port);
int size;
void *recvbuf;
- char sendbuf[512];
-
- sprintf(sendbuf, "sub");
- shm_send(socket, sendbuf, strlen(sendbuf)+1) ;
- while(true) {
-
- shm_recv(socket, &recvbuf, &size);
- printf("received sub message: %s\n", (char *)recvbuf);
- shm_free(recvbuf);
+
+ while(mod_recv(socket, &recvbuf, &size) == 0) {
+ printf("鏀跺埌璁㈤槄娑堟伅: %s\n", (char *)recvbuf);
+ free(recvbuf);
}
- shm_close_socket(socket);
+ mod_close_socket(socket);
}
int main(int argc, char *argv[]) {
diff --git a/test2/req_rep b/test2/req_rep
index bd9df43..e3e7f24 100755
--- a/test2/req_rep
+++ b/test2/req_rep
Binary files differ
--
Gitblit v1.8.0