#include "net_mod_socket.h"
|
#include "socket_io.h"
|
#include "net_mod_socket_io.h"
|
|
#include <sys/types.h> /* See NOTES */
|
#include <sys/socket.h>
|
#include <pthread.h>
|
|
// Thread-specific storage bookkeeping for the per-thread NetConnPool:
// `once` guards the one-time creation of `poolKey`, and `poolKey` is the
// pthread key under which each thread stores its own pool pointer.
static pthread_key_t poolKey;
static pthread_once_t once = PTHREAD_ONCE_INIT;

// File-wide logger handle obtained from LoggerFactory.
static Logger *logger = LoggerFactory::getLogger();
|
|
|
|
/**
 * Construct the socket wrapper.
 *
 * Asks Signal() to install SIG_IGN for SIGPIPE and logs (without aborting)
 * when that call reports SIG_ERR.
 */
NetModSocket::NetModSocket()
{
    const bool sigpipe_ignored = (Signal(SIGPIPE, SIG_IGN) != SIG_ERR);
    if (!sigpipe_ignored) {
        logger->error(errno, "NetModSocket::NetModSocket signal");
    }
}
|
|
|
/**
 * Destructor. Performs no explicit cleanup of its own.
 */
NetModSocket::~NetModSocket()
{
    // Intentionally empty.
}
|
|
|
int NetModSocket::stop() {
|
return shmModSocket.stop();
|
}
|
|
/**
|
* 绑定端口到socket, 如果不绑定则系统自动分配一个
|
* @return 0 成功, 其他值 失败的错误码
|
*/
|
int NetModSocket::bind(int key) {
|
return shmModSocket.bind(key);
|
}
|
|
/**
|
* 强制绑定端口到socket, 适用于程序非正常关闭的情况下,重启程序绑定原来还没释放的key
|
* @return 0 成功, 其他值 失败的错误码
|
*/
|
int NetModSocket::force_bind( int key) {
|
return shmModSocket.force_bind(key);
|
}
|
|
int NetModSocket::bind_proc_id(char *buf, int len) {
|
return shmModSocket.bind_proc_id(buf, len);
|
}
|
|
int NetModSocket::reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag) {
|
|
return shmModSocket.reg(pData, len, buf, size, timeout_ms, flag);
|
}
|
|
// int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
|
// net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
|
// return _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, -1);
|
// }
|
// int NetModSocket::sendandrecv_timeout(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
|
// net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int msec) {
|
// return _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, msec);
|
// }
|
// int NetModSocket::sendandrecv_nowait(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
|
// net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
|
// return _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, 0);
|
|
// }
|
|
|
/* Free thread-specific data buffer */
|
void NetModSocket::_destroyConnPool_(void *_pool)
|
{
|
|
NetConnPool *mpool = (NetConnPool *)_pool;
|
delete mpool;
|
|
}
|
|
/* One-time key creation function */
|
void NetModSocket::_createConnPoolKey_(void)
|
{
|
int ret;
|
|
/* Allocate a unique thread-specific data key and save the address
|
of the destructor for thread-specific data buffers */
|
|
ret = pthread_key_create(&poolKey, _destroyConnPool_);
|
if (ret != 0) {
|
logger->error(ret, "pthread_key_create");
|
exit(1);
|
}
|
}
|
|
/**
 * Return the calling thread's NetConnPool, creating it on first use.
 *
 * The pool pointer is kept in thread-specific storage under `poolKey`; the
 * key itself is created exactly once via pthread_once(). Any pthread failure
 * is logged and terminates the process with exit(1).
 *
 * @return the pool belonging to the calling thread (never NULL)
 */
