Add KafkaHandleBase::get_offsets_for_times

This commit is contained in:
Matias Fontanini
2017-04-15 19:21:51 -07:00
parent c9f3b0c5bc
commit 8143c5b06a
6 changed files with 54 additions and 0 deletions

1
.gitignore vendored
View File

@@ -1 +1,2 @@
build
include/cppkafka/config.h

View File

@@ -34,6 +34,18 @@ include_directories(${Boost_INCLUDE_DIRS})
find_package(RdKafka REQUIRED)
include_directories(${RDKAFKA_INCLUDE_DIR})
if (HAVE_OFFSETS_FOR_TIMES)
message(STATUS "Enabling support for KafkaHandleBase::get_offsets_for_times")
set(CPPKAFKA_HAVE_OFFSET_FOR_TIMES ON)
else()
message(STATUS "Disabling support for KafkaHandleBase::get_offsets_for_times")
endif()
# Configuration file
configure_file(
"${PROJECT_SOURCE_DIR}/include/cppkafka/config.h.in"
"${PROJECT_SOURCE_DIR}/include/cppkafka/config.h"
)
add_subdirectory(src)
add_subdirectory(include)

View File

@@ -24,6 +24,7 @@ include(CheckFunctionExists)
set(CMAKE_REQUIRED_LIBRARIES ${RDKAFKA_LIBRARY})
check_function_exists(rd_kafka_committed HAVE_VALID_KAFKA_VERSION)
check_function_exists(rd_kafka_offsets_for_times HAVE_OFFSETS_FOR_TIMES)
set(CMAKE_REQUIRED_LIBRARIES)
if (HAVE_VALID_KAFKA_VERSION)

View File

@@ -0,0 +1,6 @@
#ifndef CPPKAFKA_CONFIG_H
#define CPPKAFKA_CONFIG_H
#cmakedefine CPPKAFKA_HAVE_OFFSET_FOR_TIMES
#endif // CPPKAFKA_CONFIG_H

View File

@@ -34,14 +34,17 @@
#include <memory>
#include <chrono>
#include <unordered_map>
#include <map>
#include <mutex>
#include <tuple>
#include <chrono>
#include <librdkafka/rdkafka.h>
#include "topic_partition.h"
#include "topic_partition_list.h"
#include "topic_configuration.h"
#include "configuration.h"
#include "macros.h"
#include "config.h"
namespace cppkafka {
@@ -55,6 +58,7 @@ class TopicMetadata;
class CPPKAFKA_API KafkaHandleBase {
public:
using OffsetTuple = std::tuple<int64_t, int64_t>;
using TopicPartitionsTimestampsMap = std::map<TopicPartition, std::chrono::milliseconds>;
virtual ~KafkaHandleBase() = default;
KafkaHandleBase(const KafkaHandleBase&) = delete;
@@ -152,6 +156,19 @@ public:
*/
TopicMetadata get_metadata(const Topic& topic) const;
#ifdef CPPKAFKA_HAVE_OFFSET_FOR_TIMES
/**
* \brief Gets topic/partition offsets based on timestamps
*
* This translates into a call to rd_kafka_offsets_for_times
*
* \param queries A map from topic/partition to the timestamp to be used
*/
TopicPartitionList get_offsets_for_times(const TopicPartitionsTimestampsMap& queries) const;
#endif // CPPKAFKA_HAVE_OFFSET_FOR_TIMES
/**
* Returns the kafka handle name
*/

View File

@@ -119,6 +119,23 @@ TopicMetadata KafkaHandleBase::get_metadata(const Topic& topic) const {
return topics.front();
}
#ifdef CPPKAFKA_HAVE_OFFSET_FOR_TIMES
TopicPartitionList
KafkaHandleBase::get_offsets_for_times(const TopicPartitionsTimestampsMap& queries) const {
TopicPartitionList topic_partitions;
for (const auto& query : queries) {
const TopicPartition& topic_partition = query.first;
topic_partitions.emplace_back(topic_partition.get_topic(), topic_partition.get_partition(),
query.second.count());
}
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
rd_kafka_resp_err_t result = rd_kafka_offsets_for_times(handle_.get(), topic_list_handle.get(),
timeout_ms_.count());
check_error(result);
return convert(topic_list_handle);
}
#endif // CPPKAFKA_HAVE_OFFSET_FOR_TIMES
string KafkaHandleBase::get_name() const {
return rd_kafka_name(handle_.get());
}