#include <sys/ioctl.h>
|
#include <pthread.h>
|
#include <poll.h>
|
#include <sys/time.h>
|
#include <sys/types.h>
|
#include <unistd.h>
|
#include <sys/epoll.h>
|
#include <sys/ioctl.h>
|
#include <arpa/inet.h>
|
#include <sys/mman.h>
|
#include "TcpStreamTrans.h"
|
#include "sps_pps.h"
|
|
/**
 * Builds an idle transport object: no sockets, no buffers, no parser handles,
 * empty endpoint strings, and TCP-active as the default transport mode.
 */
CTcpStreamTrans::CTcpStreamTrans()
{
    // Worker-thread control flag and the raw receive buffer (Thread() allocates it on demand).
    RecvBuffer = NULL;
    ExitFlag = false;

    // Socket descriptors; -1 marks "not open".
    m_TcpServerFd = -1;
    m_TcpClientFd = -1;

    // Packet reassembly state used by StickyTcpStramPacket().
    m_sizeOfTcpRecvPacketBuf = 0;
    m_sizeOfTcpRecvBufTmp = 0;
    m_pTcpRecvPacketBuf = NULL;
    m_pTcpRecvBufTmp = NULL;

    // Stash for a length header that was split across two reads.
    m_HeadTmplen = 0;
    m_pHeadTmpBuff = NULL;

    m_tcprecvssrc = 0;
    m_listTcpStreamPacket.clear();

    // Transport configuration (see SetStreamTransParam()).
    m_eStreamTransType = E_STREAM_TRANS_TCPACTIVE; // stream transport type
    m_StreamSendPort = 0;
    m_StreamSendIp[0] = '\0';
    m_StreamRecvPort = 0;
    m_StreamRecvIp[0] = '\0';

    // Stream parser handles are opened lazily; 0 means "not opened".
    m_StreamParseHandle3 = 0;
    m_StreamParseHandle2 = 0;
    m_StreamParseHandle1 = 0;

    // Nothing has been detected about the incoming stream yet.
    m_eVideoStreamType = E_VIDEO_STREAM_NONE;
    m_RecvStreamType = 0;
}
|
|
/**
 * Stops the worker (via ThreadQuit()), then releases both sockets and every
 * stream parser handle that is still open.
 */
CTcpStreamTrans::~CTcpStreamTrans()
{
    ThreadQuit();

    // Sockets: this class treats any descriptor <= 0 as "not open".
    if (!(m_TcpClientFd <= 0))
    {
        close(m_TcpClientFd);
        m_TcpClientFd = -1;
    }
    if (!(m_TcpServerFd <= 0))
    {
        close(m_TcpServerFd);
        m_TcpServerFd = -1;
    }

    // Parser handles: 0 is the "never opened / already closed" marker.
    if (0 != m_StreamParseHandle1)
    {
        STREAMPARSE_Close(m_StreamParseHandle1);
        m_StreamParseHandle1 = 0;
    }
    if (0 != m_StreamParseHandle2)
    {
        STREAMPARSE_Close(m_StreamParseHandle2);
        m_StreamParseHandle2 = 0;
    }
    if (0 != m_StreamParseHandle3)
    {
        STREAMPARSE_Close(m_StreamParseHandle3);
        m_StreamParseHandle3 = 0;
    }
}
|
|
/**
 * Stores the transport mode and both endpoints.
 *
 * @param streamtype  Must be E_STREAM_TRANS_TCPACTIVE or E_STREAM_TRANS_TCPPASSIVE.
 * @param sendip      IP string of the sending side, copied into m_StreamSendIp.
 * @param sendport    Port of the sending side.
 * @param recvip      IP string of the receiving side, copied into m_StreamRecvIp.
 * @param recvport    Port of the receiving side.
 * @return false (and nothing is stored) for any other transport type, true otherwise.
 */
