流媒体弱网优化之路(WebRTC)——jitterbuffer分析与优化

——
我正在的github给大家开发一个用于做实验的项目 —— github.com/qw225967/Bifrost

目标:可以让大家熟悉各类Qos能力、带宽估计能力,提供每个环节关键参数调节接口并实现一个json全配置,提供全面的可视化算法观察能力。

欢迎大家使用
——


一、jitterbuffer原理

  WebRTC中的Jitter Buffer是一个用于处理网络抖动(jitter)的缓冲区,它的作用是为音频或视频数据提供一个平滑的播放体验。当我们在网络上传输音频或视频数据时,由于网络传输延迟和不可预测的网络抖动,数据包很可能会以不同的速度到达接收端。这可能会导致数据包在接收端的播放速度不一致,从而影响音频或视频的质量和连续性。Jitter Buffer的原理是将接收到的数据包缓存到一个缓冲区中,在缓冲区中等待一段时间,以便其他数据包到达。然后,Jitter Buffer会按照正确的顺序将数据包发送到音频或视频播放器,以确保数据包按照正确的速率播放。

1.1 组包流程

  在详细介绍jitterbuffer前我们先把整体组包的流程解释一下:
  1.当接收到Rtp包后第一时间会进入到PacketBuffer中进行组帧,并对外提供FindFrame函数对外查找组成的帧。同时还会统计丢包的数据;(序号排序)
在这里插入图片描述
  2.当接收到Frame后需要RtpFrameReferenceFinder查找整体的帧参考关系,并且会根据数据整体的状态:kStash、kHandOff、kDrop几个状态确定是否丢弃该帧;(帧排序)
在这里插入图片描述
  3.上述的方式保证了Gop内部的数据帧之间的有序性,但每个Gop之间则没确定。而FrameBuffer则保证每个帧的Gop顺序的输出至解码器。(Gop排序)
在这里插入图片描述
  调用流程:

// src/video/rtp_video_stream_receiver2
RtpVideoStreamReceiver2::OnReceivedPayloadData;
//				|
PacketBuffer::InsertPacket;
//				|
PacketBuffer::FindFrames;
//				|
RtpVideoStreamReceiver2::OnInsertedPacket;
//				|
RtpVideoStreamReceiver2::OnAssembledFrame;
//				|
RtpFrameReferenceFinder::ManageFrame;
//				|
 VideoReceiveStream2::OnCompleteFrame; // video_receivestream2是rtp_video_stream_receiver2的回调complete_frame_callback_
//				|
FrameBuffer2::InsertFrame;

1.2 解码与渲染

  上诉取帧完成后,进入到解码流程。
  进入到VideoReceiver2进行解码,具体的流程如下:

FrameBuffer::StartWaitForNextFrameOnQueue;
//				|
VideoReceiveStream2::StartNextDecode;
//				|
VideoReceiveStream2::HandleEncodedFrame;
//				|
VideoReceiveStream2::DecodeAndMaybeDispatchEncodedFrame;
//				|
VideoReceiver2::Decode;
//				|
VCMGenericDecoder::Decode;
//				|
 H264DecoderImpl::Decode;
//				|
VCMDecodedFrameCallback::Decoded;
//				|
VideoStreamDecoder::FrameToRender;
//				|
IncomingVideoStream::OnFrame;
//				| ——  incoming_render_queue_.PostTask // 投入渲染队列
 

1.3 timing 与 jitter

  上述已经是整个数据包从接收到渲染的整体流程,但生产环境中我们整体的网络抖动会导致我们该流程中某一流程的阻塞就造成整个画面的卡顿。如果想要让画面更加的流畅我们必须提前感知到整个流程是否存在卡的可能,并且使用延迟换取流程,适当地增大整个渲染延迟。

1.3.1 jitter buffer

  关键参数:
  1.VCMInterFrameDelay:帧间延迟 = 前后帧接收时间差 - 前后帧发送时间差;
  2.VCMJitterEstimator:通过帧延迟计算出抖动值;
在这里插入图片描述

