Update BQIO to a single scheduled executor service reduce threads #23234

Merged

johnjcasey
Contributor

fixes #21368

@johnjcasey changed the title from "Update to a single scheduled executor service reduce threads" to "Update BQIO to a single scheduled executor service reduce threads" on Sep 14, 2022
@github-actions
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment "assign set of reviewers".

@johnjcasey
Contributor Author

run java precommit

@github-actions
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @kileys for label java.
R: @pabloem for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@github-actions
Contributor

Reminder, please take a look at this pr: @kileys @pabloem

@github-actions
Contributor

Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @robertwb for label java.
R: @chamikaramj for label io.

@johnjcasey
Contributor Author

Run Java_Examples_Dataflow_Java17 PreCommit

@johnjcasey
Contributor Author

r: @lukecwik can you take a look at this? I believe we will need to have a separate executor service to preserve backwards compatibility, because if someone provides a non-scheduled executor service then BQIO will fail

*/

return Executors.newScheduledThreadPool(
Math.max(4, Runtime.getRuntime().availableProcessors()), threadFactoryBuilder.build());
Member

What was your reasoning around choosing this as the minimum?

Contributor Author

I dug into the default configuration in BQ. When we set the minimum to 0, threads weren't firing, so I looked at the known working configuration in BQ and used that.

*/
@JsonIgnore
@Description(
"The ScheduledExecutorService instance to use to create threads, can be overridden to specify "
Member

It makes a lot of sense not to declare this pipeline option in GcsOptions. It was placed there a long time ago because GCS needed it, but it is now used in many places.

Since it is used both during pipeline construction and pipeline execution, it makes sense to move it to its own options class in org/apache/beam/sdk/options/ExecutorOptions.java in sdks/java/core.

Contributor Author

makes sense, I'll refactor this

@github-actions
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

Member

@lukecwik left a comment

All nits in the current review.

Since we are returning a ScheduledExecutorService now for GcsOptions#getExecutorService, do we not get test failures anymore (e.g. RemoteExecutionTest)?

In a follow-up can you migrate all usages of GcsOptions#getExecutorService to ExecutorOptions#getScheduledExecutorService?

ExecutorService getExecutorService();

/**
* @deprecated use {@link ExecutorOptions#setScheduledExecutorService(ScheduledExecutorService)}
Member

Suggested change:
- * @deprecated use {@link ExecutorOptions#setScheduledExecutorService(ScheduledExecutorService)}
+ * @deprecated use {@link ExecutorOptions#setScheduledExecutorService}

@johnjcasey
Contributor Author

> All nits in the current review.
>
> Since we are returning a ScheduledExecutorService now for GcsOptions#getExecutorService, do we not get test failures anymore (e.g. RemoteExecutionTest)?
>
> In a follow-up can you migrate all usages of GcsOptions#getExecutorService to ExecutorOptions#getScheduledExecutorService?

I don't think we can do this, because if someone is using GcsOptions#setExecutorService, this change could break them

@lukecwik
Member

lukecwik commented Oct 6, 2022

> It looks like setting core pool size to int.max and allowing core thread timeout is working. It is recommended to not do that, but the concern of running out of threads doesn't appear to be valid.

This does not do what you want. A corePoolSize == Integer.MAX_VALUE means that the pool will start a new thread before checking whether an idle thread is already available to do the work. Every task therefore runs on a new thread, which then expires after the timeout (very expensive performance-wise). This is why the previous thread pool had a core pool size of 0: that forced the thread pool to offer the task to the queue first and to start a new worker only if all the workers were busy.
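
The difference described above can be observed with plain JDK executors. The following is a minimal sketch, not the Beam code under review: the class and method names (CorePoolSizeDemo, hugeCorePool, zeroCorePool, peakThreads) are made up for illustration, and the 60 second keep-alive and 20 ms pause are arbitrary choices. It runs a handful of trivial tasks strictly one after another and reports the peak number of threads each pool created.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CorePoolSizeDemo {

  /** Hypothetical pool shaped like the rejected approach: huge core size, core threads may time out. */
  static ThreadPoolExecutor hugeCorePool() {
    ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(Integer.MAX_VALUE);
    // A positive keep-alive must be set before core thread timeout can be enabled.
    pool.setKeepAliveTime(60, TimeUnit.SECONDS);
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }

  /** Hypothetical pool shaped like a cached thread pool: core size 0 and a direct hand-off queue. */
  static ThreadPoolExecutor zeroCorePool() {
    return new ThreadPoolExecutor(
        0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<>());
  }

  /** Runs the tasks strictly one after another and returns the peak number of threads. */
  static int peakThreads(ThreadPoolExecutor pool, int tasks) {
    try {
      for (int i = 0; i < tasks; i++) {
        pool.submit(() -> {}).get(); // wait until this task has finished
        Thread.sleep(20); // give the worker time to go back to waiting for work
      }
      return pool.getLargestPoolSize();
    } catch (Exception e) {
      throw new RuntimeException(e);
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) {
    // One new thread per task, even though earlier threads are idle.
    System.out.println("huge core pool peak threads: " + peakThreads(hugeCorePool(), 5));
    // Idle workers are reused, so this is typically much smaller.
    System.out.println("zero core pool peak threads: " + peakThreads(zeroCorePool(), 5));
  }
}
```

With the huge core size, every submission finds the worker count below corePoolSize and starts another thread, so the peak equals the number of tasks. With a core size of 0 the task is offered to the queue first, so an idle worker normally picks it up.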

@johnjcasey
Contributor Author

Ah, that's unfortunate, but you're right.

I'm not confident in implementing my own scheduler. Looking through some of the guarantees given by the interface definitions, it may be quite challenging.

@johnjcasey
Contributor Author

I don't think I can accurately implement this in less than several weeks.

@johnjcasey
Contributor Author

A number of the guarantees around periodic and delayed execution depend on methods that are package private, so I can't just copy the scheduled executor code either

@lukecwik
Member

lukecwik commented Oct 7, 2022

> A number of the guarantees around periodic and delayed execution depend on methods that are package private, so I can't just copy the scheduled executor code either

Give me a couple of hours to take a deeper look.

@lukecwik
Member

lukecwik commented Oct 7, 2022

I think I got an implementation for an unbounded ScheduledExecutorService with the properties that we want.
PTAL at #23545

@lukecwik
Member

@johnjcasey Ping me when you have merged your changes with the new UnboundedScheduledExecutorService.

…educe-gax-threads

# Conflicts:
#	sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
Member

@lukecwik left a comment

All my comments are minor, please fix and merge when addressed.

@@ -287,7 +287,8 @@ def buildAndPushDockerJavaContainer = tasks.register("buildAndPushDockerJavaCont
commandLine "docker", "tag", "${defaultDockerImageName}", "${dockerJavaImageName}"
}
exec {
commandLine "gcloud", "docker", "--", "push", "${dockerJavaImageName}"
// commandLine "gcloud", "docker", "--", "push", "${dockerJavaImageName}"
Member

is this necessary?

Contributor Author

This was committed in error.

import java.util.concurrent.TimeUnit;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.beam.sdk.util.UnboundedScheduledExecutorService;

public interface ExecutorOptions extends PipelineOptions {

Member

The comment and @Description need to be updated, since the core pool size is now irrelevant.

import java.util.concurrent.ScheduledExecutorService;
import org.apache.beam.sdk.util.UnboundedScheduledExecutorService;

public interface ExecutorOptions extends PipelineOptions {
Member

class comment?

Comment on lines -135 to -141
/* The SDK requires an unbounded thread pool because a step may create X writers
* each requiring their own thread to perform the writes otherwise a writer may
* block causing deadlock for the step because the writers buffer is full.
* Also, the MapTaskExecutor launches the steps in reverse order and completes
* them in forward order thus requiring enough threads so that each step's writers
* can be active.
*/
Member

Move this comment into ScheduledExecutorServiceFactory just above new UnboundedScheduledExecutorService()
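
The deadlock concern in the quoted comment can be illustrated with a small analogy using only JDK classes. This is a sketch under stated assumptions, not Beam's writer or MapTaskExecutor code: WriterStallDemo and completes are hypothetical names, the one-element ArrayBlockingQueue stands in for a full writer buffer, and the one second timeout is arbitrary.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class WriterStallDemo {

  /** Returns true if a writer and the task draining its buffer both finish within a second. */
  static boolean completes(ExecutorService pool) {
    BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(1); // tiny stand-in for a writer buffer
    try {
      Future<?> writer = pool.submit(() -> {
        for (int i = 0; i < 3; i++) {
          buffer.put(i); // blocks while the buffer is full
        }
        return null;
      });
      Future<?> drainer = pool.submit(() -> {
        for (int i = 0; i < 3; i++) {
          buffer.take(); // frees space so the writer can continue
        }
        return null;
      });
      writer.get(1, TimeUnit.SECONDS);
      drainer.get(1, TimeUnit.SECONDS);
      return true;
    } catch (TimeoutException e) {
      return false; // the writer is stuck because the drainer never got a thread
    } catch (InterruptedException | ExecutionException e) {
      throw new RuntimeException(e);
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println("single thread pool completes: " + completes(Executors.newFixedThreadPool(1)));
    System.out.println("unbounded pool completes: " + completes(Executors.newCachedThreadPool()));
  }
}
```

With a single thread, the writer fills the buffer and blocks while the draining task waits in the pool's queue, so nothing progresses. A pool that can always add a thread lets both tasks run and finish.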

…org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
@lukecwik merged commit 8e2431c into apache:master on Oct 15, 2022
@reuvenlax
Contributor

This PR causes the BigQuery client to hang on shutdown.

reuvenlax added a commit to reuvenlax/incubator-beam that referenced this pull request Oct 22, 2022
reuvenlax added a commit that referenced this pull request Oct 22, 2022
reuvenlax added a commit to reuvenlax/incubator-beam that referenced this pull request Oct 22, 2022
johnjcasey pushed a commit to johnjcasey/beam that referenced this pull request Oct 26, 2022
chamikaramj pushed a commit that referenced this pull request Oct 26, 2022
* Merge pull request #23795: Revert 23234: issue #23794

* Revert "Update BQIO to a single scheduled executor service reduce threads (#23234)" (#23793)

This reverts commit 8e2431c.

(cherry picked from commit 01da3fc)

* Merge pull request #23795: Revert 23234: issue #23794

(cherry picked from commit d38f577)

Co-authored-by: reuvenlax <relax@google.com>
@mdlnr
mdlnr commented Nov 7, 2022

> This PR causes the BigQuery client to hang on shutdown.

@reuvenlax We observed the behaviour of hanging threads in our Dataflow jobs during StreamWriter.close with Beam Version 2.41.0. So maybe the PR does not cause this problem but might make it more visible.

In our pipelines we see a continuously increasing number of threads with stack traces like these:

"pool-3-thread-61" #1767 prio=5 os_prio=0 cpu=0.22ms elapsed=337556.83s tid=0x00007f2d982459d0 nid=0x6f7 in Object.wait()  [0x00007f2d886f1000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(java.base@17.0.2/Native Method)
	- waiting on <no object reference available>
	at java.lang.Thread.join(java.base@17.0.2/Thread.java:1304)
	- locked <merged>(a java.lang.Thread)
	at java.lang.Thread.join(java.base@17.0.2/Thread.java:1372)
	at com.google.cloud.bigquery.storage.v1.StreamWriter.close(StreamWriter.java:369)
	at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.close(BigQueryServicesImpl.java:1339)
	at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$$Lambda$692/0x00000008015dd4c8.run(Unknown Source)
	at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords.lambda$runAsyncIgnoreFailure$1(StorageApiWritesShardedRecords.java:138)
	at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$$Lambda$685/0x00000008015d6b58.run(Unknown Source)
	at java.util.concurrent.Executors$RunnableAdapter.call(java.base@17.0.2/Executors.java:539)
	at java.util.concurrent.FutureTask.run(java.base@17.0.2/FutureTask.java:264)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17.0.2/ThreadPoolExecutor.java:1136)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17.0.2/ThreadPoolExecutor.java:635)
	at java.lang.Thread.run(java.base@17.0.2/Thread.java:833)

We think this is related to using a pretty old version of the dependency com.google.cloud:google-cloud-bigquerystorage:2.12.2. The library has since made many changes in the StreamWriter class which could fix this issue. Is there anything that prevents updating to a newer released version?

@reuvenlax
Contributor

reuvenlax commented Nov 7, 2022 via email

Development

Successfully merging this pull request may close these issues.

Reduce number of Gax related threads, likely by providing common executor to GAX clients
4 participants