Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Splitting Schedulers used in BulkWriter between requests and responses #39260

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ import com.azure.cosmos.{BridgeInternal, CosmosAsyncContainer, CosmosDiagnostics
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils
import com.azure.cosmos.implementation.batch.{BatchRequestResponseConstants, BulkExecutorDiagnosticsTracker, ItemBulkOperation}
import com.azure.cosmos.models._
import com.azure.cosmos.spark.BulkWriter.{BulkOperationFailedException, bulkWriterBoundedElastic, getThreadInfo, readManyBoundedElastic}
import com.azure.cosmos.spark.CosmosConstants.StatusCodes
import com.azure.cosmos.spark.BulkWriter.{BulkOperationFailedException, bulkWriterRequestsBoundedElastic, bulkWriterResponsesBoundedElastic, getThreadInfo, readManyBoundedElastic}
import com.azure.cosmos.spark.diagnostics.DefaultDiagnostics
import reactor.core.Scannable
import reactor.core.publisher.Mono
Expand Down Expand Up @@ -555,9 +554,10 @@ private class BulkWriter(container: CosmosAsyncContainer,
val bulkOperationResponseFlux: SFlux[CosmosBulkOperationResponse[Object]] =
container
.executeBulkOperations[Object](
bulkInputEmitter.asFlux().publishOn(bulkWriterBoundedElastic),
bulkInputEmitter.asFlux().publishOn(bulkWriterRequestsBoundedElastic),
cosmosBulkExecutionOptions)
.publishOn(bulkWriterBoundedElastic)
.onBackpressureBuffer()
.publishOn(bulkWriterResponsesBoundedElastic)
.asScala

bulkOperationResponseFlux.subscribe(
Expand Down Expand Up @@ -1252,7 +1252,8 @@ private object BulkWriter {
private val maxDelayOn408RequestTimeoutInMs = 10000
private val minDelayOn408RequestTimeoutInMs = 1000
private val maxItemOperationsToShowInErrorMessage = 10
private val BULK_WRITER_BOUNDED_ELASTIC_THREAD_NAME = "bulk-writer-bounded-elastic"
private val BULK_WRITER_REQUESTS_BOUNDED_ELASTIC_THREAD_NAME = "bulk-writer-requests-bounded-elastic"
private val BULK_WRITER_RESPONSES_BOUNDED_ELASTIC_THREAD_NAME = "bulk-writer-responses-bounded-elastic"
private val READ_MANY_BOUNDED_ELASTIC_THREAD_NAME = "read-many-bounded-elastic"
private val TTL_FOR_SCHEDULER_WORKER_IN_SECONDS = 60 // same as BoundedElasticScheduler.DEFAULT_TTL_SECONDS

Expand Down Expand Up @@ -1308,17 +1309,25 @@ private object BulkWriter {

private val bulkProcessingThresholds = new CosmosBulkExecutionThresholdsState()

// Custom bounded elastic scheduler to consume input flux
val bulkWriterRequestsBoundedElastic: Scheduler = Schedulers.newBoundedElastic(
Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE + DefaultMaxPendingOperationPerCore,
BULK_WRITER_REQUESTS_BOUNDED_ELASTIC_THREAD_NAME,
TTL_FOR_SCHEDULER_WORKER_IN_SECONDS, true)

// Custom bounded elastic scheduler to switch off IO thread to process response.
val bulkWriterBoundedElastic: Scheduler = Schedulers.newBoundedElastic(
2 * Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
BULK_WRITER_BOUNDED_ELASTIC_THREAD_NAME,
val bulkWriterResponsesBoundedElastic: Scheduler = Schedulers.newBoundedElastic(
Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE + DefaultMaxPendingOperationPerCore,
BULK_WRITER_RESPONSES_BOUNDED_ELASTIC_THREAD_NAME,
TTL_FOR_SCHEDULER_WORKER_IN_SECONDS, true)


// Custom bounded elastic scheduler to switch off IO thread to process response.
val readManyBoundedElastic: Scheduler = Schedulers.newBoundedElastic(
2 * Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE + DefaultMaxPendingOperationPerCore,
READ_MANY_BOUNDED_ELASTIC_THREAD_NAME,
TTL_FOR_SCHEDULER_WORKER_IN_SECONDS, true)

Expand Down
Loading