1.3.2 timing

  timing是解码完成后确定渲染时间戳的关键类,该类会结合jitterbuffer测算的jitter情况、解码消耗时间、渲染耗时等等计算得到一个可靠的渲染延迟。

获取函数意义
TargetDelayInternal()此函数返回目标延迟,目标延迟是希望获得的延迟,但并不是最终的延迟
RenderTimeInternal()此函数返回一个平滑后的延迟,由于延迟采样没有经过线性归一,因此波动会很大需要进行卡尔曼滤波
RequiredDecodeTime()此函数返回解码耗时,解码耗时

  current_delay_是最终使用的延迟值。

  // 渲染延迟 固定值10ms
  void set_render_delay(TimeDelta render_delay);

  // jitter延迟
  void SetJitterDelay(TimeDelta required_delay);

  // 最小播放延迟
  void set_min_playout_delay(TimeDelta min_playout_delay);
  
  // 最大播放延迟
  void set_max_playout_delay(TimeDelta max_playout_delay);

  // 直接更新当前延迟
  void UpdateCurrentDelay(uint32_t frame_timestamp);
  void UpdateCurrentDelay(Timestamp render_time, Timestamp actual_decode_time);

二、jitterbuffer的实现

  jitterbuffer插入数据实现。

VCMFrameBufferEnum VCMJitterBuffer::InsertPacket(const VCMPacket& packet,
                                                 bool* retransmitted) {
  MutexLock lock(&mutex_);

  ++num_packets_;
  // 待插入的packet更老
  // Does this packet belong to an old frame?
  if (last_decoded_state_.IsOldPacket(&packet)) {
    // Account only for media packets.
    if (packet.sizeBytes > 0) {
      // 累积老包个数
      num_consecutive_old_packets_++;
    }
    // 此包晚收到,并且属于同时间的一帧,更新last_decoded_state_->sequence_num_
    // Update last decoded sequence number if the packet arrived late and
    // belongs to a frame with a timestamp equal to the last decoded
    // timestamp.
    last_decoded_state_.UpdateOldPacket(&packet);
    DropPacketsFromNackList(last_decoded_state_.sequence_num());

    // Also see if this old packet made more incomplete frames continuous.
    FindAndInsertContinuousFramesWithState(last_decoded_state_);

    // 连续收到老包的个数太多,则flush jitter buffer
    if (num_consecutive_old_packets_ > kMaxConsecutiveOldPackets) {
      RTC_LOG(LS_WARNING)
          << num_consecutive_old_packets_
          << " consecutive old packets received. Flushing the jitter buffer.";
      Flush();
      return kFlushIndicator;
    }
    // 返回老包标示
    return kOldPacket;
  }

  // 收到了更新的包则重置此值
  num_consecutive_old_packets_ = 0;

  VCMFrameBuffer* frame;
  FrameList* frame_list;
  // 获取packet对应的帧,如果不存在则获取一个空帧
  const VCMFrameBufferEnum error = GetFrame(packet, &frame, &frame_list);
  if (error != kNoError)
    return error;

  // 记录当前包插入时间
  int64_t now_ms = clock_->TimeInMilliseconds();
  // 第一个包已经插入成功后,开始计算帧间延迟
  // We are keeping track of the first and latest seq numbers, and
  // the number of wraps to be able to calculate how many packets we expect.
  if (first_packet_since_reset_) {
    // Now it's time to start estimating jitter
    // reset the delay estimate.
    inter_frame_delay_.Reset(now_ms);
  }

  // 空包不计入jitter estimate
  // Empty packets may bias the jitter estimate (lacking size component),
  // therefore don't let empty packet trigger the following updates:
  if (packet.video_header.frame_type != VideoFrameType::kEmptyFrame) {
    // packet属于同一帧
    if (waiting_for_completion_.timestamp == packet.timestamp) {
      // This can get bad if we have a lot of duplicate packets,
      // we will then count some packet multiple times.
      waiting_for_completion_.frame_size += packet.sizeBytes;
      waiting_for_completion_.latest_packet_time = now_ms;
    } else if (waiting_for_completion_.latest_packet_time >= 0 &&
               waiting_for_completion_.latest_packet_time + 2000 <= now_ms) {
      // 如果当前包不属于正在等待完整的帧且等待完整帧超时了,重置waiting_for_completion_
      // A packet should never be more than two seconds late
      UpdateJitterEstimate(waiting_for_completion_, true);
      waiting_for_completion_.latest_packet_time = -1;
      waiting_for_completion_.frame_size = 0;
      waiting_for_completion_.timestamp = 0;
    }
  }

  // 保存帧的当前状态
  VCMFrameBufferStateEnum previous_state = frame->GetState();
  // Insert packet.
  FrameData frame_data;
  frame_data.rtt_ms = kDefaultRtt;
  frame_data.rolling_average_packets_per_frame = average_packets_per_frame_;
  // 插入packet到对应的frame
  VCMFrameBufferEnum buffer_state =
      frame->InsertPacket(packet, now_ms, frame_data);

  // 插入包未发生错误
  if (buffer_state > 0) {
    if (first_packet_since_reset_) {
      latest_received_sequence_number_ = packet.seqNum;
      first_packet_since_reset_ = false;
    } else {
      // 是重传包,增加帧nack计数
      if (IsPacketRetransmitted(packet)) {
        frame->IncrementNackCount();
      }
      // 如果更新nack列表失败且当前包不是关键帧,需要flush
      if (!UpdateNackList(packet.seqNum) &&
          packet.video_header.frame_type != VideoFrameType::kVideoFrameKey) {
        buffer_state = kFlushIndicator;
      }

      // 更新最新接收到的包序列号
      latest_received_sequence_number_ =
          LatestSequenceNumber(latest_received_sequence_number_, packet.seqNum);
    }
  }

  // frame是否连续(即本身已经连续或者跟decodable_frames_内某一帧比连续)
  // Is the frame already in the decodable list?
  bool continuous = IsContinuous(*frame);
  switch (buffer_state) {
    case kGeneralError:
    case kTimeStampError:
    case kSizeError: {
      // 发生错误回收frame
      RecycleFrameBuffer(frame);
      break;
    }
    case kCompleteSession: {
      // 只通知一次frame完整
      if (previous_state != kStateComplete) {
        if (continuous) {
          // 触发通知连续完整帧事件
          // Signal that we have a complete session.
          frame_event_->Set();
        }
      }

      *retransmitted = (frame->GetNackCount() > 0);
      if (continuous) {
        // 连续则插入到decodable_frames_
        decodable_frames_.InsertFrame(frame);
        FindAndInsertContinuousFrames(*frame);
      } else {
        // 不连续则插入未完成帧列表
        incomplete_frames_.InsertFrame(frame);
      }
      break;
    }
    case kIncomplete: {
      // 如果是空帧,更新空帧解码状态
      if (frame->GetState() == kStateEmpty &&
          last_decoded_state_.UpdateEmptyFrame(frame)) {
        // 回收空帧(比如padding帧)
        RecycleFrameBuffer(frame);
        return kNoError;
      } else {
        // 媒体帧则插入未完整列表
        incomplete_frames_.InsertFrame(frame);
      }
      break;
    }
    case kNoError:
    case kOutOfBoundsPacket:
    case kDuplicatePacket: {
      // Put back the frame where it came from.
      if (frame_list != NULL) {
        // 如果帧属于incomplete或者decodable列表,则放回到其中
        frame_list->InsertFrame(frame);
      } else {
        // 回收帧
        RecycleFrameBuffer(frame);
      }
      ++num_duplicated_packets_;
      break;
    }
    case kFlushIndicator:
      // 回收帧并请求flush
      RecycleFrameBuffer(frame);
      return kFlushIndicator;
    default:
      assert(false);
  }
  return buffer_state;
}

  其中后使用到jitter_estimate_是一个对jitter做估计的类。

// Updates the estimates with the new measurements.
void JitterEstimator::UpdateEstimate(TimeDelta frame_delay,
                                     DataSize frame_size) {
  if (frame_size.IsZero()) {
    return;
  }
  // Can't use DataSize since this can be negative.
  double delta_frame_bytes =
      frame_size.bytes() - prev_frame_size_.value_or(DataSize::Zero()).bytes();
  if (frame_size_count_ < kFsAccuStartupSamples) {
    frame_size_sum_ += frame_size;
    frame_size_count_++;
  } else if (frame_size_count_ == kFsAccuStartupSamples) {
    // Give the frame size filter.
    avg_frame_size_ = frame_size_sum_ / static_cast<double>(frame_size_count_);
    frame_size_count_++;
  }

  DataSize avg_frame_size = kPhi * avg_frame_size_ + (1 - kPhi) * frame_size;
  DataSize deviation_size = DataSize::Bytes(2 * sqrt(var_frame_size_));
  if (frame_size < avg_frame_size_ + deviation_size) {
    // Only update the average frame size if this sample wasn't a key frame.
    avg_frame_size_ = avg_frame_size;
  }

  double delta_bytes = frame_size.bytes() - avg_frame_size.bytes();
  var_frame_size_ = std::max(
      kPhi * var_frame_size_ + (1 - kPhi) * (delta_bytes * delta_bytes), 1.0);

  // Update max_frame_size_ estimate.
  max_frame_size_ = std::max(kPsi * max_frame_size_, frame_size);

  if (!prev_frame_size_) {
    prev_frame_size_ = frame_size;
    return;
  }
  prev_frame_size_ = frame_size;

  // Cap frame_delay based on the current time deviation noise.
  TimeDelta max_time_deviation =
      TimeDelta::Millis(time_deviation_upper_bound_ * sqrt(var_noise_) + 0.5);
  frame_delay.Clamp(-max_time_deviation, max_time_deviation);

  // Only update the Kalman filter if the sample is not considered an extreme
  // outlier. Even if it is an extreme outlier from a delay point of view, if
  // the frame size also is large the deviation is probably due to an incorrect
  // line slope.
  double deviation = DeviationFromExpectedDelay(frame_delay, delta_frame_bytes);

  if (fabs(deviation) < kNumStdDevDelayOutlier * sqrt(var_noise_) ||
      frame_size.bytes() >
          avg_frame_size_.bytes() +
              kNumStdDevFrameSizeOutlier * sqrt(var_frame_size_)) {
    // Update the variance of the deviation from the line given by the Kalman
    // filter.
    EstimateRandomJitter(deviation);
    // Prevent updating with frames which have been congested by a large frame,
    // and therefore arrives almost at the same time as that frame.
    // This can occur when we receive a large frame (key frame) which has been
    // delayed. The next frame is of normal size (delta frame), and thus deltaFS
    // will be << 0. This removes all frame samples which arrives after a key
    // frame.
    if (delta_frame_bytes > -0.25 * max_frame_size_.bytes()) {
      // Update the Kalman filter with the new data
      KalmanEstimateChannel(frame_delay, delta_frame_bytes);
    }
  } else {
    int nStdDev =
        (deviation >= 0) ? kNumStdDevDelayOutlier : -kNumStdDevDelayOutlier;
    EstimateRandomJitter(nStdDev * sqrt(var_noise_));
  }
  // Post process the total estimated jitter
  if (startup_count_ >= kStartupDelaySamples) {
    PostProcessEstimate();
  } else {
    startup_count_++;
  }
}

