#include "FunctionalUDPPeer.h"
|
|
#include <stdio.h>
|
#include <stdlib.h>
|
#include <string.h>
|
#include <errno.h>
|
#include <sys/socket.h>
|
#include <netinet/in.h>
|
#include <arpa/inet.h>
|
#include <netdb.h>
|
#include <unistd.h>
|
|
#include <pthread.h>
|
|
/// Default constructor.
///
/// Value-initializes the configuration, the wrapped SimpleUDPPeer and the
/// partition map, leaves both staging buffers unallocated (null pointer,
/// zero size, zero capacity), starts both sequence counters at zero and
/// installs no user receive callback.
FunctionalUDPPeer::FunctionalUDPPeer()
    : cfg()
    , simpleUDPPeer()
    /* recvQueue(nullptr) */
    , partitions()
    , mySendBuffer(nullptr)
    , mySendBuffSize(0)
    , mySendBuffMaxSize(0)
    , myRecvBuffer(nullptr)
    , myRecvBuffSize(0)
    , myRecvBuffMaxSize(0)
    , mySeq(0)
    , myGrpSeq(0)
    , recv_async_proc_user(nullptr)
{
}
|
|
/// Destructor.
///
/// Releases the send/receive staging buffers that set_config() allocates
/// with new[]. Both pointers start out as nullptr in the constructor, and
/// delete[] on a null pointer is a no-op, so this is safe even when
/// set_config() was never called.
///
/// NOTE(review): the class owns raw buffers, so an implicitly generated copy
/// would lead to a double delete here — confirm that copying is disabled (or
/// never used) in the header.
FunctionalUDPPeer::~FunctionalUDPPeer()
{
    delete[] mySendBuffer;
    mySendBuffer = nullptr;
    mySendBuffSize = 0;
    mySendBuffMaxSize = 0;

    delete[] myRecvBuffer;
    myRecvBuffer = nullptr;
    myRecvBuffSize = 0;
    myRecvBuffMaxSize = 0;
}
|
|
/// User-data deleter for the receive queue (see the commented-out wiring in
/// set_config). Intentionally does nothing: the pointer is neither read nor
/// freed.
void recvq_ud_deleter(void* userData)
{
    (void)userData; // deliberately unused
}
|
|
void FunctionalUDPPeer::set_config(const Config& _cfg)
|
{
|
cfg = _cfg;
|
//
|
// if (recvQueue == nullptr)//#todo delete
|
// {
|
// cfg.recvQueueCfg.bud_deleter = recvq_ud_deleter;
|
// recvQueue = PreAllocBufferPriorityQueue(cfg.recvQueueCfg);
|
//
|
// }
|
|
simpleUDPPeer.set_config(cfg.simpleUDPPeerCfg);
|
|
//#todo re-config myself
|
mySendBuffMaxSize = cfg.simpleUDPPeerCfg.maxBuffSize + sizeof(FUPHeader);
|
mySendBuffer = new uint8_t[mySendBuffMaxSize];
|
|
//myRecvBuffMaxSize = (cfg.simpleUDPPeerCfg.maxBuffSize + sizeof(FUPHeader)) * maxSeqInPartition;
|
myRecvBuffer = new uint8_t[myRecvBuffMaxSize];
|
}
|
|
//const Config& FunctionalUDPPeer::get_config() const
|
//{
|
// return cfg;
|
//}
|
|
bool FunctionalUDPPeer::listen(const std::string& ip, short port)
|
{
|
//return simpleUDPPeer.listen(ip, port);
|
}
|
|
void FunctionalUDPPeer::teardown()
|
{
|
simpleUDPPeer.teardown(); // #todo clear up myself
|
}
|
|
bool FunctionalUDPPeer::send_sync(const uint8_t* buffer, size_t& buffSize)
|
{
|
size_t totalSendSize = 0;
|
|
const size_t myMaxSendDataSize = cfg.simpleUDPPeerCfg.maxBuffSize - sizeof(FUPHeader);
|
const int sendLoops = int(buffSize / myMaxSendDataSize) + (buffSize % myMaxSendDataSize > 0 ? 1 : 0);
|
fuph_grpseq_t grpseq = ++myGrpSeq;
|
|
for(int i = 1; i <= sendLoops; i++)
|
{
|
FUPHeader* header = new (mySendBuffer) FUPHeader;
|
header->seq = ++mySeq;
|
header->grpseq = grpseq;
|
header->parts = sendLoops;
|
// size not contains header, only sizeof data
|
header->size = (i == sendLoops ? (buffSize % myMaxSendDataSize) : myMaxSendDataSize);
|
memcpy(header->data, buffer + (i - 1) * myMaxSendDataSize, header->size);
|
header->hton();
|
|
size_t _sendSize = mySendBuffSize = sizeof(FUPHeader) + header->size;
|
if (simpleUDPPeer.send_sync(mySendBuffer, _sendSize) && mySendBuffSize == _sendSize)
|
totalSendSize += _sendSize;
|
}
|
|
turnover_seq();
|
|
return (totalSendSize - sendLoops * sizeof(FUPHeader) == buffSize);//#todo we need better dual with partially send ok/error
|
}
|
|
bool FunctionalUDPPeer::recv_sync(uint8_t* buffer, size_t& buffSize)
|
{
|
size_t totalRecvSize = 0;
|
bool packetized = false;
|
int eagainSpin = 0;
|
|
while(!packetized)
|
{
|
if (eagainSpin > cfg.maxEagainSpin)
|
break;
|
|
// Buffer* qbuff = recvQueue->GetFree();
|
// if (qbuff == nullptr)
|
// break;
|
//
|
// qbuff->size = cfg.simpleUDPPeerCfg.recvQueueCfg.maxBuffSize;
|
// if (simpleUDPPeer.recv_sync(qbuff->buffer, qbuff->size))
|
// {
|
// FUPHeader* header = (FUPHeader*)(qbuff->buffer);
|
// header->ntoh();
|
// if (header->magic == FUP_MAGIC && header->size == qbuff->size - sizeof(FUPHeader))
|
// {
|
// if (insert_partitions(qbuff))
|
// qbuff = nullptr;// not release
|
// }
|
// //#todo else: Drop packet, DOS secuirty should care
|
// }
|
// else
|
// {
|
// eagainSpin++;
|
// }
|
//
|
// recvQueue->Release(qbuff);
|
|
packetized = packetize(buffer, buffSize);
|
}
|
|
return packetized;
|
}
|
|
static void recv_async_proc(void* args, uint8_t* buffer, size_t buffSize)
|
{
|
FunctionalUDPPeer* _this = (FunctionalUDPPeer*)args;
|
|
// Buffer* qbuff = _this->recvQueue->GetFree();
|
// if (qbuff == nullptr)
|
// return;
|
//
|
// FUPHeader* header = (FUPHeader*)buffer;
|
// header->ntoh();
|
// if (header->magic == FUP_MAGIC && header->size == qbuff->size - sizeof(FUPHeader))
|
// {
|
// Buffer* qbuff = recvQueue->GetFree();
|
// if (qbuff != nullptr)
|
// {
|
// memcpy(qbuff->buffer, buffer, buffSize);
|
// if (insert_partitions(qbuff))
|
// qbuff = nullptr;// not release
|
// }
|
// }
|
// //#todo else: Drop packet, DOS secuirty should care
|
//
|
// // #todo eagainSpin
|
//
|
// recvQueue->Release(qbuff);
|
//
|
// myRecvBuffSize = myRecvBuffMaxSize;
|
// if (packetize(myRecvBuffer, myRecvBuffSize))
|
// recv_async_proc_user(myRecvBuffer, myRecvBuffSize);
|
}
|
|
/// Remembers the user callback and starts asynchronous receiving on the
/// wrapped peer, registering recv_async_proc with `this` as its argument.
///
/// @param _proc  user callback stored in recv_async_proc_user.
/// @return the result reported by the wrapped peer's recv_async_start().
bool FunctionalUDPPeer::recv_async_start(proc_func_t _proc)
{
    // The callback is stored before the wrapped peer is started.
    recv_async_proc_user = _proc;

    const bool started =
        simpleUDPPeer.recv_async_start(FunctionalUDPPeer::recv_async_proc, this);
    return started;
}
|
|
void FunctionalUDPPeer::recv_async_stop()
|
{
|
simpleUDPPeer.recv_async_stop();
|
recv_async_proc_user = nullptr;
|
}
|
|
void FunctionalUDPPeer::turnover_seq()
|
{
|
if ((uint32_t)(mySeq + cfg.maxSeqInPartition) >= UINT16_MAX || (uint32_t)(myGrpSeq + 1) >= UINT8_MAX)
|
{
|
mySeq = 0;
|
myGrpSeq = 0;
|
}
|
}
|
|
/// Placeholder for filing a received queue buffer into the partition map.
/// The real logic is still disabled (kept below as comments); the function
/// currently ignores its argument and always reports success.
///
/// @param qbuff  queue buffer holding a received packet (currently unused).
/// @return always true.
bool insert_partitions(PreAllocBufferPriorityQueue::Buffer* qbuff)
{
    (void)qbuff; // only referenced by the disabled code below

    // FUPHeader* header = (FUPHeader*)buffer;
    // partitions_map_t::iterator iterPart = partitions.find(header->grpseq);
    // if (iterPart == partitions.end())
    // {
    //     if (partitions.size() >= cfg.maxPartitions)
    //         return false;
    //
    //     partitions_set_t parts;
    //     parts.insert(qbuff);
    //     partitions.insert(std::make_pair(PartitionWrapper(header->grpseq, header->parts), parts));
    // }
    // else
    // {
    //     partitions_set_t& parts(iterPart->second);
    //     if (parts.size() >= cfg.maxSeqInPartition)
    //         return false;
    //
    //     parts.insert(qbuff);
    // }

    const bool inserted = true;
    return inserted;
}
|
|
bool FunctionalUDPPeer::packetize(uint8_t* buffer, size_t& buffSize)
|
{
|
// partition by grpseq
|
// sort in seq
|
// test full group
|
// copy data of full group
|
|
for (partitions_map_t::iterator iterPart = partitions.begin(); iterPart != partitions.end(); ++iterPart)
|
{
|
if (iterPart->first.packetizedCount >= cfg.maxPacketize)
|
//#todo delete from map
|
else
|
iterPart->first.packetizedCount += 1;
|
|
partitions_set_t& parts(iterPart->second);
|
FUPHeader* header = (FUPHeader*)buffer;
|
if (parts.size() >= iterPart->first.maxGrpSeq)
|
{
|
for(partitions_set_t::iterator iterSeq = parts.begin(); iterSeq != parts.end(); ++iterSeq)
|
//copytobuffer
|
//#todo delete from map
|
}
|
}
|
|
}
|