From 0c1119727b227000793e16c58d90802bad8bceec Mon Sep 17 00:00:00 2001 From: Alexander Damian Date: Wed, 24 Apr 2019 10:30:32 -0400 Subject: [PATCH] Replaced termination callback with throwing exception --- CMakeLists.txt | 23 +++++------- include/cppkafka/error.h | 6 ++- include/cppkafka/utils/backoff_committer.h | 43 +++++++++++++++------- include/cppkafka/utils/backoff_performer.h | 20 +++++++++- src/topic_partition_list.cpp | 1 - src/utils/backoff_committer.cpp | 14 +++++-- 6 files changed, 73 insertions(+), 34 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 1256923..cee5f7a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -7,20 +7,17 @@ set(CPPKAFKA_VERSION_MINOR 2) set(CPPKAFKA_VERSION "${CPPKAFKA_VERSION_MAJOR}.${CPPKAFKA_VERSION_MINOR}") set(RDKAFKA_MIN_VERSION 0x00090400) -if (NOT CMAKE_CXX_FLAGS) - # Set default compile flags for the project - if(MSVC) - # Don't always use Wall, since VC's /Wall is ridiculously verbose. - set(CMAKE_CXX_FLAGS "/W3") +if(MSVC) + # Don't always use Wall, since VC's /Wall is ridiculously verbose. + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /W3") - # Disable VC secure checks, since these are not really issues - add_definitions("-D_CRT_SECURE_NO_WARNINGS=1") - add_definitions("-D_SCL_SECURE_NO_WARNINGS=1") - add_definitions("-DNOGDI=1") - add_definitions("-DNOMINMAX=1") - else() - set(CMAKE_CXX_FLAGS "-std=c++11 -Wall") - endif() + # Disable VC secure checks, since these are not really issues + add_definitions("-D_CRT_SECURE_NO_WARNINGS=1") + add_definitions("-D_SCL_SECURE_NO_WARNINGS=1") + add_definitions("-DNOGDI=1") + add_definitions("-DNOMINMAX=1") +else() + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -Wall") endif() set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/cmake/") diff --git a/include/cppkafka/error.h b/include/cppkafka/error.h index 91ceb19..1c72611 100644 --- a/include/cppkafka/error.h +++ b/include/cppkafka/error.h @@ -42,6 +42,10 @@ namespace cppkafka { */ class CPPKAFKA_API Error { public: + /** + * @brief Constructs an error object with RD_KAFKA_RESP_ERR_NO_ERROR + */ + Error() = default; /** * Constructs an error object */ @@ -77,7 +81,7 @@ public: */ CPPKAFKA_API friend std::ostream& operator<<(std::ostream& output, const Error& rhs); private: - rd_kafka_resp_err_t error_; + rd_kafka_resp_err_t error_{RD_KAFKA_RESP_ERR_NO_ERROR}; }; } // cppkafka diff --git a/include/cppkafka/utils/backoff_committer.h b/include/cppkafka/utils/backoff_committer.h index 5224980..b4be907 100644 --- a/include/cppkafka/utils/backoff_committer.h +++ b/include/cppkafka/utils/backoff_committer.h @@ -100,10 +100,18 @@ public: */ void set_error_callback(ErrorCallback callback); + /** + * \brief Commits the current partition assignment synchronously + * + * This will call Consumer::commit() until either the message is successfully + * committed or the error callback returns false (if any is set). + */ + void commit(); + /** * \brief Commits the given message synchronously * - * This will call Consumer::commit until either the message is successfully + * This will call Consumer::commit(msg) until either the message is successfully * committed or the error callback returns false (if any is set). * * \param msg The message to be committed @@ -113,7 +121,7 @@ public: /** * \brief Commits the offsets on the given topic/partitions synchronously * - * This will call Consumer::commit until either the offsets are successfully + * This will call Consumer::commit(topic_partitions) until either the offsets are successfully * committed or the error callback returns false (if any is set). * * \param topic_partitions The topic/partition list to be committed @@ -127,25 +135,34 @@ public: */ Consumer& get_consumer(); private: - // Return true to abort and false to continue committing + // If the ReturnType contains 'true', we abort committing. Otherwise we continue. + // The second member of the ReturnType contains the RdKafka error if any. template - bool do_commit(const T& object) { + ReturnType do_commit(const T* object) { + ReturnType rt; try { - consumer_.commit(object); - // If the commit succeeds, we're done - return true; + if (!object) { + consumer_.commit(); + } + else { + consumer_.commit(*object); + } + // If the commit succeeds, we're done. } catch (const HandleException& ex) { + rt.error_ = ex.get_error(); // If there were actually no offsets to commit, return. Retrying won't solve // anything here - if (ex.get_error() == RD_KAFKA_RESP_ERR__NO_OFFSET) { - return true; + if (rt.error_ != RD_KAFKA_RESP_ERR__NO_OFFSET) { + // If there's no callback or if returns true for this message, keep committing. + // Otherwise abort. + CallbackInvoker callback("backoff committer", callback_, &consumer_); + if (!callback || callback(rt.error_)) { + rt.abort_ = false; //continue retrying + } } - // If there's a callback and it returns false for this message, abort. - // Otherwise keep committing. - CallbackInvoker callback("backoff committer", callback_, &consumer_); - return callback && !callback(ex.get_error()); } + return rt; } Consumer& consumer_; diff --git a/include/cppkafka/utils/backoff_performer.h b/include/cppkafka/utils/backoff_performer.h index 7641c9c..6ee5897 100644 --- a/include/cppkafka/utils/backoff_performer.h +++ b/include/cppkafka/utils/backoff_performer.h @@ -34,6 +34,7 @@ #include #include #include "../consumer.h" +#include "../exceptions.h" namespace cppkafka { @@ -47,6 +48,14 @@ public: static const TimeUnit DEFAULT_BACKOFF_STEP; static const TimeUnit DEFAULT_MAXIMUM_BACKOFF; static const size_t DEFAULT_MAXIMUM_RETRIES; + + /** + * @brief Type which any functor must return. + */ + struct ReturnType { + bool abort_{true}; + Error error_; + }; /** * The backoff policy to use @@ -119,11 +128,16 @@ public: void perform(const Functor& callback) { TimeUnit backoff = initial_backoff_; size_t retries = maximum_retries_; + ReturnType rt; while (retries--) { auto start = std::chrono::steady_clock::now(); // If the callback returns true, we're done - if (callback()) { - return; + rt = callback(); + if (rt.abort_) { + if (rt.error_) { + break; //terminal error + } + return; //success } auto end = std::chrono::steady_clock::now(); auto time_elapsed = end - start; @@ -134,6 +148,8 @@ public: // Increase out backoff depending on the policy being used backoff = increase_backoff(backoff); } + // No more retries left or we have a terminal error. + throw ConsumerException(rt.error_); } private: TimeUnit increase_backoff(TimeUnit backoff); diff --git a/src/topic_partition_list.cpp b/src/topic_partition_list.cpp index 67e0b8e..72f5cd6 100644 --- a/src/topic_partition_list.cpp +++ b/src/topic_partition_list.cpp @@ -38,7 +38,6 @@ using std::vector; using std::set; using std::ostream; using std::string; -using std::equal; namespace cppkafka { diff --git a/src/utils/backoff_committer.cpp b/src/utils/backoff_committer.cpp index 58fedc6..04f77a7 100644 --- a/src/utils/backoff_committer.cpp +++ b/src/utils/backoff_committer.cpp @@ -43,15 +43,21 @@ void BackoffCommitter::set_error_callback(ErrorCallback callback) { callback_ = move(callback); } +void BackoffCommitter::commit() { + perform([&]()->ReturnType { + return do_commit(nullptr); + }); +} + void BackoffCommitter::commit(const Message& msg) { - perform([&] { - return do_commit(msg); + perform([&]()->ReturnType { + return do_commit(&msg); }); } void BackoffCommitter::commit(const TopicPartitionList& topic_partitions) { - perform([&] { - return do_commit(topic_partitions); + perform([&]()->ReturnType { + return do_commit(&topic_partitions); }); }