Skip to content
This repository has been archived by the owner on Dec 20, 2018. It is now read-only.

Close #169 and add support for DataSet of Avro records #217

Open
wants to merge 27 commits into
base: master
Choose a base branch
from

Conversation

themodernlife
Copy link

@bdrillard you did an amazing job getting this working and I hope you'll have a look at the slight refactoring I've done to get this over the finish line:

  • Works with multiple versions of Spark (2.0.x and 2.1.x so far)
  • Travis test matrix should be updated appropriately
  • Works with multiple versions of Avro

The main challenge was getting past Spark version incompatibilities. I've done that through the addition of SBT modules, liberal use of reflection and some straight up copy/paste. I've tried to put proper fences around the hacks.

Let me know what you guys think! Again, all credit to @bdrillard since this is 90% his work.

Cheers!

JoshRosen and others added 18 commits November 27, 2016 17:51
To avoid confusion, we should remove the documentation / linking instructions for the 2.x line of releases since the current README describes features which don't apply there. Instead, we should link to the older docs.

Author: Josh Rosen <joshrosen@databricks.com>

Closes databricks#199 from JoshRosen/readme-fixes.

(cherry picked from commit b01a034)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
There is still an issue with ExternalMapToCatalyst to be resolved
This patch builds on databricks#206 in order to restore support for Spark 2.0.x. This means that a single binary artifact can be used with both Spark 2.0.x and 2.1.x, simplifying the builds of downstream projects which are compatible with both Spark versions.

Author: Josh Rosen <joshrosen@databricks.com>

Closes databricks#212 from JoshRosen/add-spark-2.1.
Author: Josh Rosen <joshrosen@databricks.com>

Closes databricks#213 from JoshRosen/prepare-for-3.2-release.
@codecov-io
Copy link

codecov-io commented Feb 10, 2017

Codecov Report

Merging #217 into master will increase coverage by 2.45%.
The diff coverage is 95.56%.

@@            Coverage Diff             @@
##           master     #217      +/-   ##
==========================================
+ Coverage   90.71%   93.16%   +2.45%     
==========================================
  Files           5        7       +2     
  Lines         334      688     +354     
  Branches       50       73      +23     
==========================================
+ Hits          303      641     +338     
- Misses         31       47      +16

@bdrillard
Copy link

@themodernlife thanks for putting this pull-request together, I certainly didn't have the SBT expertise necessary to handle multiple builds of Spark, so this additional work is just what the encoder needed. I had a look at the inlining of additional object types that aren't common to both 2.0.0 and 2.1.x, as well as the serializing of schemas (when used in parsing complex unions), those changes look fine to me. I noticed just a few stray commented-out lines from what may have been some refactoring experiments and a few style points in doc comments. Otherwise this looks great, thanks for the additional legwork on this issue.

@gcpagano
Copy link

gcpagano commented Feb 14, 2017

I'm trying to add the following test based on this PR, but I get a org.apache.spark.SparkException: Task not serializable caused by Cause: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
Here the full stack trace https://gist.github.com/gcpagano/e95a351dc75c4a6ecd2a50fdd7665e90

I'm not sure if this use case wasn't meant to be supported in this PR or maybe I'm doing something wrong in the test code.

  test("create Dataset from GenericRecord") {
    val sparkSession = spark
    import sparkSession.implicits._

    val schema: Schema =
      SchemaBuilder
        .record("GenericRecordTest")
        .namespace("com.databricks.spark.avro")
        .fields()
        .requiredString("field1")
        .endRecord()

    implicit val enc = AvroEncoder.of[GenericData.Record](schema)

    val genericRecords = (1 to 10) map { i =>
      new GenericRecordBuilder(schema)
        .set("field1", "field-" + i)
        .build()
    }

    val rdd: RDD[Record] = sparkSession.sparkContext
      .parallelize(genericRecords)

    val ds = rdd.toDS()

    assert(ds.count() == genericRecords.size)
  }

