mirror of
				https://github.com/Telecominfraproject/wlan-cloud-lib-cppkafka.git
				synced 2025-10-31 18:47:48 +00:00 
			
		
		
		
	Allow access to the user-supplied delivery callback. (#66)
* Allow access to the user-supplied delivery callback. * Remove valgrind warning * Added buffer size watermark * added ability to produce a message directly * Updated on_delivery_report function
This commit is contained in:
		 Alex Damian
					Alex Damian
				
			
				
					committed by
					
						 Matias Fontanini
						Matias Fontanini
					
				
			
			
				
	
			
			
			 Matias Fontanini
						Matias Fontanini
					
				
			
						parent
						
							46c396f729
						
					
				
				
					commit
					841e632fbd
				
			| @@ -43,6 +43,7 @@ namespace cppkafka { | ||||
| class Topic; | ||||
| class Buffer; | ||||
| class TopicConfiguration; | ||||
| class Message; | ||||
|  | ||||
| /** | ||||
|  * \brief Producer class | ||||
| @@ -86,39 +87,44 @@ public: | ||||
|     }; | ||||
|  | ||||
|     /** | ||||
|      * Constructs a producer using the given configuration | ||||
|      * \brief Constructs a producer using the given configuration | ||||
|      * | ||||
|      * \param config The configuration to use | ||||
|      */ | ||||
|     Producer(Configuration config); | ||||
|  | ||||
|     /** | ||||
|      * Sets the payload policy | ||||
|      * \brief Sets the payload policy | ||||
|      * | ||||
|      * \param policy The payload policy to be used | ||||
|      */ | ||||
|     void set_payload_policy(PayloadPolicy policy); | ||||
|  | ||||
|     /** | ||||
|      * Returns the current payload policy | ||||
|      * \brief Returns the current payload policy | ||||
|      */ | ||||
|     PayloadPolicy get_payload_policy() const; | ||||
|  | ||||
|     /** | ||||
|      * Produces a message | ||||
|      * \brief Produces a message | ||||
|      * | ||||
|      * \param topic The topic to write the message to | ||||
|      * \param partition The partition to write the message to | ||||
|      * \param payload The message payload | ||||
|      * \param builder The builder class used to compose a message | ||||
|      */ | ||||
|     void produce(const MessageBuilder& builder); | ||||
|      | ||||
|     /** | ||||
|      * \brief Produces a message | ||||
|      * | ||||
|      * \param message The message to be produced | ||||
|      */ | ||||
|     void produce(const Message& message); | ||||
|  | ||||
|     /** | ||||
|      * \brief Polls on this handle | ||||
|      * | ||||
|      * This translates into a call to rd_kafka_poll. | ||||
|      * | ||||
|      * The timeout used on this call is the one configured via Producer::set_timeout. | ||||
|      * \remark The timeout used on this call is the one configured via Producer::set_timeout. | ||||
|      */ | ||||
|     int poll(); | ||||
|  | ||||
| @@ -136,7 +142,7 @@ public: | ||||
|      * | ||||
|      * This translates into a call to rd_kafka_flush. | ||||
|      * | ||||
|      * The timeout used on this call is the one configured via Producer::set_timeout. | ||||
|      * \remark The timeout used on this call is the one configured via Producer::set_timeout. | ||||
|      */ | ||||
|     void flush(); | ||||
|  | ||||
|   | ||||
| @@ -108,6 +108,16 @@ public: | ||||
|      * \param builder The builder that contains the message to be produced | ||||
|      */ | ||||
|     void produce(const MessageBuilder& builder); | ||||
|      | ||||
|     /** | ||||
|      * \brief Produces a message without buffering it | ||||
|      * | ||||
|      * The message will still be tracked so that a call to flush or wait_for_acks will actually | ||||
|      * wait for it to be acknowledged. | ||||
|      * | ||||
|      * \param message The message to be produced | ||||
|      */ | ||||
|     void produce(const Message& message); | ||||
|  | ||||
|     /** | ||||
|      * \brief Flushes the buffered messages. | ||||
| @@ -126,6 +136,34 @@ public: | ||||
|      * Clears any buffered messages | ||||
|      */ | ||||
|     void clear(); | ||||
|      | ||||
|     /** | ||||
|      * \brief Sets the maximum amount of messages to be enqueued in the buffer. | ||||
|      * | ||||
|      * After 'max_buffer_size' is reached, flush() will be called automatically. | ||||
|      * | ||||
|      * \param size The max size of the internal buffer. Allowed values are: | ||||
|      *             -1 : Unlimited buffer size. Must be flushed manually (default value) | ||||
|      *              0 : Don't buffer anything. add_message() behaves like produce() | ||||
|      *            > 0 : Max number of messages before flush() is called. | ||||
|      * | ||||
|      * \remark add_message() will block when 'max_buffer_size' is reached due to flush() | ||||
|      */ | ||||
|     void set_max_buffer_size(ssize_t max_buffer_size); | ||||
|      | ||||
|     /** | ||||
|      * \brief Return the maximum allowed buffer size. | ||||
|      * | ||||
|      * \return The max buffer size. A value of -1 indicates an unbounded buffer. | ||||
|      */ | ||||
|     ssize_t get_max_buffer_size() const; | ||||
|      | ||||
|     /** | ||||
|      * \brief Get the number of messages in the buffer | ||||
|      * | ||||
|      * \return The number of messages | ||||
|      */ | ||||
|     size_t get_buffer_size() const; | ||||
|  | ||||
|     /** | ||||
|      * Gets the Producer object | ||||
| @@ -157,20 +195,25 @@ private: | ||||
|  | ||||
|     template <typename BuilderType> | ||||
|     void do_add_message(BuilderType&& builder); | ||||
|     void produce_message(const MessageBuilder& message); | ||||
|     template <typename MessageType> | ||||
|     void produce_message(const MessageType& message); | ||||
|     Configuration prepare_configuration(Configuration config); | ||||
|     void on_delivery_report(const Message& message); | ||||
|  | ||||
|      | ||||
|     Configuration::DeliveryReportCallback delivery_report_callback_; | ||||
|     Producer producer_; | ||||
|     QueueType messages_; | ||||
|     ProduceFailureCallback produce_failure_callback_; | ||||
|     size_t expected_acks_{0}; | ||||
|     size_t messages_acked_{0}; | ||||
|     ssize_t max_buffer_size_{-1}; | ||||
| }; | ||||
|  | ||||
| template <typename BufferType> | ||||
| BufferedProducer<BufferType>::BufferedProducer(Configuration config) | ||||
| : producer_(prepare_configuration(std::move(config))) { | ||||
| : delivery_report_callback_(config.get_delivery_report_callback()), | ||||
|   producer_(prepare_configuration(std::move(config))) { | ||||
|  | ||||
| } | ||||
|  | ||||
| @@ -190,13 +233,18 @@ void BufferedProducer<BufferType>::produce(const MessageBuilder& builder) { | ||||
|     expected_acks_++; | ||||
| } | ||||
|  | ||||
| template <typename BufferType> | ||||
| void BufferedProducer<BufferType>::produce(const Message& message) { | ||||
|     produce_message(message); | ||||
|     expected_acks_++; | ||||
| } | ||||
|  | ||||
| template <typename BufferType> | ||||
| void BufferedProducer<BufferType>::flush() { | ||||
|     while (!messages_.empty()) { | ||||
|         produce_message(messages_.front()); | ||||
|         messages_.pop(); | ||||
|     } | ||||
|  | ||||
|     wait_for_acks(); | ||||
| } | ||||
|  | ||||
| @@ -228,11 +276,32 @@ void BufferedProducer<BufferType>::clear() { | ||||
|     messages_acked_ = 0; | ||||
| } | ||||
|  | ||||
| template <typename BufferType> | ||||
| void BufferedProducer<BufferType>::set_max_buffer_size(ssize_t max_buffer_size) { | ||||
|     if (max_buffer_size < -1) { | ||||
|         throw Exception("Invalid buffer size."); | ||||
|     } | ||||
|     max_buffer_size_ = max_buffer_size; | ||||
| } | ||||
|  | ||||
| template <typename BufferType> | ||||
| ssize_t BufferedProducer<BufferType>::get_max_buffer_size() const { | ||||
|     return max_buffer_size_; | ||||
| } | ||||
|  | ||||
| template <typename BufferType> | ||||
| size_t BufferedProducer<BufferType>::get_buffer_size() const { | ||||
|     return messages_.size(); | ||||
| } | ||||
|  | ||||
| template <typename BufferType> | ||||
| template <typename BuilderType> | ||||
| void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder) { | ||||
|     expected_acks_++; | ||||
|     messages_.push(std::move(builder)); | ||||
|     messages_.push(std::forward<BuilderType>(builder)); | ||||
|     if ((max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) { | ||||
|         flush(); | ||||
|     } | ||||
| } | ||||
|  | ||||
| template <typename BufferType> | ||||
| @@ -257,11 +326,12 @@ void BufferedProducer<BufferType>::set_produce_failure_callback(ProduceFailureCa | ||||
| } | ||||
|  | ||||
| template <typename BufferType> | ||||
| void BufferedProducer<BufferType>::produce_message(const MessageBuilder& builder) { | ||||
| template <typename MessageType> | ||||
| void BufferedProducer<BufferType>::produce_message(const MessageType& message) { | ||||
|     bool sent = false; | ||||
|     while (!sent) { | ||||
|         try { | ||||
|             producer_.produce(builder); | ||||
|             producer_.produce(message); | ||||
|             sent = true; | ||||
|         } | ||||
|         catch (const HandleException& ex) { | ||||
| @@ -287,22 +357,16 @@ Configuration BufferedProducer<BufferType>::prepare_configuration(Configuration | ||||
|  | ||||
| template <typename BufferType> | ||||
| void BufferedProducer<BufferType>::on_delivery_report(const Message& message) { | ||||
|     // We should produce this message again if it has an error and we either don't have a  | ||||
|     // Call the user-supplied delivery report callback if any | ||||
|     if (delivery_report_callback_) { | ||||
|         delivery_report_callback_(producer_, message); | ||||
|     } | ||||
|     // We should produce this message again if it has an error and we either don't have a | ||||
|     // produce failure callback or we have one but it returns true | ||||
|     bool should_produce = message.get_error() && | ||||
|                           (!produce_failure_callback_ || produce_failure_callback_(message)); | ||||
|     if (should_produce) { | ||||
|         MessageBuilder builder(message.get_topic()); | ||||
|         const auto& key = message.get_key(); | ||||
|         const auto& payload = message.get_payload(); | ||||
|         builder.partition(message.get_partition()) | ||||
|                .key(Buffer(key.get_data(), key.get_size())) | ||||
|                .payload(Buffer(payload.get_data(), payload.get_size())) | ||||
|                .user_data(message.get_user_data()); | ||||
|         if (message.get_timestamp()) { | ||||
|             builder.timestamp(message.get_timestamp()->get_timestamp()); | ||||
|         } | ||||
|         produce_message(builder); | ||||
|         produce_message(message); | ||||
|         return; | ||||
|     } | ||||
|     // If production was successful or the produce failure callback returned false, then | ||||
|   | ||||
| @@ -30,10 +30,10 @@ | ||||
| #include <errno.h> | ||||
| #include "producer.h" | ||||
| #include "exceptions.h" | ||||
| #include "message.h" | ||||
|  | ||||
| using std::move; | ||||
| using std::string; | ||||
|  | ||||
| using std::chrono::milliseconds; | ||||
|  | ||||
| namespace cppkafka { | ||||
| @@ -77,6 +77,23 @@ void Producer::produce(const MessageBuilder& builder) { | ||||
|     check_error(result); | ||||
| } | ||||
|  | ||||
| void Producer::produce(const Message& message) { | ||||
|     const Buffer& payload = message.get_payload(); | ||||
|     const Buffer& key = message.get_key(); | ||||
|     const int policy = static_cast<int>(message_payload_policy_); | ||||
|     int64_t duration = message.get_timestamp() ? message.get_timestamp().get().get_timestamp().count() : 0; | ||||
|     auto result = rd_kafka_producev(get_handle(), | ||||
|                                     RD_KAFKA_V_TOPIC(message.get_topic().data()), | ||||
|                                     RD_KAFKA_V_PARTITION(message.get_partition()), | ||||
|                                     RD_KAFKA_V_MSGFLAGS(policy), | ||||
|                                     RD_KAFKA_V_TIMESTAMP(duration), | ||||
|                                     RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()), | ||||
|                                     RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()), | ||||
|                                     RD_KAFKA_V_OPAQUE(message.get_user_data()), | ||||
|                                     RD_KAFKA_V_END); | ||||
|     check_error(result); | ||||
| } | ||||
|  | ||||
| int Producer::poll() { | ||||
|     return poll(get_timeout()); | ||||
| } | ||||
|   | ||||
| @@ -101,6 +101,38 @@ TEST_CASE("simple production", "[producer]") { | ||||
|         REQUIRE(!!message.get_timestamp() == true); | ||||
|         CHECK(message.get_timestamp()->get_timestamp() == timestamp); | ||||
|     } | ||||
|      | ||||
|     SECTION("message without message builder") { | ||||
|         const string payload = "Goodbye cruel world!"; | ||||
|         const string key = "replay key"; | ||||
|         const milliseconds timestamp{15}; | ||||
|         Producer producer(config); | ||||
|         producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition) | ||||
|                                                      .key(key) | ||||
|                                                      .payload(payload) | ||||
|                                                      .timestamp(timestamp)); | ||||
|         runner.try_join(); | ||||
|         ConsumerRunner runner2(consumer, 1, 1); | ||||
|          | ||||
|         const auto& replay_messages = runner.get_messages(); | ||||
|         REQUIRE(replay_messages.size() == 1); | ||||
|         const auto& replay_message = replay_messages[0]; | ||||
|          | ||||
|         //produce the same message again | ||||
|         producer.produce(replay_message); | ||||
|         runner2.try_join(); | ||||
|          | ||||
|         const auto& messages = runner2.get_messages(); | ||||
|         REQUIRE(messages.size() == 1); | ||||
|         const auto& message = messages[0]; | ||||
|         CHECK(message.get_payload() == payload); | ||||
|         CHECK(message.get_key() == key); | ||||
|         CHECK(message.get_topic() == KAFKA_TOPIC); | ||||
|         CHECK(message.get_partition() == partition); | ||||
|         CHECK(!!message.get_error() == false); | ||||
|         REQUIRE(!!message.get_timestamp() == true); | ||||
|         CHECK(message.get_timestamp()->get_timestamp() == timestamp); | ||||
|     } | ||||
|  | ||||
|     SECTION("callbacks") { | ||||
|         // Now create a producer and produce a message | ||||
| @@ -240,3 +272,34 @@ TEST_CASE("buffered producer", "[producer]") { | ||||
|         CHECK(message.get_payload() == payload); | ||||
|     } | ||||
| } | ||||
|  | ||||
| TEST_CASE("buffered producer with limited buffer", "[producer]") { | ||||
|     int partition = 0; | ||||
|     int num_messages = 4; | ||||
|      | ||||
|     // Create a consumer and assign this topic/partition | ||||
|     Consumer consumer(make_consumer_config()); | ||||
|     consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) }); | ||||
|     ConsumerRunner runner(consumer, 3, 1); | ||||
|  | ||||
|     // Now create a buffered producer and produce two messages | ||||
|     BufferedProducer<string> producer(make_producer_config()); | ||||
|     const string payload = "Hello world! 2"; | ||||
|     const string key = "such key"; | ||||
|     REQUIRE(producer.get_buffer_size() == 0); | ||||
|     REQUIRE(producer.get_max_buffer_size() == -1); | ||||
|      | ||||
|     // Limit the size of the internal buffer | ||||
|     producer.set_max_buffer_size(num_messages-1); | ||||
|     while (num_messages--) { | ||||
|         producer.add_message(MessageBuilder(KAFKA_TOPIC).partition(partition).key(key).payload(payload)); | ||||
|     } | ||||
|     REQUIRE(producer.get_buffer_size() == 1); | ||||
|      | ||||
|     // Finish the runner | ||||
|     runner.try_join(); | ||||
|  | ||||
|     // Validate messages received | ||||
|     const auto& messages = runner.get_messages(); | ||||
|     REQUIRE(messages.size() == producer.get_max_buffer_size()); | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user