Skip to content

Commit

Permalink
Internal: store tx_hash in rejected messages table
Browse files Browse the repository at this point in the history
Problem: if a confirmed message is rejected, we lose the tx hash
information.

Solution: store the tx hash as a separate column.
  • Loading branch information
odesenfans committed Jul 30, 2023
1 parent c8d1998 commit f89162b
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""tx_hash in rejected messages
Revision ID: 3bf484f2cc95
Revises: f9fa39b6bdef
Create Date: 2023-07-31 00:08:17.990537
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "3bf484f2cc95"
down_revision = "f9fa39b6bdef"
branch_labels = None
depends_on = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column("rejected_messages", sa.Column("tx_hash", sa.String(), nullable=True))
op.create_foreign_key(None, "rejected_messages", "chain_txs", ["tx_hash"], ["hash"])
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(
"rejected_messages_tx_hash_fkey", "rejected_messages", type_="foreignkey"
)
op.drop_column("rejected_messages", "tx_hash")
# ### end Alembic commands ###
11 changes: 11 additions & 0 deletions src/aleph/db/accessors/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,20 +398,23 @@ def make_upsert_rejected_message_statement(
error_code: int,
details: Optional[Mapping[str, Any]] = None,
exc_traceback: Optional[str] = None,
tx_hash: Optional[str] = None,
) -> Insert:
insert_rejected_message_stmt = insert(RejectedMessageDb).values(
item_hash=item_hash,
message=pending_message_dict,
error_code=error_code,
details=details,
traceback=exc_traceback,
tx_hash=tx_hash,
)
upsert_rejected_message_stmt = insert_rejected_message_stmt.on_conflict_do_update(
constraint="rejected_messages_pkey",
set_={
"error_code": insert_rejected_message_stmt.excluded.error_code,
"details": details,
"traceback": insert_rejected_message_stmt.excluded.traceback,
"tx_hash": tx_hash,
},
)
return upsert_rejected_message_stmt
Expand All @@ -422,6 +425,7 @@ def mark_pending_message_as_rejected(
item_hash: str,
pending_message_dict: Mapping[str, Any],
exception: BaseException,
tx_hash: Optional[str],
) -> RejectedMessageDb:
if isinstance(exception, MessageProcessingException):
error_code = exception.error_code
Expand Down Expand Up @@ -449,6 +453,7 @@ def mark_pending_message_as_rejected(
details=details,
error_code=error_code,
exc_traceback=exc_traceback,
tx_hash=tx_hash,
)

session.execute(upsert_status_stmt)
Expand All @@ -460,6 +465,7 @@ def mark_pending_message_as_rejected(
traceback=exc_traceback,
error_code=error_code,
details=details,
tx_hash=tx_hash,
)


Expand All @@ -468,6 +474,7 @@ def reject_new_pending_message(
session: DbSession,
pending_message: Mapping[str, Any],
exception: BaseException,
tx_hash: Optional[str],
) -> None:
...

Expand All @@ -477,6 +484,7 @@ def reject_new_pending_message(
session: DbSession,
pending_message: PendingMessageDb,
exception: BaseException,
tx_hash: Optional[str],
) -> None:
...

Expand All @@ -485,6 +493,7 @@ def reject_new_pending_message(
session: DbSession,
pending_message: Union[Mapping[str, Any], PendingMessageDb],
exception: BaseException,
tx_hash: Optional[str],
) -> Optional[RejectedMessageDb]:
"""
Reject a pending message that is not yet in the DB.
Expand Down Expand Up @@ -526,6 +535,7 @@ def reject_new_pending_message(
item_hash=item_hash,
pending_message_dict=pending_message_dict,
exception=exception,
tx_hash=tx_hash,
)


Expand Down Expand Up @@ -563,6 +573,7 @@ def reject_existing_pending_message(
item_hash=item_hash,
pending_message_dict=pending_message_dict,
exception=exception,
tx_hash=pending_message.tx_hash,
)
delete_pending_message(session=session, pending_message=pending_message)
return rejected_message
Expand Down
1 change: 1 addition & 0 deletions src/aleph/db/models/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,4 @@ class RejectedMessageDb(Base):
)
details: Optional[Dict[str, Any]] = Column(JSONB, nullable=True)
traceback: Optional[str] = Column(String, nullable=True)
tx_hash: Optional[str] = Column(ForeignKey("chain_txs.hash"), nullable=True)
12 changes: 9 additions & 3 deletions src/aleph/handlers/message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ async def add_pending_message(
tx_hash: Optional[str] = None,
check_message: bool = True,
) -> Optional[PendingMessageDb]:

# TODO: this implementation is just messy, improve it.
with self.session_factory() as session:
try:
Expand All @@ -184,7 +183,10 @@ async def add_pending_message(
except InvalidMessageException as e:
LOGGER.warning(e)
reject_new_pending_message(
session=session, pending_message=message_dict, exception=e
session=session,
pending_message=message_dict,
exception=e,
tx_hash=tx_hash,
)
session.commit()
return None
Expand All @@ -203,7 +205,10 @@ async def add_pending_message(
except InvalidMessageException as e:
LOGGER.warning("Invalid message: %s - %s", message.item_hash, str(e))
reject_new_pending_message(
session=session, pending_message=message_dict, exception=e
session=session,
pending_message=message_dict,
exception=e,
tx_hash=tx_hash,
)
session.commit()
return None
Expand Down Expand Up @@ -235,6 +240,7 @@ async def add_pending_message(
session=session,
pending_message=message_dict,
exception=e,
tx_hash=tx_hash,
)
session.commit()
return None
Expand Down

0 comments on commit f89162b

Please sign in to comment.