From f5f063a8d4fdf1e9e967a2206f7cc8de2d549b66 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期四, 03 十二月 2020 15:05:23 +0800
Subject: [PATCH] update
---
src/socket/net_mod_socket.c | 575 +++++++++++++++++++++++++++++++--------------------------
1 files changed, 313 insertions(+), 262 deletions(-)
diff --git a/src/socket/net_mod_socket.c b/src/socket/net_mod_socket.c
index f55e439..07438fc 100644
--- a/src/socket/net_mod_socket.c
+++ b/src/socket/net_mod_socket.c
@@ -1,26 +1,27 @@
#include "net_mod_socket.h"
#include "socket_io.h"
#include "net_mod_socket_io.h"
+#include "net_conn_pool.h"
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
+#include <pthread.h>
+static Logger *logger = LoggerFactory::getLogger();
+
+static pthread_once_t once = PTHREAD_ONCE_INIT;
+static pthread_key_t poolKey;
NetModSocket::NetModSocket()
{
- init_req_rep_req_resp_pool();
-
- if (Signal(SIGPIPE, SIG_IGN) == SIG_ERR) err_msg(errno, "signal");
+ if (Signal(SIGPIPE, SIG_IGN) == SIG_ERR)
+ logger->error(errno, "NetModSocket::NetModSocket signal");
}
NetModSocket::~NetModSocket() {
- int clientfd;
- for (auto map_iter = req_resp_pool.connectionMap.begin(); map_iter != req_resp_pool.connectionMap.end(); map_iter++) {
- clientfd = map_iter->second;
- Close(clientfd);
- }
+
}
@@ -28,25 +29,25 @@
* 缁戝畾绔彛鍒皊ocket, 濡傛灉涓嶇粦瀹氬垯绯荤粺鑷姩鍒嗛厤涓�涓�
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
-int NetModSocket::bind(int port) {
- return shmModSocket.bind(port);
+int NetModSocket::bind(int key) {
+ return shmModSocket.bind(key);
}
/**
* 寮哄埗缁戝畾绔彛鍒皊ocket, 閫傜敤浜庣▼搴忛潪姝e父鍏抽棴鐨勬儏鍐典笅锛岄噸鍚▼搴忕粦瀹氬師鏉ヨ繕娌¢噴鏀剧殑key
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
-int NetModSocket::force_bind( int port) {
- return shmModSocket.force_bind(port);
+int NetModSocket::force_bind( int key) {
+ return shmModSocket.force_bind(key);
}
int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
_sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, -1);
}
-int NetModSocket::sendandrecv_timout(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
- net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int timeout) {
- _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, timeout);
+int NetModSocket::sendandrecv_timeout(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
+ net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int msec) {
+ _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, msec);
}
int NetModSocket::sendandrecv_nowait(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
@@ -54,20 +55,76 @@
}
-int NetModSocket::_sendandrecv_(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
- net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int timeout = 5 * 1000) {
- int i, n, recv_size, connfd;
+/* Free thread-specific data buffer */
+void NetModSocket::_destroyConnPool_(void *_pool)
+{
+
+ NetConnPool *mpool = (NetConnPool *)_pool;
+ delete mpool;
+ logger->debug("destory connPool");
+}
+
+ /* One-time key creation function */
+void NetModSocket::_createConnPoolKey_(void)
+{
+ int ret;
+
+ /* Allocate a unique thread-specific data key and save the address
+ of the destructor for thread-specific data buffers */
+
+ ret = pthread_key_create(&poolKey, _destroyConnPool_);
+ if (ret != 0) {
+ logger->error(ret, "pthread_key_create");
+ exit(1);
+ }
+}
+
+int NetModSocket::_sendandrecv_(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
+ net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int msec ) {
+
+ int i, n, recv_size, connfd;
net_node_t *node;
void *recv_buf;
net_mod_request_head_t request_head = {};
- int n_req = 0, n_recv_suc = 0, n_resp;
+ int n_req = 0, n_recv_suc = 0, n_resp =0;
+
net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t));
+
+ int ret;
+ NetConnPool *mpool;
+
+ /* Make first caller allocate key for thread-specific data */
+ ret = pthread_once(&once, _createConnPoolKey_);
+ if (ret != 0) {
+ LoggerFactory::getLogger()->error(errno, "NetModSocket::_sendandrecv_ pthread_once");
+ exit(1);
+ }
+
+ mpool = (NetConnPool *)pthread_getspecific(poolKey);
+ if (mpool == NULL)
+ {
+ /* If first call from this thread, allocate buffer for thread, and save its location */
+ logger->debug("Create connPool");
+ mpool = new NetConnPool();
+ if (mpool == NULL) {
+ LoggerFactory::getLogger()->error(errno, "NetModSocket::_sendandrecv_ malloc");
+ exit(1);
+ }
+
+
+
+ ret = pthread_setspecific(poolKey, mpool);
+ if (ret != 0) {
+ LoggerFactory::getLogger()->error(errno, "NetModSocket::_sendandrecv_ pthread_setspecific");
+ exit(1);
+ }
+ }
- //init_req_rep_req_resp_pool();
+
for (i = 0; i< arrlen; i++) {
@@ -84,7 +141,7 @@
continue;
}
- if( (connfd = connect(node)) < 0 ) {
+ if( (connfd = mpool->getConn(node->host, node->port)) < 0 ) {
continue;
}
@@ -94,81 +151,81 @@
request_head.port = node->port;
request_head.key = node->key;
request_head.content_length = send_size;
-
+ request_head.timeout = msec;
// printf("write_request %s:%d\n", request_head.host, request_head.port);
- if(write_request(connfd, request_head, send_buf, send_size) != 0) {
+ if(write_request(connfd, request_head, send_buf, send_size, NULL, 0) != 0) {
LoggerFactory::getLogger()->error("write_request failture %s:%d\n", node->host, node->port);
- close_connect(connfd);
- // req_resp_pool.conns[i].fd = -1;
+ mpool->closeConn( connfd);
} else {
n_req++;
}
}
-// printf(" req_resp_pool.maxi = %d\n", req_resp_pool.maxi);
+// printf(" mpool->maxi = %d\n", mpool->maxi);
// printf(" n_req = %d\n", n_req);
-// int tmp = 0;
while(n_resp < n_req)
{
-// printf(" while %d\n", tmp++);
/* Wait for listening/connected descriptor(s) to become ready */
- if( (req_resp_pool.nready = poll(req_resp_pool.conns, req_resp_pool.maxi + 1, timeout) ) <= 0) {
+ if( (mpool->nready = poll(mpool->conns, mpool->maxi + 1, msec) ) <= 0) {
// wirite_set 鍜� read_set 鍦ㄦ寚瀹氭椂闂村唴閮芥病鍑嗗濂�
break;
}
-// printf("req_resp_pool.nready =%d\n", req_resp_pool.nready);
- for (i = 0; (i <= req_resp_pool.maxi) && (req_resp_pool.nready > 0); i++) {
- if ( (connfd = req_resp_pool.conns[i].fd) > 0 ) {
+// printf("mpool->nready =%d\n", mpool->nready);
+ for (i = 0; (i <= mpool->maxi) && (mpool->nready > 0); i++) {
+ if ( (connfd = mpool->conns[i].fd) > 0 ) {
/* If the descriptor is ready, echo a text line from it */
- if (req_resp_pool.conns[i].revents & POLLIN )
+ if (mpool->conns[i].revents & POLLIN )
{
- req_resp_pool.nready--;
+ mpool->nready--;
// printf("POLLIN %d\n", connfd);
if( (n = read_response(connfd, ret_arr+n_recv_suc)) == 0) {
-
- // 鎴愬姛鏀跺埌杩斿洖娑堟伅锛屾竻绌鸿鍏ヤ綅
- req_resp_pool.conns[i].fd = -1;
n_recv_suc++;
+ // 鎴愬姛鏀跺埌杩斿洖娑堟伅锛屾竻绌鸿鍏ヤ綅
+ mpool->conns[i].fd = -1;
- } else if(n == -1) {
- req_resp_pool.conns[i].fd = -1;
- close_connect(connfd);
- } else {
- req_resp_pool.conns[i].fd = -1;
-
}
+ else if(n == -1) {
+ // 缃戠粶閿欒
+ mpool->closeConn( connfd);
+ // mpool->conns[i].fd = -1;
+ } else {
+ // 瀵规柟key鏄叧闂殑
+ mpool->conns[i].fd = -1;
+ }
+
n_resp++;
// printf("read response %d\n", n);
}
- if (req_resp_pool.conns[i].revents & POLLOUT ) {
+ if (mpool->conns[i].revents & POLLOUT ) {
// printf("poll POLLOUT %d\n", connfd);
}
- if (req_resp_pool.conns[i].revents & (POLLRDHUP | POLLHUP | POLLERR) )
+ if (mpool->conns[i].revents & (POLLRDHUP | POLLHUP | POLLERR) )
{
// printf("poll POLLERR %d\n", connfd);
- req_resp_pool.nready--;
- close_connect(connfd);
- req_resp_pool.conns[i].fd = -1;
+ mpool->nready--;
+ mpool->closeConn( connfd);
+ // mpool->conns[i].fd = -1;
}
}
}
}
- for (i = 0; i <= req_resp_pool.maxi; i++) {
- if ( (connfd = req_resp_pool.conns[i].fd) > 0 ) {
+ //瓒呮椂鍚庯紝鍏抽棴瓒呮椂杩炴帴
+ for (i = 0; i <= mpool->maxi; i++) {
+ if ( (connfd = mpool->conns[i].fd) > 0 ) {
// 鍏抽棴骞舵竻闄ゅ啓鍏ユ垨璇诲彇澶辫触鐨勮繛鎺�
- close_connect(connfd);
- req_resp_pool.conns[i].fd = -1;
+ mpool->closeConn( connfd);
+ // mpool->conns[i].fd = -1;
}
}
- req_resp_pool.maxi = -1;
+ mpool->maxi = -1;
*recv_arr = ret_arr;
if(recv_arr_size != NULL) {
@@ -176,6 +233,151 @@
}
return n_recv_suc;
+}
+
+int NetModSocket::pub(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size) {
+ return _pub_(node_arr, arrlen, topic, topic_size, content, content_size, -1);
+}
+
+int NetModSocket::pub_nowait(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size) {
+ return _pub_(node_arr, arrlen, topic, topic_size, content, content_size, 0);
+}
+
+int NetModSocket::pub_timeout(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size, int msec ) {
+ return _pub_(node_arr, arrlen, topic, topic_size, content, content_size, msec);
+}
+
+
+// int pub(char *topic, int topic_size, void *content, int content_size, int port);
+
+int NetModSocket::_pub_(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content,
+ int content_size, int timeout) {
+ int i, connfd;
+ net_node_t *node;
+
+ net_mod_request_head_t request_head;
+ net_mod_recv_msg_t recv_msg;
+ char portstr[32];
+ int n_req = 0, n_pub_suc = 0, n_resp = 0;
+
+ int ret;
+ NetConnPool *mpool;
+
+ /* Make first caller allocate key for thread-specific data */
+ ret = pthread_once(&once, _createConnPoolKey_);
+ if (ret != 0) {
+ LoggerFactory::getLogger()->error(errno, "NetModSocket::_sendandrecv_ pthread_once");
+ exit(1);
+ }
+
+ mpool = (NetConnPool *)pthread_getspecific(poolKey);
+ if (mpool == NULL)
+ {
+ /* If first call from this thread, allocte buffer for thread, and save its location */
+ mpool = new NetConnPool();
+ if (mpool == NULL) {
+ LoggerFactory::getLogger()->error(errno, "NetModSocket::_sendandrecv_ malloc");
+ exit(1);
+ }
+
+ ret = pthread_setspecific(poolKey, mpool);
+ if (ret != 0) {
+ LoggerFactory::getLogger()->error(errno, "NetModSocket::_sendandrecv_ pthread_setspecific");
+ exit(1);
+ }
+ }
+
+
+ for (i = 0; i < arrlen; i++) {
+
+ node = &node_arr[i];
+ if(node->host == NULL) {
+ // 鏈湴鍙戦��
+ if(shmModSocket.pub(topic, topic_size, content, content_size, node->key) == 0 ) {
+ n_pub_suc++;
+ }
+
+ } else {
+ sprintf(portstr, "%d", node->port);
+ if( (connfd = mpool->getConn(node->host, node->port)) < 0 ) {
+ continue;
+ }
+ request_head.mod = BUS;
+ memcpy(request_head.host, node->host, sizeof(request_head.host));
+ request_head.key = node->key;
+ request_head.content_length = content_size;
+ request_head.topic_length = strlen(topic) + 1;
+ request_head.timeout = timeout;
+
+ if(write_request(connfd, request_head, content, content_size, topic, request_head.topic_length) != 0) {
+ LoggerFactory::getLogger()->error(" NetModSocket::_pub_ write_request failture %s:%d\n", node->host, node->port);
+ mpool->closeConn( connfd);
+ } else {
+ n_req++;
+ }
+
+ }
+ }
+
+ while(n_resp < n_req)
+ {
+ /* Wait for listening/connected descriptor(s) to become ready */
+ if( (mpool->nready = poll(mpool->conns, mpool->maxi + 1, timeout) ) <= 0) {
+ // wirite_set 鍜� read_set 鍦ㄦ寚瀹氭椂闂村唴閮芥病鍑嗗濂�
+ break;
+ }
+// printf("mpool->nready =%d\n", mpool->nready);
+ for (i = 0; (i <= mpool->maxi) && (mpool->nready > 0); i++) {
+ if ( (connfd = mpool->conns[i].fd) > 0 ) {
+ if (mpool->conns[i].revents & POLLIN )
+ {
+ mpool->nready--;
+// printf("POLLIN %d\n", connfd);
+ if( (ret = read_response(connfd, &recv_msg)) == 0) {
+
+ // 鎴愬姛鏀跺埌杩斿洖娑堟伅锛屾竻绌鸿鍏ヤ綅
+ mpool->conns[i].fd = -1;
+ n_pub_suc++;
+ }
+ else if(ret == -1) {
+ // 缃戠粶杩炴帴閿欒
+ mpool->closeConn( connfd);
+ } else {
+ // 瀵规柟鐨刱ey鏄叧闂殑
+ mpool->conns[i].fd = -1;
+ }
+ n_resp++;
+// printf("read response %d\n", n);
+ }
+
+ if (mpool->conns[i].revents & POLLOUT ) {
+ // printf("poll POLLOUT %d\n", connfd);
+ }
+
+ if (mpool->conns[i].revents & (POLLRDHUP | POLLHUP | POLLERR) )
+ {
+// printf("poll POLLERR %d\n", connfd);
+ mpool->nready--;
+
+ mpool->conns[i].fd = -1;
+ mpool->closeConn( connfd);
+ }
+ }
+ }
+ }
+
+ //瓒呮椂鍚庯紝鍏抽棴瓒呮椂杩炴帴
+ for (i = 0; i <= mpool->maxi; i++) {
+ if ( (connfd = mpool->conns[i].fd) > 0 ) {
+ // 鍏抽棴骞舵竻闄ゅ啓鍏ユ垨璇诲彇澶辫触鐨勮繛鎺�
+ mpool->closeConn( connfd);
+ // mpool->conns[i].fd = -1;
+ }
+ }
+
+ mpool->maxi = -1;
+
+ return n_pub_suc;
}
@@ -299,131 +501,61 @@
}
-// int pub(char *topic, int topic_size, void *content, int content_size, int port);
-
-int NetModSocket::pub(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size) {
- int i, clientfd;
- net_node_t *node;
-
- char *buf;
- int max_buf_size, buf_size;
-
- net_mod_request_head_t request_head;
- char portstr[32];
- int nsuc = 0;
-
- if((buf = (char *)malloc(MAXBUF)) == NULL) {
- LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv malloc");
- exit(1);
- } else {
- max_buf_size = MAXBUF;
- }
-
- for (i = 0; i< arrlen; i++) {
-
- node = &node_arr[i];
- if(node->host == NULL) {
- // 鏈湴鍙戦��
- if(shmModSocket.pub(topic, topic_size, content, content_size, node->key) == 0 ) {
- nsuc++;
- }
-
- } else {
- sprintf(portstr, "%d", node->port);
- clientfd = open_clientfd(node->host, portstr);
- if(clientfd < 0) {
- continue;
- }
-
- request_head.mod = BUS;
- request_head.key = node->key;
- request_head.content_length = content_size;
- request_head.topic_length = strlen(topic) + 1;
-
- buf_size = NET_MODE_REQUEST_HEAD_LENGTH + content_size + request_head.topic_length;
-
- if(max_buf_size < buf_size) {
- if( ( buf = (char *)realloc(buf, buf_size)) == NULL) {
- LoggerFactory::getLogger()->error(errno, "NetModSocket::pub realloc buf ");
- } else {
- max_buf_size = buf_size;
- }
-
- }
-
- memcpy(buf, NetModSocket::encode_request_head(request_head), NET_MODE_REQUEST_HEAD_LENGTH);
- memcpy(buf + NET_MODE_REQUEST_HEAD_LENGTH, content, content_size);
- memcpy(buf + NET_MODE_REQUEST_HEAD_LENGTH + content_size, topic, request_head.topic_length);
-
- if(rio_writen(clientfd, buf, buf_size) != buf_size ) {
- LoggerFactory::getLogger()->error(errno, "NetModSocket::pub rio_writen conent ");
- } else {
- nsuc++;
- }
- close(clientfd);
- }
-
-
- }
-
- free(buf);
- return nsuc;
-}
/**
* 鍙戦�佷俊鎭�
- * @port 鍙戦�佺粰璋�
+ * @key 鍙戦�佺粰璋�
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
-int NetModSocket::sendto(const void *buf, const int size, const int port){
- return shmModSocket.sendto(buf, size,port);
+int NetModSocket::sendto(const void *buf, const int size, const int key){
+ return shmModSocket.sendto(buf, size, key);
}
// 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇
-int NetModSocket::sendto_timeout(const void *buf, const int size, const int port, int sec, int nsec){
+int NetModSocket::sendto_timeout(const void *buf, const int size, const int key, int sec, int nsec){
struct timespec timeout = {sec, nsec};
- return shmModSocket.sendto_timeout(buf, size, port, &timeout);
+ return shmModSocket.sendto_timeout(buf, size, key, &timeout);
}
// 鍙戦�佷俊鎭珛鍒昏繑鍥炪��
-int NetModSocket::sendto_nowait(const void *buf, const int size, const int port){
- return shmModSocket.sendto_nowait(buf, size,port);
+int NetModSocket::sendto_nowait(const void *buf, const int size, const int key){
+ return shmModSocket.sendto_nowait(buf, size, key);
}
/**
* 鎺ユ敹淇℃伅
- * @port 浠庤皝鍝噷鏀跺埌鐨勪俊鎭�
+ * @key 浠庤皝鍝噷鏀跺埌鐨勪俊鎭�
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
-int NetModSocket::recvfrom(void **buf, int *size, int *port) {
- return shmModSocket.recvfrom(buf, size, port);
+int NetModSocket::recvfrom(void **buf, int *size, int *key) {
+ return shmModSocket.recvfrom(buf, size, key);
}
// 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int NetModSocket::recvfrom_timeout(void **buf, int *size, int *port, int sec, int nsec){
+int NetModSocket::recvfrom_timeout(void **buf, int *size, int *key, int sec, int nsec){
struct timespec timeout = {sec, nsec};
- return shmModSocket.recvfrom_timeout(buf, size, port, &timeout);
+ return shmModSocket.recvfrom_timeout(buf, size, key, &timeout);
}
-int NetModSocket::recvfrom_nowait(void **buf, int *size, int *port){
- return shmModSocket.recvfrom_nowait(buf, size, port);
+int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key){
+ return shmModSocket.recvfrom_nowait(buf, size, key);
}
/**
* 鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟
- * @port 鍙戦�佺粰璋�
+ * @key 鍙戦�佺粰璋�
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
-int NetModSocket::sendandrecv( const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size){
- return shmModSocket.sendandrecv(send_buf, send_size, port, recv_buf, recv_size);
+int NetModSocket::sendandrecv( const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size){
+ return shmModSocket.sendandrecv(send_buf, send_size, key, recv_buf, recv_size);
}
// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int NetModSocket::sendandrecv_timeout( const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size, int sec, int nsec){
+int NetModSocket::sendandrecv_timeout( const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, int sec, int nsec){
struct timespec timeout = {sec, nsec};
- return shmModSocket.sendandrecv_timeout(send_buf, send_size, port, recv_buf, recv_size, &timeout);
+ return shmModSocket.sendandrecv_timeout(send_buf, send_size, key, recv_buf, recv_size, &timeout);
}
-int NetModSocket::sendandrecv_nowait( const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) {
- return shmModSocket.sendandrecv_nowait(send_buf, send_size, port, recv_buf, recv_size);
+int NetModSocket::sendandrecv_nowait( const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) {
+ return shmModSocket.sendandrecv_nowait(send_buf, send_size, key, recv_buf, recv_size);
}
@@ -440,18 +572,18 @@
* 璁㈤槄鎸囧畾涓婚
* @topic 涓婚
* @size 涓婚闀垮害
- * @port 鎬荤嚎绔彛
+ * @key 鎬荤嚎绔彛
*/
-int NetModSocket::sub( void *topic, int size, int port){
- return shmModSocket.sub((char *)topic, size, port);
+int NetModSocket::sub( void *topic, int size, int key){
+ return shmModSocket.sub((char *)topic, size, key);
}
// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int NetModSocket::sub_timeout( void *topic, int size, int port, int sec, int nsec){
+int NetModSocket::sub_timeout( void *topic, int size, int key, int sec, int nsec){
struct timespec timeout = {sec, nsec};
- return shmModSocket.sub_timeout((char *)topic, size, port, &timeout);
+ return shmModSocket.sub_timeout((char *)topic, size, key, &timeout);
}
-int NetModSocket::sub_nowait( void *topic, int size, int port){
- return shmModSocket.sub_nowait((char *)topic, size, port);
+int NetModSocket::sub_nowait( void *topic, int size, int key){
+ return shmModSocket.sub_nowait((char *)topic, size, key);
}
@@ -460,18 +592,18 @@
* 鍙栨秷璁㈤槄鎸囧畾涓婚
* @topic 涓婚
* @size 涓婚闀垮害
- * @port 鎬荤嚎绔彛
+ * @key 鎬荤嚎绔彛
*/
-int NetModSocket::desub( void *topic, int size, int port){
- return shmModSocket.desub((char *)topic, size, port);
+int NetModSocket::desub( void *topic, int size, int key){
+ return shmModSocket.desub((char *)topic, size, key);
}
// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int NetModSocket::desub_timeout( void *topic, int size, int port, int sec, int nsec){
+int NetModSocket::desub_timeout( void *topic, int size, int key, int sec, int nsec){
struct timespec timeout = {sec, nsec};
- return shmModSocket.desub_timeout((char *)topic, size, port, &timeout);
+ return shmModSocket.desub_timeout((char *)topic, size, key, &timeout);
}
-int NetModSocket::desub_nowait( void *topic, int size, int port){
- return shmModSocket.desub_nowait((char *)topic, size, port);
+int NetModSocket::desub_nowait( void *topic, int size, int key){
+ return shmModSocket.desub_nowait((char *)topic, size, key);
}
@@ -480,26 +612,26 @@
* 鍙戝竷涓婚
* @topic 涓婚
* @content 涓婚鍐呭
- * @port 鎬荤嚎绔彛
+ * @key 鎬荤嚎绔彛
*/
-int NetModSocket::pub( char *topic, int topic_size, void *content, int content_size, int port){
- return shmModSocket.pub(topic, topic_size, content, content_size, port);
+int NetModSocket::pub( char *topic, int topic_size, void *content, int content_size, int key){
+ return shmModSocket.pub(topic, topic_size, content, content_size, key);
}
// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int NetModSocket::pub_timeout( char *topic, int topic_size, void *content, int content_size, int port, int sec, int nsec){
+int NetModSocket::pub_timeout( char *topic, int topic_size, void *content, int content_size, int key, int sec, int nsec){
struct timespec timeout = {sec, nsec};
- return shmModSocket.pub_timeout(topic, topic_size, content, content_size, port, &timeout);
+ return shmModSocket.pub_timeout(topic, topic_size, content, content_size, key, &timeout);
}
-int NetModSocket::pub_nowait( char *topic, int topic_size, void *content, int content_size, int port){
- return shmModSocket.pub_nowait(topic, topic_size, content, content_size, port);
+int NetModSocket::pub_nowait( char *topic, int topic_size, void *content, int content_size, int key){
+ return shmModSocket.pub_nowait(topic, topic_size, content, content_size, key);
}
/**
* 鑾峰彇soket绔彛鍙�
*/
-int NetModSocket::get_port() {
- return shmModSocket.get_port();
+int NetModSocket::get_key() {
+ return shmModSocket.get_key();
}
@@ -516,9 +648,8 @@
//======================================================================================
-
-
-int NetModSocket::write_request(int clientfd, net_mod_request_head_t &request_head, void *send_buf, int send_size) {
+int NetModSocket::write_request(int clientfd, net_mod_request_head_t &request_head,
+ void *content_buf, int content_size, void *topic_buf, int topic_size) {
int buf_size;
char *buf;
@@ -530,7 +661,7 @@
max_buf_size = MAXBUF;
}
- buf_size = send_size + NET_MODE_REQUEST_HEAD_LENGTH;
+ buf_size = NET_MODE_REQUEST_HEAD_LENGTH + content_size + topic_size ;
if(max_buf_size < buf_size) {
if((buf = (char *)realloc(buf, buf_size)) == NULL) {
@@ -542,7 +673,9 @@
}
memcpy(buf, NetModSocket::encode_request_head(request_head), NET_MODE_REQUEST_HEAD_LENGTH);
- memcpy(buf + NET_MODE_REQUEST_HEAD_LENGTH, send_buf, send_size);
+ memcpy(buf + NET_MODE_REQUEST_HEAD_LENGTH, content_buf, content_size);
+ if(topic_size != 0 )
+ memcpy(buf + NET_MODE_REQUEST_HEAD_LENGTH + content_size, topic_buf, topic_size);
if(rio_writen(clientfd, buf, buf_size) != buf_size ) {
LoggerFactory::getLogger()->error(errno, "NetModSocket::write_request rio_writen");
@@ -572,7 +705,7 @@
response_head = NetModSocket::decode_response_head(response_head_bs);
// printf(">>>> read_response %s\n", response_head.host);
if(response_head.code != 0) {
- // 瀵规柟娌℃湁瀵瑰簲鐨刱ey
+ // 浠g悊鏈嶅姟娌¤兘鎴愬姛鍙戦�佺粰瀵瑰簲鐨刱ey
return 1;
}
@@ -598,96 +731,7 @@
-void NetModSocket::init_req_rep_req_resp_pool()
-{
- /* Initially, there are no connected descriptors */
- int i;
- req_resp_pool.maxi = -1; //line:conc:echoservers:beginempty
- for (i = 0; i < OPEN_MAX; i++) {
- req_resp_pool.conns[i].fd = -1;
- req_resp_pool.conns[i].events = 0;
- }
-}
-
-int NetModSocket::connect( net_node_t *node) {
- std::map<std::string, int>::iterator mapIter;
- int connfd;
- int i;
- char mapKey[256];
- char portstr[32];
-
- sprintf(mapKey, "%s:%d", node->host, node->port);
- mapIter = req_resp_pool.connectionMap.find(mapKey);
- if( mapIter != req_resp_pool.connectionMap.end()) {
- connfd = mapIter->second;
-// printf("hit: %s\n", mapKey);
- } else {
-// printf("mis: %s\n", mapKey);
- sprintf(portstr, "%d", node->port);
-// printf("open before: %s\n", mapKey);
- connfd = open_clientfd(node->host, portstr);
-// printf("open after: %s\n", mapKey);
- if(connfd < 0) {
- LoggerFactory::getLogger()->error(errno, "connect %s:%d ", node->host, node->port);
- return -1;
- }
- req_resp_pool.connectionMap.insert({mapKey, connfd});
- }
-
-
- for (i = 0; i < OPEN_MAX; i++) { /* Find an available slot */
- if (req_resp_pool.conns[i].fd < 0)
- {
- /* Add connected descriptor to the req_resp_pool */
- req_resp_pool.conns[i].fd = connfd;
-
- req_resp_pool.conns[i].events = POLLIN;
- /* Add the descriptor to descriptor set */
- break;
- }
- }
-
- if (i > req_resp_pool.maxi)
- req_resp_pool.maxi = i;
-
- if (i == OPEN_MAX) {
- /* Couldn't find an empty slot */
- LoggerFactory::getLogger()->error(errno, "add_client error: Too many clients");
- return -1;
- }
-
-
- return connfd;
-}
-
-void NetModSocket::close_connect(int connfd) {
- int i;
- char mapKey[256];
- std::map<std::string, int>::iterator map_iter;
- if(close(connfd) != 0) {
- LoggerFactory::getLogger()->error(errno, "NetModSocket::close_connect close");
- }
-
-
- for (i = 0; i <= req_resp_pool.maxi; i++) {
- if(req_resp_pool.conns[i].fd == connfd) {
- req_resp_pool.conns[i].fd = -1;
- }
- }
-
- for ( map_iter = req_resp_pool.connectionMap.begin(); map_iter != req_resp_pool.connectionMap.end(); ) {
- if(connfd == map_iter->second) {
-// std::cout << "map_iter->first==" << map_iter->first << std::endl;
- map_iter = req_resp_pool.connectionMap.erase(map_iter);
- } else {
- ++map_iter;
- }
- }
-
- LoggerFactory::getLogger()->debug( "closed %d\n", connfd);
-
-}
/**
uint32_t mod;
char host[NI_MAXHOST];
@@ -717,6 +761,9 @@
tmp_ptr += 4;
PUT(tmp_ptr, htonl(head.topic_length));
+
+ tmp_ptr += 4;
+ PUT_INT32(tmp_ptr, htonl(head.timeout));
return headbs;
@@ -743,6 +790,9 @@
tmp_ptr += 4;
head.topic_length = ntohl(GET(tmp_ptr));
+
+ tmp_ptr += 4;
+ head.timeout = ntohl(GET_INT32(tmp_ptr));
return head;
}
@@ -795,3 +845,4 @@
return head;
}
+
--
Gitblit v1.8.0