From b861de29176891657cc96631ddbfb4ea9e114a42 Mon Sep 17 00:00:00 2001
From: Fu Juntang <StrongTiger_001@163.com>
Date: 星期一, 30 八月 2021 17:52:23 +0800
Subject: [PATCH] re-structure the communication work flow.

---
 src/shm/hashtable.h                      |    2 
 CMakeLists.txt                           |    8 
 src/bh_api.h                             |   28 
 src/socket/bus_server_socket.h           |    9 
 src/proc_def.h                           |   70 +
 src/socket/shm_mod_socket.cpp            |  150 ++
 src/bus_proxy_start.cpp                  |  127 ++
 src/queue/lock_free_queue.h              |   24 
 src/socket/bus_server_socket_wrapper.cpp |    4 
 src/shm/mm.cpp                           |    4 
 src/socket/shm_socket.h                  |   15 
 src/svsem.cpp                            |    1 
 build.sh                                 |    2 
 src/bus_error.cpp                        |   10 
 src/net/net_mod_server_socket.cpp        |    3 
 src/net/net_mod_socket.cpp               |   29 
 src/socket/shm_socket.cpp                |   63 
 src/bus_def.h                            |    7 
 test_socket/bus_test_inter.cpp           |  502 ++++++++
 test_socket/CMakeLists.txt               |   14 
 src/socket/shm_mod_socket.h              |   13 
 src/queue/shm_queue.h                    |    4 
 src/socket/bus_server_socket.cpp         |  534 ++++++++
 test_socket/bus_test_server_mode.cpp     |  122 ++
 src/shm/shm_mm.h                         |    4 
 src/net/net_mod_socket_wrapper.cpp       |   27 
 src/net/net_mod_socket_wrapper.h         |    3 
 src/net/net_mod_socket.h                 |    8 
 src/CMakeLists.txt                       |   20 
 src/bus_error.h                          |    4 
 src/bh_api.cpp                           | 1619 +++++++++++++++++++++++++++
 31 files changed, 3,300 insertions(+), 130 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 9af847a..049e0b2 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -15,6 +15,8 @@
 
 option(BUILD_SHARED_LIBS "Build using shared libraries" ON)
 
+add_compile_options(-fPIC)
+
 option(BUILD_DOC "Build doc" OFF)
 
 
@@ -30,5 +32,9 @@
 	add_subdirectory(${PROJECT_SOURCE_DIR}/test)
 	add_subdirectory(${PROJECT_SOURCE_DIR}/test_net_socket)
 	add_subdirectory(${PROJECT_SOURCE_DIR}/test_socket)
-#	add_subdirectory(${PROJECT_SOURCE_DIR}/shm_util)
+  	include_directories(${CMAKE_CURRENT_BINARY_DIR}/proto)
+	#add_subdirectory(${PROJECT_SOURCE_DIR}/shm_util)
 endif()
+
+add_definitions("-DPROTOBUF_USS_DLLS")
+
diff --git a/build.sh b/build.sh
index 455436e..ea2dcde 100755
--- a/build.sh
+++ b/build.sh
@@ -2,7 +2,7 @@
 
 BUILD_TYPE="Debug"
 BUILD_DOC="OFF"
-BUILD_SHARED_LIBS="OFF"
+BUILD_SHARED_LIBS="ON"
 
 function usage() {
 	echo "build.sh [release | debug | doc]"
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 61fbc17..eb3f4c9 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -5,6 +5,9 @@
 # to the source code
 configure_file(bus_config.h.in bus_config.h)
 
+#set_property(TARGET shm_queue PROPERTY POSITION_INDEPENDENT_CODE ON)
+add_compile_options(-fPIC)
+#target_compile_options(shm_queue PRIVATE -fPIC)
 
 list(APPEND _SOURCES_ 
 ./logger_factory.cpp
@@ -16,31 +19,33 @@
 ./bus_error.cpp
 ./futex_sem.cpp
 ./svsem.cpp
+./bh_api.cpp
 ./net/net_conn_pool.cpp
 ./net/net_mod_server_socket_wrapper.cpp
 ./net/net_mod_socket_wrapper.cpp
 ./net/net_mod_socket.cpp
 ./net/net_mod_socket_io.cpp
 ./net/net_mod_server_socket.cpp
+./proto/bhome_msg_api.pb.cc
+./proto/bhome_msg.pb.cc
+./proto/error_msg.pb.cc
 ./shm/shm_mm_wrapper.cpp
 ./shm/mm.cpp
 ./shm/hashtable.cpp
 ./shm/shm_mm.cpp
-./bh_api.cc
-./proto/bhome_msg.pb.cc
-./proto/bhome_msg_api.pb.cc
-./proto/error_msg.pb.cc
 
 )
 
 if (BUILD_SHARED_LIBS)
   add_library(shm_queue SHARED ${_SOURCES_})
+  target_compile_options(shm_queue PRIVATE -fPIC)
+  set_property(TARGET shm_queue PROPERTY POSITION_INDEPENDENT_CODE ON)
 else()
  add_library(shm_queue STATIC ${_SOURCES_})
 endif()
 
 # STATIC SHARED
-# add_library(shm_queue ${_SOURCES_})
+#add_library(shm_queue ${_SOURCES_})
 
 target_include_directories(shm_queue PUBLIC ${EXTRA_INCLUDES} )
 
@@ -48,11 +53,14 @@
 													 ${PROJECT_BINARY_DIR}/src
                            ${CMAKE_CURRENT_SOURCE_DIR}
                            ${CMAKE_CURRENT_SOURCE_DIR}/shm
+                           ${CMAKE_CURRENT_SOURCE_DIR}/proto
                            ${CMAKE_CURRENT_SOURCE_DIR}/queue
                            ${CMAKE_CURRENT_SOURCE_DIR}/socket
                            ${CMAKE_CURRENT_SOURCE_DIR}/net
                            )
 
+add_executable(bus_proxy_start bus_proxy_start.cpp)
+target_link_libraries(bus_proxy_start PRIVATE shm_queue  ${EXTRA_LIBS} )
 
 target_link_libraries(shm_queue PUBLIC  ${EXTRA_LIBS} )
 
@@ -74,6 +82,7 @@
 ./bus_def.h
 ./logger_factory.h
 ./sole.h
+./proc_def.h
 ./queue/linked_lock_free_queue.h
 ./queue/array_lock_free_queue.h
 ./queue/shm_queue.h
@@ -91,7 +100,6 @@
 ./shm/shm_mm_wrapper.h
 ./shm/shm_allocator.h
 ./shm/shm_mm.h
-./bh_api.h
 
 
   DESTINATION include)
