Merge pull request #241 from accelerated/timeout-overloads

Added timeout overloads for consumer and handle classes
This commit is contained in:
Matias Fontanini
2020-04-08 08:28:26 -07:00
committed by GitHub
4 changed files with 152 additions and 17 deletions

View File

@@ -280,6 +280,20 @@ public:
* \return The topic partition list * \return The topic partition list
*/ */
TopicPartitionList get_offsets_committed(const TopicPartitionList& topic_partitions) const; TopicPartitionList get_offsets_committed(const TopicPartitionList& topic_partitions) const;
/**
* \brief Gets the offsets committed for the given topic/partition list with a timeout
*
* This translates into a call to rd_kafka_committed
*
* \param topic_partitions The topic/partition list to be queried
*
* \param timeout The timeout for this operation. Supersedes the default consumer timeout.
*
* \return The topic partition list
*/
TopicPartitionList get_offsets_committed(const TopicPartitionList& topic_partitions,
std::chrono::milliseconds timeout) const;
/** /**
* \brief Gets the offset positions for the given topic/partition list * \brief Gets the offset positions for the given topic/partition list

View File

@@ -134,6 +134,20 @@ public:
* \return A pair of watermark offsets {low, high} * \return A pair of watermark offsets {low, high}
*/ */
OffsetTuple query_offsets(const TopicPartition& topic_partition) const; OffsetTuple query_offsets(const TopicPartition& topic_partition) const;
/**
* \brief Queries the offset for the given topic/partition with a given timeout
*
* This translates into a call to rd_kafka_query_watermark_offsets
*
* \param topic_partition The topic/partition to be queried
*
 * \param timeout The timeout for this operation. This supersedes the default handle timeout.
*
* \return A pair of watermark offsets {low, high}
*/
OffsetTuple query_offsets(const TopicPartition& topic_partition,
std::chrono::milliseconds timeout) const;
/** /**
* \brief Gets the rdkafka handle * \brief Gets the rdkafka handle
@@ -177,6 +191,20 @@ public:
* \return The metadata * \return The metadata
*/ */
Metadata get_metadata(bool all_topics = true) const; Metadata get_metadata(bool all_topics = true) const;
/**
 * \brief Gets metadata for brokers, topics, partitions, etc. with a timeout
*
* This translates into a call to rd_kafka_metadata
*
* \param all_topics Whether to fetch metadata about all topics or only locally known ones
*
* \param timeout The timeout for this operation. Supersedes the default handle timeout.
*
* \return The metadata
*/
Metadata get_metadata(bool all_topics,
std::chrono::milliseconds timeout) const;
/** /**
* \brief Gets general metadata but only fetches metadata for the given topic rather than * \brief Gets general metadata but only fetches metadata for the given topic rather than
@@ -189,6 +217,21 @@ public:
* \return The topic metadata * \return The topic metadata
*/ */
TopicMetadata get_metadata(const Topic& topic) const; TopicMetadata get_metadata(const Topic& topic) const;
/**
* \brief Gets general metadata but only fetches metadata for the given topic rather than
* all of them. Uses a timeout to limit the operation execution time.
*
* This translates into a call to rd_kafka_metadata
*
* \param topic The topic to fetch information for
*
* \param timeout The timeout for this operation. Supersedes the default handle timeout.
*
* \return The topic metadata
*/
TopicMetadata get_metadata(const Topic& topic,
std::chrono::milliseconds timeout) const;
/** /**
* \brief Gets the consumer group information * \brief Gets the consumer group information
@@ -198,6 +241,18 @@ public:
* \return The group information * \return The group information
*/ */
GroupInformation get_consumer_group(const std::string& name); GroupInformation get_consumer_group(const std::string& name);
/**
* \brief Gets the consumer group information with a timeout
*
* \param name The name of the consumer group to look up
*
* \param timeout The timeout for this operation. Supersedes the default handle timeout.
*
* \return The group information
*/
GroupInformation get_consumer_group(const std::string& name,
std::chrono::milliseconds timeout);
/** /**
* \brief Gets all consumer groups * \brief Gets all consumer groups
@@ -205,6 +260,15 @@ public:
* \return A list of consumer groups * \return A list of consumer groups
*/ */
GroupInformationList get_consumer_groups(); GroupInformationList get_consumer_groups();
/**
* \brief Gets all consumer groups with a timeout
*
* \param timeout The timeout for this operation. Supersedes the default handle timeout.
*
* \return A list of consumer groups
*/
GroupInformationList get_consumer_groups(std::chrono::milliseconds timeout);
/** /**
* \brief Gets topic/partition offsets based on timestamps * \brief Gets topic/partition offsets based on timestamps
@@ -216,6 +280,20 @@ public:
* \return A topic partition list * \return A topic partition list
*/ */
TopicPartitionList get_offsets_for_times(const TopicPartitionsTimestampsMap& queries) const; TopicPartitionList get_offsets_for_times(const TopicPartitionsTimestampsMap& queries) const;
/**
* \brief Gets topic/partition offsets based on timestamps with a timeout
*
* This translates into a call to rd_kafka_offsets_for_times
*
* \param queries A map from topic/partition to the timestamp to be used
*
* \param timeout The timeout for this operation. This supersedes the default handle timeout.
*
* \return A topic partition list
*/
TopicPartitionList get_offsets_for_times(const TopicPartitionsTimestampsMap& queries,
std::chrono::milliseconds timeout) const;
/** /**
* \brief Get the kafka handle name * \brief Get the kafka handle name
@@ -283,8 +361,11 @@ private:
using TopicConfigurationMap = std::unordered_map<std::string, TopicConfiguration>; using TopicConfigurationMap = std::unordered_map<std::string, TopicConfiguration>;
Topic get_topic(const std::string& name, rd_kafka_topic_conf_t* conf); Topic get_topic(const std::string& name, rd_kafka_topic_conf_t* conf);
Metadata get_metadata(bool all_topics, rd_kafka_topic_t* topic_ptr) const; Metadata get_metadata(bool all_topics,
GroupInformationList fetch_consumer_groups(const char* name); rd_kafka_topic_t* topic_ptr,
std::chrono::milliseconds timeout) const;
GroupInformationList fetch_consumer_groups(const char* name,
std::chrono::milliseconds timeout);
void save_topic_config(const std::string& topic_name, TopicConfiguration config); void save_topic_config(const std::string& topic_name, TopicConfiguration config);
std::chrono::milliseconds timeout_ms_; std::chrono::milliseconds timeout_ms_;

View File

@@ -185,9 +185,15 @@ KafkaHandleBase::OffsetTuple Consumer::get_offsets(const TopicPartition& topic_p
TopicPartitionList TopicPartitionList
Consumer::get_offsets_committed(const TopicPartitionList& topic_partitions) const { Consumer::get_offsets_committed(const TopicPartitionList& topic_partitions) const {
return get_offsets_committed(topic_partitions, get_timeout());
}
TopicPartitionList
Consumer::get_offsets_committed(const TopicPartitionList& topic_partitions,
milliseconds timeout) const {
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
rd_kafka_resp_err_t error = rd_kafka_committed(get_handle(), topic_list_handle.get(), rd_kafka_resp_err_t error = rd_kafka_committed(get_handle(), topic_list_handle.get(),
static_cast<int>(get_timeout().count())); static_cast<int>(timeout.count()));
check_error(error, topic_list_handle.get()); check_error(error, topic_list_handle.get());
return convert(topic_list_handle); return convert(topic_list_handle);
} }

View File

@@ -108,24 +108,40 @@ Topic KafkaHandleBase::get_topic(const string& name, TopicConfiguration config)
KafkaHandleBase::OffsetTuple KafkaHandleBase::OffsetTuple
KafkaHandleBase::query_offsets(const TopicPartition& topic_partition) const { KafkaHandleBase::query_offsets(const TopicPartition& topic_partition) const {
return query_offsets(topic_partition, timeout_ms_);
}
KafkaHandleBase::OffsetTuple
KafkaHandleBase::query_offsets(const TopicPartition& topic_partition,
milliseconds timeout) const {
int64_t low; int64_t low;
int64_t high; int64_t high;
const string& topic = topic_partition.get_topic(); const string& topic = topic_partition.get_topic();
const int partition = topic_partition.get_partition(); const int partition = topic_partition.get_partition();
const int timeout = static_cast<int>(timeout_ms_.count()); const int timeout_ms = static_cast<int>(timeout.count());
rd_kafka_resp_err_t result = rd_kafka_query_watermark_offsets(handle_.get(), topic.data(), rd_kafka_resp_err_t result = rd_kafka_query_watermark_offsets(handle_.get(), topic.data(),
partition, &low, &high, partition, &low, &high,
timeout); timeout_ms);
check_error(result); check_error(result);
return make_tuple(low, high); return make_tuple(low, high);
} }
Metadata KafkaHandleBase::get_metadata(bool all_topics) const { Metadata KafkaHandleBase::get_metadata(bool all_topics) const {
return get_metadata(all_topics, nullptr); return get_metadata(all_topics, nullptr, timeout_ms_);
}
Metadata KafkaHandleBase::get_metadata(bool all_topics,
milliseconds timeout) const {
return get_metadata(all_topics, nullptr, timeout);
} }
TopicMetadata KafkaHandleBase::get_metadata(const Topic& topic) const { TopicMetadata KafkaHandleBase::get_metadata(const Topic& topic) const {
Metadata md = get_metadata(false, topic.get_handle()); return get_metadata(topic, timeout_ms_);
}
TopicMetadata KafkaHandleBase::get_metadata(const Topic& topic,
milliseconds timeout) const {
Metadata md = get_metadata(false, topic.get_handle(), timeout);
auto topics = md.get_topics(); auto topics = md.get_topics();
if (topics.empty()) { if (topics.empty()) {
throw ElementNotFound("topic metadata", topic.get_name()); throw ElementNotFound("topic metadata", topic.get_name());
@@ -134,7 +150,12 @@ TopicMetadata KafkaHandleBase::get_metadata(const Topic& topic) const {
} }
GroupInformation KafkaHandleBase::get_consumer_group(const string& name) { GroupInformation KafkaHandleBase::get_consumer_group(const string& name) {
auto result = fetch_consumer_groups(name.c_str()); return get_consumer_group(name, timeout_ms_);
}
GroupInformation KafkaHandleBase::get_consumer_group(const string& name,
milliseconds timeout) {
auto result = fetch_consumer_groups(name.c_str(), timeout);
if (result.empty()) { if (result.empty()) {
throw ElementNotFound("consumer group information", name); throw ElementNotFound("consumer group information", name);
} }
@@ -142,11 +163,21 @@ GroupInformation KafkaHandleBase::get_consumer_group(const string& name) {
} }
vector<GroupInformation> KafkaHandleBase::get_consumer_groups() { vector<GroupInformation> KafkaHandleBase::get_consumer_groups() {
return fetch_consumer_groups(nullptr); return get_consumer_groups(timeout_ms_);
}
vector<GroupInformation> KafkaHandleBase::get_consumer_groups(milliseconds timeout) {
return fetch_consumer_groups(nullptr, timeout);
} }
TopicPartitionList TopicPartitionList
KafkaHandleBase::get_offsets_for_times(const TopicPartitionsTimestampsMap& queries) const { KafkaHandleBase::get_offsets_for_times(const TopicPartitionsTimestampsMap& queries) const {
return get_offsets_for_times(queries, timeout_ms_);
}
TopicPartitionList
KafkaHandleBase::get_offsets_for_times(const TopicPartitionsTimestampsMap& queries,
milliseconds timeout) const {
TopicPartitionList topic_partitions; TopicPartitionList topic_partitions;
for (const auto& query : queries) { for (const auto& query : queries) {
const TopicPartition& topic_partition = query.first; const TopicPartition& topic_partition = query.first;
@@ -154,9 +185,9 @@ KafkaHandleBase::get_offsets_for_times(const TopicPartitionsTimestampsMap& queri
query.second.count()); query.second.count());
} }
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
const int timeout = static_cast<int>(timeout_ms_.count()); const int timeout_ms = static_cast<int>(timeout.count());
rd_kafka_resp_err_t result = rd_kafka_offsets_for_times(handle_.get(), topic_list_handle.get(), rd_kafka_resp_err_t result = rd_kafka_offsets_for_times(handle_.get(), topic_list_handle.get(),
timeout); timeout_ms);
check_error(result, topic_list_handle.get()); check_error(result, topic_list_handle.get());
return convert(topic_list_handle); return convert(topic_list_handle);
} }
@@ -193,19 +224,22 @@ Topic KafkaHandleBase::get_topic(const string& name, rd_kafka_topic_conf_t* conf
return Topic(topic); return Topic(topic);
} }
Metadata KafkaHandleBase::get_metadata(bool all_topics, rd_kafka_topic_t* topic_ptr) const { Metadata KafkaHandleBase::get_metadata(bool all_topics,
rd_kafka_topic_t* topic_ptr,
milliseconds timeout) const {
const rd_kafka_metadata_t* metadata; const rd_kafka_metadata_t* metadata;
const int timeout = static_cast<int>(timeout_ms_.count()); const int timeout_ms = static_cast<int>(timeout.count());
rd_kafka_resp_err_t error = rd_kafka_metadata(get_handle(), !!all_topics, rd_kafka_resp_err_t error = rd_kafka_metadata(get_handle(), !!all_topics,
topic_ptr, &metadata, timeout); topic_ptr, &metadata, timeout_ms);
check_error(error); check_error(error);
return Metadata(metadata); return Metadata(metadata);
} }
vector<GroupInformation> KafkaHandleBase::fetch_consumer_groups(const char* name) { vector<GroupInformation> KafkaHandleBase::fetch_consumer_groups(const char* name,
milliseconds timeout) {
const rd_kafka_group_list* list = nullptr; const rd_kafka_group_list* list = nullptr;
const int timeout = static_cast<int>(timeout_ms_.count()); const int timeout_ms = static_cast<int>(timeout.count());
auto result = rd_kafka_list_groups(get_handle(), name, &list, timeout); auto result = rd_kafka_list_groups(get_handle(), name, &list, timeout_ms);
check_error(result); check_error(result);
// Wrap this in a unique_ptr so it gets auto deleted // Wrap this in a unique_ptr so it gets auto deleted