Skip to content

Commit

Permalink
[SPARK-45517][CONNECT] Expand more exception constructors to support …
Browse files Browse the repository at this point in the history
…error framework parameters

### What changes were proposed in this pull request?

- Expand more exception constructors to support error framework parameters

### Why are the changes needed?

- Better integration with the error framework

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

`build/sbt "connect-client-jvm/testOnly *SparkConnectClientSuite"`

### Was this patch authored or co-authored using generative AI tooling?

Closes #43368 from heyihong/SPARK-45517.

Authored-by: Yihong He <yihong.he@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
  • Loading branch information
heyihong authored and HyukjinKwon committed Oct 17, 2023
1 parent 721ea9b commit 28961a6
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ private[spark] case class ExecutorDeadException(message: String)
/**
* Exception thrown when Spark returns different result after upgrading to a new version.
*/
private[spark] class SparkUpgradeException private(
private[spark] class SparkUpgradeException(
message: String,
cause: Option[Throwable],
errorClass: Option[String],
Expand Down Expand Up @@ -169,7 +169,7 @@ private[spark] class SparkUpgradeException private(
/**
* Arithmetic exception thrown from Spark with an error class.
*/
private[spark] class SparkArithmeticException private(
private[spark] class SparkArithmeticException(
message: String,
errorClass: Option[String],
messageParameters: Map[String, String],
Expand Down Expand Up @@ -207,7 +207,7 @@ private[spark] class SparkArithmeticException private(
/**
* Unsupported operation exception thrown from Spark with an error class.
*/
private[spark] class SparkUnsupportedOperationException private(
private[spark] class SparkUnsupportedOperationException(
message: String,
errorClass: Option[String],
messageParameters: Map[String, String])
Expand Down Expand Up @@ -271,7 +271,7 @@ private[spark] class SparkConcurrentModificationException(
/**
* Datetime exception thrown from Spark with an error class.
*/
private[spark] class SparkDateTimeException private(
private[spark] class SparkDateTimeException(
message: String,
errorClass: Option[String],
messageParameters: Map[String, String],
Expand Down Expand Up @@ -324,7 +324,7 @@ private[spark] class SparkFileNotFoundException(
/**
* Number format exception thrown from Spark with an error class.
*/
private[spark] class SparkNumberFormatException private(
private[spark] class SparkNumberFormatException private[spark](
message: String,
errorClass: Option[String],
messageParameters: Map[String, String],
Expand Down Expand Up @@ -363,7 +363,7 @@ private[spark] class SparkNumberFormatException private(
/**
* Illegal argument exception thrown from Spark with an error class.
*/
private[spark] class SparkIllegalArgumentException private(
private[spark] class SparkIllegalArgumentException(
message: String,
cause: Option[Throwable],
errorClass: Option[String],
Expand Down Expand Up @@ -403,7 +403,7 @@ private[spark] class SparkIllegalArgumentException private(
override def getQueryContext: Array[QueryContext] = context
}

private[spark] class SparkRuntimeException private(
private[spark] class SparkRuntimeException(
message: String,
cause: Option[Throwable],
errorClass: Option[String],
Expand Down Expand Up @@ -480,7 +480,7 @@ private[spark] class SparkSecurityException(
/**
* Array index out of bounds exception thrown from Spark with an error class.
*/
private[spark] class SparkArrayIndexOutOfBoundsException private(
private[spark] class SparkArrayIndexOutOfBoundsException(
message: String,
errorClass: Option[String],
messageParameters: Map[String, String],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ class StreamingQueryException private[sql](
/** Time when the exception occurred */
val time: Long = System.currentTimeMillis

override def getMessage: String = s"${message}\n${queryDebugString}"
override def getMessage: String =
if (queryDebugString.isEmpty) message else s"${message}\n${queryDebugString}"

override def toString(): String =
s"""${classOf[StreamingQueryException].getName}: ${cause.getMessage}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ import io.grpc.netty.NettyServerBuilder
import io.grpc.stub.StreamObserver
import org.scalatest.BeforeAndAfterEach

import org.apache.spark.SparkException
import org.apache.spark.{SparkException, SparkThrowable}
import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.{AddArtifactsRequest, AddArtifactsResponse, AnalyzePlanRequest, AnalyzePlanResponse, ArtifactStatusesRequest, ArtifactStatusesResponse, ExecutePlanRequest, ExecutePlanResponse, SparkConnectServiceGrpc}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.connect.common.config.ConnectCommon
import org.apache.spark.sql.test.ConnectFunSuite

Expand Down Expand Up @@ -208,6 +209,31 @@ class SparkConnectClientSuite extends ConnectFunSuite with BeforeAndAfterEach {
}
}

for ((name, constructor) <- GrpcExceptionConverter.errorFactory) {
test(s"error framework parameters - ${name}") {
val testParams = GrpcExceptionConverter.ErrorParams(
message = "test message",
cause = None,
errorClass = Some("test error class"),
messageParameters = Map("key" -> "value"),
queryContext = Array.empty)
val error = constructor(testParams)
if (!error.isInstanceOf[ParseException]) {
assert(error.getMessage == testParams.message)
} else {
assert(error.getMessage == s"\n${testParams.message}")
}
assert(error.getCause == null)
error match {
case sparkThrowable: SparkThrowable =>
assert(sparkThrowable.getErrorClass == testParams.errorClass.get)
assert(sparkThrowable.getMessageParameters.asScala == testParams.messageParameters)
assert(sparkThrowable.getQueryContext.isEmpty)
case _ =>
}
}
}

private case class TestPackURI(
connectionString: String,
isCorrect: Boolean,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,9 @@ private[client] class GrpcExceptionConverter(grpcStub: SparkConnectServiceBlocki
}
}

private object GrpcExceptionConverter {
private[client] object GrpcExceptionConverter {

private case class ErrorParams(
private[client] case class ErrorParams(
message: String,
cause: Option[Throwable],
// errorClass will only be set if the error is both enriched and SparkThrowable.
Expand All @@ -180,7 +180,7 @@ private object GrpcExceptionConverter {
(className, throwableCtr)
}

private val errorFactory = Map(
private[client] val errorFactory = Map(
errorConstructor(params =>
new StreamingQueryException(
params.message,
Expand All @@ -203,23 +203,84 @@ private object GrpcExceptionConverter {
errorClass = params.errorClass,
messageParameters = params.messageParameters,
context = params.queryContext)),
errorConstructor(params => new NamespaceAlreadyExistsException(params.message)),
errorConstructor(params => new TableAlreadyExistsException(params.message, params.cause)),
errorConstructor(params => new TempTableAlreadyExistsException(params.message, params.cause)),
errorConstructor(params => new NoSuchDatabaseException(params.message, params.cause)),
errorConstructor(params => new NoSuchTableException(params.message, params.cause)),
errorConstructor(params =>
new NamespaceAlreadyExistsException(
params.message,
params.errorClass,
params.messageParameters)),
errorConstructor(params =>
new TableAlreadyExistsException(
params.message,
params.cause,
params.errorClass,
params.messageParameters)),
errorConstructor(params =>
new TempTableAlreadyExistsException(
params.message,
params.cause,
params.errorClass,
params.messageParameters)),
errorConstructor(params =>
new NoSuchDatabaseException(
params.message,
params.cause,
params.errorClass,
params.messageParameters)),
errorConstructor(params =>
new NoSuchTableException(
params.message,
params.cause,
params.errorClass,
params.messageParameters)),
errorConstructor[NumberFormatException](params =>
new SparkNumberFormatException(params.message)),
new SparkNumberFormatException(
params.message,
params.errorClass,
params.messageParameters,
params.queryContext)),
errorConstructor[IllegalArgumentException](params =>
new SparkIllegalArgumentException(params.message, params.cause)),
errorConstructor[ArithmeticException](params => new SparkArithmeticException(params.message)),
new SparkIllegalArgumentException(
params.message,
params.cause,
params.errorClass,
params.messageParameters,
params.queryContext)),
errorConstructor[ArithmeticException](params =>
new SparkArithmeticException(
params.message,
params.errorClass,
params.messageParameters,
params.queryContext)),
errorConstructor[UnsupportedOperationException](params =>
new SparkUnsupportedOperationException(params.message)),
new SparkUnsupportedOperationException(
params.message,
params.errorClass,
params.messageParameters)),
errorConstructor[ArrayIndexOutOfBoundsException](params =>
new SparkArrayIndexOutOfBoundsException(params.message)),
errorConstructor[DateTimeException](params => new SparkDateTimeException(params.message)),
errorConstructor(params => new SparkRuntimeException(params.message, params.cause)),
errorConstructor(params => new SparkUpgradeException(params.message, params.cause)),
new SparkArrayIndexOutOfBoundsException(
params.message,
params.errorClass,
params.messageParameters,
params.queryContext)),
errorConstructor[DateTimeException](params =>
new SparkDateTimeException(
params.message,
params.errorClass,
params.messageParameters,
params.queryContext)),
errorConstructor(params =>
new SparkRuntimeException(
params.message,
params.cause,
params.errorClass,
params.messageParameters,
params.queryContext)),
errorConstructor(params =>
new SparkUpgradeException(
params.message,
params.cause,
params.errorClass,
params.messageParameters)),
errorConstructor(params =>
new SparkException(
message = params.message,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class DatabaseAlreadyExistsException(db: String)
extends NamespaceAlreadyExistsException(Array(db))

// any changes to this class should be backward compatible as it may be used by external connectors
class NamespaceAlreadyExistsException private(
class NamespaceAlreadyExistsException private[sql](
message: String,
errorClass: Option[String],
messageParameters: Map[String, String])
Expand Down Expand Up @@ -61,7 +61,7 @@ class NamespaceAlreadyExistsException private(
}

// any changes to this class should be backward compatible as it may be used by external connectors
class TableAlreadyExistsException private(
class TableAlreadyExistsException private[sql](
message: String,
cause: Option[Throwable],
errorClass: Option[String],
Expand Down Expand Up @@ -115,7 +115,7 @@ class TableAlreadyExistsException private(
}
}

class TempTableAlreadyExistsException private(
class TempTableAlreadyExistsException private[sql](
message: String,
cause: Option[Throwable],
errorClass: Option[String],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.sql.connector.catalog.Identifier
* Thrown by a catalog when an item cannot be found. The analyzer will rethrow the exception
* as an [[org.apache.spark.sql.AnalysisException]] with the correct position information.
*/
class NoSuchDatabaseException private(
class NoSuchDatabaseException private[sql](
message: String,
cause: Option[Throwable],
errorClass: Option[String],
Expand Down Expand Up @@ -100,7 +100,7 @@ class NoSuchNamespaceException private(
}

// any changes to this class should be backward compatible as it may be used by external connectors
class NoSuchTableException private(
class NoSuchTableException private[sql](
message: String,
cause: Option[Throwable],
errorClass: Option[String],
Expand Down

0 comments on commit 28961a6

Please sign in to comment.