Integrate Apache Pulsar with Spring Boot using Spring for Apache Pulsar


This guide shows how to integrate Apache Pulsar with Spring Boot using Spring for Apache Pulsar. The library provides Spring-style abstractions over the Pulsar client, such as PulsarTemplate for producing messages and annotation-driven listeners for consuming them, which keeps most of the client setup out of your application code.

Step 1: Add the spring-boot-starter-pulsar Dependency

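Add the spring-boot-starter-pulsar dependency to your pom.xml. The starter is available from Spring Boot 3.2 onward, and its version is managed by Spring Boot's dependency management, so no explicit <version> element is needed when the project uses the Spring Boot parent POM or BOM.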

<dependencies>
    <!-- Spring Boot Starter Web for REST API -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Spring Boot Starter Pulsar for Pulsar integration -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-pulsar</artifactId>
    </dependency>
</dependencies>

Step 2: Configure Pulsar in application.properties

In your application.properties, configure the Pulsar client service URL, the default producer topic, and the consumer topic and subscription.

# application.properties

# Pulsar service URL
spring.pulsar.client.service-url=pulsar://localhost:6650

# Default topic used by the producer when none is given explicitly
spring.pulsar.producer.topic-name=my-topic

# Default topic and subscription for consumers
spring.pulsar.consumer.topics=my-topic
spring.pulsar.consumer.subscription.name=my-subscription
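A short topic name such as my-topic is shorthand: Pulsar resolves it to a persistent topic in the public tenant and the default namespace. The sketch below illustrates that expansion in plain Java. The TopicNames class and its fullyQualified method are hypothetical helpers written for this post, not part of the Pulsar or Spring API.

```java
// Hypothetical helper, for illustration only: mirrors how Pulsar expands
// a short topic name into its fully qualified form.
final class TopicNames {

    private TopicNames() {
    }

    // Accepts "topic", "tenant/namespace/topic", or an already qualified name.
    static String fullyQualified(String topic) {
        if (topic.contains("://")) {
            return topic; // already has a domain such as persistent://
        }
        String[] parts = topic.split("/");
        if (parts.length == 1) {
            // Bare name: default tenant "public" and namespace "default"
            return "persistent://public/default/" + topic;
        }
        if (parts.length == 3) {
            // tenant/namespace/topic: only the domain is missing
            return "persistent://" + topic;
        }
        throw new IllegalArgumentException("Invalid short topic name: " + topic);
    }

    public static void main(String[] args) {
        // prints persistent://public/default/my-topic
        System.out.println(fullyQualified("my-topic"));
    }
}
```

Knowing the fully qualified name helps when inspecting topics with Pulsar's admin tools, which report topics in this form.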

Step 3: Define the Pulsar Consumer

Spring Boot auto-configures a PulsarTemplate for sending and the listener infrastructure for receiving, so there is no need to declare Producer, Consumer, or PulsarTemplate beans by hand. The PulsarConfig class only has to declare a listener method annotated with @PulsarListener.

package com.example.pulsar;

import org.springframework.context.annotation.Configuration;
import org.springframework.pulsar.annotation.PulsarListener;

@Configuration
public class PulsarConfig {

    // Listener method invoked for each message received on the topic.
    // By default the framework acknowledges messages automatically
    // after the listener processes them without error.
    @PulsarListener(topics = "my-topic", subscriptionName = "my-subscription")
    public void receive(String message) {
        System.out.println("Received message: " + message);
    }
}
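Acknowledgment is what lets the broker stop tracking a message for a subscription: a message that was delivered but never acknowledged remains eligible for redelivery. The toy model below illustrates the idea in plain Java with no Pulsar dependency. ToySubscription and all of its methods are invented for this sketch and greatly simplify what a real broker does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy in-memory model of a subscription (hypothetical, not Pulsar API).
final class ToySubscription {

    private final Deque<Long> backlog = new ArrayDeque<>(); // ids waiting to be delivered
    private final List<Long> unacked = new ArrayList<>();   // ids delivered but not yet acknowledged
    private long nextId = 0;

    // Adds a message to the backlog and returns its id.
    long publish() {
        long id = nextId++;
        backlog.addLast(id);
        return id;
    }

    // Delivers the next message id, or -1 if nothing is pending.
    long receive() {
        Long id = backlog.pollFirst();
        if (id == null) {
            return -1;
        }
        unacked.add(id);
        return id;
    }

    // Acknowledging removes the message from the subscription's tracking.
    void acknowledge(long id) {
        unacked.remove(Long.valueOf(id));
    }

    // Models a redelivery trigger (for example a consumer restart):
    // delivered but unacknowledged messages return to the front of the backlog.
    void redeliverUnacked() {
        for (int i = unacked.size() - 1; i >= 0; i--) {
            backlog.addFirst(unacked.get(i));
        }
        unacked.clear();
    }

    public static void main(String[] args) {
        ToySubscription sub = new ToySubscription();
        sub.publish();
        sub.publish();
        long first = sub.receive();
        sub.receive();          // second message is delivered but never acknowledged
        sub.acknowledge(first);
        sub.redeliverUnacked();
        System.out.println("Redelivered id: " + sub.receive()); // prints Redelivered id: 1
    }
}
```

In the model, only the unacknowledged message comes back after the redelivery trigger; the acknowledged one is gone for good.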

Step 4: Create a Service to Handle Message Production

Now, let’s create a service to send messages using PulsarTemplate.

package com.example.pulsar;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.pulsar.core.PulsarTemplate;

@Service
public class PulsarService {

    private final PulsarTemplate<String> pulsarTemplate;

    @Autowired
    public PulsarService(PulsarTemplate<String> pulsarTemplate) {
        this.pulsarTemplate = pulsarTemplate;
    }

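    // Sends the message to the default producer topic configured in application.properties.
    // Depending on the Spring for Apache Pulsar version, send(...) may declare the checked
    // PulsarClientException; if it does, add a throws clause or handle the exception here.
    public void sendMessage(String message) {
        pulsarTemplate.send(message);
    }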
}

Step 5: Create a REST Controller

Next, let's create a controller to expose a REST API to trigger the sending of messages.

package com.example.pulsar;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class PulsarController {

    private final PulsarService pulsarService;

    @Autowired
    public PulsarController(PulsarService pulsarService) {
        this.pulsarService = pulsarService;
    }

    // Endpoint to send a message
    @GetMapping("/send")
    public String sendMessage(@RequestParam String message) {
        pulsarService.sendMessage(message);
        return "Message sent successfully!";
    }

    // Informational endpoint: it does not consume anything itself
    @GetMapping("/consume")
    public String consumeMessage() {
        // Messages are consumed automatically by the Pulsar consumer in the configuration class
        return "Messages are consumed automatically (check the console for consumed messages).";
    }
}

Step 6: Running the Application

  1. Start Apache Pulsar: Make sure your Pulsar broker is running on localhost:6650. You can start a standalone Pulsar using Docker for quick setup. The image needs an explicit command to start the broker, and Pulsar's HTTP admin port is mapped to host port 8081 here so that it does not collide with Spring Boot's default port 8080:

    docker run -d -p 6650:6650 -p 8081:8080 --name pulsar apachepulsar/pulsar:latest bin/pulsar standalone

  2. Start Spring Boot Application: Run the Spring Boot application through your IDE or via the command line using Maven:

    mvn spring-boot:run

  3. Test the Endpoints:

    • To send a message to Pulsar, open your browser or use curl:

      curl "http://localhost:8080/send?message=HelloPulsar"

    • Messages are consumed automatically as they arrive and are printed to the console. The following endpoint only returns a status message:

      curl "http://localhost:8080/consume"
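The curl call above works because HelloPulsar contains no special characters. A message with spaces or symbols has to be URL-encoded before it goes into the query string. The following sketch shows one way to do that from Java using the JDK's built-in HTTP client. The SendClient class and its sendUri method are hypothetical names for this example, and it assumes the application from this post is running on localhost:8080.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical client for the /send endpoint defined in Step 5.
final class SendClient {

    private SendClient() {
    }

    // Builds the /send URI, URL-encoding the message so spaces and symbols survive.
    static URI sendUri(String baseUrl, String message) {
        String encoded = URLEncoder.encode(message, StandardCharsets.UTF_8);
        return URI.create(baseUrl + "/send?message=" + encoded);
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest
                .newBuilder(sendUri("http://localhost:8080", "Hello Pulsar & friends"))
                .GET()
                .build();
        // Requires the Spring Boot application to be running locally.
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

The equivalent with curl is to pass the message through --data-urlencode together with the -G flag.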

Conclusion

This example sets up a Spring Boot application with Apache Pulsar using the spring-boot-starter-pulsar dependency. It demonstrates how to produce messages using PulsarTemplate and consume messages automatically using the Spring abstraction for Pulsar.
