
[CT-2084] Rerun dbt seed append data instead of refresh data if seed is stored in external table #112

Open
qsbao opened this issue Oct 21, 2020 · 24 comments
Labels
enhancement New feature or request help_wanted Extra attention is needed

Comments

@qsbao

qsbao commented Oct 21, 2020

Environment:

  • Spark Standalone cluster 2.4.5 with Thrift JDBC/ODBC server
  • dbt 0.18.0
  • dbt-spark 0.18.0

I used the example project from https://github.com/fishtown-analytics/jaffle_shop and set location_root for the seed so that its data is stored in an external table:

seeds:
  jaffle_shop:
      raw_orders:
        location_root: hdfs:///user/qsbao/

When I repeatedly ran dbt seed -s raw_orders, I found that the number of records in the raw_orders table kept growing.

Here is the SQL from the log of one run:

drop table if exists dbt_alice.raw_orders

create table dbt_alice.raw_orders (id bigint,user_id bigint,order_date date,status string)
location 'hdfs:///user/qsbao/raw_orders'

insert into dbt_alice.raw_orders values
            (%s,%s,%s,%s),(%s,%s,%s,%s),(%s,%s,%s,%s)...

Note that the first SQL statement, the drop table, removes only the metadata and not the data itself, since it is an external table.

@jtcohen6
Contributor

This is a really good point @qsbao. To double-check my understanding, this seed is now populating an external table because we've specified a location for it within the create table statement, right?

Note that the first SQL statement, the drop table, removes only the metadata and not the data itself, since it is an external table.

Is there a SparkSQL remedy for this? I see that in Hive-land it's possible to set TBLPROPERTIES ('external.table.purge'='true') and have drop table remove both the metadata and the underlying data.
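For reference, a rough sketch of that Hive-side approach (Hive syntax, not SparkSQL, using the table from the example above):

alter table dbt_alice.raw_orders set tblproperties ('external.table.purge'='true');
-- with the purge property set, Hive's drop table also deletes the underlying files
drop table if exists dbt_alice.raw_orders;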

@qsbao
Author

qsbao commented Oct 22, 2020

This is a really good point @qsbao. To double-check my understanding, this seed is now populating an external table because we've specified a location for it within the create table statement, right?

Right!

Note that the first SQL statement, the drop table, removes only the metadata and not the data itself, since it is an external table.

Is there a SparkSQL remedy for this? I see that in Hive-land it's possible to set TBLPROPERTIES ('external.table.purge'='true') and have drop table remove both the metadata and the underlying data.

According to the SparkSQL drop table reference, I don't think there is a SparkSQL remedy for this.

@jtcohen6
Contributor

jtcohen6 commented Oct 22, 2020

@qsbao Your comment on #18 gives me hope! It sounds like we could try using a create table as step in the seed materialization IFF location_root is specified. I'm envisioning something like:

drop table if exists dbt_alice.raw_orders;

create table dbt_alice.raw_orders
location 'hdfs:///user/qsbao/raw_orders'
as
    select cast(null as bigint) as id, cast(null as bigint) as user_id, cast(null as date) as order_date, cast(null as string) as status
    where false
;

insert into dbt_alice.raw_orders values
            (%s,%s,%s,%s),(%s,%s,%s,%s),(%s,%s,%s,%s)...

If that doesn't work, we could:

  • look into non-SQL ways of handling this
  • document this as a known limitation of dbt seed + location_root

@qsbao
Author

qsbao commented Oct 23, 2020

@jtcohen6 I tried your solution, it works!

And I observed that the second SQL statement, the create table as select of an empty result, writes an empty data file, which I think is fine.

// After create table as select empty
$ hdfs dfs -ls hdfs:///user/qsbao/raw_orders
Found 1 items
-rwxr-x--x   3 qsbao qsbao          0 2020-10-23 15:15 hdfs:///user/qsbao/raw_orders/part-00000-20c75120-064f-4558-af5a-5abed4582b37-c000

// After insert into values
$ hdfs dfs -ls hdfs:///user/qsbao/raw_orders
Found 2 items
-rwxr-x--x   3 qsbao qsbao          0 2020-10-23 15:15 hdfs:///user/qsbao/raw_orders/part-00000-20c75120-064f-4558-af5a-5abed4582b37-c000
-rwxr-x--x   3 qsbao qsbao         27 2020-10-23 15:18 hdfs:///user/qsbao/raw_orders/part-00000-f76997dd-5bc4-4303-8521-5dc8ad748f2b-c000

@qsbao
Author

qsbao commented Oct 23, 2020

Once the solution is confirmed, I'm happy to work on this.

@machov

machov commented May 10, 2021

@jtcohen6
Contributor

@mv1742 That's a really good point! It would be a pretty simple change to accomplish our desired behavior around truncate-then-insert, assuming it works with external tables.

The only thing we'll need to be careful of: dbt splits very large seeds into 10k-row chunks. We'd only want the first chunk to execute an insert overwrite, and the rest to all be insert into.

So perhaps in the seed materialization:
https://github.com/fishtown-analytics/dbt-spark/blob/dff1b613ddf87e4e72e8a47475bcfd1d55796a5c/dbt/include/spark/macros/materializations/seed.sql#L6-L14

The last line there could become:

insert {{ "overwrite" if loop.first else "into" }} {{ this.render() }} values
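For illustration, against the raw_orders example above, that line would render roughly like this for the first 10k-row chunk versus the later ones (parameter markers shown as they appear in dbt's logs):

insert overwrite dbt_alice.raw_orders values   -- first chunk: replaces any existing rows
            (%s,%s,%s,%s),(%s,%s,%s,%s),...

insert into dbt_alice.raw_orders values        -- later chunks: append within the same run
            (%s,%s,%s,%s),(%s,%s,%s,%s),...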

@machov

machov commented May 10, 2021

I just tested with our external table and insert overwrite works. We've been adding dbt-spark workflows as a core component of our pipelines, and this feature is important. I would be happy to make the PR if my user is added as a contributor. @jtcohen6

@ychebaro

ychebaro commented Jun 21, 2021

Any updates on this @qsbao ?
Or maybe @mv1742 were you able to implement what @jtcohen6 suggested?
Thanks!

@jtcohen6
Contributor

@mv1742 I'd definitely welcome a PR for this! Are you able to fork the repository, make the change on a branch there, and open a PR from the fork?

@machov

machov commented Jun 21, 2021

@jtcohen6 sure, let me do that

@ychebaro

Amazing thanks y'all!

This was referenced Jun 22, 2021
@machov

machov commented Jun 22, 2021

I created this Pull Request.

With the code above using Insert Overwrite, I was getting the error HY00: 'The SQL contains 0 parameter markers, but 216 parameters were supplied'.
Instead, I used truncate table to remove all rows from the existing seed tables and then insert the values, as done in the dbt global project. @jtcohen6

@jtcohen6
Contributor

@mv1742 Thanks for opening the PR! I'll respond over there

@gonzaferreiro

Hi everyone. Any chance this will get merged any time soon? It'd be really helpful in a project I'm working on. Thanks!

@github-actions
Contributor

github-actions bot commented Mar 1, 2022

This issue has been marked as Stale because it has been open for 180 days with no activity. If you would like the issue to remain open, please remove the stale label or comment on the issue, or it will be closed in 7 days.

@github-actions github-actions bot added the Stale label Mar 1, 2022
@github-actions github-actions bot closed this as completed Mar 9, 2022
@creativedutchmen

Any updates on this? A workaround I can use in my projects?

@dejan

dejan commented Jun 20, 2022

@creativedutchmen we're also affected by this and we're deleting from the table with a pre-hook as a workaround.
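A minimal sketch of that kind of pre-hook (seed and schema names here are hypothetical; it assumes the seed table already exists, and delete from requires a file format that supports deletes, such as Delta):

seeds:
  - name: my_seed            # hypothetical seed name
    config:
      pre-hook:
        - "delete from my_schema.my_seed"   # clear existing rows before dbt re-inserts the CSV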

@gatewaycat

gatewaycat commented Sep 28, 2022

@creativedutchmen for reference, the workaround that has worked for us for now was adding this to the seed's yml file.

seeds:
  - name: SEEDNAME
    config:
      pre-hook:
        - "drop table if exists SCHEMANAME.dummy1;"
        - "create table SCHEMANAME.dummy1 using parquet location 's3://PATH/SEEDNAME' as select 1;"

We are using dbt with spark to build tables and write to s3. Our stakeholders use redshift spectrum to query s3.

The result has 1 extra completely null row, but I can live with that.

Question: is there a jinja macro like {{ this }} that contains only the table name? If so, I'd be able to generalize and move the pre-hook to dbt_project.yml so that it applies to all seeds.

@gatewaycat

Requesting this issue be reopened. Fixing this bug would be very helpful for my team. We use redshift spectrum on top of s3.

@Fleid Fleid added enhancement New feature or request help_wanted Extra attention is needed and removed Stale labels Feb 11, 2023
@Fleid Fleid added the jira label Feb 11, 2023
@github-actions github-actions bot changed the title Rerun dbt seed append data instead of refresh data if seed is stored in external table [CT-2084] Rerun dbt seed append data instead of refresh data if seed is stored in external table Feb 11, 2023
@Fleid Fleid reopened this Feb 14, 2023
@juanrondineau

@gatewaycat there are two properties, this.schema and this.name, but when I tried to use them in seed pre- or post-hooks it failed.

@jeremyyeo

jeremyyeo commented Mar 14, 2023

Adding another failure mode that relates to table/column comments...

# dbt_project.yml
...
seeds:
  my_dbt_project:
    +file_format: delta
    +location_root: /mnt/root/seeds
    +persist_docs:
      relation: true
      columns: true

# seeds/schema.yml
version: 2
seeds:
  - name: person
    description: Persons
    columns:
      - name: id
        description: Id
      - name: name
        description: Name

# seeds/person.csv
id,name
1,alice

First things first: ensure dbfs is clean and /mnt/root/seeds does NOT contain a person folder...

# In notebook
dbutils.fs.rm("/mnt/root/seeds/person", recurse=True)
drop table if exists dbt_jyeo.person;

Seed twice:

$ dbt seed && dbt seed

============================== 2023-03-14 02:58:07.509895 | 8a669556-e7fa-41db-b4d7-eb8bec21ae20 ==============================
02:58:07.509895 [info ] [MainThread]: Running with dbt=1.4.4
02:58:07.512979 [debug] [MainThread]: running dbt with arguments {'debug': True, 'write_json': True, 'use_colors': True, 'printer_width': 80, 'version_check': True, 'partial_parse': True, 'static_parser': True, 'profiles_dir': '/Users/jeremy/.dbt', 'send_anonymous_usage_stats': True, 'quiet': False, 'no_print': False, 'cache_selected_only': False, 'show': False, 'which': 'seed', 'rpc_method': 'seed', 'indirect_selection': 'eager'}
02:58:07.513615 [debug] [MainThread]: Tracking: tracking
02:58:07.529992 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'invocation', 'label': 'start', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x10a48ddb0>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x10a48db10>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x10a48d9f0>]}
02:58:07.550017 [debug] [MainThread]: checksum: 170d819e8a7f11e09c497566dd7f61e1355cb9fb514921503937b951cb4a2250, vars: {}, profile: None, target: None, version: 1.4.4
02:58:07.617022 [debug] [MainThread]: Partial parsing enabled: 0 files deleted, 0 files added, 0 files changed.
02:58:07.617566 [debug] [MainThread]: Partial parsing enabled, no changes found, skipping parsing
02:58:07.627417 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'load_project', 'label': '8a669556-e7fa-41db-b4d7-eb8bec21ae20', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x10a653f40>]}
02:58:07.636036 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'resource_counts', 'label': '8a669556-e7fa-41db-b4d7-eb8bec21ae20', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x10a621090>]}
02:58:07.636646 [info ] [MainThread]: Found 2 models, 0 tests, 0 snapshots, 0 analyses, 330 macros, 0 operations, 1 seed file, 0 sources, 0 exposures, 0 metrics
02:58:07.637324 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'runnable_timing', 'label': '8a669556-e7fa-41db-b4d7-eb8bec21ae20', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x10a6210f0>]}
02:58:07.639376 [info ] [MainThread]: 
02:58:07.642166 [debug] [MainThread]: Acquiring new spark connection 'master'
02:58:07.643936 [debug] [ThreadPool]: Acquiring new spark connection 'list_schemas'
02:58:07.658051 [debug] [ThreadPool]: Using spark connection "list_schemas"
02:58:07.658947 [debug] [ThreadPool]: On list_schemas: /* {"app": "dbt", "dbt_version": "1.4.4", "profile_name": "spark", "target_name": "dev", "connection_name": "list_schemas"} */

    show databases
  
02:58:07.659528 [debug] [ThreadPool]: Opening a new connection, currently in state init
02:58:10.610525 [debug] [ThreadPool]: SQL status: OK in 3 seconds
02:58:10.618848 [debug] [ThreadPool]: On list_schemas: Close
02:58:11.100162 [debug] [ThreadPool]: Acquiring new spark connection 'list_None_dbt_jyeo'
02:58:11.113140 [debug] [ThreadPool]: Spark adapter: NotImplemented: add_begin_query
02:58:11.113941 [debug] [ThreadPool]: Using spark connection "list_None_dbt_jyeo"
02:58:11.114655 [debug] [ThreadPool]: On list_None_dbt_jyeo: /* {"app": "dbt", "dbt_version": "1.4.4", "profile_name": "spark", "target_name": "dev", "connection_name": "list_None_dbt_jyeo"} */
show table extended in dbt_jyeo like '*'
  
02:58:11.115364 [debug] [ThreadPool]: Opening a new connection, currently in state closed
02:58:14.369108 [debug] [ThreadPool]: SQL status: OK in 3 seconds
02:58:14.376131 [debug] [ThreadPool]: On list_None_dbt_jyeo: ROLLBACK
02:58:14.377479 [debug] [ThreadPool]: Spark adapter: NotImplemented: rollback
02:58:14.378564 [debug] [ThreadPool]: On list_None_dbt_jyeo: Close
02:58:14.897919 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'runnable_timing', 'label': '8a669556-e7fa-41db-b4d7-eb8bec21ae20', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x10a620970>]}
02:58:14.899255 [debug] [MainThread]: Spark adapter: NotImplemented: add_begin_query
02:58:14.900207 [debug] [MainThread]: Spark adapter: NotImplemented: commit
02:58:14.901889 [info ] [MainThread]: Concurrency: 1 threads (target='dev')
02:58:14.902959 [info ] [MainThread]: 
02:58:14.911107 [debug] [Thread-1 (]: Began running node seed.my_dbt_project.person
02:58:14.912130 [info ] [Thread-1 (]: 1 of 1 START seed file dbt_jyeo.person ......................................... [RUN]
02:58:14.913357 [debug] [Thread-1 (]: Acquiring new spark connection 'seed.my_dbt_project.person'
02:58:14.914114 [debug] [Thread-1 (]: Began compiling node seed.my_dbt_project.person
02:58:14.914983 [debug] [Thread-1 (]: Timing info for seed.my_dbt_project.person (compile): 2023-03-14 02:58:14.914832 => 2023-03-14 02:58:14.914842
02:58:14.916058 [debug] [Thread-1 (]: Began executing node seed.my_dbt_project.person
02:58:14.985229 [debug] [Thread-1 (]: Spark adapter: NotImplemented: add_begin_query
02:58:14.985933 [debug] [Thread-1 (]: Using spark connection "seed.my_dbt_project.person"
02:58:14.986919 [debug] [Thread-1 (]: On seed.my_dbt_project.person: /* {"app": "dbt", "dbt_version": "1.4.4", "profile_name": "spark", "target_name": "dev", "node_id": "seed.my_dbt_project.person"} */

    create table dbt_jyeo.person (`id` bigint,`name` string)
    
    using delta
    
    
    
    location '/mnt/root/seeds/person'
    comment 'Persons'
      
  
02:58:14.988160 [debug] [Thread-1 (]: Opening a new connection, currently in state closed
02:58:21.434274 [debug] [Thread-1 (]: SQL status: OK in 6 seconds
02:58:21.469866 [debug] [Thread-1 (]: Using spark connection "seed.my_dbt_project.person"
02:58:21.470486 [debug] [Thread-1 (]: On seed.my_dbt_project.person: 
          insert into dbt_jyeo.person values
          (cast(%s as bigint),cast(%s as string))
      ...
02:58:36.317133 [debug] [Thread-1 (]: SQL status: OK in 15 seconds
02:58:36.334360 [debug] [Thread-1 (]: Writing runtime SQL for node "seed.my_dbt_project.person"
02:58:36.370808 [debug] [Thread-1 (]: Using spark connection "seed.my_dbt_project.person"
02:58:36.371353 [debug] [Thread-1 (]: On seed.my_dbt_project.person: /* {"app": "dbt", "dbt_version": "1.4.4", "profile_name": "spark", "target_name": "dev", "node_id": "seed.my_dbt_project.person"} */

    
        alter table dbt_jyeo.person change column
            id
            comment 'Id';
      
  
02:58:37.817472 [debug] [Thread-1 (]: SQL status: OK in 1 seconds
02:58:37.819397 [debug] [Thread-1 (]: Using spark connection "seed.my_dbt_project.person"
02:58:37.819891 [debug] [Thread-1 (]: On seed.my_dbt_project.person: /* {"app": "dbt", "dbt_version": "1.4.4", "profile_name": "spark", "target_name": "dev", "node_id": "seed.my_dbt_project.person"} */

    
        alter table dbt_jyeo.person change column
            name
            comment 'Name';
      
  
02:58:39.310216 [debug] [Thread-1 (]: SQL status: OK in 1 seconds
02:58:39.325556 [debug] [Thread-1 (]: Spark adapter: NotImplemented: commit
02:58:39.327266 [debug] [Thread-1 (]: Timing info for seed.my_dbt_project.person (execute): 2023-03-14 02:58:14.916791 => 2023-03-14 02:58:39.327150
02:58:39.328135 [debug] [Thread-1 (]: On seed.my_dbt_project.person: ROLLBACK
02:58:39.329269 [debug] [Thread-1 (]: Spark adapter: NotImplemented: rollback
02:58:39.330027 [debug] [Thread-1 (]: On seed.my_dbt_project.person: Close
02:58:39.872066 [debug] [Thread-1 (]: Sending event: {'category': 'dbt', 'action': 'run_model', 'label': '8a669556-e7fa-41db-b4d7-eb8bec21ae20', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x10a6dd8a0>]}
02:58:39.873724 [info ] [Thread-1 (]: 1 of 1 OK loaded seed file dbt_jyeo.person ..................................... [INSERT 1 in 24.96s]
02:58:39.878215 [debug] [Thread-1 (]: Finished running node seed.my_dbt_project.person
02:58:39.881564 [debug] [MainThread]: Acquiring new spark connection 'master'
02:58:39.882496 [debug] [MainThread]: On master: ROLLBACK
02:58:39.883381 [debug] [MainThread]: Opening a new connection, currently in state init
02:58:41.552001 [debug] [MainThread]: Spark adapter: NotImplemented: rollback
02:58:41.554057 [debug] [MainThread]: Spark adapter: NotImplemented: add_begin_query
02:58:41.555255 [debug] [MainThread]: Spark adapter: NotImplemented: commit
02:58:41.556448 [debug] [MainThread]: On master: ROLLBACK
02:58:41.557600 [debug] [MainThread]: Spark adapter: NotImplemented: rollback
02:58:41.558738 [debug] [MainThread]: On master: Close
02:58:41.994624 [debug] [MainThread]: Connection 'master' was properly closed.
02:58:41.996413 [debug] [MainThread]: Connection 'seed.my_dbt_project.person' was properly closed.
02:58:41.998164 [info ] [MainThread]: 
02:58:42.000181 [info ] [MainThread]: Finished running 1 seed in 0 hours 0 minutes and 34.36 seconds (34.36s).
02:58:42.001786 [debug] [MainThread]: Command end result
02:58:42.015385 [info ] [MainThread]: 
02:58:42.016222 [info ] [MainThread]: Completed successfully
02:58:42.016963 [info ] [MainThread]: 
02:58:42.017670 [info ] [MainThread]: Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
02:58:42.018519 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'invocation', 'label': 'end', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x10a6dd2a0>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x10a6dc910>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x10a6dd3c0>]}
02:58:42.019255 [debug] [MainThread]: Flushing usage events


============================== 2023-03-14 02:58:45.783287 | 8928ec38-2205-400b-b88c-3c48482bdd98 ==============================
02:58:45.783287 [info ] [MainThread]: Running with dbt=1.4.4
02:58:45.785876 [debug] [MainThread]: running dbt with arguments {'debug': True, 'write_json': True, 'use_colors': True, 'printer_width': 80, 'version_check': True, 'partial_parse': True, 'static_parser': True, 'profiles_dir': '/Users/jeremy/.dbt', 'send_anonymous_usage_stats': True, 'quiet': False, 'no_print': False, 'cache_selected_only': False, 'show': False, 'which': 'seed', 'rpc_method': 'seed', 'indirect_selection': 'eager'}
02:58:45.786393 [debug] [MainThread]: Tracking: tracking
02:58:45.801578 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'invocation', 'label': 'start', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x112515d20>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x112515a80>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x112515960>]}
02:58:45.817662 [debug] [MainThread]: checksum: 170d819e8a7f11e09c497566dd7f61e1355cb9fb514921503937b951cb4a2250, vars: {}, profile: None, target: None, version: 1.4.4
02:58:45.858411 [debug] [MainThread]: Partial parsing enabled: 0 files deleted, 0 files added, 0 files changed.
02:58:45.859122 [debug] [MainThread]: Partial parsing enabled, no changes found, skipping parsing
02:58:45.868737 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'load_project', 'label': '8928ec38-2205-400b-b88c-3c48482bdd98', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x1126dfee0>]}
02:58:45.878328 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'resource_counts', 'label': '8928ec38-2205-400b-b88c-3c48482bdd98', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x1126a9030>]}
02:58:45.879082 [info ] [MainThread]: Found 2 models, 0 tests, 0 snapshots, 0 analyses, 330 macros, 0 operations, 1 seed file, 0 sources, 0 exposures, 0 metrics
02:58:45.879821 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'runnable_timing', 'label': '8928ec38-2205-400b-b88c-3c48482bdd98', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x1126a9090>]}
02:58:45.881761 [info ] [MainThread]: 
02:58:45.884310 [debug] [MainThread]: Acquiring new spark connection 'master'
02:58:45.886197 [debug] [ThreadPool]: Acquiring new spark connection 'list_schemas'
02:58:45.900086 [debug] [ThreadPool]: Using spark connection "list_schemas"
02:58:45.900825 [debug] [ThreadPool]: On list_schemas: /* {"app": "dbt", "dbt_version": "1.4.4", "profile_name": "spark", "target_name": "dev", "connection_name": "list_schemas"} */

    show databases
  
02:58:45.901458 [debug] [ThreadPool]: Opening a new connection, currently in state init
02:58:48.163031 [debug] [ThreadPool]: SQL status: OK in 2 seconds
02:58:48.173147 [debug] [ThreadPool]: On list_schemas: Close
02:58:48.784771 [debug] [ThreadPool]: Acquiring new spark connection 'list_None_dbt_jyeo'
02:58:48.797730 [debug] [ThreadPool]: Spark adapter: NotImplemented: add_begin_query
02:58:48.798591 [debug] [ThreadPool]: Using spark connection "list_None_dbt_jyeo"
02:58:48.799257 [debug] [ThreadPool]: On list_None_dbt_jyeo: /* {"app": "dbt", "dbt_version": "1.4.4", "profile_name": "spark", "target_name": "dev", "connection_name": "list_None_dbt_jyeo"} */
show table extended in dbt_jyeo like '*'
  
02:58:48.799903 [debug] [ThreadPool]: Opening a new connection, currently in state closed
02:58:51.710360 [debug] [ThreadPool]: SQL status: OK in 3 seconds
02:58:51.717372 [debug] [ThreadPool]: On list_None_dbt_jyeo: ROLLBACK
02:58:51.718749 [debug] [ThreadPool]: Spark adapter: NotImplemented: rollback
02:58:51.719973 [debug] [ThreadPool]: On list_None_dbt_jyeo: Close
02:58:52.258581 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'runnable_timing', 'label': '8928ec38-2205-400b-b88c-3c48482bdd98', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x1126a8910>]}
02:58:52.259432 [debug] [MainThread]: Spark adapter: NotImplemented: add_begin_query
02:58:52.259990 [debug] [MainThread]: Spark adapter: NotImplemented: commit
02:58:52.261009 [info ] [MainThread]: Concurrency: 1 threads (target='dev')
02:58:52.261596 [info ] [MainThread]: 
02:58:52.265159 [debug] [Thread-1 (]: Began running node seed.my_dbt_project.person
02:58:52.266180 [info ] [Thread-1 (]: 1 of 1 START seed file dbt_jyeo.person ......................................... [RUN]
02:58:52.267474 [debug] [Thread-1 (]: Acquiring new spark connection 'seed.my_dbt_project.person'
02:58:52.268244 [debug] [Thread-1 (]: Began compiling node seed.my_dbt_project.person
02:58:52.268799 [debug] [Thread-1 (]: Timing info for seed.my_dbt_project.person (compile): 2023-03-14 02:58:52.268693 => 2023-03-14 02:58:52.268701
02:58:52.269359 [debug] [Thread-1 (]: Began executing node seed.my_dbt_project.person
02:58:52.310170 [debug] [Thread-1 (]: Using spark connection "seed.my_dbt_project.person"
02:58:52.310857 [debug] [Thread-1 (]: On seed.my_dbt_project.person: /* {"app": "dbt", "dbt_version": "1.4.4", "profile_name": "spark", "target_name": "dev", "node_id": "seed.my_dbt_project.person"} */
drop table if exists dbt_jyeo.person
02:58:52.311437 [debug] [Thread-1 (]: Opening a new connection, currently in state closed
02:58:54.933566 [debug] [Thread-1 (]: SQL status: OK in 3 seconds
02:58:54.996629 [debug] [Thread-1 (]: Spark adapter: NotImplemented: add_begin_query
02:58:54.997237 [debug] [Thread-1 (]: Using spark connection "seed.my_dbt_project.person"
02:58:54.997949 [debug] [Thread-1 (]: On seed.my_dbt_project.person: /* {"app": "dbt", "dbt_version": "1.4.4", "profile_name": "spark", "target_name": "dev", "node_id": "seed.my_dbt_project.person"} */

    create table dbt_jyeo.person (`id` bigint,`name` string)
    
    using delta
    
    
    
    location '/mnt/root/seeds/person'
    comment 'Persons'
      
  
02:58:56.679265 [debug] [Thread-1 (]: Spark adapter: Error while running:
/* {"app": "dbt", "dbt_version": "1.4.4", "profile_name": "spark", "target_name": "dev", "node_id": "seed.my_dbt_project.person"} */

    create table dbt_jyeo.person (`id` bigint,`name` string)
    
    using delta
    
    
    
    location '/mnt/root/seeds/person'
    comment 'Persons'
      
  
02:58:56.680454 [debug] [Thread-1 (]: Spark adapter: ('42000', '[42000] [Simba][Hardy] (80) Syntax or semantic analysis error thrown in server while executing query. Error message from server: org.apache.hive.service.cli.HiveSQLException: Error running query: [DELTA_CREATE_TABLE_SCHEME_MISMATCH] com.databricks.sql.transaction.tahoe.DeltaAnalysisException: The specified schema does not match the existing schema at dbfs:/mnt/root/seeds/person.\n\n== Specified ==\nroot\n |-- id: long (nullable = true)\n |-- name: string (nullable = true)\n\n\n== Existing ==\nroot\n |-- id: long (nullable =\x00 (80) (SQLExecDirectW)')
02:58:56.681210 [debug] [Thread-1 (]: Timing info for seed.my_dbt_project.person (execute): 2023-03-14 02:58:52.269753 => 2023-03-14 02:58:56.681112
02:58:56.681706 [debug] [Thread-1 (]: On seed.my_dbt_project.person: ROLLBACK
02:58:56.682167 [debug] [Thread-1 (]: Spark adapter: NotImplemented: rollback
02:58:56.682584 [debug] [Thread-1 (]: On seed.my_dbt_project.person: Close
02:58:57.141320 [debug] [Thread-1 (]: Runtime Error in seed person (seeds/person.csv)
  ('42000', '[42000] [Simba][Hardy] (80) Syntax or semantic analysis error thrown in server while executing query. Error message from server: org.apache.hive.service.cli.HiveSQLException: Error running query: [DELTA_CREATE_TABLE_SCHEME_MISMATCH] com.databricks.sql.transaction.tahoe.DeltaAnalysisException: The specified schema does not match the existing schema at dbfs:/mnt/root/seeds/person.\n\n== Specified ==\nroot\n |-- id: long (nullable = true)\n |-- name: string (nullable = true)\n\n\n== Existing ==\nroot\n |-- id: long (nullable =\x00 (80) (SQLExecDirectW)')
02:58:57.142722 [debug] [Thread-1 (]: Sending event: {'category': 'dbt', 'action': 'run_model', 'label': '8928ec38-2205-400b-b88c-3c48482bdd98', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x11275baf0>]}
02:58:57.144330 [error] [Thread-1 (]: 1 of 1 ERROR loading seed file dbt_jyeo.person ................................. [ERROR in 4.88s]
02:58:57.148275 [debug] [Thread-1 (]: Finished running node seed.my_dbt_project.person
02:58:57.151267 [debug] [MainThread]: Acquiring new spark connection 'master'
02:58:57.152432 [debug] [MainThread]: On master: ROLLBACK
02:58:57.153361 [debug] [MainThread]: Opening a new connection, currently in state init
02:58:58.606398 [debug] [MainThread]: Spark adapter: NotImplemented: rollback
02:58:58.608365 [debug] [MainThread]: Spark adapter: NotImplemented: add_begin_query
02:58:58.609995 [debug] [MainThread]: Spark adapter: NotImplemented: commit
02:58:58.611294 [debug] [MainThread]: On master: ROLLBACK
02:58:58.612405 [debug] [MainThread]: Spark adapter: NotImplemented: rollback
02:58:58.613530 [debug] [MainThread]: On master: Close
02:58:59.125587 [debug] [MainThread]: Connection 'master' was properly closed.
02:58:59.127438 [debug] [MainThread]: Connection 'seed.my_dbt_project.person' was properly closed.
02:58:59.129108 [info ] [MainThread]: 
02:58:59.130452 [info ] [MainThread]: Finished running 1 seed in 0 hours 0 minutes and 13.25 seconds (13.25s).
02:58:59.131791 [debug] [MainThread]: Command end result
02:58:59.145619 [info ] [MainThread]: 
02:58:59.146532 [info ] [MainThread]: Completed with 1 error and 0 warnings:
02:58:59.147357 [info ] [MainThread]: 
02:58:59.148056 [error] [MainThread]: Runtime Error in seed person (seeds/person.csv)
02:58:59.148717 [error] [MainThread]:   ('42000', '[42000] [Simba][Hardy] (80) Syntax or semantic analysis error thrown in server while executing query. Error message from server: org.apache.hive.service.cli.HiveSQLException: Error running query: [DELTA_CREATE_TABLE_SCHEME_MISMATCH] com.databricks.sql.transaction.tahoe.DeltaAnalysisException: The specified schema does not match the existing schema at dbfs:/mnt/root/seeds/person.\n\n== Specified ==\nroot\n |-- id: long (nullable = true)\n |-- name: string (nullable = true)\n\n\n== Existing ==\nroot\n |-- id: long (nullable =\x00 (80) (SQLExecDirectW)')
02:58:59.149567 [info ] [MainThread]: 
02:58:59.150202 [info ] [MainThread]: Done. PASS=0 WARN=0 ERROR=1 SKIP=0 TOTAL=1
02:58:59.150961 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'invocation', 'label': 'end', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x11275baf0>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x11251cb20>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x112787d60>]}
02:58:59.151744 [debug] [MainThread]: Flushing usage events

Same behaviour is observed in dbt-databricks 1.4.

@felipe-curebase

felipe-curebase commented Jul 24, 2023

This would be tremendously helpful to our team, as this issue breaks the idempotency of our pipelines and the workaround is ugly: dbt's "this" variable for fetching the table name doesn't even work in seed pre-hooks, so we have to hardcode the seed names into the manual create table commands. It just isn't scalable.

@gatewaycat

gatewaycat commented Aug 16, 2023

A better workaround: add the following file, spark__load_csv_rows.sql, to your macros folder. Make sure to remove any other workarounds, like the pre-hook in my previous post.

{% macro spark__load_csv_rows(model, agate_table) %}
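    {# Overrides the adapter's load_csv_rows for seeds: the first batch is written with insert overwrite,
       replacing any rows already in the (external) table, while later batches in the same run append with insert into. #}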
    {% set batch_size = 1000 %}

    {% set statements = [] %}

    {% for chunk in agate_table.rows | batch(batch_size) %}
        {% set bindings = [] %}

        {% for row in chunk %}
          {% do bindings.extend(row) %}
        {% endfor %}

        {% set sql %}
            insert {{ "overwrite" if loop.first else "into" }} {{ this.render() }} values
            {% for row in chunk -%}
                ({%- for column in agate_table.columns -%}
                    {%- if 'ISODate' in (column.data_type | string) -%}
                      cast(%s as timestamp)
                    {%- else -%}
                    %s
                    {%- endif -%}
                    {%- if not loop.last%},{%- endif %}
                {%- endfor -%})
                {%- if not loop.last%},{%- endif %}
            {%- endfor %}
        {% endset %}

        {% do adapter.add_query(sql, bindings=bindings, abridge_sql_log=True) %}

        {% if loop.index0 == 0 %}
            {% do statements.append(sql) %}
        {% endif %}
    {% endfor %}

    {# Return SQL so we can render it out into the compiled files #}
    {{ return(statements[0]) }}
{% endmacro %}
