Document more classes

This commit is contained in:
Matias Fontanini
2016-06-12 20:14:40 -07:00
parent 9751acd8df
commit a055768f5a
13 changed files with 400 additions and 7 deletions

View File

@@ -85,7 +85,17 @@ public:
/**
* \brief Sets an attribute.
*
* This will call rd_kafka_conf_set under the hood
* This will call rd_kafka_conf_set under the hood.
*
* If the zookeeper extension is enabled (cppkafka is build with -DENABLE_ZOOKEEPER=1), then
* this accepts 2 extra attribute names:
*
* - "zookeeper" which indicates the zookeeper endpoint to connect to
* - "zookeeper.receive.timeout.ms" which indicates the zookeeper receive timeout
*
* When the "zookeeper" attribute is used, a Consumer or Producer constructed using this
* configuration will use zookeeper under the hood to get the broker list and watch for
* broker updates.
*
* \param name The name of the attribute
* \param value The value of the attribute

View File

@@ -81,7 +81,7 @@ class TopicConfiguration;
* Message msg = consumer.poll();
* if (msg) {
* // It's a valid message!
* if (msg.get_error() == 0) {
* if (!msg.has_error()) {
* // It's an actual message. Get the payload and print it to stdout
* cout << msg.get_payload().as_string() << endl;
* }

View File

@@ -38,27 +38,93 @@
namespace cppkafka {
/**
* \brief Thin wrapper over a rdkafka message handle
*
* This is a non copyable, movable class that wraps a rd_kafka_message_t*.
*
* Messages can be empty (contain a null rd_kafka_message_t*). Therefore, users should check
* that the message isn't empty by using the operator bool() before using them. This is especially
* necessary when calling Consumer::poll() as any poll operation that returns a null pointer will
* return an empty message.
*/
class Message {
public:
/**
* Constructs a message that won't take ownership of the given pointer
*/
static Message make_non_owning(rd_kafka_message_t* handle);
/**
* Constructs an empty message
*/
Message();
/**
* \brief Constructs a message from a handle
*
* The constructed message instance *will own* the given pointer, calling
* rd_kafka_message_destroy upon destruction.
*
* \param handle The message handle to be wrapped
*/
Message(rd_kafka_message_t* handle);
Message(const Message&) = delete;
Message(Message&& rhs) = default;
Message& operator=(const Message&) = delete;
Message& operator=(Message&& rhs) = default;
/**
* Indicates whether this is a message carrying an error notification
*/
bool has_error() const;
/**
* Gets the error attribute
*/
rd_kafka_resp_err_t get_error() const;
/**
* Gets the topic that this message belongs to
*/
std::string get_topic() const;
/**
* Gets the partition that this message belongs to
*/
int get_partition() const;
/**
* Gets the message's payload
*/
const Buffer& get_payload() const;
/**
* Gets the message's key
*/
const Buffer& get_key() const;
/**
* Gets the message offset
*/
int64_t get_offset() const;
void* private_data();
/**
* \brief Gets the private data.
*
* This should only be used on messages produced by a Producer that were set a private data
* attribute
*/
void* private_data() const;
/**
* Indicates whether this message is valid (not null)
*/
explicit operator bool() const;
/**
* Gets the rdkafka message handle
*/
rd_kafka_message_t* get_handle() const;
private:
using HandlePtr = std::unique_ptr<rd_kafka_message_t, decltype(&rd_kafka_message_destroy)>;

View File

@@ -39,14 +39,36 @@
namespace cppkafka {
/**
* Represents the metadata for a partition
*/
class PartitionMetadata {
public:
PartitionMetadata(const rd_kafka_metadata_partition& partition);
/**
* Gets the partition id
*/
uint32_t get_id() const;
/**
* Gets the partition error as reported by the broker
*/
rd_kafka_resp_err_t get_error() const;
/**
* Gets the leader broker id
*/
int32_t get_leader() const;
/**
* Gets the replica brokers
*/
const std::vector<int32_t>& get_replicas() const;
/**
* Gets the In Sync Replica Brokers
*/
const std::vector<int32_t>& get_in_sync_replica_brokers() const;
private:
int32_t id_;
@@ -56,12 +78,26 @@ private:
std::vector<int32_t> isrs_;
};
/**
* Represents the metadata for a topic
*/
class TopicMetadata {
public:
TopicMetadata(const rd_kafka_metadata_topic& topic);
/**
* Gets the topic name
*/
const std::string& get_topic() const;
/**
* Gets the topic error
*/
rd_kafka_resp_err_t get_error() const;
/**
* Gets the partitions' metadata
*/
const std::vector<PartitionMetadata>& get_partitions() const;
private:
std::string topic_;
@@ -69,12 +105,26 @@ private:
std::vector<PartitionMetadata> partitions_;
};
/**
* Represents a broker's metadata
*/
class BrokerMetadata {
public:
BrokerMetadata(const rd_kafka_metadata_broker_t& broker);
/**
* Gets the host this broker can be found at
*/
const std::string& get_host() const;
/**
* Gets the broker's id
*/
int32_t get_id() const;
/**
* Gets the broker's port
*/
uint16_t get_port() const;
private:
const std::string host_;
@@ -82,13 +132,35 @@ private:
uint16_t port_;
};
/**
* Represents metadata for brokers, topics and partitions
*/
class Metadata {
public:
Metadata(const rd_kafka_metadata_t* ptr);
/**
* Gets the brokers' metadata
*/
std::vector<BrokerMetadata> get_brokers() const;
/**
* Gets the topics' metadata
*/
std::vector<TopicMetadata> get_topics() const;
/**
* Gets metadata for the topics that can be found on the given set
*
* \param topics The topic names to be looked up
*/
std::vector<TopicMetadata> get_topics(const std::unordered_set<std::string>& topics) const;
/**
* Gets metadata for topics that start with the given prefix
*
* \param prefix The prefix to be looked up
*/
std::vector<TopicMetadata> get_topics(const std::string& prefix) const;
private:
using HandlePtr = std::unique_ptr<const rd_kafka_metadata_t, decltype(&rd_kafka_metadata_destroy)>;

View File

@@ -32,11 +32,29 @@
namespace cppkafka {
/**
* \brief Dumb wrapper over a partition
*
* This class is basically a wrapper over an int that when default constructed will default
* to using RD_KAFKA_PARTITION_UA so you don't need to use the macro name.
*/
class Partition {
public:
/**
* \brief Constructs an unassigned partition
*
* The partition's value will be RD_KAFKA_PARTITION_UA
*/
Partition();
/**
* Construct a partition using the given partition value
*/
Partition(int partition);
/**
* Gets the partition value
*/
int get_partition() const;
private:
int partition_;

View File

@@ -44,6 +44,40 @@ class Buffer;
class Partition;
class TopicConfiguration;
/**
* \brief Producer class
*
* This class allows producing messages on kafka.
*
* By default the payloads will be copied (using the RD_KAFKA_MSG_F_COPY flag) but the
* behavior can be changed, in which case rdkafka will be reponsible for freeing it.
*
* In order to produce messages you could do something like:
*
* \code
* // Use the zookeeper extension
* Configuration config;
* config.set("zookeeper", "127.0.0.1:2181");
*
* // Create a producer
* Producer producer(config);
*
* // Get the topic we'll write into
* Topic topic = producer.get_topic("foo");
*
* // Create some key and payload
* string key = "creative_key_name";
* string payload = "some payload";
*
* // Write a message into an unassigned partition
* producer.produce(topic, Partition(), Buffer(payload.data(), payload.size()));
*
* // Write using a key
* producer.produce(topic, Partition(), Buffer(payload.data(), payload.size()),
* Buffer(key.data(), key.size()));
*
* \endcode
*/
class Producer : public KafkaHandleBase {
public:
enum PayloadPolicy {
@@ -51,17 +85,62 @@ public:
FREE_PAYLOAD = RD_KAFKA_MSG_F_FREE ///< Means RD_KAFKA_MSG_F_FREE
};
/**
* Constructs a producer using the given configuration
*
* \param config The configuration to use
*/
Producer(Configuration config);
/**
* Sets the payload policy
*
* \param policy The payload policy to be used
*/
void set_payload_policy(PayloadPolicy policy);
/**
* Returns the current payload policy
*/
PayloadPolicy get_payload_policy() const;
/**
* Produces a message
*
* \param topic The topic to write the message to
* \param partition The partition to write the message to
* \param payload The message payload
*/
void produce(const Topic& topic, const Partition& partition, const Buffer& payload);
/**
* Produces a message
*
* \param topic The topic to write the message to
* \param partition The partition to write the message to
* \param payload The message payload
* \param key The message key
*/
void produce(const Topic& topic, const Partition& partition, const Buffer& payload,
const Buffer& key);
/**
* Produces a message
*
* \param topic The topic to write the message to
* \param partition The partition to write the message to
* \param payload The message payload
* \param key The message key
* \param user_data The opaque data pointer to be used (accesible via Message::private_data)
*/
void produce(const Topic& topic, const Partition& partition, const Buffer& payload,
const Buffer& key, void* user_data);
/**
* \brief Polls on this handle
*
* This translates into a call to rd_kafka_poll
*/
int poll();
private:
PayloadPolicy message_payload_policy_;

View File

@@ -37,13 +37,46 @@
namespace cppkafka {
/**
* \brief Represents a rdkafka topic
*
* This is a simple wrapper over a rd_kafka_topic_t*
*/
class Topic {
public:
/**
* \brief Creates a Topic object that doesn't take ownership of the handle
*
* \param handle The handle to be used
*/
static Topic make_non_owning(rd_kafka_topic_t* handle);
/**
* \brief Constructs a topic using a handle
*
* This will take ownership of the handle
*
* \param handle The handle to be used
*/
Topic(rd_kafka_topic_t* handle);
/**
* Returns the topic name
*/
std::string get_name() const;
/**
* \brief Check if the partition is available
*
* This translates into a call to rd_kafka_topic_partition_available
*
* \param partition The partition to check
*/
bool is_partition_available(int partition) const;
/**
* Returns the rdkakfa handle
*/
rd_kafka_topic_t* get_handle() const;
private:
using HandlePtr = std::unique_ptr<rd_kafka_topic_t, decltype(&rd_kafka_topic_destroy)>;

View File

@@ -41,24 +41,78 @@ namespace cppkafka {
class Topic;
class Buffer;
/**
* \brief Represents the topic configuration
*
* ConfigurationBase provides some extra overloads for set
*/
class TopicConfiguration : public ConfigurationBase<TopicConfiguration> {
public:
/**
* \brief Partitioner callback
*
* This has the same requirements as rdkafka's partitioner calback:
* - *Must not* call any rd_kafka_*() functions except:
* rd_kafka_topic_partition_available(). This is done via Topic::is_partition_available
* - *Must not* block or execute for prolonged periods of time.
* - *Must* return a value between 0 and partition_count-1, or the
* special RD_KAFKA_PARTITION_UA value if partitioning
* could not be performed.
*/
using PartitionerCallback = std::function<int32_t(const Topic&, const Buffer& key,
int32_t partition_count)>;
using ConfigurationBase<TopicConfiguration>::set;
/**
* Default constructs a topic configuration object
*/
TopicConfiguration();
/**
* Sets an option
*
* \param name The name of the option
* \param value The value of the option
*/
void set(const std::string& name, const std::string& value);
/**
* \brief Sets the partitioner callback
*
* This translates into a call to rd_kafka_topic_conf_set_partitioner_cb
*/
void set_partitioner_callback(PartitionerCallback callback);
/**
* \brief Sets the "this" pointer as the opaque pointer for this handle
*
* This method will be called by consumers/producers when the topic configuration object
* has been put in a persistent memory location. Users of cppkafka do not need to use this.
*/
void set_as_opaque();
/**
* Gets the partitioner callback
*/
const PartitionerCallback& get_partitioner_callback() const;
rd_kafka_topic_conf_t* get_handle() const;
/**
* Returns true iff the given property name has been set
*/
bool has_property(const std::string& name) const;
/**
* Gets an option's value
*
* \param name The option's name
*/
std::string get(const std::string& name) const;
/**
* Gets the rdkafka handle
*/
rd_kafka_topic_conf_t* get_handle() const;
private:
using HandlePtr = ClonablePtr<rd_kafka_topic_conf_t,
decltype(&rd_kafka_topic_conf_destroy),

View File

@@ -35,16 +35,64 @@
namespace cppkafka {
/**
* Represents a topic/partition
*/
class TopicPartition {
public:
/**
* Default constructs a topic/partition
*/
TopicPartition();
/**
* \brief Constructs a topic/partition
*
* The partition value will be RD_KAFKA_OFFSET_INVALID
*
* \param topic The topic name
*/
TopicPartition(const char* topic);
/**
* \brief Constructs a topic/partition
*
* The partition value will be RD_KAFKA_OFFSET_INVALID
*
* \param topic The topic name
*/
TopicPartition(std::string topic);
/**
* Constructs a topic/partition
*
* \param topic The topic name
* \param partition The partition to be used
*/
TopicPartition(std::string topic, int partition);
/**
* Constructs a topic/partition
*
* \param topic The topic name
* \param partition The partition to be used
* \param offset The offset to be used
*/
TopicPartition(std::string topic, int partition, int64_t offset);
/**
* Gets the topic name
*/
const std::string& get_topic() const;
/**
* Gets the partition
*/
int get_partition() const;
/**
* Gets the offset
*/
int64_t get_offset() const;
private:
std::string topic_;

View File

@@ -40,8 +40,12 @@ class TopicPartition;
using TopicPartitionsListPtr = std::unique_ptr<rd_kafka_topic_partition_list_t,
decltype(&rd_kafka_topic_partition_list_destroy)>;
/**
* A topic partition list
*/
using TopicPartitionList = std::vector<TopicPartition>;
// Conversions between rdkafka handles and TopicPartitionList
TopicPartitionsListPtr convert(const std::vector<TopicPartition>& topic_partitions);
std::vector<TopicPartition> convert(const TopicPartitionsListPtr& topic_partitions);
std::vector<TopicPartition> convert(rd_kafka_topic_partition_list_t* topic_partitions);

View File

@@ -91,7 +91,7 @@ int64_t Message::get_offset() const {
return handle_->offset;
}
void* Message::private_data() {
void* Message::private_data() const {
return handle_->_private;
}

View File

@@ -56,6 +56,10 @@ string Topic::get_name() const {
return rd_kafka_topic_name(handle_.get());
}
bool Topic::is_partition_available(int partition) const {
return rd_kafka_topic_partition_available(handle_.get(), partition);
}
rd_kafka_topic_t* Topic::get_handle() const {
return handle_.get();
}

View File

@@ -90,8 +90,9 @@ TopicConfiguration::get_partitioner_callback() const {
return partitioner_callback_;
}
rd_kafka_topic_conf_t* TopicConfiguration::get_handle() const {
return handle_.get();
bool TopicConfiguration::has_property(const string& name) const {
size_t size = 0;
return rd_kafka_topic_conf_get(handle_.get(), name.data(), nullptr, &size) == RD_KAFKA_CONF_OK;
}
string TopicConfiguration::get(const string& name) const {
@@ -105,6 +106,10 @@ string TopicConfiguration::get(const string& name) const {
return string(buffer.data());
}
rd_kafka_topic_conf_t* TopicConfiguration::get_handle() const {
return handle_.get();
}
TopicConfiguration::HandlePtr TopicConfiguration::make_handle(rd_kafka_topic_conf_t* ptr) {
return HandlePtr(ptr, &rd_kafka_topic_conf_destroy, &rd_kafka_topic_conf_dup);
}