Skip to content

Commit

Permalink
[improve][broker] Reduce the pressure from the transaction buffer rea…
Browse files Browse the repository at this point in the history
…ders and writers in rolling restarts

### Motivation

During rolling restarts, the namespace bundle ownerships will
change. Assume a producer is created on a single topic whose bundle
ownership has been transferred to the new broker, that the namespace
bundle has N topics, and that the namespace is `tenant/ns`. Then:
1. All N topics in the same bundle of that topic will be loaded.
2. For each topic, the managed ledger will be initialized, when the
   transaction coordinator is enabled, a `TopicTransactionBuffer` will
   be created.
   2.1 A Pulsar producer will be created on
     `tenant/ns/__transaction_buffer_snapshot` concurrently.
   2.2 A Pulsar reader will be created on
     `tenant/ns/__transaction_buffer_snapshot` concurrently.
3. Once all N readers are created, the owner of the snapshot topic will
   start dispatching messages to N readers. Each dispatcher will read
   messages from BookKeeper concurrently and might fail with too many
   requests error because BK can only have
  `maxPendingReadRequestsPerThread` pending read requests (default: 10000).

We have a `numTransactionReplayThreadPoolSize` config to limit the
concurrency of transaction snapshot readers. However, it only limits the
read loop. For example, if it's configured with 1, only 1 reader could
read messages at the same time. However, N readers will be created
concurrently. Each time one of these readers explicitly calls `readNext`,
all N dispatchers on the broker side will dispatch messages to the N readers.

The behavior above puts a lot of CPU pressure on the owner broker,
especially for a small cluster with only two brokers.

### Modifications

- Synchronize the reader creation, read loop and the following process
  on its result.
- Delay the snapshot producer creation until a message is written, via
  `LazySnapshotWriter`.
  • Loading branch information
BewareMyPower committed Jul 22, 2024
1 parent c50f4af commit 79430fc
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 70 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.transaction.buffer.impl;

import static org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter;
import static org.apache.pulsar.broker.systopic.SystemTopicClient.Writer;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.naming.TopicName;

