Added retry limit for the backoff performer class (#70)

This commit is contained in:
Alex Damian
2018-05-23 12:16:12 -04:00
committed by Matias Fontanini
parent 2451c74c4f
commit d9feb5c3db
2 changed files with 21 additions and 3 deletions

View File

@@ -46,6 +46,7 @@ public:
static const TimeUnit DEFAULT_INITIAL_BACKOFF; static const TimeUnit DEFAULT_INITIAL_BACKOFF;
static const TimeUnit DEFAULT_BACKOFF_STEP; static const TimeUnit DEFAULT_BACKOFF_STEP;
static const TimeUnit DEFAULT_MAXIMUM_BACKOFF; static const TimeUnit DEFAULT_MAXIMUM_BACKOFF;
static const size_t DEFAULT_MAXIMUM_RETRIES;
/** /**
* The backoff policy to use * The backoff policy to use
@@ -98,6 +99,15 @@ public:
*/ */
void set_maximum_backoff(TimeUnit value); void set_maximum_backoff(TimeUnit value);
/**
* \brief Sets the maximum number of retries for the commit operation
*
* \param value The number of retries before giving up
*
* \remark Setting value to 0 is equivalent to 1, i.e. it will try at least once
*/
void set_maximum_retries(size_t value);
/** /**
* \brief Executes an action and backs off if it fails * \brief Executes an action and backs off if it fails
* *
@@ -108,13 +118,13 @@ public:
template <typename Functor> template <typename Functor>
void perform(const Functor& callback) { void perform(const Functor& callback) {
TimeUnit backoff = initial_backoff_; TimeUnit backoff = initial_backoff_;
while (true) { size_t retries = maximum_retries_;
while (retries--) {
auto start = std::chrono::steady_clock::now(); auto start = std::chrono::steady_clock::now();
// If the callback returns true, we're done // If the callback returns true, we're done
if (callback()) { if (callback()) {
return; return;
} }
auto end = std::chrono::steady_clock::now(); auto end = std::chrono::steady_clock::now();
auto time_elapsed = end - start; auto time_elapsed = end - start;
// If we still have time left, then sleep // If we still have time left, then sleep
@@ -132,6 +142,7 @@ private:
TimeUnit backoff_step_; TimeUnit backoff_step_;
TimeUnit maximum_backoff_; TimeUnit maximum_backoff_;
BackoffPolicy policy_; BackoffPolicy policy_;
size_t maximum_retries_;
}; };
} // cppkafka } // cppkafka

View File

@@ -28,20 +28,23 @@
*/ */
#include <algorithm> #include <algorithm>
#include <limits>
#include "utils/backoff_performer.h" #include "utils/backoff_performer.h"
using std::min; using std::min;
using std::numeric_limits;
namespace cppkafka { namespace cppkafka {
const BackoffPerformer::TimeUnit BackoffPerformer::DEFAULT_INITIAL_BACKOFF{100}; const BackoffPerformer::TimeUnit BackoffPerformer::DEFAULT_INITIAL_BACKOFF{100};
const BackoffPerformer::TimeUnit BackoffPerformer::DEFAULT_BACKOFF_STEP{50}; const BackoffPerformer::TimeUnit BackoffPerformer::DEFAULT_BACKOFF_STEP{50};
const BackoffPerformer::TimeUnit BackoffPerformer::DEFAULT_MAXIMUM_BACKOFF{1000}; const BackoffPerformer::TimeUnit BackoffPerformer::DEFAULT_MAXIMUM_BACKOFF{1000};
const size_t BackoffPerformer::DEFAULT_MAXIMUM_RETRIES{numeric_limits<size_t>::max()};
BackoffPerformer::BackoffPerformer() BackoffPerformer::BackoffPerformer()
: initial_backoff_(DEFAULT_INITIAL_BACKOFF), : initial_backoff_(DEFAULT_INITIAL_BACKOFF),
backoff_step_(DEFAULT_BACKOFF_STEP), maximum_backoff_(DEFAULT_MAXIMUM_BACKOFF), backoff_step_(DEFAULT_BACKOFF_STEP), maximum_backoff_(DEFAULT_MAXIMUM_BACKOFF),
policy_(BackoffPolicy::LINEAR) { policy_(BackoffPolicy::LINEAR), maximum_retries_(DEFAULT_MAXIMUM_RETRIES) {
} }
@@ -61,6 +64,10 @@ void BackoffPerformer::set_maximum_backoff(TimeUnit value) {
maximum_backoff_ = value; maximum_backoff_ = value;
} }
void BackoffPerformer::set_maximum_retries(size_t value) {
maximum_retries_ = value == 0 ? 1 : value;
}
BackoffPerformer::TimeUnit BackoffPerformer::increase_backoff(TimeUnit backoff) { BackoffPerformer::TimeUnit BackoffPerformer::increase_backoff(TimeUnit backoff) {
if (policy_ == BackoffPolicy::LINEAR) { if (policy_ == BackoffPolicy::LINEAR) {
backoff = backoff + backoff_step_; backoff = backoff + backoff_step_;