Pause/resume a consumer by topic (#67)

* Pause a consumer by topic

* Changes per review comments

* convert rvalue to value

* Refactored code to provide a more generic way of getting partition subsets

* Changes per code review and added test cases

* Modified loop to use binary search instead of linear

* Simplify find_matches test cases
This commit is contained in:
Alex Damian
2018-05-23 16:03:47 -04:00
committed by Matias Fontanini
parent ee71b3979a
commit 46c396f729
5 changed files with 110 additions and 1 deletions

View File

@@ -193,6 +193,16 @@ public:
*/
void unassign();
/**
* \brief Pauses all consumption
*/
void pause();
/**
* \brief Resumes all consumption
*/
void resume();
/**
* \brief Commits the current partition assignment
*
@@ -364,7 +374,6 @@ public:
private:
static void rebalance_proxy(rd_kafka_t *handle, rd_kafka_resp_err_t error,
rd_kafka_topic_partition_list_t *partitions, void *opaque);
void close();
void commit(const Message& msg, bool async);
void commit(const TopicPartitionList* topic_partitions, bool async);

View File

@@ -34,6 +34,7 @@
#include <iosfwd>
#include <algorithm>
#include <vector>
#include <set>
#include <librdkafka/rdkafka.h>
#include "macros.h"
@@ -54,6 +55,16 @@ CPPKAFKA_API TopicPartitionList convert(const TopicPartitionsListPtr& topic_part
CPPKAFKA_API TopicPartitionList convert(rd_kafka_topic_partition_list_t* topic_partitions);
CPPKAFKA_API TopicPartitionsListPtr make_handle(rd_kafka_topic_partition_list_t* handle);
// Extracts a partition list subset belonging to the provided topics (case-insensitive)
CPPKAFKA_API TopicPartitionList find_matches(const TopicPartitionList& partitions,
const std::set<std::string>& topics);
// Extracts a partition list subset belonging to the provided partition ids
// Note: this assumes that all topic partitions in the original list belong to the same topic
// otherwise the partition ids may not be unique
CPPKAFKA_API TopicPartitionList find_matches(const TopicPartitionList& partitions,
const std::set<int>& ids);
CPPKAFKA_API std::ostream& operator<<(std::ostream& output, const TopicPartitionList& rhs);
} // cppkafka

View File

@@ -27,6 +27,8 @@
*
*/
#include <sstream>
#include <algorithm>
#include <cctype>
#include "consumer.h"
#include "exceptions.h"
#include "logging.h"
@@ -39,6 +41,8 @@ using std::move;
using std::make_tuple;
using std::ostringstream;
using std::chrono::milliseconds;
using std::toupper;
using std::equal;
namespace cppkafka {
@@ -125,6 +129,14 @@ void Consumer::unassign() {
check_error(error);
}
void Consumer::pause() {
pause_partitions(get_assignment());
}
void Consumer::resume() {
resume_partitions(get_assignment());
}
void Consumer::commit() {
commit(nullptr, false);
}

View File

@@ -28,12 +28,15 @@
*/
#include <iostream>
#include <string>
#include "topic_partition_list.h"
#include "topic_partition.h"
#include "exceptions.h"
using std::vector;
using std::set;
using std::ostream;
using std::string;
namespace cppkafka {
@@ -67,6 +70,37 @@ TopicPartitionsListPtr make_handle(rd_kafka_topic_partition_list_t* handle) {
return TopicPartitionsListPtr(handle, &rd_kafka_topic_partition_list_destroy);
}
TopicPartitionList find_matches(const TopicPartitionList& partitions,
const set<string>& topics) {
TopicPartitionList subset;
for (const auto& partition : partitions) {
for (const auto& topic : topics) {
if (topic.size() == partition.get_topic().size()) {
// compare both strings
bool match = equal(topic.begin(), topic.end(), partition.get_topic().begin(),
[](char c1, char c2)->bool {
return toupper(c1) == toupper(c2);
});
if (match) {
subset.emplace_back(partition);
}
}
}
}
return subset;
}
TopicPartitionList find_matches(const TopicPartitionList& partitions,
const set<int>& ids) {
TopicPartitionList subset;
for (const auto& partition : partitions) {
if (ids.count(partition.get_partition()) > 0) {
subset.emplace_back(partition);
}
}
return subset;
}
ostream& operator<<(ostream& output, const TopicPartitionList& rhs) {
output << "[ ";
for (auto iter = rhs.begin(); iter != rhs.end(); ++iter) {

View File

@@ -4,6 +4,8 @@
#include "cppkafka/topic_partition.h"
using std::ostringstream;
using std::set;
using std::string;
using namespace cppkafka;
@@ -42,3 +44,44 @@ TEST_CASE("topic partition list to string", "[topic_partition]") {
output << list;
CHECK(output.str() == "[ foo[-1:#], bar[2:#], foobar[3:4] ]");
}
TEST_CASE("find matches by topic", "[topic_partition]") {
const TopicPartitionList list = {
{ "foo", 0 },
{ "bar", 3 },
{ "fb", 1 },
{ "foo", 1 },
{ "fb", 2 },
{ "other", 1 },
{ "a", 1 }
};
const TopicPartitionList expected = {
{ "foo", 0 },
{ "fb", 1 },
{ "foo", 1 },
{ "fb", 2 },
};
const TopicPartitionList subset = find_matches(list, set<string>{"foo", "fb"});
CHECK(subset == expected);
}
TEST_CASE("find matches by id", "[topic_partition]") {
const TopicPartitionList list = {
{ "foo", 2 },
{ "foo", 3 },
{ "foo", 4 },
{ "foo", 5 },
{ "foo", 6 },
{ "foo", 7 },
{ "foo", 8 }
};
const TopicPartitionList expected = {
{ "foo", 2 },
{ "foo", 5 },
{ "foo", 8 },
};
const TopicPartitionList subset = find_matches(list, set<int>{2,5,8});
CHECK(subset == expected);
}