liuxiaolong
2021-07-20 58d904a328c0d849769b483e901a0be9426b8209
box/tcp_proxy.cpp
@@ -16,64 +16,23 @@
 * =====================================================================================
 */
#include "tcp_proxy.h"
#include "defs.h"
#include "shm_socket.h"
#include "tcp_connection.h"
TcpProxy::TcpProxy() :
    run_(false) {}
TcpProxy::~TcpProxy() {}
bool TcpProxy::Start(bhome_shm::SharedMemory &shm)
bool TcpProxy::Request(const std::string &ip, int port, std::string &&content, ReplyCB const &cb)
{
   Stop();
   bool cur = false;
   if (!run_.compare_exchange_strong(cur, true)) { return false; }
   auto &mq = GetCenterInfo(shm)->mq_tcp_proxy_;
   local_.reset(new ShmSocket(mq.offset_, shm, mq.id_));
   auto localProc = [this](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
      auto &dest = head.dest();
      if (dest.ip().empty() || dest.port() == 0) { return; }
      bool r = Send(dest.ip(), dest.port(), msg.content());
      // TODO check send fail.
   };
   local_->Start(1, localProc);
   auto proxyProc = [this]() {
      while (run_) {
         io_context_.run_one_for(std::chrono::milliseconds(100));
      }
   };
   return true;
}
void TcpProxy::Stop()
{
   local_.reset();
   bool cur = true;
   if (run_.compare_exchange_strong(cur, false)) {
      if (worker_.joinable()) {
         worker_.join();
      }
   if (content.empty()) { return false; }
   try {
      tcp::endpoint dest(ip::address::from_string(ip), port);
      TcpRequest1::Create(io_, dest, std::move(content), cb);
      LOG_TRACE() << "tcp request start " << ip << ':' << port;
      return true;
   } catch (std::exception &e) {
      LOG_ERROR() << "proxy request exception: " << e.what();
      return false;
   }
}
bool TcpProxy::Send(const std::string &ip, int port, std::string &&content)
bool TcpProxy::Publish(const std::string &ip, int port, std::string &&content)
{
   if (content.empty()) { return false; }
   tcp::endpoint dest(ip::address::from_string(ip), port);
   TcpRequest1::Create(io_context_, dest, std::move(content));
   // char tag[sizeof(kBHTcpServerTag)] = {0};
   // int n = read(sock, tag, sizeof(tag));
   // if (n == sizeof(tag) && memcmp(tag, &kBHTcpServerTag, sizeof(tag)) == 0) {
   //    send(sock, content.data(), content.size(), 0);
   //    connections_[addr].io_info_.h_ = [this, sock](int events) { OnReply(sock); };
   //    // success
   // }
}
   return Request(ip, port, std::move(content), ReplyCB());
}