5

OpenHarmony 3.2 Beta多媒体系列—音视频播放Gstreamer

 1 year ago
source link: https://www.51cto.com/article/740687.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

OpenHarmony 3.2 Beta多媒体系列—音视频播放Gstreamer

作者:巴延兴 2022-11-25 16:39:03
本篇文章主要从PlayerServer播放服务开始分析音视频播放的流程,涉及到gstreamer引擎的调用,相对于多媒体播放框架来说,更加底层,便于熟悉从框架到gstreamer的整体流程。
89ac7fe54fec25fd91b600d1f23e11a253fbfe.png

​想了解更多关于开源的内容,请访问:​​

​51CTO 开源基础软件社区​

​https://ost.51cto.com​

多媒体播放框架主要的实现在PlayerServer服务中,这个服务提供了媒体播放框架所需要的实现环境,继续跟踪代码分析发现,PlayerServer主要通过gstreamer适配层,对gstreamer进行调用。gstreamer属于更加具体的实现,所以本篇文章主要是分析PlayerServer通过适配层调用到gstreamer的过程。
此前,我在《OpenHarmony 3.2 Beta多媒体系列-音视频播放框架》一文中,主要分析了多媒体播放的框架层代码,本地接口通过服务端的proxy代理类进行IPC调用,最终调用到PlayerServer服务端。本篇主要分析了多媒体gstreamer的调用,涉及到从PlayerServer到gstreamer的整体流程。

gstreamer
    ├── BUILD.gn
    ├── common
    │   ├── BUILD.gn
    │   ├── playbin_adapter
    │   │   ├── i_playbin_ctrler.h
    │   │   ├── playbin2_ctrler.cpp
    │   │   ├── playbin2_ctrler.h
    │   │   ├── playbin_ctrler_base.cpp
    │   │   ├── playbin_ctrler_base.h
    │   │   ├── playbin_msg_define.h
    │   │   ├── playbin_sink_provider.h
    │   │   ├── playbin_state.cpp
    │   │   ├── playbin_state.h
    │   │   ├── playbin_task_mgr.cpp
    │   │   └── playbin_task_mgr.h
    │   ├── state_machine
    │   │   ├── state_machine.cpp
    │   │   └── state_machine.h
    ├── factory
    │   ├── BUILD.gn
    │   └── engine_factory.cpp
    └── player
        ├── BUILD.gn
        ├── player_codec_ctrl.cpp
        ├── player_codec_ctrl.h
        ├── player_engine_gst_impl.cpp
        ├── player_engine_gst_impl.h
        ├── player_sinkprovider.cpp
        ├── player_sinkprovider.h
        ├── player_track_parse.cpp
        └── player_track_parse.h

目录主要是多媒体子系统中的engine部分,涉及到了gstreamer的适配层,gstreamer具体的实现是在third_party/gstreamer目录中。

三 、Gstreamer介绍

​1. 简介

Gstreamer是一个跨平台的多媒体框架,应用程序可以通过管道(Pipeline)的方式,将多媒体处理的各个步骤串联起来,达到预期的效果。每个步骤通过元素(Element)基于GObject对象系统通过插件(plugins)的方式实现,方便了各项功能的扩展。

resize,w_590,h_458

2.Gstreamer几个重要的概念

Element

Element是Gstreamer中最重要的对象类型之一。一个element实现一个功能(读取文件,解码,输出等),程序需要创建多个element,并按顺序将其串联起来,构成一个完整的Pipeline。

Pad是一个element的输入/输出接口,分为src pad(生产数据)和sink pad(消费数据)两种。两个element必须通过pad才能连接起来,pad拥有当前element能处理数据类型的能力(capabilities),会在连接时通过比较src pad和sink pad中所支持的能力,来选择最恰当的数据类型用于传输,如果element不支持,程序会直接退出。在element通过pad连接成功后,数据会从上一个element的src pad传到下一个element的sink pad然后进行处理。

Bin和Pipeline

