Skip to content

Commit

Permalink
Try some different locking
Browse files Browse the repository at this point in the history
  • Loading branch information
mariano54 committed Nov 1, 2021
1 parent bf4a33d commit c44e676
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 22 deletions.
1 change: 0 additions & 1 deletion chia/consensus/block_body_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ async def validate_block_body(
assert height == block.height
prev_transaction_block_height: uint32 = uint32(0)


# 1. For non transaction-blocs: foliage block, transaction filter, transactions info, and generator must
# be empty. If it is a block but not a transaction block, there is no body to validate. Check that all fields are
# None
Expand Down
35 changes: 18 additions & 17 deletions chia/full_node/full_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,10 @@ class FullNode:
initialized: bool
weight_proof_handler: Optional[WeightProofHandler]
_ui_tasks: Set[asyncio.Task]
_mempool_lock_queue: LockQueue
_mempool_lock_high_priority: LockClient
_mempool_lock_low_priority: LockClient
_blockchain_lock_queue: LockQueue
_blockchain_lock_ultra_priority: LockClient
_blockchain_lock_high_priority: LockClient
_blockchain_lock_low_priority: LockClient

def __init__(
self,
Expand Down Expand Up @@ -162,9 +163,10 @@ def sql_trace_callback(req: str):
start_time = time.time()
self.blockchain = await Blockchain.create(self.coin_store, self.block_store, self.constants, self.hint_store)
self.mempool_manager = MempoolManager(self.coin_store, self.constants)
self._mempool_lock_queue = LockQueue(asyncio.Lock())
self._mempool_lock_high_priority = LockClient(0, self._mempool_lock_queue)
self._mempool_lock_low_priority = LockClient(1, self._mempool_lock_queue)
self._blockchain_lock_queue = LockQueue(self.blockchain.lock)
self._blockchain_lock_ultra_priority = LockClient(0, self._blockchain_lock_queue)
self._blockchain_lock_high_priority = LockClient(1, self._blockchain_lock_queue)
self._blockchain_lock_low_priority = LockClient(2, self._blockchain_lock_queue)
self.weight_proof_handler = None
self._init_weight_proof = asyncio.create_task(self.initialize_weight_proof())

Expand All @@ -182,7 +184,7 @@ def sql_trace_callback(req: str):
f" {self.blockchain.get_peak().height}, "
f"time taken: {int(time_taken)}s"
)
async with self._mempool_lock_high_priority:
async with self._blockchain_lock_high_priority:
pending_tx = await self.mempool_manager.new_peak(self.blockchain.get_peak())
assert len(pending_tx) == 0 # no pending transactions when starting up

Expand Down Expand Up @@ -303,7 +305,7 @@ async def short_sync_batch(self, peer: ws.WSChiaConnection, start_height: uint32
response = await peer.request_blocks(request)
if not response:
raise ValueError(f"Error short batch syncing, invalid/no response for {height}-{end_height}")
async with self.blockchain.lock:
async with self._blockchain_lock_high_priority:
success, advanced_peak, fork_height, coin_changes = await self.receive_block_batch(
response.blocks, peer, None
)
Expand Down Expand Up @@ -763,7 +765,7 @@ async def _sync(self):
self.sync_store.set_sync_mode(True)
self._state_changed("sync_mode")
# Ensures that the fork point does not change
async with self.blockchain.lock:
async with self._blockchain_lock_high_priority:
await self.blockchain.warmup(fork_point)
await self.sync_from_fork_point(fork_point, heaviest_peak_height, heaviest_peak_hash, summaries)
except asyncio.CancelledError:
Expand Down Expand Up @@ -1009,7 +1011,7 @@ async def _finish_sync(self):
return None

peak: Optional[BlockRecord] = self.blockchain.get_peak()
async with self.blockchain.lock:
async with self._blockchain_lock_high_priority:
await self.sync_store.clear_sync_info()

peak_fb: FullBlock = await self.blockchain.get_full_peak()
Expand Down Expand Up @@ -1170,10 +1172,9 @@ async def peak_post_processing(
)

# Update the mempool (returns successful pending transactions added to the mempool)
async with self._mempool_lock_high_priority:
mempool_new_peak_result: List[Tuple[SpendBundle, NPCResult, bytes32]] = await self.mempool_manager.new_peak(
self.blockchain.get_peak()
)
mempool_new_peak_result: List[Tuple[SpendBundle, NPCResult, bytes32]] = await self.mempool_manager.new_peak(
self.blockchain.get_peak()
)

# Check if we detected a spent transaction, to load up our generator cache
if block.transactions_generator is not None and self.full_node_store.previous_generator is None:
Expand Down Expand Up @@ -1331,7 +1332,7 @@ async def respond_block(
return await self.respond_block(block_response, peer)
coin_changes: Tuple[List[CoinRecord], Dict[bytes, Dict[bytes32, CoinRecord]]] = ([], {})
mempool_new_peak_result, fns_peak_result = None, None
async with self.blockchain.lock:
async with self._blockchain_lock_high_priority:
# After acquiring the lock, check again, because another asyncio thread might have added it
if self.blockchain.contains_block(header_hash):
return None
Expand Down Expand Up @@ -1499,7 +1500,7 @@ async def respond_unfinished_block(
self.log.warning("Too many blocks added, not adding block")
return None

async with self.blockchain.lock:
async with self._blockchain_lock_high_priority:
# TODO: pre-validate VDFs outside of lock
validation_start = time.time()
validate_result = await self.blockchain.validate_unfinished_block(block)
Expand Down Expand Up @@ -1816,7 +1817,7 @@ async def respond_transaction(
except Exception as e:
self.mempool_manager.remove_seen(spend_name)
raise e
async with self._mempool_lock_low_priority:
async with self._blockchain_lock_low_priority:
if self.mempool_manager.get_spendbundle(spend_name) is not None:
self.mempool_manager.remove_seen(spend_name)
return MempoolInclusionStatus.FAILED, Err.ALREADY_INCLUDING_TRANSACTION
Expand Down
2 changes: 1 addition & 1 deletion chia/full_node/full_node_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,7 @@ async def declare_proof_of_space(
block_generator: Optional[BlockGenerator] = None
additions: Optional[List[Coin]] = []
removals: Optional[List[Coin]] = []
async with self.full_node.blockchain.lock:
async with self.full_node._blockchain_lock_high_priority:
peak: Optional[BlockRecord] = self.full_node.blockchain.get_peak()
if peak is not None:
# Finds the last transaction block before this one
Expand Down
2 changes: 1 addition & 1 deletion chia/rpc/full_node_rpc_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ async def get_additions_and_removals(self, request: Dict) -> Optional[Dict]:
if block is None:
raise ValueError(f"Block {header_hash.hex()} not found")

async with self.service.blockchain.lock:
async with self.service._blockchain_lock_low_priority:
if self.service.blockchain.height_to_hash(block.height) != header_hash:
raise ValueError(f"Block at {header_hash.hex()} is no longer in the blockchain (it's in a fork)")
additions: List[CoinRecord] = await self.service.coin_store.get_coins_added_at_height(block.height)
Expand Down
4 changes: 2 additions & 2 deletions chia/simulator/full_node_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async def get_all_full_blocks(self) -> List[FullBlock]:

@api_request
async def farm_new_transaction_block(self, request: FarmNewBlockProtocol):
async with self.full_node.blockchain.lock:
async with self.full_node._blockchain_lock_high_priority:
self.log.info("Farming new block!")
current_blocks = await self.get_all_full_blocks()
if len(current_blocks) == 0:
Expand Down Expand Up @@ -80,7 +80,7 @@ async def farm_new_transaction_block(self, request: FarmNewBlockProtocol):

@api_request
async def farm_new_block(self, request: FarmNewBlockProtocol):
async with self.full_node.blockchain.lock:
async with self.full_node._blockchain_lock_high_priority:
self.log.info("Farming new block!")
current_blocks = await self.get_all_full_blocks()
if len(current_blocks) == 0:
Expand Down

0 comments on commit c44e676

Please sign in to comment.