Skip to content

Commit

Permalink
chore: Add query time measurements to sparql client (#1670)
Browse files Browse the repository at this point in the history
* Record execution times for new sparql client
  • Loading branch information
eikek authored Aug 25, 2023
1 parent de5cf8d commit c032724
Show file tree
Hide file tree
Showing 20 changed files with 248 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ abstract class ExecutionTimeRecorder[F[_]](threshold: ElapsedTime) {
maybeHistogramLabel: Option[String Refined NonEmpty] = None
): F[(ElapsedTime, A)]

def measureAndLogTime[A](condition: PartialFunction[A, String])(
def measureAndLogTime[A](message: PartialFunction[A, String])(
block: F[A]
)(implicit F: Monad[F], L: Logger[F]): F[A] =
measureExecutionTime(block).flatMap(logExecutionTimeWhen(condition))
measureExecutionTime(block).flatMap(logExecutionTimeWhen(message))

def logExecutionTimeWhen[A](
condition: PartialFunction[A, String]
message: PartialFunction[A, String]
)(implicit F: Applicative[F], L: Logger[F]): ((ElapsedTime, A)) => F[A] = { resultAndTime =>
logWarningIfAboveThreshold(resultAndTime, condition.lift).as(resultAndTime._2)
logWarningIfAboveThreshold(resultAndTime, message.lift).as(resultAndTime._2)
}

def logExecutionTime[A](
Expand All @@ -58,10 +58,10 @@ abstract class ExecutionTimeRecorder[F[_]](threshold: ElapsedTime) {

private def logWarningIfAboveThreshold[A](
resultAndTime: (ElapsedTime, A),
condition: A => Option[String]
withMessage: A => Option[String]
)(implicit F: Applicative[F], L: Logger[F]): F[Unit] = {
val (elapsedTime, result) = resultAndTime
condition(result)
withMessage(result)
.filter(_ => elapsedTime >= threshold)
.map(message => L.warn(s"$message in ${elapsedTime}ms"))
.getOrElse(F.unit)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2023 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.triplesstore

import cats.Monad
import cats.effect._
import cats.syntax.all._
import eu.timepit.refined.api.Refined
import eu.timepit.refined.collection.NonEmpty
import eu.timepit.refined.auto._
import fs2.io.net.Network
import io.renku.jsonld.JsonLD
import io.renku.triplesstore.client.http.{Retry, SparqlClient, SparqlQuery, SparqlUpdate}
import org.typelevel.log4cats.Logger

/** SPARQL client fixed to the `projects` dataset.
  *
  * Declares no members of its own; it only gives a [[SparqlClient]] a distinct type.
  */
trait ProjectSparqlClient[F[_]] extends SparqlClient[F]

object ProjectSparqlClient {

  /** Wraps the given client so that every `update`, `upload` and `query` call is
    * timed with the implicit [[SparqlQueryTimeRecorder]] and its elapsed time is logged.
    *
    * @param c the client all calls are delegated to
    * @return a [[ProjectSparqlClient]] delegating to `c`
    */
  def apply[F[_]: Monad: Logger: SparqlQueryTimeRecorder](c: SparqlClient[F]): ProjectSparqlClient[F] =
    new ProjectSparqlClient[F] {
      private[this] val rec = SparqlQueryTimeRecorder[F].instance

      override def update(request: SparqlUpdate) = {
        // The label is only defined for named requests, see `histogramLabel`.
        val label = histogramLabel(request)
        rec
          .measureExecutionTime(c.update(request), label)
          .flatMap(rec.logExecutionTime(logMessage("Execute sparql update", label)))
      }

      override def upload(data: JsonLD) = {
        // Uploads carry no name of their own, so a fixed label is used.
        val label: String Refined NonEmpty = "jsonld upload"
        rec
          .measureExecutionTime(c.upload(data), label.some)
          .flatMap(rec.logExecutionTime("Execute JSONLD upload"))
      }

      override def query(request: SparqlQuery) = {
        val label = histogramLabel(request)
        rec
          .measureExecutionTime(c.query(request), label)
          .flatMap(rec.logExecutionTime(logMessage("Execute sparql query", label)))
      }
    }

  /** Creates a [[SparqlClient]] from the given connection config and wraps it
    * with the time-measuring [[ProjectSparqlClient]].
    *
    * @param cc       connection settings, converted with `toCC`
    * @param retryCfg retry settings passed on to the connection config
    */
  def apply[F[_]: Network: Async: Logger: SparqlQueryTimeRecorder](
      cc:       ProjectsConnectionConfig,
      retryCfg: Retry.RetryConfig = Retry.RetryConfig.default
  ): Resource[F, ProjectSparqlClient[F]] = {
    val cfg = cc.toCC(Some(retryCfg))
    SparqlClient[F](cfg).map(apply(_))
  }

  /** Returns the name of the request when it is an `io.renku.triplesstore.SparqlQuery`;
    * any other request yields no label.
    */
  private def histogramLabel(r: Any): Option[String Refined NonEmpty] =
    r match {
      // Fully qualified: the unqualified `SparqlQuery` in this file is the client one.
      case q: io.renku.triplesstore.SparqlQuery => q.name.some
      case _ => None
    }

  /** Builds the log message for an action. The label is rendered by its underlying
    * string value, so that neither `Some(...)` nor `None` leaks into the log line;
    * without a label only the action is logged.
    */
  private def logMessage(action: String, label: Option[String Refined NonEmpty]): String =
    label.fold(action)(name => s"$action '${name.value}'")
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,22 @@ import io.renku.jsonld.Schema
import io.renku.tinytypes.StringTinyType
import io.renku.triplesstore.SparqlQuery.Prefix
import io.renku.triplesstore.client.sparql.Fragment
import io.renku.triplesstore.client.http.{SparqlQuery => ClientSparqlQuery, SparqlUpdate => ClientSparqlUpdate}

final case class SparqlQuery(name: String Refined NonEmpty,
prefixes: Set[Prefix],
body: String,
maybePagingRequest: Option[PagingRequest]
) {
) extends ClientSparqlQuery
with ClientSparqlUpdate {

override lazy val toString: String =
s"""|${prefixes.mkString("", "\n", "")}
|$body
|$pagingRequest""".stripMargin.trim

override lazy val render: String = toString

private lazy val pagingRequest =
maybePagingRequest
.map { pagingRequest =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ object SparqlQueryTimeRecorder {

import io.renku.metrics.MetricsRegistry

def apply[F[_]: Sync: Logger: MetricsRegistry](): F[SparqlQueryTimeRecorder[F]] = MetricsRegistry[F]
def create[F[_]: Sync: Logger: MetricsRegistry](): F[SparqlQueryTimeRecorder[F]] = MetricsRegistry[F]
.register {
new LabeledHistogramImpl[F](
name = "sparql_execution_times",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ import org.typelevel.log4cats.Logger
object TestSparqlQueryTimeRecorder {
def apply[F[_]: Sync: Logger]: F[SparqlQueryTimeRecorder[F]] = {
implicit val metricsRegistry: MetricsRegistry[F] = TestMetricsRegistry[F]
SparqlQueryTimeRecorder[F]()
SparqlQueryTimeRecorder.create[F]()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright 2023 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.triplesstore

import cats.effect.IO
import cats.effect.testing.scalatest.AsyncIOSpec
import eu.timepit.refined.auto._
import io.prometheus.client.Histogram
import io.renku.cli.model.CliSoftwareAgent
import io.renku.graph.model.agents
import io.renku.interpreters.TestLogger
import io.renku.jsonld.syntax._
import io.renku.logging.TestExecutionTimeRecorder
import io.renku.triplesstore.client.syntax._
import io.renku.triplesstore.client.util.JenaContainerSupport
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should
import org.typelevel.log4cats.Logger

/** Checks, against a Jena container, when [[ProjectSparqlClient]] puts samples
  * into the histogram given to its [[SparqlQueryTimeRecorder]].
  */
class ProjectSparqlClientSpec extends AsyncFlatSpec with AsyncIOSpec with JenaContainerSupport with should.Matchers {
  implicit val logger: Logger[IO] = TestLogger()

  // A fresh histogram is built on every evaluation, so tests do not share samples.
  val makeHistogram = IO(
    new Histogram.Builder().name("test").help("test").labelNames("update").buckets(0.5, 0.8).create()
  )

  def makeSparqlQueryTimeRecorder(h: Histogram): SparqlQueryTimeRecorder[IO] =
    new SparqlQueryTimeRecorder[IO](TestExecutionTimeRecorder[IO](Some(h)))

  def withProjectClient(implicit sqr: SparqlQueryTimeRecorder[IO]) =
    withDataset("projects").map(ProjectSparqlClient.apply(_))

  def assertSampled(histogram: Histogram) =
    histogram.collect().get(0).samples.size should be > 0

  def assertNotSampled(histogram: Histogram) =
    histogram.collect().get(0).samples.size shouldBe 0

  def resetHistogram(histogram: Histogram) = {
    histogram.clear()
    assertNotSampled(histogram)
  }

  // Update statement shared by both tests: once as the body of a named query,
  // once sent as a plain, un-named update.
  private val insertFamilyData =
    sparql"""
            |PREFIX p: <http://schema.org/>
            |INSERT DATA {
            |  p:fred p:hasSpouse p:wilma .
            |  p:fred p:hasChild p:pebbles .
            |  p:wilma p:hasChild p:pebbles .
            |  p:pebbles p:hasSpouse p:bamm-bamm ;
            |    p:hasChild p:roxy, p:chip.
            |}""".stripMargin

  it should "measure execution time for named queries" in {
    // The histogram is created as part of the test's IO rather than via unsafeRunSync.
    makeHistogram.flatMap { histogram =>
      implicit val sr: SparqlQueryTimeRecorder[IO] = makeSparqlQueryTimeRecorder(histogram)

      withProjectClient.use { c =>
        for {
          _ <- IO(assertNotSampled(histogram))
          up = SparqlQuery.apply(
                 name = "test-update",
                 prefixes = Set.empty,
                 body = insertFamilyData
               )
          _ <- c.update(up)
          _ <- IO(assertSampled(histogram))

          _ <- IO(resetHistogram(histogram))
          q = SparqlQuery(
                name = "test-query",
                prefixes = Set.empty,
                body = sparql"SELECT * WHERE { ?s ?p ?o } LIMIT 1"
              )
          _ <- c.query(q)
          _ <- IO(assertSampled(histogram))

          _ <- IO(resetHistogram(histogram))
          data = CliSoftwareAgent(agents.ResourceId("http://u.rl"), agents.Name("test")).asJsonLD
          _ <- c.upload(data)
          _ <- IO(assertSampled(histogram))
        } yield ()
      }
    }
  }

  it should "not measure execution time for un-named queries" in {
    makeHistogram.flatMap { histogram =>
      implicit val sr: SparqlQueryTimeRecorder[IO] = makeSparqlQueryTimeRecorder(histogram)

      withProjectClient.use { c =>
        for {
          _ <- IO(assertNotSampled(histogram))
          _ <- c.update(insertFamilyData)
          _ <- IO(assertNotSampled(histogram))
        } yield ()
      }
    }
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ object Microservice extends IOMicroservice {

override def run(args: List[String]): IO[ExitCode] = for {
implicit0(mr: MetricsRegistry[IO]) <- MetricsRegistry[IO]()
implicit0(sqtr: SparqlQueryTimeRecorder[IO]) <- SparqlQueryTimeRecorder[IO]()
implicit0(sqtr: SparqlQueryTimeRecorder[IO]) <- SparqlQueryTimeRecorder.create[IO]()
projectConnConfig <- ProjectsConnectionConfig[IO]()
certificateLoader <- CertificateLoader[IO]
sentryInitializer <- SentryInitializer[IO]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ import io.renku.generators.Generators.Implicits._
import io.renku.graph.model.RenkuUrl
import io.renku.graph.model.persons.GitLabId
import io.renku.graph.model.projects.{Role, Visibility}
import io.renku.triplesstore.client.util.JenaContainerSpec
import io.renku.triplesstore.client.util.JenaContainerSupport
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

class ProjectAuthServiceSpec extends AsyncFlatSpec with AsyncIOSpec with JenaContainerSpec with should.Matchers {
class ProjectAuthServiceSpec extends AsyncFlatSpec with AsyncIOSpec with JenaContainerSupport with should.Matchers {
implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO]
implicit val renkuUrl: RenkuUrl = RenkuUrl("http://localhost/renku")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import io.renku.triplesgenerator.events.consumers.tsmigrationrequest.migrations.
import io.renku.triplesgenerator.events.consumers.tsprovisioning.{minprojectinfo, triplesgenerated}
import io.renku.triplesgenerator.init.{CliVersionCompatibilityChecker, CliVersionCompatibilityVerifier}
import io.renku.triplesgenerator.metrics.MetricsService
import io.renku.triplesstore.{ProjectsConnectionConfig, SparqlQueryTimeRecorder}
import io.renku.triplesstore.{ProjectSparqlClient, ProjectsConnectionConfig, SparqlQueryTimeRecorder}
import natchez.Trace.Implicits.noop
import org.http4s.server.Server
import org.typelevel.log4cats.Logger
Expand All @@ -68,25 +68,27 @@ object Microservice extends IOMicroservice {
dbSessionPool <- Resource
.eval(new TgLockDbConfigProvider[IO].map(SessionPoolResource[IO, TgLockDB]))
.flatMap(identity)
implicit0(mr: MetricsRegistry[IO]) <- Resource.eval(MetricsRegistry[IO]())
implicit0(sqtr: SparqlQueryTimeRecorder[IO]) <- Resource.eval(SparqlQueryTimeRecorder.create[IO]())

projectConnConfig <- Resource.eval(ProjectsConnectionConfig[IO](config))
projectsSparql <- ProjectSparqlClient[IO](projectConnConfig)
} yield (config, dbSessionPool, projectsSparql)
} yield (config, dbSessionPool, projectsSparql, mr, sqtr)

resources.use { case (config, dbSessionPool, projectSparqlClient) =>
doRun(config, dbSessionPool, projectSparqlClient)
resources.use { case (config, dbSessionPool, projectSparqlClient, mr, sqtr) =>
doRun(config, dbSessionPool, projectSparqlClient)(mr, sqtr)
}
}

private def doRun(
config: Config,
dbSessionPool: SessionResource[IO, TgLockDB],
projectSparqlClient: ProjectSparqlClient[IO]
): IO[ExitCode] = for {
implicit0(mr: MetricsRegistry[IO]) <- MetricsRegistry[IO]()
implicit0(sqtr: SparqlQueryTimeRecorder[IO]) <- SparqlQueryTimeRecorder[IO]()
implicit0(gc: GitLabClient[IO]) <- GitLabClient[IO]()
implicit0(acf: AccessTokenFinder[IO]) <- AccessTokenFinder[IO]()
implicit0(rp: ReProvisioningStatus[IO]) <- ReProvisioningStatus[IO]()
)(implicit mr: MetricsRegistry[IO], sqtr: SparqlQueryTimeRecorder[IO]): IO[ExitCode] = for {

implicit0(gc: GitLabClient[IO]) <- GitLabClient[IO]()
implicit0(acf: AccessTokenFinder[IO]) <- AccessTokenFinder[IO]()
implicit0(rp: ReProvisioningStatus[IO]) <- ReProvisioningStatus[IO]()

_ <- TgLockDB.migrate[IO](dbSessionPool, 20.seconds)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import fs2.io.net.Network
import io.renku.graph.model.RenkuUrl
import io.renku.graph.model.projects.{Slug, Visibility}
import io.renku.projectauth.{ProjectAuthData, ProjectAuthService, ProjectMember}
import io.renku.triplesstore.ProjectsConnectionConfig
import io.renku.triplesstore.{ProjectSparqlClient, ProjectsConnectionConfig, SparqlQueryTimeRecorder}
import io.renku.triplesstore.client.http.{RowDecoder, SparqlClient}
import io.renku.triplesstore.client.syntax._
import org.typelevel.log4cats.Logger
Expand All @@ -37,7 +37,9 @@ trait ProjectAuthSync[F[_]] {

object ProjectAuthSync {

def resource[F[_]: Async: Logger: Network](cc: ProjectsConnectionConfig)(implicit renkuUrl: RenkuUrl) =
def resource[F[_]: Async: Logger: Network: SparqlQueryTimeRecorder](cc: ProjectsConnectionConfig)(implicit
renkuUrl: RenkuUrl
) =
ProjectSparqlClient[F](cc).map(apply[F])

def apply[F[_]: Sync](
Expand Down
Loading

0 comments on commit c032724

Please sign in to comment.