Skip to content

Commit

Permalink
Merge main to v4
Browse files Browse the repository at this point in the history
  • Loading branch information
cleve-fauna committed Feb 9, 2022
2 parents 1780983 + d761e7d commit b1114ef
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 36 deletions.
6 changes: 2 additions & 4 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@ executors:
resource_class: large
docker:
- image: openjdk:11
- image: gcr.io/faunadb-cloud/faunadb/enterprise:latest
- image: fauna/faunadb
name: core
auth:
username: _json_key
password: $GCR_KEY

environment:
SBT_VERSION: 1.4.7
FAUNA_ROOT_KEY: secret
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,11 @@ public String value() {
return "action";
}
};

public static EventField IndexField = new EventField() {
@Override
public String value() {
return "index";
}
};
}
58 changes: 57 additions & 1 deletion faunadb-java/src/test/java/com/faunadb/client/ClientSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -2312,7 +2312,7 @@ public void shouldTestIdentity() throws Exception {
@Test
public void shouldTestCreateAccessProvider() throws Exception {
String roleName = randomStartingWith("role_");
String providerName = randomStartingWith("provider_");
String providerName = randomStartingWith("provider_jvm_");
String issuerName = randomStartingWith("issuer_");
String fullUri = randomStartingWith("https://") + ".auth0.com";

Expand Down Expand Up @@ -3277,6 +3277,62 @@ public void onComplete() {
assertThat(e3.at("event", "document", "data").to(OBJECT).get(), is(Collections.singletonMap("testField", Value("testValue3"))));
}

@Test
public void shouldStreamSetEvents() throws Exception {
String coll = randomStartingWith("collection_");
query(CreateCollection(Obj("name", Value(coll))));

Flow.Publisher<Value> pub = adminClient.stream(Documents(Collection(coll))).get();
CompletableFuture<List<Value>> capturedEvents = new CompletableFuture<>();

Flow.Subscriber<Value> sub = new Flow.Subscriber<>() {
List<Value> captured = new ArrayList<>();
Flow.Subscription sub = null;

@Override
public void onSubscribe(Flow.Subscription sub) {
this.sub = sub;
this.sub.request(1);
}

@Override
public void onNext(Value v) {
captured.add(v);
if (captured.size() == 3) {
capturedEvents.complete(captured);
sub.cancel();
} else {
sub.request(1);
}
}

@Override
public void onError(Throwable throwable) {
capturedEvents.completeExceptionally(throwable);
}

@Override
public void onComplete() {
capturedEvents.completeExceptionally(
new IllegalStateException("not expecting the stream to complete"));
}
};

// subscribe to publisher
pub.subscribe(sub);

// push 2 events
Value doc = adminClient.query(Create(Collection(coll), Obj())).get();
adminClient.query(Delete(doc.at("ref"))).get();

List<Value> events = capturedEvents.get();
assertThat(events.get(0).at("type").to(STRING).get(), equalTo("start"));
assertThat(events.get(1).at("type").to(STRING).get(), equalTo("set"));
assertThat(events.get(1).at("event").at("action").to(STRING).get(), equalTo("add"));
assertThat(events.get(2).at("type").to(STRING).get(), equalTo("set"));
assertThat(events.get(2).at("event").at("action").to(STRING).get(), equalTo("remove"));
}

@Test
public void streamEventsWithSnapshotData() throws Exception {
String collectionName = randomStartingWith("collection_");
Expand Down
31 changes: 0 additions & 31 deletions faunadb-scala/src/load/scala/faunadb/FaunaClientLoadSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -190,37 +190,6 @@ class FaunaClientLoadSpec extends FixtureAsyncWordSpec with Matchers with ScalaF
}
}

"fail to handle more than 100 concurrent streams on the same client" in { client =>
def setup(): Future[Result[Value]] = {
val collectionName = RandomGenerator.aRandomString
for {
_ <- client.query(CreateCollection(Obj("name" -> collectionName)))
createdDoc <- client.query(Create(Collection(collectionName), Obj("credentials" -> Obj("password" -> "abcdefg"))))
} yield createdDoc("ref")
}

def run(docRef: Value): Future[Unit] = {
val maxConcurrentStreamCount = 100
for {
// create a first publisher to setup the connection that will be reused by all the other publishers
_ <- client.stream(docRef)
// adding 99 publisher (makes 100 in total)
_ <- Future.traverse(List.fill(maxConcurrentStreamCount - 1)(docRef))(ref => client.stream(ref))
// adding 101st subscriber creates the error
_ <- client.stream(docRef)
} yield ()
}

val result =
for {
docRef <- setup()
result <- run(docRef)
} yield result

recoverToExceptionIf[BadRequestException](result).map { e =>
e.getMessage should include("the maximum number of streams has been reached for this client")
}
}
}

def testSubscriber(messageCount: Int, publisher: Flow.Publisher[Value]): Future[List[Value]] = {
Expand Down
1 change: 1 addition & 0 deletions faunadb-scala/src/main/scala/faunadb/FaunaClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ object FaunaClient {
case object PrevField extends EventField("prev")
case object DiffField extends EventField("diff")
case object ActionField extends EventField("action")
case object IndexField extends EventField("index")
}

/**
Expand Down
22 changes: 22 additions & 0 deletions faunadb-scala/src/test/scala/faunadb/ClientSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2081,6 +2081,28 @@ class ClientSpec
}
}

it should "stream set events" in {
client.query(CreateCollection(Obj("name" -> "foo"))).futureValue
val publisherValue = client.stream(Documents(Collection("foo"))).futureValue
val events = testSubscriber(3, publisherValue)

// push 3 updates
val doc = client.query(Create(Collection("foo"), Obj())).futureValue
client.query(Delete(doc("ref"))).futureValue

// assertion 3 events (start + 2 updates)
events.futureValue match {
case start :: e1 :: e2 :: Nil =>
start("type").get shouldBe StringV("start")
e1("type").get shouldBe StringV("set")
e1("event", "action").get shouldBe StringV("add")
e2("type").get shouldBe StringV("set")
e2("event", "action").get shouldBe StringV("remove")
case _ =>
fail("expected 3 events")
}
}

it should "stream on document reference with opt-in fields" in {
val createdDoc = client.query(Create(Collection("spells"), Obj("data" -> Obj("testField" -> "testValue0")))).futureValue
val docRef = createdDoc(RefField)
Expand Down

0 comments on commit b1114ef

Please sign in to comment.