NetConnPool* NetModSocket::_get_threadlocal_pool() {
    /* Make the first caller allocate the key for thread-specific data.
       pthread functions report failures through their return value, not
       through errno, so that value is what gets logged. */
    int ret = pthread_once(&once, _createConnPoolKey_);
    if (ret != 0) {
        LoggerFactory::getLogger()->error(ret, "NetModSocket::_get_threadlocal_pool pthread_once");
        exit(1);
    }

    NetConnPool *mpool = static_cast<NetConnPool *>(pthread_getspecific(poolKey));
    if (mpool != NULL) {
        return mpool;
    }

    /* First call from this thread: allocate its pool and remember it.
       A plain `new` reports failure by throwing, so no NULL check is needed. */
    logger->debug("Create connPool");
    mpool = new NetConnPool();

    ret = pthread_setspecific(poolKey, mpool);
    if (ret != 0) {
        LoggerFactory::getLogger()->error(ret, "NetModSocket::_get_threadlocal_pool pthread_setspecific");
        delete mpool;
        exit(1);
    }

    return mpool;
}
|
|
/**
 * Return the connection pool to use for the current call; this is the
 * calling thread's pool from _get_threadlocal_pool().
 */
NetConnPool* NetModSocket::_get_pool()
{
    NetConnPool *pool = _get_threadlocal_pool();
    return pool;
}
|
|
|
|
|
int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
|
net_mod_recv_msg_t ** recv_arr, int *recv_arr_size,
|
net_mod_err_t ** _err_arr, int *_err_arr_size, int msec ) {
|
|
int i, recv_size, connfd;
|
net_node_t *node;
|
void *recv_buf = NULL;
|
struct timespec timeout;
|
int ret;
|
int n_req = 0, n_recv_suc = 0, n_err = 0, n_resp =0;
|
|
net_mod_request_head_t request_head = {};
|
|
net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t));
|
|
net_mod_err_t *err_arr = (net_mod_err_t *)calloc(arrlen, sizeof(net_mod_err_t));
|
|
NetConnPool *mpool = _get_pool();
|
|
for (i = 0; i< arrlen; i++) {
|
|
node = &node_arr[i];
|
if(node->host == NULL || strcmp(node->host, "") == 0 ) {
|
// 本地发送
|
|
if(msec == 0) {
|
ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size, NULL, BUS_NOWAIT_FLAG);
|
} else if(msec > 0){
|
timeout.tv_sec = msec / 1000;
|
timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
|
ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size, &timeout, BUS_TIMEOUT_FLAG);
|
} else {
|
ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size);
|
}
|
|
if( ret == 0) {
|
strcpy( ret_arr[n_recv_suc].host, "");
|
ret_arr[n_recv_suc].port = 0;
|
ret_arr[n_recv_suc].key = node->key;
|
ret_arr[n_recv_suc].content = recv_buf;
|
ret_arr[n_recv_suc].content_length = recv_size;
|
n_recv_suc++;
|
} else {
|
err_arr[n_err].port = 0;
|
err_arr[n_err].key = node->key;
|
err_arr[n_err].code = ret;
|
n_err++;
|
// logger->error("NetModSocket:: _sendandrecv_ to key %d failed. %s", node->key, bus_strerror(ret));
|
}
|
|
|
continue;
|
}
|
|
if( (connfd = mpool->getConn(node->host, node->port)) < 0 ) {
|
memcpy(err_arr[n_err].host, node->host, sizeof(err_arr[n_err].host));
|
err_arr[n_err].port = node->port;
|
err_arr[n_err].key = node->key;
|
err_arr[n_err].code = EBUS_NET;
|
n_err++;
|
continue;
|
}
|
|
|
request_head.mod = REQ_REP;
|
memcpy(request_head.host, node->host, sizeof(request_head.host));
|
request_head.port = node->port;
|
request_head.key = node->key;
|
request_head.content_length = send_size;
|
request_head.timeout = msec;
|
|
// printf("write_request %s:%d\n", request_head.host, request_head.port);
|
if(write_request(connfd, request_head, send_buf, send_size, NULL, 0) != 0) {
|
LoggerFactory::getLogger()->error("write_request failture %s:%d\n", node->host, node->port);
|
memcpy(err_arr[n_err].host, node->host, sizeof(err_arr[n_err].host));
|
err_arr[n_err].port = node->port;
|
err_arr[n_err].key = node->key;
|
err_arr[n_err].code = EBUS_NET;
|
n_err++;
|
mpool->closeConn( connfd);
|
} else {
|
n_req++;
|
}
|
|
}
|
|
// printf(" mpool->maxi = %d\n", mpool->maxi);
|
// printf(" n_req = %d\n", n_req);
|
|
while(n_resp < n_req)
|
{
|
/* Wait for listening/connected descriptor(s) to become ready */
|
if( (mpool->nready = poll(mpool->conns, mpool->maxi + 1, msec) ) <= 0) {
|
// wirite_set 和 read_set 在指定时间内都没准备好
|
break;
|
}
|
// printf("mpool->nready =%d\n", mpool->nready);
|
for (i = 0; (i <= mpool->maxi) && (mpool->nready > 0); i++) {
|
if ( (connfd = mpool->conns[i].fd) > 0 ) {
|
/* If the descriptor is ready, echo a text line from it */
|
if (mpool->conns[i].revents & POLLIN )
|
{
|
mpool->nready--;
|
// printf("POLLIN %d\n", connfd);
|
if( (ret = read_response(connfd, ret_arr+n_recv_suc, err_arr + n_err)) == 0) {
|
n_recv_suc++;
|
// 成功收到返回消息,清空读入位
|
mpool->conns[i].fd = -1;
|
|
}
|
else if(ret == EBUS_NET) {
|
// 网络错误
|
|
logger->error("NetModSocket::_sendandrecv_ read_response key = %d , %s", get_key(), bus_strerror(ret));
|
mpool->closeConn( connfd);
|
n_err++;
|
// mpool->conns[i].fd = -1;
|
} else {
|
// 代理服务没有转发成功
|
|
logger->error("NetModSocket::_sendandrecv_ read_response key = %d , %s", get_key(), bus_strerror(ret));
|
mpool->conns[i].fd = -1;
|
n_err++;
|
}
|
|
n_resp++;
|
// printf("read response %d\n", n);
|
|
}
|
|
if (mpool->conns[i].revents & POLLOUT ) {
|
// printf("poll POLLOUT %d\n", connfd);
|
}
|
|
if (mpool->conns[i].revents & (POLLRDHUP | POLLHUP | POLLERR) )
|
{
|
// printf("poll POLLERR %d\n", connfd);
|
mpool->nready--;
|
mpool->closeConn( connfd);
|
}
|
}
|
}
|
}
|
|
//超时后,关闭超时连接
|
for (i = 0; i <= mpool->maxi; i++) {
|
if ( (connfd = mpool->conns[i].fd) > 0 ) {
|
// 关闭并清除写入或读取失败的连接
|
mpool->closeConn( connfd);
|
}
|
}
|
|
mpool->maxi = -1;
|
|
|
if(recv_arr != NULL) {
|
|
if(n_recv_suc > 0) {
|
*recv_arr = ret_arr;
|
|
} else {
|
free_recv_msg_arr(ret_arr, n_recv_suc);
|
}
|
|
} else {
|
free_recv_msg_arr(ret_arr, n_recv_suc);
|
}
|
|
if(recv_arr_size != NULL) {
|
*recv_arr_size = n_recv_suc;
|
}
|
|
|
if(_err_arr != NULL) {
|
|
if(n_err > 0) {
|
*_err_arr = err_arr;
|
|
|
} else {
|
*_err_arr = NULL;
|
*_err_arr_size = 0;
|
free(err_arr);
|
}
|
|
} else {
|
free(err_arr);
|
}
|
|
if(_err_arr_size != NULL) {
|
*_err_arr_size = n_err;
|
}
|
return n_recv_suc;
|
|
}
|
|
|
/**
 * Release an array of received messages together with each message's content.
 *
 * @param arr   array to free; NULL is accepted and ignored
 * @param size  number of entries in `arr` whose content should be freed
 */
