diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h index 8c3c899..5b7f634 100644 --- a/include/cppkafka/consumer.h +++ b/include/cppkafka/consumer.h @@ -301,6 +301,8 @@ public: * will think this consumer is down and will trigger a rebalance (if using dynamic * subscription). * + * The timeout used on this call will be the one configured via Consumer::set_timeout. + * * The returned message *might* be empty. If's necessary to check that it's a valid one before * using it: * @@ -312,6 +314,16 @@ public: * \endcode */ Message poll(); + + /** + * \brief Polls for new messages + * + * Same as the other overload of Consumer::poll but the provided timeout will be used + * instead of the one configured on this Consumer. + * + * \param timeout The timeout to be used on this call + */ + Message poll(std::chrono::milliseconds timeout); private: static void rebalance_proxy(rd_kafka_t *handle, rd_kafka_resp_err_t error, rd_kafka_topic_partition_list_t *partitions, void *opaque); diff --git a/include/cppkafka/kafka_handle_base.h b/include/cppkafka/kafka_handle_base.h index 2e9088f..71462c2 100644 --- a/include/cppkafka/kafka_handle_base.h +++ b/include/cppkafka/kafka_handle_base.h @@ -87,7 +87,7 @@ public: * * \param timeout The timeout to be set */ - void set_timeout(const std::chrono::milliseconds& timeout); + void set_timeout(std::chrono::milliseconds timeout); /** * \brief Adds one or more brokers to this handle's broker list diff --git a/include/cppkafka/producer.h b/include/cppkafka/producer.h index 87f83c6..f16f2f1 100644 --- a/include/cppkafka/producer.h +++ b/include/cppkafka/producer.h @@ -140,9 +140,20 @@ public: /** * \brief Polls on this handle * - * This translates into a call to rd_kafka_poll + * This translates into a call to rd_kafka_poll. + * + * The timeout used on this call is the one configured via Producer::set_timeout. */ int poll(); + + /** + * \brief Polls on this handle + * + * This translates into a call to rd_kafka_poll. + * + * \param timeout The timeout used on this call + */ + int poll(std::chrono::milliseconds timeout); private: PayloadPolicy message_payload_policy_; }; diff --git a/src/consumer.cpp b/src/consumer.cpp index 96955b0..2152685 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -188,8 +188,11 @@ const Consumer::RebalanceErrorCallback& Consumer::get_rebalance_error_callback() } Message Consumer::poll() { - rd_kafka_message_t* message = rd_kafka_consumer_poll(get_handle(), - get_timeout().count()); + return poll(get_timeout()); +} + +Message Consumer::poll(milliseconds timeout) { + rd_kafka_message_t* message = rd_kafka_consumer_poll(get_handle(), timeout.count()); return message ? Message(message) : Message(); } diff --git a/src/kafka_handle_base.cpp b/src/kafka_handle_base.cpp index 9148e74..627b2dd 100644 --- a/src/kafka_handle_base.cpp +++ b/src/kafka_handle_base.cpp @@ -70,7 +70,7 @@ void KafkaHandleBase::resume_partitions(const TopicPartitionList& topic_partitio check_error(error); } -void KafkaHandleBase::set_timeout(const milliseconds& timeout) { +void KafkaHandleBase::set_timeout(milliseconds timeout) { timeout_ms_ = timeout; } diff --git a/src/producer.cpp b/src/producer.cpp index 63dcc8b..5cd00cf 100644 --- a/src/producer.cpp +++ b/src/producer.cpp @@ -34,6 +34,8 @@ using std::move; using std::string; +using std::chrono::milliseconds; + namespace cppkafka { Producer::Producer(Configuration config) @@ -83,7 +85,11 @@ void Producer::produce(const Topic& topic, const Partition& partition, const Buf } int Producer::poll() { - return rd_kafka_poll(get_handle(), get_timeout().count()); + return poll(get_timeout()); +} + +int Producer::poll(milliseconds timeout) { + return rd_kafka_poll(get_handle(), timeout.count()); } } // cppkafka