diff --git a/src/bh_api.cpp b/src/bh_api.cpp
new file mode 100644
index 0000000..9875cfa
--- /dev/null
+++ b/src/bh_api.cpp
@@ -0,0 +1,1619 @@
+#include "net_mod_socket_wrapper.h"
+#include "net_mod_server_socket_wrapper.h"
+#include "bus_server_socket_wrapper.h"
+#include "shm_mm_wrapper.h"
+#include "proc_def.h"
+#include "usg_common.h"
+#include "bh_api.h"
+#include <pthread.h>
+#include <getopt.h>
+#include "bhome_msg_api.pb.h"
+#include "bhome_msg.pb.h"
+#include "error_msg.pb.h"
+#include "proto/bhome_msg.pb.h"
+#include "proto/bhome_msg_api.pb.h"
+
+static Logger *logger = LoggerFactory::getLogger();
+
+static int gRun_stat = 0;
+static void *gNetmod_socket = NULL;
+
+static pthread_mutex_t mutex;
+
+static char errString[100] = { 0x00 };
+
+int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
+{
+  int rv;
+  int key;
+  int count = 0;
+  void *buf = NULL;
+  int min = 0;
+  ProcInfo pData;
+  
+#if defined(PRO_DE_SERIALIZE)
+  struct _ProcInfo_proto
+	{
+		const char *proc_id;
+		const char *name;
+		const char *public_info;
+		const char *private_info;
+	}_input;
+  
+  ::bhome_msg::ProcInfo input;
+	if(!input.ParseFromArray(proc_info, proc_info_len)) {
+    rv = EBUS_INVALID_PARA;
+
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+    goto exit_entry;
+  }
+  
+	_input.proc_id = input.proc_id().c_str();
+	_input.name = input.name().c_str();
+	_input.public_info = input.public_info().c_str();
+	_input.private_info = input.private_info().c_str();
+  
+#else   
+  if ((proc_info == NULL) || (proc_info_len == 0)) {
+    rv = EBUS_INVALID_PARA;
+
+    memset(errString, 0x90, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+    goto exit_entry;
+  }
+#endif 
+ 
+  memset(&pData, 0x00, sizeof(ProcInfo));
+  if (gRun_stat == 0) {
+    pthread_mutex_init(&mutex, NULL);
+
+  } else {
+    logger->error("the process has already registered!\n");
+
+    rv = EBUS_RES_BUSY;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+    goto exit_entry;
+  }
+
+  rv = pthread_mutex_trylock(&mutex);
+  if (rv == 0) {
+    
+    gRun_stat = 1;
+    shm_mm_wrapper_init(SHM_RES_SIZE);
+    
+#if defined(PRO_DE_SERIALIZE)
+    if (_input.proc_id != NULL) {
+      count = strlen(_input.proc_id) + 1;
+      min = count > MAX_STR_LEN ? MAX_STR_LEN : count;
+      strncpy(pData.proc_id, _input.proc_id, min);
+    }
+
+    if (_input.name != NULL) {
+      count = strlen(_input.name) + 1;
+      min = count > MAX_STR_LEN ? MAX_STR_LEN : count;
+      strncpy(pData.name, _input.name, min); 
+    }
+
+    if (_input.public_info != NULL) {
+      count = strlen(_input.public_info) + 1;
+      min = count > MAX_STR_LEN ? MAX_STR_LEN : count;
+      strncpy(pData.public_info, _input.public_info, min);
+    }
+ 
+    if (_input.private_info != NULL) {
+      count = strlen(_input.private_info) + 1;
+      min = count > MAX_STR_LEN ? MAX_STR_LEN : count;
+      strncpy(pData.private_info, _input.private_info, min);
+    }
+#else 
+    if (strlen((const char *)(((ProcInfo *)proc_info)->proc_id)) > 0) {
+      count = strlen((const char *)(((ProcInfo *)proc_info)->proc_id)) + 1;
+      min = count > MAX_STR_LEN ? MAX_STR_LEN : count;
+      strncpy(pData.proc_id, ((ProcInfo *)proc_info)->proc_id, min);
+    }
+
+    if (strlen((const char *)(((ProcInfo *)proc_info)->name)) > 0) {
+      count = strlen((const char *)(((ProcInfo *)proc_info)->name)) + 1;
+      min = count > MAX_STR_LEN ? MAX_STR_LEN : count;
+      strncpy(pData.name, ((ProcInfo *)proc_info)->name, min); 
+    }
+
+    if (strlen((const char *)(((ProcInfo *)proc_info)->public_info)) > 0) {
+      count = strlen((const char *)(((ProcInfo *)proc_info)->public_info)) + 1;
+      min = count > MAX_STR_LEN ? MAX_STR_LEN : count;
+      strncpy(pData.public_info, ((ProcInfo *)proc_info)->public_info, min);
+    }
+ 
+    if (strlen((const char *)(((ProcInfo *)proc_info)->private_info)) > 0) {
+      count = strlen((const char *)(((ProcInfo *)proc_info)->private_info)) + 1;
+      min = count > MAX_STR_LEN ? MAX_STR_LEN : count;
+      strncpy(pData.private_info, ((ProcInfo *)proc_info)->private_info, min);
+    }
+#endif 
+
+    gNetmod_socket = net_mod_socket_open();
+    hashtable_t *hashtable = mm_get_hashtable();
+    key = hashtable_alloc_key(hashtable);
+    net_mod_socket_bind(gNetmod_socket, key);
+  
+    rv = net_mod_socket_reg(gNetmod_socket, &pData, sizeof(ProcInfo), NULL, 0, timeout_ms, PROC_REG);
+
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+    
+    pthread_mutex_unlock(&mutex);
+
+  } else {
+
+    rv = EBUS_RES_BUSY;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+    
+  }
+  
+exit_entry:
+#if defined(PRO_DE_SERIALIZE)
+    ::bhome_msg::MsgCommonReply mcr;
+    mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
+    mcr.mutable_errmsg()->set_errstring(errString);
+    *reply_len = mcr.ByteSizeLong();
+    *reply = malloc(*reply_len);
+    mcr.SerializePartialToArray(*reply, *reply_len);
+#else 
+    min = strlen(errString) + 1;
+    buf = malloc(min) ;
+    memcpy(buf, errString, strlen(errString));
+    *((char *)buf + min - 1) = '\0';
+    
+    *reply = buf;
+    *reply_len = min;
+    
+#endif 
+  
+  if (rv == 0)
+    return true;
+
+  return false;
+}
+
+int BHUnregister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
+{
+  int rv;
+  int min;
+  void *buf = NULL;
+
+#if defined(PRO_DE_SERIALIZE)
+  struct _ProcInfo_proto
+	{
+		const char *proc_id;
+		const char *name;
+		const char *public_info;
+		const char *private_info;
+	}_input;
+  
+  ::bhome_msg::ProcInfo input;
+  
+	if(!input.ParseFromArray(proc_info, proc_info_len)) {
+    
+    rv = EBUS_INVALID_PARA;
+
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+    goto exit_entry;
+  }
+  
+	_input.proc_id = input.proc_id().c_str();
+	_input.name = input.name().c_str();
+	_input.public_info = input.public_info().c_str();
+	_input.private_info = input.private_info().c_str();
+#endif 
+  
+  if (gRun_stat == 0) {
+    
+    logger->error("the process has not been registered yet!\n");
+
+    rv = EBUS_RES_NO;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+    goto exit_entry;
+  }
+
+  rv = pthread_mutex_trylock(&mutex);
+  if (rv == 0) {
+    rv = net_mod_socket_reg(gNetmod_socket, NULL, 0, NULL, 0, timeout_ms, PROC_UNREG);
+    if (rv == 0) {
+   
+      net_mod_socket_close(gNetmod_socket);
+      
+      gNetmod_socket = NULL;
+      
+      gRun_stat = 0;
+      
+    }
+
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+    
+    pthread_mutex_unlock(&mutex);
+
+  } else {
+
+    rv = EBUS_RES_BUSY;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+  }
+  
+exit_entry:
+#if defined(PRO_DE_SERIALIZE)
+  ::bhome_msg::MsgCommonReply mcr;
+	mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
+	mcr.mutable_errmsg()->set_errstring(errString);
+	*reply_len = mcr.ByteSizeLong();
+	*reply = malloc(*reply_len);
+	mcr.SerializePartialToArray(*reply, *reply_len);
+#else 
+  min = strlen(errString) + 1;
+  buf = malloc(min) ;
+  memcpy(buf, errString, strlen(errString));
+  *((char *)buf + min - 1) = '\0';
+  
+  *reply = buf;
+  *reply_len = min;
+#endif 
+  
+  if (rv == 0)
+    return true;
+
+  return false;
+}
+
+int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
+{
+  int rv; 
+  int min, i;
+  void *buf = NULL;
+  int total = 0;
+  int count = 0; 
+  char topics_buf[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
+  
+#if defined(PRO_DE_SERIALIZE)
+  struct _MsgTopicList
+	{
+		int amount;
+		const char *topics[MAX_STR_LEN];
+	}_input;
+
+  ::bhome_msg::MsgTopicList input;
+	if(!input.ParseFromArray(topics, topics_len)) {
+    
+    rv = EBUS_INVALID_PARA;
+
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+    
+		goto exit_entry;
+  }
+
+	_input.amount = input.topic_list_size();
+  if (_input.amount > MAX_STR_LEN) {
+    _input.amount = MAX_STR_LEN;
+  }
+  
+	for(int i = 0; i < _input.amount; i++) {
+		_input.topics[i] = input.topic_list(i).c_str();
+  }
+#else 
+  if ((topics == NULL) || (topics_len == 0)) {
+    
+    rv = EBUS_INVALID_PARA;
+
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+    
+		goto exit_entry;
+  }
+#endif 
+
+  if (gRun_stat == 0) {
+    logger->error("the process has not been registered yet!\n");
+
+    rv = EBUS_RES_NO;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+    goto exit_entry;
+  }
+
+  rv = pthread_mutex_trylock(&mutex);
+  if (rv == 0) {
+#if defined(PRO_DE_SERIALIZE)
+    total = sizeof(topics_buf) / sizeof(char);
+    for (i = 0; i < _input.amount; i++) {
+      min = (strlen(_input.topics[i]) > (total - 1) ? (total - 1) : strlen(_input.topics[i]));
+      if (min > 0) {
+        strncpy(topics_buf + count, _input.topics[i], min);
+        count += min;
+
+        if (total >= strlen(_input.topics[i])) {
+          total -= strlen(_input.topics[i]);
+        }
+
+        if ((_input.amount > 1) && (i < (_input.amount - 1))) {
+          strncpy(topics_buf + count, STR_MAGIC, strlen(STR_MAGIC));
+          total -= 1;
+          count++;
+        }
+      } else {
+        topics_buf[strlen(topics_buf) - 1] = '\0'; 
+      }
+    }
+    
+    logger->debug("the parsed compound register topics: %s!\n", topics_buf);
+#else 
+    memcpy(topics_buf, topics, topics_len > (sizeof(topics_buf) - 1) ? (sizeof(topics_buf) - 1) : topics_len);
+#endif 
+
+    rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf) + 1, NULL, 0, timeout_ms, PROC_REG_TCS);
+
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+    
+    pthread_mutex_unlock(&mutex);
+  
+  } else {
+    rv = EBUS_RES_BUSY;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+  }
+
+exit_entry:
+#if defined(PRO_DE_SERIALIZE)
+  ::bhome_msg::MsgCommonReply mcr;
+	mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
+	mcr.mutable_errmsg()->set_errstring(errString);
+	*reply_len = mcr.ByteSizeLong();
+	*reply = malloc(*reply_len);
+	mcr.SerializePartialToArray(*reply, *reply_len);
+#else 
+  min = strlen(errString) + 1;
+  buf = malloc(min) ;
+  memcpy(buf, errString, strlen(errString));
+  *((char *)buf + min - 1) = '\0';
+  
+  *reply = buf;
+  *reply_len = min;
+#endif 
+  
+  if (rv == 0)
+    return true;
+
+  return false;
+}
+
+int BHQueryTopicAddress(const void *remote, const int remote_len, const void *topic, const int topic_len, void **reply, int *reply_len, const int timeout_ms)
+{
+  int rv; 
+  int min;
+  void *buf = NULL;
+  int size;
+  char topics_buf[MAX_STR_LEN] = { 0x00 };
+  ProcInfo_query *ptr = NULL;
+  ProcInfo *Proc_ptr = NULL;
+  
+#if defined(PRO_DE_SERIALIZE)
+  struct _BHAddress
+	{
+		unsigned long long mq_id;
+		long long abs_addr;
+		const char *ip;
+		int port;
+	}_input0;
+  
+	const char *_input1;
+ 
+  ::bhome_msg::BHAddress input0;
+  ::bhome_msg::MsgQueryTopic input1;
+	if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(topic, topic_len)) {
+    rv = EBUS_INVALID_PARA;
+
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+    
+		goto exit_entry;
+  }
+  
+  _input0.mq_id = input0.mq_id();
+	_input0.abs_addr = input0.abs_addr();
+	_input0.ip = input0.ip().c_str();
+	_input0.port = input0.port();
+	_input1 = input1.topic().c_str();
+  
+#else 
+  if ((topic == NULL) || (topic_len == 0)) {
+    rv = EBUS_INVALID_PARA;
+
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+    
+		goto exit_entry;
+  }
+#endif 
+  
+  if (gRun_stat == 0) {
+    logger->error("the process has not been registered yet!\n");
+
+    rv = EBUS_RES_NO;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+    goto exit_entry;
+  }
+
+  rv = pthread_mutex_trylock(&mutex);
+  if (rv == 0) {
+
+#if defined(PRO_DE_SERIALIZE)
+    min = (strlen(_input1) > (MAX_STR_LEN - 1) ? (MAX_STR_LEN - 1) : strlen(_input1));
+    strncpy(topics_buf, _input1, min);
+#else 
+    min = (topic_len > (MAX_STR_LEN - 1) ? (MAX_STR_LEN - 1) : topic_len);
+    buf = const_cast<void *>(topic);
+    strncpy(topics_buf, (const char *)buf, min);
+#endif 
+    rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf) + 1, &buf, &size, timeout_ms, PROC_QUE_TCS);
+   
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+    pthread_mutex_unlock(&mutex);
+    
+  } else {
+
+    rv = EBUS_RES_BUSY;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+  }
+	
+exit_entry:
+#if defined(PRO_DE_SERIALIZE)
+
+	struct _MsgQueryTopicReply
+	{
+		std::string proc_id;
+
+		unsigned long long mq_id;
+		long long abs_addr;
+		std::string ip;
+		int port;
+	}mtr_list[128];
+	int mtr_list_num = 0;
+
+  if (rv == 0) {
+    
+    ptr = (ProcInfo_query *)((char *)buf + sizeof(int));
+    mtr_list_num = ptr->num;
+    
+    if (mtr_list_num > sizeof(mtr_list) / sizeof(mtr_list[0])) {
+      mtr_list_num = sizeof(mtr_list) / sizeof(mtr_list[0]);
+    }
+    
+    for(int i = 0; i < mtr_list_num; i++) {
+      mtr_list[i].proc_id = ptr->procData.proc_id;
+      mtr_list[i].mq_id = 0x00;
+      mtr_list[i].abs_addr = 0x00;
+      mtr_list[i].ip = "192.168.1.1";
+      mtr_list[i].port = 5000;
+    }
+  }
+  
+  ::bhome_msg::MsgQueryTopicReply mtr;
+	mtr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
+	mtr.mutable_errmsg()->set_errstring(errString);
+	for(int i = 0; i < mtr_list_num; i++)
+	{
+	  ::bhome_msg::MsgQueryTopicReply_BHNodeAddress *mtrb = mtr.add_node_address();
+		mtrb->set_proc_id(mtr_list[i].proc_id);
+		mtrb->mutable_addr()->set_mq_id(mtr_list[i].mq_id);
+		mtrb->mutable_addr()->set_abs_addr(mtr_list[i].abs_addr);
+		mtrb->mutable_addr()->set_ip(mtr_list[i].ip);
+		mtrb->mutable_addr()->set_port(mtr_list[i].port);
+	}
+  
+	*reply_len = mtr.ByteSizeLong();
+	*reply = malloc(*reply_len);
+	mtr.SerializePartialToArray(*reply, *reply_len);
+#else 
+  if (rv == 0) {
+    *reply = buf;
+    *reply_len = size;
+    
+  } else {
+    min = strlen(errString) + 1;
+    buf = malloc(min) ;
+    memcpy(buf, errString, strlen(errString));
+    *((char *)buf + min - 1) = '\0';
+    
+    *reply = buf;
+    *reply_len = min;
+  }
+  
+#endif 
+  
+  if (rv == 0)
+    return true;
+
+  return false;
+}
+
+int BHQueryProcs(const void *remote, const int remote_len, const void *query, const int query_len, void **reply, int *reply_len, const int timeout_ms)
+{
+  int rv; 
+  void *buf = NULL;
+  int size;
+  int min;
+  ProcInfo_sum *Proc_ptr = NULL;
+  char data_buf[MAX_STR_LEN] = { 0x00 };
+
+#if defined(PRO_DE_SERIALIZE)
+  struct _BHAddress
+	{
+		unsigned long long mq_id;
+		long long abs_addr;
+		const char *ip;
+		int port;
+	}_input0;
+  
+	const char *_input1;
+  
+  ::bhome_msg::BHAddress input0;
+	::bhome_msg::MsgQueryProc input1;
+	if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(query, query_len)) {
+    
+    rv = EBUS_INVALID_PARA;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+    
+		goto exit_entry;
+  }
+  
+	_input0.mq_id = input0.mq_id();
+	_input0.abs_addr = input0.abs_addr();
+	_input0.ip = input0.ip().c_str();
+	_input0.port = input0.port();
+	_input1 = input1.proc_id().c_str();
+#endif 
+  
+  if (gRun_stat == 0) {
+    logger->error("the process has not been registered yet!\n");
+
+    rv = EBUS_RES_NO;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+    goto exit_entry;
+  }
+
+  rv = pthread_mutex_trylock(&mutex);
+  if (rv == 0) {
+  
+    if (query != NULL) {
+      strncpy(data_buf, (char *)query, (sizeof(data_buf) - 1) > query_len ? query_len : (sizeof(data_buf) - 1));
+    }
+    
+    rv = net_mod_socket_reg(gNetmod_socket, data_buf, strlen(data_buf), &buf, &size, timeout_ms, PROC_QUE_ATCS);
+    
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+    pthread_mutex_unlock(&mutex);
+    
+  } else {
+  
+    rv = EBUS_RES_BUSY;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+  }
+  
+exit_entry:
+#if defined(PRO_DE_SERIALIZE)
+  struct _MsgQueryProcReply
+	{
+		std::string proc_id;
+		std::string name;
+		std::string public_info;
+		std::string private_info;
+
+		bool online;
+
+		std::string topic_list[128];
+		int topic_list_num;
+    
+	} mpr_list[128];
+	int mpr_list_num = 0;
+  
+  if (rv == 0) {
+  
+    mpr_list_num = *(int *)buf;
+    
+    if (mpr_list_num > (sizeof(mpr_list) / sizeof(mpr_list[0]))) {
+      mpr_list_num = sizeof(mpr_list) / sizeof(mpr_list[0]);
+    }
+    
+    Proc_ptr = (ProcInfo_sum *)((char *)buf + sizeof(int));
+    for(int i = 0; i < mpr_list_num; i++) {
+      mpr_list[i].proc_id = (Proc_ptr + i)->procData.proc_id;
+      mpr_list[i].name = (Proc_ptr + i)->procData.name;
+      mpr_list[i].public_info = (Proc_ptr + i)->procData.public_info;
+      mpr_list[i].private_info = (Proc_ptr + i)->procData.private_info;
+      mpr_list[i].online = (Proc_ptr + i)->stat;
+      mpr_list[i].topic_list_num = (Proc_ptr + i)->list_num;
+      
+      for(int j = 0; j < mpr_list[i].topic_list_num; j++)
+      {
+        if (j == 0) {
+          mpr_list[i].topic_list[j] = (Proc_ptr + i)->reg_info;
+        } else if (j == 1) {
+          mpr_list[i].topic_list[j] = (Proc_ptr + i)->local_info;
+        } else if (j == 2) {
+          mpr_list[i].topic_list[j] = (Proc_ptr + i)->net_info;
+        }
+      }
+    }
+
+    ::bhome_msg::MsgQueryProcReply mpr;
+    mpr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
+    mpr.mutable_errmsg()->set_errstring(errString);
+    
+    for(int i = 0; i < mpr_list_num; i++)
+    {
+      ::bhome_msg::MsgQueryProcReply_Info *mpri = mpr.add_proc_list();
+      mpri->mutable_proc()->set_proc_id(mpr_list[i].proc_id);
+      mpri->mutable_proc()->set_name(mpr_list[i].name);
+      mpri->mutable_proc()->set_public_info(mpr_list[i].public_info);
+      mpri->mutable_proc()->set_private_info(mpr_list[i].private_info);
+      mpri->set_online(mpr_list[i].online);
+      for(int j = 0; j < mpr_list[i].topic_list_num; j++)
+      {
+        mpri->mutable_topics()->add_topic_list(mpr_list[i].topic_list[j]);
+      }
+    }
+    
+    *reply_len = mpr.ByteSizeLong();
+    *reply = malloc(*reply_len);
+    mpr.SerializePartialToArray(*reply,*reply_len);
+  }
+#else 
+  if (rv == 0) {
+    *reply = buf;
+    *reply_len = size;
+  } else {
+    min = strlen(errString) + 1;
+    buf = malloc(min) ;
+    memcpy(buf, errString, strlen(errString));
+    *((char *)buf + min - 1) = '\0';
+    
+    *reply = buf;
+    *reply_len = min;
+  }
+#endif 
+
+	if (rv == 0)
+    return true;
+
+  return false;
+
+}
+
+int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
+{
+  int rv; 
+  int sec, nsec;
+  int total = 0;
+  int count = 0;
+  int min, i;
+  void *buf = NULL;
+  char topics_buf[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
+  
+#if defined(PRO_DE_SERIALIZE)
+  struct _MsgTopicList
+	{
+		int amount;
+		const char *topics[MAX_STR_LEN];
+	}_input;
+  
+  ::bhome_msg::MsgTopicList input;
+	if(!input.ParseFromArray(topics, topics_len)) {
+    
+    rv = EBUS_INVALID_PARA;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+    
+		goto exit_entry;
+  }
+  
+  _input.amount = input.topic_list_size();
+
+  if (_input.amount > MAX_STR_LEN) {
+    _input.amount = MAX_STR_LEN;
+  }
+  
+	for(int i = 0; i < _input.amount; i++)
+		_input.topics[i] = input.topic_list(i).c_str();
+
+#else 
+  if ((topics == NULL) || (topics_len == 0)) {
+    rv = EBUS_INVALID_PARA;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+    
+		goto exit_entry;
+  }
+#endif 
+
+  if (gRun_stat == 0) {
+    logger->error("the process has not been registered yet!\n");
+
+    rv = EBUS_RES_NO;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+    goto exit_entry;
+  }
+
+  rv = pthread_mutex_trylock(&mutex);
+  if (rv == 0) {
+#if defined(PRO_DE_SERIALIZE)
+    total = sizeof(topics_buf) / sizeof(char);
+    for (i = 0; i < _input.amount; i++) {
+      min = (strlen(_input.topics[i]) > (total - 1) ? (total - 1) : strlen(_input.topics[i]));
+      if (min > 0) {
+        strncpy(topics_buf + count, _input.topics[i], min);
+        count += min;
+
+        if (total >= strlen(_input.topics[i])) {
+          total -= strlen(_input.topics[i]);
+        }
+
+        if ((_input.amount > 1) && (i < (_input.amount - 1))) {
+          strncpy(topics_buf + count, STR_MAGIC, strlen(STR_MAGIC));
+          total -= 1;
+          count++;
+        }
+      } else {
+        topics_buf[strlen(topics_buf) - 1] = '\0'; 
+      }   
+    }
+    logger->debug("the parsed compound sub topics: %s!\n", topics_buf);
+#else 
+    memcpy(topics_buf, topics, topics_len > (sizeof(topics_buf) - 1) ? (sizeof(topics_buf) - 1) : topics_len);
+#endif 
+
+    if (timeout_ms > 0) {
+    
+      sec = timeout_ms / 1000;
+      nsec = (timeout_ms - sec * 1000) * 1000 * 1000;
+      rv = net_mod_socket_sub_timeout(gNetmod_socket, topics_buf, strlen(topics_buf) + 1, sec, nsec); 
+
+    } else if (timeout_ms == 0) {
+    
+      rv = net_mod_socket_sub_nowait(gNetmod_socket, topics_buf, strlen(topics_buf) + 1);
+    
+    } else {
+    
+      rv = net_mod_socket_sub(gNetmod_socket, topics_buf, strlen(topics_buf) + 1);
+    
+    }
+   
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+   
+    pthread_mutex_unlock(&mutex);
+
+  } else {
+
+    rv = EBUS_RES_BUSY;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+  }
+
+exit_entry:  
+#if defined(PRO_DE_SERIALIZE)
+  ::bhome_msg::MsgCommonReply mcr;
+	mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
+	mcr.mutable_errmsg()->set_errstring(errString);
+	*reply_len=mcr.ByteSizeLong();
+	*reply=malloc(*reply_len);
+	mcr.SerializePartialToArray(*reply,*reply_len);
+#else 
+  min = strlen(errString) + 1;
+  buf = malloc(min) ;
+  memcpy(buf, errString, strlen(errString));
+  *((char *)buf + min - 1) = '\0';
+  
+  *reply = buf;
+  *reply_len = min;
+#endif 
+  
+  if (rv == 0)
+    return true;
+
+  return false;
+
+}
+
+int BHSubscribeNetTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
+{
+  int rv = BHSubscribeTopics(topics, topics_len, reply, reply_len, timeout_ms);
+
+  return rv;
+}
+
+int BHHeartbeatEasy(const int timeout_ms)
+{
+
+  return true;
+}
+
+int BHHeartbeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
+{
+  int rv;
+  
+#if defined(PRO_DE_SERIALIZE)
+  struct _ProcInfo_proto
+	{
+		const char *proc_id;
+		const char *name;
+		const char *public_info;
+		const char *private_info;
+	}_input;
+  
+  ::bhome_msg::ProcInfo input;
+	if(!input.ParseFromArray(proc_info,proc_info_len)) {
+    
+    rv = EBUS_INVALID_PARA;
+
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+    
+		return false;
+  }
+  
+	_input.proc_id = input.proc_id().c_str();
+	_input.name = input.name().c_str();
+	_input.public_info = input.public_info().c_str();
+	_input.private_info = input.private_info().c_str();
+
+  rv = 0;
+  memset(errString, 0x00, sizeof(errString));
+  strncpy(errString, bus_strerror(rv), sizeof(errString));
+  
+  ::bhome_msg::MsgCommonReply mcr;
+	mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
+	mcr.mutable_errmsg()->set_errstring(errString);
+	*reply_len=mcr.ByteSizeLong();
+	*reply=malloc(*reply_len);
+	mcr.SerializePartialToArray(*reply,*reply_len);
+#endif 
+
+  return true;
+}
+
+#if defined(PRO_DE_SERIALIZE)
+int BHPublish(const char *msgpub, const char msgpub_len, const int timeout_ms)
+#else
+int BHPublish(const char *topic, const char *content, const int timeout_ms)
+#endif
+{
+  int rv; 
+  int min;
+  void *buf = NULL;
+  net_node_t node_arr;
+  int node_arr_len = 0;
+
+#if defined(PRO_DE_SERIALIZE)
+  struct _MsgPublish
+	{
+		const char *topic;
+		const char *data;
+	}_input;
+
+  ::bhome_msg::MsgPublish input;
+	if(!input.ParseFromArray(msgpub, msgpub_len)) {
+    
+    rv = EBUS_INVALID_PARA;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+    
+		return false;
+  }
+  
+	_input.topic = input.topic().c_str();
+	_input.data = input.data().c_str();
+#else 
+  if ((topic == NULL) || (content == NULL)) {
+    
+    rv = EBUS_INVALID_PARA;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+    
+		return false;
+  }
+#endif 
+  
+  if (gRun_stat == 0) {
+    logger->error("the process has not been registered yet!\n");
+
+    rv = EBUS_RES_NO;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+    return false;
+  }
+
+  rv = pthread_mutex_trylock(&mutex);
+  if (rv == 0) {  
+#if defined(PRO_DE_SERIALIZE)
+    if (timeout_ms > 0) {
+      rv = net_mod_socket_pub_timeout(gNetmod_socket, &node_arr, node_arr_len, _input.topic, strlen(_input.topic), _input.data, strlen(_input.data), timeout_ms);
+    } else if (timeout_ms == 0) {
+      rv = net_mod_socket_pub_nowait(gNetmod_socket, &node_arr, node_arr_len, _input.topic, strlen(_input.topic), _input.data, strlen(_input.data)); 
+      
+    } else {
+      
+      rv = net_mod_socket_pub(gNetmod_socket, &node_arr, node_arr_len, _input.topic, strlen(_input.topic), _input.data, strlen(_input.data));
+    }
+#else 
+    if (timeout_ms > 0) {
+      rv = net_mod_socket_pub_timeout(gNetmod_socket, &node_arr, node_arr_len, topic, strlen(topic), content, strlen(content), timeout_ms);   
+      
+    } else if (timeout_ms == 0) {
+      rv = net_mod_socket_pub_nowait(gNetmod_socket, &node_arr, node_arr_len, topic, strlen(topic), content, strlen(content));
+    
+    } else {
+      rv = net_mod_socket_pub(gNetmod_socket, &node_arr, node_arr_len, topic, strlen(topic), content, strlen(content));
+    }
+#endif 
+
+    pthread_mutex_unlock(&mutex);
+
+    if (rv > 0)
+      return true;
+    
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+  } else {
+
+    rv = EBUS_RES_BUSY;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+  }
+
+  return false;
+}
+
+int BHReadSub(void **proc_id, int *proc_id_len, void **msgpub, int *msgpub_len, const int timeout_ms)
+{
+  int rv;
+  int len;
+  void *buf;
+  int key;
+  int size;
+  int sec, nsec;
+  char topics_buf[MAX_STR_LEN] = { 0x00 };
+  char data_buf[MAX_STR_LEN * 3] = { 0x00 }; 
+
+  struct _ReadSubReply
+  {
+    std::string proc_id;
+    std::string topic;
+    std::string data;
+  } rsr;
+
+  if (gRun_stat == 0) {
+    logger->error("the process has not been registered yet!\n");
+
+    rv = EBUS_RES_NO;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+    return false;
+  }
+
+  if (timeout_ms > 0) {
+    sec = timeout_ms / 1000;
+    nsec = (timeout_ms - sec * 1000) * 1000 * 1000;
+
+    rv = net_mod_socket_recvfrom_timeout(gNetmod_socket, &buf, &size, &key, sec, nsec);
+
+  } else if (timeout_ms == 0) {
+
+    rv = net_mod_socket_recvfrom_nowait(gNetmod_socket, &buf, &size, &key);
+  
+  } else {
+
+    rv = net_mod_socket_recvfrom(gNetmod_socket, &buf, &size, &key);
+  }
+
+  if (rv == 0) {
+ 
+    len = strlen((char *)buf);
+    if (len > size) {
+      len = size;
+    }
+    strncpy(topics_buf, (char *)buf, len > (sizeof(topics_buf) - 1) ? (sizeof(topics_buf) - 1) : len);
+    
+    if (len < size) {
+      len = strlen(topics_buf) + 1;
+
+      strncpy(data_buf, (char *)buf + len, size - len);
+    }
+
+    free(buf);
+
+#if defined(PRO_DE_SERIALIZE)
+    rsr.topic = topics_buf;
+    rsr.data = data_buf;
+
+    memset(topics_buf, 0x00, sizeof(topics_buf));
+    sprintf(topics_buf, "%d", key);
+
+    rsr.proc_id = topics_buf;
+    *proc_id_len = rsr.proc_id.size();
+    *proc_id = malloc(*proc_id_len);
+    memcpy(*proc_id, rsr.proc_id.data(), *proc_id_len);
+
+    ::bhome_msg::MsgPublish Mp; 
+    Mp.set_topic(rsr.topic);
+    Mp.set_data(rsr.data.data());
+    *msgpub_len = Mp.ByteSizeLong();
+    *msgpub = malloc(*msgpub_len);
+    Mp.SerializePartialToArray(*msgpub, *msgpub_len);
+#else 
+    void *ptr;
+    if (len < size) {
+      ptr = malloc(size - len);
+      len = size - len;
+      memcpy(ptr, data_buf, len);
+    } else {
+      ptr = malloc(len);
+      memcpy(ptr, topics_buf, len);
+    }
+    *msgpub = ptr;
+    *msgpub_len = len;
+    
+    memset(topics_buf, 0x00, sizeof(topics_buf));
+    sprintf(topics_buf, "%d", key);
+    
+    *proc_id_len = strlen(topics_buf);
+    *proc_id = malloc(*proc_id_len);
+    memcpy(*proc_id, topics_buf, *proc_id_len);
+    
+#endif 
+
+    pthread_mutex_unlock(&mutex);
+
+  } else {
+
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+  }
+
+  if (rv == 0)
+    return true;
+
+  return false; 
+}
+
+int BHAsyncRequest(const void *remote, const int remote_len, const void *request, const int request_len, void **msg_id, int *msg_id_len)
+{
+  int rv;
+  void *buf;
+  int size;
+  int val;
+  int len;
+  int min;
+  int sec, nsec;
+  std::string MsgID;
+  int timeout_ms = 3000;
+  char topics_buf[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
+  
+#if defined(PRO_DE_SERIALIZE)
+  struct _BHAddress
+	{
+		unsigned long long mq_id;
+		long long abs_addr;
+		const char *ip;
+		int port;
+	}_input0;
+  
+  struct MsgRequestTopic
+	{
+		const char *topic;
+		const char *data;
+	}_input1;
+  
+  ::bhome_msg::BHAddress input0;
+	::bhome_msg::MsgRequestTopic input1;
+	if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(request, request_len)) {
+    
+    rv = EBUS_INVALID_PARA;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+  
+		return false;
+  }
+	
+	_input0.mq_id = input0.mq_id();
+	_input0.abs_addr = input0.abs_addr();
+	_input0.ip = input0.ip().c_str();
+	_input0.port = input0.port();
+	_input1.topic = input1.topic().c_str();
+	_input1.data = input1.data().c_str();
+
+#else 
+  if ((request == NULL) || (request_len == 0)) {
+
+    rv = EBUS_INVALID_PARA;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+  
+		return false;
+  }
+#endif 
+  
+  if (gRun_stat == 0) {
+    logger->error("the process has not been registered yet!\n");
+
+    rv = EBUS_RES_NO;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+    return false;
+  }
+ 
+  rv = pthread_mutex_trylock(&mutex);
+  if (rv == 0) {
+#if defined(PRO_DE_SERIALIZE)
+    strncpy(topics_buf, _input1.topic, (sizeof(topics_buf) - 1) > strlen(_input1.topic) ? strlen(_input1.topic) : (sizeof(topics_buf) - 1));
+#else 
+    strncpy(topics_buf, (char *)request, (sizeof(topics_buf) - 1) > strlen((char *)request) ? strlen((char *)request) : (sizeof(topics_buf) - 1));
+#endif 
+    
+    rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf), &buf, &size, timeout_ms, PROC_QUE_STCS);
+    if (rv == 0) {
+
+      val = atoi((char *)buf);
+
+      free(buf);
+
+      if (val > 0) {
+
+        len = strlen(topics_buf);
+#if defined(PRO_DE_SERIALIZE)
+        min = (sizeof(topics_buf) - 1 - len ) > strlen(_input1.data) ? strlen(_input1.data) : (sizeof(topics_buf) - 1 - len );
+        strncpy(topics_buf + len + 1, _input1.data, min);
+        len += (min + 1);
+#endif
+        if (timeout_ms > 0) {
+
+          sec = timeout_ms / 1000;
+          nsec = (timeout_ms - sec * 1000) * 1000 * 1000;
+          
+          rv = net_mod_socket_sendto_timeout(gNetmod_socket, topics_buf, len, val, sec, nsec); 
+          
+        } else if (timeout_ms == 0) {
+
+          rv = net_mod_socket_sendto_nowait(gNetmod_socket, topics_buf, len, val); 
+
+        } else {
+
+          rv = net_mod_socket_sendto(gNetmod_socket, topics_buf, len, val); 
+        }            
+      } else {
+
+        rv = EBUS_RES_UNSUPPORT; 
+        
+      }
+    }
+
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+    
+    pthread_mutex_unlock(&mutex);
+
+    if((msg_id == NULL) || (msg_id_len == NULL)) { 
+      if (rv == 0)
+        return true;
+      
+      return false;
+    }
+  } else {
+
+    rv = EBUS_RES_BUSY;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+  }
+  
+  if (rv == 0) {
+    memset(topics_buf, 0x00, sizeof(topics_buf));
+    sprintf(topics_buf, "%d", val);
+    MsgID = topics_buf;
+
+    *msg_id_len = MsgID.size();
+	  *msg_id = malloc(*msg_id_len);
+	  memcpy(*msg_id, MsgID.data(), *msg_id_len);
+    
+    return true;
+  }
+	
+  return false;
+
+}
+
+int BHRequest(const void *remote, const int remote_len, const void *request, const int request_len, void **proc_id, int *proc_id_len,
+              void **reply, int *reply_len, const int timeout_ms)
+{
+  int rv;
+  void *buf;
+  int size;
+  int val;
+  int min, len;
+  net_node_t node;
+  int node_size;  
+  int recv_arr_size;
+  net_mod_recv_msg_t *recv_arr;
+  net_mod_err_t *errarr;
+  int errarr_size = 0;
+  int sec, nsec;
+  char topics_buf[MAX_STR_LEN] = { 0x00 };
+  
+#if defined(PRO_DE_SERIALIZE)
+  struct _BHAddress
+	{
+		unsigned long long mq_id;
+		long long abs_addr;
+		const char *ip;
+		int port;
+	}_input0;
+  
+	struct _MsgRequestTopic
+	{
+		const char *topic;
+		const char *data;
+	}_input1;
+  
+  ::bhome_msg::BHAddress input0;
+	::bhome_msg::MsgRequestTopic input1;
+	if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(request, request_len)) {
+    
+    rv = EBUS_INVALID_PARA;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+    
+		return false;
+  }
+  
+  _input0.mq_id = input0.mq_id();
+	_input0.abs_addr = input0.abs_addr();
+	_input0.ip = input0.ip().c_str();
+	_input0.port = input0.port();
+	_input1.topic = input1.topic().c_str();
+	_input1.data = input1.data().c_str();
+  
+#else 
+  if ((request == NULL) || (request_len == 0)) {
+
+    rv = EBUS_INVALID_PARA;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+    
+		return false;
+  }
+#endif 
+  
+  if (gRun_stat == 0) {
+    logger->error("the process has not been registered yet!\n");
+
+    rv = EBUS_RES_NO;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+    return false;
+  }
+  
+  rv = pthread_mutex_trylock(&mutex);
+  if (rv == 0) {
+#if defined(PRO_DE_SERIALIZE)
+    strncpy(topics_buf, _input1.topic, (sizeof(topics_buf) - 1) > strlen(_input1.topic) ? strlen(_input1.topic)  : (sizeof(topics_buf) - 1));
+#else 
+    strncpy(topics_buf, (char *)request, (sizeof(topics_buf) - 1) > request_len ? request_len : (sizeof(topics_buf) - 1));
+#endif 
+    
+    rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf), &buf, &size, timeout_ms, PROC_QUE_STCS);
+    if (rv == 0) {
+      val = atoi((char *)buf);
+
+      free(buf);
+
+      if (val > 0) {
+        memset(&node, 0x00, sizeof(node));
+
+        len = strlen(topics_buf);
+#if defined(PRO_DE_SERIALIZE)
+        min = (sizeof(topics_buf) - 1 - len ) > strlen(_input1.data) ? strlen(_input1.data) : (sizeof(topics_buf) - 1 - len );
+        strncpy(topics_buf + len + 1, _input1.data, min);
+        len += (min + 1);
+#endif
+
+        node.key = val;
+        rv = net_mod_socket_sendandrecv(gNetmod_socket, &node, 1, topics_buf, len, &recv_arr, &recv_arr_size, &errarr, &errarr_size);
+        if (rv > 0) {
+          if (recv_arr_size > 0) {
+
+            node.key = recv_arr[0].key;
+            
+            memset(topics_buf, 0x00, sizeof(topics_buf));
+            size = recv_arr[0].content_length;
+            buf = (char *)malloc(size);
+            strncpy((char *)buf, (char *)recv_arr[0].content, size);
+#if !defined(PRO_DE_SERIALIZE)
+            *reply = buf;
+            *reply_len = size;
+#endif 
+          }
+
+          net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
+
+          if(errarr_size > 0) {
+            free(errarr);
+          }
+
+          rv = 0;
+
+        } else {
+          rv = EBUS_TIMEOUT; 
+        }
+
+      } else {
+        rv = EBUS_RES_UNSUPPORT; 
+      }
+    }
+
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+    
+    struct _RequestReply
+    {
+      std::string proc_id;
+      std::string data;
+    }rr;
+
+    if (rv == 0) {
+      memset(topics_buf, 0x00, sizeof(topics_buf));
+      sprintf(topics_buf, "%d", node.key);
+
+      rr.proc_id = topics_buf;
+      *proc_id_len = rr.proc_id.size();
+      *proc_id = malloc(*proc_id_len);
+      memcpy(*proc_id, rr.proc_id.data(), *proc_id_len);
+
+      memset(topics_buf, 0x00, sizeof(topics_buf));
+      memcpy(topics_buf, buf, size);
+      rr.data = topics_buf;
+   
+#if defined(PRO_DE_SERIALIZE)   
+      ::bhome_msg::MsgRequestTopicReply mrt;
+      mrt.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
+      mrt.mutable_errmsg()->set_errstring(errString);
+      mrt.set_data(rr.data.data());
+      *reply_len = mrt.ByteSizeLong();
+      *reply = malloc(*reply_len);
+      mrt.SerializePartialToArray(*reply, *reply_len);
+#endif 
+    }
+    
+    pthread_mutex_unlock(&mutex);
+
+  } else {
+  
+    rv = EBUS_RES_BUSY;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+  }
+
+   if (rv == 0)
+    return true;
+
+  return false; 
+}
+
+int BHReadRequest(void **proc_id, int *proc_id_len, void **request, int *request_len, void **src, const int timeout_ms)
+{
+  int rv;
+  void *buf;
+  int key;
+  int size;
+  int sec, nsec;
+  char topics_buf[MAX_STR_LEN] = { 0x00 };
+  
+  if (gRun_stat == 0) {
+    logger->error("the process has not been registered yet!\n");
+
+    rv = EBUS_RES_NO;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+    return false;
+  }
+
+  rv = pthread_mutex_trylock(&mutex);
+  if (rv == 0) {
+    if (timeout_ms > 0) {
+
+      sec = timeout_ms / 1000;
+      nsec = (timeout_ms - sec * 1000) * 1000 * 1000;
+
+      rv = net_mod_socket_recvfrom_timeout(gNetmod_socket, &buf, &size, &key, sec, nsec);
+
+    } else if (timeout_ms == 0) {
+
+      rv = net_mod_socket_recvfrom_nowait(gNetmod_socket, &buf, &size, &key);
+    
+    } else {
+
+      rv = net_mod_socket_recvfrom(gNetmod_socket, &buf, &size, &key);
+    }
+
+    if (rv == 0) {
+      struct _ReadRequestReply
+      {
+        std::string proc_id;
+        std::string topic;
+        std::string data;
+        void *src;
+      } rrr;
+
+      sprintf(topics_buf, "%d", key);
+      rrr.proc_id = topics_buf;
+
+      *proc_id_len = rrr.proc_id.size();
+      *proc_id = malloc(*proc_id_len);
+      memcpy(*proc_id, rrr.proc_id.data(), *proc_id_len);
+
+      memset(topics_buf, 0x00, sizeof(topics_buf));
+      memcpy(topics_buf, buf, size > sizeof(topics_buf) ? sizeof(topics_buf) : size);
+      rrr.topic = topics_buf;
+      rrr.data = topics_buf;
+
+#if defined(PRO_DE_SERIALIZE)
+      ::bhome_msg::MsgRequestTopic mrt;
+      mrt.set_topic(rrr.topic);
+      mrt.set_data(rrr.data.data());
+      *request_len = mrt.ByteSizeLong();
+      *request = malloc(*request_len);
+      mrt.SerializePartialToArray(*request,*request_len);
+#else 
+      *request = buf;
+      *request_len = size;
+#endif 
+
+      buf = malloc(sizeof(int));
+      *(int *)buf = key;
+      *src = buf;
+    }
+
+    pthread_mutex_unlock(&mutex);
+
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+    
+  } else {
+ 
+    rv = EBUS_RES_BUSY;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+  }
+	
+  if (rv == 0)
+    return true;
+  
+  return false;
+}
+
+int BHSendReply(void *src, const void *reply, const int reply_len)
+{
+  int rv;
+
+#if defined(PRO_DE_SERIALIZE)
+  ::bhome_msg::MsgRequestTopicReply input;
+  if (!input.ParseFromArray(reply, reply_len)) {
+    
+    rv = EBUS_INVALID_PARA;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+    
+    return false;
+  }
+  
+  const char *_input;
+  _input = input.data().data();
+  
+#else 
+  if ((src == NULL) || (reply == NULL) || (reply_len == 0)) {
+
+    rv = EBUS_INVALID_PARA;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+    
+    return false;
+  }
+#endif 
+
+  if (gRun_stat == 0) {
+    logger->error("the process has not been registered yet!\n");
+
+    rv = EBUS_RES_NO;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+    return false;
+  }
+
+  rv = pthread_mutex_trylock(&mutex);
+  if (rv == 0) {
+
+    rv = net_mod_socket_sendto(gNetmod_socket, reply, reply_len, *(int *)src);
+
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+	
+    pthread_mutex_unlock(&mutex);
+  } else {
+
+    rv = EBUS_RES_BUSY;
+    memset(errString, 0x00, sizeof(errString));
+    strncpy(errString, bus_strerror(rv), sizeof(errString));
+  }
+  
+  if (rv == 0)
+    return true;
+	
+  return false;
+}
+
+int BHCleanup() {
+
+  return true;
+}
+
+void BHFree(void *buf, int size) {
+  free(buf);
+}
+
+int BHGetLastError(void **msg, int *msg_len)
+{
+  void *buf = NULL;
+
+  buf = malloc(strlen(errString) + 1);
+
+  memset(buf, 0x00, strlen(errString) + 1);
+  memcpy(buf, errString, strlen(errString));
+
+  if ((msg != NULL) && (msg_len != NULL)) {
+    *msg = buf;
+    *msg_len = strlen(errString);
+
+    return true;
+  }
+
+  return false;
+
+}
diff --git a/src/bh_api.h b/src/bh_api.h
index 75a9c17..40d9ffa 100644
--- a/src/bh_api.h
+++ b/src/bh_api.h
@@ -1,9 +1,11 @@
-#ifndef BH_API
-#define BH_API
+#ifndef _BH_API_WRAPPER_
+#define _BH_API_WRAPPER_
 
 #ifdef __cplusplus
 extern "C" {
 #endif
+
+#define PRO_DE_SERIALIZE    1
 
 int BHRegister(const void *proc_info,
                const int proc_info_len,
@@ -17,15 +19,19 @@
                  int *reply_len,
                  const int timeout_ms);
 
+
 int BHRegisterTopics(const void *topics,
                      const int topics_len,
                      void **reply,
                      int *reply_len,
                      const int timeout_ms);
 
-int BHQueryTopicAddress(const void *remote, const int remote_len,
-                        const void *topic, const int topic_len,
-                        void **reply, int *reply_len,
+int BHQueryTopicAddress(const void *remote,
+                        const int remote_len,
+                        const void *topics,
+                        const int topics_len,
+                        void **reply,
+                        int *reply_len,
                         const int timeout_ms);
 
 int BHQueryProcs(const void *remote,
@@ -41,6 +47,7 @@
                       void **reply,
                       int *reply_len,
                       const int timeout_ms);
+
 int BHSubscribeNetTopics(const void *topics,
                          const int topics_len,
                          void **reply,
@@ -54,9 +61,13 @@
                 int *reply_len,
                 const int timeout_ms);
 
+#if defined(PRO_DE_SERIALIZE)
 int BHPublish(const void *msgpub,
               const int msgpub_len,
               const int timeout_ms);
+#else
+int BHPublish(const char *topic, const char *content, const int timeout_ms);
+#endif
 
 int BHReadSub(void **proc_id,
               int *proc_id_len,
@@ -96,7 +107,12 @@
 
 void BHFree(void *buf, int size);
 
+int BHGetLastError(void **msg, int *msg_len);
+
 #ifdef __cplusplus
 }
 #endif
-#endif
+#endif /* end of include guard: BH_API_WRAPPER_O81WKNXI */
+
+
+
diff --git a/src/bus_def.h b/src/bus_def.h
index 78a7eb9..9634883 100644
--- a/src/bus_def.h
+++ b/src/bus_def.h
@@ -4,4 +4,11 @@
 #define BUS_TIMEOUT_FLAG  1
 #define BUS_NOWAIT_FLAG  1 << 1
 
+#define SHM_RES_SIZE        512
+
+#define SHM_BUS_PROC_MAP_KEY    10
+#define SHM_BUS_PROC_TCS_MAP_KEY    11
+#define SHM_BUS_TCS_MAP_KEY    20
+#define SHM_BUS_PROC_PART_MAP_KEY   30
+
 #endif
\ No newline at end of file
diff --git a/src/bus_error.cpp b/src/bus_error.cpp
index 913a771..49244cd 100644
--- a/src/bus_error.cpp
+++ b/src/bus_error.cpp
@@ -20,7 +20,11 @@
   "Send to self error",
   "Receive from wrong end",
   "Service stoped",
-  "Exceed resource limit"
+  "Exceed resource limit",
+  "Service not supported",
+  "Resource busy",
+  "Resource not provide",
+  "Invalid parameters"
 
 };
 
@@ -50,6 +54,10 @@
   char *buf;
   /* Make first caller allocate key for thread-specific data */
 
+  if (err == 0) {
+    err = EBUS_BASE;
+  }
+
   s = pthread_once(&once, createKey);
   if (s != 0)
     err_exit(s, "pthread_once");
diff --git a/src/bus_error.h b/src/bus_error.h
index 769aa36..84f2e89 100644
--- a/src/bus_error.h
+++ b/src/bus_error.h
@@ -13,6 +13,10 @@
 #define EBUS_RECVFROM_WRONG_END 506
 #define EBUS_STOPED 507
 #define EBUS_EXCEED_LIMIT 508
+#define EBUS_RES_UNSUPPORT  509
+#define EBUS_RES_BUSY  510
+#define EBUS_RES_NO  511
+#define EBUS_INVALID_PARA  512
 
 extern int bus_errno;
 
diff --git a/src/bus_proxy_start.cpp b/src/bus_proxy_start.cpp
new file mode 100644
index 0000000..a04edad
--- /dev/null
+++ b/src/bus_proxy_start.cpp
@@ -0,0 +1,127 @@
+#include "net_mod_server_socket_wrapper.h"
+#include "net_mod_socket_wrapper.h"
+#include "bus_server_socket_wrapper.h"
+
+#include "shm_mm_wrapper.h"
+#include "usg_common.h"
+#include <signal.h>
+#include <limits.h>
+#include <stdio.h>
+#include <errno.h>
+#include <getopt.h>
+#include <stdlib.h>
+
+#define SVR_PORT            5000
+
+#define TOTAL_THREADS       2
+
+static void *gBusServer_socket = NULL;
+static void *gServer_socket = NULL;
+
+static int gShm_size = -1;
+static int gPort = -1;
+
+static int gBusServer_act = 0;
+static int gBusServer_stat = 0;
+
+pthread_t tids[2];
+void *res[2];
+
+void *bus_start(void *skptr) {
+
+  gBusServer_act = 1;
+  gBusServer_socket = bus_server_socket_wrapper_open();
+  if (bus_server_socket_wrapper_start_bus(gBusServer_socket) != 0) {
+    printf("start bus failed\n");
+    gBusServer_stat = -1;
+  }
+
+  return NULL;
+}
+
+
+
+
+void *svr_start(void *skptr) {
+  int port = *(int *)skptr;
+
+  gServer_socket  = net_mod_server_socket_open(port);
+  if(net_mod_server_socket_start(gServer_socket) != 0) {
+    printf("start net mod server failed\n");
+  }
+
+  return NULL;
+}
+
+int main(int argc, char *argv[])
+{
+  char *endptr;
+  char i;
+  int val;
+
+  sigset_t mask_all, pre;
+  sigfillset(&mask_all);
+
+  sigprocmask(SIG_BLOCK, &mask_all, &pre);
+
+  if (argc >= 4) {
+    fprintf(stderr, "Usage: %s [shm size] [server port]\n", argv[0]);
+    exit(0);
+  };
+
+  if (argc >= 2) {
+    argc -= 2;
+    for (i = 0; i <= argc; i++) {
+      errno = 0;
+      val = strtol(argv[i + 1], &endptr, 10);
+      if ((errno == ERANGE && (val == LONG_MAX || val == LONG_MIN))
+             || (errno != 0 && val == 0)) {
+        fprintf(stderr, "invalid parameter: %s\n", argv[i + 1]);
+        exit(0);
+      }
+
+      if (endptr == argv[i + 1]) {
+        fprintf(stderr, "invalid parameter %s: No digits were found\n", argv[i + 1]);
+        exit(0);
+      }
+
+      if (i == 0) {
+        gShm_size = val;
+      } else {
+        gPort = val;
+      }
+    }
+  }
+
+  if (gShm_size == -1) {
+    gShm_size = SHM_RES_SIZE;
+  }
+  shm_mm_wrapper_init(SHM_RES_SIZE);
+
+  pthread_create(&tids[0], NULL, bus_start, NULL);
+
+  if (gPort == -1) {
+    gPort = SVR_PORT;
+  }
+
+  while(gBusServer_act == 0) {
+    sleep(1);
+  }
+
+  if (gBusServer_stat >= 0) { 
+    pthread_create(&tids[1], NULL, svr_start, (void *)&gPort);
+  }
+
+  for (i = 0; i< TOTAL_THREADS; i++) {
+    if(pthread_join(tids[i], &res[i]) != 0) {
+      perror("bus_proxy pthread_join");
+    }
+  }
+
+  bus_server_socket_wrapper_close(gBusServer_socket);
+  net_mod_socket_close(gServer_socket);
+  shm_mm_wrapper_destroy();
+
+  return 0;
+}
+
diff --git a/src/net/net_mod_server_socket.cpp b/src/net/net_mod_server_socket.cpp
index 10b0aac..2f733f8 100644
--- a/src/net/net_mod_server_socket.cpp
+++ b/src/net/net_mod_server_socket.cpp
@@ -181,7 +181,7 @@
     }
 
     if( ret != 0) {
-      logger->error("杞彂澶辫触 : NetModServerSocket::process_client sendandrecv to %d , %s", request_head.key,  bus_strerror(ret));
+      logger->error("fail: NetModServerSocket::process_client sendandrecv to %d , %s", request_head.key,  bus_strerror(ret));
       // 杞彂澶辫触
       response_head.code = ret;
       response_head.content_length = 0;
@@ -277,7 +277,6 @@
         FD_CLR(connfd, &pool.read_set); 
         pool.clientfd[i] = -1;
         logger->debug("===server close client %d\n", connfd);
-    // printf("===server close client %d\n", connfd);
       }
 
     }
