Skip to content

Commit

Permalink
[ISSUE #4626] Add CRC32 verification when saving checkpoint file (#4627)
Browse files Browse the repository at this point in the history
* Add CRC32 verification when saving checkpoint file

* Add license to header
  • Loading branch information
lizhimins committed Jul 18, 2022
1 parent cc4c196 commit eb4e684
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 38 deletions.
23 changes: 22 additions & 1 deletion common/src/main/java/org/apache/rocketmq/common/EpochEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.rocketmq.common;

import java.util.Objects;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;

public class EpochEntry extends RemotingSerializable {
Expand Down Expand Up @@ -60,11 +61,31 @@ public void setEndOffset(long endOffset) {
this.endOffset = endOffset;
}

@Override public String toString() {
@Override
public String toString() {
return "EpochEntry{" +
"epoch=" + epoch +
", startOffset=" + startOffset +
", endOffset=" + endOffset +
'}';
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}

if (o == null || getClass() != o.getClass()) {
return false;
}

EpochEntry entry = (EpochEntry) o;
return epoch == entry.epoch && startOffset == entry.startOffset && endOffset == entry.endOffset;
}

@Override
public int hashCode() {
return Objects.hash(epoch, startOffset, endOffset);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,38 @@
package org.apache.rocketmq.common.utils;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;

/**
* Entry Checkpoint file util
* Format:
* First line: Entries size
* Then: every Entry(String)
* <li>First line: Entries size
* <li>Second line: Entries crc32
* <li>Next: Entry data per line
* <p>
* Example:
* <li>2 (size)
* <li>773307083 (crc32)
* <li>7-7000 (entry data)
* <li>8-8000 (entry data)
*/
public class CheckpointFile<T> {

/**
* Not check crc32 when value is 0
*/
private static final int NOT_CHECK_CRC_MAGIC_CODE = 0;
private final String filePath;
private final CheckpointSerializer<T> serializer;

public interface CheckpointSerializer<T> {
/**
* Serialize entry to line
Expand All @@ -48,75 +62,94 @@ public interface CheckpointSerializer<T> {
T fromLine(final String line);
}

private final String path;
private final CheckpointSerializer<T> serializer;

public CheckpointFile(final String path, final CheckpointSerializer<T> serializer) {
this.path = path;
public CheckpointFile(final String filePath, final CheckpointSerializer<T> serializer) {
this.filePath = filePath;
this.serializer = serializer;
}

public String getBackFilePath() {
return this.filePath + ".bak";
}

/**
* Write entries to file
*/
public void write(final List<T> entries) throws IOException {
public void write(final List<T> entries) throws IOException {
if (entries.isEmpty()) {
return;
}
synchronized (this) {
final FileOutputStream fos = new FileOutputStream(this.path);
try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8))) {
// Write size
writer.write(entries.size() + "");
writer.newLine();

// Write entries
for (T entry : entries) {
final String line = this.serializer.toLine(entry);
if (line != null && !line.isEmpty()) {
writer.write(line);
writer.newLine();
}
StringBuilder entryContent = new StringBuilder();
for (T entry : entries) {
final String line = this.serializer.toLine(entry);
if (line != null && !line.isEmpty()) {
entryContent.append(line);
entryContent.append(System.lineSeparator());
}

writer.flush();
fos.getFD().sync();
}
int crc32 = UtilAll.crc32(entryContent.toString().getBytes(StandardCharsets.UTF_8));

String content = entries.size() + System.lineSeparator() +
crc32 + System.lineSeparator() + entryContent;
MixAll.string2File(content, this.filePath);
}
}

/**
* Read entries from file
*/
public List<T> read() throws IOException {
private List<T> read(String filePath) throws IOException {
final ArrayList<T> result = new ArrayList<>();
synchronized (this) {
final File file = new File(this.path);
final File file = new File(filePath);
if (!file.exists()) {
return result;
}
final BufferedReader reader = Files.newBufferedReader(file.toPath());
try {
try (BufferedReader reader = Files.newBufferedReader(file.toPath())) {
// Read size
int expectedLines = Integer.parseInt(reader.readLine());

// Read block crc
int expectedCrc32 = Integer.parseInt(reader.readLine());

// Read entries
StringBuilder sb = new StringBuilder();
String line = reader.readLine();
while (line != null) {
sb.append(line).append(System.lineSeparator());
final T entry = this.serializer.fromLine(line);
if (entry != null) {
result.add(entry);
}
line = reader.readLine();
}
int truthCrc32 = UtilAll.crc32(sb.toString().getBytes(StandardCharsets.UTF_8));

if (result.size() != expectedLines) {
final String err = String.format("Expect %d entries, only found %d entries", expectedLines, result.size());
final String err = String.format(
"Expect %d entries, only found %d entries", expectedLines, result.size());
throw new IOException(err);
}

if (NOT_CHECK_CRC_MAGIC_CODE != expectedCrc32 && truthCrc32 != expectedCrc32) {
final String err = String.format(
"Entries crc32 not match, file=%s, truth=%s", expectedCrc32, truthCrc32);
throw new IOException(err);
}
return result;
} finally {
reader.close();
}
}
}

/**
* Read entries from file
*/
public List<T> read() throws IOException {
try {
List<T> result = this.read(this.filePath);
if (CollectionUtils.isEmpty(result)) {
result = this.read(this.getBackFilePath());
}
return result;
} catch (IOException e) {
return this.read(this.getBackFilePath());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.common.utils;

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.common.EpochEntry;
import org.apache.rocketmq.common.UtilAll;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class CheckpointFileTest {

private static final String FILE_PATH =
Paths.get(System.getProperty("user.home"), "store-test", "epoch.ckpt").toString();

private List<EpochEntry> entryList;
private CheckpointFile<EpochEntry> checkpoint;

static class EpochEntrySerializer implements CheckpointFile.CheckpointSerializer<EpochEntry> {

@Override
public String toLine(EpochEntry entry) {
if (entry != null) {
return String.format("%d-%d", entry.getEpoch(), entry.getStartOffset());
} else {
return null;
}
}

@Override
public EpochEntry fromLine(String line) {
final String[] arr = line.split("-");
if (arr.length == 2) {
final int epoch = Integer.parseInt(arr[0]);
final long startOffset = Long.parseLong(arr[1]);
return new EpochEntry(epoch, startOffset);
}
return null;
}
}

@Before
public void init() throws IOException {
this.entryList = new ArrayList<>();
entryList.add(new EpochEntry(7, 7000));
entryList.add(new EpochEntry(8, 8000));
this.checkpoint = new CheckpointFile<>(FILE_PATH, new EpochEntrySerializer());
this.checkpoint.write(entryList);
}

@After
public void destroy() {
UtilAll.deleteFile(new File(FILE_PATH));
UtilAll.deleteFile(new File(FILE_PATH + ".bak"));
}

@Test
public void testNormalWriteAndRead() throws IOException {
List<EpochEntry> listFromFile = checkpoint.read();
Assert.assertEquals(entryList, listFromFile);
checkpoint.write(entryList);
listFromFile = checkpoint.read();
Assert.assertEquals(entryList, listFromFile);
}

@Test
public void testAbNormalWriteAndRead() throws IOException {
this.checkpoint.write(entryList);
UtilAll.deleteFile(new File(FILE_PATH));
List<EpochEntry> listFromFile = checkpoint.read();
Assert.assertEquals(entryList, listFromFile);
checkpoint.write(entryList);
listFromFile = checkpoint.read();
Assert.assertEquals(entryList, listFromFile);
}
}

0 comments on commit eb4e684

Please sign in to comment.