Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Replace all ConcurrentOpenHashMap with the official ConcurrentHashMap in JDK #23216

Draft
wants to merge 15 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Enumeration;
import java.util.List;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -41,7 +42,6 @@
import org.apache.bookkeeper.mledger.util.Errors;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -226,8 +226,7 @@ private void calculateCursorBacklogs(final ManagedLedgerFactoryImpl factory, fin
BookKeeper bk = factory.getBookKeeper().get();
final CountDownLatch allCursorsCounter = new CountDownLatch(1);
final long errorInReadingCursor = -1;
ConcurrentOpenHashMap<String, Long> ledgerRetryMap =
ConcurrentOpenHashMap.<String, Long>newBuilder().build();
final var ledgerRetryMap = new ConcurrentHashMap<String, Long>();

final MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = ledgers.lastEntry().getValue();
final Position lastLedgerPosition =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1269,7 +1269,7 @@ private void resumeAsyncResponse(AsyncResponse asyncResponse, Set<String> subscr

private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse) {
getTopicReferenceAsync(topicName)
.thenAccept(topic -> asyncResponse.resume(new ArrayList<>(topic.getSubscriptions().keys())))
.thenAccept(topic -> asyncResponse.resume(topic.getSubscriptions().keySet()))
.exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (isNot307And404Exception(ex)) {
Expand Down Expand Up @@ -2021,7 +2021,7 @@ private void internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(Asy
new ArrayList<>((int) topic.getReplicators().size());
List<String> subNames =
new ArrayList<>((int) topic.getSubscriptions().size());
subNames.addAll(topic.getSubscriptions().keys().stream().filter(
subNames.addAll(topic.getSubscriptions().keySet().stream().filter(
subName -> !subName.equals(Compactor.COMPACTION_SUBSCRIPTION)).toList());
for (int i = 0; i < subNames.size(); i++) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import org.apache.pulsar.common.policies.data.stats.NonPersistentPartitionedTopicStatsImpl;
import org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -478,18 +477,17 @@ public void getListFromBundle(
} else {
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, true, true)
.thenAccept(nsBundle -> {
ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>> bundleTopics =
pulsar().getBrokerService()
.getMultiLayerTopicsMap().get(namespaceName.toString());
final var bundleTopics = pulsar().getBrokerService().getMultiLayerTopicMap()
.get(namespaceName.toString());
if (bundleTopics == null || bundleTopics.isEmpty()) {
asyncResponse.resume(Collections.emptyList());
return;
}
final List<String> topicList = new ArrayList<>();
String bundleKey = namespaceName.toString() + "/" + nsBundle.getBundleRange();
ConcurrentOpenHashMap<String, Topic> topicMap = bundleTopics.get(bundleKey);
final var topicMap = bundleTopics.get(bundleKey);
if (topicMap != null) {
topicList.addAll(topicMap.keys().stream()
topicList.addAll(topicMap.keySet().stream()
.filter(name -> !TopicName.get(name).isPersistent())
.collect(Collectors.toList()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@
import org.apache.pulsar.common.policies.data.FailureDomainImpl;
import org.apache.pulsar.common.util.DirectMemoryUtils;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
Expand Down Expand Up @@ -291,13 +289,11 @@ public static CompletableFuture<Set<String>> applyNamespacePoliciesAsync(
* Map to fill.
*/
public static void fillNamespaceToBundlesMap(final Set<String> bundles,
final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> target) {
final ConcurrentHashMap<String, ConcurrentHashMap<String, Boolean>> target) {
bundles.forEach(bundleName -> {
final String namespaceName = getNamespaceNameFromBundleName(bundleName);
final String bundleRange = getBundleRangeFromBundleName(bundleName);
target.computeIfAbsent(namespaceName,
k -> ConcurrentOpenHashSet.<String>newBuilder().build())
.add(bundleRange);
target.computeIfAbsent(namespaceName, __ -> new ConcurrentHashMap<>());
});
}

Expand Down Expand Up @@ -359,7 +355,7 @@ public static boolean isLoadSheddingEnabled(final PulsarService pulsar) {
public static void removeMostServicingBrokersForNamespace(
final String assignedBundleName,
final Set<String> candidates,
final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>
final ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<String, Boolean>>>
brokerToNamespaceToBundleRange) {
if (candidates.isEmpty()) {
return;
Expand All @@ -370,11 +366,8 @@ public static void removeMostServicingBrokersForNamespace(

for (final String broker : candidates) {
int bundles = (int) brokerToNamespaceToBundleRange
.computeIfAbsent(broker,
k -> ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder().build())
.computeIfAbsent(namespaceName,
k -> ConcurrentOpenHashSet.<String>newBuilder().build())
.computeIfAbsent(broker, __ -> new ConcurrentHashMap<>())
.computeIfAbsent(namespaceName, __ -> new ConcurrentHashMap<>())
.size();
leastBundles = Math.min(leastBundles, bundles);
if (leastBundles == 0) {
Expand All @@ -387,12 +380,8 @@ public static void removeMostServicingBrokersForNamespace(

final int finalLeastBundles = leastBundles;
candidates.removeIf(
broker -> brokerToNamespaceToBundleRange.computeIfAbsent(broker,
k -> ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder().build())
.computeIfAbsent(namespaceName,
k -> ConcurrentOpenHashSet.<String>newBuilder().build())
.size() > finalLeastBundles);
broker -> brokerToNamespaceToBundleRange.computeIfAbsent(broker, __ -> new ConcurrentHashMap<>())
.computeIfAbsent(namespaceName, __ -> new ConcurrentHashMap<>()).size() > finalLeastBundles);
}

/**
Expand Down Expand Up @@ -426,7 +415,7 @@ public static void removeMostServicingBrokersForNamespace(
public static void filterAntiAffinityGroupOwnedBrokers(
final PulsarService pulsar, final String assignedBundleName,
final Set<String> candidates,
final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>
final ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<String, Boolean>>>
brokerToNamespaceToBundleRange,
Map<String, String> brokerToDomainMap) {
if (candidates.isEmpty()) {
Expand Down Expand Up @@ -572,7 +561,7 @@ private static void filterDomainsNotHavingLeastNumberAntiAffinityNamespaces(
*/
public static CompletableFuture<Map<String, Integer>> getAntiAffinityNamespaceOwnedBrokers(
final PulsarService pulsar, final String namespaceName,
final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>
final ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<String, Boolean>>>
brokerToNamespaceToBundleRange) {

CompletableFuture<Map<String, Integer>> antiAffinityNsBrokersResult = new CompletableFuture<>();
Expand Down Expand Up @@ -698,7 +687,6 @@ public static Optional<String> getNamespaceAntiAffinityGroup(
* by different broker.
*
* @param namespace
* @param bundle
* @param currentBroker
* @param pulsar
* @param brokerToNamespaceToBundleRange
Expand All @@ -707,9 +695,9 @@ public static Optional<String> getNamespaceAntiAffinityGroup(
* @throws Exception
*/
public static boolean shouldAntiAffinityNamespaceUnload(
String namespace, String bundle, String currentBroker,
String namespace, String currentBroker,
final PulsarService pulsar,
final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>
final ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<String, Boolean>>>
brokerToNamespaceToBundleRange,
Set<String> candidateBrokers) throws Exception {

Expand Down Expand Up @@ -753,7 +741,7 @@ private static boolean shouldAntiAffinityNamespaceUnload(
}

public static boolean shouldAntiAffinityNamespaceUnload(
String namespace, String bundle, String currentBroker,
String namespace, String currentBroker,
final PulsarService pulsar,
Set<Map.Entry<String, ServiceUnitStateData>> bundleOwnershipData,
Set<String> candidateBrokers) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.metadata.api.Notification;
Expand Down Expand Up @@ -118,7 +116,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager {

// Map from brokers to namespaces to the bundle ranges in that namespace assigned to that broker.
// Used to distribute bundles within a namespace evenly across brokers.
private final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>
private final ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<String, Boolean>>>
brokerToNamespaceToBundleRange;

// Path to the ZNode containing the LocalBrokerData json for this broker.
Expand Down Expand Up @@ -199,10 +197,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
*/
public ModularLoadManagerImpl() {
brokerCandidateCache = new HashSet<>();
brokerToNamespaceToBundleRange =
ConcurrentOpenHashMap.<String,
ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder()
.build();
brokerToNamespaceToBundleRange = new ConcurrentHashMap<>();
defaultStats = new NamespaceBundleStats();
filterPipeline = new ArrayList<>();
loadData = new LoadData();
Expand Down Expand Up @@ -582,12 +577,8 @@ private void updateBundleData() {
TimeAverageBrokerData timeAverageData = new TimeAverageBrokerData();
timeAverageData.reset(statsMap.keySet(), bundleData, defaultStats);
brokerData.setTimeAverageData(timeAverageData);
final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
brokerToNamespaceToBundleRange
.computeIfAbsent(broker, k ->
ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder()
.build());
final var namespaceToBundleRange = brokerToNamespaceToBundleRange.computeIfAbsent(broker,
__ -> new ConcurrentHashMap<>());
synchronized (namespaceToBundleRange) {
namespaceToBundleRange.clear();
LoadManagerShared.fillNamespaceToBundlesMap(statsMap.keySet(), namespaceToBundleRange);
Expand Down Expand Up @@ -736,7 +727,7 @@ public boolean shouldAntiAffinityNamespaceUnload(String namespace, String bundle
.getBundle(namespace, bundle);
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(), brokerTopicLoadingPredicate);
return LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace, bundle, currentBroker, pulsar,
return LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace, currentBroker, pulsar,
brokerToNamespaceToBundleRange, brokerCandidateCache);
}

Expand Down Expand Up @@ -873,17 +864,8 @@ private void preallocateBundle(String bundle, String broker) {

final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
brokerToNamespaceToBundleRange
.computeIfAbsent(broker,
k -> ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder()
.build());
synchronized (namespaceToBundleRange) {
namespaceToBundleRange.computeIfAbsent(namespaceName,
k -> ConcurrentOpenHashSet.<String>newBuilder().build())
.add(bundleRange);
}
brokerToNamespaceToBundleRange.computeIfAbsent(broker, __ -> new ConcurrentHashMap<>())
.computeIfAbsent(namespaceName, __ -> new ConcurrentHashMap<>()).put(bundleRange, true);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@
import org.apache.pulsar.common.policies.data.ResourceQuota;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
Expand Down Expand Up @@ -109,8 +107,8 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification

// Map from brokers to namespaces to the bundle ranges in that namespace assigned to that broker.
// Used to distribute bundles within a namespace evenly across brokers.
private final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String,
ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange;
private final ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<String, Boolean>>>
brokerToNamespaceToBundleRange;

// CPU usage per msg/sec
private double realtimeCpuLoadFactor = 0.025;
Expand Down Expand Up @@ -205,10 +203,7 @@ public SimpleLoadManagerImpl() {
bundleLossesCache = new HashSet<>();
brokerCandidateCache = new HashSet<>();
availableBrokersCache = new HashSet<>();
brokerToNamespaceToBundleRange =
ConcurrentOpenHashMap.<String,
ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder()
.build();
brokerToNamespaceToBundleRange = new ConcurrentHashMap<>();
this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
@Override
public boolean isEnablePersistentTopics(String brokerId) {
Expand Down Expand Up @@ -853,14 +848,10 @@ private synchronized ResourceUnit findBrokerForPlacement(Multimap<Long, Resource
ResourceQuota quota = this.getResourceQuota(serviceUnitId);
// Add preallocated bundle range so incoming bundles from the same namespace are not assigned to the
// same broker.
brokerToNamespaceToBundleRange
.computeIfAbsent(selectedRU.getResourceId(),
k -> ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder()
.build())
.computeIfAbsent(namespaceName, k ->
ConcurrentOpenHashSet.<String>newBuilder().build())
.add(bundleRange);
brokerToNamespaceToBundleRange.computeIfAbsent(selectedRU.getResourceId(),
__ -> new ConcurrentHashMap<>())
.computeIfAbsent(namespaceName, __ -> new ConcurrentHashMap<>())
.put(bundleRange, true);
ranking.addPreAllocatedServiceUnit(serviceUnitId, quota);
resourceUnitRankings.put(selectedRU, ranking);
}
Expand Down Expand Up @@ -1272,12 +1263,8 @@ private synchronized void updateBrokerToNamespaceToBundle() {
final String broker = resourceUnit.getResourceId();
final Set<String> loadedBundles = ranking.getLoadedBundles();
final Set<String> preallocatedBundles = resourceUnitRankings.get(resourceUnit).getPreAllocatedBundles();
final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
brokerToNamespaceToBundleRange
.computeIfAbsent(broker,
k -> ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder()
.build());
final var namespaceToBundleRange = brokerToNamespaceToBundleRange.computeIfAbsent(broker,
__ -> new ConcurrentHashMap<>());
namespaceToBundleRange.clear();
LoadManagerShared.fillNamespaceToBundlesMap(loadedBundles, namespaceToBundleRange);
LoadManagerShared.fillNamespaceToBundlesMap(preallocatedBundles, namespaceToBundleRange);
Expand Down
Loading
Loading