diff --git a/src/net/net_mod_socket.cpp b/src/net/net_mod_socket.cpp
index 6a541d6..ab065eb 100644
--- a/src/net/net_mod_socket.cpp
+++ b/src/net/net_mod_socket.cpp
@@ -22,7 +22,7 @@
 
 
 NetModSocket::~NetModSocket() {
- 
+
 }
 
 
@@ -46,6 +46,15 @@
   return shmModSocket.force_bind(key);
 }
 
+int NetModSocket::bind_proc_id(char *buf, int len) {
+  return shmModSocket.bind_proc_id(buf, len);
+}
+
+int NetModSocket::reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag) {
+  
+  return shmModSocket.reg(pData, len, buf, size, timeout_ms, flag);
+}
+
 // int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
 //   net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
 //   return _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, -1);
@@ -67,7 +76,7 @@
 
   NetConnPool *mpool = (NetConnPool *)_pool;
   delete mpool;
-  logger->debug("destory connPool");
+
 }
 
  /* One-time key creation function */
@@ -343,9 +352,6 @@
   return _pub_(node_arr, arrlen, topic, topic_size, content, content_size, msec);
 }
 
-
-// int  pub(char *topic, int topic_size, void *content, int content_size, int port);
-
 int NetModSocket::_pub_(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content,
  int content_size, int  msec) {
   int i, connfd;
@@ -363,7 +369,7 @@
   net_mod_err_t err_msg;
 
   // 鏈湴鍙戦��
-  if(node_arr == NULL || arrlen == 0) {
+  if ((node_arr == NULL) || (arrlen == 0)) {
     if(msec == 0) {
       ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY,  NULL, BUS_NOWAIT_FLAG);
     } else if(msec > 0) {
@@ -525,7 +531,6 @@
 int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key){
   return shmModSocket.recvfrom(buf, size, key, NULL, BUS_NOWAIT_FLAG);
 }
-
 
 int NetModSocket::recvandsend(recvandsend_callback_fn callback,
                               const struct timespec *timeout , int flag, void * user_data ) {
@@ -764,23 +769,23 @@
 
   head.mod = ntohl(GET(tmp_ptr));
 
-  tmp_ptr += 4;
+  tmp_ptr += sizeof(uint32_t);
   memcpy(head.host, tmp_ptr, sizeof(head.host));
 
  
   tmp_ptr += sizeof(head.host);
   head.port = ntohl(GET(tmp_ptr));
 
-  tmp_ptr += 4;
+  tmp_ptr += sizeof(uint32_t);
   head.key = ntohl(GET(tmp_ptr));
 
-  tmp_ptr += 4;
+  tmp_ptr += sizeof(uint32_t);
   head.content_length = ntohl(GET(tmp_ptr));
 
-  tmp_ptr += 4;
+  tmp_ptr += sizeof(uint32_t);
   head.topic_length = ntohl(GET(tmp_ptr));
 
-  tmp_ptr += 4;
+  tmp_ptr += sizeof(uint32_t);
   head.timeout = ntohl(GET_INT32(tmp_ptr));
  
   return head;
diff --git a/src/net/net_mod_socket.h b/src/net/net_mod_socket.h
index 6289fa6..d8e53ae 100644
--- a/src/net/net_mod_socket.h
+++ b/src/net/net_mod_socket.h
@@ -3,6 +3,7 @@
 #include "usg_common.h"
 #include "shm_mod_socket.h"
 #include "socket_io.h"
+#include "proc_def.h"
 #include <poll.h>
 #include "socket_def.h"
 #include "net_conn_pool.h"
@@ -17,7 +18,7 @@
 	int key;
 };
 
-#define NET_MODE_REQUEST_HEAD_LENGTH (NI_MAXHOST + 6 * sizeof(uint32_t))
+#define NET_MODE_REQUEST_HEAD_LENGTH sizeof(net_mod_request_head_t)
 
  
 // 璇锋眰澶�
@@ -118,8 +119,8 @@
   */
   int force_bind( int key);
 
- 
-
+  int bind_proc_id(char *buf, int len);
+  int reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag);
   
   /**
    * 濡傛灉寤虹珛杩炴帴鐨勮妭鐐规病鏈夋帴鍙楀埌娑堟伅绛夊緟timeout鐨勬椂闂村悗杩斿洖
@@ -166,7 +167,6 @@
   // 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
   int recvfrom_timeout( void **buf, int *size, int *key, int sec, int nsec);
   int recvfrom_nowait( void **buf, int *size, int *key);
-
   /**
    * 鏈湴鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟
    * @key 鍙戦�佺粰璋�
diff --git a/src/net/net_mod_socket_wrapper.cpp b/src/net/net_mod_socket_wrapper.cpp
index abdcbb7..ab4d59d 100644
--- a/src/net/net_mod_socket_wrapper.cpp
+++ b/src/net/net_mod_socket_wrapper.cpp
@@ -44,6 +44,14 @@
 	// return sockt->bind(key);
 }
 
+int net_mod_socket_reg(void *_socket, void *pData, int len, void **buf, int *size, const int timeout_ms, int flag)
+{
+  NetModSocket *sockt = (NetModSocket *)_socket;
+
+  return sockt->reg(pData, len, buf, size, timeout_ms, flag);
+  
+}
+
 /**
  * 鍙戦�佷俊鎭�
  * @key 鍙戦�佺粰璋�
@@ -51,20 +59,17 @@
  */
 int net_mod_socket_sendto(void *_socket, const void *buf, const int size, const int key) {
 	NetModSocket *sockt = (NetModSocket *)_socket;
-	logger->debug("net_mod_socket_sendto: %d sendto  %d", net_mod_socket_get_key(_socket), key);
 	return sockt->sendto(buf, size, key);
 }
 // 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇
 int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec){
 	NetModSocket *sockt = (NetModSocket *)_socket;
-	logger->debug("net_mod_socket_sendto: %d sendto  %d", net_mod_socket_get_key(_socket), key);
 	return sockt->sendto_timeout(buf, size, key, sec, nsec);
 	// return sockt->sendto(buf, size, key);
 }
 // 鍙戦�佷俊鎭珛鍒昏繑鍥炪��
 int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key){
 	NetModSocket *sockt = (NetModSocket *)_socket;
-	logger->debug("net_mod_socket_sendto: %d sendto  %d", net_mod_socket_get_key(_socket), key);
 	return sockt->sendto_nowait(buf, size, key);
 }
 
