Allow getting Events out of Queues

This commit is contained in:
Matias Fontanini
2018-10-21 11:32:52 -07:00
parent b46991db7e
commit e73c997a0c
2 changed files with 29 additions and 5 deletions

View File

@@ -31,6 +31,7 @@
#include <memory>
#include <boost/optional.hpp>
#include <librdkafka/rdkafka.h>
#include "event.h"
#include "macros.h"
#include "message.h"
@@ -130,7 +131,7 @@ public:
* \return A message
*/
Message consume(std::chrono::milliseconds timeout) const;
/**
* \brief Consumes a batch of messages from this queue
*
@@ -188,7 +189,23 @@ public:
*/
std::vector<Message> consume_batch(size_t max_batch_size,
std::chrono::milliseconds timeout) const;
/**
* \brief Extracts the next message in this Queue
*
* /return The latest event, if any
*/
Event next_event() const;
/**
* \brief Extracts the next message in this Queue
*
* \param timeout The amount of time to wait for this operation to complete
*
* /return The latest event, if any
*/
Event next_event(std::chrono::milliseconds timeout) const;
/**
* Indicates whether this queue is valid (not null)
*/

View File

@@ -95,13 +95,20 @@ Message Queue::consume(milliseconds timeout) const {
return Message(rd_kafka_consume_queue(handle_.get(), static_cast<int>(timeout.count())));
}
std::vector<Message> Queue::consume_batch(size_t max_batch_size) const {
vector<Message> Queue::consume_batch(size_t max_batch_size) const {
return consume_batch(max_batch_size, timeout_ms_, allocator<Message>());
}
std::vector<Message> Queue::consume_batch(size_t max_batch_size,
milliseconds timeout) const {
vector<Message> Queue::consume_batch(size_t max_batch_size, milliseconds timeout) const {
return consume_batch(max_batch_size, timeout, allocator<Message>());
}
Event Queue::next_event() const {
return next_event(timeout_ms_);
}
Event Queue::next_event(milliseconds timeout) const {
return Event(rd_kafka_queue_poll(handle_.get(), timeout.count()));
}
} //cppkafka