diff --git a/include/cppkafka/exceptions.h b/include/cppkafka/exceptions.h index e159996..323c3e5 100644 --- a/include/cppkafka/exceptions.h +++ b/include/cppkafka/exceptions.h @@ -110,6 +110,18 @@ private: Error error_; }; +/** + * Consumer exception + */ +class CPPKAFKA_API ConsumerException : public Exception { +public: + ConsumerException(Error error); + + Error get_error() const; +private: + Error error_; +}; + } // cppkafka #endif // CPPKAFKA_EXCEPTIONS_H diff --git a/include/cppkafka/utils/consumer_dispatcher.h b/include/cppkafka/utils/consumer_dispatcher.h new file mode 100644 index 0000000..85773d5 --- /dev/null +++ b/include/cppkafka/utils/consumer_dispatcher.h @@ -0,0 +1,220 @@ +/* + * Copyright (c) 2017, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef CPPKAFKA_CONSUMER_DISPATCHER_H +#define CPPKAFKA_CONSUMER_DISPATCHER_H + +#include +#include "../consumer.h" + +namespace cppkafka { + +/** + * \brief Helper to perform pattern matching when consuming messages + * + * As the way to consume messages requires you to: + * + * * Poll for a message + * * Check if it's not null + * * Check if it's an error (optionally handling EOF as a non error) + * * Process the message + * + * This class introduces a pattern matching based approach to consuming messages + * so the usual loop is simplified away and you can process messages without + * having to check for all those cases. + * + * When calling ConsumerDispatcher::run, a list of callbacks has to be provided. + * These will handle each case (message, timeout, error, eof), allowing you to + * only provide what you need. The only callback that is required is the message one. + * For the rest, the following actions will be performed as defaults: + * + * * Timeout: ignore + * * EOF: ignore + * * Error (not an EOF error): throw a ConsumerException exception + * + * The signature for each callback should be as following (or compatible) + * + * * Message callback: void(Message) + * * Timeout: void() + * * Error: void(Error) + * * EOF: void(TopicPartition) + */ +class ConsumerDispatcher { +public: + /** + * Constructs a consumer dispatcher over the given consumer + * + * \param consumer The consumer to be used + */ + ConsumerDispatcher(Consumer& consumer); + + /** + * \brief Consumes messages dispatching events to the appropriate callack + * + * This will loop until ConsumerDispatcher::stop is called + * + * \param args The list of callbacks to be executed + */ + template + void run(const Args&... args); + + /** + * \brief Stops consumption + * + * Note that as this is synchronous, if there's any poll operations currently in + * progress, then this will stop after the current call returns + */ + void stop(); +private: + static void handle_error(Error error); + static void handle_eof(const TopicPartition& topic_partition); + static void ignore(); + + // Traits and template helpers + + // Finds whether type T accepts arguments of types Args... + template + struct takes_arguments { + using yes = double; + using no = bool; + + template + static yes test(decltype(std::declval()(std::declval()...))*); + template + static no test(...); + + static constexpr bool value = sizeof(test(nullptr)) == sizeof(yes); + }; + + // Specialization for tuple + template + struct takes_arguments> : takes_arguments { + + }; + + template + struct identity { + using type = T; + }; + + // Placeholder to indicate a type wasn't found + struct type_not_found { + + }; + + // find_type: given a tuple of types and a list of functors, finds the functor + // type that accepts the given tuple types as parameters + template + struct find_type_helper { + using type = typename std::conditional::value, + identity, + find_type_helper + >::type::type; + }; + + template + struct find_type_helper { + using type = type_not_found; + }; + + template + struct find_type { + using type = typename find_type_helper::type; + }; + + // find_functor: given a Functor and a template parameter pack of functors, finds + // the one that matches the given type + template + struct find_functor_helper { + template + static const Functor& find(const Functor& arg, Functors&&...) { + return arg; + } + + template + static typename std::enable_if::value, const Functor&>::type + find(const Head&, Functors&&... functors) { + return find(std::forward(functors)...); + } + }; + + template + const Functor& find_functor(Args&&... args) { + return find_functor_helper::find(std::forward(args)...); + } + + // Finds the first functor that accepts the parameters in a tuple and returns it. If no + // such functor is found, a static asertion will occur + template + const typename find_type::type& + find_callable_functor(const Functors&... functors) { + using type = typename find_type::type; + static_assert(!std::is_same::value, "Valid functor not found"); + return find_functor(functors...); + } + + Consumer& consumer_; + bool running_; +}; + +template +void ConsumerDispatcher::run(const Args&... args) { + using self = ConsumerDispatcher; + // This one is required + const auto& on_message = find_callable_functor>(args...); + + // For the rest, append our own implementation at the end as a fallback + const auto& on_error = find_callable_functor>(args..., + self::handle_error); + const auto& on_eof = find_callable_functor>(args..., + self::handle_eof); + const auto& on_timeout = find_callable_functor>(args..., self::ignore); + running_ = true; + while (running_) { + Message msg = consumer_.poll(); + if (!msg) { + on_timeout(); + continue; + } + if (msg.get_error()) { + if (msg.is_eof()) { + on_eof({ msg.get_topic(), msg.get_partition(), msg.get_offset() }); + } + else { + on_error(msg.get_error()); + } + continue; + } + on_message(std::move(msg)); + } +} + +} // cppkafka + +#endif // CPPKAFKA_CONSUMER_DISPATCHER_H diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 4876865..674688e 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -17,6 +17,7 @@ set(SOURCES consumer.cpp utils/backoff_committer.cpp + utils/consumer_dispatcher.cpp ) include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include/cppkafka) diff --git a/src/exceptions.cpp b/src/exceptions.cpp index ec0d778..fbf98e1 100644 --- a/src/exceptions.cpp +++ b/src/exceptions.cpp @@ -97,4 +97,15 @@ Error HandleException::get_error() const { return error_; } +// ConsumerException + +ConsumerException::ConsumerException(Error error) +: Exception(error.to_string()), error_(error) { + +} + +Error ConsumerException::get_error() const { + return error_; +} + } // cppkafka diff --git a/src/utils/consumer_dispatcher.cpp b/src/utils/consumer_dispatcher.cpp new file mode 100644 index 0000000..e602779 --- /dev/null +++ b/src/utils/consumer_dispatcher.cpp @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2017, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "utils/consumer_dispatcher.h" +#include "exceptions.h" + +namespace cppkafka { + +ConsumerDispatcher::ConsumerDispatcher(Consumer& consumer) +: consumer_(consumer) { +} + +void ConsumerDispatcher::stop() { + running_ = false; +} + +void ConsumerDispatcher::handle_error(Error error) { + throw ConsumerException(error); +} + +void ConsumerDispatcher::handle_eof(const TopicPartition& /*topic_partition*/) { + +} + +void ConsumerDispatcher::ignore() { + +} + +} // cppkafka diff --git a/tests/test_utils.cpp b/tests/test_utils.cpp index e3cc080..fe83fd1 100644 --- a/tests/test_utils.cpp +++ b/tests/test_utils.cpp @@ -2,6 +2,7 @@ #include #include #include "test_utils.h" +#include "cppkafka/utils/consumer_dispatcher.h" using std::vector; using std::move; @@ -16,7 +17,9 @@ using std::chrono::milliseconds; using std::chrono::seconds; using cppkafka::Consumer; +using cppkafka::ConsumerDispatcher; using cppkafka::Message; +using cppkafka::TopicPartition; ConsumerRunner::ConsumerRunner(Consumer& consumer, size_t expected, size_t partitions) : consumer_(consumer) { @@ -27,27 +30,38 @@ ConsumerRunner::ConsumerRunner(Consumer& consumer, size_t expected, size_t parti consumer_.set_timeout(milliseconds(500)); size_t number_eofs = 0; auto start = system_clock::now(); - while (system_clock::now() - start < seconds(20)) { - if (expected > 0 && messages_.size() == expected) { - break; - } - if (expected == 0 && number_eofs >= partitions) { - break; - } - Message msg = consumer_.poll(); - if (msg && number_eofs != partitions && - msg.get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF) { - number_eofs++; + ConsumerDispatcher dispatcher(consumer_); + dispatcher.run( + // Message callback + [&](Message msg) { if (number_eofs == partitions) { - lock_guard _(mtx); - booted = true; - cond.notify_one(); + messages_.push_back(move(msg)); + } + }, + // EOF callback + [&](const TopicPartition& topic_partition) { + if (number_eofs != partitions) { + number_eofs++; + if (number_eofs == partitions) { + lock_guard _(mtx); + booted = true; + cond.notify_one(); + } + } + }, + // Timeout callback + [&]() { + if (expected > 0 && messages_.size() == expected) { + dispatcher.stop(); + } + if (expected == 0 && number_eofs >= partitions) { + dispatcher.stop(); + } + if (system_clock::now() - start >= seconds(20)) { + dispatcher.stop(); } } - else if (msg && !msg.get_error() && number_eofs == partitions) { - messages_.push_back(move(msg)); - } - } + ); if (number_eofs < partitions) { lock_guard _(mtx); booted = true;