diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index 80a9f67..8454a4c 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -206,13 +206,6 @@ public: */ ssize_t get_max_buffer_size() const; - /** - * \brief Get the number of unsent messages in the buffer - * - * \return The number of messages - */ - size_t get_buffer_size() const; - /** * \brief Get the number of messages not yet acked by the broker * @@ -515,7 +508,6 @@ void BufferedProducer::produce_message(const MessageType& message) { template Configuration BufferedProducer::prepare_configuration(Configuration config) { using std::placeholders::_2; - delivery_report_callback_ = config.get_delivery_report_callback(); auto callback = std::bind(&BufferedProducer::on_delivery_report, this, _2); config.set_delivery_report_callback(std::move(callback)); return config; diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index 85ae866..007f2e6 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -52,7 +52,7 @@ void producer_run(BufferedProducer& producer, int& exit_flag, condition_variable& clear, int num_messages, int partition) { - MessageBuilder builder(KAFKA_TOPIC); + MessageBuilder builder(KAFKA_TOPICS[0]); string key("wassup?"); string payload("nothing much!"); @@ -145,7 +145,7 @@ TEST_CASE("simple production", "[producer]") { const string key = "replay key"; const milliseconds timestamp{15}; Producer producer(config); - producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition) + producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition) .key(key) .payload(payload) .timestamp(timestamp)); @@ -165,7 +165,7 @@ TEST_CASE("simple production", "[producer]") { const auto& message = messages[0]; CHECK(message.get_payload() == payload); CHECK(message.get_key() == key); - CHECK(message.get_topic() == KAFKA_TOPIC); + CHECK(message.get_topic() == KAFKA_TOPICS[0]); CHECK(message.get_partition() == partition); CHECK(!!message.get_error() == false); REQUIRE(!!message.get_timestamp() == true); @@ -316,7 +316,7 @@ TEST_CASE("buffered producer with limited buffer", "[producer]") { // Create a consumer and assign this topic/partition Consumer consumer(make_consumer_config()); - consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) }); + consumer.assign({ TopicPartition(KAFKA_TOPICS[0], partition) }); ConsumerRunner runner(consumer, 3, 1); // Now create a buffered producer and produce two messages @@ -329,7 +329,7 @@ TEST_CASE("buffered producer with limited buffer", "[producer]") { // Limit the size of the internal buffer producer.set_max_buffer_size(num_messages-1); while (num_messages--) { - producer.add_message(MessageBuilder(KAFKA_TOPIC).partition(partition).key(key).payload(payload)); + producer.add_message(MessageBuilder(KAFKA_TOPICS[0]).partition(partition).key(key).payload(payload)); } REQUIRE(producer.get_buffer_size() == 1); @@ -351,7 +351,7 @@ TEST_CASE("multi-threaded buffered producer", "[producer][buffered_producer]") { // Create a consumer and assign this topic/partition Consumer consumer(make_consumer_config()); - consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) }); + consumer.assign({ TopicPartition(KAFKA_TOPICS[0], partition) }); ConsumerRunner runner(consumer, num_messages, 1); BufferedProducer producer(make_producer_config());