bool CTcpStreamTrans::SetStreamTransParam(StreamTransType_E streamtype, const char *sendip, int sendport, const char *recvip, int recvport)
{
    const bool bTcpMode = (streamtype == E_STREAM_TRANS_TCPACTIVE)
                       || (streamtype == E_STREAM_TRANS_TCPPASSIVE);
    if (!bTcpMode)
    {
        return false;
    }

    m_eStreamTransType = streamtype;

    // Addresses first, then ports; the four stores are independent of each other.
    strncpy2(m_StreamSendIp, sendip, IPSTR_MAX_LEN);
    strncpy2(m_StreamRecvIp, recvip, IPSTR_MAX_LEN);
    m_StreamSendPort = sendport;
    m_StreamRecvPort = recvport;

    return true;
}
|
|
int CTcpStreamTrans::AcceptClient(int socketfd, char *pSrcIp, UINT16 &SrcPort)
|
{
|
printf("%s:GetSockFd:%d AcceptClient enters into!!\n", __FUNCTION__, socketfd);
|
struct sockaddr_in ClientAddr;
|
socklen_t ClientLen = sizeof(struct sockaddr);
|
int NewFd = -1;
|
//UINT16 CliPort;
|
while (true)
|
{
|
struct pollfd polset;
|
|
memset(&polset,0,sizeof(struct pollfd));
|
polset.fd = socketfd;
|
polset.events = POLLIN;
|
polset.revents = 0;
|
int iRet = poll(&polset,1, 5*1000);
|
bool bOK = (iRet > 0) && ( polset.revents & POLLIN);
|
|
if(!bOK)
|
{
|
printf("%s: poll timeout 5s! close the socket!!", __FUNCTION__);
|
return -1;
|
}
|
|
ClientLen = sizeof(struct sockaddr);
|
memset(&ClientAddr,0,ClientLen);
|
|
if ( (NewFd = accept(socketfd, (struct sockaddr *)&ClientAddr, &ClientLen) ) == -1)
|
{
|
int iError = errno;
|
printf("tcp server accept error!(%d)\n",iError);
|
}
|
else
|
{
|
int tcpmaxlen = 1024*1024;
|
setsockopt(NewFd, SOL_SOCKET, SO_RCVBUF, &tcpmaxlen, sizeof(tcpmaxlen));
|
setsockopt(NewFd, SOL_SOCKET, SO_SNDBUF, &tcpmaxlen, sizeof(tcpmaxlen));
|
|
//---------------- Set socket some parameters to avoid network exception ----------------//
|
int keepalive = 1; //´ò¿ªÌ½²â
|
int keepidle = 60; //¿ªÊ¼Ì½²âǰµÄ¿ÕÏеȴýʱ¼ä
|
int keepintvl = 10; //·¢ËÍ̽²â·Ö½ÚµÄʱ¼ä¼ä¸ô
|
int keepcnt = 3; //·¢ËÍ̽²â·Ö½ÚµÄ´ÎÊý
|
|
if (setsockopt(NewFd, SOL_SOCKET, SO_KEEPALIVE, (void *)&keepalive, sizeof(keepalive) ) < 0)
|
{
|
perror("fail to set SO_KEEPALIVE");
|
}
|
if (setsockopt(NewFd, SOL_TCP, TCP_KEEPIDLE, (void *)&keepidle, sizeof(keepidle) ) < 0)
|
{
|
perror("fail to set SO_KEEPIDLE");
|
}
|
if (setsockopt(NewFd, SOL_TCP, TCP_KEEPINTVL, (void *)&keepintvl, sizeof(keepintvl) ) < 0)
|
{
|
perror("fail to set SO_KEEPINTVL");
|
}
|
if (setsockopt(NewFd, SOL_TCP, TCP_KEEPCNT, (void *)&keepcnt, sizeof(keepcnt) ) < 0)
|
{
|
perror("fail to set SO_KEEPALIVE");
|
}
|
//---------------------------------------------------------------------------------------//
|
//---------------------------------------------------------------------------------------//
|
//char ClientHostIp[IPSTR_MAX_LEN+1] = {0};
|
//strncpy2(ClientHostIp, pCliIpAddr, IPSTR_MAX_LEN);
|
inet_ntop(AF_INET, (struct in_addr *)&ClientAddr.sin_addr.s_addr, pSrcIp, IPSTR_MAX_LEN); //ʹÓÃḬ̈߳²È«º¯Êý
|
SrcPort = ntohs(ClientAddr.sin_port);
|
printf("NewFd:%d#########new tcp conn:%s,%d#######\n", NewFd, pSrcIp,SrcPort);
|
|
m_TcpClientFd = NewFd;
|
}
|
|
|
return NewFd;
|
}
|
}
|
|
/**
 * Reassembles length-prefixed packets from a raw TCP byte stream.
 *
 * Each frame is a TCP_STRAM_HEADER_LEN-byte header whose first two bytes are
 * the big-endian payload length, followed by that many payload bytes. The
 * payload is treated as an RTP packet: its SSRC is read from bytes 8..11.
 * (NOTE(review): this looks like RFC 4571 framing - confirm with the sender.)
 *
 * State kept across calls:
 *  - m_pHeadTmpBuff / m_HeadTmplen: bytes of a header split across reads;
 *  - m_pTcpRecvPacketBuf / m_sizeOfTcpRecvPacketBuf: packet being assembled
 *    and its total length;
 *  - m_pTcpRecvBufTmp / m_sizeOfTcpRecvBufTmp: write cursor and the number of
 *    payload bytes still missing (0 means "expecting a header").
 *
 * Every completed packet of at least 12 bytes whose SSRC matches the first
 * SSRC seen is copied into a new[] buffer (NUL-terminated) and appended to
 * m_listTcpStreamPacket; the consumer releases it with delete[]. Shorter
 * frames are skipped. On an SSRC mismatch the packet and the rest of this
 * input are discarded and the remembered SSRC is reset.
 *
 * @param pData       Bytes just read from the socket.
 * @param TcpRecvLen  Number of valid bytes in pData.
 */
