diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index 0c44ad043fc..a0b886eb0e9 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -204,6 +204,7 @@ public int getUnitSize() { return CQ_STORE_UNIT_SIZE; } + @Deprecated @Override public long getOffsetInQueueByTime(final long timestamp) { MappedFile mappedFile = this.mappedFileQueue.getConsumeQueueMappedFileByTime(timestamp, @@ -211,6 +212,7 @@ public long getOffsetInQueueByTime(final long timestamp) { return binarySearchInQueueByTime(mappedFile, timestamp, BoundaryType.LOWER); } + @Override public long getOffsetInQueueByTime(final long timestamp, final BoundaryType boundaryType) { MappedFile mappedFile = this.mappedFileQueue.getConsumeQueueMappedFileByTime(timestamp, messageStore.getCommitLog(), boundaryType); diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 97d9933a23f..25e4a166f3d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -1022,12 +1022,7 @@ public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) { public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, BoundaryType boundaryType) { ConsumeQueueInterface logic = this.findConsumeQueue(topic, queueId); if (logic != null) { - long resultOffset = -1; - if (logic instanceof ConsumeQueue) { - resultOffset = ((ConsumeQueue) logic).getOffsetInQueueByTime(timestamp, boundaryType); - } else { - resultOffset = logic.getOffsetInQueueByTime(timestamp); - } + long resultOffset = logic.getOffsetInQueueByTime(timestamp, boundaryType); // Make sure the result offset is in valid range. resultOffset = Math.max(resultOffset, logic.getMinOffsetInQueue()); resultOffset = Math.min(resultOffset, logic.getMaxOffsetInQueue()); diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java index 8fec1bf7b01..387c233bf56 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java @@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.function.Function; import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.attribute.CQType; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageAccessor; @@ -708,8 +709,14 @@ public MappedFile searchOffsetFromFiles(long msgOffset) { * @param timestamp * @return */ + @Deprecated @Override public long getOffsetInQueueByTime(final long timestamp) { + return getOffsetInQueueByTime(timestamp, BoundaryType.LOWER); + } + + @Override + public long getOffsetInQueueByTime(long timestamp, BoundaryType boundaryType) { MappedFile targetBcq; BatchOffsetIndex targetMinOffset; @@ -760,7 +767,7 @@ public long getOffsetInQueueByTime(final long timestamp) { if (timestamp >= maxQueueTimestamp) { return byteBuffer.getLong(right + MSG_BASE_OFFSET_INDEX); } - int mid = binarySearchRight(byteBuffer, left, right, CQ_STORE_UNIT_SIZE, MSG_STORE_TIME_OFFSET_INDEX, timestamp); + int mid = binarySearchRight(byteBuffer, left, right, CQ_STORE_UNIT_SIZE, MSG_STORE_TIME_OFFSET_INDEX, timestamp, boundaryType); if (mid != -1) { return byteBuffer.getLong(mid + MSG_BASE_OFFSET_INDEX); } @@ -819,11 +826,11 @@ private MappedFile searchTimeFromFiles(long timestamp) { /** * Find the offset of which the value is equal or larger than the given targetValue. - * If there are many values equal to the target, then find the earliest one. + * If there are many values equal to the target, then return the lowest offset if boundaryType is LOWER while + * return the highest offset if boundaryType is UPPER. */ public static int binarySearchRight(ByteBuffer byteBuffer, int left, int right, final int unitSize, - final int unitShift, - long targetValue) { + final int unitShift, long targetValue, BoundaryType boundaryType) { int mid = -1; while (left <= right) { mid = ceil((left + right) / 2); @@ -844,10 +851,24 @@ public static int binarySearchRight(ByteBuffer byteBuffer, int left, int right, } } else { //mid is actually in the mid - if (tmpValue < targetValue) { - left = mid + unitSize; - } else { - right = mid; + switch (boundaryType) { + case LOWER: + if (tmpValue < targetValue) { + left = mid + unitSize; + } else { + right = mid; + } + break; + case UPPER: + if (tmpValue <= targetValue) { + left = mid; + } else { + right = mid - unitSize; + } + break; + default: + log.warn("Unknown boundary type"); + return -1; } } } diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java index d7213fa37a1..55d08082925 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.store.queue; +import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.attribute.CQType; import org.apache.rocketmq.common.message.MessageExtBrokerInner; import org.apache.rocketmq.store.DispatchRequest; @@ -93,6 +94,15 @@ public interface ConsumeQueueInterface extends FileQueueLifeCycle { */ long getOffsetInQueueByTime(final long timestamp); + /** + * Get the message whose timestamp is the smallest, greater than or equal to the given time and when there are more + * than one message satisfy the condition, decide which one to return based on boundaryType. + * @param timestamp timestamp + * @param boundaryType Lower or Upper + * @return the offset(index) + */ + long getOffsetInQueueByTime(final long timestamp, final BoundaryType boundaryType); + /** * The max physical offset of commitlog has been dispatched to this queue. * It should be exclusive. diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java index 5b397d696bc..4a5f3a93b1d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.store.queue; +import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.SelectMappedBufferResult; @@ -148,7 +149,7 @@ public SelectMappedBufferResult getBatchMsgIndexOrNextBuffer(final long msgOffse ByteBuffer byteBuffer = sbr.getByteBuffer(); int left = minOffset.getIndexPos(); int right = maxOffset.getIndexPos(); - int mid = binarySearchRight(byteBuffer, left, right, CQ_STORE_UNIT_SIZE, MSG_BASE_OFFSET_INDEX, msgOffset); + int mid = binarySearchRight(byteBuffer, left, right, CQ_STORE_UNIT_SIZE, MSG_BASE_OFFSET_INDEX, msgOffset, BoundaryType.LOWER); if (mid != -1) { return minOffset.getMappedFile().selectMappedBuffer(mid); }