fix: mutex timeout and error handling (#8770)

Fixes the follow cases 
- The ensure block released the lock even on LockAcquisitionError
- Custom timeout was not allowed

This also refactored the with_lock method, now the key has to be constructed in the parent function itself

Co-authored-by: Sojan Jose <sojan@pepalo.com>
This commit is contained in:
Shivam Mishra
2024-01-24 15:48:21 +05:30
committed by GitHub
parent a861257f73
commit 3760f206e8
6 changed files with 54 additions and 25 deletions

View File

@@ -6,7 +6,8 @@ class Inboxes::FetchImapEmailsJob < MutexApplicationJob
def perform(channel) def perform(channel)
return unless should_fetch_email?(channel) return unless should_fetch_email?(channel)
with_lock(::Redis::Alfred::EMAIL_MESSAGE_MUTEX, inbox_id: channel.inbox.id) do key = format(::Redis::Alfred::EMAIL_MESSAGE_MUTEX, inbox_id: channel.inbox.id)
with_lock(key, 5.minutes) do
process_email_for_channel(channel) process_email_for_channel(channel)
end end
rescue *ExceptionList::IMAP_EXCEPTIONS => e rescue *ExceptionList::IMAP_EXCEPTIONS => e

View File

@@ -14,20 +14,36 @@
class MutexApplicationJob < ApplicationJob class MutexApplicationJob < ApplicationJob
class LockAcquisitionError < StandardError; end class LockAcquisitionError < StandardError; end
def with_lock(key_format, *args) def with_lock(lock_key, timeout = Redis::LockManager::LOCK_TIMEOUT)
lock_key = format(key_format, *args)
lock_manager = Redis::LockManager.new lock_manager = Redis::LockManager.new
begin begin
if lock_manager.lock(lock_key) if lock_manager.lock(lock_key, timeout)
Rails.logger.info "[#{self.class.name}] Acquired lock for: #{lock_key} on attempt #{executions}" log_attempt(lock_key, executions)
yield yield
# release the lock after the block has been executed
lock_manager.unlock(lock_key)
else else
Rails.logger.warn "[#{self.class.name}] Failed to acquire lock on attempt #{executions}: #{lock_key}" handle_failed_lock_acquisition(lock_key)
raise LockAcquisitionError, "Failed to acquire lock for key: #{lock_key}"
end end
ensure rescue StandardError => e
lock_manager.unlock(lock_key) handle_error(e, lock_manager, lock_key)
end end
end end
private
def log_attempt(lock_key, executions)
Rails.logger.info "[#{self.class.name}] Acquired lock for: #{lock_key} on attempt #{executions}"
end
def handle_error(err, lock_manager, lock_key)
lock_manager.unlock(lock_key) unless err.is_a?(LockAcquisitionError)
raise err
end
def handle_failed_lock_acquisition(lock_key)
Rails.logger.warn "[#{self.class.name}] Failed to acquire lock on attempt #{executions}: #{lock_key}"
raise LockAcquisitionError, "Failed to acquire lock for key: #{lock_key}"
end
end end

View File

@@ -3,7 +3,8 @@ class SendOnSlackJob < MutexApplicationJob
retry_on LockAcquisitionError, wait: 1.second, attempts: 8 retry_on LockAcquisitionError, wait: 1.second, attempts: 8
def perform(message, hook) def perform(message, hook)
with_lock(::Redis::Alfred::SLACK_MESSAGE_MUTEX, conversation_id: message.conversation_id, reference_id: hook.reference_id) do key = format(::Redis::Alfred::SLACK_MESSAGE_MUTEX, conversation_id: message.conversation_id, reference_id: hook.reference_id)
with_lock(key) do
Integrations::Slack::SendOnSlackService.new(message: message, hook: hook).perform Integrations::Slack::SendOnSlackService.new(message: message, hook: hook).perform
end end
end end

View File

@@ -5,7 +5,8 @@ class Webhooks::FacebookEventsJob < MutexApplicationJob
def perform(message) def perform(message)
response = ::Integrations::Facebook::MessageParser.new(message) response = ::Integrations::Facebook::MessageParser.new(message)
with_lock(::Redis::Alfred::FACEBOOK_MESSAGE_MUTEX, sender_id: response.sender_id, recipient_id: response.recipient_id) do key = format(::Redis::Alfred::FACEBOOK_MESSAGE_MUTEX, sender_id: response.sender_id, recipient_id: response.recipient_id)
with_lock(key) do
process_message(response) process_message(response)
end end
end end

View File

@@ -12,7 +12,8 @@ class Webhooks::InstagramEventsJob < MutexApplicationJob
def perform(entries) def perform(entries)
@entries = entries @entries = entries
with_lock(::Redis::Alfred::IG_MESSAGE_MUTEX, sender_id: sender_id, ig_account_id: ig_account_id) do key = format(::Redis::Alfred::IG_MESSAGE_MUTEX, sender_id: sender_id, ig_account_id: ig_account_id)
with_lock(key) do
process_entries(entries) process_entries(entries)
end end
end end

View File

@@ -4,16 +4,6 @@ RSpec.describe MutexApplicationJob do
let(:lock_manager) { instance_double(Redis::LockManager) } let(:lock_manager) { instance_double(Redis::LockManager) }
let(:lock_key) { 'test_key' } let(:lock_key) { 'test_key' }
let(:test_mutex_job_class) do
stub_const('TestMutexJob', Class.new(MutexApplicationJob) do
def perform
with_lock('test_key') do
# Do nothing
end
end
end)
end
before do before do
allow(Redis::LockManager).to receive(:new).and_return(lock_manager) allow(Redis::LockManager).to receive(:new).and_return(lock_manager)
allow(lock_manager).to receive(:lock).and_return(true) allow(lock_manager).to receive(:lock).and_return(true)
@@ -22,24 +12,43 @@ RSpec.describe MutexApplicationJob do
describe '#with_lock' do describe '#with_lock' do
it 'acquires the lock and yields the block if lock is not acquired' do it 'acquires the lock and yields the block if lock is not acquired' do
expect(lock_manager).to receive(:lock).with(lock_key).and_return(true) expect(lock_manager).to receive(:lock).with(lock_key, Redis::LockManager::LOCK_TIMEOUT).and_return(true)
expect(lock_manager).to receive(:unlock).with(lock_key).and_return(true) expect(lock_manager).to receive(:unlock).with(lock_key).and_return(true)
expect { |b| described_class.new.send(:with_lock, lock_key, &b) }.to yield_control expect { |b| described_class.new.send(:with_lock, lock_key, &b) }.to yield_control
end end
it 'acquires the lock with custom timeout' do
expect(lock_manager).to receive(:lock).with(lock_key, 5.seconds).and_return(true)
expect(lock_manager).to receive(:unlock).with(lock_key).and_return(true)
expect { |b| described_class.new.send(:with_lock, lock_key, 5.seconds, &b) }.to yield_control
end
it 'raises LockAcquisitionError if it cannot acquire the lock' do it 'raises LockAcquisitionError if it cannot acquire the lock' do
allow(lock_manager).to receive(:lock).with(lock_key).and_return(false) allow(lock_manager).to receive(:lock).with(lock_key, Redis::LockManager::LOCK_TIMEOUT).and_return(false)
expect do expect do
described_class.new.send(:with_lock, lock_key) do described_class.new.send(:with_lock, lock_key) do
# Do nothing # Do nothing
end end
end.to raise_error(MutexApplicationJob::LockAcquisitionError) end.to raise_error(MutexApplicationJob::LockAcquisitionError)
expect(lock_manager).not_to receive(:unlock)
end
it 'raises StandardError if it execution raises it' do
allow(lock_manager).to receive(:lock).with(lock_key, Redis::LockManager::LOCK_TIMEOUT).and_return(false)
allow(lock_manager).to receive(:unlock).with(lock_key).and_return(true)
expect do
described_class.new.send(:with_lock, lock_key) do
raise StandardError
end
end.to raise_error(StandardError)
end end
it 'ensures that the lock is released even if there is an error during block execution' do it 'ensures that the lock is released even if there is an error during block execution' do
expect(lock_manager).to receive(:lock).with(lock_key).and_return(true) expect(lock_manager).to receive(:lock).with(lock_key, Redis::LockManager::LOCK_TIMEOUT).and_return(true)
expect(lock_manager).to receive(:unlock).with(lock_key).and_return(true) expect(lock_manager).to receive(:unlock).with(lock_key).and_return(true)
expect do expect do