Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

shutdown the weight proof process pool #10163

Merged
merged 9 commits into from
Feb 15, 2022
64 changes: 32 additions & 32 deletions chia/full_node/weight_proof.py
Original file line number Diff line number Diff line change
Expand Up @@ -605,43 +605,43 @@ async def validate_weight_proof(self, weight_proof: WeightProof) -> Tuple[bool,
log.error("failed weight proof sub epoch sample validation")
return False, uint32(0), []

executor = ProcessPoolExecutor(self._num_processes)
constants, summary_bytes, wp_segment_bytes, wp_recent_chain_bytes = vars_to_bytes(
self.constants, summaries, weight_proof
)

recent_blocks_validation_task = asyncio.get_running_loop().run_in_executor(
executor, _validate_recent_blocks, constants, wp_recent_chain_bytes, summary_bytes
)

segments_validated, vdfs_to_validate = _validate_sub_epoch_segments(
constants, rng, wp_segment_bytes, summary_bytes
)
if not segments_validated:
return False, uint32(0), []

vdf_chunks = chunks(vdfs_to_validate, self._num_processes)
vdf_tasks = []
for chunk in vdf_chunks:
byte_chunks = []
for vdf_proof, classgroup, vdf_info in chunk:
byte_chunks.append((bytes(vdf_proof), bytes(classgroup), bytes(vdf_info)))
with ProcessPoolExecutor(self._num_processes) as executor:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we spin up these processes every time we validate a weight proof? It seems like we should just keep this ProcessPoolExecutor around, but maybe we only validate weight proofs very rarely.

Either way, probably not for this PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I asked about this and Mariano said it was fine to just recreate the pool.

constants, summary_bytes, wp_segment_bytes, wp_recent_chain_bytes = vars_to_bytes(
self.constants, summaries, weight_proof
)

vdf_task = asyncio.get_running_loop().run_in_executor(executor, _validate_vdf_batch, constants, byte_chunks)
vdf_tasks.append(vdf_task)
recent_blocks_validation_task = asyncio.get_running_loop().run_in_executor(
executor, _validate_recent_blocks, constants, wp_recent_chain_bytes, summary_bytes
)

for vdf_task in vdf_tasks:
validated = await vdf_task
if not validated:
segments_validated, vdfs_to_validate = _validate_sub_epoch_segments(
constants, rng, wp_segment_bytes, summary_bytes
)
if not segments_validated:
return False, uint32(0), []

valid_recent_blocks_task = recent_blocks_validation_task
valid_recent_blocks = await valid_recent_blocks_task
if not valid_recent_blocks:
log.error("failed validating weight proof recent blocks")
return False, uint32(0), []
vdf_chunks = chunks(vdfs_to_validate, self._num_processes)
vdf_tasks = []
for chunk in vdf_chunks:
byte_chunks = []
for vdf_proof, classgroup, vdf_info in chunk:
byte_chunks.append((bytes(vdf_proof), bytes(classgroup), bytes(vdf_info)))

vdf_task = asyncio.get_running_loop().run_in_executor(executor, _validate_vdf_batch, constants, byte_chunks)
vdf_tasks.append(vdf_task)

for vdf_task in vdf_tasks:
validated = await vdf_task
if not validated:
return False, uint32(0), []

valid_recent_blocks_task = recent_blocks_validation_task
valid_recent_blocks = await valid_recent_blocks_task
if not valid_recent_blocks:
log.error("failed validating weight proof recent blocks")
return False, uint32(0), []

return True, self.get_fork_point(summaries), summaries
return True, self.get_fork_point(summaries), summaries

def get_fork_point(self, received_summaries: List[SubEpochSummary]) -> uint32:
# iterate through sub epoch summaries to find fork point
Expand Down