Added timeout overloads for consumer and handle classes

This commit is contained in:
Docker RHEL
2020-04-08 15:23:05 +00:00
parent 2ce0ae4a62
commit b2b0d16fee
4 changed files with 152 additions and 17 deletions

View File

@@ -185,9 +185,15 @@ KafkaHandleBase::OffsetTuple Consumer::get_offsets(const TopicPartition& topic_p
TopicPartitionList
Consumer::get_offsets_committed(const TopicPartitionList& topic_partitions) const {
return get_offsets_committed(topic_partitions, get_timeout());
}
TopicPartitionList
Consumer::get_offsets_committed(const TopicPartitionList& topic_partitions,
milliseconds timeout) const {
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
rd_kafka_resp_err_t error = rd_kafka_committed(get_handle(), topic_list_handle.get(),
static_cast<int>(get_timeout().count()));
static_cast<int>(timeout.count()));
check_error(error, topic_list_handle.get());
return convert(topic_list_handle);
}

View File

@@ -108,24 +108,40 @@ Topic KafkaHandleBase::get_topic(const string& name, TopicConfiguration config)
KafkaHandleBase::OffsetTuple
KafkaHandleBase::query_offsets(const TopicPartition& topic_partition) const {
return query_offsets(topic_partition, timeout_ms_);
}
KafkaHandleBase::OffsetTuple
KafkaHandleBase::query_offsets(const TopicPartition& topic_partition,
milliseconds timeout) const {
int64_t low;
int64_t high;
const string& topic = topic_partition.get_topic();
const int partition = topic_partition.get_partition();
const int timeout = static_cast<int>(timeout_ms_.count());
const int timeout_ms = static_cast<int>(timeout.count());
rd_kafka_resp_err_t result = rd_kafka_query_watermark_offsets(handle_.get(), topic.data(),
partition, &low, &high,
timeout);
timeout_ms);
check_error(result);
return make_tuple(low, high);
}
Metadata KafkaHandleBase::get_metadata(bool all_topics) const {
return get_metadata(all_topics, nullptr);
return get_metadata(all_topics, nullptr, timeout_ms_);
}
Metadata KafkaHandleBase::get_metadata(bool all_topics,
milliseconds timeout) const {
return get_metadata(all_topics, nullptr, timeout);
}
TopicMetadata KafkaHandleBase::get_metadata(const Topic& topic) const {
Metadata md = get_metadata(false, topic.get_handle());
return get_metadata(topic, timeout_ms_);
}
TopicMetadata KafkaHandleBase::get_metadata(const Topic& topic,
milliseconds timeout) const {
Metadata md = get_metadata(false, topic.get_handle(), timeout);
auto topics = md.get_topics();
if (topics.empty()) {
throw ElementNotFound("topic metadata", topic.get_name());
@@ -134,7 +150,12 @@ TopicMetadata KafkaHandleBase::get_metadata(const Topic& topic) const {
}
GroupInformation KafkaHandleBase::get_consumer_group(const string& name) {
auto result = fetch_consumer_groups(name.c_str());
return get_consumer_group(name, timeout_ms_);
}
GroupInformation KafkaHandleBase::get_consumer_group(const string& name,
milliseconds timeout) {
auto result = fetch_consumer_groups(name.c_str(), timeout);
if (result.empty()) {
throw ElementNotFound("consumer group information", name);
}
@@ -142,11 +163,21 @@ GroupInformation KafkaHandleBase::get_consumer_group(const string& name) {
}
vector<GroupInformation> KafkaHandleBase::get_consumer_groups() {
return fetch_consumer_groups(nullptr);
return get_consumer_groups(timeout_ms_);
}
vector<GroupInformation> KafkaHandleBase::get_consumer_groups(milliseconds timeout) {
return fetch_consumer_groups(nullptr, timeout);
}
TopicPartitionList
KafkaHandleBase::get_offsets_for_times(const TopicPartitionsTimestampsMap& queries) const {
return get_offsets_for_times(queries, timeout_ms_);
}
TopicPartitionList
KafkaHandleBase::get_offsets_for_times(const TopicPartitionsTimestampsMap& queries,
milliseconds timeout) const {
TopicPartitionList topic_partitions;
for (const auto& query : queries) {
const TopicPartition& topic_partition = query.first;
@@ -154,9 +185,9 @@ KafkaHandleBase::get_offsets_for_times(const TopicPartitionsTimestampsMap& queri
query.second.count());
}
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
const int timeout = static_cast<int>(timeout_ms_.count());
const int timeout_ms = static_cast<int>(timeout.count());
rd_kafka_resp_err_t result = rd_kafka_offsets_for_times(handle_.get(), topic_list_handle.get(),
timeout);
timeout_ms);
check_error(result, topic_list_handle.get());
return convert(topic_list_handle);
}
@@ -193,19 +224,22 @@ Topic KafkaHandleBase::get_topic(const string& name, rd_kafka_topic_conf_t* conf
return Topic(topic);
}
Metadata KafkaHandleBase::get_metadata(bool all_topics, rd_kafka_topic_t* topic_ptr) const {
Metadata KafkaHandleBase::get_metadata(bool all_topics,
rd_kafka_topic_t* topic_ptr,
milliseconds timeout) const {
const rd_kafka_metadata_t* metadata;
const int timeout = static_cast<int>(timeout_ms_.count());
const int timeout_ms = static_cast<int>(timeout.count());
rd_kafka_resp_err_t error = rd_kafka_metadata(get_handle(), !!all_topics,
topic_ptr, &metadata, timeout);
topic_ptr, &metadata, timeout_ms);
check_error(error);
return Metadata(metadata);
}
vector<GroupInformation> KafkaHandleBase::fetch_consumer_groups(const char* name) {
vector<GroupInformation> KafkaHandleBase::fetch_consumer_groups(const char* name,
milliseconds timeout) {
const rd_kafka_group_list* list = nullptr;
const int timeout = static_cast<int>(timeout_ms_.count());
auto result = rd_kafka_list_groups(get_handle(), name, &list, timeout);
const int timeout_ms = static_cast<int>(timeout.count());
auto result = rd_kafka_list_groups(get_handle(), name, &list, timeout_ms);
check_error(result);
// Wrap this in a unique_ptr so it gets auto deleted