September 26, 2023


Unlimited Technology

How to Set Up Kafka Integration Test – Grape Up

How to Set Up Kafka Integration Test – Grape Up

Do you consider device screening as not plenty of resolution for preserving the application’s trustworthiness and steadiness? Are you frightened that by some means or someplace there is a prospective bug hiding in the assumption that device tests need to cover all instances? And also is mocking Kafka not plenty of for project prerequisites? If even just one reply is  ‘yes’, then welcome to a wonderful and easy manual on how to established up Integration Exams for Kafka making use of TestContainers and Embedded Kafka for Spring!

What is TestContainers?

TestContainers is an open-source Java library specialised in furnishing all wanted answers for the integration and tests of exterior resources. It means that we are capable to mimic an true databases, web server, or even an event bus environment and deal with that as a responsible location to test app functionality. All these fancy attributes are hooked into docker visuals, defined as containers. Do we require to examination the database layer with true MongoDB? No problems, we have a test container for that. We can not also forget about about UI checks – Selenium Container will do just about anything that we truly will need.
In our scenario, we will emphasis on Kafka Testcontainer.

What is Embedded Kafka?

As the identify indicates, we are going to offer with an in-memory Kafka occasion, prepared to be applied as a standard broker with entire functionality. It makes it possible for us to operate with producers and consumers, as normal, creating our integration checks light-weight. 

Before we get started

The thought for our examination is basic – I would like to check Kafka customer and producer utilizing two different techniques and check out how we can utilize them in precise situations. 

Kafka Messages are serialized utilizing Avro schemas.

Embedded Kafka – Producer Take a look at

The notion is straightforward – let’s make a easy venture with the controller, which invokes a support approach to force a Kafka Avro serialized concept.


implementation "org.apache.avro:avro:1.10.1"
implementation 'org.springframework.boot:spring-boot-starter-validation'
implementation 'org.springframework.kafka:spring-kafka'

implementation 'org.projectlombok:lombok:1.18.16'

compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'

Also worthy of mentioning great plugin for Avro. Listed here plugins part:

id 'org.springframework.boot' edition '2.6.8'
id 'io.spring.dependency-management' model '1..11.RELEASE'
id 'java'
id "com.github.davidmc24.gradle.plugin.avro" variation "1.3."

Avro Plugin supports schema automobile-producing. This is a must-have.

Link to plugin:

Now let us outline the Avro schema:

  "namespace": "com.grapeup.myawesome.myawesomeproducer",
  "sort": "report",
  "name": "RegisterRequest",
  "fields": [
    "name": "id", "type": "long",
    "name": "address", "type": "string", "": "String"


Our ProducerService will be concentrated only on sending messages to Kafka employing a template, nothing fascinating about that element. Most important features can be completed just working with this line:

ListenableFuture> long term = this.kafkaTemplate.deliver("sign-up-request", kafkaMessage)

We can’t neglect about take a look at qualities:

    allow-bean-definition-overriding: real
      group-id: group_id
      auto-offset-reset: earliest
      essential-deserializer: org.apache.kafka.frequent.serialization.StringDeserializer
      benefit-deserializer: com.grapeup.myawesome.myawesomeconsumer.prevalent.CustomKafkaAvroDeserializer
      auto.register.schemas: legitimate
      vital-serializer: org.apache.kafka.prevalent.serialization.StringSerializer
      price-serializer: com.grapeup.myawesome.myawesomeconsumer.popular.CustomKafkaAvroSerializer
      specific.avro.reader: real

As we see in the outlined exam properties, we declare a custom deserializer/serializer for KafkaMessages. It is hugely encouraged to use Kafka with Avro – never let JSONs retain item composition, let’s use civilized mapper and object definition like Avro.


community course CustomKafkaAvroSerializer extends KafkaAvroSerializer 
    general public CustomKafkaAvroSerializer() 
        super.schemaRegistry = new MockSchemaRegistryClient()

    community CustomKafkaAvroSerializer(SchemaRegistryClient customer) 
        tremendous(new MockSchemaRegistryClient())

    general public CustomKafkaAvroSerializer(SchemaRegistryClient shopper, Map props) 
        super(new MockSchemaRegistryClient(), props)


general public course CustomKafkaAvroSerializer extends KafkaAvroSerializer 
    community CustomKafkaAvroSerializer() 
        super.schemaRegistry = new MockSchemaRegistryClient()

    public CustomKafkaAvroSerializer(SchemaRegistryClient client) 
        tremendous(new MockSchemaRegistryClient())

    public CustomKafkaAvroSerializer(SchemaRegistryClient client, Map props) 
        super(new MockSchemaRegistryClient(), props)

And we have almost everything to begin composing our examination.

@EmbeddedKafka(partitions = 1, subjects = "register-request")
course ProducerControllerTest {

All we need to have to do is increase @EmbeddedKafka annotation with outlined topics and partitions. Software Context will boot Kafka Broker with provided configuration just like that. Continue to keep in mind that @TestInstance ought to be applied with unique thought. Lifecycle.For every_Course will avoid creating the identical objects/context for just about every check strategy. Truly worth examining if exams are much too time-consuming.

Consumer consumerServiceTest
void Setup() 
DefaultKafkaConsumerFactory customer = new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()

consumerServiceTest = customer.createConsumer()

Below we can declare the test consumer, primarily based on the Avro schema return style. All Kafka qualities are by now supplied in the .yml file. That client will be utilized as a check out if the producer really pushed a concept.

Listed here is the real check technique:

void whenValidInput_therReturns200() throws Exception 
        RegisterRequestDto ask for = RegisterRequestDto.builder()

                put up("/register-ask for")
                      .content(objectMapper.writeValueAsBytes(ask for)))

      ConsumerRecord consumedRegisterRequest =  KafkaTestUtils.getSingleRecord(consumerServiceTest, Matter_Title)

        RegisterRequest valueReceived = consumedRegisterRequest.benefit()

        assertEquals(12, valueReceived.getId())
        assertEquals("tempAddress", valueReceived.getAddress())

Very first of all, we use MockMvc to conduct an action on our endpoint. That endpoint works by using ProducerService to drive messages to Kafka. KafkaConsumer is employed to confirm if the producer labored as anticipated. And that’s it – we have a completely working check with embedded Kafka.

Check Containers – Purchaser Test

TestContainers are nothing else like impartial docker visuals ready for currently being dockerized. The pursuing exam circumstance will be increased by a MongoDB graphic. Why not keep our information in the databases right right after just about anything happened in Kafka circulation?

Dependencies are not considerably distinctive than in the prior instance. The following measures are desired for exam containers:

testImplementation 'org.testcontainers:junit-jupiter'
testImplementation 'org.testcontainers:kafka'
testImplementation 'org.testcontainers:mongodb'

established('testcontainersVersion', "1.17.1")

mavenBom "org.testcontainers:testcontainers-bom:$testcontainersVersion"

Let’s concentrate now on the Customer part. The check case will be uncomplicated – one consumer support will be responsible for receiving the Kafka information and storing the parsed payload in the MongoDB selection. All that we have to have to know about KafkaListeners, for now, is that annotation:

@KafkaListener(topics = "sign up-ask for")

By the functionality of the annotation processor, KafkaListenerContainerFactory will be dependable to develop a listener on our technique. From this second our system will respond to any upcoming Kafka information with the described subject.

Avro serializer and deserializer configs are the similar as in the prior take a look at.

Concerning TestContainer, we really should begin with the subsequent annotations:

@ActiveProfiles("take a look at")
community course AbstractIntegrationTest {

Through startup, all configured TestContainers modules will be activated. It signifies that we will get accessibility to the full functioning setting of the picked source. As instance:

private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry

public static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))

static MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:4.4.2").withExposedPorts(27017)

As a final result of booting the take a look at, we can assume two docker containers to start off with the delivered configuration.

What is really significant for the mongo container – it gives us complete entry to the database utilizing just a easy link uri. With this kind of a element, we are able to take a glimpse what is the existing point out in our collections, even for the duration of debug manner and well prepared breakpoints.
Just take a glimpse also at the Ryuk container – it will work like overwatch and checks if our containers have begun correctly.

And in this article is the very last element of the configuration:

static void dataSourceProperties(DynamicPropertyRegistry registry) 
   registry.increase("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers)
   registry.add("spring.kafka.purchaser.bootstrap-servers", kafkaContainer::getBootstrapServers)
   registry.add("spring.kafka.producer.bootstrap-servers", kafkaContainer::getBootstrapServers)
   registry.increase("", mongoDBContainer::getReplicaSetUrl)

   kafkaContainer.start out()
   mongoDBContainer.start off()


public void beforeTest() 

           messageListenerContainer -> 
                       .waitForAssignment(messageListenerContainer, 1)


static void tearDown() 

DynamicPropertySource provides us the solution to established all wanted atmosphere variables throughout the check lifecycle. Strongly desired for any config functions for TestContainers. Also, beforeTestClass kafkaListenerEndpointRegistry waits for each listener to get predicted partitions during container startup.

And the very last component of the Kafka exam containers journey – the key entire body of the exam:

@Take a look at
community void containerStartsAndPublicPortIsAvailable() throws Exception 
   writeToTopic("sign up-ask for", RegisterRequest.newBuilder().setId(123).setAddress("dummyAddress").construct())

   //Wait for KafkaListener
   Assertions.assertEquals(1, taxiRepository.findAll().dimension())

private KafkaProducer createProducer() 
   return new KafkaProducer<>(kafkaProperties.buildProducerProperties())

private void writeToTopic(String topicName, RegisterRequest... registerRequests) 

   try (KafkaProducer producer = createProducer())
               .forEach(registerRequest -> 
                           ProducerRecord document = new ProducerRecord<>(topicName, registerRequest)

The custom made producer is responsible for creating our message to KafkaBroker. Also, it is advised to give some time for customers to deal with messages thoroughly. As we see, the message was not just eaten by the listener, but also saved in the MongoDB assortment.


As we can see, existing answers for integration assessments are fairly uncomplicated to carry out and sustain in assignments. There is no stage in holding just unit tests and counting on all lines lined as a indication of code/logic excellent. Now the problem is, should we use an Embedded solution or TestContainers? I suggest initial of all focusing on the term “Embedded”. As a great integration examination, we want to get an almost best copy of the production surroundings with all properties/options bundled. In-memory answers are superior, but generally, not enough for big business enterprise tasks. Certainly, the benefit of Embedded expert services is the simple way to implement this sort of exams and preserve configuration, just when anything happens in memory.
TestContainers at the to start with sight might look like overkill, but they give us the most vital attribute, which is a individual natural environment. We really don’t have to even rely on present docker illustrations or photos – if we want we can use tailor made types. This is a big enhancement for likely exam eventualities.
What about Jenkins? There is no purpose to be worried also to use TestContainers in Jenkins. I firmly propose checking TestContainers documentation on how conveniently we can set up the configuration for Jenkins agents.
To sum up – if there is no blocker or any unwelcome ailment for working with TestContainers, then do not wait. It is often superior to preserve all expert services managed and secured with integration take a look at contracts.