Skip to content

Commit

Permalink
Fix stuck player after seek
Browse files Browse the repository at this point in the history
Seeking was causing the player to hang in the following scenario:
1. The surfaceTexture's onFrameAvailableListener is called in
   ExternalTextureManager to notify that a new frame is available.
2. This call submits a task on the GL thread.
3. A seek is performed and DefaultVideoFrameProcessor.flush() is called
   before the task submitted in 2 is executed.
4. DefaultVideoFrameProcessor.flush() flushes the task executor, so that
   the task submitted in 2 never gets executed.
5. Once the seek is over, the first frame is registered and rendered on
   the surface texture.
6. Playback hangs because the onFrameAvailableListener is never called
   for this new frame. This is because surfaceTexture.updateTexImage()
   was never called on the frame that became available in 1.

This fix is making sure that the task submitted in 2 always gets executed.

Issue: #1535
PiperOrigin-RevId: 671389215
  • Loading branch information
kim-vde authored and copybara-github committed Sep 5, 2024
1 parent 6822818 commit a1d2310
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -742,14 +742,15 @@ public void flush() {
return;
}
try {
videoFrameProcessingTaskExecutor.flush();

// Flush from the end of the GlShaderProgram pipeline up to the start.
CountDownLatch latch = new CountDownLatch(1);
TextureManager textureManager = inputSwitcher.activeTextureManager();
textureManager.dropIncomingRegisteredFrames();
// Flush pending tasks to prevent any operation to be executed on the frames being processed
// before the flush operation.
videoFrameProcessingTaskExecutor.flush();
textureManager.releaseAllRegisteredFrames();
CountDownLatch latch = new CountDownLatch(1);
textureManager.setOnFlushCompleteListener(latch::countDown);

// Flush from the end of the GlShaderProgram pipeline up to the start.
videoFrameProcessingTaskExecutor.submit(finalShaderProgramWrapper::flush);
latch.await();
textureManager.setOnFlushCompleteListener(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@
private boolean repeatLastRegisteredFrame;

@Nullable private Future<?> forceSignalEndOfStreamFuture;
private boolean shouldRejectIncomingFrames;
@Nullable private CountDownLatch releaseAllFramesLatch;

private volatile boolean shouldRejectIncomingFrames;
@Nullable private volatile RuntimeException releaseAllFramesException;

/**
Expand Down Expand Up @@ -166,43 +166,13 @@ public ExternalTextureManager(
}
availableFrameCount++;
maybeQueueFrameToExternalShaderProgram();
}));
},
// Ensures the available frame is consumed even if the task executor is flushed
// before the submitted task is executed.
/* isCancellable= */ false));
surface = new Surface(surfaceTexture);
}

@Override
public void releaseAllRegisteredFrames() {
// Blocks the calling thread until all the registered frames are received and released.
CountDownLatch releaseAllFramesLatch = new CountDownLatch(1);
this.releaseAllFramesLatch = releaseAllFramesLatch;
videoFrameProcessingTaskExecutor.submit(
() -> {
try {
removeAllSurfaceTextureFrames();
} catch (RuntimeException e) {
releaseAllFramesException = e;
Log.e(TAG, "Failed to remove texture frames", e);
if (this.releaseAllFramesLatch != null) {
this.releaseAllFramesLatch.countDown();
}
}
});
try {
if (!releaseAllFramesLatch.await(SURFACE_TEXTURE_TIMEOUT_MS, MILLISECONDS)) {
Log.w(TAG, "Timeout reached while waiting for latch to be unblocked.");
}
} catch (InterruptedException e) {
// Not re-thrown to not crash frame processing. Frame process can likely continue even when
// not all rendered frames arrive.
Thread.currentThread().interrupt();
Log.w(TAG, "Interrupted when waiting for MediaCodec frames to arrive.");
}
this.releaseAllFramesLatch = null;
if (releaseAllFramesException != null) {
throw releaseAllFramesException;
}
}

