[ISSUE #7035] Fix correct min offset behavior in tiered storage #7038

Merged
5 commits merged on Jul 19, 2023
@@ -352,7 +352,7 @@ public void doRedispatchRequestToWriteMap(AppendResult result, CompositeQueueFla
case SUCCESS:
long offset = MessageBufferUtil.getQueueOffset(message);
if (queueOffset != offset) {
logger.error("Message cq offset in commitlog does not meet expectations, " +
logger.warn("Message cq offset in commitlog does not meet expectations, " +
"result={}, topic={}, queueId={}, cq offset={}, msg offset={}",
AppendResult.OFFSET_INCORRECT, topic, queueId, queueOffset, offset);
}
@@ -473,6 +473,11 @@ public CompletableFuture<GetMessageResult> getMessageAsync(
return CompletableFuture.completedFuture(result);
}

// request range | result
// (0, min) | too small
// [min, max) | correct
// [max, max] | overflow one
// (max, +oo) | overflow badly
if (queueOffset < minQueueOffset) {
result.setStatus(GetMessageStatus.OFFSET_TOO_SMALL);
result.setNextBeginOffset(flatFile.getConsumeQueueMinOffset());
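
For reference, a minimal standalone sketch of the range classification spelled out in the comment above. It is not part of this PR; the enum and method names are hypothetical.

    public enum OffsetCheck { TOO_SMALL, CORRECT, OVERFLOW_ONE, OVERFLOW_BADLY }

    public static OffsetCheck classify(long queueOffset, long min, long max) {
        if (queueOffset < min) {
            return OffsetCheck.TOO_SMALL;      // (0, min): offset already expired
        }
        if (queueOffset < max) {
            return OffsetCheck.CORRECT;        // [min, max): readable range
        }
        if (queueOffset == max) {
            return OffsetCheck.OVERFLOW_ONE;   // [max, max]: next offset to be written
        }
        return OffsetCheck.OVERFLOW_BADLY;     // (max, +oo): far beyond the write position
    }
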
@@ -120,17 +120,19 @@ public long getCommitLogBeginTimestamp() {
return commitLog.getBeginTimestamp();
}

public long getConsumeQueueBaseOffset() {
return consumeQueue.getBaseOffset();
}

@Override
public long getCommitLogDispatchCommitOffset() {
return commitLog.getDispatchCommitOffset();
}

public long getConsumeQueueBaseOffset() {
return consumeQueue.getBaseOffset();
}

public long getConsumeQueueMinOffset() {
return consumeQueue.getMinOffset() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE;
long cqOffset = consumeQueue.getMinOffset() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE;
long effectiveOffset = this.commitLog.getMinConsumeQueueOffset();
return Math.max(cqOffset, effectiveOffset);
}

public long getConsumeQueueCommitOffset() {
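
A worked example of the getConsumeQueueMinOffset change above; the offsets are hypothetical.

    // If the consume queue still indexes entries from offset 100, but the earliest
    // message left in the tiered commit log maps to cq offset 120 (older commit log
    // segments were expired), the effective minimum exposed to consumers becomes 120.
    long cqOffset = 100L;        // consumeQueue.getMinOffset() / CONSUME_QUEUE_STORE_UNIT_SIZE
    long effectiveOffset = 120L; // commitLog.getMinConsumeQueueOffset()
    long minOffset = Math.max(cqOffset, effectiveOffset); // 120
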
@@ -20,6 +20,7 @@
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.tieredstore.common.AppendResult;
@@ -31,6 +32,7 @@
public class TieredCommitLog {

private static final Logger log = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
private static final Long NOT_EXIST_MIN_OFFSET = -1L;

/**
* item size: int, 4 bytes
@@ -42,10 +44,13 @@ public class TieredCommitLog {

private final TieredMessageStoreConfig storeConfig;
private final TieredFlatFile flatFile;
private final AtomicLong minConsumeQueueOffset;

public TieredCommitLog(TieredFileAllocator fileQueueFactory, String filePath) {
this.storeConfig = fileQueueFactory.getStoreConfig();
this.flatFile = fileQueueFactory.createFlatFileForCommitLog(filePath);
this.minConsumeQueueOffset = new AtomicLong(NOT_EXIST_MIN_OFFSET);
this.correctMinOffset();
}

@VisibleForTesting
@@ -61,6 +66,10 @@ public long getCommitOffset() {
return flatFile.getCommitOffset();
}

public long getMinConsumeQueueOffset() {
return minConsumeQueueOffset.get() != NOT_EXIST_MIN_OFFSET ? minConsumeQueueOffset.get() : correctMinOffset();
}

public long getDispatchCommitOffset() {
return flatFile.getDispatchCommitOffset();
}
@@ -82,6 +91,39 @@ public long getEndTimestamp() {
return flatFile.getFileToWrite().getMaxTimestamp();
}

public synchronized long correctMinOffset() {
if (flatFile.getFileSegmentCount() == 0) {
this.minConsumeQueueOffset.set(NOT_EXIST_MIN_OFFSET);
return NOT_EXIST_MIN_OFFSET;
}

// queue offset field length is 8
int length = MessageBufferUtil.QUEUE_OFFSET_POSITION + 8;
if (flatFile.getCommitOffset() - flatFile.getMinOffset() < length) {
this.minConsumeQueueOffset.set(NOT_EXIST_MIN_OFFSET);
return NOT_EXIST_MIN_OFFSET;
}

try {
return this.flatFile.readAsync(this.flatFile.getMinOffset(), length)
.thenApply(buffer -> {
long offset = MessageBufferUtil.getQueueOffset(buffer);
minConsumeQueueOffset.set(offset);
log.info("Correct commitlog min cq offset success, filePath={}, min cq offset={}, range={}-{}",
flatFile.getFilePath(), offset, flatFile.getMinOffset(), flatFile.getCommitOffset());
return offset;
})
.exceptionally(throwable -> {
log.warn("Correct commitlog min cq offset error, filePath={}, range={}-{}",
flatFile.getFilePath(), flatFile.getMinOffset(), flatFile.getCommitOffset(), throwable);
return minConsumeQueueOffset.get();
}).get();
} catch (Exception e) {
log.error("Correct commitlog min cq offset error, filePath={}", flatFile.getFilePath(), e);
}
return minConsumeQueueOffset.get();
}

public AppendResult append(ByteBuffer byteBuf) {
return flatFile.append(byteBuf, MessageBufferUtil.getStoreTimeStamp(byteBuf));
}
@@ -99,7 +141,9 @@ public void commit(boolean sync) {
}

public void cleanExpiredFile(long expireTimestamp) {
flatFile.cleanExpiredFile(expireTimestamp);
if (flatFile.cleanExpiredFile(expireTimestamp) > 0) {
correctMinOffset();
}
}

public void destroyExpiredFile() {
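
A rough usage sketch of the new min-offset tracking in TieredCommitLog above. The allocator, file path, and expiration window are hypothetical; imports (e.g. java.util.concurrent.TimeUnit) are assumed.

    TieredCommitLog commitLog = new TieredCommitLog(fileAllocator, filePath);

    // getMinConsumeQueueOffset() returns the cached value, recomputing it via
    // correctMinOffset() whenever the cache still holds NOT_EXIST_MIN_OFFSET (-1).
    long minCqOffset = commitLog.getMinConsumeQueueOffset();

    // After expiring segments, the cache is recomputed only if at least one file was removed.
    commitLog.cleanExpiredFile(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(72));
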
@@ -133,6 +133,14 @@ public long getDispatchCommitOffset() {
}
}

public String getFilePath() {
return filePath;
}

public FileSegmentType getFileType() {
return fileType;
}

@VisibleForTesting
public List<TieredFileSegment> getFileSegmentList() {
return fileSegmentList;
@@ -333,10 +341,9 @@ protected TieredFileSegment getFileToWrite() {
TieredFileSegment fileSegment = this.newSegment(fileType, offset, true);
fileSegmentList.add(fileSegment);
needCommitFileSegmentList.add(fileSegment);

Collections.sort(fileSegmentList);

logger.debug("Create a new file segment: baseOffset: {}, file: {}, file type: {}", baseOffset, fileSegment.getPath(), fileType);
logger.debug("Create a new file segment: baseOffset: {}, file: {}, file type: {}",
offset, fileSegment.getPath(), fileType);
return fileSegment;
} finally {
fileSegmentLock.writeLock().unlock();
@@ -429,7 +436,7 @@ public AppendResult append(ByteBuffer byteBuf, long timeStamp, boolean commit) {
return result;
}

public void cleanExpiredFile(long expireTimestamp) {
public int cleanExpiredFile(long expireTimestamp) {
Set<Long> needToDeleteSet = new HashSet<>();
try {
tieredMetadataStore.iterateFileSegment(filePath, fileType, metadata -> {
@@ -438,32 +445,32 @@ public void cleanExpiredFile(long expireTimestamp) {
}
});
} catch (Exception e) {
logger.error("clean expired failed: filePath: {}, file type: {}, expire timestamp: {}",
logger.error("Clean expired file, filePath: {}, file type: {}, expire timestamp: {}",
filePath, fileType, expireTimestamp);
}

if (needToDeleteSet.isEmpty()) {
return;
return 0;
}

fileSegmentLock.writeLock().lock();
try {
for (int i = 0; i < fileSegmentList.size(); i++) {
TieredFileSegment fileSegment = fileSegmentList.get(i);
try {
TieredFileSegment fileSegment = fileSegmentList.get(i);
if (needToDeleteSet.contains(fileSegment.getBaseOffset())) {
fileSegment.close();
fileSegmentList.remove(fileSegment);
needCommitFileSegmentList.remove(fileSegment);
i--;
this.updateFileSegment(fileSegment);
logger.info("expired file {} is been cleaned", fileSegment.getPath());
logger.debug("Clean expired file, filePath: {}", fileSegment.getPath());
} else {
break;
}
} catch (Exception e) {
logger.error("clean expired file failed: filePath: {}, file type: {}, expire timestamp: {}",
filePath, fileType, expireTimestamp, e);
logger.error("Clean expired file failed: filePath: {}, file type: {}, expire timestamp: {}",
fileSegment.getPath(), fileSegment.getFileType(), expireTimestamp, e);
}
}
if (fileSegmentList.size() > 0) {
@@ -476,6 +483,7 @@ public void cleanExpiredFile(long expireTimestamp) {
} finally {
fileSegmentLock.writeLock().unlock();
}
return needToDeleteSet.size();
}

@VisibleForTesting
@@ -493,7 +501,6 @@ public void destroyExpiredFile() {
fileSegment.destroyFile();
if (!fileSegment.exists()) {
tieredMetadataStore.deleteFileSegment(filePath, fileType, metadata.getBaseOffset());
logger.info("Destroyed expired file, file path: {}", fileSegment.getPath());
}
} catch (Exception e) {
logger.error("Destroyed expired file failed, file path: {}, file type: {}",
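
For clarity, a self-contained illustration of the index-based removal pattern used in cleanExpiredFile above. The list contents are hypothetical and java.util imports are assumed.

    List<String> segments = new ArrayList<>(Arrays.asList("seg-100", "seg-200", "seg-300"));
    Set<String> needToDelete = new HashSet<>(Arrays.asList("seg-100", "seg-200"));
    for (int i = 0; i < segments.size(); i++) {
        String segment = segments.get(i);
        if (needToDelete.contains(segment)) {
            segments.remove(segment);
            i--; // step back so the element shifted into slot i is not skipped
        } else {
            break; // segments are sorted by base offset; stop at the first one kept
        }
    }
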
@@ -223,7 +223,7 @@ private static void cleanStaticReference() {
public CompositeQueueFlatFile getOrCreateFlatFileIfAbsent(MessageQueue messageQueue) {
return queueFlatFileMap.computeIfAbsent(messageQueue, mq -> {
try {
logger.info("TieredFlatFileManager#getOrCreateFlatFileIfAbsent: " +
logger.debug("TieredFlatFileManager#getOrCreateFlatFileIfAbsent: " +
"try to create new flat file: topic: {}, queueId: {}",
messageQueue.getTopic(), messageQueue.getQueueId());
return new CompositeQueueFlatFile(tieredFileAllocator, mq);
@@ -38,6 +38,7 @@ public class TieredStoreMetricsConstant {
public static final String LABEL_OPERATION = "operation";
public static final String LABEL_SUCCESS = "success";

public static final String LABEL_PATH = "path";
public static final String LABEL_TOPIC = "topic";
public static final String LABEL_GROUP = "group";
public static final String LABEL_QUEUE_ID = "queue_id";
@@ -41,8 +41,8 @@

import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_FILE_TYPE;
import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_OPERATION;
import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_PATH;
import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_SUCCESS;
import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_TOPIC;

/**
* this class is experimental and may change without notice.
@@ -55,6 +55,7 @@ public class PosixFileSegment extends TieredFileSegment {
private static final String OPERATION_POSIX_READ = "read";
private static final String OPERATION_POSIX_WRITE = "write";

private final String fullPath;
private volatile File file;
private volatile FileChannel readFileChannel;
private volatile FileChannel writeFileChannel;
@@ -71,7 +72,7 @@ public PosixFileSegment(TieredMessageStoreConfig storeConfig,
// fullPath: basePath/hash_cluster/broker/topic/queueId/fileType/baseOffset
String brokerClusterName = storeConfig.getBrokerClusterName();
String clusterBasePath = TieredStoreUtil.getHash(brokerClusterName) + UNDERLINE + brokerClusterName;
String fullPath = Paths.get(basePath, clusterBasePath, filePath,
this.fullPath = Paths.get(basePath, clusterBasePath, filePath,
fileType.toString(), TieredStoreUtil.offset2FileName(baseOffset)).toString();
logger.info("Constructing Posix FileSegment, filePath: {}", fullPath);

@@ -80,13 +81,13 @@

protected AttributesBuilder newAttributesBuilder() {
return TieredStoreMetricsManager.newAttributesBuilder()
.put(LABEL_TOPIC, filePath)
.put(LABEL_PATH, filePath)
.put(LABEL_FILE_TYPE, fileType.name().toLowerCase());
}

@Override
public String getPath() {
return filePath;
return fullPath;
}

@Override
@@ -107,7 +108,7 @@ public void createFile() {
if (file == null) {
synchronized (this) {
if (file == null) {
File file = new File(filePath);
File file = new File(fullPath);
try {
File dir = file.getParentFile();
if (!dir.exists()) {
@@ -136,8 +137,9 @@ public void destroyFile() {
if (writeFileChannel != null && writeFileChannel.isOpen()) {
writeFileChannel.close();
}
logger.info("Destroy Posix FileSegment, filePath: {}", fullPath);
} catch (IOException e) {
logger.error("PosixFileSegment#destroyFile: destroy file {} failed: ", filePath, e);
logger.error("Destroy Posix FileSegment failed, filePath: {}", fullPath, e);
}

if (file.exists()) {
@@ -181,8 +183,9 @@ public CompletableFuture<ByteBuffer> read0(long position, int length) {
}

@Override
public CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream inputStream, long position, int length,
boolean append) {
public CompletableFuture<Boolean> commit0(
TieredFileSegmentInputStream inputStream, long position, int length, boolean append) {

Stopwatch stopwatch = Stopwatch.createStarted();
AttributesBuilder attributesBuilder = newAttributesBuilder()
.put(LABEL_OPERATION, OPERATION_POSIX_WRITE);
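
A sketch of the fullPath layout that PosixFileSegment now caches (see the constructor comment above). The base path, cluster, topic/queue path, and FileSegmentType value are hypothetical placeholders.

    // fullPath: basePath/hash_cluster/broker/topic/queueId/fileType/baseOffset
    String basePath = "/tmp/tiered_store";
    String brokerClusterName = "DefaultCluster";
    String filePath = "broker-a/TopicTest/0";
    String clusterBasePath = TieredStoreUtil.getHash(brokerClusterName) + "_" + brokerClusterName;
    String fullPath = Paths.get(basePath, clusterBasePath, filePath,
        FileSegmentType.COMMIT_LOG.toString(), TieredStoreUtil.offset2FileName(0L)).toString();
    // e.g. /tmp/tiered_store/<hash>_DefaultCluster/broker-a/TopicTest/0/COMMIT_LOG/00000000000000000000
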