Skip to content

Commit

Permalink
OSS-895 [java] Driver needs to detect and throw an error upon loss of…
Browse files Browse the repository at this point in the history
… networking (#323)

* watch on connection
* modified parallel scala test
* reduced query's quantity
* changed fauna query call

Co-authored-by: Gennadii Shestakov <shestakov,g@gmail.com>
  • Loading branch information
shestakovg and Gennadii Shestakov committed Oct 14, 2021
1 parent 7c31a13 commit e7d38cf
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,8 @@ private CompletableFuture<Flow.Publisher<Value>> performStreamRequest(JsonNode b
.thenCompose(response -> {
CompletableFuture<Flow.Publisher<Value>> publisher = new CompletableFuture<>();
if (response.statusCode() < 300) {
BodyValueFlowProcessor bodyValueFlowProcessor = new BodyValueFlowProcessor(json, connection);
BodyValueFlowProcessor bodyValueFlowProcessor = new BodyValueFlowProcessor(json, connection,
() -> connection.performStreamRequest("POST", "stream", body, params));
response.body().subscribe(bodyValueFlowProcessor);
publisher.complete(bodyValueFlowProcessor);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,31 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.http.HttpResponse;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static java.util.concurrent.TimeUnit.SECONDS;

public class BodyValueFlowProcessor extends SubmissionPublisher<Value> implements Flow.Processor<java.util.List<ByteBuffer>, Value> {

public BodyValueFlowProcessor(ObjectMapper json, Connection connection) {
this.json = json;
this.connection = connection;
}

public BodyValueFlowProcessor(ObjectMapper json, Connection connection, Supplier<CompletableFuture<HttpResponse<Flow.Publisher<List<ByteBuffer>>>>> connectionWatcher) {
this(json, connection);
connectionWatcherStart(connectionWatcher);
}

private static Value ErrorValue = new Value.StringV("error");
private static Field<Long> TxnField = Field.at("txn").to(Long.class);

Expand All @@ -32,6 +43,7 @@ public BodyValueFlowProcessor(ObjectMapper json, Connection connection) {
private Connection connection;
private Flow.Subscription subscription = null;
private Flow.Subscriber<? super Value> subscriber = null;
private CompletableFuture<Optional<Exception>> connectionWatcherFuture;

private void requestOne() {
subscription.request(1);
Expand Down Expand Up @@ -96,5 +108,31 @@ public void onError(Throwable throwable) {
public void onComplete() {
log.debug("subscription completed");
subscriber.onComplete();

if (connectionWatcherFuture != null) {
connectionWatcherFuture.cancel(true);
}
}

private void connectionWatcherStart(Supplier<CompletableFuture<HttpResponse<Flow.Publisher<List<ByteBuffer>>>>> connectionWatcher) {
connectionWatcherFuture = CompletableFuture.supplyAsync(() -> {
while (true) {
if(Thread.interrupted()) {
break;
}
try {
connectionWatcher.get().whenCompleteAsync((response, throwable) -> {
if (throwable != null) {
onError(throwable);
}
});
SECONDS.sleep(10);
} catch (Exception ex) {
onError(ex);
}
}
return Optional.empty();
}
);
}
}
2 changes: 1 addition & 1 deletion faunadb-scala/src/main/scala/faunadb/FaunaClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ class FaunaClient private (connection: Connection) {
.flatMap {
case successResponse if successResponse.statusCode() < 300 =>
// Subscribe a new FlowEventValueProcessor to consume the Body's Flow.Publisher
val flowEventValueProcessor = new BodyValueFlowProcessor(json, txn => syncLastTxnTime(txn))
val flowEventValueProcessor = new BodyValueFlowProcessor(json, txn => syncLastTxnTime(txn), () => connection.performStreamRequest("POST", "stream", body, params))
successResponse.body().subscribe(flowEventValueProcessor)
Future.successful(flowEventValueProcessor)
case errorResponse =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,31 @@ package faunadb.streaming

import java.nio.ByteBuffer
import java.util
import java.util.concurrent.{Flow, SubmissionPublisher}

import java.util.concurrent.{CompletableFuture, Flow, SubmissionPublisher}
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import faunadb.QueryError
import faunadb.errors.{StreamingException, UnknownException}
import faunadb.errors.{StreamingException, UnavailableException, UnknownException}
import faunadb.values.{StringV, VSuccess, Value}
import org.slf4j.LoggerFactory

import java.net.http.HttpResponse
import scala.collection.JavaConverters.asScalaIteratorConverter
import scala.util.Try
import scala.compat.java8.FutureConverters.CompletionStageOps
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success, Try}

private [faunadb] class BodyValueFlowProcessor(json: ObjectMapper, syncLastTxnTime: Long => Unit) extends SubmissionPublisher[Value] with Flow.Processor[util.List[ByteBuffer], Value] {
private [faunadb] class BodyValueFlowProcessor(json: ObjectMapper, syncLastTxnTime: Long => Unit,
connectionWatcher:() => CompletableFuture[HttpResponse[Flow.Publisher[java.util.List[ByteBuffer]]]] ) extends SubmissionPublisher[Value] with Flow.Processor[util.List[ByteBuffer], Value] {
private val log = LoggerFactory.getLogger(getClass)
private var subscription: Flow.Subscription = _
private var subscriber: Flow.Subscriber[_ >: Value] = _

// We do not request data from the publisher until we have one subscriber
// to avoid discarding events before the subscriber had the chance to subscribe.
override def subscribe(subscriber: Flow.Subscriber[_ >: Value]): Unit = {
connectionWatcherStart(this.connectionWatcher)

if (this.subscriber == null) {
this.subscriber = subscriber
super.subscribe(subscriber)
Expand Down Expand Up @@ -83,5 +89,24 @@ private [faunadb] class BodyValueFlowProcessor(json: ObjectMapper, syncLastTxnTi

private def requestOne(): Unit =
subscription.request(1)

private def connectionWatcherStart(connectionWatcher:() => CompletableFuture[HttpResponse[Flow.Publisher[java.util.List[ByteBuffer]]]]): Unit = {
val infiniteLoop: Future[Unit] = Future {
while (!Thread.currentThread().isInterrupted()) {
connectionWatcher()
.toScala
.flatMap {
case successResponse if successResponse.statusCode() < 300 => Future { successResponse }
case errorResponse => Future {onError(new Throwable(s"Status code: $errorResponse.statusCode() Body: $errorResponse.body()"))}
}
Thread.sleep(10000)
}
}

infiniteLoop.onComplete {
case Success(value) => println("Succeed with: " + value)
case Failure(e) => onError(e)
}
}
}

6 changes: 3 additions & 3 deletions faunadb-scala/src/test/scala/faunadb/ClientSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2263,9 +2263,9 @@ class ClientSpec
)

val counter = 100
def metricsQuery: Future[MetricsResponse] = {
def metricsQuery: Future[Value] = {
val taskClient = clientPool(random.nextInt(9))
val result = taskClient.queryWithMetrics(
val result = taskClient.query(
Map(
Paginate(Documents(Collection(COLLECTION_NAME))),
Lambda(nextRef => Select("data", Get(nextRef)))
Expand All @@ -2282,7 +2282,7 @@ class ClientSpec

(Seq.fill(counter)(metricsQuery))
.par
.foreach((result: Future[MetricsResponse]) => noException should be thrownBy result.futureValue)
.foreach((result: Future[Value]) => noException should be thrownBy result.futureValue)

(Seq.fill(counter)(sumQuery))
.par
Expand Down

0 comments on commit e7d38cf

Please sign in to comment.