| | |
| | | |
| | | LoggerConfig config; |
| | | config.level = Logger::DEBUG; |
| | | |
| | | config.logFile = "/tmp/bhome_bus.log"; |
| | | |
| | | const char *logFileFormat= "/tmp/bhome_bus.%ld.log"; |
| | | char logFile[128]; |
| | | sprintf(logFile, logFileFormat, getpid()); |
| | | config.logFile = logFile; |
| | | |
| | | #ifdef BUILD_Debug |
| | | config.console = 1; |
| | |
| | | int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec){ |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | logger->debug("net_mod_socket_sendto: %d sendto %d", net_mod_socket_get_key(_socket), key); |
| | | return sockt->sendto_timeout(buf, size, key, sec, nsec); |
| | | // return sockt->sendto(buf, size, key); |
| | | // return sockt->sendto_timeout(buf, size, key, sec, nsec); |
| | | return sockt->sendto(buf, size, key); |
| | | } |
| | | // 发送信息立刻返回。 |
| | | int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key){ |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | logger->debug("net_mod_socket_sendto: %d sendto %d", net_mod_socket_get_key(_socket), key); |
| | | return sockt->sendto_nowait(buf, size, key); |
| | | return sockt->sendto(buf, size, key); |
| | | // return sockt->sendto_nowait(buf, size, key); |
| | | } |
| | | |
| | | /** |
| | |
| | | // 接受信息超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec){ |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | // return sockt->recvfrom(buf, size, key); |
| | | return sockt->recvfrom_timeout(buf, size, key, sec, nsec); |
| | | return sockt->recvfrom(buf, size, key); |
| | | // return sockt->recvfrom_timeout(buf, size, key, sec, nsec); |
| | | } |
| | | int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key){ |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->recvfrom_nowait(buf, size, key); |
| | | return sockt->recvfrom(buf, size, key); |
| | | // return sockt->recvfrom_nowait(buf, size, key); |
| | | } |
| | | |
| | | int net_mod_socket_sendandrecv(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, |
| | |
| | | int net_mod_socket_sendandrecv_timeout(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, |
| | | net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int timeout){ |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | // return sockt->sendandrecv(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size); |
| | | return sockt->sendandrecv_timeout(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size, timeout); |
| | | return sockt->sendandrecv(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size); |
| | | // return sockt->sendandrecv_timeout(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size, timeout); |
| | | } |
| | | |
| | | int net_mod_socket_sendandrecv_nowait(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, |
| | | net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) { |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->sendandrecv_nowait(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size); |
| | | return sockt->sendandrecv(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size); |
| | | // return sockt->sendandrecv_nowait(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size); |
| | | } |
| | | |
| | | |
| | |
| | | * @size 主题长度 |
| | | * @key 总线端口 |
| | | */ |
| | | int ShmModSocket::sub(char *topic, int topic_size, int key, |
| | | int ShmModSocket::sub(const char *topic, int topic_size, int key, |
| | | const struct timespec *timeout, int flags) { |
| | | int ret; |
| | | bus_head_t head = {}; |
| | |
| | | * @size 主题长度 |
| | | * @key 总线端口 |
| | | */ |
| | | int ShmModSocket::desub(char *topic, int topic_size, int key, const struct timespec *timeout, int flags) { |
| | | int ShmModSocket::desub(const char *topic, int topic_size, int key, const struct timespec *timeout, int flags) { |
| | | // char buf[8192]; |
| | | int ret; |
| | | if(topic == NULL) { |
| | |
| | | * @content 主题内容 |
| | | * @key 总线端口 |
| | | */ |
| | | int ShmModSocket::pub(char *topic, int topic_size, void *content, int content_size, int key, const struct timespec *timeout, int flags) { |
| | | int ShmModSocket::pub(const char *topic, int topic_size, const void *content, int content_size, int key, const struct timespec *timeout, int flags) { |
| | | int ret; |
| | | bus_head_t head = {}; |
| | | memcpy(head.action, "pub", sizeof(head.action)); |
| | |
| | | // ============================================================================= |
| | | |
| | | int ShmModSocket::get_bus_sendbuf(bus_head_t &request_head, |
| | | void *topic_buf, int topic_size, void *content_buf, int content_size, void **retbuf) { |
| | | const void *topic_buf, int topic_size, const void *content_buf, int content_size, void **retbuf) { |
| | | |
| | | int buf_size; |
| | | char *buf; |
| | |
| | | |
| | | |
| | | |
| | | static int get_bus_sendbuf(bus_head_t &request_head, void *topic_buf, int topic_size, void *content_buf, int content_size, void **retbuf); |
| | | static int get_bus_sendbuf(bus_head_t &request_head, const void *topic_buf, int topic_size, const void *content_buf, int content_size, void **retbuf); |
| | | |
| | | public: |
| | | static size_t remove_keys(int keys[], size_t length); |
| | |
| | | * @key 总线端口 |
| | | * @flag BUS_TIMEOUT_FLAG BUS_NOWAIT_FLAG |
| | | */ |
| | | int sub(char *topic, int size, int key, const struct timespec *timeout = NULL, int flag = 0); |
| | | int sub(const char *topic, int size, int key, const struct timespec *timeout = NULL, int flag = 0); |
| | | |
| | | |
| | | /** |
| | |
| | | * @key 总线端口 |
| | | * @flag BUS_TIMEOUT_FLAG BUS_NOWAIT_FLAG |
| | | */ |
| | | int desub(char *topic, int size, int key, const struct timespec *timeout = NULL, int flag = 0); |
| | | int desub(const char *topic, int size, int key, const struct timespec *timeout = NULL, int flag = 0); |
| | | |
| | | /** |
| | | * 发布主题 |
| | |
| | | * @key 总线端口 |
| | | * @flag BUS_TIMEOUT_FLAG BUS_NOWAIT_FLAG |
| | | */ |
| | | int pub(char *topic, int topic_size, void *content, int content_size, int key, const struct timespec *timeout = NULL, int flag = 0); |
| | | int pub(const char *topic, int topic_size, const void *content, int content_size, int key, const struct timespec *timeout = NULL, int flag = 0); |
| | | |
| | | |
| | | /** |
| | |
| | | static LockFreeQueue<shm_msg_t> * shm_socket_attach_queue(int key) { |
| | | LockFreeQueue<shm_msg_t> * queue; |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | // hashtable_lock(hashtable); |
| | | void *tmp_ptr = hashtable_get(hashtable, key); |
| | | if (tmp_ptr == NULL || tmp_ptr == (void *)1) { |
| | | //logger->error("shm_socket._remote_queue_attach:connet at key %d failed!", key); |
| | |
| | | int s; |
| | | int rv; |
| | | |
| | | if (sockt->socket_type != SHM_SOCKET_DGRAM) { |
| | | logger->error( "shm_socket.shm_sendto: Can't invoke shm_sendto method in a %d type socket which is " |
| | | "not a SHM_SOCKET_DGRAM socket ", |
| | | sockt->socket_type); |
| | | exit(0); |
| | | } |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | |
| | | |
| | |
| | | |
| | | |
| | | // 短连接方式接受 |
| | | int shm_recvfrom(shm_socket_t *sokt, void **buf, int *size, int *key, const struct timespec *timeout, int flag) { |
| | | int shm_recvfrom(shm_socket_t *sockt, void **buf, int *size, int *key, const struct timespec *timeout, int flag) { |
| | | int s; |
| | | int rv; |
| | | |
| | | if (sokt->socket_type != SHM_SOCKET_DGRAM) { |
| | | logger->error("shm_socket.shm_recvfrom: Can't invoke shm_recvfrom method in a %d type socket which " |
| | | "is not a SHM_SOCKET_DGRAM socket ", |
| | | sokt->socket_type); |
| | | exit(1); |
| | | } |
| | | |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | |
| | | if ((s = pthread_mutex_lock(&(sokt->mutex))) != 0) |
| | | if ((s = pthread_mutex_lock(&(sockt->mutex))) != 0) |
| | | err_exit(s, "shm_recvfrom : pthread_mutex_lock"); |
| | | |
| | | if (sokt->queue == NULL) { |
| | | if (sokt->key == 0) { |
| | | sokt->key = hashtable_alloc_key(hashtable); |
| | | if (sockt->queue == NULL) { |
| | | if (sockt->key == 0) { |
| | | sockt->key = hashtable_alloc_key(hashtable); |
| | | } |
| | | sokt->queue = shm_socket_bind_queue( sokt->key, sokt->force_bind); |
| | | if(sokt->queue == NULL ) { |
| | | logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sokt->key); |
| | | sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind); |
| | | if(sockt->queue == NULL ) { |
| | | logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key); |
| | | return EBUS_KEY_INUSED; |
| | | } |
| | | } |
| | | |
| | | if ((s = pthread_mutex_unlock(&(sokt->mutex))) != 0) |
| | | if ((s = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | | err_exit(s, "shm_recvfrom : pthread_mutex_unlock"); |
| | | |
| | | shm_msg_t src; |
| | | |
| | | rv = sokt->queue->pop(src, timeout, flag); |
| | | rv = sockt->queue->pop(src, timeout, flag); |
| | | |
| | | if (rv == 0) { |
| | | if(buf != NULL) { |
| | |
| | | |
| | | |
| | | // use thread local |
| | | int _shm_sendandrecv_thread_local(shm_socket_t *socket, const void *send_buf, |
| | | int _shm_sendandrecv_thread_local(shm_socket_t *sockt, const void *send_buf, |
| | | const int send_size, const int send_key, void **recv_buf, |
| | | int *recv_size, const struct timespec *timeout, int flags) { |
| | | int recv_key; |
| | |
| | | |
| | | // 用thread local 保证每个线程用一个独占的socket接受对方返回的信息 |
| | | shm_socket_t *tmp_socket; |
| | | |
| | | if (socket->socket_type != SHM_SOCKET_DGRAM) { |
| | | logger->error( "shm_socket.shm_sendandrecv: Can't invoke shm_sendandrecv method in a %d type socket " |
| | | "which is not a SHM_SOCKET_DGRAM socket ", |
| | | socket->socket_type); |
| | | exit(1); |
| | | } |
| | | |
| | | |
| | | rv = pthread_once(&_once_, _create_socket_key_perthread); |
| | |
| | | if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags)) == 0) { |
| | | rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags); |
| | | if(rv != 0) { |
| | | printf("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv)); |
| | | logger->error("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv)); |
| | | } |
| | | else if(rv == 0 ) { |
| | | logger->debug("======%d use tmp_socket %d, send to %d, receive from %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key); |
| | | |
| | | if(recv_key == shm_socket_get_key(sockt)) { |
| | | logger->debug("=====收到了自己发给自己的消息\n"); |
| | | } |
| | | assert( send_key == recv_key); |
| | | if(send_key != recv_key) { |
| | | err_exit(0, "_shm_sendandrecv_thread_local: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key); |
| | | logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key); |
| | | exit(1); |
| | | } |
| | | |
| | | } |
| | | return rv; |
| | | } else { |
| | |
| | | const int send_size, const int send_key, void **recv_buf, |
| | | int *recv_size, const struct timespec *timeout, int flags) { |
| | | |
| | | return _shm_sendandrecv_alloc_new(socket, send_buf, send_size, send_key,recv_buf, recv_size, timeout, flags); |
| | | return _shm_sendandrecv_thread_local(socket, send_buf, send_size, send_key,recv_buf, recv_size, timeout, flags); |
| | | } |
| | | |
| | | |
| | |
| | | |
| | | } |
| | | |
| | | # 无限循环send |
| | | # one_to_many send |
| | | function one_to_many() { |
| | | ./test_net_mod_socket --fun="one_sendto_many" \ |
| | | --sendlist=" :5000:100, :5000:101, :5000:102" |
| | | |
| | | } |
| | | # 多线程send |
| | | function msend() { |
| | | ./test_net_mod_socket --fun="test_net_sendandrecv_threads" \ |
| | | --sendlist="localhost:5000:100, localhost:5000:100" |
| | | |
| | | # |
| | | function send() { |
| | | ./test_net_mod_socket --fun="test_net_sendandrecv" \ |
| | | --sendlist=" :5000:100, :5000:101, :5000:102" |
| | | |
| | | } |
| | | |
| | | # 无限循环 pub |
| | | function pub() { |
| | | ./test_net_mod_socket --fun="test_net_pub" \ |
| | |
| | | int key; |
| | | int rv; |
| | | while ((rv = net_mod_socket_recvfrom( sockt, &recvbuf, &size, &key) ) == 0) { |
| | | printf("收到订阅消息:%s\n", recvbuf); |
| | | printf("收到订阅消息:%s\n", (char *)recvbuf); |
| | | free(recvbuf); |
| | | } |
| | | |
| | |
| | | int remote_port; |
| | | while ( (rv = net_mod_socket_recvfrom(ser, &recvbuf, &size, &remote_port) ) == 0) { |
| | | // printf( "server: RECEIVED REQUEST FROM PORT %d NAME %s\n", remote_port, recvbuf); |
| | | sprintf(sendbuf, "%d RECEIVED %s", net_mod_socket_get_key(ser), recvbuf); |
| | | sprintf(sendbuf, "%d RECEIVED %s", net_mod_socket_get_key(ser), (char *)recvbuf); |
| | | net_mod_socket_sendto(ser, sendbuf, strlen(sendbuf) + 1, remote_port); |
| | | free(recvbuf); |
| | | } |
| | |
| | | recv_arr[i].host, |
| | | recv_arr[i].port, |
| | | recv_arr[i].key, |
| | | recv_arr[i].content |
| | | (char *)recv_arr[i].content |
| | | ); |
| | | } |
| | | |
| | |
| | | recv_arr[j].host, |
| | | recv_arr[j].port, |
| | | recv_arr[j].key, |
| | | recv_arr[j].content |
| | | (char *)recv_arr[j].content |
| | | ); |
| | | |
| | | printf("key == %d\n", net_mod_socket_get_key(client)); |
| | |
| | | net_node_t *node_arr; |
| | | int node_arr_size = parse_node_list(nodelist, &node_arr); |
| | | char buf[128]; |
| | | pid_t pid, rpid ; |
| | | unsigned int l , rl; |
| | | const char *hello_format = "%ld say Hello %u "; |
| | | pid_t pid, retPid ; |
| | | unsigned int l , retl; |
| | | int remoteKey; |
| | | const char *hello_format = "%d say Hello %u "; |
| | | const char *reply_format = "%d RECEIVED %d say Hello %d"; |
| | | |
| | | pid = getpid(); |
| | | l = 0; |
| | | |
| | | client = net_mod_socket_open(); |
| | | while(true) { |
| | | sprintf(buf, hello_format, (long)pid, l); |
| | | sprintf(buf, hello_format, pid, l); |
| | | n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, buf, strlen(buf)+1, |
| | | &recv_arr, &recv_arr_size, 1000); |
| | | printf(" %d nodes reply\n", n); |
| | | for(j = 0; j < recv_arr_size; j++) { |
| | | |
| | | LoggerFactory::getLogger()->debug("%ld send '%s'. received '%s' from (host:%s, port: %d, key:%d) \n", |
| | | printf("%ld send '%s' . received '%s' from (host:%s, port: %d, key:%d) \n", |
| | | (long)pid, |
| | | buf, |
| | | recv_arr[j].content, |
| | | (char *)recv_arr[j].content, |
| | | recv_arr[j].host, |
| | | recv_arr[j].port, |
| | | recv_arr[j].key |
| | | |
| | | ); |
| | | |
| | | // assert(sscanf((const char *)recv_arr[j].content, hello_format, &rpid, &rl) == 2); |
| | | // assert(rpid == pid); |
| | | // printf( "%d send '%s' to %d. received from (host=%s, port= %d, key=%d) '%s'\n", |
| | | // net_mod_socket_get_key(client), |
| | | // sendbuf, |
| | | // targ->node->key, |
| | | // recv_arr[j].host, |
| | | // recv_arr[j].port, |
| | | // recv_arr[j].key, |
| | | // recv_arr[j].content |
| | | // ); |
| | | |
| | | |
| | | // assert(sscanf((const char *)recv_arr[j].content, reply_format, &rkey, &lkey, &rl) == 3); |
| | | // assert(targ->node->key == rkey); |
| | | // assert(net_mod_socket_get_key(client) == lkey); |
| | | // assert(rl == l); |
| | | |
| | | assert(sscanf((const char *)recv_arr[j].content, reply_format, &remoteKey, &retPid, &retl) == 3); |
| | | assert(retPid == pid); |
| | | assert(retl == l); |
| | | assert(remoteKey == recv_arr[j].key); |
| | | } |
| | | |
| | | // 使用完后,不要忘记释放掉 |