Skip to content

Commit

Permalink
ADD: data prepper plugin schema generation (#4777)
Browse files Browse the repository at this point in the history
* ADD: data-prepper-plugin-schema

Signed-off-by: George Chen <qchea@amazon.com>
  • Loading branch information
chenqi0805 committed Aug 2, 2024
1 parent e22e969 commit 642db0d
Show file tree
Hide file tree
Showing 12 changed files with 596 additions and 5 deletions.
2 changes: 2 additions & 0 deletions config/checkstyle/checkstyle-suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,6 @@
<suppress files="data-prepper-expression[\\/]build[\\/]generated-src[\\/]antlr[\\/]main[\\/]*" checks="[a-zA-Z0-9]*"/>
<!-- The stdout sink must call System.out.println -->
<suppress files="data-prepper-plugins[\\/]common[\\/]src[\\/]main[\\/]java[\\/]org[\\/]opensearch[\\/]dataprepper[\\/]plugins[\\/]sink[\\/]StdOutSink.java" checks="Regexp"/>
<!-- The DataPrepperPluginSchemaExecute must call System.out.println -->
<suppress files="data-prepper-plugin-schema-cli[\\/]src[\\/]main[\\/]java[\\/]org[\\/]opensearch[\\/]dataprepper[\\/]schemas[\\/]DataPrepperPluginSchemaExecute.java" checks="Regexp"/>
</suppressions>
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,30 @@
* @since 1.2
*/
public class PipelineModel {
public static final String SOURCE_PLUGIN_TYPE = "source";
public static final String PROCESSOR_PLUGIN_TYPE = "processor";
public static final String BUFFER_PLUGIN_TYPE = "buffer";
public static final String ROUTE_PLUGIN_TYPE = "route";
public static final String SINK_PLUGIN_TYPE = "sink";
private static final Logger LOG = LoggerFactory.getLogger(PipelineModel.class);

@JsonProperty("source")
@JsonProperty(SOURCE_PLUGIN_TYPE)
private final PluginModel source;

@JsonProperty("processor")
@JsonProperty(PROCESSOR_PLUGIN_TYPE)
@JsonInclude(JsonInclude.Include.NON_NULL)
private final List<PluginModel> processors;

@JsonProperty("buffer")
@JsonProperty(BUFFER_PLUGIN_TYPE)
@JsonInclude(JsonInclude.Include.NON_NULL)
private final PluginModel buffer;

@JsonProperty("routes")
@JsonAlias("route")
@JsonAlias(ROUTE_PLUGIN_TYPE)
@JsonInclude(JsonInclude.Include.NON_EMPTY)
private final List<ConditionalRoute> routes;

@JsonProperty("sink")
@JsonProperty(SINK_PLUGIN_TYPE)
private final List<SinkModel> sinks;

@JsonProperty("workers")
Expand Down
12 changes: 12 additions & 0 deletions data-prepper-plugin-schema-cli/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Data Prepper Plugin Schema CLI

This module includes the SDK and CLI for generating schemas for Data Prepper pipeline plugins.

## CLI Usage

```
./gradlew :data-prepper-plugin-schema-cli:run --args='--plugin_type=processor --plugin_names=grok'
```

* plugin_type: A required parameter specifies type of processor. Valid options are `source`, `buffer`, `processor`, `route`, `sink`.
* plugin_names: An optional parameter filters the result by plugin names separated by `,`, e.g. `grok,date`.
29 changes: 29 additions & 0 deletions data-prepper-plugin-schema-cli/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
plugins {
id 'data-prepper.publish'
id 'application'
}

application {
mainClass = 'org.opensearch.dataprepper.schemas.DataPrepperPluginSchemaExecute'
}

dependencies {
implementation project(':data-prepper-plugins')
implementation project(':data-prepper-plugin-framework')
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'org.reflections:reflections:0.10.2'
implementation 'com.github.victools:jsonschema-maven-plugin:4.35.0'
implementation 'com.github.victools:jsonschema-generator:4.35.0'
implementation 'com.github.victools:jsonschema-module-jackson:4.35.0'
implementation 'com.github.victools:jsonschema-module-jakarta-validation:4.35.0'
implementation 'javax.inject:javax.inject:1'
implementation 'info.picocli:picocli:4.6.1'
implementation(libs.spring.core) {
exclude group: 'commons-logging', module: 'commons-logging'
}
implementation(libs.spring.context) {
exclude group: 'commons-logging', module: 'commons-logging'
}
testImplementation(platform("org.junit:junit-bom:5.9.1"))
testImplementation("org.junit.jupiter:junit-jupiter")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package org.opensearch.dataprepper.schemas;

import com.github.victools.jsonschema.generator.Module;
import com.github.victools.jsonschema.generator.OptionPreset;
import com.github.victools.jsonschema.generator.SchemaVersion;
import com.github.victools.jsonschema.module.jakarta.validation.JakartaValidationModule;
import com.github.victools.jsonschema.module.jakarta.validation.JakartaValidationOption;
import org.opensearch.dataprepper.schemas.module.CustomJacksonModule;
import org.reflections.Reflections;
import org.reflections.scanners.Scanners;
import org.reflections.util.ClasspathHelper;
import org.reflections.util.ConfigurationBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.github.victools.jsonschema.module.jackson.JacksonOption.RESPECT_JSONPROPERTY_REQUIRED;

public class DataPrepperPluginSchemaExecute implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(DataPrepperPluginSchemaExecute.class);
static final String DEFAULT_PLUGINS_CLASSPATH = "org.opensearch.dataprepper.plugins";

@CommandLine.Option(names = {"--plugin_type"}, required = true)
private String pluginTypeName;

@CommandLine.Option(names = {"--plugin_names"})
private String pluginNames;

@CommandLine.Option(names = {"--site.url"}, defaultValue = "https://opensearch.org")
private String siteUrl;
@CommandLine.Option(names = {"--site.baseurl"}, defaultValue = "/docs/latest")
private String siteBaseUrl;

public static void main(String[] args) {
final int exitCode = new CommandLine(new DataPrepperPluginSchemaExecute()).execute(args);
System.exit(exitCode);
}

@Override
public void run() {
final List<Module> modules = List.of(
new CustomJacksonModule(RESPECT_JSONPROPERTY_REQUIRED),
new JakartaValidationModule(JakartaValidationOption.NOT_NULLABLE_FIELD_IS_REQUIRED,
JakartaValidationOption.INCLUDE_PATTERN_EXPRESSIONS)
);
final Reflections reflections = new Reflections(new ConfigurationBuilder()
.setUrls(ClasspathHelper.forPackage(DEFAULT_PLUGINS_CLASSPATH))
.setScanners(Scanners.TypesAnnotated, Scanners.SubTypes));
final PluginConfigsJsonSchemaConverter pluginConfigsJsonSchemaConverter = new PluginConfigsJsonSchemaConverter(
reflections, new JsonSchemaConverter(modules), siteUrl, siteBaseUrl);
final Class<?> pluginType = pluginConfigsJsonSchemaConverter.pluginTypeNameToPluginType(pluginTypeName);
final Map<String, String> pluginNameToJsonSchemaMap = pluginConfigsJsonSchemaConverter.convertPluginConfigsIntoJsonSchemas(
SchemaVersion.DRAFT_2020_12, OptionPreset.PLAIN_JSON, pluginType);
if (pluginNames == null) {
pluginNameToJsonSchemaMap.values().forEach(System.out::println);
} else {
final Set<String> pluginNamesSet = Set.of(pluginNames.split(","));
final List<String> result = pluginNamesSet.stream().flatMap(name -> {
if (!pluginNameToJsonSchemaMap.containsKey(name)) {
LOG.error("plugin name: {} not found", name);
return Stream.empty();
}
return Stream.of(pluginNameToJsonSchemaMap.get(name));
}).collect(Collectors.toList());
result.forEach(System.out::println);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package org.opensearch.dataprepper.schemas;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.github.victools.jsonschema.generator.FieldScope;
import com.github.victools.jsonschema.generator.Module;
import com.github.victools.jsonschema.generator.OptionPreset;
import com.github.victools.jsonschema.generator.SchemaGenerator;
import com.github.victools.jsonschema.generator.SchemaGeneratorConfig;
import com.github.victools.jsonschema.generator.SchemaGeneratorConfigBuilder;
import com.github.victools.jsonschema.generator.SchemaGeneratorConfigPart;
import com.github.victools.jsonschema.generator.SchemaVersion;

import java.util.List;

public class JsonSchemaConverter {
static final String DEPRECATED_SINCE_KEY = "deprecated";
private final List<Module> jsonSchemaGeneratorModules;

public JsonSchemaConverter(final List<Module> jsonSchemaGeneratorModules) {
this.jsonSchemaGeneratorModules = jsonSchemaGeneratorModules;
}

public ObjectNode convertIntoJsonSchema(
final SchemaVersion schemaVersion, final OptionPreset optionPreset, final Class<?> clazz)
throws JsonProcessingException {
final SchemaGeneratorConfigBuilder configBuilder = new SchemaGeneratorConfigBuilder(
schemaVersion, optionPreset);
loadJsonSchemaGeneratorModules(configBuilder);
final SchemaGeneratorConfigPart<FieldScope> scopeSchemaGeneratorConfigPart = configBuilder.forFields();
overrideInstanceAttributeWithDeprecated(scopeSchemaGeneratorConfigPart);

final SchemaGeneratorConfig config = configBuilder.build();
final SchemaGenerator generator = new SchemaGenerator(config);
return generator.generateSchema(clazz);
}

private void loadJsonSchemaGeneratorModules(final SchemaGeneratorConfigBuilder configBuilder) {
jsonSchemaGeneratorModules.forEach(configBuilder::with);
}

private void overrideInstanceAttributeWithDeprecated(
final SchemaGeneratorConfigPart<FieldScope> scopeSchemaGeneratorConfigPart) {
scopeSchemaGeneratorConfigPart.withInstanceAttributeOverride((node, field, context) -> {
final Deprecated deprecatedAnnotation = field.getAnnotationConsideringFieldAndGetter(
Deprecated.class);
if (deprecatedAnnotation != null) {
node.put(DEPRECATED_SINCE_KEY, deprecatedAnnotation.since());
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package org.opensearch.dataprepper.schemas;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.github.victools.jsonschema.generator.OptionPreset;
import com.github.victools.jsonschema.generator.SchemaVersion;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.configuration.ConditionalRoute;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.model.source.Source;
import org.reflections.Reflections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.dataprepper.model.configuration.PipelineModel.BUFFER_PLUGIN_TYPE;
import static org.opensearch.dataprepper.model.configuration.PipelineModel.PROCESSOR_PLUGIN_TYPE;
import static org.opensearch.dataprepper.model.configuration.PipelineModel.ROUTE_PLUGIN_TYPE;
import static org.opensearch.dataprepper.model.configuration.PipelineModel.SINK_PLUGIN_TYPE;
import static org.opensearch.dataprepper.model.configuration.PipelineModel.SOURCE_PLUGIN_TYPE;

public class PluginConfigsJsonSchemaConverter {
private static final Logger LOG = LoggerFactory.getLogger(PluginConfigsJsonSchemaConverter.class);
static final String SITE_URL_PLACEHOLDER = "{{site.url}}";
static final String SITE_BASE_URL_PLACEHOLDER = "{{site.baseurl}}";
static final String DOCUMENTATION_LINK_KEY = "documentation";
static final String PLUGIN_NAME_KEY = "name";
static final String PLUGIN_DOCUMENTATION_URL_FORMAT =
"%s%s/data-prepper/pipelines/configuration/%s/%s/";
static final Map<Class<?>, String> PLUGIN_TYPE_TO_URI_PARAMETER_MAP = Map.of(
Source.class, "sources",
Processor.class, "processors",
ConditionalRoute.class, "processors",
Buffer.class, "buffers",
Sink.class, "sinks"
);
static final String CONDITIONAL_ROUTE_PROCESSOR_NAME = "routes";
static final Map<String, Class<?>> PLUGIN_TYPE_NAME_TO_CLASS_MAP = Map.of(
SOURCE_PLUGIN_TYPE, Source.class,
PROCESSOR_PLUGIN_TYPE, Processor.class,
ROUTE_PLUGIN_TYPE, ConditionalRoute.class,
BUFFER_PLUGIN_TYPE, Buffer.class,
SINK_PLUGIN_TYPE, Sink.class);

private final String siteUrl;
private final String siteBaseUrl;
private final Reflections reflections;
private final JsonSchemaConverter jsonSchemaConverter;

public PluginConfigsJsonSchemaConverter(
final Reflections reflections,
final JsonSchemaConverter jsonSchemaConverter,
final String siteUrl,
final String siteBaseUrl) {
this.reflections = reflections;
this.jsonSchemaConverter = jsonSchemaConverter;
this.siteUrl = siteUrl == null ? SITE_URL_PLACEHOLDER : siteUrl;
this.siteBaseUrl = siteBaseUrl == null ? SITE_BASE_URL_PLACEHOLDER : siteBaseUrl;
}

public Set<String> validPluginTypeNames() {
return PLUGIN_TYPE_NAME_TO_CLASS_MAP.keySet();
}

public Class<?> pluginTypeNameToPluginType(final String pluginTypeName) {
final Class<?> pluginType = PLUGIN_TYPE_NAME_TO_CLASS_MAP.get(pluginTypeName);
if (pluginType == null) {
throw new IllegalArgumentException(String.format("Invalid plugin type name: %s.", pluginTypeName));
}
return pluginType;
}

public Map<String, String> convertPluginConfigsIntoJsonSchemas(
final SchemaVersion schemaVersion, final OptionPreset optionPreset, final Class<?> pluginType) {
final Map<String, Class<?>> nameToConfigClass = scanForPluginConfigs(pluginType);
return nameToConfigClass.entrySet().stream()
.flatMap(entry -> {
final String pluginName = entry.getKey();
String value;
try {
final ObjectNode jsonSchemaNode = jsonSchemaConverter.convertIntoJsonSchema(
schemaVersion, optionPreset, entry.getValue());
addPluginName(jsonSchemaNode, pluginName);
addDocumentationLink(jsonSchemaNode, pluginName, pluginType);
value = jsonSchemaNode.toPrettyString();
} catch (JsonProcessingException e) {
LOG.error("Encountered error retrieving JSON schema for {}", pluginName);
return Stream.empty();
}
return Stream.of(Map.entry(entry.getKey(), value));
})
.filter(entry -> Objects.nonNull(entry.getValue()))
.collect(Collectors.toMap(
Map.Entry::getKey,
Map.Entry::getValue
));
}

private Map<String, Class<?>> scanForPluginConfigs(final Class<?> pluginType) {
if (ConditionalRoute.class.equals(pluginType)) {
return Map.of(CONDITIONAL_ROUTE_PROCESSOR_NAME, ConditionalRoute.class);
}
return reflections.getTypesAnnotatedWith(DataPrepperPlugin.class).stream()
.map(clazz -> clazz.getAnnotation(DataPrepperPlugin.class))
.filter(dataPrepperPlugin -> pluginType.equals(dataPrepperPlugin.pluginType()))
.collect(Collectors.toMap(
DataPrepperPlugin::name,
DataPrepperPlugin::pluginConfigurationType
));
}

private void addDocumentationLink(final ObjectNode jsonSchemaNode,
final String pluginName,
final Class<?> pluginType) {
jsonSchemaNode.put(DOCUMENTATION_LINK_KEY,
String.format(
PLUGIN_DOCUMENTATION_URL_FORMAT,
siteUrl,
siteBaseUrl,
PLUGIN_TYPE_TO_URI_PARAMETER_MAP.get(pluginType),
pluginName));
}

private void addPluginName(final ObjectNode jsonSchemaNode,
final String pluginName) {
jsonSchemaNode.put(PLUGIN_NAME_KEY, pluginName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.opensearch.dataprepper.schemas.module;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.github.victools.jsonschema.generator.MemberScope;
import com.github.victools.jsonschema.module.jackson.JacksonModule;
import com.github.victools.jsonschema.module.jackson.JacksonOption;

public class CustomJacksonModule extends JacksonModule {

public CustomJacksonModule() {
super();
}

public CustomJacksonModule(JacksonOption... options) {
super(options);
}

@Override
protected String getPropertyNameOverrideBasedOnJsonPropertyAnnotation(MemberScope<?, ?> member) {
JsonProperty annotation = member.getAnnotationConsideringFieldAndGetter(JsonProperty.class);
if (annotation != null) {
String nameOverride = annotation.value();
// check for invalid overrides
if (nameOverride != null && !nameOverride.isEmpty() && !nameOverride.equals(member.getDeclaredName())) {
return nameOverride;
}
}
return PropertyNamingStrategies.SNAKE_CASE.nameForField(null, null, member.getName());
}
}
Loading

0 comments on commit 642db0d

Please sign in to comment.