From 4369b75695daeea7f75bd6122e40e2ba2bda9bf5 Mon Sep 17 00:00:00 2001 From: Matias Fontanini Date: Sat, 10 Jun 2017 17:39:23 -0700 Subject: [PATCH] Use tags on EOF and timeout callbacks on ConsumerDispatcher --- include/cppkafka/utils/consumer_dispatcher.h | 39 ++++++++++++++------ src/utils/consumer_dispatcher.cpp | 4 +- tests/test_utils.cpp | 4 +- 3 files changed, 31 insertions(+), 16 deletions(-) diff --git a/include/cppkafka/utils/consumer_dispatcher.h b/include/cppkafka/utils/consumer_dispatcher.h index 85773d5..05e5c49 100644 --- a/include/cppkafka/utils/consumer_dispatcher.h +++ b/include/cppkafka/utils/consumer_dispatcher.h @@ -61,12 +61,22 @@ namespace cppkafka { * The signature for each callback should be as following (or compatible) * * * Message callback: void(Message) - * * Timeout: void() + * * Timeout: void(ConsumerDispatcher::Timeout) * * Error: void(Error) - * * EOF: void(TopicPartition) + * * EOF: void(ConsumerDispatcher::EndOfFile, TopicPartition) */ class ConsumerDispatcher { public: + /** + * Tag to indicate a timeout occurred + */ + struct Timeout {}; + + /** + * Tag to indicate end of file was reached on a partition being consumed + */ + struct EndOfFile {}; + /** * Constructs a consumer dispatcher over the given consumer * @@ -93,8 +103,8 @@ public: void stop(); private: static void handle_error(Error error); - static void handle_eof(const TopicPartition& topic_partition); - static void ignore(); + static void handle_eof(EndOfFile, const TopicPartition& topic_partition); + static void handle_timeout(Timeout); // Traits and template helpers @@ -185,26 +195,31 @@ private: template void ConsumerDispatcher::run(const Args&... args) { + // Define the types we need for each type of callback + using OnMessageArgs = std::tuple; + using OnErrorArgs = std::tuple; + using OnEofArgs = std::tuple; + using OnTimeoutArgs = std::tuple; + using self = ConsumerDispatcher; + // This one is required - const auto& on_message = find_callable_functor>(args...); + const auto& on_message = find_callable_functor(args...); // For the rest, append our own implementation at the end as a fallback - const auto& on_error = find_callable_functor>(args..., - self::handle_error); - const auto& on_eof = find_callable_functor>(args..., - self::handle_eof); - const auto& on_timeout = find_callable_functor>(args..., self::ignore); + const auto& on_error = find_callable_functor(args..., self::handle_error); + const auto& on_eof = find_callable_functor(args..., self::handle_eof); + const auto& on_timeout = find_callable_functor(args..., self::handle_timeout); running_ = true; while (running_) { Message msg = consumer_.poll(); if (!msg) { - on_timeout(); + on_timeout(Timeout{}); continue; } if (msg.get_error()) { if (msg.is_eof()) { - on_eof({ msg.get_topic(), msg.get_partition(), msg.get_offset() }); + on_eof(EndOfFile{}, { msg.get_topic(), msg.get_partition(), msg.get_offset() }); } else { on_error(msg.get_error()); diff --git a/src/utils/consumer_dispatcher.cpp b/src/utils/consumer_dispatcher.cpp index e602779..2e2ef9e 100644 --- a/src/utils/consumer_dispatcher.cpp +++ b/src/utils/consumer_dispatcher.cpp @@ -44,11 +44,11 @@ void ConsumerDispatcher::handle_error(Error error) { throw ConsumerException(error); } -void ConsumerDispatcher::handle_eof(const TopicPartition& /*topic_partition*/) { +void ConsumerDispatcher::handle_eof(EndOfFile, const TopicPartition& /*topic_partition*/) { } -void ConsumerDispatcher::ignore() { +void ConsumerDispatcher::handle_timeout(Timeout) { } diff --git a/tests/test_utils.cpp b/tests/test_utils.cpp index fe83fd1..d141a9e 100644 --- a/tests/test_utils.cpp +++ b/tests/test_utils.cpp @@ -39,7 +39,7 @@ ConsumerRunner::ConsumerRunner(Consumer& consumer, size_t expected, size_t parti } }, // EOF callback - [&](const TopicPartition& topic_partition) { + [&](ConsumerDispatcher::EndOfFile, const TopicPartition& topic_partition) { if (number_eofs != partitions) { number_eofs++; if (number_eofs == partitions) { @@ -50,7 +50,7 @@ ConsumerRunner::ConsumerRunner(Consumer& consumer, size_t expected, size_t parti } }, // Timeout callback - [&]() { + [&](ConsumerDispatcher::Timeout) { if (expected > 0 && messages_.size() == expected) { dispatcher.stop(); }