void CTcpStreamTrans::StickyTcpStramPacket(UINT8 *pData, int TcpRecvLen)
{
    if (pData == NULL || TcpRecvLen <= 0)
    {
        DBGPrint(M_MRELAY, ERROR_LEVEL,"%s :Tcp Recv Data Error!!", __FUNCTION__);
        return;
    }

    const int HeaderLen = (int)TCP_STRAM_HEADER_LEN;
    const int MinRtpLen = 12;   // RTP fixed header; the SSRC occupies bytes 8..11
    UINT8 *pBuff = pData;

    while (TcpRecvLen > 0)
    {
        if (0 == m_sizeOfTcpRecvBufTmp)
        {
            // ---- Expecting a frame header ----
            const UINT8 *pHeader = pBuff;

            if ((m_HeadTmplen > 0) || (TcpRecvLen < HeaderLen))
            {
                // The header straddles two reads: collect only the header bytes
                // in the stash, so neither a byte is lost nor the stash overrun.
                if (m_pHeadTmpBuff == NULL)
                {
                    printf("%s: header stash buffer missing, drop %d byte(s)\n", __FUNCTION__, TcpRecvLen);
                    m_HeadTmplen = 0;
                    return;
                }

                const int Missing = HeaderLen - m_HeadTmplen;
                const int Take = (TcpRecvLen < Missing) ? TcpRecvLen : Missing;
                memcpy(m_pHeadTmpBuff + m_HeadTmplen, pBuff, Take);
                m_HeadTmplen += Take;
                pBuff += Take;
                TcpRecvLen -= Take;

                if (m_HeadTmplen < HeaderLen)
                {
                    // Still incomplete; wait for the next read.
                    break;
                }
                pHeader = m_pHeadTmpBuff;
                m_HeadTmplen = 0;
            }
            else
            {
                pBuff += HeaderLen;
                TcpRecvLen -= HeaderLen;
            }

            // Big-endian 16-bit length, assembled bytewise (no unaligned access).
            m_sizeOfTcpRecvPacketBuf = (UINT16)((pHeader[0] << 8) | pHeader[1]);

            // Defensive: never overwrite (and leak) a previous packet buffer.
            if (m_pTcpRecvPacketBuf != NULL)
            {
                free(m_pTcpRecvPacketBuf);
                m_pTcpRecvPacketBuf = NULL;
            }
            m_pTcpRecvBufTmp = NULL;

            if (m_sizeOfTcpRecvPacketBuf == 0)
            {
                // Empty frame: nothing to assemble, go straight to the next header.
                continue;
            }

            // NOTE(review): the 2x over-allocation is kept from the original code;
            // the reason for it is not visible in this file.
            m_pTcpRecvPacketBuf = (UINT8 *)malloc(m_sizeOfTcpRecvPacketBuf*2);
            if (m_pTcpRecvPacketBuf == NULL)
            {
                printf("%s: malloc %d byte(s) failed, drop packet\n", __FUNCTION__, (int)m_sizeOfTcpRecvPacketBuf*2);
                m_sizeOfTcpRecvBufTmp = 0;
                return;
            }
            m_pTcpRecvBufTmp = m_pTcpRecvPacketBuf;
            m_sizeOfTcpRecvBufTmp = m_sizeOfTcpRecvPacketBuf;
        }

        if (TcpRecvLen <= 0)
        {
            break;
        }

        // Fewer bytes than the packet still needs: store them and wait for more.
        if (TcpRecvLen < m_sizeOfTcpRecvBufTmp)
        {
            memcpy(m_pTcpRecvBufTmp, pBuff, TcpRecvLen);
            m_pTcpRecvBufTmp += TcpRecvLen;
            m_sizeOfTcpRecvBufTmp -= TcpRecvLen;
            break;
        }

        // Enough bytes to finish the packet (any surplus belongs to the next frame).
        memcpy(m_pTcpRecvBufTmp, pBuff, m_sizeOfTcpRecvBufTmp);
        pBuff += m_sizeOfTcpRecvBufTmp;
        TcpRecvLen -= m_sizeOfTcpRecvBufTmp;
        m_sizeOfTcpRecvBufTmp = 0;
        m_pTcpRecvBufTmp = NULL;

        if (m_sizeOfTcpRecvPacketBuf < MinRtpLen)
        {
            // Too short to carry an RTP header: skip it without touching the SSRC state.
            printf("%s: drop short packet, recvpacklen:%d\n", __FUNCTION__, (int)m_sizeOfTcpRecvPacketBuf);
            free(m_pTcpRecvPacketBuf);
            m_pTcpRecvPacketBuf = NULL;
            continue;
        }

        const UINT32 SSrc = ((UINT32)m_pTcpRecvPacketBuf[8]  << 24)
                          | ((UINT32)m_pTcpRecvPacketBuf[9]  << 16)
                          | ((UINT32)m_pTcpRecvPacketBuf[10] << 8)
                          |  (UINT32)m_pTcpRecvPacketBuf[11];

        if (m_tcprecvssrc == 0)
        {
            m_tcprecvssrc = SSrc;
        }

        // A different SSRC means framing was lost: drop this packet and the rest
        // of the input, and start over with whatever SSRC comes next.
        if (m_tcprecvssrc != SSrc)
        {
            printf("vvvvvvvoldssrc:%u, newssrc:%u, %d recvpacklen:%d\n", (unsigned int)m_tcprecvssrc, (unsigned int)SSrc, TcpRecvLen, (int)m_sizeOfTcpRecvPacketBuf);
            free(m_pTcpRecvPacketBuf);
            m_pTcpRecvPacketBuf = NULL;
            m_tcprecvssrc = 0;
            break;
        }

        // Hand a private, NUL-terminated copy to the packet queue.
        TcpStreamPacket tcppacket;
        tcppacket.nLength = m_sizeOfTcpRecvPacketBuf;
        tcppacket.pData = new UINT8[m_sizeOfTcpRecvPacketBuf+1];
        memcpy(tcppacket.pData, m_pTcpRecvPacketBuf, m_sizeOfTcpRecvPacketBuf);
        tcppacket.pData[m_sizeOfTcpRecvPacketBuf] = '\0';
        m_listTcpStreamPacket.push_back(tcppacket);

        free(m_pTcpRecvPacketBuf);
        m_pTcpRecvPacketBuf = NULL;
    }
}
|
|
/**
 * Waits until a socket becomes writable and reports whether it is error-free.
 *
 * Despite the name, the descriptor is placed in select()'s WRITE set and
 * SO_ERROR is then queried.
 * NOTE(review): this matches the usual "wait for a non-blocking connect to
 * finish" check - confirm that is the intended use.
 *
 * @param scoketfd  Socket descriptor to wait on.
 * @param timeout   Maximum wait in microseconds; 0 polls without blocking.
 * @return true only if the socket became writable in time and SO_ERROR is 0.
 */
