mirror of
https://github.com/Telecominfraproject/wlan-cloud-lib-cppkafka.git
synced 2025-11-01 11:07:56 +00:00
Remove valgrind warning
This commit is contained in:
@@ -206,13 +206,6 @@ public:
|
|||||||
*/
|
*/
|
||||||
ssize_t get_max_buffer_size() const;
|
ssize_t get_max_buffer_size() const;
|
||||||
|
|
||||||
/**
|
|
||||||
* \brief Get the number of unsent messages in the buffer
|
|
||||||
*
|
|
||||||
* \return The number of messages
|
|
||||||
*/
|
|
||||||
size_t get_buffer_size() const;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* \brief Get the number of messages not yet acked by the broker
|
* \brief Get the number of messages not yet acked by the broker
|
||||||
*
|
*
|
||||||
@@ -515,7 +508,6 @@ void BufferedProducer<BufferType>::produce_message(const MessageType& message) {
|
|||||||
template <typename BufferType>
|
template <typename BufferType>
|
||||||
Configuration BufferedProducer<BufferType>::prepare_configuration(Configuration config) {
|
Configuration BufferedProducer<BufferType>::prepare_configuration(Configuration config) {
|
||||||
using std::placeholders::_2;
|
using std::placeholders::_2;
|
||||||
delivery_report_callback_ = config.get_delivery_report_callback();
|
|
||||||
auto callback = std::bind(&BufferedProducer<BufferType>::on_delivery_report, this, _2);
|
auto callback = std::bind(&BufferedProducer<BufferType>::on_delivery_report, this, _2);
|
||||||
config.set_delivery_report_callback(std::move(callback));
|
config.set_delivery_report_callback(std::move(callback));
|
||||||
return config;
|
return config;
|
||||||
|
|||||||
@@ -52,7 +52,7 @@ void producer_run(BufferedProducer<string>& producer,
|
|||||||
int& exit_flag, condition_variable& clear,
|
int& exit_flag, condition_variable& clear,
|
||||||
int num_messages,
|
int num_messages,
|
||||||
int partition) {
|
int partition) {
|
||||||
MessageBuilder builder(KAFKA_TOPIC);
|
MessageBuilder builder(KAFKA_TOPICS[0]);
|
||||||
string key("wassup?");
|
string key("wassup?");
|
||||||
string payload("nothing much!");
|
string payload("nothing much!");
|
||||||
|
|
||||||
@@ -145,7 +145,7 @@ TEST_CASE("simple production", "[producer]") {
|
|||||||
const string key = "replay key";
|
const string key = "replay key";
|
||||||
const milliseconds timestamp{15};
|
const milliseconds timestamp{15};
|
||||||
Producer producer(config);
|
Producer producer(config);
|
||||||
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition)
|
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition)
|
||||||
.key(key)
|
.key(key)
|
||||||
.payload(payload)
|
.payload(payload)
|
||||||
.timestamp(timestamp));
|
.timestamp(timestamp));
|
||||||
@@ -165,7 +165,7 @@ TEST_CASE("simple production", "[producer]") {
|
|||||||
const auto& message = messages[0];
|
const auto& message = messages[0];
|
||||||
CHECK(message.get_payload() == payload);
|
CHECK(message.get_payload() == payload);
|
||||||
CHECK(message.get_key() == key);
|
CHECK(message.get_key() == key);
|
||||||
CHECK(message.get_topic() == KAFKA_TOPIC);
|
CHECK(message.get_topic() == KAFKA_TOPICS[0]);
|
||||||
CHECK(message.get_partition() == partition);
|
CHECK(message.get_partition() == partition);
|
||||||
CHECK(!!message.get_error() == false);
|
CHECK(!!message.get_error() == false);
|
||||||
REQUIRE(!!message.get_timestamp() == true);
|
REQUIRE(!!message.get_timestamp() == true);
|
||||||
@@ -316,7 +316,7 @@ TEST_CASE("buffered producer with limited buffer", "[producer]") {
|
|||||||
|
|
||||||
// Create a consumer and assign this topic/partition
|
// Create a consumer and assign this topic/partition
|
||||||
Consumer consumer(make_consumer_config());
|
Consumer consumer(make_consumer_config());
|
||||||
consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) });
|
consumer.assign({ TopicPartition(KAFKA_TOPICS[0], partition) });
|
||||||
ConsumerRunner runner(consumer, 3, 1);
|
ConsumerRunner runner(consumer, 3, 1);
|
||||||
|
|
||||||
// Now create a buffered producer and produce two messages
|
// Now create a buffered producer and produce two messages
|
||||||
@@ -329,7 +329,7 @@ TEST_CASE("buffered producer with limited buffer", "[producer]") {
|
|||||||
// Limit the size of the internal buffer
|
// Limit the size of the internal buffer
|
||||||
producer.set_max_buffer_size(num_messages-1);
|
producer.set_max_buffer_size(num_messages-1);
|
||||||
while (num_messages--) {
|
while (num_messages--) {
|
||||||
producer.add_message(MessageBuilder(KAFKA_TOPIC).partition(partition).key(key).payload(payload));
|
producer.add_message(MessageBuilder(KAFKA_TOPICS[0]).partition(partition).key(key).payload(payload));
|
||||||
}
|
}
|
||||||
REQUIRE(producer.get_buffer_size() == 1);
|
REQUIRE(producer.get_buffer_size() == 1);
|
||||||
|
|
||||||
@@ -351,7 +351,7 @@ TEST_CASE("multi-threaded buffered producer", "[producer][buffered_producer]") {
|
|||||||
|
|
||||||
// Create a consumer and assign this topic/partition
|
// Create a consumer and assign this topic/partition
|
||||||
Consumer consumer(make_consumer_config());
|
Consumer consumer(make_consumer_config());
|
||||||
consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) });
|
consumer.assign({ TopicPartition(KAFKA_TOPICS[0], partition) });
|
||||||
ConsumerRunner runner(consumer, num_messages, 1);
|
ConsumerRunner runner(consumer, num_messages, 1);
|
||||||
|
|
||||||
BufferedProducer<string> producer(make_producer_config());
|
BufferedProducer<string> producer(make_producer_config());
|
||||||
|
|||||||
Reference in New Issue
Block a user