diff --git a/include/cppkafka/exceptions.h b/include/cppkafka/exceptions.h index bd52865..a480727 100644 --- a/include/cppkafka/exceptions.h +++ b/include/cppkafka/exceptions.h @@ -74,6 +74,30 @@ public: InvalidConfigOptionType(const std::string& config_name, const std::string& type); }; +/** + * Indicates something that was being looked up failed to be found + */ +class CPPKAFKA_API ElementNotFound : public Exception { +public: + ElementNotFound(const std::string& element_type, const std::string& name); +}; + +/** + * Indicates something that was incorrectly parsed + */ +class CPPKAFKA_API ParseException : public Exception { +public: + ParseException(const std::string& message); +}; + +/** + * Indicates something had an unexpected versiom + */ +class CPPKAFKA_API UnexpectedVersion : public Exception { +public: + UnexpectedVersion(uint32_t version); +}; + /** * A generic rdkafka handle error */ diff --git a/include/cppkafka/group_information.h b/include/cppkafka/group_information.h new file mode 100644 index 0000000..483568f --- /dev/null +++ b/include/cppkafka/group_information.h @@ -0,0 +1,141 @@ +#ifndef CPPKAFKA_GROUP_INFORMATION_H +#define CPPKAFKA_GROUP_INFORMATION_H + +#include +#include +#include "macros.h" +#include "metadata.h" +#include "error.h" +#include "topic_partition_list.h" + +namespace cppkafka { + +/** + * \brief Parses the member assignment information + * + * This class parses the data in GroupMemberInformation::get_member_assignment. + */ +class CPPKAFKA_API MemberAssignmentInformation { +public: + /** + * Constructs an instance + */ + MemberAssignmentInformation(const std::vector& data); + + /** + * Gets the version + */ + uint16_t get_version() const; + + /** + * Gets the topic/partition assignment + */ + const TopicPartitionList& get_topic_partitions() const; +private: + uint16_t version_; + TopicPartitionList topic_partitions_; +}; + +/** + * \brief Represents the information about a specific consumer group member + */ +class CPPKAFKA_API GroupMemberInformation { +public: + /** + * Constructs an instance using the provided information + * + * \param info The information pointer + */ + GroupMemberInformation(const rd_kafka_group_member_info& info); + + /** + * Gets the member id + */ + const std::string& get_member_id() const; + + /** + * Gets the client id + */ + const std::string& get_client_id() const; + + /** + * Gets the client host + */ + const std::string& get_client_host() const; + + /** + * Gets the member metadata + */ + const std::vector& get_member_metadata() const; + + /** + * Gets the member assignment + */ + const std::vector& get_member_assignment() const; +private: + std::string member_id_; + std::string client_id_; + std::string client_host_; + std::vector member_metadata_; + std::vector member_assignment_; +}; + +/** + * \brief Represents the information about a specific consumer group + */ +class CPPKAFKA_API GroupInformation { +public: + /** + * Constructs an instance using the provided information. + * + * \param info The information pointer + */ + GroupInformation(const rd_kafka_group_info& info); + + /** + * Gets the broker metadata + */ + const BrokerMetadata& get_broker() const; + + /** + * Gets the group name + */ + const std::string& get_name() const; + + /** + * Gets the broker-originated error + */ + Error get_error() const; + + /** + * Gets the group state + */ + const std::string& get_state() const; + + /** + * Gets the group protocol type + */ + const std::string& get_protocol_type() const; + + /** + * Gets the group protocol + */ + const std::string& get_protocol() const; + + /** + * Gets the group members + */ + const std::vector& get_members() const; +private: + BrokerMetadata broker_; + std::string name_; + Error error_; + std::string state_; + std::string protocol_type_; + std::string protocol_; + std::vector members_; +}; + +} // cppkafka + +#endif // CPPKAFKA_GROUP_INFORMATION_H diff --git a/include/cppkafka/kafka_handle_base.h b/include/cppkafka/kafka_handle_base.h index b76f87d..c61e67b 100644 --- a/include/cppkafka/kafka_handle_base.h +++ b/include/cppkafka/kafka_handle_base.h @@ -50,6 +50,7 @@ namespace cppkafka { class Topic; class Metadata; class TopicMetadata; +class GroupInformation; /** * Base class for kafka consumer/producer @@ -155,6 +156,18 @@ public: */ TopicMetadata get_metadata(const Topic& topic) const; + /** + * Gets the consumer group information + * + * \param name The name of the consumer group to look up + */ + GroupInformation get_consumer_group(const std::string& name); + + /** + * Gets all consumer groups + */ + std::vector get_consumer_groups(); + /** * \brief Gets topic/partition offsets based on timestamps * @@ -201,6 +214,7 @@ private: Topic get_topic(const std::string& name, rd_kafka_topic_conf_t* conf); Metadata get_metadata(bool all_topics, rd_kafka_topic_t* topic_ptr) const; + std::vector fetch_consumer_groups(const char* name); void save_topic_config(const std::string& topic_name, TopicConfiguration config); HandlePtr handle_; diff --git a/include/cppkafka/topic_partition.h b/include/cppkafka/topic_partition.h index a77b07a..bfddd61 100644 --- a/include/cppkafka/topic_partition.h +++ b/include/cppkafka/topic_partition.h @@ -117,6 +117,16 @@ public: */ bool operator<(const TopicPartition& rhs) const; + /** + * Compare the (topic, partition) for equality + */ + bool operator==(const TopicPartition& rhs) const; + + /** + * Compare the (topic, partition) for in-equality + */ + bool operator!=(const TopicPartition& rhs) const; + /** * Print to a stream */ diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index f62bcfc..348f549 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -9,6 +9,7 @@ set(SOURCES topic_partition.cpp topic_partition_list.cpp metadata.cpp + group_information.cpp error.cpp kafka_handle_base.cpp diff --git a/src/exceptions.cpp b/src/exceptions.cpp index 616675e..bdc178e 100644 --- a/src/exceptions.cpp +++ b/src/exceptions.cpp @@ -30,6 +30,7 @@ #include "exceptions.h" using std::string; +using std::to_string; namespace cppkafka { @@ -65,6 +66,26 @@ InvalidConfigOptionType::InvalidConfigOptionType(const string& config_name, cons } +// ElementNotFound + +ElementNotFound::ElementNotFound(const string& element_type, const string& name) +: Exception("Could not find " + element_type + " for " + name) { + +} + +// ParseException + +ParseException::ParseException(const string& message) +: Exception(message) { + +} + +// UnexpectedVersion + +UnexpectedVersion::UnexpectedVersion(uint32_t version) +: Exception("Unexpected version " + to_string(version)) { +} + // HandleException HandleException::HandleException(Error error) diff --git a/src/group_information.cpp b/src/group_information.cpp new file mode 100644 index 0000000..069ae0f --- /dev/null +++ b/src/group_information.cpp @@ -0,0 +1,144 @@ +#include "group_information.h" +#include +#include +#include "topic_partition.h" +#include "exceptions.h" + +using std::string; +using std::vector; +using std::memcpy; +using std::distance; + +namespace cppkafka { + +// MemberAssignmentInformation +MemberAssignmentInformation::MemberAssignmentInformation(const vector& data) { + const char* error_msg = "Message is malformed"; + // Version + topic list size + if (data.size() < sizeof(uint16_t) + sizeof(uint32_t)) { + throw ParseException(error_msg); + } + const uint8_t* ptr = data.data(); + const uint8_t* end = ptr + data.size(); + memcpy(&version_, ptr, sizeof(version_)); + version_ = be16toh(version_); + ptr += sizeof(version_); + + uint32_t total_topics; + memcpy(&total_topics, ptr, sizeof(total_topics)); + total_topics = be32toh(total_topics); + ptr += sizeof(total_topics); + + for (uint32_t i = 0; i != total_topics; ++i) { + if (ptr + sizeof(uint16_t) > end) { + throw ParseException(error_msg); + } + uint16_t topic_length; + memcpy(&topic_length, ptr, sizeof(topic_length)); + topic_length = be16toh(topic_length); + ptr += sizeof(topic_length); + + // Check for string length + size of partitions list + if (topic_length > distance(ptr, end) + sizeof(uint32_t)) { + throw ParseException(error_msg); + } + string topic_name(ptr, ptr + topic_length); + ptr += topic_length; + + uint32_t total_partitions; + memcpy(&total_partitions, ptr, sizeof(total_partitions)); + total_partitions = be32toh(total_partitions); + ptr += sizeof(total_partitions); + + if (ptr + total_partitions * sizeof(uint32_t) > end) { + throw ParseException(error_msg); + } + for (uint32_t j = 0; j < total_partitions; ++j) { + uint32_t partition; + memcpy(&partition, ptr, sizeof(partition)); + partition = be32toh(partition); + ptr += sizeof(partition); + + topic_partitions_.emplace_back(topic_name, partition); + } + } +} + +uint16_t MemberAssignmentInformation::get_version() const { + return version_; +} + +const TopicPartitionList& MemberAssignmentInformation::get_topic_partitions() const { + return topic_partitions_; +} + +// GroupMemberInformation + +GroupMemberInformation::GroupMemberInformation(const rd_kafka_group_member_info& info) +: member_id_(info.member_id), client_id_(info.client_id), client_host_(info.client_host), + member_metadata_((uint8_t*)info.member_metadata, + (uint8_t*)info.member_metadata + info.member_metadata_size), + member_assignment_((uint8_t*)info.member_assignment, + (uint8_t*)info.member_assignment + info.member_assignment_size) { + +} + +const string& GroupMemberInformation::get_member_id() const { + return member_id_; +} + +const string& GroupMemberInformation::get_client_id() const { + return client_id_; +} + +const string& GroupMemberInformation::get_client_host() const { + return client_host_; +} + +const vector& GroupMemberInformation::get_member_metadata() const { + return member_metadata_; +} + +const vector& GroupMemberInformation::get_member_assignment() const { + return member_assignment_; +} + +// GroupInformation + +GroupInformation::GroupInformation(const rd_kafka_group_info& info) +: broker_(info.broker), name_(info.group), error_(info.err), state_(info.state), + protocol_type_(info.protocol_type), protocol_(info.protocol) { + for (int i = 0; i < info.member_cnt; ++i) { + members_.emplace_back(info.members[i]); + } +} + +const BrokerMetadata& GroupInformation::get_broker() const { + return broker_; +} + +const string& GroupInformation::get_name() const { + return name_; +} + +Error GroupInformation::get_error() const { + return error_; +} + +const string& GroupInformation::get_state() const { + return state_; +} + +const string& GroupInformation::get_protocol_type() const { + return protocol_type_; +} + +const string& GroupInformation::get_protocol() const { + return protocol_; +} + +const vector& GroupInformation::get_members() const { + return members_; +} + +} // cppkafka diff --git a/src/kafka_handle_base.cpp b/src/kafka_handle_base.cpp index 2d269fc..eea5e38 100644 --- a/src/kafka_handle_base.cpp +++ b/src/kafka_handle_base.cpp @@ -29,6 +29,7 @@ #include "kafka_handle_base.h" #include "metadata.h" +#include "group_information.h" #include "exceptions.h" #include "topic.h" #include "topic_partition_list.h" @@ -114,11 +115,23 @@ TopicMetadata KafkaHandleBase::get_metadata(const Topic& topic) const { Metadata md = get_metadata(false, topic.get_handle()); auto topics = md.get_topics(); if (topics.empty()) { - throw Exception("Failed to find metadata for topic"); + throw ElementNotFound("topic metadata", topic.get_name()); } return topics.front(); } +GroupInformation KafkaHandleBase::get_consumer_group(const string& name) { + auto result = fetch_consumer_groups(name.c_str()); + if (result.empty()) { + throw ElementNotFound("consumer group information", name); + } + return move(result[0]); +} + +vector KafkaHandleBase::get_consumer_groups() { + return fetch_consumer_groups(nullptr); +} + TopicPartitionList KafkaHandleBase::get_offsets_for_times(const TopicPartitionsTimestampsMap& queries) const { TopicPartitionList topic_partitions; @@ -170,6 +183,23 @@ Metadata KafkaHandleBase::get_metadata(bool all_topics, rd_kafka_topic_t* topic_ return Metadata(metadata); } +vector KafkaHandleBase::fetch_consumer_groups(const char* name) { + const rd_kafka_group_list* list = nullptr; + auto result = rd_kafka_list_groups(get_handle(), name, &list, timeout_ms_.count()); + check_error(result); + + // Wrap this in a unique_ptr so it gets auto deleted + using GroupHandle = std::unique_ptr; + GroupHandle group_handle(list, &rd_kafka_group_list_destroy); + + vector groups; + for (int i = 0; i < list->group_cnt; ++i) { + groups.emplace_back(list->groups[i]); + } + return groups; +} + void KafkaHandleBase::save_topic_config(const string& topic_name, TopicConfiguration config) { lock_guard _(topic_configurations_mutex_); auto iter = topic_configurations_.emplace(topic_name, move(config)).first; diff --git a/src/topic_partition.cpp b/src/topic_partition.cpp index 64231ee..82f8865 100644 --- a/src/topic_partition.cpp +++ b/src/topic_partition.cpp @@ -83,6 +83,14 @@ bool TopicPartition::operator<(const TopicPartition& rhs) const { return tie(topic_, partition_) < tie(rhs.topic_, rhs.partition_); } +bool TopicPartition::operator==(const TopicPartition& rhs) const { + return tie(topic_, partition_) == tie(rhs.topic_, rhs.partition_); +} + +bool TopicPartition::operator!=(const TopicPartition& rhs) const { + return !(*this == rhs); +} + ostream& operator<<(ostream& output, const TopicPartition& rhs) { return output << rhs.get_topic() << "[" << rhs.get_partition() << "]"; } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 0b7d4df..24e9143 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -14,8 +14,13 @@ macro(create_test test_name) add_test(${test_name} ${test_name}_test) add_dependencies(tests ${test_name}_test) add_dependencies(${test_name}_test cppkafka) + target_link_libraries(${test_name}_test cppkafka-test) endmacro() +include_directories(${CMAKE_CURRENT_SOURCE_DIR}) +add_library(cppkafka-test EXCLUDE_FROM_ALL test_utils.cpp) +add_dependencies(cppkafka-test cppkafka) + add_definitions("-DKAFKA_TEST_INSTANCE=\"${KAFKA_TEST_INSTANCE}\"") create_test(consumer) create_test(producer) diff --git a/tests/consumer_test.cpp b/tests/consumer_test.cpp index db51afc..6a9573f 100644 --- a/tests/consumer_test.cpp +++ b/tests/consumer_test.cpp @@ -7,6 +7,7 @@ #include #include "cppkafka/consumer.h" #include "cppkafka/producer.h" +#include "test_utils.h" using std::vector; using std::move; @@ -24,63 +25,6 @@ using std::chrono::system_clock; using namespace cppkafka; -class ConsumerRunner { -public: - ConsumerRunner(Consumer& consumer, size_t expected, size_t partitions) - : consumer_(consumer) { - bool booted = false; - mutex mtx; - condition_variable cond; - thread_ = thread([&, expected, partitions]() { - consumer_.set_timeout(milliseconds(500)); - size_t number_eofs = 0; - auto start = system_clock::now(); - while (system_clock::now() - start < seconds(10) && messages_.size() < expected) { - Message msg = consumer_.poll(); - if (msg && number_eofs != partitions && msg.get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF) { - number_eofs++; - if (number_eofs == partitions) { - lock_guard _(mtx); - booted = true; - cond.notify_one(); - } - } - else if (msg && !msg.get_error() && number_eofs == partitions) { - messages_.push_back(move(msg)); - } - } - }); - - unique_lock lock(mtx); - while (!booted) { - cond.wait(lock); - } - } - - - - ConsumerRunner(const ConsumerRunner&) = delete; - ConsumerRunner& operator=(const ConsumerRunner&) = delete; - - ~ConsumerRunner() { - try_join(); - } - - const std::vector& get_messages() const { - return messages_; - } - - void try_join() { - if (thread_.joinable()) { - thread_.join(); - } - } -private: - Consumer& consumer_; - thread thread_; - std::vector messages_; -}; - class ConsumerTest : public testing::Test { public: static const string KAFKA_TOPIC; diff --git a/tests/kafka_handle_base_test.cpp b/tests/kafka_handle_base_test.cpp index 7f82092..bd81bb7 100644 --- a/tests/kafka_handle_base_test.cpp +++ b/tests/kafka_handle_base_test.cpp @@ -1,8 +1,11 @@ #include #include #include +#include "cppkafka/consumer.h" #include "cppkafka/producer.h" #include "cppkafka/metadata.h" +#include "cppkafka/group_information.h" +#include "test_utils.h" using std::vector; using std::set; @@ -97,3 +100,42 @@ TEST_F(KafkaHandleBaseTest, TopicsMetadata) { Topic topic = producer.get_topic(KAFKA_TOPIC); EXPECT_EQ(KAFKA_TOPIC, producer.get_metadata(topic).get_name()); } + +TEST_F(KafkaHandleBaseTest, ConsumerGroups) { + string consumer_group = "kafka_handle_test"; + string client_id = "my_client_id"; + + Configuration config = make_config(); + config.set("group.id", consumer_group); + config.set("client.id", client_id); + config.set("enable.auto.commit", false); + + // Build consumer + Consumer consumer(config); + consumer.subscribe({ KAFKA_TOPIC }); + ConsumerRunner runner(consumer, 0, 3); + runner.try_join(); + + GroupInformation information = consumer.get_consumer_group(consumer_group); + EXPECT_EQ(consumer_group, information.get_name()); + EXPECT_EQ("consumer", information.get_protocol_type()); + ASSERT_EQ(1, information.get_members().size()); + + auto member = information.get_members()[0]; + EXPECT_EQ(client_id, member.get_client_id()); + + MemberAssignmentInformation assignment = member.get_member_assignment(); + EXPECT_EQ(0, assignment.get_version()); + vector expected_topic_partitions = { + { KAFKA_TOPIC, 0 }, + { KAFKA_TOPIC, 1 }, + { KAFKA_TOPIC, 2 } + }; + vector topic_partitions = assignment.get_topic_partitions(); + sort(topic_partitions.begin(), topic_partitions.end()); + EXPECT_EQ(expected_topic_partitions, topic_partitions); + /*for (const auto c : ) { + printf("%0d,", (int)c & 0xff); + } + std::cout << std::endl;*/ +} diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index d29cc94..d3595f7 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -7,6 +7,7 @@ #include "cppkafka/producer.h" #include "cppkafka/consumer.h" #include "cppkafka/utils/buffered_producer.h" +#include "test_utils.h" using std::string; using std::to_string; @@ -25,64 +26,6 @@ using std::chrono::milliseconds; using namespace cppkafka; -class ConsumerRunner { -public: - ConsumerRunner(Consumer& consumer, size_t expected, size_t partitions) - : consumer_(consumer) { - bool booted = false; - mutex mtx; - condition_variable cond; - thread_ = thread([&, expected, partitions]() { - consumer_.set_timeout(milliseconds(500)); - size_t number_eofs = 0; - auto start = system_clock::now(); - while (system_clock::now() - start < seconds(10) && messages_.size() < expected) { - Message msg = consumer_.poll(); - if (msg && number_eofs != partitions && - msg.get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF) { - number_eofs++; - if (number_eofs == partitions) { - lock_guard _(mtx); - booted = true; - cond.notify_one(); - } - } - else if (msg && !msg.get_error()) { - messages_.push_back(move(msg)); - } - } - }); - - unique_lock lock(mtx); - while (!booted) { - cond.wait(lock); - } - } - - - - ConsumerRunner(const ConsumerRunner&) = delete; - ConsumerRunner& operator=(const ConsumerRunner&) = delete; - - ~ConsumerRunner() { - try_join(); - } - - const std::vector& get_messages() const { - return messages_; - } - - void try_join() { - if (thread_.joinable()) { - thread_.join(); - } - } -private: - Consumer& consumer_; - thread thread_; - std::vector messages_; -}; - class ProducerTest : public testing::Test { public: static const string KAFKA_TOPIC; diff --git a/tests/test_utils.cpp b/tests/test_utils.cpp new file mode 100644 index 0000000..e3cc080 --- /dev/null +++ b/tests/test_utils.cpp @@ -0,0 +1,77 @@ +#include +#include +#include +#include "test_utils.h" + +using std::vector; +using std::move; +using std::thread; +using std::mutex; +using std::lock_guard; +using std::unique_lock; +using std::condition_variable; + +using std::chrono::system_clock; +using std::chrono::milliseconds; +using std::chrono::seconds; + +using cppkafka::Consumer; +using cppkafka::Message; + +ConsumerRunner::ConsumerRunner(Consumer& consumer, size_t expected, size_t partitions) +: consumer_(consumer) { + bool booted = false; + mutex mtx; + condition_variable cond; + thread_ = thread([&, expected, partitions]() { + consumer_.set_timeout(milliseconds(500)); + size_t number_eofs = 0; + auto start = system_clock::now(); + while (system_clock::now() - start < seconds(20)) { + if (expected > 0 && messages_.size() == expected) { + break; + } + if (expected == 0 && number_eofs >= partitions) { + break; + } + Message msg = consumer_.poll(); + if (msg && number_eofs != partitions && + msg.get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF) { + number_eofs++; + if (number_eofs == partitions) { + lock_guard _(mtx); + booted = true; + cond.notify_one(); + } + } + else if (msg && !msg.get_error() && number_eofs == partitions) { + messages_.push_back(move(msg)); + } + } + if (number_eofs < partitions) { + lock_guard _(mtx); + booted = true; + cond.notify_one(); + } + }); + + unique_lock lock(mtx); + while (!booted) { + cond.wait(lock); + } +} + +ConsumerRunner::~ConsumerRunner() { + try_join(); +} + +const vector& ConsumerRunner::get_messages() const { + return messages_; +} + +void ConsumerRunner::try_join() { + if (thread_.joinable()) { + thread_.join(); + } +} + diff --git a/tests/test_utils.h b/tests/test_utils.h new file mode 100644 index 0000000..310989e --- /dev/null +++ b/tests/test_utils.h @@ -0,0 +1,24 @@ +#ifndef CPPKAFKA_TEST_UTILS_H +#define CPPKAFKA_TEST_UTILS_H + +#include +#include +#include "cppkafka/consumer.h" + +class ConsumerRunner { +public: + ConsumerRunner(cppkafka::Consumer& consumer, size_t expected, size_t partitions); + ConsumerRunner(const ConsumerRunner&) = delete; + ConsumerRunner& operator=(const ConsumerRunner&) = delete; + ~ConsumerRunner(); + + const std::vector& get_messages() const; + + void try_join(); +private: + cppkafka::Consumer& consumer_; + std::thread thread_; + std::vector messages_; +}; + +#endif // CPPKAFKA_TEST_UTILS_H