From 72b7aebb0022f8e391c999348763acd5f7a16133 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期四, 26 十一月 2020 18:56:33 +0800
Subject: [PATCH] update
---
src/socket/net_mod_socket.c | 4
src/socket/net_mod_socket_wrapper.h | 4
src/socket/shm_socket.c | 120 +++++-----
test_net_socket/test_net_mod_socket.c | 227 +++++++++++++++++++-
src/socket/net_mod_socket.h | 6
src/socket/dgram_mod_socket.c | 2
src/socket/shm_socket.h | 20
src/socket/shm_stream_mod_socket.c | 12
src/socket/shm_mod_socket.c | 178 ++++++++--------
src/socket/shm_mod_socket.h | 5
test_socket/dgram_mod_bus.c | 37 +-
test_net_socket/net_mod_socket.sh | 13
src/socket/net_mod_socket_wrapper.c | 4
13 files changed, 413 insertions(+), 219 deletions(-)
diff --git a/src/socket/dgram_mod_socket.c b/src/socket/dgram_mod_socket.c
index 3e0fd0c..1667bf6 100644
--- a/src/socket/dgram_mod_socket.c
+++ b/src/socket/dgram_mod_socket.c
@@ -202,7 +202,7 @@
*/
int dgram_mod_get_port(void * _socket) {
dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
- return socket->m_socket->get_port();
+ return socket->m_socket->get_key();
}
diff --git a/src/socket/net_mod_socket.c b/src/socket/net_mod_socket.c
index f55e439..51a26ab 100644
--- a/src/socket/net_mod_socket.c
+++ b/src/socket/net_mod_socket.c
@@ -498,8 +498,8 @@
/**
* 鑾峰彇soket绔彛鍙�
*/
-int NetModSocket::get_port() {
- return shmModSocket.get_port();
+int NetModSocket::get_key() {
+ return shmModSocket.get_key();
}
diff --git a/src/socket/net_mod_socket.h b/src/socket/net_mod_socket.h
index 80e98cc..7598f6c 100644
--- a/src/socket/net_mod_socket.h
+++ b/src/socket/net_mod_socket.h
@@ -222,7 +222,7 @@
int pub_nowait( char *topic, int topic_size, void *content, int content_size, int port);
-
+
/**
* 鍚憂ode_arr 涓殑鎵�鏈夌綉缁滆妭鐐瑰彂甯冩秷鎭�
* @node_arr 缃戠粶鑺傜偣缁�, @node_arr_len璇ユ暟缁勯暱搴�
@@ -233,9 +233,9 @@
int pub(net_node_t *node_arr, int node_arr_len, char *topic, int topic_size, void *content, int content_size);
/**
- * 鑾峰彇soket绔彛鍙�
+ * 鑾峰彇soket key
*/
- int get_port() ;
+ int get_key() ;
/**
* 閿�姣乻endandrecv鏂规硶杩斿洖鐨勬秷鎭粍
diff --git a/src/socket/net_mod_socket_wrapper.c b/src/socket/net_mod_socket_wrapper.c
index 6776b36..0f8cf42 100644
--- a/src/socket/net_mod_socket_wrapper.c
+++ b/src/socket/net_mod_socket_wrapper.c
@@ -184,9 +184,9 @@
/**
* 鑾峰彇soket绔彛鍙�
*/
-int net_mod_socket_get_port(void * _socket) {
+int net_mod_socket_get_key(void * _socket) {
net_mod_socket_t *sockt = (net_mod_socket_t *)_socket;
- return sockt->sockt->get_port();
+ return sockt->sockt->get_key();
}
diff --git a/src/socket/net_mod_socket_wrapper.h b/src/socket/net_mod_socket_wrapper.h
index 1a15003..1b80cd9 100644
--- a/src/socket/net_mod_socket_wrapper.h
+++ b/src/socket/net_mod_socket_wrapper.h
@@ -121,7 +121,7 @@
* @size 涓婚闀垮害
* @port 鎬荤嚎绔彛
*/
-int net_mod_socket_desub(void * _socket, void *topic, int size, int port);
+int net_mod_socket_desub(void * _socket, void *topic, int size, int key);
// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
int net_mod_socket_desub_timeout(void * _socket, void *topic, int size, int port, int sec, int nsec);
int net_mod_socket_desub_nowait(void * _socket, void *topic, int size, int port);
@@ -131,7 +131,7 @@
/**
* 鑾峰彇soket绔彛鍙�
*/
-int net_mod_socket_get_port(void * _socket) ;
+int net_mod_socket_get_key(void * _socket) ;
diff --git a/src/socket/shm_mod_socket.c b/src/socket/shm_mod_socket.c
index e890721..9781a8a 100644
--- a/src/socket/shm_mod_socket.c
+++ b/src/socket/shm_mod_socket.c
@@ -105,8 +105,8 @@
// printf("ShmModSocket destory 4\n");
}
-int ShmModSocket::bind(int port) {
- return shm_socket_bind(shm_socket, port);
+int ShmModSocket::bind(int key) {
+ return shm_socket_bind(shm_socket, key);
}
@@ -115,82 +115,82 @@
* 寮哄埗缁戝畾绔彛鍒皊ocket, 閫傜敤浜庣▼搴忛潪姝e父鍏抽棴鐨勬儏鍐典笅锛岄噸鍚▼搴忕粦瀹氬師鏉ヨ繕娌¢噴鏀剧殑key
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
-int ShmModSocket::force_bind(int port) {
- return shm_socket_force_bind(shm_socket, port);
+int ShmModSocket::force_bind(int key) {
+ return shm_socket_force_bind(shm_socket, key);
}
/**
* 鍙戦�佷俊鎭�
- * @port 鍙戦�佺粰璋�
+ * @key 鍙戦�佺粰璋�
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
-int ShmModSocket::sendto(const void *buf, const int size, const int port) {
- return shm_sendto(shm_socket, buf, size, port, NULL, 0);
+int ShmModSocket::sendto(const void *buf, const int size, const int key) {
+ return shm_sendto(shm_socket, buf, size, key, NULL, 0);
}
// 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇
-int ShmModSocket::sendto_timeout(const void *buf, const int size, const int port, const struct timespec *timeout) {
- return shm_sendto(shm_socket, buf, size, port, timeout, 0);
+int ShmModSocket::sendto_timeout(const void *buf, const int size, const int key, const struct timespec *timeout) {
+ return shm_sendto(shm_socket, buf, size, key, timeout, 0);
}
// 鍙戦�佷俊鎭珛鍒昏繑鍥炪��
-int ShmModSocket::sendto_nowait( const void *buf, const int size, const int port){
- return shm_sendto(shm_socket, buf, size, port, NULL, (int)SHM_MSG_NOWAIT);
+int ShmModSocket::sendto_nowait( const void *buf, const int size, const int key){
+ return shm_sendto(shm_socket, buf, size, key, NULL, (int)SHM_MSG_NOWAIT);
}
-inline int ShmModSocket::_recvfrom_(void **buf, int *size, int *port, struct timespec *timeout, int flags) {
+inline int ShmModSocket::_recvfrom_(void **buf, int *size, int *key, struct timespec *timeout, int flags) {
if(mod == BUS) {
err_exit(0, "Can not use method recvfrom in a Bus");
}
// printf("dgram_mod_recvfrom before\n");
- int rv = shm_recvfrom(shm_socket, buf, size, port, timeout, flags);
+ int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flags);
// printf("dgram_mod_recvfrom after\n");
return rv;
}
/**
* 鎺ユ敹淇℃伅
- * @port 浠庤皝鍝噷鏀跺埌鐨勪俊鎭�
+ * @key 浠庤皝鍝噷鏀跺埌鐨勪俊鎭�
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
-int ShmModSocket::recvfrom(void **buf, int *size, int *port) {
+int ShmModSocket::recvfrom(void **buf, int *size, int *key) {
- return _recvfrom_( buf, size, port, NULL, 0);
+ return _recvfrom_( buf, size, key, NULL, 0);
}
// 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int ShmModSocket::recvfrom_timeout( void **buf, int *size, int *port, struct timespec *timeout) {
- return _recvfrom_(buf, size, port, timeout, 0);
+int ShmModSocket::recvfrom_timeout( void **buf, int *size, int *key, struct timespec *timeout) {
+ return _recvfrom_(buf, size, key, timeout, 0);
}
-int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *port){
- return _recvfrom_(buf, size, port, NULL, (int)SHM_MSG_NOWAIT);
+int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *key){
+ return _recvfrom_(buf, size, key, NULL, (int)SHM_MSG_NOWAIT);
}
/**
* 鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟
- * @port 鍙戦�佺粰璋�
+ * @key 鍙戦�佺粰璋�
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
-int ShmModSocket::sendandrecv( const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size){
- return shm_sendandrecv(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, NULL, 0);
+int ShmModSocket::sendandrecv( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
+ return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0);
}
// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int ShmModSocket::sendandrecv_timeout(const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size, struct timespec *timeout){
- return shm_sendandrecv(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, timeout, 0);
+int ShmModSocket::sendandrecv_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, struct timespec *timeout){
+ return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0);
}
-int ShmModSocket::sendandrecv_nowait(const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size){
- return shm_sendandrecv(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT);
+int ShmModSocket::sendandrecv_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
+ return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT);
}
-int ShmModSocket::sendandrecv_unsafe( const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size){
- return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, NULL, 0);
+int ShmModSocket::sendandrecv_unsafe( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
+ return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0);
}
// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int ShmModSocket::sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size, struct timespec *timeout){
- return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, timeout, 0);
+int ShmModSocket::sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, struct timespec *timeout){
+ return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0);
}
-int ShmModSocket::sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size){
- return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT);
+int ShmModSocket::sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
+ return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT);
}
@@ -214,17 +214,17 @@
* 璁㈤槄鎸囧畾涓婚
* @topic 涓婚
* @size 涓婚闀垮害
- * @port 鎬荤嚎绔彛
+ * @key 鎬荤嚎绔彛
*/
-int ShmModSocket::sub(char *topic, int size, int port){
- return _sub_( topic, size, port, NULL, 0);
+int ShmModSocket::sub(char *topic, int size, int key){
+ return _sub_( topic, size, key, NULL, 0);
}
// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int ShmModSocket::sub_timeout(char *topic, int size, int port, struct timespec *timeout){
- return _sub_(topic, size, port, timeout, 0);
+int ShmModSocket::sub_timeout(char *topic, int size, int key, struct timespec *timeout){
+ return _sub_(topic, size, key, timeout, 0);
}
-int ShmModSocket::sub_nowait(char *topic, int size, int port) {
- return _sub_(topic, size, port, NULL, (int)SHM_MSG_NOWAIT);
+int ShmModSocket::sub_nowait(char *topic, int size, int key) {
+ return _sub_(topic, size, key, NULL, (int)SHM_MSG_NOWAIT);
}
@@ -233,17 +233,17 @@
* 鍙栨秷璁㈤槄鎸囧畾涓婚
* @topic 涓婚
* @size 涓婚闀垮害
- * @port 鎬荤嚎绔彛
+ * @key 鎬荤嚎绔彛
*/
-int ShmModSocket::desub(char *topic, int size, int port){
- return _desub_( topic, size, port, NULL, 0);
+int ShmModSocket::desub(char *topic, int size, int key){
+ return _desub_( topic, size, key, NULL, 0);
}
// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int ShmModSocket::desub_timeout(char *topic, int size, int port, struct timespec *timeout){
- return _desub_(topic, size, port, timeout, 0);
+int ShmModSocket::desub_timeout(char *topic, int size, int key, struct timespec *timeout){
+ return _desub_(topic, size, key, timeout, 0);
}
-int ShmModSocket::desub_nowait(char *topic, int size, int port) {
- return _desub_(topic, size, port, NULL, (int)SHM_MSG_NOWAIT);
+int ShmModSocket::desub_nowait(char *topic, int size, int key) {
+ return _desub_(topic, size, key, NULL, (int)SHM_MSG_NOWAIT);
}
@@ -252,76 +252,76 @@
* 鍙戝竷涓婚
* @topic 涓婚
* @content 涓婚鍐呭
- * @port 鎬荤嚎绔彛
+ * @key 鎬荤嚎绔彛
*/
-int ShmModSocket::pub(char *topic, int topic_size, void *content, int content_size, int port){
- return _pub_(topic, topic_size, content, content_size, port, NULL, 0);
+int ShmModSocket::pub(char *topic, int topic_size, void *content, int content_size, int key){
+ return _pub_(topic, topic_size, content, content_size, key, NULL, 0);
}
// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int ShmModSocket::pub_timeout(char *topic, int topic_size, void *content, int content_size, int port, struct timespec * timeout){
- return _pub_( topic, topic_size, content, content_size, port, timeout, 0);
+int ShmModSocket::pub_timeout(char *topic, int topic_size, void *content, int content_size, int key, struct timespec * timeout){
+ return _pub_( topic, topic_size, content, content_size, key, timeout, 0);
}
-int ShmModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int port){
- return _pub_(topic, topic_size, content, content_size, port, NULL, (int)SHM_MSG_NOWAIT);
+int ShmModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int key){
+ return _pub_(topic, topic_size, content, content_size, key, NULL, (int)SHM_MSG_NOWAIT);
}
/**
- * 鑾峰彇soket绔彛鍙�
+ * 鑾峰彇soket key
*/
-int ShmModSocket::get_port(){
- return shm_socket->port;
+int ShmModSocket::get_key(){
+ return shm_socket->key;
}
// =============================================================================
/**
- * @port 鎬荤嚎绔彛
+ * @key 鎬荤嚎绔彛
*/
-int ShmModSocket::_sub_(char *topic, int size, int port,
+int ShmModSocket::_sub_(char *topic, int size, int key,
struct timespec *timeout, int flags) {
char buf[8192];
int rv;
snprintf(buf, 8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
- rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags);
+ rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags);
if(rv == 0) {
- bus_set->insert(port);
+ bus_set->insert(key);
}
return rv;
}
/**
- * @port 鎬荤嚎绔彛
+ * @key 鎬荤嚎绔彛
*/
-int ShmModSocket::_desub_(char *topic, int size, int port,
+int ShmModSocket::_desub_(char *topic, int size, int key,
struct timespec *timeout, int flags) {
char buf[8192];
if(topic == NULL) {
topic = "";
}
snprintf(buf, 8192, "%sdesub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
- return shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags);
+ return shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags);
}
/**
- * @port 鎬荤嚎绔彛
+ * @key 鎬荤嚎绔彛
*/
-int ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int port,
+int ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int key,
struct timespec *timeout, int flags) {
int head_len;
char buf[8192+content_size];
snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
head_len = strlen(buf);
memcpy(buf+head_len, content, content_size);
- return shm_sendto(shm_socket, buf, head_len+content_size, port, timeout, flags);
+ return shm_sendto(shm_socket, buf, head_len+content_size, key, timeout, flags);
}
/*
* 澶勭悊璁㈤槄
*/
-void ShmModSocket::_proxy_sub( char *topic, int port) {
+void ShmModSocket::_proxy_sub( char *topic, int key) {
SHMKeySet *subscripter_set;
SHMTopicSubMap::iterator map_iter;
@@ -334,13 +334,13 @@
subscripter_set = new(set_ptr) SHMKeySet;
topic_sub_map->insert({topic, subscripter_set});
}
- subscripter_set->insert(port);
+ subscripter_set->insert(key);
}
/*
* 澶勭悊鍙栨秷璁㈤槄
*/
-void ShmModSocket::_proxy_desub( char *topic, int port) {
+void ShmModSocket::_proxy_desub( char *topic, int key) {
SHMKeySet *subscripter_set;
SHMTopicSubMap::iterator map_iter;
@@ -349,30 +349,30 @@
if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
subscripter_set = map_iter->second;
- subscripter_set->erase(port);
-printf("============ desub %d\n", port);
+ subscripter_set->erase(key);
+printf("============ desub %d\n", key);
}
}
/*
* 澶勭悊鍙栨秷鎵�鏈夎闃�
*/
-void ShmModSocket::_proxy_desub_all(int port) {
+void ShmModSocket::_proxy_desub_all(int key) {
SHMKeySet *subscripter_set;
SHMTopicSubMap::iterator map_iter;
// SHMKeySet::iterator set_iter;
for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
subscripter_set = map_iter->second;
- subscripter_set->erase(port);
-printf("============ desub %d\n", port);
+ subscripter_set->erase(key);
+printf("============ desub %d\n", key);
}
}
/*
* 澶勭悊鍙戝竷锛屼唬鐞嗚浆鍙�
*/
-void ShmModSocket::_proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int port) {
+void ShmModSocket::_proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int key) {
SHMKeySet *subscripter_set;
SHMTopicSubMap::iterator map_iter;
@@ -381,19 +381,19 @@
std::vector<int> subscripter_to_del;
std::vector<int>::iterator vector_iter;
- int send_port;
+ int send_key;
struct timespec timeout = {1,0};
if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
subscripter_set = map_iter->second;
for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
- send_port = *set_iter;
- // printf("_proxy_pub send before %d \n", send_port);
- if (shm_sendto(shm_socket, buf+head_len, size-head_len, send_port, &timeout) == SHM_SOCKET_ECONNFAILED ) {
+ send_key = *set_iter;
+ // printf("_proxy_pub send before %d \n", send_key);
+ if (shm_sendto(shm_socket, buf+head_len, size-head_len, send_key, &timeout) == SHM_SOCKET_ECONNFAILED ) {
//瀵规柟宸插叧闂殑杩炴帴鏀惧埌寰呭垹闄ら槦鍒楅噷銆傚鏋滅洿鎺ュ垹闄や細璁﹊ter鎸囬拡鍑虹幇閿欎贡
- subscripter_to_del.push_back(send_port);
+ subscripter_to_del.push_back(send_key);
} else {
-// printf("_proxy_pub send after: %d \n", send_port);
+// printf("_proxy_pub send after: %d \n", send_key);
}
@@ -403,7 +403,7 @@
for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); vector_iter++) {
if((set_iter = subscripter_set->find(*vector_iter)) != subscripter_set->end()) {
subscripter_set->erase(set_iter);
- printf("remove closed subscripter %d \n", send_port);
+ printf("remove closed subscripter %d \n", send_key);
}
}
subscripter_to_del.clear();
@@ -414,13 +414,13 @@
void * ShmModSocket::run_pubsub_proxy() {
// pthread_detach(pthread_self());
int size;
- int port;
+ int key;
char * action, *topic, *topics, *buf;
size_t head_len;
const char *topic_delim = ",";
// printf("run_pubsub_proxy server receive before\n");
- while(shm_recvfrom(shm_socket, (void **)&buf, &size, &port) == 0) {
+ while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) {
//printf("run_pubsub_proxy server recv after: %s \n", buf);
if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) {
// printf("run_pubsub_proxy %s %s \n", action, topics);
@@ -429,7 +429,7 @@
topic = strtok(topics, topic_delim);
//printf("run_pubsub_proxy topic = %s\n", topic);
while(topic) {
- _proxy_sub(trim(topic, 0), port);
+ _proxy_sub(trim(topic, 0), key);
topic = strtok(NULL, topic_delim);
}
@@ -438,12 +438,12 @@
if(strcmp(trim(topics, 0), "") == 0) {
// 鍙栨秷鎵�鏈夎闃�
printf("====鍙栨秷鎵�鏈夎闃匼n");
- _proxy_desub_all(port);
+ _proxy_desub_all(key);
} else {
topic = strtok(topics, topic_delim);
while(topic) {
- _proxy_desub(trim(topic, 0), port);
+ _proxy_desub(trim(topic, 0), key);
topic = strtok(NULL, topic_delim);
}
}
@@ -451,7 +451,7 @@
} else if(strcmp(action, "pub") == 0) {
- _proxy_pub(topics, head_len, buf, size, port);
+ _proxy_pub(topics, head_len, buf, size, key);
}
free(action);
diff --git a/src/socket/shm_mod_socket.h b/src/socket/shm_mod_socket.h
index b6dadee..34c78a5 100644
--- a/src/socket/shm_mod_socket.h
+++ b/src/socket/shm_mod_socket.h
@@ -104,6 +104,7 @@
int sendandrecv_timeout(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size, struct timespec *timeout) ;
int sendandrecv_nowait(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ;
+
int sendandrecv_unsafe(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ;
// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
int sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size, struct timespec *timeout) ;
@@ -153,9 +154,9 @@
/**
- * 鑾峰彇soket绔彛鍙�
+ * 鑾峰彇soket key
*/
- int get_port() ;
+ int get_key() ;
};
diff --git a/src/socket/shm_socket.c b/src/socket/shm_socket.c
index f07d8a6..3041568 100644
--- a/src/socket/shm_socket.c
+++ b/src/socket/shm_socket.c
@@ -8,7 +8,7 @@
void print_msg(char *head, shm_msg_t &msg) {
- // err_msg(0, "%s: port=%d, type=%d\n", head, msg.port, msg.type);
+ // err_msg(0, "%s: key=%d, type=%d\n", head, msg.key, msg.type);
}
static void *_server_run_msg_rev(void *_socket);
@@ -20,15 +20,15 @@
static int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote);
static inline int _shm_socket_check_key(shm_socket_t *socket) {
- void *tmp_ptr = mm_get_by_key(socket->port);
+ void *tmp_ptr = mm_get_by_key(socket->key);
if (tmp_ptr!= NULL && tmp_ptr != (void *)1 && !socket->force_bind ) {
- err_exit(0, "key %d has already been in used!", socket->port);
+ err_exit(0, "key %d has already been in used!", socket->key);
return 0;
}
return 1;
}
-SHMQueue<shm_msg_t> *_attach_remote_queue(int port);
+SHMQueue<shm_msg_t> *_attach_remote_queue(int key);
@@ -39,7 +39,7 @@
shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) {
shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t));
socket->socket_type = socket_type;
- socket->port = -1;
+ socket->key = -1;
socket->force_bind = false;
socket->dispatch_thread = 0;
socket->status = SHM_CONN_CLOSED;
@@ -65,14 +65,14 @@
return ret;
}
-int shm_socket_bind(shm_socket_t *socket, int port) {
- socket->port = port;
+int shm_socket_bind(shm_socket_t *socket, int key) {
+ socket->key = key;
return 0;
}
-int shm_socket_force_bind(shm_socket_t *socket, int port) {
+int shm_socket_force_bind(shm_socket_t *socket, int key) {
socket->force_bind = true;
- socket->port = port;
+ socket->key = key;
return 0;
}
@@ -83,17 +83,17 @@
"SHM_SOCKET_STREAM socket");
}
- int port;
+ int key;
hashtable_t *hashtable = mm_get_hashtable();
- if (socket->port == -1) {
- port = hashtable_alloc_key(hashtable);
- socket->port = port;
+ if (socket->key == -1) {
+ key = hashtable_alloc_key(hashtable);
+ socket->key = key;
} else {
_shm_socket_check_key(socket);
}
- socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
+ socket->queue = new SHMQueue<shm_msg_t>(socket->key, 16);
socket->acceptQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
socket->clientSocketMap = new std::map<int, shm_socket_t *>;
socket->status = SHM_CONN_LISTEN;
@@ -113,25 +113,25 @@
"SHM_SOCKET_STREAM socket");
}
hashtable_t *hashtable = mm_get_hashtable();
- int client_port;
+ int client_key;
shm_socket_t *client_socket;
shm_msg_t src;
if (socket->acceptQueue->pop(src)) {
// print_msg("===accept:", src);
- client_port = src.port;
+ client_key = src.key;
// client_socket = (shm_socket_t *)malloc(sizeof(shm_socket_t));
client_socket = shm_open_socket(socket->socket_type);
- client_socket->port = socket->port;
+ client_socket->key = socket->key;
// client_socket->queue= socket->queue;
//鍒濆鍖栨秷鎭痲ueue
client_socket->messageQueue =
new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
//杩炴帴鍒板鏂筿ueue
- client_socket->remoteQueue = _attach_remote_queue(client_port);
+ client_socket->remoteQueue = _attach_remote_queue(client_key);
- socket->clientSocketMap->insert({client_port, client_socket});
+ socket->clientSocketMap->insert({client_key, client_socket});
/*
* shm_accept 鐢ㄦ埛鎵ц鐨勬柟娉�
@@ -140,7 +140,7 @@
//鍙戦�乷pen_reply,鍥炲簲瀹㈡埛绔殑connect璇锋眰
struct timespec timeout = {1, 0};
shm_msg_t msg;
- msg.port = socket->port;
+ msg.key = socket->key;
msg.size = 0;
msg.type = SHM_SOCKET_OPEN_REPLY;
@@ -159,33 +159,33 @@
}
-int shm_connect(shm_socket_t *socket, int port) {
+int shm_connect(shm_socket_t *socket, int key) {
if (socket->socket_type != SHM_SOCKET_STREAM) {
err_exit(0, "can not invoke shm_connect method with a socket which is not "
"a SHM_SOCKET_STREAM socket");
}
hashtable_t *hashtable = mm_get_hashtable();
- if (hashtable_get(hashtable, port) == NULL) {
- err_exit(0, "shm_connect锛歝onnect at port %d failed!", port);
+ if (hashtable_get(hashtable, key) == NULL) {
+ err_exit(0, "shm_connect锛歝onnect at key %d failed!", key);
}
- if (socket->port == -1) {
- socket->port = hashtable_alloc_key(hashtable);
+ if (socket->key == -1) {
+ socket->key = hashtable_alloc_key(hashtable);
} else {
_shm_socket_check_key(socket);
}
- socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
+ socket->queue = new SHMQueue<shm_msg_t>(socket->key, 16);
- if ((socket->remoteQueue = _attach_remote_queue(port)) == NULL) {
- err_exit(0, "connect to %d failted", port);
+ if ((socket->remoteQueue = _attach_remote_queue(key)) == NULL) {
+ err_exit(0, "connect to %d failted", key);
}
socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
//鍙戦�乷pen璇锋眰
struct timespec timeout = {1, 0};
shm_msg_t msg;
- msg.port = socket->port;
+ msg.key = socket->key;
msg.size = 0;
msg.type = SHM_SOCKET_OPEN;
socket->remoteQueue->push_timeout(msg, &timeout);
@@ -220,7 +220,7 @@
// }
shm_msg_t dest;
dest.type = SHM_COMMON_MSG;
- dest.port = socket->port;
+ dest.key = socket->key;
dest.size = size;
dest.buf = mm_malloc(size);
memcpy(dest.buf, buf, size);
@@ -256,7 +256,7 @@
// 鐭繛鎺ユ柟寮忓彂閫�
int shm_sendto(shm_socket_t *socket, const void *buf, const int size,
- const int port, const struct timespec *timeout, const int flags) {
+ const int key, const struct timespec *timeout, const int flags) {
if (socket->socket_type != SHM_SOCKET_DGRAM) {
err_exit(0, "Can't invoke shm_sendto method in a %d type socket which is "
"not a SHM_SOCKET_DGRAM socket ",
@@ -266,31 +266,31 @@
SemUtil::dec(socket->mutex);
if (socket->queue == NULL) {
- if (socket->port == -1) {
- socket->port = hashtable_alloc_key(hashtable);
+ if (socket->key == -1) {
+ socket->key = hashtable_alloc_key(hashtable);
} else {
_shm_socket_check_key(socket);
}
- socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
+ socket->queue = new SHMQueue<shm_msg_t>(socket->key, 16);
}
SemUtil::inc(socket->mutex);
- if (port == socket->port) {
+ if (key == socket->key) {
err_msg(0, "can not send to your self!");
return -1;
}
SHMQueue<shm_msg_t> *remoteQueue;
- if ((remoteQueue = _attach_remote_queue(port)) == NULL) {
+ if ((remoteQueue = _attach_remote_queue(key)) == NULL) {
err_msg(0, "shm_sendto failed, the other end has been closed, or has not been opened!");
return SHM_SOCKET_ECONNFAILED;
}
shm_msg_t dest;
dest.type = SHM_COMMON_MSG;
- dest.port = socket->port;
+ dest.key = socket->key;
dest.size = size;
dest.buf = mm_malloc(size);
memcpy(dest.buf, buf, size);
@@ -312,13 +312,13 @@
} else {
delete remoteQueue;
mm_free(dest.buf);
- err_msg(errno, "sendto port %d failed!", port);
+ err_msg(errno, "sendto key %d failed!", key);
return -1;
}
}
// 鐭繛鎺ユ柟寮忔帴鍙�
-int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port, struct timespec *timeout, int flags) {
+int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *key, struct timespec *timeout, int flags) {
if (socket->socket_type != SHM_SOCKET_DGRAM) {
err_exit(0, "Can't invoke shm_recvfrom method in a %d type socket which "
"is not a SHM_SOCKET_DGRAM socket ",
@@ -327,14 +327,14 @@
hashtable_t *hashtable = mm_get_hashtable();
SemUtil::dec(socket->mutex);
if (socket->queue == NULL) {
- if (socket->port == -1) {
- socket->port = hashtable_alloc_key(hashtable);
+ if (socket->key == -1) {
+ socket->key = hashtable_alloc_key(hashtable);
} else {
_shm_socket_check_key(socket);
}
- socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
+ socket->queue = new SHMQueue<shm_msg_t>(socket->key, 16);
}
SemUtil::inc(socket->mutex);
@@ -354,7 +354,7 @@
memcpy(_buf, src.buf, src.size);
*buf = _buf;
*size = src.size;
- *port = src.port;
+ *key = src.key;
mm_free(src.buf);
// printf("shm_recvfrom pop after\n");
return 0;
@@ -364,19 +364,19 @@
}
int shm_sendandrecv(shm_socket_t *socket, const void *send_buf,
- const int send_size, const int send_port, void **recv_buf,
+ const int send_size, const int send_key, void **recv_buf,
int *recv_size, struct timespec *timeout, int flags) {
if (socket->socket_type != SHM_SOCKET_DGRAM) {
err_exit(0, "Can't invoke shm_sendandrecv method in a %d type socket "
"which is not a SHM_SOCKET_DGRAM socket ",
socket->socket_type);
}
- int recv_port;
+ int recv_key;
int rv;
shm_socket_t *tmp_socket = shm_open_socket(SHM_SOCKET_DGRAM);
- if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_port, timeout, flags)) == 0) {
- rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_port, timeout, flags);
+ if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags)) == 0) {
+ rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags);
shm_close_socket(tmp_socket);
return rv;
} else {
@@ -387,19 +387,19 @@
}
int shm_sendandrecv_unsafe(shm_socket_t *socket, const void *send_buf,
- const int send_size, const int send_port, void **recv_buf,
+ const int send_size, const int send_key, void **recv_buf,
int *recv_size, struct timespec *timeout, int flags) {
if (socket->socket_type != SHM_SOCKET_DGRAM) {
err_exit(0, "Can't invoke shm_sendandrecv method in a %d type socket "
"which is not a SHM_SOCKET_DGRAM socket ",
socket->socket_type);
}
- int recv_port;
+ int recv_key;
int rv;
- if ((rv = shm_sendto(socket, send_buf, send_size, send_port, timeout, flags)) == 0) {
- rv = shm_recvfrom(socket, recv_buf, recv_size, &recv_port, timeout, flags);
+ if ((rv = shm_sendto(socket, send_buf, send_size, send_key, timeout, flags)) == 0) {
+ rv = shm_recvfrom(socket, recv_buf, recv_size, &recv_key, timeout, flags);
return rv;
} else {
return rv;
@@ -412,21 +412,21 @@
/**
* 缁戝畾key鍒伴槦鍒楋紝浣嗘槸骞朵笉浼氬垱寤洪槦鍒椼�傚鏋滄病鏈夊搴旀寚瀹歬ey鐨勯槦鍒楁彁绀洪敊璇苟閫�鍑�
*/
-SHMQueue<shm_msg_t> *_attach_remote_queue(int port) {
+SHMQueue<shm_msg_t> *_attach_remote_queue(int key) {
hashtable_t *hashtable = mm_get_hashtable();
- if (hashtable_get(hashtable, port) == NULL) {
- err_msg(0, "_remote_queue_attach锛歝onnet at port %d failed!", port);
+ if (hashtable_get(hashtable, key) == NULL) {
+ err_msg(0, "_remote_queue_attach锛歝onnet at key %d failed!", key);
return NULL;
}
- SHMQueue<shm_msg_t> *queue = new SHMQueue<shm_msg_t>(port, 0);
+ SHMQueue<shm_msg_t> *queue = new SHMQueue<shm_msg_t>(key, 0);
return queue;
}
-void _server_close_conn_to_client(shm_socket_t *socket, int port) {
+void _server_close_conn_to_client(shm_socket_t *socket, int key) {
shm_socket_t *client_socket;
std::map<int, shm_socket_t *>::iterator iter =
- socket->clientSocketMap->find(port);
+ socket->clientSocketMap->find(key);
if (iter != socket->clientSocketMap->end()) {
client_socket = iter->second;
free((void *)client_socket);
@@ -452,11 +452,11 @@
socket->acceptQueue->push_timeout(src, &timeout);
break;
case SHM_SOCKET_CLOSE:
- _server_close_conn_to_client(socket, src.port);
+ _server_close_conn_to_client(socket, src.key);
break;
case SHM_COMMON_MSG:
- iter = socket->clientSocketMap->find(src.port);
+ iter = socket->clientSocketMap->find(src.key);
if (iter != socket->clientSocketMap->end()) {
client_socket = iter->second;
// print_msg("_server_run_msg_rev push before", src);
@@ -511,7 +511,7 @@
struct timespec timeout = {1, 0};
shm_msg_t close_msg;
- close_msg.port = socket->port;
+ close_msg.key = socket->key;
close_msg.size = 0;
close_msg.type = SHM_SOCKET_CLOSE;
if (notifyRemote && socket->remoteQueue != NULL) {
diff --git a/src/socket/shm_socket.h b/src/socket/shm_socket.h
index 4f1efc2..65fb899 100644
--- a/src/socket/shm_socket.h
+++ b/src/socket/shm_socket.h
@@ -41,7 +41,7 @@
};
typedef struct shm_msg_t {
- int port;
+ int key;
shm_msg_type_t type;
size_t size;
void * buf;
@@ -53,8 +53,8 @@
typedef struct shm_socket_t {
shm_socket_type_t socket_type;
- // 鏈湴port
- int port;
+ // 鏈湴key
+ int key;
bool force_bind;
int mutex;
shm_connection_status_t status;
@@ -77,33 +77,33 @@
int shm_close_socket(shm_socket_t * socket) ;
-int shm_socket_bind(shm_socket_t * socket, int port) ;
+int shm_socket_bind(shm_socket_t * socket, int key) ;
-int shm_socket_force_bind(shm_socket_t * socket, int port) ;
+int shm_socket_force_bind(shm_socket_t * socket, int key) ;
int shm_listen(shm_socket_t * socket) ;
shm_socket_t* shm_accept(shm_socket_t* socket);
-int shm_connect(shm_socket_t * socket, int port);
+int shm_connect(shm_socket_t * socket, int key);
int shm_send(shm_socket_t * socket, const void *buf, const int size) ;
int shm_recv(shm_socket_t * socket, void **buf, int *size) ;
-int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int port, const struct timespec * timeout = NULL, const int flags=0);
+int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int key, const struct timespec * timeout = NULL, const int flags=0);
-int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port, struct timespec * timeout = NULL, int flags=0);
+int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *key, struct timespec * timeout = NULL, int flags=0);
-int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size,
+int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size,
struct timespec * timeout = NULL, int flags=0);
/**
* 鍔熻兘鍚宻hm_sendandrecv, 浣嗘槸涓嶆槸绾跨▼瀹夊叏鐨�
*/
-int shm_sendandrecv_unsafe(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size,
+int shm_sendandrecv_unsafe(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size,
struct timespec * timeout = NULL, int flags=0);
diff --git a/src/socket/shm_stream_mod_socket.c b/src/socket/shm_stream_mod_socket.c
index ca8c2a1..4b948c9 100644
--- a/src/socket/shm_stream_mod_socket.c
+++ b/src/socket/shm_stream_mod_socket.c
@@ -63,9 +63,9 @@
-int shm_stream_mod_socket_bind(void * _socket, int port){
+int shm_stream_mod_socket_bind(void * _socket, int key){
shm_stream_mod_socket_t * socket = (shm_stream_mod_socket_t *) _socket;
- return shm_socket_bind(socket->shm_socket, port);
+ return shm_socket_bind(socket->shm_socket, key);
}
void * run_server_recv_client_msg(void *_socket) {
@@ -116,9 +116,9 @@
}
-int shm_stream_mod_socket_connect(void * _socket, int port) {
+int shm_stream_mod_socket_connect(void * _socket, int key) {
shm_stream_mod_socket_t * socket = (shm_stream_mod_socket_t *) _socket;
- return shm_connect(socket->shm_socket, port);
+ return shm_connect(socket->shm_socket, key);
}
@@ -189,9 +189,9 @@
return -1;
}
-int shm_stream_mod_socket_get_port(void * _socket) {
+int shm_stream_mod_socket_get_key(void * _socket) {
shm_stream_mod_socket_t * socket = (shm_stream_mod_socket_t *) _socket;
- return socket->shm_socket->port;
+ return socket->shm_socket->key;
}
diff --git a/test_net_socket/net_mod_socket.sh b/test_net_socket/net_mod_socket.sh
index fb65fa3..3aadf24 100755
--- a/test_net_socket/net_mod_socket.sh
+++ b/test_net_socket/net_mod_socket.sh
@@ -1,15 +1,16 @@
function server() {
-# 鎵撳紑璇锋眰搴旂瓟鐨剆erver
- ./dgram_mod_req_rep server 11 & server_pid=$! && echo ${server_pid}
+# 鎵撳紑璇锋眰搴旂瓟鐨勬帴鍙楃
+ ./test_net_mod_socket --fun="start_reply" --key=11 & server_pid=$! && echo "pid: ${server_pid}"
-# 寮�鍚痓us
- ./dgram_mod_bus server 8 & server_pid=$! && echo ${server_pid}
+# 寮�鍚痓us
+ ./test_net_mod_socket --fun="start_bus_server" --key=8 & server_pid=$! && echo "pid: ${server_pid}"
-# 寮�鍚綉缁渟erver
- ./test_net_mod_socket server 5000 & server_pid=$! && echo ${server_pid}
+
+# 寮�鍚綉缁滆浆鍙戜唬鐞�
+ ./test_net_mod_socket --fun="start_net_proxy" --port=5000 & server_pid=$! && echo "pid: ${server_pid}"
}
diff --git a/test_net_socket/test_net_mod_socket.c b/test_net_socket/test_net_mod_socket.c
index f389c59..403530f 100644
--- a/test_net_socket/test_net_mod_socket.c
+++ b/test_net_socket/test_net_mod_socket.c
@@ -1,8 +1,8 @@
#include "net_mod_server_socket_wrapper.h"
#include "net_mod_socket_wrapper.h"
#include "shm_mm.h"
-#include "dgram_mod_socket.h"
#include "usg_common.h"
+#include <getopt.h>
typedef struct Targ {
int port;
@@ -10,18 +10,20 @@
}Targ;
-void server(int port) {
+void start_net_proxy(int port) {
+ printf("Start net proxy\n");
void *serverSocket = net_mod_server_socket_open(port);
if(net_mod_server_socket_start(serverSocket) != 0) {
err_exit(errno, "net_mod_server_socket_start");
}
}
-void client(int port ){
+void start_net_client(int port ){
void * client = net_mod_socket_open();
char content[MAXLINE];
char action[512];
char topic[512];
+ int buskey;
int recv_arr_size, i, n;
@@ -75,6 +77,29 @@
net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
}
}
+ else if(strcmp(action, "desub") == 0) {
+ printf("Please input buskey topic!\n");
+
+ scanf("%d %s", buskey, topic);
+ if (net_mod_socket_desub(client, topic, strlen(topic), buskey) == 0) {
+ printf("%d Desub success!\n", net_mod_socket_get_key(client));
+ } else {
+ printf("Desub failture!\n");
+ exit(0);
+ }
+
+ }
+ else if(strcmp(action, "sub") == 0) {
+ printf("Please input topic!\n");
+ scanf("%s", topic);
+ if (net_mod_socket_sub(client, topic, strlen(topic), buskey) == 0) {
+ printf("%d Sub success!\n", net_mod_socket_get_key(client));
+ } else {
+ printf("Sub failture!\n");
+ exit(0);
+ }
+
+ }
else if(strcmp(action, "quit") == 0) {
break;
} else {
@@ -138,7 +163,7 @@
return (void *)i;
}
-void mclient(int port) {
+void start_net_mclient(int port) {
int status, i = 0, processors = 1;
void *res[processors];
@@ -175,27 +200,195 @@
// fflush(stdout);
}
+void start_bus_server(int key) {
+ printf("Start bus server\n");
+ void * server_socket = net_mod_socket_open();
+
+ net_mod_socket_bind(server_socket, key);
+
+ net_mod_socket_start_bus(server_socket);
+}
+
+
+
+void start_reply(int key) {
+ void *socket = net_mod_socket_open();
+ net_mod_socket_bind(socket, key);
+ int size;
+ void *recvbuf;
+ char sendbuf[512];
+ int rv;
+ int remote_port;
+ while ( (rv = net_mod_socket_recvfrom(socket, &recvbuf, &size, &remote_port) ) == 0) {
+ printf( "server: RECEIVED REQUEST FROM PORT %d NAME %s\n", remote_port, recvbuf);
+ sprintf(sendbuf, "RECEIVED PORT %d NAME %s", remote_port, recvbuf);
+ net_mod_socket_sendto(socket, sendbuf, strlen(sendbuf) + 1, remote_port);
+ free(recvbuf);
+ }
+}
+
+
+
+void usage(char *name)
+{
+ fprintf(stderr, "Usage: %s [OPTIONS] [ARG...]\n\n", name);
+ fprintf(stderr, "Test net mod socket\n\n");
+ fprintf(stderr, "Options:\n\n");
+ #define fpe(str) fprintf(stderr, " %s", str);
+ fpe("-f, --funciton Function name\n");
+ fpe("-p, --port TCP/IP Port\n");
+ fpe("-k, --key SHM Key\n");
+ fpe("\n");
+}
+
+struct argument_t {
+ char *fun;
+ int port;
+ int key;
+ char **cmd_arr;
+ int cmd_arr_len;
+};
+
+argument_t parse_args (int argc, char *argv[])
+{
+ int c;
+
+ if(argc < 2) {
+ usage(argv[0]);
+ exit(1);
+ }
+
+ if(argc == 2 && strcmp(argv[1], "--help") == 0) {
+ usage(argv[0]);
+ exit(0);
+ }
+
+
+ argument_t mopt = {};
+
+ // mopt.volume_list_size = 0;
+
+ opterr = 0;
+
+ static struct option long_options[] =
+ {
+ /* These options set a flag. */
+
+ {"fun", required_argument, 0, 'f'},
+ {"key", required_argument, 0, 'k'},
+ {"port", required_argument, 0, 'p'},
+ {0, 0, 0, 0}
+ };
+ /* getopt_long stores the option index here. */
+ int option_index = 0;
+ while (1)
+ {
+
+
+ c = getopt_long (argc, argv, "+f:k:p:", long_options, &option_index);
+
+ /* Detect the end of the options. */
+ if (c == -1)
+ break;
+
+ switch (c)
+ {
+ case 0:
+ // printf("ffffffff\n");
+ /* If this option set a flag, do nothing else now. */
+ if (long_options[option_index].flag != 0)
+ break;
+ printf ("option %s", long_options[option_index].name);
+ if (optarg)
+ printf (" with arg %s", optarg);
+ printf ("\n");
+ break;
+
+ case 'f':
+ mopt.fun = optarg;
+ break;
+
+ case 'k':
+ mopt.key = atoi(optarg);
+ break;
+
+ case 'p':
+ // printf ("==name with value `%s'\n", optarg);
+ mopt.port = atoi(optarg);
+ break;
+
+ case '?':
+ // printf ("==? optopt=%c, %s, `%s', %d\n", optopt, optarg, argv[optind], optind);
+ /* getopt_long already printed an error message. */
+ usage(argv[0]);
+ exit(1);
+ break;
+
+ default:
+ //printf ("==default optopt=%c, %s, `%s'\n",optopt, optarg, argv[optind]);
+ break;
+ }
+ }
+
+ // printf ("optind = %d, argc=%d \n", optind, argc);
+ /* Print any remaining command line arguments (not options). */
+ if (optind < argc)
+ {
+ mopt.cmd_arr = &argv[optind];
+ mopt.cmd_arr_len = argc - optind;
+ // printf ("non-option ARGV-elements: ");
+ // while (optind < argc)
+ // printf ("%d, %d, %s \n", optind, argc, argv[optind++]);
+ // putchar ('\n');
+ }
+ return mopt;
+
+}
int main(int argc, char *argv[]) {
shm_init(512);
+
+ argument_t opt = parse_args(argc, argv);
- int port;
- if (argc < 3) {
- fprintf(stderr, "Usage: %s %s|%s <PORT> \n", argv[0], "server", "client");
- return 1;
- }
-
- port = atoi(argv[2]);
+ // port = atoi(argv[2]);
- if (strcmp("server", argv[1]) == 0 ) {
- server(port);
+ if (strcmp("start_net_proxy", opt.fun) == 0 ) {
+ if(opt.port == 0) {
+ usage(argv[0]);
+ exit(1);
+ }
+ start_net_proxy(opt.port);
+
+ }
+ else if (strcmp("start_bus_server", opt.fun) == 0) {
+ if(opt.key == 0) {
+ usage(argv[0]);
+ exit(1);
+ }
+ start_bus_server(opt.key);
+ }
+ else if (strcmp("start_reply", opt.fun) == 0) {
+ if(opt.key == 0) {
+ usage(argv[0]);
+ exit(1);
+ }
+ start_reply(opt.key);
}
- if (strcmp("client", argv[1]) == 0)
- client(port);
+ else if (strcmp("start_net_client", opt.fun) == 0) {
+ if(opt.port == 0) {
+ usage(argv[0]);
+ exit(1);
+ }
+ start_net_client(opt.port);
+ }
+ else {
+ usage(argv[0]);
+ exit(1);
- if (strcmp("mclient", argv[1]) == 0)
- mclient(port);
+ }
+
+
}
diff --git a/test_socket/dgram_mod_bus.c b/test_socket/dgram_mod_bus.c
index f800f10..9f4795b 100644
--- a/test_socket/dgram_mod_bus.c
+++ b/test_socket/dgram_mod_bus.c
@@ -9,14 +9,14 @@
exit(0);
}
-void server(int port, bool restart) {
+void server(int key, bool restart) {
server_socket = dgram_mod_open_socket();
if(restart) {
- dgram_mod_force_bind(server_socket, port);
+ dgram_mod_force_bind(server_socket, key);
} else {
- dgram_mod_bind(server_socket, port);
+ dgram_mod_bind(server_socket, key);
}
@@ -28,18 +28,17 @@
pthread_detach(pthread_self());
void *recvbuf;
int size;
- int port;
- while (dgram_mod_recvfrom( socket, &recvbuf, &size, &port) == 0) {
+ int key;
+ while (dgram_mod_recvfrom( socket, &recvbuf, &size, &key) == 0) {
printf("鏀跺埌璁㈤槄娑堟伅:%s\n", recvbuf);
free(recvbuf);
}
}
-void client(int port) {
+void client(int key) {
void *socket = dgram_mod_open_socket();
-
pthread_t tid;
pthread_create(&tid, NULL, run_recv, socket);
int size;
@@ -56,7 +55,7 @@
if(strcmp(action, "sub") == 0) {
printf("Please input topic!\n");
scanf("%s", topic);
- if (dgram_mod_sub(socket, topic, strlen(topic), port) == 0) {
+ if (dgram_mod_sub(socket, topic, strlen(topic), key) == 0) {
printf("%d Sub success!\n", dgram_mod_get_port(socket));
} else {
printf("Sub failture!\n");
@@ -66,7 +65,7 @@
} else if(strcmp(action, "desub") == 0) {
printf("Please input topic!\n");
scanf("%s", topic);
- if (dgram_mod_desub(socket, topic, strlen(topic), port) == 0) {
+ if (dgram_mod_desub(socket, topic, strlen(topic), key) == 0) {
printf("%d Desub success!\n", dgram_mod_get_port(socket));
} else {
printf("Desub failture!\n");
@@ -77,7 +76,7 @@
// printf("%s %s %s\n", action, topic, content);
printf("Please input topic and content\n");
scanf("%s %s", topic, content);
- if(dgram_mod_pub(socket, topic, strlen(topic)+1, content, strlen(content)+1, port) == 0){
+ if(dgram_mod_pub(socket, topic, strlen(topic)+1, content, strlen(content)+1, key) == 0){
printf("%d Pub success!\n", dgram_mod_get_port(socket));
} else {
printf("Pub failture!\n");
@@ -100,29 +99,29 @@
int main(int argc, char *argv[]) {
shm_init(512);
- int port;
+ int key;
if (argc < 3) {
- fprintf(stderr, "Usage: %s %s|%s|rmkey <PORT> ...\n", argv[0], "server", "client");
+ fprintf(stderr, "Usage: %s %s|%s|rmkey <key> ...\n", argv[0], "server", "client");
return 1;
}
- port = atoi(argv[2]);
+ key = atoi(argv[2]);
if (strcmp("server", argv[1]) == 0) {
if(argc >= 4 && strcmp("restart", argv[3]) == 0) {
- server(port, true);
+ server(key, true);
}
else{
- server(port, false);
+ server(key, false);
}
} else if (strcmp("client", argv[1]) == 0) {
- client(port);
+ client(key);
} else if(strcmp("rmkey", argv[1]) == 0) {
for(int i = 2; i < argc; i++) {
- port = atoi(argv[i]);
- dgram_mod_remove_key(port);
- // printf("%d\n", port);
+ key = atoi(argv[i]);
+ dgram_mod_remove_key(key);
+ // printf("%d\n", key);
}
}
--
Gitblit v1.8.0