/**
* {@inheritDoc}
*
Expand Down Expand Up @@ -279,7 +249,7 @@ public void registerInputFrame(FrameInfo frame) {
if (!repeatLastRegisteredFrame) {
pendingFrames.add(frame);
}
videoFrameProcessingTaskExecutor.submit(() -> shouldRejectIncomingFrames = false);
shouldRejectIncomingFrames = false;
}

/**
Expand Down Expand Up @@ -328,6 +298,44 @@ protected void flush() throws VideoFrameProcessingException {
super.flush();
}

@Override
public void dropIncomingRegisteredFrames() {
shouldRejectIncomingFrames = true;
}

@Override
public void releaseAllRegisteredFrames() {
// Blocks the calling thread until all the registered frames are received and released.
CountDownLatch releaseAllFramesLatch = new CountDownLatch(1);
this.releaseAllFramesLatch = releaseAllFramesLatch;
videoFrameProcessingTaskExecutor.submit(
() -> {
try {
removeAllSurfaceTextureFrames();
} catch (RuntimeException e) {
releaseAllFramesException = e;
Log.e(TAG, "Failed to remove texture frames", e);
if (this.releaseAllFramesLatch != null) {
this.releaseAllFramesLatch.countDown();
}
}
});
try {
if (!releaseAllFramesLatch.await(SURFACE_TEXTURE_TIMEOUT_MS, MILLISECONDS)) {
Log.w(TAG, "Timeout reached while waiting for latch to be unblocked.");
}
} catch (InterruptedException e) {
// Not re-thrown to not crash frame processing. Frame process can likely continue even when
// not all rendered frames arrive.
Thread.currentThread().interrupt();
Log.w(TAG, "Interrupted when waiting for MediaCodec frames to arrive.");
}
this.releaseAllFramesLatch = null;
if (releaseAllFramesException != null) {
throw releaseAllFramesException;
}
}

private void restartForceSignalEndOfStreamTimer() {
cancelForceSignalEndOfStreamTimer();
forceSignalEndOfStreamFuture =
Expand All @@ -353,6 +361,7 @@ private void forceSignalEndOfStream() {
// Reset because there could be further input streams after the current one ends.
currentInputStreamEnded = false;
currentFrame = null;
shouldRejectIncomingFrames = true;

// Frames could be made available while waiting for OpenGL to finish processing. That is,
// time out is triggered while waiting for the downstream shader programs to process a frame,
Expand All @@ -364,7 +373,6 @@ private void forceSignalEndOfStream() {
}

private void removeAllSurfaceTextureFrames() {
shouldRejectIncomingFrames = true;
while (availableFrameCount > 0) {
availableFrameCount--;
surfaceTexture.updateTexImage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,12 @@ protected void flush() throws VideoFrameProcessingException {
}
}

/**
* Instructs the texture manager to drop any incoming {@linkplain #registerInputFrame(FrameInfo)
* registered} frame.
*/
public void dropIncomingRegisteredFrames() {}

/** Releases all previously {@linkplain #registerInputFrame(FrameInfo) registered} frames. */
public void releaseAllRegisteredFrames() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,20 @@ public VideoFrameProcessingTaskExecutor(
* Submits the given {@link Task} to be executed after all pending tasks have completed.
*
* <p>Can be called on any thread.
*
* @param task The task to submit.
* @param isCancellable Whether the task can be cancelled, for example if an error occurred in a
* previous task or if the executor is flushed.
*/
@SuppressWarnings("FutureReturnValueIgnored")
public void submit(Task task) {
public void submit(Task task, boolean isCancellable) {
@Nullable RejectedExecutionException executionException = null;
synchronized (lock) {
if (shouldCancelTasks) {
if (shouldCancelTasks && isCancellable) {
return;
}
try {
wrapTaskAndSubmitToExecutorService(task, /* isFlushOrReleaseTask= */ false);
wrapTaskAndSubmitToExecutorService(task, isCancellable);
} catch (RejectedExecutionException e) {
executionException = e;
}
Expand All @@ -113,6 +117,16 @@ public void submit(Task task) {
}
}

/**
* Submits the given {@link Task} to be executed after all pending tasks have completed.
*
* <p>Can be called on any thread.
*/
@SuppressWarnings("FutureReturnValueIgnored")
public void submit(Task task) {
submit(task, /* isCancellable= */ true);
}

/**
* Blocks the caller until the given {@link Task} has completed.
*
Expand Down Expand Up @@ -193,7 +207,7 @@ public void flush() throws InterruptedException {
}
latch.countDown();
},
/* isFlushOrReleaseTask= */ true);
/* isCancellable= */ false);
latch.await();
}

Expand All @@ -217,8 +231,7 @@ public void release(Task releaseTask) throws InterruptedException {
shouldCancelTasks = true;
highPriorityTasks.clear();
}
Future<?> unused =
wrapTaskAndSubmitToExecutorService(releaseTask, /* isFlushOrReleaseTask= */ true);
Future<?> unused = wrapTaskAndSubmitToExecutorService(releaseTask, /* isCancellable= */ false);
if (shouldShutdownExecutorService) {
singleThreadExecutorService.shutdown();
if (!singleThreadExecutorService.awaitTermination(
Expand Down Expand Up @@ -253,12 +266,12 @@ private boolean isRunningOnVideoFrameProcessingThread() throws InterruptedExcept
}

private Future<?> wrapTaskAndSubmitToExecutorService(
Task defaultPriorityTask, boolean isFlushOrReleaseTask) {
Task defaultPriorityTask, boolean isCancellable) {
return singleThreadExecutorService.submit(
() -> {
try {
synchronized (lock) {
if (shouldCancelTasks && !isFlushOrReleaseTask) {
if (shouldCancelTasks && isCancellable) {
return;
}
}
Expand Down

0 comments on commit a1d2310

Please sign in to comment.