From 2e1afff475181677a3dd38ab6e6d5632f8a70590 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期一, 27 七月 2020 10:55:10 +0800
Subject: [PATCH] udpate

---
 demo/dgram_mod_req_rep                |    0 
 src/libshm_queue.a                    |    0 
 src/queue/include/shm_queue.h         |  265 ++++++++++-------------
 src/socket/dgram_mod_socket.c         |    6 
 test_socket/dgram_mod_bus             |    0 
 test_socket/dgram_mod_req_rep.c       |    2 
 demo/Makefile                         |    2 
 demo/dgram_mod_bus.c                  |   86 +++++++
 test_socket/dgram_mod_survey          |    0 
 README.md                             |  212 ++++++++++++++----
 demo/dgram_mod_req_rep.c              |   13 
 test_socket/dgram_mod_bus.c           |    6 
 demo/dgram_mod_req_rep.sh             |    4 
 demo/dgram_mod_survey.c               |    4 
 test_socket/dgram_mod_req_rep         |    0 
 demo/dgram_mod_survey                 |    0 
 src/socket/include/dgram_mod_socket.h |   50 ++--
 17 files changed, 413 insertions(+), 237 deletions(-)

diff --git a/README.md b/README.md
index 327d634..be00c53 100644
--- a/README.md
+++ b/README.md
@@ -1,92 +1,204 @@
  
-## 瀹炰緥
+# 1. 瀹炰緥
 
- ### 璇锋眰搴旂瓟妯″紡
- 
- `source ./demo/server.c`
+## 1.1 Bus妯″紡
+Source  
+
+`dgram_mod_bus.c`
+
+缂栬瘧  
+
+瀹夎濂絪o鍖呭悗锛岀敤濡備笅鐨勬柟寮忕紪璇戯紝shm_queue鏄�氫俊闃熷垪鍖咃紝usgcommon鏄叕鍏卞寘锛宲thread鏄郴缁熺殑绾跨▼鍖�
+`g++ dgram_mod_bus.c -mcx16 -std=c++11 -lshm_queue -lusgcommon -lpthread`
+
+婕旂ず 
+
+鍚姩bus `./dgram_mod_bus server 8`銆傜劧鍚庢墦寮�涓や釜瀹㈡埛绔繛鎺us锛� 绗竴涓鎴风璁㈤槄 "news", 绗簩涓鎴风鍙戝竷 "news". 绗竴涓鎴风浼氭敹鍒扮浜屼釜瀹㈡埛绔帹閫佺殑淇℃伅銆�
+
+鍚姩bus
+```
+$  ./dgram_mod_bus server 8
+```
+
+绗竴涓鎴风璁㈤槄 "news"
+```
+
+$ ./dgram_mod_bus client 8
+Can I help you? sub, pub or quit
+sub
+Please input topic!
+news
+Sub success!
+Can I help you? sub, pub or quit
+鏀跺埌璁㈤槄娑堟伅:111111111111111111111
+
+ ```
+绗簩涓鎴风鍙戝竷 "news"
+ ```
+$  ./dgram_mod_bus client 8
+Can I help you? sub, pub or quit
+pub
+Please input topic and content
+news 111111111111111111111
+Pub success!
+Can I help you? sub, pub or quit
+
+ ```
+杩欓噷鍙互鎵撳紑璁稿涓鎴风鍙戝竷鍜岃闃呮秷鎭��
  
 
+## 1.2 req_rep妯″紡, 閫傚簲浜庢敞鍐�
  
- **杩愯server绔細** 
- 
- `./req_req server 8`
- 
- **杩愯client绔細** 
- 鍙互鎵撳紑澶氫釜client
- 
- `./req_rep client 8`
- 
- 鍦╟lient绔緭鍏ヨ姹備俊鎭紝server 绔洖搴旓紝client绔緭鍑哄洖搴斾俊鎭�
- 
- 
- ### 鍙戝竷璁㈤槄妯″紡
-  
- **杩愯server绔細** 
- 
- `./pub_sub server 8`
- 
-  
- **杩愯client绔細** 
- 鍙互鎵撳紑澶氫釜client
-  
- `./pub_sub client 8`
- 
- 鍦╯erver绔緭鍏ュ彂甯冧俊鎭紝client绔緭鍑烘敹鍒扮殑璁㈤槄淇℃伅
+Source `dgram_mod_req_rep.c`
 
