Skip to content

Commit

Permalink
Merge pull request ClickHouse#10757 from ClickHouse/fix-parallel-mv
Browse files Browse the repository at this point in the history
Fix parallel MV

(cherry picked from commit ef1c7da)
  • Loading branch information
alesapin authored and abyss7 committed May 18, 2020
1 parent ca84f23 commit f350855
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 2 deletions.
26 changes: 24 additions & 2 deletions src/DataStreams/PushingToViewsBlockOutputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
else
out = std::make_shared<PushingToViewsBlockOutputStream>(dependent_table, *views_context, ASTPtr());

views.emplace_back(ViewInfo{std::move(query), database_table, std::move(out)});
views.emplace_back(ViewInfo{std::move(query), database_table, std::move(out), nullptr});
}

/* Do not push to destination table if the flag is set */
Expand Down Expand Up @@ -161,7 +161,12 @@ void PushingToViewsBlockOutputStream::write(const Block & block)
{
// Process sequentially
for (size_t view_num = 0; view_num < views.size(); ++view_num)
{
process(block, view_num);

if (views[view_num].exception)
std::rethrow_exception(views[view_num].exception);
}
}
}

Expand Down Expand Up @@ -189,8 +194,18 @@ void PushingToViewsBlockOutputStream::writeSuffix()
if (output)
output->writeSuffix();

std::exception_ptr first_exception;

for (auto & view : views)
{
if (view.exception)
{
if (!first_exception)
first_exception = view.exception;

continue;
}

try
{
view.out->writeSuffix();
Expand All @@ -201,6 +216,9 @@ void PushingToViewsBlockOutputStream::writeSuffix()
throw;
}
}

if (first_exception)
std::rethrow_exception(first_exception);
}

void PushingToViewsBlockOutputStream::flush()
Expand Down Expand Up @@ -268,7 +286,11 @@ void PushingToViewsBlockOutputStream::process(const Block & block, size_t view_n
catch (Exception & ex)
{
ex.addMessage("while pushing to view " + view.table_id.getNameForLogs());
throw;
view.exception = std::current_exception();
}
catch (...)
{
view.exception = std::current_exception();
}
}

Expand Down
1 change: 1 addition & 0 deletions src/DataStreams/PushingToViewsBlockOutputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class PushingToViewsBlockOutputStream : public IBlockOutputStream
ASTPtr query;
StorageID table_id;
BlockOutputStreamPtr out;
std::exception_ptr exception;
};

std::vector<ViewInfo> views;
Expand Down
4 changes: 4 additions & 0 deletions tests/queries/0_stateless/01275_parallel_mv.reference
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
10
10
0
10
18 changes: 18 additions & 0 deletions tests/queries/0_stateless/01275_parallel_mv.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
drop table if exists testX;
drop table if exists testXA;
drop table if exists testXB;
drop table if exists testXC;

create table testX (A Int64) engine=MergeTree order by tuple();

create materialized view testXA engine=MergeTree order by tuple() as select sleep(1) from testX;
create materialized view testXB engine=MergeTree order by tuple() as select sleep(2), throwIf(A=1) from testX;
create materialized view testXC engine=MergeTree order by tuple() as select sleep(1) from testX;

set parallel_view_processing=1;
insert into testX select number from numbers(10); -- {serverError 395}

select count() from testX;
select count() from testXA;
select count() from testXB;
select count() from testXC;

0 comments on commit f350855

Please sign in to comment.