From 8dc94869fd9cdc10edae4b9ea6b1e41a146ca3a1 Mon Sep 17 00:00:00 2001 From: Matias Fontanini Date: Sat, 27 Oct 2018 09:25:07 -0700 Subject: [PATCH] Move get_queue behavior into Queue class --- include/cppkafka/consumer.h | 3 +-- include/cppkafka/queue.h | 13 ++++++++++++- src/consumer.cpp | 19 +++++-------------- src/queue.cpp | 9 +++++++++ 4 files changed, 27 insertions(+), 17 deletions(-) diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h index 65bbbaa..1be75c8 100644 --- a/include/cppkafka/consumer.h +++ b/include/cppkafka/consumer.h @@ -461,7 +461,6 @@ public: private: static void rebalance_proxy(rd_kafka_t *handle, rd_kafka_resp_err_t error, rd_kafka_topic_partition_list_t *partitions, void *opaque); - static Queue get_queue(rd_kafka_queue_t* handle); void close(); void commit(const Message& msg, bool async); void commit(const TopicPartitionList* topic_partitions, bool async); @@ -485,7 +484,7 @@ std::vector Consumer::poll_batch(size_t max_batch_size, const Allocator& alloc) { std::vector raw_messages(max_batch_size); // Note that this will leak the queue when using rdkafka < 0.11.5 (see get_queue comment) - Queue queue(get_queue(rd_kafka_queue_get_consumer(get_handle()))); + Queue queue = Queue::make_queue(rd_kafka_queue_get_consumer(get_handle())); ssize_t result = rd_kafka_consume_batch_queue(queue.get_handle(), timeout.count(), raw_messages.data(), diff --git a/include/cppkafka/queue.h b/include/cppkafka/queue.h index 10f4b39..b509efc 100644 --- a/include/cppkafka/queue.h +++ b/include/cppkafka/queue.h @@ -52,7 +52,18 @@ public: * \param handle The handle to be used */ static Queue make_non_owning(rd_kafka_queue_t* handle); - + + /** + * \brieef Creates a Queue object out of a handle. + * + * This will check what the rdkafka version is and will return either an owned + * queue handle or a non owned one, depending on whether the current version + * is >= RD_KAFKA_QUEUE_REFCOUNT_BUG_VERSION (see macros.h) + * + * \param handle The handle to be used + */ + static Queue make_queue(rd_kafka_queue_t* handle); + /** * \brief Constructs an empty queue * diff --git a/src/consumer.cpp b/src/consumer.cpp index 9ef4189..81fc04e 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -49,15 +49,6 @@ using std::allocator; namespace cppkafka { -Queue Consumer::get_queue(rd_kafka_queue_t* handle) { - if (rd_kafka_version() <= RD_KAFKA_QUEUE_REFCOUNT_BUG_VERSION) { - return Queue::make_non_owning(handle); - } - else { - return Queue(handle); - } -} - void Consumer::rebalance_proxy(rd_kafka_t*, rd_kafka_resp_err_t error, rd_kafka_topic_partition_list_t *partitions, void *opaque) { TopicPartitionList list = convert(partitions); @@ -265,19 +256,19 @@ std::vector Consumer::poll_batch(size_t max_batch_size, milliseconds ti } Queue Consumer::get_main_queue() const { - Queue queue(get_queue(rd_kafka_queue_get_main(get_handle()))); + Queue queue = Queue::make_queue(rd_kafka_queue_get_main(get_handle())); queue.disable_queue_forwarding(); return queue; } Queue Consumer::get_consumer_queue() const { - return get_queue(rd_kafka_queue_get_consumer(get_handle())); + return Queue::make_queue(rd_kafka_queue_get_consumer(get_handle())); } Queue Consumer::get_partition_queue(const TopicPartition& partition) const { - Queue queue(get_queue(rd_kafka_queue_get_partition(get_handle(), - partition.get_topic().c_str(), - partition.get_partition()))); + Queue queue = Queue::make_queue(rd_kafka_queue_get_partition(get_handle(), + partition.get_topic().c_str(), + partition.get_partition())); queue.disable_queue_forwarding(); return queue; } diff --git a/src/queue.cpp b/src/queue.cpp index a00d8de..875bd1b 100644 --- a/src/queue.cpp +++ b/src/queue.cpp @@ -46,6 +46,15 @@ Queue Queue::make_non_owning(rd_kafka_queue_t* handle) { return Queue(handle, NonOwningTag{}); } +Queue Queue::make_queue(rd_kafka_queue_t* handle) { + if (rd_kafka_version() <= RD_KAFKA_QUEUE_REFCOUNT_BUG_VERSION) { + return Queue::make_non_owning(handle); + } + else { + return Queue(handle); + } +} + Queue::Queue() : handle_(nullptr, nullptr), timeout_ms_(DEFAULT_TIMEOUT) {