Skip to content

Commit

Permalink
Fix returning an empty flowable. Fixes #82
Browse files Browse the repository at this point in the history
  • Loading branch information
jameskleeh committed Oct 12, 2020
1 parent 76b95e1 commit 8e72f2b
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,25 @@ class JettyJsonStreamSpec extends Specification {
result.timeout(10, TimeUnit.SECONDS).blockingGet().bookCount == 7
}

void "test returning an empty publisher"() {
when:
List<Book> books = Flowable.fromPublisher(bookClient.empty()).toList().blockingGet()

then:
noExceptionThrown()
books.isEmpty()
}

@Client("/jsonstream/books")
static interface BookClient {
@Get(consumes = MediaType.APPLICATION_JSON_STREAM)
Publisher<Book> list();

@Post(uri = "/count", processes = MediaType.APPLICATION_JSON_STREAM)
Single<LibraryStats> count(@Body Flowable<Book> theBooks)

@Get(uri = "/empty", consumes = MediaType.APPLICATION_JSON)
Publisher<Book> empty();
}

@Controller("/jsonstream/books")
Expand Down Expand Up @@ -219,6 +231,11 @@ class JettyJsonStreamSpec extends Specification {
.map({ chunkList -> "\n" + chunkList.join("\n")})
.blockingGet()
}

@Get(uri = "/empty", produces = MediaType.APPLICATION_JSON)
Publisher<Book> empty() {
return Flowable.empty()
}
}

static class Book {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
@Internal
public class DefaultServletHttpResponse<B> implements ServletHttpResponse<HttpServletResponse, B> {

private static final byte[] EMPTY_ARRAY = "[]".getBytes();

private final HttpServletResponse delegate;
private final DefaultServletHttpRequest<?> request;
private final ServletResponseHeaders headers;
Expand Down Expand Up @@ -181,7 +183,11 @@ public void onComplete() {
if (finished.compareAndSet(false, true)) {
try {
if (!raw && isJson && outputStream.isReady()) {
outputStream.write(']');
if (first) { //empty publisher
outputStream.write(EMPTY_ARRAY);
} else {
outputStream.write(']');
}
flushIfReady();
}
emitter.onNext(DefaultServletHttpResponse.this);
Expand Down

0 comments on commit 8e72f2b

Please sign in to comment.