-## 鎺ュ彛璇存槑
+缂栬瘧 鍚屼笂
+
+婕旂ず
+
+```
+## 鍚姩娉ㄥ唽涓績
+./dgram_mod_req_rep server 2 & node0=$! && sleep 1
+## 鍚戞敞鍐屼腑蹇冨彂閫佹秷鎭�
+./dgram_mod_req_rep client 2 node1
+kill $node0
+```
+
+## survey妯″紡锛� 閫傚簲浜庡績璺�
+Source `dgram_mod_survey.c`
+
+缂栬瘧 鍚屼笂
+
+鍚姩蹇冭烦涓績
+```
+$ ./dgram_mod_survey server 3
+
+RECEIVED HREARTBEAT FROM 1000: 0
+RECEIVED HREARTBEAT FROM 1000: 1
+RECEIVED HREARTBEAT FROM 1000: 2
+RECEIVED HREARTBEAT FROM 1000: 3
+RECEIVED HREARTBEAT FROM 1000: 4
+RECEIVED HREARTBEAT FROM 1000: 5
+RECEIVED HREARTBEAT FROM 1000: 6
+RECEIVED HREARTBEAT FROM 1000: 7
+RECEIVED HREARTBEAT FROM 1000: 8
+RECEIVED HREARTBEAT FROM 1000: 9
+
+```
+
+鎵撳紑涓�涓鎴风锛岃繛鎺ュ績璺充腑蹇�
+```
+$ ./dgram_mod_survey client 3
+
+SEND HEART:0
+SEND HEART:1
+SEND HEART:2
+SEND HEART:3
+SEND HEART:4
+SEND HEART:5
+SEND HEART:6
+SEND HEART:7
+SEND HEART:8
+SEND HEART:9
+
+```
+ 
+
+# 2. 鎺ュ彛璇存槑
+
+shm_mm.h
+```
+/**
+ * 鍒濆鍖栧叡浜唴瀛�
+ * @size 鍏变韩鍐呭瓨澶у皬, 鍗曚綅M
+ * 
+ */
+void shm_init(int size);
+
+/**
+ * 閿�姣佸叡浜唴瀛�
+ * 鏁翠釜杩涚▼閫�鍑烘椂闇�瑕佹墽琛岃繖涓柟娉曪紝璇ユ柟娉曢鍏堜細妫�鏌ユ槸鍚﹁繕鏈夊叾浠栬繘绋嬪湪浣跨敤璇ュ叡浜唴瀛橈紝濡傛灉杩樻湁鍏朵粬杩涚▼鍦ㄤ娇鐢ㄥ氨鍙槸detach,濡傛灉娌℃湁鍏朵粬杩涚▼鍦ㄤ娇鐢ㄥ垯閿�姣佹暣鍧楀唴瀛樸��
+ */
+void shm_destroy();
+```
+
+dgram_mod_socket.h
 ```
 
 /**
  * 鍒涘缓socket
  * @return socket鍦板潃
 */
-void *mod_open_socket(int mod);
+void *dgram_mod_open_socket();
 
 /**
  * 鍏抽棴socket
+ * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
 */
-int mod_close_socket(void * _socket);
+int dgram_mod_close_socket(void * _socket);
 
 /**
  * 缁戝畾绔彛鍒皊ocket, 濡傛灉涓嶇粦瀹氬垯绯荤粺鑷姩鍒嗛厤涓�涓�
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
 */
-int mod_socket_bind(void * _socket, int port);
- 
-
-/**
- * 鏈嶅姟绔紑鍚繛鎺ョ洃鍚�
- * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
- */
-int mod_listen(void * _socket);
-
-/**
- * 瀹㈡埛绔彂璧疯繛鎺ヨ姹�
- */
-int mod_connect(void * _socket, int port);
+int dgram_mod_bind(void * _socket, int port);
 
 /**
  * 鍙戦�佷俊鎭�
+ * @port 鍙戦�佺粰璋�
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
  */
-int mod_send(void * _socket, const void *buf, const int size);
+int dgram_mod_sendto(void *_socket, const void *buf, const int size, const int port);
+
 
 /**
  * 鎺ユ敹淇℃伅
+ * @port 浠庤皝鍝噷鏀跺埌鐨勪俊鎭�
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
 */
-int mod_recv(void * _socket, void **buf, int *size) ;
+int dgram_mod_recvfrom(void *_socket, void **buf, int *size, int *port);
 
 /**
- * 閲婃斁鎺ユ敹淇℃伅鐨刡uf
+ * 鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟
+ * @port 鍙戦�佺粰璋�
+ * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
+*/
+int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ;
+
+
+/**
+ * 鍚姩bus
+ * 
+ * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
+*/
+int  dgram_mod_start_bus(void * _socket);
+
+/**
+ * 璁㈤槄鎸囧畾涓婚
+ * @topic 涓婚
+ * @size 涓婚闀垮害
+ * @port 鎬荤嚎绔彛
  */
-void mod_free(void *buf);
+int  dgram_mod_sub(void * _socket, void *topic, int size, int port);
+
+/**
+ * 鍙戝竷涓婚
+ * @topic 涓婚
+ * @content 涓婚鍐呭
+ * @port 鎬荤嚎绔彛
+ */
+int  dgram_mod_pub(void * _socket, void *topic, int topic_size, void *content, int content_size, int port);
 
 
 /**
  * 鑾峰彇soket绔彛鍙�
  */
-int mod_get_socket_port(void * _socket);
+int dgram_mod_get_port(void * _socket) ;
+
+
+/**
+ * 閲婃斁瀛樺偍鎺ユ敹淇℃伅鐨刡uf
+ */
+void dgram_mod_free(void *buf) ;
 ```
 
  
