| | |
| | | // default Queue size |
| | | #define LOCK_FREE_Q_DEFAULT_SIZE 16 |
| | | |
| | | // static Logger *logger = LoggerFactory::getLogger(); |
| | | // define this macro if calls to "size" must return the real size of the |
| | | // queue. If it is undefined that function will try to take a snapshot of |
| | | // the queue, but returned value might be bogus |
| | |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data) |
| | | { |
| | | // printf("==================LockFreeQueue push before\n"); |
| | | LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n"); |
| | | if (SemUtil::dec(slots) == -1) { |
| | | err_msg(errno, "LockFreeQueue push"); |
| | | return false; |
| | |
| | | if ( m_qImpl.push(a_data) ) { |
| | | |
| | | SemUtil::inc(items); |
| | | // printf("==================LockFreeQueue push after\n"); |
| | | LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n"); |
| | | return true; |
| | | } |
| | | return false; |
| | |
| | | bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * timeout) |
| | | { |
| | | |
| | | |
| | | LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before\n"); |
| | | if (SemUtil::dec_timeout(slots, timeout) == -1) { |
| | | if (errno == EAGAIN) |
| | | return false; |
| | | else { |
| | | // err_msg(errno, "LockFreeQueue push_timeout"); |
| | | err_msg(errno, "LockFreeQueue push_timeout"); |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | if (m_qImpl.push(a_data)){ |
| | | SemUtil::inc(items); |
| | | SemUtil::inc(items); |
| | | LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout after\n"); |
| | | return true; |
| | | } |
| | | return false; |
| | |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data) |
| | | { |
| | | // printf("==================LockFreeQueue pop before\n"); |
| | | |
| | | LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n"); |
| | | if (SemUtil::dec(items) == -1) { |
| | | err_msg(errno, "LockFreeQueue pop"); |
| | | return false; |
| | |
| | | |
| | | if (m_qImpl.pop(a_data)) { |
| | | SemUtil::inc(slots); |
| | | // printf("==================LockFreeQueue pop after\n"); |
| | | LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n"); |
| | | return true; |
| | | } |
| | | return false; |
| | |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * timeout) |
| | | { |
| | | // printf("==================LockFreeQueue pop_timeout before\n"); |
| | | LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout before\n"); |
| | | if (SemUtil::dec_timeout(items, timeout) == -1) { |
| | | if (errno == EAGAIN) |
| | | return false; |
| | |
| | | |
| | | if (m_qImpl.pop(a_data)) { |
| | | SemUtil::inc(slots); |
| | | // printf("==================LockFreeQueue pop_timeout after\n"); |
| | | LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout after\n"); |
| | | return true; |
| | | } |
| | | return false; |
| | |
| | | return m_qImpl.operator[](i); |
| | | } |
| | | |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | typename Allocator, |
| | |
| | | |
| | | |
| | | BusServerSocket::BusServerSocket() { |
| | | logger->debug("BusServerSocket Init"); |
| | | shm_socket = shm_open_socket(SHM_SOCKET_DGRAM); |
| | | topic_sub_map = NULL; |
| | | |
| | | } |
| | | |
| | | BusServerSocket::~BusServerSocket() { |
| | | SHMKeySet *subscripter_set; |
| | | SHMTopicSubMap::iterator map_iter; |
| | | |
| | | logger->debug("BusServerSocket destory 1"); |
| | | stop(); |
| | | logger->debug("BusServerSocket destory 2"); |
| | | |
| | | if(topic_sub_map != NULL) { |
| | | for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { |
| | |
| | | mem_pool_free_by_key(BUS_MAP_KEY); |
| | | } |
| | | shm_close_socket(shm_socket); |
| | | logger->debug("BusServerSocket destory 3"); |
| | | } |
| | | |
| | | |
| | |
| | | |
| | | run_pubsub_proxy(); |
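| | | // When the process stops, allow 3 seconds for resource reclamation; otherwise the process may exit before close() has finished reclaiming the shared-memory resources. |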
| | | sleep(3); |
| | | return 0; |
| | | } |
| | | |
| | | |
| | | int BusServerSocket::stop(){ |
| | | int ret; |
| | | |
| | | logger->debug("====>stopping"); |
| | | if( shm_socket->key <= 0) { |
| | | return -1; |
| | | } |
| | |
| | | head.topic_size = 0; |
| | | head.content_size = 0; |
| | | |
| | | void *recv_buf; |
| | | int recv_size; |
| | | |
| | | void *buf; |
| | | int size = ShmModSocket::get_bus_sendbuf(head, NULL, 0, NULL, 0, &buf); |
| | | if(size > 0) { |
| | | ret = shm_sendandrecv(shm_socket, buf, size, shm_socket->key, &recv_buf, &recv_size); |
| | | ret = shm_sendandrecv_unsafe(shm_socket, buf, size, shm_socket->key, NULL, NULL); |
| | | free(buf); |
| | | free(recv_buf); |
| | | return ret; |
| | | } else { |
| | | return -1; |
| | |
| | | topic = strtok(NULL, topic_delim); |
| | | } |
| | | |
| | | } else if(strcmp(action, "desub") == 0) { |
| | | } |
| | | else if(strcmp(action, "desub") == 0) { |
| | | // printf("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), "")); |
| | | if(strcmp(trim(topics, 0), "") == 0) { |
| | | // 取消所有订阅 |
| | |
| | | } |
| | | } |
| | | |
| | | } else if(strcmp(action, "pub") == 0) { |
| | | } |
| | | else if(strcmp(action, "pub") == 0) { |
| | | content = topics + head.topic_size; |
| | | _proxy_pub(topics, content, head.content_size, key); |
| | | } else if(strcmp(action, "stop") == 0) { |
| | | logger->info( "Stopping Bus..."); |
| | | // snprintf(resp_buf, 128, "%sstop_finished%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER); |
| | | shm_sendto(shm_socket, "stop_finished", strlen( "stop_finished") +1, key); |
| | | |
| | | } |
| | | else if(strcmp(action, "stop") == 0) { |
| | | |
| | | free(buf); |
| | | break; |
| | | } else { |
| | | logger->error( "BusServerSocket::run_pubsub_proxy : unrecognized action %s", action); |
| | | } |
| | | |
| | | // free(action); |
| | | // free(topics); |
| | | // } else { |
| | | // logger->error( "BusServerSocket::run_pubsub_proxy : incorrect format msg"); |
| | | // } |
| | | free(buf); |
| | | } |
| | | |
| | | logger->info( "Stopping Bus..."); |
| | | shm_sendto(shm_socket, "stop_finished", strlen( "stop_finished") +1, key); |
| | | |
| | | return NULL; |
| | | } |
| | | |
| | |
| | | * Create
| | | */ |
| | | void * bus_server_socket_wrapper_open() { |
| | | printf("===bus_server_socket_wrapper_open\n"); |
| | | logger->debug("===bus_server_socket_wrapper_open\n"); |
| | | BusServerSocket *sockt = new BusServerSocket; |
| | | return (void *)sockt; |
| | | } |
| | |
| | | * Close
| | | */ |
| | | void bus_server_socket_wrapper_close(void *_socket) { |
| | | printf("===bus_server_socket_wrapper_close\n"); |
| | | BusServerSocket *sockt = (BusServerSocket *)_socket; |
| | | delete sockt; |
| | | |
| | | // BusServerSocket *sockt = (BusServerSocket *)_socket; |
| | | //delete sockt; |
| | | logger->debug("===bus_server_socket_wrapper_close\n"); |
| | | } |
| | | |
| | | /** |
| | |
| | | if(request_head.timeout > 0) { |
| | | timeout.tv_sec = request_head.timeout / 1000; |
| | | timeout.tv_nsec = (request_head.timeout - timeout.tv_sec * 1000) * 10e6; |
| | | |
| | | // printf(" timeout.tv_sec = %d, timeout.tv_nsec=%ld\n", timeout.tv_sec, timeout.tv_nsec ); |
| | | |
| | | ret = shmModSocket.sendandrecv_unsafe_timeout(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size, &timeout); |
| | | } |
| | | else if(request_head.timeout == 0) { |
| | |
| | | int i, n, recv_size, connfd; |
| | | net_node_t *node; |
| | | void *recv_buf = NULL; |
| | | struct timespec timeout; |
| | | int ret; |
| | | int n_req = 0, n_recv_suc = 0, n_resp =0; |
| | | |
| | | net_mod_request_head_t request_head = {}; |
| | | |
| | | int n_req = 0, n_recv_suc = 0, n_resp =0; |
| | | |
| | | |
| | | net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t)); |
| | | |
| | | int ret; |
| | | |
| | | NetConnPool *mpool; |
| | | |
| | | /* Make first caller allocate key for thread-specific data */ |
| | |
| | | node = &node_arr[i]; |
| | | if(node->host == NULL || strcmp(node->host, "") == 0 ) { |
| | | // 本地发送 |
| | | if(shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size) == 0) { |
| | | |
| | | if(msec == 0) { |
| | | ret = shmModSocket.sendandrecv_nowait(send_buf, send_size, node->key, &recv_buf, &recv_size); |
| | | } else if(msec > 0){ |
| | | timeout.tv_sec = msec / 1000; |
| | | timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6; |
| | | ret = shmModSocket.sendandrecv_timeout(send_buf, send_size, node->key, &recv_buf, &recv_size, &timeout); |
| | | } else { |
| | | ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size); |
| | | } |
| | | if( ret == 0) { |
| | | strcpy( ret_arr[n_recv_suc].host,""); |
| | | ret_arr[n_recv_suc].port = 0; |
| | | ret_arr[n_recv_suc].key = node->key; |
| | |
| | | |
| | | mpool->maxi = -1; |
| | | |
| | | *recv_arr = ret_arr; |
| | | if(recv_arr != NULL) { |
| | | *recv_arr = ret_arr; |
| | | } else { |
| | | free_recv_msg_arr(ret_arr, n_recv_suc); |
| | | } |
| | | |
| | | if(recv_arr_size != NULL) { |
| | | *recv_arr_size = n_recv_suc; |
| | | } |
| | |
| | | // int pub(char *topic, int topic_size, void *content, int content_size, int port); |
| | | |
| | | int NetModSocket::_pub_(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, |
| | | int content_size, int timeout) { |
| | | int content_size, int msec) { |
| | | int i, connfd; |
| | | net_node_t *node; |
| | | struct timespec timeout; |
| | | |
| | | net_mod_request_head_t request_head; |
| | | net_mod_recv_msg_t recv_msg; |
| | |
| | | |
| | | // 本地发送 |
| | | if(node_arr == NULL || arrlen == 0) { |
| | | if(shmModSocket.pub(topic, topic_size, content, content_size, node->key) == 0 ) { |
| | | if(msec == 0) { |
| | | ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, node->key); |
| | | } else if(msec > 0) { |
| | | timeout.tv_sec = msec / 1000; |
| | | timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6; |
| | | ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, node->key, &timeout); |
| | | } else { |
| | | ret = shmModSocket.pub(topic, topic_size, content, content_size, node->key); |
| | | } |
| | | if(ret == 0 ) { |
| | | n_pub_suc++; |
| | | } |
| | | } |
| | |
| | | node = &node_arr[i]; |
| | | if(node->host == NULL) { |
| | | // 本地发送 |
| | | if(shmModSocket.pub(topic, topic_size, content, content_size, node->key) == 0 ) { |
| | | n_pub_suc++; |
| | | if(msec == 0) { |
| | | ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, node->key); |
| | | } else if(msec > 0) { |
| | | timeout.tv_sec = msec / 1000; |
| | | timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6; |
| | | ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, node->key, &timeout); |
| | | } else { |
| | | ret = shmModSocket.pub(topic, topic_size, content, content_size, node->key); |
| | | } |
| | | |
| | | if(ret == 0 ) { |
| | | n_pub_suc++; |
| | | } |
| | | |
| | | |
| | | } else { |
| | | sprintf(portstr, "%d", node->port); |
| | |
| | | request_head.key = node->key; |
| | | request_head.content_length = content_size; |
| | | request_head.topic_length = strlen(topic) + 1; |
| | | request_head.timeout = timeout; |
| | | request_head.timeout = msec; |
| | | |
| | | if(write_request(connfd, request_head, content, content_size, topic, request_head.topic_length) != 0) { |
| | | LoggerFactory::getLogger()->error(" NetModSocket::_pub_ write_request failture %s:%d\n", node->host, node->port); |
| | |
| | | while(n_resp < n_req) |
| | | { |
| | | /* Wait for listening/connected descriptor(s) to become ready */ |
| | | if( (mpool->nready = poll(mpool->conns, mpool->maxi + 1, timeout) ) <= 0) { |
| | | if( (mpool->nready = poll(mpool->conns, mpool->maxi + 1, msec) ) <= 0) { |
| | | // wirite_set 和 read_set 在指定时间内都没准备好 |
| | | break; |
| | | } |
| | |
| | | |
| | | shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) { |
| | | |
| | | logger->debug("shm_open_socket\n"); |
| | | shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t)); |
| | | socket->socket_type = socket_type; |
| | | socket->key = -1; |
| | |
| | | socket->dispatch_thread = 0; |
| | | socket->status = SHM_CONN_CLOSED; |
| | | socket->mutex = SemUtil::get(IPC_PRIVATE, 1); |
| | | logger->debug("shm_open_socket\n"); |
| | | |
| | | return socket; |
| | | } |
| | | |
| | | static int _shm_close_socket(shm_socket_t *socket) { |
| | | int shm_close_socket(shm_socket_t *socket) { |
| | | |
| | | int ret; |
| | | |
| | |
| | | return ret; |
| | | } |
| | | |
| | | int shm_close_socket(shm_socket_t *socket) { |
| | | // int shm_close_socket(shm_socket_t *socket) { |
| | | |
| | | // _destrory_tmp_recv_socket_((shm_socket_t *)pthread_getspecific(_tmp_recv_socket_key_)); |
| | | // // _destrory_tmp_recv_socket_((shm_socket_t *)pthread_getspecific(_tmp_recv_socket_key_)); |
| | | |
| | | return _shm_close_socket(socket);; |
| | | } |
| | | // return shm_close_socket(socket);; |
| | | // } |
| | | |
| | | int shm_socket_bind(shm_socket_t *socket, int key) { |
| | | socket->key = key; |
| | |
| | | } |
| | | |
| | | if (rv) { |
| | | void *_buf = malloc(src.size); |
| | | memcpy(_buf, src.buf, src.size); |
| | | *buf = _buf; |
| | | *size = src.size; |
| | | *key = src.key; |
| | | if(buf != NULL) { |
| | | void *_buf = malloc(src.size); |
| | | memcpy(_buf, src.buf, src.size); |
| | | *buf = _buf; |
| | | } |
| | | |
| | | if(size != NULL) |
| | | *size = src.size; |
| | | |
| | | if(key != NULL) |
| | | *key = src.key; |
| | | |
| | | mm_free(src.buf); |
| | | // printf("shm_recvfrom pop after\n"); |
| | | return 0; |
| | |
| | | int rv; |
| | | if(tmp_socket == NULL) |
| | | return; |
| | | |
| | | logger->debug("%d destroy tmp socket\n", pthread_self()); |
| | | _shm_close_socket((shm_socket_t *)tmp_socket); |
| | | shm_close_socket((shm_socket_t *)tmp_socket); |
| | | rv = pthread_setspecific(_tmp_recv_socket_key_, NULL); |
| | | if ( rv != 0) { |
| | | logger->error(rv, "shm_sendandrecv : pthread_setspecific"); |
| | | exit(1); |
| | | logger->error(rv, "shm_sendandrecv : pthread_setspecific"); |
| | | exit(1); |
| | | } |
| | | } |
| | | |
| | |
| | | |
| | | |
| | | |
| | | int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, |
| | | int shm_sendandrecv_safe(shm_socket_t *socket, const void *send_buf, |
| | | const int send_size, const int send_key, void **recv_buf, |
| | | int *recv_size, struct timespec *timeout, int flags) { |
| | | int recv_key; |
| | |
| | | return -1; |
| | | } |
| | | |
| | | int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, |
| | | const int send_size, const int send_key, void **recv_buf, |
| | | int *recv_size, struct timespec *timeout, int flags) { |
| | | return shm_sendandrecv_unsafe(socket, send_buf, send_size, send_key,recv_buf, recv_size, timeout, flags); |
| | | } |
| | | |
| | | // ============================================================================================================ |
| | | |
| | | /** |
| | |
| | | #-I$(ROOT)/include/usgcommon |
| | | INCLUDES += -I${ROOT}/src -I${ROOT}/src/queue -I${ROOT}/src/socket -I${ROOT}/src/common/include -I${ROOT}/include/usgcommon |
| | | |
| | | |
| | | PROGS = ${DEST}/test_net_mod_socket |
| | | PROGS = ${DEST}/test_net_mod_socket ${DEST}/test_bus_stop ${DEST}/heart_beat |
| | | |
| | | DEPENDENCES = $(patsubst %, %.d, $(PROGS)) |
| | | |
File was renamed from test_socket/dgram_mod_survey.c |
| | |
| | | #include "dgram_mod_socket.h" |
| | | #include "net_mod_server_socket_wrapper.h" |
| | | #include "net_mod_socket_wrapper.h" |
| | | #include "bus_server_socket_wrapper.h" |
| | | |
| | | #include "shm_mm_wraper.h" |
| | | #include "usg_common.h" |
| | | #include <getopt.h> |
| | | |
| | | |
| | | typedef struct Targ { |
| | |
| | | }Targ; |
| | | |
| | | void sigint_handler(int sig) { |
| | | //dgram_mod_close_socket(server_socket); |
| | | // net_mod_socket_close(server_socket); |
| | | printf("===Catch sigint======================\n"); |
| | | shm_mm_wrapper_destroy(); |
| | | exit(0); |
| | | } |
| | | |
| | | void server(int port) { |
| | | void *socket = dgram_mod_open_socket(); |
| | | dgram_mod_bind(socket, port); |
| | | void *serv = net_mod_socket_open(); |
| | | net_mod_socket_bind(serv, port); |
| | | int size; |
| | | void *recvbuf; |
| | | char sendbuf[512]; |
| | | int rv; |
| | | int remote_port; |
| | | while (true) { |
| | | if ((rv = dgram_mod_recvfrom_timeout(socket, &recvbuf, &size, &remote_port, 15, 0) ) == 0) { |
| | | if ((rv = net_mod_socket_recvfrom(serv, &recvbuf, &size, &remote_port) ) == 0) { |
| | | printf( "RECEIVED HREARTBEAT FROM %d: %s\n", remote_port, recvbuf); |
| | | net_mod_socket_sendto(serv, "suc", strlen("suc")+1, remote_port); |
| | | free(recvbuf); |
| | | } |
| | | |
| | | } |
| | | dgram_mod_close_socket(socket); |
| | | net_mod_socket_close(serv); |
| | | } |
| | | |
| | | void client(int port) { |
| | | void *socket = dgram_mod_open_socket(); |
| | | int rv; |
| | | void *client = net_mod_socket_open(); |
| | | int size; |
| | | char sendbuf[512]; |
| | | long i = 0; |
| | | net_node_t node_arr[] = {"", 0, 100}; |
| | | int node_arr_size = 1; |
| | | |
| | | int recv_arr_size; |
| | | net_mod_recv_msg_t *recv_arr; |
| | | while (true) { |
| | | sprintf(sendbuf, "%d", i); |
| | | printf("SEND HEART:%s\n", sendbuf); |
| | | dgram_mod_sendto(socket, sendbuf, strlen(sendbuf) + 1, port); |
| | | rv = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL); |
| | | // sleep(1); |
| | | i++; |
| | | } |
| | | dgram_mod_close_socket(socket); |
| | | net_mod_socket_close(client); |
| | | } |
| | | |
| | | |
| | |
| | | signal(SIGINT, sigint_handler); |
| | | Targ *targ = (Targ *)arg; |
| | | int port = targ->port; |
| | | void *socket = dgram_mod_open_socket(); |
| | | void *socket = net_mod_socket_open(); |
| | | int size; |
| | | char sendbuf[512]; |
| | | long scale = 10; |
| | | long i = 0; |
| | | net_node_t node_arr[] = {"", 0, 100}; |
| | | int node_arr_size = 1; |
| | | |
| | | int recv_arr_size; |
| | | net_mod_recv_msg_t *recv_arr; |
| | | |
| | | while (i < scale) { |
| | | sprintf(sendbuf, "%d", i); |
| | | printf("%d SEND HEART:%s\n", targ->id, sendbuf); |
| | | dgram_mod_sendto(socket, sendbuf, strlen(sendbuf) + 1, port); |
| | | net_mod_socket_sendto(socket, sendbuf, strlen(sendbuf) + 1, port); |
| | | sleep(1); |
| | | i++; |
| | | } |
| | | |
| | | dgram_mod_close_socket(socket); |
| | | net_mod_socket_close(socket); |
| | | return (void *)i; |
| | | } |
| | | |
New file |
| | |
| | | #include "net_mod_server_socket_wrapper.h" |
| | | #include "net_mod_socket_wrapper.h" |
| | | #include "bus_server_socket_wrapper.h" |
| | | |
| | | #include "shm_mm_wraper.h" |
| | | #include "usg_common.h" |
| | | #include <getopt.h> |
| | | |
| | | static void * server_sockt; |
| | | |
| | | static void *_start_bus_(void *arg) { |
| | | // pthread_detach(pthread_self()); |
| | | printf("Start bus server\n"); |
| | | pthread_t tid; |
| | | |
| | | server_sockt = bus_server_socket_wrapper_open(); |
| | | |
| | | if(bus_server_socket_wrapper_start_bus(server_sockt) != 0) { |
| | | printf("start bus failed\n"); |
| | | } |
| | | } |
| | | |
| | | int main() { |
| | | |
| | | |
| | | pthread_t tid; |
| | | char action[512]; |
| | | |
| | | shm_mm_wrapper_init(512); |
| | | pthread_create(&tid, NULL, _start_bus_, NULL); |
| | | |
| | | |
| | | while (true) { |
| | | printf("Input action: Close?\n"); |
| | | if(scanf("%s", action) < 1) { |
| | | printf("Invalide action\n"); |
| | | continue; |
| | | } |
| | | |
| | | if(strcmp(action, "close") == 0) { |
| | | bus_server_socket_wrapper_close(server_sockt); |
| | | break; |
| | | } else { |
| | | printf("Invalide action\n"); |
| | | } |
| | | } |
| | | |
| | | if (pthread_join(tid, NULL) != 0) { |
| | | perror(" pthread_join"); |
| | | } |
| | | |
| | | |
| | | shm_mm_wrapper_destroy(); |
| | | } |
| | |
| | | sprintf(sendbuf, "RECEIVED PORT %d NAME %s", remote_port, recvbuf); |
| | | net_mod_socket_sendto(client, sendbuf, strlen(sendbuf) + 1, remote_port); |
| | | free(recvbuf); |
| | | sleep(1000); |
| | | } |
| | | } |
| | | |
| | |
| | | for (i = 0; i < SCALE; i++) { |
| | | sprintf(sendbuf, "thread(%d) %d", targ->id, i); |
| | | fprintf(fp, "requst:%s\n", sendbuf); |
| | | n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size); |
| | | //printf("send %d nodes\n", n); |
| | | // n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size); |
| | | n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1000); |
| | | printf("send %d nodes\n", n); |
| | | for(j=0; j < recv_arr_size; j++) { |
| | | fprintf(fp, "reply: host:%s, port: %d, key:%d, content: %s\n", |
| | | recv_arr[j].host, |