Skip to content

Commit

Permalink
Update BulkWriter.scala (#39260)
Browse files Browse the repository at this point in the history
  • Loading branch information
FabianMeiswinkel authored Mar 18, 2024
1 parent 0b775bc commit b3322e3
Showing 1 changed file with 19 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ import com.azure.cosmos.{BridgeInternal, CosmosAsyncContainer, CosmosDiagnostics
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils
import com.azure.cosmos.implementation.batch.{BatchRequestResponseConstants, BulkExecutorDiagnosticsTracker, ItemBulkOperation}
import com.azure.cosmos.models._
import com.azure.cosmos.spark.BulkWriter.{BulkOperationFailedException, bulkWriterBoundedElastic, getThreadInfo, readManyBoundedElastic}
import com.azure.cosmos.spark.CosmosConstants.StatusCodes
import com.azure.cosmos.spark.BulkWriter.{BulkOperationFailedException, bulkWriterRequestsBoundedElastic, bulkWriterResponsesBoundedElastic, getThreadInfo, readManyBoundedElastic}
import com.azure.cosmos.spark.diagnostics.DefaultDiagnostics
import reactor.core.Scannable
import reactor.core.publisher.Mono
Expand Down Expand Up @@ -555,9 +554,10 @@ private class BulkWriter(container: CosmosAsyncContainer,
val bulkOperationResponseFlux: SFlux[CosmosBulkOperationResponse[Object]] =
container
.executeBulkOperations[Object](
bulkInputEmitter.asFlux().publishOn(bulkWriterBoundedElastic),
bulkInputEmitter.asFlux().publishOn(bulkWriterRequestsBoundedElastic),
cosmosBulkExecutionOptions)
.publishOn(bulkWriterBoundedElastic)
.onBackpressureBuffer()
.publishOn(bulkWriterResponsesBoundedElastic)
.asScala

bulkOperationResponseFlux.subscribe(
Expand Down Expand Up @@ -1252,7 +1252,8 @@ private object BulkWriter {
private val maxDelayOn408RequestTimeoutInMs = 10000
private val minDelayOn408RequestTimeoutInMs = 1000
private val maxItemOperationsToShowInErrorMessage = 10
private val BULK_WRITER_BOUNDED_ELASTIC_THREAD_NAME = "bulk-writer-bounded-elastic"
private val BULK_WRITER_REQUESTS_BOUNDED_ELASTIC_THREAD_NAME = "bulk-writer-requests-bounded-elastic"
private val BULK_WRITER_RESPONSES_BOUNDED_ELASTIC_THREAD_NAME = "bulk-writer-responses-bounded-elastic"
private val READ_MANY_BOUNDED_ELASTIC_THREAD_NAME = "read-many-bounded-elastic"
private val TTL_FOR_SCHEDULER_WORKER_IN_SECONDS = 60 // same as BoundedElasticScheduler.DEFAULT_TTL_SECONDS

Expand Down Expand Up @@ -1308,17 +1309,25 @@ private object BulkWriter {

private val bulkProcessingThresholds = new CosmosBulkExecutionThresholdsState()

// Custom bounded elastic scheduler used to consume the bulk input flux: the flux of
// operations handed to executeBulkOperations is published on this scheduler.
val bulkWriterRequestsBoundedElastic: Scheduler = Schedulers.newBoundedElastic(
// Thread cap: Reactor's default bounded-elastic size.
Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
// Queued-task cap: Reactor's default queue size enlarged by DefaultMaxPendingOperationPerCore.
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE + DefaultMaxPendingOperationPerCore,
BULK_WRITER_REQUESTS_BOUNDED_ELASTIC_THREAD_NAME,
// Worker TTL in seconds; NOTE(review): the trailing `true` is presumably the daemon flag - confirm against Reactor's API.
TTL_FOR_SCHEDULER_WORKER_IN_SECONDS, true)

// Custom bounded elastic scheduler used to move off the IO thread when processing bulk responses.
val bulkWriterBoundedElastic: Scheduler = Schedulers.newBoundedElastic(
2 * Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
BULK_WRITER_BOUNDED_ELASTIC_THREAD_NAME,
val bulkWriterResponsesBoundedElastic: Scheduler = Schedulers.newBoundedElastic(
// Thread cap: Reactor's default bounded-elastic size.
Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
// Queued-task cap: Reactor's default queue size enlarged by DefaultMaxPendingOperationPerCore.
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE + DefaultMaxPendingOperationPerCore,
BULK_WRITER_RESPONSES_BOUNDED_ELASTIC_THREAD_NAME,
// Worker TTL in seconds; NOTE(review): the trailing `true` is presumably the daemon flag - confirm against Reactor's API.
TTL_FOR_SCHEDULER_WORKER_IN_SECONDS, true)


// Custom bounded elastic scheduler used to move off the IO thread when processing read-many responses.
val readManyBoundedElastic: Scheduler = Schedulers.newBoundedElastic(
2 * Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE + DefaultMaxPendingOperationPerCore,
READ_MANY_BOUNDED_ELASTIC_THREAD_NAME,
TTL_FOR_SCHEDULER_WORKER_IN_SECONDS, true)

Expand Down

0 comments on commit b3322e3

Please sign in to comment.