diff --git a/demo/Makefile b/demo/Makefile
index eeb4834..e339a50 100644
--- a/demo/Makefile
+++ b/demo/Makefile
@@ -14,7 +14,7 @@
 include $(ROOT)/Make.defines.$(PLATFORM)
 
 
-PROGS =
+PROGS = dgram_mod_req_rep dgram_mod_survey
 
 
 build: $(PROGS)
diff --git a/demo/dgram_mod_bus.c b/demo/dgram_mod_bus.c
new file mode 100644
index 0000000..bddc7d5
--- /dev/null
+++ b/demo/dgram_mod_bus.c
@@ -0,0 +1,86 @@
+#include "dgram_mod_socket.h"
+#include "shm_mm.h"
+#include "usg_common.h"
+
+void server(int port) {
+  void *socket = dgram_mod_open_socket();
+  dgram_mod_bind(socket, port);
+   
+  dgram_mod_start_bus(socket);
+  
+}
+
+
+void *run_recv(void *socket) {
+  pthread_detach(pthread_self());
+  void *recvbuf;
+  int size;
+  int port;
+  while (dgram_mod_recvfrom( socket, &recvbuf, &size, &port) == 0) {
+    printf("鏀跺埌璁㈤槄娑堟伅:%s\n", recvbuf);
+    free(recvbuf);
+  }
+  
+}
+
+void client(int port) {
+  void *socket = dgram_mod_open_socket();
+  pthread_t tid;
+  pthread_create(&tid, NULL, run_recv, socket);
+  int size;
+  
+  char action[512];
+  char topic[512];
+  char content[512];
+  long i = 0;
+  while (true) {
+    //printf("Usage: pub <topic> [content] or sub <topic>\n");
+    printf("Can I help you? sub, pub or quit\n");
+    scanf("%s",action);
+    
+    if(strcmp(action, "sub") == 0) {
+      printf("Please input topic!\n");
+      scanf("%s", topic);
+      dgram_mod_sub(socket, topic, strlen(topic),  port);
+      printf("Sub success!\n");
+    }
+    else if(strcmp(action, "pub") == 0) {
+      // printf("%s %s %s\n", action, topic, content);
+      printf("Please input topic and content\n");
+      scanf("%s %s", topic, content);
+      dgram_mod_pub(socket, topic, strlen(topic)+1, content, strlen(content)+1,  port);
+      printf("Pub success!\n");
+    } else if(strcmp(action, "quit") == 0) {
+      break;
+    } else {
+      printf("error input\n");
+      continue;
+    }
+   
+  }
+  printf("(%d) quit\n", dgram_mod_get_port(socket));
+  dgram_mod_close_socket(socket);
+}
+
+ 
+
+int main(int argc, char *argv[]) {
+  shm_init(512);
+  int port;
+  if (argc < 3) {
+    fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client");
+    return 1;
+  }
+
+  port = atoi(argv[2]);
+
+  if (strcmp("server", argv[1]) == 0) {
+    server(port);
+  }
+
+  if (strcmp("client", argv[1]) == 0)
+    client(port);
+
+  
+  return 0;
+}
\ No newline at end of file
diff --git a/demo/dgram_mod_req_rep b/demo/dgram_mod_req_rep
new file mode 100755
index 0000000..edd3f4e
--- /dev/null
+++ b/demo/dgram_mod_req_rep
Binary files differ
diff --git a/demo/dgram_mod_req_rep.c b/demo/dgram_mod_req_rep.c
index e269b4f..4a70a4e 100644
--- a/demo/dgram_mod_req_rep.c
+++ b/demo/dgram_mod_req_rep.c
@@ -14,8 +14,8 @@
   int rv;
   int remote_port;
   while ( (rv = dgram_mod_recvfrom(socket, &recvbuf, &size, &remote_port) ) == 0) {
-    printf( "REGIST CENTER RECEIVED REQUEST FROM PORT %d NAME %s\n", remote_port, recvbuf);
-    sprintf(sendbuf, "RECEIVED FROM PORT %d NAME %s", remote_port, recvbuf);
+    printf( "server: RECEIVED REQUEST FROM PORT %d NAME %s\n", remote_port, recvbuf);
+    sprintf(sendbuf, "RECEIVED  PORT %d NAME %s", remote_port, recvbuf);
     dgram_mod_sendto(socket, sendbuf, strlen(sendbuf) + 1, remote_port);
     free(recvbuf);
   }
