diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index bdbd70afbaeac..8860c9bb06d4d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1265,14 +1265,14 @@ private void resumeAsyncResponse(AsyncResponse asyncResponse, Set subscr return; } } else { - asyncResponse.resume(new ArrayList<>(subscriptions)); + asyncResponse.resume(subscriptions); } }); } private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse) { getTopicReferenceAsync(topicName) - .thenAccept(topic -> asyncResponse.resume(new ArrayList<>(topic.getSubscriptions().keys()))) + .thenAccept(topic -> asyncResponse.resume(topic.getSubscriptions().keySet())) .exceptionally(ex -> { // If the exception is not redirect exception we need to log it. if (isNot307And404Exception(ex)) { @@ -2024,7 +2024,7 @@ private void internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(Asy new ArrayList<>((int) topic.getReplicators().size()); List subNames = new ArrayList<>((int) topic.getSubscriptions().size()); - subNames.addAll(topic.getSubscriptions().keys().stream().filter( + subNames.addAll(topic.getSubscriptions().keySet().stream().filter( subName -> !subName.equals(Compactor.COMPACTION_SUBSCRIPTION)).toList()); for (int i = 0; i < subNames.size(); i++) { try { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 3fdfeeee6e152..dce50a54db1f6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -26,6 +26,7 @@ import java.time.Clock; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.List; @@ -576,7 +577,7 @@ && getNumberOfSameAddressConsumers(consumer.getClientAddress()) >= maxSameAddres public abstract int getNumberOfSameAddressConsumers(String clientAddress); protected int getNumberOfSameAddressConsumers(final String clientAddress, - final List subscriptions) { + final Collection subscriptions) { int count = 0; if (clientAddress != null) { for (Subscription subscription : subscriptions) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 6b0be07c8f7a8..09f04d878c4e5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1176,7 +1176,7 @@ private CompletableFuture deleteTopicInternal(String topic, boolean forceD // v2 topics have a global name so check if the topic is replicated. if (t.isReplicated()) { // Delete is disallowed on global topic - final List clusters = t.getReplicators().keys(); + final var clusters = t.getReplicators().keySet(); log.error("Delete forbidden topic {} is replicated on clusters {}", topic, clusters); return FutureUtil.failedFuture( new IllegalStateException("Delete forbidden topic is replicated on clusters " + clusters)); @@ -1184,7 +1184,7 @@ private CompletableFuture deleteTopicInternal(String topic, boolean forceD // shadow topic should be deleted first. if (t.isShadowReplicated()) { - final List shadowTopics = t.getShadowReplicators().keys(); + final var shadowTopics = t.getShadowReplicators().keySet(); log.error("Delete forbidden. Topic {} is replicated to shadow topics: {}", topic, shadowTopics); return FutureUtil.failedFuture(new IllegalStateException( "Delete forbidden. Topic " + topic + " is replicated to shadow topics.")); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 3ec09e9bfcd28..ec7889af6bbbe 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -44,7 +44,6 @@ import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.protocol.schema.SchemaVersion; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import org.apache.pulsar.utils.StatsOutputStream; @@ -183,7 +182,7 @@ CompletableFuture createSubscription(String subscriptionName, Init CompletableFuture unsubscribe(String subName); - ConcurrentOpenHashMap getSubscriptions(); + Map getSubscriptions(); CompletableFuture delete(); @@ -265,9 +264,9 @@ void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats Subscription getSubscription(String subscription); - ConcurrentOpenHashMap getReplicators(); + Map getReplicators(); - ConcurrentOpenHashMap getShadowReplicators(); + Map getShadowReplicators(); TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize, boolean getEarliestTimeInBacklog); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 2abd505d527cc..34c2678f847a5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -33,6 +33,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLongFieldUpdater; @@ -96,7 +97,6 @@ import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import org.apache.pulsar.utils.StatsOutputStream; import org.slf4j.Logger; @@ -105,9 +105,9 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPolicyListener { // Subscriptions to this topic - private final ConcurrentOpenHashMap subscriptions; + private final Map subscriptions = new ConcurrentHashMap<>(); - private final ConcurrentOpenHashMap replicators; + private final Map replicators = new ConcurrentHashMap<>(); // Ever increasing counter of entries added private static final AtomicLongFieldUpdater ENTRIES_ADDED_COUNTER_UPDATER = @@ -152,17 +152,6 @@ public void reset() { public NonPersistentTopic(String topic, BrokerService brokerService) { super(topic, brokerService); - - this.subscriptions = - ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .concurrencyLevel(1) - .build(); - this.replicators = - ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .concurrencyLevel(1) - .build(); this.isFenced = false; registerTopicPolicyListener(); } @@ -446,8 +435,8 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, boolean c if (failIfHasSubscriptions) { if (!subscriptions.isEmpty()) { isFenced = false; - deleteFuture.completeExceptionally( - new TopicBusyException("Topic has subscriptions:" + subscriptions.keys())); + deleteFuture.completeExceptionally(new TopicBusyException("Topic has subscriptions:" + + subscriptions.keySet().stream().toList())); return; } } else { @@ -714,18 +703,18 @@ public int getNumberOfSameAddressConsumers(final String clientAddress) { } @Override - public ConcurrentOpenHashMap getSubscriptions() { + public Map getSubscriptions() { return subscriptions; } @Override - public ConcurrentOpenHashMap getReplicators() { + public Map getReplicators() { return replicators; } @Override - public ConcurrentOpenHashMap getShadowReplicators() { - return ConcurrentOpenHashMap.emptyMap(); + public Map getShadowReplicators() { + return Map.of(); } @Override @@ -1043,7 +1032,6 @@ private CompletableFuture checkAndUnsubscribeSubscriptions() { private CompletableFuture disconnectReplicators() { List> futures = new ArrayList<>(); - ConcurrentOpenHashMap replicators = getReplicators(); replicators.forEach((r, replicator) -> { futures.add(replicator.terminate()); }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index d664d6812adaa..f8581cfc79985 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -44,6 +44,7 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; @@ -187,7 +188,6 @@ import org.apache.pulsar.common.topics.TopicCompactionStrategy; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.compaction.CompactedTopicContext; import org.apache.pulsar.compaction.CompactedTopicImpl; import org.apache.pulsar.compaction.Compactor; @@ -207,10 +207,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal protected final ManagedLedger ledger; // Subscriptions to this topic - private final ConcurrentOpenHashMap subscriptions; + private final Map subscriptions = new ConcurrentHashMap<>(); - private final ConcurrentOpenHashMap replicators; - private final ConcurrentOpenHashMap shadowReplicators; + private final Map replicators = new ConcurrentHashMap<>(); + private final Map shadowReplicators = new ConcurrentHashMap<>(); @Getter private volatile List shadowTopics; private final TopicName shadowSourceTopic; @@ -392,18 +392,6 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS ? brokerService.getTopicOrderedExecutor().chooseThread(topic) : null; this.ledger = ledger; - this.subscriptions = ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .concurrencyLevel(1) - .build(); - this.replicators = ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .concurrencyLevel(1) - .build(); - this.shadowReplicators = ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .concurrencyLevel(1) - .build(); this.backloggedCursorThresholdEntries = brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold(); this.messageDeduplication = new MessageDeduplication(brokerService.pulsar(), this, ledger); @@ -429,6 +417,28 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS } } + @VisibleForTesting + PersistentTopic(String topic, BrokerService brokerService, ManagedLedger ledger, + MessageDeduplication messageDeduplication) { + super(topic, brokerService); + // null check for backwards compatibility with tests which mock the broker service + this.orderedExecutor = brokerService.getTopicOrderedExecutor() != null + ? brokerService.getTopicOrderedExecutor().chooseThread(topic) + : null; + this.ledger = ledger; + this.messageDeduplication = messageDeduplication; + this.backloggedCursorThresholdEntries = + brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold(); + + if (brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) { + this.transactionBuffer = brokerService.getPulsar() + .getTransactionBufferProvider().newTransactionBuffer(this); + } else { + this.transactionBuffer = new TransactionBufferDisable(this); + } + shadowSourceTopic = null; + } + @Override public CompletableFuture initialize() { List> futures = new ArrayList<>(); @@ -476,41 +486,6 @@ public CompletableFuture initialize() { })); } - // for testing purposes - @VisibleForTesting - PersistentTopic(String topic, BrokerService brokerService, ManagedLedger ledger, - MessageDeduplication messageDeduplication) { - super(topic, brokerService); - // null check for backwards compatibility with tests which mock the broker service - this.orderedExecutor = brokerService.getTopicOrderedExecutor() != null - ? brokerService.getTopicOrderedExecutor().chooseThread(topic) - : null; - this.ledger = ledger; - this.messageDeduplication = messageDeduplication; - this.subscriptions = ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .concurrencyLevel(1) - .build(); - this.replicators = ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .concurrencyLevel(1) - .build(); - this.shadowReplicators = ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .concurrencyLevel(1) - .build(); - this.backloggedCursorThresholdEntries = - brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold(); - - if (brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) { - this.transactionBuffer = brokerService.getPulsar() - .getTransactionBufferProvider().newTransactionBuffer(this); - } else { - this.transactionBuffer = new TransactionBufferDisable(this); - } - shadowSourceTopic = null; - } - private void initializeDispatchRateLimiterIfNeeded() { synchronized (dispatchRateLimiterLock) { // dispatch rate limiter for topic @@ -1455,8 +1430,8 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, // In this case, we shouldn't care if the usageCount is 0 or not, just proceed if (!closeIfClientsConnected) { if (failIfHasSubscriptions && !subscriptions.isEmpty()) { - return FutureUtil.failedFuture( - new TopicBusyException("Topic has subscriptions: " + subscriptions.keys())); + return FutureUtil.failedFuture(new TopicBusyException("Topic has subscriptions: " + + subscriptions.keySet().stream().toList())); } else if (failIfHasBacklogs) { if (hasBacklogs(false)) { List backlogSubs = @@ -2129,10 +2104,6 @@ protected CompletableFuture addReplicationCluster(String remoteCluster, Ma } return null; }); - // clean up replicator if startup is failed - if (replicator == null) { - replicators.removeNullValue(remoteCluster); - } } finally { lock.readLock().unlock(); } @@ -2210,11 +2181,6 @@ protected CompletableFuture addShadowReplicationCluster(String shadowTopic } return null; }); - - // clean up replicator if startup is failed - if (replicator == null) { - shadowReplicators.removeNullValue(shadowTopic); - } }); } @@ -2274,7 +2240,7 @@ protected String getSchemaId() { } @Override - public ConcurrentOpenHashMap getSubscriptions() { + public Map getSubscriptions() { return subscriptions; } @@ -2284,12 +2250,12 @@ public PersistentSubscription getSubscription(String subscriptionName) { } @Override - public ConcurrentOpenHashMap getReplicators() { + public Map getReplicators() { return replicators; } @Override - public ConcurrentOpenHashMap getShadowReplicators() { + public Map getShadowReplicators() { return shadowReplicators; } @@ -3091,7 +3057,6 @@ private CompletableFuture checkAndDisconnectProducers() { private CompletableFuture checkAndDisconnectReplicators() { List> futures = new ArrayList<>(); - ConcurrentOpenHashMap replicators = getReplicators(); replicators.forEach((r, replicator) -> { if (replicator.getNumberOfEntriesInBacklog() <= 0) { futures.add(replicator.terminate()); @@ -3106,12 +3071,9 @@ public boolean shouldProducerMigrate() { @Override public boolean isReplicationBacklogExist() { - ConcurrentOpenHashMap replicators = getReplicators(); - if (replicators != null) { - for (Replicator replicator : replicators.values()) { - if (replicator.getNumberOfEntriesInBacklog() > 0) { - return true; - } + for (Replicator replicator : replicators.values()) { + if (replicator.getNumberOfEntriesInBacklog() > 0) { + return true; } } return false; @@ -3759,9 +3721,9 @@ public boolean isOldestMessageExpired(ManagedCursor cursor, int messageTTLInSeco public CompletableFuture clearBacklog() { log.info("[{}] Clearing backlog on all cursors in the topic.", topic); List> futures = new ArrayList<>(); - List cursors = getSubscriptions().keys(); - cursors.addAll(getReplicators().keys()); - cursors.addAll(getShadowReplicators().keys()); + List cursors = new ArrayList<>(getSubscriptions().keySet()); + cursors.addAll(getReplicators().keySet()); + cursors.addAll(getShadowReplicators().keySet()); for (String cursor : cursors) { futures.add(clearBacklog(cursor)); } @@ -4161,7 +4123,7 @@ private void unfenceReplicatorsToResume() { checkShadowReplication(); } - private void removeTerminatedReplicators(ConcurrentOpenHashMap replicators) { + private void removeTerminatedReplicators(Map replicators) { Map terminatedReplicators = new HashMap<>(); replicators.forEach((cluster, replicator) -> { if (replicator.isTerminated()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java index b873bc93cd1e4..f56cf9de66b75 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java @@ -243,7 +243,8 @@ private void startNewSnapshot() { pendingSnapshotsMetric.inc(); stats.recordSnapshotStarted(); ReplicatedSubscriptionsSnapshotBuilder builder = new ReplicatedSubscriptionsSnapshotBuilder(this, - topic.getReplicators().keys(), topic.getBrokerService().pulsar().getConfiguration(), Clock.systemUTC()); + topic.getReplicators().keySet(), topic.getBrokerService().pulsar().getConfiguration(), + Clock.systemUTC()); pendingSnapshots.put(builder.getSnapshotId(), builder); builder.start(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java index 0dacade3eed1c..e08b549f8aec9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java @@ -20,7 +20,6 @@ import io.prometheus.client.Summary; import java.time.Clock; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; @@ -43,7 +42,7 @@ public class ReplicatedSubscriptionsSnapshotBuilder { private final ReplicatedSubscriptionsController controller; private final Map responses = new TreeMap<>(); - private final List remoteClusters; + private final Set remoteClusters; private final Set missingClusters; private final boolean needTwoRounds; @@ -60,7 +59,7 @@ public class ReplicatedSubscriptionsSnapshotBuilder { "Time taken to create a consistent snapshot across clusters").register(); public ReplicatedSubscriptionsSnapshotBuilder(ReplicatedSubscriptionsController controller, - List remoteClusters, ServiceConfiguration conf, Clock clock) { + Set remoteClusters, ServiceConfiguration conf, Clock clock) { this.snapshotId = UUID.randomUUID().toString(); this.controller = controller; this.remoteClusters = remoteClusters; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 5432b8a430d63..4a1dbface2c63 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -2315,8 +2315,8 @@ public void testUnsubscribeOnNamespace(Integer numBundles) throws Exception { admin.namespaces().unsubscribeNamespace("prop-xyz/ns1-bundles", "my-sub"); - assertEquals(admin.topics().getSubscriptions("persistent://prop-xyz/ns1-bundles/ds2"), - List.of("my-sub-1", "my-sub-2")); + assertEquals(admin.topics().getSubscriptions("persistent://prop-xyz/ns1-bundles/ds2").stream().sorted() + .toList(), List.of("my-sub-1", "my-sub-2")); assertEquals(admin.topics().getSubscriptions("persistent://prop-xyz/ns1-bundles/ds1"), List.of("my-sub-1")); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 55b4c6e1c6f59..18fd3dd1c8bb3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -251,7 +251,7 @@ public void testGetSubscriptions() { response = mock(AsyncResponse.class); persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName + "-partition-0", true); - verify(response, timeout(5000).times(1)).resume(List.of("test")); + verify(response, timeout(5000).times(1)).resume(Set.of("test")); // 6) Delete the subscription response = mock(AsyncResponse.class); @@ -265,7 +265,7 @@ public void testGetSubscriptions() { response = mock(AsyncResponse.class); persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName + "-partition-0", true); - verify(response, timeout(5000).times(1)).resume(new ArrayList<>()); + verify(response, timeout(5000).times(1)).resume(Set.of()); // 8) Create a sub of partitioned-topic response = mock(AsyncResponse.class); @@ -279,16 +279,16 @@ public void testGetSubscriptions() { response = mock(AsyncResponse.class); persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName + "-partition-1", true); - verify(response, timeout(5000).times(1)).resume(List.of("test")); + verify(response, timeout(5000).times(1)).resume(Set.of("test")); // response = mock(AsyncResponse.class); persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName + "-partition-0", true); - verify(response, timeout(5000).times(1)).resume(new ArrayList<>()); + verify(response, timeout(5000).times(1)).resume(Set.of()); // response = mock(AsyncResponse.class); persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName, true); - verify(response, timeout(5000).times(1)).resume(List.of("test")); + verify(response, timeout(5000).times(1)).resume(Set.of("test")); // 9) Delete the partitioned topic response = mock(AsyncResponse.class); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index 1351c41e4279e..dc9a7ec4429fc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -88,10 +88,8 @@ import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.policies.data.TopicStats; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.assertj.core.api.Assertions; import org.awaitility.Awaitility; -import org.awaitility.reflect.WhiteboxImpl; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -3199,8 +3197,7 @@ public void testUpdateRetentionWithPartialFailure() throws Exception { // Inject an error that makes dispatch rate update fail. PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopic(tpName, false).join().get(); - ConcurrentOpenHashMap subscriptions = - WhiteboxImpl.getInternalState(persistentTopic, "subscriptions"); + final var subscriptions = persistentTopic.getSubscriptions(); PersistentSubscription mockedSubscription = Mockito.mock(PersistentSubscription.class); Mockito.when(mockedSubscription.getDispatcher()).thenThrow(new RuntimeException("Mocked error: getDispatcher")); subscriptions.put("mockedSubscription", mockedSubscription); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java index f2faa98636ba2..d92c3126c5404 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java @@ -1380,8 +1380,8 @@ public void testUnsubscribeOnNamespace(Integer numBundles) throws Exception { admin.namespaces().unsubscribeNamespace("prop-xyz/use/ns1-bundles", "my-sub"); - assertEquals(admin.topics().getSubscriptions("persistent://prop-xyz/use/ns1-bundles/ds2"), - List.of("my-sub-1", "my-sub-2")); + assertEquals(admin.topics().getSubscriptions("persistent://prop-xyz/use/ns1-bundles/ds2").stream() + .sorted().toList(), List.of("my-sub-1", "my-sub-2")); assertEquals(admin.topics().getSubscriptions("persistent://prop-xyz/use/ns1-bundles/ds1"), List.of("my-sub-1")); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractTopicTest.java index 39be56e3f41cf..337717ed97b1b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractTopicTest.java @@ -24,10 +24,10 @@ import static org.mockito.Mockito.when; import static org.mockito.Mockito.withSettings; import static org.testng.Assert.assertEquals; +import java.util.concurrent.ConcurrentHashMap; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.qos.AsyncTokenBucket; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -54,11 +54,7 @@ public void beforeMethod() { .useConstructor("topic", brokerService) .defaultAnswer(CALLS_REAL_METHODS)); - ConcurrentOpenHashMap subscriptions = - ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .concurrencyLevel(1) - .build(); + final var subscriptions = new ConcurrentHashMap(); subscriptions.put("subscription", subscription); when(topic.getSubscriptions()).thenAnswer(invocation -> subscriptions); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java index 8ec565f7d4566..e56a3495600f0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java @@ -49,7 +49,6 @@ import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl; import org.apache.pulsar.common.policies.data.TenantInfoImpl; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; @@ -344,7 +343,7 @@ public void testClusterMigration() throws Exception { assertFalse(topic2.getSubscriptions().isEmpty()); topic1.checkClusterMigration().get(); - ConcurrentOpenHashMap replicators = topic1.getReplicators(); + final var replicators = topic1.getReplicators(); replicators.forEach((r, replicator) -> { assertFalse(replicator.isConnected()); }); @@ -798,20 +797,20 @@ public void testNamespaceMigration(SubscriptionType subType, boolean isClusterMi blueTopicNs2_1.checkClusterMigration().get(); } - ConcurrentOpenHashMap replicators = blueTopicNs1_1.getReplicators(); + final var replicators = blueTopicNs1_1.getReplicators(); replicators.forEach((r, replicator) -> { assertFalse(replicator.isConnected()); }); assertTrue(blueTopicNs1_1.getSubscriptions().isEmpty()); if (isClusterMigrate) { - ConcurrentOpenHashMap replicatorsNm = blueTopicNs2_1.getReplicators(); + final var replicatorsNm = blueTopicNs2_1.getReplicators(); replicatorsNm.forEach((r, replicator) -> { assertFalse(replicator.isConnected()); }); assertTrue(blueTopicNs2_1.getSubscriptions().isEmpty()); } else { - ConcurrentOpenHashMap replicatorsNm = blueTopicNs2_1.getReplicators(); + final var replicatorsNm = blueTopicNs2_1.getReplicators(); replicatorsNm.forEach((r, replicator) -> { assertTrue(replicator.isConnected()); }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java index cbbb8808f3d1a..85e0887465db2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java @@ -53,7 +53,6 @@ import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.BeforeMethod; @@ -154,7 +153,7 @@ public void testConcurrentTopicAndSubscriptionDelete() throws Exception { try { barrier.await(); // do subscription delete - ConcurrentOpenHashMap subscriptions = topic.getSubscriptions(); + final var subscriptions = topic.getSubscriptions(); PersistentSubscription ps = subscriptions.get(successSubName); // Thread.sleep(2,0); log.info("unsubscriber outcome is {}", ps.doUnsubscribe(ps.getConsumers().get(0)).get()); @@ -219,7 +218,7 @@ public void testConcurrentTopicGCAndSubscriptionDelete() throws Exception { try { barrier.await(); // do subscription delete - ConcurrentOpenHashMap subscriptions = topic.getSubscriptions(); + final var subscriptions = topic.getSubscriptions(); PersistentSubscription ps = subscriptions.get(successSubName); // Thread.sleep(2,0); log.info("unsubscriber outcome is {}", ps.doUnsubscribe(ps.getConsumers().get(0)).get()); @@ -278,7 +277,7 @@ public void testConcurrentTopicDeleteAndUnsubscribe() throws Exception { barrier.await(); // Thread.sleep(2,0); // assertTrue(topic.unsubscribe(successSubName).isDone()); - ConcurrentOpenHashMap subscriptions = topic.getSubscriptions(); + final var subscriptions = topic.getSubscriptions(); PersistentSubscription ps = subscriptions.get(successSubName); log.info("unsubscribe result : {}", topic.unsubscribe(successSubName).get()); log.info("closing consumer.."); @@ -339,7 +338,7 @@ public void testConcurrentTopicDeleteAndSubsUnsubscribe() throws Exception { log.info("&&&&&&&&& UNSUBSCRIBER TH"); // Thread.sleep(2,0); // assertTrue(topic.unsubscribe(successSubName).isDone()); - ConcurrentOpenHashMap subscriptions = topic.getSubscriptions(); + final var subscriptions = topic.getSubscriptions(); PersistentSubscription ps = subscriptions.get(successSubName); log.info("unsubscribe result : " + ps.doUnsubscribe(ps.getConsumers().get(0)).get()); } catch (Exception e) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index b975041d04ee4..81c12df4f3918 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -64,6 +64,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; @@ -132,7 +133,6 @@ import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.compaction.CompactedTopic; import org.apache.pulsar.compaction.CompactedTopicContext; import org.apache.pulsar.compaction.Compactor; @@ -777,11 +777,7 @@ private void testMaxConsumersShared() throws Exception { addConsumerToSubscription.setAccessible(true); // for count consumers on topic - ConcurrentOpenHashMap subscriptions = - ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .concurrencyLevel(1) - .build(); + final var subscriptions = new ConcurrentHashMap(); subscriptions.put("sub-1", sub); subscriptions.put("sub-2", sub2); Field field = topic.getClass().getDeclaredField("subscriptions"); @@ -873,11 +869,7 @@ private void testMaxConsumersFailover() throws Exception { addConsumerToSubscription.setAccessible(true); // for count consumers on topic - ConcurrentOpenHashMap subscriptions = - ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .concurrencyLevel(1) - .build(); + final var subscriptions = new ConcurrentHashMap(); subscriptions.put("sub-1", sub); subscriptions.put("sub-2", sub2); Field field = topic.getClass().getDeclaredField("subscriptions"); @@ -992,11 +984,7 @@ public void testMaxSameAddressConsumers() throws Exception { addConsumerToSubscription.setAccessible(true); // for count consumers on topic - ConcurrentOpenHashMap subscriptions = - ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .concurrencyLevel(1) - .build(); + final var subscriptions = new ConcurrentHashMap(); subscriptions.put("sub1", sub1); subscriptions.put("sub2", sub2); Field field = topic.getClass().getDeclaredField("subscriptions"); @@ -1299,7 +1287,7 @@ public void testConcurrentTopicAndSubscriptionDelete() throws Exception { try { barrier.await(); // do subscription delete - ConcurrentOpenHashMap subscriptions = topic.getSubscriptions(); + final var subscriptions = topic.getSubscriptions(); PersistentSubscription ps = subscriptions.get(successSubName); // Thread.sleep(5,0); log.info("unsubscriber outcome is {}", ps.doUnsubscribe(ps.getConsumers().get(0)).get()); @@ -1681,7 +1669,7 @@ public void testAtomicReplicationRemoval() throws Exception { PersistentTopic topic = new PersistentTopic(globalTopicName, ledgerMock, brokerService); topic.initialize().join(); String remoteReplicatorName = topic.getReplicatorPrefix() + "." + remoteCluster; - ConcurrentOpenHashMap replicatorMap = topic.getReplicators(); + final var replicatorMap = topic.getReplicators(); ManagedCursor cursor = mock(ManagedCursorImpl.class); doReturn(remoteCluster).when(cursor).getName(); @@ -2018,11 +2006,7 @@ public void addFailed(ManagedLedgerException exception, Object ctx) { public void testCheckInactiveSubscriptions() throws Exception { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); - ConcurrentOpenHashMap subscriptions = - ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .concurrencyLevel(1) - .build(); + final var subscriptions = new ConcurrentHashMap(); // This subscription is connected by consumer. PersistentSubscription nonDeletableSubscription1 = spyWithClassAndConstructorArgsRecordingInvocations(PersistentSubscription.class, topic, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java index 4273e8bbaeb5b..5b896a22baa33 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java @@ -68,7 +68,6 @@ import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicStats; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -292,7 +291,7 @@ public void testReplicatedSubscribeAndSwitchToStandbyCluster() throws Exception sentMessages.add(msg); } Awaitility.await().untilAsserted(() -> { - ConcurrentOpenHashMap replicators = topic1.getReplicators(); + final var replicators = topic1.getReplicators(); assertTrue(replicators != null && replicators.size() == 1, "Replicator should started"); assertTrue(replicators.values().iterator().next().isConnected(), "Replicator should be connected"); assertTrue(topic1.getReplicatedSubscriptionController().get().getLastCompletedSnapshotId().isPresent(), @@ -1072,7 +1071,7 @@ private void testReplicatedSubscriptionWhenEnableReplication(Producer pr Awaitility.await().untilAsserted(() -> { List keys = pulsar1.getBrokerService() .getTopic(topic, false).get().get() - .getReplicators().keys(); + .getReplicators().keySet().stream().toList(); assertEquals(keys.size(), 1); assertTrue(pulsar1.getBrokerService() .getTopic(topic, false).get().get() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java index 90df16360614d..bec6b558ea401 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java @@ -25,9 +25,11 @@ import static org.testng.AssertJUnit.assertFalse; import com.google.common.collect.Sets; import java.lang.reflect.Method; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import lombok.Cleanup; +import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageRoutingMode; @@ -101,7 +103,7 @@ public void testReplicatorRateLimiterWithOnlyTopicLevel() throws Exception { PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get(); // rate limiter disable by default - assertFalse(topic.getReplicators().values().get(0).getRateLimiter().isPresent()); + assertFalse(getRateLimiter(topic).isPresent()); //set topic-level policy, which should take effect DispatchRate topicRate = DispatchRate.builder() @@ -112,16 +114,16 @@ public void testReplicatorRateLimiterWithOnlyTopicLevel() throws Exception { admin1.topics().setReplicatorDispatchRate(topicName, topicRate); Awaitility.await().untilAsserted(() -> assertEquals(admin1.topics().getReplicatorDispatchRate(topicName), topicRate)); - assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent()); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 10); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 20L); + assertTrue(getRateLimiter(topic).isPresent()); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnMsg(), 10); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnByte(), 20L); //remove topic-level policy admin1.topics().removeReplicatorDispatchRate(topicName); Awaitility.await().untilAsserted(() -> assertNull(admin1.topics().getReplicatorDispatchRate(topicName))); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), -1); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), + assertEquals(getRateLimiter(topic).get().getDispatchRateOnMsg(), -1); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnByte(), -1L); } @@ -145,7 +147,7 @@ public void testReplicatorRateLimiterWithOnlyNamespaceLevel() throws Exception { PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get(); // rate limiter disable by default - assertFalse(topic.getReplicators().values().get(0).getRateLimiter().isPresent()); + assertFalse(getRateLimiter(topic).isPresent()); //set namespace-level policy, which should take effect DispatchRate topicRate = DispatchRate.builder() @@ -156,16 +158,16 @@ public void testReplicatorRateLimiterWithOnlyNamespaceLevel() throws Exception { admin1.namespaces().setReplicatorDispatchRate(namespace, topicRate); Awaitility.await().untilAsserted(() -> assertEquals(admin1.namespaces().getReplicatorDispatchRate(namespace), topicRate)); - assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent()); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 10); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 20L); + assertTrue(getRateLimiter(topic).isPresent()); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnMsg(), 10); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnByte(), 20L); //remove topic-level policy admin1.namespaces().removeReplicatorDispatchRate(namespace); Awaitility.await().untilAsserted(() -> assertNull(admin1.namespaces().getReplicatorDispatchRate(namespace))); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), -1); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), + assertEquals(getRateLimiter(topic).get().getDispatchRateOnMsg(), -1); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnByte(), -1L); } @@ -189,7 +191,7 @@ public void testReplicatorRateLimiterWithOnlyBrokerLevel() throws Exception { PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get(); // rate limiter disable by default - assertFalse(topic.getReplicators().values().get(0).getRateLimiter().isPresent()); + assertFalse(getRateLimiter(topic).isPresent()); //set broker-level policy, which should take effect admin1.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerReplicatorInMsg", "10"); @@ -203,9 +205,9 @@ public void testReplicatorRateLimiterWithOnlyBrokerLevel() throws Exception { .getAllDynamicConfigurations().get("dispatchThrottlingRatePerReplicatorInByte"), "20"); }); - assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent()); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 10); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 20L); + assertTrue(getRateLimiter(topic).isPresent()); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnMsg(), 10); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnByte(), 20L); } @Test @@ -228,9 +230,9 @@ public void testReplicatorRatePriority() throws Exception { PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get(); //use broker-level by default - assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent()); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 100); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 200L); + assertTrue(getRateLimiter(topic).isPresent()); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnMsg(), 100); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnByte(), 200L); //set namespace-level policy, which should take effect DispatchRate nsDispatchRate = DispatchRate.builder() @@ -241,8 +243,8 @@ public void testReplicatorRatePriority() throws Exception { admin1.namespaces().setReplicatorDispatchRate(namespace, nsDispatchRate); Awaitility.await() .untilAsserted(() -> assertEquals(admin1.namespaces().getReplicatorDispatchRate(namespace), nsDispatchRate)); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 50); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 60L); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnMsg(), 50); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnByte(), 60L); //set topic-level policy, which should take effect DispatchRate topicRate = DispatchRate.builder() @@ -253,8 +255,8 @@ public void testReplicatorRatePriority() throws Exception { admin1.topics().setReplicatorDispatchRate(topicName, topicRate); Awaitility.await().untilAsserted(() -> assertEquals(admin1.topics().getReplicatorDispatchRate(topicName), topicRate)); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 10); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 20L); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnMsg(), 10); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnByte(), 20L); //Set the namespace-level policy, which should not take effect DispatchRate nsDispatchRate2 = DispatchRate.builder() @@ -265,21 +267,21 @@ public void testReplicatorRatePriority() throws Exception { admin1.namespaces().setReplicatorDispatchRate(namespace, nsDispatchRate2); Awaitility.await() .untilAsserted(() -> assertEquals(admin1.namespaces().getReplicatorDispatchRate(namespace), nsDispatchRate2)); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 20L); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnByte(), 20L); //remove topic-level policy, namespace-level should take effect admin1.topics().removeReplicatorDispatchRate(topicName); Awaitility.await().untilAsserted(() -> assertNull(admin1.topics().getReplicatorDispatchRate(topicName))); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 500); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), + assertEquals(getRateLimiter(topic).get().getDispatchRateOnMsg(), 500); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnByte(), 600L); //remove namespace-level policy, broker-level should take effect admin1.namespaces().setReplicatorDispatchRate(namespace, null); Awaitility.await().untilAsserted(() -> - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 100)); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), + assertEquals(getRateLimiter(topic).get().getDispatchRateOnMsg(), 100)); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnByte(), 200L); } @@ -315,7 +317,7 @@ public void testReplicatorRateLimiterDynamicallyChange() throws Exception { PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get(); // 1. default replicator throttling not configured - Assert.assertFalse(topic.getReplicators().values().get(0).getRateLimiter().isPresent()); + Assert.assertFalse(getRateLimiter(topic).isPresent()); // 2. change namespace setting of replicator dispatchRateMsg, verify topic changed. int messageRate = 100; @@ -329,7 +331,7 @@ public void testReplicatorRateLimiterDynamicallyChange() throws Exception { boolean replicatorUpdated = false; int retry = 5; for (int i = 0; i < retry; i++) { - if (topic.getReplicators().values().get(0).getRateLimiter().isPresent()) { + if (getRateLimiter(topic).isPresent()) { replicatorUpdated = true; break; } else { @@ -339,7 +341,7 @@ public void testReplicatorRateLimiterDynamicallyChange() throws Exception { } } Assert.assertTrue(replicatorUpdated); - Assert.assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), messageRate); + Assert.assertEquals(getRateLimiter(topic).get().getDispatchRateOnMsg(), messageRate); // 3. change namespace setting of replicator dispatchRateByte, verify topic changed. messageRate = 500; @@ -351,7 +353,7 @@ public void testReplicatorRateLimiterDynamicallyChange() throws Exception { admin1.namespaces().setReplicatorDispatchRate(namespace, dispatchRateByte); replicatorUpdated = false; for (int i = 0; i < retry; i++) { - if (topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte() == messageRate) { + if (getRateLimiter(topic).get().getDispatchRateOnByte() == messageRate) { replicatorUpdated = true; break; } else { @@ -414,7 +416,7 @@ public void testReplicatorRateLimiterMessageNotReceivedAllMessages(DispatchRateT boolean replicatorUpdated = false; int retry = 5; for (int i = 0; i < retry; i++) { - if (topic.getReplicators().values().get(0).getRateLimiter().isPresent()) { + if (getRateLimiter(topic).isPresent()) { replicatorUpdated = true; break; } else { @@ -425,9 +427,9 @@ public void testReplicatorRateLimiterMessageNotReceivedAllMessages(DispatchRateT } Assert.assertTrue(replicatorUpdated); if (DispatchRateType.messageRate.equals(dispatchRateType)) { - Assert.assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), messageRate); + Assert.assertEquals(getRateLimiter(topic).get().getDispatchRateOnMsg(), messageRate); } else { - Assert.assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), messageRate); + Assert.assertEquals(getRateLimiter(topic).get().getDispatchRateOnByte(), messageRate); } @Cleanup @@ -499,7 +501,7 @@ public void testReplicatorRateLimiterMessageReceivedAllMessages() throws Excepti boolean replicatorUpdated = false; int retry = 5; for (int i = 0; i < retry; i++) { - if (topic.getReplicators().values().get(0).getRateLimiter().isPresent()) { + if (getRateLimiter(topic).isPresent()) { replicatorUpdated = true; break; } else { @@ -509,7 +511,7 @@ public void testReplicatorRateLimiterMessageReceivedAllMessages() throws Excepti } } Assert.assertTrue(replicatorUpdated); - Assert.assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), messageRate); + Assert.assertEquals(getRateLimiter(topic).get().getDispatchRateOnMsg(), messageRate); @Cleanup PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS) @@ -578,8 +580,8 @@ public void testReplicatorRateLimiterByBytes() throws Exception { PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get(); Awaitility.await() - .untilAsserted(() -> assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent())); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), byteRate); + .untilAsserted(() -> assertTrue(getRateLimiter(topic).isPresent())); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnByte(), byteRate); @Cleanup PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()) @@ -608,5 +610,9 @@ public void testReplicatorRateLimiterByBytes() throws Exception { }); } + private static Optional getRateLimiter(PersistentTopic topic) { + return getRateLimiter(topic); + } + private static final Logger log = LoggerFactory.getLogger(ReplicatorRateLimiterTest.class); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 8e115e14b3770..aac7a85f477c5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -109,7 +109,6 @@ import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.protocol.Commands; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; import org.apache.pulsar.schema.Schemas; import org.awaitility.Awaitility; @@ -637,7 +636,7 @@ public void testReplicatePeekAndSkip() throws Exception { producer1.produce(2); PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get(); PersistentReplicator replicator = (PersistentReplicator) topic.getReplicators() - .get(topic.getReplicators().keys().get(0)); + .get(topic.getReplicators().keySet().stream().toList().get(0)); replicator.skipMessages(2); CompletableFuture result = replicator.peekNthMessage(1); Entry entry = result.get(50, TimeUnit.MILLISECONDS); @@ -664,7 +663,7 @@ public void testReplicatorClearBacklog() throws Exception { producer1.produce(2); PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get(); PersistentReplicator replicator = (PersistentReplicator) spy( - topic.getReplicators().get(topic.getReplicators().keys().get(0))); + topic.getReplicators().get(topic.getReplicators().keySet().stream().toList().get(0))); replicator.readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("failed"), null); replicator.clearBacklog().get(); Thread.sleep(100); @@ -691,7 +690,7 @@ public void testResetReplicatorSubscriptionPosition() throws Exception { PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get(); PersistentReplicator replicator = (PersistentReplicator) spy( - topic.getReplicators().get(topic.getReplicators().keys().get(0))); + topic.getReplicators().get(topic.getReplicators().keySet().stream().toList().get(0))); MessageId id = topic.getLastMessageId().get(); admin1.topics().expireMessages(dest.getPartitionedTopicName(), @@ -795,7 +794,7 @@ public void testDeleteReplicatorFailure() throws Exception { @Cleanup MessageProducer producer1 = new MessageProducer(url1, dest); PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(topicName).get(); - final String replicatorClusterName = topic.getReplicators().keys().get(0); + final String replicatorClusterName = topic.getReplicators().keySet().stream().toList().get(0); ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger(); CountDownLatch latch = new CountDownLatch(1); // delete cursor already : so next time if topic.removeReplicator will get exception but then it should @@ -836,7 +835,7 @@ public void testReplicatorProducerClosing() throws Exception { @Cleanup MessageProducer producer1 = new MessageProducer(url1, dest); PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(topicName).get(); - final String replicatorClusterName = topic.getReplicators().keys().get(0); + final String replicatorClusterName = topic.getReplicators().keySet().stream().toList().get(0); Replicator replicator = topic.getPersistentReplicator(replicatorClusterName); pulsar2.close(); pulsar2 = null; @@ -1675,7 +1674,7 @@ public void testReplicatorWithFailedAck() throws Exception { Awaitility.await().pollInterval(1, TimeUnit.SECONDS).timeout(30, TimeUnit.SECONDS) .ignoreExceptions() .untilAsserted(() -> { - ConcurrentOpenHashMap replicators = topic.getReplicators(); + final var replicators = topic.getReplicators(); PersistentReplicator replicator = (PersistentReplicator) replicators.get("r2"); assertEquals(org.apache.pulsar.broker.service.AbstractReplicator.State.Started, replicator.getState()); @@ -1928,9 +1927,9 @@ public void testEnableReplicationWithNamespaceAllowedClustersPolices() throws Ex // Verify the replication from cluster1 to cluster2 is ready, but the replication form the cluster2 to cluster1 // is not ready. Awaitility.await().untilAsserted(() -> { - ConcurrentOpenHashMap replicatorMap = persistentTopic1.getReplicators(); + final var replicatorMap = persistentTopic1.getReplicators(); assertEquals(replicatorMap.size(), 1); - Replicator replicator = replicatorMap.get(replicatorMap.keys().get(0)); + Replicator replicator = replicatorMap.get(replicatorMap.keySet().stream().toList().get(0)); assertTrue(replicator.isConnected()); }); @@ -1940,16 +1939,16 @@ public void testEnableReplicationWithNamespaceAllowedClustersPolices() throws Ex .get(); Awaitility.await().untilAsserted(() -> { - ConcurrentOpenHashMap replicatorMap = persistentTopic2.getReplicators(); + final var replicatorMap = persistentTopic2.getReplicators(); assertEquals(replicatorMap.size(), 0); }); // Enable replication at the topic level in the cluster2. admin2.topics().setReplicationClusters(topicName.toString(), List.of("r1", "r2")); // Verify the replication between cluster1 and cluster2 is ready. Awaitility.await().untilAsserted(() -> { - ConcurrentOpenHashMap replicatorMap = persistentTopic2.getReplicators(); + final var replicatorMap = persistentTopic2.getReplicators(); assertEquals(replicatorMap.size(), 1); - Replicator replicator = replicatorMap.get(replicatorMap.keys().get(0)); + Replicator replicator = replicatorMap.get(replicatorMap.keySet().stream().toList().get(0)); assertTrue(replicator.isConnected()); }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java index f89ca2bdebb91..ab1f0c0ece2e2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java @@ -29,7 +29,6 @@ import java.util.List; import java.util.Set; import java.util.UUID; -import java.util.stream.Collectors; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -748,7 +747,7 @@ public void testRemoveReplicationClusters() throws Exception { assertNotNull(topicRef); Awaitility.await().untilAsserted(() -> { - List replicaClusters = topicRef.getReplicators().keys().stream().sorted().collect(Collectors.toList()); + List replicaClusters = topicRef.getReplicators().keySet().stream().sorted().toList(); assertEquals(replicaClusters.size(), 1); assertEquals(replicaClusters.toString(), "[r2]"); }); @@ -756,7 +755,7 @@ public void testRemoveReplicationClusters() throws Exception { // removing topic replica cluster policy, so namespace policy should take effect admin1.topics().removeReplicationClusters(persistentTopicName); Awaitility.await().untilAsserted(() -> { - List replicaClusters = topicRef.getReplicators().keys().stream().sorted().collect(Collectors.toList()); + List replicaClusters = topicRef.getReplicators().keySet().stream().sorted().toList(); assertEquals(replicaClusters.size(), 2); assertEquals(replicaClusters.toString(), "[r2, r3]"); }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionalReplicateSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionalReplicateSubscriptionTest.java index 2d348f8259746..aa39e859a8c3d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionalReplicateSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionalReplicateSubscriptionTest.java @@ -32,7 +32,6 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.awaitility.Awaitility; import org.testng.annotations.AfterClass; @@ -118,7 +117,7 @@ public void testReplicatedSubscribeAndSwitchToStandbyClusterWithTransaction() th } txn1.commit().get(); Awaitility.await().untilAsserted(() -> { - ConcurrentOpenHashMap replicators = topic1.getReplicators(); + final var replicators = topic1.getReplicators(); assertTrue(replicators != null && replicators.size() == 1, "Replicator should started"); assertTrue(replicators.values().iterator().next().isConnected(), "Replicator should be connected"); assertTrue(topic1.getReplicatedSubscriptionController().get().getLastCompletedSnapshotId().isPresent(), diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java index e2aec70fb114e..e0d6a432bdad2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java @@ -37,7 +37,6 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TopicStats; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.awaitility.Awaitility; import org.mockito.Mockito; import org.testng.annotations.AfterMethod; @@ -212,7 +211,7 @@ public void testSubscriptionsOnNonPersistentTopic() throws Exception { .subscriptionMode(SubscriptionMode.Durable) .subscribe(); - ConcurrentOpenHashMap subscriptionMap = mockTopic.getSubscriptions(); + final var subscriptionMap = mockTopic.getSubscriptions(); assertEquals(subscriptionMap.size(), 4); // Check exclusive subscription diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java index 562c5eda58109..fa409832fc17b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java @@ -28,9 +28,8 @@ import io.netty.buffer.ByteBuf; import java.time.Clock; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; import java.util.List; +import java.util.Set; import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot; @@ -75,10 +74,8 @@ public void setup() { @Test public void testBuildSnapshotWith2Clusters() throws Exception { - List remoteClusters = Collections.singletonList("b"); - ReplicatedSubscriptionsSnapshotBuilder builder = new ReplicatedSubscriptionsSnapshotBuilder(controller, - remoteClusters, + Set.of("b"), conf, clock); assertTrue(markers.isEmpty()); @@ -115,10 +112,8 @@ public void testBuildSnapshotWith2Clusters() throws Exception { @Test public void testBuildSnapshotWith3Clusters() throws Exception { - List remoteClusters = Arrays.asList("b", "c"); - ReplicatedSubscriptionsSnapshotBuilder builder = new ReplicatedSubscriptionsSnapshotBuilder(controller, - remoteClusters, + Set.of("b", "c"), conf, clock); assertTrue(markers.isEmpty()); @@ -198,10 +193,8 @@ public void testBuildSnapshotWith3Clusters() throws Exception { @Test public void testBuildTimeout() { - List remoteClusters = Collections.singletonList("b"); - ReplicatedSubscriptionsSnapshotBuilder builder = new ReplicatedSubscriptionsSnapshotBuilder(controller, - remoteClusters, + Set.of("b"), conf, clock); assertFalse(builder.isTimedOut()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregatorTest.java index 45e3fb253bf11..e091eee178d8c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregatorTest.java @@ -30,16 +30,14 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.Consumer; -import org.apache.pulsar.broker.service.Replicator; -import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.persistent.PersistentTopicMetrics; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl; import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.mockito.Mockito; import org.testng.annotations.BeforeMethod; @@ -71,7 +69,7 @@ public void testGenerateSubscriptionsStats() { // prepare multi-layer topic map final var bundlesMap = new ConcurrentHashMap>(); final var topicsMap = new ConcurrentHashMap(); - ConcurrentOpenHashMap subscriptionsMaps = ConcurrentOpenHashMap.newBuilder().build(); + final var subscriptionsMaps = new ConcurrentHashMap(); bundlesMap.put("my-bundle", topicsMap); multiLayerTopicsMap.put(namespace, bundlesMap); @@ -87,7 +85,7 @@ public void testGenerateSubscriptionsStats() { // Prepare topic and subscription PersistentTopic topic = Mockito.mock(PersistentTopic.class); - Subscription subscription = Mockito.mock(Subscription.class); + PersistentSubscription subscription = Mockito.mock(PersistentSubscription.class); Consumer consumer = Mockito.mock(Consumer.class); ConsumerStatsImpl consumerStats = new ConsumerStatsImpl(); when(consumer.getStats()).thenReturn(consumerStats); @@ -99,7 +97,7 @@ public void testGenerateSubscriptionsStats() { when(topic.getStats(false, false, false)).thenReturn(topicStats); when(topic.getBrokerService()).thenReturn(broker); when(topic.getSubscriptions()).thenReturn(subscriptionsMaps); - when(topic.getReplicators()).thenReturn(ConcurrentOpenHashMap.newBuilder().build()); + when(topic.getReplicators()).thenReturn(new ConcurrentHashMap<>()); when(topic.getManagedLedger()).thenReturn(ml); when(topic.getBacklogQuota(Mockito.any())).thenReturn(Mockito.mock(BacklogQuota.class)); PersistentTopicMetrics persistentTopicMetrics = new PersistentTopicMetrics(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java index bd0119823fd95..88286af98ae5f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java @@ -29,7 +29,6 @@ import com.google.common.collect.Multimap; import com.google.common.collect.Queues; import com.google.common.collect.Sets; -import java.lang.reflect.Field; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -49,14 +48,11 @@ import java.util.stream.Collectors; import lombok.Cleanup; import org.apache.pulsar.broker.namespace.NamespaceService; -import org.apache.pulsar.broker.service.BrokerService; -import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TopicStats; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -703,11 +699,7 @@ public void testBlockBrokerDispatching() { stopBroker(); startBroker(); - Field field = BrokerService.class.getDeclaredField("blockedDispatchers"); - field.setAccessible(true); - @SuppressWarnings("unchecked") - ConcurrentOpenHashSet blockedDispatchers = - (ConcurrentOpenHashSet) field.get(pulsar.getBrokerService()); + final var blockedDispatchers = pulsar.getBrokerService().getBlockedDispatchers(); final int receiverQueueSize = 10; final int totalProducedMsgs = maxUnAckPerBroker * 3; @@ -783,7 +775,7 @@ public void testBlockBrokerDispatching() { consumer2Sub1.close(); // (1.c) verify that dispatcher is part of blocked dispatcher assertEquals(blockedDispatchers.size(), 1); - String dispatcherName = blockedDispatchers.values().get(0).getName(); + String dispatcherName = blockedDispatchers.stream().findFirst().orElseThrow().getName(); String subName = dispatcherName.substring(dispatcherName.lastIndexOf("/") + 2, dispatcherName.length()); assertEquals(subName, subscriberName1); timestamps.add(System.currentTimeMillis()); @@ -918,10 +910,7 @@ public void testBrokerDispatchBlockAndSubAckBackRequiredMsgs() { stopBroker(); startBroker(); - Field field = BrokerService.class.getDeclaredField("blockedDispatchers"); - field.setAccessible(true); - ConcurrentOpenHashSet blockedDispatchers = - (ConcurrentOpenHashSet) field.get(pulsar.getBrokerService()); + final var blockedDispatchers = pulsar.getBrokerService().getBlockedDispatchers(); final int receiverQueueSize = 10; final int totalProducedMsgs = maxUnAckPerBroker * 3; @@ -992,7 +981,7 @@ public void testBrokerDispatchBlockAndSubAckBackRequiredMsgs() { consumer2Sub1.close(); // (1.c) verify that dispatcher is part of blocked dispatcher assertEquals(blockedDispatchers.size(), 1); - String dispatcherName = blockedDispatchers.values().get(0).getName(); + String dispatcherName = blockedDispatchers.stream().findFirst().orElseThrow().getName(); String subName = dispatcherName.substring(dispatcherName.lastIndexOf("/") + 2, dispatcherName.length()); assertEquals(subName, subscriberName1);