#include "shm_mod_socket.h"
|
#include "bus_server_socket.h"
|
// File-local logger obtained from LoggerFactory; used by the ShmModSocket methods below.
static Logger *logger = LoggerFactory::getLogger();
|
|
|
|
/**
 * Open a datagram shm socket and create the (initially empty) set of
 * bus keys that sub() fills in on successful subscriptions.
 */
ShmModSocket::ShmModSocket() {
  // Datagram-mode socket handle used by every other method of this class.
  shm_socket = shm_socket_open(SHM_SOCKET_DGRAM);

  // Keys recorded here are walked by the destructor to unsubscribe.
  bus_set = new std::set<int>();
}
|
|
/**
 * Send a desub (with an empty topic) to every bus key recorded in bus_set,
 * release the set, then close the shm socket.
 */
ShmModSocket::~ShmModSocket() {
  if (bus_set != nullptr) {
    // Each desub is sent with BUS_TIMEOUT_FLAG and a one-second timeout.
    struct timespec one_second = {1, 0};

    for (const int bus_key : *bus_set) {
      desub(nullptr, 0, bus_key, &one_second, BUS_TIMEOUT_FLAG);
    }

    delete bus_set;
  }

  shm_socket_close(shm_socket);
}
|
|
int ShmModSocket::stop() {
|
return shm_socket_stop(shm_socket);
|
}
|
|
|
int ShmModSocket::bind(int key) {
|
return shm_socket_bind(shm_socket, key);
|
}
|
|
/**
|
* 强制绑定端口到socket, 适用于程序非正常关闭的情况下,重启程序绑定原来还没释放的key
|
* @return 0 成功, 其他值 失败的错误码
|
*/
|
int ShmModSocket::force_bind(int key) {
|
return shm_socket_force_bind(shm_socket, key);
|
}
|
|
int ShmModSocket::bind_proc_id(char *buf, int len) {
|
return shm_socket_bind_proc_id(shm_socket, buf, len);
|
}
|
|
int ShmModSocket::reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag)
|
{
|
int ret;
|
struct timespec ts;
|
|
bus_head_t head = {};
|
|
if (flag == PROC_REG) {
|
|
memcpy(head.action, "reg", sizeof(head.action));
|
|
} else if (flag == PROC_UNREG) {
|
|
memcpy(head.action, "unreg", sizeof(head.action));
|
|
} else if (flag == PROC_REG_TCS) {
|
|
memcpy(head.action, "tcsreg", sizeof(head.action));
|
|
} else if (flag == PROC_QUE_TCS) {
|
|
memcpy(head.action, "tcsque", sizeof(head.action));
|
|
} else if (flag == PROC_QUE_STCS) {
|
|
memcpy(head.action, "stcsque", sizeof(head.action));
|
|
} else if (flag == PROC_QUE_ATCS) {
|
|
memcpy(head.action, "atcsque", sizeof(head.action));
|
|
} else {
|
|
return -1;
|
|
}
|
|
if ((flag == PROC_REG) || (flag == PROC_UNREG)) {
|
|
head.topic_size = 0;
|
|
if (pData != NULL) {
|
|
head.content_size = sizeof(ProcInfo);
|
|
} else {
|
|
head.content_size = 0;
|
|
}
|
} else {
|
|
head.topic_size = len;
|
|
head.content_size = 0;
|
|
}
|
|
void *buf_temp;
|
int buf_size;
|
|
if ((flag == PROC_REG) || (flag == PROC_UNREG)) {
|
|
buf_size = get_bus_sendbuf(head, NULL, 0, pData, head.content_size, &buf_temp);
|
|
} else {
|
|
buf_size = get_bus_sendbuf(head, pData, len, NULL, head.content_size, &buf_temp);
|
|
}
|
|
if (timeout_ms > 0) {
|
|
ts.tv_sec = timeout_ms /1000;
|
|
ts.tv_nsec = (timeout_ms - ts.tv_sec * 1000) * 1000 * 1000;
|
|
if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) {
|
|
ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, BUS_TIMEOUT_FLAG);
|
|
} else {
|
|
ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, BUS_TIMEOUT_FLAG);
|
|
}
|
|
} else if (timeout_ms == 0) {
|
|
if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) {
|
|
ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, BUS_NOWAIT_FLAG);
|
|
} else {
|
|
ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, BUS_NOWAIT_FLAG);
|
|
}
|
|
} else {
|
|
if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) {
|
|
ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, -1);
|
|
} else {
|
|
ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, -1);
|
|
}
|
|
}
|
|
free(buf_temp);
|
|
return ret;
|
|
}
|
|
/**
|
* 发送信息
|
* @key 发送给谁
|
* @return 0 成功, 其他值 失败的错误码
|
*/
|
int ShmModSocket::sendto(const void *buf, const int size, const int key, const struct timespec *timeout, int flag) {
|
int rv = shm_sendto(shm_socket, buf, size, key, timeout, flag);
|
if(rv == 0) {
|
logger->debug("ShmModSocket::sendto: %d sendto %d success.\n", get_key(), key);
|
return 0;
|
}
|
|
logger->debug("ShmModSocket::sendto : %d sendto %d failed %s", get_key(), key, bus_strerror(rv));
|
return rv;
|
}
|
|
/**
|
* 接收信息
|
* @key 从谁哪里收到的信息
|
* @return 0 成功, 其他值 失败的错误码
|
*/
|
int ShmModSocket::recvfrom( void **buf, int *size, int *key, const struct timespec *timeout, int flag) {
|
|
int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flag);
|
|
if(rv == 0) {
|
logger->debug("ShmModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key);
|
return 0;
|
}
|
|
logger->debug("ShmModSocket::recvfrom: socket %d recvfrom failed %s", get_key(), bus_strerror(rv));
|
return rv;
|
}
|
|
|
/**
|
* 发送请求信息并等待接收应答
|
* @key 发送给谁
|
* @return 0 成功, 其他值 失败的错误码
|
*/
|
int ShmModSocket::sendandrecv(const void *send_buf, const int send_size, const int send_key,
|
void **recv_buf, int *recv_size, const struct timespec *timeout, int flag){
|
int rv = shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag);
|
|
if(rv == 0) {
|
logger->debug("ShmModSocket::sendandrecv: sendandrecv to %d success.\n", send_key);
|
return 0;
|
}
|
|
logger->debug("ShmModSocket::sendandrecv : sendandrecv to %d failed %s", send_key, bus_strerror(rv));
|
return rv;
|
}
|
|
|
/**
 * Forward to shm_recvandsend() for this socket.
 * @param callback   callback handed to shm_recvandsend()
 * @param timeout    forwarded to shm_recvandsend()
 * @param flag       forwarded to shm_recvandsend()
 * @param user_data  opaque pointer forwarded to shm_recvandsend()
 * @return the value reported by shm_recvandsend()
 */
