Skip to content

Commit

Permalink
Implements SyncPointQueue and CueAlignmentHandler
Browse files Browse the repository at this point in the history
SyncPointQueue manages all cue points and returns aligned cue
points to the callers (CueAlignmentHandlers).

CueAlignmentHandler is responsible for aligning cues from different
streams. It uses SyncPointQueue internally to align / synchronize the
cue points.

Issue: #355

Change-Id: I281fecb46a3ca7172d71e7495bdd07b8efdeb283
  • Loading branch information
vaage authored and kqyang committed Mar 27, 2018
1 parent 229bd77 commit e685c8a
Show file tree
Hide file tree
Showing 7 changed files with 1,163 additions and 2 deletions.
4 changes: 2 additions & 2 deletions packager/media/base/media_handler_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ MATCHER_P4(IsMediaSample, stream_index, timestamp, duration, encrypted, "") {
<< arg->media_sample->duration() << ","
<< BoolToString(arg->media_sample->is_encrypted()) << ")";
return arg->stream_index == stream_index &&
arg->media_sample->dts() == timestamp &&
arg->media_sample->duration() == duration &&
arg->media_sample->dts() == static_cast<int64_t>(timestamp) &&
arg->media_sample->duration() == static_cast<int64_t>(duration) &&
arg->media_sample->is_encrypted() == encrypted;
}