Bin是一个容器,用于管理多个element,改变bin的状态时,bin会自动去修改所包含的element的状态,也会转发所收到的消息。如果没有bin,我们需要依次操作我们所使用的element。通过bin降低了应用的复杂度。Pipeline继承自bin,为程序提供一个bus用于传输消息,并且对所有子element进行同步。当将Pipeline的状态设置为PLAYING时,Pipeline会在一个/多个新的线程中通过element处理数据。

四、调用流程

OpenHarmony 3.2 Beta多媒体系列——音视频播放gstreamer-开源基础软件社区

五、源码分析

1、PrepareAsync分析

首先,在PlayerServer的PrepareAsync中会调用OnPrepare(false),具体是在OnPrepare(false)中实现,参数传入false,表明调用的是异步方法。
int32_t PlayerServer::PrepareAsync()
{
    std::lock_guard<std::mutex> lock(mutex_);
    MEDIA_LOGW("KPI-TRACE: PlayerServer PrepareAsync in");

    if (lastOpStatus_ == PLAYER_INITIALIZED || lastOpStatus_ == PLAYER_STOPPED) {
        return OnPrepare(false);
    } else {
        MEDIA_LOGE("Can not Prepare, currentState is %{public}s", GetStatusDescription(lastOpStatus_).c_str());
        return MSERR_INVALID_OPERATION;
    }
}

OnPrepare方法中,先通过playerEngine_调用SerVideoSurface的方法,将surface_设置到PlayerEngineGstImpl中(producerSurface_),接着启动一个任务,调用目前状态的Prepare()方法。

int32_t PlayerServer::OnPrepare(bool sync)
{
    CHECK_AND_RETURN_RET_LOG(playerEngine_ != nullptr, MSERR_NO_MEMORY, "playerEngine_ is nullptr");
    int32_t ret = MSERR_OK;
#ifdef SUPPORT_VIDEO
    if (surface_ != nullptr) {
        ret = playerEngine_->SetVideoSurface(surface_);
        CHECK_AND_RETURN_RET_LOG(ret == MSERR_OK, MSERR_INVALID_OPERATION, "Engine SetVideoSurface Failed!");
    }
#endif
    lastOpStatus_ = PLAYER_PREPARED;

    auto preparedTask = std::make_shared<TaskHandler<int32_t>>([this]() {
        MediaTrace::TraceBegin("PlayerServer::PrepareAsync", FAKE_POINTER(this));
        auto currState = std::static_pointer_cast<BaseState>(GetCurrState());
        return currState->Prepare();
    });

    ret = taskMgr_.LaunchTask(preparedTask, PlayerServerTaskType::STATE_CHANGE);
    CHECK_AND_RETURN_RET_LOG(ret == MSERR_OK, ret, "Prepare launch task failed");

    if (sync) {
        (void)preparedTask->GetResult(); // wait HandlePrpare
    }
    return MSERR_OK;
}

进入Preparing状态后,会触发PlayerServer的HandlePrepare()方法被调用,在这个方法里会通过playerEngine_调用PrepareAsync方法,这个方法调用的是PlayerEngineGstImpl对应的PrepareAsync方法。

int32_t PlayerServer::HandlePrepare()
{
    int32_t ret = playerEngine_->PrepareAsync();
    CHECK_AND_RETURN_RET_LOG(ret == MSERR_OK, MSERR_INVALID_OPERATION, "Server Prepare Failed!");
    if (config_.leftVolume <= 1.0f || config_.rightVolume <= 1.0f) {
        ret = playerEngine_->SetVolume(config_.leftVolume, config_.rightVolume);
        MEDIA_LOGD("Prepared SetVolume leftVolume:%{public}f rightVolume:%{public}f, ret:%{public}d", \
                   config_.leftVolume, config_.rightVolume, ret);
    }
    (void)playerEngine_->SetLooping(config_.looping);

    {
        auto rateTask = std::make_shared<TaskHandler<void>>([this]() {
            auto currState = std::static_pointer_cast<BaseState>(GetCurrState());
            (void)currState->SetPlaybackSpeed(config_.speedMode);
        });

        (void)taskMgr_.LaunchTask(rateTask, PlayerServerTaskType::RATE_CHANGE);
    }
    return MSERR_OK;
}

