Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Cross build for Scala3 #1012

Merged
merged 8 commits into from
Mar 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ lazy val core = project
Compile / packageBin / packageOptions += Package.ManifestAttributes(
"Automatic-Module-Name" -> "akka.persistence.cassandra"))
.configs(MultiJvm)
.settings(Scala3.settings)

lazy val cassandraLauncher = project
.in(file("cassandra-launcher"))
Expand All @@ -37,6 +38,7 @@ lazy val cassandraLauncher = project
name := "akka-persistence-cassandra-launcher",
Compile / managedResourceDirectories += (cassandraBundle / target).value / "bundle",
Compile / managedResources += (cassandraBundle / assembly).value)
.settings(Scala3.settings)

// This project doesn't get published directly, rather the assembled artifact is included as part of cassandraLaunchers
// resources
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import com.datastax.oss.driver.api.core.cql.Row
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._

import akka.actor.ClassicActorSystemProvider

object EventsByTagMigration {
Expand Down Expand Up @@ -79,7 +78,7 @@ class EventsByTagMigration(
private lazy val queries = PersistenceQuery(system).readJournalFor[CassandraReadJournal](pluginConfigPath + ".query")
private implicit val sys: ActorSystem = system

implicit val ec =
implicit val ec: ExecutionContext =
system.dispatchers.lookup(system.settings.config.getString(s"$pluginConfigPath.journal.plugin-dispatcher"))
private val settings: PluginSettings =
new PluginSettings(system, system.settings.config.getConfig(pluginConfigPath))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@
package akka.persistence.cassandra.cleanup

import java.lang.{ Integer => JInt, Long => JLong }

import scala.collection.immutable
import scala.concurrent.Future
import scala.util.Failure
import scala.util.Success
import akka.{ Done, NotUsed }
import akka.actor.{ ActorRef, ClassicActorSystemProvider }
import akka.actor.{ ActorRef, ActorSystem, ClassicActorSystemProvider }
import akka.annotation.ApiMayChange
import akka.event.Logging
import akka.pattern.ask
Expand All @@ -22,6 +21,7 @@ import akka.persistence.cassandra.journal.CassandraJournal
import akka.persistence.cassandra.reconciler.Reconciliation
import akka.persistence.cassandra.reconciler.ReconciliationSettings
import akka.persistence.cassandra.snapshot.{ CassandraSnapshotStatements, CassandraSnapshotStore }
import akka.stream.Materializer
import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessionRegistry }
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.Timeout
Expand All @@ -48,11 +48,11 @@ final class Cleanup(systemProvider: ClassicActorSystemProvider, settings: Cleanu
systemProvider,
new CleanupSettings(systemProvider.classicSystem.settings.config.getConfig("akka.persistence.cassandra.cleanup")))

private implicit val system = systemProvider.classicSystem
private implicit val system: ActorSystem = systemProvider.classicSystem
import settings._
import system.dispatcher

private val log = Logging(system, getClass)
private val log = Logging(system, getClass.asInstanceOf[Class[Any]])

// operations on journal, snapshotStore and tagViews should be only be done when dry-run = false
private val journal: ActorRef = Persistence(system).journalFor(pluginLocation + ".journal")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ import akka.stream.scaladsl.Source
private val statements: CassandraStatements = new CassandraStatements(settings)
private val healthCheckCql = settings.healthCheckSettings.healthCheckCql
private val serialization = SerializationExtension(context.system)
private val log: LoggingAdapter = Logging(context.system, getClass)
private val log: LoggingAdapter = Logging(context.system, getClass.asInstanceOf[Class[Any]])

private implicit val ec: ExecutionContext = context.dispatcher

Expand Down Expand Up @@ -259,7 +259,7 @@ import akka.stream.scaladsl.Source
writeInProgress.put(pid, writeInProgressForPersistentId.future)

val toReturn: Future[Nil.type] = Future.sequence(writesWithUuids.map(w => serialize(w))).flatMap {
serialized: Seq[SerializedAtomicWrite] =>
(serialized: Seq[SerializedAtomicWrite]) =>
val result: Future[Any] =
if (messages.map(_.payload.size).sum <= journalSettings.maxMessageBatchSize) {
// optimize for the common case
Expand Down Expand Up @@ -390,7 +390,7 @@ import akka.stream.scaladsl.Source
maxPnr - minPnr <= 1,
"Do not support AtomicWrites that span 3 partitions. Keep AtomicWrites <= max partition size.")

val writes: Seq[Future[BoundStatement]] = all.map { m: Serialized =>
val writes: Seq[Future[BoundStatement]] = all.map { (m: Serialized) =>
// using two separate statements with or without the meta data columns because
// then users doesn't have to alter table and add the new columns if they don't use
// the meta data feature
Expand Down Expand Up @@ -862,7 +862,7 @@ import akka.stream.scaladsl.Source

class EventDeserializer(system: ActorSystem) {

private val log = Logging(system, this.getClass)
private val log = Logging(system, this.getClass.asInstanceOf[Class[Any]])

private val serialization = SerializationExtension(system)
val columnDefinitionCache = new ColumnDefinitionCache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import akka.stream.alpakka.cassandra.scaladsl.CassandraSession
private val serialization = SerializationExtension(system)

// used for local asks
private implicit val timeout = Timeout(10.second)
private implicit val timeout: Timeout = Timeout(10.second)

import statements._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import scala.collection.immutable
import java.lang.{ Integer => JInt, Long => JLong }
import java.net.URLEncoder
import java.util.UUID

import scala.concurrent.Promise
import akka.Done
import akka.actor.SupervisorStrategy.Escalate
Expand All @@ -27,7 +26,7 @@ import akka.dispatch.ExecutionContexts
import akka.event.LoggingAdapter
import akka.persistence.cassandra.journal.CassandraJournal._
import akka.persistence.cassandra.journal.TagWriter._
import akka.persistence.cassandra.journal.TagWriters._
import akka.persistence.cassandra.journal.TagWriters.TagWritersSession
import akka.stream.alpakka.cassandra.scaladsl.CassandraSession
import akka.util.ByteString
import akka.util.Timeout
Expand Down Expand Up @@ -176,6 +175,7 @@ import scala.util.Try
with Timers
with ActorLogging {

import akka.persistence.cassandra.journal.TagWriters._
import context.dispatcher

// eager init and val because used from Future callbacks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ import com.typesafe.config.Config
class CassandraReadJournalProvider(system: ExtendedActorSystem, config: Config, configPath: String)
extends ReadJournalProvider {

override val scaladslReadJournal: scaladsl.CassandraReadJournal =
new scaladsl.CassandraReadJournal(system, config, configPath)
private val scaladslReadJournalInstance = new scaladsl.CassandraReadJournal(system, config, configPath)

override val javadslReadJournal: javadsl.CassandraReadJournal =
new javadsl.CassandraReadJournal(scaladslReadJournal)
override def scaladslReadJournal(): scaladsl.CassandraReadJournal = scaladslReadJournalInstance

private val javadslReadJournalInstance = new javadsl.CassandraReadJournal(scaladslReadJournalInstance)

override def javadslReadJournal(): javadsl.CassandraReadJournal = javadslReadJournalInstance

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package akka.persistence.cassandra.query

import java.lang.{ Long => JLong }
import java.util.concurrent.ThreadLocalRandom

import akka.Done
import akka.annotation.InternalApi
import akka.stream.{ Attributes, Outlet, SourceShape }
Expand All @@ -16,11 +15,10 @@ import scala.annotation.tailrec
import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.concurrent.duration.{ FiniteDuration, _ }
import scala.util.{ Failure, Success, Try }

import com.datastax.oss.driver.api.core.CqlSession

import scala.annotation.nowarn
import scala.compat.java8.FutureConverters._

import akka.persistence.cassandra.PluginSettings

/**
Expand Down Expand Up @@ -126,7 +124,7 @@ import akka.persistence.cassandra.PluginSettings
override protected def logSource: Class[_] =
classOf[EventsByPersistenceIdStage]

implicit def ec = materializer.executionContext
implicit def ec: ExecutionContext = materializer.executionContext

val donePromise = Promise[Done]()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@ package akka.persistence.cassandra.query

import java.util.UUID
import java.util.concurrent.ThreadLocalRandom

import akka.annotation.InternalApi
import akka.persistence.cassandra._
import akka.persistence.cassandra.journal.TimeBucket
import akka.persistence.cassandra.query.EventsByTagStage._
import akka.stream.stage.{ GraphStage, _ }
import akka.stream.{ Attributes, Outlet, SourceShape }
import akka.util.PrettyDuration._
Expand All @@ -21,11 +19,11 @@ import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor, Future }
import scala.util.{ Failure, Success, Try }
import java.lang.{ Long => JLong }

import akka.actor.Scheduler
import akka.cluster.pubsub.{ DistributedPubSub, DistributedPubSubMediator }
import akka.persistence.cassandra.EventsByTagSettings.RetrySettings
import akka.persistence.cassandra.journal.CassandraJournal._
import akka.persistence.cassandra.query.EventsByTagStage.{ TagStageSession, UUIDRow }
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal.EventByTagStatements
import akka.util.UUIDComparator
import com.datastax.oss.driver.api.core.CqlSession
Expand Down Expand Up @@ -218,6 +216,7 @@ import scala.compat.java8.FutureConverters._
scanner: TagViewSequenceNumberScanner)
extends GraphStage[SourceShape[UUIDRow]] {

import akka.persistence.cassandra.query.EventsByTagStage._
import settings.querySettings
import settings.eventsByTagSettings

Expand All @@ -240,7 +239,7 @@ import scala.compat.java8.FutureConverters._
toOffset.map(Uuids.unixTimestamp).getOrElse(Long.MaxValue)

lazy val system = materializer.system
lazy implicit val scheduler = system.scheduler
lazy implicit val scheduler: Scheduler = system.scheduler

private def calculateToOffset(): UUID = {
val to: Long = Uuids.unixTimestamp(Uuids.timeBased()) - eventsByTagSettings.eventualConsistency.toMillis
Expand All @@ -267,7 +266,7 @@ import scala.compat.java8.FutureConverters._
private def updateQueryState(state: QueryState): Unit =
updateStageState(_.copy(state = state))

implicit def ec: ExecutionContextExecutor = materializer.executionContext
implicit def ec: ExecutionContext = materializer.executionContext

setHandler(out, this)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ import akka.stream.scaladsl.Sink
@InternalApi private[akka] class TagViewSequenceNumberScanner(session: Session, pluginDispatcher: String)(
implicit materializer: Materializer,
@nowarn("msg=never used") ec: ExecutionContext) {
private val log = Logging(materializer.system, getClass)
private val log = Logging(materializer.system, getClass.asInstanceOf[Class[Any]])

/**
* This could be its own stage and return half way through a query to better meet the deadline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package akka.persistence.cassandra.query.scaladsl

import java.net.URLEncoder
import java.util.UUID

import akka.{ Done, NotUsed }
import akka.actor.{ ActorSystem, ExtendedActorSystem }
import akka.annotation.InternalApi
Expand All @@ -29,7 +28,7 @@ import com.datastax.oss.driver.api.core.cql._
import com.typesafe.config.Config

import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration._
import scala.util.{ Failure, Success }
import scala.util.control.NonFatal
Expand Down Expand Up @@ -138,7 +137,7 @@ class CassandraReadJournal protected (
new CassandraJournal.EventDeserializer(system)

private val serialization = SerializationExtension(system)
implicit private val ec =
implicit private val ec: ExecutionContext =
system.dispatchers.lookup(querySettings.pluginDispatcher)
implicit private val sys: ActorSystem = system

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import akka.event.Logging
import akka.persistence.cassandra.journal.CassandraTagRecovery
import akka.persistence.cassandra.Extractors
import akka.util.Timeout
import akka.stream.OverflowStrategy
import akka.stream.{ OverflowStrategy }
import akka.stream.scaladsl.Sink
import akka.annotation.InternalApi
import akka.serialization.SerializationExtension
Expand All @@ -35,15 +35,15 @@ private[akka] final class BuildTagViewForPersisetceId(

import system.dispatcher

private implicit val sys = system
private implicit val sys: ActorSystem = system
private val log = Logging(system, classOf[BuildTagViewForPersisetceId])
private val serialization = SerializationExtension(system)

private val queries: CassandraReadJournal =
PersistenceQuery(system.asInstanceOf[ExtendedActorSystem])
.readJournalFor[CassandraReadJournal]("akka.persistence.cassandra.query")

private implicit val flushTimeout = Timeout(30.seconds)
private implicit val flushTimeout: Timeout = Timeout(30.seconds)

def reconcile(flushEvery: Int = 1000): Future[Done] = {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ private[akka] final class DeleteTagViewForPersistenceId(
settings: PluginSettings,
queries: CassandraReadJournal) {
private val log = Logging(system, s"DeleteTagView($tag)")
private implicit val sys = system
private implicit val sys: ActorSystem = system
import system.dispatcher

def execute(): Future[Done] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessi
// this meta query gets slower than slower if snapshots are deleted without a criteria.minSequenceNr as
// all previous tombstones are scanned in the meta data query
metadata(snapshotMetaPs, persistenceId, criteria, limit = None).flatMap {
mds: immutable.Seq[SnapshotMetadata] =>
(mds: immutable.Seq[SnapshotMetadata]) =>
val boundStatementBatches = mds
.map(md =>
preparedDeleteSnapshot.map(_.bind(md.persistenceId, md.sequenceNr: JLong)
Expand Down Expand Up @@ -303,7 +303,7 @@ import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessi
@InternalApi
private[akka] class SnapshotSerialization(system: ActorSystem)(implicit val ec: ExecutionContext) {

private val log = Logging(system, this.getClass)
private val log = Logging(system, this.getClass.asInstanceOf[Class[Any]])

private val serialization = SerializationExtension(system)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class CassandraEventsByTagLoadSpec extends CassandraSpec(CassandraEventsByTagLoa
var allReceived: Map[String, List[Long]] = Map.empty.withDefaultValue(List.empty)
probe.request(messagesPerPersistenceId * nrPersistenceIds)

(1L to (messagesPerPersistenceId * nrPersistenceIds)).foreach { i: Long =>
(1L to (messagesPerPersistenceId * nrPersistenceIds)).foreach { (i: Long) =>
val event = try {
probe.expectNext(veryLongWait)
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ abstract class CassandraSpec(

final override def systemName = system.name

implicit val patience = PatienceConfig(timeout = Span(5, Seconds), interval = Span(100, Milliseconds))
implicit val patience: PatienceConfig = PatienceConfig(timeout = Span(5, Seconds), interval = Span(100, Milliseconds))

val pidCounter = new AtomicInteger()
def nextPid = s"pid=${pidCounter.incrementAndGet()}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import akka.stream.testkit.TestSubscriber
import akka.stream.testkit.scaladsl.TestSink

import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.{ ExecutionContext, Future }

class EventsByTagStressSpec extends CassandraSpec(s"""
akka.persistence.cassandra {
Expand All @@ -21,7 +21,7 @@ class EventsByTagStressSpec extends CassandraSpec(s"""
}
""") {

implicit val ec = system.dispatcher
implicit val ec: ExecutionContext = system.dispatcher

val writers = 10
val readers = 20
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@

package akka.persistence.cassandra

import scala.concurrent.Future
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.actor.{ ActorSystem, Scheduler }
import akka.testkit.TestKit
import akka.testkit.TestProbe
import org.scalatest.BeforeAndAfterAll
Expand All @@ -21,8 +20,8 @@ class RetriesSpec
with ScalaFutures
with BeforeAndAfterAll
with Matchers {
implicit val scheduler = system.scheduler
implicit val ec = system.dispatcher
implicit val scheduler: Scheduler = system.scheduler
implicit val ec: ExecutionContext = system.dispatcher
"Retries" should {
"retry N number of times" in {
val failProbe = TestProbe()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

class BufferSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll {
implicit val system = ActorSystem()
implicit val system: ActorSystem = ActorSystem()

"Buffer" should {
"not write when empty" in {
Expand Down
Loading