diff --git a/include/cppkafka/utils/backoff_committer.h b/include/cppkafka/utils/backoff_committer.h index b4ab1a1..40d310b 100644 --- a/include/cppkafka/utils/backoff_committer.h +++ b/include/cppkafka/utils/backoff_committer.h @@ -34,6 +34,7 @@ #include #include #include "../consumer.h" +#include "backoff_performer.h" namespace cppkafka { @@ -68,13 +69,8 @@ namespace cppkafka { * committer.commit(some_message); * \endcode */ -class BackoffCommitter { +class BackoffCommitter : public BackoffPerformer { public: - using TimeUnit = std::chrono::milliseconds; - static constexpr TimeUnit DEFAULT_INITIAL_BACKOFF{100}; - static constexpr TimeUnit DEFAULT_BACKOFF_STEP{50}; - static constexpr TimeUnit DEFAULT_MAXIMUM_BACKOFF{1000}; - /** * \brief The error callback. * @@ -84,14 +80,6 @@ public: */ using ErrorCallback = std::function; - /** - * The backoff policy to use - */ - enum class BackoffPolicy { - LINEAR, - EXPONENTIAL - }; - /** * \brief Constructs an instance using default values * @@ -101,42 +89,6 @@ public: */ BackoffCommitter(Consumer& consumer); - /** - * \brief Sets the backoff policy - * - * \param policy The backoff policy to be used - */ - void set_backoff_policy(BackoffPolicy policy); - - /** - * \brief Sets the initial backoff - * - * The first time a commit fails, this will be the delay between the request is sent - * and we re-try doing so - * - * \param value The value to be used - */ - void set_initial_backoff(TimeUnit value); - - /** - * \brief Sets the backoff step - * - * When using the linear backoff policy, this will be the delay between sending a request - * that fails and re-trying it - * - * \param value The value to be used - */ - void set_backoff_step(TimeUnit value); - - /** - * \brief Sets the maximum backoff - * - * The backoff used will never be larger than this number - * - * \param value The value to be used - */ - void set_maximum_backoff(TimeUnit value); - /** * \brief Sets the error callback * @@ -145,6 +97,7 @@ public: */ void set_error_callback(ErrorCallback callback); + /** * \brief Commits the given message synchronously * @@ -165,42 +118,25 @@ public: */ void commit(const TopicPartitionList& topic_partitions); private: - TimeUnit increase_backoff(TimeUnit backoff); - template - void do_commit(const T& object) { - TimeUnit backoff = initial_backoff_; - while (true) { - auto start = std::chrono::steady_clock::now(); - try { - consumer_.commit(object); - // If the commit succeeds, we're done - return; - } - catch (const HandleException& ex) { - // If there's a callback and it returns false for this message, abort - if (callback_ && !callback_(ex.get_error())) { - return; - } - } - - auto end = std::chrono::steady_clock::now(); - auto time_elapsed = end - start; - // If we still have time left, then sleep - if (time_elapsed < backoff) { - std::this_thread::sleep_for(backoff - time_elapsed); - } - // Increase out backoff depending on the policy being used - backoff = increase_backoff(backoff); + bool do_commit(const T& object) { + try { + consumer_.commit(object); + // If the commit succeeds, we're done + return true; } + catch (const HandleException& ex) { + // If there's a callback and it returns false for this message, abort + if (callback_ && !callback_(ex.get_error())) { + return true; + } + } + // In any other case, we failed. Keep committing + return false; } Consumer& consumer_; - TimeUnit initial_backoff_; - TimeUnit backoff_step_; - TimeUnit maximum_backoff_; ErrorCallback callback_; - BackoffPolicy policy_; }; } // cppkafka diff --git a/include/cppkafka/utils/backoff_performer.h b/include/cppkafka/utils/backoff_performer.h new file mode 100644 index 0000000..6b57d49 --- /dev/null +++ b/include/cppkafka/utils/backoff_performer.h @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2017, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef CPPKAFKA_BACKOFF_PERFORMER_H +#define CPPKAFKA_BACKOFF_PERFORMER_H + +#include +#include +#include +#include "../consumer.h" + +namespace cppkafka { + +/** + * + */ +class BackoffPerformer { +public: + using TimeUnit = std::chrono::milliseconds; + static constexpr TimeUnit DEFAULT_INITIAL_BACKOFF{100}; + static constexpr TimeUnit DEFAULT_BACKOFF_STEP{50}; + static constexpr TimeUnit DEFAULT_MAXIMUM_BACKOFF{1000}; + + /** + * The backoff policy to use + */ + enum class BackoffPolicy { + LINEAR, + EXPONENTIAL + }; + + /** + * Constructs an instance of backoff perform + * + * By default, the linear backoff policy is used + */ + BackoffPerformer(); + + /** + * \brief Sets the backoff policy + * + * \param policy The backoff policy to be used + */ + void set_backoff_policy(BackoffPolicy policy); + + /** + * \brief Sets the initial backoff + * + * The first time a commit fails, this will be the delay between the request is sent + * and we re-try doing so + * + * \param value The value to be used + */ + void set_initial_backoff(TimeUnit value); + + /** + * \brief Sets the backoff step + * + * When using the linear backoff policy, this will be the delay between sending a request + * that fails and re-trying it + * + * \param value The value to be used + */ + void set_backoff_step(TimeUnit value); + + /** + * \brief Sets the maximum backoff + * + * The backoff used will never be larger than this number + * + * \param value The value to be used + */ + void set_maximum_backoff(TimeUnit value); + + /** + * \brief Executes an action and backs off if it fails + * + * This will call the functor and will retry in case it returns false + * + * \param callback The action to be executed + */ + template + void perform(const Functor& callback) { + TimeUnit backoff = initial_backoff_; + while (true) { + auto start = std::chrono::steady_clock::now(); + // If the callback returns true, we're done + if (callback()) { + return; + } + + auto end = std::chrono::steady_clock::now(); + auto time_elapsed = end - start; + // If we still have time left, then sleep + if (time_elapsed < backoff) { + std::this_thread::sleep_for(backoff - time_elapsed); + } + // Increase out backoff depending on the policy being used + backoff = increase_backoff(backoff); + } + } +private: + TimeUnit increase_backoff(TimeUnit backoff); + + TimeUnit initial_backoff_; + TimeUnit backoff_step_; + TimeUnit maximum_backoff_; + BackoffPolicy policy_; +}; + +} // cppkafka + +#endif // CPPKAFKA_BACKOFF_PERFORMER_H diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 674688e..5450230 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -16,6 +16,7 @@ set(SOURCES producer.cpp consumer.cpp + utils/backoff_performer.cpp utils/backoff_committer.cpp utils/consumer_dispatcher.cpp ) diff --git a/src/utils/backoff_committer.cpp b/src/utils/backoff_committer.cpp index bed9ef7..de02bc7 100644 --- a/src/utils/backoff_committer.cpp +++ b/src/utils/backoff_committer.cpp @@ -35,48 +35,24 @@ using std::min; namespace cppkafka { BackoffCommitter::BackoffCommitter(Consumer& consumer) -: consumer_(consumer), initial_backoff_(DEFAULT_INITIAL_BACKOFF), - backoff_step_(DEFAULT_BACKOFF_STEP), maximum_backoff_(DEFAULT_MAXIMUM_BACKOFF), - policy_(BackoffPolicy::LINEAR) { +: consumer_(consumer) { } -void BackoffCommitter::set_backoff_policy(BackoffPolicy policy) { - policy_ = policy; -} - -void BackoffCommitter::set_initial_backoff(TimeUnit value) { - initial_backoff_ = value; -} - -void BackoffCommitter::set_backoff_step(TimeUnit value) { - backoff_step_ = value; -} - -void BackoffCommitter::set_maximum_backoff(TimeUnit value) { - maximum_backoff_ = value; -} - void BackoffCommitter::set_error_callback(ErrorCallback callback) { callback_ = move(callback); } void BackoffCommitter::commit(const Message& msg) { - do_commit(msg); + perform([&] { + return do_commit(msg); + }); } void BackoffCommitter::commit(const TopicPartitionList& topic_partitions) { - do_commit(topic_partitions); -} - -BackoffCommitter::TimeUnit BackoffCommitter::increase_backoff(TimeUnit backoff) { - if (policy_ == BackoffPolicy::LINEAR) { - backoff = backoff + backoff_step_; - } - else { - backoff = backoff * 2; - } - return min(backoff, maximum_backoff_); + perform([&] { + return do_commit(topic_partitions); + }); } } // cppkafka diff --git a/src/utils/backoff_performer.cpp b/src/utils/backoff_performer.cpp new file mode 100644 index 0000000..19b9df3 --- /dev/null +++ b/src/utils/backoff_performer.cpp @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2017, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include +#include "utils/backoff_performer.h" + +using std::min; + +namespace cppkafka { + +BackoffPerformer::BackoffPerformer() +: initial_backoff_(DEFAULT_INITIAL_BACKOFF), + backoff_step_(DEFAULT_BACKOFF_STEP), maximum_backoff_(DEFAULT_MAXIMUM_BACKOFF), + policy_(BackoffPolicy::LINEAR) { + +} + +void BackoffPerformer::set_backoff_policy(BackoffPolicy policy) { + policy_ = policy; +} + +void BackoffPerformer::set_initial_backoff(TimeUnit value) { + initial_backoff_ = value; +} + +void BackoffPerformer::set_backoff_step(TimeUnit value) { + backoff_step_ = value; +} + +void BackoffPerformer::set_maximum_backoff(TimeUnit value) { + maximum_backoff_ = value; +} + +BackoffPerformer::TimeUnit BackoffPerformer::increase_backoff(TimeUnit backoff) { + if (policy_ == BackoffPolicy::LINEAR) { + backoff = backoff + backoff_step_; + } + else { + backoff = backoff * 2; + } + return min(backoff, maximum_backoff_); +} + +} // cppkafka