#include "shm_mod_socket.h"

#include "bus_server_socket.h"

#include <climits>
#include <vector>
|
// File-local logger (internal linkage) obtained from LoggerFactory; used by the ShmModSocket methods in this file.
static Logger *logger = LoggerFactory::getLogger();
|
|
|
size_t ShmModSocket::remove_keys(int keys[], size_t length) {
|
BusServerSocket::remove_subscripters(keys, length);
|
return shm_socket_remove_keys(keys, length);
|
}
|
|
/**
 * Creates a socket wrapper: mod starts as the zero value of socket_mod_t, a
 * SHM_SOCKET_DGRAM shm socket is opened, and an empty set is allocated to
 * record the bus keys this socket subscribes through.
 */
ShmModSocket::ShmModSocket() {
    mod = static_cast<socket_mod_t>(0);
    shm_socket = shm_open_socket(SHM_SOCKET_DGRAM);
    bus_set = new std::set<int>();
}
|
|
/**
 * Tears the socket down: logs a debug line, issues a desub request (NULL topic,
 * 1-second timeout) to every bus key recorded in bus_set, frees the set, and
 * finally closes the underlying shm socket.
 */
ShmModSocket::~ShmModSocket() {
    logger->debug("Destory ShmModSocket...\n");

    if (bus_set != nullptr) {
        // One timeout object (1 s, 0 ns) is shared by all desub requests below.
        struct timespec timeout = {1, 0};

        for (const int bus_key : *bus_set) {
            desub_timeout(nullptr, 0, bus_key, &timeout);
        }

        delete bus_set;
    }

    shm_close_socket(shm_socket);
}
|
|
int ShmModSocket::bind(int key) {
|
return shm_socket_bind(shm_socket, key);
|
}
|
|
/**
|
* 强制绑定端口到socket, 适用于程序非正常关闭的情况下,重启程序绑定原来还没释放的key
|
* @return 0 成功, 其他值 失败的错误码
|
*/
|
int ShmModSocket::force_bind(int key) {
|
return shm_socket_force_bind(shm_socket, key);
|
}
|
/**
|
* 发送信息
|
* @key 发送给谁
|
* @return 0 成功, 其他值 失败的错误码
|
*/
|
int ShmModSocket::sendto(const void *buf, const int size, const int key) {
|
return shm_sendto(shm_socket, buf, size, key, NULL, 0);
|
}
|
// 发送信息超时返回。 @sec 秒 , @nsec 纳秒
|
int ShmModSocket::sendto_timeout(const void *buf, const int size, const int key, const struct timespec *timeout) {
|
return shm_sendto(shm_socket, buf, size, key, timeout, 0);
|
}
|
// 发送信息立刻返回。
|
int ShmModSocket::sendto_nowait( const void *buf, const int size, const int key){
|
return shm_sendto(shm_socket, buf, size, key, NULL, (int)SHM_MSG_NOWAIT);
|
}
|
|
|
inline int ShmModSocket::_recvfrom_(void **buf, int *size, int *key, struct timespec *timeout, int flags) {
|
|
if(mod == BUS) {
|
logger->error("Can not use method recvfrom in a Bus");
|
exit(1);
|
}
|
// printf("dgram_mod_recvfrom before\n");
|
int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flags);
|
// printf("dgram_mod_recvfrom after\n");
|
return rv;
|
}
|
/**
|
* 接收信息
|
* @key 从谁哪里收到的信息
|
* @return 0 成功, 其他值 失败的错误码
|
*/
|
int ShmModSocket::recvfrom(void **buf, int *size, int *key) {
|
|
return _recvfrom_( buf, size, key, NULL, 0);
|
}
|
|
|
// 接受信息超时返回。 @sec 秒 , @nsec 纳秒
|
int ShmModSocket::recvfrom_timeout( void **buf, int *size, int *key, struct timespec *timeout) {
|
return _recvfrom_(buf, size, key, timeout, 0);
|
}
|
|
int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *key){
|
return _recvfrom_(buf, size, key, NULL, (int)SHM_MSG_NOWAIT);
|
}
|
|
/**
|
* 发送请求信息并等待接收应答
|
* @key 发送给谁
|
* @return 0 成功, 其他值 失败的错误码
|
*/
|
int ShmModSocket::sendandrecv( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
|
return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0);
|
}
|
// 超时返回。 @sec 秒 , @nsec 纳秒
|
int ShmModSocket::sendandrecv_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, struct timespec *timeout){
|
return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0);
|
}
|
int ShmModSocket::sendandrecv_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
|
return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT);
|
}
|
|
int ShmModSocket::sendandrecv_unsafe( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
|
return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0);
|
}
|
// 超时返回。 @sec 秒 , @nsec 纳秒
|
int ShmModSocket::sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, struct timespec *timeout){
|
return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0);
|
}
|
int ShmModSocket::sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
|
return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT);
|
}
|
|
|
|
|
/**
|
* 订阅指定主题
|
* @topic 主题
|
* @size 主题长度
|
* @key 总线端口
|
*/
|
int ShmModSocket::sub(char *topic, int size, int key){
|
return _sub_( topic, size, key, NULL, 0);
|
}
|
// 超时返回。 @sec 秒 , @nsec 纳秒
|
int ShmModSocket::sub_timeout(char *topic, int size, int key, struct timespec *timeout){
|
return _sub_(topic, size, key, timeout, 0);
|
}
|
int ShmModSocket::sub_nowait(char *topic, int size, int key) {
|
return _sub_(topic, size, key, NULL, (int)SHM_MSG_NOWAIT);
|
}
|
|
|
|
/**
|
* 取消订阅指定主题
|
* @topic 主题
|
* @size 主题长度
|
* @key 总线端口
|
*/
|
int ShmModSocket::desub(char *topic, int size, int key){
|
return _desub_( topic, size, key, NULL, 0);
|
}
|
// 超时返回。 @sec 秒 , @nsec 纳秒
|
int ShmModSocket::desub_timeout(char *topic, int size, int key, struct timespec *timeout){
|
return _desub_(topic, size, key, timeout, 0);
|
}
|
int ShmModSocket::desub_nowait(char *topic, int size, int key) {
|
return _desub_(topic, size, key, NULL, (int)SHM_MSG_NOWAIT);
|
}
|
|
|
|
/**
|
* 发布主题
|
* @topic 主题
|
* @content 主题内容
|
* @key 总线端口
|
*/
|
int ShmModSocket::pub(char *topic, int topic_size, void *content, int content_size, int key){
|
return _pub_(topic, topic_size, content, content_size, key, NULL, 0);
|
}
|
// 超时返回。 @sec 秒 , @nsec 纳秒
|
int ShmModSocket::pub_timeout(char *topic, int topic_size, void *content, int content_size, int key, struct timespec * timeout){
|
return _pub_( topic, topic_size, content, content_size, key, timeout, 0);
|
}
|
int ShmModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int key){
|
return _pub_(topic, topic_size, content, content_size, key, NULL, (int)SHM_MSG_NOWAIT);
|
}
|
|
|
/**
|
* 获取soket key
|
*/
|
int ShmModSocket::get_key(){
|
return shm_socket->key;
|
}
|
|
|
|
// =============================================================================
|
/**
|
* @key 总线端口
|
*/
|
int ShmModSocket::_sub_(char *topic, int size, int key,
|
struct timespec *timeout, int flags) {
|
char buf[8192];
|
int rv;
|
snprintf(buf, 8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
|
rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags);
|
if(rv == 0) {
|
bus_set->insert(key);
|
}
|
return rv;
|
}
|
|
|
/**
|
* @key 总线端口
|
*/
|
int ShmModSocket::_desub_(char *topic, int size, int key,
|
struct timespec *timeout, int flags) {
|
char buf[8192];
|
if(topic == NULL) {
|
topic = "";
|
}
|
snprintf(buf, 8192, "%sdesub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
|
return shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags);
|
}
|
|
/**
|
* @key 总线端口
|
* @str "<**pub**>{经济}"
|
*/
|
|
int ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int key,
|
struct timespec *timeout, int flags) {
|
int head_len;
|
char buf[8192+content_size];
|
snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
|
head_len = strlen(buf);
|
memcpy(buf+head_len, content, content_size);
|
return shm_sendto(shm_socket, buf, head_len+content_size, key, timeout, flags);
|
|
}
|
|
|
|
|