Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregate state strategy #67

Merged
merged 30 commits into from
Jul 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ scala:
- 2.12.4

jdk:
- oraclejdk8
- openjdk8

cache:
directories:
- .hydra
- $HOME/.m2
- $HOME/.ivy2/cache
- $HOME/.sbt/boot
- $HOME/.sbt
- $HOME/.coursier
- $HOME/.cache

script:
- sbt ++$TRAVIS_SCALA_VERSION clean validate
# See http://www.scala-sbt.org/0.13/docs/Travis-CI-with-sbt.html
# Tricks to avoid unnecessary cache updates
- find $HOME/.sbt -name "*.lock" | xargs rm
- find $HOME/.ivy2 -name "ivydata-*.properties" | xargs rm
- sbt ++$TRAVIS_SCALA_VERSION! clean validate
16 changes: 7 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ Other stuff like state recovery and event persistence is held by Akka Persistenc
So lets define `SubscriptionActions`

```scala
import cats.implicits._ // needed for syntax like flatMap and unit
import cats.implicits._

final class SubscriptionActions[F[_]](
implicit F: MonadAction[F, Option[SubscriptionState], SubscriptionEvent]
Expand All @@ -149,10 +149,9 @@ final class SubscriptionActions[F[_]](
def createSubscription(userId: String, productId: String, planId: String): F[Unit] =
read.flatMap {
case Some(subscription) =>
// Do nothing reply with ()
unit
ignore
case None =>
// Produce event and reply with ()
// Produce event
append(SubscriptionCreated(userId, productId, planId))
}

Expand All @@ -161,23 +160,23 @@ final class SubscriptionActions[F[_]](
case Some(subscription) if subscription.status == Active =>
append(SubscriptionPaused)
case _ =>
unit
ignore
}

def resumeSubscription: F[Unit] =
read.flatMap {
case Some(subscription) if subscription.status == Paused =>
append(SubscriptionResumed)
case _ =>
unit
ignore
}

def cancelSubscription: F[Unit] =
read.flatMap {
case Some(subscription) if subscription.canCancel =>
append(SubscriptionCancelled)
case _ =>
unit
ignore
}
}
```
Expand All @@ -198,8 +197,7 @@ val runtime = AkkaPersistenceRuntime(system, journalAdapter)
val behavior: EventsourcedBehavior[Subscription, IO, Option[SubscriptionState], SubscriptionEvent]
EventsourcedBehavior.optional(
new SubscriptionActions,
SubscriptionState.create,
_.update(_)
Fold.optional(SubscriptionState.create)(_.update(_))
)

val deploySubscriptions: IO[SubscriptionId => Subscription[IO]] =
Expand Down
22 changes: 15 additions & 7 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ lazy val cassandraDriverExtrasVersion = "3.1.0"
lazy val jsr305Version = "3.0.1"
lazy val boopickleVersion = "1.3.0"
lazy val monocleVersion = "1.5.1-cats"
lazy val fs2Version = "1.0.0"
lazy val fs2Version = "1.0.4"
lazy val log4catsVersion = "0.2.0-M1"

lazy val scalaCheckVersion = "1.13.4"
lazy val scalaTestVersion = "3.0.5"
lazy val scalaCheckShapelessVersion = "1.1.8"
lazy val shapelessVersion = "2.3.3"
lazy val kindProjectorVersion = "0.9.9"
lazy val betterMonadicForVersion = "0.3.0-M4"
lazy val scalametaVersion = "1.8.0"

// Example dependencies
Expand All @@ -43,16 +44,17 @@ lazy val commonSettings = Seq(
resolvers += "jitpack" at "https://jitpack.io",
scalacOptions ++= commonScalacOptions,
addCompilerPlugin("org.spire-math" %% "kind-projector" % kindProjectorVersion),
addCompilerPlugin("com.olegpy" %% "better-monadic-for" % betterMonadicForVersion),
parallelExecution in Test := false,
scalacOptions in (Compile, doc) := (scalacOptions in (Compile, doc)).value
.filter(_ != "-Xfatal-warnings"),
) ++ warnUnusedImport

lazy val macroSettings = Seq(
scalacOptions += "-Xplugin-require:macroparadise",
scalacOptions += "-Xplugin-require:macroparadise",
addCompilerPlugin(
"org.scalameta" % "paradise" % scalametaParadiseVersion cross CrossVersion.full
),
),
sources in (Compile, doc) := Nil // macroparadise doesn't work with scaladoc yet.
)

Expand Down Expand Up @@ -143,7 +145,8 @@ lazy val benchmarks = aecorModule("benchmarks", "Aecor Benchmarks")

