Skip to content

fs2 document parsing + fs2-ce2 #142

Merged
merged 4 commits into from
Oct 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,12 @@ lazy val `akka-http` = phobosModule("akka-http") dependsOn (core % "compile-
lazy val `akka-stream` = phobosModule("akka-stream") dependsOn (core % "compile->compile;test->test")
lazy val monix = phobosModule("monix") dependsOn (core % "compile->compile;test->test")
lazy val fs2 = phobosModule("fs2") dependsOn (core % "compile->compile;test->test")
lazy val `fs2-ce2` = phobosModule("fs2-ce2") dependsOn (core % "compile->compile;test->test")
lazy val refined = phobosModule("refined") dependsOn (core % "compile->compile;test->test")
lazy val ast = phobosModule("ast") dependsOn (core % "compile->compile;test->test")

lazy val modules: List[ProjectReference] =
List(core, `akka-http`, derevo, enumeratum, monix, fs2, `akka-stream`, refined, ast)
List(core, `akka-http`, derevo, enumeratum, monix, fs2, `fs2-ce2`, `akka-stream`, refined, ast)

lazy val phobos = project
.in(file("."))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ trait AttributeDecoder[A] { self =>

object AttributeDecoder extends AttributeLiteralInstances {

/** Summons the implicit [[AttributeDecoder]] instance for `A`, enabling the `AttributeDecoder[A]` syntax. */
def apply[A](implicit instance: AttributeDecoder[A]): AttributeDecoder[A] = instance

/** Instances
*/
implicit val stringDecoder: AttributeDecoder[String] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ trait ElementDecoder[A] { self =>

object ElementDecoder extends ElementLiteralInstances {

/** Summons the implicit [[ElementDecoder]] instance for `A`, enabling the `ElementDecoder[A]` syntax. */
def apply[A](implicit instance: ElementDecoder[A]): ElementDecoder[A] = instance

def errorIfWrongName[A](c: Cursor, localName: String, namespaceUri: Option[String]): Option[FailedDecoder[A]] = {
namespaceUri match {
case _ if c.getLocalName != localName =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ trait TextDecoder[A] { self =>

object TextDecoder extends TextLiteralInstances {

/** Summons the implicit [[TextDecoder]] instance for `A`, enabling the `TextDecoder[A]` syntax. */
def apply[A](implicit instance: TextDecoder[A]): TextDecoder[A] = instance

class MappedDecoder[A, B](fa: TextDecoder[A], f: A => B) extends TextDecoder[B] {

def decodeAsText(c: Cursor): TextDecoder[B] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ trait AttributeEncoder[A] { self =>

object AttributeEncoder extends AttributeLiteralInstances {

/** Summons the implicit [[AttributeEncoder]] instance for `A`, enabling the `AttributeEncoder[A]` syntax. */
def apply[A](implicit instance: AttributeEncoder[A]): AttributeEncoder[A] = instance

/** Instances
*/
implicit val stringEncoder: AttributeEncoder[String] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ trait ElementEncoder[A] { self =>

object ElementEncoder extends ElementLiteralInstances {

/** Summons the implicit [[ElementEncoder]] instance for `A`, enabling the `ElementEncoder[A]` syntax. */
def apply[A](implicit instance: ElementEncoder[A]): ElementEncoder[A] = instance

/** Instances
*/
implicit val stringEncoder: ElementEncoder[String] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ trait TextEncoder[A] { self =>

object TextEncoder extends TextLiteralInstances {

/** Summons the implicit [[TextEncoder]] instance for `A`, enabling the `TextEncoder[A]` syntax. */
def apply[A](implicit instance: TextEncoder[A]): TextEncoder[A] = instance

/** Instances
*/
implicit val stringEncoder: TextEncoder[String] =
Expand Down
4 changes: 4 additions & 0 deletions modules/fs2-ce2/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// fs2 2.5.10: the core library is a regular dependency, fs2-io is only put on the test classpath.
libraryDependencies += "co.fs2" %% "fs2-core" % "2.5.10"
libraryDependencies += "co.fs2" %% "fs2-io" % "2.5.10" % "test"
39 changes: 39 additions & 0 deletions modules/fs2-ce2/src/main/scala/ru/tinkoff/phobos/fs2/Fs2Ops.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package ru.tinkoff.phobos.fs2

import cats.MonadError
import cats.syntax.flatMap._
import fs2.Stream
import javax.xml.stream.XMLStreamConstants
import ru.tinkoff.phobos.decoding.{Cursor, ElementDecoder, XmlDecoder, XmlStreamReader}

/** Mix-in that provides the implicit conversion from an [[XmlDecoder]] to its fs2 syntax, [[DecoderOps]]. */
trait Fs2Ops {

  /** Wraps `xmlDecoder` so that the methods of [[DecoderOps]] can be called on it directly. */
  implicit def decoderOps[A](xmlDecoder: XmlDecoder[A]): DecoderOps[A] =
    new DecoderOps(xmlDecoder)
}

/** fs2 syntax for [[XmlDecoder]]: decodes one XML document delivered as a stream of byte chunks.
  *
  * @param xmlDecoder decoder whose element decoder, local name and namespace URI drive the decoding
  */
class DecoderOps[A](private val xmlDecoder: XmlDecoder[A]) extends AnyVal {

  /** Feeds every chunk of `stream` into an XML stream reader and folds the chunks through the element decoder.
    *
    * The stream reader and the cursor are mutable, so they are allocated inside `Stream.suspend`: every run of the
    * returned effect gets a fresh pair instead of sharing one that a previous run has already consumed.
    *
    * @param stream  chunks of the encoded document
    * @param charset charset name handed to `XmlDecoder.createStreamReader`
    * @return the decoded value in `G`; a `Left` decoding result is raised through `MonadError`, and the effect
    *         also fails (via `lastOrError`) if the fold emits no value
    */
  def decodeFromStream[F[_], G[_]](
      stream: Stream[F, Array[Byte]],
      charset: String = "UTF-8",
  )(implicit compiler: Stream.Compiler[F, G], monadError: MonadError[G, Throwable]): G[A] =
    Stream
      .suspend {
        val sr: XmlStreamReader = XmlDecoder.createStreamReader(charset)
        val cursor = new Cursor(sr)

        stream
          .fold[ElementDecoder[A]](xmlDecoder.elementdecoder) { (decoder, bytes) =>
            sr.getInputFeeder.feedInput(bytes, 0, bytes.length)
            // Step over the document prologue (START_DOCUMENT / DTD events) before handing the cursor to the decoder.
            do {
              cursor.next()
            } while (cursor.getEventType == XMLStreamConstants.DTD || cursor.getEventType == XMLStreamConstants.START_DOCUMENT)

            // A decoder that already holds a successful result is kept as is; otherwise it consumes the new input.
            if (decoder.result(cursor.history).isRight) {
              decoder
            } else {
              decoder.decodeAsElement(cursor, xmlDecoder.localname, xmlDecoder.namespaceuri)
            }
          }
          .map(_.result(cursor.history))
      }
      .compile
      .lastOrError
      .flatMap(result => MonadError[G, Throwable].fromEither(result))
}
81 changes: 81 additions & 0 deletions modules/fs2-ce2/src/main/scala/ru/tinkoff/phobos/fs2/Parse.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package ru.tinkoff.phobos.fs2

import cats.data.NonEmptyList
import cats.effect.Sync
import fs2._
import ru.tinkoff.phobos.decoding._
import com.fasterxml.aalto.AsyncXMLStreamReader.EVENT_INCOMPLETE
import javax.xml.stream.XMLStreamConstants._
import javax.xml.stream.events.StartElement
import scala.annotation.tailrec

/** Builders for decoding the repeated elements found under a given element path of a single XML document that
  * arrives as an fs2 byte stream.
  *
  * Usage, as chained by the builder methods below:
  * `Parse.oneDocument("root").inElement("sub").everyElementAs[T].toFs2Stream[F]`.
  */
object Parse {

  /** Starts a builder whose element path consists of the document's root element only. */
  def oneDocument(rootElement: String): OneDocument = OneDocument(NonEmptyList.one(rootElement))

  /** Element path under which elements are going to be decoded.
    *
    * @param path element names; `inElement` prepends, so the most recently added name comes first
    */
  case class OneDocument(path: NonEmptyList[String]) {

    /** Extends the path with one more element name by prepending it to `path`. */
    def inElement(elementName: String): OneDocument = copy(path = elementName :: path)

    /** Fixes the decoder applied to every element found directly under the path. */
    def everyElementAs[T: XmlDecoder]: OneDocumentDecoderApplied[T] = OneDocumentDecoderApplied(path, XmlDecoder[T])
  }

  /** An element path together with the decoder applied to each element found under it. */
  case class OneDocumentDecoderApplied[T](path: NonEmptyList[String], decoder: XmlDecoder[T]) {

    /** Turns a stream of document bytes into a stream of decoding results, one per decoded element.
      *
      * The stream reader is acquired with `Sync[F].delay`, so it is created when the stream is run (and again on
      * every re-run) rather than once when this method is called; it is closed when the stream finishes.
      *
      * @param initialStream bytes of the document; the reader is created for "UTF-8"
      * @return every decoding outcome, failures included as `Left`
      */
    def toFs2Stream[F[_]: Sync](initialStream: Stream[F, Byte]): Stream[F, Either[DecodingError, T]] = {
      val path = this.path.toList

      val streamReader =
        Stream.bracket(Sync[F].delay(XmlDecoder.createStreamReader("UTF-8")))(
          reader => Sync[F].delay(reader.close())
        )

      streamReader.flatMap { sr =>
        val cursor = new Cursor(sr)
        // Decoder state carried over between chunks when an element is split across them.
        var lastDecoder = decoder.elementdecoder

        initialStream
          .chunks
          .noneTerminate
          .flatMap { available =>
            available match {
              case Some(chunk) => sr.getInputFeeder().feedInput(chunk.toArray, 0, chunk.size)
              case None        => sr.getInputFeeder().endOfInput()
            }

            // Move the cursor until one of:
            //  - EVENT_INCOMPLETE | END_DOCUMENT -- nothing left to do but pull another chunk or terminate
            //  - START_ELEMENT & cursor.history.tail == path -- the next element to decode inside the given path
            //  - any event while lastDecoder != decoder.elementdecoder -- lastDecoder is partially filled, so every
            //    further event has to be processed by it
            @tailrec
            def skipUnnecessary(): Int =
              cursor.next() match {
                case ev @ (EVENT_INCOMPLETE | END_DOCUMENT)                                       => ev
                case ev @ START_ELEMENT if cursor.history.nonEmpty && cursor.history.tail == path => ev
                case ev if lastDecoder != decoder.elementdecoder                                  => ev
                case _                                                                            => skipUnnecessary()
              }

            skipUnnecessary() match {
              case EVENT_INCOMPLETE | END_DOCUMENT => Stream.emit(None)
              case _ =>
                Stream.unfoldLoop(lastDecoder) { d =>
                  val newDecoder = d.decodeAsElement(cursor, decoder.localname, decoder.namespaceuri)

                  if (!newDecoder.isCompleted) {
                    // Decoder has consumed all the available data although decoding isn't completed.
                    // We have to pull more data and proceed with this decoder.
                    lastDecoder = newDecoder
                    None -> None
                  } else {
                    lastDecoder = decoder.elementdecoder
                    val result = newDecoder.result(cursor.history)

                    // Keep unfolding only while the next matching element is already available in this chunk.
                    if (skipUnnecessary() == START_ELEMENT) Some(result) -> Some(lastDecoder)
                    else Some(result) -> None
                  }
                }
            }
          }.unNone
      }
    }
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package ru.tinkoff.phobos

/** Package object mixing in [[Fs2Ops]], which makes its implicit decoder syntax a member of this package. */
package object fs2 extends Fs2Ops
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package ru.tinkoff.phobos.test

import org.scalatest.wordspec.AsyncWordSpec
import ru.tinkoff.phobos.annotations.{ElementCodec, XmlCodec}
import ru.tinkoff.phobos.decoding.XmlDecoder
import ru.tinkoff.phobos.syntax.text
import ru.tinkoff.phobos.fs2._
import fs2.Stream
import cats.effect.IO

/** Checks that `decodeFromStream` decodes a document that is delivered as a stream of one-byte chunks. */
class Fs2Test extends AsyncWordSpec {

  "Fs2 decoder" should {
    "decode case classes correctly" in {
      @ElementCodec
      case class Bar(@text txt: Int)
      @XmlCodec("foo")
      case class Foo(qux: Int, maybeBar: Option[Bar], bars: List[Bar])

      val expected = Foo(1234, Some(Bar(1)), List(Bar(2), Bar(3)))

      val document =
        """
          |<foo>
          | <qux>1234</qux>
          | <bars>2</bars>
          | <maybeBar>1</maybeBar>
          | <bars>3</bars>
          |</foo>
          |""".stripMargin

      // Every character becomes its own single-byte chunk, so the decoder is resumed at each byte boundary.
      val oneByteChunks = Stream.iterable[IO, Array[Byte]](document.map(ch => Array(ch.toByte)))

      val decoding = XmlDecoder[Foo].decodeFromStream(oneByteChunks)
      decoding.map(decoded => assert(decoded == expected)).unsafeToFuture()
    }
  }
}
131 changes: 131 additions & 0 deletions modules/fs2-ce2/src/test/scala/ru/tinkoff/phobos/test/ParseTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package ru.tinkoff.phobos.test

import org.scalatest.wordspec.AsyncWordSpec
import ru.tinkoff.phobos.annotations.{ElementCodec, XmlCodec}
import ru.tinkoff.phobos.decoding.XmlDecoder
import ru.tinkoff.phobos.syntax.text
import ru.tinkoff.phobos.fs2._
import fs2.Stream
import cats.instances.future._
import cats.syntax.flatMap._
import cats.effect.IO
import org.scalatest.Inspectors
import ru.tinkoff.phobos.decoding.DecodingError

/** Exercises `Parse.oneDocument` on several document shapes, reading each document both as a single chunk and one
  * byte at a time.
  */
class ParseTest extends AsyncWordSpec with Inspectors {

  @XmlCodec("foo")
  case class Foo(@text txt: Int)

  // Every fixture pairs the path of enclosing elements (outermost first) with the document text.
  object xml {
    val simpleSequential =
      ("root" :: Nil) ->
        """|<root>
           | <foo>1</foo>
           | <foo>2</foo>
           | <foo>3</foo>
           | <foo>4</foo>
           |</root>
           |""".stripMargin

    val nestedRepetetive =
      ("root" :: "sub" :: Nil) ->
        """|<root>
           | <sub>
           | <foo>1</foo>
           | <foo>2</foo>
           | </sub>
           | <sub>
           | <foo>3</foo>
           | <foo>4</foo>
           | </sub>
           |</root>
           |""".stripMargin

    val nestedRepetetiveIcnludingOtherTags =
      ("root" :: "sub" :: Nil) ->
        """|<root>
           | <sub>
           | <foo>1</foo>
           | <!-- skip it -->
           | <bar>nope</bar>
           | <foo>2</foo>
           | </sub>
           | <sub>
           | <foo>3</foo>
           | <foo>4</foo>
           | </sub>
           | <!-- skip it too -->
           | <bar>nope</bar>
           | <sub>
           | <foo>5</foo>
           | </sub>
           | <sub>
           | <!-- and this one -->
           | <bar>nope</bar>
           | </sub>
           |</root>
           |""".stripMargin

    val all = simpleSequential :: nestedRepetetive :: nestedRepetetiveIcnludingOtherTags :: Nil
  }

  // Expected stream output per fixture; a foreign tag found directly under the path shows up as a Left.
  val expectations = Map(
    xml.simpleSequential -> Vector(1, 2, 3, 4).map(Foo(_)).map(Right(_)),
    xml.nestedRepetetive -> Vector(1, 2, 3, 4).map(Foo(_)).map(Right(_)),
    xml.nestedRepetetiveIcnludingOtherTags -> Vector(
      Right(Foo(1)),
      Left(DecodingError("Invalid local name. Expected 'foo', but found 'bar'", List("bar", "sub", "root"))),
      Right(Foo(2)),
      Right(Foo(3)),
      Right(Foo(4)),
      Right(Foo(5)),
      Left(DecodingError("Invalid local name. Expected 'foo', but found 'bar'", List("bar", "sub", "root")))
    )
  )

  // Builds the parser for `path`: the head is the root element, each following name is one nesting level deeper.
  private def parserFor(path: List[String]) =
    path.tail.foldLeft(Parse.oneDocument(path.head))(_.inElement(_)).everyElementAs[Foo]

  /** Parses the document emitted as one chunk. */
  def readAtOnce(path: List[String], xmlString: String) =
    Stream.emits[IO, Byte](xmlString.getBytes).through(parserFor(path).toFs2Stream[IO])

  /** Parses the document emitted as single-byte chunks. */
  def readByteByByte(path: List[String], xmlString: String) =
    Stream.emits[IO, Byte](xmlString.getBytes).unchunk.through(parserFor(path).toFs2Stream[IO])

  /** Runs `stream` to completion and asserts that it produced exactly `expects`. */
  def assertStreamResult(stream: Stream[IO, Either[DecodingError, Foo]])(expects: Vector[Either[DecodingError, Foo]]) =
    stream.compile.toVector.map(result => assert(result === expects)).unsafeToFuture()

  // Checks one fixture in both reading modes, the byte-by-byte run starting after the at-once run has passed.
  private def checkBothChunkings(testCase: (List[String], String)) = {
    val (path, xmlString) = testCase
    val expected = expectations(testCase)

    for {
      _ <- assertStreamResult(readAtOnce(path, xmlString))(expected)
      _ <- assertStreamResult(readByteByByte(path, xmlString))(expected)
    } yield succeed
  }

  "Parse" should {
    "handle all the elements" when {
      "oneDocument called with simpleSequential xml" in {
        checkBothChunkings(xml.simpleSequential)
      }

      "oneDocument called with nestedRepetetive xml" in {
        checkBothChunkings(xml.nestedRepetetive)
      }

      "oneDocument called with nestedRepetetiveIcnludingOtherTags xml" in {
        checkBothChunkings(xml.nestedRepetetiveIcnludingOtherTags)
      }
    }

  }
}
5 changes: 0 additions & 5 deletions modules/fs2/src/main/scala/ru/tinkoff/phobos/fs2.scala

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package ru.tinkoff.phobos.ops
package ru.tinkoff.phobos.fs2

import cats.MonadError
import cats.syntax.flatMap._
import fs2.{Stream, Compiler}
import javax.xml.stream.XMLStreamConstants
import ru.tinkoff.phobos.decoding.{Cursor, ElementDecoder, XmlDecoder, XmlStreamReader}

private[phobos] trait Fs2Ops {
implicit def DecoderOps[A](xmlDecoder: XmlDecoder[A]): DecoderOps[A] = new DecoderOps[A](xmlDecoder)
trait Fs2Ops {
implicit def decoderOps[A](xmlDecoder: XmlDecoder[A]): DecoderOps[A] = new DecoderOps[A](xmlDecoder)
}

class DecoderOps[A](private val xmlDecoder: XmlDecoder[A]) extends AnyVal {
Expand Down
Loading