diff --git a/.gitignore b/.gitignore index 378eac2..ddd4bc4 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ build +include/cppkafka/config.h diff --git a/CMakeLists.txt b/CMakeLists.txt index e542618..b3dd353 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -34,6 +34,18 @@ include_directories(${Boost_INCLUDE_DIRS}) find_package(RdKafka REQUIRED) include_directories(${RDKAFKA_INCLUDE_DIR}) +if (HAVE_OFFSETS_FOR_TIMES) + message(STATUS "Enabling support for KafkaHandleBase::get_offsets_for_times") + set(CPPKAFKA_HAVE_OFFSET_FOR_TIMES ON) +else() + message(STATUS "Disabling support for KafkaHandleBase::get_offsets_for_times") +endif() +# Configuration file +configure_file( + "${PROJECT_SOURCE_DIR}/include/cppkafka/config.h.in" + "${PROJECT_SOURCE_DIR}/include/cppkafka/config.h" +) + add_subdirectory(src) add_subdirectory(include) diff --git a/cmake/FindRdKafka.cmake b/cmake/FindRdKafka.cmake index 82da1ae..50182a4 100644 --- a/cmake/FindRdKafka.cmake +++ b/cmake/FindRdKafka.cmake @@ -24,6 +24,7 @@ include(CheckFunctionExists) set(CMAKE_REQUIRED_LIBRARIES ${RDKAFKA_LIBRARY}) check_function_exists(rd_kafka_committed HAVE_VALID_KAFKA_VERSION) +check_function_exists(rd_kafka_offsets_for_times HAVE_OFFSETS_FOR_TIMES) set(CMAKE_REQUIRED_LIBRARIES) if (HAVE_VALID_KAFKA_VERSION) diff --git a/include/cppkafka/config.h.in b/include/cppkafka/config.h.in new file mode 100644 index 0000000..c0d69c3 --- /dev/null +++ b/include/cppkafka/config.h.in @@ -0,0 +1,6 @@ +#ifndef CPPKAFKA_CONFIG_H +#define CPPKAFKA_CONFIG_H + +#cmakedefine CPPKAFKA_HAVE_OFFSET_FOR_TIMES + +#endif // CPPKAFKA_CONFIG_H diff --git a/include/cppkafka/kafka_handle_base.h b/include/cppkafka/kafka_handle_base.h index e1eea6e..2017b33 100644 --- a/include/cppkafka/kafka_handle_base.h +++ b/include/cppkafka/kafka_handle_base.h @@ -34,14 +34,17 @@ #include #include #include +#include #include #include +#include #include #include "topic_partition.h" #include "topic_partition_list.h" #include "topic_configuration.h" #include "configuration.h" #include "macros.h" +#include "config.h" namespace cppkafka { @@ -55,6 +58,7 @@ class TopicMetadata; class CPPKAFKA_API KafkaHandleBase { public: using OffsetTuple = std::tuple; + using TopicPartitionsTimestampsMap = std::map; virtual ~KafkaHandleBase() = default; KafkaHandleBase(const KafkaHandleBase&) = delete; @@ -152,6 +156,19 @@ public: */ TopicMetadata get_metadata(const Topic& topic) const; + #ifdef CPPKAFKA_HAVE_OFFSET_FOR_TIMES + + /** + * \brief Gets topic/partition offsets based on timestamps + * + * This translates into a call to rd_kafka_offsets_for_times + * + * \param queries A map from topic/partition to the timestamp to be used + */ + TopicPartitionList get_offsets_for_times(const TopicPartitionsTimestampsMap& queries) const; + + #endif // CPPKAFKA_HAVE_OFFSET_FOR_TIMES + /** * Returns the kafka handle name */ diff --git a/src/kafka_handle_base.cpp b/src/kafka_handle_base.cpp index f0f6730..341515f 100644 --- a/src/kafka_handle_base.cpp +++ b/src/kafka_handle_base.cpp @@ -119,6 +119,23 @@ TopicMetadata KafkaHandleBase::get_metadata(const Topic& topic) const { return topics.front(); } +#ifdef CPPKAFKA_HAVE_OFFSET_FOR_TIMES +TopicPartitionList +KafkaHandleBase::get_offsets_for_times(const TopicPartitionsTimestampsMap& queries) const { + TopicPartitionList topic_partitions; + for (const auto& query : queries) { + const TopicPartition& topic_partition = query.first; + topic_partitions.emplace_back(topic_partition.get_topic(), topic_partition.get_partition(), + query.second.count()); + } + TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); + rd_kafka_resp_err_t result = rd_kafka_offsets_for_times(handle_.get(), topic_list_handle.get(), + timeout_ms_.count()); + check_error(result); + return convert(topic_list_handle); +} +#endif // CPPKAFKA_HAVE_OFFSET_FOR_TIMES + string KafkaHandleBase::get_name() const { return rd_kafka_name(handle_.get()); }