From b7a0dce710139ab590362630c9a4b7ebfa732166 Mon Sep 17 00:00:00 2001 From: Matias Fontanini Date: Tue, 4 Jul 2017 18:23:42 -0700 Subject: [PATCH] Add a generic "event" event on ConsumerDispatcher --- include/cppkafka/utils/consumer_dispatcher.h | 21 +++++++++++++++----- tests/test_utils.cpp | 4 ++-- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/include/cppkafka/utils/consumer_dispatcher.h b/include/cppkafka/utils/consumer_dispatcher.h index b3a47a2..a1071e8 100644 --- a/include/cppkafka/utils/consumer_dispatcher.h +++ b/include/cppkafka/utils/consumer_dispatcher.h @@ -87,6 +87,11 @@ public: */ struct Throttle {}; + /** + * Tag to indicate there was some event processed (message, timeout, error, etc) + */ + struct Event {}; + /** * Constructs a consumer dispatcher over the given consumer * @@ -117,10 +122,12 @@ private: using OnErrorArgs = std::tuple; using OnEofArgs = std::tuple; using OnTimeoutArgs = std::tuple; + using OnEventArgs = std::tuple; static void handle_error(Error error); static void handle_eof(EndOfFile, const TopicPartition& /*topic_partition*/) { } static void handle_timeout(Timeout) { } + static void handle_event(Event) { } template void handle_throttle(Throttle, const Functor& callback, Message msg) { @@ -250,7 +257,9 @@ private: !std::is_same::type>::value || !std::is_same::type>::value, + typename find_type::type>::value || + !std::is_same::type>::value, "Callback doesn't match any of the expected signatures" ); } @@ -335,24 +344,26 @@ void BasicConsumerDispatcher::run(const Args&... args) { const auto on_error = find_matching_functor(args..., &self::handle_error); const auto on_eof = find_matching_functor(args..., &self::handle_eof); const auto on_timeout = find_matching_functor(args..., &self::handle_timeout); + const auto on_event = find_matching_functor(args..., &self::handle_event); running_ = true; while (running_) { Message msg = consumer_.poll(); if (!msg) { on_timeout(Timeout{}); - continue; } - if (msg.get_error()) { + else if (msg.get_error()) { if (msg.is_eof()) { on_eof(EndOfFile{}, { msg.get_topic(), msg.get_partition(), msg.get_offset() }); } else { on_error(msg.get_error()); } - continue; } - process_message(on_message, std::move(msg), args...); + else { + process_message(on_message, std::move(msg), args...); + } + on_event(Event{}); } } diff --git a/tests/test_utils.cpp b/tests/test_utils.cpp index d141a9e..8eff92a 100644 --- a/tests/test_utils.cpp +++ b/tests/test_utils.cpp @@ -49,8 +49,8 @@ ConsumerRunner::ConsumerRunner(Consumer& consumer, size_t expected, size_t parti } } }, - // Timeout callback - [&](ConsumerDispatcher::Timeout) { + // Every time there's any event callback + [&](ConsumerDispatcher::Event) { if (expected > 0 && messages_.size() == expected) { dispatcher.stop(); }