@themodernlife
Copy link
Author

Hmm... ran into this with SpecificRecord as well. The problem is toDS runs the encoder in mapPartitions. In mapPartitions we check that the encoder's entire expression tree is serializable. Some of the nodes in the expression tree hold a reference to the Avro Schema which is not, and you get errors like the above.

I'm looking into a way to solve this. I have a fix for your unit test, but looking at the code I'm worried there could be other runtime errors scattered about.

@themodernlife
Copy link
Author

@gcpagano pushed change for your unit test and added coverage for fixed and enum cases. Let me know if you have any feedback.

Hi,
based on this issue databricks#67
I create this pull request

Author: Nihed MBAREK <nihedmm@gmail.com>
Author: vlyubin <vlyubin@gmail.com>
Author: nihed <nihedmm@gmail.com>

Closes databricks#124 from nihed/master.

(cherry picked from commit c19f01a)
Signed-off-by: vlyubin <vlyubin@gmail.com>
@gcpagano
Copy link

LGTM

@tkinz27
Copy link

tkinz27 commented Mar 6, 2017

Been watching this PR for a while. Any reason to hold back on this issue?

@bdrillard
Copy link

@themodernlife Could you check and resolve this last merge conflict? It appears to be pretty minor.
@JoshRosen Would it be possible for us to merge this PR soon?

@gengliangwang
Copy link
Contributor

@bdrillard Do you have time to create a clean PR based on latest master? We will keep reviewing on the changes. Thanks!

@cloud-fan
Copy link

@themodernlife I'm afraid you need to send a new PR to set the base branch to master...

@themodernlife themodernlife changed the base branch from branch-3.2 to master October 30, 2017 19:22
@themodernlife
Copy link
Author

@cloud-fan I was able to update the base branch to master. Looking into why Travis is complaining.

@gengliangwang
Copy link
Contributor

Hi @themodernlife you changes contains commits from @JoshRosen and @liancheng , which should not be related to this PR.

@rxin
Copy link
Contributor

rxin commented Nov 1, 2017

I took a look at the implementation and it depends on a lot of the internals of Spark, with a lot of expressions that even do code generation. Basically this would break with every new release of Spark and isn't really maintainable.

We should really design and implement a public API in Spark for expressing encoders, and then implement that here.

@rbrush
Copy link

rbrush commented Nov 3, 2017

I discussed this with @bdrillard, who did the bulk of the Encoder implementation, and we would welcome a stable public API in Spark for custom encoders like this. We will transition this logic to that when it's available. Should we log a JIRA to the Spark project to track that?

In the meantime, we realize this taps into some Spark classes that are subject to change, and are willing to update our internal usage of it as they evolve. We realize this isn't ideal, but it provides quite a bit of value to our existing workloads for reasons described earlier in this chain. Hopefully this state of affairs is temporary and we'll be able to migrate to the public API when available.

@rxin
Copy link
Contributor

rxin commented Nov 3, 2017

Certainly you can use this if you are willing to pay the cost to upgrade it with each version of Spark. But if that's the case, perhaps just do it in your fork for now?

@rbrush
Copy link

rbrush commented Nov 4, 2017

Sure, we're happy to continue with our internal fork for the time being. But of course we're only one of several interested parties on this thread, so I'm sure a public API to support this would be appreciated in general.

@rxin
Copy link
Contributor

rxin commented Nov 4, 2017

Yup designing a public API for encoder/decoders make a lot of sense in Spark proper. We should do it!

@rxin
Copy link
Contributor

rxin commented Nov 7, 2017

@rbrush @bdrillard Aside from that, is there a way to update the patch to not rely on custom code generation? We might need to add couple small expressions to Spark to do that.

If that works, perhaps we can create a release based on that first, and then use that as the basis to create the encoder API.

@bdrillard
Copy link

bdrillard commented Nov 7, 2017

