Skip to content

Commit

Permalink
feat: Use IMAP APPEND command to upload sync messages (#5845)
Browse files Browse the repository at this point in the history
Why:
- With IMAP APPEND we can upload messages directly to the DeltaChat folder (for non-chatmail
  accounts).
- We can set the `\Seen` flag immediately so that if the user has other MUA, it doesn't alert about
  a new message if it's just a sync message (there were several such reports on the support
  forum). Though this also isn't useful for chatmail.
- We don't need SMTP envelope and overall remove some overhead on processing sync messages.
  • Loading branch information
iequidoo committed Sep 20, 2024
1 parent 0b908db commit d6845bd
Show file tree
Hide file tree
Showing 13 changed files with 184 additions and 68 deletions.
15 changes: 3 additions & 12 deletions deltachat-jsonrpc/typescript/test/online.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,7 @@ describe("online tests", function () {
null
);
const chatId = await dc.rpc.createChatByContactId(accountId1, contactId);
const eventPromise = Promise.race([
waitForEvent(dc, "MsgsChanged", accountId2),
waitForEvent(dc, "IncomingMsg", accountId2),
]);
const eventPromise = waitForEvent(dc, "IncomingMsg", accountId2);

await dc.rpc.miscSendTextMessage(accountId1, chatId, "Hello");
const { chatId: chatIdOnAccountB } = await eventPromise;
Expand Down Expand Up @@ -119,10 +116,7 @@ describe("online tests", function () {
null
);
const chatId = await dc.rpc.createChatByContactId(accountId1, contactId);
const eventPromise = Promise.race([
waitForEvent(dc, "MsgsChanged", accountId2),
waitForEvent(dc, "IncomingMsg", accountId2),
]);
const eventPromise = waitForEvent(dc, "IncomingMsg", accountId2);
dc.rpc.miscSendTextMessage(accountId1, chatId, "Hello2");
// wait for message from A
console.log("wait for message from A");
Expand All @@ -143,10 +137,7 @@ describe("online tests", function () {
);
expect(message.text).equal("Hello2");
// Send message back from B to A
const eventPromise2 = Promise.race([
waitForEvent(dc, "MsgsChanged", accountId1),
waitForEvent(dc, "IncomingMsg", accountId1),
]);
const eventPromise2 = waitForEvent(dc, "IncomingMsg", accountId1);
dc.rpc.miscSendTextMessage(accountId2, chatId, "super secret message");
// Check if answer arrives at A and if it is encrypted
await eventPromise2;
Expand Down
12 changes: 10 additions & 2 deletions python/tests/test_1_online.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,10 +488,18 @@ def test_move_sync_msgs(acfactory):
ac1 = acfactory.new_online_configuring_account(bcc_self=True, sync_msgs=True, fix_is_chatmail=True)
acfactory.bring_accounts_online()

ac1.direct_imap.select_folder("DeltaChat")
# Sync messages may also be sent during the configuration.
mvbox_msg_cnt = len(ac1.direct_imap.get_all_messages())

ac1.set_config("displayname", "Alice")
ac1._evtracker.get_matching("DC_EVENT_IMAP_MESSAGE_MOVED")
ac1._evtracker.get_matching("DC_EVENT_MSG_DELIVERED")
ac1.set_config("displayname", "Bob")
ac1._evtracker.get_matching("DC_EVENT_IMAP_MESSAGE_MOVED")
ac1._evtracker.get_matching("DC_EVENT_MSG_DELIVERED")
ac1.direct_imap.select_folder("Inbox")
assert len(ac1.direct_imap.get_all_messages()) == 0
ac1.direct_imap.select_folder("DeltaChat")
assert len(ac1.direct_imap.get_all_messages()) == mvbox_msg_cnt + 2


def test_forward_messages(acfactory, lp):
Expand Down
50 changes: 27 additions & 23 deletions src/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2245,7 +2245,7 @@ pub(crate) async fn sync(context: &Context, id: SyncId, action: SyncAction) -> R
context
.add_sync_item(SyncData::AlterChat { id, action })
.await?;
context.scheduler.interrupt_smtp().await;
context.scheduler.interrupt_inbox().await;
Ok(())
}

Expand Down Expand Up @@ -2906,10 +2906,9 @@ async fn prepare_send_msg(
create_send_msg_jobs(context, msg).await
}

/// Constructs jobs for sending a message and inserts them into the `smtp` table.
/// Constructs jobs for sending a message and inserts them into the appropriate table.
///
/// Returns row ids if jobs were created or an empty `Vec` otherwise, e.g. when sending to a
/// group with only self and no BCC-to-self configured.
/// Returns row ids if `smtp` table jobs were created or an empty `Vec` otherwise.
///
/// The caller has to interrupt SMTP loop or otherwise process new rows.
pub(crate) async fn create_send_msg_jobs(context: &Context, msg: &mut Message) -> Result<Vec<i64>> {
Expand Down Expand Up @@ -3003,12 +3002,6 @@ pub(crate) async fn create_send_msg_jobs(context: &Context, msg: &mut Message) -
}
}

if let Some(sync_ids) = rendered_msg.sync_ids_to_delete {
if let Err(err) = context.delete_sync_ids(sync_ids).await {
error!(context, "Failed to delete sync ids: {err:#}.");
}
}

if attach_selfavatar {
if let Err(err) = msg.chat_id.set_selfavatar_timestamp(context, now).await {
error!(context, "Failed to set selfavatar timestamp: {err:#}.");
Expand All @@ -3025,19 +3018,30 @@ pub(crate) async fn create_send_msg_jobs(context: &Context, msg: &mut Message) -
let chunk_size = context.get_max_smtp_rcpt_to().await?;
let trans_fn = |t: &mut rusqlite::Transaction| {
let mut row_ids = Vec::<i64>::new();
for recipients_chunk in recipients.chunks(chunk_size) {
let recipients_chunk = recipients_chunk.join(" ");
let row_id = t.execute(
"INSERT INTO smtp (rfc724_mid, recipients, mime, msg_id) \
VALUES (?1, ?2, ?3, ?4)",
(
&rendered_msg.rfc724_mid,
recipients_chunk,
&rendered_msg.message,
msg.id,
),
if let Some(sync_ids) = rendered_msg.sync_ids_to_delete {
t.execute(
&format!("DELETE FROM multi_device_sync WHERE id IN ({sync_ids})"),
(),
)?;
row_ids.push(row_id.try_into()?);
t.execute(
"INSERT INTO imap_send (mime, msg_id) VALUES (?, ?)",
(&rendered_msg.message, msg.id),
)?;
} else {
for recipients_chunk in recipients.chunks(chunk_size) {
let recipients_chunk = recipients_chunk.join(" ");
let row_id = t.execute(
"INSERT INTO smtp (rfc724_mid, recipients, mime, msg_id) \
VALUES (?1, ?2, ?3, ?4)",
(
&rendered_msg.rfc724_mid,
recipients_chunk,
&rendered_msg.message,
msg.id,
),
)?;
row_ids.push(row_id.try_into()?);
}
}
Ok(row_ids)
};
Expand Down Expand Up @@ -3793,7 +3797,7 @@ pub(crate) async fn add_contact_to_chat_ex(
.log_err(context)
.is_ok()
{
context.scheduler.interrupt_smtp().await;
context.scheduler.interrupt_inbox().await;
}
}
context.emit_event(EventType::ChatModified(chat_id));
Expand Down
12 changes: 9 additions & 3 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,12 @@ impl Context {
&& !self.get_config_bool(Config::Bot).await?)
}

/// Returns whether sync messages should be uploaded to the mvbox.
pub(crate) async fn should_move_sync_msgs(&self) -> Result<bool> {
Ok(self.get_config_bool(Config::MvboxMove).await?
|| !self.get_config_bool(Config::IsChatmail).await?)
}

/// Returns whether MDNs should be requested.
pub(crate) async fn should_request_mdns(&self) -> Result<bool> {
match self.get_config_bool_opt(Config::MdnsEnabled).await? {
Expand Down Expand Up @@ -792,7 +798,7 @@ impl Context {
{
return Ok(());
}
self.scheduler.interrupt_smtp().await;
self.scheduler.interrupt_inbox().await;
Ok(())
}

Expand Down Expand Up @@ -1175,7 +1181,7 @@ mod tests {
let status = "Synced via usual message";
alice0.set_config(Config::Selfstatus, Some(status)).await?;
alice0.send_sync_msg().await?;
alice0.pop_sent_msg().await;
alice0.pop_sent_sync_msg().await;
let status1 = "Synced via sync message";
alice1.set_config(Config::Selfstatus, Some(status1)).await?;
tcm.send_recv(alice0, alice1, "hi Alice!").await;
Expand All @@ -1199,7 +1205,7 @@ mod tests {
.set_config(Config::Selfavatar, Some(file.to_str().unwrap()))
.await?;
alice0.send_sync_msg().await?;
alice0.pop_sent_msg().await;
alice0.pop_sent_sync_msg().await;
let file = alice1.dir.path().join("avatar.jpg");
let bytes = include_bytes!("../test-data/image/avatar1000x1000.jpg");
tokio::fs::write(&file, bytes).await?;
Expand Down
47 changes: 47 additions & 0 deletions src/imap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::contact::{Contact, ContactId, Modifier, Origin};
use crate::context::Context;
use crate::events::EventType;
use crate::headerdef::{HeaderDef, HeaderDefMap};
use crate::log::LogExt;
use crate::login_param::{
prioritize_server_login_params, ConfiguredLoginParam, ConfiguredServerLoginParam,
};
Expand Down Expand Up @@ -1043,6 +1044,52 @@ impl Session {
Ok(())
}

/// Uploads sync messages from the `imap_send` table with `\Seen` flag set.
pub(crate) async fn send_sync_msgs(&mut self, context: &Context, folder: &str) -> Result<()> {
context.send_sync_msg().await?;
while let Some((id, mime, msg_id, attempts)) = context
.sql
.query_row_optional(
"SELECT id, mime, msg_id, attempts FROM imap_send ORDER BY id LIMIT 1",
(),
|row| {
let id: i64 = row.get(0)?;
let mime: String = row.get(1)?;
let msg_id: MsgId = row.get(2)?;
let attempts: i64 = row.get(3)?;
Ok((id, mime, msg_id, attempts))
},
)
.await
.context("Failed to SELECT from imap_send")?
{
let res = self
.append(folder, Some("(\\Seen)"), None, mime)
.await
.with_context(|| format!("IMAP APPEND to {folder} failed for {msg_id}"))
.log_err(context);
if res.is_ok() {
msg_id.set_delivered(context).await?;
}
const MAX_ATTEMPTS: i64 = 2;
if res.is_ok() || attempts >= MAX_ATTEMPTS - 1 {
context
.sql
.execute("DELETE FROM imap_send WHERE id=?", (id,))
.await
.context("Failed to delete from imap_send")?;
} else {
context
.sql
.execute("UPDATE imap_send SET attempts=attempts+1 WHERE id=?", (id,))
.await
.context("Failed to update imap_send.attempts")?;
res?;
}
}
Ok(())
}

/// Stores pending `\Seen` flags for messages in `imap_markseen` table.
pub(crate) async fn store_seen_flags_on_imap(&mut self, context: &Context) -> Result<()> {
let rows = context
Expand Down
6 changes: 2 additions & 4 deletions src/mimefactory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,8 @@ pub struct RenderedEmail {
pub is_gossiped: bool,
pub last_added_location_id: Option<u32>,

/// A comma-separated string of sync-IDs that are used by the rendered email
/// and must be deleted once the message is actually queued for sending
/// (deletion must be done by `delete_sync_ids()`).
/// If the rendered email is not queued for sending, the IDs must not be deleted.
/// A comma-separated string of sync-IDs that are used by the rendered email and must be deleted
/// from `multi_device_sync` once the message is actually queued for sending.
pub sync_ids_to_delete: Option<String>,

/// Message ID (Message in the sense of Email)
Expand Down
4 changes: 2 additions & 2 deletions src/qr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ pub async fn set_config_from_qr(context: &Context, qr: &str) -> Result<()> {
token::save(context, token::Namespace::InviteNumber, None, &invitenumber).await?;
token::save(context, token::Namespace::Auth, None, &authcode).await?;
context.sync_qr_code_tokens(None).await?;
context.scheduler.interrupt_smtp().await;
context.scheduler.interrupt_inbox().await;
}
Qr::ReviveVerifyGroup {
invitenumber,
Expand All @@ -736,7 +736,7 @@ pub async fn set_config_from_qr(context: &Context, qr: &str) -> Result<()> {
.await?;
token::save(context, token::Namespace::Auth, Some(&grpid), &authcode).await?;
context.sync_qr_code_tokens(Some(&grpid)).await?;
context.scheduler.interrupt_smtp().await;
context.scheduler.interrupt_inbox().await;
}
Qr::Login { address, options } => {
configure_from_login_qr(context, &address, options).await?
Expand Down
15 changes: 15 additions & 0 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,21 @@ async fn fetch_idle(
};

if folder_config == Config::ConfiguredInboxFolder {
let mvbox;
let syncbox = match ctx.should_move_sync_msgs().await? {
false => &watch_folder,
true => {
mvbox = ctx.get_config(Config::ConfiguredMvboxFolder).await?;
mvbox.as_deref().unwrap_or(&watch_folder)
}
};
session
.send_sync_msgs(ctx, syncbox)
.await
.context("fetch_idle: send_sync_msgs")
.log_err(ctx)
.ok();

session
.store_seen_flags_on_imap(ctx)
.await
Expand Down
4 changes: 2 additions & 2 deletions src/securejoin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ pub async fn get_securejoin_qr(context: &Context, group: Option<ChatId>) -> Resu
context
.sync_qr_code_tokens(Some(chat.grpid.as_str()))
.await?;
context.scheduler.interrupt_smtp().await;
context.scheduler.interrupt_inbox().await;
}
format!(
"OPENPGP4FPR:{}#a={}&g={}&x={}&i={}&s={}",
Expand All @@ -124,7 +124,7 @@ pub async fn get_securejoin_qr(context: &Context, group: Option<ChatId>) -> Resu
// parameters used: a=n=i=s=
if sync_token {
context.sync_qr_code_tokens(None).await?;
context.scheduler.interrupt_smtp().await;
context.scheduler.interrupt_inbox().await;
}
format!(
"OPENPGP4FPR:{}#a={}&n={}&i={}&s={}",
Expand Down
1 change: 0 additions & 1 deletion src/smtp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,6 @@ pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp)
let ratelimited = if context.ratelimit.read().await.can_send() {
// add status updates and sync messages to end of sending queue
context.flush_status_updates().await?;
context.send_sync_msg().await?;
false
} else {
true
Expand Down
14 changes: 14 additions & 0 deletions src/sql/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -991,6 +991,20 @@ CREATE INDEX msgs_status_updates_index2 ON msgs_status_updates (uid);
.await?;
}

inc_and_check(&mut migration_version, 119)?;
if dbversion < migration_version {
sql.execute_migration(
"CREATE TABLE imap_send (
id INTEGER PRIMARY KEY AUTOINCREMENT,
mime TEXT NOT NULL, -- Message content
msg_id INTEGER NOT NULL, -- ID of the message in the `msgs` table
attempts INTEGER NOT NULL DEFAULT 0 -- Number of failed attempts to send the message
)",
migration_version,
)
.await?;
}

let new_version = sql
.get_raw_config_int(VERSION_CFG)
.await?
Expand Down
Loading

0 comments on commit d6845bd

Please sign in to comment.