Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support reading ANSI day time interval type from CSV source #4927

Merged
merged 9 commits into from
Mar 18, 2022

Conversation

res-life
Copy link
Collaborator

@res-life res-life commented Mar 10, 2022

Contributes #4146

Support reading ANSI day time interval type from CSV source.
This PR is an implementation on the plugin side currently.
If cuDF supports this in future, should update, see rapidsai/cudf#10356

Signed-off-by: Chong Gao res_life@163.com

Signed-off-by: Chong Gao <res_life@163.com>
@res-life
Copy link
Collaborator Author

build

@res-life res-life changed the title Support read ANSI day time interval type from CSV source Support reading ANSI day time interval type from CSV source Mar 10, 2022
@res-life
Copy link
Collaborator Author

build

@res-life
Copy link
Collaborator Author

build

@sameerz sameerz added the audit_3.3.0 Audit related tasks for 3.3.0 label Mar 11, 2022
@sameerz sameerz added this to the Feb 28 - Mar 18 milestone Mar 11, 2022
@res-life
Copy link
Collaborator Author

build

@res-life
Copy link
Collaborator Author

Filed 2 Spark issues:
Overflow occurs when reading ANSI day time interval from CSV file
https://issues.apache.org/jira/browse/SPARK-38520

The second range is not [0, 59] in the day time ANSI interval
https://issues.apache.org/jira/browse/SPARK-38324

integration_tests/src/main/python/csv_test.py Outdated Show resolved Hide resolved
integration_tests/src/main/python/data_gen.py Outdated Show resolved Hide resolved
integration_tests/src/main/python/data_gen.py Outdated Show resolved Hide resolved

def supportCsvRead(dt: DataType) : Boolean = false

def csvRead(cv: ColumnVector, dt: DataType): ColumnVector =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def csvRead(cv: ColumnVector, dt: DataType): ColumnVector =
def csvRead(cv: cudf.ColumnVector, dt: DataType): cudf.ColumnVector =

Otherwise, it will conflict with the PR #4926 who imports the org.apache.spark.sql.vectorized.ColumnVector

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same as above, #4926 is merged, then here should be a conflict now.

@@ -15,6 +15,7 @@
*/
package com.nvidia.spark.rapids.shims

import ai.rapids.cudf.ColumnVector
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
import ai.rapids.cudf.ColumnVector
import ai.rapids.cudf

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see nothing is changed. And the PR I mentioned is merged, so here should be a conflict now.

@res-life
Copy link
Collaborator Author

build

@res-life
Copy link
Collaborator Author

Performance test result

GPU CPU radio
5,057ms 30,569ms 6

Test file is 1.7G and has 12320000 rows and 10 columns, it's duplicated from day-time-interval.csv by many times. The day-time-interval.csv is in the tests module.

The sql is:
spark.sql("select count(c01) + count(c02) + count(c03) + count(c04) + count(c05) + count(c06) + count(c07) + count(c08) + count(c09) + count(c10) from tbl").show()

Details code:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{DayTimeIntervalType, StructField, StructType}

def dayTime(s: Byte, e: Byte): DayTimeIntervalType = DayTimeIntervalType(s, e)

val schema = StructType(Seq(
  StructField("c01", dayTime(DayTimeIntervalType.DAY, DayTimeIntervalType.DAY)),
  StructField("c02", dayTime(DayTimeIntervalType.DAY, DayTimeIntervalType.HOUR)),
  StructField("c03", dayTime(DayTimeIntervalType.DAY, DayTimeIntervalType.MINUTE)),
  StructField("c04", dayTime(DayTimeIntervalType.DAY, DayTimeIntervalType.SECOND)),
  StructField("c05", dayTime(DayTimeIntervalType.HOUR, DayTimeIntervalType.HOUR)),
  StructField("c06", dayTime(DayTimeIntervalType.HOUR, DayTimeIntervalType.MINUTE)),
  StructField("c07", dayTime(DayTimeIntervalType.HOUR, DayTimeIntervalType.SECOND)),
  StructField("c08", dayTime(DayTimeIntervalType.MINUTE, DayTimeIntervalType.MINUTE)),
  StructField("c09", dayTime(DayTimeIntervalType.MINUTE, DayTimeIntervalType.SECOND)),
  StructField("c10", dayTime(DayTimeIntervalType.SECOND, DayTimeIntervalType.SECOND))
))

