Updates to the code for HTTP chunking. (#4919)
This refactors the code by placing all logic for serializing the data into the Codec itself. In so doing, it allows for improved testing, such as symmetric (round-trip) testing, and decouples the serialization format from the HTTP server. It also uses the Jackson library for serialization, which yields more accurate JSON.

Signed-off-by: David Venable <dlv@amazon.com>
dlvenable committed Sep 9, 2024
1 parent 3dfad0b commit cf24b89
Showing 4 changed files with 327 additions and 111 deletions.
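The symmetric (round-trip) testing the commit message mentions can be pictured with a short sketch. This example is not part of the commit; the payload and test shape are illustrative, using the JsonCodec changed below. Whatever parse produces, serialize can turn back into an equivalent JSON body:

    import com.linecorp.armeria.common.HttpData;
    import java.util.ArrayList;
    import java.util.List;

    class RoundTripSketch {
        public static void main(String[] args) throws Exception {
            final JsonCodec codec = new JsonCodec();
            final String requestBody = "[{\"log\":\"message 1\"},{\"log\":\"message 2\"}]";

            // Parse the HTTP body into individual JSON records.
            final List<String> parsed = codec.parse(HttpData.ofUtf8(requestBody));

            // Serialize the records back; without a split length, a single body is emitted.
            final List<String> bodies = new ArrayList<>();
            codec.serialize(parsed, bodies::add);

            // Prints the original array again: [{"log":"message 1"},{"log":"message 2"}]
            System.out.println(bodies.get(0));
        }
    }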
Codec.java
@@ -8,6 +8,7 @@
 import com.linecorp.armeria.common.HttpData;
 
 import java.io.IOException;
+import java.util.function.Consumer;
 
 /**
  * Codec parses the content of HTTP request into custom Java type.
@@ -22,7 +23,32 @@ public interface Codec<T> {
      */
     T parse(HttpData httpData) throws IOException;
 
-    default T parse(HttpData httpData, int maxSize) throws IOException {
-        return parse(httpData);
+    /**
+     * Serializes parsed data back into a UTF-8 string.
+     * <p>
+     * This API will split into multiple bodies based on splitLength. Note that if a single
+     * item is larger than this, it will be output and exceed that length.
+     *
+     * @param parsedData The parsed data
+     * @param serializedBodyConsumer A {@link Consumer} to accept each serialized body
+     * @param splitLength The length at which to split serialized bodies.
+     * @throws IOException A failure writing data.
+     */
+    void serialize(final T parsedData,
+                   final Consumer<String> serializedBodyConsumer,
+                   final int splitLength) throws IOException;
+
+
+    /**
+     * Serializes parsed data back into a UTF-8 string.
+     * <p>
+     * This API will not split the data into chunks.
+     *
+     * @param parsedData The parsed data
+     * @param serializedBodyConsumer A {@link Consumer} to accept the serialized body
+     * @throws IOException A failure writing data.
+     */
+    default void serialize(final T parsedData, final Consumer<String> serializedBodyConsumer) throws IOException {
+        serialize(parsedData, serializedBodyConsumer, Integer.MAX_VALUE);
     }
 }
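A hedged usage sketch of the chunked overload defined above (the records and the 24-byte split length are assumptions for illustration; JsonCodec is the implementation changed below): the consumer is invoked once per serialized body, and each body stays within splitLength unless a single item by itself exceeds it.

    import com.linecorp.armeria.common.HttpData;
    import java.util.ArrayList;
    import java.util.List;

    class ChunkingSketch {
        public static void main(String[] args) throws Exception {
            final JsonCodec codec = new JsonCodec();
            final List<String> parsed = codec.parse(
                    HttpData.ofUtf8("[{\"log\":\"aaaa\"},{\"log\":\"bbbb\"}]"));

            final List<String> bodies = new ArrayList<>();
            codec.serialize(parsed, bodies::add, 24);

            // Each record is 14 bytes, so both cannot fit in one 24-byte body.
            // Two bodies are emitted: [{"log":"aaaa"}] and [{"log":"bbbb"}]
            bodies.forEach(System.out::println);
        }
    }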
JsonCodec.java
@@ -5,68 +5,126 @@

 package org.opensearch.dataprepper.http.codec;
 
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.io.CountingOutputStream;
 import com.linecorp.armeria.common.HttpData;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;
 
 /**
  * JsonCodec parses the json array format HTTP data into List&lt;{@link String}&gt;.
  * TODO: replace output List&lt;String&gt; with List&lt;InternalModel&gt; type
  * <p>
  */
-public class JsonCodec implements Codec<List<List<String>>> {
-    // To account for "[" and "]" when the list is converted to String
-    private static final String OVERHEAD_CHARACTERS = "[]";
-    // To account for "," when the list is converted to String
-    private static final int COMMA_OVERHEAD_LENGTH = 1;
+public class JsonCodec implements Codec<List<String>> {
     private static final ObjectMapper mapper = new ObjectMapper();
     private static final TypeReference<List<Map<String, Object>>> LIST_OF_MAP_TYPE_REFERENCE =
-            new TypeReference<List<Map<String, Object>>>() {};
+            new TypeReference<List<Map<String, Object>>>() {
+            };
 
     @Override
-    public List<List<String>> parse(HttpData httpData) throws IOException {
-        List<String> jsonList = new ArrayList<>();
+    public List<String> parse(final HttpData httpData) throws IOException {
+        final List<String> jsonList = new ArrayList<>();
         final List<Map<String, Object>> logList = mapper.readValue(httpData.toInputStream(),
                 LIST_OF_MAP_TYPE_REFERENCE);
-        for (final Map<String, Object> log: logList) {
+        for (final Map<String, Object> log : logList) {
             final String recordString = mapper.writeValueAsString(log);
             jsonList.add(recordString);
         }
 
-        return List.of(jsonList);
+        return jsonList;
     }
 
     @Override
-    public List<List<String>> parse(HttpData httpData, int maxSize) throws IOException {
-        List<List<String>> jsonList = new ArrayList<>();
-        final List<Map<String, Object>> logList = mapper.readValue(httpData.toInputStream(),
-                LIST_OF_MAP_TYPE_REFERENCE);
-        List<String> innerJsonList = new ArrayList<>();
-        int size = OVERHEAD_CHARACTERS.length();
-        for (Map<String, Object> log: logList) {
-            final String recordString = mapper.writeValueAsString(log);
-            final int nextRecordLength = recordString.getBytes(Charset.defaultCharset()).length;
-            // It is possible that the first record is larger than maxSize, then
-            // innerJsonList size would be zero.
-            if (size + nextRecordLength > maxSize && !innerJsonList.isEmpty()) {
-                jsonList.add(innerJsonList);
-                innerJsonList = new ArrayList<>();
-                size = OVERHEAD_CHARACTERS.length();
+    public void serialize(final List<String> jsonList,
+                          final Consumer<String> serializedBodyConsumer,
+                          final int splitLength) throws IOException {
+        if (splitLength < 0)
+            throw new IllegalArgumentException("The splitLength must be greater than or equal to 0.");
+
+        if (splitLength == 0) {
+            performSerialization(jsonList, serializedBodyConsumer, Integer.MAX_VALUE);
+        } else {
+            performSerialization(jsonList, serializedBodyConsumer, splitLength);
+        }
+    }
+
+    private void performSerialization(final List<String> jsonList,
+                                      final Consumer<String> serializedBodyConsumer,
+                                      final int splitLength) throws IOException {
+
+        JsonArrayWriter jsonArrayWriter = new JsonArrayWriter(splitLength, serializedBodyConsumer);
+
+        for (final String individualJsonLine : jsonList) {
+            if (jsonArrayWriter.willExceedByWriting(individualJsonLine)) {
+                jsonArrayWriter.close();
+
+                jsonArrayWriter = new JsonArrayWriter(splitLength, serializedBodyConsumer);
+
             }
-            // The following may result in a innerJsonList with larger than "maxSize" length recordString
-            innerJsonList.add(recordString);
-            size += nextRecordLength + COMMA_OVERHEAD_LENGTH;
+            jsonArrayWriter.write(individualJsonLine);
         }
-        if (size > OVERHEAD_CHARACTERS.length()) {
-            jsonList.add(innerJsonList);
-        }
 
-        return jsonList;
+        jsonArrayWriter.close();
     }
+
+    private static class JsonArrayWriter {
+        private static final JsonFactory JSON_FACTORY = new JsonFactory().setCodec(mapper);
+        private static final int BUFFER_SIZE = 16 * 1024;
+        private static final String NECESSARY_CHARACTERS_TO_WRITE = ",]";
+        private final CountingOutputStream countingOutputStream;
+        private final ByteArrayOutputStream outputStream;
+        private final int splitLength;
+        private final Consumer<String> serializedBodyConsumer;
+        private final JsonGenerator generator;
+        private boolean hasItem = false;
+
+        JsonArrayWriter(final int splitLength, final Consumer<String> serializedBodyConsumer) throws IOException {
+            outputStream = new ByteArrayOutputStream(Math.min(splitLength, BUFFER_SIZE));
+            countingOutputStream = new CountingOutputStream(outputStream);
+            this.splitLength = splitLength;
+            this.serializedBodyConsumer = serializedBodyConsumer;
+            generator = JSON_FACTORY.createGenerator(countingOutputStream, JsonEncoding.UTF8);
+            generator.writeStartArray();
+        }
+
+        boolean willExceedByWriting(final String individualJsonLine) {
+            final int lengthToWrite = individualJsonLine.getBytes(StandardCharsets.UTF_8).length;
+            final long lengthOfDataWritten = countingOutputStream.getCount();
+            return lengthToWrite + lengthOfDataWritten + NECESSARY_CHARACTERS_TO_WRITE.length() > splitLength;
+        }
+
+        void write(final String individualJsonLine) throws IOException {
+            final JsonNode jsonNode = mapper.readTree(individualJsonLine);
+            generator.writeTree(jsonNode);
+            generator.flush();
+            hasItem = true;
+        }
+
+        void close() throws IOException {
+            if (hasItem) {
+                generator.writeEndArray();
+                generator.flush();
+                final String resultJson = outputStream.toString(Charset.defaultCharset());
+
+                serializedBodyConsumer.accept(resultJson);
+            }
+
+            generator.close();
+            outputStream.close();
+        }
+    }
+
 }
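One more hedged sketch (the record is assumed, not from the commit's tests) showing the corner case documented in the interface Javadoc: willExceedByWriting reserves two bytes for the "," separator and the closing "]", and because close() only emits a body once hasItem is set, a fresh JsonArrayWriter always accepts at least one record. A record larger than splitLength is therefore still emitted as its own body, exceeding the limit.

    import java.util.ArrayList;
    import java.util.List;

    class OversizeRecordSketch {
        public static void main(String[] args) throws Exception {
            final List<String> bodies = new ArrayList<>();

            // An 18-byte record with a 4-byte split length.
            new JsonCodec().serialize(List.of("{\"log\":\"aaaaaaaa\"}"), bodies::add, 4);

            // One 20-byte body despite splitLength = 4: [{"log":"aaaaaaaa"}]
            System.out.println(bodies.get(0));
        }
    }

Note that serialize treats a splitLength of 0 as no splitting (it substitutes Integer.MAX_VALUE), and a negative splitLength is rejected with an IllegalArgumentException.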