Expand Down
5 changes: 5 additions & 0 deletions packager/media/chunking/chunking.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
'sources': [
'chunking_handler.cc',
'chunking_handler.h',
'cue_alignment_handler.cc',
'cue_alignment_handler.h',
'sync_point_queue.cc',
'sync_point_queue.h',
],
'dependencies': [
'../base/media_base.gyp:media_base',
Expand All @@ -25,6 +29,7 @@
'type': '<(gtest_target_type)',
'sources': [
'chunking_handler_unittest.cc',
'cue_alignment_handler_unittest.cc',
],
'dependencies': [
'../../testing/gtest.gyp:gtest',
Expand Down
272 changes: 272 additions & 0 deletions packager/media/chunking/cue_alignment_handler.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
// Copyright 2018 Google Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd

#include "packager/media/chunking/cue_alignment_handler.h"

namespace shaka {
namespace media {
namespace {
// The max number of samples that are allowed to be buffered before we shutdown
// because there is likely a problem with the content or how the pipeline was
// configured. This is about 20 seconds of buffer for audio with 48kHz.
const size_t kMaxBufferSize = 1000;

// Returns the presentation time of |data| in seconds, using the time scale
// from |info| for media samples and the fixed millisecond scale for text
// samples.
double TimeInSeconds(const StreamInfo& info, const StreamData& data) {
  int64_t time_scale = 0;
  int64_t scaled_time = 0;
  switch (data.stream_data_type) {
    case StreamDataType::kMediaSample:
      time_scale = info.time_scale();
      scaled_time = data.media_sample->pts();
      break;
    case StreamDataType::kTextSample:
      // Text is always in MS but the stream info time scale is 0.
      time_scale = 1000;
      scaled_time = data.text_sample->start_time();
      break;
    default:
      NOTREACHED() << "TimeInSeconds should only be called on media samples "
                      "and text samples.";
      break;
  }

  // Guard the division: NOTREACHED() is a no-op in release builds, and a
  // media sample whose StreamInfo reports a zero time scale would otherwise
  // produce NaN/inf here and silently corrupt every downstream cue-alignment
  // comparison. Returning 0.0 keeps the failure visible and finite.
  if (time_scale <= 0) {
    NOTREACHED() << "Invalid (non-positive) time scale: " << time_scale;
    return 0.0;
  }

  return static_cast<double>(scaled_time) / time_scale;
}
} // namespace

// |sync_points| is not owned and must outlive this handler; it is shared
// across handlers so that cues are aligned globally.
CueAlignmentHandler::CueAlignmentHandler(SyncPointQueue* sync_points)
    : sync_points_(sync_points) {}

Status CueAlignmentHandler::InitializeInternal() {
  // Register this handler's thread with the shared queue so its blocking
  // calls know how many threads they must wait for.
  sync_points_->AddThread();
  // One state slot per input stream, indexed by stream_index.
  stream_states_.resize(num_input_streams());
  return Status::OK;
}

// Routes each incoming message to the matching handler. Unsupported message
// types are dropped (with a verbose log) rather than forwarded.
Status CueAlignmentHandler::Process(std::unique_ptr<StreamData> data) {
  const StreamDataType type = data->stream_data_type;

  if (type == StreamDataType::kStreamInfo)
    return OnStreamInfo(std::move(data));

  if (type == StreamDataType::kTextSample ||
      type == StreamDataType::kMediaSample) {
    return OnSample(std::move(data));
  }

  VLOG(3) << "Dropping unsupported data type " << static_cast<int>(type);
  return Status::OK;
}

Status CueAlignmentHandler::OnFlushRequest(size_t stream_index) {
  // We need to wait for all streams to flush before we can flush each stream.
  // This allows cached buffers to be cleared and cues to be properly
  // synchronized and set on all streams.

  stream_states_[stream_index].to_be_flushed = true;
  for (const StreamState& stream_state : stream_states_) {
    // At least one stream has not requested a flush yet; keep buffering.
    if (!stream_state.to_be_flushed)
      return Status::OK;
  }

  // |to_be_flushed| is set on all streams. We don't expect to see data in any
  // buffers.
  for (size_t i = 0; i < stream_states_.size(); ++i) {
    StreamState& stream_state = stream_states_[i];
    if (!stream_state.samples.empty()) {
      LOG(WARNING) << "Unexpected data seen on stream " << i;
      // Best effort: dispatch whatever is still buffered so no sample is
      // silently dropped at shutdown.
      while (!stream_state.samples.empty()) {
        Status status(Dispatch(std::move(stream_state.samples.front())));
        if (!status.ok())
          return status;
        stream_state.samples.pop_front();
      }
    }
  }
  return FlushAllDownstreams();
}

Status CueAlignmentHandler::OnStreamInfo(std::unique_ptr<StreamData> data) {
  StreamState& stream_state = stream_states_[data->stream_index];
  // Keep a copy of the stream info so that we can check type and check
  // timescale.
  stream_state.info = data->stream_info;
  // Get the first hint for the stream. Use a negative hint so that if there
  // is supposed to be a sync point at zero, we will still respect it.
  stream_state.next_cue_hint = sync_points_->GetHint(-1);

  return Dispatch(std::move(data));
}

Status CueAlignmentHandler::OnSample(std::unique_ptr<StreamData> sample) {
  // There are two modes:
  //  1. There is a video input.
  //  2. There are no video inputs.
  //
  // When there is a video input, we rely on the video input to get the next
  // sync point and release all the samples.
  //
  // When there are no video inputs, we rely on the sync point queue to block
  // us until there is a sync point.

  // size_t to stay consistent with |stream_index| everywhere else in this
  // class (OnFlushRequest, AcceptSample, DispatchCueIfNeeded).
  const size_t stream_index = sample->stream_index;
  StreamState& stream_state = stream_states_[stream_index];

  if (stream_state.info->stream_type() == kStreamVideo) {
    // Video promotes the hint to a real sync point itself: the first key
    // frame at or after the hint becomes the cue.
    const double sample_time = TimeInSeconds(*stream_state.info, *sample);
    if (sample->media_sample->is_key_frame() &&
        sample_time >= stream_state.next_cue_hint) {
      std::shared_ptr<const CueEvent> next_sync =
          sync_points_->PromoteAt(sample_time);
      if (!next_sync) {
        LOG(ERROR)
            << "Failed to promote sync point at " << sample_time
            << ". This happens only if video streams are not GOP-aligned.";
        return Status(error::INVALID_ARGUMENT,
                      "Streams are not properly GOP-aligned.");
      }

      Status status(DispatchCueEvent(stream_index, next_sync));
      // The cue was just dispatched for this (video) stream; clear it before
      // UseNewSyncPoint installs it on every stream, so the "cue already
      // pending" check there does not trip on us.
      stream_state.cue.reset();
      status.Update(UseNewSyncPoint(next_sync));
      if (!status.ok())
        return status;
    }
    return Dispatch(std::move(sample));
  }

  // Accept the sample. This will output it if it comes before the hint point
  // or will cache it if it comes after the hint point.
  Status accept_status(AcceptSample(std::move(sample), &stream_state));
  if (!accept_status.ok()) {
    return accept_status;
  }

  // If all the streams are waiting on a hint, it means that none has next
  // sync point determined. It also means that there are no video streams and
  // we need to wait for all streams to converge on a hint so that we can get
  // the next sync point.
  if (EveryoneWaitingAtHint()) {
    // All streams should have the same hint right now.
    const double next_cue_hint = stream_state.next_cue_hint;
    for (const StreamState& other_state : stream_states_) {
      DCHECK_EQ(next_cue_hint, other_state.next_cue_hint);
    }

    std::shared_ptr<const CueEvent> next_sync =
        sync_points_->GetNext(next_cue_hint);
    // GetNext blocks until a cue is available, so a null result would be a
    // programming error.
    DCHECK(next_sync);

    Status sync_status = UseNewSyncPoint(next_sync);
    if (!sync_status.ok()) {
      return sync_status;
    }
  }

  return Status::OK;
}

Status CueAlignmentHandler::UseNewSyncPoint(
    std::shared_ptr<const CueEvent> new_sync) {
  // The hint for a promoted cue is the time of the cue after it, so it must
  // be strictly greater than the cue's own time.
  const double new_hint = sync_points_->GetHint(new_sync->time_in_seconds);
  DCHECK_GT(new_hint, new_sync->time_in_seconds);

  Status status;
  for (StreamState& stream_state : stream_states_) {
    // No stream should be so out of sync with the others that they would
    // still be working on an old cue.
    if (stream_state.cue) {
      LOG(ERROR) << "Found two cue events that are too close together. One at "
                 << stream_state.cue->time_in_seconds << " and the other at "
                 << new_sync->time_in_seconds;
      return Status(error::INVALID_ARGUMENT, "Cue events too close together");
    }

    // Add the cue and update the hint. The cue will always be used over the
    // hint, so hint should always be greater than the latest cue.
    stream_state.cue = new_sync;
    stream_state.next_cue_hint = new_hint;

    // Drain this stream's buffer up to the new hint, emitting the cue just
    // before the first sample at or after the cue's time.
    while (status.ok() && !stream_state.samples.empty()) {
      std::unique_ptr<StreamData>& sample = stream_state.samples.front();
      const double sample_time_in_seconds =
          TimeInSeconds(*stream_state.info, *sample);
      if (sample_time_in_seconds >= stream_state.next_cue_hint) {
        // Samples at or past the new hint must wait for the next cue; by
        // this point the current cue must already have been dispatched.
        DCHECK(!stream_state.cue);
        break;
      }

      const size_t stream_index = sample->stream_index;
      if (stream_state.cue) {
        // DispatchCueIfNeeded moves |cue| out when it emits it, so the cue
        // is sent at most once per stream.
        status.Update(DispatchCueIfNeeded(stream_index, sample_time_in_seconds,
                                          &stream_state));
      }
      status.Update(Dispatch(std::move(sample)));
      stream_state.samples.pop_front();
    }
  }
  return status;
}

// Returns true only when every stream has at least one sample buffered, i.e.
// every stream has reached its hint barrier and is waiting for the next sync
// point to be determined.
bool CueAlignmentHandler::EveryoneWaitingAtHint() const {
  bool everyone_waiting = true;
  for (const StreamState& state : stream_states_) {
    everyone_waiting = everyone_waiting && !state.samples.empty();
  }
  return everyone_waiting;
}

// Accept Sample will either:
//   1. Send the sample downstream, as it comes before the next sync point
//      and therefore can skip the buffering.
//   2. Save the sample in the buffer as it comes after the next sync point.
Status CueAlignmentHandler::AcceptSample(std::unique_ptr<StreamData> sample,
                                         StreamState* stream_state) {
  DCHECK(stream_state);

  const size_t stream_index = sample->stream_index;
  if (stream_state->samples.empty()) {
    const double sample_time_in_seconds =
        TimeInSeconds(*stream_state->info, *sample);
    if (sample_time_in_seconds < stream_state->next_cue_hint) {
      Status status;
      if (stream_state->cue) {
        // A pending cue must go out before any sample at or after its time.
        status.Update(DispatchCueIfNeeded(stream_index, sample_time_in_seconds,
                                          stream_state));
      }
      status.Update(Dispatch(std::move(sample)));
      return status;
    }
    // First buffered sample is at or after the hint; the cue for this
    // interval cannot have been determined yet.
    DCHECK(!stream_state->cue);
  }

  // At or past the hint barrier (or already buffering): hold the sample
  // until the next sync point is known.
  stream_state->samples.push_back(std::move(sample));
  if (stream_state->samples.size() > kMaxBufferSize) {
    LOG(ERROR) << "Stream " << stream_index << " has buffered "
               << stream_state->samples.size() << " when the max is "
               << kMaxBufferSize;
    return Status(error::INVALID_ARGUMENT,
                  "Streams are not properly multiplexed.");
  }

  return Status::OK;
}

// Emits the stream's pending cue once the stream reaches a sample at or
// after the cue's time; otherwise leaves the cue pending for a later sample.
Status CueAlignmentHandler::DispatchCueIfNeeded(
    size_t stream_index,
    double next_sample_time_in_seconds,
    StreamState* stream_state) {
  DCHECK(stream_state->cue);

  const double cue_time = stream_state->cue->time_in_seconds;
  const bool sample_reaches_cue = next_sample_time_in_seconds >= cue_time;
  if (!sample_reaches_cue)
    return Status::OK;

  DCHECK_LT(cue_time, stream_state->next_cue_hint);
  // Moving |cue| out clears it, so the cue is dispatched exactly once.
  return DispatchCueEvent(stream_index, std::move(stream_state->cue));
}

} // namespace media
} // namespace shaka
83 changes: 83 additions & 0 deletions packager/media/chunking/cue_alignment_handler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2018 Google Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd

#include <list>
#include <memory>
#include <vector>

#include "packager/media/base/media_handler.h"
#include "packager/media/chunking/sync_point_queue.h"

namespace shaka {
namespace media {

/// The cue alignment handler is a N-to-N handler that will inject CueEvents
/// into all streams. It will align the cues across streams (and handlers)
/// using a shared SyncPointQueue.
///
/// There should be a cue alignment handler per demuxer/thread and not per
/// stream. A cue alignment handler must be one per thread in order to properly
/// manage blocking.
class CueAlignmentHandler : public MediaHandler {
public:
explicit CueAlignmentHandler(SyncPointQueue* sync_points);
~CueAlignmentHandler() = default;

private:
CueAlignmentHandler(const CueAlignmentHandler&) = delete;
CueAlignmentHandler& operator=(const CueAlignmentHandler&) = delete;

struct StreamState {
// Information for the stream.
std::shared_ptr<const StreamInfo> info;
// Cached samples that cannot be dispatched. All the samples should be at or
// after |hint|.
std::list<std::unique_ptr<StreamData>> samples;
// If set, the stream is pending to be flushed.
bool to_be_flushed = false;

// If set, it points to the next cue it has to send downstream. Note that if
// it is not set, the next cue is not determined.
// This is set but not really used by video stream.
std::shared_ptr<const CueEvent> cue;
// If |cue| is set, this is the hint for the cue after |cue|, i.e. next next
// cue; otherwise this is the hint for the next cue. This holds the time in
// seconds of the scheduled (unpromoted) next or next next cue's time. This
// is essentially the barrier for the sample stream. Non video samples after
// this barrier must wait until the |cue| is determined and thus the next
// hint to be determined; video samples will promote the hint to cue when
// seeing the first key frame after |next_cue_hint|.
double next_cue_hint = 0;
};

// MediaHandler overrides.
Status InitializeInternal() override;
Status Process(std::unique_ptr<StreamData> data) override;
Status OnFlushRequest(size_t stream_index) override;

// Internal handling functions for different stream data.
Status OnStreamInfo(std::unique_ptr<StreamData> data);
Status OnSample(std::unique_ptr<StreamData> sample);

// Update stream states with new sync point.
Status UseNewSyncPoint(std::shared_ptr<const CueEvent> new_sync);

// Check if everyone is waiting for new hint points.
bool EveryoneWaitingAtHint() const;

// Dispatch or save incoming sample.
Status AcceptSample(std::unique_ptr<StreamData> sample,
StreamState* stream_state);

// Dispatch the cue if the new sample comes at or after it.
Status DispatchCueIfNeeded(size_t stream_index,
double next_sample_time_in_seconds,
StreamState* stream_state);

SyncPointQueue* const sync_points_ = nullptr;
std::vector<StreamState> stream_states_;
};

} // namespace media
} // namespace shaka
Loading

0 comments on commit e685c8a

Please sign in to comment.