Skip to content

Commit

Permalink
Add support for dynamic rule detection for pipeline config transforma…
Browse files Browse the repository at this point in the history
…tion

Signed-off-by: Srikanth Govindarajan <srikanthjg123@gmail.com>
  • Loading branch information
srikanthjg committed Jun 5, 2024
1 parent 2180a69 commit 7de88ce
Show file tree
Hide file tree
Showing 12 changed files with 300 additions and 187 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,11 @@
import org.springframework.context.annotation.Configuration;

import javax.inject.Named;
import java.nio.file.Path;
import java.nio.file.Paths;

@Configuration
public class PipelineTransformationConfiguration {
public static final String TEMPLATES_DIRECTORY_PATH = "TEMPLATES_DIRECTORY_PATH";
public static final String RULES_DIRECTORY_PATH = "RULES_DIRECTORY_PATH";
private static final Path currentDir = Paths.get(System.getProperty("user.dir"));
// private static final String parserRelativePath = "/data-prepper-pipeline-parser/src";

@Bean
@Named(RULES_DIRECTORY_PATH)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@
package org.opensearch.dataprepper.pipeline.parser.rule;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.ParseContext;
import com.jayway.jsonpath.PathNotFoundException;
import com.jayway.jsonpath.ReadContext;
import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
import com.jayway.jsonpath.spi.json.JacksonJsonNodeJsonProvider;
import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider;
import org.opensearch.dataprepper.model.configuration.PipelineModel;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
Expand All @@ -25,6 +24,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;

Expand All @@ -34,52 +34,39 @@ public class RuleEvaluator {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());
private final TransformersFactory transformersFactory;
private String PLUGIN_NAME = null;

public RuleEvaluator(TransformersFactory transformersFactory) {
this.transformersFactory = transformersFactory;
}

public RuleEvaluatorResult isTransformationNeeded(PipelinesDataFlowModel pipelineModel) {
//TODO - Dynamically scan the rules folder and get the corresponding template.
return isDocDBSource(pipelineModel);
}

/**
* Evaluates model based on pre defined rules and
* result contains the name of the pipeline that will need transformation,
* evaluated boolean result and the corresponding template model
* Assumption: only one pipeline can have transformation.
*
* @param pipelinesModel
* @return RuleEvaluatorResult
*/
private RuleEvaluatorResult isDocDBSource(PipelinesDataFlowModel pipelinesModel) {
PLUGIN_NAME = "documentdb";

Map<String, PipelineModel> pipelines = pipelinesModel.getPipelines();
Map<String, PipelineModel> pipelines = pipelineModel.getPipelines();
for (Map.Entry<String, PipelineModel> entry : pipelines.entrySet()) {
try {
String pipelineJson = OBJECT_MAPPER.writeValueAsString(entry);
if (evaluate(pipelineJson, PLUGIN_NAME)) {
LOG.info("Rule for {} is evaluated true for pipelineJson {}", PLUGIN_NAME, pipelineJson);
RuleFileEvaluation ruleFileEvaluation = evaluate(pipelineJson);

if (ruleFileEvaluation.result) {
String pluginName = ruleFileEvaluation.pluginName;
LOG.info("Applying rule {}",ruleFileEvaluation.ruleFileName.toString());
LOG.info("Rule for {} is evaluated true for pipelineJson {}", pluginName, pipelineJson);

InputStream templateStream = transformersFactory.getPluginTemplateFileStream(PLUGIN_NAME);
InputStream templateStream = transformersFactory.getPluginTemplateFileStream(pluginName);
PipelineTemplateModel templateModel = yamlMapper.readValue(templateStream,
PipelineTemplateModel.class);
LOG.info("Template is chosen for {}", PLUGIN_NAME);
LOG.info("Template is chosen for {}", pluginName);

return RuleEvaluatorResult.builder()
.withEvaluatedResult(true)
.withPipelineTemplateModel(templateModel)
.withPipelineName(entry.getKey())
.build();
}
} catch (FileNotFoundException e){
LOG.error("Template File Not Found for {}", PLUGIN_NAME);
} catch (FileNotFoundException e) {
LOG.error("Template File Not Found");
throw new RuntimeException(e);
}
catch (JsonProcessingException e) {
} catch (JsonProcessingException e) {
LOG.error("Error processing json");
throw new RuntimeException(e);
} catch (IOException e) {
Expand All @@ -94,38 +81,63 @@ private RuleEvaluatorResult isDocDBSource(PipelinesDataFlowModel pipelinesModel)
.build();
}

private Boolean evaluate(String pipelinesJson,
String pluginName) {
private RuleFileEvaluation evaluate(String pipelinesJson) {

Configuration parseConfig = Configuration.builder()
.jsonProvider(new JacksonJsonProvider())
.jsonProvider(new JacksonJsonNodeJsonProvider())
.mappingProvider(new JacksonMappingProvider())
.options(Option.AS_PATH_LIST)
.options(Option.SUPPRESS_EXCEPTIONS)
.build();
ParseContext parseContext = JsonPath.using(parseConfig);
ReadContext readPathContext = parseContext.parse(pipelinesJson);

RuleTransformerModel rulesModel = null;
InputStream ruleStream = null;
try {
ruleStream = transformersFactory.getPluginRuleFileStream(pluginName);
List<Path> ruleFiles = transformersFactory.getRuleFiles();

rulesModel = yamlMapper.readValue(ruleStream, RuleTransformerModel.class);
List<String> rules = rulesModel.getApplyWhen();
for (String rule : rules) {
try {
Object result = readPathContext.read(rule);
} catch (PathNotFoundException e) {
LOG.warn("Json Path not found for {}", pluginName);
return false;
for (Path ruleFile : ruleFiles) {

ruleStream = transformersFactory.readRuleFile(ruleFile);
if(ruleStream == null){
continue;
}
rulesModel = yamlMapper.readValue(ruleStream, RuleTransformerModel.class);
List<String> rules = rulesModel.getApplyWhen();
String pluginName = rulesModel.getPluginName();
boolean allRulesValid = true;

for (String rule : rules) {
try {
JsonNode result = JsonPath.using(parseConfig).parse(pipelinesJson).read(rule);
if (result == null || result.size()==0) {
allRulesValid = false;
break;
}
} catch (PathNotFoundException e) {
LOG.debug("Json Path not found for {}", ruleFile.getFileName().toString());
allRulesValid = false;
break;
}
}

if (allRulesValid) {
return RuleFileEvaluation.builder()
.withPluginName(pluginName)
.withRuleFileName(ruleFile.getFileName().toString())
.withResult(true)
.build();
}
}
} catch (FileNotFoundException e){
LOG.warn("Rule File Not Found for {}", pluginName);
return false;
} catch(IOException e){

} catch (FileNotFoundException e) {
LOG.debug("Rule File Not Found");
return RuleFileEvaluation.builder()
.withPluginName(null)
.withRuleFileName(null)
.withResult(false)
.build();
} catch (IOException e) {
throw new RuntimeException(e);
}finally {
} finally {
if (ruleStream != null) {
try {
ruleStream.close();
Expand All @@ -134,7 +146,10 @@ private Boolean evaluate(String pipelinesJson,
}
}
}
return true;
return RuleFileEvaluation.builder()
.withPluginName(null)
.withRuleFileName(null)
.withResult(false)
.build();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.opensearch.dataprepper.pipeline.parser.rule;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

@Builder(setterPrefix = "with")
@AllArgsConstructor
@Data
public class RuleFileEvaluation {
Boolean result;
String ruleFileName;
String pluginName;

public RuleFileEvaluation() {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,31 @@
*/
package org.opensearch.dataprepper.pipeline.parser.rule;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Data;

import java.util.List;

@Data
@AllArgsConstructor
public class RuleTransformerModel {

@JsonInclude(JsonInclude.Include.NON_EMPTY)
@JsonProperty("apply_when")
private List<String> applyWhen;

public RuleTransformerModel() {
}

public RuleTransformerModel(List<String> applyWhen) {
this.applyWhen = applyWhen;
}
@JsonProperty("plugin_name")
private String pluginName;

public List<String> getApplyWhen() {
return applyWhen;
}

public void setApplyWhen(List<String> applyWhen) {
this.applyWhen = applyWhen;
public RuleTransformerModel() {
}

@Override
public String toString() {
return "RuleConfiguration{" +
"applyWhen=" + applyWhen +
'}';
"\npluginName="+ pluginName +'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,18 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TransformersFactory implements PipelineTransformationPathProvider {

Expand Down Expand Up @@ -41,35 +53,63 @@ public String getTransformationRulesDirectoryLocation() {
}

public String getPluginTemplateFileLocation(String pluginName) {
if(pluginName == null || pluginName.isEmpty()){
throw new RuntimeException("Transformation plugin not found");
if (pluginName == null || pluginName.isEmpty()) {
throw new RuntimeException("Transformation plugin not found");
}
return templatesDirectoryPath + "/" + pluginName + TEMPLATE_FILE_NAME_PATTERN;
}

public String getPluginRuleFileLocation(String pluginName) {
if(pluginName == null || pluginName.isEmpty()){
throw new RuntimeException("Transformation plugin not found");
public InputStream getPluginTemplateFileStream(String pluginName) {
if (pluginName == null || pluginName.isEmpty()) {
throw new RuntimeException("Transformation plugin not found");
}
return rulesDirectoryPath + "/" + pluginName + RULE_FILE_NAME_PATTERN;
ClassLoader classLoader = TransformersFactory.class.getClassLoader();
InputStream filestream = classLoader.getResourceAsStream("templates" + "/" + pluginName + TEMPLATE_FILE_NAME_PATTERN);
return filestream;
}

public InputStream getPluginRuleFileStream(String pluginName) {
if(pluginName == null || pluginName.isEmpty()){
throw new RuntimeException("Transformation plugin not found");
public List<Path> getRuleFiles() {
// Get the URI of the rules folder
URI uri = null;
try {
uri = getClass().getClassLoader().getResource("rules").toURI();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}

Path rulesFolderPath;

if ("jar".equals(uri.getScheme())) {
// File is inside a JAR, create a filesystem for it
try (FileSystem fileSystem = FileSystems.newFileSystem(uri, Collections.emptyMap())) {
rulesFolderPath = fileSystem.getPath("rules");
return scanFolder(rulesFolderPath);
} catch (IOException e) {
throw new RuntimeException(e);
}
} else {
// File is not inside a JAR
rulesFolderPath = Paths.get(uri);
return scanFolder(rulesFolderPath);
}
ClassLoader classLoader = TransformersFactory.class.getClassLoader();
InputStream filestream = classLoader.getResourceAsStream("rules" + "/" + pluginName + RULE_FILE_NAME_PATTERN);
return filestream;
}

public InputStream getPluginTemplateFileStream(String pluginName) {
if(pluginName == null || pluginName.isEmpty()){
throw new RuntimeException("Transformation plugin not found");
private List<Path> scanFolder(Path folderPath) {
List<Path> pathsList = new ArrayList<>();
try (Stream<Path> paths = Files.walk(folderPath)) {
pathsList = paths
.filter(Files::isRegularFile) // Filter to include only regular files
.collect(Collectors.toList()); // Collect paths into the list
} catch (IOException e) {
throw new RuntimeException(e);
}
return pathsList;
}

public InputStream readRuleFile(Path ruleFile) throws IOException {
ClassLoader classLoader = TransformersFactory.class.getClassLoader();
InputStream filestream = classLoader.getResourceAsStream("templates" + "/" + pluginName + TEMPLATE_FILE_NAME_PATTERN);
return filestream;
InputStream ruleStream = classLoader.getResourceAsStream("rules" + "/" + ruleFile.getFileName().toString());
return ruleStream;
}

public PipelineTemplateModel getTemplateModel(String pluginName) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
plugin_name: "documentdb"
apply_when:
- "$..source.documentdb"
- "$..source.documentdb.s3_bucket"
Loading

0 comments on commit 7de88ce

Please sign in to comment.