@@ -77,22 +82,19 @@
 	int rv;
 	NetModSocket *sockt = (NetModSocket *)_socket;
 
-	logger->debug(" %d net_mod_socket_recvfrom before", net_mod_socket_get_key(_socket));
 	rv = sockt->recvfrom(buf, size, key);
-	logger->debug(" %d net_mod_socket_recvfrom after. rv = %d", net_mod_socket_get_key(_socket), rv);
 	return rv;
 }
 
 // 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
 int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec){
-	NetModSocket *sockt = (NetModSocket *)_socket;
-	// return sockt->recvfrom(buf, size, key);
-	return sockt->recvfrom_timeout(buf, size, key, sec, nsec);
+  NetModSocket *sockt = (NetModSocket *)_socket;
+  return sockt->recvfrom_timeout(buf, size, key, sec, nsec);
 }
 
 int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key){
-	NetModSocket *sockt = (NetModSocket *)_socket;
-	return sockt->recvfrom_nowait(buf, size, key);
+  NetModSocket *sockt = (NetModSocket *)_socket;
+  return sockt->recvfrom_nowait(buf, size, key);
 }
 
 int net_mod_socket_sendandrecv(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
@@ -101,6 +103,11 @@
 	return sockt->sendandrecv(node_arr,  arrlen, send_buf,  send_size, recv_arr, recv_arr_size, err_arr, err_arr_size, -1);
 }
 
+int net_mod_socket_bind_proc_id(void * _socket, char *proc_id, int len){
+  NetModSocket *sockt = (NetModSocket *)_socket;
+  return sockt->bind_proc_id(proc_id, len);
+}
+
 /**
  * 濡傛灉寤虹珛杩炴帴鐨勮妭鐐规病鏈夋帴鍙楀埌娑堟伅绛夊緟timeout鐨勬椂闂村悗杩斿洖
  * @timeout 绛夊緟鏃堕棿锛屽崟浣嶆槸鍗冨垎涔嬩竴绉�
diff --git a/src/net/net_mod_socket_wrapper.h b/src/net/net_mod_socket_wrapper.h
index b4941c0..b869510 100644
--- a/src/net/net_mod_socket_wrapper.h
+++ b/src/net/net_mod_socket_wrapper.h
@@ -12,6 +12,7 @@
 #define __NET_MOD_SOCKET_H__
 
 #include "net_mod_socket.h"
+#include "proc_def.h"
 
 #ifdef __cplusplus
 extern "C" {
@@ -55,6 +56,8 @@
  */
 int net_mod_socket_force_bind(void * _socket, int key);
 
