Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor EventBus #3372

Merged
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
0fd95a9
Refactor EventBus:
bbert Aug 19, 2020
c05dea2
Refactor EventBus: update unit tests
bbert Aug 19, 2020
155e84d
[temp] execute functional tests on this branch
bbert Aug 19, 2020
70ed08f
[temp] PR #3372 functional tests
bbert Aug 19, 2020
1c3e1f5
Refactor EventBus: fix MssHandler
bbert Aug 19, 2020
6a12802
Revert "[temp] execute functional tests on this branch"
bbert Aug 19, 2020
826f7d7
Revert "[temp] PR #3372 functional tests"
bbert Aug 19, 2020
f6e8d77
Register PLAYBACK_ENDED event with high priority in order to be notif…
bbert Aug 21, 2020
2891e99
Merge branch 'development' into refactor-eventbus
bbert Aug 31, 2020
f5985cd
fix build error
bbert Aug 31, 2020
60fd2c5
do not check stream id and media type in event handlers
bbert Aug 31, 2020
ebc5bbe
Merge branch 'development' into refactor-eventbus
bbert Sep 14, 2020
6f8526a
fix regression on SegmentBaseLoader
bbert Sep 15, 2020
8acb1a3
EventBus: add filters (streamId, mediaType...) parameter in trigger()…
bbert Sep 15, 2020
e9023ec
code style
bbert Sep 15, 2020
4f216ae
Merge branch 'development' into refactor-eventbus
bbert Oct 16, 2020
b246c89
run functional tests
bbert Nov 3, 2020
586bea5
run functional tests
bbert Nov 3, 2020
e6ddd34
run functional tests
bbert Nov 3, 2020
a8399df
Revert "run functional tests"
bbert Nov 4, 2020
d998748
Revert "run functional tests"
bbert Nov 4, 2020
d0e5625
Revert "run functional tests"
bbert Nov 4, 2020
ac4d1eb
Complete events trigger refactoring
bbert Nov 4, 2020
15ab819
NotFragmentedTextBufferController: add missing getStreamId() function
bbert Nov 4, 2020
8bf60e6
fix unit tests
bbert Nov 4, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions src/core/EventBus.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ function EventBus() {
priority: priority
};

if (scope && scope.getStreamId) {
handler.streamId = scope.getStreamId();
}
if (scope && scope.getType) {
handler.mediaType = scope.getType();
}

