Kurento 源码解析系列(2)- PlayerEndpoint的初始化过程
Kurento 源码解析系列(2)- PlayerEndpoint的初始化一个将源项目各个子项目整合并编译以后的工程地址https://github.com/ywcai/JKms.gitPlayerEndpoint.cpp 初始化过程代码分析回顾 PlayerEndpointImpl.cpp初始化的过程,会先初始化他的父类MediaElementImpl.cpp以及UriEndpointImpl.
一个将源项目各个子项目整合并编译以后的工程地址
https://github.com/ywcai/JKms.git
PlayerEndpoint.cpp 初始化过程代码分析
回顾 PlayerEndpointImpl.cpp初始化的过程,会先初始化他的父类MediaElementImpl.cpp以及UriEndpointImpl.cpp.其中MediaELement.CPP会作最重要的c层gstreamer插件的初始化工作:
element = gst_element_factory_make(factoryName.c_str(), nullptr);
返回与之相对应的element。
同时将element添加到了1个pipeline中,这个pipeline我们在java的调用方法中知道,是需要在构造参数中传入过来的,因此CPP层也是一样,由构造方法中传入。
而在UriEndpointImpl.cpp的构造方法中,则对url进行了存储和校验. 其中在removeDuplicateSlashes()方法中,对本地文件和rtps/http的路径进行了区别处理。
上一章讲到了PlayerEndpoint.CPP 执行完成后,最终设置了当前模拟对象所在容器pipeline的状态为Playing,并且发送了信号让CPP层知道状态被改变,并出发相应的回调。具体回调的位置和方法:
void
UriEndpointImpl::postConstructor ()
{
EndpointImpl::postConstructor ();
stateChangedHandlerId = register_signal_handler (G_OBJECT (element),
"state-changed",
std::function <void (GstElement *, guint) > (std::bind (
&UriEndpointImpl::stateChanged, this,
std::placeholders::_2) ),
std::dynamic_pointer_cast<UriEndpointImpl> (shared_from_this() ) );
}
同时也触发了C层state_changed回调,相关代码如下:
kms_uri_endpoint_signals[STATE_CHANGED] =
g_signal_new ("state-changed",
G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (KmsUriEndpointClass, state_changed), NULL, NULL,
g_cclosure_marshal_VOID__ENUM, G_TYPE_NONE, 1,
KMS_TYPE_URI_ENDPOINT_STATE);
其中注意这里是state_changed方法而不是change-state方法了。G_SIGNAL_RUN_LAST参数指的是在所有连接的回调运行完成后调用。运行其他connect信号回调的方法,先看connect信号连接的下面在CPP层中回调:
继续回到UriEndpointImpl.cpp,可以搜索到构造方法中订阅state-changed信号后回调的 stateChanged的方法,代码如下:
void
UriEndpointImpl::stateChanged (guint new_state)
{
state = wrap_c_state ( (KmsUriEndpointState) new_state);
try {
UriEndpointStateChanged event (shared_from_this (),
UriEndpointStateChanged::getName (), state);
sigcSignalEmit(signalUriEndpointStateChanged, event);
} catch (const std::bad_weak_ptr &e) {
// shared_from_this()
GST_ERROR ("Bug creating %s: %s",
UriEndpointStateChanged::getName ().c_str (), e.what ());
}
}
这里首先是执行了一个切换状态的方法,然后将触发1个事件通知给业务层面的客户端,最终返回给JAVA层的RPC。因此接下来CPP中最重要的就是wrap_c_state ()方法。
再看g_singe_new中的方法, G_STRUCT_OFFSET (KmsUriEndpointClass, state_changed),运行 KmsUriEndpointClass指针对象中的 state_changed方法。
但是kmsuriendpoint.c中的该类方法并没有实现,其子类也没有任何实现,并且也没有被初始化,因此 kmsuriendpoint.c中接下来不会在处理该信号。
但在全局搜索所有“state-changed”, 可以发现其子类有单独connect了该信号, 但都是在 test相关的方法中进行的调用,所以不用再理会。
回到CPP层中的 wrap_c_state (),代码如下:
static std::shared_ptr<UriEndpointState>
wrap_c_state (KmsUriEndpointState state)
{
UriEndpointState::type type;
switch (state) {
case KMS_URI_ENDPOINT_STATE_STOP:
type = UriEndpointState::type::STOP;
break;
case KMS_URI_ENDPOINT_STATE_START:
type = UriEndpointState::type::START;
break;
case KMS_URI_ENDPOINT_STATE_PAUSE:
type = UriEndpointState::type::PAUSE;
break;
default:
GST_ERROR ("Invalid state value %d", state);
type = UriEndpointState::type::STOP;
}
std::shared_ptr<UriEndpointState> uriState (new UriEndpointState (type) );
return uriState;
}
最后一行,初始化了1个新的UriEndpointSate并赋给了uriState这个共享指针。
这样,所有kurento代码中的处理就完成了。接下来则是到用的gstream中的组件的自动处理。
Playendpoint.cpp初始化中,他的C层的element和相关bin、pad 的结构,如何初始化,如何连接。 继续kmsplayerendpoint.c这个自定义插件的初始化过程。
static void
kms_player_endpoint_class_init (KmsPlayerEndpointClass *klass)
{
.....
//类结构的初始化,主要是初始化相关方法和安装属性。
}
主要关注的是模拟对象的初始化方法,关键的element\bin\pad,都在这里初始化
static void
kms_player_endpoint_init (KmsPlayerEndpoint *self)
{
GstBus *bus;
self->priv = KMS_PLAYER_ENDPOINT_GET_PRIVATE (self);
g_mutex_init (&self->priv->base_time_mutex);
self->priv->base_time = GST_CLOCK_TIME_NONE;
self->priv->base_time_preroll = GST_CLOCK_TIME_NONE;
self->priv->loop = kms_loop_new ();
self->priv->pipeline = gst_pipeline_new ("internalpipeline");
self->priv->uridecodebin = gst_element_factory_make ("uridecodebin", NULL);
self->priv->network_cache = NETWORK_CACHE_DEFAULT;
self->priv->port_range = g_strdup (PORT_RANGE_DEFAULT);
self->priv->stats.probes = kms_list_new_full (
g_direct_equal, g_object_unref, (GDestroyNotify) kms_stats_probe_destroy);
/* Connect to signals */
g_signal_connect (self->priv->uridecodebin, "pad-added",
G_CALLBACK (kms_player_endpoint_uridecodebin_pad_added), self);
g_signal_connect (self->priv->uridecodebin, "pad-removed",
G_CALLBACK (kms_player_endpoint_uridecodebin_pad_removed), self);
g_signal_connect (self->priv->uridecodebin, "source-setup",
G_CALLBACK (kms_player_endpoint_uridecodebin_source_setup), self);
g_signal_connect (self->priv->uridecodebin, "element-added",
G_CALLBACK (kms_player_endpoint_uridecodebin_element_added), self);
/* Eat all async messages such as buffering messages */
bus = gst_pipeline_get_bus (GST_PIPELINE (self->priv->pipeline));
gst_bus_add_watch (bus, (GstBusFunc) process_bus_message, self);
g_object_set (self->priv->uridecodebin, "download", TRUE, NULL);
gst_bin_add (GST_BIN (self->priv->pipeline), self->priv->uridecodebin);
gst_bus_set_sync_handler (bus, bus_sync_signal_handler, self, NULL);
g_object_unref (bus);
}
self->priv->pipeline = gst_pipeline_new ("internalpipeline");
self->priv->uridecodebin = gst_element_factory_make ("uridecodebin", NULL);
创建了1个internalpipeline 对象和uridecodebin的对象。
Pipeline是管道,bin箱体,箱体上是具体的element和pipeline, 而pipeline和bin都同时也是element的子类,具备其相关特性。Element可以强制转化为bin,bin中必须要有element才可以进行数据流动,且要流动数据的element 必须绑定在1个element 上。
gst_bin_add (GST_BIN (self->priv->pipeline), self->priv->uridecodebin);
这句代码是将当前的pipeline转换为bin, 然后将uridecodebin 添加到这个bin里面, 将正常创建一个uridecodebin,创建uridecodebin 后将同步触发下面相应的回调方法。在uridecodebin 内部会根据协议不同动态添加不同的element.
在kms_player_endpoint_uridecodebin_element_added()加入日志方法,通过打印日志可以知道,这个方法在gst_bin_add (GST_BIN (self->priv->pipeline), self->priv->uridecodebin) 执行后,如果是http 的资源会被调用4次,如果是RTSP的资源则只会被调2次。
如果添加的是RTSP的相关资源
if (g_strcmp0 (gst_plugin_feature_get_name (
GST_PLUGIN_FEATURE (gst_element_get_factory (element))),
RTSPSRC)== 0) {
g_object_set (G_OBJECT (element), "latency", self->priv->network_cache,
"drop-on-latency", TRUE, "port-range", self->priv->port_range, NULL);
}
latency:设置element的延时时间;
Port-range,设置访问RTSP本地分配的rtp端口;
drop-on-latency:超过jitterbuffer缓存区范围是否丢弃;这里是选择丢掉。
添加后,还会添加一个decodebin0的element,可通过打印日志查看。 可以知道这个element 可以进行解码。并且也是1个bin。这个element被添加时,回调方法什么也不会作做。
当rtspsrc 被创建时,kms_player_endpoint_uridecodebin_source_setup() 方法会同时也被调用1次,根据名字可以知道,一个 虽然src 的标签创建了会触发这个回调。
srcpad = gst_element_get_static_pad (source, "src");
if (srcpad == NULL) {
GST_WARNING_OBJECT (self,
"Skip setting latency probe, no src pad in %" GST_PTR_FORMAT " (%s)",
uridecodebin, G_OBJECT_TYPE_NAME ());
return;
}
该方法会获取source的pad赋值给srcpad,然后对srcpad进行 buffer 原始数据进行探测,并进行回调处理。但在这里初始化的时候, 这个source的PAD实际上并没有被创建,因此数据检测并没有被开启相关回调 。
当我们检测到 RTSPSRC最后获取到数据时,它就会自动生成source pad,并且触发“pad-added”信号。这kms_player_endpoint_uridecodebin_pad_added() 方法就会被调用了 , 方法代码如下:
static void
kms_player_endpoint_uridecodebin_pad_added (
GstElement *element, GstPad *pad, KmsPlayerEndpoint *self)
{
GstElement *appsink, *appsrc;
GstElement *agnosticbin;
GstPad *sinkpad;
GstPadLinkReturn link_ret;
// 下面 这个方法,一路查到底,生成了1个接受数据的tee对象, 如果不是音视频数据,则连接到1个默认的fakesink。如果是音视频数据,则返回对象,并注册1个转码的信号的回调。
agnosticbin = kms_player_end_point_get_agnostic_for_pad (self, pad);
if (agnosticbin != NULL) {
GstAppSinkCallbacks callbacks;
/* Create appsink */
appsink = gst_element_factory_make ("appsink", NULL);
appsrc = kms_player_end_point_add_appsrc (self, agnosticbin, appsink);
g_object_set (appsink, "enable-last-sample", FALSE, "emit-signals", FALSE,
"qos", FALSE, "max-buffers", 1, NULL);
callbacks.eos = appsink_eos_cb;
callbacks.new_preroll = appsink_new_preroll_cb;
callbacks.new_sample = appsink_new_sample_cb;
gst_app_sink_set_callbacks (
GST_APP_SINK (appsink), &callbacks, appsrc, NULL);
g_object_set_qdata_full (G_OBJECT (appsink), pts_quark (),
kms_pts_data_new (), kms_pts_data_destroy);
g_object_set_qdata (G_OBJECT (pad), appsink_quark (), appsink);
g_object_set_qdata (G_OBJECT (pad), appsrc_quark (), appsrc);
} else {
GST_WARNING_OBJECT (self,
"No supported pad: %" GST_PTR_FORMAT ". Connecting it to a fakesink",
pad);
appsink = gst_element_factory_make ("fakesink", NULL);
// the fakesink is a black hole ,the src data is input but do noting and can
// not out put .
}
g_object_set (appsink, "sync", TRUE, "async", TRUE, NULL);
sinkpad = gst_element_get_static_pad (appsink, "sink");
if (agnosticbin != NULL) {
gst_pad_add_probe (sinkpad,
GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM
| GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
appsink_event_query_probe, appsrc, NULL);
}
gst_bin_add (GST_BIN (self->priv->pipeline), appsink);
link_ret = gst_pad_link (pad, sinkpad);
if (GST_PAD_LINK_FAILED (link_ret)) {
GST_ERROR ("Cannot link elements: %s to %s: %s",
GST_ELEMENT_NAME (GST_PAD_PARENT (pad)),
GST_ELEMENT_NAME (GST_PAD_PARENT (sinkpad)),
gst_pad_link_get_name (link_ret));
}
g_object_unref (sinkpad);
gst_element_sync_state_with_parent (appsink);
}
这段代码大致就是新建1个 agnosticbin的箱体,创建1个appsink和1个appsrc 的element.
根据uriendpoint创建的pad类型创建对应的agnosticbin,
如果找到了符合要求type类型的pad,则创建agnosticbin ,如果没有找到则创建1个finksink,类似于 /dev/null的黑洞。
创建agnosticbin 后,会对appsrc>agnosticbin>appsink依次进行连接。
其中
gst_pad_add_probe (sinkpad,
GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM
| GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
appsink_event_query_probe, appsrc, NULL);
对数据流入进行检测和回调。
到此,整个playendpoint算是初始化完成,并且play后可以开始解析数据
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)