diff --git a/include/cppkafka/compacted_topic_processor.h b/include/cppkafka/compacted_topic_processor.h new file mode 100644 index 0000000..ed17809 --- /dev/null +++ b/include/cppkafka/compacted_topic_processor.h @@ -0,0 +1,322 @@ +/* + * Copyright (c) 2016, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef CPPKAFKA_COMPACTED_TOPIC_PROCESSOR_H +#define CPPKAFKA_COMPACTED_TOPIC_PROCESSOR_H + +#include +#include +#include +#include +#include +#include "buffer.h" +#include "consumer.h" + +namespace cppkafka { + +/** + * \brief Events generated by a CompactedTopicProcessor + */ +template +class CompactedTopicEvent { +public: + /** + * \brief Event type enum + */ + enum EventType { + SET_ELEMENT, + DELETE_ELEMENT, + CLEAR_ELEMENTS, + REACHED_EOF + }; + + /** + * Constructs an instance providing only a type + */ + CompactedTopicEvent(EventType type, std::string topic, int partition); + + /** + * Constructs an instance providing a type and a key + */ + CompactedTopicEvent(EventType type, std::string topic, int partition, Key key); + + /** + * Constructs an instance providing a type, a key and a value + */ + CompactedTopicEvent(EventType type, std::string topic, int partition, Key key, Value value); + + /** + * Gets the event type + */ + EventType get_type() const; + + /** + * Gets the topic that generated this event + */ + const std::string& get_topic() const; + + /** + * Gets the partition that generated this event + */ + int get_partition() const; + + /** + * \brief Gets the event key + * + * Note that it's only valid to call this method if the event type is either: + * + * * SET_ELEMENT + * * DELETE_ELEMENT + */ + const Key& get_key() const; + + /** + * \brief Gets the event value + * + * Note that it's only valid to call this method if the event type is SET_ELEMENT + */ + const Value& get_value() const; +private: + EventType type_; + std::string topic_; + int partition_; + boost::optional key_; + boost::optional value_; +}; + +template +class CompactedTopicProcessor { +public: + /** + * The type of events generated by this processor + */ + using Event = CompactedTopicEvent; + + /** + * Callback used for decoding key objects + */ + using KeyDecoder = std::function; + + /** + * Callback used for decoding value objects + */ + using ValueDecoder = std::function; + + /** + * Callback used for event handling + */ + using EventHandler = std::function; + + /** + * \brief Constructs an instance + */ + CompactedTopicProcessor(Consumer& consumer); + ~CompactedTopicProcessor(); + + CompactedTopicProcessor(const CompactedTopicProcessor&) = delete; + CompactedTopicProcessor(CompactedTopicProcessor&&) = delete; + CompactedTopicProcessor& operator=(const CompactedTopicProcessor&) = delete; + CompactedTopicProcessor& operator=(CompactedTopicProcessor&&) = delete; + + /** + * \brief Sets the key decoder callback + */ + void set_key_decoder(KeyDecoder callback); + + /** + * \brief Sets the value decoder callback + */ + void set_value_decoder(ValueDecoder callback); + + /** + * \brief Sets the event handler callback + */ + void set_event_handler(EventHandler callback); + + /** + * \brief Processes the next event + */ + void process_event(); +private: + void on_assignment(TopicPartitionList& topic_partitions); + + Consumer& consumer_; + KeyDecoder key_decoder_; + ValueDecoder value_decoder_; + EventHandler event_handler_; + std::map partition_offsets_; + Consumer::AssignmentCallback original_assignment_callback_; +}; + +// CompactedTopicEvent + +template +CompactedTopicEvent::CompactedTopicEvent(EventType type, std::string topic, int partition) +: type_(type), topic_(std::move(topic)), partition_(partition) { + +} + +template +CompactedTopicEvent::CompactedTopicEvent(EventType type, std::string topic, int partition, + K key) +: type_(type), topic_(std::move(topic)), partition_(partition), key_(std::move(key)) { + +} + +template +CompactedTopicEvent::CompactedTopicEvent(EventType type, std::string topic, int partition, + K key, V value) +: type_(type), topic_(std::move(topic)), partition_(partition), key_(std::move(key)), + value_(std::move(value)) { + +} + +template +typename CompactedTopicEvent::EventType CompactedTopicEvent::get_type() const { + return type_; +} + +template +const std::string& CompactedTopicEvent::get_topic() const { + return topic_; +} + +template +int CompactedTopicEvent::get_partition() const { + return partition_; +} + +template +const K& CompactedTopicEvent::get_key() const { + return *key_; +} + +template +const V& CompactedTopicEvent::get_value() const { + return *value_; +} + +// CompactedTopicProcessor + +template +CompactedTopicProcessor::CompactedTopicProcessor(Consumer& consumer) +: consumer_(consumer) { + // Save the current assignment callback and assign ours + original_assignment_callback_ = consumer_.get_assignment_callback(); + consumer_.set_assignment_callback([&](TopicPartitionList& topic_partitions) { + on_assignment(topic_partitions); + }); +} + +template +CompactedTopicProcessor::~CompactedTopicProcessor() { + // Restore previous assignment callback + consumer_.set_assignment_callback(original_assignment_callback_); +} + +template +void CompactedTopicProcessor::set_key_decoder(KeyDecoder callback) { + key_decoder_ = std::move(callback); +} + +template +void CompactedTopicProcessor::set_value_decoder(ValueDecoder callback) { + value_decoder_ = std::move(callback); +} + +template +void CompactedTopicProcessor::set_event_handler(EventHandler callback) { + event_handler_ = std::move(callback); +} + +template +void CompactedTopicProcessor::process_event() { + Message message = consumer_.poll(); + if (message) { + if (!message.has_error()) { + Key key = key_decoder_(message.get_key()); + if (message.get_payload()) { + // If there's a payload, generate a SET_ELEMENT event + Value value = value_decoder_(key, message.get_payload()); + event_handler_({ Event::SET_ELEMENT, message.get_topic(), message.get_partition(), + key, std::move(value) }); + } + else { + // No payload, generate a DELETE_ELEMENT event + event_handler_({ Event::DELETE_ELEMENT, message.get_topic(), + message.get_partition(), key }); + } + // Store the offset for this topic/partition + TopicPartition topic_partition(message.get_topic(), message.get_partition()); + partition_offsets_[topic_partition] = message.get_offset(); + } + else { + if (message.is_eof()) { + event_handler_({ Event::REACHED_EOF, message.get_topic(), + message.get_partition() }); + } + } + } +} + +template +void CompactedTopicProcessor::on_assignment(TopicPartitionList& topic_partitions) { + if (original_assignment_callback_) { + original_assignment_callback_(topic_partitions); + } + std::set partitions_found; + // See if we already had an assignment for any of these topic/partitions. If we do, + // then restore the offset following the last one we saw + for (TopicPartition& topic_partition : topic_partitions) { + auto iter = partition_offsets_.find(topic_partition); + if (iter != partition_offsets_.end()) { + topic_partition.set_offset(iter->second); + } + // Populate this set + partitions_found.insert(topic_partition); + } + // Clear our cache: remove any entries for topic/partitions that aren't assigned to us now. + // Emit a CLEAR_ELEMENTS event for each topic/partition that is gone + auto iter = partition_offsets_.begin(); + while (iter != partition_offsets_.end()) { + const TopicPartition& topic_partition = iter->first; + if (partitions_found.count(topic_partition) == 0) { + event_handler_({ Event::CLEAR_ELEMENTS, topic_partition.get_topic(), + topic_partition.get_partition() }); + iter = partition_offsets_.erase(iter); + } + else { + ++iter; + } + } +} + +} // cppkafka + +#endif // CPPKAFKA_COMPACTED_TOPIC_PROCESSOR_H