Spring Boot Kafka Batch Processing with Quartz Scheduler: Step-by-Step Guide

Architecture diagram: the components and their interactions (Kafka Producer, Kafka Broker, Spring Batch, Quartz Scheduler, and Database).
To process bulk data from a Kafka topic and perform batch inserts into a database using Spring Boot, Spring Batch, and Quartz, follow these steps:

1. Prerequisites

  1. Kafka broker running (e.g., local installation or Docker).
  2. Spring Boot project with the following dependencies:
<dependencies>
    <!-- Spring Boot and Batch -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>

    <!-- Spring for Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <!-- Quartz Scheduler -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-quartz</artifactId>
    </dependency>

    <!-- Database -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
        <scope>runtime</scope>
    </dependency>
</dependencies>

2. Application Architecture

  1. Kafka Producer: Sends messages to a Kafka topic.
  2. Kafka Consumer: Consumes messages and buffers them for batch processing.
  3. Spring Batch: Processes buffered data in chunks and writes it to the database.
  4. Quartz Scheduler: Triggers the Spring Batch job periodically.
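
Before wiring in the real frameworks, the flow above can be modelled in plain Java. This is only a sketch under stated assumptions: PipelineSketch, onMessage, runBatch, and rows are hypothetical names, an in-memory list stands in for the database, and calling runBatch() by hand stands in for a Quartz trigger firing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-memory model of the architecture; not part of the guide's code.
class PipelineSketch {
    // Stands in for the buffer that the Kafka consumer fills.
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
    // Stands in for the database table.
    private final List<String> database = new ArrayList<>();

    // Consumer side: called once per incoming message.
    void onMessage(String value) {
        buffer.add(value);
    }

    // Scheduler side: called on every trigger; returns the number of rows written.
    int runBatch() {
        List<String> drained = new ArrayList<>();
        buffer.drainTo(drained); // takes whatever is buffered right now
        database.addAll(drained);
        return drained.size();
    }

    List<String> rows() {
        return List.copyOf(database);
    }

    public static void main(String[] args) {
        PipelineSketch pipeline = new PipelineSketch();
        pipeline.onMessage("a");
        pipeline.onMessage("b");
        System.out.println(pipeline.runBatch()); // 2
        pipeline.onMessage("c");                 // arrives between two triggers
        System.out.println(pipeline.runBatch()); // 1
        System.out.println(pipeline.runBatch()); // 0, nothing was buffered
    }
}
```

Messages that arrive between two triggers simply wait in the buffer and are picked up by the next run.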

3. Implementation

Step 1: Kafka Configuration

Add Kafka configuration in application.yml:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: batch-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

Step 2: Kafka Consumer

Create a Kafka consumer that listens to messages and stores them in a buffer for batch processing.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

@Component
public class KafkaMessageListener {

    private final BlockingQueue<String> messageBuffer = new LinkedBlockingQueue<>();

    @KafkaListener(topics = "batch-topic", groupId = "batch-group")
    public void listen(ConsumerRecord<String, String> record) {
        messageBuffer.add(record.value());
    }

    public BlockingQueue<String> getMessageBuffer() {
        return messageBuffer;
    }
}
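
The listener above buffers into an unbounded LinkedBlockingQueue, so memory can grow if messages arrive faster than the batch job drains them. One possible mitigation, sketched here in plain Java, is a bounded queue whose offer call waits briefly and then reports failure. BoundedBufferSketch and its methods are hypothetical names, and the capacity and wait time are arbitrary illustration values.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical bounded variant of the message buffer; not part of the guide's code.
class BoundedBufferSketch {
    private final BlockingQueue<String> buffer;

    BoundedBufferSketch(int capacity) {
        this.buffer = new LinkedBlockingQueue<>(capacity);
    }

    // Returns false if the buffer stayed full for the whole wait.
    boolean accept(String value, long waitMillis) {
        try {
            return buffer.offer(value, waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    // Returns the oldest buffered message, or null if the buffer is empty.
    String poll() {
        return buffer.poll();
    }

    public static void main(String[] args) {
        BoundedBufferSketch buffer = new BoundedBufferSketch(2);
        System.out.println(buffer.accept("m1", 10)); // true
        System.out.println(buffer.accept("m2", 10)); // true
        System.out.println(buffer.accept("m3", 10)); // false, buffer is full
        System.out.println(buffer.poll());           // m1
        System.out.println(buffer.accept("m3", 10)); // true, space was freed
    }
}
```

What to do when accept returns false (retry, pause the consumer, or fail the record) is a design decision this sketch leaves open.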

Step 3: Spring Batch Configuration

Batch Configuration:

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

// Written for Spring Batch 5 (Spring Boot 3), matching the jakarta.persistence imports
// used by the entity. JobBuilderFactory and StepBuilderFactory are deprecated there in
// favor of JobBuilder and StepBuilder. @EnableBatchProcessing is omitted because in
// Spring Boot 3 it switches off the Batch auto-configuration.
// Set spring.batch.job.enabled=false so the job runs only when Quartz triggers it.
@Configuration
public class BatchConfig {

    @Bean
    public Job job(JobRepository jobRepository, Step batchStep) {
        return new JobBuilder("batch-job", jobRepository)
                .start(batchStep)
                .build();
    }

    @Bean
    public Step batchStep(JobRepository jobRepository,
                          PlatformTransactionManager transactionManager,
                          ItemReader<String> itemReader,
                          ItemProcessor<String, YourEntity> itemProcessor,
                          ItemWriter<YourEntity> itemWriter) {
        return new StepBuilder("batch-step", jobRepository)
                .<String, YourEntity>chunk(50, transactionManager) // Process in chunks of 50
                .reader(itemReader)
                .processor(itemProcessor)
                .writer(itemWriter)
                .build();
    }

    @Bean
    public ItemReader<String> itemReader(KafkaMessageListener kafkaMessageListener) {
        // Poll the live buffer on every read instead of copying it once at startup,
        // when it is still empty. poll() returns null when the buffer is empty, which
        // tells Spring Batch that the step has no more input for this run.
        return () -> kafkaMessageListener.getMessageBuffer().poll();
    }

    @Bean
    public ItemProcessor<String, YourEntity> itemProcessor() {
        return data -> new YourEntity(data); // Transform raw data to entity
    }

    @Bean
    public ItemWriter<YourEntity> itemWriter(YourRepository repository) {
        return repository::saveAll; // Save each chunk to the database
    }
}
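
To make the chunk size of 50 concrete, here is a simplified plain-Java model of a chunk-oriented loop: read up to chunkSize items, transform each one, and hand the whole group to the writer in a single call. It is a sketch of the idea rather than Spring Batch's implementation (there are no transactions, retries, or restart metadata), and ChunkLoopSketch and run are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, simplified model of chunk-oriented processing.
class ChunkLoopSketch {

    // Returns the number of writer calls, that is, the number of chunks.
    static <I, O> int run(Queue<I> source, Function<I, O> processor,
                          Consumer<List<O>> writer, int chunkSize) {
        int writes = 0;
        while (true) {
            // Read phase: collect up to chunkSize items; null means "no more input".
            List<I> items = new ArrayList<>(chunkSize);
            I item;
            while (items.size() < chunkSize && (item = source.poll()) != null) {
                items.add(item);
            }
            if (items.isEmpty()) {
                break;
            }
            // Process phase: transform every item of the chunk.
            List<O> output = new ArrayList<>(items.size());
            for (I input : items) {
                output.add(processor.apply(input));
            }
            // Write phase: one writer call per chunk.
            writer.accept(output);
            writes++;
            if (items.size() < chunkSize) {
                break; // the source ran dry inside this chunk
            }
        }
        return writes;
    }

    public static void main(String[] args) {
        Queue<String> source = new ArrayDeque<>();
        for (int i = 0; i < 120; i++) {
            source.add("msg-" + i);
        }
        List<Integer> chunkSizes = new ArrayList<>();
        int writes = ChunkLoopSketch.<String, String>run(
                source, s -> s.toUpperCase(), chunk -> chunkSizes.add(chunk.size()), 50);
        System.out.println(writes);     // 3
        System.out.println(chunkSizes); // [50, 50, 20]
    }
}
```

With 120 buffered messages and a chunk size of 50, the writer is called three times, so the database sees three batched saves instead of 120 single-row inserts.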

Step 4: Quartz Configuration

Quartz Scheduler:

import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.CronScheduleBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QuartzConfig {

    @Bean
    public JobDetail jobDetail() {
        return JobBuilder.newJob(BatchJobLauncher.class)
                .withIdentity("batchJob")
                .storeDurably()
                .build();
    }

    @Bean
    public Trigger trigger(JobDetail jobDetail) {
        return TriggerBuilder.newTrigger()
                .forJob(jobDetail)
                .withIdentity("batchTrigger")
                .withSchedule(CronScheduleBuilder.cronSchedule("0 0/1 * * * ?")) // Every minute
                .build();
    }
}
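
A Quartz cron expression has six required fields: seconds, minutes, hours, day-of-month, month, and day-of-week. In "0 0/1 * * * ?" the seconds field is 0 and the minutes field 0/1 means "every minute, starting at minute 0", so the trigger fires at the top of each minute. The plain-Java sketch below computes the same schedule by hand to show which instants that means; EveryMinuteSketch and nextFire are hypothetical names, and Quartz is not involved.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical helper that mirrors the "top of every minute" schedule by hand.
class EveryMinuteSketch {

    // Next instant strictly after `now` whose seconds (and smaller units) are zero.
    static LocalDateTime nextFire(LocalDateTime now) {
        return now.truncatedTo(ChronoUnit.MINUTES).plusMinutes(1);
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.of(2024, 1, 1, 10, 0, 30);
        System.out.println(nextFire(now)); // 2024-01-01T10:01
    }
}
```

A job started at 10:00:30 therefore waits 30 seconds for its first run, not a full minute.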

Quartz Job Launcher:

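import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;

public class BatchJobLauncher implements Job {

    @Autowired
    private JobLauncher jobLauncher;

    // Fully qualified: org.quartz.Job and Spring Batch's Job share a simple name,
    // so only one of them can be imported.
    @Autowired
    private org.springframework.batch.core.Job batchJob;

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        // A changing parameter makes every scheduled run a new job instance, so the
        // step is executed again each time. The key "run.timestamp" is illustrative.
        JobParameters parameters = new JobParametersBuilder()
                .addLong("run.timestamp", System.currentTimeMillis())
                .toJobParameters();
        try {
            jobLauncher.run(batchJob, parameters);
        } catch (Exception e) {
            // Report the failure to Quartz instead of only printing it.
            throw new JobExecutionException(e);
        }
    }
}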
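
Spring Batch identifies a job instance by the job name plus its identifying parameters, and it refuses to start an instance that has already completed. Scheduled launchers therefore commonly pass a changing parameter such as a timestamp. The plain-Java model below illustrates that identity rule in a deliberately simplified form; it is not Spring Batch's implementation, and JobIdentitySketch and launch are hypothetical names.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical, simplified model of "job name + identifying parameters = job instance".
class JobIdentitySketch {
    private final Set<String> completedInstances = new HashSet<>();

    // Returns true if the run is accepted, false if an identical instance already completed.
    boolean launch(String jobName, Map<String, String> identifyingParams) {
        // TreeMap sorts the keys so that equal parameter sets produce equal instance keys.
        String instanceKey = jobName + new TreeMap<>(identifyingParams);
        return completedInstances.add(instanceKey);
    }

    public static void main(String[] args) {
        JobIdentitySketch launcher = new JobIdentitySketch();
        Map<String, String> fixed = Map.of("run.date", "2024-01-01");
        System.out.println(launcher.launch("batch-job", fixed)); // true
        System.out.println(launcher.launch("batch-job", fixed)); // false, same instance

        // A value that changes on every run yields a new instance each time.
        System.out.println(launcher.launch("batch-job", Map.of("run.timestamp", "1"))); // true
        System.out.println(launcher.launch("batch-job", Map.of("run.timestamp", "2"))); // true
    }
}
```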

Step 5: Entity and Repository

Entity:

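import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.Id;

@Entity
public class YourEntity {

    @Id
    @GeneratedValue
    private Long id;
    private String data;

    // JPA requires a no-argument constructor to instantiate entities.
    protected YourEntity() {
    }

    public YourEntity(String data) {
        this.data = data;
    }

    public Long getId() {
        return id;
    }

    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data;
    }
}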

Repository:

import org.springframework.data.jpa.repository.JpaRepository;

public interface YourRepository extends JpaRepository<YourEntity, Long> {
}

4. Run the Application

  1. Start Kafka.
  2. Create a Kafka topic named batch-topic.
  3. Start the Spring Boot application.
  4. Send messages to the Kafka topic using a producer.
  5. Every minute, Quartz triggers the batch job, which reads the buffered Kafka messages, processes them in chunks, and saves them to the database.

5. Testing

  • Use Kafka tools (e.g., kafka-console-producer.sh) to send test messages to the topic.
  • Verify that messages are consumed and stored in the database in batches.