bool CTcpStreamTrans::ReadSelect(int scoketfd, UINT32 timeout)
{
    // FD_SET is undefined for descriptors outside [0, FD_SETSIZE).
    if (scoketfd < 0 || scoketfd >= FD_SETSIZE)
    {
        return false;
    }

    // The split below also yields {0, 0} for timeout == 0.
    struct timeval tv;
    tv.tv_sec = timeout / 1000000;
    tv.tv_usec = timeout % 1000000;

    fd_set wrset;
    FD_ZERO(&wrset);
    FD_SET(scoketfd, &wrset);

    if (select(scoketfd + 1, NULL, &wrset, NULL, &tv) <= 0)
    {
        // Timeout or select() failure.
        return false;
    }

    int valopt = 0;
    socklen_t sock_len = sizeof(valopt);
    if (getsockopt(scoketfd, SOL_SOCKET, SO_ERROR, (void *)(&valopt), &sock_len) != 0)
    {
        return false;
    }
    return valopt == 0;
}
|
|
int CTcpStreamTrans::Connect(void)
|
{
|
if ((strlen(m_StreamSendIp) <= 0) || (m_StreamSendPort <= 0) || (m_StreamRecvPort <= 0))
|
{
|
//wait conenct to server
|
return -1;
|
}
|
|
int socketfd = -1;
|
//Create the TCP socket
|
socketfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
|
if (-1 == socketfd)
|
{
|
perror("CTcpTransport socket");
|
return -1;
|
}
|
//----------------------------------°ó¶¨Ô´IPºÍ¶Ë¿Ú---------------
|
struct sockaddr_in MyAddr;
|
bzero(&MyAddr, sizeof(struct sockaddr_in) );
|
MyAddr.sin_family = AF_INET;
|
MyAddr.sin_addr.s_addr = inet_addr(m_StreamRecvIp);
|
MyAddr.sin_port = htons(m_StreamRecvPort);
|
|
if (bind(socketfd,(struct sockaddr *)&MyAddr, sizeof(struct sockaddr_in) ) == -1)
|
{
|
printf("socket:%d bind %s %d failed!\r\n", socketfd, m_StreamRecvIp,m_StreamRecvPort);
|
perror("bind");
|
//return -1;
|
}
|
else
|
{
|
printf("socket:%d bind local:%s %d sucess!\r\n", socketfd, m_StreamRecvIp, m_StreamRecvPort);
|
}
|
//--------------------------------------------------------------------
|
struct sockaddr_in SvrAddr;
|
bzero(&SvrAddr, sizeof(struct sockaddr_in) );
|
SvrAddr.sin_family = AF_INET;
|
SvrAddr.sin_addr.s_addr = inet_addr(m_StreamSendIp);
|
SvrAddr.sin_port = htons(m_StreamSendPort);
|
|
if (connect(socketfd, (struct sockaddr *)&SvrAddr, sizeof(struct sockaddr_in) ) < 0)
|
{
|
perror("connect");
|
printf("%s:socket:%d connect tcp server:%s %d failed!\r\n",__FUNCTION__, socketfd, m_StreamSendIp,m_StreamSendPort);
|
close(socketfd);
|
socketfd = -1;
|
return -1;
|
}
|
printf("%s: connect tcp server:%s %d sucess by tcp client:%s %d!\r\n",__FUNCTION__,m_StreamSendIp,m_StreamSendPort, m_StreamRecvIp,m_StreamRecvPort);
|
return socketfd;
|
}
|
|
int CTcpStreamTrans::CreateTcpServer(int port)
|
{
|
if (port <= 0)
|
{
|
//wait conenct to server
|
return -1;
|
}
|
//Create the TCP socket
|
m_TcpServerFd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
|
if (-1 == m_TcpServerFd)
|
{
|
perror("CTcpTransport socket");
|
return -1;
|
}
|
|
struct sockaddr_in MyAddr;
|
struct linger linger;
|
int optval;
|
|
bzero(&MyAddr, sizeof(struct sockaddr_in) );
|
MyAddr.sin_family = AF_INET;
|
MyAddr.sin_addr.s_addr = htonl(INADDR_ANY);
|
MyAddr.sin_port = htons(port);
|
|
linger.l_onoff = 1;
|
linger.l_linger = 0;
|
if (setsockopt(m_TcpServerFd, SOL_SOCKET, SO_LINGER, (char *)&linger, sizeof(struct linger) ) == -1)
|
{
|
close(m_TcpServerFd);
|
perror("CTcpTransport set tcp sockopt linger error");
|
return -1;
|
}
|
|
optval = 1;
|
if (setsockopt(m_TcpServerFd, SOL_SOCKET, SO_REUSEADDR, (char *)&optval, sizeof(optval) ) == -1)
|
{
|
close(m_TcpServerFd);
|
perror("CTcpTransport set tcp sockopt reuse address error");
|
return -1;
|
}
|
|
if (bind(m_TcpServerFd, (struct sockaddr *)&MyAddr, sizeof(struct sockaddr_in) ) == -1)
|
{
|
close(m_TcpServerFd);
|
perror("CTcpTransport tcp bind error");
|
return -1;
|
}
|
|
if (listen(m_TcpServerFd, 20) == -1)
|
{
|
close(m_TcpServerFd);
|
return -1;
|
}
|
m_eStreamTransType = E_STREAM_TRANS_TCPPASSIVE;
|
printf("%s: create tcp server port:%d sucess\n", __FUNCTION__, port);
|
return m_TcpServerFd;
|
}
|
|
/**
 * Reports the RTP marker bit (top bit of the second header byte).
 *
 * @param pBuffer  Start of the RTP packet.
 * @param BufSize  Packet length in bytes.
 * @return 1 if the packet is longer than 12 bytes and the marker bit is set,
 *         otherwise 0.
 */
