Skip to content

Commit

Permalink
hardening tests
Browse files Browse the repository at this point in the history
  • Loading branch information
lmolkova committed Feb 24, 2023
1 parent be89af1 commit 57c76fd
Showing 1 changed file with 17 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

import java.io.Closeable;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.ArrayList;
Expand All @@ -53,10 +54,11 @@
@Execution(ExecutionMode.SAME_THREAD)
public class TracingIntegrationTests extends IntegrationTestBase {
private TestSpanProcessor spanProcessor;
private List<AutoCloseable> toClose = new ArrayList<>();
private ServiceBusSenderAsyncClient sender;
ServiceBusReceiverAsyncClient receiver;
ServiceBusReceiverClient receiverSync;
ServiceBusProcessorClient processor;
private ServiceBusReceiverAsyncClient receiver;
private ServiceBusReceiverClient receiverSync;
private ServiceBusProcessorClient processor;

public TracingIntegrationTests() {
super(new ClientLogger(TracingIntegrationTests.class));
Expand Down Expand Up @@ -90,6 +92,9 @@ protected void beforeTest() {
.receiver()
.queueName(getQueueName(0))
.buildClient();
toClose.add(sender);
toClose.add(receiver);
toClose.add(receiverSync);

StepVerifier.setDefaultTimeout(TIMEOUT);
}
Expand All @@ -98,8 +103,14 @@ protected void beforeTest() {
protected void afterTest() {
GlobalOpenTelemetry.resetForTest();
sharedBuilder = null;

if (processor != null)
{
toClose.add(processor);
}

try {
dispose(receiver, sender, processor, receiverSync);
dispose(toClose.toArray(new AutoCloseable[0]));
} catch (Exception e) {
logger.warning("Error occurred when draining queue.", e);
}
Expand Down Expand Up @@ -255,6 +266,7 @@ public void sendAndReceiveParallelAutoComplete() throws InterruptedException {
.queueName(getQueueName(0))
.buildAsyncClient();

toClose.add(receiverAutoComplete);
StepVerifier.create(
receiverAutoComplete.receiveMessages()
.take(messageCount)
Expand Down Expand Up @@ -515,7 +527,6 @@ public void sendAndProcessParallel() throws InterruptedException {
})
.processError(e -> fail("unexpected error", e.getException()))
.buildProcessorClient();

processor.start();
assertTrue(processedFound.await(10, TimeUnit.SECONDS));
processor.stop();
Expand Down Expand Up @@ -561,7 +572,6 @@ public void sendAndProcessParallelNoAutoComplete() throws InterruptedException {
})
.processError(e -> fail("unexpected error", e.getException()))
.buildProcessorClient();

processor.start();
assertTrue(completedFound.await(20, TimeUnit.SECONDS));
processor.stop();
Expand Down Expand Up @@ -605,7 +615,6 @@ public void sendProcessAndFail() throws InterruptedException {
})
.processError(e -> { })
.buildProcessorClient();

processor.start();
assertTrue(messageProcessed.await(10, TimeUnit.SECONDS));
processor.stop();
Expand Down Expand Up @@ -760,11 +769,11 @@ public void onEnd(ReadableSpan readableSpan) {
assertEquals(entityName, readableSpan.getAttribute(AttributeKey.stringKey("messaging.destination.name")));
assertEquals(namespace, readableSpan.getAttribute(AttributeKey.stringKey("net.peer.name")));

spans.add(readableSpan);
Consumer<ReadableSpan> filter = notifier.get();
if (filter != null) {
filter.accept(readableSpan);
}
spans.add(readableSpan);
}

public void notifyIfCondition(CountDownLatch countDownLatch, Predicate<ReadableSpan> filter) {
Expand Down

0 comments on commit 57c76fd

Please sign in to comment.