diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index d1086092f7e2..43ab9dca535d 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -528,9 +528,9 @@ steps: timeout_in_minutes: 25 retry: *auto-retry - - label: "S3 source and sink on parquet file" - key: "s3-source-and-sink-parquet-encode" - command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_parquet_source_and_sink.py" + - label: "S3 sink on parquet and json file" + key: "s3-sink-parquet-and-json-encode" + command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_sink.py" if: | !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null || build.pull_request.labels includes "ci/run-s3-source-tests" diff --git a/e2e_test/s3/fs_parquet_source_and_sink.py b/e2e_test/s3/fs_sink.py similarity index 71% rename from e2e_test/s3/fs_parquet_source_and_sink.py rename to e2e_test/s3/fs_sink.py index 1bf7de6ad6ec..344b1b807d7e 100644 --- a/e2e_test/s3/fs_parquet_source_and_sink.py +++ b/e2e_test/s3/fs_sink.py @@ -116,7 +116,7 @@ def _table(): return 's3_test_parquet' # Execute a SELECT statement - cur.execute(f'''CREATE sink test_file_sink as select + cur.execute(f'''CREATE sink test_file_sink_parquet as select id, name, sex, @@ -138,15 +138,15 @@ def _table(): s3.credentials.access = 'hummockadmin', s3.credentials.secret = 'hummockadmin', s3.endpoint_url = 'http://hummock001.127.0.0.1:9301', - s3.path = '', + s3.path = 'test_parquet_sink/', s3.file_type = 'parquet', type = 'append-only', force_append_only='true' ) FORMAT PLAIN ENCODE PARQUET(force_append_only='true');''') - print('Sink into s3...') + print('Sink into s3 in parquet encode...') # Execute a SELECT statement - cur.execute(f'''CREATE TABLE test_sink_table( + cur.execute(f'''CREATE TABLE test_parquet_sink_table( id bigint primary key, name TEXT, sex bigint, @@ -162,7 +162,7 @@ def _table(): test_timestamptz timestamptz, ) WITH ( connector = 's3', - match_pattern = '*.parquet', + match_pattern = 'test_parquet_sink/*.parquet', s3.region_name = 'custom', s3.bucket_name = 'hummock001', s3.credentials.access = 'hummockadmin', @@ -173,14 +173,82 @@ def _table(): total_rows = file_num * item_num_per_file MAX_RETRIES = 40 for retry_no in range(MAX_RETRIES): - cur.execute(f'select count(*) from test_sink_table') + cur.execute(f'select count(*) from test_parquet_sink_table') + result = cur.fetchone() + if result[0] == total_rows: + break + print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 10s") + sleep(10) + + stmt = f'select count(*), sum(id) from test_parquet_sink_table' + print(f'Execute reading sink files: {stmt}') + + # Execute a SELECT statement + cur.execute(f'''CREATE sink test_file_sink_json as select + id, + name, + sex, + mark, + test_int, + test_real, + test_double_precision, + test_varchar, + test_bytea, + test_date, + test_time, + test_timestamp, + test_timestamptz + from {_table()} WITH ( + connector = 's3', + match_pattern = '*.parquet', + s3.region_name = 'custom', + s3.bucket_name = 'hummock001', + s3.credentials.access = 'hummockadmin', + s3.credentials.secret = 'hummockadmin', + s3.endpoint_url = 'http://hummock001.127.0.0.1:9301', + s3.path = 'test_json_sink/', + s3.file_type = 'json', + type = 'append-only', + force_append_only='true' + ) FORMAT PLAIN ENCODE JSON(force_append_only='true');''') + + print('Sink into s3 in json encode...') + # Execute a SELECT statement + cur.execute(f'''CREATE TABLE test_json_sink_table( + id bigint primary key, + name TEXT, + sex bigint, + mark bigint, + test_int int, + test_real real, + test_double_precision double precision, + test_varchar varchar, + test_bytea bytea, + test_date date, + test_time time, + test_timestamp timestamp, + test_timestamptz timestamptz, + ) WITH ( + connector = 's3', + match_pattern = 'test_json_sink/*.json', + s3.region_name = 'custom', + s3.bucket_name = 'hummock001', + s3.credentials.access = 'hummockadmin', + s3.credentials.secret = 'hummockadmin', + s3.endpoint_url = 'http://hummock001.127.0.0.1:9301', + ) FORMAT PLAIN ENCODE JSON;''') + + total_rows = file_num * item_num_per_file + MAX_RETRIES = 40 + for retry_no in range(MAX_RETRIES): + cur.execute(f'select count(*) from test_json_sink_table') result = cur.fetchone() if result[0] == total_rows: break print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 10s") sleep(10) - stmt = f'select count(*), sum(id) from test_sink_table' + stmt = f'select count(*), sum(id) from test_json_sink_table' print(f'Execute reading sink files: {stmt}') cur.execute(stmt) result = cur.fetchone() @@ -194,8 +262,10 @@ def _assert_eq(field, got, expect): _assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2) print('File sink test pass!') - cur.execute(f'drop sink test_file_sink') - cur.execute(f'drop table test_sink_table') + cur.execute(f'drop sink test_file_sink_parquet') + cur.execute(f'drop table test_parquet_sink_table') + cur.execute(f'drop sink test_file_sink_json') + cur.execute(f'drop table test_json_sink_table') cur.close() conn.close()