diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h index 43ff66b..0415e33 100644 --- a/include/cppkafka/consumer.h +++ b/include/cppkafka/consumer.h @@ -291,7 +291,7 @@ public: * \return The topic partition list */ TopicPartitionList get_offsets_position(const TopicPartitionList& topic_partitions) const; - +#if (RD_KAFKA_VERSION >= RD_KAFKA_STORE_OFFSETS_SUPPORT_VERSION) /** * \brief Stores the offsets on the currently assigned topic/partitions (legacy). * @@ -312,7 +312,7 @@ public: * \note When using this API it's recommended to set enable.auto.offset.store=false and enable.auto.commit=true. */ void store_offsets(const TopicPartitionList& topic_partitions) const; - +#endif /** * \brief Stores the offset for this message (legacy). * diff --git a/include/cppkafka/macros.h b/include/cppkafka/macros.h index 77f13f8..6baebf0 100644 --- a/include/cppkafka/macros.h +++ b/include/cppkafka/macros.h @@ -50,5 +50,6 @@ #define RD_KAFKA_MESSAGE_LATENCY_SUPPORT_VERSION 0x000b0000 //v0.11.0.00 #define RD_KAFKA_EVENT_STATS_SUPPORT_VERSION 0x000b0000 //v0.11.0.00 #define RD_KAFKA_MESSAGE_STATUS_SUPPORT_VERSION 0x01000002 //v1.0.0.02 +#define RD_KAFKA_STORE_OFFSETS_SUPPORT_VERSION 0x00090501 //v0.9.5.01 #endif // CPPKAFKA_MACROS_H diff --git a/src/consumer.cpp b/src/consumer.cpp index 30a6962..7c5da44 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -200,6 +200,7 @@ Consumer::get_offsets_position(const TopicPartitionList& topic_partitions) const return convert(topic_list_handle); } +#if (RD_KAFKA_VERSION >= RD_KAFKA_STORE_OFFSETS_SUPPORT_VERSION) void Consumer::store_consumed_offsets() const { store_offsets(get_offsets_position(get_assignment())); } @@ -209,6 +210,7 @@ void Consumer::store_offsets(const TopicPartitionList& topic_partitions) const { rd_kafka_resp_err_t error = rd_kafka_offsets_store(get_handle(), topic_list_handle.get()); check_error(error, topic_list_handle.get()); } +#endif void Consumer::store_offset(const Message& msg) const { rd_kafka_resp_err_t error = rd_kafka_offset_store(msg.get_handle()->rkt, msg.get_partition(), msg.get_offset());