Skip to content

Commit

Permalink
Merge pull request #39 from TidierOrg/add-iceberg-support
Browse files Browse the repository at this point in the history
Add iceberg support
  • Loading branch information
drizk1 committed Jul 12, 2024
2 parents ddbf1d7 + 80ac9af commit 31d8ba5
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 19 deletions.
9 changes: 7 additions & 2 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
# TidierDB.jl updates

## v0.2.2 - 2024-07-07
## v0.2.4 - 2024-07-12
- Switches DuckDB to version 1.0
- Adds support for `iceberg` tables via DuckDB to read iceberg paths in `db_table` when `iceberg = true`
- Adds support for DuckDB's beta `delta_scan` to read delta paths in `db_table` when `delta = true`

## v0.2.3 - 2024-07-07
- Adds direct path support for `db_table` when using DuckDB
- Adds `connect` ability for AWS and Google Cloud to allow querying via S3 + DuckDB
- Adds documentation for S3 + DuckDB with TidierDB

## v0.2.1 - 2024-06-27
## v0.2.2 - 2024-06-27
- Adds support for Databricks SQL Rest API
- Adds docs for Databricks use
- Fixes float/int type conversion when Snowflake collects to dataframe
Expand Down
4 changes: 2 additions & 2 deletions Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "TidierDB"
uuid = "86993f9b-bbba-4084-97c5-ee15961ad48b"
authors = ["Daniel Rizk <rizk.daniel.12@gmail.com> and contributors"]
version = "0.2.3"
version = "0.2.4"

[deps]
AWS = "fbe9abb3-538b-5e4e-ba9e-bc94f4f92ebc"
Expand Down Expand Up @@ -29,7 +29,7 @@ Chain = "0.6"
ClickHouse = "0.2"
DataFrames = "1.5"
Documenter = "0.27, 1"
DuckDB = "0.10"
DuckDB = "1.0"
GoogleCloud = "0.11"
HTTP = "1.1"
JSON3 = "1.1"
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ Supported aggregate functions (as supported by the backend) with more to come
- `@summarize` supports any SQL aggregate function in addition to the list above. Simply write the function as written in SQL syntax and it will work.
- `copy_to` (for DuckDB, MySQL, SQLite)

With DuckDB, `db_table` supports direct paths to S3 bucket locations, iceberg tables, and delta tables, in addition to csv, parquet, etc.

DuckDB specifically enables `copy_to` to directly read in `.parquet`, `.json`, `.csv`, and `.arrow` files, including https file paths.

```julia
Expand Down
2 changes: 2 additions & 0 deletions docs/src/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ Supported aggregate functions (as supported by the backend) with more to come
- `@summarize` supports any SQL aggregate function in addition to the list above. Simply write the function as written in SQL syntax and it will work
- `copy_to` (for DuckDB, MySQL, SQLite)

With DuckDB, `db_table` supports direct paths to S3 bucket locations, iceberg tables, and delta tables, in addition to csv, parquet, etc.

DuckDB specifically enables `copy_to` to directly read in `.parquet`, `.json`, `.csv`, and `.arrow` files, including https file paths.

```julia
Expand Down
2 changes: 1 addition & 1 deletion src/TBD_macros.jl
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ macro collect(sqlquery)
# Determine the type of db and execute the query accordingly
if db isa DatabricksConnection
df_result = execute_databricks(db, final_query)
elseif db isa SQLite.DB || db isa LibPQ.Connection || db isa DuckDB.Connection || db isa MySQL.Connection || db isa ODBC.Connection
elseif db isa SQLite.DB || db isa LibPQ.Connection || db isa DuckDB.DB || db isa MySQL.Connection || db isa ODBC.Connection
result = DBInterface.execute(db, final_query)
df_result = DataFrame(result)
elseif current_sql_mode[] == :clickhouse
Expand Down
54 changes: 40 additions & 14 deletions src/TidierDB.jl
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,11 @@ end


# DuckDB
function get_table_metadata(conn::DuckDB.Connection, table_name::String)
query = """
DESCRIBE SELECT * FROM '$(table_name)' LIMIT 0
"""
function get_table_metadata(conn::DuckDB.DB, table_name::String)
query =
"""
DESCRIBE SELECT * FROM $(table_name) LIMIT 0
"""
result = DuckDB.execute(conn, query) |> DataFrame
result[!, :current_selxn] .= 1
table_name = if occursin(r"[:/]", table_name)
Expand Down Expand Up @@ -255,28 +256,54 @@ function get_table_metadata(conn::ClickHouse.ClickHouseSock, table_name::String)
return select(result, 1 => :name, 2 => :type, :current_selxn, :table_name)
end

