| | |
| | | */ |
| | | int dgram_mod_get_port(void * _socket) { |
| | | dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; |
| | | return socket->m_socket->get_port(); |
| | | return socket->m_socket->get_key(); |
| | | } |
| | | |
| | | |
| | |
| | | /** |
| | | * 获取soket端口号 |
| | | */ |
| | | int NetModSocket::get_port() { |
| | | return shmModSocket.get_port(); |
| | | int NetModSocket::get_key() { |
| | | return shmModSocket.get_key(); |
| | | } |
| | | |
| | | |
| | |
| | | int pub_nowait( char *topic, int topic_size, void *content, int content_size, int port); |
| | | |
| | | |
| | | |
| | | |
| | | /** |
| | | * 向node_arr 中的所有网络节点发布消息 |
| | | * @node_arr 网络节点组, @node_arr_len该数组长度 |
| | |
| | | int pub(net_node_t *node_arr, int node_arr_len, char *topic, int topic_size, void *content, int content_size); |
| | | |
| | | /** |
| | | * 获取soket端口号 |
| | | * 获取soket key |
| | | */ |
| | | int get_port() ; |
| | | int get_key() ; |
| | | |
| | | /** |
| | | * 销毁sendandrecv方法返回的消息组 |
| | |
| | | /** |
| | | * 获取soket端口号 |
| | | */ |
| | | int net_mod_socket_get_port(void * _socket) { |
| | | int net_mod_socket_get_key(void * _socket) { |
| | | net_mod_socket_t *sockt = (net_mod_socket_t *)_socket; |
| | | return sockt->sockt->get_port(); |
| | | return sockt->sockt->get_key(); |
| | | } |
| | | |
| | | |
| | |
| | | * @size 主题长度 |
| | | * @port 总线端口 |
| | | */ |
| | | int net_mod_socket_desub(void * _socket, void *topic, int size, int port); |
| | | int net_mod_socket_desub(void * _socket, void *topic, int size, int key); |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int net_mod_socket_desub_timeout(void * _socket, void *topic, int size, int port, int sec, int nsec); |
| | | int net_mod_socket_desub_nowait(void * _socket, void *topic, int size, int port); |
| | |
| | | /** |
| | | * 获取soket端口号 |
| | | */ |
| | | int net_mod_socket_get_port(void * _socket) ; |
| | | int net_mod_socket_get_key(void * _socket) ; |
| | | |
| | | |
| | | |
| | |
| | | // printf("ShmModSocket destory 4\n"); |
| | | } |
| | | |
| | | int ShmModSocket::bind(int port) { |
| | | return shm_socket_bind(shm_socket, port); |
| | | int ShmModSocket::bind(int key) { |
| | | return shm_socket_bind(shm_socket, key); |
| | | } |
| | | |
| | | |
| | |
| | | * 强制绑定端口到socket, 适用于程序非正常关闭的情况下,重启程序绑定原来还没释放的key |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int ShmModSocket::force_bind(int port) { |
| | | return shm_socket_force_bind(shm_socket, port); |
| | | int ShmModSocket::force_bind(int key) { |
| | | return shm_socket_force_bind(shm_socket, key); |
| | | } |
| | | /** |
| | | * 发送信息 |
| | | * @port 发送给谁 |
| | | * @key 发送给谁 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int ShmModSocket::sendto(const void *buf, const int size, const int port) { |
| | | return shm_sendto(shm_socket, buf, size, port, NULL, 0); |
| | | int ShmModSocket::sendto(const void *buf, const int size, const int key) { |
| | | return shm_sendto(shm_socket, buf, size, key, NULL, 0); |
| | | } |
| | | // 发送信息超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int ShmModSocket::sendto_timeout(const void *buf, const int size, const int port, const struct timespec *timeout) { |
| | | return shm_sendto(shm_socket, buf, size, port, timeout, 0); |
| | | int ShmModSocket::sendto_timeout(const void *buf, const int size, const int key, const struct timespec *timeout) { |
| | | return shm_sendto(shm_socket, buf, size, key, timeout, 0); |
| | | } |
| | | // 发送信息立刻返回。 |
| | | int ShmModSocket::sendto_nowait( const void *buf, const int size, const int port){ |
| | | return shm_sendto(shm_socket, buf, size, port, NULL, (int)SHM_MSG_NOWAIT); |
| | | int ShmModSocket::sendto_nowait( const void *buf, const int size, const int key){ |
| | | return shm_sendto(shm_socket, buf, size, key, NULL, (int)SHM_MSG_NOWAIT); |
| | | } |
| | | |
| | | |
| | | inline int ShmModSocket::_recvfrom_(void **buf, int *size, int *port, struct timespec *timeout, int flags) { |
| | | inline int ShmModSocket::_recvfrom_(void **buf, int *size, int *key, struct timespec *timeout, int flags) { |
| | | |
| | | if(mod == BUS) { |
| | | err_exit(0, "Can not use method recvfrom in a Bus"); |
| | | } |
| | | // printf("dgram_mod_recvfrom before\n"); |
| | | int rv = shm_recvfrom(shm_socket, buf, size, port, timeout, flags); |
| | | int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flags); |
| | | // printf("dgram_mod_recvfrom after\n"); |
| | | return rv; |
| | | } |
| | | /** |
| | | * 接收信息 |
| | | * @port 从谁哪里收到的信息 |
| | | * @key 从谁哪里收到的信息 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int ShmModSocket::recvfrom(void **buf, int *size, int *port) { |
| | | int ShmModSocket::recvfrom(void **buf, int *size, int *key) { |
| | | |
| | | return _recvfrom_( buf, size, port, NULL, 0); |
| | | return _recvfrom_( buf, size, key, NULL, 0); |
| | | } |
| | | |
| | | |
| | | // 接受信息超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int ShmModSocket::recvfrom_timeout( void **buf, int *size, int *port, struct timespec *timeout) { |
| | | return _recvfrom_(buf, size, port, timeout, 0); |
| | | int ShmModSocket::recvfrom_timeout( void **buf, int *size, int *key, struct timespec *timeout) { |
| | | return _recvfrom_(buf, size, key, timeout, 0); |
| | | } |
| | | |
| | | int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *port){ |
| | | return _recvfrom_(buf, size, port, NULL, (int)SHM_MSG_NOWAIT); |
| | | int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *key){ |
| | | return _recvfrom_(buf, size, key, NULL, (int)SHM_MSG_NOWAIT); |
| | | } |
| | | |
| | | /** |
| | | * 发送请求信息并等待接收应答 |
| | | * @port 发送给谁 |
| | | * @key 发送给谁 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int ShmModSocket::sendandrecv( const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size){ |
| | | return shm_sendandrecv(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, NULL, 0); |
| | | int ShmModSocket::sendandrecv( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ |
| | | return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0); |
| | | } |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int ShmModSocket::sendandrecv_timeout(const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size, struct timespec *timeout){ |
| | | return shm_sendandrecv(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, timeout, 0); |
| | | int ShmModSocket::sendandrecv_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, struct timespec *timeout){ |
| | | return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0); |
| | | } |
| | | int ShmModSocket::sendandrecv_nowait(const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size){ |
| | | return shm_sendandrecv(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT); |
| | | int ShmModSocket::sendandrecv_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ |
| | | return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT); |
| | | } |
| | | |
| | | int ShmModSocket::sendandrecv_unsafe( const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size){ |
| | | return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, NULL, 0); |
| | | int ShmModSocket::sendandrecv_unsafe( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ |
| | | return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0); |
| | | } |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int ShmModSocket::sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size, struct timespec *timeout){ |
| | | return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, timeout, 0); |
| | | int ShmModSocket::sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, struct timespec *timeout){ |
| | | return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0); |
| | | } |
| | | int ShmModSocket::sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size){ |
| | | return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT); |
| | | int ShmModSocket::sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ |
| | | return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT); |
| | | } |
| | | |
| | | |
| | |
| | | * 订阅指定主题 |
| | | * @topic 主题 |
| | | * @size 主题长度 |
| | | * @port 总线端口 |
| | | * @key 总线端口 |
| | | */ |
| | | int ShmModSocket::sub(char *topic, int size, int port){ |
| | | return _sub_( topic, size, port, NULL, 0); |
| | | int ShmModSocket::sub(char *topic, int size, int key){ |
| | | return _sub_( topic, size, key, NULL, 0); |
| | | } |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int ShmModSocket::sub_timeout(char *topic, int size, int port, struct timespec *timeout){ |
| | | return _sub_(topic, size, port, timeout, 0); |
| | | int ShmModSocket::sub_timeout(char *topic, int size, int key, struct timespec *timeout){ |
| | | return _sub_(topic, size, key, timeout, 0); |
| | | } |
| | | int ShmModSocket::sub_nowait(char *topic, int size, int port) { |
| | | return _sub_(topic, size, port, NULL, (int)SHM_MSG_NOWAIT); |
| | | int ShmModSocket::sub_nowait(char *topic, int size, int key) { |
| | | return _sub_(topic, size, key, NULL, (int)SHM_MSG_NOWAIT); |
| | | } |
| | | |
| | | |
| | |
| | | * 取消订阅指定主题 |
| | | * @topic 主题 |
| | | * @size 主题长度 |
| | | * @port 总线端口 |
| | | * @key 总线端口 |
| | | */ |
| | | int ShmModSocket::desub(char *topic, int size, int port){ |
| | | return _desub_( topic, size, port, NULL, 0); |
| | | int ShmModSocket::desub(char *topic, int size, int key){ |
| | | return _desub_( topic, size, key, NULL, 0); |
| | | } |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int ShmModSocket::desub_timeout(char *topic, int size, int port, struct timespec *timeout){ |
| | | return _desub_(topic, size, port, timeout, 0); |
| | | int ShmModSocket::desub_timeout(char *topic, int size, int key, struct timespec *timeout){ |
| | | return _desub_(topic, size, key, timeout, 0); |
| | | } |
| | | int ShmModSocket::desub_nowait(char *topic, int size, int port) { |
| | | return _desub_(topic, size, port, NULL, (int)SHM_MSG_NOWAIT); |
| | | int ShmModSocket::desub_nowait(char *topic, int size, int key) { |
| | | return _desub_(topic, size, key, NULL, (int)SHM_MSG_NOWAIT); |
| | | } |
| | | |
| | | |
| | |
| | | * 发布主题 |
| | | * @topic 主题 |
| | | * @content 主题内容 |
| | | * @port 总线端口 |
| | | * @key 总线端口 |
| | | */ |
| | | int ShmModSocket::pub(char *topic, int topic_size, void *content, int content_size, int port){ |
| | | return _pub_(topic, topic_size, content, content_size, port, NULL, 0); |
| | | int ShmModSocket::pub(char *topic, int topic_size, void *content, int content_size, int key){ |
| | | return _pub_(topic, topic_size, content, content_size, key, NULL, 0); |
| | | } |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int ShmModSocket::pub_timeout(char *topic, int topic_size, void *content, int content_size, int port, struct timespec * timeout){ |
| | | return _pub_( topic, topic_size, content, content_size, port, timeout, 0); |
| | | int ShmModSocket::pub_timeout(char *topic, int topic_size, void *content, int content_size, int key, struct timespec * timeout){ |
| | | return _pub_( topic, topic_size, content, content_size, key, timeout, 0); |
| | | } |
| | | int ShmModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int port){ |
| | | return _pub_(topic, topic_size, content, content_size, port, NULL, (int)SHM_MSG_NOWAIT); |
| | | int ShmModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int key){ |
| | | return _pub_(topic, topic_size, content, content_size, key, NULL, (int)SHM_MSG_NOWAIT); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 获取soket端口号 |
| | | * 获取soket key |
| | | */ |
| | | int ShmModSocket::get_port(){ |
| | | return shm_socket->port; |
| | | int ShmModSocket::get_key(){ |
| | | return shm_socket->key; |
| | | } |
| | | |
| | | |
| | | |
| | | // ============================================================================= |
| | | /** |
| | | * @port 总线端口 |
| | | * @key 总线端口 |
| | | */ |
| | | int ShmModSocket::_sub_(char *topic, int size, int port, |
| | | int ShmModSocket::_sub_(char *topic, int size, int key, |
| | | struct timespec *timeout, int flags) { |
| | | char buf[8192]; |
| | | int rv; |
| | | snprintf(buf, 8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER); |
| | | rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags); |
| | | rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags); |
| | | if(rv == 0) { |
| | | bus_set->insert(port); |
| | | bus_set->insert(key); |
| | | } |
| | | return rv; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * @port 总线端口 |
| | | * @key 总线端口 |
| | | */ |
| | | int ShmModSocket::_desub_(char *topic, int size, int port, |
| | | int ShmModSocket::_desub_(char *topic, int size, int key, |
| | | struct timespec *timeout, int flags) { |
| | | char buf[8192]; |
| | | if(topic == NULL) { |
| | | topic = ""; |
| | | } |
| | | snprintf(buf, 8192, "%sdesub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER); |
| | | return shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags); |
| | | return shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags); |
| | | } |
| | | |
| | | /** |
| | | * @port 总线端口 |
| | | * @key 总线端口 |
| | | */ |
| | | int ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int port, |
| | | int ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int key, |
| | | struct timespec *timeout, int flags) { |
| | | int head_len; |
| | | char buf[8192+content_size]; |
| | | snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER); |
| | | head_len = strlen(buf); |
| | | memcpy(buf+head_len, content, content_size); |
| | | return shm_sendto(shm_socket, buf, head_len+content_size, port, timeout, flags); |
| | | return shm_sendto(shm_socket, buf, head_len+content_size, key, timeout, flags); |
| | | |
| | | } |
| | | /* |
| | | * 处理订阅 |
| | | */ |
| | | void ShmModSocket::_proxy_sub( char *topic, int port) { |
| | | void ShmModSocket::_proxy_sub( char *topic, int key) { |
| | | SHMKeySet *subscripter_set; |
| | | |
| | | SHMTopicSubMap::iterator map_iter; |
| | |
| | | subscripter_set = new(set_ptr) SHMKeySet; |
| | | topic_sub_map->insert({topic, subscripter_set}); |
| | | } |
| | | subscripter_set->insert(port); |
| | | subscripter_set->insert(key); |
| | | } |
| | | |
| | | /* |
| | | * 处理取消订阅 |
| | | */ |
| | | void ShmModSocket::_proxy_desub( char *topic, int port) { |
| | | void ShmModSocket::_proxy_desub( char *topic, int key) { |
| | | SHMKeySet *subscripter_set; |
| | | |
| | | SHMTopicSubMap::iterator map_iter; |
| | |
| | | if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { |
| | | subscripter_set = map_iter->second; |
| | | |
| | | subscripter_set->erase(port); |
| | | printf("============ desub %d\n", port); |
| | | subscripter_set->erase(key); |
| | | printf("============ desub %d\n", key); |
| | | } |
| | | } |
| | | |
| | | /* |
| | | * 处理取消所有订阅 |
| | | */ |
| | | void ShmModSocket::_proxy_desub_all(int port) { |
| | | void ShmModSocket::_proxy_desub_all(int key) { |
| | | SHMKeySet *subscripter_set; |
| | | |
| | | SHMTopicSubMap::iterator map_iter; |
| | | // SHMKeySet::iterator set_iter; |
| | | for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { |
| | | subscripter_set = map_iter->second; |
| | | subscripter_set->erase(port); |
| | | printf("============ desub %d\n", port); |
| | | subscripter_set->erase(key); |
| | | printf("============ desub %d\n", key); |
| | | } |
| | | } |
| | | |
| | | /* |
| | | * 处理发布,代理转发 |
| | | */ |
| | | void ShmModSocket::_proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int port) { |
| | | void ShmModSocket::_proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int key) { |
| | | SHMKeySet *subscripter_set; |
| | | |
| | | SHMTopicSubMap::iterator map_iter; |
| | |
| | | std::vector<int> subscripter_to_del; |
| | | std::vector<int>::iterator vector_iter; |
| | | |
| | | int send_port; |
| | | int send_key; |
| | | struct timespec timeout = {1,0}; |
| | | |
| | | if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { |
| | | subscripter_set = map_iter->second; |
| | | for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) { |
| | | send_port = *set_iter; |
| | | // printf("_proxy_pub send before %d \n", send_port); |
| | | if (shm_sendto(shm_socket, buf+head_len, size-head_len, send_port, &timeout) == SHM_SOCKET_ECONNFAILED ) { |
| | | send_key = *set_iter; |
| | | // printf("_proxy_pub send before %d \n", send_key); |
| | | if (shm_sendto(shm_socket, buf+head_len, size-head_len, send_key, &timeout) == SHM_SOCKET_ECONNFAILED ) { |
| | | //对方已关闭的连接放到待删除队列里。如果直接删除会让iter指针出现错乱 |
| | | subscripter_to_del.push_back(send_port); |
| | | subscripter_to_del.push_back(send_key); |
| | | } else { |
| | | // printf("_proxy_pub send after: %d \n", send_port); |
| | | // printf("_proxy_pub send after: %d \n", send_key); |
| | | } |
| | | |
| | | |
| | |
| | | for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); vector_iter++) { |
| | | if((set_iter = subscripter_set->find(*vector_iter)) != subscripter_set->end()) { |
| | | subscripter_set->erase(set_iter); |
| | | printf("remove closed subscripter %d \n", send_port); |
| | | printf("remove closed subscripter %d \n", send_key); |
| | | } |
| | | } |
| | | subscripter_to_del.clear(); |
| | |
| | | void * ShmModSocket::run_pubsub_proxy() { |
| | | // pthread_detach(pthread_self()); |
| | | int size; |
| | | int port; |
| | | int key; |
| | | char * action, *topic, *topics, *buf; |
| | | size_t head_len; |
| | | |
| | | const char *topic_delim = ","; |
| | | // printf("run_pubsub_proxy server receive before\n"); |
| | | while(shm_recvfrom(shm_socket, (void **)&buf, &size, &port) == 0) { |
| | | while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) { |
| | | //printf("run_pubsub_proxy server recv after: %s \n", buf); |
| | | if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) { |
| | | // printf("run_pubsub_proxy %s %s \n", action, topics); |
| | |
| | | topic = strtok(topics, topic_delim); |
| | | //printf("run_pubsub_proxy topic = %s\n", topic); |
| | | while(topic) { |
| | | _proxy_sub(trim(topic, 0), port); |
| | | _proxy_sub(trim(topic, 0), key); |
| | | topic = strtok(NULL, topic_delim); |
| | | } |
| | | |
| | |
| | | if(strcmp(trim(topics, 0), "") == 0) { |
| | | // 取消所有订阅 |
| | | printf("====取消所有订阅\n"); |
| | | _proxy_desub_all(port); |
| | | _proxy_desub_all(key); |
| | | } else { |
| | | |
| | | topic = strtok(topics, topic_delim); |
| | | while(topic) { |
| | | _proxy_desub(trim(topic, 0), port); |
| | | _proxy_desub(trim(topic, 0), key); |
| | | topic = strtok(NULL, topic_delim); |
| | | } |
| | | } |
| | |
| | | |
| | | |
| | | } else if(strcmp(action, "pub") == 0) { |
| | | _proxy_pub(topics, head_len, buf, size, port); |
| | | _proxy_pub(topics, head_len, buf, size, key); |
| | | } |
| | | |
| | | free(action); |
| | |
| | | int sendandrecv_timeout(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size, struct timespec *timeout) ; |
| | | int sendandrecv_nowait(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ; |
| | | |
| | | |
| | | int sendandrecv_unsafe(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ; |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size, struct timespec *timeout) ; |
| | |
| | | |
| | | |
| | | /** |
| | | * 获取soket端口号 |
| | | * 获取soket key |
| | | */ |
| | | int get_port() ; |
| | | int get_key() ; |
| | | |
| | | |
| | | }; |
| | |
| | | |
| | | |
| | | void print_msg(char *head, shm_msg_t &msg) { |
| | | // err_msg(0, "%s: port=%d, type=%d\n", head, msg.port, msg.type); |
| | | // err_msg(0, "%s: key=%d, type=%d\n", head, msg.key, msg.type); |
| | | } |
| | | |
| | | static void *_server_run_msg_rev(void *_socket); |
| | |
| | | static int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote); |
| | | |
| | | static inline int _shm_socket_check_key(shm_socket_t *socket) { |
| | | void *tmp_ptr = mm_get_by_key(socket->port); |
| | | void *tmp_ptr = mm_get_by_key(socket->key); |
| | | if (tmp_ptr!= NULL && tmp_ptr != (void *)1 && !socket->force_bind ) { |
| | | err_exit(0, "key %d has already been in used!", socket->port); |
| | | err_exit(0, "key %d has already been in used!", socket->key); |
| | | return 0; |
| | | } |
| | | return 1; |
| | | } |
| | | |
| | | SHMQueue<shm_msg_t> *_attach_remote_queue(int port); |
| | | SHMQueue<shm_msg_t> *_attach_remote_queue(int key); |
| | | |
| | | |
| | | |
| | |
| | | shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) { |
| | | shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t)); |
| | | socket->socket_type = socket_type; |
| | | socket->port = -1; |
| | | socket->key = -1; |
| | | socket->force_bind = false; |
| | | socket->dispatch_thread = 0; |
| | | socket->status = SHM_CONN_CLOSED; |
| | |
| | | return ret; |
| | | } |
| | | |
| | | int shm_socket_bind(shm_socket_t *socket, int port) { |
| | | socket->port = port; |
| | | int shm_socket_bind(shm_socket_t *socket, int key) { |
| | | socket->key = key; |
| | | return 0; |
| | | } |
| | | |
| | | int shm_socket_force_bind(shm_socket_t *socket, int port) { |
| | | int shm_socket_force_bind(shm_socket_t *socket, int key) { |
| | | socket->force_bind = true; |
| | | socket->port = port; |
| | | socket->key = key; |
| | | return 0; |
| | | } |
| | | |
| | |
| | | "SHM_SOCKET_STREAM socket"); |
| | | } |
| | | |
| | | int port; |
| | | int key; |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | if (socket->port == -1) { |
| | | port = hashtable_alloc_key(hashtable); |
| | | socket->port = port; |
| | | if (socket->key == -1) { |
| | | key = hashtable_alloc_key(hashtable); |
| | | socket->key = key; |
| | | } else { |
| | | |
| | | _shm_socket_check_key(socket); |
| | | } |
| | | |
| | | socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16); |
| | | socket->queue = new SHMQueue<shm_msg_t>(socket->key, 16); |
| | | socket->acceptQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); |
| | | socket->clientSocketMap = new std::map<int, shm_socket_t *>; |
| | | socket->status = SHM_CONN_LISTEN; |
| | |
| | | "SHM_SOCKET_STREAM socket"); |
| | | } |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | int client_port; |
| | | int client_key; |
| | | shm_socket_t *client_socket; |
| | | shm_msg_t src; |
| | | |
| | | if (socket->acceptQueue->pop(src)) { |
| | | |
| | | // print_msg("===accept:", src); |
| | | client_port = src.port; |
| | | client_key = src.key; |
| | | // client_socket = (shm_socket_t *)malloc(sizeof(shm_socket_t)); |
| | | client_socket = shm_open_socket(socket->socket_type); |
| | | client_socket->port = socket->port; |
| | | client_socket->key = socket->key; |
| | | // client_socket->queue= socket->queue; |
| | | //初始化消息queue |
| | | client_socket->messageQueue = |
| | | new LockFreeQueue<shm_msg_t, DM_Allocator>(16); |
| | | //连接到对方queue |
| | | client_socket->remoteQueue = _attach_remote_queue(client_port); |
| | | client_socket->remoteQueue = _attach_remote_queue(client_key); |
| | | |
| | | socket->clientSocketMap->insert({client_port, client_socket}); |
| | | socket->clientSocketMap->insert({client_key, client_socket}); |
| | | |
| | | /* |
| | | * shm_accept 用户执行的方法 |
| | |
| | | //发送open_reply,回应客户端的connect请求 |
| | | struct timespec timeout = {1, 0}; |
| | | shm_msg_t msg; |
| | | msg.port = socket->port; |
| | | msg.key = socket->key; |
| | | msg.size = 0; |
| | | msg.type = SHM_SOCKET_OPEN_REPLY; |
| | | |
| | |
| | | } |
| | | |
| | | |
| | | int shm_connect(shm_socket_t *socket, int port) { |
| | | int shm_connect(shm_socket_t *socket, int key) { |
| | | if (socket->socket_type != SHM_SOCKET_STREAM) { |
| | | err_exit(0, "can not invoke shm_connect method with a socket which is not " |
| | | "a SHM_SOCKET_STREAM socket"); |
| | | } |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | if (hashtable_get(hashtable, port) == NULL) { |
| | | err_exit(0, "shm_connect:connect at port %d failed!", port); |
| | | if (hashtable_get(hashtable, key) == NULL) { |
| | | err_exit(0, "shm_connect:connect at key %d failed!", key); |
| | | } |
| | | |
| | | if (socket->port == -1) { |
| | | socket->port = hashtable_alloc_key(hashtable); |
| | | if (socket->key == -1) { |
| | | socket->key = hashtable_alloc_key(hashtable); |
| | | } else { |
| | | _shm_socket_check_key(socket); |
| | | } |
| | | |
| | | socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16); |
| | | socket->queue = new SHMQueue<shm_msg_t>(socket->key, 16); |
| | | |
| | | if ((socket->remoteQueue = _attach_remote_queue(port)) == NULL) { |
| | | err_exit(0, "connect to %d failted", port); |
| | | if ((socket->remoteQueue = _attach_remote_queue(key)) == NULL) { |
| | | err_exit(0, "connect to %d failted", key); |
| | | } |
| | | socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); |
| | | |
| | | //发送open请求 |
| | | struct timespec timeout = {1, 0}; |
| | | shm_msg_t msg; |
| | | msg.port = socket->port; |
| | | msg.key = socket->key; |
| | | msg.size = 0; |
| | | msg.type = SHM_SOCKET_OPEN; |
| | | socket->remoteQueue->push_timeout(msg, &timeout); |
| | |
| | | // } |
| | | shm_msg_t dest; |
| | | dest.type = SHM_COMMON_MSG; |
| | | dest.port = socket->port; |
| | | dest.key = socket->key; |
| | | dest.size = size; |
| | | dest.buf = mm_malloc(size); |
| | | memcpy(dest.buf, buf, size); |
| | |
| | | |
| | | // 短连接方式发送 |
| | | int shm_sendto(shm_socket_t *socket, const void *buf, const int size, |
| | | const int port, const struct timespec *timeout, const int flags) { |
| | | const int key, const struct timespec *timeout, const int flags) { |
| | | if (socket->socket_type != SHM_SOCKET_DGRAM) { |
| | | err_exit(0, "Can't invoke shm_sendto method in a %d type socket which is " |
| | | "not a SHM_SOCKET_DGRAM socket ", |
| | |
| | | |
| | | SemUtil::dec(socket->mutex); |
| | | if (socket->queue == NULL) { |
| | | if (socket->port == -1) { |
| | | socket->port = hashtable_alloc_key(hashtable); |
| | | if (socket->key == -1) { |
| | | socket->key = hashtable_alloc_key(hashtable); |
| | | } else { |
| | | |
| | | _shm_socket_check_key(socket); |
| | | } |
| | | |
| | | socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16); |
| | | socket->queue = new SHMQueue<shm_msg_t>(socket->key, 16); |
| | | } |
| | | SemUtil::inc(socket->mutex); |
| | | |
| | | if (port == socket->port) { |
| | | if (key == socket->key) { |
| | | err_msg(0, "can not send to your self!"); |
| | | return -1; |
| | | } |
| | | |
| | | SHMQueue<shm_msg_t> *remoteQueue; |
| | | if ((remoteQueue = _attach_remote_queue(port)) == NULL) { |
| | | if ((remoteQueue = _attach_remote_queue(key)) == NULL) { |
| | | err_msg(0, "shm_sendto failed, the other end has been closed, or has not been opened!"); |
| | | return SHM_SOCKET_ECONNFAILED; |
| | | } |
| | | |
| | | shm_msg_t dest; |
| | | dest.type = SHM_COMMON_MSG; |
| | | dest.port = socket->port; |
| | | dest.key = socket->key; |
| | | dest.size = size; |
| | | dest.buf = mm_malloc(size); |
| | | memcpy(dest.buf, buf, size); |
| | |
| | | } else { |
| | | delete remoteQueue; |
| | | mm_free(dest.buf); |
| | | err_msg(errno, "sendto port %d failed!", port); |
| | | err_msg(errno, "sendto key %d failed!", key); |
| | | return -1; |
| | | } |
| | | } |
| | | |
| | | // 短连接方式接受 |
| | | int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port, struct timespec *timeout, int flags) { |
| | | int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *key, struct timespec *timeout, int flags) { |
| | | if (socket->socket_type != SHM_SOCKET_DGRAM) { |
| | | err_exit(0, "Can't invoke shm_recvfrom method in a %d type socket which " |
| | | "is not a SHM_SOCKET_DGRAM socket ", |
| | |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | SemUtil::dec(socket->mutex); |
| | | if (socket->queue == NULL) { |
| | | if (socket->port == -1) { |
| | | socket->port = hashtable_alloc_key(hashtable); |
| | | if (socket->key == -1) { |
| | | socket->key = hashtable_alloc_key(hashtable); |
| | | } else { |
| | | |
| | | _shm_socket_check_key(socket); |
| | | } |
| | | |
| | | socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16); |
| | | socket->queue = new SHMQueue<shm_msg_t>(socket->key, 16); |
| | | } |
| | | SemUtil::inc(socket->mutex); |
| | | |
| | |
| | | memcpy(_buf, src.buf, src.size); |
| | | *buf = _buf; |
| | | *size = src.size; |
| | | *port = src.port; |
| | | *key = src.key; |
| | | mm_free(src.buf); |
| | | // printf("shm_recvfrom pop after\n"); |
| | | return 0; |
| | |
| | | } |
| | | |
| | | int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, |
| | | const int send_size, const int send_port, void **recv_buf, |
| | | const int send_size, const int send_key, void **recv_buf, |
| | | int *recv_size, struct timespec *timeout, int flags) { |
| | | if (socket->socket_type != SHM_SOCKET_DGRAM) { |
| | | err_exit(0, "Can't invoke shm_sendandrecv method in a %d type socket " |
| | | "which is not a SHM_SOCKET_DGRAM socket ", |
| | | socket->socket_type); |
| | | } |
| | | int recv_port; |
| | | int recv_key; |
| | | int rv; |
| | | |
| | | shm_socket_t *tmp_socket = shm_open_socket(SHM_SOCKET_DGRAM); |
| | | if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_port, timeout, flags)) == 0) { |
| | | rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_port, timeout, flags); |
| | | if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags)) == 0) { |
| | | rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags); |
| | | shm_close_socket(tmp_socket); |
| | | return rv; |
| | | } else { |
| | |
| | | } |
| | | |
| | | int shm_sendandrecv_unsafe(shm_socket_t *socket, const void *send_buf, |
| | | const int send_size, const int send_port, void **recv_buf, |
| | | const int send_size, const int send_key, void **recv_buf, |
| | | int *recv_size, struct timespec *timeout, int flags) { |
| | | if (socket->socket_type != SHM_SOCKET_DGRAM) { |
| | | err_exit(0, "Can't invoke shm_sendandrecv method in a %d type socket " |
| | | "which is not a SHM_SOCKET_DGRAM socket ", |
| | | socket->socket_type); |
| | | } |
| | | int recv_port; |
| | | int recv_key; |
| | | int rv; |
| | | |
| | | |
| | | if ((rv = shm_sendto(socket, send_buf, send_size, send_port, timeout, flags)) == 0) { |
| | | rv = shm_recvfrom(socket, recv_buf, recv_size, &recv_port, timeout, flags); |
| | | if ((rv = shm_sendto(socket, send_buf, send_size, send_key, timeout, flags)) == 0) { |
| | | rv = shm_recvfrom(socket, recv_buf, recv_size, &recv_key, timeout, flags); |
| | | return rv; |
| | | } else { |
| | | return rv; |
| | |
| | | /** |
| | | * Attach to the queue registered under the given key; per the original note this does not create the queue. |
| | | * If hashtable_get() finds no entry for the key in the hashtable returned by mm_get_hashtable(), |
| | | * an error is reported through err_msg() and NULL is returned. |
| | | * On success a heap-allocated SHMQueue is returned (presumably owned by the caller - TODO confirm who deletes it). |
| | | */ |
| | | SHMQueue<shm_msg_t> *_attach_remote_queue(int port) { |
| | | SHMQueue<shm_msg_t> *_attach_remote_queue(int key) { |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | if (hashtable_get(hashtable, port) == NULL) { |
| | | err_msg(0, "_remote_queue_attach:connet at port %d failed!", port); |
| | | if (hashtable_get(hashtable, key) == NULL) { |
| | | err_msg(0, "_remote_queue_attach:connet at key %d failed!", key); |
| | | return NULL; |
| | | } |
| | | |
| | | SHMQueue<shm_msg_t> *queue = new SHMQueue<shm_msg_t>(port, 0); |
| | | SHMQueue<shm_msg_t> *queue = new SHMQueue<shm_msg_t>(key, 0); |
| | | return queue; |
| | | } |
| | | |
| | | void _server_close_conn_to_client(shm_socket_t *socket, int port) { |
| | | void _server_close_conn_to_client(shm_socket_t *socket, int key) { |
| | | shm_socket_t *client_socket; |
| | | std::map<int, shm_socket_t *>::iterator iter = |
| | | socket->clientSocketMap->find(port); |
| | | socket->clientSocketMap->find(key); |
| | | if (iter != socket->clientSocketMap->end()) { |
| | | client_socket = iter->second; |
| | | free((void *)client_socket); |
| | |
| | | socket->acceptQueue->push_timeout(src, &timeout); |
| | | break; |
| | | case SHM_SOCKET_CLOSE: |
| | | _server_close_conn_to_client(socket, src.port); |
| | | _server_close_conn_to_client(socket, src.key); |
| | | break; |
| | | case SHM_COMMON_MSG: |
| | | |
| | | iter = socket->clientSocketMap->find(src.port); |
| | | iter = socket->clientSocketMap->find(src.key); |
| | | if (iter != socket->clientSocketMap->end()) { |
| | | client_socket = iter->second; |
| | | // print_msg("_server_run_msg_rev push before", src); |
| | |
| | | struct timespec timeout = {1, 0}; |
| | | shm_msg_t close_msg; |
| | | |
| | | close_msg.port = socket->port; |
| | | close_msg.key = socket->key; |
| | | close_msg.size = 0; |
| | | close_msg.type = SHM_SOCKET_CLOSE; |
| | | if (notifyRemote && socket->remoteQueue != NULL) { |
| | |
| | | }; |
| | | |
| | | typedef struct shm_msg_t { |
| | | int port; |
| | | int key; |
| | | shm_msg_type_t type; |
| | | size_t size; |
| | | void * buf; |
| | |
| | | |
| | | typedef struct shm_socket_t { |
| | | shm_socket_type_t socket_type; |
| | | // 本地port |
| | | int port; |
| | | // 本地key |
| | | int key; |
| | | bool force_bind; |
| | | int mutex; |
| | | shm_connection_status_t status; |
| | |
| | | // Declarations of the shm socket API. Endpoints are addressed by an integer key; |
| | | // where declared, the timeout and flags parameters default to NULL and 0. |
| | | int shm_close_socket(shm_socket_t * socket) ; |
| | | |
| | | |
| | | int shm_socket_bind(shm_socket_t * socket, int port) ; |
| | | int shm_socket_bind(shm_socket_t * socket, int key) ; |
| | | |
| | | int shm_socket_force_bind(shm_socket_t * socket, int port) ; |
| | | int shm_socket_force_bind(shm_socket_t * socket, int key) ; |
| | | |
| | | |
| | | int shm_listen(shm_socket_t * socket) ; |
| | | |
| | | shm_socket_t* shm_accept(shm_socket_t* socket); |
| | | |
| | | int shm_connect(shm_socket_t * socket, int port); |
| | | int shm_connect(shm_socket_t * socket, int key); |
| | | |
| | | int shm_send(shm_socket_t * socket, const void *buf, const int size) ; |
| | | |
| | | |
| | | int shm_recv(shm_socket_t * socket, void **buf, int *size) ; |
| | | |
| | | int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int port, const struct timespec * timeout = NULL, const int flags=0); |
| | | int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int key, const struct timespec * timeout = NULL, const int flags=0); |
| | | |
| | | int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port, struct timespec * timeout = NULL, int flags=0); |
| | | int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *key, struct timespec * timeout = NULL, int flags=0); |
| | | |
| | | int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size, |
| | | int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, |
| | | struct timespec * timeout = NULL, int flags=0); |
| | | |
| | | /** |
| | | * Same functionality as shm_sendandrecv, but not thread-safe. |
| | | */ |
| | | int shm_sendandrecv_unsafe(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size, |
| | | int shm_sendandrecv_unsafe(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, |
| | | struct timespec * timeout = NULL, int flags=0); |
| | | |
| | | |
| | |
| | | |
| | | |
| | | |
| | | // Bind the stream socket wrapper: casts the opaque handle to shm_stream_mod_socket_t |
| | | // and forwards to shm_socket_bind() on the wrapped shm_socket, returning its result. |
| | | int shm_stream_mod_socket_bind(void * _socket, int port){ |
| | | int shm_stream_mod_socket_bind(void * _socket, int key){ |
| | | shm_stream_mod_socket_t * socket = (shm_stream_mod_socket_t *) _socket; |
| | | return shm_socket_bind(socket->shm_socket, port); |
| | | return shm_socket_bind(socket->shm_socket, key); |
| | | } |
| | | |
| | | void * run_server_recv_client_msg(void *_socket) { |
| | |
| | | } |
| | | |
| | | |
| | | // Connect the stream socket wrapper: casts the opaque handle to shm_stream_mod_socket_t |
| | | // and forwards to shm_connect() on the wrapped shm_socket, returning its result. |
| | | int shm_stream_mod_socket_connect(void * _socket, int port) { |
| | | int shm_stream_mod_socket_connect(void * _socket, int key) { |
| | | shm_stream_mod_socket_t * socket = (shm_stream_mod_socket_t *) _socket; |
| | | return shm_connect(socket->shm_socket, port); |
| | | return shm_connect(socket->shm_socket, key); |
| | | |
| | | } |
| | | |
| | |
| | | return -1; |
| | | } |
| | | |
| | | // Return the local address field stored on the wrapped shm_socket |
| | | // (the `port` member in the older line, the `key` member in the newer one). |
| | | int shm_stream_mod_socket_get_port(void * _socket) { |
| | | int shm_stream_mod_socket_get_key(void * _socket) { |
| | | shm_stream_mod_socket_t * socket = (shm_stream_mod_socket_t *) _socket; |
| | | return socket->shm_socket->port; |
| | | return socket->shm_socket->key; |
| | | } |
| | | |
| | | |
| | |
| | | # Launch the test servers in the background and echo each one's PID. |
| | | # server_pid is overwritten with $! after every launch, so only the last PID stays in the variable. |
| | | function server() { |
| | | |
| | | |
| | | # Start the request/reply server |
| | | ./dgram_mod_req_rep server 11 & server_pid=$! && echo ${server_pid} |
| | | # Start the request/reply receiver |
| | | ./test_net_mod_socket --fun="start_reply" --key=11 & server_pid=$! && echo "pid: ${server_pid}" |
| | | |
| | | |
| | | # Start the bus |
| | | ./dgram_mod_bus server 8 & server_pid=$! && echo ${server_pid} |
| | | # Start the bus |
| | | ./test_net_mod_socket --fun="start_bus_server" --key=8 & server_pid=$! && echo "pid: ${server_pid}" |
| | | |
| | | # Start the network server |
| | | ./test_net_mod_socket server 5000 & server_pid=$! && echo ${server_pid} |
| | | |
| | | # Start the network forwarding proxy |
| | | ./test_net_mod_socket --fun="start_net_proxy" --port=5000 & server_pid=$! && echo "pid: ${server_pid}" |
| | | |
| | | } |
| | | |
| | |
| | | #include "net_mod_server_socket_wrapper.h" |
| | | #include "net_mod_socket_wrapper.h" |
| | | #include "shm_mm.h" |
| | | #include "dgram_mod_socket.h" |
| | | #include "usg_common.h" |
| | | #include <getopt.h> |
| | | |
| | | typedef struct Targ { |
| | | int port; |
| | |
| | | |
| | | }Targ; |
| | | |
| | | // Open a net-mod server socket on the given port and start it; |
| | | // a non-zero result from net_mod_server_socket_start() is reported through err_exit() with errno. |
| | | void server(int port) { |
| | | void start_net_proxy(int port) { |
| | | printf("Start net proxy\n"); |
| | | void *serverSocket = net_mod_server_socket_open(port); |
| | | if(net_mod_server_socket_start(serverSocket) != 0) { |
| | | err_exit(errno, "net_mod_server_socket_start"); |
| | | } |
| | | } |
| | | |
| | | void client(int port ){ |
| | | void start_net_client(int port ){ |
| | | void * client = net_mod_socket_open(); |
| | | char content[MAXLINE]; |
| | | char action[512]; |
| | | char topic[512]; |
| | | int buskey; |
| | | |
| | | int recv_arr_size, i, n; |
| | | |
| | |
| | | net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); |
| | | } |
| | | } |
| | | else if(strcmp(action, "desub") == 0) { |
| | | printf("Please input buskey topic!\n"); |
| | | |
| | | scanf("%d %s", buskey, topic); |
| | | if (net_mod_socket_desub(client, topic, strlen(topic), buskey) == 0) { |
| | | printf("%d Desub success!\n", net_mod_socket_get_key(client)); |
| | | } else { |
| | | printf("Desub failture!\n"); |
| | | exit(0); |
| | | } |
| | | |
| | | } |
| | | else if(strcmp(action, "sub") == 0) { |
| | | printf("Please input topic!\n"); |
| | | scanf("%s", topic); |
| | | if (net_mod_socket_sub(client, topic, strlen(topic), buskey) == 0) { |
| | | printf("%d Sub success!\n", net_mod_socket_get_key(client)); |
| | | } else { |
| | | printf("Sub failture!\n"); |
| | | exit(0); |
| | | } |
| | | |
| | | } |
| | | else if(strcmp(action, "quit") == 0) { |
| | | break; |
| | | } else { |
| | |
| | | return (void *)i; |
| | | } |
| | | |
| | | void mclient(int port) { |
| | | void start_net_mclient(int port) { |
| | | |
| | | int status, i = 0, processors = 1; |
| | | void *res[processors]; |
| | |
| | | // fflush(stdout); |
| | | } |
| | | |
| | | /** |
| | | * Announce the bus server on stdout, open a net-mod socket, bind it to the given |
| | | * SHM key and start the bus on that socket. |
| | | */ |
| | | void start_bus_server(int key) { |
| | | printf("Start bus server\n"); |
| | | |
| | | // Open the socket, bind it to the key, then hand it to the bus. |
| | | void *bus_socket = net_mod_socket_open(); |
| | | net_mod_socket_bind(bus_socket, key); |
| | | net_mod_socket_start_bus(bus_socket); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Request/reply test server bound to the given SHM key. |
| | | * |
| | | * Loops on net_mod_socket_recvfrom() until it returns non-zero. Each request is logged |
| | | * to stdout and answered to its sender with "RECEIVED PORT <n> NAME <payload>". |
| | | * The payload is formatted with %s, so it is assumed to be NUL-terminated text |
| | | * (TODO confirm against the sender). The received buffer is released with free(). |
| | | */ |
| | | void start_reply(int key) { |
| | | void *socket = net_mod_socket_open(); |
| | | net_mod_socket_bind(socket, key); |
| | | int size; |
| | | void *recvbuf; |
| | | char sendbuf[512]; |
| | | int rv; |
| | | int remote_port; |
| | | while ( (rv = net_mod_socket_recvfrom(socket, &recvbuf, &size, &remote_port) ) == 0) { |
| | | // %s requires a char pointer; recvbuf is delivered as void *. |
| | | const char *request = static_cast<const char *>(recvbuf); |
| | | printf( "server: RECEIVED REQUEST FROM PORT %d NAME %s\n", remote_port, request); |
| | | // Bounded formatting: a long request is truncated instead of overflowing sendbuf. |
| | | snprintf(sendbuf, sizeof(sendbuf), "RECEIVED PORT %d NAME %s", remote_port, request); |
| | | net_mod_socket_sendto(socket, sendbuf, strlen(sendbuf) + 1, remote_port); |
| | | free(recvbuf); |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Print the command-line help of this test program to stderr. |
| | | * @name program name shown in the usage line (callers pass argv[0]) |
| | | */ |
| | | void usage(char *name) |
| | | { |
| | | fprintf(stderr, "Usage: %s [OPTIONS] [ARG...]\n\n", name); |
| | | fprintf(stderr, "Test net mod socket\n\n"); |
| | | fprintf(stderr, "Options:\n\n"); |
| | | #define fpe(str) fprintf(stderr, " %s", str); |
| | | // The long option the parser accepts is spelled --fun, so advertise exactly that. |
| | | fpe("-f, --fun Function name\n"); |
| | | fpe("-p, --port TCP/IP Port\n"); |
| | | fpe("-k, --key SHM Key\n"); |
| | | fpe("\n"); |
| | | } |
| | | |
| | | // Parsed command-line options of the test program. |
| | | struct argument_t { |
| | | // Function name given with -f/--fun (points into argv). |
| | | char *fun; |
| | | // Value of -p/--port converted with atoi. |
| | | int port; |
| | | // Value of -k/--key converted with atoi. |
| | | int key; |
| | | // First remaining non-option argument inside argv, if any. |
| | | char **cmd_arr; |
| | | // Number of remaining non-option arguments. |
| | | int cmd_arr_len; |
| | | }; |
| | | |
| | | /** |
| | | * Parse the command line into an argument_t. |
| | | * |
| | | * Recognized options: -f/--fun <name>, -k/--key <int>, -p/--port <int>. |
| | | * The leading '+' in the option string stops scanning at the first non-option argument; |
| | | * the remaining arguments are exposed through cmd_arr / cmd_arr_len. |
| | | * |
| | | * Prints the usage text and exits when no argument is given, when the only argument is |
| | | * --help (exit status 0), on an unrecognized option or missing option argument, and when |
| | | * no function name was supplied. |
| | | * |
| | | * @return the populated options; fields not set on the command line stay zero-initialized. |
| | | */ |
| | | argument_t parse_args (int argc, char *argv[]) |
| | | { |
| | | if(argc < 2) { |
| | | usage(argv[0]); |
| | | exit(1); |
| | | } |
| | | |
| | | if(argc == 2 && strcmp(argv[1], "--help") == 0) { |
| | | usage(argv[0]); |
| | | exit(0); |
| | | } |
| | | |
| | | argument_t mopt = {}; |
| | | |
| | | // Silence getopt's own diagnostics; errors are reported by printing the usage text below. |
| | | opterr = 0; |
| | | |
| | | static struct option long_options[] = |
| | | { |
| | | {"fun", required_argument, 0, 'f'}, |
| | | {"key", required_argument, 0, 'k'}, |
| | | {"port", required_argument, 0, 'p'}, |
| | | {0, 0, 0, 0} |
| | | }; |
| | | /* getopt_long stores the option index here. */ |
| | | int option_index = 0; |
| | | int c; |
| | | |
| | | /* getopt_long returns -1 at the end of the options. */ |
| | | while ((c = getopt_long (argc, argv, "+f:k:p:", long_options, &option_index)) != -1) |
| | | { |
| | | switch (c) |
| | | { |
| | | case 0: |
| | | /* If this option set a flag, do nothing else now. */ |
| | | if (long_options[option_index].flag != 0) |
| | | break; |
| | | printf ("option %s", long_options[option_index].name); |
| | | if (optarg) |
| | | printf (" with arg %s", optarg); |
| | | printf ("\n"); |
| | | break; |
| | | |
| | | case 'f': |
| | | mopt.fun = optarg; |
| | | break; |
| | | |
| | | case 'k': |
| | | mopt.key = atoi(optarg); |
| | | break; |
| | | |
| | | case 'p': |
| | | mopt.port = atoi(optarg); |
| | | break; |
| | | |
| | | case '?': |
| | | /* Unknown option or missing argument: getopt printed nothing (opterr is 0). */ |
| | | usage(argv[0]); |
| | | exit(1); |
| | | break; |
| | | |
| | | default: |
| | | break; |
| | | } |
| | | } |
| | | |
| | | /* Hand any remaining command line arguments (not options) to the caller. */ |
| | | if (optind < argc) |
| | | { |
| | | mopt.cmd_arr = &argv[optind]; |
| | | mopt.cmd_arr_len = argc - optind; |
| | | } |
| | | |
| | | // The function name is compared with strcmp by the caller, so never hand back a NULL fun. |
| | | if (mopt.fun == NULL) { |
| | | usage(argv[0]); |
| | | exit(1); |
| | | } |
| | | return mopt; |
| | | |
| | | } |
| | | // Entry point of the net-mod-socket test program: initializes shared memory with shm_init(512), |
| | | // parses the command line and dispatches on opt.fun to start_net_proxy / start_bus_server / |
| | | // start_reply / start_net_client. Usage is printed and the process exits with status 1 when the |
| | | // port or key required by the selected function is 0, or when the function name is not recognized. |
| | | // NOTE(review): this listing also carries argv[1]-based dispatch lines (server/client/mclient) next to |
| | | // the opt.fun-based ones; they appear to be the superseded variant - confirm against the real file. |
| | | // NOTE(review): opt.fun is passed straight to strcmp - confirm it can never be NULL here. |
| | | int main(int argc, char *argv[]) { |
| | | shm_init(512); |
| | | |
| | | argument_t opt = parse_args(argc, argv); |
| | | |
| | | int port; |
| | | if (argc < 3) { |
| | | fprintf(stderr, "Usage: %s %s|%s <PORT> \n", argv[0], "server", "client"); |
| | | return 1; |
| | | } |
| | | |
| | | port = atoi(argv[2]); |
| | | // port = atoi(argv[2]); |
| | | |
| | | |
| | | if (strcmp("server", argv[1]) == 0 ) { |
| | | server(port); |
| | | if (strcmp("start_net_proxy", opt.fun) == 0 ) { |
| | | if(opt.port == 0) { |
| | | usage(argv[0]); |
| | | exit(1); |
| | | } |
| | | start_net_proxy(opt.port); |
| | | |
| | | } |
| | | else if (strcmp("start_bus_server", opt.fun) == 0) { |
| | | if(opt.key == 0) { |
| | | usage(argv[0]); |
| | | exit(1); |
| | | } |
| | | start_bus_server(opt.key); |
| | | } |
| | | else if (strcmp("start_reply", opt.fun) == 0) { |
| | | if(opt.key == 0) { |
| | | usage(argv[0]); |
| | | exit(1); |
| | | } |
| | | start_reply(opt.key); |
| | | } |
| | | |
| | | if (strcmp("client", argv[1]) == 0) |
| | | client(port); |
| | | else if (strcmp("start_net_client", opt.fun) == 0) { |
| | | if(opt.port == 0) { |
| | | usage(argv[0]); |
| | | exit(1); |
| | | } |
| | | start_net_client(opt.port); |
| | | } |
| | | else { |
| | | usage(argv[0]); |
| | | exit(1); |
| | | |
| | | if (strcmp("mclient", argv[1]) == 0) |
| | | mclient(port); |
| | | } |
| | | |
| | | |
| | | } |
| | | |
| | | |
| | |
| | | exit(0); |
| | | } |
| | | |
| | | void server(int port, bool restart) { |
| | | void server(int key, bool restart) { |
| | | server_socket = dgram_mod_open_socket(); |
| | | |
| | | |
| | | if(restart) { |
| | | dgram_mod_force_bind(server_socket, port); |
| | | dgram_mod_force_bind(server_socket, key); |
| | | } else { |
| | | dgram_mod_bind(server_socket, port); |
| | | dgram_mod_bind(server_socket, key); |
| | | } |
| | | |
| | | |
| | |
| | | pthread_detach(pthread_self()); |
| | | void *recvbuf; |
| | | int size; |
| | | int port; |
| | | while (dgram_mod_recvfrom( socket, &recvbuf, &size, &port) == 0) { |
| | | int key; |
| | | while (dgram_mod_recvfrom( socket, &recvbuf, &size, &key) == 0) { |
| | | printf("收到订阅消息:%s\n", recvbuf); |
| | | free(recvbuf); |
| | | } |
| | | |
| | | } |
| | | |
| | | void client(int port) { |
| | | void client(int key) { |
| | | void *socket = dgram_mod_open_socket(); |
| | | |
| | | |
| | | pthread_t tid; |
| | | pthread_create(&tid, NULL, run_recv, socket); |
| | | int size; |
| | |
| | | if(strcmp(action, "sub") == 0) { |
| | | printf("Please input topic!\n"); |
| | | scanf("%s", topic); |
| | | if (dgram_mod_sub(socket, topic, strlen(topic), port) == 0) { |
| | | if (dgram_mod_sub(socket, topic, strlen(topic), key) == 0) { |
| | | printf("%d Sub success!\n", dgram_mod_get_port(socket)); |
| | | } else { |
| | | printf("Sub failture!\n"); |
| | |
| | | } else if(strcmp(action, "desub") == 0) { |
| | | printf("Please input topic!\n"); |
| | | scanf("%s", topic); |
| | | if (dgram_mod_desub(socket, topic, strlen(topic), port) == 0) { |
| | | if (dgram_mod_desub(socket, topic, strlen(topic), key) == 0) { |
| | | printf("%d Desub success!\n", dgram_mod_get_port(socket)); |
| | | } else { |
| | | printf("Desub failture!\n"); |
| | |
| | | // printf("%s %s %s\n", action, topic, content); |
| | | printf("Please input topic and content\n"); |
| | | scanf("%s %s", topic, content); |
| | | if(dgram_mod_pub(socket, topic, strlen(topic)+1, content, strlen(content)+1, port) == 0){ |
| | | if(dgram_mod_pub(socket, topic, strlen(topic)+1, content, strlen(content)+1, key) == 0){ |
| | | printf("%d Pub success!\n", dgram_mod_get_port(socket)); |
| | | } else { |
| | | printf("Pub failture!\n"); |
| | |
| | | |
| | | int main(int argc, char *argv[]) { |
| | | shm_init(512); |
| | | int port; |
| | | int key; |
| | | if (argc < 3) { |
| | | fprintf(stderr, "Usage: %s %s|%s|rmkey <PORT> ...\n", argv[0], "server", "client"); |
| | | fprintf(stderr, "Usage: %s %s|%s|rmkey <key> ...\n", argv[0], "server", "client"); |
| | | return 1; |
| | | } |
| | | |
| | | port = atoi(argv[2]); |
| | | key = atoi(argv[2]); |
| | | |
| | | if (strcmp("server", argv[1]) == 0) { |
| | | if(argc >= 4 && strcmp("restart", argv[3]) == 0) { |
| | | server(port, true); |
| | | server(key, true); |
| | | } |
| | | else{ |
| | | server(port, false); |
| | | server(key, false); |
| | | } |
| | | |
| | | } else if (strcmp("client", argv[1]) == 0) { |
| | | client(port); |
| | | client(key); |
| | | } else if(strcmp("rmkey", argv[1]) == 0) { |
| | | for(int i = 2; i < argc; i++) { |
| | | port = atoi(argv[i]); |
| | | dgram_mod_remove_key(port); |
| | | // printf("%d\n", port); |
| | | key = atoi(argv[i]); |
| | | dgram_mod_remove_key(key); |
| | | // printf("%d\n", key); |
| | | } |
| | | } |
| | | |