diff --git a/include/cppkafka/utils/consumer_dispatcher.h b/include/cppkafka/utils/consumer_dispatcher.h index 76ad1f3..87864b2 100644 --- a/include/cppkafka/utils/consumer_dispatcher.h +++ b/include/cppkafka/utils/consumer_dispatcher.h @@ -32,6 +32,7 @@ #include #include "../consumer.h" +#include "backoff_performer.h" namespace cppkafka { @@ -60,7 +61,10 @@ namespace cppkafka { * * The signature for each callback should be as following (or compatible) * - * * Message callback: void(Message) + * * Message callback, either: + * - void(Message) + * - Message(Message). In this case if the message is returned, it will be buffered + * while calling the throttle callback until the message is actually processed. * * Timeout: void(ConsumerDispatcher::Timeout) * * Error: void(Error) * * EOF: void(ConsumerDispatcher::EndOfFile, TopicPartition) @@ -77,6 +81,11 @@ public: */ struct EndOfFile {}; + /* + * Tag to indicate end of file was reached on a partition being consumed + */ + struct Throttle {}; + /** * Constructs a consumer dispatcher over the given consumer * @@ -111,6 +120,16 @@ private: static void handle_error(Error error); static void handle_eof(EndOfFile, const TopicPartition& /*topic_partition*/) { } static void handle_timeout(Timeout) { } + template + void handle_throttle(Throttle, const Functor& callback, Message msg) { + BackoffPerformer{}.perform([&]() { + if (!running_) { + return true; + } + msg = callback(std::move(msg)); + return !msg; + }); + } // Traits and template helpers @@ -223,6 +242,31 @@ private: check_callbacks_match(functors...); } + template + auto process_message(const Functor& callback, Message msg, const Functors&...) + -> typename std::enable_if::value, + void>::type { + callback(std::move(msg)); + } + + template + auto process_message(const Functor& callback, Message msg, const Functors&... functors) + -> typename std::enable_if::value, + void>::type { + const auto throttle_ptr = &ConsumerDispatcher::handle_throttle; + const auto default_throttler = std::bind(throttle_ptr, this, std::placeholders::_1, + std::placeholders::_2, std::placeholders::_3); + + using OnThrottleArgs = std::tuple; + const auto on_throttle = find_matching_functor(functors..., + default_throttler); + + msg = callback(std::move(msg)); + if (msg) { + on_throttle(Throttle{}, callback, std::move(msg)); + } + } + Consumer& consumer_; bool running_; }; @@ -236,12 +280,13 @@ void ConsumerDispatcher::run(const Args&... args) { check_callbacks_match(args...); // This one is required - const auto& on_message = find_matching_functor(args...); + const auto on_message = find_matching_functor(args...); // For the rest, append our own implementation at the end as a fallback - const auto& on_error = find_matching_functor(args..., &self::handle_error); - const auto& on_eof = find_matching_functor(args..., &self::handle_eof); - const auto& on_timeout = find_matching_functor(args..., &self::handle_timeout); + const auto on_error = find_matching_functor(args..., &self::handle_error); + const auto on_eof = find_matching_functor(args..., &self::handle_eof); + const auto on_timeout = find_matching_functor(args..., &self::handle_timeout); + running_ = true; while (running_) { Message msg = consumer_.poll(); @@ -258,7 +303,7 @@ void ConsumerDispatcher::run(const Args&... args) { } continue; } - on_message(std::move(msg)); + process_message(on_message, std::move(msg), args...); } } diff --git a/tests/consumer_test.cpp b/tests/consumer_test.cpp index 6a9573f..312ba78 100644 --- a/tests/consumer_test.cpp +++ b/tests/consumer_test.cpp @@ -7,6 +7,8 @@ #include #include "cppkafka/consumer.h" #include "cppkafka/producer.h" +#include "cppkafka/utils/consumer_dispatcher.h" +#include "cppkafka/utils/buffered_producer.h" #include "test_utils.h" using std::vector; @@ -168,3 +170,43 @@ TEST_F(ConsumerTest, OffsetCommit) { } EXPECT_TRUE(offset_commit_called); } + +TEST_F(ConsumerTest, Throttle) { + int partition = 0; + + // Create a consumer and subscribe to the topic + Configuration config = make_consumer_config("offset_commit"); + Consumer consumer(config); + consumer.assign({ { KAFKA_TOPIC, 0 } }); + + { + ConsumerRunner runner(consumer, 0, 1); + runner.try_join(); + } + + // Produce a message just so we stop the consumer + BufferedProducer producer(make_producer_config()); + string payload = "Hello world!"; + producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload)); + producer.flush(); + + size_t callback_executed_count = 0; + + ConsumerDispatcher dispatcher(consumer); + dispatcher.run( + [&](Message msg) { + callback_executed_count++; + if (callback_executed_count == 3) { + return Message(); + } + return move(msg); + }, + [&](ConsumerDispatcher::Timeout) { + if (callback_executed_count == 3) { + dispatcher.stop(); + } + } + ); + + EXPECT_EQ(3, callback_executed_count); +}