Add iceberg support #39

Merged
merged 7 commits into from
Jul 12, 2024
9 changes: 7 additions & 2 deletions NEWS.md
@@ -1,11 +1,16 @@
# TidierDB.jl updates

## v0.2.2 - 2024-07-07
## v0.2.4 - 2024-07-12
- Switches DuckDB to version 1.0
- Adds support for `iceberg` tables via DuckDB to read iceberg paths in `db_table` when `iceberg = true`
- Adds support for DuckDB's beta `delta_scan` to read delta paths in `db_table` when `delta = true`
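The new Iceberg and Delta read paths can be exercised roughly like this (a sketch; the bucket and table paths are hypothetical placeholders):

```julia
using TidierDB

# In-memory DuckDB connection (DuckDB 1.0 as of this release)
db = connect(:duckdb)

# Hypothetical paths -- substitute real Iceberg/Delta locations
iceberg_tbl = db_table(db, "s3://my-bucket/iceberg_table", iceberg = true)
delta_tbl   = db_table(db, "s3://my-bucket/delta_table", delta = true)
```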

## v0.2.3 - 2024-07-07
- Adds direct path support for `db_table` when using DuckDB
- Adds `connect` ability for AWS and Google Cloud to allow querying via S3 + DuckDB
- Adds documentation for S3 + DuckDB with TidierDB

## v0.2.1 - 2024-06-27
## v0.2.2 - 2024-06-27
- Adds support for Databricks SQL Rest API
- Adds docs for Databricks use
- Fixes float/int type conversion when Snowflake collects to dataframe
4 changes: 2 additions & 2 deletions Project.toml
@@ -1,7 +1,7 @@
name = "TidierDB"
uuid = "86993f9b-bbba-4084-97c5-ee15961ad48b"
authors = ["Daniel Rizk <rizk.daniel.12@gmail.com> and contributors"]
version = "0.2.3"
version = "0.2.4"

[deps]
AWS = "fbe9abb3-538b-5e4e-ba9e-bc94f4f92ebc"
@@ -29,7 +29,7 @@ Chain = "0.6"
ClickHouse = "0.2"
DataFrames = "1.5"
Documenter = "0.27, 1"
DuckDB = "0.10"
DuckDB = "1.0"
GoogleCloud = "0.11"
HTTP = "1.1"
JSON3 = "1.1"
2 changes: 2 additions & 0 deletions README.md
@@ -72,6 +72,8 @@ Supported aggregate functions (as supported by the backend) with more to come
- `@summarize` supports any SQL aggregate function in addition to the list above. Simply write the function as written in SQL syntax and it will work.
- `copy_to` (for DuckDB, MySQL, SQLite)

With DuckDB, `db_table` supports direct paths to S3 bucket locations, Iceberg tables, and Delta tables, in addition to `.csv`, `.parquet`, and other file types.
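For example, a direct-path read might look like the following (a sketch; the file paths and bucket names are placeholders, and S3 access assumes credentials have been set up via `connect`):

```julia
using TidierDB

db = connect(:duckdb)

# Any of these path styles should work as the table argument
db_table(db, "data/local_file.parquet")            # local file
db_table(db, "https://example.com/remote.csv")     # https path
db_table(db, "s3://my-bucket/table.parquet")       # S3 path (credentials required)
```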

DuckDB specifically enables `copy_to` to directly read in `.parquet`, `.json`, `.csv`, and `.arrow` files, including https file paths.

```julia
2 changes: 2 additions & 0 deletions docs/src/index.md
@@ -66,6 +66,8 @@ Supported aggregate functions (as supported by the backend) with more to come
- `@summarize` supports any SQL aggregate function in addition to the list above. Simply write the function as written in SQL syntax and it will work.
- `copy_to` (for DuckDB, MySQL, SQLite)

With DuckDB, `db_table` supports direct paths to S3 bucket locations, Iceberg tables, and Delta tables, in addition to `.csv`, `.parquet`, and other file types.

DuckDB specifically enables `copy_to` to directly read in `.parquet`, `.json`, `.csv`, and `.arrow` files, including https file paths.

```julia
2 changes: 1 addition & 1 deletion src/TBD_macros.jl
@@ -664,7 +664,7 @@ macro collect(sqlquery)
# Determine the type of db and execute the query accordingly
if db isa DatabricksConnection
df_result = execute_databricks(db, final_query)
elseif db isa SQLite.DB || db isa LibPQ.Connection || db isa DuckDB.Connection || db isa MySQL.Connection || db isa ODBC.Connection
elseif db isa SQLite.DB || db isa LibPQ.Connection || db isa DuckDB.DB || db isa MySQL.Connection || db isa ODBC.Connection
result = DBInterface.execute(db, final_query)
df_result = DataFrame(result)
elseif current_sql_mode[] == :clickhouse
54 changes: 40 additions & 14 deletions src/TidierDB.jl
@@ -175,10 +175,11 @@ end


# DuckDB
function get_table_metadata(conn::DuckDB.Connection, table_name::String)
query = """
DESCRIBE SELECT * FROM '$(table_name)' LIMIT 0
"""
function get_table_metadata(conn::DuckDB.DB, table_name::String)
    query = """
    DESCRIBE SELECT * FROM $(table_name) LIMIT 0
    """
result = DuckDB.execute(conn, query) |> DataFrame
result[!, :current_selxn] .= 1
table_name = if occursin(r"[:/]", table_name)
@@ -255,28 +256,54 @@ function get_table_metadata(conn::ClickHouse.ClickHouseSock, table_name::String)
return select(result, 1 => :name, 2 => :type, :current_selxn, :table_name)
end

function db_table(db, table, athena_params::Any=nothing)
"""
$docstring_db_table
"""
function db_table(db, table, athena_params::Any=nothing; iceberg::Bool=false, delta::Bool=false)
table_name = string(table)
metadata = if current_sql_mode[] == :lite
get_table_metadata(db, table_name)
elseif current_sql_mode[] in [:postgres, :duckdb, :mysql, :mssql, :clickhouse, :gbq, :oracle]
get_table_metadata(db, table_name)

if current_sql_mode[] == :lite
metadata = get_table_metadata(db, table_name)
    elseif current_sql_mode[] in (:postgres, :duckdb, :mysql, :mssql, :clickhouse, :gbq, :oracle)
if iceberg
DBInterface.execute(db, "INSTALL iceberg;")
DBInterface.execute(db, "LOAD iceberg;")
table_name2 = "iceberg_scan('$table_name', allow_moved_paths = true)"
metadata = get_table_metadata(db, table_name2)
elseif delta
DuckDB.execute(db, "INSTALL delta;")
DuckDB.execute(db, "LOAD delta;")
table_name2 = "delta_scan('$table_name')"
metadata = get_table_metadata(db, table_name2)
elseif occursin(r"[:/]", table_name)
table_name2 = "'$table_name'"
metadata = get_table_metadata(db, table_name2)
else
metadata = get_table_metadata(db, table_name)
end
elseif current_sql_mode[] == :athena
get_table_metadata_athena(db, table_name, athena_params)
metadata = get_table_metadata_athena(db, table_name, athena_params)
elseif current_sql_mode[] == :snowflake
get_table_metadata(db, table_name)
metadata = get_table_metadata(db, table_name)
else
error("Unsupported SQL mode: $(current_sql_mode[])")
end

formatted_table_name = if current_sql_mode[] == :snowflake
"$(db.database).$(db.schema).$table_name"
elseif db isa DatabricksConnection
"$(db.database).$(db.schema).$table_name"
elseif occursin(r"[:/]", table_name)
elseif iceberg
"iceberg_scan('$table_name', allow_moved_paths = true)"
elseif delta
"delta_scan('$table_name')"
elseif occursin(r"[:/]", table_name) && !(iceberg || delta)
"'$table_name'"
else
table_name
end

return SQLQuery(from=formatted_table_name, metadata=metadata, db=db, athena_params=athena_params)
end

@@ -370,9 +397,8 @@ function connect(backend::Symbol; kwargs...)
set_sql_mode(:lite)
return SQLite.DB(db_path)
elseif backend == :DuckDB || backend == :duckdb
mem = DuckDB.open(":memory:")
set_sql_mode(:duckdb)
db = DuckDB.connect(mem)
db = DBInterface.connect(DuckDB.DB, ":memory:")
DBInterface.execute(db, "SET autoinstall_known_extensions=1;")
DBInterface.execute(db, "SET autoload_known_extensions=1;")

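With this change, `connect(:duckdb)` returns a `DuckDB.DB` rather than a `DuckDB.Connection`, and known extensions auto-install and auto-load. A minimal usage sketch (the S3 path is a hypothetical placeholder):

```julia
using TidierDB

db = connect(:duckdb)   # DuckDB.DB backed by an in-memory database

# Extensions such as httpfs auto-load on demand, so direct reads
# like this need no manual INSTALL/LOAD step:
query = db_table(db, "s3://my-bucket/table.parquet")
df = @collect query
```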
42 changes: 42 additions & 0 deletions src/docstrings.jl
@@ -1041,4 +1041,46 @@ julia> @chain db_table(db, "df_mem") begin
─────┼───────────────────────────
1 │ AA 1 0.1
```
"""

const docstring_db_table =
"""
    db_table(database, table_name, athena_params=nothing; iceberg=false, delta=false)

`db_table` initializes the underlying SQL query struct, capturing the table reference and its metadata.

# Arguments
- `database`: the database or connection object
- `table_name`: the table name as a string; either the name of a table on the database or a path to one of the following:
    - CSV
    - Parquet
    - JSON
    - Iceberg
    - Delta
    - S3 tables from AWS or Google Cloud
- `delta`: set to `true` to read a Delta table at the given path
- `iceberg`: set to `true` to read an Iceberg table at the given path

# Example
```julia

julia> df = DataFrame(id = [string('A' + i ÷ 26, 'A' + i % 26) for i in 0:9],
groups = [i % 2 == 0 ? "aa" : "bb" for i in 1:10],
value = repeat(1:5, 2),
percent = 0.1:0.1:1.0);

julia> db = connect(:duckdb);

julia> copy_to(db, df, "df_mem");

julia> db_table(db, "df_mem")
TidierDB.SQLQuery("", "df_mem", "", "", "", "", "", "", false, false, 4×4 DataFrame
Row │ name type current_selxn table_name
│ String? String? Int64 String
─────┼─────────────────────────────────────────────
1 │ id VARCHAR 1 df_mem
2 │ groups VARCHAR 1 df_mem
3 │ value BIGINT 1 df_mem
4 │ percent DOUBLE 1 df_mem, false, DuckDB.Connection(":memory:"), TidierDB.CTE[], 0, nothing)
```
"""