librtmp读包阻塞问题修复
近期项目中有遇到播放器在使用librtmp播放rtmp码流时,在弱网环境中,会出现读包block很久的情况,甚至断网会直接阻塞读包线程,导致播放器无法退出造成ANR。librtmp目前社区已无人维护,所以无法通过升级第三方库来试图解决此问题,只能自己啃源码了。附上rtmp链接https://github.com/ossrs/librtmp阅读librtmp源码得知,读包有2个地方会造成死锁问题,这
近期项目中有遇到播放器在使用librtmp播放rtmp码流时,在弱网环境中,会出现读包block很久的情况,甚至断网会直接阻塞读包线程,导致播放器无法退出造成ANR。librtmp目前社区已无人维护,所以无法通过升级第三方库来试图解决此问题,只能自己啃源码了。
附上rtmp链接
https://github.com/ossrs/librtmp
阅读librtmp源码得知,读包有2个地方会造成死锁问题,这两个API分别是:
int RTMP_Read(RTMP *r, char *buf, int size)
int RTMP_ReadPacket(RTMP *r, RTMPPacket *packet)
首先看第一个:
RTMP_Read
Read_1_Packet
RTMP_GetNextMediaPacket
int
RTMP_GetNextMediaPacket(RTMP *r, RTMPPacket *packet)
{
int bHasMediaPacket = 0;
// 这里会死循环去读包,并且读不到数据也出不了循环
while (!bHasMediaPacket && RTMP_IsConnected(r)
&& RTMP_ReadPacket(r, packet))
{
if (!RTMPPacket_IsReady(packet) || !packet->m_nBodySize)
{
continue;
}
bHasMediaPacket = RTMP_ClientPacket(r, packet);
if (!bHasMediaPacket)
{
RTMPPacket_Free(packet);
}
else if (r->m_pausing == 3)
{
if (packet->m_nTimeStamp <= r->m_mediaStamp)
{
bHasMediaPacket = 0;
#ifdef _DEBUG
RTMP_Log(RTMP_LOGDEBUG,
"Skipped type: %02X, size: %d, TS: %d ms, abs TS: %d, pause: %d ms",
packet->m_packetType, packet->m_nBodySize,
packet->m_nTimeStamp, packet->m_hasAbsTimestamp,
r->m_mediaStamp);
#endif
RTMPPacket_Free(packet);
continue;
}
r->m_pausing = 0;
}
}
if (bHasMediaPacket)
r->m_bPlaying = TRUE;
else if (r->m_sb.sb_timedout && !r->m_pausing)
r->m_pauseStamp = r->m_mediaChannel < r->m_channelsAllocatedIn ?
r->m_channelTimestamp[r->m_mediaChannel] : 0;
return bHasMediaPacket;
}
第二个则是RTMP_ReadPacket:
RTMP_ReadPacket
ReadN
RTMPSockBuf_Fill
int
RTMPSockBuf_Fill(RTMPSockBuf *sb)
{
int nBytes;
if (!sb->sb_size)
sb->sb_start = sb->sb_buf;
while (1)
{
nBytes = sizeof(sb->sb_buf) - 1 - sb->sb_size - (sb->sb_start - sb->sb_buf);
#if defined(CRYPTO) && !defined(NO_SSL)
if (sb->sb_ssl)
{
nBytes = TLS_read(sb->sb_ssl, sb->sb_start + sb->sb_size, nBytes);
}
else
#endif
{
// 直接调用系统的recv方法风险很大
nBytes = recv(sb->sb_socket, sb->sb_start + sb->sb_size, nBytes, 0);
}
if (nBytes != -1)
{
sb->sb_size += nBytes;
}
else
{
int sockerr = GetSockError();
RTMP_Log(RTMP_LOGDEBUG, "%s, recv returned %d. GetSockError(): %d (%s)",
__FUNCTION__, nBytes, sockerr, strerror(sockerr));
if (sockerr == EINTR && !RTMP_ctrlC)
continue;
if (sockerr == EWOULDBLOCK || sockerr == EAGAIN)
{
sb->sb_timedout = TRUE;
nBytes = 0;
}
}
break;
}
return nBytes;
}
在fill socket buffer的方法中会直接调用recv方法下载数据,这是风险极高的操作,弱网环境中会导致recv返回很慢,断网状态更是直接会卡在recv里无法返回。
解法方法其实不难,分两步去做:
1、recv之前我们使用select或者poll去轮训已经有数据的socket,若返回成功则在去recv即可。建议使用poll来做,因为select有fd的限制,若传入select方法的socket fd大于当前系统的FD_SETSIZE则会直接导致crash,poll方法则没有这个限制,或者使用epoll来做效率更高(如果涉及跨IOS开发则不可使用epoll,IOS系统暂时还不支持epoll方法)。
2、因为poll机制如果socket没有数据则会返回0,通常外层会有一个while循环去一直poll,知道poll到数据返回,那如果弱网或者断网一直poll不到数据还不是一样会死循环?这个问题也好解决,我们看一下ffmpeg的做法就能得到答案:
在libavformat/network.c中已有答案:
// 可以看到这里只有3种情况
// 1.poll < 0出错了,直接返回
// 2.ff_neterrno() : p.revents & (ev | POLLERR | POLLHUP)满足,说明有数据可读/写,返回0,注意,这个方法进行了封装,poll系统方法返回0是超时,这里返回0才是正常
// 3.其他情况都返回try again
int ff_network_wait_fd(int fd, int write)
{
int ev = write ? POLLOUT : POLLIN;
struct pollfd p = { .fd = fd, .events = ev, .revents = 0 };
int ret;
ret = poll(&p, 1, POLLING_TIME);
return ret < 0 ? ff_neterrno() : p.revents & (ev | POLLERR | POLLHUP) ? 0 : AVERROR(EAGAIN);
}
int ff_network_wait_fd_timeout(int fd, int write, int64_t timeout, AVIOInterruptCB *int_cb)
{
int ret;
int64_t wait_start = 0;
// 循环去poll等待数据可读之后退出while
while (1) {
// 中断机制,上层若有退出操作可以通过此中断异步退出循环
if (ff_check_interrupt(int_cb))
return AVERROR_EXIT;
// poll轮训一直去等待socket有数据可读
ret = ff_network_wait_fd(fd, write);
// 异常情况直接返回错误
if (ret != AVERROR(EAGAIN))
return ret;
// 有个超时等待机制,如果太长时间都poll不到数据也要退出
if (timeout > 0) {
if (!wait_start)
wait_start = av_gettime_relative();
else if (av_gettime_relative() - wait_start > timeout)
return AVERROR(ETIMEDOUT);
}
}
}
所以,我们可以通过异步中断函数来退出循环,附上我的解法:
rtmp.c
int
RTMPSockBuf_Fill(RTMPSockBuf *sb, RTMPIOInterruptCB *cb)
{
int nBytes;
if (!sb->sb_size)
sb->sb_start = sb->sb_buf;
while (1)
{
if (RTMP_Read_Check_Interrupt(cb))
{
RTMP_Log(RTMP_LOGINFO, "%s, poll data, user interrupt!", __FUNCTION__);
break;
}
nBytes = sizeof(sb->sb_buf) - 1 - sb->sb_size - (sb->sb_start - sb->sb_buf);
#if defined(CRYPTO) && !defined(NO_SSL)
if (sb->sb_ssl)
{
nBytes = TLS_read(sb->sb_ssl, sb->sb_start + sb->sb_size, nBytes);
}
else
#endif
{
int ret = RTMP_Poll_Fd(sb->sb_socket);
if (ret < 0)
return ret;
else if (ret == EAGAIN)
continue;
else
nBytes = recv(sb->sb_socket, sb->sb_start + sb->sb_size, nBytes, 0);
}
if (nBytes != -1)
{
sb->sb_size += nBytes;
}
else
{
int sockerr = GetSockError();
RTMP_Log(RTMP_LOGDEBUG, "%s, recv returned %d. GetSockError(): %d (%s)",
__FUNCTION__, nBytes, sockerr, strerror(sockerr));
if (sockerr == EINTR && !RTMP_ctrlC)
continue;
if (sockerr == EWOULDBLOCK || sockerr == EAGAIN)
{
sb->sb_timedout = TRUE;
nBytes = 0;
}
}
break;
}
return nBytes;
}
int
RTMP_Read_Check_Interrupt(RTMPIOInterruptCB *cb)
{
if (cb && cb->callback && cb->callback(cb->opaque))
return 1;
else
return 0;
}
int
RTMP_Poll_Fd(int fd)
{
short ev = POLLIN;
struct pollfd p = { .fd = fd, .events = ev, .revents = 0 };
int ret;
ret = poll(&p, 1, kPollingTimeMs);
return ret < 0 ? ret : p.revents & (ev | POLLERR | POLLHUP) ? 0 : EAGAIN;
}
头文件:
typedef struct RTMP
{
RTMPIOInterruptCB m_interruptCallback; /* 读包中断函数,从调用librtmp的地方传入 */
}
/**
* @brief 中断结构
*
* @param callback: callback方法
* @param opaque: 上层传入实例
*/
typedef struct RTMPIOInterruptCB {
int (*callback)(void*);
void *opaque;
} RTMPIOInterruptCB;
/* 确认在读包过程中,上层有没有退出播放的操作 */
int RTMP_Read_Check_Interrupt(RTMPIOInterruptCB *cb);
int RTMP_Poll_Fd(int fd);
注意:
RTMPIOInterruptCB中的callback需要从其他的线程通知才可以,通常的播放器架构,会有一个主线程去处理各种回调的信息还有播放器的各类事件(如start、stop等),可在把callback接到该主线程,如果该主线程接到stop后让该callback返回true即可正常退出。
更多推荐
所有评论(0)