|
#include "bus_server_socket.h"
|
#include "shm_mod_socket.h"
|
#include "shm_socket.h"
|
#include "bus_error.h"
|
|
static Logger *logger = LoggerFactory::getLogger();
|
|
void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)> cb) {
|
SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
|
SHMKeySet *subscripter_set;
|
SHMKeySet::iterator set_iter;
|
SHMTopicSubMap::iterator map_iter;
|
|
if(topic_sub_map != NULL) {
|
for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) {
|
subscripter_set = map_iter->second;
|
if(subscripter_set != NULL) {
|
for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) {
|
cb(subscripter_set, *set_iter);
|
}
|
}
|
}
|
}
|
}
|
|
|
size_t BusServerSocket::remove_subscripters(int keys[], size_t length) {
|
size_t count = 0;
|
int key;
|
for(int i = 0; i < length; i++) {
|
key = keys[i];
|
SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
|
SHMKeySet *subscripter_set;
|
SHMKeySet::iterator set_iter;
|
SHMTopicSubMap::iterator map_iter;
|
|
if(topic_sub_map != NULL) {
|
for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) {
|
subscripter_set = map_iter->second;
|
if(subscripter_set != NULL && (set_iter = subscripter_set->find(key) ) != subscripter_set->end()) {
|
subscripter_set->erase(set_iter);
|
count++;
|
}
|
}
|
}
|
}
|
return count;
|
|
}
|
|
|
BusServerSocket::BusServerSocket() {
|
shm_socket = shm_socket_open(SHM_SOCKET_DGRAM);
|
topic_sub_map = NULL;
|
|
}
|
|
BusServerSocket::~BusServerSocket() {
|
destroy();
|
}
|
|
|
|
int BusServerSocket::bind(int key) {
|
return shm_socket_bind(shm_socket, key);
|
}
|
|
/**
|
* 强制绑定端口到socket, 适用于程序非正常关闭的情况下,重启程序绑定原来还没释放的key
|
* @return 0 成功, 其他值 失败的错误码
|
*/
|
int BusServerSocket::force_bind(int key) {
|
return shm_socket_force_bind(shm_socket, key);
|
}
|
|
/**
|
* 启动bus
|
*
|
* @return 0 成功, 其他值 失败的错误码
|
*/
|
int BusServerSocket::start(){
|
int rv;
|
|
topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
|
|
rv = _run_proxy_();
|
|
return rv;
|
}
|
|
|
int BusServerSocket::stop(){
|
int ret;
|
if( shm_socket->key <= 0) {
|
return -1;
|
}
|
bus_head_t head = {};
|
memcpy(head.action, "stop", sizeof(head.action));
|
head.topic_size = 0;
|
head.content_size = 0;
|
|
ShmModSocket client;
|
void *buf;
|
int size = ShmModSocket::get_bus_sendbuf(head, NULL, 0, NULL, 0, &buf);
|
if(size > 0) {
|
ret = client.sendto( buf, size, shm_socket->key);
|
free(buf);
|
return ret;
|
} else {
|
return -1;
|
}
|
|
}
|
|
int BusServerSocket::destroy() {
|
SHMKeySet *subscripter_set;
|
SHMTopicSubMap::iterator map_iter;
|
if(topic_sub_map != NULL) {
|
for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) {
|
subscripter_set = map_iter->second;
|
if(subscripter_set != NULL) {
|
subscripter_set->clear();
|
mm_free((void *)subscripter_set);
|
}
|
|
}
|
topic_sub_map->clear();
|
shm_mm_free_by_key(SHM_BUS_MAP_KEY);
|
}
|
shm_socket_close(shm_socket);
|
|
return 0;
|
}
|
|
/*
|
* 处理订阅
|
*/
|
void BusServerSocket::_proxy_sub( char *topic, int key) {
|
SHMKeySet *subscripter_set;
|
|
struct timespec timeout = {.tv_sec = 1, .tv_nsec = 0};
|
SHMTopicSubMap::iterator map_iter;
|
SHMKeySet::iterator set_iter;
|
if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
|
subscripter_set = map_iter->second;
|
} else {
|
void *set_ptr = mm_malloc(sizeof(SHMKeySet));
|
subscripter_set = new(set_ptr) SHMKeySet;
|
topic_sub_map->insert({topic, subscripter_set});
|
}
|
subscripter_set->insert(key);
|
|
}
|
|
/*
|
* 处理取消订阅
|
*/
|
void BusServerSocket::_proxy_desub( char *topic, int key) {
|
SHMKeySet *subscripter_set;
|
|
SHMTopicSubMap::iterator map_iter;
|
// SHMKeySet::iterator set_iter;
|
|
if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
|
subscripter_set = map_iter->second;
|
|
subscripter_set->erase(key);
|
}
|
}
|
|
/*
|
* 处理取消所有订阅
|
*/
|
void BusServerSocket::_proxy_desub_all(int key) {
|
SHMKeySet *subscripter_set;
|
|
SHMTopicSubMap::iterator map_iter;
|
// SHMKeySet::iterator set_iter;
|
for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) {
|
subscripter_set = map_iter->second;
|
subscripter_set->erase(key);
|
}
|
}
|
|
/*
|
* 处理发布,代理转发
|
*/
|
void BusServerSocket::_proxy_pub( char *topic, char *buf, size_t size, int key) {
|
SHMKeySet *subscripter_set;
|
|
SHMTopicSubMap::iterator map_iter;
|
SHMKeySet::iterator set_iter;
|
|
std::vector<int> subscripter_to_del;
|
std::vector<int>::iterator vector_iter;
|
|
int send_key;
|
int rv;
|
struct timespec timeout = {1,0};
|
|
if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
|
|
subscripter_set = map_iter->second;
|
for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) {
|
send_key = *set_iter;
|
rv = shm_sendto(shm_socket, buf, size, send_key, &timeout, BUS_TIMEOUT_FLAG);
|
if(rv == 0) {
|
continue;
|
}
|
//对方已关闭的或者对应的进程被kill掉的连接放到待删除队列里。如果直接删除会让iter指针出现错乱
|
subscripter_to_del.push_back(send_key);
|
}
|
|
// 删除已关闭的端
|
for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); ++vector_iter) {
|
if((set_iter = subscripter_set->find(*vector_iter)) != subscripter_set->end()) {
|
subscripter_set->erase(set_iter);
|
logger->debug("remove closed subscripter %d \n", send_key);
|
}
|
}
|
subscripter_to_del.clear();
|
|
}
|
|
}
|
|
ProcInfo_query *Qurey_object(const char *object, int *length) {
|
int flag = 0;
|
int val;
|
int len;
|
int total = 0;
|
ProcInfo *Proc_ptr = NULL;
|
ProcInfo Data_stru;
|
ProcInfo_query *dataBuf = NULL;
|
SvrProc *SvrSub_ele;
|
SvrTcs::iterator svr_tcs_iter;
|
SvrProc::iterator svr_proc_iter;
|
ProcZone::iterator proc_iter;
|
SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
|
ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
|
|
if ((svr_tcs_iter = SvrData->find(object)) != SvrData->end()) {
|
SvrSub_ele = svr_tcs_iter->second;
|
for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) {
|
val = *svr_proc_iter;
|
|
if ((proc_iter = proc->find(val)) != proc->end()) {
|
|
if (dataBuf == NULL) {
|
dataBuf = (ProcInfo_query *)malloc(sizeof(ProcInfo_query));
|
if (dataBuf == NULL) {
|
logger->error("in proxy_reg: Out of memory!\n");
|
exit(1);
|
}
|
|
total = sizeof(ProcInfo_query);
|
}
|
|
if (flag == 0) {
|
memset(dataBuf, 0x00, sizeof(ProcInfo_query));
|
|
dataBuf->num = 1;
|
strncpy(dataBuf->name, object, sizeof(dataBuf->name) - 1);
|
|
flag = 1;
|
|
} else {
|
dataBuf->num++;
|
len = sizeof(ProcInfo_query) + sizeof(ProcInfo) * (dataBuf->num - 1);
|
dataBuf = (ProcInfo_query *)realloc(dataBuf, len);
|
if (dataBuf == NULL) {
|
logger->error("in proxy_reg: Out of memory!\n");
|
exit(1);
|
}
|
|
total += sizeof(ProcInfo);
|
memset((char *)dataBuf + len - sizeof(ProcInfo), 0x00, sizeof(ProcInfo));
|
}
|
|
memset(&Data_stru, 0x00, sizeof(ProcInfo));
|
Data_stru = proc_iter->second;
|
|
Proc_ptr = &(dataBuf->procData) + dataBuf->num - 1;
|
strncpy(Proc_ptr->proc_id, Data_stru.proc_id, strlen(Data_stru.proc_id) + 1);
|
strncpy(Proc_ptr->name, Data_stru.name, strlen(Data_stru.name) + 1);
|
strncpy(Proc_ptr->public_info, Data_stru.public_info, strlen(Data_stru.public_info) + 1);
|
strncpy(Proc_ptr->private_info, Data_stru.private_info, strlen(Data_stru.private_info) + 1);
|
|
if (length != NULL)
|
*length = total;
|
}
|
}
|
}
|
|
return dataBuf;
|
}
|
|
void BusServerSocket::_proxy_reg(const char *topic, size_t topic_size, const char *buf, size_t buf_size, int key, int flag)
|
{
|
char buf_temp[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
|
int count = 0;
|
int i = 0;
|
int len = 0;
|
char *data_ptr;
|
ProcInfo Data_stru;
|
ProcZone::iterator proc_iter;
|
TcsZone *TcsSub_ele;
|
ProcDataZone::iterator proc_que_iter;
|
ProcTcsMap::iterator proc_tcs_iter;
|
SvrProc *SvrSub_ele;
|
SvrProc::iterator svr_proc_iter;
|
SvrTcs::iterator svr_tcs_iter;
|
TcsZone::iterator tcssub_iter;
|
ProcPartZone::iterator proc_part_iter;
|
|
struct timespec timeout = {.tv_sec = 1, .tv_nsec = 0};
|
|
if ((flag == PROC_REG) || (flag == PROC_UNREG)) {
|
|
memset(&Data_stru, 0x00, sizeof(ProcInfo));
|
|
if (buf != NULL) {
|
|
memcpy(Data_stru.proc_id, buf, strlen(buf) + 1);
|
count = strlen(buf) + 1;
|
|
memcpy(Data_stru.name, buf + count, strlen(buf + count) + 1);
|
count += strlen(buf + count) + 1;
|
|
memcpy(Data_stru.public_info, buf + count, strlen(buf + count) + 1);
|
count += strlen(buf + count) + 1;
|
|
memcpy(Data_stru.private_info, buf + count, strlen(buf + count) + 1);
|
count += strlen(buf + count) + 1;
|
}
|
|
ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
|
ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET);
|
ProcPartZone *procPart = shm_mm_attach<ProcPartZone>(SHM_BUS_PROC_PART_MAP_KEY);
|
if (flag == PROC_REG) {
|
if ((proc_iter = proc->find(key)) == proc->end()) {
|
proc->insert({key, Data_stru});
|
}
|
|
if ((proc_part_iter = procPart->find(key)) == procPart->end()) {
|
procPart->insert({key, Data_stru.proc_id});
|
}
|
|
if ((proc_que_iter = procQuePart->find(Data_stru.proc_id)) == procQuePart->end()) {
|
procQuePart->insert({Data_stru.proc_id, key});
|
}
|
|
} else {
|
SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
|
|
for (svr_tcs_iter = SvrData->begin(); svr_tcs_iter != SvrData->end(); ++svr_tcs_iter) {
|
SvrSub_ele = svr_tcs_iter->second;
|
|
SvrSub_ele->erase(key);
|
}
|
|
if ((proc_iter = proc->find(key)) != proc->end()) {
|
|
len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1);
|
strncpy(buf_temp, (proc_iter->second).proc_id, len);
|
proc->erase(proc_iter);
|
|
}
|
|
if ((proc_part_iter = procPart->find(key)) != procPart->end()) {
|
|
procPart->erase(key);
|
}
|
|
if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) {
|
|
procQuePart->erase(buf_temp);
|
}
|
|
}
|
} else if (flag == PROC_REG_TCS) {
|
ProcTcsMap *proc = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY);
|
SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
|
|
if ((proc_tcs_iter = proc->find(key)) != proc->end()) {
|
TcsSub_ele = proc_tcs_iter->second;
|
} else {
|
|
void *ptr_set = mm_malloc(sizeof(TcsZone));
|
TcsSub_ele = new(ptr_set) TcsZone;
|
proc->insert({key, TcsSub_ele});
|
}
|
|
strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
|
data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC);
|
while(data_ptr) {
|
TcsSub_ele->insert(data_ptr);
|
if ((svr_tcs_iter = SvrData->find(data_ptr)) != SvrData->end()) {
|
SvrSub_ele = svr_tcs_iter->second;
|
} else {
|
|
void *ptr_set = mm_malloc(sizeof(SvrProc));
|
SvrSub_ele = new(ptr_set) SvrProc;
|
SvrData->insert({data_ptr, SvrSub_ele});
|
}
|
SvrSub_ele->insert(key);
|
data_ptr = strtok(NULL, STR_MAGIC);
|
}
|
|
} else if (flag == PROC_QUE_TCS) {
|
|
struct _temp_store {
|
void *ptr;
|
int total;
|
} *temp_store = NULL;
|
|
int num = 0;
|
int sum = 0;
|
|
ProcInfo_query *ret = NULL;
|
ProcInfo_query *ret_store = NULL;
|
|
strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
|
data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC);
|
while(data_ptr) {
|
ret = Qurey_object(data_ptr, &len);
|
if (ret != NULL) {
|
|
if (temp_store == NULL) {
|
temp_store = (_temp_store *)malloc(sizeof(_temp_store));
|
if (temp_store == NULL) {
|
logger->error("in proxy_reg: Out of memory!\n");
|
exit(1);
|
}
|
|
temp_store->ptr = ret;
|
temp_store->total = len;
|
num = 1;
|
|
} else {
|
num++;
|
temp_store = (_temp_store *)realloc(temp_store, sizeof(_temp_store) * num);
|
if (temp_store == NULL) {
|
logger->error("in proxy_reg: Out of memory!\n");
|
exit(1);
|
}
|
|
(temp_store + num - 1)->ptr = ret;
|
(temp_store + num - 1)->total = len;
|
}
|
|
}
|
data_ptr = strtok(NULL, STR_MAGIC);
|
}
|
|
if (num > 0) {
|
for (count = 0; count < num; count++) {
|
|
if (ret_store == NULL) {
|
ret_store = (ProcInfo_query *)malloc((temp_store + count)->total);
|
if (ret_store == NULL) {
|
logger->error("in proxy_reg: Out of memory!\n");
|
exit(1);
|
}
|
|
sum = (temp_store + count)->total;
|
memcpy(ret_store, (temp_store + count)->ptr, (temp_store +count)->total);
|
|
} else {
|
|
ret_store = (ProcInfo_query *)realloc(ret_store, sum + (temp_store + count)->total);
|
if (ret_store == NULL) {
|
logger->error("in proxy_reg: Out of memory!\n");
|
exit(1);
|
}
|
|
memcpy((char *)ret_store + sum, (temp_store + count)->ptr, (temp_store + count)->total);
|
|
sum += (temp_store + count)->total;
|
|
}
|
|
free((temp_store + count)->ptr);
|
|
}
|
|
free(temp_store);
|
}
|
|
void *last_buf = malloc(sum + sizeof(int));
|
if (last_buf == NULL) {
|
logger->error("in proxy_reg: Out of memory!\n");
|
exit(1);
|
}
|
|
*(int *)last_buf = num;
|
if (num > 0) {
|
memcpy((char *)last_buf + sizeof(int), (char *)ret_store, sum);
|
free(ret_store);
|
}
|
|
shm_sendto(shm_socket, last_buf, sum + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG);
|
|
free(last_buf);
|
} else if (flag == PROC_QUE_STCS) {
|
SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
|
|
strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
|
if ((svr_tcs_iter = SvrData->find(buf_temp)) != SvrData->end()) {
|
SvrSub_ele = svr_tcs_iter->second;
|
|
for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) {
|
count = *svr_proc_iter;
|
|
break;
|
}
|
} else {
|
count = 0;
|
}
|
|
memset(buf_temp, 0x00, sizeof(buf_temp));
|
sprintf(buf_temp, "%d", count);
|
shm_sendto(shm_socket, buf_temp, strlen(buf_temp), key, &timeout, BUS_TIMEOUT_FLAG);
|
|
} else {
|
|
int val;
|
int temp = 0;
|
int pos = 0;
|
int size = 0;
|
ProcInfo_sum *Data_sum = NULL;
|
SHMKeySet *subs_proc;
|
SHMKeySet::iterator subs_proc_iter;
|
SHMTopicSubMap::iterator subs_iter;
|
|
ProcTcsMap *procData = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY);
|
ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
|
|
for (proc_iter = proc->begin(); proc_iter != proc->end(); ++proc_iter) {
|
|
memset(&Data_stru, 0x00, sizeof(Data_stru));
|
|
if (count == 0) {
|
Data_sum = (ProcInfo_sum *)malloc(sizeof(ProcInfo_sum));
|
if (Data_sum == NULL) {
|
|
logger->error("in proxy_reg: Out of memory!\n");
|
|
exit(1);
|
}
|
|
count++;
|
|
memset(Data_sum, 0x00, sizeof(ProcInfo_sum));
|
|
} else {
|
|
count++;
|
len = sizeof(ProcInfo_sum) * count;
|
Data_sum = (ProcInfo_sum *)realloc(Data_sum, len);
|
if (Data_sum == NULL) {
|
logger->error("in proxy_reg: Out of memory!\n");
|
|
exit(1);
|
}
|
|
memset(Data_sum + count - 1, 0x00, sizeof(ProcInfo_sum));
|
}
|
|
Data_stru = proc_iter->second;
|
|
memcpy((Data_sum + count - 1)->procData.proc_id, Data_stru.proc_id, strlen(Data_stru.proc_id));
|
memcpy((Data_sum + count - 1)->procData.name, Data_stru.name, strlen(Data_stru.name));
|
memcpy((Data_sum + count - 1)->procData.public_info, Data_stru.public_info, strlen(Data_stru.public_info));
|
memcpy((Data_sum + count - 1)->procData.private_info, Data_stru.private_info, strlen(Data_stru.private_info));
|
|
(Data_sum + count - 1)->stat = 1;
|
(Data_sum + count - 1)->list_num = 3;
|
|
val = proc_iter->first;
|
if ((proc_tcs_iter = procData->find(val)) != procData->end()) {
|
TcsSub_ele = proc_tcs_iter->second;
|
|
temp = 0;
|
pos = 0;
|
len = sizeof(Data_sum->reg_info) - 1;
|
for (tcssub_iter = TcsSub_ele->begin(); tcssub_iter != TcsSub_ele->end(); ++tcssub_iter) {
|
|
if (temp == 0) {
|
strncpy((Data_sum + count - 1)->reg_info, (*tcssub_iter).c_str(), strlen((*tcssub_iter).c_str()) > len ? len : strlen((*tcssub_iter).c_str()));
|
pos += strlen((Data_sum + count - 1)->reg_info);
|
len -= strlen((Data_sum + count - 1)->reg_info);
|
|
temp++;
|
} else {
|
|
if (len > 0) {
|
strcat((Data_sum + count - 1)->reg_info, ",");
|
|
pos += 1;
|
len -= 1;
|
}
|
|
if (len > 0) {
|
size = strlen((*tcssub_iter).c_str()) > len ? len : strlen((*tcssub_iter).c_str());
|
strncpy(&(Data_sum + count - 1)->reg_info[pos], (*tcssub_iter).c_str(), size);
|
|
pos += size;
|
len -= size;
|
}
|
}
|
}
|
|
pos = 0;
|
temp = 0;
|
len = sizeof(Data_sum->local_info) - 1;
|
for (subs_iter = topic_sub_map->begin(); subs_iter != topic_sub_map->end(); ++subs_iter) {
|
subs_proc = subs_iter->second;
|
|
if ((subs_proc_iter = subs_proc->find(val)) != subs_proc->end()) {
|
|
if ((temp == 0)) {
|
|
strncpy((Data_sum + count - 1)->local_info, subs_iter->first.c_str(), strlen(subs_iter->first.c_str()) > len ? len : strlen(subs_iter->first.c_str()));
|
pos += strlen((Data_sum + count - 1)->local_info);
|
len -= strlen((Data_sum + count - 1)->local_info);
|
|
temp++;
|
} else {
|
|
if (len > 0) {
|
strcat((Data_sum + count - 1)->local_info, ",");
|
|
pos += 1;
|
len -= 1;
|
}
|
|
if (len > 0) {
|
size = strlen(subs_iter->first.c_str()) > len ? len : strlen(subs_iter->first.c_str());
|
strncpy(&(Data_sum + count - 1)->local_info[pos], subs_iter->first.c_str(), size);
|
|
pos += size;
|
len -= size;
|
}
|
}
|
|
}
|
}
|
|
}
|
}
|
|
temp = count * sizeof(ProcInfo_sum);
|
void *last_buf = malloc(temp + sizeof(int));
|
if (last_buf == NULL) {
|
logger->error("in proxy_reg: Out of memory!\n");
|
exit(1);
|
}
|
|
*(int *)last_buf = count;
|
if (count > 0) {
|
memcpy((char *)last_buf + sizeof(int), (char *)Data_sum, temp);
|
free(Data_sum);
|
}
|
|
shm_sendto(shm_socket, last_buf, temp + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG);
|
|
}
|
}
|
|
// 运行代理
|
int BusServerSocket::_run_proxy_() {
|
int size;
|
int key;
|
int flag;
|
char * action, *topic, *topics, *buf, *content;
|
size_t head_len;
|
bus_head_t head;
|
|
int rv;
|
char send_buf[512] = { 0x00 };
|
|
const char *topic_delim = ",";
|
while((rv = shm_recvfrom(shm_socket, (void **)&buf, &size, &key)) == 0) {
|
head = ShmModSocket::decode_bus_head(buf);
|
topics = buf + BUS_HEAD_SIZE;
|
action = head.action;
|
if(strcmp(action, "sub") == 0) {
|
// 订阅支持多主题订阅
|
topic = strtok(topics, topic_delim);
|
while(topic) {
|
_proxy_sub(trim(topic, 0), key);
|
topic = strtok(NULL, topic_delim);
|
}
|
|
}
|
else if(strcmp(action, "desub") == 0) {
|
if(strcmp(trim(topics, 0), "") == 0) {
|
// 取消所有订阅
|
_proxy_desub_all(key);
|
} else {
|
|
topic = strtok(topics, topic_delim);
|
while(topic) {
|
_proxy_desub(trim(topic, 0), key);
|
topic = strtok(NULL, topic_delim);
|
}
|
}
|
|
}
|
else if(strcmp(action, "pub") == 0) {
|
topics[head.topic_size - 1] = '\0';
|
content = topics + head.topic_size;
|
|
_proxy_pub(topics, topics, head.topic_size + head.content_size, key);
|
|
}
|
else if ((strcmp(action, "reg") == 0) || (strcmp(action, "unreg") == 0) \
|
|| (strcmp(action, "tcsreg") == 0) || (strcmp(action, "tcsque") == 0) \
|
|| (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0)) {
|
content = topics + head.topic_size;
|
if (strcmp(action, "reg") == 0) {
|
|
flag = PROC_REG;
|
|
} else if (strcmp(action, "unreg") == 0) {
|
|
flag = PROC_UNREG;
|
|
} else if (strcmp(action, "tcsreg") == 0) {
|
|
flag = PROC_REG_TCS;
|
|
} else if (strcmp(action, "tcsque") == 0) {
|
|
flag = PROC_QUE_TCS;
|
|
} else if (strcmp(action, "stcsque") == 0) {
|
|
flag = PROC_QUE_STCS;
|
|
} else {
|
|
flag = PROC_QUE_ATCS;
|
|
}
|
|
_proxy_reg(topics, head.topic_size, content, head.content_size, key, flag);
|
|
}
|
else if (strncmp(buf, "request", strlen("request")) == 0) {
|
sprintf(send_buf, "%4d", key);
|
strncpy(send_buf + 4, buf, (sizeof(send_buf) - 4) >= (strlen(buf) + 1) ? strlen(buf) : (sizeof(send_buf) - 4));
|
|
rv = shm_sendto(shm_socket, send_buf, strlen(send_buf) + 1, key);
|
if(rv != 0) {
|
logger->error( "BusServerSocket::_run_proxy_ : requst answer fail!\n");
|
}
|
}
|
else if(strcmp(action, "stop") == 0) {
|
free(buf);
|
break;
|
} else {
|
logger->error( "BusServerSocket::_run_proxy_ : unrecognized action %s", action);
|
}
|
free(buf);
|
}
|
|
|
return rv;
|
}
|