mirror of
				https://github.com/Telecominfraproject/wlan-cloud-lib-cppkafka.git
				synced 2025-11-03 20:18:06 +00:00 
			
		
		
		
	Removed reserve() and synced consumer::poll_batch and queue::poll_batch functions
This commit is contained in:
		@@ -235,11 +235,11 @@ Message Consumer::poll(milliseconds timeout) {
 | 
			
		||||
    return rd_kafka_consumer_poll(get_handle(), static_cast<int>(timeout.count()));
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
vector<Message> Consumer::poll_batch(size_t max_batch_size) {
 | 
			
		||||
MessageList Consumer::poll_batch(size_t max_batch_size) {
 | 
			
		||||
    return poll_batch(max_batch_size, get_timeout());
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
vector<Message> Consumer::poll_batch(size_t max_batch_size, milliseconds timeout) {
 | 
			
		||||
MessageList Consumer::poll_batch(size_t max_batch_size, milliseconds timeout) {
 | 
			
		||||
    vector<rd_kafka_message_t*> raw_messages(max_batch_size);
 | 
			
		||||
    rd_kafka_queue_t* queue = rd_kafka_queue_get_consumer(get_handle());
 | 
			
		||||
    ssize_t result = rd_kafka_consume_batch_queue(queue, timeout.count(), raw_messages.data(),
 | 
			
		||||
@@ -247,15 +247,9 @@ vector<Message> Consumer::poll_batch(size_t max_batch_size, milliseconds timeout
 | 
			
		||||
    if (result == -1) {
 | 
			
		||||
        check_error(rd_kafka_last_error());
 | 
			
		||||
        // on the off-chance that check_error() does not throw an error
 | 
			
		||||
        result = 0;
 | 
			
		||||
        return MessageList();
 | 
			
		||||
    }
 | 
			
		||||
    vector<Message> output;
 | 
			
		||||
    raw_messages.resize(result);
 | 
			
		||||
    output.reserve(result);
 | 
			
		||||
    for (const auto ptr : raw_messages) {
 | 
			
		||||
        output.emplace_back(ptr);
 | 
			
		||||
    }
 | 
			
		||||
    return output;
 | 
			
		||||
    return MessageList(raw_messages.begin(), raw_messages.end());
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
Queue Consumer::get_main_queue() const {
 | 
			
		||||
 
 | 
			
		||||
@@ -99,11 +99,11 @@ MessageList Queue::consume_batch(size_t max_batch_size) const {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
MessageList Queue::consume_batch(size_t max_batch_size, milliseconds timeout) const {
 | 
			
		||||
    vector<rd_kafka_message_t*> raw_message_list(max_batch_size);
 | 
			
		||||
    vector<rd_kafka_message_t*> raw_messages(max_batch_size);
 | 
			
		||||
    ssize_t num_messages = rd_kafka_consume_batch_queue(handle_.get(),
 | 
			
		||||
                                                        static_cast<int>(timeout.count()),
 | 
			
		||||
                                                        raw_message_list.data(),
 | 
			
		||||
                                                        max_batch_size);
 | 
			
		||||
                                                        raw_messages.data(),
 | 
			
		||||
                                                        raw_messages.size());
 | 
			
		||||
    if (num_messages == -1) {
 | 
			
		||||
        rd_kafka_resp_err_t error = rd_kafka_last_error();
 | 
			
		||||
        if (error != RD_KAFKA_RESP_ERR_NO_ERROR) {
 | 
			
		||||
@@ -112,10 +112,7 @@ MessageList Queue::consume_batch(size_t max_batch_size, milliseconds timeout) co
 | 
			
		||||
        return MessageList();
 | 
			
		||||
    }
 | 
			
		||||
    // Build message list
 | 
			
		||||
    MessageList messages;
 | 
			
		||||
    messages.reserve(raw_message_list.size());
 | 
			
		||||
    messages.assign(raw_message_list.begin(), raw_message_list.end());
 | 
			
		||||
    return messages;
 | 
			
		||||
    return MessageList(raw_messages.begin(), raw_messages.end());
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
} //cppkafka
 | 
			
		||||
 
 | 
			
		||||
@@ -124,7 +124,6 @@ void RoundRobinPollAdapter::consume_batch(MessageList& messages, ssize_t& count,
 | 
			
		||||
        return;
 | 
			
		||||
    }
 | 
			
		||||
    // concatenate both lists
 | 
			
		||||
    messages.reserve(messages.size() + partition_messages.size());
 | 
			
		||||
    messages.insert(messages.end(),
 | 
			
		||||
                    make_move_iterator(partition_messages.begin()),
 | 
			
		||||
                    make_move_iterator(partition_messages.end()));
 | 
			
		||||
 
 | 
			
		||||
@@ -228,10 +228,10 @@ TEST_CASE("consume batch", "[consumer]") {
 | 
			
		||||
    producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
 | 
			
		||||
    producer.flush();
 | 
			
		||||
 | 
			
		||||
    vector<Message> all_messages;
 | 
			
		||||
    MessageList all_messages;
 | 
			
		||||
    int i = 0;
 | 
			
		||||
    while (i < 5 && all_messages.size() != 2) {
 | 
			
		||||
        vector<Message> messages = consumer.poll_batch(2);
 | 
			
		||||
        MessageList messages = consumer.poll_batch(2);
 | 
			
		||||
        all_messages.insert(all_messages.end(), make_move_iterator(messages.begin()),
 | 
			
		||||
                            make_move_iterator(messages.end()));
 | 
			
		||||
        ++i;
 | 
			
		||||
 
 | 
			
		||||
@@ -79,7 +79,7 @@ ConsumerRunner::~ConsumerRunner() {
 | 
			
		||||
    try_join();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
const vector<Message>& ConsumerRunner::get_messages() const {
 | 
			
		||||
const MessageList& ConsumerRunner::get_messages() const {
 | 
			
		||||
    return messages_;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user