@@ -26,9 +26,12 @@
   void *socket = dgram_mod_open_socket();
   int size;
   void *recvbuf;
-  dgram_mod_sendandrecv(socket, sendbuf, strlen(sendbuf) + 1, port, &recvbuf, &size);
-  printf("reply: %s\n", (char *)recvbuf);
-  free(recvbuf);
+  printf("client :send request%s\n", sendbuf);
+  if(dgram_mod_sendandrecv(socket, sendbuf, strlen(sendbuf) + 1, port, &recvbuf, &size) == 0) {
+    printf("client :received reply => %s\n", (char *)recvbuf);
+    free(recvbuf);
+  }
+ 
   dgram_mod_close_socket(socket);
 }
 
diff --git a/demo/dgram_mod_req_rep.sh b/demo/dgram_mod_req_rep.sh
index 463db21..73e673a 100755
--- a/demo/dgram_mod_req_rep.sh
+++ b/demo/dgram_mod_req_rep.sh
@@ -1,5 +1,5 @@
 ipcrm -a
 
-./dgram_mod_req_rep server 8 & node0=$!
-./dgram_mod_req_rep client 8 node1
+./dgram_mod_req_rep server 2 & node0=$! && sleep 1
+./dgram_mod_req_rep client 2 node1
 kill $node0
