diff --git a/src/aleph/jobs/process_pending_messages.py b/src/aleph/jobs/process_pending_messages.py index cb5aa69fd..dd35e445d 100644 --- a/src/aleph/jobs/process_pending_messages.py +++ b/src/aleph/jobs/process_pending_messages.py @@ -19,6 +19,7 @@ from aleph.model.pending import PendingMessage from aleph.services.p2p import singleton from .job_utils import prepare_loop, process_job_results +from ..exceptions import InvalidMessageError from ..schemas.pending_messages import parse_message LOGGER = getLogger("jobs.pending_messages") @@ -49,7 +50,15 @@ async def handle_pending_message( seen_ids: Dict[Tuple, int], ) -> List[DbBulkOperation]: - message = parse_message(pending["message"]) + delete_pending_message_op = DbBulkOperation( + PendingMessage, DeleteOne({"_id": pending["_id"]}) + ) + + try: + message = parse_message(pending["message"]) + except InvalidMessageError: + # If an invalid message somehow ended in pending messages, drop it. + return [delete_pending_message_op] async with sem: status, operations = await incoming( @@ -64,9 +73,7 @@ async def handle_pending_message( ) if status != IncomingStatus.RETRYING_LATER: - operations.append( - DbBulkOperation(PendingMessage, DeleteOne({"_id": pending["_id"]})) - ) + operations.append(delete_pending_message_op) return operations