首先初始化playBinCtrler_,后续的操作都是通过PlayBinCtrlerBase对象来操作的,所以PlayBinCtrlerInit()方法会创建PlayBinCtrlerBase对象(playBinCtrler_),创建好以后通过playBinCtrler_进行SetSource和SetXXXListener的设置。

int32_t PlayerEngineGstImpl::PrepareAsync()

{
    std::unique_lock<std::mutex> lock(mutex_);
    MEDIA_LOGD("Prepare in");

    int32_t ret = PlayBinCtrlerInit();
    CHECK_AND_RETURN_RET_LOG(ret == MSERR_OK, MSERR_INVALID_VAL, "PlayBinCtrlerInit failed");

    CHECK_AND_RETURN_RET_LOG(playBinCtrler_ != nullptr, MSERR_INVALID_VAL, "playBinCtrler_ is nullptr");
    ret = playBinCtrler_->PrepareAsync();
    CHECK_AND_RETURN_RET_LOG(ret == MSERR_OK, ret, "PrepareAsync failed");

    // The duration of some resources without header information cannot be obtained.
    MEDIA_LOGD("Prepared ok out");
    return MSERR_OK;
}

初始化完成以后,接下来进行playBinCtrler_的PrepareAsync的调用,PlayBinCtrlerBase中的PrepareAsync的方法间接地调用了PrepareAsyncInternal。

int32_t PlayBinCtrlerBase::PrepareAsync()
{
    MEDIA_LOGD("enter");

    std::unique_lock<std::mutex> lock(mutex_);
    return PrepareAsyncInternal();
}

PrepareAsyncInternal首先判断当前的状态,如果是preparingState或preparedState,那么就直接返回成功,否则继续向下调用。接下来会调用EnterInitializedState(),这个方法中会创建playbin,设置signal的回调以及gstreamer参数的设置。最后调用目前状态的Prepare方法,此时的状态是InitializedState。

int32_t PlayBinCtrlerBase::PrepareAsyncInternal()
{
    if ((GetCurrState() == preparingState_) || (GetCurrState() == preparedState_)) {
        MEDIA_LOGI("already at preparing state, skip");
        return MSERR_OK;
    }

    CHECK_AND_RETURN_RET_LOG((!uri_.empty() || appsrcWrap_), MSERR_INVALID_OPERATION, "Set uri firsty!");

    int32_t ret = EnterInitializedState();
    CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);

    auto currState = std::static_pointer_cast<BaseState>(GetCurrState());
    ret = currState->Prepare();
    CHECK_AND_RETURN_RET_LOG(ret == MSERR_OK, ret, "PrepareAsyncInternal failed");

    return MSERR_OK;
}

InitializedState的Prepare方法又通过ctrler_调回到PlayBinCtrlerBase的ChangeState方法,这个方法是在PlayBinCtrlerBase的父类StateMachine中,它是一个状态机,管理着各种状态的切换。

int32_t PlayBinCtrlerBase::InitializedState::Prepare()
{
    ctrler_.ChangeState(ctrler_.preparingState_);
    return MSERR_OK;
}

很多表示状态的类在PlayBinCtrlerBase中进行声明,这些子类的具体实现功能在playbin_state.cpp中。

private:
    class BaseState;
    class IdleState;
    class InitializedState;
    class PreparingState;
    class PreparedState;
    class PlayingState;
    class PausedState;
    class StoppedState;
    class StoppingState;
    class PlaybackCompletedState;

接下来看一下状态机的ChangeState方法,可以看出切换状态的时候,先调用切换前状态的StateExit()方法,再调用切换后状态的StateEnter()。如果需要一些操作,我们可以在状态的StateEnter和StateExit中进行。

