From e7d38cf84afa84b05b9593d7882d0e41f4c25b0c Mon Sep 17 00:00:00 2001 From: shestakovg Date: Thu, 14 Oct 2021 22:36:13 +0300 Subject: [PATCH] OSS-895 [java] Driver needs to detect and throw an error upon loss of networking (#323) * watch on connection * modified parallel scala test * reduced query's quantity * changed fauna query call Co-authored-by: Gennadii Shestakov --- .../java/com/faunadb/client/FaunaClient.java | 3 +- .../streaming/BodyValueFlowProcessor.java | 38 +++++++++++++++++++ .../src/main/scala/faunadb/FaunaClient.scala | 2 +- .../streaming/BodyValueFlowProcessor.scala | 35 ++++++++++++++--- .../src/test/scala/faunadb/ClientSpec.scala | 6 +-- 5 files changed, 74 insertions(+), 10 deletions(-) diff --git a/faunadb-java/src/main/java/com/faunadb/client/FaunaClient.java b/faunadb-java/src/main/java/com/faunadb/client/FaunaClient.java index f11f51c15..38cc37e85 100644 --- a/faunadb-java/src/main/java/com/faunadb/client/FaunaClient.java +++ b/faunadb-java/src/main/java/com/faunadb/client/FaunaClient.java @@ -457,7 +457,8 @@ private CompletableFuture> performStreamRequest(JsonNode b .thenCompose(response -> { CompletableFuture> publisher = new CompletableFuture<>(); if (response.statusCode() < 300) { - BodyValueFlowProcessor bodyValueFlowProcessor = new BodyValueFlowProcessor(json, connection); + BodyValueFlowProcessor bodyValueFlowProcessor = new BodyValueFlowProcessor(json, connection, + () -> connection.performStreamRequest("POST", "stream", body, params)); response.body().subscribe(bodyValueFlowProcessor); publisher.complete(bodyValueFlowProcessor); } else { diff --git a/faunadb-java/src/main/java/com/faunadb/client/streaming/BodyValueFlowProcessor.java b/faunadb-java/src/main/java/com/faunadb/client/streaming/BodyValueFlowProcessor.java index ee7bda704..68ff990be 100644 --- a/faunadb-java/src/main/java/com/faunadb/client/streaming/BodyValueFlowProcessor.java +++ b/faunadb-java/src/main/java/com/faunadb/client/streaming/BodyValueFlowProcessor.java @@ -10,13 +10,19 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.http.HttpResponse; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher; +import java.util.function.Supplier; import java.util.stream.Collectors; +import static java.util.concurrent.TimeUnit.SECONDS; + public class BodyValueFlowProcessor extends SubmissionPublisher implements Flow.Processor, Value> { public BodyValueFlowProcessor(ObjectMapper json, Connection connection) { @@ -24,6 +30,11 @@ public BodyValueFlowProcessor(ObjectMapper json, Connection connection) { this.connection = connection; } + public BodyValueFlowProcessor(ObjectMapper json, Connection connection, Supplier>>>> connectionWatcher) { + this(json, connection); + connectionWatcherStart(connectionWatcher); + } + private static Value ErrorValue = new Value.StringV("error"); private static Field TxnField = Field.at("txn").to(Long.class); @@ -32,6 +43,7 @@ public BodyValueFlowProcessor(ObjectMapper json, Connection connection) { private Connection connection; private Flow.Subscription subscription = null; private Flow.Subscriber subscriber = null; + private CompletableFuture> connectionWatcherFuture; private void requestOne() { subscription.request(1); @@ -96,5 +108,31 @@ public void onError(Throwable throwable) { public void onComplete() { log.debug("subscription completed"); subscriber.onComplete(); + + if (connectionWatcherFuture != null) { + connectionWatcherFuture.cancel(true); + } + } + + private void connectionWatcherStart(Supplier>>>> connectionWatcher) { + connectionWatcherFuture = CompletableFuture.supplyAsync(() -> { + while (true) { + if(Thread.interrupted()) { + break; + } + try { + connectionWatcher.get().whenCompleteAsync((response, throwable) -> { + if (throwable != null) { + onError(throwable); + } + }); + SECONDS.sleep(10); + } catch (Exception ex) { + onError(ex); + } + } + return Optional.empty(); + } + ); } } diff --git a/faunadb-scala/src/main/scala/faunadb/FaunaClient.scala b/faunadb-scala/src/main/scala/faunadb/FaunaClient.scala index f1498e99f..93f85079c 100644 --- a/faunadb-scala/src/main/scala/faunadb/FaunaClient.scala +++ b/faunadb-scala/src/main/scala/faunadb/FaunaClient.scala @@ -284,7 +284,7 @@ class FaunaClient private (connection: Connection) { .flatMap { case successResponse if successResponse.statusCode() < 300 => // Subscribe a new FlowEventValueProcessor to consume the Body's Flow.Publisher - val flowEventValueProcessor = new BodyValueFlowProcessor(json, txn => syncLastTxnTime(txn)) + val flowEventValueProcessor = new BodyValueFlowProcessor(json, txn => syncLastTxnTime(txn), () => connection.performStreamRequest("POST", "stream", body, params)) successResponse.body().subscribe(flowEventValueProcessor) Future.successful(flowEventValueProcessor) case errorResponse => diff --git a/faunadb-scala/src/main/scala/faunadb/streaming/BodyValueFlowProcessor.scala b/faunadb-scala/src/main/scala/faunadb/streaming/BodyValueFlowProcessor.scala index a42789a49..911362171 100644 --- a/faunadb-scala/src/main/scala/faunadb/streaming/BodyValueFlowProcessor.scala +++ b/faunadb-scala/src/main/scala/faunadb/streaming/BodyValueFlowProcessor.scala @@ -2,18 +2,22 @@ package faunadb.streaming import java.nio.ByteBuffer import java.util -import java.util.concurrent.{Flow, SubmissionPublisher} - +import java.util.concurrent.{CompletableFuture, Flow, SubmissionPublisher} import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} import faunadb.QueryError -import faunadb.errors.{StreamingException, UnknownException} +import faunadb.errors.{StreamingException, UnavailableException, UnknownException} import faunadb.values.{StringV, VSuccess, Value} import org.slf4j.LoggerFactory +import java.net.http.HttpResponse import scala.collection.JavaConverters.asScalaIteratorConverter -import scala.util.Try +import scala.compat.java8.FutureConverters.CompletionStageOps +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.{Await, Future} +import scala.util.{Failure, Success, Try} -private [faunadb] class BodyValueFlowProcessor(json: ObjectMapper, syncLastTxnTime: Long => Unit) extends SubmissionPublisher[Value] with Flow.Processor[util.List[ByteBuffer], Value] { +private [faunadb] class BodyValueFlowProcessor(json: ObjectMapper, syncLastTxnTime: Long => Unit, + connectionWatcher:() => CompletableFuture[HttpResponse[Flow.Publisher[java.util.List[ByteBuffer]]]] ) extends SubmissionPublisher[Value] with Flow.Processor[util.List[ByteBuffer], Value] { private val log = LoggerFactory.getLogger(getClass) private var subscription: Flow.Subscription = _ private var subscriber: Flow.Subscriber[_ >: Value] = _ @@ -21,6 +25,8 @@ private [faunadb] class BodyValueFlowProcessor(json: ObjectMapper, syncLastTxnTi // We do not request data from the publisher until we have one subscriber // to avoid discarding events before the subscriber had the chance to subscribe. override def subscribe(subscriber: Flow.Subscriber[_ >: Value]): Unit = { + connectionWatcherStart(this.connectionWatcher) + if (this.subscriber == null) { this.subscriber = subscriber super.subscribe(subscriber) @@ -83,5 +89,24 @@ private [faunadb] class BodyValueFlowProcessor(json: ObjectMapper, syncLastTxnTi private def requestOne(): Unit = subscription.request(1) + + private def connectionWatcherStart(connectionWatcher:() => CompletableFuture[HttpResponse[Flow.Publisher[java.util.List[ByteBuffer]]]]): Unit = { + val infiniteLoop: Future[Unit] = Future { + while (!Thread.currentThread().isInterrupted()) { + connectionWatcher() + .toScala + .flatMap { + case successResponse if successResponse.statusCode() < 300 => Future { successResponse } + case errorResponse => Future {onError(new Throwable(s"Status code: $errorResponse.statusCode() Body: $errorResponse.body()"))} + } + Thread.sleep(10000) + } + } + + infiniteLoop.onComplete { + case Success(value) => println("Succeed with: " + value) + case Failure(e) => onError(e) + } + } } diff --git a/faunadb-scala/src/test/scala/faunadb/ClientSpec.scala b/faunadb-scala/src/test/scala/faunadb/ClientSpec.scala index 2c73c2007..eb60b31a7 100644 --- a/faunadb-scala/src/test/scala/faunadb/ClientSpec.scala +++ b/faunadb-scala/src/test/scala/faunadb/ClientSpec.scala @@ -2263,9 +2263,9 @@ class ClientSpec ) val counter = 100 - def metricsQuery: Future[MetricsResponse] = { + def metricsQuery: Future[Value] = { val taskClient = clientPool(random.nextInt(9)) - val result = taskClient.queryWithMetrics( + val result = taskClient.query( Map( Paginate(Documents(Collection(COLLECTION_NAME))), Lambda(nextRef => Select("data", Get(nextRef))) @@ -2282,7 +2282,7 @@ class ClientSpec (Seq.fill(counter)(metricsQuery)) .par - .foreach((result: Future[MetricsResponse]) => noException should be thrownBy result.futureValue) + .foreach((result: Future[Value]) => noException should be thrownBy result.futureValue) (Seq.fill(counter)(sumQuery)) .par