+int net_mod_socket_reg(void *_socket, void *pData, int len, void **buf, int *size, const int timeout_ms, int flag);
+
 /**
  * @brief 鍙戦�佷俊鎭�,鍙戦�佸畬鎴愭墠杩斿洖
  *
diff --git a/src/proc_def.h b/src/proc_def.h
new file mode 100644
index 0000000..cb4dc0a
--- /dev/null
+++ b/src/proc_def.h
@@ -0,0 +1,70 @@
+#ifndef __PROC_DEF_
+#define __PROC_DEF_
+
+#ifdef __cplusplus
+extern "C" {
+#endif 
+
+#define MAX_STR_LEN     128 //keep the same with serializer in proc check
+#define MIN_STR_LEN      10
+#define MAX_PROC_NUM    128
+#define MAX_TOPICS_NUN  60
+
+#define PROC_REG        1
+#define PROC_UNREG      2
+#define PROC_REG_TCS    3
+#define PROC_QUE_TCS    4
+#define PROC_QUE_STCS   5
+#define PROC_QUE_ATCS   6
+
+#define STR_MAGIC       ","
+
+typedef struct _ProcInfo {
+#if 0
+  char ServerID[MAX_STR_LEN];  // 鏈哄櫒ID
+  char BoardID[MAX_STR_LEN]; // 鏉垮崱ID
+  char ServerIP[MAX_STR_LEN];  // 鏈哄櫒IP
+  char ProcID[MAX_STR_LEN]; // 杩涚▼鍞竴鏍囪瘑
+  char ProcName[MAX_STR_LEN];  // 杩涚▼鍚嶇О
+  char ProcLabel[MAX_STR_LEN];  // 杩涚▼鐨勬弿杩颁俊鎭紝鐢ㄤ簬鍖哄垎鍚屼竴杩涚▼鍚嶇О涓嬪涓繘绋�
+#else
+  char proc_id[MAX_STR_LEN];
+  char name[MAX_STR_LEN];
+  char public_info[MAX_STR_LEN]; 
+  char private_info[MAX_STR_LEN];
+#endif
+} ProcInfo;
+
+typedef struct _ProcInfo_sum {
+  
+  ProcInfo procData;
+
+  int stat;
+  char reg_info[MAX_STR_LEN];
+  char local_info[MAX_STR_LEN];
+  char net_info[MAX_STR_LEN];
+
+  int list_num;
+
+} ProcInfo_sum;
+
+typedef struct _ProcInfo_query {
+  
+  char name[MAX_STR_LEN];
+
+  int num;
+
+  ProcInfo procData;
+
+} ProcInfo_query;
+
+#ifdef __cplusplus
+}
+#endif
+
+
+#endif  //end of file
+
+
+
+
diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h
index f536208..ada70c6 100644
--- a/src/queue/lock_free_queue.h
+++ b/src/queue/lock_free_queue.h
@@ -15,7 +15,7 @@
 #include "bus_def.h"
 
 // default Queue size
-#define LOCK_FREE_Q_DEFAULT_SIZE 16
+#define LOCK_FREE_Q_DEFAULT_SIZE 320
 
 
 #define LOCK_FREE_Q_ST_OPENED 0
@@ -177,6 +177,7 @@
   typename Allocator,
   template<typename T, typename AT> class Q_TYPE>
 LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): m_qImpl(qsize) {
+  //std::cout << "LockFreeQueue init reference=" << reference << std::endl;
   if (sem_init(&slots, 1, qsize) == -1)
     err_exit(errno, "LockFreeQueue sem_init");
   if (sem_init(&items, 1, 0) == -1)
@@ -211,6 +212,7 @@
   typename Allocator,
   template<typename T, typename AT> class Q_TYPE>
 LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() {
+  // LoggerFactory::getLogger()->debug("LockFreeQueue desctroy");
   if (sem_destroy(&slots) == -1) {
     err_exit(errno, "LockFreeQueue sem_destroy");
   }
@@ -249,10 +251,10 @@
   typename Allocator,
   template<typename T, typename AT> class Q_TYPE>
 int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) {
-  // sigset_t mask_all, pre;
-  // sigfillset(&mask_all);
+  sigset_t mask_all, pre;
+  sigfillset(&mask_all);
 
-  // sigprocmask(SIG_BLOCK, &mask_all, &pre);
+  sigprocmask(SIG_BLOCK, &mask_all, &pre);
 
   if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
     if (psem_trywait(&slots) == -1) {
@@ -271,12 +273,12 @@
 
   if (m_qImpl.push(a_data)) {
     psem_post(&items);
-    // sigprocmask(SIG_SETMASK, &pre, NULL);
+    sigprocmask(SIG_SETMASK, &pre, NULL);
     return 0;
   }
 
   LABEL_FAILTURE:
-  // sigprocmask(SIG_SETMASK, &pre, NULL);
+  sigprocmask(SIG_SETMASK, &pre, NULL);
   return errno;
 }
 
@@ -285,10 +287,10 @@
   template<typename T, typename AT> class Q_TYPE>
 int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data, const struct timespec *timeout, int flag) {
 
-  // sigset_t mask_all, pre;
-  // sigfillset(&mask_all);
+  sigset_t mask_all, pre;
+  sigfillset(&mask_all);
 
-  // sigprocmask(SIG_BLOCK, &mask_all, &pre);
+  sigprocmask(SIG_BLOCK, &mask_all, &pre);
 
   if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
     if (psem_trywait(&items) == -1) {
@@ -306,13 +308,13 @@
 
   if (m_qImpl.pop(a_data)) {
     psem_post(&slots);
-    // sigprocmask(SIG_SETMASK, &pre, NULL);
+    sigprocmask(SIG_SETMASK, &pre, NULL);
     return 0;
   }
 
 
   LABEL_FAILTURE:
-  // sigprocmask(SIG_SETMASK, &pre, NULL);
+  sigprocmask(SIG_SETMASK, &pre, NULL);
   return errno;
 }
 
diff --git a/src/queue/shm_queue.h b/src/queue/shm_queue.h
index 74b9b33..f5d64db 100644
--- a/src/queue/shm_queue.h
+++ b/src/queue/shm_queue.h
@@ -40,8 +40,8 @@
   bool full();
   bool empty();
 
-  int push(const ELEM_T &a_data, const struct timespec *timeout=NULL, int flag=0);
-  int pop(ELEM_T &a_data, const struct timespec *timeout=NULL, int flag=0);
+  int push(const ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0);
+  int pop(ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0);
 
   ELEM_T &operator[](unsigned i);
 
diff --git a/src/shm/hashtable.h b/src/shm/hashtable.h
index 6c3cd27..7bb6eac 100755
--- a/src/shm/hashtable.h
+++ b/src/shm/hashtable.h
@@ -5,7 +5,7 @@
 #include <functional>
 #include <set>
 
-#define MAPSIZE 1024
+#define MAPSIZE 4096
 
 // 鍒涘缓Queue鏁伴噺鐨勪笂闄�
 #define QUEUE_COUNT_LIMIT 300
diff --git a/src/shm/mm.cpp b/src/shm/mm.cpp
index f23fa08..e4ef672 100644
--- a/src/shm/mm.cpp
+++ b/src/shm/mm.cpp
@@ -256,6 +256,7 @@
     first = false;
     shmid  = shmget(SHM_KEY, 0, 0);
   }
+
   if (shmid == -1)
     err_exit(errno, "mm_init shmget");
   shmp = shmat(shmid, key_addr, 0);
@@ -338,8 +339,6 @@
       else 
          LoggerFactory::getLogger()->debug("shared memory destroy\n");
 
-      LoggerFactory::getLogger()->debug( "mm_destroy: real destroy.");
-
       SemUtil::inc(mutex);
       SemUtil::remove(mutex);
       return true;
@@ -363,6 +362,7 @@
 
 void mm_free_by_key(int key) {
   void *ptr;
+
   ptr = hashtable_get(hashtable, key);
   if(ptr != NULL) {
     mm_free(ptr);
diff --git a/src/shm/shm_mm.h b/src/shm/shm_mm.h
index 5339d8a..0ca0d08 100644
--- a/src/shm/shm_mm.h
+++ b/src/shm/shm_mm.h
@@ -8,6 +8,8 @@
 #define SHM_QUEUE_ST_CLOSED 1
 #define SHM_QUEUE_ST_RECYCLED 2
 
+#define SHM_QUEUE_ST_SET    50
+
 struct shm_queue_status_t {
 
   int status;
@@ -49,4 +51,6 @@
 
 int shm_mm_alloc_key();
 
+typedef std::map<SHMString, int, std::less<SHMString>, SHM_STL_Allocator<std::pair<const SHMString, int> > > ProcDataZone;
+
 #endif 
diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp
index 7a45696..1646da5 100644
--- a/src/socket/bus_server_socket.cpp
+++ b/src/socket/bus_server_socket.cpp
@@ -1,6 +1,7 @@
 
 #include "bus_server_socket.h"
 #include "shm_mod_socket.h"
+#include "shm_socket.h"
 #include "bus_error.h"
 
 static Logger *logger = LoggerFactory::getLogger();
@@ -12,10 +13,10 @@
 	SHMTopicSubMap::iterator map_iter;
 
 	if(topic_sub_map != NULL) {
-		for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
+		for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) {
 			subscripter_set = map_iter->second;
 			if(subscripter_set != NULL) {
-				for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
+				for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) {
 					cb(subscripter_set, *set_iter);
 				}
 			}
@@ -35,7 +36,7 @@
 		SHMTopicSubMap::iterator map_iter;
 
 		if(topic_sub_map != NULL) {
-			for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
+			for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) {
 				subscripter_set = map_iter->second;
 				if(subscripter_set != NULL && (set_iter = subscripter_set->find(key) ) != subscripter_set->end()) {
 					subscripter_set->erase(set_iter);
@@ -50,7 +51,6 @@
 
 
 BusServerSocket::BusServerSocket() {
-	logger->debug("BusServerSocket Init");
 	shm_socket = shm_socket_open(SHM_SOCKET_DGRAM);
 	topic_sub_map = NULL;
 
@@ -80,10 +80,13 @@
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
  */
 int  BusServerSocket::start(){
+  int rv;
+
 	topic_sub_map =	shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
  
-	_run_proxy_();
-	return 0;
+	rv = _run_proxy_();
+
+	return rv;
 }
 
 
@@ -114,7 +117,7 @@
 	SHMKeySet *subscripter_set;
 	SHMTopicSubMap::iterator map_iter;
 	if(topic_sub_map != NULL) {
-		for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
+		for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) {
 			subscripter_set = map_iter->second;
 			if(subscripter_set != NULL) {
 				subscripter_set->clear();
@@ -126,8 +129,8 @@
 		shm_mm_free_by_key(SHM_BUS_MAP_KEY);
 	}
 	shm_socket_close(shm_socket);
-	logger->debug("BusServerSocket destory 3");
-	return 0;
+	
+  return 0;
 }
 
 /*
@@ -135,10 +138,10 @@
 */
 void BusServerSocket::_proxy_sub( char *topic, int key) {
 	SHMKeySet *subscripter_set;
-
+ 
+  struct timespec timeout = {.tv_sec = 1, .tv_nsec = 0};
 	SHMTopicSubMap::iterator map_iter;
 	SHMKeySet::iterator set_iter;
-//printf("_proxy_sub topic = %s\n", topic);
 	if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
 		subscripter_set = map_iter->second;
 	} else {
@@ -147,6 +150,7 @@
 		topic_sub_map->insert({topic, subscripter_set});
 	}
 	subscripter_set->insert(key);
+
 }
 
 /*
@@ -173,7 +177,7 @@
 
 	SHMTopicSubMap::iterator map_iter;
 	// SHMKeySet::iterator set_iter;
-	for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
+	for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) {
 			subscripter_set = map_iter->second;
 			subscripter_set->erase(key);
 	}
@@ -182,7 +186,7 @@
 /*
  * 澶勭悊鍙戝竷锛屼唬鐞嗚浆鍙�
 */
-void BusServerSocket::_proxy_pub( char *topic, void *buf, size_t size, int key) {
+void BusServerSocket::_proxy_pub( char *topic, char *buf, size_t size, int key) {
 	SHMKeySet *subscripter_set;
 
 	SHMTopicSubMap::iterator map_iter;
@@ -198,7 +202,7 @@
 	if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
 
 		subscripter_set = map_iter->second;
-		for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
+		for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) {
 			send_key = *set_iter;
 			rv = shm_sendto(shm_socket, buf, size, send_key, &timeout, BUS_TIMEOUT_FLAG);
 			if(rv == 0) {
@@ -209,7 +213,7 @@
 		}
 
 		// 鍒犻櫎宸插叧闂殑绔�
-		for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); vector_iter++) {
+		for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); ++vector_iter) {
 			if((set_iter = subscripter_set->find(*vector_iter)) != subscripter_set->end()) {
 				subscripter_set->erase(set_iter);
 				logger->debug("remove closed subscripter %d \n", send_key);
@@ -218,13 +222,458 @@
 		subscripter_to_del.clear();
 
 	}
+
 }
 
+ProcInfo_query *Qurey_object(const char *object, int *length) {
+  int flag = 0;
+  int val;
+  int len;
+  int total = 0;
+  ProcInfo *Proc_ptr = NULL;
+  ProcInfo Data_stru;
+  ProcInfo_query *dataBuf = NULL;
+  SvrProc *SvrSub_ele;
+  SvrTcs::iterator svr_tcs_iter;
+  SvrProc::iterator svr_proc_iter;
+  ProcZone::iterator proc_iter;
+  SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
+  ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
+    
+  if ((svr_tcs_iter = SvrData->find(object)) != SvrData->end()) {
+    SvrSub_ele = svr_tcs_iter->second;
+    for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) {
+      val = *svr_proc_iter; 
+    
+      if ((proc_iter = proc->find(val)) != proc->end()) {
+
+        if (dataBuf == NULL) {
+          dataBuf = (ProcInfo_query *)malloc(sizeof(ProcInfo_query));
+          if (dataBuf == NULL) {
+            logger->error("in proxy_reg: Out of memory!\n");
+            exit(1);
+          }
+
+          total = sizeof(ProcInfo_query);
+        }
+
+        if (flag == 0) {
+          memset(dataBuf, 0x00, sizeof(ProcInfo_query));
+
+          dataBuf->num = 1;
+          strncpy(dataBuf->name, object, sizeof(dataBuf->name) - 1);
+          
+          flag = 1;
+
+        } else {
+          dataBuf->num++;
+          len = sizeof(ProcInfo_query) + sizeof(ProcInfo) * (dataBuf->num - 1);
+          dataBuf = (ProcInfo_query *)realloc(dataBuf, len);
+          if (dataBuf == NULL) {
+            logger->error("in proxy_reg: Out of memory!\n");
+            exit(1);
+          }
+          
+          total += sizeof(ProcInfo);
+          memset((char *)dataBuf + len - sizeof(ProcInfo), 0x00, sizeof(ProcInfo));
+        }
+
+        memset(&Data_stru, 0x00, sizeof(ProcInfo));
+        Data_stru = proc_iter->second;  
+
+        Proc_ptr = &(dataBuf->procData) + dataBuf->num - 1;
+        strncpy(Proc_ptr->proc_id, Data_stru.proc_id, strlen(Data_stru.proc_id) + 1);
+        strncpy(Proc_ptr->name, Data_stru.name, strlen(Data_stru.name) + 1);
+        strncpy(Proc_ptr->public_info, Data_stru.public_info, strlen(Data_stru.public_info) + 1);
+        strncpy(Proc_ptr->private_info, Data_stru.private_info, strlen(Data_stru.private_info) + 1);
+
+        if (length != NULL)
+          *length = total;
+      }
+    }
+  }
+
+  return dataBuf;
+}
+
+void BusServerSocket::_proxy_reg(const char *topic, size_t topic_size, const char *buf, size_t buf_size, int key, int flag)
+{
+  char buf_temp[MAX_STR_LEN] = { 0x00 };
+  int count = 0;
+  int i = 0;
+  int len = 0;
+  char *data_ptr;
+  ProcInfo Data_stru;
+  ProcZone::iterator proc_iter;
+  TcsZone *TcsSub_ele; 
+  ProcDataZone::iterator proc_que_iter;
+  ProcTcsMap::iterator proc_tcs_iter;
+  SvrProc *SvrSub_ele;
+  SvrProc::iterator svr_proc_iter;
+  SvrTcs::iterator svr_tcs_iter;
+  TcsZone::iterator tcssub_iter;
+  ProcPartZone::iterator proc_part_iter;
+
+  struct timespec timeout = {.tv_sec = 1, .tv_nsec = 0};
+
+  if ((flag == PROC_REG) || (flag == PROC_UNREG)) {
+
+    memset(&Data_stru, 0x00, sizeof(ProcInfo));
+
+    if (buf != NULL) {
+    
+      memcpy(Data_stru.proc_id, buf, strlen(buf) + 1); 
+      count = strlen(buf) + 1;
+      
+      memcpy(Data_stru.name, buf + count, strlen(buf + count) + 1);
+      count += strlen(buf + count) + 1;
+      
+      memcpy(Data_stru.public_info, buf + count, strlen(buf + count) + 1);
+      count += strlen(buf + count) + 1;
+      
+      memcpy(Data_stru.private_info, buf + count, strlen(buf + count) + 1);
+      count += strlen(buf + count) + 1;
+    }
+
+    ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
+    ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET);
+    ProcPartZone *procPart = shm_mm_attach<ProcPartZone>(SHM_BUS_PROC_PART_MAP_KEY);
+    if (flag == PROC_REG) {
+      if ((proc_iter = proc->find(key)) == proc->end()) {
+        proc->insert({key, Data_stru});
+      }
+
+      if ((proc_part_iter = procPart->find(key)) == procPart->end()) {
+        procPart->insert({key, Data_stru.proc_id});
+      }
+
+      if ((proc_que_iter = procQuePart->find(Data_stru.proc_id)) == procQuePart->end()) {
+        procQuePart->insert({Data_stru.proc_id, key});
+      }
+
+    } else {
+      SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY); 
+
+      for (svr_tcs_iter = SvrData->begin(); svr_tcs_iter != SvrData->end(); ++svr_tcs_iter) {
+        SvrSub_ele = svr_tcs_iter->second;
+
+        SvrSub_ele->erase(key);
+      }
+
+      if ((proc_iter = proc->find(key)) != proc->end()) {
+
+        len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1);
+        strncpy(buf_temp, (proc_iter->second).proc_id, len);
+        proc->erase(proc_iter);
+
+      }
+
+      if ((proc_part_iter = procPart->find(key)) != procPart->end()) {
+
+        procPart->erase(key);
+      }
+
+      if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) {
+
+        procQuePart->erase(buf_temp);
+      }
+
+    }
+  } else if (flag == PROC_REG_TCS) {
+    ProcTcsMap *proc = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY);
+    SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
+
+    if ((proc_tcs_iter = proc->find(key)) != proc->end()) {
+      TcsSub_ele = proc_tcs_iter->second;
+    } else {
+
+      void *ptr_set = mm_malloc(sizeof(TcsZone));
+      TcsSub_ele = new(ptr_set) TcsZone;
+      proc->insert({key, TcsSub_ele});
+    }
+
+    strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); 
+    data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC);
+    while(data_ptr) {
+      TcsSub_ele->insert(data_ptr);
+      if ((svr_tcs_iter = SvrData->find(data_ptr)) != SvrData->end()) {
+        SvrSub_ele = svr_tcs_iter->second;
+      } else {
+
+        void *ptr_set = mm_malloc(sizeof(SvrProc));
+        SvrSub_ele = new(ptr_set) SvrProc;
+        SvrData->insert({data_ptr, SvrSub_ele});
+      }
+      SvrSub_ele->insert(key);
+      data_ptr = strtok(NULL, STR_MAGIC);
+    }
+
+  } else if (flag == PROC_QUE_TCS) {
+
+    struct _temp_store {
+      void *ptr;
+      int total;
+    } *temp_store = NULL;
+    
+    int num = 0;
+    int sum = 0;
+
+    ProcInfo_query *ret = NULL;
+    ProcInfo_query *ret_store = NULL;
+  
+    strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
+    data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC);
+    while(data_ptr) {
+      ret = Qurey_object(data_ptr, &len);
+      if (ret != NULL) {
+    
+        if (temp_store == NULL) {
+          temp_store = (_temp_store *)malloc(sizeof(_temp_store));
+          if (temp_store == NULL) {
+            logger->error("in proxy_reg: Out of memory!\n");
+            exit(1);
+          }
+
+          temp_store->ptr = ret;
+          temp_store->total = len;
+          num = 1;
+
+        } else {
+          num++;
+          temp_store = (_temp_store *)realloc(temp_store, sizeof(_temp_store) * num);
+          if (temp_store == NULL) {
+            logger->error("in proxy_reg: Out of memory!\n");
+            exit(1);
+          }
+
+          (temp_store + num - 1)->ptr = ret;
+          (temp_store + num - 1)->total = len;
+        }
+
+      }
+      data_ptr = strtok(NULL, STR_MAGIC);
+    }
+
+    if (num > 0) {
+      for (count = 0; count < num; count++) {
+
+        if (ret_store == NULL) {
+          ret_store = (ProcInfo_query *)malloc((temp_store + count)->total);
+          if (ret_store == NULL) {
+            logger->error("in proxy_reg: Out of memory!\n");
+            exit(1);
+          }
+
+          sum = (temp_store + count)->total;
+          memcpy(ret_store, (temp_store + count)->ptr, (temp_store +count)->total);
+
+        } else {
+        
+          ret_store = (ProcInfo_query *)realloc(ret_store, sum + (temp_store + count)->total);
+          if (ret_store == NULL) {
+            logger->error("in proxy_reg: Out of memory!\n");
+            exit(1);
+          }
+
+          memcpy((char *)ret_store + sum, (temp_store + count)->ptr, (temp_store + count)->total);
+
+          sum += (temp_store + count)->total;
+
+        }
+
+        free((temp_store + count)->ptr);
+
+      }
+
+      free(temp_store);
+    }
+
+    void *last_buf = malloc(sum + sizeof(int));
+    if (last_buf == NULL) {
+      logger->error("in proxy_reg: Out of memory!\n");
+      exit(1);
+    }   
+
+    *(int *)last_buf = num;
+    if (num > 0) {
+      memcpy((char *)last_buf + sizeof(int), (char *)ret_store, sum);
+      free(ret_store);
+    }
+
+    shm_sendto(shm_socket, last_buf, sum + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG);
+
+    free(last_buf);
+  } else if (flag == PROC_QUE_STCS) {
+    SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
+
+    strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
+    if ((svr_tcs_iter = SvrData->find(buf_temp)) != SvrData->end()) {
+      SvrSub_ele = svr_tcs_iter->second;
+    
+      for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) { 
+        count = *svr_proc_iter;
+
+        break;
+      }
+    } else {
+      count = 0;
+    }
+
+    memset(buf_temp, 0x00, sizeof(buf_temp));
+    sprintf(buf_temp, "%d", count);
+    shm_sendto(shm_socket, buf_temp, strlen(buf_temp), key, &timeout, BUS_TIMEOUT_FLAG);
+
+  } else {
+
+    int val;
+    int temp = 0;
+    int pos = 0;
+    int size = 0;
+    ProcInfo_sum *Data_sum = NULL;
+    SHMKeySet *subs_proc;
+    SHMKeySet::iterator subs_proc_iter;
+    SHMTopicSubMap::iterator subs_iter;
+
+    ProcTcsMap *procData = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY); 
+    ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
+
+    for (proc_iter = proc->begin(); proc_iter != proc->end(); ++proc_iter) {
+
+      memset(&Data_stru, 0x00, sizeof(Data_stru));
+
+      if (count == 0) {
+        Data_sum = (ProcInfo_sum *)malloc(sizeof(ProcInfo_sum));
+        if (Data_sum == NULL) {
+
+          logger->error("in proxy_reg: Out of memory!\n");
+            
+          exit(1);
+        }
+
+        count++;
+
+        memset(Data_sum, 0x00, sizeof(ProcInfo_sum));
+
+      } else {
+
+        count++;
+        len = sizeof(ProcInfo_sum) * count;
+        Data_sum = (ProcInfo_sum *)realloc(Data_sum, len);
+        if (Data_sum == NULL) {
+          logger->error("in proxy_reg: Out of memory!\n");
+            
+          exit(1);
+        }
+
+        memset(Data_sum + count - 1, 0x00, sizeof(ProcInfo_sum));
+      }
+
+      Data_stru = proc_iter->second;
+
+      memcpy((Data_sum + count - 1)->procData.proc_id, Data_stru.proc_id, strlen(Data_stru.proc_id));
+      memcpy((Data_sum + count - 1)->procData.name, Data_stru.name, strlen(Data_stru.name));
+      memcpy((Data_sum + count - 1)->procData.public_info, Data_stru.public_info, strlen(Data_stru.public_info));
+      memcpy((Data_sum + count - 1)->procData.private_info, Data_stru.private_info, strlen(Data_stru.private_info));
+
+      (Data_sum + count - 1)->stat = 1;
+      (Data_sum + count - 1)->list_num = 3;
+
+      val = proc_iter->first;
+      if ((proc_tcs_iter = procData->find(val)) != procData->end()) {
+        TcsSub_ele = proc_tcs_iter->second;
+
+        temp = 0;
+        pos = 0;
+        len = sizeof(Data_sum->reg_info) - 1;
+        for (tcssub_iter = TcsSub_ele->begin(); tcssub_iter != TcsSub_ele->end(); ++tcssub_iter) {
+
+          if (temp == 0) {
+            strncpy((Data_sum + count - 1)->reg_info, (*tcssub_iter).c_str(), strlen((*tcssub_iter).c_str()) > len ? len : strlen((*tcssub_iter).c_str()));
+            pos += strlen((Data_sum + count - 1)->reg_info);
+            len -= strlen((Data_sum + count - 1)->reg_info);
+
+            temp++;
+          } else {
+
+            if (len > 0) {
+              strcat((Data_sum + count - 1)->reg_info, ",");
+
+              pos += 1;
+              len -= 1;
+            }
+
+            if (len > 0) {
+              size = strlen((*tcssub_iter).c_str()) > len ? len : strlen((*tcssub_iter).c_str());
+              strncpy(&(Data_sum + count - 1)->reg_info[pos], (*tcssub_iter).c_str(), size);
+
+              pos += size;
+              len -= size;
+            }
+          }
+        }
+
+        pos = 0;
+        temp = 0;
+        len = sizeof(Data_sum->local_info) - 1;
+        for (subs_iter = topic_sub_map->begin(); subs_iter != topic_sub_map->end(); ++subs_iter) {
+          subs_proc = subs_iter->second;
+          
+          if ((subs_proc_iter = subs_proc->find(val)) != subs_proc->end()) {
+
+            if ((temp == 0)) {
+
+              strncpy((Data_sum + count - 1)->local_info, subs_iter->first.c_str(), strlen(subs_iter->first.c_str()) > len ? len : strlen(subs_iter->first.c_str()));
+              pos += strlen((Data_sum + count - 1)->local_info);
+              len -= strlen((Data_sum + count - 1)->local_info);
+
+              temp++;
+            } else {
+
+              if (len > 0) {
+                strcat((Data_sum + count - 1)->local_info, ",");
+
+                pos += 1;
+                len -= 1;
+              }
+
+              if (len > 0) {
+                size = strlen(subs_iter->first.c_str()) > len ? len : strlen(subs_iter->first.c_str());
+                strncpy(&(Data_sum + count - 1)->local_info[pos], subs_iter->first.c_str(), size);
+
+                pos += size;
+                len -= size;
+              }
+            }
+
+          }
+        }
+
+      }
+    }
+
+    temp = count * sizeof(ProcInfo_sum);
+    void *last_buf = malloc(temp + sizeof(int));
+    if (last_buf == NULL) {
+      logger->error("in proxy_reg: Out of memory!\n");
+      exit(1);
+    }
+
+    *(int *)last_buf = count;
+    if (count > 0) {
+      memcpy((char *)last_buf + sizeof(int), (char *)Data_sum, temp);
+      free(Data_sum);
+    }
+
+    shm_sendto(shm_socket, last_buf, temp + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG);
+
+  }
+}
 
 // 杩愯浠g悊
-void * BusServerSocket::_run_proxy_() {
+int BusServerSocket::_run_proxy_() {
 	int size;
 	int key;
+  int flag;
 	char * action, *topic, *topics, *buf, *content;
 	size_t head_len;
 	char resp_buf[128];
@@ -233,25 +682,21 @@
   int rv;
   char send_buf[512] = { 0x00 };
 
-	const char *topic_delim = ",";
-
-	while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) {
+  const char *topic_delim = ",";
+	while((rv = shm_recvfrom(shm_socket, (void **)&buf, &size, &key)) == 0) {
 		head = ShmModSocket::decode_bus_head(buf);
 		topics = buf + BUS_HEAD_SIZE;
 		action = head.action;
-
 		if(strcmp(action, "sub") == 0) {
 			// 璁㈤槄鏀寔澶氫富棰樿闃�
 			topic = strtok(topics, topic_delim);
 		  while(topic) {
-
        _proxy_sub(trim(topic, 0), key);
         topic =  strtok(NULL, topic_delim);
 		  }
 
 		} 
 		else if(strcmp(action, "desub") == 0) {
-
 			if(strcmp(trim(topics, 0), "") == 0) {
 				// 鍙栨秷鎵�鏈夎闃�
 				_proxy_desub_all(key);
@@ -259,7 +704,6 @@
 			 
 				topic = strtok(topics, topic_delim);
 			  while(topic) {
-
 	       _proxy_desub(trim(topic, 0), key);
 	        topic =  strtok(NULL, topic_delim);
 			  }
@@ -267,10 +711,45 @@
 			
 		} 
 		else if(strcmp(action, "pub") == 0) {
-			 content = topics + head.topic_size;
-			_proxy_pub(topics, content, head.content_size, key);
+      topics[head.topic_size - 1] = '\0';
+		  content = topics + head.topic_size;
 
-		}  
+			_proxy_pub(topics, topics, head.topic_size + head.content_size, key);
+
+		} 
+    else if ((strcmp(action, "reg") == 0) || (strcmp(action, "unreg") == 0) \
+            || (strcmp(action, "tcsreg") == 0) || (strcmp(action, "tcsque") == 0) \
+            || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0)) {
+      content = topics + head.topic_size;
+      if (strcmp(action, "reg") == 0) {
+        
+        flag = PROC_REG;
+
+      } else if (strcmp(action, "unreg") == 0) {
+        
+        flag = PROC_UNREG;
+
+      } else if (strcmp(action, "tcsreg") == 0) {
+        
+        flag = PROC_REG_TCS;
+
+      } else if (strcmp(action, "tcsque") == 0) {
+        
+        flag = PROC_QUE_TCS;
+
+      } else if (strcmp(action, "stcsque") == 0) {
+        
+        flag = PROC_QUE_STCS; 
+
+      } else {
+        
+        flag = PROC_QUE_ATCS;
+
+      }
+        
+      _proxy_reg(topics, head.topic_size, content, head.content_size, key, flag);
+
+    }
 		else if (strncmp(buf, "request", strlen("request")) == 0) {
       sprintf(send_buf, "%4d", key);
       strncpy(send_buf + 4, buf, (sizeof(send_buf) - 4) >= (strlen(buf) + 1) ? strlen(buf) : (sizeof(send_buf) - 4));
@@ -281,7 +760,6 @@
       }
     }
     else if(strcmp(action, "stop") == 0) {
-			logger->info( "Stopping Bus...");			
 			free(buf);
 			break;
 		} else {
@@ -291,5 +769,5 @@
 	}
 
 
-	return NULL;
+	return rv;
 }
diff --git a/src/socket/bus_server_socket.h b/src/socket/bus_server_socket.h
index 956271b..3052c8b 100644
--- a/src/socket/bus_server_socket.h
+++ b/src/socket/bus_server_socket.h
@@ -18,7 +18,6 @@
 typedef std::set<int,  std::less<int>, SHM_STL_Allocator<int> > SHMKeySet;
 typedef std::map<SHMString, SHMKeySet *, std::less<SHMString>, SHM_STL_Allocator<std::pair<const SHMString, SHMKeySet *> > > SHMTopicSubMap;
 