@rxin, to check my understanding of your question, it depends on how broadly you mean "to not rely on custom code generation".

The Serializer/Deserializer of the ExpressionEncoder API produces a custom Expression, and that Expression ultimately generates custom code for the Projection to UnsafeRow for a collection of typed objects conforming to a given Avro schema. I don't think the core task of this patch (getting typed Datasets from collections of arbitrary Avro objects) could be attained without an Encoder, its SerDe, and the "custom code" it would generate. As we discussed a bit above, Spark-Avro already has the ability to convert a collection of Avro objects to a Dataframe, but without the Encoder SerDe we lose static type information that makes authoring different functions in the Spark API so much easier and more reasonable.

More specifically though, the functions producing the SerDe rely on currently public functions in the existing Spark expressions API, e.g. Invoke, InvokeStatic, Literal, etc.

We do however, have a few functions that are custom extensions of Expression, namely ObjectCast, InitializeAvroObject, and ExternalMapToCatalyst.

  • ExternalMapToCatalyst was in the Spark Expressions API at the time this patch was prepared, but the function was in private scope, so we copied it inline. At this time, I believe we could opt for the version in the Spark API, and so we could remove our copy.
  • ObjectCast creates Java code for performing an inline object cast, which is necessary for instances where (in Avro for example) a getter returns only Object and through traversal of the schema, we know what type we want to cast the object to. I think this could be safely/easily added to the Spark API if such functionality isn't already there.
  • InitializeAvroObject is exactly like InitializeJavaBean, but for creating instances of Avro objects. This function is necessary for the Deserializer of the Encoder, so it's not clear how we could take this function out/move it (I don't think it would make sense to add such an Avro-specific Expression to the Spark API).

Let me know your thoughts.

@marmbrus
Copy link
Contributor

marmbrus commented Nov 8, 2017

@bdrillard I think it is the custom extensions of Expression that are the scariest as they build stringly typed code that relies on undocumented and unstable details of codegen. If we can eliminate these, I think it significantly reduces the concern. If we restrict ourselves to using APIs of internal Expressions then changes will at least give compiler errors.

Regarding the specific Expressions:

  • ExternalMapToCatalyst - catalyst is implicitly private (i.e excluded from the scala doc) so there is no need to rely on visibility modifiers. I think we can just make this public and use it here.
  • ObjectCast - I don't think that we have this (but @cloud-fan might know better). If we were going to add this to Spark I might just make it an optional argument to Invoke.
  • InitializeAvroObject - This one does seem a little harder. Again maybe we could generalize Spark's InitializeX to take Seq[(String, Seq[Expression])] or something so that it could be used for both.

@cloud-fan
Copy link

Spark doesn't have ObjectCast, but you can easily implement one.

@bdrillard
Copy link

@marmbrus, @cloud-fan, I'd be happy to open a PR in Spark proper with those changes, so we can then refactor this PR to make use of those public functions.

@cloud-fan
Copy link

why not just implement it here? Otherwise you can only support Spark 2.3 or later

@marmbrus
Copy link
Contributor

@cloud-fan the fear is that having an external library depend on the specifics of code-gen is too brittle. I think its okay if this requires a new version of Spark if that reduces the tight coupling with the specifics of execution. Until that is released we can update this PR by depending on a nightly snapshot and then once 2.3 is out we can make a new spark-avro release.

@bdrillard, that sounds great. Feel free to message me on the Spark JIRA / PR and I can help shepherd them in. Thanks for working on this!

@kalvinnchau
Copy link

kalvinnchau commented Nov 15, 2017

@themodernlife
I'm getting a Cause: java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: Couldn't find bytes on class java.nio.ByteBuffer Exception with this schema format:

{
  "type": "record",
  "name": "GenericRecordTest2",
  "namespace": "com.databricks.spark.avro",
  "fields": [
    {
      "name": "fixedUnionVal",
      "type": [
        {
          "type": "fixed",
          "name": "IPv4",
          "size": 4
        },
        "int"
      ]
    }
  ]
}

