#include "FunctionalUDPPeer.h" #include #include #include #include #include #include #include #include #include #include FunctionalUDPPeer::FunctionalUDPPeer() : cfg(), simpleUDPPeer(), /*recvQueue(nullptr)*/ partitions(), mySendBuffer(nullptr), mySendBuffSize(0), mySendBuffMaxSize(0), myRecvBuffer(nullptr), myRecvBuffSize(0), myRecvBuffMaxSize(0), mySeq(0), myGrpSeq(0), recv_async_proc_user(nullptr) { } FunctionalUDPPeer::~FunctionalUDPPeer() { } void recvq_ud_deleter(void* userData) { } void FunctionalUDPPeer::set_config(const Config& _cfg) { cfg = _cfg; // // if (recvQueue == nullptr)//#todo delete // { // cfg.recvQueueCfg.bud_deleter = recvq_ud_deleter; // recvQueue = PreAllocBufferPriorityQueue(cfg.recvQueueCfg); // // } simpleUDPPeer.set_config(cfg.simpleUDPPeerCfg); //#todo re-config myself mySendBuffMaxSize = cfg.simpleUDPPeerCfg.maxBuffSize + sizeof(FUPHeader); mySendBuffer = new uint8_t[mySendBuffMaxSize]; //myRecvBuffMaxSize = (cfg.simpleUDPPeerCfg.maxBuffSize + sizeof(FUPHeader)) * maxSeqInPartition; myRecvBuffer = new uint8_t[myRecvBuffMaxSize]; } //const Config& FunctionalUDPPeer::get_config() const //{ // return cfg; //} bool FunctionalUDPPeer::listen(const std::string& ip, short port) { //return simpleUDPPeer.listen(ip, port); } void FunctionalUDPPeer::teardown() { simpleUDPPeer.teardown(); // #todo clear up myself } bool FunctionalUDPPeer::send_sync(const uint8_t* buffer, size_t& buffSize) { size_t totalSendSize = 0; const size_t myMaxSendDataSize = cfg.simpleUDPPeerCfg.maxBuffSize - sizeof(FUPHeader); const int sendLoops = int(buffSize / myMaxSendDataSize) + (buffSize % myMaxSendDataSize > 0 ? 1 : 0); fuph_grpseq_t grpseq = ++myGrpSeq; for(int i = 1; i <= sendLoops; i++) { FUPHeader* header = new (mySendBuffer) FUPHeader; header->seq = ++mySeq; header->grpseq = grpseq; header->parts = sendLoops; // size not contains header, only sizeof data header->size = (i == sendLoops ? (buffSize % myMaxSendDataSize) : myMaxSendDataSize); memcpy(header->data, buffer + (i - 1) * myMaxSendDataSize, header->size); header->hton(); size_t _sendSize = mySendBuffSize = sizeof(FUPHeader) + header->size; if (simpleUDPPeer.send_sync(mySendBuffer, _sendSize) && mySendBuffSize == _sendSize) totalSendSize += _sendSize; } turnover_seq(); return (totalSendSize - sendLoops * sizeof(FUPHeader) == buffSize);//#todo we need better dual with partially send ok/error } bool FunctionalUDPPeer::recv_sync(uint8_t* buffer, size_t& buffSize) { size_t totalRecvSize = 0; bool packetized = false; int eagainSpin = 0; while(!packetized) { if (eagainSpin > cfg.maxEagainSpin) break; // Buffer* qbuff = recvQueue->GetFree(); // if (qbuff == nullptr) // break; // // qbuff->size = cfg.simpleUDPPeerCfg.recvQueueCfg.maxBuffSize; // if (simpleUDPPeer.recv_sync(qbuff->buffer, qbuff->size)) // { // FUPHeader* header = (FUPHeader*)(qbuff->buffer); // header->ntoh(); // if (header->magic == FUP_MAGIC && header->size == qbuff->size - sizeof(FUPHeader)) // { // if (insert_partitions(qbuff)) // qbuff = nullptr;// not release // } // //#todo else: Drop packet, DOS secuirty should care // } // else // { // eagainSpin++; // } // // recvQueue->Release(qbuff); packetized = packetize(buffer, buffSize); } return packetized; } static void recv_async_proc(void* args, uint8_t* buffer, size_t buffSize) { FunctionalUDPPeer* _this = (FunctionalUDPPeer*)args; // Buffer* qbuff = _this->recvQueue->GetFree(); // if (qbuff == nullptr) // return; // // FUPHeader* header = (FUPHeader*)buffer; // header->ntoh(); // if (header->magic == FUP_MAGIC && header->size == qbuff->size - sizeof(FUPHeader)) // { // Buffer* qbuff = recvQueue->GetFree(); // if (qbuff != nullptr) // { // memcpy(qbuff->buffer, buffer, buffSize); // if (insert_partitions(qbuff)) // qbuff = nullptr;// not release // } // } // //#todo else: Drop packet, DOS secuirty should care // // // #todo eagainSpin // // recvQueue->Release(qbuff); // // myRecvBuffSize = myRecvBuffMaxSize; // if (packetize(myRecvBuffer, myRecvBuffSize)) // recv_async_proc_user(myRecvBuffer, myRecvBuffSize); } bool FunctionalUDPPeer::recv_async_start(proc_func_t _proc) { recv_async_proc_user = _proc; return simpleUDPPeer.recv_async_start(FunctionalUDPPeer::recv_async_proc, this); } void FunctionalUDPPeer::recv_async_stop() { simpleUDPPeer.recv_async_stop(); recv_async_proc_user = nullptr; } void FunctionalUDPPeer::turnover_seq() { if ((uint32_t)(mySeq + cfg.maxSeqInPartition) >= UINT16_MAX || (uint32_t)(myGrpSeq + 1) >= UINT8_MAX) { mySeq = 0; myGrpSeq = 0; } } bool insert_partitions(PreAllocBufferPriorityQueue::Buffer* qbuff) { // FUPHeader* header = (FUPHeader*)buffer; // partitions_map_t::iterator iterPart = partitions.find(header->grpseq); // if (iterPart == partitions.end()) // { // if (partitions.size() >= cfg.maxPartitions) // return false; // // partitions_set_t parts; // parts.insert(qbuff); // partitions.insert(std::make_pair(PartitionWrapper(header->grpseq, header->parts), parts)); // } // else // { // partitions_set_t& parts(iterPart->second); // if (parts.size() >= cfg.maxSeqInPartition) // return false; // // parts.insert(qbuff); // } return true; } bool FunctionalUDPPeer::packetize(uint8_t* buffer, size_t& buffSize) { // partition by grpseq // sort in seq // test full group // copy data of full group for (partitions_map_t::iterator iterPart = partitions.begin(); iterPart != partitions.end(); ++iterPart) { if (iterPart->first.packetizedCount >= cfg.maxPacketize) //#todo delete from map else iterPart->first.packetizedCount += 1; partitions_set_t& parts(iterPart->second); FUPHeader* header = (FUPHeader*)buffer; if (parts.size() >= iterPart->first.maxGrpSeq) { for(partitions_set_t::iterator iterSeq = parts.begin(); iterSeq != parts.end(); ++iterSeq) //copytobuffer //#todo delete from map } } }