lazy val coreSettings = Seq(
libraryDependencies ++= Seq(
"org.typelevel" %% "cats-tagless-macros" % catsTaglessVersion,
"co.fs2" %% "fs2-core" % fs2Version,
"org.typelevel" %% "cats-tagless-core" % catsTaglessVersion,
"com.chuusai" %% "shapeless" % shapelessVersion,
"org.typelevel" %% "cats-core" % catsVersion,
"org.typelevel" %% "cats-effect" % catsEffectVersion,
Expand All @@ -161,6 +164,7 @@ lazy val boopickleWireProtocolSettings = Seq(

lazy val scheduleSettings = commonProtobufSettings ++ Seq(
libraryDependencies ++= Seq(
"org.typelevel" %% "cats-tagless-macros" % catsTaglessVersion,
"com.datastax.cassandra" % "cassandra-driver-extras" % cassandraDriverExtrasVersion,
"com.google.code.findbugs" % "jsr305" % jsr305Version % Compile
)
Expand All @@ -172,7 +176,7 @@ lazy val distributedProcessingSettings = commonProtobufSettings ++ Seq(

lazy val akkaPersistenceSettings = commonProtobufSettings ++ Seq(
libraryDependencies ++= Seq(
"co.fs2" %% "fs2-core" % fs2Version,
"org.typelevel" %% "cats-tagless-macros" % catsTaglessVersion,
"com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion,
"com.typesafe.akka" %% "akka-persistence" % akkaVersion,
"com.typesafe.akka" %% "akka-persistence-query" % akkaVersion,
Expand All @@ -181,7 +185,10 @@ lazy val akkaPersistenceSettings = commonProtobufSettings ++ Seq(
)

lazy val akkaGenericSettings = commonProtobufSettings ++ Seq(
libraryDependencies ++= Seq("com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion)
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion,
"org.typelevel" %% "cats-tagless-macros" % catsTaglessVersion
)
)

lazy val exampleSettings =
Expand All @@ -190,8 +197,8 @@ lazy val exampleSettings =
resolvers += "krasserm at bintray" at "http://dl.bintray.com/krasserm/maven",
libraryDependencies ++=
Seq(
"org.typelevel" %% "cats-tagless-macros" % catsTaglessVersion,
"com.github.krasserm" %% "streamz-converter" % "0.10-M2",
"co.fs2" %% "fs2-core" % "1.0.0",
"org.typelevel" %% "cats-mtl-core" % catsMTLVersion,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"org.http4s" %% "http4s-dsl" % http4sVersion,
Expand All @@ -215,6 +222,7 @@ lazy val testKitSettings = Seq(

lazy val testingSettings = Seq(
libraryDependencies ++= Seq(
"org.typelevel" %% "cats-tagless-macros" % catsTaglessVersion,
"io.circe" %% "circe-core" % circeVersion,
"io.circe" %% "circe-generic" % circeVersion,
"io.circe" %% "circe-parser" % circeVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ class AkkaPersistenceRuntime[O] private[akkapersistence] (system: ActorSystem,
val props =
AkkaPersistenceRuntimeActor.props(
typeName,
behavior.actions,
behavior.create,
behavior.update,
behavior,
snapshotPolicy,
tagging,
settings.idleTimeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ private[akkapersistence] object AkkaPersistenceRuntimeActor {

def props[M[_[_]], F[_]: Effect, I: KeyDecoder, State, Event: PersistentEncoder: PersistentDecoder](
entityName: String,
actions: M[ActionT[F, State, Event, ?]],
initialState: State,
updateState: (State, Event) => Folded[State],
behavior: EventsourcedBehavior[M, F, State, Event],
snapshotPolicy: SnapshotPolicy[State],
tagging: Tagging[I],
idleTimeout: FiniteDuration,
Expand All @@ -51,9 +49,7 @@ private[akkapersistence] object AkkaPersistenceRuntimeActor {
Props(
new AkkaPersistenceRuntimeActor(
entityName,
actions,
initialState,
updateState,
behavior,
snapshotPolicy,
tagging,
idleTimeout,
Expand All @@ -78,9 +74,7 @@ private[akkapersistence] object AkkaPersistenceRuntimeActor {
*/
private[akkapersistence] final class AkkaPersistenceRuntimeActor[M[_[_]], F[_], I: KeyDecoder, State, Event: PersistentEncoder: PersistentDecoder](
entityName: String,
actions: M[ActionT[F, State, Event, ?]],
initialState: State,
updateState: (State, Event) => Folded[State],
behavior: EventsourcedBehavior[M, F, State, Event],
snapshotPolicy: SnapshotPolicy[State],
tagger: Tagging[I],
idleTimeout: FiniteDuration,
Expand Down Expand Up @@ -113,7 +107,7 @@ private[akkapersistence] final class AkkaPersistenceRuntimeActor[M[_[_]], F[_],

log.info("[{}] Starting...", persistenceId)

private var state: State = initialState
private var state: State = behavior.fold.initial

private var eventCount = 0L

Expand Down Expand Up @@ -178,7 +172,12 @@ private[akkapersistence] final class AkkaPersistenceRuntimeActor[M[_[_]], F[_],
M.decoder
.decodeValue(commandBytes) match {
case Attempt.Successful(pair) =>
log.debug("[{}] [{}] Received invocation [{}]", self.path, persistenceId, pair.first.toString)
log.debug(
"[{}] [{}] Received invocation [{}]",
self.path,
persistenceId,
pair.first.toString
)
performInvocation(pair.first, pair.second)
case Attempt.Failure(cause) =>
val decodingError = new IllegalArgumentException(cause.messageWithContext)
Expand All @@ -188,9 +187,8 @@ private[akkapersistence] final class AkkaPersistenceRuntimeActor[M[_[_]], F[_],

def performInvocation[A](invocation: Invocation[M, A], resultEncoder: Encoder[A]): Unit = {
val opId = UUID.randomUUID()
invocation
.run(actions)
.run(state, updateState)
behavior
.run(state, invocation)
.flatMap {
case Next((events, result)) =>
F.delay(
Expand Down Expand Up @@ -261,7 +259,8 @@ private[akkapersistence] final class AkkaPersistenceRuntimeActor[M[_[_]], F[_],
}

private def applyEvent(event: Event): Unit =
state = updateState(state, event)
state = behavior.fold
.reduce(state, event)
.getOrElse {
val error = new IllegalStateException(s"Illegal state after applying [$event] to [$state]")
log.error(error, error.getMessage)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import akka.event.Logging
import akka.persistence.cassandra.Session.Init
import akka.persistence.cassandra.session.CassandraSessionSettings
import akka.persistence.cassandra.session.scaladsl.CassandraSession
import cats.effect.Effect
import cats.effect.{ ContextShift, Effect }
import cats.implicits._

object DefaultJournalCassandraSession {
Expand All @@ -15,13 +15,18 @@ object DefaultJournalCassandraSession {
* Creates CassandraSession using settings of default cassandra journal.
*
*/
def apply[F[_]](system: ActorSystem, metricsCategory: String, init: Init[F])(
implicit F: Effect[F]
): F[CassandraSession] = F.delay {
def apply[F[_]: ContextShift](
system: ActorSystem,
metricsCategory: String,
init: Init[F],
sessionProvider: Option[SessionProvider] = None
)(implicit F: Effect[F]): F[CassandraSession] = F.delay {
val log = Logging(system, classOf[CassandraSession])
val provider = SessionProvider(
system.asInstanceOf[ExtendedActorSystem],
system.settings.config.getConfig("cassandra-journal")
val provider = sessionProvider.getOrElse(
SessionProvider(
system.asInstanceOf[ExtendedActorSystem],
system.settings.config.getConfig("cassandra-journal")
)
)
val settings = CassandraSessionSettings(system.settings.config.getConfig("cassandra-journal"))
new CassandraSession(system, provider, settings, system.dispatcher, log, metricsCategory, { x =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package akka.persistence.cassandra
import java.util.concurrent.Executor

import cats.data.Kleisli
import cats.effect.Async
import cats.effect.{ Async, ContextShift }
import com.datastax.driver.core.{ ResultSet, TypeCodec, Session => DatastaxSession }

import scala.concurrent.ExecutionContext
import scala.util.control.NonFatal

trait Session[F[_]] {
Expand All @@ -14,26 +15,32 @@ trait Session[F[_]] {

object Session {
type Init[F[_]] = Kleisli[F, Session[F], Unit]
def Init[F[_]](f: Session[F] => F[Unit]): Init[F] = Kleisli(f)
private val immediateExecutor = new Executor {
override def execute(command: Runnable): Unit =
command.run()
}

def apply[F[_]](datastaxSession: DatastaxSession)(implicit F: Async[F]): Session[F] =
private val immediateExecutionContext = ExecutionContext.fromExecutor(immediateExecutor)

def apply[F[_]](datastaxSession: DatastaxSession)(implicit F: Async[F],
contextShift: ContextShift[F]): Session[F] =
new Session[F] {
final override def execute(query: String): F[ResultSet] =
F.async { cb =>
val future = datastaxSession.executeAsync(query)
val runnable = new Runnable {
override def run(): Unit =
try {
cb(Right(future.get()))
} catch {
case NonFatal(e) =>
cb(Left(e))
}
contextShift.evalOn(immediateExecutionContext) {
F.async { cb =>
val future = datastaxSession.executeAsync(query)
val runnable = new Runnable {
override def run(): Unit =
try {
cb(Right(future.get()))
} catch {
case NonFatal(e) =>
cb(Left(e))
}
}
future.addListener(runnable, immediateExecutor)
}
future.addListener(runnable, immediateExecutor)
}
override def registerCodec[A](codec: TypeCodec[A]): F[Unit] =
F.delay {
Expand Down
Loading