mirror of
				https://github.com/Telecominfraproject/wlan-cloud-lib-cppkafka.git
				synced 2025-11-04 04:27:48 +00:00 
			
		
		
		
	Merge pull request #164 from accelerated/offset_store
Added consumer legacy offset store API
This commit is contained in:
		@@ -291,6 +291,38 @@ public:
 | 
				
			|||||||
     * \return The topic partition list
 | 
					     * \return The topic partition list
 | 
				
			||||||
     */
 | 
					     */
 | 
				
			||||||
    TopicPartitionList get_offsets_position(const TopicPartitionList& topic_partitions) const;
 | 
					    TopicPartitionList get_offsets_position(const TopicPartitionList& topic_partitions) const;
 | 
				
			||||||
 | 
					#if (RD_KAFKA_VERSION >= RD_KAFKA_STORE_OFFSETS_SUPPORT_VERSION)
 | 
				
			||||||
 | 
					    /**
 | 
				
			||||||
 | 
					     * \brief Stores the offsets on the currently assigned topic/partitions (legacy).
 | 
				
			||||||
 | 
					     *
 | 
				
			||||||
 | 
					     * This translates into a call to rd_kafka_offsets_store with the offsets prior to the current assignment positions.
 | 
				
			||||||
 | 
					     * It is equivalent to calling rd_kafka_offsets_store(get_offsets_position(get_assignment())).
 | 
				
			||||||
 | 
					     *
 | 
				
			||||||
 | 
					     * \note When using this API it's recommended to set enable.auto.offset.store=false and enable.auto.commit=true.
 | 
				
			||||||
 | 
					     */
 | 
				
			||||||
 | 
					    void store_consumed_offsets() const;
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					    /**
 | 
				
			||||||
 | 
					     * \brief Stores the offsets on the given topic/partitions (legacy).
 | 
				
			||||||
 | 
					     *
 | 
				
			||||||
 | 
					     * This translates into a call to rd_kafka_offsets_store.
 | 
				
			||||||
 | 
					     *
 | 
				
			||||||
 | 
					     * \param topic_partitions The topic/partition list to be stored.
 | 
				
			||||||
 | 
					     *
 | 
				
			||||||
 | 
					     * \note When using this API it's recommended to set enable.auto.offset.store=false and enable.auto.commit=true.
 | 
				
			||||||
 | 
					     */
 | 
				
			||||||
 | 
					    void store_offsets(const TopicPartitionList& topic_partitions) const;
 | 
				
			||||||
 | 
					#endif
 | 
				
			||||||
 | 
					    /**
 | 
				
			||||||
 | 
					     * \brief Stores the offset for this message (legacy).
 | 
				
			||||||
 | 
					     *
 | 
				
			||||||
 | 
					     * This translates into a call to rd_kafka_offset_store.
 | 
				
			||||||
 | 
					     *
 | 
				
			||||||
 | 
					     * \param msg The message whose offset will be stored.
 | 
				
			||||||
 | 
					     *
 | 
				
			||||||
 | 
					     * \note When using this API it's recommended to set enable.auto.offset.store=false and enable.auto.commit=true.
 | 
				
			||||||
 | 
					     */
 | 
				
			||||||
 | 
					    void store_offset(const Message& msg) const;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    /**
 | 
					    /**
 | 
				
			||||||
     * \brief Gets the current topic subscription
 | 
					     * \brief Gets the current topic subscription
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -50,5 +50,6 @@
 | 
				
			|||||||
#define RD_KAFKA_MESSAGE_LATENCY_SUPPORT_VERSION 0x000b0000 //v0.11.0.00
 | 
					#define RD_KAFKA_MESSAGE_LATENCY_SUPPORT_VERSION 0x000b0000 //v0.11.0.00
 | 
				
			||||||
#define RD_KAFKA_EVENT_STATS_SUPPORT_VERSION 0x000b0000 //v0.11.0.00
 | 
					#define RD_KAFKA_EVENT_STATS_SUPPORT_VERSION 0x000b0000 //v0.11.0.00
 | 
				
			||||||
#define RD_KAFKA_MESSAGE_STATUS_SUPPORT_VERSION 0x01000002 //v1.0.0.02
 | 
					#define RD_KAFKA_MESSAGE_STATUS_SUPPORT_VERSION 0x01000002 //v1.0.0.02
 | 
				
			||||||
 | 
					#define RD_KAFKA_STORE_OFFSETS_SUPPORT_VERSION 0x00090501 //v0.9.5.01
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#endif // CPPKAFKA_MACROS_H
 | 
					#endif // CPPKAFKA_MACROS_H
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -200,6 +200,23 @@ Consumer::get_offsets_position(const TopicPartitionList& topic_partitions) const
 | 
				
			|||||||
    return convert(topic_list_handle);
 | 
					    return convert(topic_list_handle);
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#if (RD_KAFKA_VERSION >= RD_KAFKA_STORE_OFFSETS_SUPPORT_VERSION)
 | 
				
			||||||
 | 
					void Consumer::store_consumed_offsets() const {
 | 
				
			||||||
 | 
					    store_offsets(get_offsets_position(get_assignment()));
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					void Consumer::store_offsets(const TopicPartitionList& topic_partitions) const {
 | 
				
			||||||
 | 
					    TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
 | 
				
			||||||
 | 
					    rd_kafka_resp_err_t error = rd_kafka_offsets_store(get_handle(), topic_list_handle.get());
 | 
				
			||||||
 | 
					    check_error(error, topic_list_handle.get());
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					#endif
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					void Consumer::store_offset(const Message& msg) const {
 | 
				
			||||||
 | 
					    rd_kafka_resp_err_t error = rd_kafka_offset_store(msg.get_handle()->rkt, msg.get_partition(), msg.get_offset());
 | 
				
			||||||
 | 
					    check_error(error);
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
vector<string> Consumer::get_subscription() const {
 | 
					vector<string> Consumer::get_subscription() const {
 | 
				
			||||||
    rd_kafka_resp_err_t error;
 | 
					    rd_kafka_resp_err_t error;
 | 
				
			||||||
    rd_kafka_topic_partition_list_t* list = nullptr;
 | 
					    rd_kafka_topic_partition_list_t* list = nullptr;
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -43,7 +43,7 @@ TEST_CASE("construction", "[buffer]") {
 | 
				
			|||||||
    // From vector
 | 
					    // From vector
 | 
				
			||||||
    const vector<uint8_t> vector_data(str_data.begin(), str_data.end());
 | 
					    const vector<uint8_t> vector_data(str_data.begin(), str_data.end());
 | 
				
			||||||
    // From array
 | 
					    // From array
 | 
				
			||||||
    const array<char,12> array_data{'H','e','l','l','o',' ','w','o','r','l','d','!'};
 | 
					    const array<char,12> array_data{{'H','e','l','l','o',' ','w','o','r','l','d','!'}};
 | 
				
			||||||
    // From raw array
 | 
					    // From raw array
 | 
				
			||||||
    const char raw_array[12]{'H','e','l','l','o',' ','w','o','r','l','d','!'};
 | 
					    const char raw_array[12]{'H','e','l','l','o',' ','w','o','r','l','d','!'};
 | 
				
			||||||
    
 | 
					    
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -30,12 +30,15 @@ using std::chrono::system_clock;
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
using namespace cppkafka;
 | 
					using namespace cppkafka;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#define ENABLE_STRICT_RR_ORDER 0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
//==================================================================================
 | 
					//==================================================================================
 | 
				
			||||||
//                           Helper functions
 | 
					//                           Helper functions
 | 
				
			||||||
//==================================================================================
 | 
					//==================================================================================
 | 
				
			||||||
static Configuration make_producer_config() {
 | 
					static Configuration make_producer_config() {
 | 
				
			||||||
    Configuration config = {    
 | 
					    Configuration config = {    
 | 
				
			||||||
        { "metadata.broker.list", KAFKA_TEST_INSTANCE },
 | 
					        { "metadata.broker.list", KAFKA_TEST_INSTANCE },
 | 
				
			||||||
 | 
					        { "max.in.flight", 1 }
 | 
				
			||||||
    };
 | 
					    };
 | 
				
			||||||
    return config;
 | 
					    return config;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@@ -49,6 +52,7 @@ static Configuration make_consumer_config(const string& group_id = make_consumer
 | 
				
			|||||||
    return config;
 | 
					    return config;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#if ENABLE_STRICT_RR_ORDER
 | 
				
			||||||
static vector<int> make_roundrobin_partition_vector(int total_messages) {
 | 
					static vector<int> make_roundrobin_partition_vector(int total_messages) {
 | 
				
			||||||
    vector<int> partition_order;
 | 
					    vector<int> partition_order;
 | 
				
			||||||
    for (int i = 0, partition = 0; i < total_messages+1; ++i) {
 | 
					    for (int i = 0, partition = 0; i < total_messages+1; ++i) {
 | 
				
			||||||
@@ -59,6 +63,7 @@ static vector<int> make_roundrobin_partition_vector(int total_messages) {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
    return partition_order;
 | 
					    return partition_order;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					#endif
 | 
				
			||||||
 | 
					
 | 
				
			||||||
//========================================================================
 | 
					//========================================================================
 | 
				
			||||||
//                              TESTS
 | 
					//                              TESTS
 | 
				
			||||||
@@ -82,7 +87,7 @@ TEST_CASE("roundrobin consumer test", "[roundrobin consumer]") {
 | 
				
			|||||||
    
 | 
					    
 | 
				
			||||||
    // push 3 messages in each partition
 | 
					    // push 3 messages in each partition
 | 
				
			||||||
    for (int i = 0; i < total_messages; ++i) {
 | 
					    for (int i = 0; i < total_messages; ++i) {
 | 
				
			||||||
        producer.produce(MessageBuilder(KAFKA_TOPICS[0])
 | 
					        producer.sync_produce(MessageBuilder(KAFKA_TOPICS[0])
 | 
				
			||||||
                            .partition(i % KAFKA_NUM_PARTITIONS)
 | 
					                            .partition(i % KAFKA_NUM_PARTITIONS)
 | 
				
			||||||
                            .payload(payload));
 | 
					                            .payload(payload));
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@@ -93,6 +98,7 @@ TEST_CASE("roundrobin consumer test", "[roundrobin consumer]") {
 | 
				
			|||||||
    // Check that we have all messages
 | 
					    // Check that we have all messages
 | 
				
			||||||
    REQUIRE(runner.get_messages().size() == total_messages);
 | 
					    REQUIRE(runner.get_messages().size() == total_messages);
 | 
				
			||||||
    
 | 
					    
 | 
				
			||||||
 | 
					#if ENABLE_STRICT_RR_ORDER
 | 
				
			||||||
    // Check that we have one message from each partition in desired order
 | 
					    // Check that we have one message from each partition in desired order
 | 
				
			||||||
    vector<int> partition_order = make_roundrobin_partition_vector(total_messages+KAFKA_NUM_PARTITIONS);
 | 
					    vector<int> partition_order = make_roundrobin_partition_vector(total_messages+KAFKA_NUM_PARTITIONS);
 | 
				
			||||||
    int partition_idx;
 | 
					    int partition_idx;
 | 
				
			||||||
@@ -101,12 +107,11 @@ TEST_CASE("roundrobin consumer test", "[roundrobin consumer]") {
 | 
				
			|||||||
            // find first polled partition index
 | 
					            // find first polled partition index
 | 
				
			||||||
            partition_idx = runner.get_messages()[i].get_partition();
 | 
					            partition_idx = runner.get_messages()[i].get_partition();
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        REQUIRE(runner.get_messages()[i].get_partition() == partition_order[i+partition_idx]);
 | 
					        CHECK(runner.get_messages()[i].get_partition() == partition_order[i+partition_idx]);
 | 
				
			||||||
        REQUIRE((string)runner.get_messages()[i].get_payload() == payload);
 | 
					        REQUIRE((string)runner.get_messages()[i].get_payload() == payload);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    
 | 
					    
 | 
				
			||||||
    //============ resume original poll strategy =============//
 | 
					    //============ resume original poll strategy =============//
 | 
				
			||||||
    
 | 
					 | 
				
			||||||
    //validate that once the round robin strategy is deleted, normal poll works as before
 | 
					    //validate that once the round robin strategy is deleted, normal poll works as before
 | 
				
			||||||
    consumer.delete_polling_strategy();
 | 
					    consumer.delete_polling_strategy();
 | 
				
			||||||
    
 | 
					    
 | 
				
			||||||
@@ -115,7 +120,7 @@ TEST_CASE("roundrobin consumer test", "[roundrobin consumer]") {
 | 
				
			|||||||
    payload = "SerialPolling";
 | 
					    payload = "SerialPolling";
 | 
				
			||||||
    // push 3 messages in each partition
 | 
					    // push 3 messages in each partition
 | 
				
			||||||
    for (int i = 0; i < total_messages; ++i) {
 | 
					    for (int i = 0; i < total_messages; ++i) {
 | 
				
			||||||
        producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(i%KAFKA_NUM_PARTITIONS).payload(payload));
 | 
					        producer.sync_produce(MessageBuilder(KAFKA_TOPICS[0]).partition(i%KAFKA_NUM_PARTITIONS).payload(payload));
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    producer.flush();
 | 
					    producer.flush();
 | 
				
			||||||
    serial_runner.try_join();
 | 
					    serial_runner.try_join();
 | 
				
			||||||
@@ -126,5 +131,11 @@ TEST_CASE("roundrobin consumer test", "[roundrobin consumer]") {
 | 
				
			|||||||
    for (int i = 0; i < total_messages; ++i) {
 | 
					    for (int i = 0; i < total_messages; ++i) {
 | 
				
			||||||
        REQUIRE((string)serial_runner.get_messages()[i].get_payload() == payload);
 | 
					        REQUIRE((string)serial_runner.get_messages()[i].get_payload() == payload);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					#else
 | 
				
			||||||
 | 
					    // Simple payload check
 | 
				
			||||||
 | 
					    for (int i = 0; i < total_messages; ++i) {
 | 
				
			||||||
 | 
					        REQUIRE((string)runner.get_messages()[i].get_payload() == payload);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					#endif
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user