Header support implementation (#115)

* header support implementation

* Fixed issue when ptr is null and doesn't have a cloner function

* Code complete with test cases

updated travis file with v0.11.5

* Added compile time check for rdkafka header support version

* Changes per last code review

* Using brace list initializers
This commit is contained in:
Alex Damian
2018-10-16 13:58:05 -04:00
committed by Matias Fontanini
parent 9af4330c6d
commit fbe3759fed
20 changed files with 1316 additions and 51 deletions

View File

@@ -7,7 +7,7 @@ compiler:
- clang
env:
- RDKAFKA_VERSION=v0.11.0
- RDKAFKA_VERSION=v0.11.5
os:
- linux

View File

@@ -17,6 +17,8 @@ only supported via the high level consumer API. _cppkafka_ requires **rdkafka >=
order to use it. Other wrapped functionalities are also provided, like fetching metadata,
offsets, etc.
* _cppkafka_ provides message header support. This feature requires **rdkafka >= 0.11.4**.
* _cppkafka_ tries to add minimal overhead over _librdkafka_. A very thin wrapper for _librdkafka_
messages is used for consumption so there's virtually no overhead at all.

View File

@@ -172,6 +172,14 @@ CPPKAFKA_API bool operator==(const Buffer& lhs, const Buffer& rhs);
*/
CPPKAFKA_API bool operator!=(const Buffer& lhs, const Buffer& rhs);
/**
* Compares Buffer objects lexicographically
*/
CPPKAFKA_API bool operator<(const Buffer& lhs, const Buffer& rhs);
CPPKAFKA_API bool operator<=(const Buffer& lhs, const Buffer& rhs);
CPPKAFKA_API bool operator>(const Buffer& lhs, const Buffer& rhs);
CPPKAFKA_API bool operator>=(const Buffer& lhs, const Buffer& rhs);
} // cppkafka
#endif // CPPKAFKA_BUFFER_H

View File

@@ -41,7 +41,7 @@ template <typename T, typename Deleter, typename Cloner>
class ClonablePtr {
public:
/**
* Creates an instance
* \brief Creates an instance
*
* \param ptr The pointer to be wrapped
* \param deleter The deleter functor
@@ -60,17 +60,23 @@ public:
* \param rhs The pointer to be copied
*/
ClonablePtr(const ClonablePtr& rhs)
: handle_(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()), cloner_(rhs.cloner_) {
: handle_(rhs.cloner_ ? std::unique_ptr<T, Deleter>(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()) :
std::unique_ptr<T, Deleter>(rhs.handle_.get(), rhs.handle_.get_deleter())),
cloner_(rhs.cloner_) {
}
/**
* Copies and assigns the given pointer
* \brief Copies and assigns the given pointer
*
* \param rhs The pointer to be copied
*/
ClonablePtr& operator=(const ClonablePtr& rhs) {
handle_.reset(cloner_(rhs.handle_.get()));
if (this == &rhs) {
return *this;
}
handle_ = rhs.cloner_ ? std::unique_ptr<T, Deleter>(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()) :
std::unique_ptr<T, Deleter>(rhs.handle_.get(), rhs.handle_.get_deleter());
return *this;
}
@@ -79,11 +85,25 @@ public:
~ClonablePtr() = default;
/**
* Getter for the internal pointer
* \brief Getter for the internal pointer
*/
T* get() const {
return handle_.get();
}
/**
* \brief Releases ownership of the internal pointer
*/
T* release() {
return handle_.release();
}
/**
* \brief Indicates whether this ClonablePtr instance is valid (not null)
*/
explicit operator bool() const {
return static_cast<bool>(handle_);
}
private:
std::unique_ptr<T, Deleter> handle_;
Cloner cloner_;

View File

@@ -39,6 +39,9 @@
#include <cppkafka/error.h>
#include <cppkafka/exceptions.h>
#include <cppkafka/group_information.h>
#include <cppkafka/header.h>
#include <cppkafka/header_list.h>
#include <cppkafka/header_list_iterator.h>
#include <cppkafka/kafka_handle_base.h>
#include <cppkafka/logging.h>
#include <cppkafka/macros.h>

195
include/cppkafka/header.h Normal file
View File

@@ -0,0 +1,195 @@
/*
* Copyright (c) 2017, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef CPPKAFKA_HEADER_H
#define CPPKAFKA_HEADER_H
#include "macros.h"
#include "buffer.h"
#include <string>
#include <assert.h>
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
namespace cppkafka {
/**
* \brief Class representing a rdkafka header.
*
* The template parameter 'BufferType' can represent a cppkafka::Buffer, std::string, std::vector, etc.
* A valid header may contain an empty name as well as null data.
*/
template <typename BufferType>
class Header {
public:
using ValueType = BufferType;
/**
* \brief Build an empty header with no data
*/
Header() = default;
/**
* \brief Build a header instance
* \param name The header name
* \param value The non-modifiable header data
*/
Header(std::string name,
const BufferType& value);
/**
* \brief Build a header instance
* \param name The header name
* \param value The header data to be moved
*/
Header(std::string name,
BufferType&& value);
/**
* \brief Get the header name
* \return A reference to the name
*/
const std::string& get_name() const;
/**
* \brief Get the header value
* \return A const reference to the underlying buffer
*/
const BufferType& get_value() const;
/**
* \brief Get the header value
* \return A non-const reference to the underlying buffer
*/
BufferType& get_value();
/**
* \brief Check if this header is empty
* \return True if the header contains valid data, false otherwise.
*/
operator bool() const;
private:
template <typename T>
T make_value(const T& other);
Buffer make_value(const Buffer& other);
std::string name_;
BufferType value_;
};
// Comparison operators for Header type
template <typename BufferType>
bool operator==(const Header<BufferType>& lhs, const Header<BufferType>& rhs) {
return std::tie(lhs.get_name(), lhs.get_value()) == std::tie(rhs.get_name(), rhs.get_value());
}
template <typename BufferType>
bool operator!=(const Header<BufferType>& lhs, const Header<BufferType>& rhs) {
return !(lhs == rhs);
}
template <typename BufferType>
bool operator<(const Header<BufferType>& lhs, const Header<BufferType>& rhs) {
return std::tie(lhs.get_name(), lhs.get_value()) < std::tie(rhs.get_name(), rhs.get_value());
}
template <typename BufferType>
bool operator>(const Header<BufferType>& lhs, const Header<BufferType>& rhs) {
return std::tie(lhs.get_name(), lhs.get_value()) > std::tie(rhs.get_name(), rhs.get_value());
}
template <typename BufferType>
bool operator<=(const Header<BufferType>& lhs, const Header<BufferType>& rhs) {
return !(lhs > rhs);
}
template <typename BufferType>
bool operator>=(const Header<BufferType>& lhs, const Header<BufferType>& rhs) {
return !(lhs < rhs);
}
// Implementation
template <typename BufferType>
Header<BufferType>::Header(std::string name,
const BufferType& value)
: name_(std::move(name)),
value_(make_value(value)) {
}
template <typename BufferType>
Header<BufferType>::Header(std::string name,
BufferType&& value)
: name_(std::move(name)),
value_(std::move(value)) {
}
template <typename BufferType>
const std::string& Header<BufferType>::get_name() const {
return name_;
}
template <typename BufferType>
const BufferType& Header<BufferType>::get_value() const {
return value_;
}
template <typename BufferType>
BufferType& Header<BufferType>::get_value() {
return value_;
}
template <typename BufferType>
Header<BufferType>::operator bool() const {
return !value_.empty();
}
template <>
inline
Header<Buffer>::operator bool() const {
return value_.get_size() > 0;
}
template <typename BufferType>
template <typename T>
T Header<BufferType>::make_value(const T& other) {
return other;
}
template <typename BufferType>
Buffer Header<BufferType>::make_value(const Buffer& other) {
return Buffer(other.get_data(), other.get_size());
}
} //namespace cppkafka
#endif //RD_KAFKA_HEADERS_SUPPORT_VERSION
#endif //CPPKAFKA_HEADER_H

View File

@@ -0,0 +1,317 @@
/*
* Copyright (c) 2017, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef CPPKAFKA_HEADER_LIST_H
#define CPPKAFKA_HEADER_LIST_H
#include <librdkafka/rdkafka.h>
#include "clonable_ptr.h"
#include "header.h"
#include "header_list_iterator.h"
#include "exceptions.h"
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
namespace cppkafka {
/**
* \brief Thin wrapper over a rd_kafka_headers_t handle which optionally controls its lifetime.
* \tparam HeaderType The header type
*
* This is a copyable and movable class that wraps a rd_kafka_header_t*. When copying this class,
* all associated headers are also copied via rd_kafka_headers_copy(). If this list owns the underlying handle,
* its destructor will call rd_kafka_headers_destroy().
*/
template <typename HeaderType>
class HeaderList {
public:
using BufferType = typename HeaderType::ValueType;
using Iterator = HeaderIterator<HeaderType>;
/**
* Constructs a message that won't take ownership of the given pointer.
*/
static HeaderList<HeaderType> make_non_owning(rd_kafka_headers_t* handle);
/**
* \brief Create an empty header list with no handle.
*/
HeaderList();
/**
* \brief Create an empty header list. This call translates to rd_kafka_headers_new().
* \param reserve The number of headers to reserve space for.
*/
explicit HeaderList(size_t reserve);
/**
* \brief Create a header list and assume ownership of the handle.
* \param handle The header list handle.
*/
explicit HeaderList(rd_kafka_headers_t* handle);
/**
* \brief Add a header to the list. This translates to rd_kafka_header_add().
* \param header The header.
* \return An Error indicating if the operation was successful or not.
* \warning This operation shall invalidate all iterators.
*/
Error add(const HeaderType& header);
/**
* \brief Remove all headers with 'name'. This translates to rd_kafka_header_remove().
* \param name The name of the header(s) to remove.
* \return An Error indicating if the operation was successful or not.
* \warning This operation shall invalidate all iterators.
*/
Error remove(const std::string& name);
/**
* \brief Return the header present at position 'index'. Throws on error.
* This translates to rd_kafka_header_get(index)
* \param index The header index in the list (0-based).
* \return The header at that position.
*/
HeaderType at(size_t index) const; //throws
/**
* \brief Return the first header in the list. Throws if the list is empty.
* This translates to rd_kafka_header_get(0).
* \return The first header.
*/
HeaderType front() const; //throws
/**
* \brief Return the first header in the list. Throws if the list is empty.
* This translates to rd_kafka_header_get(size-1).
* \return The last header.
*/
HeaderType back() const; //throws
/**
* \brief Returns the number of headers in the list. This translates to rd_kafka_header_cnt().
* \return The number of headers.
*/
size_t size() const;
/**
* \brief Indicates if this list is empty.
* \return True if empty, false otherwise.
*/
bool empty() const;
/**
* \brief Returns a HeaderIterator pointing to the first position if the list is not empty
* or pointing to end() otherwise.
* \return An iterator.
* \warning This iterator will be invalid if add() or remove() is called.
*/
Iterator begin() const;
/**
* \brief Returns a HeaderIterator pointing to one element past the end of the list.
* \return An iterator.
* \remark This iterator cannot be de-referenced.
*/
Iterator end() const;
/**
* \brief Get the underlying header list handle.
* \return The handle.
*/
rd_kafka_headers_t* get_handle() const;
/**
* \brief Get the underlying header list handle and release its ownership.
* \return The handle.
* \warning After this call, the HeaderList becomes invalid.
*/
rd_kafka_headers_t* release_handle();
/**
* \brief Indicates if this list is valid (contains a non-null handle) or not.
* \return True if valid, false otherwise.
*/
explicit operator bool() const;
private:
struct NonOwningTag { };
static void dummy_deleter(rd_kafka_headers_t*) {}
static rd_kafka_headers_t* dummy_cloner(const rd_kafka_headers_t* handle) { return const_cast<rd_kafka_headers_t*>(handle); }
using HandlePtr = ClonablePtr<rd_kafka_headers_t, decltype(&rd_kafka_headers_destroy),
decltype(&rd_kafka_headers_copy)>;
HeaderList(rd_kafka_headers_t* handle, NonOwningTag);
HandlePtr handle_;
};
template <typename HeaderType>
bool operator==(const HeaderList<HeaderType>& lhs, const HeaderList<HeaderType> rhs) {
if (!lhs && !rhs) {
return true;
}
if (!lhs || !rhs) {
return false;
}
if (lhs.size() != rhs.size()) {
return false;
}
return std::equal(lhs.begin(), lhs.end(), rhs.begin());
}
template <typename HeaderType>
bool operator!=(const HeaderList<HeaderType>& lhs, const HeaderList<HeaderType> rhs) {
return !(lhs == rhs);
}
template <typename HeaderType>
HeaderList<HeaderType> HeaderList<HeaderType>::make_non_owning(rd_kafka_headers_t* handle) {
return HeaderList(handle, NonOwningTag());
}
template <typename HeaderType>
HeaderList<HeaderType>::HeaderList()
: handle_(nullptr, nullptr, nullptr) {
}
template <typename HeaderType>
HeaderList<HeaderType>::HeaderList(size_t reserve)
: handle_(rd_kafka_headers_new(reserve), &rd_kafka_headers_destroy, &rd_kafka_headers_copy) {
}
template <typename HeaderType>
HeaderList<HeaderType>::HeaderList(rd_kafka_headers_t* handle)
: handle_(handle, &rd_kafka_headers_destroy, &rd_kafka_headers_copy) { //if we own the header list, we clone it on copy
}
template <typename HeaderType>
HeaderList<HeaderType>::HeaderList(rd_kafka_headers_t* handle, NonOwningTag)
: handle_(HandlePtr(handle, &dummy_deleter, &dummy_cloner)) { //if we don't own the header list, we forward the handle on copy.
}
// Methods
template <typename HeaderType>
Error HeaderList<HeaderType>::add(const HeaderType& header) {
assert(handle_);
return rd_kafka_header_add(handle_.get(),
header.get_name().data(), header.get_name().size(),
header.get_value().data(), header.get_value().size());
}
template <>
inline
Error HeaderList<Header<Buffer>>::add(const Header<Buffer>& header) {
assert(handle_);
return rd_kafka_header_add(handle_.get(),
header.get_name().data(), header.get_name().size(),
header.get_value().get_data(), header.get_value().get_size());
}
template <typename HeaderType>
Error HeaderList<HeaderType>::remove(const std::string& name) {
assert(handle_);
return rd_kafka_header_remove(handle_.get(), name.data());
}
template <typename HeaderType>
HeaderType HeaderList<HeaderType>::at(size_t index) const {
assert(handle_);
const char *name, *value;
size_t size;
Error error = rd_kafka_header_get_all(handle_.get(), index, &name, reinterpret_cast<const void**>(&value), &size);
if (error != RD_KAFKA_RESP_ERR_NO_ERROR) {
throw Exception(error.to_string());
}
return HeaderType(name, BufferType(value, size));
}
template <typename HeaderType>
HeaderType HeaderList<HeaderType>::front() const {
return at(0);
}
template <typename HeaderType>
HeaderType HeaderList<HeaderType>::back() const {
return at(size()-1);
}
template <typename HeaderType>
size_t HeaderList<HeaderType>::size() const {
assert(handle_);
return rd_kafka_header_cnt(handle_.get());
}
template <typename HeaderType>
bool HeaderList<HeaderType>::empty() const {
return size() == 0;
}
template <typename HeaderType>
typename HeaderList<HeaderType>::Iterator
HeaderList<HeaderType>::begin() const {
assert(handle_);
if (empty()) {
return end();
}
return Iterator(make_non_owning(handle_.get()), 0);
}
template <typename HeaderType>
typename HeaderList<HeaderType>::Iterator
HeaderList<HeaderType>::end() const {
assert(handle_);
return Iterator(make_non_owning(handle_.get()), size());
}
template <typename HeaderType>
rd_kafka_headers_t* HeaderList<HeaderType>::get_handle() const {
return handle_.get();
}
template <typename HeaderType>
rd_kafka_headers_t* HeaderList<HeaderType>::release_handle() {
return handle_.release();
}
template <typename HeaderType>
HeaderList<HeaderType>::operator bool() const {
return static_cast<bool>(handle_);
}
} //namespace cppkafka
#endif //RD_KAFKA_HEADERS_SUPPORT_VERSION
#endif //CPPKAFKA_HEADER_LIST_H