// 获取jitter estimate网络估计延迟时间
uint32_t VCMJitterBuffer::EstimatedJitterMs() {
  MutexLock lock(&mutex_);
  const double rtt_mult = 1.0f;
  return jitter_estimate_.GetJitterEstimate(rtt_mult, absl::nullopt);
}

  上面的JitterEstimator逻辑需要好好解释一下:
  造成的延迟抖动的情况分为:大帧接收慢、网络抖动等问题。
  1.大帧分片:一般720p的I帧可能会有30 ~ 50个rtp数据包,而一般的p帧只有 几个包,也就是jitter的变化会出现10倍的突变,因此而造成抖动;
  2.网络抖动:可知组帧需要多个rtp数据包,其中只要有一个数据包缺失我们就需要进行nack重传进行补包,中间会消耗至少一个rtt的时间进行修复,也会造成比较明显的抖动。因此我们可以看到,jitter计算中会参考rtt的数据:


// Returns the current filtered estimate if available,
// otherwise tries to calculate an estimate.
TimeDelta JitterEstimator::GetJitterEstimate(
    double rtt_multiplier,
    absl::optional<TimeDelta> rtt_mult_add_cap) {
  TimeDelta jitter = CalculateEstimate() + OPERATING_SYSTEM_JITTER;
  Timestamp now = clock_->CurrentTime();
  if (now - latest_nack_ > kNackCountTimeout)
    nack_count_ = 0;

  if (filter_jitter_estimate_ > jitter)
    jitter = filter_jitter_estimate_;
  if (nack_count_ >= kNackLimit) {
    if (rtt_mult_add_cap.has_value()) {
      jitter += std::min(rtt_filter_.Rtt() * rtt_multiplier,
                         rtt_mult_add_cap.value());
    } else {
      jitter += rtt_filter_.Rtt() * rtt_multiplier;
    }
  }

  if (enable_reduced_delay_) {
    static const Frequency kJitterScaleLowThreshold = Frequency::Hertz(5);
    static const Frequency kJitterScaleHighThreshold = Frequency::Hertz(10);
    Frequency fps = GetFrameRate();
    // Ignore jitter for very low fps streams.
    if (fps < kJitterScaleLowThreshold) {
      if (fps.IsZero()) {
        return std::max(TimeDelta::Zero(), jitter);
      }
      return TimeDelta::Zero();
    }

    // Semi-low frame rate; scale by factor linearly interpolated from 0.0 at
    // kJitterScaleLowThreshold to 1.0 at kJitterScaleHighThreshold.
    if (fps < kJitterScaleHighThreshold) {
      jitter = (1.0 / (kJitterScaleHighThreshold - kJitterScaleLowThreshold)) *
               (fps - kJitterScaleLowThreshold) * jitter;
    }
  }

  return std::max(TimeDelta::Zero(), jitter);
}

