Make ConsumerDispatcher a template class

This commit is contained in:
Matias Fontanini
2017-07-04 18:07:22 -07:00
parent 191956b4ca
commit 9e6315fcc2
3 changed files with 30 additions and 58 deletions

View File

@@ -50,7 +50,7 @@ namespace cppkafka {
* so the usual loop is simplified away and you can process messages without
* having to check for all those cases.
*
* When calling ConsumerDispatcher::run, a list of callbacks has to be provided.
* When calling BasicConsumerDispatcher::run, a list of callbacks has to be provided.
* These will handle each case (message, timeout, error, eof), allowing you to
* only provide what you need. The only callback that is required is the message one.
* For the rest, the following actions will be performed as defaults:
@@ -65,11 +65,12 @@ namespace cppkafka {
* - void(Message)
* - Message(Message). In this case if the message is returned, it will be buffered
* while calling the throttle callback until the message is actually processed.
* * Timeout: void(ConsumerDispatcher::Timeout)
* * Timeout: void(BasicConsumerDispatcher::Timeout)
* * Error: void(Error)
* * EOF: void(ConsumerDispatcher::EndOfFile, TopicPartition)
* * EOF: void(BasicConsumerDispatcher::EndOfFile, TopicPartition)
*/
class ConsumerDispatcher {
template <typename ConsumerType>
class BasicConsumerDispatcher {
public:
/**
* Tag to indicate a timeout occurred
@@ -91,12 +92,12 @@ public:
*
* \param consumer The consumer to be used
*/
ConsumerDispatcher(Consumer& consumer);
BasicConsumerDispatcher(ConsumerType& consumer);
/**
* \brief Consumes messages dispatching events to the appropriate callack
*
* This will loop until ConsumerDispatcher::stop is called
* This will loop until BasicConsumerDispatcher::stop is called
*
* \param args The list of callbacks to be executed
*/
@@ -277,7 +278,7 @@ private:
auto process_message(const Functor& callback, Message msg, const Functors&... functors)
-> typename std::enable_if<std::is_same<Message, decltype(callback(std::move(msg)))>::value,
void>::type {
const auto throttle_ptr = &ConsumerDispatcher::handle_throttle<Functor>;
const auto throttle_ptr = &BasicConsumerDispatcher::handle_throttle<Functor>;
const auto default_throttler = std::bind(throttle_ptr, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3);
@@ -296,13 +297,32 @@ private:
}
}
Consumer& consumer_;
ConsumerType& consumer_;
bool running_;
};
using ConsumerDispatcher = BasicConsumerDispatcher<Consumer>;
template <typename ConsumerType>
BasicConsumerDispatcher<ConsumerType>::BasicConsumerDispatcher(ConsumerType& consumer)
: consumer_(consumer) {
}
template <typename ConsumerType>
void BasicConsumerDispatcher<ConsumerType>::stop() {
running_ = false;
}
template <typename ConsumerType>
void BasicConsumerDispatcher<ConsumerType>::handle_error(Error error) {
throw ConsumerException(error);
}
template <typename ConsumerType>
template <typename... Args>
void ConsumerDispatcher::run(const Args&... args) {
using self = ConsumerDispatcher;
void BasicConsumerDispatcher<ConsumerType>::run(const Args&... args) {
using self = BasicConsumerDispatcher<ConsumerType>;
// Make sure all callbacks match one of the signatures. Otherwise users could provide
// bogus callbacks that would never be executed

View File

@@ -18,7 +18,6 @@ set(SOURCES
utils/backoff_performer.cpp
utils/backoff_committer.cpp
utils/consumer_dispatcher.cpp
)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include/cppkafka)

View File

@@ -1,47 +0,0 @@
/*
* Copyright (c) 2017, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "utils/consumer_dispatcher.h"
#include "exceptions.h"
namespace cppkafka {
ConsumerDispatcher::ConsumerDispatcher(Consumer& consumer)
: consumer_(consumer) {
}
void ConsumerDispatcher::stop() {
running_ = false;
}
void ConsumerDispatcher::handle_error(Error error) {
throw ConsumerException(error);
}
} // cppkafka