From ea9601ba1becd6ae61201b1835cc8246c51e835b Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Thu, 17 May 2018 11:06:23 -0400 Subject: [PATCH] Changes per code review --- include/cppkafka/utils/poll_interface.h | 6 +++--- include/cppkafka/utils/poll_strategy_base.h | 19 +++---------------- .../cppkafka/utils/roundrobin_poll_strategy.h | 10 +++------- src/CMakeLists.txt | 2 +- src/utils/roundrobin_poll_strategy.cpp | 19 +++++++++---------- tests/CMakeLists.txt | 2 +- tests/roundrobin_poll_test.cpp | 14 ++++++++------ tests/test_utils.h | 3 +-- 8 files changed, 29 insertions(+), 46 deletions(-) diff --git a/include/cppkafka/utils/poll_interface.h b/include/cppkafka/utils/poll_interface.h index 24abbcd..af93e3f 100644 --- a/include/cppkafka/utils/poll_interface.h +++ b/include/cppkafka/utils/poll_interface.h @@ -72,7 +72,7 @@ struct PollInterface { * * Each call to poll() will first consume from the global event queue and if there are * no pending events, will attempt to consume from all partitions until a valid message is found. - * The timeout used on this call will be the one configured via RoundRobinPollStrategy::set_timeout. + * The timeout used on this call will be the one configured via PollInterface::set_timeout. * * \return A message. The returned message *might* be empty. It's necessary to check * that it's a valid one before using it (see example above). @@ -86,7 +86,7 @@ struct PollInterface { /** * \brief Polls for new messages * - * Same as the other overload of RoundRobinPollStrategy::poll but the provided + * Same as the other overload of PollInterface::poll but the provided * timeout will be used instead of the one configured on this Consumer. * * \param timeout The timeout to be used on this call @@ -113,7 +113,7 @@ struct PollInterface { /** * \brief Polls all assigned partitions for a batch of new messages in round-robin fashion * - * Same as the other overload of RoundRobinPollStrategy::poll_batch but the provided + * Same as the other overload of PollInterface::poll_batch but the provided * timeout will be used instead of the one configured on this Consumer. * * \param max_batch_size The maximum amount of messages expected diff --git a/include/cppkafka/utils/poll_strategy_base.h b/include/cppkafka/utils/poll_strategy_base.h index 3e17c32..0cf6d88 100644 --- a/include/cppkafka/utils/poll_strategy_base.h +++ b/include/cppkafka/utils/poll_strategy_base.h @@ -43,8 +43,8 @@ namespace cppkafka { * related (user-specific) information. */ struct QueueData { - Queue queue_; - boost::any metadata_; + Queue queue; + boost::any metadata; }; /** @@ -52,8 +52,7 @@ struct QueueData { * * \brief Base implementation of the PollInterface */ -class PollStrategyBase : public PollInterface -{ +class PollStrategyBase : public PollInterface { public: using QueueMap = std::map; @@ -99,18 +98,6 @@ protected: */ QueueData& get_consumer_queue(); - /** - * \brief Return the next queue to be processed - * - * Depending on the polling strategy, each implementation must define it's own algorithm for - * determining the next queue to poll. - * - * \param opaque Application specific data which can help determine the next queue. - * - * \return A partition queue - */ - virtual QueueData& get_next_queue(void* opaque = nullptr) = 0; - /** * \brief Reset the internal state of the queues. * diff --git a/include/cppkafka/utils/roundrobin_poll_strategy.h b/include/cppkafka/utils/roundrobin_poll_strategy.h index ff6b29c..bb91e05 100644 --- a/include/cppkafka/utils/roundrobin_poll_strategy.h +++ b/include/cppkafka/utils/roundrobin_poll_strategy.h @@ -83,8 +83,7 @@ namespace cppkafka { * the Consumer instance it owns. */ -class RoundRobinPollStrategy : public PollStrategyBase -{ +class RoundRobinPollStrategy : public PollStrategyBase { public: RoundRobinPollStrategy(Consumer& consumer); @@ -112,16 +111,13 @@ public: std::chrono::milliseconds timeout) override; protected: - /** - * \sa PollStrategyBase::get_next_queue - */ - QueueData& get_next_queue(void* opaque = nullptr) final; - /** * \sa PollStrategyBase::reset_state */ void reset_state() final; + QueueData& get_next_queue(); + private: void consume_batch(Queue& queue, MessageList& messages, diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 4ee784b..2e893a8 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -19,7 +19,7 @@ set(SOURCES utils/backoff_performer.cpp utils/backoff_committer.cpp - utils/poll_strategy_base.cpp + utils/poll_strategy_base.cpp utils/roundrobin_poll_strategy.cpp ) diff --git a/src/utils/roundrobin_poll_strategy.cpp b/src/utils/roundrobin_poll_strategy.cpp index 92f880e..5d5fc7a 100644 --- a/src/utils/roundrobin_poll_strategy.cpp +++ b/src/utils/roundrobin_poll_strategy.cpp @@ -51,20 +51,20 @@ Message RoundRobinPollStrategy::poll() { Message RoundRobinPollStrategy::poll(milliseconds timeout) { // Always give priority to group and global events - Message message = get_consumer_queue().queue_.consume(milliseconds(0)); + Message message = get_consumer_queue().queue.consume(milliseconds(0)); if (message) { return message; } size_t num_queues = get_partition_queues().size(); while (num_queues--) { //consume the next partition (non-blocking) - message = get_next_queue().queue_.consume(milliseconds(0)); + message = get_next_queue().queue.consume(milliseconds(0)); if (message) { return message; } } // We still don't have a valid message so we block on the event queue - return get_consumer_queue().queue_.consume(timeout); + return get_consumer_queue().queue.consume(timeout); } MessageList RoundRobinPollStrategy::poll_batch(size_t max_batch_size) { @@ -76,16 +76,16 @@ MessageList RoundRobinPollStrategy::poll_batch(size_t max_batch_size, millisecon ssize_t count = max_batch_size; // batch from the group event queue first (non-blocking) - consume_batch(get_consumer_queue().queue_, messages, count, milliseconds(0)); + consume_batch(get_consumer_queue().queue, messages, count, milliseconds(0)); size_t num_queues = get_partition_queues().size(); while ((count > 0) && (num_queues--)) { // batch from the next partition (non-blocking) - consume_batch(get_next_queue().queue_, messages, count, milliseconds(0)); + consume_batch(get_next_queue().queue, messages, count, milliseconds(0)); } // we still have space left in the buffer if (count > 0) { // wait on the event queue until timeout - consume_batch(get_consumer_queue().queue_, messages, count, timeout); + consume_batch(get_consumer_queue().queue, messages, count, timeout); } return messages; } @@ -93,8 +93,7 @@ MessageList RoundRobinPollStrategy::poll_batch(size_t max_batch_size, millisecon void RoundRobinPollStrategy::consume_batch(Queue& queue, MessageList& messages, ssize_t& count, - milliseconds timeout) -{ + milliseconds timeout) { MessageList queue_messages = queue.consume_batch(count, timeout); if (queue_messages.empty()) { return; @@ -111,11 +110,11 @@ void RoundRobinPollStrategy::consume_batch(Queue& queue, void RoundRobinPollStrategy::restore_forwarding() { // forward all partition queues for (const auto& toppar : get_partition_queues()) { - toppar.second.queue_.forward_to_queue(get_consumer_queue().queue_); + toppar.second.queue.forward_to_queue(get_consumer_queue().queue); } } -QueueData& RoundRobinPollStrategy::get_next_queue(void* opaque) { +QueueData& RoundRobinPollStrategy::get_next_queue() { if (get_partition_queues().empty()) { throw QueueException(RD_KAFKA_RESP_ERR__STATE); } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index c43e315..01e06ed 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -18,7 +18,7 @@ add_executable( kafka_handle_base_test.cpp producer_test.cpp consumer_test.cpp - roundrobin_poll_test.cpp + roundrobin_poll_test.cpp # Main file test_main.cpp diff --git a/tests/roundrobin_poll_test.cpp b/tests/roundrobin_poll_test.cpp index abc998e..99414dd 100644 --- a/tests/roundrobin_poll_test.cpp +++ b/tests/roundrobin_poll_test.cpp @@ -108,9 +108,7 @@ TEST_CASE("roundrobin consumer test", "[roundrobin consumer]") { // Create a consumer and subscribe to the topic PollStrategyAdapter consumer(make_consumer_config()); - TopicPartitionList partitions; - for (int i = 0; i < KAFKA_NUM_PARTITIONS; partitions.emplace_back(KAFKA_TOPICS[0], i++)); - consumer.assign(partitions); + consumer.subscribe({ KAFKA_TOPICS[0] }); consumer.add_polling_strategy(unique_ptr(new RoundRobinPollStrategy(consumer))); PollConsumerRunner runner(consumer, total_messages, KAFKA_NUM_PARTITIONS); @@ -130,10 +128,14 @@ TEST_CASE("roundrobin consumer test", "[roundrobin consumer]") { REQUIRE(runner.get_messages().size() == total_messages); // Check that we have one message from each partition in desired order - vector partition_order = make_roundrobin_partition_vector(total_messages); - + vector partition_order = make_roundrobin_partition_vector(total_messages+KAFKA_NUM_PARTITIONS); + int partition_idx; for (int i = 0; i < total_messages; ++i) { - REQUIRE(runner.get_messages()[i].get_partition() == partition_order[i+1]); + if (i == 0) { + // find first polled partition index + partition_idx = runner.get_messages()[i].get_partition(); + } + REQUIRE(runner.get_messages()[i].get_partition() == partition_order[i+partition_idx]); REQUIRE((string)runner.get_messages()[i].get_payload() == payload); } diff --git a/tests/test_utils.h b/tests/test_utils.h index 0e8d6f8..b6943e6 100644 --- a/tests/test_utils.h +++ b/tests/test_utils.h @@ -41,8 +41,7 @@ private: * \brief Specific implementation which can be used with other * util classes such as BasicConsumerDispatcher. */ -class PollStrategyAdapter : public Consumer -{ +class PollStrategyAdapter : public Consumer { public: PollStrategyAdapter(Configuration config); void add_polling_strategy(std::unique_ptr poll_strategy);