function db_table(db, table, athena_params::Any=nothing)
"""
$docstring_db_table
"""
function db_table(db, table, athena_params::Any=nothing; iceberg::Bool=false, delta::Bool=false)
table_name = string(table)
metadata = if current_sql_mode[] == :lite
get_table_metadata(db, table_name)
elseif current_sql_mode[] in [:postgres, :duckdb, :mysql, :mssql, :clickhouse, :gbq, :oracle]
get_table_metadata(db, table_name)

if current_sql_mode[] == :lite
metadata = get_table_metadata(db, table_name)
elseif current_sql_mode[] == :postgres ||current_sql_mode[] == :duckdb || current_sql_mode[] == :mysql || current_sql_mode[] == :mssql || current_sql_mode[] == :clickhouse || current_sql_mode[] == :gbq ||current_sql_mode[] == :oracle
if iceberg
DBInterface.execute(db, "INSTALL iceberg;")
DBInterface.execute(db, "LOAD iceberg;")
table_name2 = "iceberg_scan('$table_name', allow_moved_paths = true)"
metadata = get_table_metadata(db, table_name2)
elseif delta
DuckDB.execute(db, "INSTALL delta;")
DuckDB.execute(db, "LOAD delta;")
table_name2 = "delta_scan('$table_name')"
# println(table_name2)
metadata = get_table_metadata(db, table_name2)
elseif occursin(r"[:/]", table_name)
table_name2 = "'$table_name'"
metadata = get_table_metadata(db, table_name2)
else
metadata = get_table_metadata(db, table_name)
end
elseif current_sql_mode[] == :athena
get_table_metadata_athena(db, table_name, athena_params)
metadata = get_table_metadata_athena(db, table_name, athena_params)
elseif current_sql_mode[] == :snowflake
get_table_metadata(db, table_name)
metadata = get_table_metadata(db, table_name)
else
error("Unsupported SQL mode: $(current_sql_mode[])")
end

formatted_table_name = if current_sql_mode[] == :snowflake
"$(db.database).$(db.schema).$table_name"
elseif db isa DatabricksConnection
"$(db.database).$(db.schema).$table_name"
elseif occursin(r"[:/]", table_name)
elseif iceberg
"iceberg_scan('$table_name', allow_moved_paths = true)"
elseif delta
"delta_scan('$table_name')"
elseif occursin(r"[:/]", table_name) && !(iceberg || delta)
"'$table_name'"
else
table_name
end

return SQLQuery(from=formatted_table_name, metadata=metadata, db=db, athena_params=athena_params)
end

Expand Down Expand Up @@ -370,9 +397,8 @@ function connect(backend::Symbol; kwargs...)
set_sql_mode(:lite)
return SQLite.DB(db_path)
elseif backend == :DuckDB || backend == :duckdb
mem = DuckDB.open(":memory:")
set_sql_mode(:duckdb)
db = DuckDB.connect(mem)
db = DBInterface.connect(DuckDB.DB, ":memory:")
DBInterface.execute(db, "SET autoinstall_known_extensions=1;")
DBInterface.execute(db, "SET autoload_known_extensions=1;")

Expand Down
42 changes: 42 additions & 0 deletions src/docstrings.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1041,4 +1041,46 @@ julia> @chain db_table(db, "df_mem") begin
─────┼───────────────────────────
1 │ AA 1 0.1
```
"""

const docstring_db_table =
"""
db_table(database, table_name, athena_params; iceberg = false, delta = false)
`db_table` starts the underlying SQL query struct, adding the metadata and table.
# Arguments
- `database`: the database or connection object
- `table_name`: table name as a string. Can be the name of a table in the database or a path to one of the following:
    - CSV
    - Parquet
    - JSON
    - Iceberg
    - Delta
    - S3 tables from AWS or Google Cloud
- `iceberg`: must be `true` to read an Iceberg table path
- `delta`: must be `true` to read a Delta table path
# Example
```julia
julia> df = DataFrame(id = [string('A' + i ÷ 26, 'A' + i % 26) for i in 0:9],
groups = [i % 2 == 0 ? "aa" : "bb" for i in 1:10],
value = repeat(1:5, 2),
percent = 0.1:0.1:1.0);
julia> db = connect(:duckdb);
julia> copy_to(db, df, "df_mem");
julia> db_table(db, "df_mem")
TidierDB.SQLQuery("", "df_mem", "", "", "", "", "", "", false, false, 4×4 DataFrame
Row │ name type current_selxn table_name
│ String? String? Int64 String
─────┼─────────────────────────────────────────────
1 │ id VARCHAR 1 df_mem
2 │ groups VARCHAR 1 df_mem
3 │ value BIGINT 1 df_mem
  4 │ percent  DOUBLE            1  df_mem, false, DuckDB.DB(":memory:"), TidierDB.CTE[], 0, nothing)
```
"""

0 comments on commit 31d8ba5

Please sign in to comment.