Apache Kafka Load Testing with JMeter
Kafka works well as a replacement for a more traditional message broker. In comparison to most messaging systems, Kafka has better throughput, built-in partitioning, replication, and fault tolerance, which makes it a good solution for large-scale message processing applications.
In a distributed microservice architecture, Kafka is used for non-blocking communication between services and for handling the enormous volumes of real-time data streams generated by systems such as IoT and recommender systems; there are plenty of other use cases where Kafka can be utilized.
Testing Kafka validates whether the Kafka cluster can handle the expected load without failures. Most of the time, testing Kafka means testing the whole application: Kafka itself is very fast at accepting messages, and it is usually the consumer applications reading from Kafka topics that drive up the overall processing time.
Now, coming to testing Kafka, we need to send/consume a huge number of messages. From the perspective of the Kafka producer and consumer applications, they connect to Kafka once and then send or consume messages as the flow requires, reusing the same session. I am trying to replicate that actual behaviour.
I am using JMeter to send messages to Kafka. There is a third-party sampler (pepper-box) that supports producing and consuming messages from Kafka, but it doesn't match my requirement. So I have implemented the same logic the developers use to connect and send messages to Kafka, in a JMeter JSR223 sampler.
To send/consume messages with Kafka, I am using the same kafka-clients library (kafka-clients-2.5.0.jar) that the application uses. Download the JAR file from the Maven repository and place it under lib\ext.
Now let's use the methods from the Kafka library to create load. I am using sample code from the official docs; you can adapt the code to your requirement.
Snippet from Docs:
To run Java code in JMeter, I am using a JSR223 Sampler with the Language set to Groovy.
Imports:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
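If you run the same snippet as plain Java outside the sampler, one extra import is needed for the Properties class (Groovy imports java.util.* by default, so it is optional in the JSR223 script):
import java.util.Properties;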
Connection Properties: Add attributes as per your requirement to establish the connection (a sketch of optional extras follows the basic snippet below).
Properties props = new Properties();
props.put("bootstrap.servers", "HOSTNAME:PORT");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("bootstrap.servers", "HOSTNAME:PORT");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Establish connection:
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
Initialize Data:
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, KEY, VALUE);
Example:
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("test", Integer.toString(i), "value"+${__Random(1,100000000,)});
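One caution from my side (not part of the snippet above): when script caching is enabled on a JSR223 sampler, a function call like ${__Random(1,100000000,)} is resolved only once when the script is compiled, so every message can end up with the same value. A safer sketch is to generate the random part inside the script itself:
Random random = new Random(); //java.util.Random, available by default in Groovy
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("test", Integer.toString(i), "value" + random.nextInt(100000000));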
Send Data to Kafka:
producer.send(producerRecord);
Close Connection:
producer.close();
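A note on the send step: producer.send() is asynchronous, so broker-side failures surface through the returned Future rather than at the call itself. If you want a per-message try/catch to see such failures, one option (my variation, not part of the original flow, and it lowers throughput) is to block on that Future:
producer.send(producerRecord).get(); //Waits for the broker acknowledgement; throws an exception if the send failed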
But if we used this code as-is, each execution would open a new connection rather than reusing the same one, so we need a loop inside the sampler to send many messages over a single connection. We also need to check for failures during the bulk message push. Below is the final code that does both.
Final code:
<Import Dependencies>

//Initialize Kafka Producer
KafkaProducer<String, String> producer = null;
try {
    <Connection Properties>
    <Establish Connection>

    //Declare number of messages
    int sample = 1000; //Number of messages to be sent (sub-iterations)
    int fail = 0;      //Tracks failures during message sending

    //Sub loop to send messages over the same connection
    for (int i = 0; i < sample; i++) {
        try {
            <Set Data>
            <Send Message to Kafka>
        } catch (Exception e) {
            log.error("" + e);
            log.error("Error occurred when sending message to Kafka");
            fail = fail + 1;
        }
    }

    //Set results as per executions
    if (fail > 0) {
        SampleResult.setErrorCount(fail); //Set the number of failed messages on the sampler result
        SampleResult.setSuccessful(false);
    } else {
        SampleResult.setSuccessful(true);
    }

    //Remove variable references
    props = null;
} catch (Exception e) {
    log.error("" + e);
    log.error("Failed to connect to Kafka");
    SampleResult.setSuccessful(false);
} finally {
    if (producer != null) {
        try {
            <Close Connection>
            //Remove variable references
            producer = null;
        } catch (Exception ex) {
            log.error("" + ex);
            log.error("Failed to close connection");
        }
    }
}
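For reference, here is one way the placeholders above could be assembled into a single JSR223 (Groovy) script from the snippets in this post. The broker address and the topic name "test" are placeholders to replace with your own values, log and SampleResult are bindings JMeter exposes to JSR223 samplers, and I generate the random value in Groovy (as discussed earlier) instead of using the __Random function:

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord

//Initialize Kafka Producer
KafkaProducer<String, String> producer = null
Properties props = new Properties()
try {
    //Connection properties (replace HOSTNAME:PORT with your broker address)
    props.put("bootstrap.servers", "HOSTNAME:PORT")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    //Establish connection once per sampler execution
    producer = new KafkaProducer<String, String>(props)

    int sample = 1000   //Number of messages to be sent (sub-iterations)
    int fail = 0        //Tracks failures during message sending
    Random random = new Random()

    //Send all messages over the same connection
    for (int i = 0; i < sample; i++) {
        try {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("test", Integer.toString(i), "value" + random.nextInt(100000000))
            producer.send(producerRecord)
        } catch (Exception e) {
            log.error("" + e)
            log.error("Error occurred when sending message to Kafka")
            fail = fail + 1
        }
    }

    //Report the outcome on the sampler result
    if (fail > 0) {
        SampleResult.setErrorCount(fail)
        SampleResult.setSuccessful(false)
    } else {
        SampleResult.setSuccessful(true)
    }
    props = null
} catch (Exception e) {
    log.error("" + e)
    log.error("Failed to connect to Kafka")
    SampleResult.setSuccessful(false)
} finally {
    if (producer != null) {
        try {
            producer.close()
            producer = null
        } catch (Exception ex) {
            log.error("" + ex)
            log.error("Failed to close connection")
        }
    }
}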
I have updated the sampler's failure count and success status from the sampler code using the JMeter API (SampleResult). Because each sampler sends many messages internally, the results reported by JMeter need to be adjusted with respect to the number of sub-iterations.
Before we start the test, we have to disable GUI logging in JMeter. I have observed that when GUI logging is enabled, the Kafka client JAR logs connection details to the GUI log panel, which leads to a memory leak. The issue does not appear when the test is run in non-GUI mode; a bug has been raised for this. If you want to run in GUI mode, disable GUI logging by commenting out the appender as shown below.
Disable the GUI logging config in log4j2.xml, which is present in the JMeter bin folder:
<Loggers>
<Root level="info">
<AppenderRef ref="jmeter-log" />
<!--<AppenderRef ref="gui-log-event" />-->
</Root>
</Loggers>
<Root level="info">
<AppenderRef ref="jmeter-log" />
<!--<AppenderRef ref="gui-log-event" />-->
</Root>
</Loggers>
Add the below properties to user.properties
jmeter.loggerpanel.maxlength=1000
jmeter.loggerpanel.enable_when_closed=false
Sample JMeter script for testing Kafka (GitHub)
Below are the sample HTML results, which need to be read with reference to the iterations inside the sampler.
Sample results from a 25-user test running for 5 minutes, with 1000 messages per sample
Modified Calculation Formula (Sub Iteration = Iterations in below formula)
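My reading of the sub-iteration adjustment, with 1000 messages per sample in this test, works out to:
Messages sent      = Samples x Messages per sample
Message throughput = Sampler throughput x Messages per sample
Error %            = (Error count / Messages sent) x 100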
Final Results
Stats from Kafdrop
Messages: