一个将源项目各个子项目整合并编译以后的工程地址

https://github.com/ywcai/JKms.git

PlayerEndpoint.cpp 初始化过程代码分析

回顾 PlayerEndpointImpl.cpp初始化的过程,会先初始化他的父类MediaElementImpl.cpp以及UriEndpointImpl.cpp.其中MediaELement.CPP会作最重要的c层gstreamer插件的初始化工作:

 

element = gst_element_factory_make(factoryName.c_str(), nullptr);

返回与之相对应的element。

同时将element添加到了1个pipeline中,这个pipeline我们在java的调用方法中知道,是需要在构造参数中传入过来的,因此CPP层也是一样,由构造方法中传入。

而在UriEndpointImpl.cpp的构造方法中,则对url进行了存储和校验. 其中在removeDuplicateSlashes()方法中,对本地文件和rtps/http的路径进行了区别处理。

上一章讲到了PlayerEndpoint.CPP 执行完成后,最终设置了当前模拟对象所在容器pipeline的状态为Playing,并且发送了信号让CPP层知道状态被改变,并出发相应的回调。具体回调的位置和方法:

void

UriEndpointImpl::postConstructor ()

{

  EndpointImpl::postConstructor ();

 

  stateChangedHandlerId = register_signal_handler (G_OBJECT (element),

                          "state-changed",

                          std::function <void (GstElement *, guint) > (std::bind (

                                &UriEndpointImpl::stateChanged, this,

                                std::placeholders::_2) ),

                          std::dynamic_pointer_cast<UriEndpointImpl> (shared_from_this() ) );

}

 

同时也触发了C层state_changed回调,相关代码如下:

 

kms_uri_endpoint_signals[STATE_CHANGED] =

      g_signal_new ("state-changed",

      G_TYPE_FROM_CLASS (klass),

      G_SIGNAL_RUN_LAST,

      G_STRUCT_OFFSET (KmsUriEndpointClass, state_changed), NULL, NULL,

      g_cclosure_marshal_VOID__ENUM, G_TYPE_NONE, 1,

      KMS_TYPE_URI_ENDPOINT_STATE);

其中注意这里是state_changed方法而不是change-state方法了。G_SIGNAL_RUN_LAST参数指的是在所有连接的回调运行完成后调用。运行其他connect信号回调的方法,先看connect信号连接的下面在CPP层中回调:

继续回到UriEndpointImpl.cpp,可以搜索到构造方法中订阅state-changed信号后回调的 stateChanged的方法,代码如下:

void

UriEndpointImpl::stateChanged (guint new_state)

{

  state = wrap_c_state ( (KmsUriEndpointState) new_state);

 

  try {

    UriEndpointStateChanged event (shared_from_this (),

        UriEndpointStateChanged::getName (), state);

    sigcSignalEmit(signalUriEndpointStateChanged, event);

  } catch (const std::bad_weak_ptr &e) {

    // shared_from_this()

    GST_ERROR ("Bug creating %s: %s",

        UriEndpointStateChanged::getName ().c_str (), e.what ());

  }

}

这里首先是执行了一个切换状态的方法,然后将触发1个事件通知给业务层面的客户端,最终返回给JAVA层的RPC。因此接下来CPP中最重要的就是wrap_c_state ()方法。

再看g_singe_new中的方法, G_STRUCT_OFFSET (KmsUriEndpointClass, state_changed),运行 KmsUriEndpointClass指针对象中的 state_changed方法。

但是kmsuriendpoint.c中的该类方法并没有实现,其子类也没有任何实现,并且也没有被初始化,因此 kmsuriendpoint.c中接下来不会在处理该信号。

但在全局搜索所有“state-changed”,  可以发现其子类有单独connect了该信号,  但都是在 test相关的方法中进行的调用,所以不用再理会。

回到CPP层中的 wrap_c_state (),代码如下:

 

static std::shared_ptr<UriEndpointState>

wrap_c_state (KmsUriEndpointState state)

{

  UriEndpointState::type type;

 

  switch (state) {

  case KMS_URI_ENDPOINT_STATE_STOP:

    type = UriEndpointState::type::STOP;

    break;

 

  case KMS_URI_ENDPOINT_STATE_START:

    type = UriEndpointState::type::START;

    break;

 

  case KMS_URI_ENDPOINT_STATE_PAUSE:

    type = UriEndpointState::type::PAUSE;

    break;

 

  default:

    GST_ERROR ("Invalid state value %d", state);

    type = UriEndpointState::type::STOP;

  }

 

  std::shared_ptr<UriEndpointState> uriState (new UriEndpointState (type) );

 

  return uriState;

}

最后一行,初始化了1个新的UriEndpointSate并赋给了uriState这个共享指针。

这样,所有kurento代码中的处理就完成了。接下来则是到用的gstream中的组件的自动处理。

 

Playendpoint.cpp初始化中,他的C层的element和相关bin、pad 的结构,如何初始化,如何连接。 继续kmsplayerendpoint.c这个自定义插件的初始化过程。

 

static void

kms_player_endpoint_class_init (KmsPlayerEndpointClass *klass)

{

  .....

  //类结构的初始化,主要是初始化相关方法和安装属性。

}

 

主要关注的是模拟对象的初始化方法,关键的element\bin\pad,都在这里初始化

static void

kms_player_endpoint_init (KmsPlayerEndpoint *self)

{

  GstBus *bus;

  self->priv = KMS_PLAYER_ENDPOINT_GET_PRIVATE (self);

  g_mutex_init (&self->priv->base_time_mutex);

  self->priv->base_time = GST_CLOCK_TIME_NONE;

  self->priv->base_time_preroll = GST_CLOCK_TIME_NONE;

 

  self->priv->loop = kms_loop_new ();

  self->priv->pipeline = gst_pipeline_new ("internalpipeline");

  self->priv->uridecodebin = gst_element_factory_make ("uridecodebin", NULL);

  self->priv->network_cache = NETWORK_CACHE_DEFAULT;

  self->priv->port_range = g_strdup (PORT_RANGE_DEFAULT);

 

  self->priv->stats.probes = kms_list_new_full (

      g_direct_equal, g_object_unref, (GDestroyNotify) kms_stats_probe_destroy);

 

  /* Connect to signals */

  g_signal_connect (self->priv->uridecodebin, "pad-added",

      G_CALLBACK (kms_player_endpoint_uridecodebin_pad_added), self);

  g_signal_connect (self->priv->uridecodebin, "pad-removed",

      G_CALLBACK (kms_player_endpoint_uridecodebin_pad_removed), self);

  g_signal_connect (self->priv->uridecodebin, "source-setup",

      G_CALLBACK (kms_player_endpoint_uridecodebin_source_setup), self);

  g_signal_connect (self->priv->uridecodebin, "element-added",

      G_CALLBACK (kms_player_endpoint_uridecodebin_element_added), self);

 

  /* Eat all async messages such as buffering messages */

  bus = gst_pipeline_get_bus (GST_PIPELINE (self->priv->pipeline));

  gst_bus_add_watch (bus, (GstBusFunc) process_bus_message, self);

 

  g_object_set (self->priv->uridecodebin, "download", TRUE, NULL);

 

  gst_bin_add (GST_BIN (self->priv->pipeline), self->priv->uridecodebin);

 

  gst_bus_set_sync_handler (bus, bus_sync_signal_handler, self, NULL);

 

  g_object_unref (bus);

}

 

 

self->priv->pipeline = gst_pipeline_new ("internalpipeline");

self->priv->uridecodebin = gst_element_factory_make ("uridecodebin", NULL);

创建了1个internalpipeline 对象和uridecodebin的对象。

Pipeline是管道,bin箱体,箱体上是具体的element和pipeline, 而pipeline和bin都同时也是element的子类,具备其相关特性。Element可以强制转化为bin,bin中必须要有element才可以进行数据流动,且要流动数据的element 必须绑定在1个element 上。 

gst_bin_add (GST_BIN (self->priv->pipeline), self->priv->uridecodebin);

这句代码是将当前的pipeline转换为bin,  然后将uridecodebin 添加到这个bin里面, 将正常创建一个uridecodebin,创建uridecodebin 后将同步触发下面相应的回调方法。在uridecodebin 内部会根据协议不同动态添加不同的element.

在kms_player_endpoint_uridecodebin_element_added()加入日志方法,通过打印日志可以知道,这个方法在gst_bin_add (GST_BIN (self->priv->pipeline), self->priv->uridecodebin) 执行后,如果是http 的资源会被调用4次,如果是RTSP的资源则只会被调2次。

如果添加的是RTSP的相关资源

if (g_strcmp0 (gst_plugin_feature_get_name (

                     GST_PLUGIN_FEATURE (gst_element_get_factory (element))),

          RTSPSRC)== 0) {

g_object_set (G_OBJECT (element), "latency", self->priv->network_cache,

        "drop-on-latency", TRUE, "port-range", self->priv->port_range, NULL);

}

latency:设置element的延时时间;

Port-range,设置访问RTSP本地分配的rtp端口;

drop-on-latency:超过jitterbuffer缓存区范围是否丢弃;这里是选择丢掉。

添加后,还会添加一个decodebin0的element,可通过打印日志查看。  可以知道这个element 可以进行解码。并且也是1个bin。这个element被添加时,回调方法什么也不会作做。

 

