diff --git a/include/cppkafka/configuration.h b/include/cppkafka/configuration.h index d347005..d59cb64 100644 --- a/include/cppkafka/configuration.h +++ b/include/cppkafka/configuration.h @@ -85,7 +85,17 @@ public: /** * \brief Sets an attribute. * - * This will call rd_kafka_conf_set under the hood + * This will call rd_kafka_conf_set under the hood. + * + * If the zookeeper extension is enabled (cppkafka is build with -DENABLE_ZOOKEEPER=1), then + * this accepts 2 extra attribute names: + * + * - "zookeeper" which indicates the zookeeper endpoint to connect to + * - "zookeeper.receive.timeout.ms" which indicates the zookeeper receive timeout + * + * When the "zookeeper" attribute is used, a Consumer or Producer constructed using this + * configuration will use zookeeper under the hood to get the broker list and watch for + * broker updates. * * \param name The name of the attribute * \param value The value of the attribute diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h index c001c95..5211d2d 100644 --- a/include/cppkafka/consumer.h +++ b/include/cppkafka/consumer.h @@ -81,7 +81,7 @@ class TopicConfiguration; * Message msg = consumer.poll(); * if (msg) { * // It's a valid message! - * if (msg.get_error() == 0) { + * if (!msg.has_error()) { * // It's an actual message. Get the payload and print it to stdout * cout << msg.get_payload().as_string() << endl; * } diff --git a/include/cppkafka/message.h b/include/cppkafka/message.h index 464fca0..d0375fe 100644 --- a/include/cppkafka/message.h +++ b/include/cppkafka/message.h @@ -38,27 +38,93 @@ namespace cppkafka { +/** + * \brief Thin wrapper over a rdkafka message handle + * + * This is a non copyable, movable class that wraps a rd_kafka_message_t*. + * + * Messages can be empty (contain a null rd_kafka_message_t*). Therefore, users should check + * that the message isn't empty by using the operator bool() before using them. This is especially + * necessary when calling Consumer::poll() as any poll operation that returns a null pointer will + * return an empty message. + */ class Message { public: + /** + * Constructs a message that won't take ownership of the given pointer + */ static Message make_non_owning(rd_kafka_message_t* handle); + /** + * Constructs an empty message + */ Message(); + + /** + * \brief Constructs a message from a handle + * + * The constructed message instance *will own* the given pointer, calling + * rd_kafka_message_destroy upon destruction. + * + * \param handle The message handle to be wrapped + */ Message(rd_kafka_message_t* handle); Message(const Message&) = delete; Message(Message&& rhs) = default; Message& operator=(const Message&) = delete; Message& operator=(Message&& rhs) = default; + /** + * Indicates whether this is a message carrying an error notification + */ bool has_error() const; + + /** + * Gets the error attribute + */ rd_kafka_resp_err_t get_error() const; + + /** + * Gets the topic that this message belongs to + */ std::string get_topic() const; + + /** + * Gets the partition that this message belongs to + */ int get_partition() const; + + /** + * Gets the message's payload + */ const Buffer& get_payload() const; + + /** + * Gets the message's key + */ const Buffer& get_key() const; + + /** + * Gets the message offset + */ int64_t get_offset() const; - void* private_data(); + + /** + * \brief Gets the private data. + * + * This should only be used on messages produced by a Producer that were set a private data + * attribute + */ + void* private_data() const; + + /** + * Indicates whether this message is valid (not null) + */ explicit operator bool() const; + /** + * Gets the rdkafka message handle + */ rd_kafka_message_t* get_handle() const; private: using HandlePtr = std::unique_ptr; diff --git a/include/cppkafka/metadata.h b/include/cppkafka/metadata.h index 8a64f21..05fecab 100644 --- a/include/cppkafka/metadata.h +++ b/include/cppkafka/metadata.h @@ -39,14 +39,36 @@ namespace cppkafka { +/** + * Represents the metadata for a partition + */ class PartitionMetadata { public: PartitionMetadata(const rd_kafka_metadata_partition& partition); + /** + * Gets the partition id + */ uint32_t get_id() const; + + /** + * Gets the partition error as reported by the broker + */ rd_kafka_resp_err_t get_error() const; + + /** + * Gets the leader broker id + */ int32_t get_leader() const; + + /** + * Gets the replica brokers + */ const std::vector& get_replicas() const; + + /** + * Gets the In Sync Replica Brokers + */ const std::vector& get_in_sync_replica_brokers() const; private: int32_t id_; @@ -56,12 +78,26 @@ private: std::vector isrs_; }; +/** + * Represents the metadata for a topic + */ class TopicMetadata { public: TopicMetadata(const rd_kafka_metadata_topic& topic); + /** + * Gets the topic name + */ const std::string& get_topic() const; + + /** + * Gets the topic error + */ rd_kafka_resp_err_t get_error() const; + + /** + * Gets the partitions' metadata + */ const std::vector& get_partitions() const; private: std::string topic_; @@ -69,12 +105,26 @@ private: std::vector partitions_; }; +/** + * Represents a broker's metadata + */ class BrokerMetadata { public: BrokerMetadata(const rd_kafka_metadata_broker_t& broker); + /** + * Gets the host this broker can be found at + */ const std::string& get_host() const; + + /** + * Gets the broker's id + */ int32_t get_id() const; + + /** + * Gets the broker's port + */ uint16_t get_port() const; private: const std::string host_; @@ -82,13 +132,35 @@ private: uint16_t port_; }; +/** + * Represents metadata for brokers, topics and partitions + */ class Metadata { public: Metadata(const rd_kafka_metadata_t* ptr); + /** + * Gets the brokers' metadata + */ std::vector get_brokers() const; + + /** + * Gets the topics' metadata + */ std::vector get_topics() const; + + /** + * Gets metadata for the topics that can be found on the given set + * + * \param topics The topic names to be looked up + */ std::vector get_topics(const std::unordered_set& topics) const; + + /** + * Gets metadata for topics that start with the given prefix + * + * \param prefix The prefix to be looked up + */ std::vector get_topics(const std::string& prefix) const; private: using HandlePtr = std::unique_ptr; diff --git a/include/cppkafka/partition.h b/include/cppkafka/partition.h index 5a1b0ae..e6f3c49 100644 --- a/include/cppkafka/partition.h +++ b/include/cppkafka/partition.h @@ -32,11 +32,29 @@ namespace cppkafka { +/** + * \brief Dumb wrapper over a partition + * + * This class is basically a wrapper over an int that when default constructed will default + * to using RD_KAFKA_PARTITION_UA so you don't need to use the macro name. + */ class Partition { public: + /** + * \brief Constructs an unassigned partition + * + * The partition's value will be RD_KAFKA_PARTITION_UA + */ Partition(); + + /** + * Construct a partition using the given partition value + */ Partition(int partition); + /** + * Gets the partition value + */ int get_partition() const; private: int partition_; diff --git a/include/cppkafka/producer.h b/include/cppkafka/producer.h index 3e01f66..d7ad4dc 100644 --- a/include/cppkafka/producer.h +++ b/include/cppkafka/producer.h @@ -44,6 +44,40 @@ class Buffer; class Partition; class TopicConfiguration; +/** + * \brief Producer class + * + * This class allows producing messages on kafka. + * + * By default the payloads will be copied (using the RD_KAFKA_MSG_F_COPY flag) but the + * behavior can be changed, in which case rdkafka will be reponsible for freeing it. + * + * In order to produce messages you could do something like: + * + * \code + * // Use the zookeeper extension + * Configuration config; + * config.set("zookeeper", "127.0.0.1:2181"); + * + * // Create a producer + * Producer producer(config); + * + * // Get the topic we'll write into + * Topic topic = producer.get_topic("foo"); + * + * // Create some key and payload + * string key = "creative_key_name"; + * string payload = "some payload"; + * + * // Write a message into an unassigned partition + * producer.produce(topic, Partition(), Buffer(payload.data(), payload.size())); + * + * // Write using a key + * producer.produce(topic, Partition(), Buffer(payload.data(), payload.size()), + * Buffer(key.data(), key.size())); + * + * \endcode + */ class Producer : public KafkaHandleBase { public: enum PayloadPolicy { @@ -51,17 +85,62 @@ public: FREE_PAYLOAD = RD_KAFKA_MSG_F_FREE ///< Means RD_KAFKA_MSG_F_FREE }; + /** + * Constructs a producer using the given configuration + * + * \param config The configuration to use + */ Producer(Configuration config); + /** + * Sets the payload policy + * + * \param policy The payload policy to be used + */ void set_payload_policy(PayloadPolicy policy); + + /** + * Returns the current payload policy + */ PayloadPolicy get_payload_policy() const; + /** + * Produces a message + * + * \param topic The topic to write the message to + * \param partition The partition to write the message to + * \param payload The message payload + */ void produce(const Topic& topic, const Partition& partition, const Buffer& payload); + + /** + * Produces a message + * + * \param topic The topic to write the message to + * \param partition The partition to write the message to + * \param payload The message payload + * \param key The message key + */ void produce(const Topic& topic, const Partition& partition, const Buffer& payload, const Buffer& key); + + /** + * Produces a message + * + * \param topic The topic to write the message to + * \param partition The partition to write the message to + * \param payload The message payload + * \param key The message key + * \param user_data The opaque data pointer to be used (accesible via Message::private_data) + */ void produce(const Topic& topic, const Partition& partition, const Buffer& payload, const Buffer& key, void* user_data); + /** + * \brief Polls on this handle + * + * This translates into a call to rd_kafka_poll + */ int poll(); private: PayloadPolicy message_payload_policy_; diff --git a/include/cppkafka/topic.h b/include/cppkafka/topic.h index fe0a354..f710424 100644 --- a/include/cppkafka/topic.h +++ b/include/cppkafka/topic.h @@ -37,13 +37,46 @@ namespace cppkafka { +/** + * \brief Represents a rdkafka topic + * + * This is a simple wrapper over a rd_kafka_topic_t* + */ class Topic { public: + /** + * \brief Creates a Topic object that doesn't take ownership of the handle + * + * \param handle The handle to be used + */ static Topic make_non_owning(rd_kafka_topic_t* handle); + /** + * \brief Constructs a topic using a handle + * + * This will take ownership of the handle + * + * \param handle The handle to be used + */ Topic(rd_kafka_topic_t* handle); + /** + * Returns the topic name + */ std::string get_name() const; + + /** + * \brief Check if the partition is available + * + * This translates into a call to rd_kafka_topic_partition_available + * + * \param partition The partition to check + */ + bool is_partition_available(int partition) const; + + /** + * Returns the rdkakfa handle + */ rd_kafka_topic_t* get_handle() const; private: using HandlePtr = std::unique_ptr; diff --git a/include/cppkafka/topic_configuration.h b/include/cppkafka/topic_configuration.h index 5bf6715..11029df 100644 --- a/include/cppkafka/topic_configuration.h +++ b/include/cppkafka/topic_configuration.h @@ -41,24 +41,78 @@ namespace cppkafka { class Topic; class Buffer; +/** + * \brief Represents the topic configuration + * + * ConfigurationBase provides some extra overloads for set + */ class TopicConfiguration : public ConfigurationBase { public: + /** + * \brief Partitioner callback + * + * This has the same requirements as rdkafka's partitioner calback: + * - *Must not* call any rd_kafka_*() functions except: + * rd_kafka_topic_partition_available(). This is done via Topic::is_partition_available + * - *Must not* block or execute for prolonged periods of time. + * - *Must* return a value between 0 and partition_count-1, or the + * special RD_KAFKA_PARTITION_UA value if partitioning + * could not be performed. + */ using PartitionerCallback = std::function; using ConfigurationBase::set; + /** + * Default constructs a topic configuration object + */ TopicConfiguration(); + /** + * Sets an option + * + * \param name The name of the option + * \param value The value of the option + */ void set(const std::string& name, const std::string& value); + /** + * \brief Sets the partitioner callback + * + * This translates into a call to rd_kafka_topic_conf_set_partitioner_cb + */ void set_partitioner_callback(PartitionerCallback callback); + /** + * \brief Sets the "this" pointer as the opaque pointer for this handle + * + * This method will be called by consumers/producers when the topic configuration object + * has been put in a persistent memory location. Users of cppkafka do not need to use this. + */ void set_as_opaque(); + /** + * Gets the partitioner callback + */ const PartitionerCallback& get_partitioner_callback() const; - rd_kafka_topic_conf_t* get_handle() const; + + /** + * Returns true iff the given property name has been set + */ + bool has_property(const std::string& name) const; + + /** + * Gets an option's value + * + * \param name The option's name + */ std::string get(const std::string& name) const; + + /** + * Gets the rdkafka handle + */ + rd_kafka_topic_conf_t* get_handle() const; private: using HandlePtr = ClonablePtr; +/** + * A topic partition list + */ using TopicPartitionList = std::vector; +// Conversions between rdkafka handles and TopicPartitionList TopicPartitionsListPtr convert(const std::vector& topic_partitions); std::vector convert(const TopicPartitionsListPtr& topic_partitions); std::vector convert(rd_kafka_topic_partition_list_t* topic_partitions); diff --git a/src/message.cpp b/src/message.cpp index 65d4fcc..813e750 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -91,7 +91,7 @@ int64_t Message::get_offset() const { return handle_->offset; } -void* Message::private_data() { +void* Message::private_data() const { return handle_->_private; } diff --git a/src/topic.cpp b/src/topic.cpp index b010af9..34a1521 100644 --- a/src/topic.cpp +++ b/src/topic.cpp @@ -56,6 +56,10 @@ string Topic::get_name() const { return rd_kafka_topic_name(handle_.get()); } +bool Topic::is_partition_available(int partition) const { + return rd_kafka_topic_partition_available(handle_.get(), partition); +} + rd_kafka_topic_t* Topic::get_handle() const { return handle_.get(); } diff --git a/src/topic_configuration.cpp b/src/topic_configuration.cpp index a34dbad..a407a67 100644 --- a/src/topic_configuration.cpp +++ b/src/topic_configuration.cpp @@ -90,8 +90,9 @@ TopicConfiguration::get_partitioner_callback() const { return partitioner_callback_; } -rd_kafka_topic_conf_t* TopicConfiguration::get_handle() const { - return handle_.get(); +bool TopicConfiguration::has_property(const string& name) const { + size_t size = 0; + return rd_kafka_topic_conf_get(handle_.get(), name.data(), nullptr, &size) == RD_KAFKA_CONF_OK; } string TopicConfiguration::get(const string& name) const { @@ -105,6 +106,10 @@ string TopicConfiguration::get(const string& name) const { return string(buffer.data()); } +rd_kafka_topic_conf_t* TopicConfiguration::get_handle() const { + return handle_.get(); +} + TopicConfiguration::HandlePtr TopicConfiguration::make_handle(rd_kafka_topic_conf_t* ptr) { return HandlePtr(ptr, &rd_kafka_topic_conf_destroy, &rd_kafka_topic_conf_dup); }