Use optionals for CompactedTopicProcessor decoders

This commit is contained in:
Matias Fontanini
2016-10-20 15:18:46 -07:00
parent 23f151175d
commit b834fbd07a

View File

@@ -121,12 +121,12 @@ public:
/**
* Callback used for decoding key objects
*/
using KeyDecoder = std::function<Key(const Buffer&)>;
using KeyDecoder = std::function<boost::optional<Key>(const Buffer&)>;
/**
* Callback used for decoding value objects
*/
using ValueDecoder = std::function<Value(const Key& key, const Buffer&)>;
using ValueDecoder = std::function<boost::optional<Value>(const Key& key, const Buffer&)>;
/**
* Callback used for event handling
@@ -276,17 +276,22 @@ void CompactedTopicProcessor<Key, Value>::process_event() {
Message message = consumer_.poll();
if (message) {
if (!message.get_error()) {
Key key = key_decoder_(message.get_key());
if (message.get_payload()) {
// If there's a payload, generate a SET_ELEMENT event
Value value = value_decoder_(key, message.get_payload());
event_handler_({ Event::SET_ELEMENT, message.get_topic(), message.get_partition(),
key, std::move(value) });
}
else {
// No payload, generate a DELETE_ELEMENT event
event_handler_({ Event::DELETE_ELEMENT, message.get_topic(),
message.get_partition(), key });
boost::optional<Key> key = key_decoder_(message.get_key());
if (key) {
if (message.get_payload()) {
boost::optional<Value> value = value_decoder_(*key, message.get_payload());
if (value) {
// If there's a payload and we managed to parse the value, generate a
// SET_ELEMENT event
event_handler_({ Event::SET_ELEMENT, message.get_topic(),
message.get_partition(), *key, std::move(*value) });
}
}
else {
// No payload, generate a DELETE_ELEMENT event
event_handler_({ Event::DELETE_ELEMENT, message.get_topic(),
message.get_partition(), *key });
}
}
// Store the offset for this topic/partition
TopicPartition topic_partition(message.get_topic(), message.get_partition());