diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java index 4a5ff746f4039..983822f22941b 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java @@ -70,7 +70,7 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping private BookiesRackConfiguration racksWithHost = new BookiesRackConfiguration(); private Map bookieInfoMap = new HashMap<>(); - static MetadataStore getMetadataStore(Configuration conf) throws MetadataException { + public static MetadataStore createMetadataStore(Configuration conf) throws MetadataException { MetadataStore store; Object storeProperty = conf.getProperty(METADATA_STORE_INSTANCE); if (storeProperty != null) { @@ -116,20 +116,12 @@ public synchronized void setConf(Configuration conf) { super.setConf(conf); MetadataStore store; try { - store = getMetadataStore(conf); - } catch (MetadataException e) { - throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to init BookieId list"); - } - - bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class); - store.registerListener(this::handleUpdates); - - try { - var racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH) - .thenApply(optRes -> optRes.orElseGet(BookiesRackConfiguration::new)) - .get(); - - for (var bookieMapping : racksWithHost.values()) { + store = createMetadataStore(conf); + bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class); + store.registerListener(this::handleUpdates); + racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get() + .orElseGet(BookiesRackConfiguration::new); + for (Map bookieMapping : racksWithHost.values()) { for (String address : bookieMapping.keySet()) { bookieAddressListLastTime.add(BookieId.parse(address)); } @@ -139,12 +131,10 @@ public synchronized void setConf(Configuration conf) { } } updateRacksWithHost(racksWithHost); - } catch (ExecutionException | InterruptedException e) { - LOG.error("Failed to update rack info. ", e); - throw new RuntimeException(e); + watchAvailableBookies(); + } catch (InterruptedException | ExecutionException | MetadataException e) { + throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to init BookieId list"); } - - watchAvailableBookies(); } private void watchAvailableBookies() { @@ -155,13 +145,13 @@ private void watchAvailableBookies() { field.setAccessible(true); RegistrationClient registrationClient = (RegistrationClient) field.get(bookieAddressResolver); registrationClient.watchWritableBookies(versioned -> { - bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH) - .thenApply(optRes -> optRes.orElseGet(BookiesRackConfiguration::new)) - .thenAccept(this::updateRacksWithHost) - .exceptionally(ex -> { - LOG.error("Failed to update rack info. ", ex); - return null; - }); + try { + racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get() + .orElseGet(BookiesRackConfiguration::new); + updateRacksWithHost(racksWithHost); + } catch (InterruptedException | ExecutionException e) { + LOG.error("Failed to update rack info. ", e); + } }); } catch (NoSuchFieldException | IllegalAccessException e) { LOG.error("Failed watch available bookies.", e); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java index 62b7ffa1e29da..8839e6e2d26c8 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java @@ -73,7 +73,7 @@ public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf, StatsLogger statsLogger, BookieAddressResolver bookieAddressResolver) { MetadataStore store; try { - store = BookieRackAffinityMapping.getMetadataStore(conf); + store = BookieRackAffinityMapping.createMetadataStore(conf); } catch (MetadataException e) { throw new RuntimeException(METADATA_STORE_INSTANCE + " failed initialized"); }