
[SPARK-49249][SPARK-49122] Makes SparkSession.addArtifact work with REPL #48120

Open · wants to merge 5 commits into base: master
Conversation

@xupefei (Contributor, Author) commented on Sep 16, 2024

What changes were proposed in this pull request?

This PR makes the new SparkSession.addArtifact API (added in #47631) work with Spark REPL.

Why are the changes needed?

Because it didn't work before :)

Does this PR introduce any user-facing change?

Yes, the user can add a new artifact in the REPL and use it in the current REPL session.
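The flow this enables might look like the following in a `spark-shell` session. This is a sketch, not code from the PR; the JAR path, class name, and UDF name are illustrative:

```scala
// Hypothetical REPL session; "/tmp/my-udfs.jar" and "com.example.StrLen" are made up.
scala> spark.addArtifact("/tmp/my-udfs.jar")

scala> spark.udf.registerJava("strlen", "com.example.StrLen",
     |   org.apache.spark.sql.types.IntegerType)

scala> spark.sql("SELECT strlen('hello')").show()
```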

How was this patch tested?

Added a new test.

Was this patch authored or co-authored using generative AI tooling?

No.

Comment on lines -148 to +150
- private[sql] def registerJava(name: String, className: String, returnDataType: DataType): Unit = {
+ def registerJava(name: String, className: String, returnDataType: DataType): Unit = {
Contributor Author:

I have to make this method public so I can call it from REPL.

Contributor:

I am not against this. I am trying to understand the user facing consequences though. I'd probably prefer that we add support for Scala UDFs as well. That can be done in a follow-up though.

Contributor:

Can you file a follow-up?

Contributor Author:

Will do.

body match {
case Left(e) =>
sc.listenerBus.post(startEvent)
JobArtifactSet.withActiveJobArtifactState(sparkSession.artifactManager.state) {
Contributor:

Can you check how this interacts with all the stuff we do in Connect to make this work? I feel that we are duplicating code now. cc @vicennial

Contributor:

An FYI to other reviewers: review this file with whitespace changes hidden.

Contributor:

Hmm, with this in the execution code path, we may not need SessionHolder#withSession in a few places and can be cleaned up.
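For context, a helper like `withActiveJobArtifactState` typically scopes a thread-local value around the executed block. A minimal self-contained sketch of that pattern (not Spark's actual implementation, which is more involved):

```scala
// Sketch of the thread-local scoping pattern behind a with*State-style helper.
object ScopedState {
  private val current = new ThreadLocal[Option[String]] {
    override def initialValue(): Option[String] = None
  }

  def withActiveState[T](state: String)(body: => T): T = {
    val previous = current.get()
    current.set(Some(state))
    try body
    finally current.set(previous) // restore the prior state, even if body throws
  }

  def active: Option[String] = current.get()
}
```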

Contributor Author:

@vicennial Is there an end-to-end test for this? I made some modifications and want to verify they won't break anything.

Contributor:

@xupefei The ReplE2ESuite has some tests covering the overall client -> artifact -> execution flow.
The Python client package may have some E2E tests as well, but I am not familiar with their current status.

@@ -396,4 +396,41 @@ class ReplSuite extends SparkFunSuite {
Main.sparkContext.stop()
System.clearProperty("spark.driver.port")
}

test("register artifacts via SparkSession.addArtifact") {
Contributor:

Can you use a UDF defined in the REPL? If so, how does this work with a JobArtifactSet? Do we layer the globally defined classpath over the session-specific classpath? (It'd be nice to document this somewhere.)

Contributor Author:

I added one more test, which defines a UDF that initialises an external class added as an artifact.

> how does this work with a JobArtifactSet?

Can you elaborate? AFAIK JobArtifactSet is not involved here, since it is the session's artifact path that is applied when a SparkSession is active.

Classpath: it's the other way around. The session classpath is laid over the global one.
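A sketch of that layering, assuming the session classloader is constructed as a child of the global one (the JAR path and class name below are hypothetical, for illustration only):

```scala
import java.net.{URL, URLClassLoader}

// Session artifacts go into a child loader whose parent is the global classpath,
// so a session sees both its own JARs and everything defined globally.
val globalLoader: ClassLoader = Thread.currentThread().getContextClassLoader
val sessionJars: Array[URL] =
  Array(new URL("file:/tmp/session-artifact.jar")) // hypothetical session JAR
val sessionLoader = new URLClassLoader(sessionJars, globalLoader)

// Resolves from the session JAR if present there, falling back to the parent.
val clazz = sessionLoader.loadClass("com.example.MyHelper") // hypothetical class
```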


@vicennial (Contributor) left a comment:

Oops, I meant to leave a comment (ignore the approval, I haven't gone through the whole PR)

@@ -121,7 +120,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry)
*/
private[sql] def registerJavaUDAF(name: String, className: String): Unit = {
try {
- val clazz = Utils.classForName[AnyRef](className)
+ val clazz = session.artifactManager.classloader.loadClass(className)
Contributor:

One follow-up here would be to cache the ArtifactManager classloader. I think we create that thing over and over.

@xupefei (Contributor, Author) commented on Sep 23, 2024:

Agree. We can invalidate the cache when a new JAR is added.
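A sketch of that follow-up idea (not code from this PR): hold one cached loader and rebuild it only when a new JAR arrives, so reads on the hot path stay cheap:

```scala
import java.net.{URL, URLClassLoader}

// Illustrative cache-with-invalidation for an artifact classloader.
class CachedArtifactClassLoader(parent: ClassLoader) {
  @volatile private var jars: Vector[URL] = Vector.empty
  @volatile private var cached: URLClassLoader =
    new URLClassLoader(Array.empty[URL], parent)

  def addJar(url: URL): Unit = synchronized {
    jars = jars :+ url
    // Invalidate: rebuild the loader so the new JAR becomes visible.
    cached = new URLClassLoader(jars.toArray, parent)
  }

  def classloader: URLClassLoader = cached // volatile read, no locking
}
```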

@@ -281,7 +281,10 @@ object SQLExecution extends Logging {
val activeSession = sparkSession
val sc = sparkSession.sparkContext
val localProps = Utils.cloneProperties(sc.getLocalProperties)
val artifactState = JobArtifactSet.getCurrentJobArtifactState.orNull
// `getCurrentJobArtifactState` will return a state only in Spark Connect mode. In non-Connect
Contributor:

I think it should be safe to use the SparkSession's jobArtifactState. They should be the same. cc @vicennial.

@hvanhovell (Contributor) left a comment:

LGTM - Pending @vicennial's sign-off.