void StateMachine::ChangeState(const std::shared_ptr<State> &state)
{
    ......
    if (currState_ != nullptr && currState_->GetStateName() == "stopping_state" && state->GetStateName() != "stopped_state") {
        return;
    }
    if (currState_) {
        currState_->StateExit();
    }
    currState_ = state;
    state->StateEnter();
}

因为上面切换状态调用的是ctrler_.ChangeState(ctrler_.preparingState_),所以接下来看一下PreparingState状态的StateEnter方法。这个方法中首先是调用了ctrler_.ReportMessage(msg),字面上看是用来上报msg信息的。

void PlayBinCtrlerBase::PreparingState::StateEnter()
{
    PlayBinMessage msg = { PLAYBIN_MSG_SUBTYPE, PLAYBIN_SUB_MSG_BUFFERING_START, 0, {} };
    ctrler_.ReportMessage(msg);

    GstStateChangeReturn ret;
    (void)ChangePlayBinState(GST_STATE_PAUSED, ret);

    MEDIA_LOGD("PreparingState::StateEnter finished");
}

ctrler_是PlayBinCtrlerBase类型的变量,直接看PlayBinCtrlerBase的ReportMessage方法,这个方法的核心,是创建一个任务后,将任务放入消息队列中,等待消息被处理,这里我们最想知道的是这个消息会在什么地方被处理。msgReportHandler创建了TaskHandler,这个里面会调用notifier_(msg),这里的notifier_比较重要,我们可以顺着这个变量向上分析。

void PlayBinCtrlerBase::ReportMessage(const PlayBinMessage &msg)
{
    ......
    auto msgReportHandler = std::make_shared<TaskHandler<void>>([this, msg]() { notifier_(msg); });
    int32_t ret = msgQueue_->EnqueueTask(msgReportHandler);
    if (ret != MSERR_OK) {
        MEDIA_LOGE("async report msg failed, type: %{public}d, subType: %{public}d, code: %{public}d",
                   msg.type, msg.subType, msg.code);
    };

    if (msg.type == PlayBinMsgType::PLAYBIN_MSG_EOS) {
        ProcessEndOfStream();
    }
}

notifier_是在PlayBinCtrlerBase被创建的时候赋值的。

PlayBinCtrlerBase::PlayBinCtrlerBase(const PlayBinCreateParam &createParam)
    : renderMode_(createParam.renderMode),
    notifier_(createParam.notifier),
    sinkProvider_(createParam.sinkProvider)
{
    MEDIA_LOGD("enter ctor, instance: 0x%{public}06" PRIXPTR "", FAKE_POINTER(this));
}

在源码分析的前期PlayerEngineGstImpl初始化PlayBinCtrlerBase的时候进行了创建notifier = std::bind(&PlayerEngineGstImpl::OnNotifyMessage, this, std::placeholders::_1) notifier相当于是调用了PlayerEngineGstImpl::OnNotifyMessage方法。所以上述中的处理函数就是PlayerEngineGstImpl::OnNotifyMessage。

int32_t PlayerEngineGstImpl::PlayBinCtrlerPrepare()
{
    uint8_t renderMode = IPlayBinCtrler::PlayBinRenderMode::DEFAULT_RENDER;
    auto notifier = std::bind(&PlayerEngineGstImpl::OnNotifyMessage, this, std::placeholders::_1);

    {
        std::unique_lock<std::mutex> lk(trackParseMutex_);
        sinkProvider_ = std::make_shared<PlayerSinkProvider>(producerSurface_);
        sinkProvider_->SetAppInfo(appuid_, apppid_);
    }

    IPlayBinCtrler::PlayBinCreateParam createParam = {
        static_cast<IPlayBinCtrler::PlayBinRenderMode>(renderMode), notifier, sinkProvider_
    };
    playBinCtrler_ = IPlayBinCtrler::Create(IPlayBinCtrler::PlayBinKind::PLAYBIN2, createParam);
    ......
    return MSERR_OK;
}

