From 799d7f8b153fe40ac54a2a8c7ba63db24c8c0b30 Mon Sep 17 00:00:00 2001 From: drizk1 Date: Tue, 14 May 2024 09:37:32 -0400 Subject: [PATCH 1/6] add big query support --- Project.toml | 6 +- src/TBD_macros.jl | 3 +- src/TidierDB.jl | 7 +- src/parsing_gbq.jl | 253 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 266 insertions(+), 3 deletions(-) create mode 100644 src/parsing_gbq.jl diff --git a/Project.toml b/Project.toml index 62f82d5..0e64cca 100644 --- a/Project.toml +++ b/Project.toml @@ -11,6 +11,8 @@ ClickHouse = "82f2e89e-b495-11e9-1d9d-fb40d7cf2130" DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0" Documenter = "e30172f5-a6a5-5a46-863b-614d45cd2de4" DuckDB = "d2f5444f-75bc-4fdf-ac35-56f514c445e1" +GoogleCloud = "55e21f81-8b0a-565e-b5ad-6816892a5ee7" +JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1" LibPQ = "194296ae-ab2e-5f79-8cd4-7183a0a5a0d1" MacroTools = "1914dd2f-81c6-5fcd-8719-6d5c9610ff09" MySQL = "39abe10b-433b-5dbd-92d4-e302a9df00cd" @@ -19,14 +21,16 @@ Reexport = "189a3867-3050-52da-a836-e630ba90ab69" SQLite = "0aa819cd-b072-5ff4-a722-6bc24af294d9" [compat] -Arrow = "2.7" AWS = "1.9" +Arrow = "2.7" Chain = "0.6" ClickHouse = "0.2" DataFrames = "1.5" Documenter = "0.27, 1" DuckDB = "0.10" LibPQ = "1.17" +JSON3 = "1.1" +GoogleCloud = ".8" MacroTools = "0.5" MySQL = "1.4" ODBC = "1.1" diff --git a/src/TBD_macros.jl b/src/TBD_macros.jl index de9796d..7761d12 100644 --- a/src/TBD_macros.jl +++ b/src/TBD_macros.jl @@ -660,11 +660,12 @@ macro collect(sqlquery) if db isa SQLite.DB || db isa LibPQ.Connection || db isa DuckDB.Connection || db isa MySQL.Connection || db isa ODBC.Connection result = DBInterface.execute(db, final_query) df_result = DataFrame(result) - elseif current_sql_mode[] == :clickhouse df_result = ClickHouse.select_df(db, final_query) selected_columns_order = sq.metadata[sq.metadata.current_selxn .== 1, :name] df_result = df_result[:, selected_columns_order] + elseif db isa GoogleSession{JSONCredentials} + df_result = 
collect_gbq(sq.db, final_query) elseif current_sql_mode[] == :athena exe_query = Athena.start_query_execution(final_query, sq.athena_params; aws_config = db) status = "RUNNING" diff --git a/src/TidierDB.jl b/src/TidierDB.jl index 51f2610..6af1488 100644 --- a/src/TidierDB.jl +++ b/src/TidierDB.jl @@ -12,6 +12,8 @@ using ODBC import ClickHouse using Arrow using AWS +using JSON3 +using GoogleCloud @reexport using DataFrames: DataFrame @reexport using Chain @@ -36,6 +38,7 @@ include("parsing_mysql.jl") include("parsing_mssql.jl") include("parsing_clickhouse.jl") include("parsing_athena.jl") +include("parsing_gbq.jl") include("joins_sq.jl") include("slices_sq.jl") @@ -63,6 +66,8 @@ function expr_to_sql(expr, sq; from_summarize::Bool = false) return expr_to_sql_clickhouse(expr, sq; from_summarize=from_summarize) elseif current_sql_mode[] == :athena return expr_to_sql_trino(expr, sq; from_summarize=from_summarize) + elseif current_sql_mode[] == :gbq + return expr_to_sql_gbq(expr, sq; from_summarize=from_summarize) else error("Unsupported SQL mode: $(current_sql_mode[])") end @@ -233,7 +238,7 @@ function db_table(db, table, athena_params::Any=nothing) table_name = string(table) metadata = if current_sql_mode[] == :lite get_table_metadata(db, table_name) - elseif current_sql_mode[] == :postgres || current_sql_mode[] == :duckdb || current_sql_mode[] == :mysql || current_sql_mode[] == :mssql || current_sql_mode[] == :clickhouse + elseif current_sql_mode[] == :postgres || current_sql_mode[] == :duckdb || current_sql_mode[] == :mysql || current_sql_mode[] == :mssql || current_sql_mode[] == :clickhouse, current_sql_mode[] == :gbq get_table_metadata(db, table_name) elseif current_sql_mode[] == :athena get_table_metadata_athena(db, table_name, athena_params) diff --git a/src/parsing_gbq.jl b/src/parsing_gbq.jl new file mode 100644 index 0000000..e735ff1 --- /dev/null +++ b/src/parsing_gbq.jl @@ -0,0 +1,253 @@ + +mutable struct GBQ + projectname::String + session::GoogleSession + 
bigquery_resource + bigquery_method +end + +function connect(type::Symbol, json_key_path::String, project_id::String) + # Expand the user's path to the JSON key + creds_path = expanduser(json_key_path) + set_sql_mode(:gbq) + # Create credentials and session for Google Cloud + creds = JSONCredentials(creds_path) + session = GoogleSession(creds, ["https://www.googleapis.com/auth/bigquery"]) + + # Define the API method for BigQuery + bigquery_method = GoogleCloud.api.APIMethod( + :POST, + "https://bigquery.googleapis.com/bigquery/v2/projects/$(project_id)/queries", + "Run query", + Dict{Symbol, Any}(); + transform=(x, t) -> x + ) + + # Define the API resource for BigQuery + bigquery_resource = GoogleCloud.api.APIResource( + "https://bigquery.googleapis.com/bigquery/v2", + ;query=bigquery_method # Pass the method as a named argument + ) + + # Store all data in a global GBQ instance + global gbq_instance = GBQ(project_id, session, bigquery_resource, bigquery_method) + + # Return only the session + return session +end + +function collect_gbq(conn, query) + query_data = Dict( + "query" => query, + "useLegacySql" => false, + "location" => "US") + + response = GoogleCloud.api.execute( + conn, + gbq_instance.bigquery_resource, # Use the resource from GBQ + gbq_instance.bigquery_method, + data=query_data + ) + response_string = String(response) + response_data = JSON3.read(response_string) + rows = get(response_data, "rows", []) + + # Convert rows to DataFrame + # First, extract column names from the schema + column_names = [field["name"] for field in response_data["schema"]["fields"]] + column_types = [field["type"] for field in response_data["schema"]["fields"]] + # Then, convert each row's data (currently nested inside dicts with key "v") into arrays of dicts + if !isempty(rows) + # Return an empty DataFrame with the correct columns but 0 rows + data = [get(row["f"][i], "v", missing) for row in rows, i in 1:length(column_names)] + df = DataFrame(data, 
Symbol.(column_names)) + df = parse_gbq_df(df, column_types) + return df + else + # Convert each row's data (nested inside dicts with key "v") into arrays of dicts + df =DataFrame([Vector{Union{Missing, Any}}(undef, 0) for _ in column_names], Symbol.(column_names)) + df = parse_gbq_df(df, column_types) + return df + end + + return df +end + + +function apply_type_conversion_gbq(df, col_index, col_type) + if col_type == "FLOAT" + df[!, col_index] = [ismissing(x) ? missing : parse(Float64, x) for x in df[!, col_index]] + elseif col_type == "INTEGER" + df[!, col_index] = [ismissing(x) ? missing : parse(Int, x) for x in df[!, col_index]] + elseif col_type == "STRING" + # Assuming varchar needs to stay as String, no conversion needed + end +end + +function parse_gbq_df(df, column_types) + for (i, col_type) in enumerate(column_types) + # Check if column index is within bounds of DataFrame columns + if i <= size(df, 2) + try + apply_type_conversion_gbq(df, i, col_type) + catch e + # @warn "Failed to convert column $(i) to $(col_type): $e" + end + else + # @warn "Column index $(i) is out of bounds for the current DataFrame." 
+ end + end; + return df +end + + +function expr_to_sql_gbq(expr, sq; from_summarize::Bool) + expr = parse_char_matching(expr) + expr = exc_capture_bug(expr, names_to_modify) + MacroTools.postwalk(expr) do x + # Handle basic arithmetic and functions + if @capture(x, a_ + b_) + return :($a + $b) + elseif @capture(x, a_ - b_) + return :($a - $b) + elseif @capture(x, a_ * b_) + return :($a * $b) + elseif @capture(x, a_ / b_) + return :($a / $b) + elseif @capture(x, a_ ^ b_) + return :(POWER($a, $b)) + elseif @capture(x, round(a_)) + return :(ROUND($a)) + elseif @capture(x, round(a_, b_)) + return :(ROUND($a, $b)) + elseif @capture(x, mean(a_)) + if from_summarize + return :(AVG($a)) + else + window_clause = construct_window_clause(sq) + return "AVG($(string(a))) $(window_clause)" + end + elseif @capture(x, minimum(a_)) + if from_summarize + return :(MIN($a)) + else + window_clause = construct_window_clause(sq) + return "MIN($(string(a))) $(window_clause)" + end + elseif @capture(x, maximum(a_)) + if from_summarize + return :(MAX($a)) + else + window_clause = construct_window_clause(sq) + return "MAX($(string(a))) $(window_clause)" + end + elseif @capture(x, sum(a_)) + if from_summarize + return :(SUM($a)) + else + window_clause = construct_window_clause(sq) + return "SUM($(string(a))) $(window_clause)" + end + elseif @capture(x, cumsum(a_)) + if from_summarize + error("cumsum is only available through a windowed @mutate") + else + # sq.windowFrame = "ROWS UNBOUNDED PRECEDING " + window_clause = construct_window_clause(sq, from_cumsum = true) + return "SUM($(string(a))) $(window_clause)" + end + #stats agg + elseif @capture(x, std(a_)) + if from_summarize + return :(STDDEV_SAMP($a)) + else + window_clause = construct_window_clause(sq, ) + return "STDDEV_SAMP($(string(a))) $(window_clause)" + end + elseif @capture(x, cor(a_, b_)) + if from_summarize + return :(CORR($a)) + else + window_clause = construct_window_clause(sq) + return "CORR($(string(a))) $(window_clause)" + 
end + elseif @capture(x, cov(a_, b_)) + if from_summarize + return :(COVAR_SAMP($a)) + else + window_clause = construct_window_clause(sq) + return "COVAR_SAMP($(string(a))) $(window_clause)" + end + elseif @capture(x, var(a_)) + if from_summarize + return :(VAR_SAMP($a)) + else + window_clause = construct_window_clause(sq) + return "VAR_SAMP($(string(a))) $(window_clause)" + end + #elseif @capture(x, sql_agg(str_)) + # if from_summarize + # return error("sql_agg is only needed with aggregate functions in @mutate") + # else + # window_clause = construct_window_clause(sq) + # return "$(str) $(window_clause)" + # end + #stringr functions, have to use function that removes _ so capture can capture name + elseif @capture(x, strreplaceall(str_, pattern_, replace_)) + return :(REGEXP_REPLACE($str, $pattern, $replace, 'g')) + elseif @capture(x, strreplace(str_, pattern_, replace_)) + return :(REGEXP_REPLACE($str, $pattern, $replace)) + elseif @capture(x, strremoveall(str_, pattern_)) + return :(REGEXP_REPLACE($str, $pattern, "", "g")) + elseif @capture(x, strremove(str_, pattern_)) + return :(REGEXP_REPLACE($str, $pattern, "")) + elseif @capture(x, ismissing(a_)) + return "($(string(a)) IS NULL)" + # Date extraction functions + elseif @capture(x, year(a_)) + return "EXTRACT(YEAR FROM " * string(a) * ")" + elseif @capture(x, month(a_)) + return "EXTRACT(MONTH FROM " * string(a) * ")" + elseif @capture(x, day(a_)) + return "EXTRACT(DAY FROM " * string(a) * ")" + elseif @capture(x, hour(a_)) + return "EXTRACT(HOUR FROM " * string(a) * ")" + elseif @capture(x, minute(a_)) + return "EXTRACT(MINUTE FROM " * string(a) * ")" + elseif @capture(x, second(a_)) + return "EXTRACT(SECOND FROM " * string(a) * ")" + elseif @capture(x, floordate(time_column_, unit_)) + return :(DATE_TRUNC($unit, $time_column)) + elseif @capture(x, difftime(endtime_, starttime_, unit_)) + return :(date_diff($unit, $starttime, $endtime)) + elseif @capture(x, replacemissing(column_, replacement_value_)) + 
return :(COALESCE($column, $replacement_value)) + elseif @capture(x, missingif(column_, value_to_replace_)) + return :(NULLIF($column, $value_to_replace)) + elseif isa(x, Expr) && x.head == :call + if x.args[1] == :if_else && length(x.args) == 4 + return parse_if_else(x) + elseif x.args[1] == :as_float && length(x.args) == 2 + column = x.args[2] + return "CAST(" * string(column) * " AS DECIMAL)" + elseif x.args[1] == :as_integer && length(x.args) == 2 + column = x.args[2] + return "CAST(" * string(column) * " AS INT)" + elseif x.args[1] == :as_string && length(x.args) == 2 + column = x.args[2] + return "CAST(" * string(column) * " AS STRING)" + elseif x.args[1] == :case_when + return parse_case_when(x) + elseif isa(x, Expr) && x.head == :call && x.args[1] == :! && x.args[1] != :!= && length(x.args) == 2 + inner_expr = expr_to_sql_gbq(x.args[2], sq, from_summarize = false) # Recursively transform the inner expression + return string("NOT (", inner_expr, ")") + elseif x.args[1] == :str_detect && length(x.args) == 3 + column, pattern = x.args[2], x.args[3] + return string(column, " LIKE \'%", pattern, "%'") + elseif isa(x, Expr) && x.head == :call && x.args[1] == :n && length(x.args) == 1 + return "COUNT(*)" + end + end + return x + end +end From 77db65263129df3c53cb058746852468777d8bed Mon Sep 17 00:00:00 2001 From: drizk1 Date: Tue, 14 May 2024 10:04:02 -0400 Subject: [PATCH 2/6] gbq successfully queries and collects --- src/TidierDB.jl | 2 +- src/docstrings.jl | 2 ++ src/parsing_gbq.jl | 26 ++++++++++++++++++++++++++ 3 files changed, 29 insertions(+), 1 deletion(-) diff --git a/src/TidierDB.jl b/src/TidierDB.jl index 6af1488..36c4fa0 100644 --- a/src/TidierDB.jl +++ b/src/TidierDB.jl @@ -238,7 +238,7 @@ function db_table(db, table, athena_params::Any=nothing) table_name = string(table) metadata = if current_sql_mode[] == :lite get_table_metadata(db, table_name) - elseif current_sql_mode[] == :postgres || current_sql_mode[] == :duckdb || current_sql_mode[] == :mysql 
|| current_sql_mode[] == :mssql || current_sql_mode[] == :clickhouse, current_sql_mode[] == :gbq + elseif current_sql_mode[] == :postgres || current_sql_mode[] == :duckdb || current_sql_mode[] == :mysql || current_sql_mode[] == :mssql || current_sql_mode[] == :clickhouse || current_sql_mode[] == :gbq get_table_metadata(db, table_name) elseif current_sql_mode[] == :athena get_table_metadata_athena(db, table_name, athena_params) diff --git a/src/docstrings.jl b/src/docstrings.jl index 26398e5..13fe63f 100644 --- a/src/docstrings.jl +++ b/src/docstrings.jl @@ -982,6 +982,8 @@ This function establishes a database connection based on the specified backend a # conn = connect(:clickhouse; host="localhost", port=9000, database="mydb", user="default", password="") # Connect to SQLite # conn = connect(:lite) +# Connect to Google Big Query +# conn = connect(:gbq, "json_user_key_path", "project_id") # Connect to DuckDB julia> db = connect(:duckdb) DuckDB.Connection(":memory:") diff --git a/src/parsing_gbq.jl b/src/parsing_gbq.jl index e735ff1..0599467 100644 --- a/src/parsing_gbq.jl +++ b/src/parsing_gbq.jl @@ -100,6 +100,32 @@ function parse_gbq_df(df, column_types) return df end +function get_table_metadata(conn::GoogleSession{JSONCredentials}, table_name::String) + query = " SELECT * FROM + $table_name LIMIT 0 + ;" + query_data = Dict( + "query" => query, + "useLegacySql" => false, + "location" => "US") + # Define the API resource + + response = GoogleCloud.api.execute( + conn, + gbq_instance.bigquery_resource, + gbq_instance.bigquery_method, + data=query_data + ) + response_string = String(response) + response_data = JSON3.read(response_string) + column_names = [field["name"] for field in response_data["schema"]["fields"]] + column_types = [field["type"] for field in response_data["schema"]["fields"]] + result = DataFrame(name = column_names, type = column_types) + result[!, :current_selxn] .= 1 + result[!, :table_name] .= table_name + + return select(result, 1 => :name, 2 
=> :type, :current_selxn, :table_name) +end function expr_to_sql_gbq(expr, sq; from_summarize::Bool) expr = parse_char_matching(expr) From eaf8276dd7bd24864446995149f2170ea94dfa91 Mon Sep 17 00:00:00 2001 From: drizk1 Date: Tue, 14 May 2024 11:52:42 -0400 Subject: [PATCH 3/6] fixes bigquery joining, update readme/index --- README.md | 1 + docs/examples/UserGuide/from_queryex.jl | 2 +- docs/src/index.md | 1 + src/joins_sq.jl | 51 +++++++++++++++---------- src/parsing_gbq.jl | 20 ++++++++++ 5 files changed, 54 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 2177214..787e57f 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,7 @@ The main goal of TidierDB.jl is to bring the syntax of Tidier.jl to multiple SQL - MSSQL `set_sql_mode(:mssql)` - Postgres `set_sql_mode(:postgres)` - Athena `set_sql_mode(:athena)` +- Google Big Query `set_sql_mode(:gbq)` The style of SQL that is generated can be modified using `set_sql_mode()`. diff --git a/docs/examples/UserGuide/from_queryex.jl b/docs/examples/UserGuide/from_queryex.jl index bc83f99..7a68d89 100644 --- a/docs/examples/UserGuide/from_queryex.jl +++ b/docs/examples/UserGuide/from_queryex.jl @@ -3,7 +3,7 @@ # ```julia # import TidierDB as DB # con = DB.connect(:duckdb) -# DB.copy_to(con, "https://gist.githubusercontent.com/seankross/a412dfbd88b3db70b74b/raw/5f23f993cd87c283ce766e7ac6b329ee7cc2e1d1/mtcars.csv", mtcars2) +# DB.copy_to(con, "https://gist.githubusercontent.com/seankross/a412dfbd88b3db70b74b/raw/5f23f993cd87c283ce766e7ac6b329ee7cc2e1d1/mtcars.csv", "mtcars2") # ``` # Start a query to analyze fuel efficiency by number of cylinders. 
However, to further build on this query later, end the chain without using `@show_query` or `@collect` diff --git a/docs/src/index.md b/docs/src/index.md index a9c2976..c17ba3c 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -15,6 +15,7 @@ The main goal of TidierDB.jl is to bring the syntax of Tidier.jl to multiple SQL - MSSQL `set_sql_mode(:mssql)` - Postgres `set_sql_mode(:postgres)` - Athena `set_sql_mode(:athena)` +- Google Big Query `set_sql_mode(:gbq)` The style of SQL that is generated can be modified using `set_sql_mode()`. diff --git a/src/joins_sq.jl b/src/joins_sq.jl index 089b6e0..f02a66c 100644 --- a/src/joins_sq.jl +++ b/src/joins_sq.jl @@ -1,3 +1,14 @@ +function gbq_join_parse(input) + input = string(input) + parts = split(input, ".") + if current_sql_mode[] == :gbq && length(parts) >=2 + return join(parts[2:end], ".") + else + return input + end +end + + """ $docstring_left_join """ @@ -18,8 +29,8 @@ macro left_join(sqlquery, join_table, lhs_column, rhs_column, athena_params=noth most_recent_source = !isempty(sq.ctes) ? 
"cte_" * string(sq.cte_count - 1) : sq.from - join_sql = " " * most_recent_source * ".*, " * string($(esc(join_table))) * ".* FROM " * most_recent_source * - " LEFT JOIN " * string($(esc(join_table))) * " ON " * string($(esc(join_table)), ".", $lhs_col_str, " = ", most_recent_source, ".", $rhs_col_str) + join_sql = " " * most_recent_source * ".*, " * string(gbq_join_parse($(esc(join_table)))) * ".* FROM " * gbq_join_parse(most_recent_source) * + " LEFT JOIN " * string($(esc(join_table))) * " ON " * string(gbq_join_parse($(esc(join_table))), ".", $lhs_col_str, " = ", gbq_join_parse(most_recent_source), ".", $rhs_col_str) # Create and add the new CTE new_cte = CTE(name=cte_name, select=join_sql) @@ -28,7 +39,7 @@ macro left_join(sqlquery, join_table, lhs_column, rhs_column, athena_params=noth # Update the FROM clause sq.from = cte_name else - join_clause = " LEFT JOIN " * string($(esc(join_table))) * " ON " * string($(esc(join_table)), ".", $lhs_col_str, " = ", sq.from, ".", $rhs_col_str) + join_clause = " LEFT JOIN " * string($(esc(join_table))) * " ON " * string(gbq_join_parse($(esc(join_table))), ".", $lhs_col_str, " = ", gbq_join_parse(sq.from), ".", $rhs_col_str) sq.from *= join_clause end @@ -65,8 +76,8 @@ macro right_join(sqlquery, join_table, lhs_column, rhs_column, athena_params=not most_recent_source = !isempty(sq.ctes) ? 
"cte_" * string(sq.cte_count - 1) : sq.from - join_sql = " " * most_recent_source * ".*, " * string($(esc(join_table))) * ".* FROM " * most_recent_source * - " RIGHT JOIN " * string($(esc(join_table))) * " ON " * string($(esc(join_table)), ".", $lhs_col_str, " = ", most_recent_source, ".", $rhs_col_str) + join_sql = " " * most_recent_source * ".*, " * string(gbq_join_parse($(esc(join_table)))) * ".* FROM " * gbq_join_parse(most_recent_source) * + " RIGHT JOIN " * string($(esc(join_table))) * " ON " * string(gbq_join_parse($(esc(join_table))), ".", $lhs_col_str, " = ", gbq_join_parse(most_recent_source), ".", $rhs_col_str) # Create and add the new CTE new_cte = CTE(name=cte_name, select=join_sql) @@ -75,7 +86,7 @@ macro right_join(sqlquery, join_table, lhs_column, rhs_column, athena_params=not # Update the FROM clause sq.from = cte_name else - join_clause = " RIGHT JOIN " * string($(esc(join_table))) * " ON " * string($(esc(join_table)), ".", $lhs_col_str, " = ", sq.from, ".", $rhs_col_str) + join_clause = " RIGHT JOIN " * string($(esc(join_table))) * " ON " * string(gbq_join_parse($(esc(join_table))), ".", $lhs_col_str, " = ", gbq_join_parse(sq.from), ".", $rhs_col_str) sq.from *= join_clause end @@ -93,7 +104,6 @@ macro right_join(sqlquery, join_table, lhs_column, rhs_column, athena_params=not end - """ $docstring_inner_join """ @@ -114,8 +124,8 @@ macro inner_join(sqlquery, join_table, lhs_column, rhs_column, athena_params=not most_recent_source = !isempty(sq.ctes) ? 
"cte_" * string(sq.cte_count - 1) : sq.from - join_sql = " " * most_recent_source * ".*, " * string($(esc(join_table))) * ".* FROM " * most_recent_source * - " INNER JOIN " * string($(esc(join_table))) * " ON " * string($(esc(join_table)), ".", $lhs_col_str, " = ", most_recent_source, ".", $rhs_col_str) + join_sql = " " * most_recent_source * ".*, " * string(gbq_join_parse($(esc(join_table)))) * ".* FROM " * gbq_join_parse(most_recent_source) * + " INNER JOIN " * string($(esc(join_table))) * " ON " * string(gbq_join_parse($(esc(join_table))), ".", $lhs_col_str, " = ", gbq_join_parse(most_recent_source), ".", $rhs_col_str) # Create and add the new CTE new_cte = CTE(name=cte_name, select=join_sql) @@ -124,7 +134,7 @@ macro inner_join(sqlquery, join_table, lhs_column, rhs_column, athena_params=not # Update the FROM clause sq.from = cte_name else - join_clause = " INNER JOIN " * string($(esc(join_table))) * " ON " * string($(esc(join_table)), ".", $lhs_col_str, " = ", sq.from, ".", $rhs_col_str) + join_clause = " INNER JOIN " * string($(esc(join_table))) * " ON " * string(gbq_join_parse($(esc(join_table))), ".", $lhs_col_str, " = ", gbq_join_parse(sq.from), ".", $rhs_col_str) sq.from *= join_clause end @@ -162,8 +172,8 @@ macro full_join(sqlquery, join_table, lhs_column, rhs_column, athena_params=noth most_recent_source = !isempty(sq.ctes) ? 
"cte_" * string(sq.cte_count - 1) : sq.from - join_sql = " " * most_recent_source * ".*, " * string($(esc(join_table))) * ".* FROM " * most_recent_source * - " FULL JOIN " * string($(esc(join_table))) * " ON " * string($(esc(join_table)), ".", $lhs_col_str, " = ", most_recent_source, ".", $rhs_col_str) + join_sql = " " * most_recent_source * ".*, " * string(gbq_join_parse($(esc(join_table)))) * ".* FROM " * gbq_join_parse(most_recent_source) * + " FULL JOIN " * string($(esc(join_table))) * " ON " * string(gbq_join_parse($(esc(join_table))), ".", $lhs_col_str, " = ", gbq_join_parse(most_recent_source), ".", $rhs_col_str) # Create and add the new CTE new_cte = CTE(name=cte_name, select=join_sql) @@ -172,7 +182,7 @@ macro full_join(sqlquery, join_table, lhs_column, rhs_column, athena_params=noth # Update the FROM clause sq.from = cte_name else - join_clause = " FULL JOIN " * string($(esc(join_table))) * " ON " * string($(esc(join_table)), ".", $lhs_col_str, " = ", sq.from, ".", $rhs_col_str) + join_clause = " FULL JOIN " * string($(esc(join_table))) * " ON " * string(gbq_join_parse($(esc(join_table))), ".", $lhs_col_str, " = ", gbq_join_parse(sq.from), ".", $rhs_col_str) sq.from *= join_clause end @@ -210,8 +220,8 @@ macro semi_join(sqlquery, join_table, lhs_column, rhs_column, athena_params=noth most_recent_source = !isempty(sq.ctes) ? 
"cte_" * string(sq.cte_count - 1) : sq.from - join_sql = " " * most_recent_source * ".*, " * string($(esc(join_table))) * ".* FROM " * most_recent_source * - " SEMI JOIN " * string($(esc(join_table))) * " ON " * string($(esc(join_table)), ".", $lhs_col_str, " = ", most_recent_source, ".", $rhs_col_str) + join_sql = " " * most_recent_source * ".*, " * string(gbq_join_parse($(esc(join_table)))) * ".* FROM " * gbq_join_parse(most_recent_source) * + " SEMI JOIN " * string($(esc(join_table))) * " ON " * string(gbq_join_parse($(esc(join_table))), ".", $lhs_col_str, " = ", gbq_join_parse(most_recent_source), ".", $rhs_col_str) # Create and add the new CTE new_cte = CTE(name=cte_name, select=join_sql) @@ -220,7 +230,7 @@ macro semi_join(sqlquery, join_table, lhs_column, rhs_column, athena_params=noth # Update the FROM clause sq.from = cte_name else - join_clause = " SEMI JOIN " * string($(esc(join_table))) * " ON " * string($(esc(join_table)), ".", $lhs_col_str, " = ", sq.from, ".", $rhs_col_str) + join_clause = " SEMI JOIN " * string($(esc(join_table))) * " ON " * string(gbq_join_parse($(esc(join_table))), ".", $lhs_col_str, " = ", gbq_join_parse(sq.from), ".", $rhs_col_str) sq.from *= join_clause end @@ -258,8 +268,8 @@ macro anti_join(sqlquery, join_table, lhs_column, rhs_column, athena_params=noth most_recent_source = !isempty(sq.ctes) ? 
"cte_" * string(sq.cte_count - 1) : sq.from - join_sql = " " * most_recent_source * ".*, " * string($(esc(join_table))) * ".* FROM " * most_recent_source * - " ANTI JOIN " * string($(esc(join_table))) * " ON " * string($(esc(join_table)), ".", $lhs_col_str, " = ", most_recent_source, ".", $rhs_col_str) + join_sql = " " * most_recent_source * ".*, " * string(gbq_join_parse($(esc(join_table)))) * ".* FROM " * gbq_join_parse(most_recent_source) * + " ANTI JOIN " * string($(esc(join_table))) * " ON " * string(gbq_join_parse($(esc(join_table))), ".", $lhs_col_str, " = ", gbq_join_parse(most_recent_source), ".", $rhs_col_str) # Create and add the new CTE new_cte = CTE(name=cte_name, select=join_sql) @@ -268,7 +278,7 @@ macro anti_join(sqlquery, join_table, lhs_column, rhs_column, athena_params=noth # Update the FROM clause sq.from = cte_name else - join_clause = " ANTI JOIN " * string($(esc(join_table))) * " ON " * string($(esc(join_table)), ".", $lhs_col_str, " = ", sq.from, ".", $rhs_col_str) + join_clause = " ANTI JOIN " * string($(esc(join_table))) * " ON " * string(gbq_join_parse($(esc(join_table))), ".", $lhs_col_str, " = ", gbq_join_parse(sq.from), ".", $rhs_col_str) sq.from *= join_clause end @@ -283,4 +293,5 @@ macro anti_join(sqlquery, join_table, lhs_column, rhs_column, athena_params=noth end sq end -end \ No newline at end of file +end + diff --git a/src/parsing_gbq.jl b/src/parsing_gbq.jl index 0599467..fd64a4c 100644 --- a/src/parsing_gbq.jl +++ b/src/parsing_gbq.jl @@ -277,3 +277,23 @@ function expr_to_sql_gbq(expr, sq; from_summarize::Bool) return x end end + + + + +function process_column(input::String) + if current_sql_mode == :gbq + return join_gbq_parse(input, full=false) + else + return input + end +end + +function join_gbq_parse(input_str::String; full::Bool = true) + parts = split(input_str, ".") + if full + return input_str + else + return parts[end] + end +end \ No newline at end of file From 2b65b6ad16e1cee6c767ca9557be83b3e920bb00 Mon Sep 17 
00:00:00 2001 From: drizk1 Date: Tue, 14 May 2024 12:00:05 -0400 Subject: [PATCH 4/6] fix googlecloud compat --- Project.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Project.toml b/Project.toml index 0e64cca..85373c5 100644 --- a/Project.toml +++ b/Project.toml @@ -30,7 +30,7 @@ Documenter = "0.27, 1" DuckDB = "0.10" LibPQ = "1.17" JSON3 = "1.1" -GoogleCloud = ".8" +GoogleCloud = "0.11" MacroTools = "0.5" MySQL = "1.4" ODBC = "1.1" From f79e26117a09dafcbbd671dd40abf12779e70314 Mon Sep 17 00:00:00 2001 From: drizk1 Date: Tue, 14 May 2024 12:08:41 -0400 Subject: [PATCH 5/6] update news --- NEWS.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/NEWS.md b/NEWS.md index 91c63af..e8ff3ef 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,5 +1,9 @@ # TidierDB.jl updates +## v0.1.4 - 2024-05-TBD +- Adds Google Big Query support +- use `connect` with GBQ JSON credentials and project id to establish connection + ## v0.1.3 - 2024-05-09 - Adds `@full_join`, `@semi_join`, `@anti_join` - Fixes bug to allow joining tables for Athena backend From ca524f4d1e025b535b8776eabd181f13468f7b7d Mon Sep 17 00:00:00 2001 From: drizk1 Date: Tue, 14 May 2024 12:09:45 -0400 Subject: [PATCH 6/6] bump version --- Project.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Project.toml b/Project.toml index 85373c5..d91abb4 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "TidierDB" uuid = "86993f9b-bbba-4084-97c5-ee15961ad48b" authors = ["Daniel Rizk and contributors"] -version = "0.1.3" +version = "0.1.4" [deps] AWS = "fbe9abb3-538b-5e4e-ba9e-bc94f4f92ebc"