Producer-Consumer Pattern with Azure Queue Storage in .NET

The Producer-Consumer pattern splits work between two components: a producer that sends messages to a queue and a consumer that retrieves and processes them. With Azure Queue Storage in .NET, the queue sits between the two, so the producer and the consumer can run, scale, and fail independently of each other.

Prerequisites:

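  1. Azure Storage Account: Make sure you have an Azure Storage account and its connection string. The queue itself does not need to exist in advance, because the example code creates it if it is missing.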
  2. Azure.Storage.Queues NuGet Package: Install this package in your .NET project to interact with Azure Queue Storage.
    dotnet add package Azure.Storage.Queues

Steps:

  • The producer sends messages to the Azure Queue.
  • The consumer retrieves messages from the queue, processes them, and deletes each one after it has been processed.

Example Code

1. Producer (Sender) Code

The producer sends messages to the Azure Queue.

using Azure.Storage.Queues;
using System;
using System.Threading.Tasks;

class Producer
{
    static async Task Main(string[] args)
    {
        string connectionString = "<YourAzureStorageConnectionString>";
        string queueName = "taskqueue"; // The name of the queue
        
        // Create QueueClient instance
        QueueClient queueClient = new QueueClient(connectionString, queueName);
        
        // Ensure the queue exists
        await queueClient.CreateIfNotExistsAsync();
        
        // Send messages to the queue
        for (int i = 0; i < 5; i++)
        {
            string message = $"Task {i + 1}";
            await queueClient.SendMessageAsync(message);
            Console.WriteLine($"Message sent: {message}");
        }
    }
}

Explanation:

  • QueueClient: A client object to interact with Azure Queue Storage.
  • CreateIfNotExistsAsync: Ensures that the queue is created if it doesn't exist.
  • SendMessageAsync: Sends a message to the queue.
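
Real producers usually send structured payloads rather than plain strings. The following is a minimal sketch of one way to do that, not part of the original example: the TaskMessage record, the JsonProducer class, and the "resize" task kind are hypothetical names introduced for illustration. It serializes each task to JSON with System.Text.Json and sets the client's MessageEncoding option to Base64, assuming a version of Azure.Storage.Queues that includes that option and a C# version that supports records.

```csharp
using Azure.Storage.Queues;
using System;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical payload type, introduced only for this illustration.
public record TaskMessage(int Id, string Kind);

public static class JsonProducer
{
    // Pure helper: turns a task into the JSON text that will be queued.
    public static string ToPayload(TaskMessage task) => JsonSerializer.Serialize(task);

    public static async Task SendTasksAsync(string connectionString, string queueName)
    {
        // Base64 encoding keeps arbitrary payload text safe inside the queue message.
        var options = new QueueClientOptions { MessageEncoding = QueueMessageEncoding.Base64 };
        var queueClient = new QueueClient(connectionString, queueName, options);

        // Ensure the queue exists, as in the producer above.
        await queueClient.CreateIfNotExistsAsync();

        for (int i = 1; i <= 5; i++)
        {
            string payload = ToPayload(new TaskMessage(i, "resize"));
            await queueClient.SendMessageAsync(payload);
            Console.WriteLine($"Message sent: {payload}");
        }
    }
}
```

A consumer reading these messages would need to create its QueueClient with the same MessageEncoding setting and deserialize MessageText with JsonSerializer.Deserialize<TaskMessage>.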

2. Consumer Code

The consumer retrieves and processes messages from the Azure Queue. Once a message is processed, it is deleted from the queue.

using Azure.Storage.Queues;
using Azure.Storage.Queues.Models;
using System;
using System.Threading.Tasks;

class Consumer
{
    static async Task Main(string[] args)
    {
        string connectionString = "<YourAzureStorageConnectionString>";
        string queueName = "taskqueue"; // The name of the queue

        // Create QueueClient instance
        QueueClient queueClient = new QueueClient(connectionString, queueName);
        
        // Ensure the queue exists
        await queueClient.CreateIfNotExistsAsync();

        Console.WriteLine("Consumer is ready to process messages...");

        // Continuously process messages from the queue
        while (true)
        {
            // Receive messages from the queue (up to 1 message)
            QueueMessage[] messages = await queueClient.ReceiveMessagesAsync(1);

            if (messages.Length > 0)
            {
                var message = messages[0];
                Console.WriteLine($"Processing message: {message.MessageText}");

                // Simulate message processing
                await Task.Delay(2000); // Simulate work (e.g., processing data)

                // Delete the message after processing
                await queueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt);
                Console.WriteLine($"Message deleted: {message.MessageText}");
            }
            else
            {
                // If no messages are available, wait for a while before checking again
                Console.WriteLine("No messages available. Waiting...");
                await Task.Delay(1000); // Sleep for a while before retrying
            }
        }
    }
}

Explanation:

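  • ReceiveMessagesAsync: Retrieves up to the requested number of messages (in this case, 1). Receiving does not remove a message; it only hides it from other consumers for a visibility timeout, which is 30 seconds by default.
  • DeleteMessageAsync: Deletes the message, identified by its MessageId and PopReceipt, after it has been successfully processed. If the consumer fails before this call, the message becomes visible again once the timeout expires and is delivered again, so processing should tolerate duplicates.
  • Simulating Processing: The Task.Delay(2000) line simulates a processing delay to mimic some work being done on the message.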
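
The consumer above runs forever and retries a failing message indefinitely. The sketch below shows one possible hardening, under stated assumptions: the RobustConsumer class, the MaxAttempts threshold of 5, and the 60-second visibility timeout are illustrative choices, not values from the original example. It passes an explicit visibility timeout, stops cleanly when a CancellationToken is cancelled, and uses the message's DequeueCount to give up on messages that keep failing.

```csharp
using Azure.Storage.Queues;
using Azure.Storage.Queues.Models;
using System;
using System.Threading;
using System.Threading.Tasks;

public static class RobustConsumer
{
    // Assumed threshold for this sketch; tune it for your workload.
    public const int MaxAttempts = 5;

    // Pure helper: a message received more than MaxAttempts times is treated as poison.
    public static bool IsPoison(long dequeueCount) => dequeueCount > MaxAttempts;

    public static async Task RunAsync(QueueClient queueClient, CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            try
            {
                // Hide the message from other consumers for 60 seconds while it is processed.
                QueueMessage[] messages = await queueClient.ReceiveMessagesAsync(
                    maxMessages: 1,
                    visibilityTimeout: TimeSpan.FromSeconds(60),
                    cancellationToken: token);

                if (messages.Length == 0)
                {
                    await Task.Delay(TimeSpan.FromSeconds(1), token);
                    continue;
                }

                QueueMessage message = messages[0];
                if (IsPoison(message.DequeueCount))
                {
                    // A real system might copy the message to a separate queue before dropping it.
                    Console.WriteLine($"Giving up on message {message.MessageId}");
                }
                else
                {
                    Console.WriteLine($"Processing message: {message.MessageText}");
                    await Task.Delay(2000, token); // Simulated work, as in the consumer above
                }

                // No token here, so finished work is acknowledged even during shutdown.
                await queueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt);
            }
            catch (OperationCanceledException)
            {
                break; // Shutdown was requested
            }
            catch (Exception ex)
            {
                // The message was not deleted, so it reappears after the visibility timeout.
                Console.WriteLine($"Error: {ex.Message}");
                await Task.Delay(1000);
            }
        }
    }
}
```

To wire this up, a Main method could create a CancellationTokenSource, cancel it from a Console.CancelKeyPress handler (setting e.Cancel = true so the process is not killed immediately), and pass cts.Token to RunAsync together with the QueueClient from the consumer above.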

Workflow:

  1. Producer:
    • Sends several messages to the Azure Queue (e.g., "Task 1", "Task 2", etc.).
  2. Consumer:
    • Continuously checks for new messages from the queue.
    • If a message is available, the consumer processes it and deletes it after processing.
    • If no messages are available, the consumer waits for a short time before checking again.
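
The fixed one-second wait in the consumer keeps polling at the same rate even when the queue stays empty for a long time, and each poll counts as a storage transaction. A common refinement is to lengthen the wait after every empty poll and reset it when a message arrives. The sketch below is one way to do this; PollingBackoff is a hypothetical helper, and the doubling strategy and its limits are assumptions rather than part of the original example.

```csharp
using System;

// Hypothetical helper: doubles the wait after each empty poll, up to a cap.
public sealed class PollingBackoff
{
    private readonly TimeSpan _initial;
    private readonly TimeSpan _max;
    private TimeSpan _current;

    public PollingBackoff(TimeSpan initial, TimeSpan max)
    {
        _initial = initial;
        _max = max;
        _current = initial;
    }

    // Call after an empty poll: returns the delay to wait, then doubles it for next time.
    public TimeSpan NextDelay()
    {
        TimeSpan delay = _current;
        TimeSpan doubled = TimeSpan.FromTicks(_current.Ticks * 2);
        _current = doubled < _max ? doubled : _max;
        return delay;
    }

    // Call after a message was received: go back to the shortest delay.
    public void Reset() => _current = _initial;
}
```

In the consumer loop, the else branch would then call await Task.Delay(backoff.NextDelay()) instead of the fixed delay, and the branch that handles a message would call backoff.Reset().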

This simple example demonstrates how to implement a producer-consumer pattern using Azure Queue Storage in .NET. The producer pushes tasks (or messages) into the queue, and the consumer pulls those tasks, processes them, and removes them from the queue after completion. This pattern is commonly used for background task processing and decoupling systems.
