fix: Optimize email fetching logic to fix bandwidth exceeded issues (#8730)

The Inboxes::FetchImapEmailsJob is designed to fetch the entire email object and check if its message id is already in the database. However, this process is resource-intensive and time-consuming, as fetching the full email object takes a significant amount of time.

On average, fetching 100 emails takes approximately 3-4 minutes, depending on the IMAP server. Since we do not use server flags to identify which emails have already been fetched (to avoid compatibility issues with flags across different email services), we have to re-fetch all previously available emails. Currently, we fetch every message received since yesterday. This becomes problematic with services like Gmail, which throttle requests based on bandwidth usage.

To address this issue, I have updated the logic as follows:

Fetch the sequence IDs of all the mails in the "Inbox" that were received since yesterday.
Use the FETCH command to fetch only the message-ids using BODY.PEEK[HEADER.FIELDS (MESSAGE-ID)] with the sequence IDs we got in the previous step. This is a faster operation with lower bandwidth usage, as it only returns the sequence ID and message ID.
Check if the message IDs are already present in the database for the selected inbox.
If not present, fetch the entire email object using the FETCH command and create the message.
If the message ID is already present, ignore it and move on to the next message-id.
I have also addressed the issue of duplicate emails appearing in conversations when two fetch-email jobs run simultaneously: a per-channel lock now ensures that a second job exits gracefully instead of waiting for the running job to complete.

Fixes #7247
Fixes #6082
Fixes #8314

Co-authored-by: Sojan <sojan@pepalo.com>
This commit is contained in:
Pranav Raj S
2024-01-17 23:45:16 -08:00
committed by GitHub
parent c899cc825d
commit fdbb3bf4b1
2 changed files with 35 additions and 32 deletions

View File

@@ -1,17 +1,21 @@
require 'net/imap'
class Inboxes::FetchImapEmailsJob < ApplicationJob
class Inboxes::FetchImapEmailsJob < MutexApplicationJob
queue_as :scheduled_jobs
# Fetches new emails for +channel+ while holding the per-inbox email lock.
#
# Does nothing when should_fetch_email? returns false. Otherwise the mail
# processing runs inside with_lock, keyed by EMAIL_MESSAGE_MUTEX and the
# inbox id. Errors are handled here and never re-raised to the caller:
# * ExceptionList::IMAP_EXCEPTIONS - logged, and the channel is flagged via
#   channel.authorization_error!
# * EOF / SSL / IMAP "NO" and "BAD" responses - logged only
# * LockAcquisitionError - logged only (presumably another job already holds
#   the lock for this inbox - confirm against with_lock)
# * any other StandardError - reported through ChatwootExceptionTracker
#
# @param channel the email channel to fetch for; must respond to #inbox,
#   #account and #authorization_error!
def perform(channel)
  return unless should_fetch_email?(channel)

  # Process mail only while the lock is held; an additional unlocked call
  # would bypass the mutex and handle every message twice.
  with_lock(::Redis::Alfred::EMAIL_MESSAGE_MUTEX, inbox_id: channel.inbox.id) do
    process_email_for_channel(channel)
  end
rescue *ExceptionList::IMAP_EXCEPTIONS => e
  Rails.logger.error e
  channel.authorization_error!
rescue EOFError, OpenSSL::SSL::SSLError, Net::IMAP::NoResponseError, Net::IMAP::BadResponseError => e
  Rails.logger.error e
rescue LockAcquisitionError
  # No `inbox` local or method exists in this scope; read it from the channel.
  Rails.logger.error "Lock failed for #{channel.inbox.id}"
rescue StandardError => e
  ChatwootExceptionTracker.new(e, account: channel.account).capture_exception
end
@@ -35,33 +39,43 @@ class Inboxes::FetchImapEmailsJob < ApplicationJob
def fetch_mail_for_channel(channel)
imap_inbox = authenticated_imap_inbox(channel, channel.imap_password, 'PLAIN')
last_email_time = DateTime.parse(Net::IMAP.format_datetime(last_email_time(channel)))
received_mails(imap_inbox).each do |message_id|
inbound_mail = Mail.read_from_string imap_inbox.fetch(message_id, 'RFC822')[0].attr['RFC822']
fetch_message_ids(imap_inbox, channel).each do |message_id_uid_pair|
uid, message_id = message_id_uid_pair
next if email_already_present?(channel, message_id)
mail_info_logger(channel, inbound_mail, message_id)
next if email_already_present?(channel, inbound_mail, last_email_time)
inbound_mail = Mail.read_from_string(imap_inbox.fetch(uid, 'RFC822')[0].attr['RFC822'])
mail_info_logger(channel, inbound_mail, uid)
process_mail(inbound_mail, channel)
end
end
def email_already_present?(channel, inbound_mail, _last_email_time)
channel.inbox.messages.find_by(source_id: inbound_mail.message_id).present?
# Collects the Message-ID header of every mail returned by fetch_uids.
#
# Only the Message-ID header field is requested (BODY.PEEK[...]), in batches
# of 10, so full message bodies are not downloaded here.
#
# @param imap_inbox IMAP connection responding to #fetch (presumably an
#   authenticated Net::IMAP - confirm against the caller)
# @param channel email channel; only #email is read, for the log line
# @return [Array<Array>] pairs of [sequence number, message id]; the message
#   id is nil when the header contains no <...> token
def fetch_message_ids(imap_inbox, channel)
  uids = fetch_uids(imap_inbox)
  Rails.logger.info "FETCH_EMAILS_FROM: #{channel.email} - Found #{uids.length} emails \n\n\n\n"

  uids.each_slice(10).flat_map do |batch|
    # Net::IMAP#fetch returns nil instead of an empty array when no message
    # matches, so normalise it before iterating.
    fetched = imap_inbox.fetch(batch, 'BODY.PEEK[HEADER.FIELDS (MESSAGE-ID)]')

    Array(fetched).map do |data|
      header = data.attr['BODY[HEADER.FIELDS (MESSAGE-ID)]'].to_s
      # First <...> token in the header, without the angle brackets.
      [data.seqno, header[/<(.+?)>/, 1]]
    end
  end
end
def received_mails(imap_inbox)
# Tells whether the channel's inbox already holds a message whose source_id
# equals the given message id.
#
# @param channel email channel; its inbox's messages are queried
# @param message_id value compared against messages' source_id
# @return [Boolean]
def email_already_present?(channel, message_id)
  inbox_messages = channel.inbox.messages
  existing_message = inbox_messages.find_by(source_id: message_id)
  existing_message.present?
end
# Searches the mailbox for mails dated before `tomorrow` and since
# `yesterday`, and returns whatever imap_inbox.search returns.
#
# NOTE(review): despite the method name, Net::IMAP#search yields message
# sequence numbers rather than UIDs - confirm imap_inbox is a Net::IMAP.
#
# @param imap_inbox IMAP connection responding to #search
def fetch_uids(imap_inbox)
  search_keys = ['BEFORE', tomorrow, 'SINCE', yesterday]
  imap_inbox.search(search_keys)
end
# Tells whether an email is dated earlier than the last processed email.
#
# @param current_email object responding to #date
# @param last_email_time value the email's date is compared against
# @return [Boolean] false when the email carries no date
def processed_email?(current_email, last_email_time)
  return false unless current_email.date.present?

  current_email.date < last_email_time
end
def fetch_mail_for_ms_provider(channel)
return if channel.provider_config['access_token'].blank?
@@ -75,7 +89,7 @@ class Inboxes::FetchImapEmailsJob < ApplicationJob
end
def process_mails(imap_inbox, channel)
received_mails(imap_inbox).each do |message_id|
fetch_uids(imap_inbox).each do |message_id|
inbound_mail = Mail.read_from_string imap_inbox.fetch(message_id, 'RFC822')[0].attr['RFC822']
mail_info_logger(channel, inbound_mail, message_id)
@@ -86,11 +100,11 @@ class Inboxes::FetchImapEmailsJob < ApplicationJob
end
end
def mail_info_logger(channel, inbound_mail, message_id)
def mail_info_logger(channel, inbound_mail, uid)
return if Rails.env.test?
Rails.logger.info("
#{channel.provider} Email id: #{inbound_mail.from} and message_source_id: #{inbound_mail.message_id}, message_id: #{message_id}")
#{channel.provider} Email id: #{inbound_mail.from} - message_source_id: #{inbound_mail.message_id} - sequence id: #{uid}")
end
def authenticated_imap_inbox(channel, access_token, auth_method)
@@ -100,18 +114,6 @@ class Inboxes::FetchImapEmailsJob < ApplicationJob
imap
end
# Returns the time of the most recent incoming message in the channel's inbox.
#
# Only messages created in the last 2 days are considered. The time is taken,
# in order of preference, from:
# 1. the 'date' stored under the message's 'email' content attributes,
# 2. the message's created_at,
# 3. one hour ago, when there is no recent incoming message at all.
#
# @param channel email channel whose inbox messages are inspected
# @return [DateTime]
def last_email_time(channel)
  # we are only checking for emails in the last 2 days
  last_email_incoming_message = channel.inbox.messages.incoming.where('messages.created_at >= ?', 2.days.ago).last

  if last_email_incoming_message.present?
    # dig tolerates a message without an 'email' attribute instead of raising
    # NoMethodError on nil; created_at then serves as the fallback.
    time = last_email_incoming_message.content_attributes.dig('email', 'date')
    time ||= last_email_incoming_message.created_at.to_s
  end

  time ||= 1.hour.ago.to_s
  DateTime.parse(time)
end
# Formats the day before Time.zone.today as 'DD-Mon-YYYY' (e.g. '17-Jan-2024').
#
# @return [String]
def yesterday
  previous_day = Time.zone.today - 1
  previous_day.strftime('%d-%b-%Y')
end

View File

@@ -38,4 +38,5 @@ module Redis::RedisKeys
# Lock key templates. Each %<name>s is a named format reference that is
# replaced with the matching keyword value when the key is built.
FACEBOOK_MESSAGE_MUTEX = 'FB_MESSAGE_CREATE_LOCK::%<sender_id>s::%<recipient_id>s'.freeze
IG_MESSAGE_MUTEX = 'IG_MESSAGE_CREATE_LOCK::%<sender_id>s::%<ig_account_id>s'.freeze
SLACK_MESSAGE_MUTEX = 'SLACK_MESSAGE_LOCK::%<conversation_id>s::%<reference_id>s'.freeze
# Email lock key; parameterised only by inbox_id, so there is one key per inbox.
EMAIL_MESSAGE_MUTEX = 'EMAIL_CHANNEL_LOCK::%<inbox_id>s'.freeze
end