From d9feb5c3db816944cfb6e32c49a20824eabca402 Mon Sep 17 00:00:00 2001 From: Alex Damian Date: Wed, 23 May 2018 12:16:12 -0400 Subject: [PATCH] Added retry limit for the backoff performer class (#70) --- include/cppkafka/utils/backoff_performer.h | 15 +++++++++++++-- src/utils/backoff_performer.cpp | 9 ++++++++- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/include/cppkafka/utils/backoff_performer.h b/include/cppkafka/utils/backoff_performer.h index 17b7c0c..7f8cc84 100644 --- a/include/cppkafka/utils/backoff_performer.h +++ b/include/cppkafka/utils/backoff_performer.h @@ -46,6 +46,7 @@ public: static const TimeUnit DEFAULT_INITIAL_BACKOFF; static const TimeUnit DEFAULT_BACKOFF_STEP; static const TimeUnit DEFAULT_MAXIMUM_BACKOFF; + static const size_t DEFAULT_MAXIMUM_RETRIES; /** * The backoff policy to use @@ -97,6 +98,15 @@ public: * \param value The value to be used */ void set_maximum_backoff(TimeUnit value); + + /** + * \brief Sets the maximum number of retries for the commit operation + * + * \param value The number of retries before giving up + * + * \remark Setting value to 0 is equivalent to 1, i.e. it will try at least once + */ + void set_maximum_retries(size_t value); /** * \brief Executes an action and backs off if it fails @@ -108,13 +118,13 @@ public: template void perform(const Functor& callback) { TimeUnit backoff = initial_backoff_; - while (true) { + size_t retries = maximum_retries_; + while (retries--) { auto start = std::chrono::steady_clock::now(); // If the callback returns true, we're done if (callback()) { return; } - auto end = std::chrono::steady_clock::now(); auto time_elapsed = end - start; // If we still have time left, then sleep @@ -132,6 +142,7 @@ private: TimeUnit backoff_step_; TimeUnit maximum_backoff_; BackoffPolicy policy_; + size_t maximum_retries_; }; } // cppkafka diff --git a/src/utils/backoff_performer.cpp b/src/utils/backoff_performer.cpp index 05822c3..cafa625 100644 --- a/src/utils/backoff_performer.cpp +++ b/src/utils/backoff_performer.cpp @@ -28,20 +28,23 @@ */ #include +#include #include "utils/backoff_performer.h" using std::min; +using std::numeric_limits; namespace cppkafka { const BackoffPerformer::TimeUnit BackoffPerformer::DEFAULT_INITIAL_BACKOFF{100}; const BackoffPerformer::TimeUnit BackoffPerformer::DEFAULT_BACKOFF_STEP{50}; const BackoffPerformer::TimeUnit BackoffPerformer::DEFAULT_MAXIMUM_BACKOFF{1000}; +const size_t BackoffPerformer::DEFAULT_MAXIMUM_RETRIES{numeric_limits::max()}; BackoffPerformer::BackoffPerformer() : initial_backoff_(DEFAULT_INITIAL_BACKOFF), backoff_step_(DEFAULT_BACKOFF_STEP), maximum_backoff_(DEFAULT_MAXIMUM_BACKOFF), - policy_(BackoffPolicy::LINEAR) { + policy_(BackoffPolicy::LINEAR), maximum_retries_(DEFAULT_MAXIMUM_RETRIES) { } @@ -61,6 +64,10 @@ void BackoffPerformer::set_maximum_backoff(TimeUnit value) { maximum_backoff_ = value; } +void BackoffPerformer::set_maximum_retries(size_t value) { + maximum_retries_ = value == 0 ? 1 : value; +} + BackoffPerformer::TimeUnit BackoffPerformer::increase_backoff(TimeUnit backoff) { if (policy_ == BackoffPolicy::LINEAR) { backoff = backoff + backoff_step_;