当rtspsrc 被创建时,kms_player_endpoint_uridecodebin_source_setup()  方法会同时也被调用1次,根据名字可以知道,一个 虽然src  的标签创建了会触发这个回调。

srcpad = gst_element_get_static_pad (source, "src");  

if (srcpad == NULL) {

    GST_WARNING_OBJECT (self,

        "Skip setting latency probe, no src pad in %" GST_PTR_FORMAT " (%s)",

        uridecodebin, G_OBJECT_TYPE_NAME ());

    return;

}

该方法会获取source的pad赋值给srcpad,然后对srcpad进行 buffer 原始数据进行探测,并进行回调处理。但在这里初始化的时候, 这个source的PAD实际上并没有被创建,因此数据检测并没有被开启相关回调 。    

当我们检测到 RTSPSRC最后获取到数据时,它就会自动生成source pad,并且触发“pad-added”信号。这kms_player_endpoint_uridecodebin_pad_added()  方法就会被调用了 , 方法代码如下:

static void

kms_player_endpoint_uridecodebin_pad_added (

    GstElement *element, GstPad *pad, KmsPlayerEndpoint *self)

{

  GstElement *appsink, *appsrc;

  GstElement *agnosticbin;

  GstPad *sinkpad;

  GstPadLinkReturn link_ret;

// 下面 这个方法,一路查到底,生成了1个接受数据的tee对象, 如果不是音视频数据,则连接到1个默认的fakesink。如果是音视频数据,则返回对象,并注册1个转码的信号的回调。

  agnosticbin = kms_player_end_point_get_agnostic_for_pad (self, pad);

  if (agnosticbin != NULL) {

    GstAppSinkCallbacks callbacks;

    /* Create appsink */

    appsink = gst_element_factory_make ("appsink", NULL);

    appsrc = kms_player_end_point_add_appsrc (self, agnosticbin, appsink);

 

    g_object_set (appsink, "enable-last-sample", FALSE, "emit-signals", FALSE,

        "qos", FALSE, "max-buffers", 1, NULL);

 

    callbacks.eos = appsink_eos_cb;

    callbacks.new_preroll = appsink_new_preroll_cb;

    callbacks.new_sample = appsink_new_sample_cb;

    gst_app_sink_set_callbacks (

        GST_APP_SINK (appsink), &callbacks, appsrc, NULL);

 

    g_object_set_qdata_full (G_OBJECT (appsink), pts_quark (),

        kms_pts_data_new (), kms_pts_data_destroy);

 

    g_object_set_qdata (G_OBJECT (pad), appsink_quark (), appsink);

    g_object_set_qdata (G_OBJECT (pad), appsrc_quark (), appsrc);

  } else {

    GST_WARNING_OBJECT (self,

        "No supported pad: %" GST_PTR_FORMAT ". Connecting it to a fakesink",

        pad);

    appsink = gst_element_factory_make ("fakesink", NULL);

    // the fakesink is a black hole ,the src data is input but do noting and can

    // not out put .

  }

  g_object_set (appsink, "sync", TRUE, "async", TRUE, NULL);

  sinkpad = gst_element_get_static_pad (appsink, "sink");

  if (agnosticbin != NULL) {

    gst_pad_add_probe (sinkpad,

        GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM

            | GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,

        appsink_event_query_probe, appsrc, NULL);

  }

  gst_bin_add (GST_BIN (self->priv->pipeline), appsink);

  link_ret = gst_pad_link (pad, sinkpad);

  if (GST_PAD_LINK_FAILED (link_ret)) {

    GST_ERROR ("Cannot link elements: %s to %s: %s",

        GST_ELEMENT_NAME (GST_PAD_PARENT (pad)),

        GST_ELEMENT_NAME (GST_PAD_PARENT (sinkpad)),

        gst_pad_link_get_name (link_ret));

  }

  g_object_unref (sinkpad);

  gst_element_sync_state_with_parent (appsink);

}

 

这段代码大致就是新建1个 agnosticbin的箱体,创建1个appsink和1个appsrc 的element.

根据uriendpoint创建的pad类型创建对应的agnosticbin,

如果找到了符合要求type类型的pad,则创建agnosticbin ,如果没有找到则创建1个finksink,类似于 /dev/null的黑洞。

创建agnosticbin 后,会对appsrc>agnosticbin>appsink依次进行连接。

其中

gst_pad_add_probe (sinkpad,

        GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM

            | GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,

        appsink_event_query_probe, appsrc, NULL);

对数据流入进行检测和回调。

 

到此,整个playendpoint算是初始化完成,并且play后可以开始解析数据

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

  

 

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