Skip to content

Commit

Permalink
Handle CueEvent in TextChunker
Browse files Browse the repository at this point in the history
Updated TextChunker to handle incoming CueEvents. Connecting the
text chunker with the cue alignment handler will happen in a later
CL.

Issue: #362

Change-Id: Ib1fa9f457cf4ec0ce413dadcfa7eed5895ecd628
  • Loading branch information
vaage committed Mar 29, 2018
1 parent daac686 commit a11cbf9
Show file tree
Hide file tree
Showing 3 changed files with 547 additions and 278 deletions.
149 changes: 74 additions & 75 deletions packager/media/chunking/text_chunker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,119 +6,118 @@

#include "packager/media/chunking/text_chunker.h"

#include "packager/status_macros.h"

namespace shaka {
namespace media {
namespace {
const size_t kStreamIndex = 0;

std::shared_ptr<const SegmentInfo> MakeSegmentInfo(int64_t start_ms,
int64_t end_ms) {
DCHECK_LT(start_ms, end_ms);

std::shared_ptr<SegmentInfo> info = std::make_shared<SegmentInfo>();
info->start_timestamp = start_ms;
info->duration = end_ms - start_ms;

return info;
}
} // namespace

TextChunker::TextChunker(uint64_t segment_duration_ms)
: segment_duration_ms_(segment_duration_ms) {}
TextChunker::TextChunker(int64_t segment_duration_ms)
: segment_duration_ms_(segment_duration_ms),
segment_start_ms_(0),
segment_expected_end_ms_(segment_duration_ms) {}

Status TextChunker::InitializeInternal() {
return Status::OK;
}

Status TextChunker::Process(std::unique_ptr<StreamData> stream_data) {
switch (stream_data->stream_data_type) {
Status TextChunker::Process(std::unique_ptr<StreamData> data) {
switch (data->stream_data_type) {
case StreamDataType::kStreamInfo:
return DispatchStreamInfo(kStreamIndex,
std::move(stream_data->stream_info));
return OnStreamInfo(std::move(data->stream_info));
case StreamDataType::kTextSample:
return OnTextSample(stream_data->text_sample);
return OnTextSample(data->text_sample);
case StreamDataType::kCueEvent:
return OnCueEvent(data->cue_event);
default:
return Status(error::INTERNAL_ERROR,
"Invalid stream data type for this handler");
}
}

Status TextChunker::OnFlushRequest(size_t input_stream_index) {
// At this point we know that there is a single series of consecutive
// segments, all we need to do is run through all of them.
for (const auto& pair : segment_map_) {
Status status = DispatchSegmentWithSamples(pair.first, pair.second);

if (!status.ok()) {
return status;
}
// Keep outputting segments until all the samples leave the system.
while (segment_samples_.size()) {
RETURN_IF_ERROR(EndSegment(segment_expected_end_ms_));
}

segment_map_.clear();

return FlushAllDownstreams();
}

Status TextChunker::OnTextSample(std::shared_ptr<const TextSample> sample) {
const uint64_t start_segment = sample->start_time() / segment_duration_ms_;

// Find the last segment that overlaps the sample. Adjust the sample by one
// ms (smallest time unit) in case |EndTime| falls on the segment boundary.
DCHECK_GT(sample->duration(), 0u);
const uint64_t ending_segment =
(sample->EndTime() - 1) / segment_duration_ms_;

DCHECK_GE(ending_segment, start_segment);

// Samples must always be advancing. If a sample comes in out of order,
// skip the sample.
if (head_segment_ > start_segment) {
LOG(WARNING) << "New sample has arrived out of order. Skipping sample "
<< "as segment start is " << start_segment << " and segment "
<< "head is " << head_segment_ << ".";
return Status::OK;
}
Status TextChunker::OnStreamInfo(std::shared_ptr<const StreamInfo> info) {
// There is no information we need from the stream info, so just pass it
// downstream.
return DispatchStreamInfo(kStreamIndex, std::move(info));
}

// Add the sample to each segment it spans.
for (uint64_t segment = start_segment; segment <= ending_segment; segment++) {
segment_map_[segment].push_back(sample);
}
Status TextChunker::OnCueEvent(std::shared_ptr<const CueEvent> event) {
// We are going to cut the current segment into two using the event's time as
// the division.
const int64_t cue_time_in_ms = event->time_in_seconds * 1000;

// Move forward segment-by-segment so that we output empty segments to fill
// any segments with no cues.
for (uint64_t segment = head_segment_; segment < start_segment; segment++) {
auto it = segment_map_.find(segment);

Status status;
if (it == segment_map_.end()) {
const SegmentSamples kNoSamples;
status.Update(DispatchSegmentWithSamples(segment, kNoSamples));
} else {
// We found a segment, output all the samples. Remove it from the map as
// we should never need to write to it again.
status.Update(DispatchSegmentWithSamples(segment, it->second));
segment_map_.erase(it);
}

// If we fail to output a single sample, just stop.
if (!status.ok()) {
return status;
}
// In the case that there is a gap with no samples between the last sample
// and the cue event, output all the segments until we get to the segment that
// the cue event interrupts.
while (segment_expected_end_ms_ < cue_time_in_ms) {
RETURN_IF_ERROR(EndSegment(segment_expected_end_ms_));
}

// Jump ahead to the start of this segment as we should never have any samples
// start before |start_segment|.
head_segment_ = start_segment;
RETURN_IF_ERROR(EndSegment(cue_time_in_ms));
RETURN_IF_ERROR(DispatchCueEvent(kStreamIndex, std::move(event)));

return Status::OK;
}

Status TextChunker::DispatchSegmentWithSamples(uint64_t segment,
const SegmentSamples& samples) {
Status status;
for (const auto& sample : samples) {
status.Update(DispatchTextSample(kStreamIndex, sample));
Status TextChunker::OnTextSample(std::shared_ptr<const TextSample> sample) {
// Output all segments that come before our new sample.
while (segment_expected_end_ms_ <= sample->start_time()) {
RETURN_IF_ERROR(EndSegment(segment_expected_end_ms_));
}

// Only send the segment info if all the samples were successful.
if (!status.ok()) {
return status;
segment_samples_.push_back(std::move(sample));

return Status::OK;
}

Status TextChunker::EndSegment(int64_t segment_actual_end_ms) {
// Output all the samples that are part of the segment.
for (const auto& sample : segment_samples_) {
RETURN_IF_ERROR(DispatchTextSample(kStreamIndex, sample));
}

std::shared_ptr<SegmentInfo> info = std::make_shared<SegmentInfo>();
info->start_timestamp = segment * segment_duration_ms_;
info->duration = segment_duration_ms_;
RETURN_IF_ERROR(DispatchSegmentInfo(
kStreamIndex, MakeSegmentInfo(segment_start_ms_, segment_actual_end_ms)));

// Create a new segment that comes right after the old segment and remove all
// samples that don't cross over into the new segment.
StartNewSegment(segment_actual_end_ms);

return DispatchSegmentInfo(kStreamIndex, std::move(info));
return Status::OK;
}

void TextChunker::StartNewSegment(int64_t start_ms) {
segment_start_ms_ = start_ms;
segment_expected_end_ms_ = start_ms + segment_duration_ms_;

// Remove all samples that no longer overlap with the new segment.
segment_samples_.remove_if(
[start_ms](const std::shared_ptr<const TextSample>& sample) {
return sample->EndTime() <= start_ms;
});
}

} // namespace media
} // namespace shaka
34 changes: 16 additions & 18 deletions packager/media/chunking/text_chunker.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,43 +7,41 @@
#ifndef PACKAGER_MEDIA_CHUNKING_TEXT_CHUNKER_H_
#define PACKAGER_MEDIA_CHUNKING_TEXT_CHUNKER_H_

#include <stdint.h>

#include <map>
#include <vector>
#include <list>

#include "packager/media/base/media_handler.h"
#include "packager/status.h"

namespace shaka {
namespace media {

class TextChunker : public MediaHandler {
public:
explicit TextChunker(uint64_t segment_duration_ms);

protected:
Status Process(std::unique_ptr<StreamData> stream_data) override;
Status OnFlushRequest(size_t input_stream_index) override;
explicit TextChunker(int64_t segment_duration_ms);

private:
TextChunker(const TextChunker&) = delete;
TextChunker& operator=(const TextChunker&) = delete;

using SegmentSamples = std::vector<std::shared_ptr<const TextSample>>;

Status InitializeInternal() override;

Status Process(std::unique_ptr<StreamData> stream_data) override;
Status OnFlushRequest(size_t input_stream_index) override;

Status OnStreamInfo(std::shared_ptr<const StreamInfo> info);
Status OnCueEvent(std::shared_ptr<const CueEvent> cue);
Status OnTextSample(std::shared_ptr<const TextSample> sample);

Status DispatchSegmentWithSamples(uint64_t segment,
const SegmentSamples& samples);
Status EndSegment(int64_t segment_actual_end_ms);
void StartNewSegment(int64_t start_ms);

uint64_t segment_duration_ms_;
int64_t segment_duration_ms_;

// Mapping of segment number to segment.
std::map<uint64_t, SegmentSamples> segment_map_;
uint64_t head_segment_ = 0;
// The segment that we are currently outputting samples for. The segment
// will end once a new sample with start time greater or equal to the
// segment's end time arrives.
int64_t segment_start_ms_;
int64_t segment_expected_end_ms_;
std::list<std::shared_ptr<const TextSample>> segment_samples_;
};

} // namespace media
Expand Down
Loading

0 comments on commit a11cbf9

Please sign in to comment.