int ShmModSocket::recvandsend( recvandsend_callback_fn callback,
  const struct timespec *timeout , int flag, void * user_data ) {
  const int status = shm_recvandsend(shm_socket, callback, timeout, flag, user_data);
  return status;
}
|
|
// // Returns on timeout. @sec seconds, @nsec nanoseconds
|
// int ShmModSocket::sendandrecv_unsafe(const void *send_buf, const int send_size, const int send_key,
|
// void **recv_buf, int *recv_size, const struct timespec *timeout, int flag){
|
// return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag);
|
// }
|
|
/**
|
* 订阅指定主题
|
* @topic 主题
|
* @size 主题长度
|
* @key 总线端口
|
*/
|
int ShmModSocket::sub(const char *topic, int topic_size, int key,
|
const struct timespec *timeout, int flags) {
|
int ret;
|
bus_head_t head = {};
|
memcpy(head.action, "sub", sizeof(head.action));
|
head.topic_size = topic_size = strlen(topic) + 1;
|
head.content_size = 0;
|
|
void *buf;
|
int size = get_bus_sendbuf(head, topic, topic_size, NULL, 0, &buf);
|
if(size > 0) {
|
ret = shm_sendto(shm_socket, buf, size, key, timeout, flags);
|
free(buf);
|
if(ret == 0) {
|
bus_set->insert(key);
|
}
|
return ret;
|
} else {
|
return -1;
|
}
|
}
|
|
|
|
|
/**
|
* 取消订阅指定主题
|
* @topic 主题
|
* @size 主题长度
|
* @key 总线端口
|
*/
|
int ShmModSocket::desub(const char *topic, int topic_size, int key, const struct timespec *timeout, int flags) {
|
// char buf[8192];
|
int ret;
|
if(topic == NULL) {
|
topic = "";
|
}
|
// snprintf(buf, 8192, "%sdesub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
|
// return shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags);
|
|
bus_head_t head = {};
|
memcpy(head.action, "desub", sizeof(head.action));
|
head.topic_size = topic_size = strlen(topic) + 1;
|
head.content_size = 0;
|
void *buf;
|
int size = get_bus_sendbuf(head, topic, topic_size, NULL, 0, &buf);
|
if(size > 0) {
|
ret = shm_sendto(shm_socket, buf, size, key, timeout, flags);
|
free(buf);
|
if(ret == 0) {
|
return 0;
|
} else {
|
logger->error("ShmModSocket::_desub_ key %d failed, %s", key, bus_strerror(ret));
|
return ret;
|
}
|
} else {
|
return -1;
|
}
|
|
}
|
|
|
|
/**
|
* 发布主题
|
* @topic 主题
|
* @content 主题内容
|
* @key 总线端口
|
*/
|
int ShmModSocket::pub(const char *topic, int topic_size, const void *content, int content_size, int key, const struct timespec *timeout, int flags) {
|
int ret;
|
bus_head_t head = {};
|
memcpy(head.action, "pub", sizeof(head.action));
|
head.topic_size = topic_size = strlen(topic) + 1;
|
head.content_size = content_size;
|
|
void *buf;
|
int size = get_bus_sendbuf(head, topic, topic_size, content, content_size, &buf);
|
if(size > 0) {
|
ret = shm_sendto(shm_socket, buf, size, key, timeout, flags);
|
free(buf);
|
return ret;
|
} else {
|
return -1;
|
}
|
|
}
|
|
|
|
/**
|
* 获取soket key
|
*/
|
int ShmModSocket::get_key(){
|
return shm_socket->key;
|
}
|
|
|
|
// =============================================================================
|
|
/**
 * Assemble a bus message - encoded header, then topic, then content - in a
 * freshly malloc'ed buffer.
 *
 * For the "reg" / "unreg" actions the content is not copied verbatim: when
 * content_buf is non-NULL it is handed to proc_copy(), and the byte count
 * proc_copy() reports replaces content_size in both the returned length and
 * request_head.content_size.
 *
 * @param request_head  header to encode (content_size may be rewritten, see above)
 * @param topic_buf     topic bytes; may be NULL only when topic_size is 0
 * @param topic_size    number of topic bytes
 * @param content_buf   content bytes (or the proc_copy() input for reg/unreg)
 * @param content_size  number of content bytes
 * @param retbuf        out: the message buffer, to be released with free();
 *                      set to NULL when -1 is returned
 * @return total message length in bytes, or -1 on invalid arguments.
 *         The process exits with status 1 if allocation fails.
 */
int ShmModSocket::get_bus_sendbuf(bus_head_t &request_head,
  const void *topic_buf, int topic_size, const void *content_buf, int content_size, void **retbuf) {

  if (retbuf == NULL) {
    return -1;
  }
  *retbuf = NULL;

  // Prefix match, exactly as the original comparisons did.
  const bool is_proc_action =
    (strncmp(request_head.action, "reg", strlen("reg")) == 0) ||
    (strncmp(request_head.action, "unreg", strlen("unreg")) == 0);

  // Refuse sizes/pointers that would make the memcpy calls below misbehave.
  if (topic_size < 0 || content_size < 0 ||
      (topic_size > 0 && topic_buf == NULL) ||
      (!is_proc_action && content_size > 0 && content_buf == NULL)) {
    return -1;
  }

  int buf_size = BUS_HEAD_SIZE + content_size + topic_size;

  // One allocation of at least MAXBUF bytes (the original malloc'ed MAXBUF and
  // then realloc'ed when the message was larger).
  const int alloc_size = (buf_size > MAXBUF) ? buf_size : MAXBUF;
  char *buf = (char *) malloc(alloc_size);
  if (buf == NULL) {
    LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf malloc");
    exit(1);
  }

  void *head_bytes = ShmModSocket::encode_bus_head(request_head);
  if (head_bytes == NULL) {
    LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf encode_bus_head");
    exit(1);
  }
  memcpy(buf, head_bytes, BUS_HEAD_SIZE);
  free(head_bytes);

  if (topic_size != 0) {
    memcpy(buf + BUS_HEAD_SIZE, topic_buf, topic_size);
  }

  if ((content_size != 0) && !is_proc_action) {
    memcpy(buf + BUS_HEAD_SIZE + topic_size, content_buf, content_size);
  } else if (is_proc_action && (content_buf != NULL)) {
    int count = 0;
    proc_copy(buf + BUS_HEAD_SIZE + topic_size, const_cast<void *> (content_buf), &count);

    // NOTE(review): the header was already encoded into buf above, so the
    // encoded content_size still carries the caller's value; only the
    // caller's struct and the returned length reflect `count`. Confirm the
    // receiver does not rely on the encoded field for reg/unreg.
    request_head.content_size = count;
    buf_size -= (content_size - count);
  }

  *retbuf = buf;
  return buf_size;
}
|
|
/**
|
char action[];
|
uint32_t topic_size;
|
uint32_t content_size;
|
*/
|
|
void * ShmModSocket::encode_bus_head(bus_head_t & head) {
|
void * headbs = malloc(BUS_HEAD_SIZE);
|
char *tmp_ptr = (char *)headbs;
|
|
memcpy(tmp_ptr, head.action, sizeof(head.action));
|
|
tmp_ptr += sizeof(head.action);
|
PUT(tmp_ptr, htonl(head.topic_size));
|
|
tmp_ptr += sizeof(head.topic_size);
|
PUT(tmp_ptr, htonl(head.content_size));
|
|
return headbs;
|
}
|
|
/**
 * Parse an encoded bus header back into a bus_head_t.
 *
 * Reads, in order: sizeof(action) raw bytes for the action, then topic_size
 * and content_size, each converted with ntohl().
 *
 * @param headbs  pointer to the encoded header bytes
 * @return the decoded header
 */
bus_head_t ShmModSocket::decode_bus_head(void *headbs) {
  bus_head_t head;
  char *cursor = (char *)headbs;

  // Action bytes come first and are copied as-is.
  memcpy(head.action, cursor, sizeof(head.action));
  cursor += sizeof(head.action);

  // The two size fields follow.
  head.topic_size = ntohl(GET(cursor));
  cursor += sizeof(head.topic_size);

  head.content_size = ntohl(GET(cursor));

  return head;
}
|
|
|