Allow getting consumer group information

Matias Fontanini
2017-04-16 19:45:03 -07:00
parent ebde747e18
commit dbb547889b
15 changed files with 544 additions and 116 deletions
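As a quick orientation before the per-file diffs, here is a minimal usage sketch of the API this commit adds. The broker address and group name are placeholders and error handling is elided; get_consumer_group() throws ElementNotFound when the group does not exist.

#include <iostream>
#include "cppkafka/consumer.h"
#include "cppkafka/group_information.h"

using namespace cppkafka;

int main() {
    Configuration config;
    config.set("metadata.broker.list", "127.0.0.1:9092"); // placeholder broker
    config.set("group.id", "example_group");              // placeholder group id
    Consumer consumer(config);

    // List every consumer group known to the cluster
    for (const GroupInformation& group : consumer.get_consumer_groups()) {
        std::cout << group.get_name() << " (" << group.get_state() << ")\n";
    }

    // Look up a single group; throws ElementNotFound if it is unknown
    GroupInformation info = consumer.get_consumer_group("example_group");
    std::cout << "protocol type: " << info.get_protocol_type() << "\n";
}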


@@ -74,6 +74,30 @@ public:
InvalidConfigOptionType(const std::string& config_name, const std::string& type);
};
/**
* Indicates that something being looked up could not be found
*/
class CPPKAFKA_API ElementNotFound : public Exception {
public:
ElementNotFound(const std::string& element_type, const std::string& name);
};
/**
* Indicates something that was incorrectly parsed
*/
class CPPKAFKA_API ParseException : public Exception {
public:
ParseException(const std::string& message);
};
/**
* Indicates that something had an unexpected version
*/
class CPPKAFKA_API UnexpectedVersion : public Exception {
public:
UnexpectedVersion(uint32_t version);
};
/**
* A generic rdkafka handle error
*/


@@ -0,0 +1,141 @@
#ifndef CPPKAFKA_GROUP_INFORMATION_H
#define CPPKAFKA_GROUP_INFORMATION_H
#include <vector>
#include <cstdint>
#include "macros.h"
#include "metadata.h"
#include "error.h"
#include "topic_partition_list.h"
namespace cppkafka {
/**
* \brief Parses the member assignment information
*
* This class parses the data in GroupMemberInformation::get_member_assignment.
*/
class CPPKAFKA_API MemberAssignmentInformation {
public:
/**
* Constructs an instance
*/
MemberAssignmentInformation(const std::vector<uint8_t>& data);
/**
* Gets the version
*/
uint16_t get_version() const;
/**
* Gets the topic/partition assignment
*/
const TopicPartitionList& get_topic_partitions() const;
private:
uint16_t version_;
TopicPartitionList topic_partitions_;
};
/**
* \brief Represents the information about a specific consumer group member
*/
class CPPKAFKA_API GroupMemberInformation {
public:
/**
* Constructs an instance using the provided information
*
* \param info The information pointer
*/
GroupMemberInformation(const rd_kafka_group_member_info& info);
/**
* Gets the member id
*/
const std::string& get_member_id() const;
/**
* Gets the client id
*/
const std::string& get_client_id() const;
/**
* Gets the client host
*/
const std::string& get_client_host() const;
/**
* Gets the member metadata
*/
const std::vector<uint8_t>& get_member_metadata() const;
/**
* Gets the member assignment
*/
const std::vector<uint8_t>& get_member_assignment() const;
private:
std::string member_id_;
std::string client_id_;
std::string client_host_;
std::vector<uint8_t> member_metadata_;
std::vector<uint8_t> member_assignment_;
};
/**
* \brief Represents the information about a specific consumer group
*/
class CPPKAFKA_API GroupInformation {
public:
/**
* Constructs an instance using the provided information.
*
* \param info The information pointer
*/
GroupInformation(const rd_kafka_group_info& info);
/**
* Gets the broker metadata
*/
const BrokerMetadata& get_broker() const;
/**
* Gets the group name
*/
const std::string& get_name() const;
/**
* Gets the broker-originated error
*/
Error get_error() const;
/**
* Gets the group state
*/
const std::string& get_state() const;
/**
* Gets the group protocol type
*/
const std::string& get_protocol_type() const;
/**
* Gets the group protocol
*/
const std::string& get_protocol() const;
/**
* Gets the group members
*/
const std::vector<GroupMemberInformation>& get_members() const;
private:
BrokerMetadata broker_;
std::string name_;
Error error_;
std::string state_;
std::string protocol_type_;
std::string protocol_;
std::vector<GroupMemberInformation> members_;
};
} // cppkafka
#endif // CPPKAFKA_GROUP_INFORMATION_H
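A hedged sketch of how these classes compose: each member's raw assignment blob from get_member_assignment() can be handed to MemberAssignmentInformation to recover its topic/partition list. The helper function below is hypothetical, not part of the library.

#include <iostream>
#include <string>
#include "cppkafka/consumer.h"
#include "cppkafka/group_information.h"
#include "cppkafka/topic_partition.h"

using namespace cppkafka;

// Hypothetical helper: print each member of a group with its parsed assignment.
void print_group_assignments(Consumer& consumer, const std::string& group_name) {
    GroupInformation group = consumer.get_consumer_group(group_name);
    for (const GroupMemberInformation& member : group.get_members()) {
        std::cout << member.get_member_id() << " @ " << member.get_client_host() << "\n";
        // The assignment is an opaque byte blob; MemberAssignmentInformation
        // decodes it and throws ParseException if it is malformed.
        MemberAssignmentInformation assignment(member.get_member_assignment());
        for (const TopicPartition& tp : assignment.get_topic_partitions()) {
            std::cout << "  " << tp << "\n";
        }
    }
}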


@@ -50,6 +50,7 @@ namespace cppkafka {
class Topic;
class Metadata;
class TopicMetadata;
class GroupInformation;
/**
* Base class for kafka consumer/producer
@@ -155,6 +156,18 @@ public:
*/
TopicMetadata get_metadata(const Topic& topic) const;
/**
* Gets the consumer group information
*
* \param name The name of the consumer group to look up
*/
GroupInformation get_consumer_group(const std::string& name);
/**
* Gets all consumer groups
*/
std::vector<GroupInformation> get_consumer_groups();
/**
* \brief Gets topic/partition offsets based on timestamps
*
@@ -201,6 +214,7 @@ private:
Topic get_topic(const std::string& name, rd_kafka_topic_conf_t* conf);
Metadata get_metadata(bool all_topics, rd_kafka_topic_t* topic_ptr) const;
std::vector<GroupInformation> fetch_consumer_groups(const char* name);
void save_topic_config(const std::string& topic_name, TopicConfiguration config);
HandlePtr handle_;


@@ -117,6 +117,16 @@ public:
*/
bool operator<(const TopicPartition& rhs) const;
/**
* Compare the (topic, partition) for equality
*/
bool operator==(const TopicPartition& rhs) const;
/**
* Compare the (topic, partition) for inequality
*/
bool operator!=(const TopicPartition& rhs) const;
/**
* Print to a stream
*/


@@ -9,6 +9,7 @@ set(SOURCES
topic_partition.cpp
topic_partition_list.cpp
metadata.cpp
group_information.cpp
error.cpp
kafka_handle_base.cpp


@@ -30,6 +30,7 @@
#include "exceptions.h"
using std::string;
using std::to_string;
namespace cppkafka {
@@ -65,6 +66,26 @@ InvalidConfigOptionType::InvalidConfigOptionType(const string& config_name, cons
}
// ElementNotFound
ElementNotFound::ElementNotFound(const string& element_type, const string& name)
: Exception("Could not find " + element_type + " for " + name) {
}
// ParseException
ParseException::ParseException(const string& message)
: Exception(message) {
}
// UnexpectedVersion
UnexpectedVersion::UnexpectedVersion(uint32_t version)
: Exception("Unexpected version " + to_string(version)) {
}
// HandleException
HandleException::HandleException(Error error)

src/group_information.cpp (new file, 144 lines)

@@ -0,0 +1,144 @@
#include "group_information.h"
#include <cstring>
#include <algorithm>
#include "topic_partition.h"
#include "exceptions.h"
using std::string;
using std::vector;
using std::memcpy;
using std::distance;
namespace cppkafka {
// MemberAssignmentInformation
MemberAssignmentInformation::MemberAssignmentInformation(const vector<uint8_t>& data) {
const char* error_msg = "Message is malformed";
// Version + topic list size
if (data.size() < sizeof(uint16_t) + sizeof(uint32_t)) {
throw ParseException(error_msg);
}
const uint8_t* ptr = data.data();
const uint8_t* end = ptr + data.size();
memcpy(&version_, ptr, sizeof(version_));
version_ = be16toh(version_);
ptr += sizeof(version_);
uint32_t total_topics;
memcpy(&total_topics, ptr, sizeof(total_topics));
total_topics = be32toh(total_topics);
ptr += sizeof(total_topics);
for (uint32_t i = 0; i != total_topics; ++i) {
if (ptr + sizeof(uint16_t) > end) {
throw ParseException(error_msg);
}
uint16_t topic_length;
memcpy(&topic_length, ptr, sizeof(topic_length));
topic_length = be16toh(topic_length);
ptr += sizeof(topic_length);
// Check for string length + size of partitions list
if (topic_length + sizeof(uint32_t) > static_cast<size_t>(distance(ptr, end))) {
throw ParseException(error_msg);
}
string topic_name(ptr, ptr + topic_length);
ptr += topic_length;
uint32_t total_partitions;
memcpy(&total_partitions, ptr, sizeof(total_partitions));
total_partitions = be32toh(total_partitions);
ptr += sizeof(total_partitions);
if (total_partitions > static_cast<size_t>(distance(ptr, end)) / sizeof(uint32_t)) {
throw ParseException(error_msg);
}
for (uint32_t j = 0; j < total_partitions; ++j) {
uint32_t partition;
memcpy(&partition, ptr, sizeof(partition));
partition = be32toh(partition);
ptr += sizeof(partition);
topic_partitions_.emplace_back(topic_name, partition);
}
}
}
uint16_t MemberAssignmentInformation::get_version() const {
return version_;
}
const TopicPartitionList& MemberAssignmentInformation::get_topic_partitions() const {
return topic_partitions_;
}
// GroupMemberInformation
GroupMemberInformation::GroupMemberInformation(const rd_kafka_group_member_info& info)
: member_id_(info.member_id), client_id_(info.client_id), client_host_(info.client_host),
member_metadata_((uint8_t*)info.member_metadata,
(uint8_t*)info.member_metadata + info.member_metadata_size),
member_assignment_((uint8_t*)info.member_assignment,
(uint8_t*)info.member_assignment + info.member_assignment_size) {
}
const string& GroupMemberInformation::get_member_id() const {
return member_id_;
}
const string& GroupMemberInformation::get_client_id() const {
return client_id_;
}
const string& GroupMemberInformation::get_client_host() const {
return client_host_;
}
const vector<uint8_t>& GroupMemberInformation::get_member_metadata() const {
return member_metadata_;
}
const vector<uint8_t>& GroupMemberInformation::get_member_assignment() const {
return member_assignment_;
}
// GroupInformation
GroupInformation::GroupInformation(const rd_kafka_group_info& info)
: broker_(info.broker), name_(info.group), error_(info.err), state_(info.state),
protocol_type_(info.protocol_type), protocol_(info.protocol) {
for (int i = 0; i < info.member_cnt; ++i) {
members_.emplace_back(info.members[i]);
}
}
const BrokerMetadata& GroupInformation::get_broker() const {
return broker_;
}
const string& GroupInformation::get_name() const {
return name_;
}
Error GroupInformation::get_error() const {
return error_;
}
const string& GroupInformation::get_state() const {
return state_;
}
const string& GroupInformation::get_protocol_type() const {
return protocol_type_;
}
const string& GroupInformation::get_protocol() const {
return protocol_;
}
const vector<GroupMemberInformation>& GroupInformation::get_members() const {
return members_;
}
} // cppkafka
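For reference, a sketch (not part of the library) of an encoder that produces the big-endian layout the constructor above consumes: a 16-bit version, a 32-bit topic count, and per topic a length-prefixed name followed by its 32-bit partition ids. Helper names are hypothetical.

#include <cstdint>
#include <string>
#include <vector>
#include <endian.h> // htobe16/htobe32, counterparts of the be16toh/be32toh used by the parser

// Append a 16-bit integer in big-endian byte order.
static void put_be16(std::vector<uint8_t>& out, uint16_t value) {
    value = htobe16(value);
    const uint8_t* p = reinterpret_cast<const uint8_t*>(&value);
    out.insert(out.end(), p, p + sizeof(value));
}

// Append a 32-bit integer in big-endian byte order.
static void put_be32(std::vector<uint8_t>& out, uint32_t value) {
    value = htobe32(value);
    const uint8_t* p = reinterpret_cast<const uint8_t*>(&value);
    out.insert(out.end(), p, p + sizeof(value));
}

// Encode a single-topic assignment blob that MemberAssignmentInformation can parse.
std::vector<uint8_t> encode_assignment(const std::string& topic,
                                       const std::vector<uint32_t>& partitions) {
    std::vector<uint8_t> out;
    put_be16(out, 0);                                    // version
    put_be32(out, 1);                                    // number of topics
    put_be16(out, static_cast<uint16_t>(topic.size()))); // topic name length
    out.insert(out.end(), topic.begin(), topic.end());   // topic name bytes
    put_be32(out, static_cast<uint32_t>(partitions.size()));
    for (uint32_t partition : partitions) {
        put_be32(out, partition);
    }
    return out;
}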


@@ -29,6 +29,7 @@
#include "kafka_handle_base.h"
#include "metadata.h"
#include "group_information.h"
#include "exceptions.h"
#include "topic.h"
#include "topic_partition_list.h"
@@ -114,11 +115,23 @@ TopicMetadata KafkaHandleBase::get_metadata(const Topic& topic) const {
Metadata md = get_metadata(false, topic.get_handle());
auto topics = md.get_topics();
if (topics.empty()) {
throw Exception("Failed to find metadata for topic");
throw ElementNotFound("topic metadata", topic.get_name());
}
return topics.front();
}
GroupInformation KafkaHandleBase::get_consumer_group(const string& name) {
auto result = fetch_consumer_groups(name.c_str());
if (result.empty()) {
throw ElementNotFound("consumer group information", name);
}
return move(result[0]);
}
vector<GroupInformation> KafkaHandleBase::get_consumer_groups() {
return fetch_consumer_groups(nullptr);
}
TopicPartitionList
KafkaHandleBase::get_offsets_for_times(const TopicPartitionsTimestampsMap& queries) const {
TopicPartitionList topic_partitions;
@@ -170,6 +183,23 @@ Metadata KafkaHandleBase::get_metadata(bool all_topics, rd_kafka_topic_t* topic_
return Metadata(metadata);
}
vector<GroupInformation> KafkaHandleBase::fetch_consumer_groups(const char* name) {
const rd_kafka_group_list* list = nullptr;
auto result = rd_kafka_list_groups(get_handle(), name, &list, timeout_ms_.count());
check_error(result);
// Wrap this in a unique_ptr so it gets auto deleted
using GroupHandle = std::unique_ptr<const rd_kafka_group_list,
decltype(&rd_kafka_group_list_destroy)>;
GroupHandle group_handle(list, &rd_kafka_group_list_destroy);
vector<GroupInformation> groups;
for (int i = 0; i < list->group_cnt; ++i) {
groups.emplace_back(list->groups[i]);
}
return groups;
}
void KafkaHandleBase::save_topic_config(const string& topic_name, TopicConfiguration config) {
lock_guard<mutex> _(topic_configurations_mutex_);
auto iter = topic_configurations_.emplace(topic_name, move(config)).first;


@@ -83,6 +83,14 @@ bool TopicPartition::operator<(const TopicPartition& rhs) const {
return tie(topic_, partition_) < tie(rhs.topic_, rhs.partition_);
}
bool TopicPartition::operator==(const TopicPartition& rhs) const {
return tie(topic_, partition_) == tie(rhs.topic_, rhs.partition_);
}
bool TopicPartition::operator!=(const TopicPartition& rhs) const {
return !(*this == rhs);
}
ostream& operator<<(ostream& output, const TopicPartition& rhs) {
return output << rhs.get_topic() << "[" << rhs.get_partition() << "]";
}


@@ -14,8 +14,13 @@ macro(create_test test_name)
add_test(${test_name} ${test_name}_test)
add_dependencies(tests ${test_name}_test)
add_dependencies(${test_name}_test cppkafka)
target_link_libraries(${test_name}_test cppkafka-test)
endmacro()
include_directories(${CMAKE_CURRENT_SOURCE_DIR})
add_library(cppkafka-test EXCLUDE_FROM_ALL test_utils.cpp)
add_dependencies(cppkafka-test cppkafka)
add_definitions("-DKAFKA_TEST_INSTANCE=\"${KAFKA_TEST_INSTANCE}\"")
create_test(consumer)
create_test(producer)


@@ -7,6 +7,7 @@
#include <gtest/gtest.h>
#include "cppkafka/consumer.h"
#include "cppkafka/producer.h"
#include "test_utils.h"
using std::vector;
using std::move;
@@ -24,63 +25,6 @@ using std::chrono::system_clock;
using namespace cppkafka;
class ConsumerRunner {
public:
ConsumerRunner(Consumer& consumer, size_t expected, size_t partitions)
: consumer_(consumer) {
bool booted = false;
mutex mtx;
condition_variable cond;
thread_ = thread([&, expected, partitions]() {
consumer_.set_timeout(milliseconds(500));
size_t number_eofs = 0;
auto start = system_clock::now();
while (system_clock::now() - start < seconds(10) && messages_.size() < expected) {
Message msg = consumer_.poll();
if (msg && number_eofs != partitions && msg.get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
number_eofs++;
if (number_eofs == partitions) {
lock_guard<mutex> _(mtx);
booted = true;
cond.notify_one();
}
}
else if (msg && !msg.get_error() && number_eofs == partitions) {
messages_.push_back(move(msg));
}
}
});
unique_lock<mutex> lock(mtx);
while (!booted) {
cond.wait(lock);
}
}
ConsumerRunner(const ConsumerRunner&) = delete;
ConsumerRunner& operator=(const ConsumerRunner&) = delete;
~ConsumerRunner() {
try_join();
}
const std::vector<Message>& get_messages() const {
return messages_;
}
void try_join() {
if (thread_.joinable()) {
thread_.join();
}
}
private:
Consumer& consumer_;
thread thread_;
std::vector<Message> messages_;
};
class ConsumerTest : public testing::Test {
public:
static const string KAFKA_TOPIC;


@@ -1,8 +1,11 @@
#include <set>
#include <unordered_set>
#include <gtest/gtest.h>
#include "cppkafka/consumer.h"
#include "cppkafka/producer.h"
#include "cppkafka/metadata.h"
#include "cppkafka/group_information.h"
#include "test_utils.h"
using std::vector;
using std::set;
@@ -97,3 +100,42 @@ TEST_F(KafkaHandleBaseTest, TopicsMetadata) {
Topic topic = producer.get_topic(KAFKA_TOPIC);
EXPECT_EQ(KAFKA_TOPIC, producer.get_metadata(topic).get_name());
}
TEST_F(KafkaHandleBaseTest, ConsumerGroups) {
string consumer_group = "kafka_handle_test";
string client_id = "my_client_id";
Configuration config = make_config();
config.set("group.id", consumer_group);
config.set("client.id", client_id);
config.set("enable.auto.commit", false);
// Build consumer
Consumer consumer(config);
consumer.subscribe({ KAFKA_TOPIC });
ConsumerRunner runner(consumer, 0, 3);
runner.try_join();
GroupInformation information = consumer.get_consumer_group(consumer_group);
EXPECT_EQ(consumer_group, information.get_name());
EXPECT_EQ("consumer", information.get_protocol_type());
ASSERT_EQ(1, information.get_members().size());
auto member = information.get_members()[0];
EXPECT_EQ(client_id, member.get_client_id());
MemberAssignmentInformation assignment = member.get_member_assignment();
EXPECT_EQ(0, assignment.get_version());
vector<TopicPartition> expected_topic_partitions = {
{ KAFKA_TOPIC, 0 },
{ KAFKA_TOPIC, 1 },
{ KAFKA_TOPIC, 2 }
};
vector<TopicPartition> topic_partitions = assignment.get_topic_partitions();
sort(topic_partitions.begin(), topic_partitions.end());
EXPECT_EQ(expected_topic_partitions, topic_partitions);
}


@@ -7,6 +7,7 @@
#include "cppkafka/producer.h"
#include "cppkafka/consumer.h"
#include "cppkafka/utils/buffered_producer.h"
#include "test_utils.h"
using std::string;
using std::to_string;
@@ -25,64 +26,6 @@ using std::chrono::milliseconds;
using namespace cppkafka;
class ConsumerRunner {
public:
ConsumerRunner(Consumer& consumer, size_t expected, size_t partitions)
: consumer_(consumer) {
bool booted = false;
mutex mtx;
condition_variable cond;
thread_ = thread([&, expected, partitions]() {
consumer_.set_timeout(milliseconds(500));
size_t number_eofs = 0;
auto start = system_clock::now();
while (system_clock::now() - start < seconds(10) && messages_.size() < expected) {
Message msg = consumer_.poll();
if (msg && number_eofs != partitions &&
msg.get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
number_eofs++;
if (number_eofs == partitions) {
lock_guard<mutex> _(mtx);
booted = true;
cond.notify_one();
}
}
else if (msg && !msg.get_error()) {
messages_.push_back(move(msg));
}
}
});
unique_lock<mutex> lock(mtx);
while (!booted) {
cond.wait(lock);
}
}
ConsumerRunner(const ConsumerRunner&) = delete;
ConsumerRunner& operator=(const ConsumerRunner&) = delete;
~ConsumerRunner() {
try_join();
}
const std::vector<Message>& get_messages() const {
return messages_;
}
void try_join() {
if (thread_.joinable()) {
thread_.join();
}
}
private:
Consumer& consumer_;
thread thread_;
std::vector<Message> messages_;
};
class ProducerTest : public testing::Test {
public:
static const string KAFKA_TOPIC;

tests/test_utils.cpp (new file, 77 lines)

@@ -0,0 +1,77 @@
#include <mutex>
#include <chrono>
#include <condition_variable>
#include "test_utils.h"
using std::vector;
using std::move;
using std::thread;
using std::mutex;
using std::lock_guard;
using std::unique_lock;
using std::condition_variable;
using std::chrono::system_clock;
using std::chrono::milliseconds;
using std::chrono::seconds;
using cppkafka::Consumer;
using cppkafka::Message;
ConsumerRunner::ConsumerRunner(Consumer& consumer, size_t expected, size_t partitions)
: consumer_(consumer) {
bool booted = false;
mutex mtx;
condition_variable cond;
thread_ = thread([&, expected, partitions]() {
consumer_.set_timeout(milliseconds(500));
size_t number_eofs = 0;
auto start = system_clock::now();
while (system_clock::now() - start < seconds(20)) {
if (expected > 0 && messages_.size() == expected) {
break;
}
if (expected == 0 && number_eofs >= partitions) {
break;
}
Message msg = consumer_.poll();
if (msg && number_eofs != partitions &&
msg.get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
number_eofs++;
if (number_eofs == partitions) {
lock_guard<mutex> _(mtx);
booted = true;
cond.notify_one();
}
}
else if (msg && !msg.get_error() && number_eofs == partitions) {
messages_.push_back(move(msg));
}
}
if (number_eofs < partitions) {
lock_guard<mutex> _(mtx);
booted = true;
cond.notify_one();
}
});
unique_lock<mutex> lock(mtx);
while (!booted) {
cond.wait(lock);
}
}
ConsumerRunner::~ConsumerRunner() {
try_join();
}
const vector<Message>& ConsumerRunner::get_messages() const {
return messages_;
}
void ConsumerRunner::try_join() {
if (thread_.joinable()) {
thread_.join();
}
}

tests/test_utils.h (new file, 24 lines)

@@ -0,0 +1,24 @@
#ifndef CPPKAFKA_TEST_UTILS_H
#define CPPKAFKA_TEST_UTILS_H
#include <thread>
#include <vector>
#include "cppkafka/consumer.h"
class ConsumerRunner {
public:
ConsumerRunner(cppkafka::Consumer& consumer, size_t expected, size_t partitions);
ConsumerRunner(const ConsumerRunner&) = delete;
ConsumerRunner& operator=(const ConsumerRunner&) = delete;
~ConsumerRunner();
const std::vector<cppkafka::Message>& get_messages() const;
void try_join();
private:
cppkafka::Consumer& consumer_;
std::thread thread_;
std::vector<cppkafka::Message> messages_;
};
#endif // CPPKAFKA_TEST_UTILS_H