Skip to content

Commit

Permalink
Revert "[Transform] Allow transforms to use PIT with remote clusters … (
Browse files Browse the repository at this point in the history
elastic#107970)

This reverts commit 9b584aa.
  • Loading branch information
prwhelan authored Apr 29, 2024
1 parent 585fae3 commit 28e54f6
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 9 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/107970.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 107970
summary: "Revert \"[Transform] Allow transforms to use PIT with remote clusters …"
area: Transform
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,8 @@ private void injectPointInTimeIfNeeded(
ActionListener<Tuple<String, SearchRequest>> listener
) {
SearchRequest searchRequest = namedSearchRequest.v2();
if (disablePit || searchRequest.indices().length == 0) {
// We explicitly disable PIT in the presence of remote clusters in the source due to huge PIT handles causing performance problems.
if (disablePit || searchRequest.indices().length == 0 || transformConfig.getSource().requiresRemoteCluster()) {
listener.onResponse(namedSearchRequest);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,14 +301,8 @@ public void testPitInjectionIfPitNotSupported() throws InterruptedException {
}

public void testDisablePit() throws InterruptedException {
TransformConfig.Builder configBuilder = new TransformConfig.Builder(TransformConfigTests.randomTransformConfig());
if (randomBoolean()) {
// TransformConfigTests.randomTransformConfig never produces remote indices in the source.
// We need to explicitly set the remote index here for coverage.
configBuilder.setSource(new SourceConfig("remote-cluster:remote-index"));
}
TransformConfig config = configBuilder.build();

// TransformConfigTests.randomTransformConfig never produces remote indices in the source, hence we are safe here. */
TransformConfig config = TransformConfigTests.randomTransformConfig();
boolean pitEnabled = config.getSettings().getUsePit() == null || config.getSettings().getUsePit();

try (var threadPool = createThreadPool()) {
Expand Down Expand Up @@ -371,6 +365,69 @@ public void testDisablePit() throws InterruptedException {
}
}

public void testDisablePitWhenThereIsRemoteIndexInSource() throws InterruptedException {
TransformConfig config = new TransformConfig.Builder(TransformConfigTests.randomTransformConfig())
// Remote index is configured within source
.setSource(new SourceConfig("remote-cluster:remote-index"))
.build();
boolean pitEnabled = config.getSettings().getUsePit() == null || config.getSettings().getUsePit();

try (var threadPool = createThreadPool()) {
final var client = new PitMockClient(threadPool, true);
MockClientTransformIndexer indexer = new MockClientTransformIndexer(
mock(ThreadPool.class),
mock(ClusterService.class),
mock(IndexNameExpressionResolver.class),
mock(TransformExtension.class),
new TransformServices(
mock(IndexBasedTransformConfigManager.class),
mock(TransformCheckpointService.class),
mock(TransformAuditor.class),
new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO)
),
mock(CheckpointProvider.class),
new AtomicReference<>(IndexerState.STOPPED),
null,
new ParentTaskAssigningClient(client, new TaskId("dummy-node:123456")),
mock(TransformIndexerStats.class),
config,
null,
new TransformCheckpoint(
"transform",
Instant.now().toEpochMilli(),
0L,
Collections.emptyMap(),
Instant.now().toEpochMilli()
),
new TransformCheckpoint(
"transform",
Instant.now().toEpochMilli(),
2L,
Collections.emptyMap(),
Instant.now().toEpochMilli()
),
new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME),
mock(TransformContext.class),
false
);

// Because remote index is configured within source, we expect PIT *not* being used regardless the transform settings
this.<SearchResponse>assertAsync(
listener -> indexer.doNextSearch(0, listener),
response -> assertNull(response.pointInTimeId())
);

// reverse the setting
indexer.applyNewSettings(new SettingsConfig.Builder().setUsePit(pitEnabled == false).build());

// Because remote index is configured within source, we expect PIT *not* being used regardless the transform settings
this.<SearchResponse>assertAsync(
listener -> indexer.doNextSearch(0, listener),
response -> assertNull(response.pointInTimeId())
);
}
}

public void testHandlePitIndexNotFound() throws InterruptedException {
// simulate a deleted index due to ILM
try (var threadPool = createThreadPool()) {
Expand Down

0 comments on commit 28e54f6

Please sign in to comment.