Add get/query offset wrappers

Matias Fontanini
2016-06-05 18:14:46 -07:00
parent 65a60f1690
commit 7708e945b7
6 changed files with 72 additions and 18 deletions


@@ -41,10 +41,13 @@ public:
     void commit(const TopicPartitionList& topic_partitions);
     void async_commit(const TopicPartitionList& topic_partitions);
-    TopicPartitionList get_offsets_committed(const TopicPartitionList& topic_partitions);
-    TopicPartitionList get_offsets_position(const TopicPartitionList& topic_partitions);
-    TopicPartitionList get_subscription();
-    TopicPartitionList get_assignment();
+    OffsetTuple get_offsets(const std::string& topic, int partition) const;
+    TopicPartitionList get_offsets_committed(const TopicPartitionList& topic_partitions) const;
+    TopicPartitionList get_offsets_position(const TopicPartitionList& topic_partitions) const;
+    TopicPartitionList get_subscription() const;
+    TopicPartitionList get_assignment() const;
+    std::string get_member_id() const;
     Message poll();
 private:
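
For context, a minimal usage sketch of the consumer-side wrappers declared above. The helper name, include path, cppkafka namespace, and the idea of an already-configured, already-subscribed consumer are assumptions for illustration; only get_offsets, get_member_id, and OffsetTuple come from this commit.

#include <cstdint>
#include <iostream>
#include <string>
#include <tuple>
#include "consumer.h" // include path is an assumption for this sketch

// Hypothetical helper: print the watermark offsets and group member id for a
// consumer that has already been configured and subscribed elsewhere.
void show_offsets(const cppkafka::Consumer& consumer,
                  const std::string& topic, int partition) {
    int64_t low;
    int64_t high;
    // get_offsets wraps rd_kafka_get_watermark_offsets: it returns the last
    // watermarks known locally to the client, without a broker round-trip.
    std::tie(low, high) = consumer.get_offsets(topic, partition);
    std::cout << topic << "/" << partition
              << " watermarks: [" << low << ", " << high << ")\n";

    // get_member_id wraps rd_kafka_memberid.
    std::cout << "group member id: " << consumer.get_member_id() << "\n";
}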


@@ -6,6 +6,7 @@
 #include <chrono>
 #include <unordered_map>
 #include <mutex>
+#include <tuple>
 #include <librdkafka/rdkafka.h>
 #include "metadata.h"
 #include "topic_partition.h"
@@ -19,6 +20,8 @@ class Topic;
 class KafkaHandleBase {
 public:
+    using OffsetTuple = std::tuple<int64_t, int64_t>;
+
     virtual ~KafkaHandleBase() = default;
     KafkaHandleBase(const KafkaHandleBase&) = delete;
     KafkaHandleBase(KafkaHandleBase&&) = delete;
@@ -30,18 +33,21 @@ public:
     void set_timeout(const std::chrono::milliseconds& timeout);
 
-    rd_kafka_t* get_handle();
+    OffsetTuple query_offsets(const std::string& topic, int partition) const;
+
+    rd_kafka_t* get_handle() const;
     Topic get_topic(const std::string& name);
     Topic get_topic(const std::string& name, TopicConfiguration config);
-    Metadata get_metadata();
-    Metadata get_metadata(const Topic& topic);
+    Metadata get_metadata() const;
+    Metadata get_metadata(const Topic& topic) const;
+    std::string get_name() const;
     std::chrono::milliseconds get_timeout() const;
     const Configuration& get_configuration() const;
 protected:
     KafkaHandleBase(Configuration config);
     void set_handle(rd_kafka_t* handle);
-    void check_error(rd_kafka_resp_err_t error);
+    void check_error(rd_kafka_resp_err_t error) const;
     rd_kafka_conf_t* get_configuration_handle();
 private:
     static const std::chrono::milliseconds DEFAULT_TIMEOUT;
@@ -50,7 +56,7 @@ private:
     using TopicConfigurationMap = std::unordered_map<std::string, TopicConfiguration>;
     Topic get_topic(const std::string& name, rd_kafka_topic_conf_t* conf);
-    Metadata get_metadata(rd_kafka_topic_t* topic_ptr);
+    Metadata get_metadata(rd_kafka_topic_t* topic_ptr) const;
     void save_topic_config(const std::string& topic_name, TopicConfiguration config);
     HandlePtr handle_;


@@ -6,6 +6,7 @@
 using std::vector;
 using std::string;
 using std::move;
+using std::make_tuple;
 using std::chrono::milliseconds;
@@ -96,7 +97,17 @@ void Consumer::async_commit(const TopicPartitionList& topic_partitions) {
     commit(topic_partitions, true);
 }
 
-TopicPartitionList Consumer::get_offsets_committed(const TopicPartitionList& topic_partitions) {
+KafkaHandleBase::OffsetTuple Consumer::get_offsets(const string& topic, int partition) const {
+    int64_t low;
+    int64_t high;
+    rd_kafka_resp_err_t result = rd_kafka_get_watermark_offsets(get_handle(), topic.data(),
+                                                                partition, &low, &high);
+    check_error(result);
+    return make_tuple(low, high);
+}
+
+TopicPartitionList
+Consumer::get_offsets_committed(const TopicPartitionList& topic_partitions) const {
     TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
     rd_kafka_resp_err_t error = rd_kafka_committed(get_handle(), topic_list_handle.get(),
                                                    get_timeout().count());
@@ -104,14 +115,15 @@ TopicPartitionList Consumer::get_offsets_committed(const TopicPartitionList& top
     return convert(topic_list_handle);
 }
 
-TopicPartitionList Consumer::get_offsets_position(const TopicPartitionList& topic_partitions) {
+TopicPartitionList
+Consumer::get_offsets_position(const TopicPartitionList& topic_partitions) const {
     TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
     rd_kafka_resp_err_t error = rd_kafka_position(get_handle(), topic_list_handle.get());
     check_error(error);
     return convert(topic_list_handle);
 }
 
-TopicPartitionList Consumer::get_subscription() {
+TopicPartitionList Consumer::get_subscription() const {
     rd_kafka_resp_err_t error;
     rd_kafka_topic_partition_list_t* list = nullptr;
     error = rd_kafka_subscription(get_handle(), &list);
@@ -119,7 +131,7 @@ TopicPartitionList Consumer::get_subscription() {
     return convert(make_handle(list));
 }
 
-TopicPartitionList Consumer::get_assignment() {
+TopicPartitionList Consumer::get_assignment() const {
     rd_kafka_resp_err_t error;
     rd_kafka_topic_partition_list_t* list = nullptr;
     error = rd_kafka_assignment(get_handle(), &list);
@@ -127,6 +139,10 @@ TopicPartitionList Consumer::get_assignment() {
     return convert(make_handle(list));
 }
 
+string Consumer::get_member_id() const {
+    return rd_kafka_memberid(get_handle());
+}
+
 Message Consumer::poll() {
     rd_kafka_message_t* message = rd_kafka_consumer_poll(get_handle(),
                                                          get_timeout().count());


@@ -6,6 +6,7 @@
 using std::string;
 using std::vector;
 using std::move;
+using std::make_tuple;
 using std::lock_guard;
 using std::mutex;
 using std::chrono::milliseconds;
@@ -42,7 +43,7 @@ void KafkaHandleBase::set_timeout(const milliseconds& timeout) {
     timeout_ms_ = timeout;
 }
 
-rd_kafka_t* KafkaHandleBase::get_handle() {
+rd_kafka_t* KafkaHandleBase::get_handle() const {
     return handle_.get();
 }
@@ -57,14 +58,29 @@ Topic KafkaHandleBase::get_topic(const string& name, TopicConfiguration config)
     return get_topic(name, rd_kafka_topic_conf_dup(handle));
 }
 
-Metadata KafkaHandleBase::get_metadata() {
+KafkaHandleBase::OffsetTuple KafkaHandleBase::query_offsets(const string& topic,
+                                                            int partition) const {
+    int64_t low;
+    int64_t high;
+    rd_kafka_resp_err_t result = rd_kafka_query_watermark_offsets(handle_.get(), topic.data(),
+                                                                  partition, &low, &high,
+                                                                  timeout_ms_.count());
+    check_error(result);
+    return make_tuple(low, high);
+}
+
+Metadata KafkaHandleBase::get_metadata() const {
     return get_metadata(nullptr);
 }
 
-Metadata KafkaHandleBase::get_metadata(const Topic& topic) {
+Metadata KafkaHandleBase::get_metadata(const Topic& topic) const {
     return get_metadata(topic.get_handle());
 }
 
+string KafkaHandleBase::get_name() const {
+    return rd_kafka_name(handle_.get());
+}
+
 milliseconds KafkaHandleBase::get_timeout() const {
     return timeout_ms_;
 }
@@ -85,7 +101,7 @@ Topic KafkaHandleBase::get_topic(const string& name, rd_kafka_topic_conf_t* conf
     return Topic(topic);
 }
 
-Metadata KafkaHandleBase::get_metadata(rd_kafka_topic_t* topic_ptr) {
+Metadata KafkaHandleBase::get_metadata(rd_kafka_topic_t* topic_ptr) const {
     const rd_kafka_metadata_t* metadata;
     rd_kafka_resp_err_t error = rd_kafka_metadata(get_handle(), topic_ptr != nullptr,
                                                   topic_ptr, &metadata, timeout_ms_.count());
@@ -99,7 +115,7 @@ void KafkaHandleBase::save_topic_config(const string& topic_name, TopicConfigura
     iter->second.set_as_opaque();
 }
 
-void KafkaHandleBase::check_error(rd_kafka_resp_err_t error) {
+void KafkaHandleBase::check_error(rd_kafka_resp_err_t error) const {
     if (error != RD_KAFKA_RESP_ERR_NO_ERROR) {
         throw HandleException(error);
     }
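
By contrast with Consumer::get_offsets above, query_offsets (implemented earlier in this file) wraps rd_kafka_query_watermark_offsets, so it asks the broker for the watermarks and blocks up to the handle's configured timeout. Below is a minimal sketch of calling it through a Producer, mirroring the producer test further down; the helper name, include path, and namespace are assumptions, not part of this commit.

#include <cstdint>
#include <iostream>
#include <string>
#include <tuple>
#include "producer.h" // include path is an assumption for this sketch

// Hypothetical helper: query the broker for the current watermark offsets of a
// partition through any KafkaHandleBase subclass (a Producer here).
void print_broker_watermarks(const cppkafka::Producer& producer,
                             const std::string& topic, int partition) {
    int64_t low;
    int64_t high;
    // query_offsets performs a broker request and honors the timeout set via
    // set_timeout on the handle.
    std::tie(low, high) = producer.query_offsets(topic, partition);
    std::cout << topic << "/" << partition
              << " broker watermarks: [" << low << ", " << high << ")\n";
}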


@@ -14,6 +14,7 @@ using std::string;
 using std::thread;
 using std::set;
 using std::mutex;
+using std::tie;
 using std::condition_variable;
 using std::lock_guard;
 using std::unique_lock;
@@ -131,6 +132,12 @@ TEST_F(ConsumerTest, AssignmentCallback) {
     assignment = consumer.get_assignment();
     EXPECT_EQ(3, assignment.size());
+
+    int64_t low;
+    int64_t high;
+    tie(low, high) = consumer.get_offsets(KAFKA_TOPIC, partition);
+    EXPECT_GT(high, low);
+    EXPECT_EQ(high, runner.get_messages().back().get_offset() + 1);
 }
 
 TEST_F(ConsumerTest, Rebalance) {


@@ -10,6 +10,7 @@
 using std::string;
 using std::to_string;
 using std::set;
+using std::tie;
 using std::move;
 using std::thread;
 using std::mutex;
@@ -124,6 +125,11 @@ TEST_F(ProducerTest, OneMessageOnFixedPartition) {
     EXPECT_EQ(KAFKA_TOPIC, message.get_topic());
     EXPECT_EQ(partition, message.get_partition());
     EXPECT_EQ(0, message.get_error());
+
+    int64_t low;
+    int64_t high;
+    tie(low, high) = producer.query_offsets(KAFKA_TOPIC, partition);
+    EXPECT_GT(high, low);
 }
 
 TEST_F(ProducerTest, OneMessageUsingKey) {