Add consumer rebalance callbacks

This commit is contained in:
Matias Fontanini
2016-05-22 15:11:32 -07:00
parent 58258750df
commit 29989ea346
4 changed files with 86 additions and 2 deletions

View File

@@ -12,9 +12,17 @@ namespace cppkafka {
const milliseconds Consumer::DEFAULT_TIMEOUT{1000};
Consumer::Consumer(const Configuration& config)
void Consumer::rebalance_proxy(rd_kafka_t*, rd_kafka_resp_err_t error,
rd_kafka_topic_partition_list_t *partitions, void *opaque) {
TopicPartitionList list = TopicPartitionList::make_non_owning(partitions);
static_cast<Consumer*>(opaque)->handle_rebalance(error, list);
}
Consumer::Consumer(Configuration config)
: timeout_ms_(DEFAULT_TIMEOUT) {
char error_buffer[512];
// Set ourselves as the opaque pointer
rd_kafka_conf_set_opaque(config.get_handle(), this);
rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_CONSUMER, config.get_handle(),
error_buffer, sizeof(error_buffer));
if (!ptr) {
@@ -27,6 +35,18 @@ void Consumer::set_timeout(const milliseconds timeout) {
timeout_ms_ = timeout;
}
void Consumer::set_assignment_callback(AssignmentCallback callback) {
assignment_callback_ = move(callback);
}
void Consumer::set_revocation_callback(RevocationCallback callback) {
revocation_callback_ = move(callback);
}
void Consumer::set_rebalance_error_callback(RebalanceErrorCallback callback) {
rebalance_error_callback_ = move(callback);
}
void Consumer::subscribe(const vector<string>& topics) {
TopicPartitionList list(topics.begin(), topics.end());
rd_kafka_resp_err_t error = rd_kafka_subscribe(get_handle(), list.get_handle());
@@ -45,6 +65,11 @@ void Consumer::assign(const TopicPartitionList& topic_partitions) {
check_error(error);
}
void Consumer::unassign() {
rd_kafka_resp_err_t error = rd_kafka_assign(get_handle(), nullptr);
check_error(error);
}
void Consumer::close() {
rd_kafka_resp_err_t error = rd_kafka_consumer_close(get_handle());
check_error(error);
@@ -117,4 +142,26 @@ void Consumer::commit(const TopicPartitionList& topic_partitions, bool async) {
check_error(error);
}
void Consumer::handle_rebalance(rd_kafka_resp_err_t error,
const TopicPartitionList& topic_partitions) {
if (error == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
if (assignment_callback_) {
assignment_callback_(topic_partitions);
}
assign(topic_partitions);
}
else if (error == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS) {
if (revocation_callback_) {
revocation_callback_(topic_partitions);
}
unassign();
}
else {
if (rebalance_error_callback_) {
rebalance_error_callback_(error);
}
unassign();
}
}
} // cppkafka