From f2a03c696bcf76bbaba349325abeef7be3979205 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期六, 30 一月 2021 19:05:00 +0800
Subject: [PATCH] update
---
src/net/net_mod_socket_wrapper.cpp | 3
src/bus_error.h | 1
src/shm/hashtable.h | 2
src/bus_error.cpp | 3
src/shm/hashtable.cpp | 85 +++++++++++-----
src/socket/shm_socket.cpp | 192 ++++++++++++++++++++-----------------
6 files changed, 168 insertions(+), 118 deletions(-)
diff --git a/src/bus_error.cpp b/src/bus_error.cpp
index 179211f..fe20cea 100644
--- a/src/bus_error.cpp
+++ b/src/bus_error.cpp
@@ -16,7 +16,8 @@
"Timeout",
"The other end is not inline",
"Key already in use",
- "Network fault"
+ "Network fault",
+ "Send to self error"
};
diff --git a/src/bus_error.h b/src/bus_error.h
index 5f6128b..9770ccd 100644
--- a/src/bus_error.h
+++ b/src/bus_error.h
@@ -9,6 +9,7 @@
#define EBUS_CLOSED 502
#define EBUS_KEY_INUSED 503
#define EBUS_NET 504
+#define EBUS_SENDTO_SELF 505
extern int bus_errno;
diff --git a/src/net/net_mod_socket_wrapper.cpp b/src/net/net_mod_socket_wrapper.cpp
index e7bf302..1dd2465 100644
--- a/src/net/net_mod_socket_wrapper.cpp
+++ b/src/net/net_mod_socket_wrapper.cpp
@@ -35,7 +35,8 @@
*/
int net_mod_socket_force_bind(void * _socket, int key) {
NetModSocket *sockt = (NetModSocket *)_socket;
- return sockt->force_bind(key);
+ // return sockt->force_bind(key);
+ return sockt->bind(key);
}
/**
diff --git a/src/shm/hashtable.cpp b/src/shm/hashtable.cpp
index 2248ad9..cd2bd5a 100755
--- a/src/shm/hashtable.cpp
+++ b/src/shm/hashtable.cpp
@@ -152,20 +152,61 @@
}
void hashtable_put(hashtable_t *hashtable, int key, void *value) {
-
- int rv;
- if(( rv = svsem_wait(hashtable->mutex)) != 0) {
- LoggerFactory::getLogger()->error(errno, "hashtable_put\n");
- }
-
- _hashtable_put(hashtable, key, value);
-
- if(( rv = svsem_post(hashtable->mutex)) != 0) {
- LoggerFactory::getLogger()->error(errno, "hashtable_put\n");
- }
+ _hashtable_put(hashtable, key, value);
}
+// bool hashtable_put(hashtable_t *hashtable, int key, void *value, bool overwrite) {
+// int rv;
+// if(( rv = svsem_wait(hashtable->mutex)) != 0) {
+// LoggerFactory::getLogger()->error(errno, "hashtable_put\n");
+// }
+// if(overwrite) {
+// _hashtable_put(hashtable, key, value);
+// goto suc;
+// }
+// void * val = _hashtable_get(hashtable, key);
+// // val = 1鏄痑llockey鐨勬儏鍐�
+// if(val != NULL && val != (void *)1)
+// goto fail;
+
+// _hashtable_put(hashtable, key, value);
+
+// suc:
+// if(( rv = svsem_post(hashtable->mutex)) != 0) {
+// LoggerFactory::getLogger()->error(errno, "hashtable_put\n");
+// }
+// return true;
+
+// fail:
+// if(( rv = svsem_post(hashtable->mutex)) != 0) {
+// LoggerFactory::getLogger()->error(errno, "hashtable_put\n");
+// }
+// return false;
+// }
+
+
+
+int hashtable_alloc_key(hashtable_t *hashtable) {
+ int rv;
+ int key = START_KEY;
+ rv = svsem_wait(hashtable->mutex);
+ if(rv != 0) {
+ LoggerFactory::getLogger()->error(errno, "hashtable_alloc_key\n");
+ }
+
+ while(_hashtable_get(hashtable, key) != NULL) {
+ key++;
+ }
+ // 鍗犵敤key
+ _hashtable_put(hashtable, key, (void *)1);
+
+ rv = svsem_post(hashtable->mutex);
+ if(rv != 0) {
+ LoggerFactory::getLogger()->error(errno, "hashtable_alloc_key\n");
+ }
+ return key;
+}
static inline void _hashtable_foreach(hashtable_t *hashtable, std::function<void(int, void *)> cb) {
tailq_entry_t *item;
@@ -207,25 +248,13 @@
}
-int hashtable_alloc_key(hashtable_t *hashtable) {
- int rv;
- int key = START_KEY;
- rv = svsem_wait(hashtable->mutex);
- if(rv != 0) {
- LoggerFactory::getLogger()->error(errno, "hashtable_alloc_key\n");
- }
- while(_hashtable_get(hashtable, key) != NULL) {
- key++;
- }
- // 鍗犵敤key
- _hashtable_put(hashtable, key, (void *)1);
+int hashtable_lock(hashtable_t *hashtable) {
+ return svsem_wait(hashtable->mutex);
+}
- rv = svsem_post(hashtable->mutex);
- if(rv != 0) {
- LoggerFactory::getLogger()->error(errno, "hashtable_alloc_key\n");
- }
- return key;
+int hashtable_unlock(hashtable_t *hashtable) {
+ return svsem_post(hashtable->mutex);
}
diff --git a/src/shm/hashtable.h b/src/shm/hashtable.h
index 35b5892..e43029c 100755
--- a/src/shm/hashtable.h
+++ b/src/shm/hashtable.h
@@ -25,6 +25,8 @@
void *hashtable_remove(hashtable_t *hashtable, int key);
void hashtable_removeall(hashtable_t *hashtable);
+int hashtable_lock(hashtable_t *hashtable);
+int hashtable_unlock(hashtable_t *hashtable);
/**
* 閬嶅巻hash_table
* @demo
diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index daa7f1a..791311b 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -28,14 +28,28 @@
static void _create_tmp_recv_socket_key(void);
// 妫�鏌ey鏄惁宸茬粡琚娇鐢紝鏄繑鍥�0, 鍚﹁繑鍥�1
-static inline int _shm_socket_check_key(shm_socket_t *socket) {
- void *tmp_ptr = mm_get_by_key(socket->key);
- if (tmp_ptr!= NULL && tmp_ptr != (void *)1 && !socket->force_bind ) {
- bus_errno = EBUS_KEY_INUSED;
- logger->error("%s. key = %d ", bus_strerror(EBUS_KEY_INUSED), socket->key);
- return 0;
- }
- return 1;
+// static int _shm_socket_check_key(shm_socket_t *socket) {
+// void *tmp_ptr = mm_get_by_key(socket->key);
+// if (tmp_ptr!= NULL && tmp_ptr != (void *)1 && !socket->force_bind ) {
+// bus_errno = EBUS_KEY_INUSED;
+// logger->error("%s. key = %d ", bus_strerror(EBUS_KEY_INUSED), socket->key);
+// return 0;
+// }
+// return 1;
+// }
+
+// 妫�鏌ey鏄惁宸茬粡琚娇鐢紝 鏈浣跨敤鍒欑粦瀹歬ey
+static int check_and_bind_queue(shm_socket_t * sockt) {
+ hashtable_t *hashtable = mm_get_hashtable();
+ hashtable_lock(hashtable);
+ void *tmp_ptr = mm_get_by_key(sockt->key);
+ if (tmp_ptr!= NULL && tmp_ptr != (void *)1 && !sockt->force_bind ) {
+ hashtable_unlock(hashtable);
+ return EBUS_KEY_INUSED;
+ }
+ sockt->queue = new SHMQueue<shm_msg_t>(sockt->key, 16);
+ hashtable_unlock(hashtable);
+ return 0;
}
SHMQueue<shm_msg_t> *_attach_remote_queue(int key);
@@ -118,9 +132,13 @@
return 0;
}
-int shm_listen(shm_socket_t *socket) {
+int shm_socket_get_key(shm_socket_t *sk){
+ return sk->key;
+}
- if (socket->socket_type != SHM_SOCKET_STREAM) {
+int shm_listen(shm_socket_t *sockt) {
+ int rv;
+ if (sockt->socket_type != SHM_SOCKET_STREAM) {
logger->error("can not invoke shm_listen method with a socket which is not a "
"SHM_SOCKET_STREAM socket");
exit(1);
@@ -128,23 +146,23 @@
int key;
hashtable_t *hashtable = mm_get_hashtable();
- if (socket->key == 0) {
+ if (sockt->key == 0) {
key = hashtable_alloc_key(hashtable);
- socket->key = key;
+ sockt->key = key;
} else {
- if(!_shm_socket_check_key(socket)) {
- bus_errno = EBUS_KEY_INUSED;
- return EBUS_KEY_INUSED;
- }
+ rv = check_and_bind_queue(sockt);
+ if(rv !=0 ) {
+ return rv;
+ }
}
- socket->queue = new SHMQueue<shm_msg_t>(socket->key, 16);
- socket->acceptQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
- socket->clientSocketMap = new std::map<int, shm_socket_t *>;
- socket->status = SHM_CONN_LISTEN;
- pthread_create(&(socket->dispatch_thread), NULL, _server_run_msg_rev,
- (void *)socket);
+ sockt->queue = new SHMQueue<shm_msg_t>(sockt->key, 16);
+ sockt->acceptQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
+ sockt->clientSocketMap = new std::map<int, shm_socket_t *>;
+ sockt->status = SHM_CONN_LISTEN;
+ pthread_create(&(sockt->dispatch_thread), NULL, _server_run_msg_rev,
+ (void *)sockt);
return 0;
}
@@ -153,8 +171,8 @@
* 鎺ュ彈瀹㈡埛绔缓绔嬫柊杩炴帴鐨勮姹�
*
*/
-shm_socket_t *shm_accept(shm_socket_t *socket) {
- if (socket->socket_type != SHM_SOCKET_STREAM) {
+shm_socket_t *shm_accept(shm_socket_t *sockt) {
+ if (sockt->socket_type != SHM_SOCKET_STREAM) {
logger->error("can not invoke shm_accept method with a socket which is not a "
"SHM_SOCKET_STREAM socket");
exit(1);
@@ -164,13 +182,13 @@
shm_socket_t *client_socket;
shm_msg_t src;
- if (socket->acceptQueue->pop(src) == 0) {
+ if (sockt->acceptQueue->pop(src) == 0) {
// print_msg("===accept:", src);
client_key = src.key;
// client_socket = (shm_socket_t *)malloc(sizeof(shm_socket_t));
- client_socket = shm_open_socket(socket->socket_type);
- client_socket->key = socket->key;
+ client_socket = shm_open_socket(sockt->socket_type);
+ client_socket->key = sockt->key;
// client_socket->queue= socket->queue;
//鍒濆鍖栨秷鎭痲ueue
client_socket->messageQueue =
@@ -178,7 +196,7 @@
//杩炴帴鍒板鏂筿ueue
client_socket->remoteQueue = _attach_remote_queue(client_key);
- socket->clientSocketMap->insert({client_key, client_socket});
+ sockt->clientSocketMap->insert({client_key, client_socket});
/*
* shm_accept 鐢ㄦ埛鎵ц鐨勬柟娉�
@@ -187,7 +205,7 @@
//鍙戦�乷pen_reply,鍥炲簲瀹㈡埛绔殑connect璇锋眰
struct timespec timeout = {1, 0};
shm_msg_t msg;
- msg.key = socket->key;
+ msg.key = sockt->key;
msg.size = 0;
msg.type = SHM_SOCKET_OPEN_REPLY;
@@ -209,8 +227,9 @@
/**
* @return 0鎴愬姛. 鍏朵粬鍊煎け璐�
*/
-int shm_connect(shm_socket_t *socket, int key) {
- if (socket->socket_type != SHM_SOCKET_STREAM) {
+int shm_connect(shm_socket_t *sockt, int key) {
+ int rv;
+ if (sockt->socket_type != SHM_SOCKET_STREAM) {
logger->error( "can not invoke shm_connect method with a socket which is not "
"a SHM_SOCKET_STREAM socket");
exit(1);
@@ -221,38 +240,37 @@
return -1;
}
- if (socket->key == 0) {
- socket->key = hashtable_alloc_key(hashtable);
+ if (sockt->key == 0) {
+ sockt->key = hashtable_alloc_key(hashtable);
+ sockt->queue = new SHMQueue<shm_msg_t>(sockt->key, 16);
} else {
- if(!_shm_socket_check_key(socket)) {
- bus_errno = EBUS_KEY_INUSED;
- return EBUS_KEY_INUSED;
+ rv = check_and_bind_queue(sockt);
+ if(rv != 0 ) {
+ return rv;
}
}
- socket->queue = new SHMQueue<shm_msg_t>(socket->key, 16);
-
- if ((socket->remoteQueue = _attach_remote_queue(key)) == NULL) {
+ if ((sockt->remoteQueue = _attach_remote_queue(key)) == NULL) {
logger->error("connect to %d failted", key);
return -1;
}
- socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
+ sockt->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
//鍙戦�乷pen璇锋眰
struct timespec timeout = {1, 0};
shm_msg_t msg;
- msg.key = socket->key;
+ msg.key = sockt->key;
msg.size = 0;
msg.type = SHM_SOCKET_OPEN;
- socket->remoteQueue->push(msg, &timeout, BUS_TIMEOUT_FLAG);
+ sockt->remoteQueue->push(msg, &timeout, BUS_TIMEOUT_FLAG);
//鎺ュ彈open reply
- if (socket->queue->pop(msg) == 0) {
+ if (sockt->queue->pop(msg) == 0) {
// 鍦ㄨ繖閲宻erver绔凡缁忓噯澶囧ソ鎺ュ彈瀹㈡埛绔彂閫佽姹備簡,瀹屾垚涓庢湇鍔$鐨勮繛鎺�
if (msg.type == SHM_SOCKET_OPEN_REPLY) {
- socket->status = SHM_CONN_ESTABLISHED;
- pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev,
- (void *)socket);
+ sockt->status = SHM_CONN_ESTABLISHED;
+ pthread_create(&(sockt->dispatch_thread), NULL, _client_run_msg_rev,
+ (void *)sockt);
} else {
logger->error( "shm_connect: 涓嶅尮閰嶇殑搴旂瓟淇℃伅!");
exit(1);
@@ -266,25 +284,25 @@
return 0;
}
-int shm_send(shm_socket_t *socket, const void *buf, const int size) {
- if (socket->socket_type != SHM_SOCKET_STREAM) {
+int shm_send(shm_socket_t *sockt, const void *buf, const int size) {
+ if (sockt->socket_type != SHM_SOCKET_STREAM) {
logger->error("shm_socket.shm_send: can not invoke shm_send method with a socket which is not a "
"SHM_SOCKET_STREAM socket");
exit(1);
}
hashtable_t *hashtable = mm_get_hashtable();
- if(socket->remoteQueue == NULL) {
+ if(sockt->remoteQueue == NULL) {
err_msg(errno, "褰撳墠瀹㈡埛绔棤杩炴帴!");
return -1;
}
shm_msg_t dest;
dest.type = SHM_COMMON_MSG;
- dest.key = socket->key;
+ dest.key = sockt->key;
dest.size = size;
dest.buf = mm_malloc(size);
memcpy(dest.buf, buf, size);
- if (socket->remoteQueue->push(dest) == 0) {
+ if (sockt->remoteQueue->push(dest) == 0) {
return 0;
} else {
logger->error(errno, "connection has been closed!");
@@ -292,16 +310,16 @@
}
}
-int shm_recv(shm_socket_t *socket, void **buf, int *size) {
- if (socket->socket_type != SHM_SOCKET_STREAM) {
+int shm_recv(shm_socket_t *sockt, void **buf, int *size) {
+ if (sockt->socket_type != SHM_SOCKET_STREAM) {
logger->error( "shm_socket.shm_recv: can not invoke shm_recv method in a %d type socket which is "
"not a SHM_SOCKET_STREAM socket ",
- socket->socket_type);
+ sockt->socket_type);
exit(1);
}
shm_msg_t src;
- if (socket->messageQueue->pop(src) == 0) {
+ if (sockt->messageQueue->pop(src) == 0) {
void *_buf = malloc(src.size);
memcpy(_buf, src.buf, src.size);
*buf = _buf;
@@ -315,49 +333,48 @@
// 鐭繛鎺ユ柟寮忓彂閫�
-int shm_sendto(shm_socket_t *socket, const void *buf, const int size,
+int shm_sendto(shm_socket_t *sockt, const void *buf, const int size,
const int key, const struct timespec *timeout, const int flag) {
int s;
int rv;
- if (socket->socket_type != SHM_SOCKET_DGRAM) {
+ if (sockt->socket_type != SHM_SOCKET_DGRAM) {
logger->error( "shm_socket.shm_sendto: Can't invoke shm_sendto method in a %d type socket which is "
"not a SHM_SOCKET_DGRAM socket ",
- socket->socket_type);
+ sockt->socket_type);
exit(0);
}
hashtable_t *hashtable = mm_get_hashtable();
- if ((s = pthread_mutex_lock(&(socket->mutex))) != 0)
+ if ((s = pthread_mutex_lock(&(sockt->mutex))) != 0)
err_exit(s, "shm_sendto : pthread_mutex_lock");
- if (socket->queue == NULL) {
- if (socket->key == 0) {
- socket->key = hashtable_alloc_key(hashtable);
+ if (sockt->queue == NULL) {
+ if (sockt->key == 0) {
+ sockt->key = hashtable_alloc_key(hashtable);
+ sockt->queue = new SHMQueue<shm_msg_t>(sockt->key, 16);
} else {
-
- if(!_shm_socket_check_key(socket)) {
- bus_errno = EBUS_KEY_INUSED;
- return EBUS_KEY_INUSED;
- }
-
+ rv = check_and_bind_queue(sockt);
+ if(rv !=0 ) {
+ return rv;
+ }
}
- socket->queue = new SHMQueue<shm_msg_t>(socket->key, 16);
+
}
- if ((s = pthread_mutex_unlock(&(socket->mutex))) != 0)
+ if ((s = pthread_mutex_unlock(&(sockt->mutex))) != 0)
err_exit(s, "shm_sendto : pthread_mutex_unlock");
- // There is some case where a socket need to send to himeself, for example when bus server need to stop, he need to send himself
+ // There is some case where a sockt need to send to himeself, for example when bus server need to stop, he need to send himself
// a top message.
- // if (key == socket->key) {
- // logger->error( "can not send to your self!");
- // return -1;
- // }
+ if (key == sockt->key) {
+ logger->error( "can not send to your self!");
+ return EBUS_SENDTO_SELF;
+ }
SHMQueue<shm_msg_t> *remoteQueue;
if ((remoteQueue = _attach_remote_queue(key)) == NULL) {
@@ -368,7 +385,7 @@
shm_msg_t dest;
dest.type = SHM_COMMON_MSG;
- dest.key = socket->key;
+ dest.key = sockt->key;
dest.size = size;
dest.buf = mm_malloc(size);
memcpy(dest.buf, buf, size);
@@ -376,11 +393,11 @@
rv = remoteQueue->push(dest, timeout, flag);
if (rv == 0) {
- // printf("shm_sendto push after\n");
+ printf("%d sendto %d suc.\n", shm_socket_get_key(sockt), key);
return 0;
} else {
mm_free(dest.buf);
- logger->debug("sendto key %d failed %s", key, bus_strerror(rv));
+ logger->debug("%d sendto key %d failed %s", shm_socket_get_key(sockt), key, bus_strerror(rv));
return rv;
}
}
@@ -405,15 +422,13 @@
if (sokt->queue == NULL) {
if (sokt->key == 0) {
sokt->key = hashtable_alloc_key(hashtable);
+ sokt->queue = new SHMQueue<shm_msg_t>(sokt->key, 16);
} else {
-
- if(!_shm_socket_check_key(sokt)) {
- bus_errno = EBUS_KEY_INUSED;
- return EBUS_KEY_INUSED;
+ rv = check_and_bind_queue(sokt);
+ if(rv != 0 ) {
+ return rv;
}
}
-
- sokt->queue = new SHMQueue<shm_msg_t>(sokt->key, 16);
}
if ((s = pthread_mutex_unlock(&(sokt->mutex))) != 0)
@@ -535,7 +550,7 @@
}
}
-int _shm_sendandrecv_alloc_new(shm_socket_t *socket, const void *send_buf,
+int _shm_sendandrecv_alloc_new(shm_socket_t *sockt, const void *send_buf,
const int send_size, const int send_key, void **recv_buf,
int *recv_size, const struct timespec *timeout, int flags) {
int recv_key;
@@ -544,10 +559,10 @@
// 鐢╰hread local 淇濊瘉姣忎釜绾跨▼鐢ㄤ竴涓嫭鍗犵殑socket鎺ュ彈瀵规柟杩斿洖鐨勪俊鎭�
shm_socket_t *tmp_socket;
- if (socket->socket_type != SHM_SOCKET_DGRAM) {
+ if (sockt->socket_type != SHM_SOCKET_DGRAM) {
logger->error( "shm_socket.shm_sendandrecv: Can't invoke shm_sendandrecv method in a %d type socket "
"which is not a SHM_SOCKET_DGRAM socket ",
- socket->socket_type);
+ sockt->socket_type);
exit(1);
}
@@ -557,7 +572,7 @@
if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags)) == 0) {
rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags);
- printf("======send key =%d , recv key=%d\n", send_key, recv_key);
+ printf("======%d use tmp_socket %d, send to %d, receive from %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key);
if(rv != 0) {
printf("_shm_sendandrecv_alloc_new : %s\n", bus_strerror(rv));
@@ -579,6 +594,7 @@
int shm_sendandrecv(shm_socket_t *socket, const void *send_buf,
const int send_size, const int send_key, void **recv_buf,
int *recv_size, const struct timespec *timeout, int flags) {
+
return _shm_sendandrecv_alloc_new(socket, send_buf, send_size, send_key,recv_buf, recv_size, timeout, flags);
}
--
Gitblit v1.8.0