From 2a4e4619f34a742e36693e589e0431347a72979b Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期二, 13 十月 2020 17:36:32 +0800
Subject: [PATCH] update
---
src/socket/net_mod_socket.c | 90 ++++++++++++++++-
src/libshm_queue.a | 0
src/socket/shm_socket.c | 9 +
src/logger_factory.c | 3
src/queue/lock_free_queue.h | 2
src/socket/net_mod_socket.h | 6 +
src/socket/shm_socket.h | 1
src/socket/shm_mod_socket.c | 4
src/queue/mm.c | 4
src/socket/shm_mod_socket.h | 2
test_net_socket/net_mod_req_rep.sh | 4
src/socket/mod_socket.c | 2
src/queue/shm_queue.h | 4
src/socket/net_mod_server_socket.h | 8 +
test_net_socket/net_mod_req_rep.c | 74 +++++++++++---
src/queue/linked_lock_free_queue.h | 2
src/socket/net_mod_server_socket.c | 59 ++++++++---
src/logger_factory.h | 14 ++
18 files changed, 223 insertions(+), 65 deletions(-)
diff --git a/src/libshm_queue.a b/src/libshm_queue.a
index c9f1c38..8260732 100644
--- a/src/libshm_queue.a
+++ b/src/libshm_queue.a
Binary files differ
diff --git a/src/logger_factory.c b/src/logger_factory.c
new file mode 100644
index 0000000..c50fb54
--- /dev/null
+++ b/src/logger_factory.c
@@ -0,0 +1,3 @@
+#include "logger_factory.h"
+
+Logger * LoggerFactory::logger = NULL;
\ No newline at end of file
diff --git a/src/logger_factory.h b/src/logger_factory.h
index 34aef50..6bbaef0 100644
--- a/src/logger_factory.h
+++ b/src/logger_factory.h
@@ -3,11 +3,21 @@
#include "logger.h"
class LoggerFactory {
+private:
+ static Logger *logger;
+
public:
- static Logger getLogger() {
+ static Logger* getLogger() {
//ERROR ALL DEBUG INFO WARN
- static Logger logger(Logger::WARN);
+ if(logger != NULL)
+ return logger;
+
+ LoggerConfig config;
+ config.level = Logger::DEBUG;
+ config.logFile = "softbus.log";
+ config.console = 1;
+ logger = new Logger(config);
return logger;
}
};
diff --git a/src/queue/linked_lock_free_queue.h b/src/queue/linked_lock_free_queue.h
index 3906a42..af7f1ff 100644
--- a/src/queue/linked_lock_free_queue.h
+++ b/src/queue/linked_lock_free_queue.h
@@ -98,7 +98,7 @@
template <typename T>
LinkedLockFreeQueue<T>::~LinkedLockFreeQueue()
{
- LoggerFactory::getLogger().debug("LinkedLockFreeQueue destory");
+ LoggerFactory::getLogger()->debug("LinkedLockFreeQueue destory");
Node<T> * nodeptr;
Pointer<T> tmp = Head.load(std::memory_order_relaxed);
while((nodeptr = tmp.ptr) != NULL) {
diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h
index 17e8c56..281b7e5 100644
--- a/src/queue/lock_free_queue.h
+++ b/src/queue/lock_free_queue.h
@@ -160,7 +160,7 @@
template <typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue()
{
- LoggerFactory::getLogger().debug("LockFreeQueue desctroy");
+ LoggerFactory::getLogger()->debug("LockFreeQueue desctroy");
SemUtil::remove(slots);
SemUtil::remove(items);
SemUtil::remove(mutex);
diff --git a/src/queue/mm.c b/src/queue/mm.c
index 39aca9e..f09fbab 100644
--- a/src/queue/mm.c
+++ b/src/queue/mm.c
@@ -402,13 +402,13 @@
if(shmctl(shmid, IPC_STAT, &shmid_ds) == 0) {
- //LoggerFactory::getLogger().debug("shm_nattch=%d\n", shmid_ds.shm_nattch);
+ //LoggerFactory::getLogger()->debug("shm_nattch=%d\n", shmid_ds.shm_nattch);
if(shmid_ds.shm_nattch == 0) {
//remove shared memery
if (shmctl(shmid, IPC_RMID, 0) == -1)
err_exit(errno, "mm_destroy shmctl IPC_RMID");
else
- LoggerFactory::getLogger().debug("shared memory destroy\n");
+ LoggerFactory::getLogger()->debug("shared memory destroy\n");
SemUtil::inc(mutex);
SemUtil::remove(mutex);
diff --git a/src/queue/shm_queue.h b/src/queue/shm_queue.h
index 5c82b05..7c7b89b 100644
--- a/src/queue/shm_queue.h
+++ b/src/queue/shm_queue.h
@@ -115,7 +115,7 @@
hashtable_put(hashtable, key, (void *)queue);
}
queue->reference++;
- LoggerFactory::getLogger().debug("SHMQueue constructor reference===%d", queue->reference.load());
+ // LoggerFactory::getLogger()->debug("SHMQueue constructor reference===%d", queue->reference.load());
}
template <typename ELEM_T> SHMQueue<ELEM_T>::~SHMQueue() {
@@ -126,7 +126,7 @@
SemUtil::dec(queue->mutex);
queue->reference--;
- // LoggerFactory::getLogger().debug("SHMQueue destructor reference===%d",
+ // LoggerFactory::getLogger()->debug("SHMQueue destructor reference===%d",
if (queue->reference.load() == 0) {
delete queue;
queue = NULL;
diff --git a/src/socket/mod_socket.c b/src/socket/mod_socket.c
index 7629621..b5a686a 100644
--- a/src/socket/mod_socket.c
+++ b/src/socket/mod_socket.c
@@ -7,7 +7,7 @@
#include "sem_util.h"
#include "logger_factory.h"
-static Logger logger = LoggerFactory::getLogger();
+static Logger *logger = LoggerFactory::getLogger();
typedef struct mod_entry_t
{
diff --git a/src/socket/net_mod_server_socket.c b/src/socket/net_mod_server_socket.c
index 1cd3838..73a2a5a 100644
--- a/src/socket/net_mod_server_socket.c
+++ b/src/socket/net_mod_server_socket.c
@@ -4,7 +4,7 @@
#include "net_mod_socket_io.h"
#include "net_mod_socket.h"
-NetModServerSocket::NetModServerSocket(int port):max_buf(1024)
+NetModServerSocket::NetModServerSocket(int port):max_buf(1024), max_topic_buf(256)
{
char portstr[32];
@@ -17,12 +17,18 @@
if(buf == NULL) {
err_exit(errno, "process_client malloc");
}
+
+ topic_buf = malloc(max_topic_buf);
+ if(topic_buf == NULL) {
+ err_exit(errno, "process_client malloc");
+ }
}
NetModServerSocket::~NetModServerSocket() {
Close(listenfd);
- fee(buf);
+ free(buf);
+ free(topic_buf);
}
void NetModServerSocket::start() {
@@ -72,7 +78,7 @@
{
/* Add connected descriptor to the pool */
pool.clientfd[i] = connfd; //line:conc:echoservers:beginaddclient
- Rio_readinitb(&pool.clientrio[i], connfd); //line:conc:echoservers:endaddclient
+ // Rio_readinitb(&pool.clientrio[i], connfd); //line:conc:echoservers:endaddclient
/* Add the descriptor to descriptor set */
FD_SET(connfd, &pool.read_set); //line:conc:echoservers:addconnfd
@@ -90,8 +96,7 @@
/* $end add_client */
-int NetModServerSocket::process_client(rio_t *rio, int connfd) {
- int n;
+int NetModServerSocket::process_client(int connfd) {
net_mod_request_head_t request_head;
net_mod_response_head_t response_head;
char request_head_bs[NET_MODE_REQUEST_HEAD_LENGTH];
@@ -99,15 +104,15 @@
int recv_size;
- if(buf == NULL) {
- buf = malloc(max_buf);
- if(buf == NULL) {
- err_exit(errno, "process_client malloc");
- }
- }
+ // if(buf == NULL) {
+ // buf = malloc(max_buf);
+ // if(buf == NULL) {
+ // err_exit(errno, "process_client malloc");
+ // }
+ // }
- if (rio_readnb(rio, request_head_bs, NET_MODE_REQUEST_HEAD_LENGTH) != NET_MODE_REQUEST_HEAD_LENGTH)
+ if (rio_readn(connfd, request_head_bs, NET_MODE_REQUEST_HEAD_LENGTH) != NET_MODE_REQUEST_HEAD_LENGTH)
{
return -1;
}
@@ -118,11 +123,12 @@
buf = realloc(buf, request_head.content_length);
max_buf = request_head.content_length;
if(buf == NULL) {
- err_exit(errno, "process_client realloc");
+ LoggerFactory::getLogger()->error(errno, "process_client realloc");
+ exit(1);
}
}
- if ((n = rio_readnb(rio, buf, request_head.content_length)) != request_head.content_length ) {
+ if (rio_readn(connfd, buf, request_head.content_length) != request_head.content_length ) {
return -1;
}
@@ -131,6 +137,21 @@
response_head.content_length = recv_size;
Rio_writen(connfd, NetModSocket::encode_response_head(response_head), NET_MODE_RESPONSE_HEAD_LENGTH);
Rio_writen(connfd, recv_buf, recv_size);
+ } else if(request_head.mod == BUS) {
+ if(request_head.topic_length > max_topic_buf) {
+ topic_buf = realloc(topic_buf, request_head.topic_length);
+ max_topic_buf = request_head.topic_length;
+ if(topic_buf == NULL) {
+ LoggerFactory::getLogger()->error(errno, "process_client realloc");
+ exit(1);
+ }
+ }
+
+ if (rio_readn(connfd, topic_buf, request_head.topic_length) != request_head.topic_length ) {
+ return -1;
+ }
+ LoggerFactory::getLogger()->debug("====server pub %s===\n", buf);
+ shmModSocket.pub((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, request_head.key);
}
return 0;
@@ -141,22 +162,22 @@
void NetModServerSocket::check_clients()
{
int i, connfd;
- rio_t *rio;
+ //rio_t *rio;
for (i = 0; (i <= pool.maxi) && (pool.nready > 0); i++)
{
connfd = pool.clientfd[i];
- rio = &pool.clientrio[i];
+ //rio = &pool.clientrio[i];
/* If the descriptor is ready, echo a text line from it */
if ((connfd > 0) && (FD_ISSET(connfd, &pool.ready_set)))
{
pool.nready--;
- if(process_client(rio, connfd) != 0) {
+ if(process_client(connfd) != 0) {
Close(connfd); //line:conc:echoservers:closeconnfd
- FD_CLR(connfd, &pool.read_set); //line:conc:echoservers:beginremove
- pool.clientfd[i] = -1; //line:conc:echoservers:endremove
+ FD_CLR(connfd, &pool.read_set);
+ pool.clientfd[i] = -1;
}
}
diff --git a/src/socket/net_mod_server_socket.h b/src/socket/net_mod_server_socket.h
index 13e43f7..4ec5b90 100644
--- a/src/socket/net_mod_server_socket.h
+++ b/src/socket/net_mod_server_socket.h
@@ -23,7 +23,7 @@
int nready; /* Number of ready descriptors from select */
int maxi; /* Highwater index into client array */
int clientfd[FD_SETSIZE]; /* Set of active descriptors */
- rio_t clientrio[FD_SETSIZE]; /* Set of active read buffers */
+ // rio_t clientrio[FD_SETSIZE]; /* Set of active read buffers */
} ;
private:
@@ -31,13 +31,15 @@
ShmModSocket shmModSocket;
pool pool;
- void *buf = NULL;
+ void *buf;
+ void *topic_buf;
size_t max_buf;
+ size_t max_topic_buf;
void init_pool(int listenfd);
void add_client(int connfd);
void check_clients();
- int process_client(rio_t *rio, int connfd);
+ int process_client(int connfd);
public:
diff --git a/src/socket/net_mod_socket.c b/src/socket/net_mod_socket.c
index fdce4b5..c123c87 100644
--- a/src/socket/net_mod_socket.c
+++ b/src/socket/net_mod_socket.c
@@ -12,7 +12,13 @@
NetModSocket::~NetModSocket() {
-
+ rio_t * rio;
+ for (auto map_iter = connectionMap.begin(); map_iter != connectionMap.end(); map_iter++) {
+ rio = map_iter->second;
+ if(rio != NULL) {
+ free(rio);
+ }
+ }
}
int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
@@ -25,7 +31,7 @@
void *recv_buf;
int recv_size;
char response_head_bs[NET_MODE_RESPONSE_HEAD_LENGTH];
- net_mod_request_head_t request_head;
+ net_mod_request_head_t request_head = {};
net_mod_response_head_t response_head;
std::map<std::string, rio_t*>::iterator mapIter;
rio_t *rio;
@@ -54,28 +60,34 @@
request_head.mod = REQ_REP;
request_head.key = node->key;
request_head.content_length = send_size;
+ request_head.topic_length = 0;
if(rio_writen(rio->rio_fd, NetModSocket::encode_request_head(request_head), NET_MODE_REQUEST_HEAD_LENGTH) != NET_MODE_REQUEST_HEAD_LENGTH) {
- err_exit(errno, "NetModSocket::send head rio_writen");
+ LoggerFactory::getLogger()->error(errno, "NetModSocket::send head rio_writen");
+ exit(1);
}
if(rio_writen(rio->rio_fd, send_buf, send_size) != send_size ) {
- err_exit(errno, "NetModSocket::send conent rio_writen");
+ LoggerFactory::getLogger()->error(errno, "NetModSocket::send conent rio_writen");
+ exit(1);
}
if ( rio_readnb(rio, response_head_bs, NET_MODE_RESPONSE_HEAD_LENGTH) != NET_MODE_RESPONSE_HEAD_LENGTH) {
- err_exit(errno, "NetModSocket::send rio_readnb");
+ LoggerFactory::getLogger()->error(errno, "NetModSocket::send rio_readnb");
+ exit(1);
}
response_head = NetModSocket::decode_response_head(response_head_bs);
recv_buf = malloc(response_head.content_length);
if(recv_buf == NULL) {
- err_exit(errno, "NetModSocket::send malloc");
+ LoggerFactory::getLogger()->error(errno, "NetModSocket::send malloc");
+ exit(1);
}
if ( (recv_size = rio_readnb(rio, recv_buf, response_head.content_length) ) != response_head.content_length) {
- err_exit(errno, "NetModSocket::send rio_readnb");
+ LoggerFactory::getLogger()->error(errno, "NetModSocket::send rio_readnb");
+ exit(1);
}
LABEL_ARR_PUSH:
@@ -86,12 +98,70 @@
ret_arr[i].content_length = recv_size;
}
*recv_arr = ret_arr;
- *recv_arr_size = i;
-
+ if(recv_arr_size != NULL) {
+ *recv_arr_size = i;
+ }
+
return i;
}
+// int pub(char *topic, int topic_size, void *content, int content_size, int port);
+
+int NetModSocket::pub(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size) {
+ int i, n, clientfd;
+ char portstr[32];
+ net_node_t *node;
+ char mapKey[256];
+ void *recv_buf;
+ int recv_size;
+ char response_head_bs[NET_MODE_RESPONSE_HEAD_LENGTH];
+ net_mod_request_head_t request_head;
+ net_mod_response_head_t response_head;
+ std::map<std::string, rio_t*>::iterator mapIter;
+ rio_t *rio;
+ for (i = 0; i< arrlen; i++) {
+
+ node = &node_arr[i];
+ if(node->host == NULL) {
+ // 鏈湴鍙戦��
+ shmModSocket.pub(topic, topic_size, content, content_size, node->key);
+
+ } else {
+ sprintf(mapKey, "%s:%d", node->host, node->port);
+ if( ( mapIter = connectionMap.find(mapKey)) != connectionMap.end()) {
+ rio = mapIter->second;
+ } else {
+ rio = (rio_t *)malloc(sizeof(rio_t));
+ sprintf(portstr, "%d", node->port);
+ clientfd = Open_clientfd(node-> host, portstr);
+ Rio_readinitb(rio, clientfd);
+ connectionMap.insert({mapKey, rio});
+ }
+
+ request_head.mod = BUS;
+ request_head.key = node->key;
+ request_head.content_length = content_size;
+ request_head.topic_length = strlen(topic) + 1;
+ if(rio_writen(rio->rio_fd, NetModSocket::encode_request_head(request_head), NET_MODE_REQUEST_HEAD_LENGTH) != NET_MODE_REQUEST_HEAD_LENGTH) {
+ LoggerFactory::getLogger()->error(errno, "NetModSocket::pub head rio_writen");
+ exit(1);
+
+ }
+
+ if(rio_writen(rio->rio_fd, content, content_size) != content_size ) {
+ LoggerFactory::getLogger()->error(errno, "NetModSocket::pub rio_writen conent ");
+ exit(1);
+ }
+
+ if(rio_writen(rio->rio_fd, topic, request_head.topic_length) != request_head.topic_length ) {
+ LoggerFactory::getLogger()->error(errno, "NetModSocket::pub rio_writen conent ");
+ exit(1);
+ }
+ }
+ }
+ return i;
+}
void NetModSocket::free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size) {
@@ -112,6 +182,7 @@
PUT(head, htonl(request.mod));
PUT(head + 4, htonl(request.key));
PUT(head + 8, htonl(request.content_length));
+ PUT(head + 12, htonl(request.topic_length));
return head;
}
@@ -121,6 +192,7 @@
head.mod = ntohl(GET(headbs));
head.key = ntohl(GET(headbs + 4));
head.content_length = ntohl(GET(headbs + 8));
+ head.topic_length = ntohl(GET(headbs + 12));
return head;
}
diff --git a/src/socket/net_mod_socket.h b/src/socket/net_mod_socket.h
index a155c29..d296007 100644
--- a/src/socket/net_mod_socket.h
+++ b/src/socket/net_mod_socket.h
@@ -7,7 +7,7 @@
#define GET(p) (*(uint32_t *)(p))
#define PUT(p, val) (*(uint32_t *)(p) = (val))
-#define NET_MODE_REQUEST_HEAD_LENGTH 12
+#define NET_MODE_REQUEST_HEAD_LENGTH 16
#define NET_MODE_RESPONSE_HEAD_LENGTH 4
struct net_node_t
@@ -22,6 +22,7 @@
uint32_t mod;
uint32_t key;
uint32_t content_length;
+ uint32_t topic_length;
};
struct net_mod_response_head_t {
@@ -49,8 +50,11 @@
public:
NetModSocket();
+
int sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
net_mod_recv_msg_t ** resp_arr, int *resp_arr_size);
+
+ int pub(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size);
~NetModSocket();
diff --git a/src/socket/shm_mod_socket.c b/src/socket/shm_mod_socket.c
index 2c5821a..bd60993 100644
--- a/src/socket/shm_mod_socket.c
+++ b/src/socket/shm_mod_socket.c
@@ -314,7 +314,7 @@
SHMTopicSubMap::iterator map_iter;
SHMKeySet::iterator set_iter;
-printf("_proxy_sub topic = %s\n", topic);
+//printf("_proxy_sub topic = %s\n", topic);
if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
subscripter_set = map_iter->second;
} else {
@@ -521,4 +521,4 @@
return 1;
}
-
\ No newline at end of file
+
diff --git a/src/socket/shm_mod_socket.h b/src/socket/shm_mod_socket.h
index 50ef4ef..885d1aa 100644
--- a/src/socket/shm_mod_socket.h
+++ b/src/socket/shm_mod_socket.h
@@ -14,7 +14,7 @@
#define TOPIC_LIDENTIFIER "{"
#define TOPIC_RIDENTIFIER "}"
-static Logger logger = LoggerFactory::getLogger();
+static Logger *logger = LoggerFactory::getLogger();
#define BUS_MAP_KEY 1
//typedef std::basic_string<char, std::char_traits<char>, SHM_STL_Allocator<char> > SHMString;
typedef std::set<int, std::less<int>, SHM_STL_Allocator<int> > SHMKeySet;
diff --git a/src/socket/shm_socket.c b/src/socket/shm_socket.c
index ea2f674..362e5e8 100644
--- a/src/socket/shm_socket.c
+++ b/src/socket/shm_socket.c
@@ -3,7 +3,7 @@
#include "logger_factory.h"
#include <map>
-static Logger logger = LoggerFactory::getLogger();
+static Logger *logger = LoggerFactory::getLogger();
@@ -43,7 +43,7 @@
socket->force_bind = false;
socket->dispatch_thread = 0;
socket->status = SHM_CONN_CLOSED;
-
+ socket->mutex = SemUtil::get(IPC_PRIVATE, 1);
return socket;
}
@@ -258,6 +258,7 @@
}
hashtable_t *hashtable = mm_get_hashtable();
+ SemUtil::dec(socket->mutex);
if (socket->queue == NULL) {
if (socket->port == -1) {
socket->port = hashtable_alloc_key(hashtable);
@@ -268,6 +269,8 @@
socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
}
+ SemUtil::inc(socket->mutex);
+
if (port == socket->port) {
err_msg(0, "can not send to your self!");
return -1;
@@ -316,6 +319,7 @@
socket->socket_type);
}
hashtable_t *hashtable = mm_get_hashtable();
+ SemUtil::dec(socket->mutex);
if (socket->queue == NULL) {
if (socket->port == -1) {
socket->port = hashtable_alloc_key(hashtable);
@@ -326,6 +330,7 @@
socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
}
+ SemUtil::inc(socket->mutex);
shm_msg_t src;
// printf("shm_recvfrom pop before\n");
diff --git a/src/socket/shm_socket.h b/src/socket/shm_socket.h
index fd67d9c..e38fd0e 100644
--- a/src/socket/shm_socket.h
+++ b/src/socket/shm_socket.h
@@ -56,6 +56,7 @@
// 鏈湴port
int port;
bool force_bind;
+ int mutex;
shm_connection_status_t status;
SHMQueue<shm_msg_t> *queue;
SHMQueue<shm_msg_t> *remoteQueue;
diff --git a/test_net_socket/net_mod_req_rep.c b/test_net_socket/net_mod_req_rep.c
index 3287d58..df33bf0 100644
--- a/test_net_socket/net_mod_req_rep.c
+++ b/test_net_socket/net_mod_req_rep.c
@@ -11,30 +11,68 @@
void client(int port ){
NetModSocket client;
- char send_buf[MAXLINE];
+ char content[MAXLINE];
+ char action[512];
+ char topic[512];
net_mod_recv_msg_t *recv_arr;
- int recv_arr_size, i;
+ int recv_arr_size, i, n;
+ int node_arr_size = 3;
+ //192.168.20.104
net_node_t node_arr[] = {
- {"localhost", port, 11},
- {"localhost", port, 12},
- {"localhost", port, 13},
- {"localhost", port, 14}
+ {"192.168.20.104", port, 11},
+ {"192.168.20.104", port, 12},
+ {"192.168.20.104", port, 13}
};
- while (fgets(send_buf, MAXLINE, stdin) != NULL) {
- // 鏀跺埌娑堟伅鐨勮妭鐐瑰嵆浣挎病鏈夊搴旂殑淇℃伅锛� 涔熻鍥炲涓�涓〃绀烘棤鐨勬秷鎭�,鍚﹀垯浼氫竴鐩寸瓑寰�
- client.sendandrecv( node_arr, 4, send_buf, strlen(send_buf), &recv_arr, &recv_arr_size);
- for(i=0; i<recv_arr_size; i++) {
- printf("host:%s, port: %d, key:%d, content: %s\n",
- recv_arr[i].host,
- recv_arr[i].port,
- recv_arr[i].key,
- recv_arr[i].content
- );
+ int pub_node_arr_size = 3;
+ net_node_t pub_node_arr[] = {
+ {"192.168.20.104", port, 8},
+ {"192.168.20.104", port, 8},
+ {"192.168.20.104", port, 8}
+ };
+
+ while (true) {
+ //printf("Usage: pub <topic> [content] or sub <topic>\n");
+ printf("Can I help you? pub, send or quit\n");
+ scanf("%s",action);
+
+ if(strcmp(action, "pub") == 0) {
+ printf("Please input topic and content\n");
+ scanf("%s %s", topic, content);
+
+ n = client.pub(pub_node_arr, pub_node_arr_size, topic, strlen(topic)+1, content, strlen(content)+1);
+ printf("pub %d\n", n);
}
-//浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀
- NetModSocket::free_recv_msg_arr(recv_arr, recv_arr_size);
+ else if(strcmp(action, "send") == 0) {
+ getc(stdin);
+ printf("Please input content\n");
+
+ if (fgets(content, MAXLINE, stdin) != NULL) {
+ // 鏀跺埌娑堟伅鐨勮妭鐐瑰嵆浣挎病鏈夊搴旂殑淇℃伅锛� 涔熻鍥炲涓�涓〃绀烘棤鐨勬秷鎭�,鍚﹀垯浼氫竴鐩寸瓑寰�
+ n = client.sendandrecv( node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size);
+ for(i=0; i<recv_arr_size; i++) {
+ printf("host:%s, port: %d, key:%d, content: %s\n",
+ recv_arr[i].host,
+ recv_arr[i].port,
+ recv_arr[i].key,
+ recv_arr[i].content
+ );
+ }
+ //浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀
+ NetModSocket::free_recv_msg_arr(recv_arr, recv_arr_size);
+ }
+ }
+ else if(strcmp(action, "quit") == 0) {
+ break;
+ } else {
+ printf("error input argument\n");
+ continue;
+ }
+
}
+
+
+
}
int main(int argc, char *argv[]) {
diff --git a/test_net_socket/net_mod_req_rep.sh b/test_net_socket/net_mod_req_rep.sh
index 94f79f4..383c7f2 100755
--- a/test_net_socket/net_mod_req_rep.sh
+++ b/test_net_socket/net_mod_req_rep.sh
@@ -5,6 +5,8 @@
./dgram_mod_req_rep server 13 &
./dgram_mod_req_rep server 14 &
+ ./dgram_mod_bus server 8 &
+
./net_mod_req_rep server 5000 &
}
@@ -14,7 +16,7 @@
}
function close() {
- ps -ef | grep -e "dgram_mod_req_rep" -e "net_mod_req_rep" | awk '{print $2}' | xargs -i kill -9 {}
+ ps -ef | grep -e "dgram_mod_req_rep" -e "net_mod_req_rep" -e "dgram_mod_bus" | awk '{print $2}' | xargs -i kill -9 {}
ipcrm -a
}
--
Gitblit v1.8.0