diff --git a/.travis.yml b/.travis.yml
index 89f6732..6fc9794 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -7,6 +7,7 @@ compiler:
   - clang
 
 env:
+  - RDKAFKA_VERSION=v0.9.4
   - RDKAFKA_VERSION=v0.11.5
 
 os:
diff --git a/include/cppkafka/configuration.h b/include/cppkafka/configuration.h
index bddc0d2..910cebb 100644
--- a/include/cppkafka/configuration.h
+++ b/include/cppkafka/configuration.h
@@ -42,6 +42,7 @@
 #include "clonable_ptr.h"
 #include "configuration_base.h"
 #include "macros.h"
+#include "event.h"
 
 namespace cppkafka {
 
@@ -78,6 +79,7 @@ public:
                                            const std::string& message)>;
     using StatsCallback = std::function<void(KafkaHandleBase& handle, const std::string& json)>;
     using SocketCallback = std::function<int(int domain, int type, int protocol)>;
+    using BackgroundEventCallback = std::function<void(KafkaHandleBase& handle, Event event)>;
 
     using ConfigurationBase::set;
     using ConfigurationBase::get;
@@ -142,6 +144,13 @@ public:
      */
     Configuration& set_socket_callback(SocketCallback callback);
 
+#if RD_KAFKA_VERSION >= RD_KAFKA_ADMIN_API_SUPPORT_VERSION
+    /**
+     * Sets the background event callback (invokes rd_kafka_conf_set_background_event_cb)
+     */
+    Configuration& set_background_event_callback(BackgroundEventCallback callback);
+#endif
+
     /**
      * Sets the default topic configuration
      */
@@ -204,6 +213,11 @@ public:
      */
     const SocketCallback& get_socket_callback() const;
 
+    /**
+     * Gets the background event callback
+     */
+    const BackgroundEventCallback& get_background_event_callback() const;
+
     /**
      * Gets the default topic configuration
      */
@@ -229,6 +243,7 @@ private:
     LogCallback log_callback_;
     StatsCallback stats_callback_;
     SocketCallback socket_callback_;
+    BackgroundEventCallback background_event_callback_;
 };
 
 } // cppkafka
diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h
index 65bbbaa..1be75c8 100644
--- a/include/cppkafka/consumer.h
+++ b/include/cppkafka/consumer.h
@@ -461,7 +461,6 @@ public:
 private:
     static void rebalance_proxy(rd_kafka_t *handle, rd_kafka_resp_err_t error,
                                 rd_kafka_topic_partition_list_t *partitions, void *opaque);
-    static Queue get_queue(rd_kafka_queue_t* handle);
     void close();
     void commit(const Message& msg, bool async);
     void commit(const TopicPartitionList* topic_partitions, bool async);
@@ -485,7 +484,7 @@ std::vector<Message, Allocator> Consumer::poll_batch(size_t max_batch_size,
                                                      const Allocator& alloc) {
     std::vector<rd_kafka_message_t*> raw_messages(max_batch_size);
     // Note that this will leak the queue when using rdkafka < 0.11.5 (see get_queue comment)
-    Queue queue(get_queue(rd_kafka_queue_get_consumer(get_handle())));
+    Queue queue = Queue::make_queue(rd_kafka_queue_get_consumer(get_handle()));
     ssize_t result = rd_kafka_consume_batch_queue(queue.get_handle(),
                                                   timeout.count(),
                                                   raw_messages.data(),
diff --git a/include/cppkafka/cppkafka.h b/include/cppkafka/cppkafka.h
index 854cbbd..86ac366 100644
--- a/include/cppkafka/cppkafka.h
+++ b/include/cppkafka/cppkafka.h
@@ -37,6 +37,7 @@
 #include
 #include
 #include
+#include <cppkafka/event.h>
 #include
 #include
 #include
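With the BackgroundEventCallback added to Configuration above, events delivered on librdkafka's background thread can be observed from user code. A minimal sketch of how it might be wired up, assuming the callback signature shown above; the broker address, group id and lambda body are illustrative and not part of this patch:

#include <cppkafka/cppkafka.h>

using namespace cppkafka;

Configuration make_config() {
    Configuration config = {
        { "metadata.broker.list", "127.0.0.1:9092" }, // placeholder broker
        { "group.id", "example-group" }               // placeholder group id
    };
#if RD_KAFKA_VERSION >= RD_KAFKA_ADMIN_API_SUPPORT_VERSION
    // Invoked through background_event_callback_proxy on librdkafka's background thread
    config.set_background_event_callback([](KafkaHandleBase& /*handle*/, Event event) {
        if (event) {
            // e.g. inspect event.get_type() / event.get_error() here
        }
    });
#endif
    return config;
}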
diff --git a/include/cppkafka/event.h b/include/cppkafka/event.h
new file mode 100644
index 0000000..5fea261
--- /dev/null
+++ b/include/cppkafka/event.h
@@ -0,0 +1,180 @@
+/*
+ * Copyright (c) 2018, Matias Fontanini
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ *   notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ *   copyright notice, this list of conditions and the following disclaimer
+ *   in the documentation and/or other materials provided with the
+ *   distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef CPPKAFKA_EVENT_H
+#define CPPKAFKA_EVENT_H
+
+#include <memory>
+#include <string>
+#include <vector>
+#include "error.h"
+#include "message.h"
+#include "topic_partition.h"
+#include "topic_partition_list.h"
+
+namespace cppkafka {
+
+class Event {
+public:
+    /**
+     * Constructs an Event from an rdkafka event handle and takes ownership of it
+     *
+     * \param handle The handle to construct this event from
+     */
+    Event(rd_kafka_event_t* handle);
+
+    /**
+     * Returns the name of this event
+     */
+    std::string get_name() const;
+
+    /**
+     * Returns the type of this event
+     */
+    rd_kafka_event_type_t get_type() const;
+
+    /**
+     * \brief Gets the next message contained in this event.
+     *
+     * This call is only valid if the event type is one of:
+     * * RD_KAFKA_EVENT_FETCH
+     * * RD_KAFKA_EVENT_DR
+     *
+     * \note The returned message's lifetime *is tied to this Event*. That is, if the event
+     *       is freed, so are the contents of the message.
+     */
+    Message get_next_message() const;
+
+    /**
+     * \brief Gets all messages in this event (if any)
+     *
+     * This call is only valid if the event type is one of:
+     * * RD_KAFKA_EVENT_FETCH
+     * * RD_KAFKA_EVENT_DR
+     *
+     * \note The returned messages' lifetime *is tied to this Event*. That is, if the event
+     *       is freed, so are the contents of the messages.
+     *
+     * \return A vector containing 0 or more messages
+     */
+    std::vector<Message> get_messages();
+
+    /**
+     * \brief Gets all messages in this event (if any)
+     *
+     * This call is only valid if the event type is one of:
+     * * RD_KAFKA_EVENT_FETCH
+     * * RD_KAFKA_EVENT_DR
+     *
+     * \param allocator The allocator to use on the output vector
+     *
+     * \note The returned messages' lifetime *is tied to this Event*. That is, if the event
+     *       is freed, so are the contents of the messages.
+     *
+     * \return A vector containing 0 or more messages
+     */
+    template <typename Allocator>
+    std::vector<Message, Allocator> get_messages(const Allocator allocator);
+
+    /**
+     * \brief Gets the number of messages contained in this event
+     *
+     * This call is only valid if the event type is one of:
+     * * RD_KAFKA_EVENT_FETCH
+     * * RD_KAFKA_EVENT_DR
+     */
+    size_t get_message_count() const;
+
+    /**
+     * \brief Returns the error in this event
+     */
+    Error get_error() const;
+
+    /**
+     * Gets the opaque pointer in this event
+     */
+    void* get_opaque() const;
+
+#if RD_KAFKA_VERSION >= RD_KAFKA_EVENT_STATS_SUPPORT_VERSION
+    /**
+     * \brief Gets the stats in this event
+     *
+     * This call is only valid if the event type is RD_KAFKA_EVENT_STATS
+     */
+    std::string get_stats() const {
+        return rd_kafka_event_stats(handle_.get());
+    }
+#endif
+
+    /**
+     * \brief Gets the topic/partition for this event
+     *
+     * This call is only valid if the event type is RD_KAFKA_EVENT_ERROR
+     */
+    TopicPartition get_topic_partition() const;
+
+    /**
+     * \brief Gets the list of topic/partitions in this event
+     *
+     * This call is only valid if the event type is one of:
+     * * RD_KAFKA_EVENT_REBALANCE
+     * * RD_KAFKA_EVENT_OFFSET_COMMIT
+     */
+    TopicPartitionList get_topic_partition_list() const;
+
+    /**
+     * Check whether this event is valid
+     *
+     * \return true iff this event has a valid (non-null) handle inside
+     */
+    operator bool() const;
+private:
+    using HandlePtr = std::unique_ptr<rd_kafka_event_t, decltype(&rd_kafka_event_destroy)>;
+
+    HandlePtr handle_;
+};
+
+template <typename Allocator>
+std::vector<Message, Allocator> Event::get_messages(const Allocator allocator) {
+    const size_t total_messages = get_message_count();
+    std::vector<const rd_kafka_message_t*> raw_messages(total_messages);
+    const auto messages_read = rd_kafka_event_message_array(handle_.get(),
+                                                            raw_messages.data(),
+                                                            total_messages);
+    std::vector<Message, Allocator> output(allocator);
+    output.reserve(messages_read);
+    for (auto message : raw_messages) {
+        output.emplace_back(Message::make_non_owning(const_cast<rd_kafka_message_t*>(message)));
+    }
+    return output;
+}
+
+} // cppkafka
+
+#endif // CPPKAFKA_EVENT_H
diff --git a/include/cppkafka/kafka_handle_base.h b/include/cppkafka/kafka_handle_base.h
index 949e612..c4caad3 100644
--- a/include/cppkafka/kafka_handle_base.h
+++ b/include/cppkafka/kafka_handle_base.h
@@ -46,6 +46,7 @@
 #include "configuration.h"
 #include "macros.h"
 #include "logging.h"
+#include "queue.h"
 
 namespace cppkafka {
 
@@ -239,6 +240,19 @@ public:
      */
     const Configuration& get_configuration() const;
 
+#if RD_KAFKA_VERSION >= RD_KAFKA_ADMIN_API_SUPPORT_VERSION
+    /**
+     * \brief Gets the background queue
+     *
+     * This translates into a call to rd_kafka_queue_get_background
+     *
+     * \return The background queue
+     */
+    Queue get_background_queue() const {
+        return Queue::make_queue(rd_kafka_queue_get_background(handle_.get()));
+    }
+#endif
+
     /**
      * \brief Gets the length of the out queue
      *
diff --git a/include/cppkafka/macros.h b/include/cppkafka/macros.h
index a3fc392..460b414 100644
--- a/include/cppkafka/macros.h
+++ b/include/cppkafka/macros.h
@@ -46,5 +46,8 @@
 // See: https://github.com/edenhill/librdkafka/issues/1792
 #define RD_KAFKA_QUEUE_REFCOUNT_BUG_VERSION 0x000b0500 //v0.11.5.00
 #define RD_KAFKA_HEADERS_SUPPORT_VERSION 0x000b0402 //v0.11.4.02
+#define RD_KAFKA_ADMIN_API_SUPPORT_VERSION 0x000b0500 //v0.11.5.00
+#define RD_KAFKA_MESSAGE_LATENCY_SUPPORT_VERSION 0x000b0000 //v0.11.0.00
+#define RD_KAFKA_EVENT_STATS_SUPPORT_VERSION 0x000b0000 //v0.11.0.00
 
 #endif // CPPKAFKA_MACROS_H
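The version macros above gate the new background-queue API at compile time. A rough sketch of draining that queue with Event and Queue::next_event, assuming events are being routed to the background queue; the helper name and the 100ms timeout are made up for illustration:

#include <chrono>
#include <cppkafka/cppkafka.h>

using namespace cppkafka;

#if RD_KAFKA_VERSION >= RD_KAFKA_ADMIN_API_SUPPORT_VERSION
// Poll the background queue until a poll times out and returns an invalid Event.
void drain_background_events(KafkaHandleBase& handle) {
    Queue queue = handle.get_background_queue();
    while (Event event = queue.next_event(std::chrono::milliseconds(100))) {
        if (event.get_type() == RD_KAFKA_EVENT_ERROR) {
            Error error = event.get_error();
            (void)error; // handle/log the error as appropriate
        }
    }
}
#endif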
diff --git a/include/cppkafka/message.h b/include/cppkafka/message.h
index 252cb97..2d82d4b 100644
--- a/include/cppkafka/message.h
+++ b/include/cppkafka/message.h
@@ -177,6 +177,7 @@ public:
      */
     boost::optional<MessageTimestamp> get_timestamp() const;
 
+#if RD_KAFKA_VERSION >= RD_KAFKA_MESSAGE_LATENCY_SUPPORT_VERSION
     /**
      * \brief Gets the message latency in microseconds as measured from the produce() call.
      */
@@ -184,6 +185,7 @@ public:
         assert(handle_);
         return std::chrono::microseconds(rd_kafka_message_latency(handle_.get()));
     }
+#endif
 
     /**
      * \brief Indicates whether this message is valid (not null)
diff --git a/include/cppkafka/queue.h b/include/cppkafka/queue.h
index 72c238a..b509efc 100644
--- a/include/cppkafka/queue.h
+++ b/include/cppkafka/queue.h
@@ -31,6 +31,7 @@
 #include
 #include
 #include
+#include "event.h"
 #include "macros.h"
 #include "message.h"
@@ -51,7 +52,18 @@ public:
      * \param handle The handle to be used
      */
    static Queue make_non_owning(rd_kafka_queue_t* handle);
-    
+
+    /**
+     * \brief Creates a Queue object out of a handle.
+     *
+     * This checks the runtime rdkafka version and returns either an owned queue
+     * handle or a non-owned one, depending on whether the current version is
+     * affected by RD_KAFKA_QUEUE_REFCOUNT_BUG_VERSION (see macros.h)
+     *
+     * \param handle The handle to be used
+     */
+    static Queue make_queue(rd_kafka_queue_t* handle);
+
     /**
      * \brief Constructs an empty queue
      *
@@ -130,7 +142,7 @@ public:
      * \return A message
      */
     Message consume(std::chrono::milliseconds timeout) const;
-    
+
     /**
      * \brief Consumes a batch of messages from this queue
      *
@@ -188,7 +200,23 @@ public:
      */
     std::vector<Message> consume_batch(size_t max_batch_size,
                                        std::chrono::milliseconds timeout) const;
-    
+
+    /**
+     * \brief Extracts the next Event from this Queue
+     *
+     * \return The latest event, if any
+     */
+    Event next_event() const;
+
+    /**
+     * \brief Extracts the next Event from this Queue
+     *
+     * \param timeout The amount of time to wait for this operation to complete
+     *
+     * \return The latest event, if any
+     */
+    Event next_event(std::chrono::milliseconds timeout) const;
+
     /**
      * Indicates whether this queue is valid (not null)
      */
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 409821e..5d31078 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -14,6 +14,7 @@ set(SOURCES
     metadata.cpp
     group_information.cpp
     error.cpp
+    event.cpp
     kafka_handle_base.cpp
     producer.cpp
diff --git a/src/configuration.cpp b/src/configuration.cpp
index 061adc7..fbb3d3a 100644
--- a/src/configuration.cpp
+++ b/src/configuration.cpp
@@ -74,7 +74,7 @@ void error_callback_proxy(rd_kafka_t*, int err, const char *reason, void *opaque
 void throttle_callback_proxy(rd_kafka_t*, const char* broker_name, int32_t broker_id,
                              int throttle_time_ms, void *opaque) {
     KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque);
-    CallbackInvoker
+    CallbackInvoker
         ("throttle", handle->get_configuration().get_throttle_callback(), handle)
         (*handle, broker_name, broker_id, milliseconds(throttle_time_ms));
 }
@@ -102,6 +102,13 @@ int socket_callback_proxy(int domain, int type, int protocol, void* opaque) {
         (domain, type, protocol);
 }
 
+void background_event_callback_proxy(rd_kafka_t*, rd_kafka_event_t* event_ptr, void *opaque) {
+    KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque);
+    CallbackInvoker<Configuration::BackgroundEventCallback>
+        ("background_event", handle->get_configuration().get_background_event_callback(), handle)
+        (*handle, Event{event_ptr});
+}
+
 // Configuration
 
 Configuration::Configuration()
@@ -177,6 +184,14 @@ Configuration& Configuration::set_socket_callback(SocketCallback callback) {
     return *this;
 }
 
+#if RD_KAFKA_VERSION >= RD_KAFKA_ADMIN_API_SUPPORT_VERSION
+Configuration& Configuration::set_background_event_callback(BackgroundEventCallback callback) {
+    background_event_callback_ = move(callback);
+    rd_kafka_conf_set_background_event_cb(handle_.get(), &background_event_callback_proxy);
+    return *this;
+}
+#endif
+
 Configuration&
 Configuration::set_default_topic_configuration(TopicConfiguration config) {
     default_topic_config_ = std::move(config);
@@ -239,6 +254,11 @@ const Configuration::SocketCallback& Configuration::get_socket_callback() const
     return socket_callback_;
 }
 
+const Configuration::BackgroundEventCallback&
+Configuration::get_background_event_callback() const {
+    return background_event_callback_;
+}
+
 const optional<TopicConfiguration>& Configuration::get_default_topic_configuration() const {
     return default_topic_config_;
 }
diff --git a/src/consumer.cpp b/src/consumer.cpp
index 9ef4189..81fc04e 100644
--- a/src/consumer.cpp
+++ b/src/consumer.cpp
@@ -49,15 +49,6 @@ using std::allocator;
 
 namespace cppkafka {
 
-Queue Consumer::get_queue(rd_kafka_queue_t* handle) {
-    if (rd_kafka_version() <= RD_KAFKA_QUEUE_REFCOUNT_BUG_VERSION) {
-        return Queue::make_non_owning(handle);
-    }
-    else {
-        return Queue(handle);
-    }
-}
-
 void Consumer::rebalance_proxy(rd_kafka_t*, rd_kafka_resp_err_t error,
                                rd_kafka_topic_partition_list_t *partitions, void *opaque) {
     TopicPartitionList list = convert(partitions);
@@ -265,19 +256,19 @@ std::vector<Message, Allocator> Consumer::poll_batch(size_t max_batch_size, milliseconds ti
 }
 
 Queue Consumer::get_main_queue() const {
-    Queue queue(get_queue(rd_kafka_queue_get_main(get_handle())));
+    Queue queue = Queue::make_queue(rd_kafka_queue_get_main(get_handle()));
     queue.disable_queue_forwarding();
     return queue;
 }
 
 Queue Consumer::get_consumer_queue() const {
-    return get_queue(rd_kafka_queue_get_consumer(get_handle()));
+    return Queue::make_queue(rd_kafka_queue_get_consumer(get_handle()));
 }
 
 Queue Consumer::get_partition_queue(const TopicPartition& partition) const {
-    Queue queue(get_queue(rd_kafka_queue_get_partition(get_handle(),
-                                                       partition.get_topic().c_str(),
-                                                       partition.get_partition())));
+    Queue queue = Queue::make_queue(rd_kafka_queue_get_partition(get_handle(),
+                                                                 partition.get_topic().c_str(),
+                                                                 partition.get_partition()));
     queue.disable_queue_forwarding();
     return queue;
 }
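Since the version check now lives in Queue::make_queue, code outside Consumer can wrap a raw rd_kafka_queue_t* the same way the accessors above do. A small sketch under that assumption (the helper name is illustrative):

#include <cppkafka/cppkafka.h>

using namespace cppkafka;

// Ownership (owning vs. non-owning) is decided inside Queue::make_queue based on
// rd_kafka_version(), mirroring the removed Consumer::get_queue helper.
Queue wrap_consumer_queue(Consumer& consumer) {
    return Queue::make_queue(rd_kafka_queue_get_consumer(consumer.get_handle()));
}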
diff --git a/src/event.cpp b/src/event.cpp
new file mode 100644
index 0000000..3fbc52f
--- /dev/null
+++ b/src/event.cpp
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2018, Matias Fontanini
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ *   notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ *   copyright notice, this list of conditions and the following disclaimer
+ *   in the documentation and/or other materials provided with the
+ *   distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "event.h"
+
+using std::allocator;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
+namespace cppkafka {
+
+Event::Event(rd_kafka_event_t* handle)
+: handle_(handle, &rd_kafka_event_destroy) {
+
+}
+
+string Event::get_name() const {
+    return rd_kafka_event_name(handle_.get());
+}
+
+rd_kafka_event_type_t Event::get_type() const {
+    return rd_kafka_event_type(handle_.get());
+}
+
+Message Event::get_next_message() const {
+    // Note: the constness in rd_kafka_event_message_next's return value is not needed and it
+    // breaks Message's interface. This is dirty but it looks like it should have no side effects.
+    const auto message =
+        const_cast<rd_kafka_message_t*>(rd_kafka_event_message_next(handle_.get()));
+    return Message::make_non_owning(message);
+}
+
+vector<Message> Event::get_messages() {
+    return get_messages(allocator<Message>());
+}
+
+size_t Event::get_message_count() const {
+    return rd_kafka_event_message_count(handle_.get());
+}
+
+Error Event::get_error() const {
+    return rd_kafka_event_error(handle_.get());
+}
+
+void* Event::get_opaque() const {
+    return rd_kafka_event_opaque(handle_.get());
+}
+
+TopicPartition Event::get_topic_partition() const {
+    using TopparHandle = unique_ptr<rd_kafka_topic_partition_t,
+                                    decltype(&rd_kafka_topic_partition_destroy)>;
+    TopparHandle toppar_handle{rd_kafka_event_topic_partition(handle_.get()),
+                               &rd_kafka_topic_partition_destroy};
+    return TopicPartition(toppar_handle->topic, toppar_handle->partition, toppar_handle->offset);
+}
+
+TopicPartitionList Event::get_topic_partition_list() const {
+    auto toppars_handle = rd_kafka_event_topic_partition_list(handle_.get());
+    return convert(toppars_handle);
+}
+
+Event::operator bool() const {
+    return !!handle_;
+}
+
+} // cppkafka
diff --git a/src/queue.cpp b/src/queue.cpp
index 909fd76..875bd1b 100644
--- a/src/queue.cpp
+++ b/src/queue.cpp
@@ -46,6 +46,15 @@ Queue Queue::make_non_owning(rd_kafka_queue_t* handle) {
     return Queue(handle, NonOwningTag{});
 }
 
+Queue Queue::make_queue(rd_kafka_queue_t* handle) {
+    if (rd_kafka_version() <= RD_KAFKA_QUEUE_REFCOUNT_BUG_VERSION) {
+        return Queue::make_non_owning(handle);
+    }
+    else {
+        return Queue(handle);
+    }
+}
+
 Queue::Queue()
 : handle_(nullptr, nullptr),
   timeout_ms_(DEFAULT_TIMEOUT) {
@@ -95,13 +104,20 @@ Message Queue::consume(milliseconds timeout) const {
     return Message(rd_kafka_consume_queue(handle_.get(), static_cast<int>(timeout.count())));
 }
 
-std::vector<Message> Queue::consume_batch(size_t max_batch_size) const {
+vector<Message> Queue::consume_batch(size_t max_batch_size) const {
     return consume_batch(max_batch_size, timeout_ms_, allocator<Message>());
 }
 
-std::vector<Message> Queue::consume_batch(size_t max_batch_size,
-                                          milliseconds timeout) const {
+vector<Message> Queue::consume_batch(size_t max_batch_size, milliseconds timeout) const {
     return consume_batch(max_batch_size, timeout, allocator<Message>());
 }
 
+Event Queue::next_event() const {
+    return next_event(timeout_ms_);
+}
+
+Event Queue::next_event(milliseconds timeout) const {
+    return Event(rd_kafka_queue_poll(handle_.get(), timeout.count()));
+}
+
 } //cppkafka
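Queue::next_event is what the new consumer test below exercises; outside the test harness it can be used in a plain polling loop. A hedged sketch (subscription is assumed to have happened elsewhere; the iteration count and timeout are arbitrary):

#include <chrono>
#include <cppkafka/cppkafka.h>

using namespace cppkafka;

void poll_some_events(Consumer& consumer) {
    Queue queue = consumer.get_main_queue();
    for (int i = 0; i < 10; ++i) {
        // An invalid (null) Event is returned when the poll times out
        Event event = queue.next_event(std::chrono::milliseconds(500));
        if (!event) {
            continue;
        }
        if (event.get_type() == RD_KAFKA_EVENT_REBALANCE) {
            // Rebalance events carry the assigned/revoked partition list
            TopicPartitionList partitions = event.get_topic_partition_list();
            (void)partitions;
        }
    }
}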
diff --git a/tests/consumer_test.cpp b/tests/consumer_test.cpp
index 4baa896..077b351 100644
--- a/tests/consumer_test.cpp
+++ b/tests/consumer_test.cpp
@@ -241,3 +241,20 @@ TEST_CASE("consume batch", "[consumer]") {
     CHECK(all_messages[0].get_payload() == payload);
     CHECK(all_messages[1].get_payload() == payload);
 }
+
+// This test may fail due to what seems to be an rdkafka bug. Skip it for now until we're
+// certain of what to do
+TEST_CASE("Event consumption", "[!hide][consumer]") {
+    // Create a consumer and subscribe to the topic
+    Consumer consumer(make_consumer_config());
+    consumer.subscribe({ KAFKA_TOPICS[0] });
+
+    vector<rd_kafka_event_type_t> types = {
+        RD_KAFKA_EVENT_NONE
+    };
+    Queue queue = consumer.get_main_queue();
+    for (const auto type : types) {
+        const Event event = queue.next_event();
+        CHECK(event.get_type() == type);
+    }
+}
diff --git a/tests/roundrobin_poll_test.cpp b/tests/roundrobin_poll_test.cpp
index 22bde53..a1719f8 100644
--- a/tests/roundrobin_poll_test.cpp
+++ b/tests/roundrobin_poll_test.cpp
@@ -77,7 +77,7 @@ TEST_CASE("roundrobin consumer test", "[roundrobin consumer]") {
     PollConsumerRunner runner(consumer, total_messages, KAFKA_NUM_PARTITIONS);
 
     // Produce messages so we stop the consumer
-    Producer producer(make_producer_config());
+    BufferedProducer<string> producer(make_producer_config());
     string payload = "RoundRobin";
 
     // push 3 messages in each partition
@@ -86,17 +86,8 @@ TEST_CASE("roundrobin consumer test", "[roundrobin consumer]") {
                                    .partition(i % KAFKA_NUM_PARTITIONS)
                                    .payload(payload));
     }
-    for (int i = 0; i < 3; ++i) {
-        try {
-            producer.flush();
-            break;
-        }
-        catch (const exception& ex) {
-            if (i == 2) {
-                throw;
-            }
-        }
-    }
+    producer.flush();
+
     runner.try_join();
 
     // Check that we have all messages