Skip to content

Commit

Permalink
Add CMK encryption support to DynamoDB export
Browse files Browse the repository at this point in the history
Signed-off-by: Aiden Dai <daixb@amazon.com>
  • Loading branch information
daixba committed Nov 6, 2023
1 parent 60f84ad commit 97a90e3
Show file tree
Hide file tree
Showing 10 changed files with 81 additions and 41 deletions.
2 changes: 2 additions & 0 deletions data-prepper-plugins/dynamodb-source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ source:

* s3_bucket (Required): The destination bucket to store the exported data files
* s3_prefix (Optional): Custom prefix.
* s3_sse_kms_key_id (Optional): An AWS KMS Customer Managed Key (CMK) to encrypt the export data files. The key id must
be the full ARN of the key, e.g. arn:aws:kms:us-west-2:123456789012:key/0a4bc22f-bb96-4ad4-80ca-63b12b3ec147

### Stream Configurations

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,12 @@ public void init() {
Instant startTime = Instant.now();

if (tableInfo.getMetadata().isExportRequired()) {
createExportPartition(tableInfo.getTableArn(), startTime, tableInfo.getMetadata().getExportBucket(), tableInfo.getMetadata().getExportPrefix());
createExportPartition(
tableInfo.getTableArn(),
startTime,
tableInfo.getMetadata().getExportBucket(),
tableInfo.getMetadata().getExportPrefix(),
tableInfo.getMetadata().getExportKmsKeyId());
}

if (tableInfo.getMetadata().isStreamRequired()) {
Expand Down Expand Up @@ -209,11 +214,12 @@ public void init() {
* @param bucket Export bucket
* @param prefix Export Prefix
* @param kmsKeyId KMS Customer Managed Key id (key ARN) used for SSE-KMS encryption of the export; may be null
*/
private void createExportPartition(String tableArn, Instant exportTime, String bucket, String prefix) {
private void createExportPartition(String tableArn, Instant exportTime, String bucket, String prefix, String kmsKeyId) {
    final ExportProgressState progressState = new ExportProgressState();
    progressState.setBucket(bucket);
    progressState.setPrefix(prefix);
    progressState.setKmsKeyId(kmsKeyId);
    // Export time is recorded in the state for informational purposes only.
    progressState.setExportTime(exportTime.toString());
    // Register the export partition with the coordinator so the export job gets scheduled.
    coordinator.createPartition(new ExportPartition(tableArn, exportTime, Optional.of(progressState)));
}
Expand Down Expand Up @@ -310,6 +316,7 @@ private TableInfo getTableInfo(TableConfig tableConfig) {
.streamStartPosition(streamStartPosition)
.exportBucket(tableConfig.getExportConfig() == null ? null : tableConfig.getExportConfig().getS3Bucket())
.exportPrefix(tableConfig.getExportConfig() == null ? null : tableConfig.getExportConfig().getS3Prefix())
.exportKmsKeyId(tableConfig.getExportConfig() == null ? null : tableConfig.getExportConfig().getS3SseKmsKeyId())
.build();
return new TableInfo(tableConfig.getTableArn(), metadata);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,25 @@
package org.opensearch.dataprepper.plugins.source.dynamodb.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotBlank;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.regions.Region;

public class ExportConfig {

// Destination S3 bucket for the exported data files (required).
@JsonProperty("s3_bucket")
@NotBlank(message = "Bucket Name is required for export")
private String s3Bucket;

// Optional custom key prefix for the exported objects.
@JsonProperty("s3_prefix")
private String s3Prefix;

// Optional region of the export bucket; see getAwsRegion().
@JsonProperty("s3_region")
private String s3Region;

// Optional KMS Customer Managed Key (full key ARN) for SSE-KMS encryption of the export.
@JsonProperty("s3_sse_kms_key_id")
private String s3SseKmsKeyId;

// Returns the destination S3 bucket name for the export.
public String getS3Bucket() {
    return s3Bucket;
}
Expand All @@ -33,4 +37,15 @@ public Region getAwsRegion() {
return s3Region != null ? Region.of(s3Region) : null;
}

// Returns the KMS key id (ARN) for export encryption, or null when not configured.
public String getS3SseKmsKeyId() {
    return s3SseKmsKeyId;
}

@AssertTrue(message = "KMS Key ID must be a valid one.")
boolean isKmsKeyIdValid() {
    // If a key id is provided, it must be a full key ARN, e.g.
    // arn:aws:kms:us-west-2:123456789012:key/0a4bc22f-bb96-4ad3-80ca-63b12b3ec147
    if (s3SseKmsKeyId == null) {
        return true;
    }
    try {
        return Arn.fromString(s3SseKmsKeyId).resourceAsString() != null;
    } catch (IllegalArgumentException e) {
        // Arn.fromString throws on malformed input; report that as a clean
        // validation failure instead of letting the exception escape the validator.
        return false;
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ public class ExportProgressState {
// S3 key prefix under which the export files are written.
@JsonProperty("prefix")
private String prefix;

// KMS key id (ARN) used to encrypt the export files; null when SSE-KMS is not used.
@JsonProperty("kmsKeyId")
private String kmsKeyId;

// Point-in-time of the export, stored as a string for information purposes.
@JsonProperty("exportTime")
private String exportTime;


// Returns the ARN of the submitted export job, or null if not yet submitted.
public String getExportArn() {
    return exportArn;
}
Expand Down Expand Up @@ -64,4 +67,12 @@ public String getStatus() {
// Updates the export job status string.
public void setStatus(String status) {
    this.status = status;
}

// Returns the KMS key id (ARN) for export encryption, or null when not configured.
public String getKmsKeyId() {
    return kmsKeyId;
}

// Sets the KMS key id (ARN) for export encryption.
public void setKmsKeyId(String kmsKeyId) {
    this.kmsKeyId = kmsKeyId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@

import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
import org.slf4j.Logger;
Expand Down Expand Up @@ -169,14 +169,12 @@ public void run() {
int lineCount = 0;
int lastLineProcessed = 0;

try {
InputStream inputStream = objectReader.readFile(bucketName, key);
GZIPInputStream gzipInputStream = new GZIPInputStream(inputStream);
BufferedReader reader = new BufferedReader(new InputStreamReader(gzipInputStream));
try (InputStream inputStream = objectReader.readFile(bucketName, key);
GZIPInputStream gzipInputStream = new GZIPInputStream(inputStream);
BufferedReader reader = new BufferedReader(new InputStreamReader(gzipInputStream))) {

String line;
while ((line = reader.readLine()) != null) {

if (shouldStop) {
checkpointer.checkpoint(lastLineProcessed);
LOG.debug("Should Stop flag is set to True, looks like shutdown has triggered");
Expand Down Expand Up @@ -213,9 +211,6 @@ public void run() {
}

lines.clear();
reader.close();
gzipInputStream.close();
inputStream.close();

LOG.info("Completed loading s3://{}/{} to buffer", bucketName, key);

Expand All @@ -226,7 +221,7 @@ public void run() {
} catch (Exception e) {
checkpointer.checkpoint(lineCount);

String errorMessage = String.format("Loading of s3://%s/%s completed with Exception: %S", bucketName, key, e.getMessage());
String errorMessage = String.format("Loading of s3://%s/%s completed with Exception: %s", bucketName, key, e.getMessage());
throw new RuntimeException(errorMessage);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,7 @@ private BiConsumer<String, Throwable> completeExport(ExportPartition exportParti
ExportProgressState state = exportPartition.getProgressState().get();
String bucketName = state.getBucket();
String exportArn = state.getExportArn();



String manifestKey = exportTaskManager.getExportManifest(exportArn);
LOG.debug("Export manifest summary file is " + manifestKey);

Expand Down Expand Up @@ -189,6 +188,7 @@ private void createDataFilePartitions(String exportArn, String bucketName, Map<S
DataFileProgressState progressState = new DataFileProgressState();
progressState.setTotal(size);
progressState.setLoaded(0);

totalFiles.addAndGet(1);
totalRecords.addAndGet(size);
DataFilePartition partition = new DataFilePartition(exportArn, bucketName, key, Optional.of(progressState));
Expand Down Expand Up @@ -260,7 +260,7 @@ private String getOrCreateExportArn(ExportPartition exportPartition) {

LOG.info("Try to submit a new export job for table {} with export time {}", exportPartition.getTableArn(), exportPartition.getExportTime());
// submit a new export request
String exportArn = exportTaskManager.submitExportJob(exportPartition.getTableArn(), state.getBucket(), state.getPrefix(), exportPartition.getExportTime());
String exportArn = exportTaskManager.submitExportJob(exportPartition.getTableArn(), state.getBucket(), state.getPrefix(), state.getKmsKeyId(), exportPartition.getExportTime());

// Update state with export Arn in the coordination table.
// So that it won't be submitted again after a restart.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import software.amazon.awssdk.services.dynamodb.model.ExportFormat;
import software.amazon.awssdk.services.dynamodb.model.ExportTableToPointInTimeRequest;
import software.amazon.awssdk.services.dynamodb.model.ExportTableToPointInTimeResponse;
import software.amazon.awssdk.services.dynamodb.model.S3SseAlgorithm;

import java.time.Instant;

Expand All @@ -30,12 +31,15 @@ public ExportTaskManager(DynamoDbClient dynamoDBClient) {
this.dynamoDBClient = dynamoDBClient;
}

public String submitExportJob(String tableArn, String bucketName, String prefix, Instant exportTime) {
public String submitExportJob(String tableArn, String bucket, String prefix, String kmsKeyId, Instant exportTime) {
S3SseAlgorithm algorithm = kmsKeyId == null || kmsKeyId.isEmpty() ? S3SseAlgorithm.AES256 : S3SseAlgorithm.KMS;
// No needs to use a client token here.
ExportTableToPointInTimeRequest req = ExportTableToPointInTimeRequest.builder()
.tableArn(tableArn)
.s3Bucket(bucketName)
.s3Bucket(bucket)
.s3Prefix(prefix)
.s3SseAlgorithm(algorithm)
.s3SseKmsKeyId(kmsKeyId)
.exportFormat(DEFAULT_EXPORT_FORMAT)
.exportTime(exportTime)
.build();
Expand All @@ -46,7 +50,6 @@ public String submitExportJob(String tableArn, String bucketName, String prefix,

String exportArn = response.exportDescription().exportArn();
String status = response.exportDescription().exportStatusAsString();

LOG.debug("Export Job submitted with ARN {} and status {}", exportArn, status);
return exportArn;
} catch (SdkException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@

package org.opensearch.dataprepper.plugins.source.dynamodb.export;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.ExportSummary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.HashMap;
Expand All @@ -39,22 +37,18 @@ public ManifestFileReader(S3ObjectReader objectReader) {

public ExportSummary parseSummaryFile(String bucket, String key) {
LOG.debug("Try to read the manifest summary file");
InputStream object = objectReader.readFile(bucket, key);

BufferedReader reader = new BufferedReader(new InputStreamReader(object));
try {
try (InputStream object = objectReader.readFile(bucket, key);
BufferedReader reader = new BufferedReader(new InputStreamReader(object))) {
// Only one line
String line = reader.readLine();
LOG.debug("Manifest summary: {}", line);
ExportSummary summaryInfo = MAPPER.readValue(line, ExportSummary.class);
return summaryInfo;

} catch (JsonProcessingException e) {
} catch (Exception e) {
LOG.error("Failed to parse the summary info due to {}", e.getMessage());
throw new RuntimeException(e);

} catch (IOException e) {
LOG.error("IO Exception due to {}", e.getMessage());
throw new RuntimeException(e);
}

}
Expand All @@ -63,11 +57,10 @@ public Map<String, Integer> parseDataFile(String bucket, String key) {
LOG.info("Try to read the manifest data file");

Map<String, Integer> result = new HashMap<>();
InputStream object = objectReader.readFile(bucket, key);
BufferedReader reader = new BufferedReader(new InputStreamReader(object));

String line;
try {

try (InputStream object = objectReader.readFile(bucket, key);
BufferedReader reader = new BufferedReader(new InputStreamReader(object))) {
String line;
while ((line = reader.readLine()) != null) {
// An example line as below:
// {"itemCount":46331,"md5Checksum":"a0k21IY3eelgr2PuWJLjJw==","etag":"51f9f394903c5d682321c6211aae8b6a-1","dataFileS3Key":"test-table-export/AWSDynamoDB/01692350182719-6de2c037/data/fpgzwz7ome3s7a5gqn2mu3ogtq.json.gz"}
Expand All @@ -76,8 +69,9 @@ public Map<String, Integer> parseDataFile(String bucket, String key) {
result.put(map.get(DATA_FILE_S3_KEY), Integer.valueOf(map.get(DATA_FILE_ITEM_COUNT_KEY)));

}
} catch (IOException e) {
LOG.error("IO Exception due to {}", e.getMessage());
} catch (Exception e) {
LOG.error("Exception due to {}", e.getMessage());
throw new RuntimeException(e);
}

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ public class TableMetadata {
private static final String REQUIRE_EXPORT_KEY = "export";
private static final String REQUIRE_STREAM_KEY = "stream";


private final String partitionKeyAttributeName;

private final String sortKeyAttributeName;
Expand All @@ -36,6 +35,8 @@ public class TableMetadata {

private final String exportPrefix;

private final String exportKmsKeyId;

private TableMetadata(Builder builder) {
this.partitionKeyAttributeName = builder.partitionKeyAttributeName;
this.sortKeyAttributeName = builder.sortKeyAttributeName;
Expand All @@ -45,6 +46,7 @@ private TableMetadata(Builder builder) {
this.exportBucket = builder.exportBucket;
this.exportPrefix = builder.exportPrefix;
this.streamStartPosition = builder.streamStartPosition;
this.exportKmsKeyId = builder.exportKmsKeyId;

}

Expand All @@ -70,6 +72,8 @@ public static class Builder {

private String exportPrefix;

private String exportKmsKeyId;

private StreamStartPosition streamStartPosition;


Expand Down Expand Up @@ -108,6 +112,11 @@ public Builder exportPrefix(String exportPrefix) {
return this;
}

// Sets the KMS key id (ARN) used to encrypt the export; returns this builder for chaining.
public Builder exportKmsKeyId(String exportKmsKeyId) {
    this.exportKmsKeyId = exportKmsKeyId;
    return this;
}

public Builder streamStartPosition(StreamStartPosition streamStartPosition) {
this.streamStartPosition = streamStartPosition;
return this;
Expand Down Expand Up @@ -173,4 +182,8 @@ public String getExportBucket() {
// Returns the S3 key prefix configured for the export, or null when not set.
public String getExportPrefix() {
    return exportPrefix;
}

// Returns the KMS key id (ARN) configured for export encryption, or null when not set.
public String getExportKmsKeyId() {
    return exportKmsKeyId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class DataFileLoaderTest {

private final Random random = new Random();

private final int total = random.nextInt(10);
private final int total = random.nextInt(10) + 1;

@BeforeEach
void setup() {
Expand Down Expand Up @@ -153,8 +153,8 @@ void test_run_loadFile_correctly() {
try (
final MockedStatic<BufferAccumulator> bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class);
final MockedConstruction<ExportRecordConverter> recordConverterMockedConstruction = mockConstruction(ExportRecordConverter.class, (mock, context) -> {
exportRecordConverter = mock;
})) {
exportRecordConverter = mock;
})) {
bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator);
loader = DataFileLoader.builder(objectReader, pluginMetrics, buffer)
.bucketName(bucketName)
Expand Down

0 comments on commit 97a90e3

Please sign in to comment.