-
 class BusServerSocket {
 private:
 	shm_socket_t *shm_socket;
@@ -29,14 +28,16 @@
 private:
 	int  destroy();
 	void _proxy_sub( char *topic, int key);
-	void _proxy_pub( char *topic, void *buf, size_t size, int key);
-	void *_run_proxy_();
+	void _proxy_pub( char *topic, char *buf, size_t size, int key);
+	int _run_proxy_();
 	// int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len );
   	
 	void _proxy_desub( char *topic, int key);
 	void _proxy_desub_all(int key);
 
-	static void foreach_subscripters(std::function<void(SHMKeySet *, int)>  cb);
+  void _proxy_reg(const char *topic, size_t topic_size, const char *content, size_t content_size, int key, int flag);
+
+  static void foreach_subscripters(std::function<void(SHMKeySet *, int)>  cb);
 	// static bool include_in_keys(int key, int keys[], size_t length);
 
 public:
diff --git a/src/socket/bus_server_socket_wrapper.cpp b/src/socket/bus_server_socket_wrapper.cpp
index d2f5a8e..6b730a9 100644
--- a/src/socket/bus_server_socket_wrapper.cpp
+++ b/src/socket/bus_server_socket_wrapper.cpp
@@ -7,7 +7,6 @@
  * 鍒涘缓
  */
 void * bus_server_socket_wrapper_open() {
-	logger->debug("===bus_server_socket_wrapper_open\n");
 	BusServerSocket *sockt = new BusServerSocket;
 	return (void *)sockt;
 }
@@ -19,7 +18,6 @@
 
 	BusServerSocket *sockt = (BusServerSocket *)_socket;
 	delete sockt;
-	logger->debug("===bus_server_socket_wrapper_close\n");
 }
 
 int bus_server_socket_wrapper_stop(void *_socket) {
@@ -35,7 +33,7 @@
 	int ret;
 	BusServerSocket *sockt = (BusServerSocket *)_socket;
 
-	if( (ret = sockt->force_bind(SHM_BUS_KEY)) == 0) {
+	if( (ret = sockt->bind(SHM_BUS_KEY)) == 0) {
 		return sockt->start();
 	} else {
 		logger->error("start bus failed");
diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp
index abd9477..a94b9c3 100644
--- a/src/socket/shm_mod_socket.cpp
+++ b/src/socket/shm_mod_socket.cpp
@@ -38,6 +38,129 @@
 	return shm_socket_force_bind(shm_socket, key);
 }
 
+int ShmModSocket::bind_proc_id(char *buf, int len) {
+  return shm_socket_bind_proc_id(shm_socket, buf, len);
+}
+
+int ShmModSocket::reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag)
+{
+  int ret;
+  struct timespec ts;
+
+  bus_head_t head = {}; 
+  
+  if (flag == PROC_REG) {
+  
+    memcpy(head.action, "reg", sizeof(head.action));
+  
+  } else if (flag == PROC_UNREG) {
+
+    memcpy(head.action, "unreg", sizeof(head.action));
+  
+  } else if (flag == PROC_REG_TCS) { 
+  
+    memcpy(head.action, "tcsreg", sizeof(head.action));
+  
+  } else if (flag == PROC_QUE_TCS) { 
+  
+    memcpy(head.action, "tcsque", sizeof(head.action)); 
+  
+  } else if (flag == PROC_QUE_STCS) {
+  
+    memcpy(head.action, "stcsque", sizeof(head.action));
+  
+  } else  if (flag == PROC_QUE_ATCS) {
+
+    memcpy(head.action, "atcsque", sizeof(head.action));
+
+  } else {
+
+    return -1;
+
+  }
+
+  if ((flag == PROC_REG) || (flag == PROC_UNREG)) {
+    
+    head.topic_size = 0;
+
+    if (pData != NULL) {
+    
+      head.content_size = sizeof(ProcInfo);
+    
+    } else {
+
+      head.content_size = 0;
+
+    }
+  } else {
+
+    head.topic_size = len;
+
+    head.content_size = 0;
+
+  }
+  
+  void *buf_temp;
+  int buf_size;
+
+  if ((flag == PROC_REG) || (flag == PROC_UNREG)) {
+  
+    buf_size = get_bus_sendbuf(head, NULL, 0, pData, head.content_size, &buf_temp);
+  
+  } else {
+  
+    buf_size = get_bus_sendbuf(head, pData, len, NULL, head.content_size, &buf_temp);
+  
+  }
+
+  if (timeout_ms > 0) {
+
+    ts.tv_sec = timeout_ms /1000;
+    
+    ts.tv_nsec = (timeout_ms - ts.tv_sec * 1000) * 1000 * 1000;
+  
+    if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) {
+
+      ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, BUS_TIMEOUT_FLAG);
+
+    } else {
+
+      ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, BUS_TIMEOUT_FLAG);
+
+    }
+  
+  } else if (timeout_ms == 0) {
+  
+    if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) {
+
+      ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, BUS_NOWAIT_FLAG);
+
+    } else {
+
+      ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, BUS_NOWAIT_FLAG);
+
+    }
+ 
+  } else {
+
+    if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) {
+    
+      ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, -1);
+
+    } else {
+
+      ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, -1);
+
+    }
+  
+  }
+
+  free(buf_temp);
+
+  return ret;
+
+}
+
 /**
  * 鍙戦�佷俊鎭�
  * @key 鍙戦�佺粰璋�
@@ -60,7 +183,8 @@
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
 */
 int ShmModSocket::recvfrom( void **buf, int *size, int *key, const struct timespec *timeout, int flag) {
-	int rv =  shm_recvfrom(shm_socket, buf, size, key, timeout, flag);
+
+  int rv =  shm_recvfrom(shm_socket, buf, size, key, timeout, flag);
 
 	if(rv == 0) {
     logger->debug("ShmModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key);
@@ -77,7 +201,7 @@
  * @key 鍙戦�佺粰璋�
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
 */
-int ShmModSocket::sendandrecv(const void *send_buf, const int send_size, const int send_key, 
+int ShmModSocket::sendandrecv(const void *send_buf, const int send_size, const int send_key,
 	void **recv_buf, int *recv_size, const struct timespec *timeout, int flag){
 	int rv = shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag);
 
@@ -183,8 +307,8 @@
 	memcpy(head.action, "pub", sizeof(head.action));
 	head.topic_size = topic_size = strlen(topic) + 1;
 	head.content_size = content_size;
-
-	void *buf;
+	
+  void *buf;
 	int size = get_bus_sendbuf(head, topic,  topic_size, content,  content_size, &buf);
 	if(size > 0) {
 		ret = shm_sendto(shm_socket, buf, size, key, timeout, flags);
@@ -216,6 +340,7 @@
   char *buf;
   int  max_buf_size;
   void *buf_ptr;
+  int count = 0;
   if((buf = (char *) malloc(MAXBUF)) == NULL) {
     LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf malloc");
     exit(1);
@@ -223,7 +348,7 @@
     max_buf_size = MAXBUF;
   }
 
-  buf_size = BUS_HEAD_SIZE + content_size + topic_size  ;
+  buf_size = BUS_HEAD_SIZE + content_size + topic_size;
   if(max_buf_size < buf_size) {
     
     if((buf = (char *) realloc(buf, buf_size)) == NULL) {
@@ -238,8 +363,19 @@
   memcpy(buf, buf_ptr, BUS_HEAD_SIZE);
   if(topic_size != 0 ) 
     memcpy(buf + BUS_HEAD_SIZE, topic_buf, topic_size);
-  if(content_size != 0)
- 	 memcpy(buf + BUS_HEAD_SIZE + topic_size, content_buf, content_size);
+  if ((content_size != 0) && (strncmp(request_head.action, "reg", strlen("reg")) != 0) && \
+                              (strncmp(request_head.action, "unreg", strlen("unreg")) != 0)) {
+ 	  memcpy(buf + BUS_HEAD_SIZE + topic_size, content_buf, content_size);
+  } else {
+    if (((strncmp(request_head.action, "reg", strlen("reg")) == 0) || (strncmp(request_head.action, "unreg", \
+                                strlen("unreg")) == 0)) && (content_buf != NULL)) {
+      proc_copy(buf + BUS_HEAD_SIZE + topic_size, const_cast<void *> (content_buf), &count);
+
+      request_head.content_size = count;
+      buf_size -= (content_size - count);
+
+    }
+  }
  
   *retbuf = buf;
   free(buf_ptr);
diff --git a/src/socket/shm_mod_socket.h b/src/socket/shm_mod_socket.h
index 9890aef..da02fab 100644
--- a/src/socket/shm_mod_socket.h
+++ b/src/socket/shm_mod_socket.h
@@ -5,6 +5,7 @@
 #include "shm_allocator.h"
 #include "shm_mm.h"
 #include "hashtable.h"
+#include "proc_def.h"
 #include "sem_util.h"
 #include "logger_factory.h"
 #include "key_def.h"
@@ -60,6 +61,9 @@
 	 * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
 	*/
 	int force_bind(int key);
+
+  int bind_proc_id(char *buf, int len);
+  int reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag);
 	/**
 	 * 鍙戦�佷俊鎭�
 	 * @key 鍙戦�佺粰璋�
@@ -75,9 +79,7 @@
 	 * @key 浠庤皝鍝噷鏀跺埌鐨勪俊鎭�
 	 * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
 	*/
- 
 	int recvfrom(void **buf, int *size, int *key,  const struct timespec *timeout = NULL, int flag = 0);
-
 	/**
 	 * 鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟
 	 * @key 鍙戦�佺粰璋�
@@ -128,7 +130,14 @@
 	 */
 	int get_key() ;
 
+  int get_procid(char *buf, int len);
+
 
 };
 
+
+typedef std::map<int, ProcInfo, std::less<int>, SHM_STL_Allocator<std::pair<int, ProcInfo> > > ProcZone;
+typedef std::set<SHMString,  std::less<SHMString>, SHM_STL_Allocator<SHMString> > TcsZone;
+typedef std::map<int, TcsZone *, std::less<int>, SHM_STL_Allocator<std::pair<const int, TcsZone *> > > ProcTcsMap;
+
 #endif
diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index 918aef6..84bf77e 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -1,4 +1,5 @@
 #include "shm_socket.h"
+#include "socket_def.h"
 #include "hashtable.h"
 #include "logger_factory.h"
 #include <map>
@@ -108,8 +109,6 @@
   
   int rv, i;
   hashtable_t *hashtable = mm_get_hashtable();
-  logger->debug("shm_socket_close\n");
- 
 
   // if(sockt->key != 0) {
   //   auto it =  shmQueueStMap->find(sockt->key);
@@ -118,8 +117,6 @@
   //     it->second.closeTime = time(NULL);
   //   }
   // }
-
-
 
   if(sockt->queue != NULL) {
     sockt->queue->close();
@@ -169,8 +166,20 @@
   return 0;
 }
 
+int shm_socket_bind_proc_id(shm_socket_t *sockt, const char *buf, int len) {
+  strncpy(sockt->proc_id, buf, len > MAX_STR_LEN ? MAX_STR_LEN : len);
+
+  return 0;
+}
+
 int shm_socket_get_key(shm_socket_t *sockt){
   return sockt->key;
+}
+
+int shm_socket_get_procid(shm_socket_t *sockt, char *buf, int len) {
+  strncpy(buf, sockt->proc_id, len);
+
+  return 0;
 }
 
 // 鐭繛鎺ユ柟寮忓彂閫�
@@ -297,8 +306,6 @@
 {
   int rv;
 
-  logger->debug("%lu destroy threadlocal socket\n", pthread_self()); 
-
   if(tmp_socket == NULL)
     return;
   
@@ -411,8 +418,6 @@
   
  
   int rv = 0, tryn = 16;
-  static int Counter_suc = 0;
-  static int Counter_fail = 0;
   shm_packet_t sendpak;
   shm_packet_t recvpak;
   std::map<int, shm_packet_t>::iterator recvbufIter;
@@ -430,12 +435,13 @@
   {
     /* If first call from this thread, allocate buffer for thread, and save its location */
     tmp_socket = shm_socket_open(SHM_SOCKET_DGRAM);
-
-    rv =  pthread_setspecific(_localthread_socket_key_, tmp_socket);
-    if ( rv != 0) {
-      logger->error(rv, "shm_sendandrecv : pthread_setspecific");
-      exit(1);
-    }
+    
+  }
+    
+  rv =  pthread_setspecific(_localthread_socket_key_, tmp_socket);
+  if ( rv != 0) {
+    logger->error(rv, "shm_sendandrecv : pthread_setspecific");
+    exit(1);
   }
  
   sendpak.key = tmp_socket->key;
@@ -473,6 +479,7 @@
     } else {
       // 绛旈潪鎵�闂紝鏀惧埌缂撳瓨閲�
       tmp_socket->recvbuf2.insert({recvpak.key, recvpak});
+      exit(0);
       continue;
     }
   }
@@ -481,7 +488,6 @@
   return EBUS_RECVFROM_WRONG_END;
  
 LABLE_SUC:
-  sockt->key = tmp_socket->key;
   if(recv_buf != NULL) {
     void *_buf = malloc(recvpak.size);
     memcpy(_buf, recvpak.buf, recvpak.size);
@@ -520,8 +526,8 @@
      
       // 瓒呮椂瀵艰嚧鎺ュ彂閫佸璞★紝涓庤繑鍥炲璞′笉瀵瑰簲鐨勬儏鍐�
       if(send_key != recv_key) {
-        // logger->debug("======%d use tmp_socket %d, send to  %d, receive from  %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key);
-        // logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key);
+
+        logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key);
         
         continue;
       }
@@ -640,7 +646,6 @@
     if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0)
       err_exit(rv, "shm_recvfrom : pthread_mutex_lock");
  
-   
     if (sockt->key == 0) {
       sockt->key = hashtable_alloc_key(hashtable);
     }  
@@ -664,7 +669,6 @@
   
 LABEL_POP:
 
- 
   rv = sockt->queue->pop(recvpak, timeout, flag);
   if(rv != 0) {
     if(rv == ETIMEDOUT) {
@@ -680,4 +684,23 @@
   *_recvpak = recvpak;
   return 0;
 }
- 
+
+void proc_copy(char *dst, void *src, int *counter) {
+  int count = 0;
+  ProcInfo *ptr = static_cast<ProcInfo *>(src);
+
+  memcpy(dst, ptr->proc_id, strlen(ptr->proc_id) + 1);
+  count = strlen(ptr->proc_id) + 1;
+  memcpy(dst + count, ptr->name, strlen(ptr->name) + 1);
+  count += strlen(ptr->name) + 1;
+  memcpy(dst + count, ptr->public_info, strlen(ptr->public_info) + 1);
+  count += strlen(ptr->public_info) + 1;
+  memcpy(dst + count, ptr->private_info, strlen(ptr->private_info) + 1);
+  count += strlen(ptr->private_info) + 1;
+
+  *counter = count;
+}
+
+
+
+
diff --git a/src/socket/shm_socket.h b/src/socket/shm_socket.h
index 97d9f2c..8e874d1 100644
--- a/src/socket/shm_socket.h
+++ b/src/socket/shm_socket.h
@@ -4,6 +4,7 @@
 #include "usg_common.h"
 #include "usg_typedef.h"
 #include "shm_queue.h"
+#include "proc_def.h"
 #include "lock_free_queue.h"
 #include <functional>
 
@@ -18,7 +19,8 @@
 #define BUS_ACTION_STOP 1 
 
 typedef struct shm_packet_t {
-	int key;
+  int key;
+
 	size_t size;
 	void * buf;
 	char uuid[64];
@@ -31,8 +33,8 @@
 
 typedef struct shm_socket_t {
 	shm_socket_type_t socket_type;
-	// 鏈湴key
 	int key;
+  char proc_id[MAX_STR_LEN];
 	bool force_bind;
 	pthread_mutex_t mutex;
 
@@ -59,7 +61,8 @@
 int shm_socket_bind(shm_socket_t * socket, int key) ;
 
 int shm_socket_force_bind(shm_socket_t * socket, int key) ;
- 
+
+int shm_socket_bind_proc_id(shm_socket_t *sockt, const char *buf, int len);
 /**
  * @flags : BUS_NOWAIT_FLAG
  */
@@ -70,6 +73,9 @@
 int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size,  
 	const struct timespec * timeout = NULL,  int flags = 0);
 
+typedef std::set<int,  std::less<int>, SHM_STL_Allocator<int> > SvrProc;
+typedef std::map<SHMString, SvrProc *, std::less<SHMString>, SHM_STL_Allocator<std::pair<const SHMString, SvrProc *> > > SvrTcs;
+typedef std::map<int, SHMString, std::less<int>, SHM_STL_Allocator<std::pair<int, const SHMString> > > ProcPartZone;
 /**
  * @callback  void (*recvandsend_callback_fn)(void *recvbuf, int recvsize, int key, void **sendbuf, int *sendsize, void * user_data)
  *                  @recvbuf 鏀跺埌鐨勬暟鎹�
@@ -83,7 +89,6 @@
                     const struct timespec *timeout = NULL, int flag = 0,  void * user_data = NULL);
 
 
-
-
+void proc_copy(char *dst, void *src, int *count);
 
 #endif
\ No newline at end of file
diff --git a/src/svsem.cpp b/src/svsem.cpp
index 26bde2c..00e6dbc 100644
--- a/src/svsem.cpp
+++ b/src/svsem.cpp
@@ -11,7 +11,6 @@
     union semun arg;
     struct sembuf sop;
 
-
     arg.val = 0; /* So initialize it to 0 */
     if (semctl(semid, 0, SETVAL, arg) == -1)
       err_exit(errno, "semctl 1");
diff --git a/test_socket/CMakeLists.txt b/test_socket/CMakeLists.txt
index 18bcf4c..a096136 100644
--- a/test_socket/CMakeLists.txt
+++ b/test_socket/CMakeLists.txt
@@ -6,7 +6,19 @@
                              ${EXTRA_INCLUDES}
                             )
 
- 
+add_executable(bus_test_inter bus_test_inter.cpp)
+target_link_libraries(bus_test_inter PRIVATE shm_queue  ${EXTRA_LIBS} )
+target_include_directories(bus_test_inter PRIVATE
+                            "${PROJECT_BINARY_DIR}"
+                             ${EXTRA_INCLUDES}
+                            )
+
+add_executable(bus_test_server_mode bus_test_server_mode.cpp)
+target_link_libraries(bus_test_server_mode PRIVATE shm_queue  ${EXTRA_LIBS} )
+target_include_directories(bus_test_server_mode PRIVATE
+                            "${PROJECT_BINARY_DIR}"
+                             ${EXTRA_INCLUDES}
+                            )
 
 add_custom_command(
   OUTPUT ${PROJECT_BINARY_DIR}/bin/heart_beat.sh
diff --git a/test_socket/bus_test_inter.cpp b/test_socket/bus_test_inter.cpp
new file mode 100644
index 0000000..8abcb46
--- /dev/null
+++ b/test_socket/bus_test_inter.cpp
@@ -0,0 +1,502 @@
+#include "bus_server_socket.h"
+#include "shm_mod_socket.h"
+#include "shm_mm_wrapper.h"
+#include "usg_common.h"
+#include "mm.h"
+#include "logger_factory.h"
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <unistd.h>
+#include "bus_error.h"
+
+#include "bh_api.h"
+#include "proc_def.h"
+
+#define TOTAL_REG_UNREG         2
+
+#define MAGIC_STR       "INTER_"
+#define STR_LEN         30
+
+pthread_t tids;
+void *res;
+
+static ProcInfo proc_desc;
+
+char *genStr(int length, char *buf)
+{
+  int flag, i;
+  char *str;
+
+  if (length < (strlen(buf) + 10)) {
+    length = strlen(buf) + 10;
+  }
+
+  srand((unsigned) time(NULL));
+  if ((str = (char *)malloc(length + 1)) == NULL) {
+    printf("out of memory!\n");
+
+    exit(0);
+  }
+
+  memset(str, 0x00, length + 1);
+  memcpy(str, MAGIC_STR, strlen(MAGIC_STR));
+  strcat(str, buf);
+  for (i = strlen(str); i < length; i++) {
+    flag = rand() % 3;
+    switch (flag) {
+      case 0:
+        str[i] = 'A' + rand() % 26;
+        break;
+
+      case 1:
+        str[i] = 'a' + rand() % 26;
+        break;
+
+      default:
+        str[i] = '0' + rand() % 10;
+        break;
+    }
+  }
+
+  str[length] = '\0';
+
+  return str;
+
+}
+
+void *client_recv(void *skptr) {
+  
+  pthread_detach(pthread_self());
+  
+  void *recvbuf = NULL;
+  int recv_len;
+  void *proc_id = NULL;
+  int id_len;
+  int rv;
+  void *errBuf = NULL;
+  int len;
+  char proc_data[200] = { 0x00 };
+  char topics[200] = { 0x00 };
+  
+  struct timespec timeout = {2, 0};
+
+  while (true) {
+
+    rv = BHReadSub(&proc_id, &id_len, &recvbuf, &recv_len, -1);
+    if(rv == true) {
+
+      memset(topics, 0x00, sizeof(topics));
+      memset(proc_data, 0x00, sizeof(proc_data)); 
+
+      memcpy(proc_data, proc_id, (sizeof(proc_data) - 1) > id_len ? id_len : (sizeof(proc_data) - 1));
+
+      memcpy(topics, recvbuf, (sizeof(topics) - 1) > recv_len ? recv_len : (sizeof(topics) - 1));
+
+      printf("Get the sub topics data(%s) from proc id(%s)\n", (char *)topics, (char *)proc_id);
+
+      BHFree(recvbuf, len);
+      BHFree(proc_id, id_len);
+
+    } else {
+      BHGetLastError(&errBuf, &len);
+      printf("the thread recv fail with error: %s\n", (char *)errBuf);
+
+      BHFree(errBuf, len);
+    }
+
+  }
+
+}
+
+void parseQueryTopicsBuf(void *buf, int len)
+{
+  if (buf == NULL)
+    return;
+
+  int total_topics = *(int *)buf;
+  int i, j;
+  int buf_pos = 0;
+  void *ptr_temp = NULL;
+  ProcInfo_query *ptr = NULL;
+  ProcInfo *Proc_ptr = NULL;
+
+  buf_pos = sizeof(ProcInfo_query);
+  ptr = (ProcInfo_query *)((char *)buf + sizeof(int));
+  ptr_temp = (void *)ptr;
+  for (i = 0; i < total_topics; i++) {
+    printf("topic %s:\n", ptr->name);
+    for (j = 0; j < ptr->num; j++) {
+      printf("the %dst process info:\n", j + 1);
+      Proc_ptr = &(ptr->procData) +  j;
+      printf("proc_id: %s\n", Proc_ptr->proc_id);
+      printf("name: %s\n", Proc_ptr->name);
+      printf("public_info: %s\n", Proc_ptr->public_info);
+      printf("private_info: %s\n", Proc_ptr->private_info);
+
+    }
+
+    if (ptr->num > 1) {
+      buf_pos += sizeof(ProcInfo) * (ptr->num - 1);
+    }
+    
+    ptr = (ProcInfo_query *)((char *)ptr_temp + buf_pos);
+  }
+
+}
+
+void parseQueryProcBuf(void *buf, int len)
+{
+  if (buf == NULL)
+    return;
+
+  int total = *(int *)buf;
+  int i;
+  ProcInfo_sum *Proc_ptr = NULL;
+
+  Proc_ptr = (ProcInfo_sum *)((char *)buf + sizeof(int));
+  for (i = 0; i < total; i++) {
+    printf("proc_id: %s\n", (Proc_ptr + i)->procData.proc_id);
+    printf("name: %s\n", (Proc_ptr + i)->procData.name);
+    printf("public_info: %s\n", (Proc_ptr + i)->procData.public_info);
+    printf("private_info: %s\n", (Proc_ptr + i)->procData.private_info);
+
+    printf("service: %s\n", (Proc_ptr + i)->reg_info);
+    printf("sub: %s\n", (Proc_ptr + i)->local_info);
+  }
+}
+
+int main(int argc, char *argv[]) {
+  int ret;
+  char *ptr = NULL;
+  char buf[] = "Process";
+  void *buf_temp = NULL;
+  void *errBuf = NULL;
+  void *proc_id = NULL;
+  int size;
+  int id_len;
+  int i;
+  char data_buf[200] = { 0x00 };
+  const int timeout_ms = 3000;
+
+  memset(&proc_desc, 0x00, sizeof(ProcInfo));
+  ptr = genStr(STR_LEN, buf);
+  strncpy(proc_desc.proc_id, ptr, strlen(ptr) + 1); 
+  //strncpy(proc_desc.proc_id, "Hello", strlen("Hello") + 1);
+  free(ptr);
+
+  sleep(2); //make rand change
+  ptr = genStr(STR_LEN, buf);
+  strncpy(proc_desc.name, ptr, strlen(ptr) + 1);
+  //strncpy(proc_desc.name, "World", strlen("World") + 1);
+  free(ptr);
+
+  sleep(2);
+  ptr = genStr(STR_LEN, buf);
+  //strncpy(proc_desc.public_info, ptr, strlen(ptr) + 1);
+  strncpy(proc_desc.public_info, "Good", strlen("Good") + 1);
+  free(ptr);
+
+  sleep(2);
+  ptr = genStr(STR_LEN, buf);
+  //strncpy(proc_desc.private_info, ptr, strlen(ptr) + 1);
+  //strncpy(proc_desc.private_info, "Bye", strlen("Bye") + 1);
+  free(ptr);
+
+  printf("before the registered, process info:\n");
+  printf("proc_id: %s\n", proc_desc.proc_id);
+  printf("name: %s\n", proc_desc.name);
+  printf("public_info: %s\n", proc_desc.public_info);
+  printf("private_info: %s\n", proc_desc.private_info);
+
+  for (i = 0; i < TOTAL_REG_UNREG; i++) {
+    ret = BHRegister(&proc_desc, (const int)sizeof(ProcInfo), &buf_temp, &size, timeout_ms);
+    if (ret == true) {
+      printf("the process registered OKay\n");
+
+    } else {
+      BHGetLastError(&errBuf, &size);
+      printf("the process registered fail with error: %s\n", (char *)errBuf);
+      BHFree(errBuf, size);
+      
+      printf("the second way to get the error log: %s\n", buf_temp);
+      
+    };
+    BHFree(buf_temp, size);
+
+    ret = BHUnregister(NULL, 0, &buf_temp, &size, timeout_ms);
+    if (ret == true) {
+      printf("the process unregistered OKay\n");
+
+    } else {
+      BHGetLastError(&errBuf, &size);
+      printf("the process unregistered fail with error: %s\n", (char *)errBuf);
+      BHFree(errBuf, size);
+      
+      printf("the second way to get the error log: %s\n", buf_temp);
+      
+    };  
+    BHFree(buf_temp, size);
+  }
+
+  //const char *topics_reg_buf1[] = {"topics demo1"};
+  const char *topics_reg_buf1 = "topics demo1";
+  const char *topics_query_buf1 = "topics demo1";
+  const char *topics_query_buf2 = "Hello World,So,Great,Good"; //No space between each other 
+  //const char *topics_reg_buf2[] = {"Hello World", "So", "Great", "Good"};
+  const char *topics_reg_buf2 = "Hello World,So,Great,Good";
+
+  //const char *topics_sub_buf1[] = {"news"};
+  //const char *topics_sub_buf2[] = {"sports", "balls", "topics demo1"};
+  const char *topics_sub_buf1 = "news";
+  const char *topics_sub_buf2 = "sports,balls,topics demo1";
+
+  const char *topics_pub_topic1 = "news";
+  const char *topics_pub_topic1_data = "boob news";
+
+  const char *topics_pub_topic2 = "balls";
+  const char *topics_pub_topic2_data = "Great volleyballs and basketballs";
+
+  ret = BHRegister(&proc_desc, (const int)sizeof(ProcInfo), &buf_temp, &size, timeout_ms);
+  if (ret == true) {
+    printf("the process registered OKay\n");
+
+  } else {
+    BHGetLastError(&errBuf, &size);
+    printf("the process registered fail with error: %s(%s)\n", (char *)errBuf, buf_temp);
+
+    BHFree(errBuf, size);
+  };
+  BHFree(buf_temp, size);
+
+  ret = BHRegisterTopics(topics_reg_buf1, strlen(topics_reg_buf1), &buf_temp, &size, timeout_ms);
+  if (ret == true) {
+    printf("the process registered topics OKay\n");
+
+  } else {
+    BHGetLastError(&errBuf, &size);
+    printf("the process registered1 fail with errorL %s(%s)\n", (char *)errBuf, buf_temp);
+
+    BHFree(errBuf, size);
+  };
+  BHFree(buf_temp, size);
+
+  ret = BHRegisterTopics(topics_reg_buf2, strlen(topics_reg_buf2), &buf_temp, &size, timeout_ms);
+  if (ret == true) {
+    printf("the process registered topics OKay\n");
+
+  } else {
+    BHGetLastError(&errBuf, &size);
+    printf("the process registered2 fail with error: %s(%s)\n", (char *)errBuf, buf_temp);
+
+    BHFree(errBuf, size);
+  };
+  BHFree(buf_temp, size);
+  
+  ret = BHQueryTopicAddress(NULL, 0, topics_query_buf1, strlen(topics_query_buf1), &buf_temp, &size, timeout_ms);
+  if (ret == true) {
+    printf("the process query topics OKay\n");
+
+    parseQueryTopicsBuf(buf_temp, size);
+
+  } else {
+    BHGetLastError(&errBuf, &size);
+    printf("the process query3 fail with error: %s(%s)\n", (char *)errBuf, buf_temp);
+
+    BHFree(errBuf, size);
+  };
+  BHFree(buf_temp, size);
+
+  ret = BHQueryTopicAddress(NULL, 0, topics_query_buf2, strlen(topics_query_buf2), &buf_temp, &size, timeout_ms);
+  if (ret == true) {
+    printf("the process query topics OKay\n");
+
+    parseQueryTopicsBuf(buf_temp, size);
+
+  } else {
+    BHGetLastError(&errBuf, &size);
+    printf("the process query4 fail with error: %s(%s)\n", (char *)errBuf, buf_temp);
+
+    BHFree(errBuf, size);
+  };
+  BHFree(buf_temp, size);
+ 
+  pthread_create(&tids, NULL, client_recv, NULL);
+
+  ret = BHRegisterTopics(topics_reg_buf1, strlen(topics_reg_buf1), &buf_temp, &size, timeout_ms);
+  if (ret == true) {
+    printf("the process registered topics OKay\n");
+
+  } else {
+    BHGetLastError(&errBuf, &size);
+    printf("the process registered1 fail with errorL %s(%s)\n", (char *)errBuf, buf_temp);
+
+    BHFree(errBuf, size);
+  };
+  BHFree(buf_temp, size);
+
+  ret = BHSubscribeTopics(topics_sub_buf1, strlen(topics_sub_buf1), &buf_temp, &size, timeout_ms);
+  if (ret == true) {
+    printf("the process subscribe topics OKay\n");
+
+  } else {
+    BHGetLastError(&errBuf, &size);
+    printf("the process sub1 fail with error: %s(%s)\n", (char *)errBuf, buf_temp);
+
+    BHFree(errBuf, size);
+  };  
+  BHFree(buf_temp, size);
+
+  ret = BHSubscribeTopics(topics_sub_buf2, strlen(topics_sub_buf2), &buf_temp, &size, timeout_ms);
+  if (ret == true) {
+    printf("tthe process subscribe topics OKay\n");
+
+  } else {
+    BHGetLastError(&errBuf, &size);
+    printf("the process sub2 fail with error: %s\n", (char *)errBuf);
+
+    BHFree(errBuf, size);
+  };
+  BHFree(buf_temp, size);
+
+  ret = BHRegisterTopics(topics_reg_buf2, strlen(topics_reg_buf2), &buf_temp, &size, timeout_ms);
+  if (ret == true) {
+    printf("the process registered topics OKay\n");
+
+  } else {
+    BHGetLastError(&errBuf, &size);
+    printf("the process registered10 fail with errorL %s\n", (char *)errBuf);
+
+    BHFree(errBuf, size);
+  };
+  BHFree(buf_temp, size);
+  
+  const char *topics_server_specific_reg_buf1 = "Server Specific topics demo1";
+  const char *topics_server_specific_reg_buf2 = "Server Specific Hello World";
+  void *msgID = NULL;
+  int msg_id_len;
+  
+  ret = BHAsyncRequest(NULL, 0, topics_server_specific_reg_buf1, strlen(topics_server_specific_reg_buf1), &msgID, &msg_id_len);
+  if (ret == true) {
+    printf("the process BHAsyncRequest topics OKay\n");
+   
+    BHFree(msgID, msg_id_len);
+
+  } else {
+    
+    BHGetLastError(&errBuf, &size);
+    printf("the process BHAsyncRequest1 topics fail with error: %s\n", (char *)errBuf);
+
+    BHFree(errBuf, size);
+  };
+  
+  ret = BHAsyncRequest(NULL, 0, topics_server_specific_reg_buf2, strlen(topics_server_specific_reg_buf2), NULL, 0);
+  if (ret == true) {
+    printf("the process BHAsyncRequest topics OKay\n");
+
+  } else {
+    
+    BHGetLastError(&errBuf, &size);
+    printf("the process BHAsyncRequest2 topics fail with error: %s\n", (char *)errBuf);
+
+    BHFree(errBuf, size);
+  };
+
+  ret = BHRequest(NULL, 0, topics_server_specific_reg_buf1, strlen(topics_server_specific_reg_buf1), &proc_id, &id_len,
+              &buf_temp, &size, timeout_ms);
+  if (ret == true) {
+    
+    memset(data_buf, 0x00, sizeof(data_buf));
+    strncpy(data_buf, (char *)buf_temp, (sizeof(data_buf) - 1) > size ? size : (sizeof(data_buf) - 1));
+    printf("the process BHRequest topics OKay\n");
+    
+    printf("the response data(%s) from procid(%s)\n", data_buf, (char *)proc_id);
+
+
+  } else {
+    
+    BHGetLastError(&errBuf, &size);
+    printf("the process BHRequest topics fail with error: %s\n", (char *)errBuf);
+
+    BHFree(errBuf, size);
+  };
+  BHFree(buf_temp, size);
+  
+  ret = BHRequest(NULL, 0, topics_server_specific_reg_buf2, strlen(topics_server_specific_reg_buf2), &proc_id, &id_len,
+              &buf_temp, &size, timeout_ms);
+  if (ret == true) {
+
+    memset(data_buf, 0x00, sizeof(data_buf));
+    strncpy(data_buf, (char *)buf_temp, (sizeof(data_buf) - 1) > size ? size : (sizeof(data_buf) - 1));
+    printf("the process BHRequest topics OKay\n");
+    
+    printf("the response data(%s) from procid(%s)\n", data_buf, (char *)proc_id);
+
+  } else {
+    
+    BHGetLastError(&errBuf, &size);
+    printf("the process BHRequest2 topics fail with error: %s\n", (char *)errBuf);
+
+    BHFree(errBuf, size);
+  };
+  BHFree(buf_temp, size);
+
+#if !defined(PRO_DE_SERIALIZE)
+  ret = BHPublish(topics_pub_topic1, topics_pub_topic1_data, timeout_ms);
+  if (ret == true) {
+    printf("the process publish topic(%s) and content(%s) OKay\n", topics_pub_topic1, topics_pub_topic1_data);
+
+  } else {
+    printf("the process published1 fail\n");
+  };
+
+  ret = BHPublish(topics_pub_topic2, topics_pub_topic2_data, timeout_ms);
+  if (ret == true) {
+    printf("the process publish topic(%s) and content(%s) OKay\n", topics_pub_topic2, topics_pub_topic2_data);
+
+  } else {
+    printf("the process published2 fail\n");
+  };
+#endif 
+ 
+  memset(data_buf, 0x00, sizeof(data_buf));
+  strcpy(data_buf, "query the process");
+  ret = BHQueryProcs(NULL, 0, data_buf, strlen(data_buf), &buf_temp, &size, timeout_ms);
+  if (ret == true) {
+    printf("the process query all the process data OKay\n");
+
+    parseQueryProcBuf(buf_temp, size);
+
+  } else {
+    BHGetLastError(&errBuf, &size);
+    printf("the process query proc fail with error: %s\n", (char *)errBuf);
+
+    BHFree(errBuf, size);
+  };
+  BHFree(buf_temp, size);
+
+#if 1
+  while(1) {
+    sleep(1);
+  }
+#else 
+  /*if the process will exit finally, we must call BHUnregister to release the resources*/
+  ret = BHUnregister(NULL, 0, &buf_temp, &size, timeout_ms);
+  if (ret == true) {
+    printf("the process unregistered OKay\n");
+
+  } else {
+    BHGetLastError(&errBuf, &size);
+    printf("the process unregistered fail with error: %s\n", (char *)errBuf);
+
+    BHFree(errBuf, size);
+  };
+  BHFree(buf_temp, size);
+  
+#endif 
+
+  return 0;
+
+}
+
+
diff --git a/test_socket/bus_test_server_mode.cpp b/test_socket/bus_test_server_mode.cpp
new file mode 100644
index 0000000..71f13c9
--- /dev/null
+++ b/test_socket/bus_test_server_mode.cpp
@@ -0,0 +1,122 @@
+#include "bus_server_socket.h"
+#include "shm_mod_socket.h"
+#include "shm_mm_wrapper.h"
+#include "usg_common.h"
+#include "mm.h"
+#include "logger_factory.h"
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <unistd.h>
+#include "bus_error.h"
+
+#include "bh_api.h"
+#include "proc_def.h"
+
+static ProcInfo proc_desc;
+
+int main(int argc, char *argv[]) {
+  int ret;
+  void *buf_temp = NULL;
+  void *errBuf = NULL;
+  void *proc_id = NULL;
+  void *src = NULL;
+  void *data_buf = NULL;
+  char topics_buf[200] = { 0x00 };
+  int size;
+  int id_len;
+  int count = 0;
+  const int timeout_ms = 3000;
+
+  memset(&proc_desc, 0x00, sizeof(ProcInfo));
+  strncpy(proc_desc.proc_id, "Hello", strlen("Hello"));
+  strncpy(proc_desc.name, "World", strlen("World"));
+  strncpy(proc_desc.public_info, "Good", strlen("Good") + 1);
+
+  const char *topics_server_specific_reg_buf1 = "Server Specific topics demo1";
+  const char *topics_server_specific_reg_buf2 = "Server Specific Hello World";
+
+  ret = BHRegister(&proc_desc, (const int)sizeof(ProcInfo), &buf_temp, &size, timeout_ms);
+  if (ret == true) {
+    printf("the process registered OKay\n");
+
+    BHFree(buf_temp, size);
+
+  } else {
+	  BHGetLastError(&errBuf, &size);
+
+    printf("the process registered fail with error: %s\n", (char *)errBuf);
+
+    BHFree(errBuf, size);
+  };
+  
+  ret = BHRegisterTopics(topics_server_specific_reg_buf1, strlen(topics_server_specific_reg_buf1), &buf_temp, &size, timeout_ms);
+  if (ret == true) {
+    printf("the process registered topics OKay\n");
+
+    BHFree(buf_temp, size);
+
+  } else {
+    BHGetLastError(&errBuf, &size);
+    printf("the process registered1 fail with errorL %s\n", (char *)errBuf);
+
+    BHFree(errBuf, size);
+  };
+  
+  ret = BHRegisterTopics(topics_server_specific_reg_buf2, strlen(topics_server_specific_reg_buf2), &buf_temp, &size, timeout_ms);
+  if (ret == true) {
+    printf("the process registered topics OKay\n");
+
+    BHFree(buf_temp, size);
+
+  } else {
+    BHGetLastError(&errBuf, &size);
+    printf("the process registered1 fail with errorL %s\n", (char *)errBuf);
+
+    BHFree(errBuf, size);
+  };
+  
+  while(true) {
+    ret = BHReadRequest(&proc_id, &id_len, &buf_temp, &size, &src, -1);
+    if (ret == true) {
+
+      strncpy(topics_buf, (char *)buf_temp, (sizeof(topics_buf) - 1) > size ? size : (sizeof(topics_buf) - 1));
+      printf("Get data(%s)", topics_buf);
+      
+      memset(topics_buf, 0x00, sizeof(topics_buf));
+      strncpy(topics_buf, (char *)proc_id, (sizeof(topics_buf) - 1) > id_len ? id_len : (sizeof(topics_buf) - 1));
+      printf("proc id(%s)", topics_buf);
+      
+      BHFree(buf_temp, size);
+      BHFree(proc_id, id_len);
+      
+      memset(topics_buf, 0x00, sizeof(topics_buf));
+      sprintf(topics_buf, "Get data count: %d", ++count);
+      ret = BHSendReply(src, topics_buf, strlen(topics_buf));
+      if (ret == true) {
+        
+        printf("the process send reply OKay\n");
+        
+      } else {
+        
+        BHGetLastError(&errBuf, &size);
+        printf("the process send reply fail with errorL %s\n", (char *)errBuf);
+
+        BHFree(errBuf, size);
+      };
+
+      BHFree(src, size);
+
+    } else {
+      BHGetLastError(&errBuf, &size);
+      printf("BHReadRequest fail with error(%s)\n", (char *)errBuf);
+
+      BHFree(errBuf, size);
+    };
+  }
+  
+  return 0;
+
+}
+
+

--
Gitblit v1.8.0