Skip to content

Commit

Permalink
add fs sink json encode test
Browse files Browse the repository at this point in the history
  • Loading branch information
wcy-fdu committed Sep 29, 2024
1 parent fbfe796 commit 944bf12
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 12 deletions.
6 changes: 3 additions & 3 deletions ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -528,9 +528,9 @@ steps:
timeout_in_minutes: 25
retry: *auto-retry

- label: "S3 source and sink on parquet file"
key: "s3-source-and-sink-parquet-encode"
command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_parquet_source_and_sink.py"
- label: "S3 sink on parquet and json file"
key: "s3-sink-parquet-and-json-encode"
command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_sink.py"
if: |
!(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-s3-source-tests"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def _table():
return 's3_test_parquet'

# Create a sink that writes the selected columns of the source table to S3 as parquet files
cur.execute(f'''CREATE sink test_file_sink as select
cur.execute(f'''CREATE sink test_file_sink_parquet as select
id,
name,
sex,
Expand All @@ -138,15 +138,15 @@ def _table():
s3.credentials.access = 'hummockadmin',
s3.credentials.secret = 'hummockadmin',
s3.endpoint_url = 'http://hummock001.127.0.0.1:9301',
s3.path = '',
s3.path = 'test_parquet_sink/',
s3.file_type = 'parquet',
type = 'append-only',
force_append_only='true'
) FORMAT PLAIN ENCODE PARQUET(force_append_only='true');''')

print('Sink into s3...')
print('Sink into s3 in parquet encode...')
# Create a table that reads the parquet files written by the sink back from S3
cur.execute(f'''CREATE TABLE test_sink_table(
cur.execute(f'''CREATE TABLE test_parquet_sink_table(
id bigint primary key,
name TEXT,
sex bigint,
Expand All @@ -162,7 +162,7 @@ def _table():
test_timestamptz timestamptz,
) WITH (
connector = 's3',
match_pattern = '*.parquet',
match_pattern = 'test_parquet_sink/*.parquet',
s3.region_name = 'custom',
s3.bucket_name = 'hummock001',
s3.credentials.access = 'hummockadmin',
Expand All @@ -173,14 +173,82 @@ def _table():
total_rows = file_num * item_num_per_file
MAX_RETRIES = 40
for retry_no in range(MAX_RETRIES):
cur.execute(f'select count(*) from test_sink_table')
cur.execute(f'select count(*) from test_parquet_sink_table')
result = cur.fetchone()
if result[0] == total_rows:
break
print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 10s")
sleep(10)

stmt = f'select count(*), sum(id) from test_parquet_sink_table'
print(f'Execute reading sink files: {stmt}')

# Create a sink that writes the selected columns of the source table to S3 as JSON files
cur.execute(f'''CREATE sink test_file_sink_json as select
id,
name,
sex,
mark,
test_int,
test_real,
test_double_precision,
test_varchar,
test_bytea,
test_date,
test_time,
test_timestamp,
test_timestamptz
from {_table()} WITH (
connector = 's3',
match_pattern = '*.parquet',
s3.region_name = 'custom',
s3.bucket_name = 'hummock001',
s3.credentials.access = 'hummockadmin',
s3.credentials.secret = 'hummockadmin',
s3.endpoint_url = 'http://hummock001.127.0.0.1:9301',
s3.path = 'test_json_sink/',
s3.file_type = 'json',
type = 'append-only',
force_append_only='true'
) FORMAT PLAIN ENCODE JSON(force_append_only='true');''')

print('Sink into s3 in json encode...')
# Create a table that reads the JSON files written by the sink back from S3
cur.execute(f'''CREATE TABLE test_json_sink_table(
id bigint primary key,
name TEXT,
sex bigint,
mark bigint,
test_int int,
test_real real,
test_double_precision double precision,
test_varchar varchar,
test_bytea bytea,
test_date date,
test_time time,
test_timestamp timestamp,
test_timestamptz timestamptz,
) WITH (
connector = 's3',
match_pattern = 'test_json_sink/*.json',
s3.region_name = 'custom',
s3.bucket_name = 'hummock001',
s3.credentials.access = 'hummockadmin',
s3.credentials.secret = 'hummockadmin',
s3.endpoint_url = 'http://hummock001.127.0.0.1:9301',
) FORMAT PLAIN ENCODE JSON;''')

total_rows = file_num * item_num_per_file
MAX_RETRIES = 40
for retry_no in range(MAX_RETRIES):
cur.execute(f'select count(*) from test_json_sink_table')
result = cur.fetchone()
if result[0] == total_rows:
break
print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 10s")
sleep(10)

stmt = f'select count(*), sum(id) from test_sink_table'
stmt = f'select count(*), sum(id) from test_json_sink_table'
print(f'Execute reading sink files: {stmt}')
cur.execute(stmt)
result = cur.fetchone()
Expand All @@ -194,8 +262,10 @@ def _assert_eq(field, got, expect):
_assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2)

print('File sink test pass!')
cur.execute(f'drop sink test_file_sink')
cur.execute(f'drop table test_sink_table')
cur.execute(f'drop sink test_file_sink_parquet')
cur.execute(f'drop table test_parquet_sink_table')
cur.execute(f'drop sink test_file_sink_json')
cur.execute(f'drop table test_json_sink_table')
cur.close()
conn.close()

Expand Down

0 comments on commit 944bf12

Please sign in to comment.