From effdf7fb95c612c3844f998de40def6484eb053c Mon Sep 17 00:00:00 2001 From: Alexander Damian Date: Wed, 24 Apr 2019 16:42:56 -0400 Subject: [PATCH] Removed ReturnType. Throw on error from inside do_commit() as well as from perform() --- include/cppkafka/utils/backoff_committer.h | 32 ++++++++++------------ include/cppkafka/utils/backoff_performer.h | 17 ++---------- src/utils/backoff_committer.cpp | 12 ++++---- 3 files changed, 22 insertions(+), 39 deletions(-) diff --git a/include/cppkafka/utils/backoff_committer.h b/include/cppkafka/utils/backoff_committer.h index b4be907..27c5ef3 100644 --- a/include/cppkafka/utils/backoff_committer.h +++ b/include/cppkafka/utils/backoff_committer.h @@ -137,32 +137,28 @@ public: private: // If the ReturnType contains 'true', we abort committing. Otherwise we continue. // The second member of the ReturnType contains the RdKafka error if any. - template - ReturnType do_commit(const T* object) { - ReturnType rt; + template + bool do_commit(Args&&...args) { try { - if (!object) { - consumer_.commit(); - } - else { - consumer_.commit(*object); - } + consumer_.commit(std::forward(args)...); // If the commit succeeds, we're done. + return true; } catch (const HandleException& ex) { - rt.error_ = ex.get_error(); + Error error = ex.get_error(); // If there were actually no offsets to commit, return. Retrying won't solve // anything here - if (rt.error_ != RD_KAFKA_RESP_ERR__NO_OFFSET) { - // If there's no callback or if returns true for this message, keep committing. - // Otherwise abort. - CallbackInvoker callback("backoff committer", callback_, &consumer_); - if (!callback || callback(rt.error_)) { - rt.abort_ = false; //continue retrying - } + if (error == RD_KAFKA_RESP_ERR__NO_OFFSET) { + throw ex; //abort + } + // If there's a callback and it returns false for this message, abort. + // Otherwise keep committing. + CallbackInvoker callback("backoff committer", callback_, &consumer_); + if (callback && !callback(error)) { + throw ex; //abort } } - return rt; + return false; //continue } Consumer& consumer_; diff --git a/include/cppkafka/utils/backoff_performer.h b/include/cppkafka/utils/backoff_performer.h index 6ee5897..7ffdc89 100644 --- a/include/cppkafka/utils/backoff_performer.h +++ b/include/cppkafka/utils/backoff_performer.h @@ -48,14 +48,6 @@ public: static const TimeUnit DEFAULT_BACKOFF_STEP; static const TimeUnit DEFAULT_MAXIMUM_BACKOFF; static const size_t DEFAULT_MAXIMUM_RETRIES; - - /** - * @brief Type which any functor must return. - */ - struct ReturnType { - bool abort_{true}; - Error error_; - }; /** * The backoff policy to use @@ -128,15 +120,10 @@ public: void perform(const Functor& callback) { TimeUnit backoff = initial_backoff_; size_t retries = maximum_retries_; - ReturnType rt; while (retries--) { auto start = std::chrono::steady_clock::now(); // If the callback returns true, we're done - rt = callback(); - if (rt.abort_) { - if (rt.error_) { - break; //terminal error - } + if (callback()) { return; //success } auto end = std::chrono::steady_clock::now(); @@ -149,7 +136,7 @@ public: backoff = increase_backoff(backoff); } // No more retries left or we have a terminal error. - throw ConsumerException(rt.error_); + throw Exception("Commit failed: no more retries."); } private: TimeUnit increase_backoff(TimeUnit backoff); diff --git a/src/utils/backoff_committer.cpp b/src/utils/backoff_committer.cpp index 04f77a7..cba0cc2 100644 --- a/src/utils/backoff_committer.cpp +++ b/src/utils/backoff_committer.cpp @@ -44,20 +44,20 @@ void BackoffCommitter::set_error_callback(ErrorCallback callback) { } void BackoffCommitter::commit() { - perform([&]()->ReturnType { - return do_commit(nullptr); + perform([&] { + return do_commit(); }); } void BackoffCommitter::commit(const Message& msg) { - perform([&]()->ReturnType { - return do_commit(&msg); + perform([&] { + return do_commit(msg); }); } void BackoffCommitter::commit(const TopicPartitionList& topic_partitions) { - perform([&]()->ReturnType { - return do_commit(&topic_partitions); + perform([&] { + return do_commit(topic_partitions); }); }