diff --git a/include/cppkafka/cppkafka.h b/include/cppkafka/cppkafka.h index 4f4268e..854cbbd 100644 --- a/include/cppkafka/cppkafka.h +++ b/include/cppkafka/cppkafka.h @@ -48,6 +48,7 @@ #include #include #include +#include #include #include #include diff --git a/include/cppkafka/kafka_handle_base.h b/include/cppkafka/kafka_handle_base.h index 2cc2376..949e612 100644 --- a/include/cppkafka/kafka_handle_base.h +++ b/include/cppkafka/kafka_handle_base.h @@ -45,6 +45,7 @@ #include "topic_configuration.h" #include "configuration.h" #include "macros.h" +#include "logging.h" namespace cppkafka { @@ -107,6 +108,11 @@ public: * \param timeout The timeout to be set */ void set_timeout(std::chrono::milliseconds timeout); + + /** + * \brief Sets the log level + */ + void set_log_level(LogLevel level); /** * \brief Adds one or more brokers to this handle's broker list diff --git a/include/cppkafka/message.h b/include/cppkafka/message.h index 7a3459a..252cb97 100644 --- a/include/cppkafka/message.h +++ b/include/cppkafka/message.h @@ -40,10 +40,10 @@ #include "macros.h" #include "error.h" #include "header_list.h" +#include "message_timestamp.h" namespace cppkafka { -class MessageTimestamp; class Internal; /** @@ -175,7 +175,7 @@ public: * * If calling rd_kafka_message_timestamp returns -1, then boost::none_t will be returned. */ - inline boost::optional get_timestamp() const; + boost::optional get_timestamp() const; /** * \brief Gets the message latency in microseconds as measured from the produce() call. @@ -226,49 +226,6 @@ private: using MessageList = std::vector; -/** - * Represents a message's timestamp - */ -class CPPKAFKA_API MessageTimestamp { -public: - /** - * The timestamp type - */ - enum TimestampType { - CREATE_TIME = RD_KAFKA_TIMESTAMP_CREATE_TIME, - LOG_APPEND_TIME = RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME - }; - - /** - * Constructs a timestamp object using a 'duration'. - */ - MessageTimestamp(std::chrono::milliseconds timestamp, TimestampType type); - - /** - * Gets the timestamp value. If the timestamp was created with a 'time_point', - * the duration represents the number of milliseconds since epoch. - */ - std::chrono::milliseconds get_timestamp() const; - - /** - * Gets the timestamp type - */ - TimestampType get_type() const; -private: - std::chrono::milliseconds timestamp_; - TimestampType type_; -}; - -boost::optional Message::get_timestamp() const { - rd_kafka_timestamp_type_t type = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE; - int64_t timestamp = rd_kafka_message_timestamp(handle_.get(), &type); - if (timestamp == -1 || type == RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) { - return {}; - } - return MessageTimestamp(std::chrono::milliseconds(timestamp), - static_cast(type)); -} - } // cppkafka #endif // CPPKAFKA_MESSAGE_H diff --git a/include/cppkafka/message_timestamp.h b/include/cppkafka/message_timestamp.h new file mode 100644 index 0000000..e19b6ce --- /dev/null +++ b/include/cppkafka/message_timestamp.h @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2017, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef CPPKAFKA_MESSAGE_TIMESTAMP_H +#define CPPKAFKA_MESSAGE_TIMESTAMP_H + +#include +#include +#include +#include "macros.h" + +namespace cppkafka { + +/** + * Represents a message's timestamp + */ +class CPPKAFKA_API MessageTimestamp { + friend class Message; +public: + /** + * The timestamp type + */ + enum TimestampType { + CREATE_TIME = RD_KAFKA_TIMESTAMP_CREATE_TIME, + LOG_APPEND_TIME = RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME + }; + + /** + * Gets the timestamp value. If the timestamp was created with a 'time_point', + * the duration represents the number of milliseconds since epoch. + */ + std::chrono::milliseconds get_timestamp() const; + + /** + * Gets the timestamp type + */ + TimestampType get_type() const; +private: + MessageTimestamp(std::chrono::milliseconds timestamp, TimestampType type); + + std::chrono::milliseconds timestamp_; + TimestampType type_; +}; + +} // cppkafka + +#endif //CPPKAFKA_MESSAGE_TIMESTAMP_H diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 1525b1c..409821e 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -7,6 +7,7 @@ set(SOURCES buffer.cpp queue.cpp message.cpp + message_timestamp.cpp message_internal.cpp topic_partition.cpp topic_partition_list.cpp diff --git a/src/consumer.cpp b/src/consumer.cpp index 52e50b2..9ef4189 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -79,6 +79,7 @@ Consumer::Consumer(Configuration config) } rd_kafka_poll_set_consumer(ptr); set_handle(ptr); + set_log_level(LogLevel::LogErr); } Consumer::~Consumer() { diff --git a/src/kafka_handle_base.cpp b/src/kafka_handle_base.cpp index 284b10f..bba6e14 100644 --- a/src/kafka_handle_base.cpp +++ b/src/kafka_handle_base.cpp @@ -83,6 +83,10 @@ void KafkaHandleBase::set_timeout(milliseconds timeout) { timeout_ms_ = timeout; } +void KafkaHandleBase::set_log_level(LogLevel level) { + rd_kafka_set_log_level(handle_.get(), static_cast(level)); +} + void KafkaHandleBase::add_brokers(const string& brokers) { rd_kafka_brokers_add(handle_.get(), brokers.data()); } diff --git a/src/message.cpp b/src/message.cpp index b17f20b..103bae8 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -84,20 +84,14 @@ Message& Message::load_internal() { return *this; } -// MessageTimestamp - -MessageTimestamp::MessageTimestamp(milliseconds timestamp, TimestampType type) -: timestamp_(timestamp), - type_(type) { - -} - -milliseconds MessageTimestamp::get_timestamp() const { - return timestamp_; -} - -MessageTimestamp::TimestampType MessageTimestamp::get_type() const { - return type_; +boost::optional Message::get_timestamp() const { + rd_kafka_timestamp_type_t type = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE; + int64_t timestamp = rd_kafka_message_timestamp(handle_.get(), &type); + if (timestamp == -1 || type == RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) { + return {}; + } + return MessageTimestamp(std::chrono::milliseconds(timestamp), + static_cast(type)); } } // cppkafka diff --git a/src/message_timestamp.cpp b/src/message_timestamp.cpp new file mode 100644 index 0000000..5c4c970 --- /dev/null +++ b/src/message_timestamp.cpp @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2017, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "message_timestamp.h" + +using std::chrono::milliseconds; + +namespace cppkafka { + +MessageTimestamp::MessageTimestamp(milliseconds timestamp, TimestampType type) +: timestamp_(timestamp), + type_(type) { + +} + +milliseconds MessageTimestamp::get_timestamp() const { + return timestamp_; +} + +MessageTimestamp::TimestampType MessageTimestamp::get_type() const { + return type_; +} + +} // cppkafka + diff --git a/src/producer.cpp b/src/producer.cpp index dd1c1c2..706e607 100644 --- a/src/producer.cpp +++ b/src/producer.cpp @@ -52,8 +52,8 @@ Producer::Producer(Configuration config) if (!ptr) { throw Exception("Failed to create producer handle: " + string(error_buffer)); } - rd_kafka_set_log_level(ptr, 7); set_handle(ptr); + set_log_level(LogLevel::LogErr); } void Producer::set_payload_policy(PayloadPolicy policy) {