Kotlin Ktor RabbitMQ: Producer and Consumer Example

Here’s an example of how to set up a producer and a consumer using Kotlin with Ktor and RabbitMQ. It demonstrates a simple message-passing workflow in which the producer sends a message to a queue and the consumer processes it:

  1. A client sends a message to the Ktor producer via an HTTP POST request.
  2. The producer publishes the message to the RabbitMQ queue (ktor-queue).
  3. The consumer listens to the queue and processes the message.
  4. RabbitMQ removes the message from the queue once it is acknowledged. The consumer in this example uses automatic acknowledgment, so that happens on delivery rather than after processing.

1. Project Setup

Add Dependencies

Add the following dependencies to your build.gradle.kts:

dependencies {
    implementation("io.ktor:ktor-server-core:<ktor_version>")
    implementation("io.ktor:ktor-server-netty:<ktor_version>")
    implementation("com.rabbitmq:amqp-client:<rabbitmq_version>")
    implementation("io.ktor:ktor-server-call-logging:<ktor_version>")
    implementation("ch.qos.logback:logback-classic:<logback_version>")
}

Replace:

  • <ktor_version> with the Ktor version you are targeting (e.g., 2.x.x); use the same version for every Ktor artifact.
  • <rabbitmq_version> with the latest RabbitMQ AMQP Client version.
  • <logback_version> with the latest Logback version for logging.
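As a sketch, the placeholders can be pulled into variables so that every Ktor artifact shares one version. The version numbers below are example values only and may be outdated; they are assumptions for illustration, not recommendations, so check each project's releases before using them.

```kotlin
// build.gradle.kts (fragment)
// Example versions only: assumptions for illustration, verify against current releases.
val ktorVersion = "2.3.12"
val rabbitmqVersion = "5.21.0"
val logbackVersion = "1.4.14"

dependencies {
    implementation("io.ktor:ktor-server-core:$ktorVersion")
    implementation("io.ktor:ktor-server-netty:$ktorVersion")
    implementation("io.ktor:ktor-server-call-logging:$ktorVersion")
    implementation("com.rabbitmq:amqp-client:$rabbitmqVersion")
    implementation("ch.qos.logback:logback-classic:$logbackVersion")
}
```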

2. Define a RabbitMQ Connection Manager

Create a utility for RabbitMQ connections. This will be shared between the producer and consumer.

import com.rabbitmq.client.Connection
import com.rabbitmq.client.ConnectionFactory

object RabbitMQConnection {
    private val factory = ConnectionFactory().apply {
        host = "localhost"  // RabbitMQ server
        port = 5672         // Default port
    }

    val connection: Connection by lazy {
        factory.newConnection()
    }
}
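The connection manager above hard-codes the host and port and relies on the client's default guest credentials. A minimal sketch of reading those values from the environment instead might look like this. The RabbitSettings class, the helper functions, and the RABBITMQ_* variable names are all hypothetical and not part of the original example.

```kotlin
import com.rabbitmq.client.ConnectionFactory

// Hypothetical settings holder; the defaults mirror a local RabbitMQ install.
data class RabbitSettings(
    val host: String = "localhost",
    val port: Int = 5672,
    val username: String = "guest",
    val password: String = "guest",
)

// Reads settings from a map of environment variables.
// The RABBITMQ_* variable names are assumptions made for this sketch.
fun settingsFrom(env: Map<String, String>): RabbitSettings {
    val defaults = RabbitSettings()
    return RabbitSettings(
        host = env["RABBITMQ_HOST"] ?: defaults.host,
        // Fall back to the default port if the value is missing or not a number
        port = env["RABBITMQ_PORT"]?.toIntOrNull() ?: defaults.port,
        username = env["RABBITMQ_USER"] ?: defaults.username,
        password = env["RABBITMQ_PASSWORD"] ?: defaults.password,
    )
}

// Builds a ConnectionFactory from the settings.
fun factoryFor(settings: RabbitSettings): ConnectionFactory =
    ConnectionFactory().apply {
        host = settings.host
        port = settings.port
        username = settings.username
        password = settings.password
    }
```

With these helpers, the factory in RabbitMQConnection could be created as factoryFor(settingsFrom(System.getenv())), which keeps credentials out of the source code.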

3. Implement the Producer API

Create a Ktor server to send messages to RabbitMQ.

import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.netty.*
// Ktor 2.x package name (spelled "callloging"); Ktor 3.x renamed it to "calllogging"
import io.ktor.server.plugins.callloging.*
import io.ktor.server.request.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import org.slf4j.event.Level

fun main() {
    embeddedServer(Netty, port = 8080) {
        install(CallLogging) {
            level = Level.INFO
        }
        routing {
            post("/produce") {
                val message = call.receiveText()
                val queueName = "ktor-queue"

                // Open a channel per request; use {} closes it even if publishing fails
                RabbitMQConnection.connection.createChannel().use { channel ->
                    // Declare the queue: durable, non-exclusive, not auto-deleted
                    channel.queueDeclare(queueName, true, false, false, null)

                    // Publish through the default exchange, routed by queue name
                    channel.basicPublish("", queueName, null, message.toByteArray(Charsets.UTF_8))
                }

                call.respondText("Message sent: $message")
            }
        }
    }.start(wait = true)
}

Explanation:

  • The /produce endpoint accepts a POST request with a message body.
  • The message is published to a RabbitMQ queue named ktor-queue.

4. Implement the Consumer

Create a standalone consumer application to process messages from the queue.

import com.rabbitmq.client.*

fun main() {
    val queueName = "ktor-queue"
    val channel: Channel = RabbitMQConnection.connection.createChannel()

    // Declare the queue
    channel.queueDeclare(queueName, true, false, false, null)

    println("Waiting for messages...")

    val deliverCallback = DeliverCallback { _, delivery ->
        val message = String(delivery.body)
        println("Received: $message")
        // Process the message (add your business logic here)
    }

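    // Start consuming with autoAck = true: RabbitMQ treats each message as
    // acknowledged as soon as it is delivered, before the callback runs
    channel.basicConsume(queueName, true, deliverCallback, CancelCallback { _ -> })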
}

Explanation:

  • The consumer continuously listens to the ktor-queue and processes incoming messages.
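  • basicConsume is called with autoAck = true, so RabbitMQ considers each message acknowledged as soon as it is delivered, not after processing finishes. If the consumer fails partway through, that message is lost.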
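If a message should only be acknowledged once processing has succeeded, one option is manual acknowledgment. The sketch below assumes a hypothetical handle function standing in for your business logic. The Outcome enum and both helper functions are illustrative names, not part of the original example.

```kotlin
import com.rabbitmq.client.CancelCallback
import com.rabbitmq.client.Channel
import com.rabbitmq.client.DeliverCallback

// Hypothetical result of processing one message.
enum class Outcome { ACK, REJECT }

// Runs the handler on the decoded body and reports whether it succeeded.
fun process(body: ByteArray, handle: (String) -> Unit): Outcome =
    try {
        handle(String(body, Charsets.UTF_8))
        Outcome.ACK
    } catch (e: Exception) {
        System.err.println("Processing failed: ${e.message}")
        Outcome.REJECT
    }

// Hypothetical helper: consumes with manual acknowledgments and returns the consumer tag.
fun consumeWithManualAck(channel: Channel, queueName: String, handle: (String) -> Unit): String {
    channel.queueDeclare(queueName, true, false, false, null)

    // Deliver at most one unacknowledged message to this consumer at a time
    channel.basicQos(1)

    val deliverCallback = DeliverCallback { _, delivery ->
        val tag = delivery.envelope.deliveryTag
        when (process(delivery.body, handle)) {
            // Acknowledge only after processing succeeded
            Outcome.ACK -> channel.basicAck(tag, false)
            // Reject without requeueing so a failing message is not redelivered forever;
            // without a dead-letter exchange configured, the message is discarded
            Outcome.REJECT -> channel.basicNack(tag, false, false)
        }
    }

    // autoAck = false switches the consumer to manual acknowledgments
    return channel.basicConsume(queueName, false, deliverCallback, CancelCallback { _ -> })
}
```

Whether to requeue, discard, or dead-letter a rejected message depends on the application; the choice made here is just one of those options.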

5. Test the Workflow

  1. Start RabbitMQ: Ensure RabbitMQ is running on localhost:5672. Use Docker or install it locally:

    docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

    Access the management UI at http://localhost:15672 (default username/password: guest/guest).

  2. Run the Consumer: Start the consumer application by running the consumer code. It will listen for messages on the queue.

  3. Run the Producer API: Start the Ktor server for the producer.

  4. Send Messages: Use a tool like curl or Postman to send messages to the producer:

    curl -X POST -d "Hello, RabbitMQ!" http://localhost:8080/produce

    You should see the message "Hello, RabbitMQ!" logged by the consumer.
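The same request can also be sent from Kotlin, for example in an automated check. This sketch uses the JDK's built-in java.net.http client (Java 11 or later) and assumes the producer is running on localhost:8080 as configured above. The function names are hypothetical.

```kotlin
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

// Builds a POST to /produce with the message as a plain-text body.
fun buildProduceRequest(message: String, baseUrl: String = "http://localhost:8080"): HttpRequest =
    HttpRequest.newBuilder(URI.create("$baseUrl/produce"))
        .header("Content-Type", "text/plain; charset=utf-8")
        .POST(HttpRequest.BodyPublishers.ofString(message))
        .build()

// Sends the message to the running producer and returns the response body.
fun sendMessage(message: String): String {
    val client = HttpClient.newHttpClient()
    val response = client.send(buildProduceRequest(message), HttpResponse.BodyHandlers.ofString())
    return response.body()
}
```

With the producer and RabbitMQ running, sendMessage("Hello, RabbitMQ!") should return the handler's "Message sent: ..." response text.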


6. Sample Output

Consumer Console:

Waiting for messages...
Received: Hello, RabbitMQ!

Producer API Response:

Message sent: Hello, RabbitMQ!

7. Enhance the Workflow

  • Scalability: Use multiple consumers for the same queue to scale message processing.
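  • Durability: The queue is already declared durable; also publish messages as persistent (for example with MessageProperties.PERSISTENT_TEXT_PLAIN) so they survive a broker restart.
  • Error Handling: Switch to manual acknowledgments so that failed messages can be retried or rejected, and log processing failures.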
  • Serialization: Use JSON or Protobuf for structured messages.
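As one illustration of the error-handling point, a retry wrapper with exponential backoff could be placed around the processing step. This is a minimal sketch in plain Kotlin; the retrying function, its parameters, and the default delay are hypothetical, and it blocks the calling thread while it waits.

```kotlin
// Hypothetical helper: runs `block` up to `maxAttempts` times, sleeping between failures.
// Returns true on the first success and false if every attempt threw an exception.
fun retrying(maxAttempts: Int, initialDelayMillis: Long = 100, block: () -> Unit): Boolean {
    require(maxAttempts >= 1) { "maxAttempts must be at least 1" }
    var delay = initialDelayMillis
    for (attempt in 1..maxAttempts) {
        try {
            block()
            return true
        } catch (e: Exception) {
            System.err.println("Attempt $attempt of $maxAttempts failed: ${e.message}")
            if (attempt < maxAttempts) {
                Thread.sleep(delay)
                delay *= 2 // double the wait before the next attempt
            }
        }
    }
    return false
}
```

In a consumer that uses manual acknowledgments, the boolean result could decide between acknowledging and rejecting the message. Long retry delays inside the delivery callback hold up other messages on the same channel, so a dead-letter queue may be a better fit for slow recoveries.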

This setup gives you a minimal producer-consumer pipeline with Ktor and RabbitMQ that you can extend along the lines above.
