diff --git a/.rubocop.yml b/.rubocop.yml index 2befbac4b..d0724ef20 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -73,7 +73,7 @@ Rails/ApplicationController: - 'app/controllers/survey/responses_controller.rb' Rails/FindEach: Enabled: true - Include: + Include: - 'app/**/*.rb' Rails/CompactBlank: Enabled: false @@ -189,7 +189,6 @@ RSpec/IndexedLet: RSpec/NamedSubject: Enabled: false - # we should bring this down RSpec/MultipleMemoizedHelpers: Max: 14 diff --git a/app/jobs/webhooks/facebook_events_job.rb b/app/jobs/webhooks/facebook_events_job.rb index 8d212d686..de32af8cf 100644 --- a/app/jobs/webhooks/facebook_events_job.rb +++ b/app/jobs/webhooks/facebook_events_job.rb @@ -1,8 +1,28 @@ class Webhooks::FacebookEventsJob < ApplicationJob + class LockAcquisitionError < StandardError; end + queue_as :default + # https://edgeapi.rubyonrails.org/classes/ActiveJob/Exceptions/ClassMethods.html#method-i-retry_on + retry_on LockAcquisitionError, wait: 2.seconds, attempts: 5 def perform(message) response = ::Integrations::Facebook::MessageParser.new(message) - ::Integrations::Facebook::MessageCreator.new(response).perform + + lock_key = format(::Redis::Alfred::FACEBOOK_MESSAGE_MUTEX, sender_id: response.sender_id, recipient_id: response.recipient_id) + lock_manager = Redis::LockManager.new + + if lock_manager.locked?(lock_key) + Rails.logger.error "[Facebook::MessageCreator] Failed to acquire lock on attempt #{executions + 1}: #{lock_key}" + raise LockAcquisitionError, "Failed to acquire lock for key: #{lock_key}" + end + + begin + lock_manager.lock(lock_key) + Rails.logger.info "[Facebook::MessageCreator] Acquired lock for: #{lock_key}" + ::Integrations::Facebook::MessageCreator.new(response).perform + ensure + # Ensure that the lock is released even if there's an error in processing + lock_manager.unlock(lock_key) + end end end diff --git a/lib/redis/alfred.rb b/lib/redis/alfred.rb index 9e6f0d251..b3e156420 100644 --- a/lib/redis/alfred.rb +++ b/lib/redis/alfred.rb @@ -7,11 +7,12 @@ module Redis::Alfred # key operations # set a value in redis - def set(key, value) - $alfred.with { |conn| conn.set(key, value) } + def set(key, value, nx: false, ex: false) # rubocop:disable Naming/MethodParameterName + $alfred.with { |conn| conn.set(key, value, nx: nx, ex: ex) } end # set a key with expiry period + # TODO: Deprecate this method, use set with ex: 1.day instead def setex(key, value, expiry = 1.day) $alfred.with { |conn| conn.setex(key, expiry, value) } end @@ -30,6 +31,10 @@ module Redis::Alfred $alfred.with { |conn| conn.incr(key) } end + def exists?(key) + $alfred.with { |conn| conn.exists?(key) } + end + # list operations def llen(key) diff --git a/lib/redis/lock_manager.rb b/lib/redis/lock_manager.rb new file mode 100644 index 000000000..87106168a --- /dev/null +++ b/lib/redis/lock_manager.rb @@ -0,0 +1,63 @@ +# Redis::LockManager provides a simple mechanism to handle distributed locks using Redis. +# This class ensures that only one instance of a given operation runs at a given time across all processes/nodes. +# It uses the $alfred Redis namespace for all its operations. +# +# Example Usage: +# +# lock_manager = Redis::LockManager.new +# +# if lock_manager.lock("some_key") +# # Critical code that should not be run concurrently +# lock_manager.unlock("some_key") +# end +# +class Redis::LockManager + # Default lock timeout set to 5 seconds. This means that if the lock isn't released + # within 5 seconds, it will automatically expire. + # This helps to avoid deadlocks in case the process holding the lock crashes or fails to release it. + LOCK_TIMEOUT = 5.seconds + + # Attempts to acquire a lock for the given key. + # + # If the lock is successfully acquired, the method returns true. If the key is + # already locked or if any other error occurs, it returns false. + # + # === Parameters + # * +key+ - The key for which the lock is to be acquired. + # * +timeout+ - Duration in seconds for which the lock is valid. Defaults to +LOCK_TIMEOUT+. + # + # === Returns + # * +true+ if the lock was successfully acquired. + # * +false+ if the lock was not acquired. + def lock(key, timeout = LOCK_TIMEOUT) + value = Time.now.to_f.to_s + # nx: true means set the key only if it does not exist + Redis::Alfred.set(key, value, nx: true, ex: timeout) ? true : false + end + + # Releases a lock for the given key. + # + # === Parameters + # * +key+ - The key for which the lock is to be released. + # + # === Returns + # * +true+ indicating the lock release operation was initiated. + # + # Note: If the key wasn't locked, this operation will have no effect. + def unlock(key) + Redis::Alfred.delete(key) + true + end + + # Checks if the given key is currently locked. + # + # === Parameters + # * +key+ - The key to check. + # + # === Returns + # * +true+ if the key is locked. + # * +false+ otherwise. + def locked?(key) + Redis::Alfred.exists?(key) + end +end diff --git a/lib/redis/redis_keys.rb b/lib/redis/redis_keys.rb index bf024b349..9d57e4cbd 100644 --- a/lib/redis/redis_keys.rb +++ b/lib/redis/redis_keys.rb @@ -33,4 +33,8 @@ module Redis::RedisKeys MESSAGE_SOURCE_KEY = 'MESSAGE_SOURCE_KEY::%s'.freeze CUSTOM_FILTER_RECORDS_COUNT_KEY = 'CUSTOM_FILTER::%d::%d::%d'.freeze OPENAI_CONVERSATION_KEY = 'OPEN_AI_CONVERSATION_KEY::%s::%d::%d'.freeze + + ## Sempahores / Locks + # We don't want to process messages from the same sender concurrently to prevent creating double conversations + FACEBOOK_MESSAGE_MUTEX = 'FB_MESSAGE_CREATE_LOCK::%s::%s'.freeze end diff --git a/spec/jobs/webhooks/facebook_events_job_spec.rb b/spec/jobs/webhooks/facebook_events_job_spec.rb index 14868dc4f..be470392d 100644 --- a/spec/jobs/webhooks/facebook_events_job_spec.rb +++ b/spec/jobs/webhooks/facebook_events_job_spec.rb @@ -3,7 +3,17 @@ require 'rails_helper' RSpec.describe Webhooks::FacebookEventsJob do subject(:job) { described_class.perform_later(params) } - let!(:params) { { test: 'test' } } + let(:params) { { test: 'test' } } + let(:parsed_response) { instance_double(Integrations::Facebook::MessageParser) } + let(:lock_key) { 'FB_MESSAGE_CREATE_LOCK::sender_id::recipient_id' } # Use a real format if needed + let(:lock_manager) { instance_double(Redis::LockManager) } + + before do + allow(Integrations::Facebook::MessageParser).to receive(:new).and_return(parsed_response) + allow(parsed_response).to receive(:sender_id).and_return('sender_id') + allow(parsed_response).to receive(:recipient_id).and_return('recipient_id') + allow(Redis::LockManager).to receive(:new).and_return(lock_manager) + end it 'enqueues the job' do expect { job }.to have_enqueued_job(described_class) @@ -11,17 +21,44 @@ RSpec.describe Webhooks::FacebookEventsJob do .on_queue('default') end - context 'when called with params' do - it 'calls MessagePArsed and do message create' do - parser = double - creator = double - allow(Integrations::Facebook::MessageParser).to receive(:new).and_return(parser) - allow(Integrations::Facebook::MessageCreator).to receive(:new).and_return(creator) - allow(creator).to receive(:perform).and_return(true) - expect(Integrations::Facebook::MessageParser).to receive(:new).with(params) - expect(Integrations::Facebook::MessageCreator).to receive(:new).with(parser) - expect(creator).to receive(:perform) - described_class.perform_now(params) + describe 'job execution' do + context 'when the lock is already acquired' do + before do + allow(lock_manager).to receive(:locked?).and_return(true) + end + + it 'raises a LockAcquisitionError' do + perform_enqueued_jobs do + expect { described_class.perform_now(params) }.to raise_error(Webhooks::FacebookEventsJob::LockAcquisitionError) + end + end + end + + context 'when the lock is not acquired' do + let(:message_creator) { instance_double(Integrations::Facebook::MessageCreator) } + + before do + allow(lock_manager).to receive(:locked?).and_return(false) + allow(lock_manager).to receive(:unlock) + allow(lock_manager).to receive(:lock) + allow(Integrations::Facebook::MessageCreator).to receive(:new).with(parsed_response).and_return(message_creator) + allow(message_creator).to receive(:perform) + end + + it 'invokes the message parser and creator' do + expect(Integrations::Facebook::MessageParser).to receive(:new).with(params) + expect(Integrations::Facebook::MessageCreator).to receive(:new).with(parsed_response) + expect(message_creator).to receive(:perform) + + described_class.perform_now(params) + end + + it 'acquires and releases the lock' do + expect(lock_manager).to receive(:lock).with(lock_key) + expect(lock_manager).to receive(:unlock).with(lock_key) + + described_class.perform_now(params) + end end end end diff --git a/spec/lib/redis/lock_manager_spec.rb b/spec/lib/redis/lock_manager_spec.rb new file mode 100644 index 000000000..208594322 --- /dev/null +++ b/spec/lib/redis/lock_manager_spec.rb @@ -0,0 +1,48 @@ +require 'rails_helper' + +RSpec.describe Redis::LockManager do + let(:lock_manager) { described_class.new } + let(:lock_key) { 'test_lock' } + + after do + # Cleanup: Ensure that the lock key is deleted after each test to avoid interference + Redis::Alfred.delete(lock_key) + end + + describe '#lock' do + it 'acquires a lock and returns true' do + expect(lock_manager.lock(lock_key)).to be true + expect(lock_manager.locked?(lock_key)).to be true + end + + it 'returns false if the lock is already acquired' do + lock_manager.lock(lock_key) + expect(lock_manager.lock(lock_key)).to be false + end + + it 'can acquire a lock again after the timeout' do + lock_manager.lock(lock_key, 1) # 1-second timeout + sleep 2 + expect(lock_manager.lock(lock_key)).to be true + end + end + + describe '#unlock' do + it 'releases a lock' do + lock_manager.lock(lock_key) + lock_manager.unlock(lock_key) + expect(lock_manager.locked?(lock_key)).to be false + end + end + + describe '#locked?' do + it 'returns true if a key is locked' do + lock_manager.lock(lock_key) + expect(lock_manager.locked?(lock_key)).to be true + end + + it 'returns false if a key is not locked' do + expect(lock_manager.locked?(lock_key)).to be false + end + end +end diff --git a/spec/rails_helper.rb b/spec/rails_helper.rb index 99aaee26a..616d62d28 100644 --- a/spec/rails_helper.rb +++ b/spec/rails_helper.rb @@ -73,6 +73,7 @@ RSpec.configure do |config| config.include Devise::Test::IntegrationHelpers, type: :request config.include ActiveSupport::Testing::TimeHelpers config.include ActionCable::TestHelper + config.include ActiveJob::TestHelper end Shoulda::Matchers.configure do |config|