int CTcpStreamTrans::IsMark( UINT8* pBuffer, unsigned int BufSize)
{
    // Packets of 12 bytes or fewer carry no payload and are never "marked".
    if (BufSize <= 12)
    {
        return 0;
    }
    return ((pBuffer[1] & 0x80) != 0) ? 1 : 0;
}
|
|
|
/**
 * Computes the total RTP header length of a packet: the 12-byte fixed header,
 * the CSRC list, and the header extension when the X bit is set.
 *
 * @param _frame     Start of the RTP packet.
 * @param _frameLen  Packet length in bytes.
 * @return The header length in bytes, or 0 when the packet is NULL, shorter
 *         than 12 bytes, or too short to contain the extension header it
 *         announces. The result is not clamped to _frameLen; callers must
 *         compare it against the packet length.
 */
int CTcpStreamTrans::GetRtpHeaderSize(UINT8* _frame, int _frameLen)
{
    const int FixedHeaderLen = 12;

    if (_frame == NULL || _frameLen < FixedHeaderLen)
        return 0;

    // Low 4 bits of byte 0 = CSRC count, 4 bytes per CSRC entry.
    int size = FixedHeaderLen + (_frame[0] & 0x0f) * 4;

    // Bit 0x10 of byte 0 is the header-extension (X) flag.
    if (!(_frame[0] & 0x10)) /* no extension */
        return size;

    /* Extension present; format per RFC 3550 section 5.3.1: a 16-bit profile id,
       then a 16-bit length counted in 32-bit words, then the extension data. */
    if ((size + 4) <= _frameLen)   // the 4-byte extension header must fit entirely
    {
        const int ExtWords = (_frame[size + 2] << 8) + _frame[size + 3];
        return size + 4 + 4 * ExtWords;
    }

    return 0;
}
|
|
/**
 * Strips the RTP header from one reassembled packet and forwards the payload
 * of video packets to HandleRecvStream().
 *
 * Packets that are NULL, shorter than the 12-byte fixed RTP header, have an
 * unparseable header, or carry no payload are ignored.
 *
 * @param pData  Start of the RTP packet.
 * @param len    Packet length in bytes.
 * @return Always 0.
 */