三、乱序反应与丢包jitter

  上面给大家详细介绍了jitterbuffer部分的实现,可知jitterbuffer在工作的过程中是依赖于rtt统计的,同时jitter的大帧延迟进行了卡尔曼滤波之后,同样会作用到我们的渲染延迟中。
  这样的实现在P2P的形式下工作会比较良好,但在当前国内部分厂商的应用场景下会出现一个无法完美解决的问题,同时jitterbuffer也不完美存在这些应用中的问题:
  1.高丢包剧烈变化的场景下卡顿会比较频繁的出现(例如:0 丢包 ~ 70 丢包);
  2.经过服务器中转时,上行丢包对下行jitter动态调整效果不佳。

3.1 突发高丢包卡顿原因

  我们可知整体的组包在于 packet buffer 部分,当数据包出现大面积丢包时,在packet buffer部分最明显的表现就是组成完整帧的能力变慢了,而后续 frame finder 的帧排序,frame buffer 的Gop排序都依赖于前面的数据包,在突变的过程中 jitterbuffer 会出现较明显的延迟增加不足,因此卡顿会非常明显。

  解决方式:根据乱序,固定增加render延迟;

3.2 上行丢包卡顿

  我们的jitterbuffer调整jitter延迟是依赖于rtt的,在P2P模式下,rtt就是两个端交互的真实值。但在带有sfu服务器的模式下,上行的rtt不会反馈到下行的rtt上,因此我们整体的jitter计算会出现偏低的情况。

  解决方式:统计端到端延迟,根据端到端延迟以及乱序度动态增加渲染延迟

四、总结

  本文介绍了jitterbuffer的一些实现逻辑和代码实现,并且针对jitterbuffer在sfu模式下使用的一些问题提出了一些新的解决方案。根据上述的解决方案进行调整,我们在70%随机丢包、无带宽限制、无延迟增加的情况下,实现了1.5s延迟且在弱网切换瞬间几乎无感知的播放平滑。也是使用了最传统的延迟换卡顿的策略,感谢大家阅读。

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