May 23, 2024


Simply Consistent

How to Set Up Kafka Integration Test – Grape Up

Do you consider unit testing insufficient for keeping your application reliable and stable? Are you afraid that somehow, somewhere, a bug is hiding behind the assumption that unit tests cover all cases? And is mocking Kafka not enough for your project requirements? If even one answer is 'yes', then welcome to a nice and easy guide on how to set up integration tests for Kafka using TestContainers and Embedded Kafka for Spring!

What is TestContainers?

TestContainers is an open-source Java library specialized in providing all the services needed for integration testing against external resources. It means that we are able to mimic an actual database, web server, or even an event bus environment and treat it as a reliable place to test app functionality. All these fancy features are backed by Docker images, run as containers. Do we need to test the database layer with a real MongoDB? No worries, we have a test container for that. We also can't forget about UI tests: the Selenium container will do everything we actually need.
In our case, we will focus on the Kafka Testcontainer.

What is Embedded Kafka?

As the name suggests, we are going to deal with an in-memory Kafka instance, ready to be used as a normal broker with full functionality. It allows us to work with producers and consumers as usual, keeping our integration tests lightweight.

Before we start

The idea for our test is simple: I would like to test a Kafka consumer and a producer using two different approaches and check how we can use them in real cases.

Kafka messages are serialized using Avro schemas.

Embedded Kafka – Producer Test

The concept is easy: let's create a simple project with a controller, which invokes a service method to push a Kafka Avro serialized message.


implementation "org.apache.avro:avro:1.10.1"
implementation 'org.springframework.boot:spring-boot-starter-validation'
implementation 'org.springframework.kafka:spring-kafka'

implementation 'org.projectlombok:lombok:1.18.16'

compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'

Also worth mentioning is an excellent plugin for Avro. Here is the plugins section:

id 'org.springframework.boot' version '2.6.8'
id 'io.spring.dependency-management' version '1.0.11.RELEASE'
id 'java'
id "com.github.davidmc24.gradle.plugin.avro" version "1.3.0"

The Avro plugin supports automatic generation of Java classes from schemas. This is a must-have.

Link to the plugin:

Now let's define the Avro schema:

{
  "namespace": "com.grapeup.myawesome.myawesomeproducer",
  "type": "record",
  "name": "RegisterRequest",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "address", "type": "string", "": "String"}
  ]
}


Our ProducerService will be focused only on sending messages to Kafka using a template; there is nothing exciting about that part. The main functionality comes down to this line:

ListenableFuture<SendResult<String, RegisterRequest>> future = this.kafkaTemplate.send("register-request", kafkaMessage);

We can't forget about the test properties:

spring:
  main:
    allow-bean-definition-overriding: true
  kafka:
    consumer:
      group-id: team_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.grapeup.myawesome.myawesomeconsumer.common.CustomKafkaAvroDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.grapeup.myawesome.myawesomeconsumer.common.CustomKafkaAvroSerializer
      properties:
        auto.register.schemas: true
    properties:
      specific.avro.reader: true

As we see in the test properties above, we declare a custom deserializer/serializer for Kafka messages. It is highly recommended to use Kafka with Avro: don't let JSON carry the object structure, let's use a civilized mapper and object definition like Avro.


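import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Map;

// Serializer that always talks to an in-memory mock instead of a real Schema Registry
public class CustomKafkaAvroSerializer extends KafkaAvroSerializer {
    public CustomKafkaAvroSerializer() {
        super.schemaRegistry = new MockSchemaRegistryClient();
    }

    public CustomKafkaAvroSerializer(SchemaRegistryClient client) {
        super(new MockSchemaRegistryClient());
    }

    public CustomKafkaAvroSerializer(SchemaRegistryClient client, Map<String, ?> props) {
        super(new MockSchemaRegistryClient(), props);
    }
}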


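import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Map;

// Deserializer counterpart, also backed by an in-memory mock Schema Registry
public class CustomKafkaAvroDeserializer extends KafkaAvroDeserializer {
    public CustomKafkaAvroDeserializer() {
        super.schemaRegistry = new MockSchemaRegistryClient();
    }

    public CustomKafkaAvroDeserializer(SchemaRegistryClient client) {
        super(new MockSchemaRegistryClient());
    }

    public CustomKafkaAvroDeserializer(SchemaRegistryClient client, Map<String, ?> props) {
        super(new MockSchemaRegistryClient(), props);
    }
}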

And we have everything we need to start writing our test.

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@EmbeddedKafka(partitions = 1, topics = "register-request")
class ProducerControllerTest {

All we need to do is add the @EmbeddedKafka annotation with the listed topics and partitions. The application context will boot a Kafka broker with the provided configuration just like that. Keep in mind that @TestInstance should be used with special consideration. Lifecycle.PER_CLASS creates a single test class instance for all test methods instead of a new one per method, so the same objects are not rebuilt for every test. It is worth checking if the tests are too time-consuming.

Consumer<String, RegisterRequest> consumerServiceTest;

// A non-static @BeforeAll method is allowed because of Lifecycle.PER_CLASS
@BeforeAll
void setUp() {
    DefaultKafkaConsumerFactory<String, RegisterRequest> consumer =
            new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());

    consumerServiceTest = consumer.createConsumer();
    consumerServiceTest.subscribe(Collections.singletonList(TOPIC_NAME));
}

Here we can declare the test consumer, typed with the class generated from the Avro schema. All Kafka properties are already provided in the .yml file. That consumer will be used to check whether the producer actually pushed a message.

Here is the actual test method:

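@Test
void whenValidInput_thenReturns200() throws Exception {
    RegisterRequestDto request = RegisterRequestDto.builder()
            .id(12)
            .address("tempAddress")
            .build();

    // Call the endpoint; the controller pushes the message to Kafka
    mockMvc.perform(
            post("/register-request")
                    .contentType("application/json")
                    .content(objectMapper.writeValueAsBytes(request)))
            .andExpect(status().isOk());

    // Read back the single record the producer is expected to have sent
    ConsumerRecord<String, RegisterRequest> consumedRegisterRequest =
            KafkaTestUtils.getSingleRecord(consumerServiceTest, TOPIC_NAME);

    RegisterRequest valueReceived = consumedRegisterRequest.value();

    assertEquals(12, valueReceived.getId());
    assertEquals("tempAddress", valueReceived.getAddress());
}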

First of all, we use MockMvc to perform an action on our endpoint. That endpoint uses ProducerService to push messages to Kafka. The Kafka consumer is used to verify that the producer worked as expected. And that's it: we have a fully working test with embedded Kafka.
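The "produce, then read back exactly one record" check can be sketched without Kafka at all. The following is a minimal illustration, not the implementation of KafkaTestUtils: a plain in-memory queue stands in for the topic, and the class name SingleRecordCheck and the method takeSingle are hypothetical names chosen for this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class SingleRecordCheck {

    /**
     * Waits up to timeoutMillis for one record from the stand-in topic.
     * Fails if nothing arrives in time or if more records are already queued.
     */
    public static <T> T takeSingle(BlockingQueue<T> topic, long timeoutMillis) {
        T record;
        try {
            record = topic.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report the failure
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a record", e);
        }
        if (record == null) {
            throw new IllegalStateException("No record received within " + timeoutMillis + " ms");
        }
        if (!topic.isEmpty()) {
            throw new IllegalStateException("Expected exactly one record, but more are queued");
        }
        return record;
    }
}
```

The point of the pattern is that the assertion fails loudly both when the producer sent nothing and when it sent more than the test expected.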

TestContainers – Consumer Test

TestContainers are nothing more than independent Docker images ready to be run as containers. The following test scenario will be enhanced with a MongoDB image. Why not keep our data in the database right after something happens in the Kafka flow?

Dependencies are not much different than in the previous example. The following additions are needed for test containers:

testImplementation 'org.testcontainers:junit-jupiter'
testImplementation 'org.testcontainers:kafka'
testImplementation 'org.testcontainers:mongodb'

ext {
    set('testcontainersVersion', "1.17.1")
}

dependencyManagement {
    imports {
        mavenBom "org.testcontainers:testcontainers-bom:${testcontainersVersion}"
    }
}

Let's focus now on the consumer part. The test case will be simple: one consumer service will be responsible for receiving the Kafka message and storing the parsed payload in a MongoDB collection. All that we need to know about KafkaListeners, for now, is this annotation:

@KafkaListener(topics = "register-request")

Based on this annotation, KafkaListenerContainerFactory will be responsible for creating a listener on our method. From this moment, our method will react to any incoming Kafka message on the mentioned topic.

The Avro serializer and deserializer configs are the same as in the previous test.

Regarding TestContainers, we should start with the following annotations:

@SpringBootTest
@Testcontainers
public class AbstractIntegrationTest {

During startup, all configured TestContainers modules will be activated. It means that we will get access to the full operating environment of the selected source. For example:

@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@Container
public static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));

@Container
static MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:4.4.2").withExposedPorts(27017);

As a result of booting the test, we can expect two Docker containers to start with the provided configuration.

What is really important about the Mongo container is that it gives us full access to the database using just a simple connection URI. With such a feature, we are able to take a look at the current state of our collections, even in debug mode while stopped at breakpoints.
Also take a look at the Ryuk container: it works like a supervisor and removes our containers once the tests are done.

And here is the last part of the configuration:

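@DynamicPropertySource
static void dataSourceProperties(DynamicPropertyRegistry registry) {
   registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
   registry.add("spring.kafka.consumer.bootstrap-servers", kafkaContainer::getBootstrapServers);
   registry.add("spring.kafka.producer.bootstrap-servers", kafkaContainer::getBootstrapServers);
   registry.add("", mongoDBContainer::getReplicaSetUrl);
}

// Start both containers as soon as the class is loaded
static {
   kafkaContainer.start();
   mongoDBContainer.start();
}

@BeforeTestClass
public void beforeTest() {
   // Block until every listener container has its partition assigned
   kafkaListenerEndpointRegistry.getListenerContainers().forEach(
           messageListenerContainer ->
               ContainerTestUtils
                       .waitForAssignment(messageListenerContainer, 1)
   );
}

@AfterAll
static void tearDown() {
   kafkaContainer.stop();
   mongoDBContainer.stop();
}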

@DynamicPropertySource gives us the option to set all the needed environment properties during the test lifecycle. It is essential for any TestContainers configuration, because container addresses and ports are only known at runtime. Also, in the @BeforeTestClass method, kafkaListenerEndpointRegistry waits for each listener to get its expected partitions during container startup.
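DynamicPropertyRegistry.add takes a Supplier rather than a plain value, so the property is read only when it is actually resolved. Why that matters for containers can be sketched in plain Java. This is an illustration under stated assumptions, not Spring's or TestContainers' code: FakeContainer, Registry, and the address "localhost:49153" are hypothetical stand-ins invented for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

public class LazyPropertiesSketch {

    /** Stand-in for a container whose address is only known after start(). */
    public static class FakeContainer {
        private String bootstrapServers; // null until started

        public void start() {
            // A real container would report a randomly mapped host port here
            bootstrapServers = "localhost:49153";
        }

        public String getBootstrapServers() {
            if (bootstrapServers == null) {
                throw new IllegalStateException("Container is not started");
            }
            return bootstrapServers;
        }
    }

    /** Stand-in for a property registry that stores suppliers, not values. */
    public static class Registry {
        private final Map<String, Supplier<Object>> suppliers = new LinkedHashMap<>();

        public void add(String name, Supplier<Object> valueSupplier) {
            suppliers.put(name, valueSupplier);
        }

        public Object resolve(String name) {
            // The supplier is evaluated only at this point
            return suppliers.get(name).get();
        }
    }
}
```

Registering container::getBootstrapServers before start() is safe in this sketch, because nothing calls the getter until resolve is invoked. Registering the value itself at that point would have thrown.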

And the last part of the Kafka test containers journey is the main body of the test:

@Test
public void containerStartsAndPublicPortIsAvailable() throws Exception {
   writeToTopic("register-request", RegisterRequest.newBuilder().setId(123).setAddress("dummyAddress").build());

   // Wait for KafkaListener
   TimeUnit.SECONDS.sleep(5);
   Assertions.assertEquals(1, taxiRepository.findAll().size());
}

private KafkaProducer<String, RegisterRequest> createProducer() {
   return new KafkaProducer<>(kafkaProperties.buildProducerProperties());
}

private void writeToTopic(String topicName, RegisterRequest... registerRequests) {
   // try-with-resources closes the producer, which also flushes pending records
   try (KafkaProducer<String, RegisterRequest> producer = createProducer()) {
       Arrays.stream(registerRequests)
               .forEach(registerRequest -> {
                           ProducerRecord<String, RegisterRequest> record = new ProducerRecord<>(topicName, registerRequest);
                           producer.send(record);
                       }
               );
   }
}

The custom producer is responsible for writing our message to the Kafka broker. Also, it is recommended to give consumers some time to handle messages properly. As we see, the message was not just consumed by the listener, but also stored in the MongoDB collection.
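Giving consumers "some time" does not have to mean a fixed pause. A minimal sketch of a polling wait is shown here as one possible alternative, not something the original test uses: the class AwaitCondition and its await method are hypothetical names, and the timeout values in the usage line are arbitrary.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

public class AwaitCondition {

    /**
     * Polls the condition until it returns true or the timeout elapses.
     * Returns true if the condition was met in time, false otherwise
     * (including when the waiting thread is interrupted).
     */
    public static boolean await(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

In the test above, this could replace the pause with something like `assertTrue(AwaitCondition.await(() -> taxiRepository.findAll().size() == 1, Duration.ofSeconds(10), Duration.ofMillis(200)))`. The test then finishes as soon as the listener has stored the document and fails only after the full timeout.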


As we can see, current solutions for integration tests are quite easy to implement and maintain in projects. There is no point in keeping just unit tests and counting on line coverage as a sign of code/logic quality. Now the question is, should we use an Embedded solution or TestContainers? I suggest first of all focusing on the word "Embedded". In a perfect integration test, we want an almost ideal copy of the production environment with all properties/features included. In-memory solutions are good, but mostly not enough for large business projects. Definitely, the advantage of Embedded services is the easy way to implement such tests and maintain configuration, since everything happens in memory.
TestContainers at first sight might look like overkill, but they give us the most important feature, which is a separate environment. We don't even have to rely on existing Docker images; if we want, we can use custom ones. This is a huge improvement for potential test scenarios.
What about Jenkins? There is no reason to be afraid of using TestContainers in Jenkins either. I strongly recommend checking the TestContainers documentation on how easily we can set up the configuration for Jenkins agents.
To sum up: if there is no blocker or any unwanted condition for using TestContainers, then don't hesitate. It is always good to keep all services managed and secured with integration test contracts.