在OnNotifyMessage中指定了各种消息类型对应的执行函数,上述代码中创建的Message类型是PLAYBIN_MSG_SUBTYPE,子类型为PLAYBIN_SUB_MSG_BUFFERING_START。

void PlayerEngineGstImpl::OnNotifyMessage(const PlayBinMessage &msg)
{
    const std::unordered_map<int32_t, MsgNotifyFunc> MSG_NOTIFY_FUNC_TABLE = {
        { PLAYBIN_MSG_ERROR, std::bind(&PlayerEngineGstImpl::HandleErrorMessage, this, std::placeholders::_1) },
        { PLAYBIN_MSG_SEEKDONE, std::bind(&PlayerEngineGstImpl::HandleSeekDoneMessage, this, std::placeholders::_1) },
        { PLAYBIN_MSG_SPEEDDONE, std::bind(&PlayerEngineGstImpl::HandleInfoMessage, this, std::placeholders::_1) },
        { PLAYBIN_MSG_BITRATEDONE, std::bind(&PlayerEngineGstImpl::HandleInfoMessage, this, std::placeholders::_1)},
        { PLAYBIN_MSG_EOS, std::bind(&PlayerEngineGstImpl::HandleInfoMessage, this, std::placeholders::_1) },
        { PLAYBIN_MSG_STATE_CHANGE, std::bind(&PlayerEngineGstImpl::HandleInfoMessage, this, std::placeholders::_1) },
        { PLAYBIN_MSG_SUBTYPE, std::bind(&PlayerEngineGstImpl::HandleSubTypeMessage, this, std::placeholders::_1) },
        { PLAYBIN_MSG_AUDIO_SINK, std::bind(&PlayerEngineGstImpl::HandleAudioMessage, this, std::placeholders::_1) },
        { PLAYBIN_MSG_POSITION_UPDATE, std::bind(&PlayerEngineGstImpl::HandlePositionUpdateMessage, this,
            std::placeholders::_1) },
    };
    if (MSG_NOTIFY_FUNC_TABLE.count(msg.type) != 0) {
        MSG_NOTIFY_FUNC_TABLE.at(msg.type)(msg);
    }
}

最终的流程走到了PlayerEngineGstImpl::HandleBufferingStart(),在这个方法中,主要通过obs_将format传给IPlayerEngineObs的OnInfo方法。

void PlayerEngineGstImpl::HandleBufferingStart()
{
    percent_ = 0;
    Format format;
(void)format.PutIntValue(std::string(PlayerKeys::PLAYER_BUFFERING_START), 0);
    std::shared_ptr<IPlayerEngineObs> notifyObs = obs_.lock();
    if (notifyObs != nullptr) {
        notifyObs->OnInfo(INFO_TYPE_BUFFERING_UPDATE, 0, format);
    }
}

我们重点看一下obs_是哪里设置的,在PlayerServer的初始化InitPlayEngine。shared_from_this()相当于是把PlayerServer自身赋值给obs,PlayerServer也是实现了IPlayerEngineObs对应的接口。

int32_t PlayerServer::InitPlayEngine(const std::string &url)
{
    ......
    int32_t ret = taskMgr_.Init();
    auto engineFactory = EngineFactoryRepo::Instance().GetEngineFactory(IEngineFactory::Scene::SCENE_PLAYBACK, url);
    
    playerEngine_ = engineFactory->CreatePlayerEngine(appUid_, appPid_);

    if (dataSrc_ == nullptr) {
        ret = playerEngine_->SetSource(url);
    } else {
        ret = playerEngine_->SetSource(dataSrc_);
    }

    std::shared_ptr<IPlayerEngineObs> obs = shared_from_this();
    ret = playerEngine_->SetObs(obs);

    lastOpStatus_ = PLAYER_INITIALIZED;
    ChangeState(initializedState_);

    return MSERR_OK;
}

这样我们就跟踪到了PlayerServer的OnInfo()方法。

