Skip to content

Commit

Permalink
Fix dataset-by-id finder
Browse files Browse the repository at this point in the history
  • Loading branch information
eikek committed Sep 1, 2023
1 parent 5354368 commit b483c19
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,19 @@ object EventLog extends TypeSerializers {
session.prepare(query).flatMap(_.stream(projectId, 32).compile.toList)
}

def findSyncEvents(projectId: GitLabId)(implicit ioRuntime: IORuntime): List[CategoryName] = execute { session =>
val query: Query[projects.GitLabId, CategoryName] = sql"""
def findSyncEventsIO(projectId: GitLabId): IO[List[CategoryName]] =
sessionResource.flatMap(_.session).use { session =>
val query: Query[projects.GitLabId, CategoryName] = sql"""
SELECT category_name
FROM subscription_category_sync_time
WHERE project_id = $projectIdEncoder"""
.query(varchar)
.map(category => CategoryName(category))
session.prepare(query).flatMap(_.stream(projectId, 32).compile.toList)
}
.query(varchar)
.map(category => CategoryName(category))
session.prepare(query).flatMap(_.stream(projectId, 32).compile.toList)
}

def findSyncEvents(projectId: GitLabId)(implicit ioRuntime: IORuntime): List[CategoryName] =
findSyncEventsIO(projectId).unsafeRunSync()

def forceCategoryEventTriggering(categoryName: CategoryName, projectId: projects.GitLabId)(implicit
ioRuntime: IORuntime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import cats.effect.{IO, Resource, Temporal}
import cats.{Applicative, Monad}
import eu.timepit.refined.auto._
import io.renku.db.DBConfigProvider
import io.renku.graph.model.{RenkuUrl, projects}
import io.renku.projectauth.{ProjectAuthData, QueryFilter}
import io.renku.triplesgenerator.TgLockDB.SessionResource
import io.renku.triplesgenerator.{TgLockDB, TgLockDbConfigProvider}
import io.renku.triplesstore._
Expand Down Expand Up @@ -54,4 +56,10 @@ object TriplesStore extends InMemoryJena with ProjectsDataset with MigrationsDat
private def waitForReadiness(implicit logger: Logger[IO]): IO[Unit] =
Monad[IO].whileM_(IO(!isRunning))(logger.info("Waiting for TS") >> (Temporal[IO] sleep (500 millis)))

def findProjectAuth(
slug: projects.Slug
)(implicit renkuUrl: RenkuUrl, sqtr: SparqlQueryTimeRecorder[IO], L: Logger[IO]): IO[Option[ProjectAuthData]] =
ProjectSparqlClient[IO](projectsDSConnectionInfo)
.map(_.asProjectAuthService)
.use(_.getAll(QueryFilter.all.withSlug(slug)).compile.last)
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,27 @@ import cats.data.NonEmptyList
import cats.effect.IO
import cats.effect.unsafe.IORuntime
import fs2.Stream
import io.renku.eventlog.events.producers.membersync.{categoryName => memberSyncCategory}
import io.renku.eventlog.events.producers.minprojectinfo.{categoryName => minProjectInfoCategory}
import io.renku.events.CategoryName
import io.renku.graph.acceptancetests.data
import io.renku.graph.acceptancetests.db.EventLog
import io.renku.graph.acceptancetests.db.{EventLog, TriplesStore}
import io.renku.graph.acceptancetests.testing.AcceptanceTestPatience
import io.renku.graph.acceptancetests.tooling.EventLogClient.ProjectEvent
import io.renku.graph.acceptancetests.tooling.{AcceptanceSpec, ApplicationServices, ModelImplicits}
import io.renku.graph.model.events.{CommitId, EventId, EventStatus, EventStatusProgress}
import io.renku.graph.model.projects
import io.renku.http.client.AccessToken
import io.renku.logging.TestSparqlQueryTimeRecorder
import io.renku.testtools.IOSpec
import io.renku.triplesstore.SparqlQueryTimeRecorder
import io.renku.webhookservice.model.HookToken
import org.http4s.Status._
import org.scalatest.concurrent.Eventually
import org.scalatest.matchers.should
import org.scalatest.{Assertion, EitherValues}
import org.typelevel.log4cats.Logger

import java.lang.Thread.sleep
import scala.annotation.tailrec
import scala.concurrent.duration._

trait TSProvisioning
Expand Down Expand Up @@ -77,6 +79,8 @@ trait TSProvisioning
// commitId is the eventId
val condition = commitIds.map(e => EventId(e.value)).toList.map(_ -> EventStatus.TriplesStore)
waitForAllEvents(project.id, condition: _*)
waitForSyncEvents(project.id, memberSyncCategory)
waitForProjectAuthData(project.slug)
}

private def projectEvents(projectId: projects.GitLabId): Stream[IO, List[ProjectEvent]] = {
Expand Down Expand Up @@ -119,24 +123,55 @@ trait TSProvisioning
lastValue.forall(_ == EventStatusProgress.Stage.Final) shouldBe true
}

def `check hook cannot be found`(projectId: projects.GitLabId, accessToken: AccessToken): Assertion = eventually {
webhookServiceClient.`GET projects/:id/events/status`(projectId, accessToken).status shouldBe NotFound
def getSyncEvents(projectId: projects.GitLabId) = {
val getSyncEvents = EventLog.findSyncEventsIO(projectId)

val waitTimes = Stream.iterate(1d)(_ * 1.5).map(_.seconds).covary[IO].evalMap(IO.sleep)
Stream
.repeatEval(getSyncEvents)
.zip(waitTimes)
.map(_._1)
}

def `wait for the Fast Tract event`(projectId: projects.GitLabId)(implicit ioRuntime: IORuntime): Unit = eventually {
def waitForSyncEvents(projectId: projects.GitLabId, category1: CategoryName, categoryN: CategoryName*) = {
val expected = categoryN.toSet + category1

val sleepTime = 1 second
val tries =
getSyncEvents(projectId)
.evalTap(l => IO.println(s"Sync events for project $projectId: $l"))
.takeThrough(evs => expected.intersect(evs.toSet) != expected)
.take(13)

@tailrec
def checkIfWasSent(categoryName: CategoryName, attempt: Int = 1): Unit = {
if (attempt > 20) fail(s"'$categoryName' event wasn't sent after ${(sleepTime * attempt).toSeconds}")
val lastValue = tries.compile.lastOrError.unsafeRunSync()
expected.intersect(lastValue.toSet) shouldBe expected
}

if (!EventLog.findSyncEvents(projectId).contains(categoryName)) {
sleep(sleepTime.toMillis)
checkIfWasSent(categoryName)
}
}
def getProjectAuthData(slug: projects.Slug) = {
implicit val sqtr: SparqlQueryTimeRecorder[IO] = TestSparqlQueryTimeRecorder.createUnsafe
val waitTimes = Stream.iterate(1d)(_ * 1.5).map(_.seconds).covary[IO].evalMap(IO.sleep)

Stream
.repeatEval(TriplesStore.findProjectAuth(slug))
.zip(waitTimes)
.map(_._1)
}

def waitForProjectAuthData(slug: projects.Slug) = {
val tries =
getProjectAuthData(slug)
.evalTap(r => IO.println(s"project auth data for $slug: $r"))
.takeThrough(_.isEmpty)
.take(15)

checkIfWasSent(CategoryName("ADD_MIN_PROJECT_INFO"))
val lastValue = tries.compile.lastOrError.unsafeRunSync()
lastValue.isDefined shouldBe true
}

def `check hook cannot be found`(projectId: projects.GitLabId, accessToken: AccessToken): Assertion = eventually {
webhookServiceClient.`GET projects/:id/events/status`(projectId, accessToken).status shouldBe NotFound
}

def `wait for the Fast Tract event`(projectId: projects.GitLabId) =
waitForSyncEvents(projectId, minProjectInfoCategory)

}
Original file line number Diff line number Diff line change
Expand Up @@ -55,25 +55,26 @@ class DatasetsResourcesSpec
private val creator = authUsers.generateOne
private val user = authUsers.generateOne

Feature("GET knowledge-graph/projects/<namespace>/<name>/datasets to find project's datasets") {
import scala.concurrent.duration._

val (dataset1 -> dataset2 -> dataset2Modified, testProject) =
renkuProjectEntities(visibilityPublic, creatorGen = cliShapedPersons)
.modify(removeMembers())
.addDataset(datasetEntities(provenanceInternal(cliShapedPersons)))
.addDatasetAndModification(
datasetEntities(provenanceInternal(cliShapedPersons)),
creatorGen = cliShapedPersons
)
.generateOne
val creatorPerson = cliShapedPersons.generateOne
val project =
dataProjects(testProject)
.map(replaceCreatorFrom(creatorPerson, creator.id))
.map(addMemberFrom(creatorPerson, creator.id) >>> addMemberWithId(user.id))
.generateOne
Feature("GET knowledge-graph/projects/<namespace>/<name>/datasets to find project's datasets") {

Scenario("As a user I would like to find project's datasets by calling a REST endpoint") {
val (dataset1 -> dataset2 -> dataset2Modified, testProject) =
renkuProjectEntities(visibilityPublic, creatorGen = cliShapedPersons)
.modify(removeMembers())
.addDataset(datasetEntities(provenanceInternal(cliShapedPersons)))
.addDatasetAndModification(
datasetEntities(provenanceInternal(cliShapedPersons)),
creatorGen = cliShapedPersons
)
.generateOne
val creatorPerson = cliShapedPersons.generateOne
val project =
dataProjects(testProject)
.map(replaceCreatorFrom(creatorPerson, creator.id))
.map(addMemberFrom(creatorPerson, creator.id) >>> addMemberWithId(user.id))
.generateOne

Given("some data in the Triples Store")
gitLabStub.addAuthenticated(creator)
Expand All @@ -82,6 +83,7 @@ class DatasetsResourcesSpec
gitLabStub.setupProject(project, commitId)
mockCommitDataOnTripleGenerator(project, toPayloadJsonLD(project), commitId)
`data in the Triples Store`(project, commitId, creator.accessToken)
cats.effect.IO.sleep(15.seconds).unsafeRunSync()

When("user fetches project's datasets with GET knowledge-graph/projects/<project-name>/datasets")
val projectDatasetsResponse = knowledgeGraphClient GET s"knowledge-graph/projects/${project.slug}/datasets"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,16 @@ object DatasetIdRecordsFinder {
| where {
| bind (${id.value} as ?dsIdent).
| {
| select distinct ?datasetId where {
| select distinct ?projectId where {
| bind (${id.value} as ?dsIdent).
| graph ?sampleProjectId {
| ?sampleProjectId a schema:Project;
| graph ?projectId {
| ?projectId a schema:Project;
| renku:hasDataset ?datasetId.
| ?datasetId a schema:Dataset;
| schema:identifier ?dsIdent.
| }
| }
| }
| graph schema:Dataset {
| ?topmost renku:datasetProjectLink ?link.
| ?link renku:project ?projectId.
| ?link renku:dataset ?datasetId.
| }
| graph renku:ProjectAuth {
| ?projectId a schema:Project;
| renku:slug ?slug;
Expand Down

0 comments on commit b483c19

Please sign in to comment.