@Slf4j
public class LazySnapshotWriter<T> {

private final PersistentTopic topic;
private final SystemTopicTxnBufferSnapshotService<T> snapshotService;
private ReferenceCountedWriter<T> writer;

public LazySnapshotWriter(PersistentTopic topic, SystemTopicTxnBufferSnapshotService<T> snapshotService) {
this.topic = topic;
this.snapshotService = snapshotService;
}

public synchronized CompletableFuture<Writer<T>> getFuture() {
if (writer != null) {
return writer.getFuture();
}
writer = snapshotService.getReferenceWriter(TopicName.get(topic.getName()).getNamespaceObject());
final var future = writer.getFuture();
future.exceptionally(e -> {
synchronized (LazySnapshotWriter.this) {
// Next time `getWriter()` is called, the writer will be recreated
writer = null;
}
log.error("{} Failed to create snapshot writer", topic.getName(), e);
return null;
});
return future;
}

public synchronized void release() {
if (writer != null) {
writer.release();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,51 +21,41 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.metadata.AbortTxnMetadata;
import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.util.FutureUtil;

@Slf4j
public class SingleSnapshotAbortedTxnProcessorImpl implements AbortedTxnProcessor {
private final PersistentTopic topic;
private final ReferenceCountedWriter<TransactionBufferSnapshot> takeSnapshotWriter;
/**
* Aborted transactions. A map is used to judge whether a message is aborted; it is a linked map so
* that aborted txns can be removed from memory once their positions have been deleted.
*/
private final LinkedMap<TxnID, Position> aborts = new LinkedMap<>();
private final LazySnapshotWriter<TransactionBufferSnapshot> takeSnapshotWriter;

private volatile long lastSnapshotTimestamps;

private volatile boolean isClosed = false;

public SingleSnapshotAbortedTxnProcessorImpl(PersistentTopic topic) {
this.topic = topic;
this.takeSnapshotWriter = this.topic.getBrokerService().getPulsar()
.getTransactionBufferSnapshotServiceFactory()
.getTxnBufferSnapshotService().getReferenceWriter(TopicName.get(topic.getName()).getNamespaceObject());
this.takeSnapshotWriter.getFuture().exceptionally((ex) -> {
log.error("{} Failed to create snapshot writer", topic.getName());
topic.close();
return null;
});
this.takeSnapshotWriter = new LazySnapshotWriter<>(topic, topic.getBrokerService().getPulsar()
.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService());
}

@Override
Expand Down Expand Up @@ -98,41 +88,35 @@ private long getSystemClientOperationTimeoutMs() throws Exception {

@Override
public CompletableFuture<Position> recoverFromSnapshot() {
return topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
.getTxnBufferSnapshotService()
.createReader(TopicName.get(topic.getName())).thenComposeAsync(reader -> {
try {
final var pulsar = topic.getBrokerService().getPulsar();
final var positionFuture = new CompletableFuture<Position>();
pulsar.getTransactionExecutorProvider().getExecutor(this).execute(() -> {
try {
final var reader = wait(pulsar.getTransactionBufferSnapshotServiceFactory()
.getTxnBufferSnapshotService().createReader(TopicName.get(topic.getName())), "create reader");
try {
Position startReadCursorPosition = null;
while (reader.hasMoreEvents()) {
Message<TransactionBufferSnapshot> message = reader.readNextAsync()
.get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
if (topic.getName().equals(message.getKey())) {
TransactionBufferSnapshot transactionBufferSnapshot = message.getValue();
if (transactionBufferSnapshot != null) {
handleSnapshot(transactionBufferSnapshot);
startReadCursorPosition = PositionFactory.create(
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
transactionBufferSnapshot.getMaxReadPositionEntryId());
}
while (reader.hasMoreEvents()) {
final var message = wait(reader.readNextAsync(), "read message");
if (topic.getName().equals(message.getKey())) {
TransactionBufferSnapshot transactionBufferSnapshot = message.getValue();
if (transactionBufferSnapshot != null) {
handleSnapshot(transactionBufferSnapshot);
startReadCursorPosition = PositionFactory.create(
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
transactionBufferSnapshot.getMaxReadPositionEntryId());
}
}
return CompletableFuture.completedFuture(startReadCursorPosition);
} catch (TimeoutException ex) {
Throwable t = FutureUtil.unwrapCompletionException(ex);
String errorMessage = String.format("[%s] Transaction buffer recover fail by read "
+ "transactionBufferSnapshot timeout!", topic.getName());
log.error(errorMessage, t);
return FutureUtil.failedFuture(
new BrokerServiceException.ServiceUnitNotReadyException(errorMessage, t));
} catch (Exception ex) {
log.error("[{}] Transaction buffer recover fail when read "
+ "transactionBufferSnapshot!", topic.getName(), ex);
return FutureUtil.failedFuture(ex);
} finally {
closeReader(reader);
}
}, topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
.getExecutor(this));
positionFuture.complete(startReadCursorPosition);
} finally {
closeReader(reader);
}
} catch (Exception e) {
positionFuture.completeExceptionally(e);
}
});
return positionFuture;
}

@Override
Expand Down Expand Up @@ -208,4 +192,11 @@ private void handleSnapshot(TransactionBufferSnapshot snapshot) {
}
}

/**
 * Blocks the calling thread until {@code future} completes, waiting at most the number of
 * milliseconds returned by {@code getSystemClientOperationTimeoutMs()}.
 *
 * @param future the asynchronous operation to wait for
 * @param msg short description of the operation (e.g. "create reader"), used in failure messages
 * @param <T> the result type of the future
 * @return the value the future completed with
 * @throws ExecutionException if the future failed; the message is {@code "Failed to " + msg} and the
 *     cause is the original failure rather than the intermediate wrapper
 * @throws TimeoutException if the future did not complete in time; the message names the operation
 * @throws InterruptedException if the thread was interrupted while waiting; the interrupt flag is
 *     restored before rethrowing
 * @throws Exception whatever {@code getSystemClientOperationTimeoutMs()} itself throws
 */
private <T> T wait(CompletableFuture<T> future, String msg) throws Exception {
    try {
        return future.get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
    } catch (ExecutionException e) {
        // Unwrap so callers see the real failure as the direct cause.
        throw new ExecutionException("Failed to " + msg, e.getCause());
    } catch (InterruptedException e) {
        // Future.get clears the interrupt status; restore it so the owning executor can observe it.
        Thread.currentThread().interrupt();
        throw e;
    } catch (TimeoutException e) {
        // Future.get gives no hint about which operation timed out; add that context, keep the type.
        final var timeout = new TimeoutException("Timed out waiting to " + msg);
        timeout.initCause(e);
        throw timeout;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
Expand Down Expand Up @@ -538,8 +537,8 @@ public class PersistentWorker {
private final PersistentTopic topic;

//Persistent snapshot segment and index at the single thread.
private final ReferenceCountedWriter<TransactionBufferSnapshotSegment> snapshotSegmentsWriter;
private final ReferenceCountedWriter<TransactionBufferSnapshotIndexes> snapshotIndexWriter;
private final LazySnapshotWriter<TransactionBufferSnapshotSegment> snapshotSegmentsWriter;
private final LazySnapshotWriter<TransactionBufferSnapshotIndexes> snapshotIndexWriter;

private volatile boolean closed = false;

Expand All @@ -566,24 +565,12 @@ public enum OperationType {

public PersistentWorker(PersistentTopic topic) {
this.topic = topic;
this.snapshotSegmentsWriter = this.topic.getBrokerService().getPulsar()
.getTransactionBufferSnapshotServiceFactory()
.getTxnBufferSnapshotSegmentService()
.getReferenceWriter(TopicName.get(topic.getName()).getNamespaceObject());
this.snapshotSegmentsWriter.getFuture().exceptionally(ex -> {
log.error("{} Failed to create snapshot index writer", topic.getName());
topic.close();
return null;
});
this.snapshotIndexWriter = this.topic.getBrokerService().getPulsar()
.getTransactionBufferSnapshotServiceFactory()
.getTxnBufferSnapshotIndexService()
.getReferenceWriter(TopicName.get(topic.getName()).getNamespaceObject());
this.snapshotIndexWriter.getFuture().exceptionally((ex) -> {
log.error("{} Failed to create snapshot writer", topic.getName());
topic.close();
return null;
});
final var snapshotServiceFactory = topic.getBrokerService().getPulsar()
.getTransactionBufferSnapshotServiceFactory();
this.snapshotSegmentsWriter = new LazySnapshotWriter<>(topic,
snapshotServiceFactory.getTxnBufferSnapshotSegmentService());
this.snapshotIndexWriter = new LazySnapshotWriter<>(topic,
snapshotServiceFactory.getTxnBufferSnapshotIndexService());
}

public CompletableFuture<Void> appendTask(OperationType operationType,
Expand Down Expand Up @@ -882,4 +869,4 @@ private List<TxnIDData> convertTypeToTxnIDData(List<TxnID> abortedTxns) {
return segment;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,7 @@ public void run() {
this, topic.getName());
return;
}
abortedTxnProcessor.recoverFromSnapshot().thenAcceptAsync(startReadCursorPosition -> {
abortedTxnProcessor.recoverFromSnapshot().thenAccept(startReadCursorPosition -> {
//Transaction is not use for this topic, so just make maxReadPosition as LAC.
if (startReadCursorPosition == null) {
callBack.noNeedToRecover();
Expand Down Expand Up @@ -678,8 +678,7 @@ public void run() {

closeCursor(SUBSCRIPTION_NAME);
callBack.recoverComplete();
}, topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
.getExecutor(this)).exceptionally(e -> {
}).exceptionally(e -> {
callBack.recoverExceptionally(e.getCause());
log.error("[{}]Transaction buffer failed to recover snapshot!", topic.getName(), e);
return null;
Expand Down

0 comments on commit 79430fc

Please sign in to comment.