From 532d83b225d96a1c5d841f52d8938d8eeecac294 Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Tue, 1 May 2018 15:12:03 -0400 Subject: [PATCH] Removed reserve() and synced consumer::poll_batch and queue::poll_batch functions --- src/consumer.cpp | 14 ++++---------- src/queue.cpp | 11 ++++------- src/utils/roundrobin_poll_adapter.cpp | 1 - tests/consumer_test.cpp | 4 ++-- tests/test_utils.cpp | 2 +- 5 files changed, 11 insertions(+), 21 deletions(-) diff --git a/src/consumer.cpp b/src/consumer.cpp index fe79b2a..dfa55a5 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -235,11 +235,11 @@ Message Consumer::poll(milliseconds timeout) { return rd_kafka_consumer_poll(get_handle(), static_cast(timeout.count())); } -vector Consumer::poll_batch(size_t max_batch_size) { +MessageList Consumer::poll_batch(size_t max_batch_size) { return poll_batch(max_batch_size, get_timeout()); } -vector Consumer::poll_batch(size_t max_batch_size, milliseconds timeout) { +MessageList Consumer::poll_batch(size_t max_batch_size, milliseconds timeout) { vector raw_messages(max_batch_size); rd_kafka_queue_t* queue = rd_kafka_queue_get_consumer(get_handle()); ssize_t result = rd_kafka_consume_batch_queue(queue, timeout.count(), raw_messages.data(), @@ -247,15 +247,9 @@ vector Consumer::poll_batch(size_t max_batch_size, milliseconds timeout if (result == -1) { check_error(rd_kafka_last_error()); // on the off-chance that check_error() does not throw an error - result = 0; + return MessageList(); } - vector output; - raw_messages.resize(result); - output.reserve(result); - for (const auto ptr : raw_messages) { - output.emplace_back(ptr); - } - return output; + return MessageList(raw_messages.begin(), raw_messages.end()); } Queue Consumer::get_main_queue() const { diff --git a/src/queue.cpp b/src/queue.cpp index 410a84b..a15f266 100644 --- a/src/queue.cpp +++ b/src/queue.cpp @@ -99,11 +99,11 @@ MessageList Queue::consume_batch(size_t max_batch_size) const { } MessageList Queue::consume_batch(size_t max_batch_size, milliseconds timeout) const { - vector raw_message_list(max_batch_size); + vector raw_messages(max_batch_size); ssize_t num_messages = rd_kafka_consume_batch_queue(handle_.get(), static_cast(timeout.count()), - raw_message_list.data(), - max_batch_size); + raw_messages.data(), + raw_messages.size()); if (num_messages == -1) { rd_kafka_resp_err_t error = rd_kafka_last_error(); if (error != RD_KAFKA_RESP_ERR_NO_ERROR) { @@ -112,10 +112,7 @@ MessageList Queue::consume_batch(size_t max_batch_size, milliseconds timeout) co return MessageList(); } // Build message list - MessageList messages; - messages.reserve(raw_message_list.size()); - messages.assign(raw_message_list.begin(), raw_message_list.end()); - return messages; + return MessageList(raw_messages.begin(), raw_messages.end()); } } //cppkafka diff --git a/src/utils/roundrobin_poll_adapter.cpp b/src/utils/roundrobin_poll_adapter.cpp index 2df7e7d..10b2d7b 100644 --- a/src/utils/roundrobin_poll_adapter.cpp +++ b/src/utils/roundrobin_poll_adapter.cpp @@ -124,7 +124,6 @@ void RoundRobinPollAdapter::consume_batch(MessageList& messages, ssize_t& count, return; } // concatenate both lists - messages.reserve(messages.size() + partition_messages.size()); messages.insert(messages.end(), make_move_iterator(partition_messages.begin()), make_move_iterator(partition_messages.end())); diff --git a/tests/consumer_test.cpp b/tests/consumer_test.cpp index 87592ae..550c61f 100644 --- a/tests/consumer_test.cpp +++ b/tests/consumer_test.cpp @@ -228,10 +228,10 @@ TEST_CASE("consume batch", "[consumer]") { producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload)); producer.flush(); - vector all_messages; + MessageList all_messages; int i = 0; while (i < 5 && all_messages.size() != 2) { - vector messages = consumer.poll_batch(2); + MessageList messages = consumer.poll_batch(2); all_messages.insert(all_messages.end(), make_move_iterator(messages.begin()), make_move_iterator(messages.end())); ++i; diff --git a/tests/test_utils.cpp b/tests/test_utils.cpp index 8eff92a..5370929 100644 --- a/tests/test_utils.cpp +++ b/tests/test_utils.cpp @@ -79,7 +79,7 @@ ConsumerRunner::~ConsumerRunner() { try_join(); } -const vector& ConsumerRunner::get_messages() const { +const MessageList& ConsumerRunner::get_messages() const { return messages_; }