From 5598b4b27eff0ea4614515dcfc3ecd1e0e929df5 Mon Sep 17 00:00:00 2001 From: Shivam Mishra Date: Fri, 25 Aug 2023 11:58:29 +0700 Subject: [PATCH] feat: implement mutex for `SlackSendJob` (#7783) --- app/jobs/mutex_application_job.rb | 35 ++++++++++++ app/jobs/send_on_slack_job.rb | 7 ++- app/jobs/webhooks/facebook_events_job.rb | 26 +++------ lib/redis/redis_keys.rb | 1 + spec/jobs/mutex_application_job_spec.rb | 53 +++++++++++++++++++ .../jobs/webhooks/facebook_events_job_spec.rb | 53 +++++++------------ 6 files changed, 121 insertions(+), 54 deletions(-) create mode 100644 app/jobs/mutex_application_job.rb create mode 100644 spec/jobs/mutex_application_job_spec.rb diff --git a/app/jobs/mutex_application_job.rb b/app/jobs/mutex_application_job.rb new file mode 100644 index 000000000..d6d20b1e2 --- /dev/null +++ b/app/jobs/mutex_application_job.rb @@ -0,0 +1,35 @@ +# MutexApplicationJob serves as a base class for jobs that require distributed locking mechanisms. +# It abstracts the locking logic using Redis and ensures that a block of code can be executed with +# mutual exclusion. +# +# The primary mechanism provided is the `with_lock` method, which accepts a key format and associated +# arguments. This method attempts to acquire a lock using the generated key, and if successful, it +# executes the provided block of code. If the lock cannot be acquired, it raises a LockAcquisitionError. +# +# To use this class, inherit from MutexApplicationJob and make use of the `with_lock` method in the +# `perform` method of the derived job class. +# +# Also see, retry mechanism here: https://edgeapi.rubyonrails.org/classes/ActiveJob/Exceptions/ClassMethods.html#method-i-retry_on +# +class MutexApplicationJob < ApplicationJob + class LockAcquisitionError < StandardError; end + + def with_lock(key_format, *args) + lock_key = format(key_format, *args) + lock_manager = Redis::LockManager.new + + if lock_manager.locked?(lock_key) + Rails.logger.warn "[#{self.class.name}] Failed to acquire lock on attempt #{executions}: #{lock_key}" + raise LockAcquisitionError, "Failed to acquire lock for key: #{lock_key}" + end + + begin + lock_manager.lock(lock_key) + Rails.logger.info "[#{self.class.name}] Acquired lock for: #{lock_key} on attempt #{executions}" + yield + ensure + # Ensure that the lock is released even if there's an error in processing + lock_manager.unlock(lock_key) + end + end +end diff --git a/app/jobs/send_on_slack_job.rb b/app/jobs/send_on_slack_job.rb index e9b84731b..315334c3c 100644 --- a/app/jobs/send_on_slack_job.rb +++ b/app/jobs/send_on_slack_job.rb @@ -1,7 +1,10 @@ -class SendOnSlackJob < ApplicationJob +class SendOnSlackJob < MutexApplicationJob queue_as :medium + retry_on LockAcquisitionError, wait: 1.second, attempts: 6 def perform(message, hook) - Integrations::Slack::SendOnSlackService.new(message: message, hook: hook).perform + with_lock(::Redis::Alfred::SLACK_MESSAGE_MUTEX, sender_id: message.sender_id, reference_id: hook.reference_id) do + Integrations::Slack::SendOnSlackService.new(message: message, hook: hook).perform + end end end diff --git a/app/jobs/webhooks/facebook_events_job.rb b/app/jobs/webhooks/facebook_events_job.rb index 12e3656d9..653f83c11 100644 --- a/app/jobs/webhooks/facebook_events_job.rb +++ b/app/jobs/webhooks/facebook_events_job.rb @@ -1,28 +1,16 @@ -class Webhooks::FacebookEventsJob < ApplicationJob - class LockAcquisitionError < StandardError; end - +class Webhooks::FacebookEventsJob < MutexApplicationJob queue_as :default - # https://edgeapi.rubyonrails.org/classes/ActiveJob/Exceptions/ClassMethods.html#method-i-retry_on retry_on LockAcquisitionError, wait: 1.second, attempts: 6 def perform(message) response = ::Integrations::Facebook::MessageParser.new(message) - lock_key = format(::Redis::Alfred::FACEBOOK_MESSAGE_MUTEX, sender_id: response.sender_id, recipient_id: response.recipient_id) - lock_manager = Redis::LockManager.new - - if lock_manager.locked?(lock_key) - Rails.logger.error "[Facebook::MessageCreator] Failed to acquire lock on attempt #{executions}: #{lock_key}" - raise LockAcquisitionError, "Failed to acquire lock for key: #{lock_key}" - end - - begin - lock_manager.lock(lock_key) - Rails.logger.info "[Facebook::MessageCreator] Acquired lock for: #{lock_key}" - ::Integrations::Facebook::MessageCreator.new(response).perform - ensure - # Ensure that the lock is released even if there's an error in processing - lock_manager.unlock(lock_key) + with_lock(::Redis::Alfred::FACEBOOK_MESSAGE_MUTEX, sender_id: response.sender_id, recipient_id: response.recipient_id) do + process_message(response) end end + + def process_message(response) + ::Integrations::Facebook::MessageCreator.new(response).perform + end end diff --git a/lib/redis/redis_keys.rb b/lib/redis/redis_keys.rb index 9d57e4cbd..65f966037 100644 --- a/lib/redis/redis_keys.rb +++ b/lib/redis/redis_keys.rb @@ -37,4 +37,5 @@ module Redis::RedisKeys ## Sempahores / Locks # We don't want to process messages from the same sender concurrently to prevent creating double conversations FACEBOOK_MESSAGE_MUTEX = 'FB_MESSAGE_CREATE_LOCK::%s::%s'.freeze + SLACK_MESSAGE_MUTEX = 'SLACK_MESSAGE_LOCK::%s::%s'.freeze end diff --git a/spec/jobs/mutex_application_job_spec.rb b/spec/jobs/mutex_application_job_spec.rb new file mode 100644 index 000000000..8483708f7 --- /dev/null +++ b/spec/jobs/mutex_application_job_spec.rb @@ -0,0 +1,53 @@ +require 'rails_helper' + +RSpec.describe MutexApplicationJob do + let(:lock_manager) { instance_double(Redis::LockManager) } + let(:lock_key) { 'test_key' } + + let(:test_mutex_job_class) do + stub_const('TestMutexJob', Class.new(MutexApplicationJob) do + def perform + with_lock('test_key') do + # Do nothing + end + end + end) + end + + before do + allow(Redis::LockManager).to receive(:new).and_return(lock_manager) + allow(lock_manager).to receive(:locked?).and_return(false) + allow(lock_manager).to receive(:lock).and_return(true) + allow(lock_manager).to receive(:unlock).and_return(true) + end + + describe '#with_lock' do + it 'acquires the lock and yields the block if lock is not acquired' do + expect(lock_manager).to receive(:locked?).with(lock_key).and_return(false) + expect(lock_manager).to receive(:lock).with(lock_key).and_return(true) + expect(lock_manager).to receive(:unlock).with(lock_key).and_return(true) + + expect { |b| described_class.new.send(:with_lock, lock_key, &b) }.to yield_control + end + + it 'raises LockAcquisitionError if it cannot acquire the lock' do + allow(lock_manager).to receive(:locked?).with(lock_key).and_return(true) + + expect do + described_class.new.send(:with_lock, lock_key) do + # Do nothing + end + end.to raise_error(MutexApplicationJob::LockAcquisitionError) + end + + it 'ensures that the lock is released even if there is an error during block execution' do + expect(lock_manager).to receive(:locked?).with(lock_key).and_return(false) + expect(lock_manager).to receive(:lock).with(lock_key).and_return(true) + expect(lock_manager).to receive(:unlock).with(lock_key).and_return(true) + + expect do + described_class.new.send(:with_lock, lock_key) { raise StandardError } + end.to raise_error(StandardError) + end + end +end diff --git a/spec/jobs/webhooks/facebook_events_job_spec.rb b/spec/jobs/webhooks/facebook_events_job_spec.rb index be470392d..58d16fba1 100644 --- a/spec/jobs/webhooks/facebook_events_job_spec.rb +++ b/spec/jobs/webhooks/facebook_events_job_spec.rb @@ -5,14 +5,13 @@ RSpec.describe Webhooks::FacebookEventsJob do let(:params) { { test: 'test' } } let(:parsed_response) { instance_double(Integrations::Facebook::MessageParser) } - let(:lock_key) { 'FB_MESSAGE_CREATE_LOCK::sender_id::recipient_id' } # Use a real format if needed - let(:lock_manager) { instance_double(Redis::LockManager) } + let(:lock_key_format) { Redis::Alfred::FACEBOOK_MESSAGE_MUTEX } + let(:lock_key) { format(lock_key_format, sender_id: 'sender_id', recipient_id: 'recipient_id') } # Use real format if different before do allow(Integrations::Facebook::MessageParser).to receive(:new).and_return(parsed_response) allow(parsed_response).to receive(:sender_id).and_return('sender_id') allow(parsed_response).to receive(:recipient_id).and_return('recipient_id') - allow(Redis::LockManager).to receive(:new).and_return(lock_manager) end it 'enqueues the job' do @@ -22,43 +21,31 @@ RSpec.describe Webhooks::FacebookEventsJob do end describe 'job execution' do - context 'when the lock is already acquired' do - before do - allow(lock_manager).to receive(:locked?).and_return(true) - end + let(:message_creator) { instance_double(Integrations::Facebook::MessageCreator) } - it 'raises a LockAcquisitionError' do - perform_enqueued_jobs do - expect { described_class.perform_now(params) }.to raise_error(Webhooks::FacebookEventsJob::LockAcquisitionError) - end - end + before do + allow(Integrations::Facebook::MessageParser).to receive(:new).and_return(parsed_response) + allow(Integrations::Facebook::MessageCreator).to receive(:new).with(parsed_response).and_return(message_creator) + allow(message_creator).to receive(:perform) end - context 'when the lock is not acquired' do - let(:message_creator) { instance_double(Integrations::Facebook::MessageCreator) } + # ensures that the response is built + it 'invokes the message parser and creator' do + expect(Integrations::Facebook::MessageParser).to receive(:new).with(params) + expect(Integrations::Facebook::MessageCreator).to receive(:new).with(parsed_response) + expect(message_creator).to receive(:perform) - before do - allow(lock_manager).to receive(:locked?).and_return(false) - allow(lock_manager).to receive(:unlock) - allow(lock_manager).to receive(:lock) - allow(Integrations::Facebook::MessageCreator).to receive(:new).with(parsed_response).and_return(message_creator) - allow(message_creator).to receive(:perform) - end + described_class.perform_now(params) + end - it 'invokes the message parser and creator' do - expect(Integrations::Facebook::MessageParser).to receive(:new).with(params) - expect(Integrations::Facebook::MessageCreator).to receive(:new).with(parsed_response) - expect(message_creator).to receive(:perform) + # this test ensures that the process message function is indeed called + it 'attempts to acquire a lock and then processes the message' do + job_instance = described_class.new + allow(job_instance).to receive(:process_message).with(parsed_response) - described_class.perform_now(params) - end + job_instance.perform(params) - it 'acquires and releases the lock' do - expect(lock_manager).to receive(:lock).with(lock_key) - expect(lock_manager).to receive(:unlock).with(lock_key) - - described_class.perform_now(params) - end + expect(job_instance).to have_received(:process_message).with(parsed_response) end end end