int CTcpStreamTrans::ParseTcpStream(UINT8* pData, int len)
{
    // The header fields below may only be read from a complete fixed header.
    if (pData == NULL || len < 12)
    {
        return 0;
    }

    // 0 signals a malformed header; a value >= len leaves no payload.
    const int rtpheadlen = GetRtpHeaderSize(pData, len);
    if (rtpheadlen <= 0 || rtpheadlen >= len)
    {
        return 0;
    }

    const RTP_FIXED_HEADER *rtpheader = (const RTP_FIXED_HEADER *)pData;
    const int rtpplaytype = rtpheader->payload;
    if (IsVideoPayloadType(rtpplaytype))
    {
        HandleRecvStream(pData+rtpheadlen, len-rtpheadlen, rtpplaytype, IsMark(pData,len));
    }
    return 0;
}
|
|
/**
 * Runs one RTP payload through the two-stage stream parser and delivers any
 * resulting video data to MediaCBFunc.
 *
 * Stage 1 (m_StreamParseHandle1) is fed the payload with the RTP payload type
 * and marker flag. Its output reports a stream type, which is latched into
 * m_RecvStreamType the first time it is non-zero. Stage 2
 * (m_StreamParseHandle2) is fed the stage-1 output in a mode chosen from the
 * latched stream type. Non-empty stage-2 output is classified as I or P frame
 * and passed to the callback together with the latched video stream type.
 * Both parser handles are opened lazily.
 *
 * @param pAFrame      RTP payload (header already removed).
 * @param FrameSize    Payload length in bytes.
 * @param PayLoadType  RTP payload type, forwarded to the parser.
 * @param bMark        RTP marker bit of the packet.
 * @return 0 when the payload was processed or skipped, -1 when a parser
 *         handle could not be opened.
 */
