Skip to content

Commit

Permalink
Move RtspClient creation into RtspPeriod.
Browse files Browse the repository at this point in the history
RtspMediaSource uses the timeline update paradigm from ProgressiveMediaPeriod.

#minor-release

PiperOrigin-RevId: 378150758
  • Loading branch information
claincly authored and ojw28 committed Jun 8, 2021
1 parent 22b126c commit 1ca0efd
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 203 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,16 @@ public interface PlaybackEventListener {
}

private final SessionInfoListener sessionInfoListener;
private final PlaybackEventListener playbackEventListener;
private final Uri uri;
@Nullable private final RtspAuthUserInfo rtspAuthUserInfo;
@Nullable private final String userAgent;
private final String userAgent;
private final ArrayDeque<RtpLoadInfo> pendingSetupRtpLoadInfos;
// TODO(b/172331505) Add a timeout monitor for pending requests.
private final SparseArray<RtspRequest> pendingRequests;
private final MessageSender messageSender;

private RtspMessageChannel messageChannel;
private @MonotonicNonNull PlaybackEventListener playbackEventListener;
@Nullable private String sessionId;
@Nullable private KeepAliveMonitor keepAliveMonitor;
@Nullable private RtspAuthenticationInfo rtspAuthenticationInfo;
Expand All @@ -123,12 +123,17 @@ public interface PlaybackEventListener {
* <p>Note: all method invocations must be made from the playback thread.
*
* @param sessionInfoListener The {@link SessionInfoListener}.
* @param userAgent The user agent that will be used if needed, or {@code null} for the fallback
* to use the default user agent of the underlying platform.
* @param playbackEventListener The {@link PlaybackEventListener}.
* @param userAgent The user agent.
* @param uri The RTSP playback URI.
*/
public RtspClient(SessionInfoListener sessionInfoListener, @Nullable String userAgent, Uri uri) {
public RtspClient(
SessionInfoListener sessionInfoListener,
PlaybackEventListener playbackEventListener,
String userAgent,
Uri uri) {
this.sessionInfoListener = sessionInfoListener;
this.playbackEventListener = playbackEventListener;
this.uri = RtspMessageUtil.removeUserInfo(uri);
this.rtspAuthUserInfo = RtspMessageUtil.parseUserInfo(uri);
this.userAgent = userAgent;
Expand Down Expand Up @@ -157,17 +162,10 @@ public void start() throws IOException {
messageSender.sendOptionsRequest(uri, sessionId);
}

/** Sets the {@link PlaybackEventListener} to receive playback events. */
public void setPlaybackEventListener(PlaybackEventListener playbackEventListener) {
this.playbackEventListener = playbackEventListener;
}

/**
* Triggers RTSP SETUP requests after track selection.
*
* <p>A {@link PlaybackEventListener} must be set via {@link #setPlaybackEventListener} before
* calling this method. All selected tracks (represented by {@link RtpLoadInfo}) must have valid
* transport.
* <p>All selected tracks (represented by {@link RtpLoadInfo}) must have valid transport.
*
* @param loadInfos A list of selected tracks represented by {@link RtpLoadInfo}.
*/
Expand Down Expand Up @@ -224,7 +222,7 @@ public void retryWithRtpTcp() {
receivedAuthorizationRequest = false;
rtspAuthenticationInfo = null;
} catch (IOException e) {
checkNotNull(playbackEventListener).onPlaybackError(new RtspPlaybackException(e));
playbackEventListener.onPlaybackError(new RtspPlaybackException(e));
}
}

Expand All @@ -237,7 +235,7 @@ public void registerInterleavedDataChannel(
private void continueSetupRtspTrack() {
@Nullable RtpLoadInfo loadInfo = pendingSetupRtpLoadInfos.pollFirst();
if (loadInfo == null) {
checkNotNull(playbackEventListener).onRtspSetupCompleted();
playbackEventListener.onRtspSetupCompleted();
return;
}
messageSender.sendSetupRequest(loadInfo.getTrackUri(), loadInfo.getTransport(), sessionId);
Expand All @@ -258,7 +256,7 @@ private void dispatchRtspError(Throwable error) {

if (hasUpdatedTimelineAndTracks) {
// Playback event listener must be non-null after timeline has been updated.
checkNotNull(playbackEventListener).onPlaybackError(playbackException);
playbackEventListener.onPlaybackError(playbackException);
} else {
sessionInfoListener.onSessionTimelineRequestFailed(nullToEmpty(error.getMessage()), error);
}
Expand Down Expand Up @@ -373,9 +371,7 @@ private RtspRequest getRequestWithCommonHeaders(
Uri uri) {
RtspHeaders.Builder headersBuilder = new RtspHeaders.Builder();
headersBuilder.add(RtspHeaders.CSEQ, String.valueOf(cSeq++));
if (userAgent != null) {
headersBuilder.add(RtspHeaders.USER_AGENT, userAgent);
}
headersBuilder.add(RtspHeaders.USER_AGENT, userAgent);

if (sessionId != null) {
headersBuilder.add(RtspHeaders.SESSION, sessionId);
Expand Down Expand Up @@ -574,9 +570,8 @@ private void onPlayResponseReceived(RtspPlayResponse response) {
keepAliveMonitor.start();
}

checkNotNull(playbackEventListener)
.onPlaybackStarted(
C.msToUs(response.sessionTiming.startTimeMs), response.trackTimingList);
playbackEventListener.onPlaybackStarted(
C.msToUs(response.sessionTiming.startTimeMs), response.trackTimingList);
pendingSeekPositionUs = C.TIME_UNSET;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.google.android.exoplayer2.source.TrackGroup;
import com.google.android.exoplayer2.source.TrackGroupArray;
import com.google.android.exoplayer2.source.rtsp.RtspClient.PlaybackEventListener;
import com.google.android.exoplayer2.source.rtsp.RtspClient.SessionInfoListener;
import com.google.android.exoplayer2.source.rtsp.RtspMediaSource.RtspPlaybackException;
import com.google.android.exoplayer2.trackselection.ExoTrackSelection;
import com.google.android.exoplayer2.trackselection.TrackSelection;
Expand All @@ -62,22 +63,31 @@
/** A {@link MediaPeriod} that loads an RTSP stream. */
/* package */ final class RtspMediaPeriod implements MediaPeriod {

/** Listener for information about the period. */
interface Listener {

/** Called when the {@link RtspSessionTiming} is available. */
void onSourceInfoRefreshed(RtspSessionTiming timing);
}

/** The maximum times to retry if the underlying data channel failed to bind. */
private static final int PORT_BINDING_MAX_RETRY_COUNT = 3;

private final Allocator allocator;
private final Handler handler;

private final InternalListener internalListener;
private final RtspClient rtspClient;
private final List<RtspLoaderWrapper> rtspLoaderWrappers;
private final List<RtpLoadInfo> selectedLoadInfos;
private final Listener listener;
private final RtpDataChannel.Factory rtpDataChannelFactory;

private @MonotonicNonNull Callback callback;
private @MonotonicNonNull ImmutableList<TrackGroup> trackGroups;
@Nullable private IOException preparationError;
@Nullable private RtspPlaybackException playbackException;

private long lastSeekPositionUs;
private long pendingSeekPositionUs;
private boolean loadingFinished;
private boolean released;
Expand All @@ -90,29 +100,31 @@
* Creates an RTSP media period.
*
* @param allocator An {@link Allocator} from which to obtain media buffer allocations.
* @param rtspTracks A list of tracks in an RTSP playback session.
* @param rtspClient The {@link RtspClient} for the current RTSP playback.
* @param rtpDataChannelFactory A {@link RtpDataChannel.Factory} for {@link RtpDataChannel}.
* @param uri The RTSP playback {@link Uri}.
* @param listener A {@link Listener} to receive session information updates.
*/
public RtspMediaPeriod(
Allocator allocator,
List<RtspMediaTrack> rtspTracks,
RtspClient rtspClient,
RtpDataChannel.Factory rtpDataChannelFactory) {
RtpDataChannel.Factory rtpDataChannelFactory,
Uri uri,
Listener listener,
String userAgent) {
this.allocator = allocator;
handler = Util.createHandlerForCurrentLooper();
this.rtpDataChannelFactory = rtpDataChannelFactory;
this.listener = listener;

handler = Util.createHandlerForCurrentLooper();
internalListener = new InternalListener();
rtspLoaderWrappers = new ArrayList<>(rtspTracks.size());
this.rtspClient = rtspClient;
this.rtspClient.setPlaybackEventListener(internalListener);
rtspClient =
new RtspClient(
/* sessionInfoListener= */ internalListener,
/* playbackEventListener= */ internalListener,
/* userAgent= */ userAgent,
/* uri= */ uri);
rtspLoaderWrappers = new ArrayList<>();
selectedLoadInfos = new ArrayList<>();

for (int i = 0; i < rtspTracks.size(); i++) {
RtspMediaTrack rtspMediaTrack = rtspTracks.get(i);
rtspLoaderWrappers.add(
new RtspLoaderWrapper(rtspMediaTrack, /* trackId= */ i, rtpDataChannelFactory));
}
selectedLoadInfos = new ArrayList<>(rtspTracks.size());
pendingSeekPositionUs = C.TIME_UNSET;
}

Expand All @@ -121,15 +133,19 @@ public void release() {
for (int i = 0; i < rtspLoaderWrappers.size(); i++) {
rtspLoaderWrappers.get(i).release();
}
Util.closeQuietly(rtspClient);
released = true;
}

@Override
public void prepare(Callback callback, long positionUs) {
this.callback = callback;

for (int i = 0; i < rtspLoaderWrappers.size(); i++) {
rtspLoaderWrappers.get(i).startLoading();
try {
rtspClient.start();
} catch (IOException e) {
preparationError = e;
Util.closeQuietly(rtspClient);
}
}

Expand Down Expand Up @@ -233,6 +249,7 @@ public long seekToUs(long positionUs) {
return positionUs;
}

lastSeekPositionUs = positionUs;
pendingSeekPositionUs = positionUs;
rtspClient.seekToUs(positionUs);
for (int i = 0; i < rtspLoaderWrappers.size(); i++) {
Expand All @@ -256,14 +273,19 @@ public long getBufferedPositionUs() {
return pendingSeekPositionUs;
}

long bufferedPositionUs = rtspLoaderWrappers.get(0).sampleQueue.getLargestQueuedTimestampUs();
for (int i = 1; i < rtspLoaderWrappers.size(); i++) {
bufferedPositionUs =
min(
bufferedPositionUs,
checkNotNull(rtspLoaderWrappers.get(i)).sampleQueue.getLargestQueuedTimestampUs());
boolean allLoaderWrappersAreCanceled = true;
long bufferedPositionUs = Long.MAX_VALUE;
for (int i = 0; i < rtspLoaderWrappers.size(); i++) {
RtspLoaderWrapper loaderWrapper = rtspLoaderWrappers.get(i);
if (!loaderWrapper.canceled) {
bufferedPositionUs = min(bufferedPositionUs, loaderWrapper.getBufferedPositionUs());
allLoaderWrappersAreCanceled = false;
}
}
return bufferedPositionUs;

return allLoaderWrappersAreCanceled || bufferedPositionUs == Long.MIN_VALUE
? lastSeekPositionUs
: bufferedPositionUs;
}

@Override
Expand Down Expand Up @@ -386,6 +408,7 @@ private final class InternalListener
implements ExtractorOutput,
Loader.Callback<RtpDataLoadable>,
UpstreamFormatChangedListener,
SessionInfoListener,
PlaybackEventListener {

// ExtractorOutput implementation.
Expand Down Expand Up @@ -515,7 +538,7 @@ public void onPlaybackError(RtspPlaybackException error) {
/** Handles the {@link Loadable} whose {@link RtpDataChannel} timed out. */
private LoadErrorAction handleSocketTimeout(RtpDataLoadable loadable) {
// TODO(b/172331505) Allow for retry when loading is not ending.
if (getBufferedPositionUs() == Long.MIN_VALUE) {
if (getBufferedPositionUs() == 0) {
if (!isUsingRtpTcp) {
// Retry playback with TCP if no sample has been received so far, and we are not already
// using TCP. Retrying will setup new loadables, so will not retry with the current
Expand All @@ -533,9 +556,27 @@ private LoadErrorAction handleSocketTimeout(RtpDataLoadable loadable) {
break;
}
}
playbackException = new RtspPlaybackException("Unknown loadable timed out.");
return Loader.DONT_RETRY;
}

@Override
public void onSessionTimelineUpdated(
RtspSessionTiming timing, ImmutableList<RtspMediaTrack> tracks) {
for (int i = 0; i < tracks.size(); i++) {
RtspMediaTrack rtspMediaTrack = tracks.get(i);
RtspLoaderWrapper loaderWrapper =
new RtspLoaderWrapper(rtspMediaTrack, /* trackId= */ i, rtpDataChannelFactory);
loaderWrapper.startLoading();
rtspLoaderWrappers.add(loaderWrapper);
}

listener.onSourceInfoRefreshed(timing);
}

@Override
public void onSessionTimelineRequestFailed(String message, @Nullable Throwable cause) {
preparationError = cause == null ? new IOException(message) : new IOException(message, cause);
}
}

private void retryWithRtpTcp() {
Expand Down Expand Up @@ -632,6 +673,14 @@ public RtspLoaderWrapper(
sampleQueue.setUpstreamFormatChangeListener(internalListener);
}

/**
* Returns the largest buffered position in microseconds; or {@link Long#MIN_VALUE} if no sample
* has been queued.
*/
public long getBufferedPositionUs() {
return sampleQueue.getLargestQueuedTimestampUs();
}

/** Starts loading. */
public void startLoading() {
loader.startLoading(
Expand Down
Loading

0 comments on commit 1ca0efd

Please sign in to comment.