mirror of
https://github.com/Telecominfraproject/wlan-cloud-lib-cppkafka.git
synced 2025-11-21 20:24:49 +00:00
Move get_queue behavior into Queue class
This commit is contained in:
@@ -461,7 +461,6 @@ public:
|
|||||||
private:
|
private:
|
||||||
static void rebalance_proxy(rd_kafka_t *handle, rd_kafka_resp_err_t error,
|
static void rebalance_proxy(rd_kafka_t *handle, rd_kafka_resp_err_t error,
|
||||||
rd_kafka_topic_partition_list_t *partitions, void *opaque);
|
rd_kafka_topic_partition_list_t *partitions, void *opaque);
|
||||||
static Queue get_queue(rd_kafka_queue_t* handle);
|
|
||||||
void close();
|
void close();
|
||||||
void commit(const Message& msg, bool async);
|
void commit(const Message& msg, bool async);
|
||||||
void commit(const TopicPartitionList* topic_partitions, bool async);
|
void commit(const TopicPartitionList* topic_partitions, bool async);
|
||||||
@@ -485,7 +484,7 @@ std::vector<Message, Allocator> Consumer::poll_batch(size_t max_batch_size,
|
|||||||
const Allocator& alloc) {
|
const Allocator& alloc) {
|
||||||
std::vector<rd_kafka_message_t*> raw_messages(max_batch_size);
|
std::vector<rd_kafka_message_t*> raw_messages(max_batch_size);
|
||||||
// Note that this will leak the queue when using rdkafka < 0.11.5 (see get_queue comment)
|
// Note that this will leak the queue when using rdkafka < 0.11.5 (see get_queue comment)
|
||||||
Queue queue(get_queue(rd_kafka_queue_get_consumer(get_handle())));
|
Queue queue = Queue::make_queue(rd_kafka_queue_get_consumer(get_handle()));
|
||||||
ssize_t result = rd_kafka_consume_batch_queue(queue.get_handle(),
|
ssize_t result = rd_kafka_consume_batch_queue(queue.get_handle(),
|
||||||
timeout.count(),
|
timeout.count(),
|
||||||
raw_messages.data(),
|
raw_messages.data(),
|
||||||
|
|||||||
@@ -53,6 +53,17 @@ public:
|
|||||||
*/
|
*/
|
||||||
static Queue make_non_owning(rd_kafka_queue_t* handle);
|
static Queue make_non_owning(rd_kafka_queue_t* handle);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* \brieef Creates a Queue object out of a handle.
|
||||||
|
*
|
||||||
|
* This will check what the rdkafka version is and will return either an owned
|
||||||
|
* queue handle or a non owned one, depending on whether the current version
|
||||||
|
* is >= RD_KAFKA_QUEUE_REFCOUNT_BUG_VERSION (see macros.h)
|
||||||
|
*
|
||||||
|
* \param handle The handle to be used
|
||||||
|
*/
|
||||||
|
static Queue make_queue(rd_kafka_queue_t* handle);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* \brief Constructs an empty queue
|
* \brief Constructs an empty queue
|
||||||
*
|
*
|
||||||
|
|||||||
@@ -49,15 +49,6 @@ using std::allocator;
|
|||||||
|
|
||||||
namespace cppkafka {
|
namespace cppkafka {
|
||||||
|
|
||||||
Queue Consumer::get_queue(rd_kafka_queue_t* handle) {
|
|
||||||
if (rd_kafka_version() <= RD_KAFKA_QUEUE_REFCOUNT_BUG_VERSION) {
|
|
||||||
return Queue::make_non_owning(handle);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
return Queue(handle);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void Consumer::rebalance_proxy(rd_kafka_t*, rd_kafka_resp_err_t error,
|
void Consumer::rebalance_proxy(rd_kafka_t*, rd_kafka_resp_err_t error,
|
||||||
rd_kafka_topic_partition_list_t *partitions, void *opaque) {
|
rd_kafka_topic_partition_list_t *partitions, void *opaque) {
|
||||||
TopicPartitionList list = convert(partitions);
|
TopicPartitionList list = convert(partitions);
|
||||||
@@ -265,19 +256,19 @@ std::vector<Message> Consumer::poll_batch(size_t max_batch_size, milliseconds ti
|
|||||||
}
|
}
|
||||||
|
|
||||||
Queue Consumer::get_main_queue() const {
|
Queue Consumer::get_main_queue() const {
|
||||||
Queue queue(get_queue(rd_kafka_queue_get_main(get_handle())));
|
Queue queue = Queue::make_queue(rd_kafka_queue_get_main(get_handle()));
|
||||||
queue.disable_queue_forwarding();
|
queue.disable_queue_forwarding();
|
||||||
return queue;
|
return queue;
|
||||||
}
|
}
|
||||||
|
|
||||||
Queue Consumer::get_consumer_queue() const {
|
Queue Consumer::get_consumer_queue() const {
|
||||||
return get_queue(rd_kafka_queue_get_consumer(get_handle()));
|
return Queue::make_queue(rd_kafka_queue_get_consumer(get_handle()));
|
||||||
}
|
}
|
||||||
|
|
||||||
Queue Consumer::get_partition_queue(const TopicPartition& partition) const {
|
Queue Consumer::get_partition_queue(const TopicPartition& partition) const {
|
||||||
Queue queue(get_queue(rd_kafka_queue_get_partition(get_handle(),
|
Queue queue = Queue::make_queue(rd_kafka_queue_get_partition(get_handle(),
|
||||||
partition.get_topic().c_str(),
|
partition.get_topic().c_str(),
|
||||||
partition.get_partition())));
|
partition.get_partition()));
|
||||||
queue.disable_queue_forwarding();
|
queue.disable_queue_forwarding();
|
||||||
return queue;
|
return queue;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -46,6 +46,15 @@ Queue Queue::make_non_owning(rd_kafka_queue_t* handle) {
|
|||||||
return Queue(handle, NonOwningTag{});
|
return Queue(handle, NonOwningTag{});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Queue Queue::make_queue(rd_kafka_queue_t* handle) {
|
||||||
|
if (rd_kafka_version() <= RD_KAFKA_QUEUE_REFCOUNT_BUG_VERSION) {
|
||||||
|
return Queue::make_non_owning(handle);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
return Queue(handle);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Queue::Queue()
|
Queue::Queue()
|
||||||
: handle_(nullptr, nullptr),
|
: handle_(nullptr, nullptr),
|
||||||
timeout_ms_(DEFAULT_TIMEOUT) {
|
timeout_ms_(DEFAULT_TIMEOUT) {
|
||||||
|
|||||||
Reference in New Issue
Block a user