Skip to content

Commit

Permalink
Optimize Jackson2Tokenizer
Browse files Browse the repository at this point in the history
  • Loading branch information
sdeleuze committed Apr 3, 2019
1 parent d39e3cc commit 0ab3a67
Showing 1 changed file with 26 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,15 @@
import com.fasterxml.jackson.core.async.ByteArrayFeeder;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.util.TokenBuffer;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;


import org.springframework.core.codec.DecodingException;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.util.Assert;

/**
* {@link Function} to transform a JSON stream of arbitrary size, byte array
Expand Down Expand Up @@ -74,7 +78,7 @@ private Jackson2Tokenizer(
}


private Flux<TokenBuffer> tokenize(DataBuffer dataBuffer) {
private List<TokenBuffer> tokenize(DataBuffer dataBuffer) {
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
DataBufferUtils.release(dataBuffer);
Expand All @@ -84,27 +88,32 @@ private Flux<TokenBuffer> tokenize(DataBuffer dataBuffer) {
return parseTokenBufferFlux();
}
catch (JsonProcessingException ex) {
return Flux.error(new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex));
throw new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex);
}
catch (IOException ex) {
return Flux.error(ex);
throw Exceptions.propagate(ex);
}
}

private Flux<TokenBuffer> endOfInput() {
this.inputFeeder.endOfInput();
try {
return parseTokenBufferFlux();
}
catch (JsonProcessingException ex) {
return Flux.error(new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex));
}
catch (IOException ex) {
return Flux.error(ex);
}
private Mono<TokenBuffer> endOfInput() {
return Mono.fromCallable(() -> {
this.inputFeeder.endOfInput();
try {
List<TokenBuffer> tokens = parseTokenBufferFlux();
int size = tokens.size();
Assert.isTrue(size <= 1, "0 or 1 token expected at the end of input, but more found");
return (size == 1 ? tokens.get(0) : null);
}
catch (JsonProcessingException ex) {
throw new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex);
}
catch (IOException ex) {
throw Exceptions.propagate(ex);
}
});
}

private Flux<TokenBuffer> parseTokenBufferFlux() throws IOException {
private List<TokenBuffer> parseTokenBufferFlux() throws IOException {
List<TokenBuffer> result = new ArrayList<>();

while (true) {
Expand All @@ -122,7 +131,7 @@ private Flux<TokenBuffer> parseTokenBufferFlux() throws IOException {
processTokenArray(token, result);
}
}
return Flux.fromIterable(result);
return result;
}

private void updateDepth(JsonToken token) {
Expand Down Expand Up @@ -184,7 +193,7 @@ public static Flux<TokenBuffer> tokenize(Flux<DataBuffer> dataBuffers, JsonFacto
try {
JsonParser parser = jsonFactory.createNonBlockingByteArrayParser();
Jackson2Tokenizer tokenizer = new Jackson2Tokenizer(parser, deserializationContext, tokenizeArrayElements);
return dataBuffers.flatMap(tokenizer::tokenize, Flux::error, tokenizer::endOfInput);
return dataBuffers.concatMapIterable(tokenizer::tokenize).concatWith(tokenizer.endOfInput());
}
catch (IOException ex) {
return Flux.error(ex);
Expand Down

0 comments on commit 0ab3a67

Please sign in to comment.