\ No newline at end of file
diff --git a/demo/dgram_mod_survey b/demo/dgram_mod_survey
new file mode 100755
index 0000000..e142d90
--- /dev/null
+++ b/demo/dgram_mod_survey
Binary files differ
diff --git a/demo/dgram_mod_survey.c b/demo/dgram_mod_survey.c
index c98ec92..a462fef 100644
--- a/demo/dgram_mod_survey.c
+++ b/demo/dgram_mod_survey.c
@@ -7,7 +7,7 @@
 #include "usg_common.h"
 
 void server(int port) {
-  void *socket = dgram_mod_open_socket(SURVEY);
+  void *socket = dgram_mod_open_socket();
   dgram_mod_bind(socket, port);
   int size;
   void *recvbuf;
@@ -22,7 +22,7 @@
 }
 
 void client(int port) {
-  void *socket = dgram_mod_open_socket(SURVEY);
+  void *socket = dgram_mod_open_socket();
   int size;
   void *recvbuf;
   char sendbuf[512];
diff --git a/src/libshm_queue.a b/src/libshm_queue.a
new file mode 100644
index 0000000..d0d821c
--- /dev/null
+++ b/src/libshm_queue.a
Binary files differ
diff --git a/src/queue/include/shm_queue.h b/src/queue/include/shm_queue.h
index 0d8bbaf..81913d0 100644
--- a/src/queue/include/shm_queue.h
+++ b/src/queue/include/shm_queue.h
@@ -1,191 +1,166 @@
 #ifndef __SHM_QUEUE_H__
 #define __SHM_QUEUE_H__
 
-#include "usg_common.h"
 #include "hashtable.h"
 #include "lock_free_queue.h"
 #include "logger_factory.h"
-#include "shm_allocator.h"
 #include "sem_util.h"
+#include "shm_allocator.h"
+#include "usg_common.h"
 // default Queue size
 // #define LOCK_FREE_Q_DEFAULT_SIZE 16
- 
-template < typename ELEM_T>
-class SHMQueue
-{
+
+template <typename ELEM_T> class SHMQueue {
 
 private:
-    const int KEY;
- 
+  const int KEY;
+
 public:
-    /// @brief constructor of the class
-    SHMQueue(int key=0, size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
-    
-    
-    ~SHMQueue();
+  /// @brief constructor of the class
+  SHMQueue(int key = 0, size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
 
-   
-    inline uint32_t size();
- 
-    inline bool full();
-    inline bool empty();
-   
-    inline bool push(const ELEM_T &a_data);
-    inline bool push_nowait(const ELEM_T &a_data);
-    inline bool push_timeout(const ELEM_T &a_data, const struct timespec * timeout);
-    inline bool pop(ELEM_T &a_data);
-    inline bool pop_nowait(ELEM_T &a_data);
-    inline bool pop_timeout(ELEM_T &a_data, struct timespec * timeout);
+  ~SHMQueue();
 
-    inline ELEM_T& operator[](unsigned i);
+  inline uint32_t size();
 
-    static void remove_queues_exclude(int *keys, size_t length);
+  inline bool full();
+  inline bool empty();
+
+  inline bool push(const ELEM_T &a_data);
+  inline bool push_nowait(const ELEM_T &a_data);
+  inline bool push_timeout(const ELEM_T &a_data,
+                           const struct timespec *timeout);
+  inline bool pop(ELEM_T &a_data);
+  inline bool pop_nowait(ELEM_T &a_data);
+  inline bool pop_timeout(ELEM_T &a_data, struct timespec *timeout);
+
+  inline ELEM_T &operator[](unsigned i);
+
+  static void remove_queues_exclude(int *keys, size_t length);
+
 private:
-
-
 protected:
-    /// @brief the actual queue-> methods are forwarded into the real 
-    ///        implementation
-    LockFreeQueue<ELEM_T, SHM_Allocator>* queue;
+  /// @brief the actual queue-> methods are forwarded into the real
+  ///        implementation
+  LockFreeQueue<ELEM_T, SHM_Allocator> *queue;
 
 private:
-    /// @brief disable copy constructor declaring it private
-    SHMQueue<ELEM_T>(const SHMQueue<ELEM_T> &a_src);
+  /// @brief disable copy constructor declaring it private
+  SHMQueue<ELEM_T>(const SHMQueue<ELEM_T> &a_src);
 };
 
+template <typename ELEM_T>
+void SHMQueue<ELEM_T>::remove_queues_exclude(int *keys, size_t length) {
+  hashtable_t *hashtable = mm_get_hashtable();
+  std::set<int> *keyset = hashtable_keyset(hashtable);
+  std::set<int>::iterator keyItr;
+  LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue;
+  bool found;
+  for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
+    found = false;
+    for (size_t i = 0; i < length; i++) {
+      if (*keyItr == keys[i]) {
+        found = true;
+        break;
+      }
+    }
+    if (!found) {
+      mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable,
+                                                                     *keyItr);
+      delete mqueue;
+    }
+  }
+  delete keyset;
+}
 
-template < typename ELEM_T >
-void SHMQueue<ELEM_T>::remove_queues_exclude(int *keys, size_t length)
-{
+template <typename ELEM_T>
+SHMQueue<ELEM_T>::SHMQueue(int key, size_t qsize) : KEY(key) {
+
+  hashtable_t *hashtable = mm_get_hashtable();
+  queue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key);
+  // LockFreeQueue<int, 10000> q;
+  if (queue == NULL || (void *)queue == (void *)1) {
+    queue = new LockFreeQueue<ELEM_T, SHM_Allocator>(qsize);
+    hashtable_put(hashtable, key, (void *)queue);
+  }
+  queue->reference++;
+  LoggerFactory::getLogger().debug("SHMQueue constructor reference===%d",
+                                   queue->reference.load());
+}
+
+template <typename ELEM_T> SHMQueue<ELEM_T>::~SHMQueue() {
+  SemUtil::dec(queue->mutex);
+  queue->reference--;
+  // LoggerFactory::getLogger().debug("SHMQueue destructor  reference===%d",
+  // queue->reference.load());
+  if (queue->reference.load() == 0) {
+    delete queue;
     hashtable_t *hashtable = mm_get_hashtable();
-    std::set<int>* keyset = hashtable_keyset(hashtable);
-    std::set<int>::iterator keyItr;
-     LockFreeQueue<ELEM_T, SHM_Allocator>* mqueue;
-    bool found;
-    for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
-        found = false;
-        for(size_t i = 0; i < length; i++) {
-            if(*keyItr == keys[i]) {
-                found = true;
-                break;
-            }
-        }
-        if(!found) {
-           mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
-           delete mqueue;
-        }
-    }
-    delete keyset;
-    
-}  
-
-template < typename ELEM_T >
-SHMQueue<ELEM_T>::SHMQueue(int key, size_t qsize): KEY(key)
-{
-
-    hashtable_t *hashtable = mm_get_hashtable();
-    queue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key);
-    //LockFreeQueue<int, 10000> q;
-    if (queue == NULL || (void *)queue == (void *)1) {
-        queue = new LockFreeQueue<ELEM_T, SHM_Allocator>(qsize);
-        hashtable_put(hashtable,  key, (void *)queue);
-    }
-    queue->reference++;
-    LoggerFactory::getLogger().debug("SHMQueue constructor reference===%d", queue->reference.load());
+    hashtable_remove(hashtable, KEY);
+    // LoggerFactory::getLogger().debug("SHMQueue destructor delete queue\n");
+  } else {
+    SemUtil::inc(queue->mutex);
+  }
 }
 