View File

@@ -0,0 +1,193 @@
/*
* Copyright (c) 2017, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef CPPKAFKA_HEADER_LIST_ITERATOR_H
#define CPPKAFKA_HEADER_LIST_ITERATOR_H
#include <cstddef>
#include <utility>
#include <iterator>
#include "header.h"
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
namespace cppkafka {
template <typename HeaderType>
class HeaderList;
template <typename HeaderType>
class HeaderIterator;
template <typename HeaderType>
bool operator==(const HeaderIterator<HeaderType>& lhs, const HeaderIterator<HeaderType>& rhs);
/**
* \brief Iterator over a HeaderList object.
* \tparam HeaderType The type of header this iterator points to.
*/
template <typename HeaderType>
class HeaderIterator {
public:
friend HeaderList<HeaderType>;
using HeaderListType = HeaderList<HeaderType>;
using BufferType = typename HeaderType::ValueType;
//std::iterator_traits
using difference_type = std::ptrdiff_t;
using value_type = HeaderType;
using pointer = value_type*;
using reference = value_type&;
using iterator_category = std::bidirectional_iterator_tag;
friend bool operator==<HeaderType>(const HeaderIterator<HeaderType>& lhs,
const HeaderIterator<HeaderType>& rhs);
HeaderIterator(const HeaderIterator& other)
: header_list_(other.header_list_),
header_(make_header(other.header_)),
index_(other.index_) {
}
HeaderIterator& operator=(const HeaderIterator& other) {
if (this == &other) return *this;
header_list_ = other.header_list_;
header_ = make_header(other.header_);
index_ = other.index_;
}
HeaderIterator(HeaderIterator&&) = default;
HeaderIterator& operator=(HeaderIterator&&) = default;
/**
* \brief Prefix increment of the iterator.
* \return Itself after being incremented.
*/
HeaderIterator& operator++() {
assert(index_ < header_list_.size());
++index_;
return *this;
}
/**
* \brief Postfix increment of the iterator.
* \return Itself before being incremented.
*/
HeaderIterator operator++(int) {
HeaderIterator tmp(*this);
operator++();
return tmp;
}
/**
* \brief Prefix decrement of the iterator.
* \return Itself after being decremented.
*/
HeaderIterator& operator--() {
assert(index_ > 0);
--index_;
return *this;
}
/**
* \brief Postfix decrement of the iterator.
* \return Itself before being decremented.
*/
HeaderIterator operator--(int) {
HeaderIterator tmp(*this);
operator--();
return tmp;
}
/**
* \brief Dereferences this iterator.
* \return A reference to the header the iterator points to.
* \warning Throws if invalid or if *this == end().
*/
const HeaderType& operator*() const {
header_ = header_list_.at(index_);
return header_;
}
HeaderType& operator*() {
header_ = header_list_.at(index_);
return header_;
}
/**
* \brief Dereferences this iterator.
* \return The address to the header the iterator points to.
* \warning Throws if invalid or if *this == end().
*/
const HeaderType* operator->() const {
header_ = header_list_.at(index_);
return &header_;
}
HeaderType* operator->() {
header_ = header_list_.at(index_);
return &header_;
}
private:
HeaderIterator(HeaderListType headers,
size_t index)
: header_list_(std::move(headers)),
header_(index == header_list_.size() ? HeaderType() : header_list_.at(index)),
index_(index) {
}
template <typename T>
T make_header(const T& other) {
return other;
}
Header<Buffer> make_header(const Header<Buffer>& other) {
return Header<Buffer>(other.get_name(),
Buffer(other.get_value().get_data(),
other.get_value().get_size()));
}
HeaderListType header_list_;
HeaderType header_;
size_t index_;
};
// Equality comparison operators
template <typename HeaderType>
bool operator==(const HeaderIterator<HeaderType>& lhs, const HeaderIterator<HeaderType>& rhs) {
return (lhs.header_list_.get_handle() == rhs.header_list_.get_handle()) && (lhs.index_ == rhs.index_);
}
template <typename HeaderType>
bool operator!=(const HeaderIterator<HeaderType>& lhs, const HeaderIterator<HeaderType>& rhs) {
return !(lhs == rhs);
}
} //namespace cppkafka
#endif //RD_KAFKA_HEADERS_SUPPORT_VERSION
#endif //CPPKAFKA_HEADER_LIST_ITERATOR_H

