Skip to content

Commit

Permalink
Aggregate state strategy (#67)
Browse files Browse the repository at this point in the history
  • Loading branch information
notxcain committed Jul 10, 2019
1 parent 59e111f commit 0da51af
Show file tree
Hide file tree
Showing 45 changed files with 631 additions and 592 deletions.
14 changes: 7 additions & 7 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ scala:
- 2.12.4

jdk:
- oraclejdk8
- openjdk8

cache:
directories:
- .hydra
- $HOME/.m2
- $HOME/.ivy2/cache
- $HOME/.sbt/boot
- $HOME/.sbt
- $HOME/.coursier
- $HOME/.cache

script:
- sbt ++$TRAVIS_SCALA_VERSION clean validate
# See http://www.scala-sbt.org/0.13/docs/Travis-CI-with-sbt.html
# Tricks to avoid unnecessary cache updates
- find $HOME/.sbt -name "*.lock" | xargs rm
- find $HOME/.ivy2 -name "ivydata-*.properties" | xargs rm
- sbt ++$TRAVIS_SCALA_VERSION! clean validate
16 changes: 7 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ Other stuff like state recovery and event persistence is held by Akka Persistenc
So lets define `SubscriptionActions`

```scala
import cats.implicits._ // needed for syntax like flatMap and unit
import cats.implicits._

final class SubscriptionActions[F[_]](
implicit F: MonadAction[F, Option[SubscriptionState], SubscriptionEvent]
Expand All @@ -149,10 +149,9 @@ final class SubscriptionActions[F[_]](
def createSubscription(userId: String, productId: String, planId: String): F[Unit] =
read.flatMap {
case Some(subscription) =>
// Do nothing reply with ()
unit
ignore
case None =>
// Produce event and reply with ()
// Produce event
append(SubscriptionCreated(userId, productId, planId))
}

Expand All @@ -161,23 +160,23 @@ final class SubscriptionActions[F[_]](
case Some(subscription) if subscription.status == Active =>
append(SubscriptionPaused)
case _ =>
unit
ignore
}

def resumeSubscription: F[Unit] =
read.flatMap {
case Some(subscription) if subscription.status == Paused =>
append(SubscriptionResumed)
case _ =>
unit
ignore
}

def cancelSubscription: F[Unit] =
read.flatMap {
case Some(subscription) if subscription.canCancel =>
append(SubscriptionCancelled)
case _ =>
unit
ignore
}
}
```
Expand All @@ -198,8 +197,7 @@ val runtime = AkkaPersistenceRuntime(system, journalAdapter)
val behavior: EventsourcedBehavior[Subscription, IO, Option[SubscriptionState], SubscriptionEvent]
EventsourcedBehavior.optional(
new SubscriptionActions,
SubscriptionState.create,
_.update(_)
Fold.optional(SubscriptionState.create)(_.update(_))
)

val deploySubscriptions: IO[SubscriptionId => Subscription[IO]] =
Expand Down
22 changes: 15 additions & 7 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ lazy val cassandraDriverExtrasVersion = "3.1.0"
lazy val jsr305Version = "3.0.1"
lazy val boopickleVersion = "1.3.0"
lazy val monocleVersion = "1.5.1-cats"
lazy val fs2Version = "1.0.0"
lazy val fs2Version = "1.0.4"
lazy val log4catsVersion = "0.2.0-M1"

lazy val scalaCheckVersion = "1.13.4"
lazy val scalaTestVersion = "3.0.5"
lazy val scalaCheckShapelessVersion = "1.1.8"
lazy val shapelessVersion = "2.3.3"
lazy val kindProjectorVersion = "0.9.9"
lazy val betterMonadicForVersion = "0.3.0-M4"
lazy val scalametaVersion = "1.8.0"

// Example dependencies
Expand All @@ -43,16 +44,17 @@ lazy val commonSettings = Seq(
resolvers += "jitpack" at "https://jitpack.io",
scalacOptions ++= commonScalacOptions,
addCompilerPlugin("org.spire-math" %% "kind-projector" % kindProjectorVersion),
addCompilerPlugin("com.olegpy" %% "better-monadic-for" % betterMonadicForVersion),
parallelExecution in Test := false,
scalacOptions in (Compile, doc) := (scalacOptions in (Compile, doc)).value
.filter(_ != "-Xfatal-warnings"),
) ++ warnUnusedImport

lazy val macroSettings = Seq(
scalacOptions += "-Xplugin-require:macroparadise",
scalacOptions += "-Xplugin-require:macroparadise",
addCompilerPlugin(
"org.scalameta" % "paradise" % scalametaParadiseVersion cross CrossVersion.full
),
),
sources in (Compile, doc) := Nil // macroparadise doesn't work with scaladoc yet.
)

Expand Down Expand Up @@ -143,7 +145,8 @@ lazy val benchmarks = aecorModule("benchmarks", "Aecor Benchmarks")

