Add committed/assigned wrappers

This commit is contained in:
Matias Fontanini
2016-05-21 20:39:37 -07:00
parent acd5c4a603
commit c491136e0e
2 changed files with 20 additions and 0 deletions

View File

@@ -29,6 +29,9 @@ public:
void commit(const TopicPartitionList& topic_partitions);
void async_commit(const TopicPartitionList& topic_partitions);
TopicPartitionList get_committed(const TopicPartitionList& topic_partitions);
TopicPartitionList get_position(const TopicPartitionList& topic_partitions);
Message poll();
private:
static const std::chrono::milliseconds DEFAULT_TIMEOUT;

View File

@@ -61,6 +61,23 @@ void Consumer::async_commit(const TopicPartitionList& topic_partitions) {
commit(topic_partitions, true);
}
TopicPartitionList Consumer::get_committed(const TopicPartitionList& topic_partitions) {
// Copy the list, let rd_kafka change it and return it
TopicPartitionList output = topic_partitions;
rd_kafka_resp_err_t error = rd_kafka_committed(get_handle(), output.get_handle(),
timeout_ms_.count());
check_error(error);
return output;
}
TopicPartitionList Consumer::get_position(const TopicPartitionList& topic_partitions) {
// Copy the list, let rd_kafka change it and return it
TopicPartitionList output = topic_partitions;
rd_kafka_resp_err_t error = rd_kafka_position(get_handle(), output.get_handle());
check_error(error);
return output;
}
Message Consumer::poll() {
rd_kafka_message_t* message = rd_kafka_consumer_poll(get_handle(), timeout_ms_.count());
return Message(message);