Skip to content

Commit

Permalink
Revert "[fix] Remove blocking calls from BookieRackAffinityMapping (a…
Browse files Browse the repository at this point in the history
…pache#22846)"

This reverts commit aece67e.
  • Loading branch information
fanjianye committed Sep 11, 2024
1 parent d4839fb commit 763c61d
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
private BookiesRackConfiguration racksWithHost = new BookiesRackConfiguration();
private Map<String, BookieInfo> bookieInfoMap = new HashMap<>();

static MetadataStore getMetadataStore(Configuration conf) throws MetadataException {
public static MetadataStore createMetadataStore(Configuration conf) throws MetadataException {
MetadataStore store;
Object storeProperty = conf.getProperty(METADATA_STORE_INSTANCE);
if (storeProperty != null) {
Expand Down Expand Up @@ -116,20 +116,12 @@ public synchronized void setConf(Configuration conf) {
super.setConf(conf);
MetadataStore store;
try {
store = getMetadataStore(conf);
} catch (MetadataException e) {
throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to init BookieId list");
}

bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class);
store.registerListener(this::handleUpdates);

try {
var racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
.thenApply(optRes -> optRes.orElseGet(BookiesRackConfiguration::new))
.get();

for (var bookieMapping : racksWithHost.values()) {
store = createMetadataStore(conf);
bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class);
store.registerListener(this::handleUpdates);
racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
.orElseGet(BookiesRackConfiguration::new);
for (Map<String, BookieInfo> bookieMapping : racksWithHost.values()) {
for (String address : bookieMapping.keySet()) {
bookieAddressListLastTime.add(BookieId.parse(address));
}
Expand All @@ -139,12 +131,10 @@ public synchronized void setConf(Configuration conf) {
}
}
updateRacksWithHost(racksWithHost);
} catch (ExecutionException | InterruptedException e) {
LOG.error("Failed to update rack info. ", e);
throw new RuntimeException(e);
watchAvailableBookies();
} catch (InterruptedException | ExecutionException | MetadataException e) {
throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to init BookieId list");
}

watchAvailableBookies();
}

private void watchAvailableBookies() {
Expand All @@ -155,13 +145,13 @@ private void watchAvailableBookies() {
field.setAccessible(true);
RegistrationClient registrationClient = (RegistrationClient) field.get(bookieAddressResolver);
registrationClient.watchWritableBookies(versioned -> {
bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
.thenApply(optRes -> optRes.orElseGet(BookiesRackConfiguration::new))
.thenAccept(this::updateRacksWithHost)
.exceptionally(ex -> {
LOG.error("Failed to update rack info. ", ex);
return null;
});
try {
racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
.orElseGet(BookiesRackConfiguration::new);
updateRacksWithHost(racksWithHost);
} catch (InterruptedException | ExecutionException e) {
LOG.error("Failed to update rack info. ", e);
}
});
} catch (NoSuchFieldException | IllegalAccessException e) {
LOG.error("Failed watch available bookies.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf,
StatsLogger statsLogger, BookieAddressResolver bookieAddressResolver) {
MetadataStore store;
try {
store = BookieRackAffinityMapping.getMetadataStore(conf);
store = BookieRackAffinityMapping.createMetadataStore(conf);
} catch (MetadataException e) {
throw new RuntimeException(METADATA_STORE_INSTANCE + " failed initialized");
}
Expand Down

0 comments on commit 763c61d

Please sign in to comment.