const inserted = handlers[type].some((item , idx) => {
if (item && priority > item.priority ) {
handlers[type].splice(idx, 0, handler);
Expand All @@ -75,17 +82,33 @@ function EventBus() {
handlers[type][idx] = null;
}

function trigger(type, payload) {
function trigger(type, payload = {}, streamId = undefined, mediaType = undefined) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are currently working on #3379 and would like to introduce an additional parameter in the trigger function as well. In order to align this and avoid a large constructor I suggest we make the third parameter and object {streamId,mediaType,additonalParameter}. That way we can enhance it even further later

Copy link
Contributor Author

@bbert bbert Sep 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not enriching payload parameter with all these additional parameters? for example:

eventBus.trigger(Events.INIT_FRAGMENT_NEEDED, {
    streamId: streamInfo.id,
    mediaType: type,
    representationId: currentRepresentationInfo.id
});

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer having a third object in the function for all internal attributes which are not supposed to be dispatched to the client/app

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK I see
we may get heavier source code, but nevermind
So this could be a 3rd function parameter called 'filters'? Or this may contains other parameters that could have other utility than even filtering?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes basically everything which we can use for filtering and decision making when triggering the event. We will adjust the PR/request accordingly: #3379

if (!type || !handlers[type]) return;

payload = payload || {};

if (payload.hasOwnProperty('type')) throw new Error('\'type\' is a reserved word for event dispatching');

payload.type = type;
if (streamId) {
payload.streamId = streamId;
}
if (mediaType) {
payload.mediaType = mediaType;
}

handlers[type] = handlers[type].filter((item) => item);
handlers[type].forEach( handler => handler && handler.callback.call(handler.scope, payload) );
const eventHandlers = handlers[type].filter(item => {
if (streamId && item.streamId && item.streamId !== streamId) {
return false;
}
if (mediaType && item.mediaType && item.mediaType !== mediaType) {
return false;
}
return true;
});

eventHandlers.forEach(handler => handler && handler.callback.call(handler.scope, payload));
}

function getHandlerIdx(type, listener, scope) {
Expand Down
30 changes: 15 additions & 15 deletions src/dash/DashHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ function DashHandler(config) {
segmentsController.initialize(isDynamic);
}

function getStreamId() {
return streamInfo.id;
}

function getType() {
return type;
}
Expand Down Expand Up @@ -183,25 +187,20 @@ function DashHandler(config) {
dashMetrics.updateManifestUpdateInfo({presentationStartTime: liveEdge});
}

function onRepresentationUpdateStarted(eventObj) {
if (eventObj.sender.getType() !== getType()) return;

processRepresentation(eventObj.representation);
function onRepresentationUpdateStarted(e) {
processRepresentation(e.representation);
}

function processRepresentation(voRepresentation) {
const hasInitialization = voRepresentation.hasInitialization();
const hasSegments = voRepresentation.hasSegments();

//if representation has initialization and segments information, REPRESENTATION_UPDATE_COMPLETED can be triggered immediately
//otherwise, it means that a request has to be made to get initialization and/or segments informations
// If representation has initialization and segments information, REPRESENTATION_UPDATE_COMPLETED can be triggered immediately
// otherwise, it means that a request has to be made to get initialization and/or segments informations
if (hasInitialization && hasSegments) {
eventBus.trigger(events.REPRESENTATION_UPDATE_COMPLETED, {
sender: instance,
representation: voRepresentation
});
eventBus.trigger(events.REPRESENTATION_UPDATE_COMPLETED, { representation: voRepresentation }, streamInfo.id, type);
} else {
segmentsController.update(voRepresentation, getType(), selectedMimeType, hasInitialization, hasSegments);
segmentsController.update(voRepresentation, selectedMimeType, hasInitialization, hasSegments);
}
}

Expand Down Expand Up @@ -360,11 +359,11 @@ function DashHandler(config) {
const representation = e.representation;
if (!representation.segments) return;

eventBus.trigger(events.REPRESENTATION_UPDATE_COMPLETED, {sender: this, representation: representation});
eventBus.trigger(events.REPRESENTATION_UPDATE_COMPLETED, { representation: representation }, streamInfo.id, type);
}

function onSegmentsLoaded(e) {
if (e.error || (getType() !== e.mediaType)) return;
if (e.error) return;

const fragments = e.segments;
const representation = e.representation;
Expand Down Expand Up @@ -417,7 +416,7 @@ function DashHandler(config) {
return;
}

eventBus.trigger(events.REPRESENTATION_UPDATE_COMPLETED, {sender: this, representation: representation});
eventBus.trigger(events.REPRESENTATION_UPDATE_COMPLETED, { representation: representation }, streamInfo.id, type);
}

function onDynamicStreamCompleted() {
Expand All @@ -427,7 +426,8 @@ function DashHandler(config) {

instance = {
initialize: initialize,
getType: getType, //need to be public in order to be used by logger
getStreamId: getStreamId,
getType: getType,
getStreamInfo: getStreamInfo,
getInitRequest: getInitRequest,
getRequestForSegment: getRequestForSegment,
Expand Down
42 changes: 21 additions & 21 deletions src/dash/SegmentBaseLoader.js
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ function SegmentBaseLoader() {
}
}

function loadInitialization(representation, loadingInfo) {
function loadInitialization(streamId, mediaType, representation, loadingInfo) {
checkConfig();
let initRange = null;
const baseUrl = representation ? baseURLController.resolve(representation.path) : null;
Expand All @@ -151,7 +151,7 @@ function SegmentBaseLoader() {
searching: false,
bytesLoaded: 0,
bytesToLoad: 1500,
mediaType: representation && representation.adaptation ? representation.adaptation.type : null
mediaType: mediaType
};

logger.debug('Start searching for initialization.');
Expand All @@ -166,23 +166,23 @@ function SegmentBaseLoader() {
representation.range = initRange;
// note that we don't explicitly set rep.initialization as this
// will be computed when all BaseURLs are resolved later
eventBus.trigger(events.INITIALIZATION_LOADED, {representation: representation});
eventBus.trigger(events.INITIALIZATION_LOADED, { representation: representation }, streamId, mediaType);
} else {
info.range.end = info.bytesLoaded + info.bytesToLoad;
loadInitialization(representation, info);
loadInitialization(streamId, mediaType, representation, info);
}
};

const onerror = function () {
eventBus.trigger(events.INITIALIZATION_LOADED, {representation: representation});
eventBus.trigger(events.INITIALIZATION_LOADED, { representation: representation });
};

urlLoader.load({request: request, success: onload, error: onerror});

logger.debug('Perform init search: ' + info.url);
}

function loadSegments(representation, type, range, callback, loadingInfo) {
function loadSegments(streamId, mediaType, representation, range, callback, loadingInfo) {
checkConfig();
if (range && (range.start === undefined || range.end === undefined)) {
const parts = range ? range.toString().split('-') : null;
Expand All @@ -201,7 +201,7 @@ function SegmentBaseLoader() {
searching: !hasRange,
bytesLoaded: loadingInfo ? loadingInfo.bytesLoaded : 0,
bytesToLoad: 1500,
mediaType: representation && representation.adaptation ? representation.adaptation.type : null
mediaType: mediaType
};

const request = getFragmentRequest(info);
Expand All @@ -220,7 +220,7 @@ function SegmentBaseLoader() {
info.range.end = info.range.start + (sidx.size || extraBytes);
} else if (loadedLength < info.bytesLoaded) {
// if we have reached a search limit or if we have reached the end of the file we have to stop trying to find sidx
callback(null, representation, type);
callback(streamId, mediaType, null, representation);
return;
} else {
const lastBox = isoFile.getLastBox();
Expand All @@ -232,7 +232,7 @@ function SegmentBaseLoader() {
info.range.end += extraBytes;
}
}
loadSegments(representation, type, info.range, callback, info);
loadSegments(streamId, mediaType, representation, info.range, callback, info);
} else {
const ref = sidx.references;
let loadMultiSidx,
Expand Down Expand Up @@ -260,10 +260,10 @@ function SegmentBaseLoader() {
segs.sort(function (a, b) {
return a.startTime - b.startTime < 0 ? -1 : 0;
});
callback(segs, representation, type);
callback(streamId, mediaType, segs, representation);
}
} else {
callback(null, representation, type);
callback(streamId, mediaType, null, representation);
}
};

Expand All @@ -272,19 +272,19 @@ function SegmentBaseLoader() {
se = offset + ref[j].referenced_size - 1;
offset = offset + ref[j].referenced_size;
r = {start: ss, end: se};
loadSegments(representation, null, r, tmpCallback, info);
loadSegments(streamId, mediaType, representation, r, tmpCallback, info);
}

} else {
logger.debug('Parsing segments from SIDX. representation ' + representation.adaptation.type + ' - id: ' + representation.id + ' for range : ' + info.range.start + ' - ' + info.range.end);
logger.debug('Parsing segments from SIDX. representation ' + mediaType + ' - id: ' + representation.id + ' for range : ' + info.range.start + ' - ' + info.range.end);
segments = getSegmentsForSidx(sidx, info);
callback(segments, representation, type);
callback(streamId, mediaType, segments, representation);
}
}
};

const onerror = function () {
callback(null, representation, type);
callback(streamId, mediaType, null, representation);
};

urlLoader.load({request: request, success: onload, error: onerror});
Expand Down Expand Up @@ -340,12 +340,12 @@ function SegmentBaseLoader() {
return request;
}

function onLoaded(segments, representation, type) {
if (segments) {
eventBus.trigger(events.SEGMENTS_LOADED, {segments: segments, representation: representation, mediaType: type});
} else {
eventBus.trigger(events.SEGMENTS_LOADED, {segments: null, representation: representation, mediaType: type, error: new DashJSError(errors.SEGMENT_BASE_LOADER_ERROR_CODE, errors.SEGMENT_BASE_LOADER_ERROR_MESSAGE)});
}
function onLoaded(streamId, mediaType, segments, representation) {
eventBus.trigger(events.SEGMENTS_LOADED, {
segments: segments,
representation: representation,
error: segments ? undefined : new DashJSError(errors.SEGMENT_BASE_LOADER_ERROR_CODE, errors.SEGMENT_BASE_LOADER_ERROR_MESSAGE)
}, streamId, mediaType);
}

instance = {
Expand Down
41 changes: 14 additions & 27 deletions src/dash/WebmSegmentBaseLoader.js
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ function WebmSegmentBaseLoader() {
}
}

function loadInitialization(representation, loadingInfo) {
function loadInitialization(streamId, mediaType, representation, loadingInfo) {
checkConfig();
let request = null;
let baseUrl = representation ? baseURLController.resolve(representation.path) : null;
Expand All @@ -317,7 +317,7 @@ function WebmSegmentBaseLoader() {
request: request,
url: baseUrl ? baseUrl.url : undefined,
init: true,
mediaType: representation && representation.adaptation ? representation.adaptation.type : null
mediaType: mediaType
};

logger.info('Start loading initialization.');
Expand All @@ -327,15 +327,11 @@ function WebmSegmentBaseLoader() {
const onload = function () {
// note that we don't explicitly set rep.initialization as this
// will be computed when all BaseURLs are resolved later
eventBus.trigger(events.INITIALIZATION_LOADED, {
representation: representation
});
eventBus.trigger(events.INITIALIZATION_LOADED, { representation: representation }, streamId, mediaType);
};

const onloadend = function () {
eventBus.trigger(events.INITIALIZATION_LOADED, {
representation: representation
});
eventBus.trigger(events.INITIALIZATION_LOADED, { representation: representation }, streamId, mediaType);
};

urlLoader.load({
Expand All @@ -347,7 +343,7 @@ function WebmSegmentBaseLoader() {
logger.debug('Perform init load: ' + info.url);
}

function loadSegments(representation, type, theRange, callback) {
function loadSegments(streamId, mediaType, representation, theRange, callback) {
checkConfig();
let request = null;
let baseUrl = representation ? baseURLController.resolve(representation.path) : null;
Expand All @@ -363,7 +359,7 @@ function WebmSegmentBaseLoader() {
request: request,
url: media,
init: false,
mediaType: representation && representation.adaptation ? representation.adaptation.type : null
mediaType: mediaType
};

callback = !callback ? onLoaded : callback;
Expand All @@ -376,12 +372,12 @@ function WebmSegmentBaseLoader() {

const onload = function (response) {
parseEbmlHeader(response, media, theRange, function (segments) {
callback(segments, representation, type);
callback(streamId, mediaType, segments, representation);
});
};

const onloadend = function () {
callback(null, representation, type);
callback(streamId, mediaType, null, representation);
};

urlLoader.load({
Expand All @@ -391,21 +387,12 @@ function WebmSegmentBaseLoader() {
});
}

function onLoaded(segments, representation, type) {
if (segments) {
eventBus.trigger(events.SEGMENTS_LOADED, {
segments: segments,
representation: representation,
mediaType: type
});
} else {
eventBus.trigger(events.SEGMENTS_LOADED, {
segments: null,
representation: representation,
mediaType: type,
error: new DashJSError(errors.SEGMENT_BASE_LOADER_ERROR_CODE, errors.SEGMENT_BASE_LOADER_ERROR_MESSAGE)
});
}
function onLoaded(streamId, mediaType, segments, representation) {
eventBus.trigger(events.SEGMENTS_LOADED, {
segments: segments,
representation: representation,
error: segments ? undefined : new DashJSError(errors.SEGMENT_BASE_LOADER_ERROR_CODE, errors.SEGMENT_BASE_LOADER_ERROR_MESSAGE)
}, streamId, mediaType);
}

function getFragmentRequest(info) {
Expand Down
Loading