void NetModSocket::free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size) {
    if (arr == NULL) {
        return;
    }

    // free(NULL) is a no-op, so no per-entry NULL check is needed.
    for (size_t i = 0; i < size; ++i) {
        free(arr[i].content);
    }
    free(arr);
}
|
|
|
/**
 * Publish `content` under `topic` to the given nodes, calling _pub_() with
 * msec = -1.
 * @return whatever _pub_() returns
 */
int NetModSocket::pub(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size)
{
    const int msec = -1;
    return _pub_(node_arr, arrlen, topic, topic_size, content, content_size, msec);
}
|
|
/**
 * Publish `content` under `topic` to the given nodes, calling _pub_() with
 * msec = 0.
 * @return whatever _pub_() returns
 */
int NetModSocket::pub_nowait(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size)
{
    const int msec = 0;
    return _pub_(node_arr, arrlen, topic, topic_size, content, content_size, msec);
}
|
|
/**
 * Publish `content` under `topic` to the given nodes, passing the caller's
 * `msec` straight through to _pub_().
 * @return whatever _pub_() returns
 */
int NetModSocket::pub_timeout(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size, int msec )
{
    const int published = _pub_(node_arr, arrlen, topic, topic_size, content, content_size, msec);
    return published;
}
|
|
/**
 * Publish `content` under `topic` to the local bus and/or to remote nodes.
 *
 * When `node_arr` is NULL or `arrlen` is 0, a single local publish is done
 * through shmModSocket. Otherwise each node is handled in turn: a node whose
 * host is NULL or empty is published locally, any other node is sent a BUS
 * request over a connection from the calling thread's NetConnPool and its
 * acknowledgement is collected with poll().
 *
 * @param node_arr      target nodes, or NULL for a purely local publish
 * @param arrlen        number of entries in node_arr
 * @param topic         NUL-terminated topic string
 * @param topic_size    topic size handed to the local publish
 * @param content       payload
 * @param content_size  payload size in bytes
 * @param msec          0 = do not wait, > 0 = timeout in milliseconds,
 *                      < 0 = wait without timeout
 * @return number of successful publishes
 */
