A few improvements to rds source (opensearch-project#4765)
* Add error logging to event handlers

Signed-off-by: Hai Yan <oeyh@amazon.com>

* Add tls config and enable tls by default

Signed-off-by: Hai Yan <oeyh@amazon.com>

* Add original event name to metadata

Signed-off-by: Hai Yan <oeyh@amazon.com>

* Update metadata for export and stream events

Signed-off-by: Hai Yan <oeyh@amazon.com>

* Add some fixes

Signed-off-by: Hai Yan <oeyh@amazon.com>

* Remove config alias ssl

Signed-off-by: Hai Yan <oeyh@amazon.com>

---------

Signed-off-by: Hai Yan <oeyh@amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
oeyh authored and Krishna Kondaka committed Aug 8, 2024
1 parent 8ac6ca7 commit 111049d
Showing 17 changed files with 312 additions and 60 deletions.
@@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.source.rds;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.network.SSLMode;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
@@ -94,6 +95,11 @@ public void start(Buffer<Record<Event>> buffer) {

if (sourceConfig.isStreamEnabled()) {
BinaryLogClient binaryLogClient = new BinlogClientFactory(sourceConfig, rdsClient).create();
if (sourceConfig.getTlsConfig() == null || !sourceConfig.getTlsConfig().isInsecure()) {
binaryLogClient.setSSLMode(SSLMode.REQUIRED);
} else {
binaryLogClient.setSSLMode(SSLMode.DISABLED);
}
streamScheduler = new StreamScheduler(sourceCoordinator, sourceConfig, binaryLogClient, buffer, pluginMetrics);
runnableList.add(streamScheduler);
}
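The net effect of the hunk above is that TLS on the binlog connection is now opt-out rather than opt-in. A standalone illustration of the client setting being applied (host and credentials are placeholders, and the wiring through the source config is paraphrased, not the plugin's exact code):

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.network.SSLMode;

public class BinlogTlsExample {
    public static void main(String[] args) {
        // Placeholder endpoint and credentials, for illustration only
        BinaryLogClient client = new BinaryLogClient("mydb.example.com", 3306, "user", "password");

        // As in the diff: TLS is required unless the tls block explicitly
        // sets insecure to true
        boolean insecure = false;
        client.setSSLMode(insecure ? SSLMode.DISABLED : SSLMode.REQUIRED);

        // client.connect() would then negotiate TLS with the MySQL endpoint
    }
}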
RdsSourceConfig.java
@@ -12,6 +12,7 @@
import org.opensearch.dataprepper.plugins.source.rds.configuration.EngineType;
import org.opensearch.dataprepper.plugins.source.rds.configuration.ExportConfig;
import org.opensearch.dataprepper.plugins.source.rds.configuration.StreamConfig;
import org.opensearch.dataprepper.plugins.source.rds.configuration.TlsConfig;

import java.util.List;

@@ -55,9 +56,6 @@ public class RdsSourceConfig {
@JsonProperty("acknowledgments")
private boolean acknowledgments = false;

/**
* S3 bucket for holding both export and stream data
*/
@JsonProperty("s3_bucket")
private String s3Bucket;

@@ -77,6 +75,9 @@
@JsonProperty("authentication")
private AuthenticationConfig authenticationConfig;

@JsonProperty("tls")
private TlsConfig tlsConfig;

public String getDbIdentifier() {
return dbIdentifier;
}
@@ -133,6 +134,10 @@ public boolean isStreamEnabled() {
return streamConfig != null;
}

public TlsConfig getTlsConfig() {
return tlsConfig;
}

public AuthenticationConfig getAuthenticationConfig() {
return this.authenticationConfig;
}
@@ -151,6 +156,5 @@ public String getUsername() {
public String getPassword() {
return password;
}

}
}
TlsConfig.java (new file)
@@ -0,0 +1,20 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.rds.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;

public class TlsConfig {

@JsonProperty("insecure")
private boolean insecure = false;

// TODO: add options for server identity verification

public boolean isInsecure() {
return insecure;
}
}
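Because insecure is initialized to false, a pipeline that omits the tls block (or the insecure key) keeps the secure default. A minimal Jackson sketch of that behavior (JSON is used here for brevity; the pipeline definition itself is YAML):

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class TlsConfigDefaultSketch {
    public static class Tls {
        @JsonProperty("insecure")
        private boolean insecure = false; // same default as TlsConfig above

        public boolean isInsecure() {
            return insecure;
        }
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        System.out.println(mapper.readValue("{}", Tls.class).isInsecure());                  // false
        System.out.println(mapper.readValue("{\"insecure\":true}", Tls.class).isInsecure()); // true
    }
}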
ExportRecordConverter.java
@@ -7,11 +7,19 @@

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.stream.Collectors;

import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.BULK_ACTION_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_DATABASE_NAME_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_TABLE_NAME_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_TIMESTAMP_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_VERSION_FROM_TIMESTAMP;
import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.INGESTION_EVENT_TYPE_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE;

@@ -21,16 +29,28 @@ public class ExportRecordConverter {

static final String EXPORT_EVENT_TYPE = "EXPORT";

public Event convert(Record<Event> record, String tableName, String primaryKeyName) {
public Event convert(final Record<Event> record,
final String databaseName,
final String tableName,
final List<String> primaryKeys,
final long eventCreateTimeEpochMillis,
final long eventVersionNumber) {
Event event = record.getData();

EventMetadata eventMetadata = event.getMetadata();
eventMetadata.setAttribute(EVENT_DATABASE_NAME_METADATA_ATTRIBUTE, databaseName);
eventMetadata.setAttribute(EVENT_TABLE_NAME_METADATA_ATTRIBUTE, tableName);
eventMetadata.setAttribute(BULK_ACTION_METADATA_ATTRIBUTE, OpenSearchBulkActions.INDEX.toString());
eventMetadata.setAttribute(INGESTION_EVENT_TYPE_ATTRIBUTE, EXPORT_EVENT_TYPE);

final Object primaryKeyValue = record.getData().get(primaryKeyName, Object.class);
final String primaryKeyValue = primaryKeys.stream()
.map(key -> event.get(key, String.class))
.collect(Collectors.joining("|"));
eventMetadata.setAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, primaryKeyValue);

eventMetadata.setAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE, eventCreateTimeEpochMillis);
eventMetadata.setAttribute(EVENT_VERSION_FROM_TIMESTAMP, eventVersionNumber);

return event;
}
}
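The converter now derives the document id by joining the values of all primary-key columns with "|", which also covers composite keys. A self-contained sketch of that derivation (table and column names are made up):

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DocumentIdSketch {
    public static void main(String[] args) {
        // Hypothetical composite primary key and row data
        List<String> primaryKeys = List.of("customer_id", "order_id");
        Map<String, Object> row = Map.of("customer_id", "c1", "order_id", 42);

        // Same joining rule as in ExportRecordConverter.convert
        String documentId = primaryKeys.stream()
                .map(key -> String.valueOf(row.get(key)))
                .collect(Collectors.joining("|"));

        System.out.println(documentId); // c1|42
    }
}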
MetadataKeyAttributes.java
@@ -12,7 +12,9 @@ public class MetadataKeyAttributes {

static final String EVENT_TIMESTAMP_METADATA_ATTRIBUTE = "event_timestamp";

static final String EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE = "opensearch_action";
static final String BULK_ACTION_METADATA_ATTRIBUTE = "opensearch_action";

static final String CHANGE_EVENT_TYPE_METADATA_ATTRIBUTE = "change_event_type";

static final String EVENT_DATABASE_NAME_METADATA_ATTRIBUTE = "database_name";

StreamRecordConverter.java
@@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.source.rds.converter;

import com.github.shyiko.mysql.binlog.event.EventType;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.event.JacksonEvent;
@@ -19,9 +20,12 @@
import java.util.Map;
import java.util.stream.Collectors;

import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.BULK_ACTION_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.CHANGE_EVENT_TYPE_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_DATABASE_NAME_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_TABLE_NAME_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_TIMESTAMP_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_VERSION_FROM_TIMESTAMP;
import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.INGESTION_EVENT_TYPE_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE;

@@ -43,12 +47,15 @@ public StreamRecordConverter(final int partitionCount) {
folderNames = s3PartitionCreator.createPartitions();
}

public Event convert(Map<String, Object> rowData,
String databaseName,
String tableName,
OpenSearchBulkActions bulkAction,
List<String> primaryKeys,
String s3Prefix) {
public Event convert(final Map<String, Object> rowData,
final String databaseName,
final String tableName,
final EventType eventType,
final OpenSearchBulkActions bulkAction,
final List<String> primaryKeys,
final String s3Prefix,
final long eventCreateTimeEpochMillis,
final long eventVersionNumber) {
final Event event = JacksonEvent.builder()
.withEventType("event")
.withData(rowData)
@@ -58,8 +65,9 @@

eventMetadata.setAttribute(EVENT_DATABASE_NAME_METADATA_ATTRIBUTE, databaseName);
eventMetadata.setAttribute(EVENT_TABLE_NAME_METADATA_ATTRIBUTE, tableName);
eventMetadata.setAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE, bulkAction.toString());
eventMetadata.setAttribute(BULK_ACTION_METADATA_ATTRIBUTE, bulkAction.toString());
eventMetadata.setAttribute(INGESTION_EVENT_TYPE_ATTRIBUTE, STREAM_EVENT_TYPE);
eventMetadata.setAttribute(CHANGE_EVENT_TYPE_METADATA_ATTRIBUTE, eventType.toString());

final String primaryKeyValue = primaryKeys.stream()
.map(rowData::get)
@@ -68,6 +76,9 @@
eventMetadata.setAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, primaryKeyValue);
eventMetadata.setAttribute(MetadataKeyAttributes.EVENT_S3_PARTITION_KEY, s3Prefix + S3_PATH_DELIMITER + hashKeyToPartition(primaryKeyValue));

eventMetadata.setAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE, eventCreateTimeEpochMillis);
eventMetadata.setAttribute(EVENT_VERSION_FROM_TIMESTAMP, eventVersionNumber);

return event;
}

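Stream events now also carry the raw binlog event type (for example EXT_WRITE_ROWS) under change_event_type, alongside the bulk action. The S3 partition key appends a hashed bucket of the document id to the s3_prefix; hashKeyToPartition itself is outside this diff, so the following is only an assumed sketch of the idea:

public class PartitionKeySketch {
    // Assumed implementation: map a document id onto one of partitionCount
    // folder names; the plugin's actual hash function may differ.
    static String hashKeyToPartition(final String key, final int partitionCount) {
        return String.valueOf(Math.floorMod(key.hashCode(), partitionCount));
    }

    public static void main(String[] args) {
        final String s3Prefix = "rds-stream";  // hypothetical s3_prefix
        final String documentId = "c1|42";     // joined primary-key values
        System.out.println(s3Prefix + "/" + hashKeyToPartition(documentId, 256));
    }
}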
DataFileProgressState.java
@@ -15,9 +15,15 @@ public class DataFileProgressState {
@JsonProperty("totalRecords")
private int totalRecords;

@JsonProperty("sourceDatabase")
private String sourceDatabase;

@JsonProperty("sourceTable")
private String sourceTable;

@JsonProperty("snapshotTime")
private long snapshotTime;

public int getTotalRecords() {
return totalRecords;
}
@@ -34,11 +40,27 @@ public void setLoaded(boolean loaded) {
this.isLoaded = loaded;
}

public String getSourceDatabase() {
return sourceDatabase;
}

public void setSourceDatabase(String sourceDatabase) {
this.sourceDatabase = sourceDatabase;
}

public String getSourceTable() {
return sourceTable;
}

public void setSourceTable(String sourceTable) {
this.sourceTable = sourceTable;
}

public long getSnapshotTime() {
return snapshotTime;
}

public void setSnapshotTime(long snapshotTime) {
this.snapshotTime = snapshotTime;
}
}
ExportProgressState.java
@@ -35,8 +35,8 @@ public class ExportProgressState {
@JsonProperty("kmsKeyId")
private String kmsKeyId;

@JsonProperty("exportTime")
private String exportTime;
@JsonProperty("snapshotTime")
private long snapshotTime;

@JsonProperty("status")
private String status;
@@ -97,12 +97,12 @@ public void setKmsKeyId(String kmsKeyId) {
this.kmsKeyId = kmsKeyId;
}

public String getExportTime() {
return exportTime;
public long getSnapshotTime() {
return snapshotTime;
}

public void setExportTime(String exportTime) {
this.exportTime = exportTime;
public void setSnapshotTime(long snapshotTime) {
this.snapshotTime = snapshotTime;
}

public String getStatus() {
DataFileLoader.java
@@ -11,15 +11,20 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.rds.converter.ExportRecordConverter;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.DataFilePartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.DataFileProgressState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.InputStream;
import java.time.Duration;
import java.util.List;

public class DataFileLoader implements Runnable {

private static final Logger LOG = LoggerFactory.getLogger(DataFileLoader.class);

static final Duration VERSION_OVERLAP_TIME_FOR_EXPORT = Duration.ofMinutes(5);

private final DataFilePartition dataFilePartition;
private final String bucket;
private final String objectKey;
@@ -58,10 +63,21 @@ public void run() {

codec.parse(inputStream, record -> {
try {
final String tableName = dataFilePartition.getProgressState().get().getSourceTable();
DataFileProgressState progressState = dataFilePartition.getProgressState().get();

// TODO: primary key to be obtained by querying database schema
final String primaryKeyName = "id";
Record<Event> transformedRecord = new Record<>(recordConverter.convert(record, tableName, primaryKeyName));

final long snapshotTime = progressState.getSnapshotTime();
final long eventVersionNumber = snapshotTime - VERSION_OVERLAP_TIME_FOR_EXPORT.toMillis();
Record<Event> transformedRecord = new Record<>(
recordConverter.convert(
record,
progressState.getSourceDatabase(),
progressState.getSourceTable(),
List.of(primaryKeyName),
snapshotTime,
eventVersionNumber));
bufferAccumulator.add(transformedRecord);
} catch (Exception e) {
throw new RuntimeException(e);
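VERSION_OVERLAP_TIME_FOR_EXPORT backdates the version number of export records by five minutes relative to the snapshot time, presumably so that stream events observed around the snapshot, which are versioned by their actual timestamps, win document-version conflicts against the export. A small worked example under that assumption:

import java.time.Duration;
import java.time.Instant;

public class VersionOverlapSketch {
    public static void main(String[] args) {
        // Hypothetical snapshot taken at noon
        long snapshotTime = Instant.parse("2024-08-08T12:00:00Z").toEpochMilli();
        long exportVersion = snapshotTime - Duration.ofMinutes(5).toMillis();

        // A stream event from shortly before the snapshot still outranks
        // the export record for the same document
        long streamEventTime = Instant.parse("2024-08-08T11:58:30Z").toEpochMilli();
        System.out.println(streamEventTime > exportVersion); // true
    }
}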
(Diffs for the remaining 8 changed files are not shown.)