From 2a4e4619f34a742e36693e589e0431347a72979b Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期二, 13 十月 2020 17:36:32 +0800
Subject: [PATCH] update

---
 src/socket/net_mod_socket.c        |   90 ++++++++++++++++-
 src/libshm_queue.a                 |    0 
 src/socket/shm_socket.c            |    9 +
 src/logger_factory.c               |    3 
 src/queue/lock_free_queue.h        |    2 
 src/socket/net_mod_socket.h        |    6 +
 src/socket/shm_socket.h            |    1 
 src/socket/shm_mod_socket.c        |    4 
 src/queue/mm.c                     |    4 
 src/socket/shm_mod_socket.h        |    2 
 test_net_socket/net_mod_req_rep.sh |    4 
 src/socket/mod_socket.c            |    2 
 src/queue/shm_queue.h              |    4 
 src/socket/net_mod_server_socket.h |    8 +
 test_net_socket/net_mod_req_rep.c  |   74 +++++++++++---
 src/queue/linked_lock_free_queue.h |    2 
 src/socket/net_mod_server_socket.c |   59 ++++++++---
 src/logger_factory.h               |   14 ++
 18 files changed, 223 insertions(+), 65 deletions(-)

diff --git a/src/libshm_queue.a b/src/libshm_queue.a
index c9f1c38..8260732 100644
--- a/src/libshm_queue.a
+++ b/src/libshm_queue.a
Binary files differ
diff --git a/src/logger_factory.c b/src/logger_factory.c
new file mode 100644
index 0000000..c50fb54
--- /dev/null
+++ b/src/logger_factory.c
@@ -0,0 +1,3 @@
+#include "logger_factory.h"
+
+Logger * LoggerFactory::logger = NULL;
\ No newline at end of file
diff --git a/src/logger_factory.h b/src/logger_factory.h
index 34aef50..6bbaef0 100644
--- a/src/logger_factory.h
+++ b/src/logger_factory.h
@@ -3,11 +3,21 @@
 #include "logger.h"
 
 class LoggerFactory {
+private:
+	static Logger *logger;
+
 public:
 
-	static Logger getLogger() {
+	static Logger* getLogger() {
 //ERROR ALL DEBUG INFO WARN
-		static Logger logger(Logger::WARN);
+		if(logger != NULL)
+			return logger;
+		 
+		LoggerConfig config;
+		config.level = Logger::DEBUG;
+		config.logFile =  "softbus.log";
+		config.console = 1;
+		logger = new Logger(config);
 		return logger;
 	}
 };
diff --git a/src/queue/linked_lock_free_queue.h b/src/queue/linked_lock_free_queue.h
index 3906a42..af7f1ff 100644
--- a/src/queue/linked_lock_free_queue.h
+++ b/src/queue/linked_lock_free_queue.h
@@ -98,7 +98,7 @@
 template <typename T>
 LinkedLockFreeQueue<T>::~LinkedLockFreeQueue()
 {
-    LoggerFactory::getLogger().debug("LinkedLockFreeQueue destory");
+    LoggerFactory::getLogger()->debug("LinkedLockFreeQueue destory");
     Node<T> * nodeptr;
     Pointer<T> tmp = Head.load(std::memory_order_relaxed);
     while((nodeptr = tmp.ptr) != NULL) {
diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h
index 17e8c56..281b7e5 100644
--- a/src/queue/lock_free_queue.h
+++ b/src/queue/lock_free_queue.h
@@ -160,7 +160,7 @@
     template <typename T, typename AT> class Q_TYPE>
 LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue()
 {
-    LoggerFactory::getLogger().debug("LockFreeQueue desctroy");
+    LoggerFactory::getLogger()->debug("LockFreeQueue desctroy");
     SemUtil::remove(slots);
     SemUtil::remove(items);
     SemUtil::remove(mutex);
diff --git a/src/queue/mm.c b/src/queue/mm.c
index 39aca9e..f09fbab 100644
--- a/src/queue/mm.c
+++ b/src/queue/mm.c
@@ -402,13 +402,13 @@
 
   
   if(shmctl(shmid, IPC_STAT, &shmid_ds) == 0) {
-    //LoggerFactory::getLogger().debug("shm_nattch=%d\n", shmid_ds.shm_nattch);
+    //LoggerFactory::getLogger()->debug("shm_nattch=%d\n", shmid_ds.shm_nattch);
     if(shmid_ds.shm_nattch == 0) {
       //remove shared memery
        if (shmctl(shmid, IPC_RMID, 0) == -1)
         err_exit(errno, "mm_destroy shmctl IPC_RMID");
        else 
-         LoggerFactory::getLogger().debug("shared memory destroy\n");
+         LoggerFactory::getLogger()->debug("shared memory destroy\n");
 
        SemUtil::inc(mutex);
        SemUtil::remove(mutex);
diff --git a/src/queue/shm_queue.h b/src/queue/shm_queue.h
index 5c82b05..7c7b89b 100644
--- a/src/queue/shm_queue.h
+++ b/src/queue/shm_queue.h
@@ -115,7 +115,7 @@
     hashtable_put(hashtable, key, (void *)queue);
   }
   queue->reference++;
-  LoggerFactory::getLogger().debug("SHMQueue constructor reference===%d", queue->reference.load());
+  // LoggerFactory::getLogger()->debug("SHMQueue constructor reference===%d", queue->reference.load());
 }
 
 template <typename ELEM_T> SHMQueue<ELEM_T>::~SHMQueue() {
@@ -126,7 +126,7 @@
 
   SemUtil::dec(queue->mutex);
   queue->reference--;
-  // LoggerFactory::getLogger().debug("SHMQueue destructor  reference===%d",
+  // LoggerFactory::getLogger()->debug("SHMQueue destructor  reference===%d",
   if (queue->reference.load() == 0) {
       delete queue;
       queue = NULL;
diff --git a/src/socket/mod_socket.c b/src/socket/mod_socket.c
index 7629621..b5a686a 100644
--- a/src/socket/mod_socket.c
+++ b/src/socket/mod_socket.c
@@ -7,7 +7,7 @@
 #include "sem_util.h"
 #include "logger_factory.h"
 
-static Logger logger = LoggerFactory::getLogger();
+static Logger *logger = LoggerFactory::getLogger();
 
 typedef struct mod_entry_t
 {
diff --git a/src/socket/net_mod_server_socket.c b/src/socket/net_mod_server_socket.c
index 1cd3838..73a2a5a 100644
--- a/src/socket/net_mod_server_socket.c
+++ b/src/socket/net_mod_server_socket.c
@@ -4,7 +4,7 @@
 #include "net_mod_socket_io.h"
 #include "net_mod_socket.h"
  
-NetModServerSocket::NetModServerSocket(int port):max_buf(1024)
+NetModServerSocket::NetModServerSocket(int port):max_buf(1024), max_topic_buf(256)
 {
   char portstr[32];
 
@@ -17,12 +17,18 @@
   if(buf == NULL) {
     err_exit(errno, "process_client malloc");
   }
+
+  topic_buf = malloc(max_topic_buf);
+  if(topic_buf == NULL) {
+    err_exit(errno, "process_client malloc");
+  }
 }
 
 
 NetModServerSocket::~NetModServerSocket() {
    Close(listenfd);
-   fee(buf);
+   free(buf);
+   free(topic_buf);
 }
 
 void NetModServerSocket::start() {
@@ -72,7 +78,7 @@
     {
       /* Add connected descriptor to the pool */
       pool.clientfd[i] = connfd;                 //line:conc:echoservers:beginaddclient
-      Rio_readinitb(&pool.clientrio[i], connfd); //line:conc:echoservers:endaddclient
+     // Rio_readinitb(&pool.clientrio[i], connfd); //line:conc:echoservers:endaddclient
 
       /* Add the descriptor to descriptor set */
       FD_SET(connfd, &pool.read_set); //line:conc:echoservers:addconnfd
@@ -90,8 +96,7 @@
 /* $end add_client */
 
 
-int NetModServerSocket::process_client(rio_t *rio, int connfd) {
-  int n;
+int NetModServerSocket::process_client(int connfd) {
   net_mod_request_head_t request_head;
   net_mod_response_head_t response_head;
   char request_head_bs[NET_MODE_REQUEST_HEAD_LENGTH];
@@ -99,15 +104,15 @@
   
   int recv_size;
 
-  if(buf == NULL) {
-    buf = malloc(max_buf);
-    if(buf == NULL) {
-      err_exit(errno, "process_client malloc");
-    }
-  }
+  // if(buf == NULL) {
+  //   buf = malloc(max_buf);
+  //   if(buf == NULL) {
+  //     err_exit(errno, "process_client malloc");
+  //   }
+  // }
   
  
-  if (rio_readnb(rio, request_head_bs, NET_MODE_REQUEST_HEAD_LENGTH) !=  NET_MODE_REQUEST_HEAD_LENGTH)
+  if (rio_readn(connfd, request_head_bs, NET_MODE_REQUEST_HEAD_LENGTH) !=  NET_MODE_REQUEST_HEAD_LENGTH)
   {
     return -1;
   }
@@ -118,11 +123,12 @@
     buf = realloc(buf, request_head.content_length);
     max_buf = request_head.content_length;
     if(buf == NULL) {
-      err_exit(errno, "process_client realloc");
+      LoggerFactory::getLogger()->error(errno, "process_client realloc");
+      exit(1);
     }
   }  
 
-  if ((n = rio_readnb(rio, buf, request_head.content_length)) != request_head.content_length ) {
+  if (rio_readn(connfd, buf, request_head.content_length) != request_head.content_length ) {
     return -1;
   }
 
@@ -131,6 +137,21 @@
     response_head.content_length = recv_size;
     Rio_writen(connfd, NetModSocket::encode_response_head(response_head), NET_MODE_RESPONSE_HEAD_LENGTH);
     Rio_writen(connfd, recv_buf, recv_size);
+  } else if(request_head.mod == BUS) {
+    if(request_head.topic_length > max_topic_buf) {
+      topic_buf = realloc(topic_buf, request_head.topic_length);
+      max_topic_buf = request_head.topic_length;
+      if(topic_buf == NULL) {
+        LoggerFactory::getLogger()->error(errno, "process_client realloc");
+        exit(1);
+      }
+    }
+
+    if (rio_readn(connfd, topic_buf, request_head.topic_length) != request_head.topic_length ) {
+      return -1;
+    }
+ LoggerFactory::getLogger()->debug("====server pub %s===\n", buf);
+    shmModSocket.pub((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, request_head.key);
   }
 
   return 0;
@@ -141,22 +162,22 @@
 void  NetModServerSocket::check_clients()
 {
   int i, connfd;
-  rio_t *rio;
+  //rio_t *rio;
   
 
   for (i = 0; (i <= pool.maxi) && (pool.nready > 0); i++)
   {
     connfd = pool.clientfd[i];
-    rio = &pool.clientrio[i];
+    //rio = &pool.clientrio[i];
 
     /* If the descriptor is ready, echo a text line from it */
     if ((connfd > 0) && (FD_ISSET(connfd, &pool.ready_set)))
     {
       pool.nready--;
-      if(process_client(rio, connfd) != 0) {
+      if(process_client(connfd) != 0) {
         Close(connfd); //line:conc:echoservers:closeconnfd
-        FD_CLR(connfd, &pool.read_set); //line:conc:echoservers:beginremove
-        pool.clientfd[i] = -1;          //line:conc:echoservers:endremove
+        FD_CLR(connfd, &pool.read_set); 
+        pool.clientfd[i] = -1;
       }
 
     }
diff --git a/src/socket/net_mod_server_socket.h b/src/socket/net_mod_server_socket.h
index 13e43f7..4ec5b90 100644
--- a/src/socket/net_mod_server_socket.h
+++ b/src/socket/net_mod_server_socket.h
@@ -23,7 +23,7 @@
 	  int nready;       /* Number of ready descriptors from select */
 	  int maxi;         /* Highwater index into client array */
 	  int clientfd[FD_SETSIZE];    /* Set of active descriptors */
-	  rio_t clientrio[FD_SETSIZE]; /* Set of active read buffers */
+	 // rio_t clientrio[FD_SETSIZE]; /* Set of active read buffers */
 	} ; 
 
 private:
@@ -31,13 +31,15 @@
 	ShmModSocket shmModSocket;
 	pool pool;
 
-	void *buf = NULL;
+	void *buf;
+	void *topic_buf;
   size_t max_buf;
+  size_t max_topic_buf;
 
 	void init_pool(int listenfd);
 	void add_client(int connfd);
 	void check_clients();
-	int process_client(rio_t *rio, int connfd);
+	int process_client(int connfd);
 
 public:
 
diff --git a/src/socket/net_mod_socket.c b/src/socket/net_mod_socket.c
index fdce4b5..c123c87 100644
--- a/src/socket/net_mod_socket.c
+++ b/src/socket/net_mod_socket.c
@@ -12,7 +12,13 @@
 
 
 NetModSocket::~NetModSocket() {
-  
+  rio_t * rio;
+  for (auto map_iter = connectionMap.begin(); map_iter != connectionMap.end(); map_iter++) {
+    rio = map_iter->second;
+    if(rio != NULL) {
+      free(rio);
+    }
+  }
 }
 
 int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
@@ -25,7 +31,7 @@
   void *recv_buf;
   int recv_size;
   char response_head_bs[NET_MODE_RESPONSE_HEAD_LENGTH];
-  net_mod_request_head_t request_head;
+  net_mod_request_head_t request_head = {};
   net_mod_response_head_t response_head;
   std::map<std::string, rio_t*>::iterator mapIter;
   rio_t *rio;
@@ -54,28 +60,34 @@
     request_head.mod = REQ_REP;
     request_head.key = node->key;
     request_head.content_length = send_size;
+    request_head.topic_length = 0;
     if(rio_writen(rio->rio_fd, NetModSocket::encode_request_head(request_head), NET_MODE_REQUEST_HEAD_LENGTH) != NET_MODE_REQUEST_HEAD_LENGTH) {
-      err_exit(errno, "NetModSocket::send head rio_writen");
+      LoggerFactory::getLogger()->error(errno, "NetModSocket::send head rio_writen");
+      exit(1);
 
     }
 
     if(rio_writen(rio->rio_fd, send_buf, send_size) != send_size ) {
-       err_exit(errno, "NetModSocket::send conent rio_writen");
+      LoggerFactory::getLogger()->error(errno, "NetModSocket::send conent rio_writen");
+      exit(1);
     }
 
 
     if ( rio_readnb(rio, response_head_bs, NET_MODE_RESPONSE_HEAD_LENGTH) !=  NET_MODE_RESPONSE_HEAD_LENGTH) {
-      err_exit(errno, "NetModSocket::send  rio_readnb");
+      LoggerFactory::getLogger()->error(errno, "NetModSocket::send  rio_readnb");
+      exit(1);
     }
 
     response_head =  NetModSocket::decode_response_head(response_head_bs);
 
     recv_buf = malloc(response_head.content_length);
     if(recv_buf == NULL) {
-      err_exit(errno, "NetModSocket::send malloc");
+      LoggerFactory::getLogger()->error(errno, "NetModSocket::send malloc");
+      exit(1);
     }
     if ( (recv_size = rio_readnb(rio, recv_buf, response_head.content_length) ) !=  response_head.content_length) {
-      err_exit(errno, "NetModSocket::send  rio_readnb");
+      LoggerFactory::getLogger()->error(errno, "NetModSocket::send  rio_readnb");
+      exit(1);
     }
 
 LABEL_ARR_PUSH:
@@ -86,12 +98,70 @@
     ret_arr[i].content_length = recv_size;
   }
   *recv_arr = ret_arr;
-  *recv_arr_size = i;
-
+  if(recv_arr_size != NULL) {
+    *recv_arr_size = i;
+  }
+ 
   return i;
      
 }
 
+// int  pub(char *topic, int topic_size, void *content, int content_size, int port);
+
+int NetModSocket::pub(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size) {
+  int i, n, clientfd;
+  char portstr[32];
+  net_node_t *node;
+  char mapKey[256];
+  void *recv_buf;
+  int recv_size;
+  char response_head_bs[NET_MODE_RESPONSE_HEAD_LENGTH];
+  net_mod_request_head_t request_head;
+  net_mod_response_head_t response_head;
+  std::map<std::string, rio_t*>::iterator mapIter;
+  rio_t *rio;
+  for (i = 0; i< arrlen; i++) {
+
+    node = &node_arr[i];
+    if(node->host == NULL) {
+      // 鏈湴鍙戦��
+      shmModSocket.pub(topic, topic_size, content, content_size, node->key);
+      
+    } else {
+      sprintf(mapKey, "%s:%d", node->host, node->port);
+      if( ( mapIter = connectionMap.find(mapKey)) != connectionMap.end()) {
+        rio = mapIter->second;
+      } else {
+        rio = (rio_t *)malloc(sizeof(rio_t));
+        sprintf(portstr, "%d", node->port);
+        clientfd = Open_clientfd(node-> host, portstr);
+        Rio_readinitb(rio, clientfd);
+        connectionMap.insert({mapKey, rio});
+      }
+
+      request_head.mod = BUS;
+      request_head.key = node->key;
+      request_head.content_length = content_size;
+      request_head.topic_length = strlen(topic) + 1;
+      if(rio_writen(rio->rio_fd, NetModSocket::encode_request_head(request_head), NET_MODE_REQUEST_HEAD_LENGTH) != NET_MODE_REQUEST_HEAD_LENGTH) {
+        LoggerFactory::getLogger()->error(errno, "NetModSocket::pub head rio_writen");
+        exit(1);
+
+      }
+
+      if(rio_writen(rio->rio_fd, content, content_size) != content_size ) {
+        LoggerFactory::getLogger()->error(errno, "NetModSocket::pub rio_writen conent ");
+        exit(1);
+      }
+
+      if(rio_writen(rio->rio_fd, topic, request_head.topic_length) != request_head.topic_length ) {
+        LoggerFactory::getLogger()->error(errno, "NetModSocket::pub rio_writen conent ");
+        exit(1);
+      }
+    }
+  }
+  return i;
+}
 
 void NetModSocket::free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size) {
 
@@ -112,6 +182,7 @@
   PUT(head, htonl(request.mod));
   PUT(head + 4, htonl(request.key));
   PUT(head + 8, htonl(request.content_length));
+  PUT(head + 12, htonl(request.topic_length));
   return head;
 }
 
@@ -121,6 +192,7 @@
   head.mod = ntohl(GET(headbs));
   head.key = ntohl(GET(headbs + 4));
   head.content_length = ntohl(GET(headbs + 8));
+  head.topic_length = ntohl(GET(headbs + 12));
   return head;
 }
 
diff --git a/src/socket/net_mod_socket.h b/src/socket/net_mod_socket.h
index a155c29..d296007 100644
--- a/src/socket/net_mod_socket.h
+++ b/src/socket/net_mod_socket.h
@@ -7,7 +7,7 @@
 #define GET(p)       (*(uint32_t *)(p))
 #define PUT(p, val)  (*(uint32_t *)(p) = (val))
 
-#define NET_MODE_REQUEST_HEAD_LENGTH 12
+#define NET_MODE_REQUEST_HEAD_LENGTH 16
 #define NET_MODE_RESPONSE_HEAD_LENGTH 4
 
 struct net_node_t
@@ -22,6 +22,7 @@
 	uint32_t mod;
 	uint32_t key;
 	uint32_t content_length;
+	uint32_t topic_length;
 };
 
 struct net_mod_response_head_t {
@@ -49,8 +50,11 @@
 public:
 	
   NetModSocket();
+
   int sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
   	net_mod_recv_msg_t ** resp_arr, int *resp_arr_size);
+
+  int pub(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size);
  
   ~NetModSocket();
 
diff --git a/src/socket/shm_mod_socket.c b/src/socket/shm_mod_socket.c
index 2c5821a..bd60993 100644
--- a/src/socket/shm_mod_socket.c
+++ b/src/socket/shm_mod_socket.c
@@ -314,7 +314,7 @@
 
 	SHMTopicSubMap::iterator map_iter;
 	SHMKeySet::iterator set_iter;
-printf("_proxy_sub topic = %s\n", topic);
+//printf("_proxy_sub topic = %s\n", topic);
 	if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
 		subscripter_set = map_iter->second;
 	} else {
@@ -521,4 +521,4 @@
  return 1;
 }
 
- 
\ No newline at end of file
+ 
diff --git a/src/socket/shm_mod_socket.h b/src/socket/shm_mod_socket.h
index 50ef4ef..885d1aa 100644
--- a/src/socket/shm_mod_socket.h
+++ b/src/socket/shm_mod_socket.h
@@ -14,7 +14,7 @@
 #define TOPIC_LIDENTIFIER "{"
 #define TOPIC_RIDENTIFIER "}"
 
-static Logger logger = LoggerFactory::getLogger();
+static Logger *logger = LoggerFactory::getLogger();
 #define BUS_MAP_KEY 1
 //typedef std::basic_string<char, std::char_traits<char>, SHM_STL_Allocator<char> > SHMString;
 typedef std::set<int,  std::less<int>, SHM_STL_Allocator<int> > SHMKeySet;
diff --git a/src/socket/shm_socket.c b/src/socket/shm_socket.c
index ea2f674..362e5e8 100644
--- a/src/socket/shm_socket.c
+++ b/src/socket/shm_socket.c
@@ -3,7 +3,7 @@
 #include "logger_factory.h"
 #include <map>
 
-static Logger logger = LoggerFactory::getLogger();
+static Logger *logger = LoggerFactory::getLogger();
 
 
 
@@ -43,7 +43,7 @@
   socket->force_bind = false;
   socket->dispatch_thread = 0;
   socket->status = SHM_CONN_CLOSED;
-
+  socket->mutex = SemUtil::get(IPC_PRIVATE, 1);
   return socket;
 }
 
@@ -258,6 +258,7 @@
   }
   hashtable_t *hashtable = mm_get_hashtable();
 
+  SemUtil::dec(socket->mutex);
   if (socket->queue == NULL) {
     if (socket->port == -1) {
       socket->port = hashtable_alloc_key(hashtable);
@@ -268,6 +269,8 @@
 
     socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
   }
+  SemUtil::inc(socket->mutex);
+  
   if (port == socket->port) {
     err_msg(0, "can not send to your self!");
     return -1;
@@ -316,6 +319,7 @@
              socket->socket_type);
   }
   hashtable_t *hashtable = mm_get_hashtable();
+  SemUtil::dec(socket->mutex);
   if (socket->queue == NULL) {
     if (socket->port == -1) {
       socket->port = hashtable_alloc_key(hashtable);
@@ -326,6 +330,7 @@
 
     socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
   }
+  SemUtil::inc(socket->mutex);
 
   shm_msg_t src;
   // printf("shm_recvfrom pop before\n");
diff --git a/src/socket/shm_socket.h b/src/socket/shm_socket.h
index fd67d9c..e38fd0e 100644
--- a/src/socket/shm_socket.h
+++ b/src/socket/shm_socket.h
@@ -56,6 +56,7 @@
 	// 鏈湴port
 	int port;
 	bool force_bind;
+	int mutex;
 	shm_connection_status_t status;
 	SHMQueue<shm_msg_t> *queue;
 	SHMQueue<shm_msg_t> *remoteQueue;
diff --git a/test_net_socket/net_mod_req_rep.c b/test_net_socket/net_mod_req_rep.c
index 3287d58..df33bf0 100644
--- a/test_net_socket/net_mod_req_rep.c
+++ b/test_net_socket/net_mod_req_rep.c
@@ -11,30 +11,68 @@
 
 void client(int port ){
 	NetModSocket client;
-	char send_buf[MAXLINE];
+	char content[MAXLINE];
+	char action[512];
+  char topic[512];
 	net_mod_recv_msg_t *recv_arr;
-	int recv_arr_size, i;
+	int recv_arr_size, i, n;
+	int node_arr_size = 3;
+	//192.168.20.104
 	net_node_t node_arr[] = {
-		{"localhost", port, 11},
-		{"localhost", port, 12},
-		{"localhost", port, 13},
-		{"localhost", port, 14}
+		{"192.168.20.104", port, 11},
+		{"192.168.20.104", port, 12},
+		{"192.168.20.104", port, 13}
 	};
 
-  while (fgets(send_buf, MAXLINE, stdin) != NULL) {
-  	// 鏀跺埌娑堟伅鐨勮妭鐐瑰嵆浣挎病鏈夊搴旂殑淇℃伅锛� 涔熻鍥炲涓�涓〃绀烘棤鐨勬秷鎭�,鍚﹀垯浼氫竴鐩寸瓑寰�
-    client.sendandrecv( node_arr, 4, send_buf, strlen(send_buf), &recv_arr, &recv_arr_size);
-    for(i=0; i<recv_arr_size; i++) {
-    	printf("host:%s, port: %d, key:%d, content: %s\n", 
-    		recv_arr[i].host,
-    		recv_arr[i].port,
-    		recv_arr[i].key,
-    		recv_arr[i].content
-    	);
+	int pub_node_arr_size = 3;
+	net_node_t pub_node_arr[] = {
+		{"192.168.20.104", port, 8},
+		{"192.168.20.104", port, 8},
+		{"192.168.20.104", port, 8}
+	};
+	
+  while (true) {
+    //printf("Usage: pub <topic> [content] or sub <topic>\n");
+    printf("Can I help you? pub, send or quit\n");
+    scanf("%s",action);
+    
+    if(strcmp(action, "pub") == 0) {
+    	printf("Please input topic and content\n");
+      scanf("%s %s", topic, content);
+
+    	n = client.pub(pub_node_arr, pub_node_arr_size, topic, strlen(topic)+1, content, strlen(content)+1);
+    	printf("pub %d\n", n);
     }
-//浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀
-    NetModSocket::free_recv_msg_arr(recv_arr, recv_arr_size);
+    else if(strcmp(action, "send") == 0) {
+    	getc(stdin);
+    	printf("Please input  content\n");
+    	
+		  if (fgets(content, MAXLINE, stdin) != NULL) {
+		  	// 鏀跺埌娑堟伅鐨勮妭鐐瑰嵆浣挎病鏈夊搴旂殑淇℃伅锛� 涔熻鍥炲涓�涓〃绀烘棤鐨勬秷鎭�,鍚﹀垯浼氫竴鐩寸瓑寰�
+		    n = client.sendandrecv( node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size);
+		    for(i=0; i<recv_arr_size; i++) {
+		    	printf("host:%s, port: %d, key:%d, content: %s\n", 
+		    		recv_arr[i].host,
+		    		recv_arr[i].port,
+		    		recv_arr[i].key,
+		    		recv_arr[i].content
+		    	);
+		    }
+		//浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀
+		    NetModSocket::free_recv_msg_arr(recv_arr, recv_arr_size);
+		  }
+    }
+    else if(strcmp(action, "quit") == 0) {
+      break;
+    } else {
+      printf("error input argument\n");
+      continue;
+    }
+   
   }
+
+
+  
 }
 
 int main(int argc, char *argv[]) {
diff --git a/test_net_socket/net_mod_req_rep.sh b/test_net_socket/net_mod_req_rep.sh
index 94f79f4..383c7f2 100755
--- a/test_net_socket/net_mod_req_rep.sh
+++ b/test_net_socket/net_mod_req_rep.sh
@@ -5,6 +5,8 @@
 	./dgram_mod_req_rep server 13 &
 	./dgram_mod_req_rep server 14 &
 
+	./dgram_mod_bus server 8 &
+
 	./net_mod_req_rep server 5000 &
 
 }
@@ -14,7 +16,7 @@
 }
 
 function close() {
-	ps -ef | grep -e "dgram_mod_req_rep" -e "net_mod_req_rep" | awk  '{print $2}' | xargs -i kill -9 {}
+	ps -ef | grep -e "dgram_mod_req_rep" -e "net_mod_req_rep"  -e "dgram_mod_bus" | awk  '{print $2}' | xargs -i kill -9 {}
 	ipcrm -a
 }
 

--
Gitblit v1.8.0