diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h index 117d035..5e6ab72 100644 --- a/include/cppkafka/consumer.h +++ b/include/cppkafka/consumer.h @@ -64,14 +64,14 @@ class TopicConfiguration; * Consumer consumer(config); * * // Set the assignment callback - * consumer.set_assignment_callback([&](vector& topic_partitions) { + * consumer.set_assignment_callback([&](TopicPartitionList& topic_partitions) { * // Here you could fetch offsets and do something, altering the offsets on the * // topic_partitions vector if needed * cout << "Got assigned " << topic_partitions.size() << " partitions!" << endl; * }); * * // Set the revocation callback - * consumer.set_revocation_callback([&](const vector& topic_partitions) { + * consumer.set_revocation_callback([&](const TopicPartitionList& topic_partitions) { * cout << topic_partitions.size() << " partitions revoked!" << endl; * }); * @@ -126,7 +126,7 @@ public: * \brief Sets the topic/partition assignment callback * * The Consumer class will use rd_kafka_conf_set_rebalance_cb and will handle the - * rebalance, converting from rdkafka topic partition list handles into vector + * rebalance, converting from rdkafka topic partition list handles into TopicPartitionList * and executing the assignment/revocation/rebalance_error callbacks. * * \note You *do not need* to call Consumer::assign with the provided topic parttitions. This @@ -140,7 +140,7 @@ public: * \brief Sets the topic/partition revocation callback * * The Consumer class will use rd_kafka_conf_set_rebalance_cb and will handle the - * rebalance, converting from rdkafka topic partition list handles into vector + * rebalance, converting from rdkafka topic partition list handles into TopicPartitionList * and executing the assignment/revocation/rebalance_error callbacks. * * \note You *do not need* to call Consumer::assign with an empty topic partition list or @@ -155,7 +155,7 @@ public: * \brief Sets the rebalance error callback * * The Consumer class will use rd_kafka_conf_set_rebalance_cb and will handle the - * rebalance, converting from rdkafka topic partition list handles into vector + * rebalance, converting from rdkafka topic partition list handles into TopicPartitionList * and executing the assignment/revocation/rebalance_error callbacks. * * \param callback The rebalance error callback @@ -192,6 +192,24 @@ public: * parameter */ void unassign(); + + /** + * \brief Commits the current partition assignment + * + * This translates into a call to rd_kafka_commit with a null partition list. + * + * \remark This function is equivalent to calling commit(get_assignment()) + */ + void commit(); + + /** + * \brief Commits the current partition assignment asynchronously + * + * This translates into a call to rd_kafka_commit with a null partition list. + * + * \remark This function is equivalent to calling async_commit(get_assignment()) + */ + void async_commit(); /** * \brief Commits the given message synchronously @@ -349,7 +367,7 @@ private: void close(); void commit(const Message& msg, bool async); - void commit(const TopicPartitionList& topic_partitions, bool async); + void commit(const TopicPartitionList* topic_partitions, bool async); void handle_rebalance(rd_kafka_resp_err_t err, TopicPartitionList& topic_partitions); AssignmentCallback assignment_callback_; diff --git a/include/cppkafka/message.h b/include/cppkafka/message.h index 9f84284..5866124 100644 --- a/include/cppkafka/message.h +++ b/include/cppkafka/message.h @@ -33,6 +33,7 @@ #include #include #include +#include #include #include #include "buffer.h" @@ -83,6 +84,7 @@ public: * Gets the error attribute */ Error get_error() const { + assert(handle_); return handle_->err; } @@ -97,6 +99,7 @@ public: * Gets the topic that this message belongs to */ std::string get_topic() const { + assert(handle_); return rd_kafka_topic_name(handle_->rkt); } @@ -104,6 +107,7 @@ public: * Gets the partition that this message belongs to */ int get_partition() const { + assert(handle_); return handle_->partition; } @@ -125,6 +129,7 @@ public: * Gets the message offset */ int64_t get_offset() const { + assert(handle_); return handle_->offset; } @@ -135,6 +140,7 @@ public: * attribute */ void* get_user_data() const { + assert(handle_); return handle_->_private; } diff --git a/include/cppkafka/topic_partition_list.h b/include/cppkafka/topic_partition_list.h index 4e704d8..06f703c 100644 --- a/include/cppkafka/topic_partition_list.h +++ b/include/cppkafka/topic_partition_list.h @@ -49,9 +49,9 @@ using TopicPartitionsListPtr = std::unique_ptr; // Conversions between rdkafka handles and TopicPartitionList -CPPKAFKA_API TopicPartitionsListPtr convert(const std::vector& topic_partitions); -CPPKAFKA_API std::vector convert(const TopicPartitionsListPtr& topic_partitions); -CPPKAFKA_API std::vector convert(rd_kafka_topic_partition_list_t* topic_partitions); +CPPKAFKA_API TopicPartitionsListPtr convert(const TopicPartitionList& topic_partitions); +CPPKAFKA_API TopicPartitionList convert(const TopicPartitionsListPtr& topic_partitions); +CPPKAFKA_API TopicPartitionList convert(rd_kafka_topic_partition_list_t* topic_partitions); CPPKAFKA_API TopicPartitionsListPtr make_handle(rd_kafka_topic_partition_list_t* handle); CPPKAFKA_API std::ostream& operator<<(std::ostream& output, const TopicPartitionList& rhs); diff --git a/src/consumer.cpp b/src/consumer.cpp index b9bc67c..9333e7b 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -116,6 +116,14 @@ void Consumer::unassign() { check_error(error); } +void Consumer::commit() { + commit(nullptr, false); +} + +void Consumer::async_commit() { + commit(nullptr, true); +} + void Consumer::commit(const Message& msg) { commit(msg, false); } @@ -125,11 +133,11 @@ void Consumer::async_commit(const Message& msg) { } void Consumer::commit(const TopicPartitionList& topic_partitions) { - commit(topic_partitions, false); + commit(&topic_partitions, false); } void Consumer::async_commit(const TopicPartitionList& topic_partitions) { - commit(topic_partitions, true); + commit(&topic_partitions, true); } KafkaHandleBase::OffsetTuple Consumer::get_offsets(const TopicPartition& topic_partition) const { @@ -238,15 +246,15 @@ void Consumer::close() { void Consumer::commit(const Message& msg, bool async) { rd_kafka_resp_err_t error; - error = rd_kafka_commit_message(get_handle(), msg.get_handle(), - async ? 1 : 0); + error = rd_kafka_commit_message(get_handle(), msg.get_handle(), async ? 1 : 0); check_error(error); } -void Consumer::commit(const TopicPartitionList& topic_partitions, bool async) { - TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); +void Consumer::commit(const TopicPartitionList* topic_partitions, bool async) { rd_kafka_resp_err_t error; - error = rd_kafka_commit(get_handle(), topic_list_handle.get(), async ? 1 : 0); + error = rd_kafka_commit(get_handle(), + !topic_partitions ? nullptr : convert(*topic_partitions).get(), + async ? 1 : 0); check_error(error); } diff --git a/src/message.cpp b/src/message.cpp index a9236bc..a52a0a7 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -41,7 +41,7 @@ Message Message::make_non_owning(rd_kafka_message_t* handle) { return Message(handle, NonOwningTag()); } -Message::Message() +Message::Message() : handle_(nullptr, nullptr) { } diff --git a/src/topic_partition_list.cpp b/src/topic_partition_list.cpp index 4971b8a..26a0c99 100644 --- a/src/topic_partition_list.cpp +++ b/src/topic_partition_list.cpp @@ -37,7 +37,7 @@ using std::ostream; namespace cppkafka { -TopicPartitionsListPtr convert(const vector& topic_partitions) { +TopicPartitionsListPtr convert(const TopicPartitionList& topic_partitions) { TopicPartitionsListPtr handle(rd_kafka_topic_partition_list_new(topic_partitions.size()), &rd_kafka_topic_partition_list_destroy); for (const auto& item : topic_partitions) { @@ -50,12 +50,12 @@ TopicPartitionsListPtr convert(const vector& topic_partitions) { return handle; } -vector convert(const TopicPartitionsListPtr& topic_partitions) { +TopicPartitionList convert(const TopicPartitionsListPtr& topic_partitions) { return convert(topic_partitions.get()); } -vector convert(rd_kafka_topic_partition_list_t* topic_partitions) { - vector output; +TopicPartitionList convert(rd_kafka_topic_partition_list_t* topic_partitions) { + TopicPartitionList output; for (int i = 0; i < topic_partitions->cnt; ++i) { const auto& elem = topic_partitions->elems[i]; output.emplace_back(elem.topic, elem.partition, elem.offset); diff --git a/tests/consumer_test.cpp b/tests/consumer_test.cpp index a64de5e..a41dd8c 100644 --- a/tests/consumer_test.cpp +++ b/tests/consumer_test.cpp @@ -51,12 +51,12 @@ public: const string ConsumerTest::KAFKA_TOPIC = "cppkafka_test1"; TEST_F(ConsumerTest, AssignmentCallback) { - vector assignment; + TopicPartitionList assignment; int partition = 0; // Create a consumer and subscribe to the topic Consumer consumer(make_consumer_config()); - consumer.set_assignment_callback([&](const vector& topic_partitions) { + consumer.set_assignment_callback([&](const TopicPartitionList& topic_partitions) { assignment = topic_partitions; }); consumer.subscribe({ KAFKA_TOPIC }); @@ -90,17 +90,17 @@ TEST_F(ConsumerTest, AssignmentCallback) { } TEST_F(ConsumerTest, Rebalance) { - vector assignment1; - vector assignment2; + TopicPartitionList assignment1; + TopicPartitionList assignment2; bool revocation_called = false; int partition = 0; // Create a consumer and subscribe to the topic Consumer consumer1(make_consumer_config()); - consumer1.set_assignment_callback([&](const vector& topic_partitions) { + consumer1.set_assignment_callback([&](const TopicPartitionList& topic_partitions) { assignment1 = topic_partitions; }); - consumer1.set_revocation_callback([&](const vector&) { + consumer1.set_revocation_callback([&](const TopicPartitionList&) { revocation_called = true; }); consumer1.subscribe({ KAFKA_TOPIC }); @@ -108,7 +108,7 @@ TEST_F(ConsumerTest, Rebalance) { // Create a second consumer and subscribe to the topic Consumer consumer2(make_consumer_config()); - consumer2.set_assignment_callback([&](const vector& topic_partitions) { + consumer2.set_assignment_callback([&](const TopicPartitionList& topic_partitions) { assignment2 = topic_partitions; }); consumer2.subscribe({ KAFKA_TOPIC }); diff --git a/tests/kafka_handle_base_test.cpp b/tests/kafka_handle_base_test.cpp index bd81bb7..acba3da 100644 --- a/tests/kafka_handle_base_test.cpp +++ b/tests/kafka_handle_base_test.cpp @@ -126,12 +126,12 @@ TEST_F(KafkaHandleBaseTest, ConsumerGroups) { MemberAssignmentInformation assignment = member.get_member_assignment(); EXPECT_EQ(0, assignment.get_version()); - vector expected_topic_partitions = { + TopicPartitionList expected_topic_partitions = { { KAFKA_TOPIC, 0 }, { KAFKA_TOPIC, 1 }, { KAFKA_TOPIC, 2 } }; - vector topic_partitions = assignment.get_topic_partitions(); + TopicPartitionList topic_partitions = assignment.get_topic_partitions(); sort(topic_partitions.begin(), topic_partitions.end()); EXPECT_EQ(expected_topic_partitions, topic_partitions); /*for (const auto c : ) {