diff --git a/include/cppkafka/utils/consumer_dispatcher.h b/include/cppkafka/utils/consumer_dispatcher.h index c49e8e3..8994652 100644 --- a/include/cppkafka/utils/consumer_dispatcher.h +++ b/include/cppkafka/utils/consumer_dispatcher.h @@ -50,7 +50,7 @@ namespace cppkafka { * so the usual loop is simplified away and you can process messages without * having to check for all those cases. * - * When calling ConsumerDispatcher::run, a list of callbacks has to be provided. + * When calling BasicConsumerDispatcher::run, a list of callbacks has to be provided. * These will handle each case (message, timeout, error, eof), allowing you to * only provide what you need. The only callback that is required is the message one. * For the rest, the following actions will be performed as defaults: @@ -65,11 +65,12 @@ namespace cppkafka { * - void(Message) * - Message(Message). In this case if the message is returned, it will be buffered * while calling the throttle callback until the message is actually processed. - * * Timeout: void(ConsumerDispatcher::Timeout) + * * Timeout: void(BasicConsumerDispatcher::Timeout) * * Error: void(Error) - * * EOF: void(ConsumerDispatcher::EndOfFile, TopicPartition) + * * EOF: void(BasicConsumerDispatcher::EndOfFile, TopicPartition) */ -class ConsumerDispatcher { +template +class BasicConsumerDispatcher { public: /** * Tag to indicate a timeout occurred @@ -91,12 +92,12 @@ public: * * \param consumer The consumer to be used */ - ConsumerDispatcher(Consumer& consumer); + BasicConsumerDispatcher(ConsumerType& consumer); /** * \brief Consumes messages dispatching events to the appropriate callack * - * This will loop until ConsumerDispatcher::stop is called + * This will loop until BasicConsumerDispatcher::stop is called * * \param args The list of callbacks to be executed */ @@ -277,7 +278,7 @@ private: auto process_message(const Functor& callback, Message msg, const Functors&... functors) -> typename std::enable_if::value, void>::type { - const auto throttle_ptr = &ConsumerDispatcher::handle_throttle; + const auto throttle_ptr = &BasicConsumerDispatcher::handle_throttle; const auto default_throttler = std::bind(throttle_ptr, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); @@ -296,13 +297,32 @@ private: } } - Consumer& consumer_; + ConsumerType& consumer_; bool running_; }; +using ConsumerDispatcher = BasicConsumerDispatcher; + +template +BasicConsumerDispatcher::BasicConsumerDispatcher(ConsumerType& consumer) +: consumer_(consumer) { + +} + +template +void BasicConsumerDispatcher::stop() { + running_ = false; +} + +template +void BasicConsumerDispatcher::handle_error(Error error) { + throw ConsumerException(error); +} + +template template -void ConsumerDispatcher::run(const Args&... args) { - using self = ConsumerDispatcher; +void BasicConsumerDispatcher::run(const Args&... args) { + using self = BasicConsumerDispatcher; // Make sure all callbacks match one of the signatures. Otherwise users could provide // bogus callbacks that would never be executed diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 5450230..d485e1f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -18,7 +18,6 @@ set(SOURCES utils/backoff_performer.cpp utils/backoff_committer.cpp - utils/consumer_dispatcher.cpp ) include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include/cppkafka) diff --git a/src/utils/consumer_dispatcher.cpp b/src/utils/consumer_dispatcher.cpp deleted file mode 100644 index d084127..0000000 --- a/src/utils/consumer_dispatcher.cpp +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2017, Matias Fontanini - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "utils/consumer_dispatcher.h" -#include "exceptions.h" - -namespace cppkafka { - -ConsumerDispatcher::ConsumerDispatcher(Consumer& consumer) -: consumer_(consumer) { -} - -void ConsumerDispatcher::stop() { - running_ = false; -} - -void ConsumerDispatcher::handle_error(Error error) { - throw ConsumerException(error); -} - -} // cppkafka