int NetModSocket::_pub_(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content,
    int content_size, int msec) {

    int i, connfd;
    int ret;
    int n_req = 0, n_pub_suc = 0, n_resp = 0;
    struct timespec timeout;

    // Zero-initialised so that no indeterminate bytes are put on the wire.
    net_mod_request_head_t request_head = {};
    net_mod_recv_msg_t recv_msg = {};
    net_mod_err_t err_msg;

    NetConnPool *mpool = _get_pool();

    // No node list at all means "publish locally once".
    const bool local_only = (node_arr == NULL) || (arrlen == 0);
    const int n_nodes = local_only ? 1 : arrlen;

    for (i = 0; i < n_nodes; i++) {
        net_node_t *node = local_only ? NULL : &node_arr[i];
        const char *host = (node == NULL) ? NULL : node->host;

        if (host == NULL || host[0] == '\0') {
            // Local delivery (an empty host is treated as local, matching
            // the node-array sendandrecv()).
            if (msec == 0) {
                ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, NULL, BUS_NOWAIT_FLAG);
            } else if (msec > 0) {
                // Whole seconds plus the remainder in nanoseconds
                // (1 ms == 1,000,000 ns).
                timeout.tv_sec = msec / 1000;
                timeout.tv_nsec = (long)(msec % 1000) * 1000000L;
                ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout, BUS_TIMEOUT_FLAG);
            } else {
                ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY);
            }

            if (ret == 0) {
                n_pub_suc++;
            }
            continue;
        }

        // Remote delivery
        if ((connfd = mpool->getConn(node->host, node->port)) < 0) {
            continue;
        }

        request_head.mod = BUS;
        // Bounded copy; strncpy zero-pads the remainder of the field.
        strncpy(request_head.host, host, sizeof(request_head.host) - 1);
        request_head.host[sizeof(request_head.host) - 1] = '\0';
        request_head.port = node->port;
        request_head.key = SHM_BUS_KEY;
        request_head.content_length = content_size;
        request_head.topic_length = strlen(topic) + 1;
        request_head.timeout = msec;

        if (write_request(connfd, request_head, content, content_size, topic, request_head.topic_length) != 0) {
            LoggerFactory::getLogger()->error(" NetModSocket::_pub_ write_request failture %s:%d\n", node->host, node->port);
            mpool->closeConn(connfd);
        } else {
            n_req++;
        }
    }

    if (local_only) {
        // Nothing was sent over the network, so the pool is left untouched.
        return n_pub_suc;
    }

    // Collect one acknowledgement per successfully written request.
    while (n_resp < n_req) {
        /* Wait for connected descriptor(s) to become ready */
        if ((mpool->nready = poll(mpool->conns, mpool->maxi + 1, msec)) <= 0) {
            // Nothing became ready within the allotted time (or poll failed).
            break;
        }

        for (i = 0; (i <= mpool->maxi) && (mpool->nready > 0); i++) {
            if ((connfd = mpool->conns[i].fd) > 0) {

                if (mpool->conns[i].revents & POLLIN) {
                    mpool->nready--;
                    ret = read_response(connfd, &recv_msg, &err_msg);
                    if (ret == 0) {
                        // Acknowledged: stop polling this slot. The reply
                        // payload is not needed, so release it right away.
                        mpool->conns[i].fd = -1;
                        free(recv_msg.content);
                        recv_msg.content = NULL;
                        n_pub_suc++;
                    } else if (ret == EBUS_NET) {
                        // Network connection error
                        mpool->closeConn(connfd);
                    } else {
                        // The proxy service could not forward the message
                        mpool->conns[i].fd = -1;
                    }
                    n_resp++;
                }

                if (mpool->conns[i].revents & POLLOUT) {
                    // Writable: nothing to do here.
                }

                if (mpool->conns[i].revents & (POLLRDHUP | POLLHUP | POLLERR)) {
                    mpool->nready--;
                    mpool->conns[i].fd = -1;
                    mpool->closeConn(connfd);
                }
            }
        }
    }

    // After a timeout, close every connection still waiting for a reply.
    for (i = 0; i <= mpool->maxi; i++) {
        if ((connfd = mpool->conns[i].fd) > 0) {
            mpool->closeConn(connfd);
        }
    }
    mpool->maxi = -1;

    return n_pub_suc;
}
|
|
/**
|
* 发送信息
|
* @key 发送给谁
|
* @return 0 成功, 其他值 失败的错误码
|
*/
|
int NetModSocket::sendto(const void *buf, const int size, const int key){
|
return shmModSocket.sendto(buf, size, key);
|
}
|
|
// 发送信息超时返回。 @sec 秒 , @nsec 纳秒
|
int NetModSocket::sendto_timeout(const void *buf, const int size, const int key, int sec, int nsec){
|
struct timespec timeout = {sec, nsec};
|
return shmModSocket.sendto(buf, size, key, &timeout, BUS_TIMEOUT_FLAG);
|
|
}
|
|
// 发送信息立刻返回。
|
int NetModSocket::sendto_nowait(const void *buf, const int size, const int key){
|
return shmModSocket.sendto(buf, size, key, NULL, BUS_NOWAIT_FLAG);
|
|
}
|
|
/**
|
* 接收信息
|
* @key 从谁哪里收到的信息
|
* @return 0 成功, 其他值 失败的错误码
|
*/
|
int NetModSocket::recvfrom(void **buf, int *size, int *key) {
|
|
return shmModSocket.recvfrom(buf, size, key);
|
|
}
|
|
// 接受信息超时返回。 @sec 秒 , @nsec 纳秒
|
int NetModSocket::recvfrom_timeout(void **buf, int *size, int *key, int sec, int nsec){
|
struct timespec timeout = {sec, nsec};
|
return shmModSocket.recvfrom(buf, size, key, &timeout, BUS_TIMEOUT_FLAG);
|
|
}
|
|
int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key){
|
return shmModSocket.recvfrom(buf, size, key, NULL, BUS_NOWAIT_FLAG);
|
}
|
|
/**
 * Forward to shmModSocket.recvandsend(); the callback, timeout, flag and
 * user data are passed through unchanged.
 * @return whatever shmModSocket.recvandsend() returns
 */
