Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add JSON option to prune columns #14996

Merged
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions cpp/include/cudf/io/json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ class json_reader_options {
bool _lines = false;
// Parse mixed types as a string column
bool _mixed_types_as_string = false;
// Use dtypes as filter instead of type inference suggestion
bool _use_dtypes_as_filter = false;
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved

// Bytes to skip from the start
size_t _byte_range_offset = 0;
Expand Down Expand Up @@ -241,6 +243,14 @@ class json_reader_options {
*/
bool is_enabled_mixed_types_as_string() const { return _mixed_types_as_string; }

/**
* @brief Whether to use dtypes as filter instead of type inference suggestion.
* This option is useful for parsing only a subset of columns.
*
* @return `true` if dtypes is used as filter
*/
bool is_enabled_use_dtypes_as_filter() const { return _use_dtypes_as_filter; }

/**
* @brief Whether to parse dates as DD/MM versus MM/DD.
*
Expand Down Expand Up @@ -342,6 +352,14 @@ class json_reader_options {
*/
void enable_mixed_types_as_string(bool val) { _mixed_types_as_string = val; }

/**
* @brief Set whether to use dtypes as filter instead of type inference suggestion.
* This option is useful for parsing only a subset of columns.
*
* @param val Boolean value to enable/disable dtypes use as filter
*/
void enable_use_dtypes_as_filter(bool val) { _use_dtypes_as_filter = val; }

/**
* @brief Set whether to parse dates as DD/MM versus MM/DD.
*
Expand Down Expand Up @@ -508,6 +526,19 @@ class json_reader_options_builder {
return *this;
}

/**
* @brief Set whether to use dtypes as filter instead of type inference suggestion.
* This option is useful for parsing only a subset of columns.
*
* @param val Boolean value to enable/disable dtypes use as filter
* @return this for chaining
*/
json_reader_options_builder& use_dtypes_as_filter(bool val)
{
options._use_dtypes_as_filter = val;
return *this;
}

/**
* @brief Set whether to parse dates as DD/MM versus MM/DD.
*
Expand Down
146 changes: 101 additions & 45 deletions cpp/src/io/json/json_column.cu
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ void make_device_json_column(device_span<SymbolT const> input,
}
};
auto init_to_zero = [stream](auto& v) {
thrust::uninitialized_fill(rmm::exec_policy(stream), v.begin(), v.end(), 0);
thrust::uninitialized_fill(rmm::exec_policy_nosync(stream), v.begin(), v.end(), 0);
};

auto initialize_json_columns = [&](auto i, auto& col) {
Expand Down Expand Up @@ -625,13 +625,14 @@ void make_device_json_column(device_span<SymbolT const> input,
// find column_ids which are values, but should be ignored in validity
std::vector<uint8_t> ignore_vals(num_columns, 0);
std::vector<uint8_t> is_mixed_type_column(num_columns, 0);
std::vector<uint8_t> is_filtered(num_columns, 0);
columns.try_emplace(parent_node_sentinel, std::ref(root));

for (auto const this_col_id : unique_col_ids) {
if (column_categories[this_col_id] == NC_ERR || column_categories[this_col_id] == NC_FN) {
continue;
}
// Struct, List, String, Value
auto name_and_parent_index = [&is_array_of_arrays,
&row_array_parent_col_id,
&column_parent_ids,
&column_categories,
&column_names](auto this_col_id) {
std::string name = "";
auto parent_col_id = column_parent_ids[this_col_id];
if (parent_col_id == parent_node_sentinel || column_categories[parent_col_id] == NC_LIST) {
Expand All @@ -647,11 +648,48 @@ void make_device_json_column(device_span<SymbolT const> input,
} else {
CUDF_FAIL("Unexpected parent column category");
}
return std::pair{name, parent_col_id};
};

// Filter columns that are not required to be parsed.
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
if (options.is_enabled_use_dtypes_as_filter()) {
for (auto const this_col_id : unique_col_ids) {
if (column_categories[this_col_id] == NC_ERR || column_categories[this_col_id] == NC_FN) {
continue;
}
// Struct, List, String, Value
auto [name, parent_col_id] = name_and_parent_index(this_col_id);
// get path of this column, and get its dtype if present in options
auto nt = tree_path.get_path(this_col_id);
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
std::optional<data_type> user_dtype = get_path_data_type(nt, options);
if (!user_dtype.has_value() and parent_col_id != parent_node_sentinel) {
is_filtered[this_col_id] = 1;
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
continue;
} else {
// make sure all its parents are not filtered.
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
auto col_id = this_col_id;
while (parent_col_id != parent_node_sentinel and is_filtered[parent_col_id] == 1) {
is_filtered[parent_col_id] = 0;
col_id = parent_col_id;
parent_col_id = column_parent_ids[col_id];
}
}
}
}

// Build the column tree, also, handles mixed types.
for (auto const this_col_id : unique_col_ids) {
if (column_categories[this_col_id] == NC_ERR || column_categories[this_col_id] == NC_FN) {
continue;
}
// Struct, List, String, Value
auto [name, parent_col_id] = name_and_parent_index(this_col_id);

if (parent_col_id != parent_node_sentinel && is_mixed_type_column[parent_col_id] == 1) {
// if parent is mixed type column, ignore this column.
is_mixed_type_column[this_col_id] = 1;
ignore_vals[this_col_id] = 1;
// if parent is mixed type column or this column is filtered, ignore this column.
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
if (parent_col_id != parent_node_sentinel &&
(is_mixed_type_column[parent_col_id] || is_filtered[this_col_id])) {
ignore_vals[this_col_id] = 1;
if (is_mixed_type_column[parent_col_id]) { is_mixed_type_column[this_col_id] = 1; }
continue;
}

Expand Down Expand Up @@ -714,12 +752,13 @@ void make_device_json_column(device_span<SymbolT const> input,
"A mix of lists and structs within the same column is not supported");
}
}

if (is_enabled_mixed_types_as_string) {
// get path of this column, check if it is a struct forced as string, and enforce it
auto nt = tree_path.get_path(this_col_id);
std::optional<data_type> user_dt = get_path_data_type(nt, options);
if (column_categories[this_col_id] == NC_STRUCT and user_dt.has_value() and
user_dt.value().id() == type_id::STRING) {
auto nt = tree_path.get_path(this_col_id);
std::optional<data_type> user_dtype = get_path_data_type(nt, options);
if (column_categories[this_col_id] == NC_STRUCT and user_dtype.has_value() and
user_dtype.value().id() == type_id::STRING) {
is_mixed_type_column[this_col_id] = 1;
column_categories[this_col_id] = NC_STR;
}
Expand Down Expand Up @@ -873,25 +912,27 @@ void make_device_json_column(device_span<SymbolT const> input,
for (auto& [id, col_ref] : columns) {
auto& col = col_ref.get();
if (col.type == json_col_t::StringColumn) {
thrust::inclusive_scan(rmm::exec_policy(stream),
thrust::inclusive_scan(rmm::exec_policy_nosync(stream),
col.string_offsets.begin(),
col.string_offsets.end(),
col.string_offsets.begin(),
thrust::maximum<json_column::row_offset_t>{});
} else if (col.type == json_col_t::ListColumn) {
thrust::inclusive_scan(rmm::exec_policy(stream),
thrust::inclusive_scan(rmm::exec_policy_nosync(stream),
col.child_offsets.begin(),
col.child_offsets.end(),
col.child_offsets.begin(),
thrust::maximum<json_column::row_offset_t>{});
}
}
stream.synchronize();
}

std::pair<std::unique_ptr<column>, std::vector<column_name_info>> device_json_column_to_cudf_column(
device_json_column& json_col,
device_span<SymbolT const> d_input,
cudf::io::parse_options const& options,
bool use_dtypes_as_filter,
std::optional<schema_element> schema,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
Expand Down Expand Up @@ -982,13 +1023,16 @@ std::pair<std::unique_ptr<column>, std::vector<column_name_info>> device_json_co
for (auto const& col_name : json_col.column_order) {
auto const& col = json_col.child_columns.find(col_name);
column_names.emplace_back(col->first);
auto& child_col = col->second;
auto [child_column, names] = device_json_column_to_cudf_column(
child_col, d_input, options, get_child_schema(col_name), stream, mr);
CUDF_EXPECTS(num_rows == child_column->size(),
"All children columns must have the same size");
child_columns.push_back(std::move(child_column));
column_names.back().children = names;
auto& child_col = col->second;
auto child_schema_element = get_child_schema(col_name);
if (!use_dtypes_as_filter or child_schema_element.has_value()) {
auto [child_column, names] = device_json_column_to_cudf_column(
child_col, d_input, options, use_dtypes_as_filter, child_schema_element, stream, mr);
CUDF_EXPECTS(num_rows == child_column->size(),
"All children columns must have the same size");
child_columns.push_back(std::move(child_column));
column_names.back().children = names;
}
}
auto [result_bitmask, null_count] = make_validity(json_col);
// The null_mask is set after creation of struct column is to skip the superimpose_nulls and
Expand All @@ -1011,8 +1055,12 @@ std::pair<std::unique_ptr<column>, std::vector<column_name_info>> device_json_co
rmm::device_buffer{},
0);
// Create children column
auto child_schema_element = json_col.child_columns.empty()
? std::optional<schema_element>{}
: get_child_schema(json_col.child_columns.begin()->first);
auto [child_column, names] =
json_col.child_columns.empty()
json_col.child_columns.empty() or
(use_dtypes_as_filter and !child_schema_element.has_value())
? std::pair<std::unique_ptr<column>,
// EMPTY type could not used because gather throws exception on EMPTY type.
std::vector<column_name_info>>{std::make_unique<column>(
Expand All @@ -1022,13 +1070,13 @@ std::pair<std::unique_ptr<column>, std::vector<column_name_info>> device_json_co
rmm::device_buffer{},
0),
std::vector<column_name_info>{}}
: device_json_column_to_cudf_column(
json_col.child_columns.begin()->second,
d_input,
options,
get_child_schema(json_col.child_columns.begin()->first),
stream,
mr);
: device_json_column_to_cudf_column(json_col.child_columns.begin()->second,
d_input,
options,
use_dtypes_as_filter,
child_schema_element,
stream,
mr);
column_names.back().children = names;
auto [result_bitmask, null_count] = make_validity(json_col);
auto ret_col = make_lists_column(num_rows,
Expand Down Expand Up @@ -1140,8 +1188,6 @@ table_with_metadata device_parse_nested_json(device_span<SymbolT const> d_input,
size_type column_index = 0;
for (auto const& col_name : root_struct_col.column_order) {
auto& json_col = root_struct_col.child_columns.find(col_name)->second;
// Insert this columns name into the schema
out_column_names.emplace_back(col_name);

std::optional<schema_element> child_schema_element = std::visit(
cudf::detail::visitor_overload{
Expand Down Expand Up @@ -1184,18 +1230,28 @@ table_with_metadata device_parse_nested_json(device_span<SymbolT const> d_input,
debug_schema_print(child_schema_element);
#endif

// Get this JSON column's cudf column and schema info, (modifies json_col)
auto [cudf_col, col_name_info] = device_json_column_to_cudf_column(
json_col, d_input, parse_opt, child_schema_element, stream, mr);
// TODO: RangeIndex as DataFrame.columns names for array of arrays
// if (is_array_of_arrays) {
// col_name_info.back().name = "";
// }

out_column_names.back().children = std::move(col_name_info);
out_columns.emplace_back(std::move(cudf_col));

column_index++;
if (!options.is_enabled_use_dtypes_as_filter() or child_schema_element.has_value()) {
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
// Get this JSON column's cudf column and schema info, (modifies json_col)
auto [cudf_col, col_name_info] =
device_json_column_to_cudf_column(json_col,
d_input,
parse_opt,
options.is_enabled_use_dtypes_as_filter(),
child_schema_element,
stream,
mr);
// Insert this columns name into the schema
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
out_column_names.emplace_back(col_name);
// TODO: RangeIndex as DataFrame.columns names for array of arrays
// if (is_array_of_arrays) {
// col_name_info.back().name = "";
// }

out_column_names.back().children = std::move(col_name_info);
out_columns.emplace_back(std::move(cudf_col));

column_index++;
}
}

return table_with_metadata{std::make_unique<table>(std::move(out_columns)), {out_column_names}};
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/json/parser_features.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ std::optional<data_type> get_path_data_type(
std::optional<schema_element> col_schema = child_schema_element(path.back().first, options);
// check if it has value, then do recursive call and return.
if (col_schema.has_value()) {
// std::cout<<path.back().first<<".";
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
return get_path_data_type(path, col_schema.value());
} else {
return {};
Expand Down
Loading
Loading