Skip to content

Commit

Permalink
[improve][broker] Replace ConcurrentOpenHashMap with ConcurrentHashMa…
Browse files Browse the repository at this point in the history
…p in Topic classes (apache#23322)
  • Loading branch information
BewareMyPower committed Sep 21, 2024
1 parent f5c1ad2 commit 1ce7855
Show file tree
Hide file tree
Showing 25 changed files with 162 additions and 256 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1265,14 +1265,14 @@ private void resumeAsyncResponse(AsyncResponse asyncResponse, Set<String> subscr
return;
}
} else {
asyncResponse.resume(new ArrayList<>(subscriptions));
asyncResponse.resume(subscriptions);
}
});
}

private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse) {
getTopicReferenceAsync(topicName)
.thenAccept(topic -> asyncResponse.resume(new ArrayList<>(topic.getSubscriptions().keys())))
.thenAccept(topic -> asyncResponse.resume(topic.getSubscriptions().keySet()))
.exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (isNot307And404Exception(ex)) {
Expand Down Expand Up @@ -2024,7 +2024,7 @@ private void internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(Asy
new ArrayList<>((int) topic.getReplicators().size());
List<String> subNames =
new ArrayList<>((int) topic.getSubscriptions().size());
subNames.addAll(topic.getSubscriptions().keys().stream().filter(
subNames.addAll(topic.getSubscriptions().keySet().stream().filter(
subName -> !subName.equals(Compactor.COMPACTION_SUBSCRIPTION)).toList());
for (int i = 0; i < subNames.size(); i++) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
Expand Down Expand Up @@ -576,7 +577,7 @@ && getNumberOfSameAddressConsumers(consumer.getClientAddress()) >= maxSameAddres
public abstract int getNumberOfSameAddressConsumers(String clientAddress);

protected int getNumberOfSameAddressConsumers(final String clientAddress,
final List<? extends Subscription> subscriptions) {
final Collection<? extends Subscription> subscriptions) {
int count = 0;
if (clientAddress != null) {
for (Subscription subscription : subscriptions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1176,15 +1176,15 @@ private CompletableFuture<Void> deleteTopicInternal(String topic, boolean forceD
// v2 topics have a global name so check if the topic is replicated.
if (t.isReplicated()) {
// Delete is disallowed on global topic
final List<String> clusters = t.getReplicators().keys();
final var clusters = t.getReplicators().keySet();
log.error("Delete forbidden topic {} is replicated on clusters {}", topic, clusters);
return FutureUtil.failedFuture(
new IllegalStateException("Delete forbidden topic is replicated on clusters " + clusters));
}

// shadow topic should be deleted first.
if (t.isShadowReplicated()) {
final List<String> shadowTopics = t.getShadowReplicators().keys();
final var shadowTopics = t.getShadowReplicators().keySet();
log.error("Delete forbidden. Topic {} is replicated to shadow topics: {}", topic, shadowTopics);
return FutureUtil.failedFuture(new IllegalStateException(
"Delete forbidden. Topic " + topic + " is replicated to shadow topics."));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.utils.StatsOutputStream;

Expand Down Expand Up @@ -183,7 +182,7 @@ CompletableFuture<Subscription> createSubscription(String subscriptionName, Init

CompletableFuture<Void> unsubscribe(String subName);

ConcurrentOpenHashMap<String, ? extends Subscription> getSubscriptions();
Map<String, ? extends Subscription> getSubscriptions();

CompletableFuture<Void> delete();

Expand Down Expand Up @@ -265,9 +264,9 @@ void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats

Subscription getSubscription(String subscription);

ConcurrentOpenHashMap<String, ? extends Replicator> getReplicators();
Map<String, ? extends Replicator> getReplicators();

ConcurrentOpenHashMap<String, ? extends Replicator> getShadowReplicators();
Map<String, ? extends Replicator> getShadowReplicators();

TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
Expand Down Expand Up @@ -96,7 +97,6 @@
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.utils.StatsOutputStream;
import org.slf4j.Logger;
Expand All @@ -105,9 +105,9 @@
public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPolicyListener {

// Subscriptions to this topic
private final ConcurrentOpenHashMap<String, NonPersistentSubscription> subscriptions;
private final Map<String, NonPersistentSubscription> subscriptions = new ConcurrentHashMap<>();

private final ConcurrentOpenHashMap<String, NonPersistentReplicator> replicators;
private final Map<String, NonPersistentReplicator> replicators = new ConcurrentHashMap<>();

// Ever increasing counter of entries added
private static final AtomicLongFieldUpdater<NonPersistentTopic> ENTRIES_ADDED_COUNTER_UPDATER =
Expand Down Expand Up @@ -152,17 +152,6 @@ public void reset() {

public NonPersistentTopic(String topic, BrokerService brokerService) {
super(topic, brokerService);

this.subscriptions =
ConcurrentOpenHashMap.<String, NonPersistentSubscription>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
this.replicators =
ConcurrentOpenHashMap.<String, NonPersistentReplicator>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
this.isFenced = false;
registerTopicPolicyListener();
}
Expand Down Expand Up @@ -446,8 +435,8 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean c
if (failIfHasSubscriptions) {
if (!subscriptions.isEmpty()) {
isFenced = false;
deleteFuture.completeExceptionally(
new TopicBusyException("Topic has subscriptions:" + subscriptions.keys()));
deleteFuture.completeExceptionally(new TopicBusyException("Topic has subscriptions:"
+ subscriptions.keySet().stream().toList()));
return;
}
} else {
Expand Down Expand Up @@ -714,18 +703,18 @@ public int getNumberOfSameAddressConsumers(final String clientAddress) {
}

@Override
public ConcurrentOpenHashMap<String, NonPersistentSubscription> getSubscriptions() {
public Map<String, NonPersistentSubscription> getSubscriptions() {
return subscriptions;
}

@Override
public ConcurrentOpenHashMap<String, NonPersistentReplicator> getReplicators() {
public Map<String, NonPersistentReplicator> getReplicators() {
return replicators;
}

@Override
public ConcurrentOpenHashMap<String, ? extends Replicator> getShadowReplicators() {
return ConcurrentOpenHashMap.emptyMap();
public Map<String, ? extends Replicator> getShadowReplicators() {
return Map.of();
}

@Override
Expand Down Expand Up @@ -1043,7 +1032,6 @@ private CompletableFuture<Void> checkAndUnsubscribeSubscriptions() {

private CompletableFuture<Void> disconnectReplicators() {
List<CompletableFuture<Void>> futures = new ArrayList<>();
ConcurrentOpenHashMap<String, NonPersistentReplicator> replicators = getReplicators();
replicators.forEach((r, replicator) -> {
futures.add(replicator.terminate());
});
Expand Down
Loading

0 comments on commit 1ce7855

Please sign in to comment.