int CTcpStreamTrans::HandleRecvStream(UINT8* pAFrame, UINT32 FrameSize, int PayLoadType, bool bMark)
{
    // ---------------- Stage 1 ----------------
    if (0 == m_StreamParseHandle1)
    {
        m_StreamParseHandle1 = STREAMPARSE_Open();
        if (0 == m_StreamParseHandle1)
        {
            printf("%s: open stream parser 1 failed!\n", __FUNCTION__);
            return -1;
        }
    }

    StreamParseParamIn In;
    In.pBufIn = pAFrame;
    In.nSizeIn = FrameSize;
    In.bMark = bMark;
    In.PayLoadType = (StreamParseType_E)PayLoadType;

    // NOTE(review): Out is filled by STREAMPARSE_Parse; whether it is always
    // written on failure is not visible here - confirm against the parser API.
    StreamParseParamOut Out;
    STREAMPARSE_Parse(m_StreamParseHandle1, &In, &Out);

    if (Out.nSizeOut <= 0)
    {
        // Nothing produced yet.
        return 0;
    }
    if (Out.streamtype == 0xC0)
    {
        // Stream type 0xC0 is ignored (presumably audio - TODO confirm).
        return 0;
    }

    // Latch the first non-zero stream type reported by stage 1.
    if ((m_RecvStreamType == 0) && (Out.streamtype != 0))
    {
        m_RecvStreamType = Out.streamtype;
        printf("%s:1. >>>>>>>>>Detecing Stream Type... StreamType:%02x!!", __FUNCTION__, Out.streamtype);
    }
    if (m_RecvStreamType == 0)
    {
        printf("%s:2. Detecing Stream Type... StreamType:%d!!", __FUNCTION__, Out.streamtype);
        return 0;
    }

    // ---------------- Stage 2 ----------------
    if (0 == m_StreamParseHandle2)
    {
        m_StreamParseHandle2 = STREAMPARSE_Open();
        if (0 == m_StreamParseHandle2)
        {
            printf("%s: open stream parser 2 failed!\n", __FUNCTION__);
            return -1;
        }
    }

    StreamParseParamIn StreamIn;
    if (m_RecvStreamType == 0xE2)        // MPEG4
    {
        StreamIn.PayLoadType = STREAMPARSE_MERGEAMPEG4;
    }
    else if (m_RecvStreamType == 0xE3)   // MPEG2
    {
        StreamIn.PayLoadType = STREAMPARSE_MERGEAMPEG2;
    }
    else                                 // everything else (H264, and H265 via 0xE1)
    {
        StreamIn.PayLoadType = STREAMPARSE_ESTOES;
    }
    StreamIn.pBufIn = Out.pBufOut;
    StreamIn.nSizeIn = Out.nSizeOut;
    StreamIn.bMark = false;

    StreamParseParamOut StreamOut;
    STREAMPARSE_Parse(m_StreamParseHandle2, &StreamIn, &StreamOut);
    if (StreamOut.nSizeOut <= 0)
    {
        return 0;
    }

    // ---------------- Classify and deliver ----------------
    int FrameType = GB_VIDEO_FRAME_P;
    VideoStreamType_E eVideoStreamType = E_VIDEO_STREAM_H264;

    if (m_RecvStreamType == 0xE2)
    {
        eVideoStreamType = E_VIDEO_STREAM_MPEG4;
        mpeg_header_t* VOL = (mpeg_header_t*)StreamOut.pBufOut;
        if (is_VOL_header(VOL))
        {
            FrameType = GB_VIDEO_FRAME_I;
        }
    }
    else if (m_RecvStreamType == 0xE3)
    {
        eVideoStreamType = E_VIDEO_STREAM_MPEG2;
        mpeg_header_t* SH = (mpeg_header_t*)StreamOut.pBufOut;
        if (is_sequence_header(SH))
        {
            FrameType = GB_VIDEO_FRAME_I;
        }
    }
    else if (m_RecvStreamType == 0xE1)
    {
        eVideoStreamType = E_VIDEO_STREAM_H265;
        if (is_h265_IFrame_Data(StreamOut.pBufOut, StreamOut.nSizeOut))
        {
            FrameType = GB_VIDEO_FRAME_I;
        }
    }
    else
    {
        eVideoStreamType = E_VIDEO_STREAM_H264;
        if (is_H264_IFrame_Data(StreamOut.pBufOut, StreamOut.nSizeOut))
        {
            FrameType = GB_VIDEO_FRAME_I;
        }
    }

    // The video stream type is latched once, from the first delivered frame.
    if (m_eVideoStreamType == E_VIDEO_STREAM_NONE)
    {
        m_eVideoStreamType = eVideoStreamType;
        printf("%s:>>>>>>>>>Detecing Stream Type:%d!!!!!!!!", __FUNCTION__, (int)m_eVideoStreamType);
    }

    if (MediaCBFunc != NULL)
    {
        MediaCBFunc((int)m_eVideoStreamType, FrameType, StreamOut.pBufOut, StreamOut.nSizeOut, UserData);
    }

    return 0;
}
|
|
void* CTcpStreamTrans::Thread()
|
{
|
ThreadStarted();
|
|
int RecvLen = 0;
|
if (RecvBuffer == NULL)
|
{
|
RecvBuffer = new unsigned char[UDP_PACKET_RECV_MAXSIZE+1];
|
RecvBuffer[0] = '\0';
|
}
|
|
if (m_eStreamTransType == E_STREAM_TRANS_TCPPASSIVE && m_TcpServerFd <= 0)
|
{
|
m_TcpServerFd = CreateTcpServer(m_StreamRecvPort);
|
if (m_TcpServerFd <= 0)
|
{
|
printf("%s: create tcp server listen failed!!!!\n", __FUNCTION__);
|
return NULL;
|
}
|
}
|
|
m_pHeadTmpBuff = (UINT8 *)malloc(TCP_STREAM_BUFFER_MAX_LEN);
|
m_HeadTmplen = 0;
|
m_listTcpStreamPacket.clear();
|
TcpStreamPacket TcpRecvPacket;
|
TcpPacketIt it;
|
|
int m_DataIncomingTime = GetClockTime();
|
int WaitForTimeout = 10 * 1000;
|
struct pollfd FdSet[1] = { {0, 0} };
|
while(!ExitFlag)
|
{
|
if (m_eStreamTransType == E_STREAM_TRANS_TCPPASSIVE)
|
{
|
if (m_TcpClientFd <= 0)
|
{
|
char sendip[IPSTR_MAX_LEN+1] = {0};
|
UINT16 sendport = 0;
|
m_TcpClientFd = AcceptClient(m_TcpServerFd, sendip, sendport);
|
if (m_TcpClientFd <= 0)
|
{
|
//usleep(100*1000);
|
continue;
|
}
|
else
|
{
|
//ÊÇ·ñÆ¥ÅäÔ´ipºÍport
|
//ÔÝʱĬÈÏÓÐÁ¬½Ó¾ÍËãÊǽÓÊÕµ½ÂëÁ÷
|
//continue;
|
FdSet[0].fd = m_TcpClientFd;
|
FdSet[0].events = POLLIN | POLLPRI;
|
}
|
}
|
}
|
else
|
{
|
if (m_TcpClientFd <= 0)
|
{
|
m_TcpClientFd = Connect();
|
//tcpclient
|
if (m_TcpClientFd < 0)
|
{
|
printf("+++++++++++++++++++++m_TcpClientFd:%d\n", m_TcpClientFd);
|
usleep(1000*1000);
|
continue;
|
}
|
FdSet[0].fd = m_TcpClientFd;
|
FdSet[0].events = POLLIN | POLLPRI;
|
}
|
}
|
//ÂÖÑ¯ÍøÂçFD£¬ÓÐÊý¾Ý»òÕßÒì³£Á¢¼´·µ»Ø£¬ÎÞÊý¾ÝµÈ´ý³¬Ê±¡£
|
if ( (poll(FdSet, 1, WaitForTimeout) < 0) && (errno != EINTR) )
|
{
|
printf("%s: #####%s:%d poll error: SockFd<%d>!######", __FUNCTION__, __FILE__,__LINE__,m_TcpClientFd);
|
return NULL;
|
}
|
|
//ÅжÏÊý¾Ý¿É¿¿ÐÔ£¿
|
if ( ( (FdSet[0].revents & POLLIN) == POLLIN) || ( (FdSet[0].revents & POLLPRI) == POLLPRI) )
|
{
|
RecvLen = recv(m_TcpClientFd, RecvBuffer, TCP_STREAM_BUFFER_MAX_LEN, 0);
|
if (RecvLen > 0)
|
{
|
StickyTcpStramPacket(RecvBuffer, RecvLen);
|
while (m_listTcpStreamPacket.size() > 0)
|
{
|
it = m_listTcpStreamPacket.begin();
|
TcpRecvPacket = *it;
|
m_listTcpStreamPacket.erase(it);
|
|
if (TcpRecvPacket.nLength > 0)
|
{
|
//»ñȡÿһ°üµÄÊý¾Ý
|
ParseTcpStream(TcpRecvPacket.pData, TcpRecvPacket.nLength);
|
}
|
if (TcpRecvPacket.pData != NULL)
|
{
|
delete []((UINT8*)TcpRecvPacket.pData);
|
TcpRecvPacket.pData = NULL;
|
TcpRecvPacket.nLength = 0;
|
}
|
}
|
}
|
else
|
{
|
printf("%s:Recv Data Len<%d> From socket<%d>, Connect Failed!", __FILE__, RecvLen, m_TcpClientFd);
|
break;
|
}
|
}
|
|
// ³¬¹ý5ÃëûÓнÓÊÕµ½ÂëÁ÷´òÓ¡ÐÅÏ¢
|
int now = GetClockTime();
|
if (now - m_DataIncomingTime > 5)
|
{
|
printf("%s:>>>>>>>tcp sendinfo<%s:%d> recvinfo<%s:%d> over 5 seconds has not stream!!!!!", __FUNCTION__, m_StreamSendIp, m_StreamSendPort, m_StreamRecvIp, m_StreamRecvPort);
|
}
|
m_DataIncomingTime = now;
|
}
|
|
printf("%s: Stream Thread Exit %p, Jstream::ThreadQuit Sucess!!!\n", __FUNCTION__, this);
|
return NULL;
|
}
|
|
void CTcpStreamTrans::ThreadQuit(void)
|
{
|
ExitFlag = true;
|
usleep(100000);
|
|
if (m_pTcpRecvPacketBuf != NULL)
|
{
|
free(m_pTcpRecvPacketBuf);
|
m_pTcpRecvPacketBuf = NULL;
|
}
|
|
if(RecvBuffer != NULL)
|
{
|
delete [] RecvBuffer;
|
RecvBuffer = NULL;
|
}
|
|
if (m_pHeadTmpBuff != NULL)
|
{
|
free(m_pHeadTmpBuff);
|
m_pHeadTmpBuff = NULL;
|
}
|
m_HeadTmplen = 0;
|
|
TcpPacketIt it;
|
while (!m_listTcpStreamPacket.empty())
|
{
|
it = m_listTcpStreamPacket.begin();
|
delete []((UINT8*)it->pData);
|
it->pData = NULL;
|
m_listTcpStreamPacket.erase(it);
|
}
|
}
|
|
|