From 065f899954ac92ae4f814f2fd1d6d21894c3ab5b Mon Sep 17 00:00:00 2001 From: Matias Fontanini Date: Sun, 3 Jul 2016 08:32:13 -0700 Subject: [PATCH] Add error handler to compacted topic processor --- include/cppkafka/compacted_topic_processor.h | 21 +++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/include/cppkafka/compacted_topic_processor.h b/include/cppkafka/compacted_topic_processor.h index ed17809..60a7cf7 100644 --- a/include/cppkafka/compacted_topic_processor.h +++ b/include/cppkafka/compacted_topic_processor.h @@ -133,6 +133,11 @@ public: */ using EventHandler = std::function; + /** + * Callback used for error handling + */ + using ErrorHandler = std::function; + /** * \brief Constructs an instance */ @@ -159,6 +164,11 @@ public: */ void set_event_handler(EventHandler callback); + /** + * \brief Sets the error handler callback + */ + void set_error_handler(ErrorHandler callback); + /** * \brief Processes the next event */ @@ -170,6 +180,7 @@ private: KeyDecoder key_decoder_; ValueDecoder value_decoder_; EventHandler event_handler_; + ErrorHandler error_handler_; std::map partition_offsets_; Consumer::AssignmentCallback original_assignment_callback_; }; @@ -255,11 +266,16 @@ void CompactedTopicProcessor::set_event_handler(EventHandler callback) { event_handler_ = std::move(callback); } +template +void CompactedTopicProcessor::set_error_handler(ErrorHandler callback) { + error_handler_ = std::move(callback); +} + template void CompactedTopicProcessor::process_event() { Message message = consumer_.poll(); if (message) { - if (!message.has_error()) { + if (!message.get_error()) { Key key = key_decoder_(message.get_key()); if (message.get_payload()) { // If there's a payload, generate a SET_ELEMENT event @@ -281,6 +297,9 @@ void CompactedTopicProcessor::process_event() { event_handler_({ Event::REACHED_EOF, message.get_topic(), message.get_partition() }); } + else if (error_handler_) { + error_handler_(std::move(message)); + } } } }