val path = "/tmp/my-tmp/large.csv"

spark.conf.set("spark.rapids.sql.enabled", false)
var start = System.currentTimeMillis()
spark.read.schema(schema).csv(path).createOrReplaceTempView("tbl")
spark.sql("select count(c01)  + count(c02) + count(c03) + count(c04)  + count(c05) + count(c06) + count(c07) + count(c08)  + count(c09) + count(c10) from tbl").show()
println((System.currentTimeMillis() - start) + "ms")


spark.conf.set("spark.rapids.sql.enabled", true)
start = System.currentTimeMillis()
spark.read.schema(schema).csv(path).createOrReplaceTempView("tbl")
spark.sql("select count(c01)  + count(c02) + count(c03) + count(c04)  + count(c05) + count(c06) + count(c07) + count(c08)  + count(c09) + count(c10) from tbl").show()
println((System.currentTimeMillis() - start) + "ms")

@res-life
Copy link
Collaborator Author

build

seconds = 0
microseconds = 0

if (start_field, end_field) == ("day", "day"):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to use the below structure, which should get a better perf.

if:
    ...
elif:
    ...
elif:
    ...
else:
    ...

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do this totally differently? There is just so much copy/paste between each part of the code. We know that the python code really is only converting it to micro seconds. So can we just generate a random number for the microseconds, up to max_micros, and then truncate it to the proper time? Also can we convert the "string" start/end fields into something simpler to write code for?

# in __init__
DAY = 0
HOUR = 1
MIN = 2
SEC = 3
fields_to_look_at = ["day", "hour", "minute", "second"]
si = fields_to_look_at.index(start_field)
ei = fileds_to_look_at.index(end_field)
assert si <= ei

self.hasDays = si <= DAY and ei >= DAY
self.hasHours = si <= HOUR and ei >= HOUR
self.hasMin = si <= MIN and ei >= MIN
self.hasSec = si <= SEC and ei >= SEC

@res-life
Copy link
Collaborator Author

@revans2 help to review

integration_tests/src/main/python/data_gen.py Show resolved Hide resolved
integration_tests/src/main/python/data_gen.py Outdated Show resolved Hide resolved
integration_tests/src/main/python/data_gen.py Outdated Show resolved Hide resolved
integration_tests/src/main/python/data_gen.py Outdated Show resolved Hide resolved
seconds = 0
microseconds = 0

if (start_field, end_field) == ("day", "day"):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do this totally differently? There is just so much copy/paste between each part of the code. We know that the python code really is only converting it to micro seconds. So can we just generate a random number for the microseconds, up to max_micros, and then truncate it to the proper time? Also can we convert the "string" start/end fields into something simpler to write code for?

# in __init__
DAY = 0
HOUR = 1
MIN = 2
SEC = 3
fields_to_look_at = ["day", "hour", "minute", "second"]
si = fields_to_look_at.index(start_field)
ei = fileds_to_look_at.index(end_field)
assert si <= ei

self.hasDays = si <= DAY and ei >= DAY
self.hasHours = si <= HOUR and ei >= HOUR
self.hasMin = si <= MIN and ei >= MIN
self.hasSec = si <= SEC and ei >= SEC

@res-life
Copy link
Collaborator Author

build

@res-life
Copy link
Collaborator Author

The last comment: #4927 (comment)

@res-life
Copy link
Collaborator Author

Will resolve the conflict after #4946 is merged, because of 4946 is also conflict with this.

@res-life
Copy link
Collaborator Author

res-life commented Mar 17, 2022

Filed a Spark issue: Interval types are not truncated to the expected endField when creating a DataFrame via Duration https://issues.apache.org/jira/browse/SPARK-38577

@res-life
Copy link
Collaborator Author

build

@res-life
Copy link
Collaborator Author

build

}

/**
* TODO: Blocked by Spark overflow issue: https://issues.apache.org/jira/browse/SPARK-38520
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we do the right thing with the overflow? If so then we need to document this.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, should update doc, another followup PR: #4981

@res-life res-life merged commit 3dfd03c into NVIDIA:branch-22.04 Mar 18, 2022
@res-life res-life deleted the support-ansi-intervals-for-csv branch April 2, 2022 08:03
@nartal1 nartal1 mentioned this pull request Apr 5, 2022
49 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
audit_3.3.0 Audit related tasks for 3.3.0
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants