From b2b0d16fee8f968d03862992bb69848c74a7ce6c Mon Sep 17 00:00:00 2001
From: Docker RHEL
Date: Wed, 8 Apr 2020 15:23:05 +0000
Subject: [PATCH] Added timeout overloads for consumer and handle classes

---
 include/cppkafka/consumer.h          | 14 +++
 include/cppkafka/kafka_handle_base.h | 85 +++++++++++++++++++++++++++-
 src/consumer.cpp                     |  8 ++-
 src/kafka_handle_base.cpp            | 62 +++++++++++++++-----
 4 files changed, 152 insertions(+), 17 deletions(-)

diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h
index 0415e33..a90a8b1 100644
--- a/include/cppkafka/consumer.h
+++ b/include/cppkafka/consumer.h
@@ -280,6 +280,20 @@ public:
      * \return The topic partition list
      */
     TopicPartitionList get_offsets_committed(const TopicPartitionList& topic_partitions) const;
+
+    /**
+     * \brief Gets the offsets committed for the given topic/partition list with a timeout
+     *
+     * This translates into a call to rd_kafka_committed
+     *
+     * \param topic_partitions The topic/partition list to be queried
+     *
+     * \param timeout The timeout for this operation. Supersedes the default consumer timeout.
+     *
+     * \return The topic partition list
+     */
+    TopicPartitionList get_offsets_committed(const TopicPartitionList& topic_partitions,
+                                             std::chrono::milliseconds timeout) const;
 
     /**
      * \brief Gets the offset positions for the given topic/partition list
diff --git a/include/cppkafka/kafka_handle_base.h b/include/cppkafka/kafka_handle_base.h
index c4caad3..2188c91 100644
--- a/include/cppkafka/kafka_handle_base.h
+++ b/include/cppkafka/kafka_handle_base.h
@@ -134,6 +134,20 @@ public:
      * \return A pair of watermark offsets {low, high}
      */
     OffsetTuple query_offsets(const TopicPartition& topic_partition) const;
+
+    /**
+     * \brief Queries the offset for the given topic/partition with a given timeout
+     *
+     * This translates into a call to rd_kafka_query_watermark_offsets
+     *
+     * \param topic_partition The topic/partition to be queried
+     *
+     * \param timeout The timeout for this operation. This supersedes the default handle timeout.
+     *
+     * \return A pair of watermark offsets {low, high}
+     */
+    OffsetTuple query_offsets(const TopicPartition& topic_partition,
+                              std::chrono::milliseconds timeout) const;
 
     /**
      * \brief Gets the rdkafka handle
@@ -177,6 +191,20 @@ public:
      * \return The metadata
      */
     Metadata get_metadata(bool all_topics = true) const;
+
+    /**
+     * \brief Gets metadata for brokers, topics, partitions, etc with a timeout
+     *
+     * This translates into a call to rd_kafka_metadata
+     *
+     * \param all_topics Whether to fetch metadata about all topics or only locally known ones
+     *
+     * \param timeout The timeout for this operation. Supersedes the default handle timeout.
+     *
+     * \return The metadata
+     */
+    Metadata get_metadata(bool all_topics,
+                          std::chrono::milliseconds timeout) const;
 
     /**
      * \brief Gets general metadata but only fetches metadata for the given topic rather than
@@ -189,6 +217,21 @@ public:
      * \return The topic metadata
      */
     TopicMetadata get_metadata(const Topic& topic) const;
+
+    /**
+     * \brief Gets general metadata but only fetches metadata for the given topic rather than
+     * all of them. Uses a timeout to limit the operation execution time.
+     *
+     * This translates into a call to rd_kafka_metadata
+     *
+     * \param topic The topic to fetch information for
+     *
+     * \param timeout The timeout for this operation. Supersedes the default handle timeout.
+     *
+     * \return The topic metadata
+     */
+    TopicMetadata get_metadata(const Topic& topic,
+                               std::chrono::milliseconds timeout) const;
 
     /**
      * \brief Gets the consumer group information
@@ -198,6 +241,18 @@ public:
      * \return The group information
      */
     GroupInformation get_consumer_group(const std::string& name);
+
+    /**
+     * \brief Gets the consumer group information with a timeout
+     *
+     * \param name The name of the consumer group to look up
+     *
+     * \param timeout The timeout for this operation. Supersedes the default handle timeout.
+     *
+     * \return The group information
+     */
+    GroupInformation get_consumer_group(const std::string& name,
+                                        std::chrono::milliseconds timeout);
 
     /**
      * \brief Gets all consumer groups
@@ -205,6 +260,15 @@ public:
      * \return A list of consumer groups
      */
     GroupInformationList get_consumer_groups();
+
+    /**
+     * \brief Gets all consumer groups with a timeout
+     *
+     * \param timeout The timeout for this operation. Supersedes the default handle timeout.
+     *
+     * \return A list of consumer groups
+     */
+    GroupInformationList get_consumer_groups(std::chrono::milliseconds timeout);
 
     /**
      * \brief Gets topic/partition offsets based on timestamps
@@ -216,6 +280,20 @@ public:
      * \return A topic partition list
      */
     TopicPartitionList get_offsets_for_times(const TopicPartitionsTimestampsMap& queries) const;
+
+    /**
+     * \brief Gets topic/partition offsets based on timestamps with a timeout
+     *
+     * This translates into a call to rd_kafka_offsets_for_times
+     *
+     * \param queries A map from topic/partition to the timestamp to be used
+     *
+     * \param timeout The timeout for this operation. This supersedes the default handle timeout.
+     *
+     * \return A topic partition list
+     */
+    TopicPartitionList get_offsets_for_times(const TopicPartitionsTimestampsMap& queries,
+                                             std::chrono::milliseconds timeout) const;
 
     /**
      * \brief Get the kafka handle name
@@ -283,8 +361,11 @@ private:
     using TopicConfigurationMap = std::unordered_map<std::string, TopicConfiguration>;
 
     Topic get_topic(const std::string& name, rd_kafka_topic_conf_t* conf);
-    Metadata get_metadata(bool all_topics, rd_kafka_topic_t* topic_ptr) const;
-    GroupInformationList fetch_consumer_groups(const char* name);
+    Metadata get_metadata(bool all_topics,
+                          rd_kafka_topic_t* topic_ptr,
+                          std::chrono::milliseconds timeout) const;
+    GroupInformationList fetch_consumer_groups(const char* name,
+                                               std::chrono::milliseconds timeout);
     void save_topic_config(const std::string& topic_name, TopicConfiguration config);
 
     std::chrono::milliseconds timeout_ms_;
diff --git a/src/consumer.cpp b/src/consumer.cpp
index 7c5da44..5522ca9 100644
--- a/src/consumer.cpp
+++ b/src/consumer.cpp
@@ -185,9 +185,15 @@ KafkaHandleBase::OffsetTuple Consumer::get_offsets(const TopicPartition& topic_p
 
 TopicPartitionList
 Consumer::get_offsets_committed(const TopicPartitionList& topic_partitions) const {
+    return get_offsets_committed(topic_partitions, get_timeout());
+}
+
+TopicPartitionList
+Consumer::get_offsets_committed(const TopicPartitionList& topic_partitions,
+                                milliseconds timeout) const {
     TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
     rd_kafka_resp_err_t error = rd_kafka_committed(get_handle(), topic_list_handle.get(),
-                                                   static_cast<int>(get_timeout().count()));
+                                                   static_cast<int>(timeout.count()));
     check_error(error, topic_list_handle.get());
     return convert(topic_list_handle);
 }
diff --git a/src/kafka_handle_base.cpp b/src/kafka_handle_base.cpp
index c93139c..a4196e7 100644
--- a/src/kafka_handle_base.cpp
+++ b/src/kafka_handle_base.cpp
@@ -108,24 +108,40 @@ Topic KafkaHandleBase::get_topic(const string& name, TopicConfiguration config)
 KafkaHandleBase::OffsetTuple
 KafkaHandleBase::query_offsets(const TopicPartition& topic_partition) const {
+    return query_offsets(topic_partition, timeout_ms_);
+}
+
+KafkaHandleBase::OffsetTuple
+KafkaHandleBase::query_offsets(const TopicPartition& topic_partition,
+                               milliseconds timeout) const {
     int64_t low;
     int64_t high;
     const string& topic = topic_partition.get_topic();
     const int partition = topic_partition.get_partition();
-    const int timeout = static_cast<int>(timeout_ms_.count());
+    const int timeout_ms = static_cast<int>(timeout.count());
     rd_kafka_resp_err_t result = rd_kafka_query_watermark_offsets(handle_.get(),
                                                                   topic.data(),
                                                                   partition, &low, &high,
-                                                                  timeout);
+                                                                  timeout_ms);
     check_error(result);
     return make_tuple(low, high);
 }
 
 Metadata KafkaHandleBase::get_metadata(bool all_topics) const {
-    return get_metadata(all_topics, nullptr);
+    return get_metadata(all_topics, nullptr, timeout_ms_);
+}
+
+Metadata KafkaHandleBase::get_metadata(bool all_topics,
+                                       milliseconds timeout) const {
+    return get_metadata(all_topics, nullptr, timeout);
 }
 
 TopicMetadata KafkaHandleBase::get_metadata(const Topic& topic) const {
-    Metadata md = get_metadata(false, topic.get_handle());
+    return get_metadata(topic, timeout_ms_);
+}
+
+TopicMetadata KafkaHandleBase::get_metadata(const Topic& topic,
+                                            milliseconds timeout) const {
+    Metadata md = get_metadata(false, topic.get_handle(), timeout);
     auto topics = md.get_topics();
     if (topics.empty()) {
         throw ElementNotFound("topic metadata", topic.get_name());
@@ -134,7 +150,12 @@ TopicMetadata KafkaHandleBase::get_metadata(const Topic& topic) const {
 }
 
 GroupInformation KafkaHandleBase::get_consumer_group(const string& name) {
-    auto result = fetch_consumer_groups(name.c_str());
+    return get_consumer_group(name, timeout_ms_);
+}
+
+GroupInformation KafkaHandleBase::get_consumer_group(const string& name,
+                                                     milliseconds timeout) {
+    auto result = fetch_consumer_groups(name.c_str(), timeout);
     if (result.empty()) {
         throw ElementNotFound("consumer group information", name);
     }
@@ -142,11 +163,21 @@ GroupInformation KafkaHandleBase::get_consumer_group(const string& name) {
 }
 
 vector<GroupInformation> KafkaHandleBase::get_consumer_groups() {
-    return fetch_consumer_groups(nullptr);
+    return get_consumer_groups(timeout_ms_);
+}
+
+vector<GroupInformation> KafkaHandleBase::get_consumer_groups(milliseconds timeout) {
+    return fetch_consumer_groups(nullptr, timeout);
 }
 
 TopicPartitionList
 KafkaHandleBase::get_offsets_for_times(const TopicPartitionsTimestampsMap& queries) const {
+    return get_offsets_for_times(queries, timeout_ms_);
+}
+
+TopicPartitionList
+KafkaHandleBase::get_offsets_for_times(const TopicPartitionsTimestampsMap& queries,
+                                       milliseconds timeout) const {
     TopicPartitionList topic_partitions;
     for (const auto& query : queries) {
         const TopicPartition& topic_partition = query.first;
@@ -154,9 +185,9 @@ KafkaHandleBase::get_offsets_for_times(const TopicPartitionsTimestampsMap& queri
                                        query.second.count());
     }
     TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
-    const int timeout = static_cast<int>(timeout_ms_.count());
+    const int timeout_ms = static_cast<int>(timeout.count());
     rd_kafka_resp_err_t result = rd_kafka_offsets_for_times(handle_.get(), topic_list_handle.get(),
-                                                            timeout);
+                                                            timeout_ms);
     check_error(result, topic_list_handle.get());
     return convert(topic_list_handle);
 }
@@ -193,19 +224,22 @@ Topic KafkaHandleBase::get_topic(const string& name, rd_kafka_topic_conf_t* conf
     return Topic(topic);
 }
 
-Metadata KafkaHandleBase::get_metadata(bool all_topics, rd_kafka_topic_t* topic_ptr) const {
+Metadata KafkaHandleBase::get_metadata(bool all_topics,
+                                       rd_kafka_topic_t* topic_ptr,
+                                       milliseconds timeout) const {
     const rd_kafka_metadata_t* metadata;
-    const int timeout = static_cast<int>(timeout_ms_.count());
+    const int timeout_ms = static_cast<int>(timeout.count());
     rd_kafka_resp_err_t error = rd_kafka_metadata(get_handle(), !!all_topics,
-                                                  topic_ptr, &metadata, timeout);
+                                                  topic_ptr, &metadata, timeout_ms);
     check_error(error);
     return Metadata(metadata);
 }
 
-vector<GroupInformation> KafkaHandleBase::fetch_consumer_groups(const char* name) {
+vector<GroupInformation> KafkaHandleBase::fetch_consumer_groups(const char* name,
+                                                                milliseconds timeout) {
     const rd_kafka_group_list* list = nullptr;
-    const int timeout = static_cast<int>(timeout_ms_.count());
-    auto result = rd_kafka_list_groups(get_handle(), name, &list, timeout);
+    const int timeout_ms = static_cast<int>(timeout.count());
+    auto result = rd_kafka_list_groups(get_handle(), name, &list, timeout_ms);
     check_error(result);
     // Wrap this in a unique_ptr so it gets auto deleted