-
Notifications
You must be signed in to change notification settings - Fork 210
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Faster topic removal in MultiDC setup #1884
Changes from all commits
103948a
c1f4c73
c03cade
8a14299
d32c40a
0bd5a76
f88ea54
2d20bfb
04bf62a
0f7e412
bf7d5f6
bed84bc
a75a7bb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,8 @@ | |
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import org.apache.commons.lang3.ArrayUtils; | ||
import org.apache.curator.framework.CuratorFramework; | ||
import org.apache.curator.framework.api.transaction.CuratorTransactionFinal; | ||
import org.apache.curator.utils.ZKPaths; | ||
import org.apache.zookeeper.data.Stat; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
@@ -18,6 +20,7 @@ | |
import java.util.List; | ||
import java.util.Optional; | ||
import java.util.function.BiConsumer; | ||
import java.util.stream.Collectors; | ||
|
||
public abstract class ZookeeperBasedRepository { | ||
|
||
|
@@ -75,6 +78,13 @@ protected List<String> childrenOf(String path) { | |
} | ||
} | ||
|
||
protected List<String> childrenPathsOf(String path) { | ||
List<String> childNodes = childrenOf(path); | ||
return childNodes.stream() | ||
.map(child -> ZKPaths.makePath(path, child)) | ||
.collect(Collectors.toList()); | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
protected byte[] readFrom(String path) { | ||
return readWithStatFrom(path, bytes -> bytes, (t, stat) -> {}, false).get(); | ||
|
@@ -156,6 +166,20 @@ protected void createInTransaction(String path, Object value, String childPath) | |
.commit(); | ||
} | ||
|
||
protected void deleteInTransaction(List<String> paths) throws Exception { | ||
if (paths.isEmpty()) { | ||
throw new InternalProcessingException("Attempting to remove empty set of paths from ZK"); | ||
} | ||
ensureConnected(); | ||
CuratorTransactionFinal transaction = zookeeper.inTransaction().delete().forPath(paths.get(0)).and(); | ||
|
||
for (int i = 1; i < paths.size(); i++) { | ||
transaction = transaction.delete().forPath(paths.get(i)).and(); | ||
} | ||
|
||
transaction.commit(); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. note: Here is my proposition for refactor: protected void deleteInTransaction(List<String> paths) throws Exception {
if (paths.isEmpty()) {
throw new InternalProcessingException("Attempting to remove empty set of paths from ZK");
}
ensureConnected();
zookeeper.transaction().forOperations(
paths.stream().map(x -> {
try {
return zookeeper.transactionOp().delete().forPath(x);
} catch (Exception e) {
throw new RuntimeException(e);
}
}).collect(Collectors.toList())
);
} Feel free to tweak with it :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will be resolved here: #1886 |
||
|
||
protected void create(String path, Object value) throws Exception { | ||
ensureConnected(); | ||
zookeeper.create().forPath(path, mapper.writeValueAsBytes(value)); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ | |
import pl.allegro.tech.hermes.domain.topic.TopicNotExistsException; | ||
import pl.allegro.tech.hermes.domain.topic.TopicRepository; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.List; | ||
import java.util.Optional; | ||
|
@@ -77,12 +78,67 @@ public void createTopic(Topic topic) { | |
} | ||
} | ||
|
||
/** | ||
* To remove topic node, we must remove topic node and its children. The tree looks like this: | ||
* <ul> | ||
* <li>- topic | ||
* <li>----- /subscriptions (required) | ||
* <li>----- /preview (optional) | ||
* <li>----- /metrics (optional) | ||
* <li>--------------- /volume | ||
* <li>--------------- /published | ||
* </ul> | ||
* | ||
* <p>One way to remove the whole tree for topic that would be to use <code>deletingChildrenIfNeeded()</code>: | ||
* e.g. <code>zookeeper.delete().deletingChildrenIfNeeded().forPath(topicPath)</code>. | ||
* However, <code>deletingChildrenIfNeeded</code> is not atomic. It first tries to remove the node <code>topic</code> | ||
* and upon receiving <code>KeeperException.NotEmptyException</code> it tries to remove children recursively | ||
* and then retries the node removal. This means that there is a potentially large time gap between | ||
* removal of <code>topic/subscriptions</code> node and <code>topic</code> node, especially when topic removal is being done | ||
* in remote DC. | ||
* | ||
* <p>It turns out that <code>PathChildrenCache</code> used by <code>HierarchicalCacheLevel</code> in | ||
* Consumers and Frontend listens for <code>topics/subscriptions</code> changes and recreates that node when deleted. | ||
* If the recreation happens between the <code>topic/subscriptions</code> and <code>topic</code> node removal | ||
* than the whole removal process must be repeated resulting in a lengthy loop that may even result in <code>StackOverflowException</code>. | ||
* Example of that scenario would be | ||
* <ol> | ||
* <li> DELETE <code>topic</code> - issued by management, fails with KeeperException.NotEmptyException | ||
* <li> DELETE <code>topic/subscriptions</code> - issued by management, succeeds | ||
* <li> CREATE <code>topic/subscriptions</code> - issued by frontend, succeeds | ||
* <li> DELETE <code>topic</code> - issued by management, fails with KeeperException.NotEmptyException | ||
* <li> [...] | ||
* </ol> | ||
* | ||
* <p>To solve this we must remove <code>topic</code> and <code>topic/subscriptions</code> atomically. However, we must also remove | ||
* other <code>topic</code> children. Transaction API does not allow for optional deletes so we: | ||
* <ol> | ||
* <li> find all children paths | ||
* <li> delete all children in one transaction | ||
* </ol> | ||
*/ | ||
@Override | ||
public void removeTopic(TopicName topicName) { | ||
ensureTopicExists(topicName); | ||
logger.info("Removing topic: " + topicName); | ||
|
||
List<String> pathsForRemoval = new ArrayList<>(); | ||
String topicMetricsPath = paths.topicMetricsPath(topicName); | ||
if (pathExists(topicMetricsPath)) { | ||
pathsForRemoval.addAll(childrenPathsOf(topicMetricsPath)); | ||
pathsForRemoval.add(topicMetricsPath); | ||
} | ||
|
||
String topicPreviewPath = paths.topicPreviewPath(topicName); | ||
if (pathExists(topicPreviewPath)) { | ||
pathsForRemoval.add(topicPreviewPath); | ||
} | ||
|
||
pathsForRemoval.add(paths.subscriptionsPath(topicName)); | ||
pathsForRemoval.add(paths.topicPath(topicName)); | ||
|
||
try { | ||
remove(paths.topicPath(topicName)); | ||
deleteInTransaction(pathsForRemoval); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we also use this method in other places ? because now we have two different methods for removing paths🤔 for paths without children it will be no-brainer (just use a simple remove), but for those with children we now have a choice There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Transactional removal should only be used when needed - it has a higher overhead + it is required to supply all the paths. Most of the time we remove nodes with prefix e.g. delete all nodes for topic ((like |
||
} catch (Exception e) { | ||
throw new InternalProcessingException(e); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue: ZooKeeper API mentions that
inTransaction
method is deprecated.Can we use
transaction()
method here?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will be resolved here: #1886