From e73c997a0c994b822fb0fdf92db0d21ba9badf3e Mon Sep 17 00:00:00 2001 From: Matias Fontanini Date: Sun, 21 Oct 2018 11:32:52 -0700 Subject: [PATCH] Allow getting Events out of Queues --- include/cppkafka/queue.h | 21 +++++++++++++++++++-- src/queue.cpp | 13 ++++++++++--- 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/include/cppkafka/queue.h b/include/cppkafka/queue.h index 72c238a..10f4b39 100644 --- a/include/cppkafka/queue.h +++ b/include/cppkafka/queue.h @@ -31,6 +31,7 @@ #include #include #include +#include "event.h" #include "macros.h" #include "message.h" @@ -130,7 +131,7 @@ public: * \return A message */ Message consume(std::chrono::milliseconds timeout) const; - + /** * \brief Consumes a batch of messages from this queue * @@ -188,7 +189,23 @@ public: */ std::vector consume_batch(size_t max_batch_size, std::chrono::milliseconds timeout) const; - + + /** + * \brief Extracts the next message in this Queue + * + * /return The latest event, if any + */ + Event next_event() const; + + /** + * \brief Extracts the next message in this Queue + * + * \param timeout The amount of time to wait for this operation to complete + * + * /return The latest event, if any + */ + Event next_event(std::chrono::milliseconds timeout) const; + /** * Indicates whether this queue is valid (not null) */ diff --git a/src/queue.cpp b/src/queue.cpp index 909fd76..a00d8de 100644 --- a/src/queue.cpp +++ b/src/queue.cpp @@ -95,13 +95,20 @@ Message Queue::consume(milliseconds timeout) const { return Message(rd_kafka_consume_queue(handle_.get(), static_cast(timeout.count()))); } -std::vector Queue::consume_batch(size_t max_batch_size) const { +vector Queue::consume_batch(size_t max_batch_size) const { return consume_batch(max_batch_size, timeout_ms_, allocator()); } -std::vector Queue::consume_batch(size_t max_batch_size, - milliseconds timeout) const { +vector Queue::consume_batch(size_t max_batch_size, milliseconds timeout) const { return consume_batch(max_batch_size, timeout, allocator()); } +Event Queue::next_event() const { + return next_event(timeout_ms_); +} + +Event Queue::next_event(milliseconds timeout) const { + return Event(rd_kafka_queue_poll(handle_.get(), timeout.count())); +} + } //cppkafka