mirror of
				https://github.com/lingble/chatwoot.git
				synced 2025-10-30 18:47:51 +00:00 
			
		
		
		
	feat: locking and retry in FB message parsing (#7701)
This commit is contained in:
		| @@ -189,7 +189,6 @@ RSpec/IndexedLet: | |||||||
| RSpec/NamedSubject: | RSpec/NamedSubject: | ||||||
|   Enabled: false |   Enabled: false | ||||||
|  |  | ||||||
|  |  | ||||||
| # we should bring this down | # we should bring this down | ||||||
| RSpec/MultipleMemoizedHelpers: | RSpec/MultipleMemoizedHelpers: | ||||||
|   Max: 14 |   Max: 14 | ||||||
|   | |||||||
| @@ -1,8 +1,28 @@ | |||||||
| class Webhooks::FacebookEventsJob < ApplicationJob | class Webhooks::FacebookEventsJob < ApplicationJob | ||||||
|  |   class LockAcquisitionError < StandardError; end | ||||||
|  |  | ||||||
|   queue_as :default |   queue_as :default | ||||||
|  |   # https://edgeapi.rubyonrails.org/classes/ActiveJob/Exceptions/ClassMethods.html#method-i-retry_on | ||||||
|  |   retry_on LockAcquisitionError, wait: 2.seconds, attempts: 5 | ||||||
|  |  | ||||||
|   def perform(message) |   def perform(message) | ||||||
|     response = ::Integrations::Facebook::MessageParser.new(message) |     response = ::Integrations::Facebook::MessageParser.new(message) | ||||||
|  |  | ||||||
|  |     lock_key = format(::Redis::Alfred::FACEBOOK_MESSAGE_MUTEX, sender_id: response.sender_id, recipient_id: response.recipient_id) | ||||||
|  |     lock_manager = Redis::LockManager.new | ||||||
|  |  | ||||||
|  |     if lock_manager.locked?(lock_key) | ||||||
|  |       Rails.logger.error "[Facebook::MessageCreator] Failed to acquire lock on attempt #{executions + 1}: #{lock_key}" | ||||||
|  |       raise LockAcquisitionError, "Failed to acquire lock for key: #{lock_key}" | ||||||
|  |     end | ||||||
|  |  | ||||||
|  |     begin | ||||||
|  |       lock_manager.lock(lock_key) | ||||||
|  |       Rails.logger.info "[Facebook::MessageCreator] Acquired lock for: #{lock_key}" | ||||||
|       ::Integrations::Facebook::MessageCreator.new(response).perform |       ::Integrations::Facebook::MessageCreator.new(response).perform | ||||||
|  |     ensure | ||||||
|  |       # Ensure that the lock is released even if there's an error in processing | ||||||
|  |       lock_manager.unlock(lock_key) | ||||||
|  |     end | ||||||
|   end |   end | ||||||
| end | end | ||||||
|   | |||||||
| @@ -7,11 +7,12 @@ module Redis::Alfred | |||||||
|     # key operations |     # key operations | ||||||
|  |  | ||||||
|     # set a value in redis |     # set a value in redis | ||||||
|     def set(key, value) |     def set(key, value, nx: false, ex: false) # rubocop:disable Naming/MethodParameterName | ||||||
|       $alfred.with { |conn| conn.set(key, value) } |       $alfred.with { |conn| conn.set(key, value, nx: nx, ex: ex) } | ||||||
|     end |     end | ||||||
|  |  | ||||||
|     # set a key with expiry period |     # set a key with expiry period | ||||||
|  |     # TODO: Deprecate this method, use set with ex: 1.day instead | ||||||
|     def setex(key, value, expiry = 1.day) |     def setex(key, value, expiry = 1.day) | ||||||
|       $alfred.with { |conn| conn.setex(key, expiry, value) } |       $alfred.with { |conn| conn.setex(key, expiry, value) } | ||||||
|     end |     end | ||||||
| @@ -30,6 +31,10 @@ module Redis::Alfred | |||||||
|       $alfred.with { |conn| conn.incr(key) } |       $alfred.with { |conn| conn.incr(key) } | ||||||
|     end |     end | ||||||
|  |  | ||||||
|  |     def exists?(key) | ||||||
|  |       $alfred.with { |conn| conn.exists?(key) } | ||||||
|  |     end | ||||||
|  |  | ||||||
|     # list operations |     # list operations | ||||||
|  |  | ||||||
|     def llen(key) |     def llen(key) | ||||||
|   | |||||||
							
								
								
									
										63
									
								
								lib/redis/lock_manager.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										63
									
								
								lib/redis/lock_manager.rb
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,63 @@ | |||||||
|  | # Redis::LockManager provides a simple mechanism to handle distributed locks using Redis. | ||||||
|  | # This class ensures that only one instance of a given operation runs at a given time across all processes/nodes. | ||||||
|  | # It uses the $alfred Redis namespace for all its operations. | ||||||
|  | # | ||||||
|  | # Example Usage: | ||||||
|  | # | ||||||
|  | # lock_manager = Redis::LockManager.new | ||||||
|  | # | ||||||
|  | # if lock_manager.lock("some_key") | ||||||
|  | #   # Critical code that should not be run concurrently | ||||||
|  | #   lock_manager.unlock("some_key") | ||||||
|  | # end | ||||||
|  | # | ||||||
|  | class Redis::LockManager | ||||||
|  |   # Default lock timeout set to 5 seconds. This means that if the lock isn't released | ||||||
|  |   # within 5 seconds, it will automatically expire. | ||||||
|  |   # This helps to avoid deadlocks in case the process holding the lock crashes or fails to release it. | ||||||
|  |   LOCK_TIMEOUT = 5.seconds | ||||||
|  |  | ||||||
|  |   # Attempts to acquire a lock for the given key. | ||||||
|  |   # | ||||||
|  |   # If the lock is successfully acquired, the method returns true. If the key is | ||||||
|  |   # already locked or if any other error occurs, it returns false. | ||||||
|  |   # | ||||||
|  |   # === Parameters | ||||||
|  |   # * +key+ - The key for which the lock is to be acquired. | ||||||
|  |   # * +timeout+ - Duration in seconds for which the lock is valid. Defaults to +LOCK_TIMEOUT+. | ||||||
|  |   # | ||||||
|  |   # === Returns | ||||||
|  |   # * +true+ if the lock was successfully acquired. | ||||||
|  |   # * +false+ if the lock was not acquired. | ||||||
|  |   def lock(key, timeout = LOCK_TIMEOUT) | ||||||
|  |     value = Time.now.to_f.to_s | ||||||
|  |     # nx: true means set the key only if it does not exist | ||||||
|  |     Redis::Alfred.set(key, value, nx: true, ex: timeout) ? true : false | ||||||
|  |   end | ||||||
|  |  | ||||||
|  |   # Releases a lock for the given key. | ||||||
|  |   # | ||||||
|  |   # === Parameters | ||||||
|  |   # * +key+ - The key for which the lock is to be released. | ||||||
|  |   # | ||||||
|  |   # === Returns | ||||||
|  |   # * +true+ indicating the lock release operation was initiated. | ||||||
|  |   # | ||||||
|  |   # Note: If the key wasn't locked, this operation will have no effect. | ||||||
|  |   def unlock(key) | ||||||
|  |     Redis::Alfred.delete(key) | ||||||
|  |     true | ||||||
|  |   end | ||||||
|  |  | ||||||
|  |   # Checks if the given key is currently locked. | ||||||
|  |   # | ||||||
|  |   # === Parameters | ||||||
|  |   # * +key+ - The key to check. | ||||||
|  |   # | ||||||
|  |   # === Returns | ||||||
|  |   # * +true+ if the key is locked. | ||||||
|  |   # * +false+ otherwise. | ||||||
|  |   def locked?(key) | ||||||
|  |     Redis::Alfred.exists?(key) | ||||||
|  |   end | ||||||
|  | end | ||||||
| @@ -33,4 +33,8 @@ module Redis::RedisKeys | |||||||
|   MESSAGE_SOURCE_KEY = 'MESSAGE_SOURCE_KEY::%<id>s'.freeze |   MESSAGE_SOURCE_KEY = 'MESSAGE_SOURCE_KEY::%<id>s'.freeze | ||||||
|   CUSTOM_FILTER_RECORDS_COUNT_KEY = 'CUSTOM_FILTER::%<account_id>d::%<user_id>d::%<filter_id>d'.freeze |   CUSTOM_FILTER_RECORDS_COUNT_KEY = 'CUSTOM_FILTER::%<account_id>d::%<user_id>d::%<filter_id>d'.freeze | ||||||
|   OPENAI_CONVERSATION_KEY = 'OPEN_AI_CONVERSATION_KEY::%<event_name>s::%<conversation_id>d::%<updated_at>d'.freeze |   OPENAI_CONVERSATION_KEY = 'OPEN_AI_CONVERSATION_KEY::%<event_name>s::%<conversation_id>d::%<updated_at>d'.freeze | ||||||
|  |  | ||||||
|  |   ## Sempahores / Locks | ||||||
|  |   # We don't want to process messages from the same sender concurrently to prevent creating double conversations | ||||||
|  |   FACEBOOK_MESSAGE_MUTEX = 'FB_MESSAGE_CREATE_LOCK::%<sender_id>s::%<recipient_id>s'.freeze | ||||||
| end | end | ||||||
|   | |||||||
| @@ -3,7 +3,17 @@ require 'rails_helper' | |||||||
| RSpec.describe Webhooks::FacebookEventsJob do | RSpec.describe Webhooks::FacebookEventsJob do | ||||||
|   subject(:job) { described_class.perform_later(params) } |   subject(:job) { described_class.perform_later(params) } | ||||||
|  |  | ||||||
|   let!(:params) { { test: 'test' } } |   let(:params) { { test: 'test' } } | ||||||
|  |   let(:parsed_response) { instance_double(Integrations::Facebook::MessageParser) } | ||||||
|  |   let(:lock_key) { 'FB_MESSAGE_CREATE_LOCK::sender_id::recipient_id' } # Use a real format if needed | ||||||
|  |   let(:lock_manager) { instance_double(Redis::LockManager) } | ||||||
|  |  | ||||||
|  |   before do | ||||||
|  |     allow(Integrations::Facebook::MessageParser).to receive(:new).and_return(parsed_response) | ||||||
|  |     allow(parsed_response).to receive(:sender_id).and_return('sender_id') | ||||||
|  |     allow(parsed_response).to receive(:recipient_id).and_return('recipient_id') | ||||||
|  |     allow(Redis::LockManager).to receive(:new).and_return(lock_manager) | ||||||
|  |   end | ||||||
|  |  | ||||||
|   it 'enqueues the job' do |   it 'enqueues the job' do | ||||||
|     expect { job }.to have_enqueued_job(described_class) |     expect { job }.to have_enqueued_job(described_class) | ||||||
| @@ -11,17 +21,44 @@ RSpec.describe Webhooks::FacebookEventsJob do | |||||||
|       .on_queue('default') |       .on_queue('default') | ||||||
|   end |   end | ||||||
|  |  | ||||||
|   context 'when called with params' do |   describe 'job execution' do | ||||||
|     it 'calls MessagePArsed and do message create' do |     context 'when the lock is already acquired' do | ||||||
|       parser = double |       before do | ||||||
|       creator = double |         allow(lock_manager).to receive(:locked?).and_return(true) | ||||||
|       allow(Integrations::Facebook::MessageParser).to receive(:new).and_return(parser) |       end | ||||||
|       allow(Integrations::Facebook::MessageCreator).to receive(:new).and_return(creator) |  | ||||||
|       allow(creator).to receive(:perform).and_return(true) |       it 'raises a LockAcquisitionError' do | ||||||
|  |         perform_enqueued_jobs do | ||||||
|  |           expect { described_class.perform_now(params) }.to raise_error(Webhooks::FacebookEventsJob::LockAcquisitionError) | ||||||
|  |         end | ||||||
|  |       end | ||||||
|  |     end | ||||||
|  |  | ||||||
|  |     context 'when the lock is not acquired' do | ||||||
|  |       let(:message_creator) { instance_double(Integrations::Facebook::MessageCreator) } | ||||||
|  |  | ||||||
|  |       before do | ||||||
|  |         allow(lock_manager).to receive(:locked?).and_return(false) | ||||||
|  |         allow(lock_manager).to receive(:unlock) | ||||||
|  |         allow(lock_manager).to receive(:lock) | ||||||
|  |         allow(Integrations::Facebook::MessageCreator).to receive(:new).with(parsed_response).and_return(message_creator) | ||||||
|  |         allow(message_creator).to receive(:perform) | ||||||
|  |       end | ||||||
|  |  | ||||||
|  |       it 'invokes the message parser and creator' do | ||||||
|         expect(Integrations::Facebook::MessageParser).to receive(:new).with(params) |         expect(Integrations::Facebook::MessageParser).to receive(:new).with(params) | ||||||
|       expect(Integrations::Facebook::MessageCreator).to receive(:new).with(parser) |         expect(Integrations::Facebook::MessageCreator).to receive(:new).with(parsed_response) | ||||||
|       expect(creator).to receive(:perform) |         expect(message_creator).to receive(:perform) | ||||||
|  |  | ||||||
|  |         described_class.perform_now(params) | ||||||
|  |       end | ||||||
|  |  | ||||||
|  |       it 'acquires and releases the lock' do | ||||||
|  |         expect(lock_manager).to receive(:lock).with(lock_key) | ||||||
|  |         expect(lock_manager).to receive(:unlock).with(lock_key) | ||||||
|  |  | ||||||
|         described_class.perform_now(params) |         described_class.perform_now(params) | ||||||
|       end |       end | ||||||
|     end |     end | ||||||
|  |   end | ||||||
| end | end | ||||||
|   | |||||||
							
								
								
									
										48
									
								
								spec/lib/redis/lock_manager_spec.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										48
									
								
								spec/lib/redis/lock_manager_spec.rb
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,48 @@ | |||||||
|  | require 'rails_helper' | ||||||
|  |  | ||||||
|  | RSpec.describe Redis::LockManager do | ||||||
|  |   let(:lock_manager) { described_class.new } | ||||||
|  |   let(:lock_key) { 'test_lock' } | ||||||
|  |  | ||||||
|  |   after do | ||||||
|  |     # Cleanup: Ensure that the lock key is deleted after each test to avoid interference | ||||||
|  |     Redis::Alfred.delete(lock_key) | ||||||
|  |   end | ||||||
|  |  | ||||||
|  |   describe '#lock' do | ||||||
|  |     it 'acquires a lock and returns true' do | ||||||
|  |       expect(lock_manager.lock(lock_key)).to be true | ||||||
|  |       expect(lock_manager.locked?(lock_key)).to be true | ||||||
|  |     end | ||||||
|  |  | ||||||
|  |     it 'returns false if the lock is already acquired' do | ||||||
|  |       lock_manager.lock(lock_key) | ||||||
|  |       expect(lock_manager.lock(lock_key)).to be false | ||||||
|  |     end | ||||||
|  |  | ||||||
|  |     it 'can acquire a lock again after the timeout' do | ||||||
|  |       lock_manager.lock(lock_key, 1) # 1-second timeout | ||||||
|  |       sleep 2 | ||||||
|  |       expect(lock_manager.lock(lock_key)).to be true | ||||||
|  |     end | ||||||
|  |   end | ||||||
|  |  | ||||||
|  |   describe '#unlock' do | ||||||
|  |     it 'releases a lock' do | ||||||
|  |       lock_manager.lock(lock_key) | ||||||
|  |       lock_manager.unlock(lock_key) | ||||||
|  |       expect(lock_manager.locked?(lock_key)).to be false | ||||||
|  |     end | ||||||
|  |   end | ||||||
|  |  | ||||||
|  |   describe '#locked?' do | ||||||
|  |     it 'returns true if a key is locked' do | ||||||
|  |       lock_manager.lock(lock_key) | ||||||
|  |       expect(lock_manager.locked?(lock_key)).to be true | ||||||
|  |     end | ||||||
|  |  | ||||||
|  |     it 'returns false if a key is not locked' do | ||||||
|  |       expect(lock_manager.locked?(lock_key)).to be false | ||||||
|  |     end | ||||||
|  |   end | ||||||
|  | end | ||||||
| @@ -73,6 +73,7 @@ RSpec.configure do |config| | |||||||
|   config.include Devise::Test::IntegrationHelpers, type: :request |   config.include Devise::Test::IntegrationHelpers, type: :request | ||||||
|   config.include ActiveSupport::Testing::TimeHelpers |   config.include ActiveSupport::Testing::TimeHelpers | ||||||
|   config.include ActionCable::TestHelper |   config.include ActionCable::TestHelper | ||||||
|  |   config.include ActiveJob::TestHelper | ||||||
| end | end | ||||||
|  |  | ||||||
| Shoulda::Matchers.configure do |config| | Shoulda::Matchers.configure do |config| | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Shivam Mishra
					Shivam Mishra