Added pause/resume for producers (#87)

* Added pause/resume for producers

* Moved pause/resume functions to KafkaHandleBase
This commit is contained in:
Alex Damian
2018-06-25 12:16:57 -04:00
committed by Matias Fontanini
parent 069ea3df8e
commit 5c72f3fe28
6 changed files with 39 additions and 1 deletion

View File

@@ -101,6 +101,7 @@ public:
using AssignmentCallback = std::function<void(TopicPartitionList&)>; using AssignmentCallback = std::function<void(TopicPartitionList&)>;
using RevocationCallback = std::function<void(const TopicPartitionList&)>; using RevocationCallback = std::function<void(const TopicPartitionList&)>;
using RebalanceErrorCallback = std::function<void(Error)>; using RebalanceErrorCallback = std::function<void(Error)>;
using KafkaHandleBase::pause;
/** /**
* \brief Creates an instance of a consumer. * \brief Creates an instance of a consumer.

View File

@@ -76,6 +76,13 @@ public:
*/ */
void pause_partitions(const TopicPartitionList& topic_partitions); void pause_partitions(const TopicPartitionList& topic_partitions);
/**
* \brief Pauses consumption/production for this topic
*
* \param topic The topic name
*/
void pause(const std::string& topic);
/** /**
* \brief Resumes consumption/production from the given topic/partition list * \brief Resumes consumption/production from the given topic/partition list
* *
@@ -85,6 +92,13 @@ public:
*/ */
void resume_partitions(const TopicPartitionList& topic_partitions); void resume_partitions(const TopicPartitionList& topic_partitions);
/**
* \brief Resumes consumption/production for this topic
*
* \param topic The topic name
*/
void resume(const std::string& topic);
/** /**
* \brief Sets the timeout for operations that require a timeout * \brief Sets the timeout for operations that require a timeout
* *

View File

@@ -78,6 +78,7 @@ class Message;
*/ */
class CPPKAFKA_API Producer : public KafkaHandleBase { class CPPKAFKA_API Producer : public KafkaHandleBase {
public: public:
using KafkaHandleBase::pause;
/** /**
* The policy to use for the payload. The default policy is COPY_PAYLOAD * The policy to use for the payload. The default policy is COPY_PAYLOAD
*/ */

View File

@@ -41,6 +41,7 @@
namespace cppkafka { namespace cppkafka {
class TopicPartition; class TopicPartition;
class PartitionMetadata;
using TopicPartitionsListPtr = std::unique_ptr<rd_kafka_topic_partition_list_t, using TopicPartitionsListPtr = std::unique_ptr<rd_kafka_topic_partition_list_t,
decltype(&rd_kafka_topic_partition_list_destroy)>; decltype(&rd_kafka_topic_partition_list_destroy)>;
@@ -53,6 +54,8 @@ using TopicPartitionList = std::vector<TopicPartition>;
CPPKAFKA_API TopicPartitionsListPtr convert(const TopicPartitionList& topic_partitions); CPPKAFKA_API TopicPartitionsListPtr convert(const TopicPartitionList& topic_partitions);
CPPKAFKA_API TopicPartitionList convert(const TopicPartitionsListPtr& topic_partitions); CPPKAFKA_API TopicPartitionList convert(const TopicPartitionsListPtr& topic_partitions);
CPPKAFKA_API TopicPartitionList convert(rd_kafka_topic_partition_list_t* topic_partitions); CPPKAFKA_API TopicPartitionList convert(rd_kafka_topic_partition_list_t* topic_partitions);
CPPKAFKA_API TopicPartitionList convert(const std::string& topic,
const std::vector<PartitionMetadata>& partition_metadata);
CPPKAFKA_API TopicPartitionsListPtr make_handle(rd_kafka_topic_partition_list_t* handle); CPPKAFKA_API TopicPartitionsListPtr make_handle(rd_kafka_topic_partition_list_t* handle);
// Extracts a partition list subset belonging to the provided topics (case-insensitive) // Extracts a partition list subset belonging to the provided topics (case-insensitive)

View File

@@ -64,6 +64,10 @@ void KafkaHandleBase::pause_partitions(const TopicPartitionList& topic_partition
check_error(error); check_error(error);
} }
void KafkaHandleBase::pause(const std::string& topic) {
pause_partitions(convert(topic, get_metadata(get_topic(topic)).get_partitions()));
}
void KafkaHandleBase::resume_partitions(const TopicPartitionList& topic_partitions) { void KafkaHandleBase::resume_partitions(const TopicPartitionList& topic_partitions) {
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
rd_kafka_resp_err_t error = rd_kafka_resume_partitions(get_handle(), rd_kafka_resp_err_t error = rd_kafka_resume_partitions(get_handle(),
@@ -71,6 +75,10 @@ void KafkaHandleBase::resume_partitions(const TopicPartitionList& topic_partitio
check_error(error); check_error(error);
} }
void KafkaHandleBase::resume(const std::string& topic) {
resume_partitions(convert(topic, get_metadata(get_topic(topic)).get_partitions()));
}
void KafkaHandleBase::set_timeout(milliseconds timeout) { void KafkaHandleBase::set_timeout(milliseconds timeout) {
timeout_ms_ = timeout; timeout_ms_ = timeout;
} }

View File

@@ -32,6 +32,7 @@
#include "topic_partition_list.h" #include "topic_partition_list.h"
#include "topic_partition.h" #include "topic_partition.h"
#include "exceptions.h" #include "exceptions.h"
#include "metadata.h"
using std::vector; using std::vector;
using std::set; using std::set;
@@ -66,6 +67,16 @@ TopicPartitionList convert(rd_kafka_topic_partition_list_t* topic_partitions) {
return output; return output;
} }
/**
 * \brief Builds a TopicPartitionList with one entry per partition of a topic
 *
 * Each resulting TopicPartition pairs the given topic name with the partition
 * id reported by the corresponding metadata entry.
 *
 * \param topic The topic name applied to every resulting TopicPartition
 * \param partition_metadata The metadata entries whose ids identify the partitions
 * \return A list containing one TopicPartition per metadata entry (empty input
 *         yields an empty list)
 */
TopicPartitionList convert(const std::string& topic,
                           const std::vector<PartitionMetadata>& partition_metadata)
{
    TopicPartitionList output;
    // One element per metadata entry is known up front; reserve to avoid
    // repeated reallocation while filling the list.
    output.reserve(partition_metadata.size());
    for (const auto& meta : partition_metadata) {
        output.emplace_back(topic, meta.get_id());
    }
    return output;
}
TopicPartitionsListPtr make_handle(rd_kafka_topic_partition_list_t* handle) { TopicPartitionsListPtr make_handle(rd_kafka_topic_partition_list_t* handle) {
return TopicPartitionsListPtr(handle, &rd_kafka_topic_partition_list_destroy); return TopicPartitionsListPtr(handle, &rd_kafka_topic_partition_list_destroy);
} }