From a9d8a84f13de74898b07d40304618b1513ba7710 Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Mon, 13 Dec 2021 12:27:48 +0100 Subject: [PATCH] Add optional Citus support This adds the option to configure the TPROC-C build step to build the schema with support for Citus. This distributes the tables and all the stored procedures. --- config/postgresql.xml | 1 + src/postgresql/pgoltp.tcl | 43 +++++++++---- src/postgresql/pgopt.tcl | 123 ++++++++++++++++++++++++-------------- 3 files changed, 109 insertions(+), 58 deletions(-) diff --git a/config/postgresql.xml b/config/postgresql.xml index 4ba3474b..f6cf3893 100755 --- a/config/postgresql.xml +++ b/config/postgresql.xml @@ -19,6 +19,7 @@ false false false + false false false diff --git a/src/postgresql/pgoltp.tcl b/src/postgresql/pgoltp.tcl index ad82b6d3..18c82105 100755 --- a/src/postgresql/pgoltp.tcl +++ b/src/postgresql/pgoltp.tcl @@ -29,7 +29,7 @@ set library $library if [catch {::tcl::tm::path add modules} ] { error "Failed to find modules directory" } if [catch {package require tpcccommon} ] { error "Failed to load tpcc common functions" } else { namespace import tpcccommon::* } -proc CreateStoredProcs { lda ora_compatible pg_storedprocs } { +proc CreateStoredProcs { lda ora_compatible citus_compatible pg_storedprocs } { if { $pg_storedprocs eq "true" } { puts "CREATING TPCC STORED PROCEDURES" } else { @@ -455,9 +455,13 @@ proc CreateStoredProcs { lda ora_compatible pg_storedprocs } { WHEN serialization_failure OR deadlock_detected OR no_data_found THEN ROLLBACK; END; } - for { set i 1 } { $i <= 6 } { incr i } { + if { $citus_compatible eq "true" } { + set sql(7) { SELECT create_distributed_function('dbms_random(int,int)') } + set sql(8) { SELECT create_distributed_function(oid, '$1', colocate_with:='warehouse') FROM pg_catalog.pg_proc WHERE proname IN ('neword', 'delivery', 'payment', 'ostat', 'slev') } + } + for { set i 1 } { $i <= [array size sql] } { incr i } { set result [ pg_exec $lda $sql($i) ] - if {[pg_result $result -status] != "PGRES_COMMAND_OK"} { + if {[pg_result $result -status] ni {"PGRES_TUPLES_OK" "PGRES_COMMAND_OK"}} { error "[pg_result $result -error]" } else { pg_result $result -clear @@ -1551,9 +1555,13 @@ proc CreateStoredProcs { lda ora_compatible pg_storedprocs } { ' LANGUAGE 'plpgsql'; } } - for { set i 1 } { $i <= 6 } { incr i } { + if { $citus_compatible eq "true" } { + set sql(7) { SELECT create_distributed_function('dbms_random(int,int)') } + set sql(8) { SELECT create_distributed_function(oid, '$1', colocate_with:='warehouse') FROM pg_catalog.pg_proc WHERE proname IN ('neword', 'delivery', 'payment', 'ostat', 'slev') } + } + for { set i 1 } { $i <= [array size sql] } { incr i } { set result [ pg_exec $lda $sql($i) ] - if {[pg_result $result -status] != "PGRES_COMMAND_OK"} { + if {[pg_result $result -status] ni {"PGRES_TUPLES_OK" "PGRES_COMMAND_OK"}} { error "[pg_result $result -error]" } else { pg_result $result -clear @@ -1642,7 +1650,7 @@ proc CreateUserDatabase { lda host port sslmode db tspace superuser superuser_pa return } -proc CreateTables { lda ora_compatible num_part } { +proc CreateTables { lda ora_compatible citus_compatible num_part } { puts "CREATING TPCC TABLES" if { $ora_compatible eq "true" } { set sql(1) "CREATE TABLE CUSTOMER (C_ID NUMBER(5, 0), C_D_ID NUMBER(2, 0), C_W_ID NUMBER(6, 0), C_FIRST VARCHAR2(16), C_MIDDLE CHAR(2), C_LAST VARCHAR2(16), C_STREET_1 VARCHAR2(20), C_STREET_2 VARCHAR2(20), C_CITY VARCHAR2(20), C_STATE CHAR(2), C_ZIP CHAR(9), C_PHONE CHAR(16), C_SINCE DATE, C_CREDIT CHAR(2), C_CREDIT_LIM NUMBER(12, 2), C_DISCOUNT NUMBER(4, 4), C_BALANCE NUMBER(12, 2), C_YTD_PAYMENT NUMBER(12, 2), C_PAYMENT_CNT NUMBER(8, 0), C_DELIVERY_CNT NUMBER(8, 0), C_DATA VARCHAR2(500))" @@ -1668,10 +1676,21 @@ proc CreateTables { lda ora_compatible num_part } { } else { set sql(9) "CREATE TABLE ORDER_LINE (OL_DELIVERY_D TIMESTAMP WITH TIME ZONE, OL_O_ID INTEGER NOT NULL, OL_W_ID INTEGER NOT NULL, OL_I_ID INTEGER NOT NULL, OL_SUPPLY_W_ID INTEGER NOT NULL, OL_D_ID SMALLINT NOT NULL, OL_NUMBER SMALLINT NOT NULL, OL_QUANTITY SMALLINT NOT NULL, OL_AMOUNT NUMERIC(6,2), OL_DIST_INFO CHARACTER(24), CONSTRAINT ORDER_LINE_I1 PRIMARY KEY (OL_W_ID, OL_D_ID, OL_O_ID, OL_NUMBER)) PARTITION BY HASH (OL_W_ID)" } + if { $citus_compatible eq "true" } { + set sql(10) "SELECT create_distributed_table('customer', 'c_w_id')" + set sql(11) "SELECT create_distributed_table('district', 'd_w_id')" + set sql(12) "SELECT create_distributed_table('history', 'h_w_id')" + set sql(13) "SELECT create_distributed_table('warehouse', 'w_id')" + set sql(14) "SELECT create_distributed_table('stock', 's_w_id')" + set sql(15) "SELECT create_distributed_table('new_order', 'no_w_id')" + set sql(16) "SELECT create_distributed_table('orders', 'o_w_id')" + set sql(17) "SELECT create_distributed_table('order_line', 'ol_w_id')" + set sql(18) "SELECT create_reference_table('item')" + } } - for { set i 1 } { $i <= 9 } { incr i } { + for { set i 1 } { $i <= [array size sql] } { incr i } { set result [ pg_exec $lda $sql($i) ] - if {[pg_result $result -status] != "PGRES_COMMAND_OK"} { + if {[pg_result $result -status] ni {"PGRES_TUPLES_OK" "PGRES_COMMAND_OK"}} { error "[pg_result $result -error]" } else { pg_result $result -clear @@ -2064,7 +2083,7 @@ proc LoadOrd { lda ware_start count_ware MAXITEMS ORD_PER_DIST DIST_PER_WARE ora pg_result $result -clear return } -proc do_tpcc { host port sslmode count_ware superuser superuser_password defaultdb db tspace user password ora_compatible pg_storedprocs partition num_vu } { +proc do_tpcc { host port sslmode count_ware superuser superuser_password defaultdb db tspace user password ora_compatible citus_compatible pg_storedprocs partition num_vu } { set MAXITEMS 100000 set CUST_PER_DIST 3000 set DIST_PER_WARE 10 @@ -2116,7 +2135,7 @@ proc do_tpcc { host port sslmode count_ware superuser superuser_password default } else { set num_part 0 } - CreateTables $lda $ora_compatible $num_part + CreateTables $lda $ora_compatible $citus_compatible $num_part set result [ pg_exec $lda "commit" ] pg_result $result -clear } @@ -2184,7 +2203,7 @@ proc do_tpcc { host port sslmode count_ware superuser superuser_password default } if { $threaded eq "SINGLE-THREADED" || $threaded eq "MULTI-THREADED" && $myposition eq 1 } { CreateIndexes $lda - CreateStoredProcs $lda $ora_compatible $pg_storedprocs + CreateStoredProcs $lda $ora_compatible $citus_compatible $pg_storedprocs GatherStatistics $lda puts "[ string toupper $user ] SCHEMA COMPLETE" pg_disconnect $lda @@ -2192,7 +2211,7 @@ proc do_tpcc { host port sslmode count_ware superuser superuser_password default } } } - .ed_mainFrame.mainwin.textFrame.left.text fastinsert end "do_tpcc $pg_host $pg_port $pg_sslmode $pg_count_ware $pg_superuser $pg_superuserpass $pg_defaultdbase $pg_dbase $pg_tspace $pg_user $pg_pass $pg_oracompat $pg_storedprocs $pg_partition $pg_num_vu" + .ed_mainFrame.mainwin.textFrame.left.text fastinsert end "do_tpcc $pg_host $pg_port $pg_sslmode $pg_count_ware $pg_superuser $pg_superuserpass $pg_defaultdbase $pg_dbase $pg_tspace $pg_user $pg_pass $pg_oracompat $pg_cituscompat $pg_storedprocs $pg_partition $pg_num_vu" } else { return } } diff --git a/src/postgresql/pgopt.tcl b/src/postgresql/pgopt.tcl index a47de254..a04dc2f1 100755 --- a/src/postgresql/pgopt.tcl +++ b/src/postgresql/pgopt.tcl @@ -205,7 +205,7 @@ proc configpgtpcc {option} { setlocaltpccvars $configpostgresql #set matching fields in dialog to temporary dict variable pgfields - set pgfields [ dict create connection {pg_host {.tpc.f1.e1 get} pg_port {.tpc.f1.e2 get} pg_sslmode $pg_sslmode} tpcc {pg_superuser {.tpc.f1.e3 get} pg_superuserpass {.tpc.f1.e4 get} pg_defaultdbase {.tpc.f1.e5 get} pg_user {.tpc.f1.e6 get} pg_pass {.tpc.f1.e7 get} pg_dbase {.tpc.f1.e8 get} pg_tspace {.tpc.f1.e8a get} pg_total_iterations {.tpc.f1.e15 get} pg_rampup {.tpc.f1.e21 get} pg_duration {.tpc.f1.e22 get} pg_async_client {.tpc.f1.e26 get} pg_async_delay {.tpc.f1.e27 get} pg_count_ware $pg_count_ware pg_vacuum $pg_vacuum pg_dritasnap $pg_dritasnap pg_oracompat $pg_oracompat pg_storedprocs $pg_storedprocs pg_partition $pg_partition pg_num_vu $pg_num_vu pg_total_iterations $pg_total_iterations pg_raiseerror $pg_raiseerror pg_keyandthink $pg_keyandthink pg_driver $pg_driver pg_rampup $pg_rampup pg_duration $pg_duration pg_allwarehouse $pg_allwarehouse pg_timeprofile $pg_timeprofile pg_async_scale $pg_async_scale pg_connect_pool $pg_connect_pool pg_async_verbose $pg_async_verbose}] + set pgfields [ dict create connection {pg_host {.tpc.f1.e1 get} pg_port {.tpc.f1.e2 get} pg_sslmode $pg_sslmode} tpcc {pg_superuser {.tpc.f1.e3 get} pg_superuserpass {.tpc.f1.e4 get} pg_defaultdbase {.tpc.f1.e5 get} pg_user {.tpc.f1.e6 get} pg_pass {.tpc.f1.e7 get} pg_dbase {.tpc.f1.e8 get} pg_tspace {.tpc.f1.e8a get} pg_total_iterations {.tpc.f1.e15 get} pg_rampup {.tpc.f1.e21 get} pg_duration {.tpc.f1.e22 get} pg_async_client {.tpc.f1.e26 get} pg_async_delay {.tpc.f1.e27 get} pg_count_ware $pg_count_ware pg_vacuum $pg_vacuum pg_dritasnap $pg_dritasnap pg_oracompat $pg_oracompat pg_cituscompat $pg_cituscompat pg_storedprocs $pg_storedprocs pg_partition $pg_partition pg_num_vu $pg_num_vu pg_total_iterations $pg_total_iterations pg_raiseerror $pg_raiseerror pg_keyandthink $pg_keyandthink pg_driver $pg_driver pg_rampup $pg_rampup pg_duration $pg_duration pg_allwarehouse $pg_allwarehouse pg_timeprofile $pg_timeprofile pg_async_scale $pg_async_scale pg_connect_pool $pg_connect_pool pg_async_verbose $pg_async_verbose}] set whlist [ get_warehouse_list_for_spinbox ] if { $pg_oracompat eq "true" } { if { $pg_port eq "5432" } { set pg_port "5444" } @@ -308,17 +308,48 @@ proc configpgtpcc {option} { grid $Prompt -column 0 -row 10 -sticky e grid $Name -column 1 -row 10 -sticky w bind .tpc.f1.e9