From d7a0999ccf7fab26b2ce4d3f4a7e52f3065430f8 Mon Sep 17 00:00:00 2001 From: Srikanth Govindarajan Date: Thu, 15 Aug 2024 09:16:58 -0700 Subject: [PATCH] Move rules and templates to plugin level Signed-off-by: Srikanth Govindarajan --- .../PipelineTransformationConfiguration.java | 27 +-- .../pipeline/parser/rule/RuleEvaluator.java | 7 +- .../pipeline/parser/rule/RuleInputStream.java | 33 --- .../pipeline/parser/rule/RuleStream.java | 26 +++ .../pipeline/parser/rule/TemplateStream.java | 20 ++ .../transformer/TransformersFactory.java | 195 +++++++----------- .../templates/documentdb-template.yaml | 81 -------- .../parser/rule/RuleEvaluatorTest.java | 17 +- .../DynamicConfigTransformerTest.java | 58 +++--- .../transformer/TransformersFactoryTest.java | 116 ++++++----- .../transforms/rules/test-plugin-rule.yaml | 3 + .../templates/test-plugin-template.yaml | 4 + .../transformation/rules/documentdb-rule.yaml | 2 +- .../transforms}/rules/documentdb-rule.yaml | 0 .../templates/documentdb-template.yaml | 81 ++++++++ .../dataprepper/transforms/rules/rule.yaml | 0 .../dataprepper/transforms/rules/rule.yaml | 0 .../templates/existingPlugin-template.yaml | 0 18 files changed, 304 insertions(+), 366 deletions(-) delete mode 100644 data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleInputStream.java create mode 100644 data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleStream.java create mode 100644 data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/TemplateStream.java delete mode 100644 data-prepper-pipeline-parser/src/main/resources/templates/documentdb-template.yaml create mode 100644 data-prepper-pipeline-parser/src/test/resources/org/opensearch/dataprepper/transforms/rules/test-plugin-rule.yaml create mode 100644 data-prepper-pipeline-parser/src/test/resources/org/opensearch/dataprepper/transforms/templates/test-plugin-template.yaml rename {data-prepper-pipeline-parser/src/main/resources => data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms}/rules/documentdb-rule.yaml (100%) create mode 100644 data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/templates/documentdb-template.yaml create mode 100644 data-prepper-plugins/test-plugin/src/main/resources/org/opensearch/dataprepper/transforms/rules/rule.yaml create mode 100644 test-directory/test-plugin/src/main/resources/org/opensearch/dataprepper/transforms/rules/rule.yaml create mode 100644 test-directory/test-plugin/src/main/resources/org/opensearch/dataprepper/transforms/templates/existingPlugin-template.yaml diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineTransformationConfiguration.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineTransformationConfiguration.java index 07a476641e..aa7d593bfb 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineTransformationConfiguration.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineTransformationConfiguration.java @@ -9,35 +9,12 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import javax.inject.Named; - @Configuration public class PipelineTransformationConfiguration { - public static final String TEMPLATES_DIRECTORY_PATH = "TEMPLATES_DIRECTORY_PATH"; - public static final String RULES_DIRECTORY_PATH = "RULES_DIRECTORY_PATH"; - - @Bean - @Named(RULES_DIRECTORY_PATH) - static String provideRulesDirectoryPath() { - ClassLoader classLoader = PipelineTransformationConfiguration.class.getClassLoader(); - String filePath = classLoader.getResource("rules").getFile(); - return filePath; - } - - @Bean - @Named(TEMPLATES_DIRECTORY_PATH) - static String provideTemplateDirectoryPath() { - ClassLoader classLoader = PipelineTransformationConfiguration.class.getClassLoader(); - String filePath = classLoader.getResource("templates").getFile(); - return filePath; - } @Bean - TransformersFactory transformersFactory( - @Named(RULES_DIRECTORY_PATH) String rulesDirectoryPath, - @Named(TEMPLATES_DIRECTORY_PATH) String templatesDirectoryPath - ) { - return new TransformersFactory(rulesDirectoryPath, templatesDirectoryPath); + TransformersFactory transformersFactory() { + return new TransformersFactory(); } @Bean diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java index 948e94cd0c..5ef2610cef 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java @@ -91,11 +91,12 @@ private RuleFileEvaluation evaluate(String pipelinesJson) { RuleTransformerModel rulesModel = null; try { - Collection ruleStreams = transformersFactory.loadRules(); + Collection ruleStreams = transformersFactory.loadRules(); - for (RuleInputStream ruleStream : ruleStreams) { + //walk through all rules and return first valid + for (RuleStream ruleStream : ruleStreams) { try { - rulesModel = yamlMapper.readValue(ruleStream.getInputStream(), RuleTransformerModel.class); + rulesModel = yamlMapper.readValue(ruleStream.getRuleStream(), RuleTransformerModel.class); List rules = rulesModel.getApplyWhen(); String pluginName = rulesModel.getPluginName(); boolean allRulesValid = true; diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleInputStream.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleInputStream.java deleted file mode 100644 index bf87adfbfe..0000000000 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleInputStream.java +++ /dev/null @@ -1,33 +0,0 @@ -package org.opensearch.dataprepper.pipeline.parser.rule; - -import java.io.IOException; -import java.io.InputStream; - -public class RuleInputStream { - private String name; - private InputStream inputStream; - - public RuleInputStream(String name, InputStream inputStream) { - this.name = name; - this.inputStream = inputStream; - } - - public String getName() { - return name; - } - - public InputStream getInputStream() { - return inputStream; - } - - public void close() { - if (inputStream != null) { - try { - inputStream.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } -} - diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleStream.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleStream.java new file mode 100644 index 0000000000..6d467787fa --- /dev/null +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleStream.java @@ -0,0 +1,26 @@ +package org.opensearch.dataprepper.pipeline.parser.rule; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.io.IOException; +import java.io.InputStream; + +@Data +@AllArgsConstructor +public class RuleStream { + private String name; + private InputStream ruleStream; + + + public void close() { + if (ruleStream != null) { + try { + ruleStream.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } +} + diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/TemplateStream.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/TemplateStream.java new file mode 100644 index 0000000000..2fc238d796 --- /dev/null +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/TemplateStream.java @@ -0,0 +1,20 @@ +package org.opensearch.dataprepper.pipeline.parser.rule; + +import java.io.IOException; +import java.io.InputStream; + +public class TemplateStream { + private String name; + private InputStream templateStream; + + + public void close() { + if (templateStream != null) { + try { + templateStream.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/TransformersFactory.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/TransformersFactory.java index a1a46685da..f6ffd944d9 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/TransformersFactory.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/TransformersFactory.java @@ -4,20 +4,16 @@ */ package org.opensearch.dataprepper.pipeline.parser.transformer; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; -import static org.opensearch.dataprepper.pipeline.parser.PipelineTransformationConfiguration.RULES_DIRECTORY_PATH; -import static org.opensearch.dataprepper.pipeline.parser.PipelineTransformationConfiguration.TEMPLATES_DIRECTORY_PATH; -import org.opensearch.dataprepper.pipeline.parser.rule.RuleInputStream; - -import javax.inject.Named; -import java.io.File; -import java.io.FileNotFoundException; +import org.opensearch.dataprepper.pipeline.parser.rule.RuleStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.io.InputStream; -import java.net.URI; import java.net.URISyntaxException; +import java.net.URL; import java.nio.file.FileSystem; +import java.nio.file.FileSystemNotFoundException; import java.nio.file.FileSystems; import java.nio.file.Files; import java.nio.file.Path; @@ -26,149 +22,96 @@ import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.stream.Collectors; import java.util.stream.Stream; -public class TransformersFactory implements PipelineTransformationPathProvider { - - private static final ObjectMapper YAML_MAPPER = new ObjectMapper(new YAMLFactory()); +public class TransformersFactory { + private static final Logger LOG = LoggerFactory.getLogger(TransformersFactory.class); + private static final String TEMPLATES_PATH = "org/opensearch/dataprepper/transforms/templates/"; + private static final String RULES_PATH = "org/opensearch/dataprepper/transforms/rules/"; private final String TEMPLATE_FILE_NAME_PATTERN = "-template.yaml"; private final String RULE_FILE_NAME_PATTERN = "-rule.yaml"; - private final String templatesDirectoryPath; - private final String rulesDirectoryPath; - - public TransformersFactory(@Named(RULES_DIRECTORY_PATH) final String rulesDirectoryPath, - @Named(TEMPLATES_DIRECTORY_PATH) final String templatesDirectoryPath) { - this.templatesDirectoryPath = templatesDirectoryPath; - this.rulesDirectoryPath = rulesDirectoryPath; - } - - @Override - public String getTransformationTemplateDirectoryLocation() { - return templatesDirectoryPath; - } - @Override - public String getTransformationRulesDirectoryLocation() { - return rulesDirectoryPath; + public TransformersFactory(){ } - public String getPluginTemplateFileLocation(String pluginName) { - if (pluginName == null || pluginName.isEmpty()) { - throw new RuntimeException("Transformation plugin not found"); - } - return templatesDirectoryPath + "/" + pluginName + TEMPLATE_FILE_NAME_PATTERN; - } public InputStream getPluginTemplateFileStream(String pluginName) { if (pluginName == null || pluginName.isEmpty()) { throw new RuntimeException("Transformation plugin not found"); } - ClassLoader classLoader = TransformersFactory.class.getClassLoader(); - InputStream filestream = classLoader.getResourceAsStream("templates" + "/" + pluginName + TEMPLATE_FILE_NAME_PATTERN); - return filestream; - } - public Collection loadRules() { - URI uri; - try { - uri = getClass().getClassLoader().getResource("rules").toURI(); - } catch (URISyntaxException e) { - throw new RuntimeException(e); - } + // Construct the expected file name + String templateFileName = pluginName + TEMPLATE_FILE_NAME_PATTERN; - List ruleInputStreams = new ArrayList<>(); - - if ("jar".equals(uri.getScheme())) { - try (FileSystem fileSystem = FileSystems.newFileSystem(uri, Collections.emptyMap())) { - Path rulesFolderPath = fileSystem.getPath("rules"); - try (Stream paths = Files.walk(rulesFolderPath)) { - paths.filter(Files::isRegularFile) - .forEach(path -> { - InputStream ruleStream = getClass().getClassLoader().getResourceAsStream("rules" + "/" + path.getFileName().toString()); - if (ruleStream != null) { - ruleInputStreams.add(new RuleInputStream(path.getFileName().toString(), ruleStream)); - } - }); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - } else { - Path rulesFolderPath = Paths.get(uri); - try (Stream paths = Files.walk(rulesFolderPath)) { - paths.filter(Files::isRegularFile) - .forEach(path -> { - try { - InputStream ruleStream = Files.newInputStream(path); - ruleInputStreams.add(new RuleInputStream(path.getFileName().toString(), ruleStream)); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - return ruleInputStreams; - } + // Use the ClassLoader to find the template file on the classpath + ClassLoader classLoader = getClass().getClassLoader(); + URL templateURL = classLoader.getResource(TEMPLATES_PATH + templateFileName); + if (templateURL == null) { + throw new RuntimeException("Template file not found for plugin: " + pluginName); + } - public List getRuleFiles() { - // Get the URI of the rules folder - URI uri = null; try { - uri = getClass().getClassLoader().getResource("rules").toURI(); - } catch (URISyntaxException e) { - throw new RuntimeException(e); - } + // Convert the URL to a URI, then to a Path to read the file + Path templatePath; + try { + templatePath = Paths.get(templateURL.toURI()); + } catch (FileSystemNotFoundException e) { + // Handle the case where the file system is not accessible (e.g., in a JAR) + FileSystem fileSystem = FileSystems.newFileSystem(templateURL.toURI(), Collections.emptyMap()); + templatePath = fileSystem.getPath(TEMPLATES_PATH + templateFileName); + } - Path rulesFolderPath; + // Return an InputStream for the found file + return Files.newInputStream(templatePath); - if ("jar".equals(uri.getScheme())) { - // File is inside a JAR, create a filesystem for it - try (FileSystem fileSystem = FileSystems.newFileSystem(uri, Collections.emptyMap())) { - rulesFolderPath = fileSystem.getPath("rules"); - return scanFolder(rulesFolderPath); - } catch (IOException e) { - throw new RuntimeException(e); - } - } else { - // File is not inside a JAR - rulesFolderPath = Paths.get(uri); - return scanFolder(rulesFolderPath); + } catch (IOException | URISyntaxException e) { + throw new RuntimeException("Failed to load template file for plugin: " + pluginName, e); } } - private List scanFolder(Path folderPath) { - List pathsList = new ArrayList<>(); - try (Stream paths = Files.walk(folderPath)) { - pathsList = paths - .filter(Files::isRegularFile) // Filter to include only regular files - .collect(Collectors.toList()); // Collect paths into the list - } catch (IOException e) { - throw new RuntimeException(e); - } - return pathsList; - } + public Collection loadRules() { + List ruleStreams = new ArrayList<>(); - public InputStream readRuleFile(Path ruleFile) throws IOException { - ClassLoader classLoader = TransformersFactory.class.getClassLoader(); - InputStream ruleStream = classLoader.getResourceAsStream("rules" + "/" + ruleFile.getFileName().toString()); - return ruleStream; - } + // Use the ClassLoader to find resources on the classpath + ClassLoader classLoader = getClass().getClassLoader(); + + // Assume 'rules' directory is in 'data-prepper-plugins' project's resources folder + URL rulesURL = classLoader.getResource(RULES_PATH); - public PipelineTemplateModel getTemplateModel(String pluginName) { - String templatePath = getPluginTemplateFileLocation(pluginName); + if (rulesURL == null) { + throw new RuntimeException("Rules directory not found in classpath."); + } try { - PipelineTemplateModel pipelineTemplateModel = YAML_MAPPER.readValue(new File(templatePath), PipelineTemplateModel.class); - return pipelineTemplateModel; - } catch (FileNotFoundException e) { - throw new RuntimeException(e); - } catch (IOException e) { - throw new RuntimeException(e); + // Convert the URL to a URI, then to a Path to read the directory contents + Path rulesPath; + try { + rulesPath = Paths.get(rulesURL.toURI()); + } catch (FileSystemNotFoundException e) { + // Handle the case where the file system is not accessible (e.g., in a JAR) + // In this case, create a new FileSystem for the JAR and access the file + FileSystem fileSystem = FileSystems.newFileSystem(rulesURL.toURI(), Collections.emptyMap()); + rulesPath = fileSystem.getPath(RULES_PATH); + } + + // Scan the directory for rule files + try (Stream paths = Files.walk(rulesPath)) { + paths.filter(Files::isRegularFile) + .forEach(rulePath -> { + try { + InputStream ruleInputStream = Files.newInputStream(rulePath); + ruleStreams.add(new RuleStream(rulePath.getFileName().toString(), ruleInputStream)); + } catch (IOException e) { + throw new RuntimeException("Failed to load rule: " + rulePath, e); + } + }); + } + } catch (IOException | URISyntaxException e) { + throw new RuntimeException("Failed to scan rules directory on classpath.", e); } + + return ruleStreams; } } diff --git a/data-prepper-pipeline-parser/src/main/resources/templates/documentdb-template.yaml b/data-prepper-pipeline-parser/src/main/resources/templates/documentdb-template.yaml deleted file mode 100644 index 0e0e6d5325..0000000000 --- a/data-prepper-pipeline-parser/src/main/resources/templates/documentdb-template.yaml +++ /dev/null @@ -1,81 +0,0 @@ -"<>": - workers: "<<$.<>.workers>>" - delay: "<<$.<>.delay>>" - buffer: "<<$.<>.buffer>>" - source: - documentdb: "<<$.<>.source.documentdb>>" - routes: - - initial_load: 'getMetadata("ingestion_type") == "EXPORT"' - - stream_load: 'getMetadata("ingestion_type") == "STREAM"' - sink: - - s3: - routes: - - initial_load - aws: - region: "<<$.<>.source.documentdb.s3_region>>" - sts_role_arn: "<<$.<>.source.documentdb.aws.sts_role_arn>>" - sts_external_id: "<<$.<>.source.documentdb.aws.sts_external_id>>" - sts_header_overrides: "<<$.<>.source.documentdb.aws.sts_header_overrides>>" - bucket: "<<$.<>.source.documentdb.s3_bucket>>" - threshold: - event_collect_timeout: "120s" - maximum_size: "2mb" - aggregate_threshold: - maximum_size: "128mb" - flush_capacity_ratio: 0 - object_key: - path_prefix: "${getMetadata(\"s3_partition_key\")}" - codec: - event_json: - default_bucket_owner: "<>.source.documentdb.aws.sts_role_arn>>" - - s3: - routes: - - stream_load - aws: - region: "<<$.<>.source.documentdb.s3_region>>" - sts_role_arn: "<<$.<>.source.documentdb.aws.sts_role_arn>>" - sts_external_id: "<<$.<>.source.documentdb.aws.sts_external_id>>" - sts_header_overrides: "<<$.<>.source.documentdb.aws.sts_header_overrides>>" - bucket: "<<$.<>.source.documentdb.s3_bucket>>" - threshold: - event_collect_timeout: "15s" - maximum_size: "1mb" - aggregate_threshold: - maximum_size: "128mb" - flush_capacity_ratio: 0 - object_key: - path_prefix: "${getMetadata(\"s3_partition_key\")}" - codec: - event_json: - default_bucket_owner: "<>.source.documentdb.aws.sts_role_arn>>" -"<>-s3": - workers: "<<$.<>.workers>>" - delay: "<<$.<>.delay>>" - buffer: "<<$.<>.buffer>>" - source: - s3: - codec: - event_json: - compression: "none" - aws: - region: "<<$.<>.source.documentdb.s3_region>>" - sts_role_arn: "<<$.<>.source.documentdb.aws.sts_role_arn>>" - sts_external_id: "<<$.<>.source.documentdb.aws.sts_external_id>>" - sts_header_overrides: "<<$.<>.source.documentdb.aws.sts_header_overrides>>" - acknowledgments: true - delete_s3_objects_on_read: true - disable_s3_metadata_in_event: true - scan: - folder_partitions: - depth: "<>.source.documentdb.s3_prefix>>" - max_objects_per_ownership: 50 - buckets: - - bucket: - name: "<<$.<>.source.documentdb.s3_bucket>>" - filter: - include_prefix: ["<>.source.documentdb.s3_prefix>>"] - scheduling: - interval: "60s" - processor: "<<$.<>.processor>>" - sink: "<<$.<>.sink>>" - routes: "<<$.<>.routes>>" # In placeholder, routes or route (defined as alias) will be transformed to route in json as route will be primarily picked in pipelineModel. \ No newline at end of file diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorTest.java index e46607ebc5..a7bf0f5e6c 100644 --- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorTest.java +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorTest.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.Paths; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -41,9 +40,7 @@ void test_isTransformationNeeded_ForDocDBSource_ShouldReturn_True() throws IOExc Map sourceOptions = new HashMap<>(); Map s3Bucket = new HashMap<>(); s3Bucket.put("s3_bucket", "bucket-name"); - List> collections = new ArrayList<>(); - collections.add(s3Bucket); - sourceOptions.put("collections", collections); + sourceOptions.put("s3_bucket", s3Bucket); final PluginModel source = new PluginModel(pluginName, sourceOptions); final List processors = Collections.singletonList(new PluginModel("testProcessor", null)); final List sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, Collections.emptyList(), Collections.emptyList(), null)); @@ -53,12 +50,13 @@ void test_isTransformationNeeded_ForDocDBSource_ShouldReturn_True() throws IOExc (PipelineExtensions) null, Collections.singletonMap(pipelineName, pipelineModel)); TransformersFactory transformersFactory = mock(TransformersFactory.class); +// TransformersFactory transformersFactory = spy(new TransformersFactory("", "")); InputStream ruleStream = new FileInputStream(ruleDocDBFilePath); InputStream templateStream = new FileInputStream(ruleDocDBTemplatePath); - RuleInputStream ruleInputStream = new RuleInputStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream); + RuleStream ruleInputStream = new RuleStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream); - List ruleStreams = Collections.singletonList(ruleInputStream); + List ruleStreams = Collections.singletonList(ruleInputStream); when(transformersFactory.loadRules()).thenReturn(ruleStreams); when(transformersFactory.getPluginTemplateFileStream(pluginName)).thenReturn(templateStream); @@ -78,9 +76,7 @@ void test_isTransformationNeeded_ForOtherSource_ShouldReturn_False() throws IOEx Map sourceOptions = new HashMap<>(); Map s3Bucket = new HashMap<>(); s3Bucket.put("s3_bucket", "bucket-name"); - List> collections = new ArrayList<>(); - collections.add(s3Bucket); - sourceOptions.put("collections", collections); + sourceOptions.put("s3_bucket", s3Bucket); final PluginModel source = new PluginModel(pluginName, sourceOptions); final List processors = Collections.singletonList(new PluginModel("testProcessor", null)); final List sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, Collections.emptyList(), Collections.emptyList(), null)); @@ -88,9 +84,6 @@ void test_isTransformationNeeded_ForOtherSource_ShouldReturn_False() throws IOEx TransformersFactory transformersFactory = mock(TransformersFactory.class); - when(transformersFactory.getRuleFiles()).thenReturn(List.of()); - - final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel( (PipelineExtensions) null, Collections.singletonMap(pipelineName, pipelineModel)); diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java index ce85863e2b..a1f417054c 100644 --- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java @@ -19,7 +19,7 @@ import org.opensearch.dataprepper.pipeline.parser.PipelinesDataflowModelParser; import org.opensearch.dataprepper.pipeline.parser.TestConfigurationProvider; import org.opensearch.dataprepper.pipeline.parser.rule.RuleEvaluator; -import org.opensearch.dataprepper.pipeline.parser.rule.RuleInputStream; +import org.opensearch.dataprepper.pipeline.parser.rule.RuleStream; import java.io.File; import java.io.FileInputStream; @@ -35,8 +35,6 @@ class DynamicConfigTransformerTest { private final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory() .disable(YAMLGenerator.Feature.WRITE_DOC_START_MARKER)); - private final String RULES_DIRECTORY_PATH = "src/test/resources/transformation/rules"; - private final String TEMPLATES_DIRECTORY_PATH = "src/test/resources/transformation/templates/testSource"; TransformersFactory transformersFactory; RuleEvaluator ruleEvaluator; @@ -47,7 +45,7 @@ void test_successful_transformation_with_only_source_and_sink() throws IOExcepti String templateDocDBFilePath = TestConfigurationProvider.TEMPLATE_TRANSFORMATION_DOCDB1_CONFIG_FILE; String ruleDocDBFilePath = TestConfigurationProvider.RULES_TRANSFORMATION_DOCDB1_CONFIG_FILE; String expectedDocDBFilePath = TestConfigurationProvider.EXPECTED_TRANSFORMATION_DOCDB1_CONFIG_FILE; - String pluginName = "documentdb"; + String pluginName = "documentdb"; PipelineConfigurationReader pipelineConfigurationReader = new PipelineConfigurationFileReader(docDBUserConfig); final PipelinesDataflowModelParser pipelinesDataflowModelParser = new PipelinesDataflowModelParser(pipelineConfigurationReader); @@ -55,8 +53,8 @@ void test_successful_transformation_with_only_source_and_sink() throws IOExcepti TransformersFactory transformersFactory = mock(TransformersFactory.class); InputStream ruleStream = new FileInputStream(ruleDocDBFilePath); InputStream templateStream = new FileInputStream(templateDocDBFilePath); - RuleInputStream ruleInputStream = new RuleInputStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream); - List ruleStreams = Collections.singletonList(ruleInputStream); + RuleStream ruleInputStream = new RuleStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream); + List ruleStreams = Collections.singletonList(ruleInputStream); when(transformersFactory.loadRules()).thenReturn(ruleStreams); when(transformersFactory.getPluginTemplateFileStream(pluginName)).thenReturn(templateStream); @@ -78,7 +76,7 @@ void test_successful_transformation_with_documentdb() throws IOException { String docDBUserConfig = TestConfigurationProvider.USER_CONFIG_TRANSFORMATION_DOCUMENTDB_CONFIG_FILE; String templateDocDBFilePath = TestConfigurationProvider.TEMPLATE_TRANSFORMATION_DOCUMENTDB_CONFIG_FILE; - String ruleDocDBFilePath = TestConfigurationProvider.RULES_TRANSFORMATION_DOCUMENTDB_CONFIG_FILE; + String ruleDocDBFilePath = TestConfigurationProvider.RULES_TRANSFORMATION_DOCDB1_CONFIG_FILE; String expectedDocDBFilePath = TestConfigurationProvider.EXPECTED_TRANSFORMATION_DOCUMENTDB_CONFIG_FILE; String pluginName = "documentdb"; PipelineConfigurationReader pipelineConfigurationReader = new PipelineConfigurationFileReader(docDBUserConfig); @@ -90,8 +88,8 @@ void test_successful_transformation_with_documentdb() throws IOException { when(ruleFile.getFileName()).thenReturn(Paths.get(ruleDocDBFilePath).getFileName()); InputStream ruleStream = new FileInputStream(ruleDocDBFilePath); InputStream templateStream = new FileInputStream(templateDocDBFilePath); - RuleInputStream ruleInputStream = new RuleInputStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream); - List ruleStreams = Collections.singletonList(ruleInputStream); + RuleStream ruleInputStream = new RuleStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream); + List ruleStreams = Collections.singletonList(ruleInputStream); when(transformersFactory.loadRules()).thenReturn(ruleStreams); when(transformersFactory.getPluginTemplateFileStream(pluginName)).thenReturn(templateStream); ruleEvaluator = new RuleEvaluator(transformersFactory); @@ -112,7 +110,7 @@ void test_successful_transformation_with_subpipelines() throws IOException { String docDBUserConfig = TestConfigurationProvider.USER_CONFIG_TRANSFORMATION_DOCUMENTDB_SUBPIPELINES_CONFIG_FILE; String templateDocDBFilePath = TestConfigurationProvider.TEMPLATE_TRANSFORMATION_DOCUMENTDB_SUBPIPELINES_CONFIG_FILE; - String ruleDocDBFilePath = TestConfigurationProvider.RULES_TRANSFORMATION_DOCUMENTDB_CONFIG_FILE; + String ruleDocDBFilePath = TestConfigurationProvider.RULES_TRANSFORMATION_DOCDB1_CONFIG_FILE; String expectedDocDBFilePath = TestConfigurationProvider.EXPECTED_TRANSFORMATION_DOCUMENTDB_SUBPIPLINES_CONFIG_FILE; String pluginName = "documentdb"; PipelineConfigurationReader pipelineConfigurationReader = new PipelineConfigurationFileReader(docDBUserConfig); @@ -124,11 +122,11 @@ void test_successful_transformation_with_subpipelines() throws IOException { InputStream ruleStream1 = new FileInputStream(ruleDocDBFilePath); InputStream ruleStream2 = new FileInputStream(ruleDocDBFilePath); InputStream templateStream = new FileInputStream(templateDocDBFilePath); - RuleInputStream ruleInputStream1 = new RuleInputStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream1); - RuleInputStream ruleInputStream2 = new RuleInputStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream2); + RuleStream ruleInputStream1 = new RuleStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream1); + RuleStream ruleInputStream2 = new RuleStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream2); - List ruleStreams1 = Collections.singletonList(ruleInputStream1); - List ruleStreams2 = Collections.singletonList(ruleInputStream2); + List ruleStreams1 = Collections.singletonList(ruleInputStream1); + List ruleStreams2 = Collections.singletonList(ruleInputStream2); when(transformersFactory.loadRules()).thenReturn(ruleStreams1).thenReturn(ruleStreams2); when(transformersFactory.getPluginTemplateFileStream(pluginName)).thenReturn(templateStream); @@ -160,9 +158,9 @@ void test_successful_transformation_with_functionPlaceholder() throws IOExceptio InputStream ruleStream = new FileInputStream(ruleDocDBFilePath); InputStream templateStream = new FileInputStream(templateDocDBFilePath); - RuleInputStream ruleInputStream = new RuleInputStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream); + RuleStream ruleInputStream = new RuleStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream); - List ruleStreams = Collections.singletonList(ruleInputStream); + List ruleStreams = Collections.singletonList(ruleInputStream); when(transformersFactory.loadRules()).thenReturn(ruleStreams); when(transformersFactory.getPluginTemplateFileStream(pluginName)).thenReturn(templateStream); @@ -194,9 +192,9 @@ void test_successful_transformation_with_complete_template() throws IOException InputStream ruleStream = new FileInputStream(ruleDocDBFilePath); InputStream templateStream = new FileInputStream(templateDocDBFilePath); - RuleInputStream ruleInputStream = new RuleInputStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream); + RuleStream ruleInputStream = new RuleStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream); - List ruleStreams = Collections.singletonList(ruleInputStream); + List ruleStreams = Collections.singletonList(ruleInputStream); when(transformersFactory.loadRules()).thenReturn(ruleStreams); when(transformersFactory.getPluginTemplateFileStream(pluginName)).thenReturn(templateStream); @@ -230,9 +228,9 @@ void test_successful_transformation_with_routes_keyword() throws IOException { InputStream ruleStream = new FileInputStream(ruleDocDBFilePath); InputStream templateStream = new FileInputStream(templateDocDBFilePath); - RuleInputStream ruleInputStream = new RuleInputStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream); + RuleStream ruleInputStream = new RuleStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream); - List ruleStreams = Collections.singletonList(ruleInputStream); + List ruleStreams = Collections.singletonList(ruleInputStream); when(transformersFactory.loadRules()).thenReturn(ruleStreams); when(transformersFactory.getPluginTemplateFileStream(pluginName)).thenReturn(templateStream); @@ -266,9 +264,9 @@ void test_successful_transformation_with_route_keyword() throws IOException { InputStream ruleStream = new FileInputStream(ruleDocDBFilePath); InputStream templateStream = new FileInputStream(templateDocDBFilePath); - RuleInputStream ruleInputStream = new RuleInputStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream); + RuleStream ruleInputStream = new RuleStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream); - List ruleStreams = Collections.singletonList(ruleInputStream); + List ruleStreams = Collections.singletonList(ruleInputStream); when(transformersFactory.loadRules()).thenReturn(ruleStreams); when(transformersFactory.getPluginTemplateFileStream(pluginName)).thenReturn(templateStream); @@ -303,13 +301,13 @@ void test_successful_transformation_with_routes_and_subpipelines() throws IOExce InputStream ruleStream2 = new FileInputStream(ruleDocDBFilePath); InputStream ruleStream3 = new FileInputStream(ruleDocDBFilePath); InputStream templateStream = new FileInputStream(templateDocDBFilePath); - RuleInputStream ruleInputStream1 = new RuleInputStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream1); - RuleInputStream ruleInputStream2 = new RuleInputStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream2); - RuleInputStream ruleInputStream3 = new RuleInputStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream3); + RuleStream ruleInputStream1 = new RuleStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream1); + RuleStream ruleInputStream2 = new RuleStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream2); + RuleStream ruleInputStream3 = new RuleStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream3); - List ruleStreams1 = Collections.singletonList(ruleInputStream1); - List ruleStreams2 = Collections.singletonList(ruleInputStream2); - List ruleStreams3 = Collections.singletonList(ruleInputStream3); + List ruleStreams1 = Collections.singletonList(ruleInputStream1); + List ruleStreams2 = Collections.singletonList(ruleInputStream2); + List ruleStreams3 = Collections.singletonList(ruleInputStream3); when(transformersFactory.loadRules()).thenReturn(ruleStreams1).thenReturn(ruleStreams2).thenReturn(ruleStreams3); when(transformersFactory.getPluginTemplateFileStream(pluginName)).thenReturn(templateStream); @@ -342,9 +340,9 @@ void testInvalidJsonPathThrowsException() throws IOException { InputStream ruleStream = new FileInputStream(ruleDocDBFilePath); InputStream templateStream = new FileInputStream(templateDocDBFilePath); - RuleInputStream ruleInputStream = new RuleInputStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream); + RuleStream ruleInputStream = new RuleStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream); - List ruleStreams = Collections.singletonList(ruleInputStream); + List ruleStreams = Collections.singletonList(ruleInputStream); when(transformersFactory.loadRules()).thenReturn(ruleStreams); when(transformersFactory.getPluginTemplateFileStream(pluginName)).thenReturn(templateStream); diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/TransformersFactoryTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/TransformersFactoryTest.java index b67a64a9d4..ff5b6f21c6 100644 --- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/TransformersFactoryTest.java +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/TransformersFactoryTest.java @@ -1,86 +1,92 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ package org.opensearch.dataprepper.pipeline.parser.transformer; -import com.fasterxml.jackson.databind.ObjectMapper; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.Arrays; -import java.util.List; - -public class TransformersFactoryTest { - - private final String templatesDirectoryPath = "src/test/resources/transformation/templates"; - private final String rulesDirectoryPath = "src/test/resources/transformation/rules"; - private final String validPluginName = "testPlugin"; - private final String invalidPluginName = ""; +import org.opensearch.dataprepper.pipeline.parser.rule.RuleStream; + +import java.io.InputStream; +import java.util.Collection; + +class TransformersFactoryTest { + private TransformersFactory transformersFactory; @BeforeEach - public void setUp() { - transformersFactory = spy(new TransformersFactory(rulesDirectoryPath, templatesDirectoryPath)); + void setUp() { + transformersFactory = new TransformersFactory(); } @Test - public void testGetPluginTemplateFileLocation_validPluginName() { - String expectedPath = templatesDirectoryPath + "/" + validPluginName + "-template.yaml"; - assertEquals(expectedPath, transformersFactory.getPluginTemplateFileLocation(validPluginName)); + void testGetPluginTemplateFileStream_whenTemplateExists_shouldReturnInputStream() throws Exception { + String pluginName = "test-plugin"; + + // Load the actual resource + InputStream inputStream = transformersFactory.getPluginTemplateFileStream(pluginName); + + assertNotNull(inputStream); + inputStream.close(); } @Test - public void testGetPluginTemplateFileLocation_invalidPluginName() { + void testGetPluginTemplateFileStream_whenTemplateDoesNotExist_shouldThrowException() { + String pluginName = "non-existent-plugin"; + Exception exception = assertThrows(RuntimeException.class, () -> { - transformersFactory.getPluginTemplateFileLocation(invalidPluginName); + transformersFactory.getPluginTemplateFileStream(pluginName); }); - assertEquals("Transformation plugin not found", exception.getMessage()); + + assertEquals("Template file not found for plugin: " + pluginName, exception.getMessage()); } @Test - public void testGetTemplateModel_throwsRuntimeExceptionOnIOException() throws IOException { - ObjectMapper mockedYamlMapper = Mockito.mock(ObjectMapper.class); - String templatePath = templatesDirectoryPath + "/" + validPluginName + "-template.yaml"; - File expectedFile = new File(templatePath); + void testLoadRules_whenRulesExist_shouldReturnRuleStreams() throws Exception { + Collection ruleStreams = transformersFactory.loadRules(); - Mockito.when(mockedYamlMapper.readValue(Mockito.eq(expectedFile), Mockito.eq(PipelineTemplateModel.class))) - .thenThrow(new IOException("Test exception")); + assertNotNull(ruleStreams); + assertFalse(ruleStreams.isEmpty()); + + for (RuleStream ruleStream : ruleStreams) { + assertNotNull(ruleStream.getRuleStream()); + assertNotNull(ruleStream.getName()); + } + } - assertThrows(RuntimeException.class, () -> transformersFactory.getTemplateModel(validPluginName)); + @Test + void testLoadRules_whenFilesExist_shouldReturnRuleStreams() throws Exception { + // Ensure the rules directory has at least one file + Collection ruleStreams = transformersFactory.loadRules(); + + assertNotNull(ruleStreams); + assertFalse(ruleStreams.isEmpty()); + + for (RuleStream ruleStream : ruleStreams) { + assertNotNull(ruleStream.getRuleStream()); + assertNotNull(ruleStream.getName()); + assertTrue(ruleStream.getName().endsWith("-rule.yaml")); + } } @Test - public void testGetTemplateModel_invalidPluginNameThrowsRuntimeException() { - assertThrows(RuntimeException.class, () -> transformersFactory.getTemplateModel(invalidPluginName), - "Should throw a RuntimeException for empty plugin name."); + void testGetPluginTemplateFileStream_whenPluginNameIsNull_shouldThrowException() { + Exception exception = assertThrows(RuntimeException.class, () -> { + transformersFactory.getPluginTemplateFileStream(null); + }); + + assertEquals("Transformation plugin not found", exception.getMessage()); } @Test - public void testReadFile() throws IOException { - // Mocking the getRuleFiles method - List mockRuleFiles = Arrays.asList( - Paths.get("src/test/resources/transformation/rules/documentdb-rule1.yaml"), - Paths.get("src/test/resources/transformation/rules/documentdb-rule.yaml") - ); - doReturn(mockRuleFiles).when(transformersFactory).getRuleFiles(); - - List ruleFiles = transformersFactory.getRuleFiles(); - assertEquals(ruleFiles.size(), 2); - Path firstRuleFile = ruleFiles.get(0); - Path secondRuleFile = ruleFiles.get(1); - - assertEquals(firstRuleFile.getFileName().toString(), "documentdb-rule1.yaml"); + void testGetPluginTemplateFileStream_whenPluginNameIsEmpty_shouldThrowException() { + Exception exception = assertThrows(RuntimeException.class, () -> { + transformersFactory.getPluginTemplateFileStream(""); + }); + + assertEquals("Transformation plugin not found", exception.getMessage()); } } - diff --git a/data-prepper-pipeline-parser/src/test/resources/org/opensearch/dataprepper/transforms/rules/test-plugin-rule.yaml b/data-prepper-pipeline-parser/src/test/resources/org/opensearch/dataprepper/transforms/rules/test-plugin-rule.yaml new file mode 100644 index 0000000000..bdee2aaacf --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/resources/org/opensearch/dataprepper/transforms/rules/test-plugin-rule.yaml @@ -0,0 +1,3 @@ +plugin_name: "test-plugin" +apply_when: + - "$..source.documentdb" \ No newline at end of file diff --git a/data-prepper-pipeline-parser/src/test/resources/org/opensearch/dataprepper/transforms/templates/test-plugin-template.yaml b/data-prepper-pipeline-parser/src/test/resources/org/opensearch/dataprepper/transforms/templates/test-plugin-template.yaml new file mode 100644 index 0000000000..e9e95b4ff0 --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/resources/org/opensearch/dataprepper/transforms/templates/test-plugin-template.yaml @@ -0,0 +1,4 @@ +"<>-transformed": + source: "<<$.<>.source>>" + sink: + - noop: diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb-rule.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb-rule.yaml index 1127f51dbd..b120d1531c 100644 --- a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb-rule.yaml +++ b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb-rule.yaml @@ -1,4 +1,4 @@ plugin_name: "documentdb" apply_when: - "$..source.documentdb" - - "$..source.documentdb.collections[0].s3_bucket" + - "$..source.documentdb.s3_bucket" diff --git a/data-prepper-pipeline-parser/src/main/resources/rules/documentdb-rule.yaml b/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/documentdb-rule.yaml similarity index 100% rename from data-prepper-pipeline-parser/src/main/resources/rules/documentdb-rule.yaml rename to data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/documentdb-rule.yaml diff --git a/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/templates/documentdb-template.yaml b/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/templates/documentdb-template.yaml new file mode 100644 index 0000000000..38bb70d8ca --- /dev/null +++ b/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/templates/documentdb-template.yaml @@ -0,0 +1,81 @@ +"<>": + workers: "<<$.<>.workers>>" + delay: "<<$.<>.delay>>" + buffer: "<<$.<>.buffer>>" + source: + documentdb: "<<$.<>.source.documentdb>>" + routes: + - initial_load: 'getMetadata("ingestion_type") == "EXPORT"' + - stream_load: 'getMetadata("ingestion_type") == "STREAM"' + sink: + - s3: + routes: + - initial_load + aws: + region: "<<$.<>.source.documentdb.s3_region>>" + sts_role_arn: "<<$.<>.source.documentdb.aws.sts_role_arn>>" + sts_external_id: "<<$.<>.source.documentdb.aws.sts_external_id>>" + sts_header_overrides: "<<$.<>.source.documentdb.aws.sts_header_overrides>>" + bucket: "<<$.<>.source.documentdb.s3_bucket>>" + threshold: + event_collect_timeout: "120s" + maximum_size: "2mb" + aggregate_threshold: + maximum_size: "128mb" + flush_capacity_ratio: 0 + object_key: + path_prefix: "${getMetadata(\"s3_partition_key\")}" + codec: + event_json: + default_bucket_owner: "<>.source.documentdb.aws.sts_role_arn>>" + - s3: + routes: + - stream_load + aws: + region: "<<$.<>.source.documentdb.s3_region>>" + sts_role_arn: "<<$.<>.source.documentdb.aws.sts_role_arn>>" + sts_external_id: "<<$.<>.source.documentdb.aws.sts_external_id>>" + sts_header_overrides: "<<$.<>.source.documentdb.aws.sts_header_overrides>>" + bucket: "<<$.<>.source.documentdb.s3_bucket>>" + threshold: + event_collect_timeout: "15s" + maximum_size: "1mb" + aggregate_threshold: + maximum_size: "128mb" + flush_capacity_ratio: 0 + object_key: + path_prefix: "${getMetadata(\"s3_partition_key\")}" + codec: + event_json: + default_bucket_owner: "<>.source.documentdb.aws.sts_role_arn>>" +"<>-s3": + workers: "<<$.<>.workers>>" + delay: "<<$.<>.delay>>" + buffer: "<<$.<>.buffer>>" + source: + s3: + codec: + event_json: + compression: "none" + aws: + region: "<<$.<>.source.documentdb.s3_region>>" + sts_role_arn: "<<$.<>.source.documentdb.aws.sts_role_arn>>" + sts_external_id: "<<$.<>.source.documentdb.aws.sts_external_id>>" + sts_header_overrides: "<<$.<>.source.documentdb.aws.sts_header_overrides>>" + acknowledgments: true + delete_s3_objects_on_read: true + disable_s3_metadata_in_event: true + scan: + folder_partitions: + depth: "<>.source.documentdb.s3_prefix>>" + max_objects_per_ownership: 50 + buckets: + - bucket: + name: "<<$.<>.source.documentdb.s3_bucket>>" + filter: + include_prefix: ["<>.source.documentdb.s3_prefix>>"] + scheduling: + interval: "60s" + processor: "<<$.<>.processor>>" + sink: "<<$.<>.sink>>" + routes: "<<$.<>.routes>>" # In placeholder, routes or route (defined as alias) will be transformed to route in json as route will be primarily picked in pipelineModel. \ No newline at end of file diff --git a/data-prepper-plugins/test-plugin/src/main/resources/org/opensearch/dataprepper/transforms/rules/rule.yaml b/data-prepper-plugins/test-plugin/src/main/resources/org/opensearch/dataprepper/transforms/rules/rule.yaml new file mode 100644 index 0000000000..e69de29bb2 diff --git a/test-directory/test-plugin/src/main/resources/org/opensearch/dataprepper/transforms/rules/rule.yaml b/test-directory/test-plugin/src/main/resources/org/opensearch/dataprepper/transforms/rules/rule.yaml new file mode 100644 index 0000000000..e69de29bb2 diff --git a/test-directory/test-plugin/src/main/resources/org/opensearch/dataprepper/transforms/templates/existingPlugin-template.yaml b/test-directory/test-plugin/src/main/resources/org/opensearch/dataprepper/transforms/templates/existingPlugin-template.yaml new file mode 100644 index 0000000000..e69de29bb2