Here's the test I added:

  test("create Dataset from Fixed Generic Record") {
    val sparkSession = spark
    import sparkSession.implicits._

    val schema: Schema =
      SchemaBuilder
        .record("GenericRecordTest2")
        .namespace("com.databricks.spark.avro")
        .fields()
        .name("fixedUnionVal").`type`()
          .unionOf()
            .fixed("IPv4").size(4).and()
            .intType()
            .endUnion()
          .noDefault()
        .endRecord()


    implicit val enc = AvroEncoder.of[GenericData.Record](schema)

    val genericRecords = (1 to 10) map { i =>
      new GenericRecordBuilder(schema)
        .set("fixedUnionVal", i)
        .build()
    }

    val rdd: RDD[GenericData.Record] = sparkSession.sparkContext
      .parallelize(genericRecords)

    val ds = rdd.toDS()

    assert(ds.count() == genericRecords.size)
  }

Gist to the stacktrace

@bdrillard
Copy link

@marmbrus I've logged SPARK-22739 to track the additions to the Expressions API.

@kalvinnchau Thanks for the additional test case! We'll be circling back to this once the Spark ticket I link just above is resolved.

@bdrillard
Copy link

An update for this issue, I've created a PR to Spark that includes the necessary expressions for an Avro-Encoder, see #21348.

cc: @marmbrus

@mtraynham
Copy link

mtraynham commented Jul 16, 2018

@bdrillard @themodernlife I've attempted to use this but ran into an exception, conversion between java.lang.String & org.apache.avro.util.Utf8. I was hoping one of you could make a code change suggestion and potentially fix the underlying problem here.

The stack trace is:

Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.util.Utf8
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply1_11$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalIfFalseExpr3$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply3_1$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.execution.ObjectOperator$$anonfun$serializeObjectToRow$1.apply(objects.scala:153)
	at org.apache.spark.sql.execution.ObjectOperator$$anonfun$serializeObjectToRow$1.apply(objects.scala:151)
	at org.apache.spark.sql.execution.AppendColumnsWithObjectExec$$anonfun$10$$anonfun$apply$3.apply(objects.scala:302)
	at org.apache.spark.sql.execution.AppendColumnsWithObjectExec$$anonfun$10$$anonfun$apply$3.apply(objects.scala:299)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

So this seems to take place within Serialization of the Avro POJO to Row. I've tracked the particular field of the Avro schema down to a property called attributes, which is a Map<String, String>.

That shema field is defined like so:

{
    "name": "attributes",
    "type": [
        "null",
        {
            "type": "map",
            "avro.java.string": "String",
            "values": {
                "type": "string",
                "avro.java.string": "String"
            }
        }
    ],
    "doc": "A mapping of arbitrary event attributes, as key-value pairs.",
    "default": null
}

The "avro.java.string": "String" property tells Avro that the field is to be serde with java.lang.String instead of org.apache.avro.util.Utf8 (which implements java.lang.CharSequence).

Interestingly enough, this error does not occur for fields that are not "type": "map". I have a different field that seems to work correctly:

{
    "name": "segment",
    "type": [
        "null",
        {
            "type": "string",
            "avro.java.string": "String"
        }
    ],
    "default": null
}

@mtraynham
Copy link

Well I think I resolved the issue, I just cast it to CharSequence instead of Utf8, with:

@@ -604,7 +604,7 @@ private object AvroTypeInference {
 
       ExternalMapToCatalyst(
         inputObject,
-        ObjectType(classOf[org.apache.avro.util.Utf8]),
+        ObjectType(classOf[java.lang.CharSequence]),
         serializerFor(_, Schema.create(STRING)),
         valueType,
         serializerFor(_, valueSchema),

There was also a Hive issue which is closely related that I kinda of stole the idea for the fix from: https://issues.apache.org/jira/browse/HIVE-5865

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.