-template < typename ELEM_T >
-SHMQueue<ELEM_T>::~SHMQueue()
-{
-    SemUtil::dec( queue->mutex);
-    queue->reference--;
-//LoggerFactory::getLogger().debug("SHMQueue destructor  reference===%d", queue->reference.load());
-    if(queue->reference.load() == 0) {
-        delete queue;
-        hashtable_t *hashtable = mm_get_hashtable();
-        hashtable_remove(hashtable, KEY);
-// LoggerFactory::getLogger().debug("SHMQueue destructor delete queue\n");
-    } else {
-        SemUtil::inc(queue->mutex);
-    }
-    
+template <typename ELEM_T> inline uint32_t SHMQueue<ELEM_T>::force_destroy() {
+  SemUtil::dec(queue->mutex);
+  delete queue;
+  hashtable_t *hashtable = mm_get_hashtable();
+  hashtable_remove(hashtable, KEY);
+  SemUtil::inc(queue->mutex);
 }
 
-template < typename ELEM_T >
-inline uint32_t SHMQueue<ELEM_T>::size()
-{
-    return queue->size();
-}  
-
-template < typename ELEM_T >
-inline bool SHMQueue<ELEM_T>::full()
-{
-    return queue->full();
+template <typename ELEM_T> inline uint32_t SHMQueue<ELEM_T>::size() {
+  return queue->size();
 }
 
-template < typename ELEM_T >
-inline bool SHMQueue<ELEM_T>::empty()
-{
-    return queue->empty();
-}  
-
-
-template < typename ELEM_T >
-inline bool SHMQueue<ELEM_T>::push(const ELEM_T &a_data)
-{
-   return queue->push(a_data);
-    
+template <typename ELEM_T> inline bool SHMQueue<ELEM_T>::full() {
+  return queue->full();
 }
 
-template <
-    typename ELEM_T >
-inline bool SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data)
-{
-   return queue->push_nowait(a_data);
-    
+template <typename ELEM_T> inline bool SHMQueue<ELEM_T>::empty() {
+  return queue->empty();
 }
 
-template < typename ELEM_T >
-inline bool SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, const struct timespec * timeout)
-{
-
-    return queue->push_timeout(a_data, timeout);
-    
+template <typename ELEM_T>
+inline bool SHMQueue<ELEM_T>::push(const ELEM_T &a_data) {
+  return queue->push(a_data);
 }
 
-
-
-
-template < typename ELEM_T >
-inline bool SHMQueue<ELEM_T>::pop(ELEM_T &a_data)
-{
-// printf("SHMQueue pop before\n");
-   int rv = queue->pop(a_data);
-// printf("SHMQueue after before\n");
-   return rv;
-    
+template <typename ELEM_T>
+inline bool SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data) {
+  return queue->push_nowait(a_data);
 }
 
-template < typename ELEM_T >
-inline bool SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data)
-{
-    return queue->pop_nowait(a_data);
-    
+template <typename ELEM_T>
+inline bool SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data,
+                                           const struct timespec *timeout) {
+
+  return queue->push_timeout(a_data, timeout);
 }
 
- 
-template < typename ELEM_T >
-inline bool SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data, struct timespec * timeout)
-{
-   return queue->pop_timeout(a_data, timeout);
-    
+template <typename ELEM_T> inline bool SHMQueue<ELEM_T>::pop(ELEM_T &a_data) {
+  // printf("SHMQueue pop before\n");
+  int rv = queue->pop(a_data);
+  // printf("SHMQueue after before\n");
+  return rv;
 }
 
-template < typename ELEM_T >
-inline ELEM_T& SHMQueue<ELEM_T>::operator[](unsigned i) {
-     return queue->operator[](i);
+template <typename ELEM_T>
+inline bool SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data) {
+  return queue->pop_nowait(a_data);
 }
 
+template <typename ELEM_T>
+inline bool SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data,
+                                          struct timespec *timeout) {
+  return queue->pop_timeout(a_data, timeout);
+}
 
+template <typename ELEM_T>
+inline ELEM_T &SHMQueue<ELEM_T>::operator[](unsigned i) {
+  return queue->operator[](i);
+}
 
 #endif
diff --git a/src/socket/dgram_mod_socket.c b/src/socket/dgram_mod_socket.c
index 84976b6..62ae7ac 100644
--- a/src/socket/dgram_mod_socket.c
+++ b/src/socket/dgram_mod_socket.c
@@ -94,7 +94,7 @@
 	free(buf);
 }
 
