Allow throttling on ConsumerDispatcher

This commit is contained in:
Matias Fontanini
2017-06-17 08:52:48 -07:00
parent 556dac7015
commit 4af48ff0e7
2 changed files with 93 additions and 6 deletions

View File

@@ -32,6 +32,7 @@
#include <tuple>
#include "../consumer.h"
#include "backoff_performer.h"
namespace cppkafka {
@@ -60,7 +61,10 @@ namespace cppkafka {
*
* The signature for each callback should be as following (or compatible)
*
* * Message callback: void(Message)
* * Message callback, either:
* - void(Message)
* - Message(Message). In this case if the message is returned, it will be buffered
* while calling the throttle callback until the message is actually processed.
* * Timeout: void(ConsumerDispatcher::Timeout)
* * Error: void(Error)
* * EOF: void(ConsumerDispatcher::EndOfFile, TopicPartition)
@@ -77,6 +81,11 @@ public:
*/
struct EndOfFile {};
/*
* Tag to indicate end of file was reached on a partition being consumed
*/
struct Throttle {};
/**
* Constructs a consumer dispatcher over the given consumer
*
@@ -111,6 +120,16 @@ private:
static void handle_error(Error error);
static void handle_eof(EndOfFile, const TopicPartition& /*topic_partition*/) { }
static void handle_timeout(Timeout) { }
template <typename Functor>
void handle_throttle(Throttle, const Functor& callback, Message msg) {
BackoffPerformer{}.perform([&]() {
if (!running_) {
return true;
}
msg = callback(std::move(msg));
return !msg;
});
}
// Traits and template helpers
@@ -223,6 +242,31 @@ private:
check_callbacks_match(functors...);
}
template <typename Functor, typename... Functors>
auto process_message(const Functor& callback, Message msg, const Functors&...)
-> typename std::enable_if<std::is_same<void, decltype(callback(std::move(msg)))>::value,
void>::type {
callback(std::move(msg));
}
template <typename Functor, typename... Functors>
auto process_message(const Functor& callback, Message msg, const Functors&... functors)
-> typename std::enable_if<std::is_same<Message, decltype(callback(std::move(msg)))>::value,
void>::type {
const auto throttle_ptr = &ConsumerDispatcher::handle_throttle<Functor>;
const auto default_throttler = std::bind(throttle_ptr, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3);
using OnThrottleArgs = std::tuple<Throttle, const Functor&, Message>;
const auto on_throttle = find_matching_functor<OnThrottleArgs>(functors...,
default_throttler);
msg = callback(std::move(msg));
if (msg) {
on_throttle(Throttle{}, callback, std::move(msg));
}
}
Consumer& consumer_;
bool running_;
};
@@ -236,12 +280,13 @@ void ConsumerDispatcher::run(const Args&... args) {
check_callbacks_match(args...);
// This one is required
const auto& on_message = find_matching_functor<OnMessageArgs>(args...);
const auto on_message = find_matching_functor<OnMessageArgs>(args...);
// For the rest, append our own implementation at the end as a fallback
const auto& on_error = find_matching_functor<OnErrorArgs>(args..., &self::handle_error);
const auto& on_eof = find_matching_functor<OnEofArgs>(args..., &self::handle_eof);
const auto& on_timeout = find_matching_functor<OnTimeoutArgs>(args..., &self::handle_timeout);
const auto on_error = find_matching_functor<OnErrorArgs>(args..., &self::handle_error);
const auto on_eof = find_matching_functor<OnEofArgs>(args..., &self::handle_eof);
const auto on_timeout = find_matching_functor<OnTimeoutArgs>(args..., &self::handle_timeout);
running_ = true;
while (running_) {
Message msg = consumer_.poll();
@@ -258,7 +303,7 @@ void ConsumerDispatcher::run(const Args&... args) {
}
continue;
}
on_message(std::move(msg));
process_message(on_message, std::move(msg), args...);
}
}