View File

@@ -43,4 +43,8 @@
#define CPPKAFKA_API
#endif // _WIN32 && !CPPKAFKA_STATIC
// See: https://github.com/edenhill/librdkafka/issues/1792
#define RD_KAFKA_QUEUE_REFCOUNT_BUG_VERSION 0x000b0500 //v0.11.5.00
#define RD_KAFKA_HEADERS_SUPPORT_VERSION 0x000b0402 //v0.11.4.02
#endif // CPPKAFKA_MACROS_H

View File

@@ -39,6 +39,7 @@
#include "buffer.h"
#include "macros.h"
#include "error.h"
#include "header_list.h"
namespace cppkafka {
@@ -59,6 +60,10 @@ class CPPKAFKA_API Message {
public:
friend class MessageInternal;
using InternalPtr = std::shared_ptr<Internal>;
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
using HeaderType = Header<Buffer>;
using HeaderListType = HeaderList<HeaderType>;
#endif
/**
* Constructs a message that won't take ownership of the given pointer
*/
@@ -84,7 +89,7 @@ public:
Message& operator=(Message&& rhs) = default;
/**
* Gets the error attribute
* \brief Gets the error attribute
*/
Error get_error() const {
assert(handle_);
@@ -92,14 +97,14 @@ public:
}
/**
* Utility function to check for get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF
* \brief Utility function to check for get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF
*/
bool is_eof() const {
return get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF;
}
/**
* Gets the topic that this message belongs to
* \brief Gets the topic that this message belongs to
*/
std::string get_topic() const {
assert(handle_);
@@ -107,7 +112,7 @@ public:
}
/**
* Gets the partition that this message belongs to
* \brief Gets the partition that this message belongs to
*/
int get_partition() const {
assert(handle_);
@@ -115,21 +120,40 @@ public:
}
/**
* Gets the message's payload
* \brief Gets the message's payload
*/
const Buffer& get_payload() const {
return payload_;
}
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
/**
* \brief Gets the message's header list
*/
const HeaderListType& get_header_list() const {
return header_list_;
}
/**
* \brief Detaches the message's header list
*/
template <typename HeaderType>
HeaderList<HeaderType> detach_header_list() {
rd_kafka_headers_t* headers_handle;
Error error = rd_kafka_message_detach_headers(handle_.get(), &headers_handle);
return error ? HeaderList<HeaderType>() : HeaderList<HeaderType>(headers_handle);
}
#endif
/**
* Gets the message's key
* \brief Gets the message's key
*/
const Buffer& get_key() const {
return key_;
}
/**
* Gets the message offset
* \brief Gets the message offset
*/
int64_t get_offset() const {
assert(handle_);
@@ -152,23 +176,31 @@ public:
* If calling rd_kafka_message_timestamp returns -1, then boost::none_t will be returned.
*/
inline boost::optional<MessageTimestamp> get_timestamp() const;
/**
* \brief Gets the message latency in microseconds as measured from the produce() call.
*/
std::chrono::microseconds get_latency() const {
assert(handle_);
return std::chrono::microseconds(rd_kafka_message_latency(handle_.get()));
}
/**
* Indicates whether this message is valid (not null)
* \brief Indicates whether this message is valid (not null)
*/
explicit operator bool() const {
return handle_ != nullptr;
}
/**
* Gets the rdkafka message handle
* \brief Gets the rdkafka message handle
*/
rd_kafka_message_t* get_handle() const {
return handle_.get();
}
/**
* Internal private const data accessor (internal use only)
* \brief Internal private const data accessor (internal use only)
*/
InternalPtr internal() const {
return internal_;
@@ -185,6 +217,9 @@ private:
HandlePtr handle_;
Buffer payload_;
Buffer key_;
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
HeaderListType header_list_;
#endif
void* user_data_;
InternalPtr internal_;
};

View File

@@ -35,6 +35,7 @@
#include "topic.h"
#include "macros.h"
#include "message.h"
#include "header_list.h"
namespace cppkafka {
@@ -44,6 +45,10 @@ namespace cppkafka {
template <typename BufferType, typename Concrete>
class BasicMessageBuilder {
public:
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
using HeaderType = Header<BufferType>;
using HeaderListType = HeaderList<HeaderType>;
#endif
/**
* Construct a BasicMessageBuilder
*
@@ -99,6 +104,15 @@ public:
*/
Concrete& key(BufferType&& value);
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
/**
* Add a header to the message
*
* \param header The header to be used
*/
Concrete& header(const HeaderType& header);
#endif
/**
* Sets the message's payload
*
@@ -146,7 +160,19 @@ public:
* Gets the message's key
*/
BufferType& key();
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
/**
* Gets the list of headers
*/
const HeaderListType& header_list() const;
/**
* Gets the list of headers
*/
HeaderListType& header_list();
#endif
/**
* Gets the message's payload
*/
@@ -180,6 +206,9 @@ private:
std::string topic_;
int partition_{-1};
BufferType key_;
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
HeaderListType header_list_;
#endif
BufferType payload_;
std::chrono::milliseconds timestamp_{0};
void* user_data_;
@@ -237,6 +266,17 @@ C& BasicMessageBuilder<T, C>::key(T&& value) {
return get_concrete();
}
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
template <typename T, typename C>
C& BasicMessageBuilder<T, C>::header(const HeaderType& header) {
if (!header_list_) {
header_list_ = HeaderListType(5);
}
header_list_.add(header);
return get_concrete();
}
#endif
template <typename T, typename C>
C& BasicMessageBuilder<T, C>::payload(const T& value) {
get_concrete().construct_buffer(payload_, value);
@@ -281,6 +321,20 @@ T& BasicMessageBuilder<T, C>::key() {
return key_;
}
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
template <typename T, typename C>
const typename BasicMessageBuilder<T, C>::HeaderListType&
BasicMessageBuilder<T, C>::header_list() const {
return header_list_;
}
template <typename T, typename C>
typename BasicMessageBuilder<T, C>::HeaderListType&
BasicMessageBuilder<T, C>::header_list() {
return header_list_;
}
#endif
template <typename T, typename C>
const T& BasicMessageBuilder<T, C>::payload() const {
return payload_;
@@ -338,7 +392,12 @@ C& BasicMessageBuilder<T, C>::get_concrete() {
*/
class MessageBuilder : public BasicMessageBuilder<Buffer, MessageBuilder> {
public:
using Base = BasicMessageBuilder<Buffer, MessageBuilder>;
using BasicMessageBuilder::BasicMessageBuilder;
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
using HeaderType = Base::HeaderType;
using HeaderListType = Base::HeaderListType;
#endif
void construct_buffer(Buffer& lhs, const Buffer& rhs) {
lhs = Buffer(rhs.get_data(), rhs.get_size());

View File

@@ -113,6 +113,7 @@ public:
* \param builder The builder class used to compose a message
*/
void produce(const MessageBuilder& builder);
void produce(MessageBuilder&& builder);
/**
* \brief Produces a message
@@ -120,6 +121,7 @@ public:
* \param message The message to be produced
*/
void produce(const Message& message);
void produce(Message&& message);
/**
* \brief Polls on this handle
@@ -157,6 +159,15 @@ public:
*/
void flush(std::chrono::milliseconds timeout);
private:
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
void do_produce(const MessageBuilder& builder, MessageBuilder::HeaderListType&& headers);
void do_produce(const Message& message, MessageBuilder::HeaderListType&& headers);
#else
void do_produce(const MessageBuilder& builder);
void do_produce(const Message& message);
#endif
// Members
PayloadPolicy message_payload_policy_;
};

View File

@@ -34,6 +34,7 @@
using std::string;
using std::equal;
using std::lexicographical_compare;
using std::ostream;
using std::hex;
using std::dec;
@@ -101,4 +102,22 @@ bool operator!=(const Buffer& lhs, const Buffer& rhs) {
return !(lhs == rhs);
}
bool operator<(const Buffer& lhs, const Buffer& rhs) {
return lexicographical_compare(lhs.get_data(), lhs.get_data() + lhs.get_size(),
rhs.get_data(), rhs.get_data() + rhs.get_size());
}
bool operator>(const Buffer& lhs, const Buffer& rhs) {
return lexicographical_compare(rhs.get_data(), rhs.get_data() + rhs.get_size(),
lhs.get_data(), lhs.get_data() + lhs.get_size());
}
bool operator<=(const Buffer& lhs, const Buffer& rhs) {
return !(lhs > rhs);
}
bool operator>=(const Buffer& lhs, const Buffer& rhs) {
return !(lhs < rhs);
}
} // cppkafka

View File

@@ -29,6 +29,7 @@
#include <sstream>
#include <algorithm>
#include <cctype>
#include "macros.h"
#include "consumer.h"
#include "exceptions.h"
#include "logging.h"
@@ -48,10 +49,8 @@ using std::allocator;
namespace cppkafka {
// See: https://github.com/edenhill/librdkafka/issues/1792
const int rd_kafka_queue_refcount_bug_version = 0x000b0500;
Queue Consumer::get_queue(rd_kafka_queue_t* handle) {
if (rd_kafka_version() <= rd_kafka_queue_refcount_bug_version) {
if (rd_kafka_version() <= RD_KAFKA_QUEUE_REFCOUNT_BUG_VERSION) {
return Queue::make_non_owning(handle);
}
else {

View File

@@ -26,7 +26,7 @@
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "message.h"
#include "message_internal.h"
@@ -63,6 +63,16 @@ Message::Message(HandlePtr handle)
payload_(handle_ ? Buffer((const Buffer::DataType*)handle_->payload, handle_->len) : Buffer()),
key_(handle_ ? Buffer((const Buffer::DataType*)handle_->key, handle_->key_len) : Buffer()),
user_data_(handle_ ? handle_->_private : nullptr) {
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
// get the header list if any
if (handle_) {
rd_kafka_headers_t* headers_handle;
Error error = rd_kafka_message_headers(handle_.get(), &headers_handle);
if (!error) {
header_list_ = HeaderListType::make_non_owning(headers_handle);
}
}
#endif
}
Message& Message::load_internal() {

View File

@@ -64,39 +64,44 @@ Producer::PayloadPolicy Producer::get_payload_policy() const {
return message_payload_policy_;
}
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
void Producer::produce(const MessageBuilder& builder) {
const Buffer& payload = builder.payload();
const Buffer& key = builder.key();
const int policy = static_cast<int>(message_payload_policy_);
auto result = rd_kafka_producev(get_handle(),
RD_KAFKA_V_TOPIC(builder.topic().data()),
RD_KAFKA_V_PARTITION(builder.partition()),
RD_KAFKA_V_MSGFLAGS(policy),
RD_KAFKA_V_TIMESTAMP(builder.timestamp().count()),
RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()),
RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()),
RD_KAFKA_V_OPAQUE(builder.user_data()),
RD_KAFKA_V_END);
check_error(result);
do_produce(builder, MessageBuilder::HeaderListType(builder.header_list())); //copy headers
}
void Producer::produce(MessageBuilder&& builder) {
do_produce(builder, MessageBuilder::HeaderListType(builder.header_list().release_handle())); //move headers
}
void Producer::produce(const Message& message) {
const Buffer& payload = message.get_payload();
const Buffer& key = message.get_key();
const int policy = static_cast<int>(message_payload_policy_);
int64_t duration = message.get_timestamp() ? message.get_timestamp().get().get_timestamp().count() : 0;
auto result = rd_kafka_producev(get_handle(),
RD_KAFKA_V_TOPIC(message.get_topic().data()),
RD_KAFKA_V_PARTITION(message.get_partition()),
RD_KAFKA_V_MSGFLAGS(policy),
RD_KAFKA_V_TIMESTAMP(duration),
RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()),
RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()),
RD_KAFKA_V_OPAQUE(message.get_user_data()),
RD_KAFKA_V_END);
check_error(result);
do_produce(message, HeaderList<Message::HeaderType>(message.get_header_list())); //copy headers
}
void Producer::produce(Message&& message) {
do_produce(message, message.detach_header_list<Message::HeaderType>()); //move headers
}
#else
void Producer::produce(const MessageBuilder& builder) {
do_produce(builder);
}
void Producer::produce(MessageBuilder&& builder) {
do_produce(builder);
}
void Producer::produce(const Message& message) {
do_produce(message);
}
void Producer::produce(Message&& message) {
do_produce(message);
}
#endif
int Producer::poll() {
return poll(get_timeout());
}
@@ -114,4 +119,80 @@ void Producer::flush(milliseconds timeout) {
check_error(result);
}
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
void Producer::do_produce(const MessageBuilder& builder,
MessageBuilder::HeaderListType&& headers) {
const Buffer& payload = builder.payload();
const Buffer& key = builder.key();
const int policy = static_cast<int>(message_payload_policy_);
auto result = rd_kafka_producev(get_handle(),
RD_KAFKA_V_TOPIC(builder.topic().data()),
RD_KAFKA_V_PARTITION(builder.partition()),
RD_KAFKA_V_MSGFLAGS(policy),
RD_KAFKA_V_TIMESTAMP(builder.timestamp().count()),
RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()),
RD_KAFKA_V_HEADERS(headers.release_handle()), //pass ownership to rdkafka
RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()),
RD_KAFKA_V_OPAQUE(builder.user_data()),
RD_KAFKA_V_END);
check_error(result);
}
void Producer::do_produce(const Message& message,
MessageBuilder::HeaderListType&& headers) {
const Buffer& payload = message.get_payload();
const Buffer& key = message.get_key();
const int policy = static_cast<int>(message_payload_policy_);
int64_t duration = message.get_timestamp() ? message.get_timestamp().get().get_timestamp().count() : 0;
auto result = rd_kafka_producev(get_handle(),
RD_KAFKA_V_TOPIC(message.get_topic().data()),
RD_KAFKA_V_PARTITION(message.get_partition()),
RD_KAFKA_V_MSGFLAGS(policy),
RD_KAFKA_V_TIMESTAMP(duration),
RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()),
RD_KAFKA_V_HEADERS(headers.release_handle()), //pass ownership to rdkafka
RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()),
RD_KAFKA_V_OPAQUE(message.get_user_data()),
RD_KAFKA_V_END);
check_error(result);
}
#else
void Producer::do_produce(const MessageBuilder& builder) {
const Buffer& payload = builder.payload();
const Buffer& key = builder.key();
const int policy = static_cast<int>(message_payload_policy_);
auto result = rd_kafka_producev(get_handle(),
RD_KAFKA_V_TOPIC(builder.topic().data()),
RD_KAFKA_V_PARTITION(builder.partition()),
RD_KAFKA_V_MSGFLAGS(policy),
RD_KAFKA_V_TIMESTAMP(builder.timestamp().count()),
RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()),
RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()),
RD_KAFKA_V_OPAQUE(builder.user_data()),
RD_KAFKA_V_END);
check_error(result);
}
void Producer::do_produce(const Message& message) {
const Buffer& payload = message.get_payload();
const Buffer& key = message.get_key();
const int policy = static_cast<int>(message_payload_policy_);
int64_t duration = message.get_timestamp() ? message.get_timestamp().get().get_timestamp().count() : 0;
auto result = rd_kafka_producev(get_handle(),
RD_KAFKA_V_TOPIC(message.get_topic().data()),
RD_KAFKA_V_PARTITION(message.get_partition()),
RD_KAFKA_V_MSGFLAGS(policy),
RD_KAFKA_V_TIMESTAMP(duration),
RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()),
RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()),
RD_KAFKA_V_OPAQUE(message.get_user_data()),
RD_KAFKA_V_END);
check_error(result);
}
#endif //RD_KAFKA_HEADERS_SUPPORT_VERSION
} // cppkafka

View File

@@ -9,8 +9,7 @@ add_custom_target(tests)
include_directories(${CMAKE_CURRENT_SOURCE_DIR})
add_definitions("-DKAFKA_TEST_INSTANCE=\"${KAFKA_TEST_INSTANCE}\"")
add_executable(
cppkafka_tests
add_executable(cppkafka_tests
buffer_test.cpp
compacted_topic_processor_test.cpp
configuration_test.cpp
@@ -19,6 +18,7 @@ add_executable(
producer_test.cpp
consumer_test.cpp
roundrobin_poll_test.cpp
headers_test.cpp
# Main file
test_main.cpp

View File

@@ -195,7 +195,7 @@ TEST_CASE("consumer throttle", "[consumer]") {
if (callback_executed_count == 3) {
return Message();
}
return move(msg);
return msg;
},
[&](ConsumerDispatcher::Timeout) {
if (callback_executed_count == 3) {

226
tests/headers_test.cpp Normal file
View File

@@ -0,0 +1,226 @@
#include <vector>
#include <thread>
#include <set>
#include <mutex>
#include <chrono>
#include <iterator>
#include <condition_variable>
#include <catch.hpp>
#include "cppkafka/consumer.h"
#include "cppkafka/producer.h"
#include "cppkafka/header_list.h"
#include "test_utils.h"
using std::vector;
using std::move;
using std::string;
using std::thread;
using std::set;
using std::mutex;
using std::tie;
using std::condition_variable;
using std::lock_guard;
using std::unique_lock;
using std::make_move_iterator;
using std::chrono::seconds;
using std::chrono::milliseconds;
using std::chrono::system_clock;
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
using namespace cppkafka;
using StringHeader = Header<std::string>;
using BufferHeader = Header<Buffer>;
TEST_CASE("creation", "[headers]") {
SECTION("empty") {
HeaderList<StringHeader> list;
REQUIRE(!!list == false);
}
SECTION("default") {
HeaderList<StringHeader> list(2);
REQUIRE(!!list == true);
REQUIRE(list.size() == 0);
REQUIRE(list.empty() == true);
REQUIRE(list.get_handle() != nullptr);
}
SECTION("from handle") {
HeaderList<StringHeader> list(rd_kafka_headers_new(1));
REQUIRE(!!list == true);
REQUIRE(list.size() == 0);
REQUIRE(list.empty() == true);
REQUIRE(list.get_handle() != nullptr);
}
}
TEST_CASE("release", "[headers]") {
HeaderList<StringHeader> list(2);
auto handle = list.release_handle();
REQUIRE(handle != nullptr);
REQUIRE(list.release_handle() == nullptr); //release again
REQUIRE(!!list == false);
rd_kafka_headers_destroy(handle);
}
TEST_CASE("modify", "[headers]") {
SECTION("add") {
HeaderList<StringHeader> list(10);
//empty header name
list.add({{}, "payload1"});
//empty payload
list.add({"header2", {}});
list.add({"header3", "payload3"});
//both null
list.add({{}, {}});
//both empty (0-length strings)
list.add({"", ""});
//validate
REQUIRE(list.size() == 5);
REQUIRE_FALSE(list.empty());
//access a header
REQUIRE(list.at(1).get_name() == "header2");
REQUIRE(list.at(1).get_value().empty());
REQUIRE(list.at(2).get_value() == "payload3");
}
SECTION("remove") {
HeaderList<StringHeader> list(10);
//empty header name
list.add({{}, "payload1"});
//empty payload
list.add({"header2", {}});
list.add({"header3", "payload3"});
//both null
list.add({{}, {}});
//both empty (0 length strings)
list.add({"", ""});
//Remove a bogus name
Error err = list.remove("bogus");
REQUIRE(err.get_error() == RD_KAFKA_RESP_ERR__NOENT);
//Remove header with name
list.remove("header2");
REQUIRE(list.size() == 4);
list.remove("header3");
REQUIRE(list.size() == 3);
//Remove headers without name
list.remove({});
REQUIRE(list.size() == 0);
}
}
TEST_CASE("copy and move", "[headers]") {
SECTION("copy owning") {
//Create an owning header list and copy it
HeaderList<StringHeader> list(3), list2(3);
list.add({"header1", "payload1"});
list.add({"header2", "payload2"});
list.add({"header3", "payload3"});
REQUIRE(list2.size() == 0);
list2 = list;
REQUIRE(list2.size() == 3);
REQUIRE(list2.size() == list.size());
//make sure the handles are different
CHECK(list.get_handle() != list2.get_handle());
CHECK(list.at(0) == list2.at(0));
CHECK(list.at(1) == list2.at(1));
CHECK(list.at(2) == list2.at(2));
CHECK(list == list2);
}
SECTION("copy owning with buffers") {
//Create an owning header list and copy it
HeaderList<BufferHeader> list(3), list2(3);
string payload1 = "payload1", payload2 = "payload2", payload3 = "payload3";
list.add({"header1", payload1});
list.add({"header2", payload2});
list.add({"header3", payload3});
REQUIRE(list2.size() == 0);
list2 = list;
REQUIRE(list2.size() == 3);
REQUIRE(list2.size() == list.size());
//make sure the handles are different
CHECK(list.get_handle() != list2.get_handle());
CHECK(list.at(0) == list2.at(0));
CHECK(list.at(1) == list2.at(1));
CHECK(list.at(2) == list2.at(2));
CHECK(list == list2);
}
SECTION("copy non-owning") {
//Create an owning header list and copy it
HeaderList<StringHeader> list(3), list2(3), list3(HeaderList<StringHeader>::make_non_owning(list.get_handle()));
list.add({"header1", "payload1"});
list.add({"header2", "payload2"});
list.add({"header3", "payload3"});
list2 = list3; //copy non-owning list
REQUIRE(list.size() == 3);
REQUIRE(list3.size() == list.size());
REQUIRE(list2.size() == list.size());
//make sure the handles are the same
CHECK(list2.get_handle() == list3.get_handle());
CHECK(list2.at(0) == list3.at(0));
CHECK(list2.at(1) == list3.at(1));
CHECK(list2.at(2) == list3.at(2));
CHECK(list2 == list3);
}
SECTION("move") {
HeaderList<StringHeader> list(3), list2;
list.add({"header1", "payload1"});
list.add({"header2", "payload2"});
list.add({"header3", "payload3"});
auto handle = list.get_handle();
list2 = std::move(list);
CHECK_FALSE(!!list);
CHECK(!!list2);
CHECK(list2.size() == 3);
CHECK(handle == list2.get_handle());
}
}
TEST_CASE("access", "[headers]") {
HeaderList<StringHeader> list(3);
list.add({"header1", "payload1"});
list.add({"header2", "payload2"});
list.add({"header3", "payload3"});
CHECK(list.at(0).get_value() == "payload1");
CHECK(list.at(1).get_value() == "payload2");
CHECK(list.at(2).get_value() == "payload3");
CHECK_THROWS_AS(list.at(3), Exception);
CHECK(list.front() == list.at(0));
CHECK(list.back() == list.at(2));
}
TEST_CASE("iterate", "[headers]") {
HeaderList<StringHeader> list(3);
REQUIRE(list.begin() == list.end());
list.add({"header1", "payload1"});
REQUIRE(list.begin() != list.end());
CHECK(++list.begin() == list.end());
list.add({"header2", "payload2"});
list.add({"header3", "payload3"});
int i = 0;
for (auto it = list.begin(); it != list.end(); ++it, ++i) {
CHECK(it->get_name().length() == 7);
if (i == 0) {
CHECK(it->get_name() == "header1");
}
else if (i == 1) {
CHECK(it->get_name() == "header2");
}
else if (i == 2) {
CHECK(it->get_name() == "header3");
}
}
//rewind end() iterator
CHECK((--list.end())->get_name() == "header3");
}
#endif //RD_KAFKA_HEADERS_SUPPORT_VERSION

View File

@@ -184,6 +184,43 @@ TEST_CASE("simple production", "[producer]") {
CHECK(message.get_timestamp()->get_timestamp() == timestamp);
}
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
SECTION("message with key and move-able headers") {
using Hdr = MessageBuilder::HeaderType;
const string payload = "Hello world! 2";
const string key = "such key";
const string header1, header2 = "", header3 = "header3";
const milliseconds timestamp{15};
Producer producer(config);
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition)
.key(key)
.payload(payload)
.timestamp(timestamp)
.header(Hdr{})
.header(Hdr{"", header2})
.header(Hdr{"header3", header3}));
runner.try_join();
const auto& messages = runner.get_messages();
REQUIRE(messages.size() == 1);
const auto& message = messages[0];
CHECK(message.get_payload() == payload);
CHECK(message.get_key() == key);
CHECK(message.get_topic() == KAFKA_TOPICS[0]);
CHECK(message.get_partition() == partition);
CHECK(!!message.get_error() == false);
REQUIRE(!!message.get_timestamp() == true);
CHECK(message.get_timestamp()->get_timestamp() == timestamp);
//validate headers
REQUIRE(!!message.get_header_list());
REQUIRE(message.get_header_list().size() == 3);
CHECK(message.get_header_list().front() == Hdr{});
CHECK(message.get_header_list().at(1) == Hdr{"", header2});
CHECK(message.get_header_list().back() == Hdr{"header3", header3});
}
#endif //RD_KAFKA_HEADERS_SUPPORT_VERSION
SECTION("message without message builder") {
const string payload = "Goodbye cruel world!";
const string key = "replay key";
@@ -315,6 +352,52 @@ TEST_CASE("multiple messages", "[producer]") {
}
}
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
TEST_CASE("multiple messages with copy-able headers", "[producer][headers]") {
using Hdr = MessageBuilder::HeaderType;
size_t message_count = 2;
string payload = "Hello world with headers";
const string header1, header2 = "", header3 = "header3";
// Create a consumer and subscribe to this topic
Consumer consumer(make_consumer_config());
consumer.subscribe({ KAFKA_TOPICS[0] });
ConsumerRunner runner(consumer, message_count, KAFKA_NUM_PARTITIONS);
// Now create a producer and produce a message
Producer producer(make_producer_config());
MessageBuilder builder(KAFKA_TOPICS[0]);
builder.payload(payload)
.header(Hdr{})
.header(Hdr{"", header2})
.header(Hdr{"header3", header3});
producer.produce(builder);
producer.produce(builder);
//Check we still have the messages after production
CHECK(!!builder.header_list());
CHECK(builder.header_list().size() == 3);
runner.try_join();
const auto& messages = runner.get_messages();
REQUIRE(messages.size() == message_count);
const auto& message = messages[0];
CHECK(message.get_payload() == payload);
CHECK(!!message.get_error() == false);
//validate headers
REQUIRE(!!message.get_header_list());
REQUIRE(message.get_header_list().size() == 3);
CHECK(message.get_header_list().front() == Hdr{});
CHECK(message.get_header_list().at(1) == Hdr{"", header2});
CHECK(message.get_header_list().back() == Hdr{"header3", header3});
//validate second message
CHECK(messages[0].get_header_list() == messages[1].get_header_list());
CHECK(messages[0].get_header_list().get_handle() != messages[1].get_header_list().get_handle());
}
#endif //RD_KAFKA_HEADERS_SUPPORT_VERSION
TEST_CASE("multiple sync messages", "[producer][buffered_producer][sync]") {
size_t message_count = 10;
set<string> payloads;