void PlayerServer::OnInfo(PlayerOnInfoType type, int32_t extra, const Format &infoBody)
{
    std::lock_guard<std::mutex> lockCb(mutexCb_);

    int32_t ret = HandleMessage(type, extra, infoBody);
    if (playerCb_ != nullptr && ret == MSERR_OK) {
        playerCb_->OnInfo(type, extra, infoBody);
    }
}

2. Play分析

从PlayerServer开始跟踪,调用到PlayerServer的OnPlay()方法。

int32_t PlayerServer::Play()
{
    ......
    if (lastOpStatus_ == PLAYER_PREPARED || lastOpStatus_ == PLAYER_PLAYBACK_COMPLETE ||
        lastOpStatus_ == PLAYER_PAUSED) {
        return OnPlay();
    } else {
        return MSERR_INVALID_OPERATION;
    }
}

在OnPlay中会启动一个任务,在任务中获取当前的状态,然后调用当前状态的Play()方法。

int32_t PlayerServer::OnPlay()
{
    ......
    auto playingTask = std::make_shared<TaskHandler<void>>([this]() {
        auto currState = std::static_pointer_cast<BaseState>(GetCurrState());
        (void)currState->Play();
    });

    int ret = taskMgr_.LaunchTask(playingTask, PlayerServerTaskType::STATE_CHANGE);

    lastOpStatus_ = PLAYER_STARTED;
    return MSERR_OK;
}

前面调用了PrepareAsync,所以当前的状态是Prepared,调用到了PreparedState的Play()方法,这个方法还是按照之前Prepare的方式,调回到PlayerServer的HandlePlay()。

int32_t PlayerServer::PreparedState::Play()
{
    return server_.HandlePlay();
}

在PlayServer中通过播放引擎继续向下调用。

int32_t PlayerServer::HandlePlay()
{
    int32_t ret = playerEngine_->Play();
    CHECK_AND_RETURN_RET_LOG(ret == MSERR_OK, MSERR_INVALID_OPERATION, "Engine Play Failed!");

    return MSERR_OK;
}

在PlayerEngineGstImpl的Play()方法会继续调用playBinCtrler_的Play()方法。

int32_t PlayerEngineGstImpl::Play()
{
    ......
    playBinCtrler_->Play();
    return MSERR_OK;
}

PlayBinCtrlerBase的Play()方法根据当前的State,调用currSate->Play()。

int32_t PlayBinCtrlerBase::Play()
{
    ......
    auto currState =    std::static_pointer_cast<BaseState>(GetCurrState());
    int32_t ret = currState->Play();

    return MSERR_OK;
}

在PreparedState的Play()方法中改变了PlayBin的状态为playing。

int32_t PlayBinCtrlerBase::PreparedState::Play()
{
    GstStateChangeReturn ret;
    return ChangePlayBinState(GST_STATE_PLAYING, ret);
}

ChangePlayBinState主要是调用了gst_element_set_state(GST_ELEMENT_CAST(ctrler_.playbin_),GST_STATE_PLAYING),这个直接调用了gstreamer三方库的实现,调用完这个方法以后,gstreamer就开始进行播放了。

int32_t PlayBinCtrlerBase::BaseState::ChangePlayBinState(GstState targetState, GstStateChangeReturn &ret)
{
    ......
    ret = gst_element_set_state(GST_ELEMENT_CAST(ctrler_.playbin_), targetState);
    if (ret == GST_STATE_CHANGE_FAILURE) {
        MEDIA_LOGE("Failed to change playbin's state to %{public}s", gst_element_state_get_name(targetState));
        return MSERR_INVALID_OPERATION;
    }
    return MSERR_OK;
}

本篇文章主要从PlayerServer播放服务开始分析音视频播放的流程,涉及到gstreamer引擎的调用,相对于多媒体播放框架来说,更加底层,便于熟悉从框架到gstreamer的整体流程。

​想了解更多关于开源的内容,请访问:​​

​51CTO 开源基础软件社区​

​https://ost.51cto.com​​。

责任编辑:jianghua 来源: 51CTO开源基础软件社区

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK