| | |
| | | "The other end is not inline", |
| | | "Key already in use", |
| | | "Network fault", |
| | | "Send to self error" |
| | | "Send to self error", |
| | | "Receive from wrong end" |
| | | |
| | | }; |
| | | |
| | |
| | | #define EBUS_KEY_INUSED 503 |
| | | #define EBUS_NET 504 |
| | | #define EBUS_SENDTO_SELF 505 |
| | | #define EBUS_RECVFROM_WRONG_END 506 |
| | | |
| | | extern int bus_errno; |
| | | |
| | |
| | | int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec){ |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | logger->debug("net_mod_socket_sendto: %d sendto %d", net_mod_socket_get_key(_socket), key); |
| | | // return sockt->sendto_timeout(buf, size, key, sec, nsec); |
| | | return sockt->sendto(buf, size, key); |
| | | return sockt->sendto_timeout(buf, size, key, sec, nsec); |
| | | // return sockt->sendto(buf, size, key); |
| | | } |
| | | // 发送信息立刻返回。 |
| | | int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key){ |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | logger->debug("net_mod_socket_sendto: %d sendto %d", net_mod_socket_get_key(_socket), key); |
| | | return sockt->sendto(buf, size, key); |
| | | // return sockt->sendto_nowait(buf, size, key); |
| | | return sockt->sendto_nowait(buf, size, key); |
| | | } |
| | | |
| | | /** |
| | |
| | | logger->debug(" %d net_mod_socket_recvfrom after. rv = %d", net_mod_socket_get_key(_socket), rv); |
| | | return rv; |
| | | } |
| | | |
| | | // 接受信息超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec){ |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->recvfrom(buf, size, key); |
| | | // return sockt->recvfrom_timeout(buf, size, key, sec, nsec); |
| | | // return sockt->recvfrom(buf, size, key); |
| | | return sockt->recvfrom_timeout(buf, size, key, sec, nsec); |
| | | } |
| | | |
| | | int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key){ |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->recvfrom(buf, size, key); |
| | | // return sockt->recvfrom_nowait(buf, size, key); |
| | | return sockt->recvfrom_nowait(buf, size, key); |
| | | } |
| | | |
| | | int net_mod_socket_sendandrecv(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, |
| | |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->sendandrecv(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size); |
| | | } |
| | | |
| | | /** |
| | | * 如果建立连接的节点没有接受到消息等待timeout的时间后返回 |
| | | * @timeout 等待时间,单位是千分之一秒 |
| | |
| | | int net_mod_socket_sendandrecv_timeout(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, |
| | | net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int timeout){ |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->sendandrecv(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size); |
| | | // return sockt->sendandrecv_timeout(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size, timeout); |
| | | // return sockt->sendandrecv(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size); |
| | | return sockt->sendandrecv_timeout(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size, timeout); |
| | | } |
| | | |
| | | int net_mod_socket_sendandrecv_nowait(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, |
| | | net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) { |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->sendandrecv(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size); |
| | | // return sockt->sendandrecv_nowait(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size); |
| | | return sockt->sendandrecv_nowait(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size); |
| | | } |
| | | |
| | | |
| | |
| | | } |
| | | |
| | | shm_packet_t dest; |
| | | dest.type = SHM_COMMON_MSG; |
| | | dest.key = sockt->key; |
| | | dest.size = size; |
| | | dest.buf = mm_malloc(size); |
| | |
| | | //s = pthread_key_create(&_perthread_socket_key_, NULL); |
| | | if (s != 0) { |
| | | logger->error(s, "pthread_key_create"); |
| | | abort(); /* dump core and terminate */ |
| | | exit(1); |
| | | } |
| | | } |
| | |
| | | int *recv_size, const struct timespec *timeout, int flags) { |
| | | int recv_key; |
| | | int rv; |
| | | int tryn = 0; |
| | | |
| | | // 用thread local 保证每个线程用一个独占的socket接受对方返回的信息 |
| | | shm_socket_t *tmp_socket; |
| | | |
| | | |
| | | /* If first call from this thread, allocate buffer for thread, and save its location */ |
| | | // logger->debug("%d create tmp socket\n", pthread_self() ); |
| | | rv = pthread_once(&_once_, _create_socket_key_perthread); |
| | | if (rv != 0) { |
| | | logger->error(rv, "shm_sendandrecv pthread_once"); |
| | |
| | | } |
| | | |
| | | if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags)) == 0) { |
| | | rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags); |
| | | if(rv != 0) { |
| | | logger->error("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv)); |
| | | } |
| | | else if(rv == 0 ) { |
| | | logger->debug("======%d use tmp_socket %d, send to %d, receive from %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key); |
| | | |
| | | if(recv_key == shm_socket_get_key(sockt)) { |
| | | logger->debug("=====收到了自己发给自己的消息\n"); |
| | | |
| | | while(tryn < 3) { |
| | | tryn++; |
| | | rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags); |
| | | if(rv != 0) { |
| | | logger->error("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv)); |
| | | return rv; |
| | | } |
| | | assert( send_key == recv_key); |
| | | |
| | | // 超时导致接发送对象,与返回对象不对应的情况 |
| | | if(send_key != recv_key) { |
| | | logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key); |
| | | exit(1); |
| | | logger->debug("======%d use tmp_socket %d, send to %d, receive from %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key); |
| | | // logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key); |
| | | // exit(1); |
| | | continue; |
| | | // return EBUS_RECVFROM_WRONG_END; |
| | | } |
| | | |
| | | return 0; |
| | | } |
| | | return rv; |
| | | } else { |
| | | return rv; |
| | | } |
| | | |
| | | return EBUS_RECVFROM_WRONG_END; |
| | | } |
| | | |
| | | return rv; |
| | | } |
| | | |
| | | int _shm_sendandrecv_alloc_new(shm_socket_t *sockt, const void *send_buf, |
| | |
| | | int recv_key; |
| | | int rv; |
| | | |
| | | // 用thread local 保证每个线程用一个独占的socket接受对方返回的信息 |
| | | int tryn = 0; |
| | | shm_socket_t *tmp_socket; |
| | | |
| | | /* If first call from this thread, allocate buffer for thread, and save its location */ |
| | | // logger->debug("%d create tmp socket\n", pthread_self() ); |
| | | |
| | | tmp_socket = shm_open_socket(SHM_SOCKET_DGRAM); |
| | | |
| | | if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags)) == 0) { |
| | | rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags); |
| | | |
| | | if(rv != 0) { |
| | | printf("_shm_sendandrecv_alloc_new : %s\n", bus_strerror(rv)); |
| | | } |
| | | else if(rv == 0 ) { |
| | | printf("======%d use tmp_socket %d, send to %d, receive from %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key); |
| | | |
| | | if(recv_key == shm_socket_get_key(sockt)) { |
| | | printf("=====收到了自己发给自己的消息\n"); |
| | | } |
| | | assert( send_key == recv_key); |
| | | if(send_key != recv_key) { |
| | | err_exit(0, "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key); |
| | | while(tryn < 3) { |
| | | tryn++; |
| | | rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags); |
| | | if(rv != 0) { |
| | | logger->error("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv)); |
| | | return rv; |
| | | } |
| | | |
| | | // 超时导致接发送对象,与返回对象不对应的情况 |
| | | if(send_key != recv_key) { |
| | | // logger->debug("======%d use tmp_socket %d, send to %d, receive from %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key); |
| | | // logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key); |
| | | |
| | | continue; |
| | | } |
| | | return 0; |
| | | } |
| | | |
| | | return EBUS_RECVFROM_WRONG_END; |
| | | } |
| | | |
| | | shm_close_socket(tmp_socket); |
| | |
| | | SHM_SOCKET_DGRAM = 2 |
| | | |
| | | }; |
| | | |
| | | enum shm_packet_type_t |
| | | { |
| | | SHM_SOCKET_OPEN = 1, |
| | | SHM_SOCKET_OPEN_REPLY = 2, |
| | | SHM_SOCKET_CLOSE = 3, |
| | | SHM_COMMON_MSG = 4 |
| | | |
| | | }; |
| | | |
| | | |
| | | typedef struct shm_packet_t { |
| | | int key; |
| | | shm_packet_type_t type; |
| | | size_t size; |
| | | void * buf; |
| | | |
| | |
| | | |
| | | }; |
| | | |
| | | |
| | | |
| | | |
| | | // typedef struct shm_bus_msg_t { |
| | | // void *topic; |
| | | // int topic_length; |
| | | |
| | | // } shm_bus_msg_t; |
| | | |
| | | #define ACTION_LIDENTIFIER "<**" |
| | | #define ACTION_RIDENTIFIER "**>" |
| | | #define TOPIC_LIDENTIFIER "{" |
| | | #define TOPIC_RIDENTIFIER "}" |
| | | |
| | | #endif |
| | |
| | | sprintf(sendbuf, hello_format, net_mod_socket_get_key(client), l); |
| | | // fprintf(fp, "requst:%s\n", sendbuf); |
| | | // n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size); |
| | | n = net_mod_socket_sendandrecv_timeout(client, targ->node, 1, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1000); |
| | | n = net_mod_socket_sendandrecv_timeout(client, targ->node, 1, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1); |
| | | printf("%d: send %d nodes\n", l, n); |
| | | for(j=0; j < recv_arr_size; j++) { |
| | | |
| | | fprintf(fp, "%d send '%s' to %d. received from (host=%s, port= %d, key=%d) '%s'\n", |
| | | fprintf(stdout, "%d send '%s' to %d. received from (host=%s, port= %d, key=%d) '%s'\n", |
| | | net_mod_socket_get_key(client), |
| | | sendbuf, |
| | | targ->node->key, |
| | |
| | | while(true) { |
| | | sprintf(buf, hello_format, pid, l); |
| | | n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, buf, strlen(buf)+1, |
| | | &recv_arr, &recv_arr_size, 1000); |
| | | &recv_arr, &recv_arr_size, 1); |
| | | printf(" %d nodes reply\n", n); |
| | | for(j = 0; j < recv_arr_size; j++) { |
| | | |
| | |
| | | |
| | | ); |
| | | |
| | | // printf( "%d send '%s' to %d. received from (host=%s, port= %d, key=%d) '%s'\n", |
| | | // net_mod_socket_get_key(client), |
| | | // sendbuf, |
| | | // targ->node->key, |
| | | // recv_arr[j].host, |
| | | // recv_arr[j].port, |
| | | // recv_arr[j].key, |
| | | // recv_arr[j].content |
| | | // ); |
| | | |
| | | |
| | | // assert(sscanf((const char *)recv_arr[j].content, reply_format, &rkey, &lkey, &rl) == 3); |
| | | // assert(targ->node->key == rkey); |
| | | // assert(net_mod_socket_get_key(client) == lkey); |
| | | // assert(rl == l); |
| | | |
| | | |
| | | assert(sscanf((const char *)recv_arr[j].content, reply_format, &remoteKey, &retPid, &retl) == 3); |
| | | assert(retPid == pid); |