From 577bbb02422df90e0af4a9712b51dd6cfc4f8bc0 Mon Sep 17 00:00:00 2001 From: Alex Damian Date: Tue, 26 Jun 2018 11:57:49 -0400 Subject: [PATCH] added error check for partition list (#90) --- include/cppkafka/kafka_handle_base.h | 2 ++ src/consumer.cpp | 32 ++++++++++++++++++---------- src/kafka_handle_base.cpp | 21 +++++++++++++++--- 3 files changed, 41 insertions(+), 14 deletions(-) diff --git a/include/cppkafka/kafka_handle_base.h b/include/cppkafka/kafka_handle_base.h index 334c6be..2cc2376 100644 --- a/include/cppkafka/kafka_handle_base.h +++ b/include/cppkafka/kafka_handle_base.h @@ -253,6 +253,8 @@ protected: void set_handle(rd_kafka_t* handle); void check_error(rd_kafka_resp_err_t error) const; + void check_error(rd_kafka_resp_err_t error, + const rd_kafka_topic_partition_list_t* list_ptr) const; rd_kafka_conf_t* get_configuration_handle(); private: static const std::chrono::milliseconds DEFAULT_TIMEOUT; diff --git a/src/consumer.cpp b/src/consumer.cpp index 9a308c6..d35f453 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -121,11 +121,16 @@ void Consumer::unsubscribe() { } void Consumer::assign(const TopicPartitionList& topic_partitions) { - TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); - // If the list is empty, then we need to use a null pointer - auto handle = topic_partitions.empty() ? nullptr : topic_list_handle.get(); - rd_kafka_resp_err_t error = rd_kafka_assign(get_handle(), handle); - check_error(error); + rd_kafka_resp_err_t error; + if (topic_partitions.empty()) { + error = rd_kafka_assign(get_handle(), nullptr); + check_error(error); + } + else { + TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); + error = rd_kafka_assign(get_handle(), topic_list_handle.get()); + check_error(error, topic_list_handle.get()); + } } void Consumer::unassign() { @@ -181,7 +186,7 @@ Consumer::get_offsets_committed(const TopicPartitionList& topic_partitions) cons TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); rd_kafka_resp_err_t error = rd_kafka_committed(get_handle(), topic_list_handle.get(), static_cast(get_timeout().count())); - check_error(error); + check_error(error, topic_list_handle.get()); return convert(topic_list_handle); } @@ -189,7 +194,7 @@ TopicPartitionList Consumer::get_offsets_position(const TopicPartitionList& topic_partitions) const { TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); rd_kafka_resp_err_t error = rd_kafka_position(get_handle(), topic_list_handle.get()); - check_error(error); + check_error(error, topic_list_handle.get()); return convert(topic_list_handle); } @@ -287,10 +292,15 @@ void Consumer::commit(const Message& msg, bool async) { void Consumer::commit(const TopicPartitionList* topic_partitions, bool async) { rd_kafka_resp_err_t error; - error = rd_kafka_commit(get_handle(), - !topic_partitions ? nullptr : convert(*topic_partitions).get(), - async ? 1 : 0); - check_error(error); + if (topic_partitions == nullptr) { + error = rd_kafka_commit(get_handle(), nullptr, async ? 1 : 0); + check_error(error); + } + else { + TopicPartitionsListPtr topic_list_handle = convert(*topic_partitions); + error = rd_kafka_commit(get_handle(), topic_list_handle.get(), async ? 1 : 0); + check_error(error, topic_list_handle.get()); + } } void Consumer::handle_rebalance(rd_kafka_resp_err_t error, diff --git a/src/kafka_handle_base.cpp b/src/kafka_handle_base.cpp index c3daba1..284b10f 100644 --- a/src/kafka_handle_base.cpp +++ b/src/kafka_handle_base.cpp @@ -61,7 +61,7 @@ void KafkaHandleBase::pause_partitions(const TopicPartitionList& topic_partition TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); rd_kafka_resp_err_t error = rd_kafka_pause_partitions(get_handle(), topic_list_handle.get()); - check_error(error); + check_error(error, topic_list_handle.get()); } void KafkaHandleBase::pause(const std::string& topic) { @@ -72,7 +72,7 @@ void KafkaHandleBase::resume_partitions(const TopicPartitionList& topic_partitio TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); rd_kafka_resp_err_t error = rd_kafka_resume_partitions(get_handle(), topic_list_handle.get()); - check_error(error); + check_error(error, topic_list_handle.get()); } void KafkaHandleBase::resume(const std::string& topic) { @@ -153,7 +153,7 @@ KafkaHandleBase::get_offsets_for_times(const TopicPartitionsTimestampsMap& queri const int timeout = static_cast(timeout_ms_.count()); rd_kafka_resp_err_t result = rd_kafka_offsets_for_times(handle_.get(), topic_list_handle.get(), timeout); - check_error(result); + check_error(result, topic_list_handle.get()); return convert(topic_list_handle); } @@ -228,6 +228,21 @@ void KafkaHandleBase::check_error(rd_kafka_resp_err_t error) const { } } +void KafkaHandleBase::check_error(rd_kafka_resp_err_t error, + const rd_kafka_topic_partition_list_t* list_ptr) const { + if (error != RD_KAFKA_RESP_ERR_NO_ERROR) { + throw HandleException(error); + } + if (list_ptr) { + //check if any partition has errors + for (int i = 0; i < list_ptr->cnt; ++i) { + if (list_ptr->elems[i].err != RD_KAFKA_RESP_ERR_NO_ERROR) { + throw HandleException(error); + } + } + } +} + rd_kafka_conf_t* KafkaHandleBase::get_configuration_handle() { return config_.get_handle(); }