-int start_bus(void * _socket) {
+int  dgram_mod_start_bus(void * _socket) {
 	dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
 	socket->topic_sub_map = new std::map<std::string, std::set<int> *>;
 	run_pubsub_proxy(socket);
@@ -107,7 +107,7 @@
 /**
  * @port 鎬荤嚎绔彛
  */
-int sub(void * _socket, void *topic, int size, int port) {
+int  dgram_mod_sub(void * _socket, void *topic, int size, int port) {
 	dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
 	char buf[8192];
 	snprintf(buf,  8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
@@ -117,7 +117,7 @@
 /**
  * @port 鎬荤嚎绔彛
  */
-int pub(void * _socket, void *topic, int topic_size, void *content, int content_size, int port) {
+int  dgram_mod_pub(void * _socket, void *topic, int topic_size, void *content, int content_size, int port) {
 
 	dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
 	int head_len;
diff --git a/src/socket/include/dgram_mod_socket.h b/src/socket/include/dgram_mod_socket.h
index 29e16eb..1c2ad64 100644
--- a/src/socket/include/dgram_mod_socket.h
+++ b/src/socket/include/dgram_mod_socket.h
@@ -47,6 +47,31 @@
 */
 int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ;
 
+
+/**
+ * 鍚姩bus
+ * 
+ * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
+*/
+int  dgram_mod_start_bus(void * _socket);
+
+/**
+ * 璁㈤槄鎸囧畾涓婚
+ * @topic 涓婚
+ * @size 涓婚闀垮害
+ * @port 鎬荤嚎绔彛
+ */
+int  dgram_mod_sub(void * _socket, void *topic, int size, int port);
+
+/**
+ * 鍙戝竷涓婚
+ * @topic 涓婚
+ * @content 涓婚鍐呭
+ * @port 鎬荤嚎绔彛
+ */
+int  dgram_mod_pub(void * _socket, void *topic, int topic_size, void *content, int content_size, int port);
+
+
 /**
  * 鑾峰彇soket绔彛鍙�
  */
@@ -57,31 +82,6 @@
  * 閲婃斁瀛樺偍鎺ユ敹淇℃伅鐨刡uf
  */
 void dgram_mod_free(void *buf) ;
-
-/**
- * 鍚姩bus
- * 
- * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
-*/
-int start_bus(void * _socket);
-
-/**
- * 璁㈤槄鎸囧畾涓婚
- * @topic 涓婚
- * @size 涓婚闀垮害
- * @port 鎬荤嚎绔彛
- */
-int sub(void * _socket, void *topic, int size, int port);
-
-/**
- * 鍙戝竷涓婚
- * @topic 涓婚
- * @content 涓婚鍐呭
- * @port 鎬荤嚎绔彛
- */
-int pub(void * _socket, void *topic, int topic_size, void *content, int content_size, int port);
-
-
 #ifdef __cplusplus
 }
 #endif
diff --git a/test_socket/dgram_mod_bus b/test_socket/dgram_mod_bus
new file mode 100755
index 0000000..4cb3c17
--- /dev/null
+++ b/test_socket/dgram_mod_bus
Binary files differ
diff --git a/test_socket/dgram_mod_bus.c b/test_socket/dgram_mod_bus.c
index d51caf0..bddc7d5 100644
--- a/test_socket/dgram_mod_bus.c
+++ b/test_socket/dgram_mod_bus.c
@@ -6,7 +6,7 @@
   void *socket = dgram_mod_open_socket();
   dgram_mod_bind(socket, port);
    
-  start_bus(socket);
+  dgram_mod_start_bus(socket);
   
 }
 
@@ -41,14 +41,14 @@
     if(strcmp(action, "sub") == 0) {
       printf("Please input topic!\n");
       scanf("%s", topic);
-      sub(socket, topic, strlen(topic),  port);
+      dgram_mod_sub(socket, topic, strlen(topic),  port);
       printf("Sub success!\n");
     }
     else if(strcmp(action, "pub") == 0) {
       // printf("%s %s %s\n", action, topic, content);
       printf("Please input topic and content\n");
       scanf("%s %s", topic, content);
-      pub(socket, topic, strlen(topic)+1, content, strlen(content)+1,  port);
+      dgram_mod_pub(socket, topic, strlen(topic)+1, content, strlen(content)+1,  port);
       printf("Pub success!\n");
     } else if(strcmp(action, "quit") == 0) {
       break;
diff --git a/test_socket/dgram_mod_req_rep b/test_socket/dgram_mod_req_rep
new file mode 100755
index 0000000..988b3f4
--- /dev/null
+++ b/test_socket/dgram_mod_req_rep
Binary files differ
diff --git a/test_socket/dgram_mod_req_rep.c b/test_socket/dgram_mod_req_rep.c
index b5b4bef..9ed938b 100644
--- a/test_socket/dgram_mod_req_rep.c
+++ b/test_socket/dgram_mod_req_rep.c
@@ -19,7 +19,7 @@
 
 }Targ;
 
-LockFreeQueue<task_t, DM_Allocator> task_queue(100);
+LockFreeQueue<task_t, DM_Allocator> task_queue(128);
 
 
 void *worker(void *socket) {
diff --git a/test_socket/dgram_mod_survey b/test_socket/dgram_mod_survey
new file mode 100755
index 0000000..9db2e48
--- /dev/null
+++ b/test_socket/dgram_mod_survey
Binary files differ

--
Gitblit v1.8.0