Skip to content

Commit

Permalink
add interface getOffsetInQueueByTime(long timestamp, BoundaryType bou…
Browse files Browse the repository at this point in the history
…ndaryTYpe) in ConsumeQueueInterface
  • Loading branch information
凯铎 committed Jul 30, 2023
1 parent e5f3fc9 commit bec6314
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,15 @@ public int getUnitSize() {
return CQ_STORE_UNIT_SIZE;
}

@Deprecated
@Override
public long getOffsetInQueueByTime(final long timestamp) {
MappedFile mappedFile = this.mappedFileQueue.getConsumeQueueMappedFileByTime(timestamp,
messageStore.getCommitLog(), BoundaryType.LOWER);
return binarySearchInQueueByTime(mappedFile, timestamp, BoundaryType.LOWER);
}

@Override
public long getOffsetInQueueByTime(final long timestamp, final BoundaryType boundaryType) {
MappedFile mappedFile = this.mappedFileQueue.getConsumeQueueMappedFileByTime(timestamp,
messageStore.getCommitLog(), boundaryType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1022,12 +1022,7 @@ public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) {
public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, BoundaryType boundaryType) {
ConsumeQueueInterface logic = this.findConsumeQueue(topic, queueId);
if (logic != null) {
long resultOffset = -1;
if (logic instanceof ConsumeQueue) {
resultOffset = ((ConsumeQueue) logic).getOffsetInQueueByTime(timestamp, boundaryType);
} else {
resultOffset = logic.getOffsetInQueueByTime(timestamp);
}
long resultOffset = logic.getOffsetInQueueByTime(timestamp, boundaryType);
// Make sure the result offset is in valid range.
resultOffset = Math.max(resultOffset, logic.getMinOffsetInQueue());
resultOffset = Math.min(resultOffset, logic.getMaxOffsetInQueue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageAccessor;
Expand Down Expand Up @@ -708,8 +709,14 @@ public MappedFile searchOffsetFromFiles(long msgOffset) {
* @param timestamp
* @return
*/
@Deprecated
@Override
public long getOffsetInQueueByTime(final long timestamp) {
return getOffsetInQueueByTime(timestamp, BoundaryType.LOWER);
}

@Override
public long getOffsetInQueueByTime(long timestamp, BoundaryType boundaryType) {
MappedFile targetBcq;
BatchOffsetIndex targetMinOffset;

Expand Down Expand Up @@ -760,7 +767,7 @@ public long getOffsetInQueueByTime(final long timestamp) {
if (timestamp >= maxQueueTimestamp) {
return byteBuffer.getLong(right + MSG_BASE_OFFSET_INDEX);
}
int mid = binarySearchRight(byteBuffer, left, right, CQ_STORE_UNIT_SIZE, MSG_STORE_TIME_OFFSET_INDEX, timestamp);
int mid = binarySearchRight(byteBuffer, left, right, CQ_STORE_UNIT_SIZE, MSG_STORE_TIME_OFFSET_INDEX, timestamp, boundaryType);
if (mid != -1) {
return byteBuffer.getLong(mid + MSG_BASE_OFFSET_INDEX);
}
Expand Down Expand Up @@ -819,11 +826,11 @@ private MappedFile searchTimeFromFiles(long timestamp) {

/**
* Find the offset of which the value is equal or larger than the given targetValue.
* If there are many values equal to the target, then find the earliest one.
* If there are many values equal to the target, then return the lowest offset if boundaryType is LOWER while
* return the highest offset if boundaryType is UPPER.
*/
public static int binarySearchRight(ByteBuffer byteBuffer, int left, int right, final int unitSize,
final int unitShift,
long targetValue) {
final int unitShift, long targetValue, BoundaryType boundaryType) {
int mid = -1;
while (left <= right) {
mid = ceil((left + right) / 2);
Expand All @@ -844,10 +851,24 @@ public static int binarySearchRight(ByteBuffer byteBuffer, int left, int right,
}
} else {
//mid is actually in the mid
if (tmpValue < targetValue) {
left = mid + unitSize;
} else {
right = mid;
switch (boundaryType) {
case LOWER:
if (tmpValue < targetValue) {
left = mid + unitSize;
} else {
right = mid;
}
break;
case UPPER:
if (tmpValue <= targetValue) {
left = mid;
} else {
right = mid - unitSize;
}
break;
default:
log.warn("Unknown boundary type");
return -1;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.rocketmq.store.queue;

import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.store.DispatchRequest;
Expand Down Expand Up @@ -93,6 +94,15 @@ public interface ConsumeQueueInterface extends FileQueueLifeCycle {
*/
long getOffsetInQueueByTime(final long timestamp);

/**
* Get the message whose timestamp is the smallest, greater than or equal to the given time and when there are more
* than one message satisfy the condition, decide which one to return based on boundaryType.
* @param timestamp timestamp
* @param boundaryType Lower or Upper
* @return the offset(index)
*/
long getOffsetInQueueByTime(final long timestamp, final BoundaryType boundaryType);

/**
* The max physical offset of commitlog has been dispatched to this queue.
* It should be exclusive.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.store.queue;

import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult;
Expand Down Expand Up @@ -148,7 +149,7 @@ public SelectMappedBufferResult getBatchMsgIndexOrNextBuffer(final long msgOffse
ByteBuffer byteBuffer = sbr.getByteBuffer();
int left = minOffset.getIndexPos();
int right = maxOffset.getIndexPos();
int mid = binarySearchRight(byteBuffer, left, right, CQ_STORE_UNIT_SIZE, MSG_BASE_OFFSET_INDEX, msgOffset);
int mid = binarySearchRight(byteBuffer, left, right, CQ_STORE_UNIT_SIZE, MSG_BASE_OFFSET_INDEX, msgOffset, BoundaryType.LOWER);
if (mid != -1) {
return minOffset.getMappedFile().selectMappedBuffer(mid);
}
Expand Down

0 comments on commit bec6314

Please sign in to comment.