int NetModSocket::recvandsend(recvandsend_callback_fn callback,
    const struct timespec *timeout , int flag, void * user_data )
{
    const int rc = shmModSocket.recvandsend(callback, timeout, flag, user_data);
    return rc;
}
|
|
|
/**
|
* 发送请求信息并等待接收应答
|
* @key 发送给谁
|
* @return 0 成功, 其他值 失败的错误码
|
*/
|
int NetModSocket::sendandrecv( const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size){
|
return shmModSocket.sendandrecv(send_buf, send_size, key, recv_buf, recv_size);
|
}
|
// 超时返回。 @sec 秒 , @nsec 纳秒
|
int NetModSocket::sendandrecv_timeout( const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, int sec, int nsec){
|
struct timespec timeout = {sec, nsec};
|
return shmModSocket.sendandrecv(send_buf, send_size, key, recv_buf, recv_size, &timeout, BUS_TIMEOUT_FLAG);
|
}
|
int NetModSocket::sendandrecv_nowait( const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) {
|
return shmModSocket.sendandrecv(send_buf, send_size, key, recv_buf, recv_size, NULL, BUS_NOWAIT_FLAG);
|
}
|
|
|
/**
|
* 订阅指定主题
|
* @topic 主题
|
* @size 主题长度
|
* @key 总线端口
|
*/
|
int NetModSocket::sub( void *topic, int size, int key){
|
return shmModSocket.sub((char *)topic, size, key);
|
}
|
// 超时返回。 @sec 秒 , @nsec 纳秒
|
int NetModSocket::sub_timeout( void *topic, int size, int key, int sec, int nsec){
|
struct timespec timeout = {sec, nsec};
|
return shmModSocket.sub((char *)topic, size, key, &timeout, BUS_TIMEOUT_FLAG);
|
}
|
int NetModSocket::sub_nowait( void *topic, int size, int key){
|
return shmModSocket.sub((char *)topic, size, key, NULL, BUS_NOWAIT_FLAG);
|
}
|
|
|
|
/**
|
* 取消订阅指定主题
|
* @topic 主题
|
* @size 主题长度
|
* @key 总线端口
|
*/
|
int NetModSocket::desub( void *topic, int size, int key){
|
return shmModSocket.desub((char *)topic, size, key);
|
}
|
// 超时返回。 @sec 秒 , @nsec 纳秒
|
int NetModSocket::desub_timeout( void *topic, int size, int key, int sec, int nsec){
|
struct timespec timeout = {sec, nsec};
|
return shmModSocket.desub((char *)topic, size, key, &timeout, BUS_TIMEOUT_FLAG);
|
}
|
int NetModSocket::desub_nowait( void *topic, int size, int key){
|
return shmModSocket.desub((char *)topic, size, key, NULL, BUS_NOWAIT_FLAG);
|
}
|
|
|
|
/**
|
* 发布主题
|
* @topic 主题
|
* @content 主题内容
|
* @key 总线端口
|
*/
|
int NetModSocket::pub( char *topic, int topic_size, void *content, int content_size, int key){
|
return shmModSocket.pub(topic, topic_size, content, content_size, key);
|
}
|
// 超时返回。 @sec 秒 , @nsec 纳秒
|
int NetModSocket::pub_timeout( char *topic, int topic_size, void *content, int content_size, int key, int sec, int nsec){
|
struct timespec timeout = {sec, nsec};
|
return shmModSocket.pub(topic, topic_size, content, content_size, key, &timeout, BUS_TIMEOUT_FLAG);
|
}
|
int NetModSocket::pub_nowait( char *topic, int topic_size, void *content, int content_size, int key){
|
return shmModSocket.pub(topic, topic_size, content, content_size, key, NULL, BUS_NOWAIT_FLAG);
|
}
|
|
|
/**
|
* 获取soket端口号
|
*/
|
int NetModSocket::get_key() {
|
return shmModSocket.get_key();
|
}
|
|
|
|
|
|
//======================================================================================
|
|
int NetModSocket::write_request(int clientfd, net_mod_request_head_t &request_head,
|
const void *content_buf, int content_size, const void *topic_buf, int topic_size) {
|
|
int buf_size;
|
char *buf;
|
int max_buf_size;
|
if((buf = (char *)malloc(MAXBUF)) == NULL) {
|
LoggerFactory::getLogger()->error(errno, "NetModSocket::NetModSocket malloc");
|
exit(1);
|
} else {
|
max_buf_size = MAXBUF;
|
}
|
|
buf_size = NET_MODE_REQUEST_HEAD_LENGTH + content_size + topic_size ;
|
if(max_buf_size < buf_size) {
|
|
if((buf = (char *)realloc(buf, buf_size)) == NULL) {
|
LoggerFactory::getLogger()->error(errno, "NetModSocket::write_request realloc buf");
|
exit(1);
|
} else {
|
max_buf_size = buf_size;
|
}
|
}
|
|
memcpy(buf, NetModSocket::encode_request_head(request_head), NET_MODE_REQUEST_HEAD_LENGTH);
|
memcpy(buf + NET_MODE_REQUEST_HEAD_LENGTH, content_buf, content_size);
|
if(topic_size != 0 )
|
memcpy(buf + NET_MODE_REQUEST_HEAD_LENGTH + content_size, topic_buf, topic_size);
|
|
if(rio_writen(clientfd, buf, buf_size) != buf_size ) {
|
LoggerFactory::getLogger()->error(errno, "NetModSocket::write_request rio_writen");
|
free(buf);
|
return -1;
|
}
|
free(buf);
|
return 0;
|
}
|
|
/**
|
* @return 0 成功, EBUS_NET 网络错误, 其他值 代理服务没有转发成功。
|
*
|
*/
|
int NetModSocket::read_response(int connfd, net_mod_recv_msg_t *recv_msg, net_mod_err_t *err_arr) {
|
int recv_size;
|
void *recv_buf;
|
char response_head_bs[NET_MODE_RESPONSE_HEAD_LENGTH];
|
|
net_mod_response_head_t response_head;
|
if ( rio_readn(connfd, response_head_bs, NET_MODE_RESPONSE_HEAD_LENGTH) != NET_MODE_RESPONSE_HEAD_LENGTH) {
|
LoggerFactory::getLogger()->error(errno, "NetModSocket::read_response rio_readnb response_head");
|
memcpy(err_arr->host, "unkown", sizeof(err_arr->host));
|
err_arr->port = 0;
|
err_arr->key = 0;
|
err_arr->code = EBUS_NET;
|
return EBUS_NET;
|
}
|
|
response_head = NetModSocket::decode_response_head(response_head_bs);
|
// printf(">>>> read_response %s\n", response_head.host);
|
if(response_head.code != 0) {
|
// 代理服务没能成功发送给对应的key
|
memcpy(err_arr->host, response_head.host, sizeof(err_arr->host));
|
err_arr->port = response_head.port;
|
err_arr->key = response_head.key;
|
err_arr->code = response_head.code;
|
return response_head.code;
|
}
|
|
recv_buf = malloc(response_head.content_length);
|
if(recv_buf == NULL) {
|
LoggerFactory::getLogger()->error(errno, "NetModSocket::send malloc recv_buf");
|
exit(1);
|
}
|
|
if ( (recv_size = rio_readn(connfd, recv_buf, response_head.content_length) ) != response_head.content_length) {
|
|
memcpy(err_arr->host, response_head.host, sizeof(err_arr->host));
|
err_arr->port = response_head.port;
|
err_arr->key = response_head.key;
|
err_arr->code = EBUS_NET;
|
LoggerFactory::getLogger()->error(errno, "NetModSocket::read_response rio_readnb recv_buf");
|
//网络错误
|
return EBUS_NET;
|
}
|
|
strcpy( recv_msg->host, response_head.host);
|
recv_msg->port = response_head.port;
|
recv_msg->key = response_head.key;
|
recv_msg->content = recv_buf;
|
recv_msg->content_length = recv_size;
|
return 0;
|
}
|
|
|
|
|
|
/**
|
uint32_t mod;
|
char host[NI_MAXHOST];
|
uint32_t port;
|
uint32_t key;
|
uint32_t content_length;
|
uint32_t topic_length;
|
*/
|
|
void * NetModSocket::encode_request_head(net_mod_request_head_t & head) {
|
void * headbs = malloc(NET_MODE_REQUEST_HEAD_LENGTH);
|
char *tmp_ptr = (char *)headbs;
|
|
PUT(tmp_ptr, htonl(head.mod));
|
|
tmp_ptr += 4;
|
memcpy(tmp_ptr, head.host, sizeof(head.host));
|
|
tmp_ptr += sizeof(head.host);
|
PUT(tmp_ptr, htonl(head.port));
|
|
tmp_ptr += 4;
|
PUT(tmp_ptr, htonl(head.key));
|
|
tmp_ptr += 4;
|
PUT(tmp_ptr, htonl(head.content_length));
|
|
tmp_ptr += 4;
|
PUT(tmp_ptr, htonl(head.topic_length));
|
|
tmp_ptr += 4;
|
PUT_INT32(tmp_ptr, htonl(head.timeout));
|
|
|
return headbs;
|
}
|
|
/**
 * Decode a request header from its wire form.
 *
 * Reads, in order: mod, host, port, key, content_length, topic_length and
 * timeout; every integer is converted with ntohl().
 *
 * @param headbs  encoded header bytes
 *                (NOTE(review): presumably at least NET_MODE_REQUEST_HEAD_LENGTH
 *                bytes long — no length is checked here)
 * @return the decoded header, by value
 */
