Allow consuming message batches

Fixes #3
This commit is contained in:
Matias Fontanini
2017-07-17 19:17:49 -07:00
parent 2340046544
commit 1582f6156d
3 changed files with 65 additions and 0 deletions

View File

@@ -324,6 +324,25 @@ public:
* \param timeout The timeout to be used on this call * \param timeout The timeout to be used on this call
*/ */
Message poll(std::chrono::milliseconds timeout); Message poll(std::chrono::milliseconds timeout);
/**
 * \brief Polls for a batch of messages using the consumer's default timeout
 *
 * Consumes messages in bulk, returning at most max_batch_size messages
 * per call.
 *
 * \param max_batch_size The maximum amount of messages expected
 *
 * \return The polled messages
 */
std::vector<Message> poll_batch(size_t max_batch_size);
/**
 * \brief Polls for a batch of messages
 *
 * Consumes messages in bulk, returning at most max_batch_size messages
 * per call.
 *
 * \param max_batch_size The maximum amount of messages expected
 * \param timeout The timeout for this operation
 *
 * \return The polled messages
 */
std::vector<Message> poll_batch(size_t max_batch_size, std::chrono::milliseconds timeout);
private: private:
static void rebalance_proxy(rd_kafka_t *handle, rd_kafka_resp_err_t error, static void rebalance_proxy(rd_kafka_t *handle, rd_kafka_resp_err_t error,
rd_kafka_topic_partition_list_t *partitions, void *opaque); rd_kafka_topic_partition_list_t *partitions, void *opaque);

View File

@@ -197,6 +197,25 @@ Message Consumer::poll(milliseconds timeout) {
return message ? Message(message) : Message(); return message ? Message(message) : Message();
} }
// Convenience overload: polls a batch using the consumer's configured
// default timeout (get_timeout()); delegates to the two-argument overload.
vector<Message> Consumer::poll_batch(size_t max_batch_size) {
return poll_batch(max_batch_size, get_timeout());
}
// Polls the consumer's queue for up to max_batch_size messages, waiting at
// most `timeout`. Returns only the messages actually consumed, which may be
// fewer than requested (including none) if the timeout expires first.
vector<Message> Consumer::poll_batch(size_t max_batch_size, milliseconds timeout) {
    vector<rd_kafka_message_t*> raw_messages(max_batch_size);
    // rd_kafka_queue_get_consumer returns a new reference that we own and
    // must release with rd_kafka_queue_destroy to avoid leaking it.
    rd_kafka_queue_t* queue = rd_kafka_queue_get_consumer(get_handle());
    ssize_t result = rd_kafka_consume_batch_queue(queue, timeout.count(), raw_messages.data(),
                                                  raw_messages.size());
    // Release the queue reference before check_error, which may throw
    rd_kafka_queue_destroy(queue);
    if (result == -1) {
        check_error(rd_kafka_last_error());
        // check_error throws on a real error; return empty defensively
        return vector<Message>();
    }
    // Only the first `result` entries were filled in by librdkafka; the rest
    // are null and must not be wrapped into Message objects.
    vector<Message> output;
    output.reserve(static_cast<size_t>(result));
    for (ssize_t i = 0; i < result; ++i) {
        output.emplace_back(raw_messages[i]);
    }
    return output;
}
void Consumer::close() { void Consumer::close() {
rd_kafka_resp_err_t error = rd_kafka_consumer_close(get_handle()); rd_kafka_resp_err_t error = rd_kafka_consumer_close(get_handle());
check_error(error); check_error(error);

View File

@@ -210,3 +210,30 @@ TEST_F(ConsumerTest, Throttle) {
EXPECT_EQ(3, callback_executed_count); EXPECT_EQ(3, callback_executed_count);
} }
// Verifies that poll_batch returns multiple produced messages in one call.
TEST_F(ConsumerTest, ConsumeBatch) {
    int partition = 0;

    // Create a consumer and assign it the partition we will produce into
    // (use the `partition` variable consistently rather than a literal 0)
    Configuration config = make_consumer_config("test");
    Consumer consumer(config);
    consumer.assign({ { KAFKA_TOPIC, partition } });
    {
        // Run the consumer briefly so the assignment/offsets settle before
        // we produce the batch
        ConsumerRunner runner(consumer, 0, 1);
        runner.try_join();
    }

    // Produce the same payload twice so a batch of 2 is available
    BufferedProducer<string> producer(make_producer_config());
    string payload = "Hello world!";
    producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
    producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
    producer.flush();

    // Both messages should come back from a single batch poll
    vector<Message> messages = consumer.poll_batch(2);
    ASSERT_EQ(2, messages.size());
    EXPECT_EQ(payload, messages[0].get_payload());
    EXPECT_EQ(payload, messages[1].get_payload());
}