lazy val coreSettings = Seq(
libraryDependencies ++= Seq(
"org.typelevel" %% "cats-tagless-macros" % catsTaglessVersion,
"co.fs2" %% "fs2-core" % fs2Version,
"org.typelevel" %% "cats-tagless-core" % catsTaglessVersion,
"com.chuusai" %% "shapeless" % shapelessVersion,
"org.typelevel" %% "cats-core" % catsVersion,
"org.typelevel" %% "cats-effect" % catsEffectVersion,
Expand All @@ -161,6 +164,7 @@ lazy val boopickleWireProtocolSettings = Seq(

lazy val scheduleSettings = commonProtobufSettings ++ Seq(
libraryDependencies ++= Seq(
"org.typelevel" %% "cats-tagless-macros" % catsTaglessVersion,
"com.datastax.cassandra" % "cassandra-driver-extras" % cassandraDriverExtrasVersion,
"com.google.code.findbugs" % "jsr305" % jsr305Version % Compile
)
Expand All @@ -172,7 +176,7 @@ lazy val distributedProcessingSettings = commonProtobufSettings ++ Seq(

lazy val akkaPersistenceSettings = commonProtobufSettings ++ Seq(
libraryDependencies ++= Seq(
"co.fs2" %% "fs2-core" % fs2Version,
"org.typelevel" %% "cats-tagless-macros" % catsTaglessVersion,
"com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion,
"com.typesafe.akka" %% "akka-persistence" % akkaVersion,
"com.typesafe.akka" %% "akka-persistence-query" % akkaVersion,
Expand All @@ -181,7 +185,10 @@ lazy val akkaPersistenceSettings = commonProtobufSettings ++ Seq(
)

lazy val akkaGenericSettings = commonProtobufSettings ++ Seq(
libraryDependencies ++= Seq("com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion)
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion,
"org.typelevel" %% "cats-tagless-macros" % catsTaglessVersion
)
)

lazy val exampleSettings =
Expand All @@ -190,8 +197,8 @@ lazy val exampleSettings =
resolvers += "krasserm at bintray" at "http://dl.bintray.com/krasserm/maven",
libraryDependencies ++=
Seq(
"org.typelevel" %% "cats-tagless-macros" % catsTaglessVersion,
"com.github.krasserm" %% "streamz-converter" % "0.10-M2",
"co.fs2" %% "fs2-core" % "1.0.0",
"org.typelevel" %% "cats-mtl-core" % catsMTLVersion,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"org.http4s" %% "http4s-dsl" % http4sVersion,
Expand All @@ -215,6 +222,7 @@ lazy val testKitSettings = Seq(

lazy val testingSettings = Seq(
libraryDependencies ++= Seq(
"org.typelevel" %% "cats-tagless-macros" % catsTaglessVersion,
"io.circe" %% "circe-core" % circeVersion,
"io.circe" %% "circe-generic" % circeVersion,
"io.circe" %% "circe-parser" % circeVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ class AkkaPersistenceRuntime[O] private[akkapersistence] (system: ActorSystem,
val props =
AkkaPersistenceRuntimeActor.props(
typeName,
behavior.actions,
behavior.create,
behavior.update,
behavior,
snapshotPolicy,
tagging,
settings.idleTimeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ private[akkapersistence] object AkkaPersistenceRuntimeActor {

def props[M[_[_]], F[_]: Effect, I: KeyDecoder, State, Event: PersistentEncoder: PersistentDecoder](
entityName: String,
actions: M[ActionT[F, State, Event, ?]],
initialState: State,
updateState: (State, Event) => Folded[State],
behavior: EventsourcedBehavior[M, F, State, Event],
snapshotPolicy: SnapshotPolicy[State],
tagging: Tagging[I],
idleTimeout: FiniteDuration,
Expand All @@ -51,9 +49,7 @@ private[akkapersistence] object AkkaPersistenceRuntimeActor {
Props(
new AkkaPersistenceRuntimeActor(
entityName,
actions,
initialState,
updateState,
behavior,
snapshotPolicy,
tagging,
idleTimeout,
Expand All @@ -78,9 +74,7 @@ private[akkapersistence] object AkkaPersistenceRuntimeActor {
*/
private[akkapersistence] final class AkkaPersistenceRuntimeActor[M[_[_]], F[_], I: KeyDecoder, State, Event: PersistentEncoder: PersistentDecoder](
entityName: String,
actions: M[ActionT[F, State, Event, ?]],
initialState: State,
updateState: (State, Event) => Folded[State],
behavior: EventsourcedBehavior[M, F, State, Event],
snapshotPolicy: SnapshotPolicy[State],
tagger: Tagging[I],
idleTimeout: FiniteDuration,
Expand Down Expand Up @@ -113,7 +107,7 @@ private[akkapersistence] final class AkkaPersistenceRuntimeActor[M[_[_]], F[_],

log.info("[{}] Starting...", persistenceId)

private var state: State = initialState
private var state: State = behavior.fold.initial

private var eventCount = 0L

Expand Down Expand Up @@ -178,7 +172,12 @@ private[akkapersistence] final class AkkaPersistenceRuntimeActor[M[_[_]], F[_],
M.decoder
.decodeValue(commandBytes) match {
case Attempt.Successful(pair) =>
log.debug("[{}] [{}] Received invocation [{}]", self.path, persistenceId, pair.first.toString)
log.debug(
"[{}] [{}] Received invocation [{}]",
self.path,
persistenceId,
pair.first.toString
)
performInvocation(pair.first, pair.second)
case Attempt.Failure(cause) =>
val decodingError = new IllegalArgumentException(cause.messageWithContext)
Expand All @@ -188,9 +187,8 @@ private[akkapersistence] final class AkkaPersistenceRuntimeActor[M[_[_]], F[_],

def performInvocation[A](invocation: Invocation[M, A], resultEncoder: Encoder[A]): Unit = {
val opId = UUID.randomUUID()
invocation
.run(actions)
.run(state, updateState)
behavior
.run(state, invocation)
.flatMap {
case Next((events, result)) =>
F.delay(
Expand Down Expand Up @@ -261,7 +259,8 @@ private[akkapersistence] final class AkkaPersistenceRuntimeActor[M[_[_]], F[_],
}

private def applyEvent(event: Event): Unit =
state = updateState(state, event)
state = behavior.fold
.reduce(state, event)
.getOrElse {
val error = new IllegalStateException(s"Illegal state after applying [$event] to [$state]")
log.error(error, error.getMessage)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import akka.event.Logging
import akka.persistence.cassandra.Session.Init
import akka.persistence.cassandra.session.CassandraSessionSettings
import akka.persistence.cassandra.session.scaladsl.CassandraSession
import cats.effect.Effect
import cats.effect.{ ContextShift, Effect }
import cats.implicits._

object DefaultJournalCassandraSession {
Expand All @@ -15,13 +15,18 @@ object DefaultJournalCassandraSession {
* Creates CassandraSession using settings of default cassandra journal.
*
*/
def apply[F[_]](system: ActorSystem, metricsCategory: String, init: Init[F])(
implicit F: Effect[F]
): F[CassandraSession] = F.delay {
def apply[F[_]: ContextShift](
system: ActorSystem,
metricsCategory: String,
init: Init[F],
sessionProvider: Option[SessionProvider] = None
)(implicit F: Effect[F]): F[CassandraSession] = F.delay {
val log = Logging(system, classOf[CassandraSession])
val provider = SessionProvider(
system.asInstanceOf[ExtendedActorSystem],
system.settings.config.getConfig("cassandra-journal")
val provider = sessionProvider.getOrElse(
SessionProvider(
system.asInstanceOf[ExtendedActorSystem],
system.settings.config.getConfig("cassandra-journal")
)
)
val settings = CassandraSessionSettings(system.settings.config.getConfig("cassandra-journal"))
new CassandraSession(system, provider, settings, system.dispatcher, log, metricsCategory, { x =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package akka.persistence.cassandra
import java.util.concurrent.Executor

import cats.data.Kleisli
import cats.effect.Async
import cats.effect.{ Async, ContextShift }
import com.datastax.driver.core.{ ResultSet, TypeCodec, Session => DatastaxSession }

import scala.concurrent.ExecutionContext
import scala.util.control.NonFatal

trait Session[F[_]] {
Expand All @@ -14,26 +15,32 @@ trait Session[F[_]] {

object Session {
type Init[F[_]] = Kleisli[F, Session[F], Unit]
def Init[F[_]](f: Session[F] => F[Unit]): Init[F] = Kleisli(f)
private val immediateExecutor = new Executor {
override def execute(command: Runnable): Unit =
command.run()
}

def apply[F[_]](datastaxSession: DatastaxSession)(implicit F: Async[F]): Session[F] =
private val immediateExecutionContext = ExecutionContext.fromExecutor(immediateExecutor)

def apply[F[_]](datastaxSession: DatastaxSession)(implicit F: Async[F],
contextShift: ContextShift[F]): Session[F] =
new Session[F] {
final override def execute(query: String): F[ResultSet] =
F.async { cb =>
val future = datastaxSession.executeAsync(query)
val runnable = new Runnable {
override def run(): Unit =
try {
cb(Right(future.get()))
} catch {
case NonFatal(e) =>
cb(Left(e))
}
contextShift.evalOn(immediateExecutionContext) {
F.async { cb =>
val future = datastaxSession.executeAsync(query)
val runnable = new Runnable {
override def run(): Unit =
try {
cb(Right(future.get()))
} catch {
case NonFatal(e) =>
cb(Left(e))
}
}
future.addListener(runnable, immediateExecutor)
}
future.addListener(runnable, immediateExecutor)
}
override def registerCodec[A](codec: TypeCodec[A]): F[Unit] =
F.delay {
Expand Down
Loading

0 comments on commit 0da51af

Please sign in to comment.