Skip to content

Commit

Permalink
bridgev2/backfill: add option for aggressive deduplication
Browse files Browse the repository at this point in the history
  • Loading branch information
tulir committed Aug 28, 2024
1 parent 838237d commit fd89457
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 7 deletions.
42 changes: 42 additions & 0 deletions bridgev2/networkinterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,43 @@ type BackfillMessage struct {
LastThreadMessage networkid.MessageID
}

var (
_ RemoteMessageWithTransactionID = (*BackfillMessage)(nil)
_ RemoteEventWithTimestamp = (*BackfillMessage)(nil)
)

func (b *BackfillMessage) GetType() RemoteEventType {
return RemoteEventMessage
}

func (b *BackfillMessage) GetPortalKey() networkid.PortalKey {
panic("GetPortalKey called for BackfillMessage")
}

func (b *BackfillMessage) AddLogContext(c zerolog.Context) zerolog.Context {
return c
}

func (b *BackfillMessage) GetSender() EventSender {
return b.Sender
}

func (b *BackfillMessage) GetID() networkid.MessageID {
return b.ID
}

func (b *BackfillMessage) GetTransactionID() networkid.TransactionID {
return b.TxnID
}

func (b *BackfillMessage) GetTimestamp() time.Time {
return b.Timestamp
}

func (b *BackfillMessage) ConvertMessage(ctx context.Context, portal *Portal, intent MatrixAPI) (*ConvertedMessage, error) {
return b.ConvertedMessage, nil
}

// FetchMessagesResponse contains the response for a message history pagination request.
type FetchMessagesResponse struct {
// The messages to backfill. Messages should always be sorted in chronological order (oldest to newest).
Expand All @@ -440,6 +477,11 @@ type FetchMessagesResponse struct {
// to mark the messages as read immediately after backfilling.
MarkRead bool

// Should the bridge check each message against the database to ensure it's not a duplicate before bridging?
// By default, the bridge will only drop messages that are older than the last bridged message for forward backfills,
// or newer than the first for backward.
AggressiveDeduplication bool

// When HasMore is true, one of the following fields can be set to report backfill progress:

// Approximate backfill progress as a number between 0 and 1.
Expand Down
42 changes: 35 additions & 7 deletions bridgev2/portalbackfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,8 @@ func (portal *Portal) doForwardBackfill(ctx context.Context, source *UserLogin,
log.Debug().Msg("No messages to backfill")
return
}
// TODO check pending messages
// TODO mark backfill queue task as done if last message is nil (-> room was empty) and HasMore is false?
resp.Messages = cutoffMessages(&log, resp.Messages, true, lastMessage)
resp.Messages = portal.cutoffMessages(ctx, resp.Messages, resp.AggressiveDeduplication, true, lastMessage)
if len(resp.Messages) == 0 {
log.Warn().Msg("No messages left to backfill after cutting off old messages")
return
Expand Down Expand Up @@ -126,7 +125,7 @@ func (portal *Portal) DoBackwardsBackfill(ctx context.Context, source *UserLogin
}
return nil
}
resp.Messages = cutoffMessages(log, resp.Messages, false, firstMessage)
resp.Messages = portal.cutoffMessages(ctx, resp.Messages, resp.AggressiveDeduplication, false, firstMessage)
if len(resp.Messages) == 0 {
return fmt.Errorf("no messages left to backfill after cutting off too new messages")
}
Expand Down Expand Up @@ -156,7 +155,7 @@ func (portal *Portal) fetchThreadBackfill(ctx context.Context, source *UserLogin
log.Debug().Msg("No messages to backfill")
return nil
}
resp.Messages = cutoffMessages(log, resp.Messages, true, anchor)
resp.Messages = portal.cutoffMessages(ctx, resp.Messages, resp.AggressiveDeduplication, true, anchor)
if len(resp.Messages) == 0 {
log.Warn().Msg("No messages left to backfill after cutting off old messages")
return nil
Expand All @@ -182,7 +181,7 @@ func (portal *Portal) doThreadBackfill(ctx context.Context, source *UserLogin, t
}
}

func cutoffMessages(log *zerolog.Logger, messages []*BackfillMessage, forward bool, lastMessage *database.Message) []*BackfillMessage {
func (portal *Portal) cutoffMessages(ctx context.Context, messages []*BackfillMessage, aggressiveDedup, forward bool, lastMessage *database.Message) []*BackfillMessage {
if lastMessage == nil {
return messages
}
Expand All @@ -196,7 +195,7 @@ func cutoffMessages(log *zerolog.Logger, messages []*BackfillMessage, forward bo
}
}
if cutoff != -1 {
log.Debug().
zerolog.Ctx(ctx).Debug().
Int("cutoff_count", cutoff+1).
Int("total_count", len(messages)).
Time("last_bridged_ts", lastMessage.Timestamp).
Expand All @@ -213,14 +212,43 @@ func cutoffMessages(log *zerolog.Logger, messages []*BackfillMessage, forward bo
}
}
if cutoff != -1 {
log.Debug().
zerolog.Ctx(ctx).Debug().
Int("cutoff_count", len(messages)-cutoff).
Int("total_count", len(messages)).
Time("oldest_bridged_ts", lastMessage.Timestamp).
Msg("Cutting off backward backfill messages newer than oldest bridged message")
messages = messages[:cutoff]
}
}
if aggressiveDedup {
filteredMessages := messages[:0]
for _, msg := range messages {
existingMsg, err := portal.Bridge.DB.Message.GetFirstPartByID(ctx, portal.Receiver, msg.ID)
if err != nil {
zerolog.Ctx(ctx).Err(err).Str("message_id", string(msg.ID)).Msg("Failed to check for existing message")
} else if existingMsg != nil {
zerolog.Ctx(ctx).Err(err).
Str("message_id", string(msg.ID)).
Time("message_ts", msg.Timestamp).
Str("message_sender", string(msg.Sender.Sender)).
Msg("Ignoring duplicate message in backfill")
continue
}
if forward && msg.TxnID != "" {
wasPending, _ := portal.checkPendingMessage(ctx, msg)
if wasPending {
zerolog.Ctx(ctx).Err(err).
Str("transaction_id", string(msg.TxnID)).
Str("message_id", string(msg.ID)).
Time("message_ts", msg.Timestamp).
Msg("Found pending message in backfill")
continue
}
}
filteredMessages = append(filteredMessages, msg)
}
messages = filteredMessages
}
return messages
}

Expand Down

0 comments on commit fd89457

Please sign in to comment.