chore: Migrate mailers from the worker to jobs (#12331)

Previously, email replies were handled inside workers. There was no
execution logs. This meant if emails silently failed (as reported by a
customer), we had no way to trace where the issue happened, the only
assumption was “no error = mail sent.”

By moving email handling into jobs, we now have proper execution logs
for each attempt. This makes it easier to debug delivery issues and
would have better visibility when investigating customer reports.

Fixes
https://linear.app/chatwoot/issue/CW-5538/emails-are-not-sentdelivered-to-the-contact

---------

Co-authored-by: Sojan Jose <sojan@pepalo.com>
Co-authored-by: Shivam Mishra <scm.mymail@gmail.com>
This commit is contained in:
Pranav
2025-10-21 16:36:37 -07:00
committed by GitHub
parent b4c4f328b2
commit 254d5dcf9a
13 changed files with 446 additions and 165 deletions

View File

@@ -1,27 +1,29 @@
class SendReplyJob < ApplicationJob
queue_as :high
CHANNEL_SERVICES = {
'Channel::TwitterProfile' => ::Twitter::SendOnTwitterService,
'Channel::TwilioSms' => ::Twilio::SendOnTwilioService,
'Channel::Line' => ::Line::SendOnLineService,
'Channel::Telegram' => ::Telegram::SendOnTelegramService,
'Channel::Whatsapp' => ::Whatsapp::SendOnWhatsappService,
'Channel::Sms' => ::Sms::SendOnSmsService,
'Channel::Instagram' => ::Instagram::SendOnInstagramService,
'Channel::Email' => ::Email::SendOnEmailService,
'Channel::WebWidget' => ::Messages::SendEmailNotificationService,
'Channel::Api' => ::Messages::SendEmailNotificationService
}.freeze
def perform(message_id)
message = Message.find(message_id)
conversation = message.conversation
channel_name = conversation.inbox.channel.class.to_s
channel_name = message.conversation.inbox.channel.class.to_s
services = {
'Channel::TwitterProfile' => ::Twitter::SendOnTwitterService,
'Channel::TwilioSms' => ::Twilio::SendOnTwilioService,
'Channel::Line' => ::Line::SendOnLineService,
'Channel::Telegram' => ::Telegram::SendOnTelegramService,
'Channel::Whatsapp' => ::Whatsapp::SendOnWhatsappService,
'Channel::Sms' => ::Sms::SendOnSmsService,
'Channel::Instagram' => ::Instagram::SendOnInstagramService
}
return send_on_facebook_page(message) if channel_name == 'Channel::FacebookPage'
case channel_name
when 'Channel::FacebookPage'
send_on_facebook_page(message)
else
services[channel_name].new(message: message).perform if services[channel_name].present?
end
service_class = CHANNEL_SERVICES[channel_name]
return unless service_class
service_class.new(message: message).perform
end
private

View File

@@ -39,8 +39,7 @@ class ConversationReplyMailer < ApplicationMailer
init_conversation_attributes(message.conversation)
@message = message
reply_mail_object = prepare_mail(true)
message.update(source_id: reply_mail_object.message_id)
prepare_mail(true)
end
def conversation_transcript(conversation, to_email)

View File

@@ -300,7 +300,6 @@ class Message < ApplicationRecord
def execute_after_create_commit_callbacks
# rails issue with order of active record callbacks being executed https://github.com/rails/rails/issues/20911
reopen_conversation
notify_via_mail
set_conversation_activity
dispatch_create_events
send_reply
@@ -386,48 +385,6 @@ class Message < ApplicationRecord
::MessageTemplates::HookExecutionService.new(message: self).perform
end
def email_notifiable_webwidget?
inbox.web_widget? && inbox.channel.continuity_via_email
end
def email_notifiable_api_channel?
inbox.api? && inbox.account.feature_enabled?('email_continuity_on_api_channel')
end
def email_notifiable_channel?
email_notifiable_webwidget? || %w[Email].include?(inbox.inbox_type) || email_notifiable_api_channel?
end
def can_notify_via_mail?
return unless email_notifiable_message?
return unless email_notifiable_channel?
return if conversation.contact.email.blank?
true
end
def notify_via_mail
return unless can_notify_via_mail?
trigger_notify_via_mail
end
def trigger_notify_via_mail
return EmailReplyWorker.perform_in(1.second, id) if inbox.inbox_type == 'Email'
# will set a redis key for the conversation so that we don't need to send email for every new message
# last few messages coupled together is sent every 2 minutes rather than one email for each message
# if redis key exists there is an unprocessed job that will take care of delivering the email
return if Redis::Alfred.get(conversation_mail_key).present?
Redis::Alfred.setex(conversation_mail_key, id)
ConversationReplyEmailWorker.perform_in(2.minutes, conversation.id, id)
end
def conversation_mail_key
format(::Redis::Alfred::CONVERSATION_MAILER_KEY, conversation_id: conversation.id)
end
def validate_attachments_limit(_attachment)
errors.add(:attachments, message: 'exceeded maximum allowed') if attachments.size >= NUMBER_OF_PERMITTED_ATTACHMENTS
end

View File

@@ -0,0 +1,18 @@
class Email::SendOnEmailService < Base::SendOnChannelService
private
def channel_class
Channel::Email
end
def perform_reply
return unless message.email_notifiable_message?
reply_mail = ConversationReplyMailer.with(account: message.account).email_reply(message).deliver_now
Rails.logger.info("Email message #{message.id} sent with source_id: #{reply_mail.message_id}")
message.update(source_id: reply_mail.message_id)
rescue StandardError => e
ChatwootExceptionTracker.new(e, account: message.account).capture_exception
Messages::StatusUpdateService.new(message, 'failed', e.message).perform
end
end

View File

@@ -0,0 +1,38 @@
class Messages::SendEmailNotificationService
pattr_initialize [:message!]
def perform
return unless should_send_email_notification?
conversation = message.conversation
conversation_mail_key = format(::Redis::Alfred::CONVERSATION_MAILER_KEY, conversation_id: conversation.id)
# Atomically set redis key to prevent duplicate email workers. Keep the key alive longer than
# the worker delay (1 hour) so slow queues don't enqueue duplicate jobs, but let it expire if
# the worker never manages to clean up.
return unless Redis::Alfred.set(conversation_mail_key, message.id, nx: true, ex: 1.hour.to_i)
ConversationReplyEmailWorker.perform_in(2.minutes, conversation.id, message.id)
end
private
def should_send_email_notification?
return false unless message.email_notifiable_message?
return false if message.conversation.contact.email.blank?
email_reply_enabled?
end
def email_reply_enabled?
inbox = message.inbox
case inbox.channel.class.to_s
when 'Channel::WebWidget'
inbox.channel.continuity_via_email
when 'Channel::Api'
inbox.account.feature_enabled?('email_continuity_on_api_channel')
else
false
end
end
end

View File

@@ -1,16 +0,0 @@
class EmailReplyWorker
include Sidekiq::Worker
sidekiq_options queue: :mailers, retry: 3
def perform(message_id)
message = Message.find(message_id)
return unless message.email_notifiable_message?
# send the email
ConversationReplyMailer.with(account: message.account).email_reply(message).deliver_now
rescue StandardError => e
ChatwootExceptionTracker.new(e, account: message.account).capture_exception
Messages::StatusUpdateService.new(message, 'failed', e.message).perform
end
end

View File

@@ -6,9 +6,21 @@ Sidekiq.configure_client do |config|
config.redis = Redis::Config.app
end
# Logs whenever a job is pulled off Redis for execution.
class ChatwootDequeuedLogger
def call(_worker, job, queue)
Sidekiq.logger.info("Dequeued #{job['class']} #{job['jid']} from #{queue}")
yield
end
end
Sidekiq.configure_server do |config|
config.redis = Redis::Config.app
config.server_middleware do |chain|
chain.add ChatwootDequeuedLogger
end
# skip the default start stop logging
if Rails.env.production?
config.logger.formatter = Sidekiq::Logger::Formatters::JSON.new

View File

@@ -108,5 +108,32 @@ RSpec.describe SendReplyJob do
expect(process_service).to receive(:perform)
described_class.perform_now(message.id)
end
it 'calls ::Email::SendOnEmailService when its email message' do
email_channel = create(:channel_email)
message = create(:message, conversation: create(:conversation, inbox: email_channel.inbox))
allow(Email::SendOnEmailService).to receive(:new).with(message: message).and_return(process_service)
expect(Email::SendOnEmailService).to receive(:new).with(message: message)
expect(process_service).to receive(:perform)
described_class.perform_now(message.id)
end
it 'calls ::Messages::SendEmailNotificationService when its webwidget message' do
webwidget_channel = create(:channel_widget)
message = create(:message, conversation: create(:conversation, inbox: webwidget_channel.inbox))
allow(Messages::SendEmailNotificationService).to receive(:new).with(message: message).and_return(process_service)
expect(Messages::SendEmailNotificationService).to receive(:new).with(message: message)
expect(process_service).to receive(:perform)
described_class.perform_now(message.id)
end
it 'calls ::Messages::SendEmailNotificationService when its api channel message' do
api_channel = create(:channel_api)
message = create(:message, conversation: create(:conversation, inbox: api_channel.inbox))
allow(Messages::SendEmailNotificationService).to receive(:new).with(message: message).and_return(process_service)
expect(Messages::SendEmailNotificationService).to receive(:new).with(message: message)
expect(process_service).to receive(:perform)
described_class.perform_now(message.id)
end
end
end

View File

@@ -243,8 +243,8 @@ RSpec.describe ConversationReplyMailer do
expect(mail.decoded).to include message.content
end
it 'updates the source_id' do
expect(mail.message_id).to eq message.source_id
it 'builds messageID properly' do
expect(mail.message_id).to eq("conversation/#{conversation.uuid}/messages/#{message.id}@#{conversation.account.domain}")
end
context 'when message is a CSAT survey' do

View File

@@ -316,43 +316,69 @@ RSpec.describe Message do
end
context 'with conversation continuity' do
it 'calls notify email method on after save for outgoing messages in website channel' do
allow(ConversationReplyEmailWorker).to receive(:perform_in).and_return(true)
message.message_type = 'outgoing'
message.save!
expect(ConversationReplyEmailWorker).to have_received(:perform_in)
let(:inbox_with_continuity) do
create(:inbox, account: message.account,
channel: build(:channel_widget, account: message.account, continuity_via_email: true))
end
it 'does not call notify email for website channel if continuity is disabled' do
message.inbox = create(:inbox, account: message.account,
channel: build(:channel_widget, account: message.account, continuity_via_email: false))
allow(ConversationReplyEmailWorker).to receive(:perform_in).and_return(true)
it 'schedules email notification for outgoing messages in website channel' do
message.inbox = inbox_with_continuity
message.conversation.update!(inbox: inbox_with_continuity)
message.conversation.contact.update!(email: 'test@example.com')
message.message_type = 'outgoing'
message.save!
expect(ConversationReplyEmailWorker).not_to have_received(:perform_in)
# Perform jobs inline to test full integration
perform_enqueued_jobs do
message.save!
end
# Verify the email worker is eventually scheduled through the service
jobs_for_conversation_count = ConversationReplyEmailWorker.jobs.count { |job| job['args'].first == message.conversation.id }
expect(jobs_for_conversation_count).to eq(1)
end
it 'wont call notify email method for private notes' do
it 'does not schedule email for website channel if continuity is disabled' do
inbox_without_continuity = create(:inbox, account: message.account,
channel: build(:channel_widget, account: message.account, continuity_via_email: false))
message.inbox = inbox_without_continuity
message.conversation.update!(inbox: inbox_without_continuity)
message.conversation.contact.update!(email: 'test@example.com')
message.message_type = 'outgoing'
initial_job_count = ConversationReplyEmailWorker.jobs.count { |job| job['args'].first == message.conversation.id }
perform_enqueued_jobs do
message.save!
end
# No new jobs should be scheduled for this conversation
jobs_for_conversation_count = ConversationReplyEmailWorker.jobs.count { |job| job['args'].first == message.conversation.id }
expect(jobs_for_conversation_count).to eq(initial_job_count)
end
it 'does not schedule email for private notes' do
message.inbox = inbox_with_continuity
message.conversation.update!(inbox: inbox_with_continuity)
message.conversation.contact.update!(email: 'test@example.com')
message.private = true
allow(ConversationReplyEmailWorker).to receive(:perform_in).and_return(true)
message.save!
expect(ConversationReplyEmailWorker).not_to have_received(:perform_in)
end
it 'calls EmailReply worker if the channel is email' do
message.inbox = create(:inbox, account: message.account, channel: build(:channel_email, account: message.account))
allow(EmailReplyWorker).to receive(:perform_in).and_return(true)
message.message_type = 'outgoing'
message.content_attributes = { email: { text_content: { quoted: 'quoted text' } } }
message.save!
expect(EmailReplyWorker).to have_received(:perform_in).with(1.second, message.id)
initial_job_count = ConversationReplyEmailWorker.jobs.count { |job| job['args'].first == message.conversation.id }
perform_enqueued_jobs do
message.save!
end
# No new jobs should be scheduled for this conversation
jobs_for_conversation_count = ConversationReplyEmailWorker.jobs.count { |job| job['args'].first == message.conversation.id }
expect(jobs_for_conversation_count).to eq(initial_job_count)
end
it 'wont call notify email method unless its website or email channel' do
message.inbox = create(:inbox, account: message.account, channel: build(:channel_api, account: message.account))
allow(ConversationReplyEmailWorker).to receive(:perform_in).and_return(true)
it 'calls SendReplyJob for all channels' do
allow(SendReplyJob).to receive(:perform_later).and_return(true)
message.message_type = 'outgoing'
message.save!
expect(ConversationReplyEmailWorker).not_to have_received(:perform_in)
expect(SendReplyJob).to have_received(:perform_later).with(message.id)
end
end
end

View File

@@ -0,0 +1,86 @@
require 'rails_helper'
describe Email::SendOnEmailService do
let(:account) { create(:account) }
let(:email_channel) { create(:channel_email, account: account) }
let(:inbox) { create(:inbox, account: account, channel: email_channel) }
let(:conversation) { create(:conversation, account: account, inbox: inbox) }
let(:message) { create(:message, conversation: conversation, message_type: 'outgoing') }
let(:service) { described_class.new(message: message) }
describe '#perform' do
let(:mailer_context) { instance_double(ConversationReplyMailer) }
let(:delivery) { instance_double(ActionMailer::MessageDelivery) }
let(:email_message) { instance_double(Mail::Message) }
before do
allow(ConversationReplyMailer).to receive(:with).with(account: message.account).and_return(mailer_context)
end
context 'when message is email notifiable' do
before do
allow(mailer_context).to receive(:email_reply).with(message).and_return(delivery)
allow(delivery).to receive(:deliver_now).and_return(email_message)
allow(email_message).to receive(:message_id).and_return(
"conversation/#{conversation.uuid}/messages/" \
"#{message.id}@#{conversation.account.domain}"
)
end
it 'sends email via ConversationReplyMailer' do
service.perform
expect(ConversationReplyMailer).to have_received(:with).with(account: message.account)
expect(mailer_context).to have_received(:email_reply).with(message)
expect(delivery).to have_received(:deliver_now)
end
it 'updates message source id on success' do
service.perform
expect(message.reload.source_id).to eq("conversation/#{conversation.uuid}/messages/#{message.id}@#{conversation.account.domain}")
end
end
context 'when message is not email notifiable' do
let(:message) { create(:message, conversation: conversation, message_type: 'incoming') }
before do
allow(mailer_context).to receive(:email_reply)
end
it 'does not send email' do
service.perform
expect(ConversationReplyMailer).not_to have_received(:with)
expect(mailer_context).not_to have_received(:email_reply)
end
end
context 'when an error occurs' do
let(:error_message) { 'SMTP connection failed' }
let(:error) { StandardError.new(error_message) }
let(:exception_tracker) { instance_double(ChatwootExceptionTracker, capture_exception: true) }
let(:status_service) { instance_double(Messages::StatusUpdateService, perform: true) }
before do
allow(mailer_context).to receive(:email_reply).with(message).and_return(delivery)
allow(delivery).to receive(:deliver_now).and_raise(error)
allow(ChatwootExceptionTracker).to receive(:new).and_return(exception_tracker)
end
it 'captures the exception' do
expect(ChatwootExceptionTracker).to receive(:new).with(error, account: message.account)
service.perform
end
it 'updates message status to failed' do
service.perform
expect(message.reload.status).to eq('failed')
expect(message.reload.external_error).to eq(error_message)
end
end
end
end

View File

@@ -0,0 +1,190 @@
require 'rails_helper'
describe Messages::SendEmailNotificationService do
let(:account) { create(:account) }
let(:conversation) { create(:conversation, account: account) }
let(:message) { create(:message, conversation: conversation, message_type: 'outgoing') }
let(:service) { described_class.new(message: message) }
describe '#perform' do
context 'when email notification should be sent' do
let(:inbox) { create(:inbox, account: account, channel: create(:channel_widget, account: account, continuity_via_email: true)) }
let(:conversation) { create(:conversation, account: account, inbox: inbox) }
before do
conversation.contact.update!(email: 'test@example.com')
allow(Redis::Alfred).to receive(:set).and_return(true)
allow(ConversationReplyEmailWorker).to receive(:perform_in)
end
it 'schedules ConversationReplyEmailWorker' do
service.perform
expect(ConversationReplyEmailWorker).to have_received(:perform_in).with(
2.minutes,
conversation.id,
message.id
)
end
it 'atomically sets redis key to prevent duplicate emails' do
expected_key = format(Redis::Alfred::CONVERSATION_MAILER_KEY, conversation_id: conversation.id)
service.perform
expect(Redis::Alfred).to have_received(:set).with(expected_key, message.id, nx: true, ex: 1.hour.to_i)
end
context 'when redis key already exists' do
before do
allow(Redis::Alfred).to receive(:set).and_return(false)
end
it 'does not schedule worker' do
service.perform
expect(ConversationReplyEmailWorker).not_to have_received(:perform_in)
end
it 'attempts atomic set once' do
service.perform
expect(Redis::Alfred).to have_received(:set).once
end
end
end
context 'when handling concurrent requests' do
let(:inbox) { create(:inbox, account: account, channel: create(:channel_widget, account: account, continuity_via_email: true)) }
let(:conversation) { create(:conversation, account: account, inbox: inbox) }
before do
conversation.contact.update!(email: 'test@example.com')
end
it 'prevents duplicate workers under race conditions' do
# Create 5 threads that simultaneously try to enqueue workers for the same conversation
threads = Array.new(5) do
Thread.new do
msg = create(:message, conversation: conversation, message_type: 'outgoing')
described_class.new(message: msg).perform
end
end
threads.each(&:join)
# Only ONE worker should be scheduled despite 5 concurrent attempts
jobs_for_conversation = ConversationReplyEmailWorker.jobs.select { |job| job['args'].first == conversation.id }
expect(jobs_for_conversation.size).to eq(1)
end
end
context 'when email notification should not be sent' do
before do
allow(ConversationReplyEmailWorker).to receive(:perform_in)
end
context 'when message is not email notifiable' do
let(:message) { create(:message, conversation: conversation, message_type: 'incoming') }
it 'does not schedule worker' do
service.perform
expect(ConversationReplyEmailWorker).not_to have_received(:perform_in)
end
end
context 'when contact has no email' do
let(:inbox) { create(:inbox, account: account, channel: create(:channel_widget, account: account, continuity_via_email: true)) }
let(:conversation) { create(:conversation, account: account, inbox: inbox) }
before do
conversation.contact.update!(email: nil)
end
it 'does not schedule worker' do
service.perform
expect(ConversationReplyEmailWorker).not_to have_received(:perform_in)
end
end
context 'when channel does not support email notifications' do
let(:inbox) { create(:inbox, account: account, channel: create(:channel_sms, account: account)) }
let(:conversation) { create(:conversation, account: account, inbox: inbox) }
before do
conversation.contact.update!(email: 'test@example.com')
end
it 'does not schedule worker' do
service.perform
expect(ConversationReplyEmailWorker).not_to have_received(:perform_in)
end
end
end
end
describe '#should_send_email_notification?' do
context 'with WebWidget channel' do
let(:inbox) { create(:inbox, account: account, channel: create(:channel_widget, account: account, continuity_via_email: true)) }
let(:conversation) { create(:conversation, account: account, inbox: inbox) }
before do
conversation.contact.update!(email: 'test@example.com')
end
it 'returns true when continuity_via_email is enabled' do
expect(service.send(:should_send_email_notification?)).to be true
end
context 'when continuity_via_email is disabled' do
let(:inbox) { create(:inbox, account: account, channel: create(:channel_widget, account: account, continuity_via_email: false)) }
it 'returns false' do
expect(service.send(:should_send_email_notification?)).to be false
end
end
end
context 'with API channel' do
let(:inbox) { create(:inbox, account: account, channel: create(:channel_api, account: account)) }
let(:conversation) { create(:conversation, account: account, inbox: inbox) }
before do
conversation.contact.update!(email: 'test@example.com')
allow(account).to receive(:feature_enabled?).and_return(false)
allow(account).to receive(:feature_enabled?).with('email_continuity_on_api_channel').and_return(true)
end
it 'returns true when email_continuity_on_api_channel feature is enabled' do
expect(service.send(:should_send_email_notification?)).to be true
end
context 'when email_continuity_on_api_channel feature is disabled' do
before do
allow(account).to receive(:feature_enabled?).and_return(false)
allow(account).to receive(:feature_enabled?).with('email_continuity_on_api_channel').and_return(false)
end
it 'returns false' do
expect(service.send(:should_send_email_notification?)).to be false
end
end
end
context 'with other channels' do
let(:inbox) { create(:inbox, account: account, channel: create(:channel_email, account: account)) }
let(:conversation) { create(:conversation, account: account, inbox: inbox) }
before do
conversation.contact.update!(email: 'test@example.com')
end
it 'returns false' do
expect(service.send(:should_send_email_notification?)).to be false
end
end
end
end

View File

@@ -1,58 +0,0 @@
require 'rails_helper'
RSpec.describe EmailReplyWorker, type: :worker do
let(:account) { create(:account) }
let(:channel) { create(:channel_email, account: account) }
let(:message) { create(:message, message_type: :outgoing, inbox: channel.inbox, account: account) }
let(:private_message) { create(:message, private: true, message_type: :outgoing, inbox: channel.inbox, account: account) }
let(:incoming_message) { create(:message, message_type: :incoming, inbox: channel.inbox, account: account) }
let(:template_message) { create(:message, message_type: :template, content_type: :input_csat, inbox: channel.inbox, account: account) }
let(:mailer) { double }
let(:mailer_action) { double }
describe '#perform' do
context 'when emails are successfully sent' do
before do
allow(ConversationReplyMailer).to receive(:with).and_return(mailer)
allow(mailer).to receive(:email_reply).and_return(mailer_action)
allow(mailer_action).to receive(:deliver_now).and_return(true)
end
it 'calls mailer action with message' do
described_class.new.perform(message.id)
expect(mailer).to have_received(:email_reply).with(message)
expect(mailer_action).to have_received(:deliver_now)
end
it 'does not call mailer action with a private message' do
described_class.new.perform(private_message.id)
expect(mailer).not_to have_received(:email_reply)
expect(mailer_action).not_to have_received(:deliver_now)
end
it 'calls mailer action with a CSAT message' do
described_class.new.perform(template_message.id)
expect(mailer).to have_received(:email_reply).with(template_message)
expect(mailer_action).to have_received(:deliver_now)
end
it 'does not call mailer action with an incoming message' do
described_class.new.perform(incoming_message.id)
expect(mailer).not_to have_received(:email_reply)
expect(mailer_action).not_to have_received(:deliver_now)
end
end
context 'when emails are not sent' do
before do
allow(ConversationReplyMailer).to receive(:with).and_return(mailer)
allow(mailer).to receive(:email_reply).and_return(mailer_action)
allow(mailer_action).to receive(:deliver_now).and_raise(ArgumentError)
end
it 'mark message as failed' do
expect { described_class.new.perform(message.id) }.to change { message.reload.status }.from('sent').to('failed')
end
end
end
end