net_mod_request_head_t NetModSocket::decode_request_head(void *headbs) {
    const size_t word = sizeof(uint32_t);
    char *cursor = static_cast<char *>(headbs);
    net_mod_request_head_t decoded;

    decoded.mod = ntohl(GET(cursor));
    cursor += word;

    memcpy(decoded.host, cursor, sizeof(decoded.host));
    cursor += sizeof(decoded.host);

    decoded.port = ntohl(GET(cursor));
    cursor += word;

    decoded.key = ntohl(GET(cursor));
    cursor += word;

    decoded.content_length = ntohl(GET(cursor));
    cursor += word;

    decoded.topic_length = ntohl(GET(cursor));
    cursor += word;

    decoded.timeout = ntohl(GET_INT32(cursor));

    return decoded;
}
|
|
/**
|
char host[NI_MAXHOST];
|
uint32_t port;
|
uint32_t key;
|
uint32_t content_length;
|
uint32_t code;
|
*/
|
void * NetModSocket::encode_response_head(net_mod_response_head_t & response) {
|
void * headbs = malloc(NET_MODE_RESPONSE_HEAD_LENGTH);
|
char *tmp_ptr = (char *)headbs;
|
|
memcpy(tmp_ptr, response.host, NI_MAXHOST);
|
|
tmp_ptr += NI_MAXHOST;
|
PUT(tmp_ptr, htonl(response.port));
|
|
tmp_ptr += 4;
|
PUT(tmp_ptr , htonl(response.key));
|
|
tmp_ptr += 4;
|
PUT(tmp_ptr, htonl(response.content_length));
|
|
tmp_ptr += 4;
|
PUT(tmp_ptr, htonl(response.code));
|
|
return headbs;
|
}
|
|
/**
 * Decode a response header from its wire form.
 *
 * Reads, in order: host (NI_MAXHOST bytes), port, key, content_length and
 * code; every integer is converted with ntohl().
 *
 * @param headbs  encoded header bytes
 *                (NOTE(review): presumably at least NET_MODE_RESPONSE_HEAD_LENGTH
 *                bytes long — no length is checked here)
 * @return the decoded header, by value
 */
net_mod_response_head_t NetModSocket::decode_response_head(void *headbs) {
    const int word = 4;
    char *cursor = static_cast<char *>(headbs);
    net_mod_response_head_t decoded;

    memcpy(decoded.host, cursor, NI_MAXHOST);
    cursor += NI_MAXHOST;

    decoded.port = ntohl(GET(cursor));
    cursor += word;

    decoded.key = ntohl(GET(cursor));
    cursor += word;

    decoded.content_length = ntohl(GET(cursor));
    cursor += word;

    decoded.code = ntohl(GET(cursor));

    return decoded;
}
|