Python Flask Kafka Batch Processing with Scheduler & Database Batch Insert Example

This diagram shows the components and their interactions: Kafka Producer, Kafka Broker, Batch, Scheduler, Python Flask, and Database.

Explanation:

  • Actors:

    • Kafka Producer: Produces messages to the Kafka Broker.
    • Scheduler: Triggers the periodic batch processing.
    • User: Interacts with the system through the Flask app's HTTP endpoints.
  • Entities:

    • Kafka Broker: The Kafka message broker that handles communication between the producer and consumer.
    • Python Flask App: The Flask app that interacts with Kafka and handles user requests.
    • Batch Processing: The logic responsible for consuming Kafka messages and processing them in batches.
    • PostgreSQL Database: The database where batch data is inserted.

Interactions:

  1. User interacts with the FlaskApp.
  2. FlaskApp sends messages to and receives messages from the KafkaBroker.
  3. KafkaProducer publishes messages to the KafkaBroker.
  4. The KafkaBroker delivers messages to FlaskApp.
  5. FlaskApp initiates Batch Processing.
  6. Batch Processing inserts data into the PostgreSQL Database.
  7. The Scheduler periodically triggers Batch Processing to insert data.
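
The example code only covers the consuming side. For local testing, the KafkaProducer step (3) could be sketched as follows. This is an illustration under stated assumptions: `publish_json_messages` is a hypothetical helper, records are serialized as JSON, and the producer object is passed in, so the function relies only on the `produce()`, `poll()` and `flush()` methods that `confluent_kafka.Producer` provides.

```python
import json


def publish_json_messages(producer, topic, records, key_field="id"):
    """Serialize each record to JSON and publish it to `topic` (hypothetical helper).

    `producer` is expected to behave like confluent_kafka.Producer; passing it in
    keeps the function testable without a running broker.
    """
    sent = 0
    for record in records:
        key = record.get(key_field)
        producer.produce(
            topic,
            # Keys and values are sent as UTF-8 bytes; records without a key get None
            key=None if key is None else str(key).encode("utf-8"),
            value=json.dumps(record).encode("utf-8"),
        )
        producer.poll(0)  # serve delivery callbacks without blocking
        sent += 1
    producer.flush()  # wait until queued messages are delivered or have failed
    return sent
```

With a broker running, you would pass `Producer({'bootstrap.servers': 'localhost:9092'})` and `'example_topic'`, matching the consumer configuration used in the example. Encoding both key and value as UTF-8 keeps them compatible with the consumer, which decodes them with `utf-8`.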


Here is a complete example that consumes bulk data from a Kafka topic, batches it in memory, and inserts it into a database using Python, Flask, a scheduler (APScheduler), and confluent-kafka, Confluent's Python client for Kafka.


Steps

  1. Install Dependencies: install the required libraries:

    pip install flask apscheduler confluent-kafka sqlalchemy psycopg2
  2. Kafka Consumer and Database Setup

    • Use Kafka for the message queue.
    • Use SQLAlchemy to interact with the database (e.g., PostgreSQL).
  3. Application Design

    • Flask App: To serve and monitor the system.
    • Scheduler: To trigger batch inserts at intervals.
    • Batch Logic: To consume Kafka messages and insert data in bulk into the database.
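
The core of the batch-insert step is sending many rows in a single transaction instead of committing once per message. The sketch below illustrates that idea with Python's built-in `sqlite3` module as a stand-in for PostgreSQL and SQLAlchemy, so it runs without a database server. `create_schema` and `insert_rows` are hypothetical helpers, and the table mirrors the `messages` model used in the code.

```python
import sqlite3


def create_schema(conn):
    # SQLite stand-in for the `messages` table defined by the SQLAlchemy model
    conn.execute(
        'CREATE TABLE IF NOT EXISTS messages ('
        'id INTEGER PRIMARY KEY AUTOINCREMENT, "key" VARCHAR(50), "value" VARCHAR(255))'
    )


def insert_rows(conn, rows):
    """Insert a list of {"key": ..., "value": ...} dicts in one transaction.

    `with conn` commits if every row succeeds and rolls back all of them
    if any row fails, so a batch is never half-written.
    """
    if not rows:
        return 0
    with conn:
        conn.executemany(
            'INSERT INTO messages ("key", "value") VALUES (:key, :value)', rows
        )
    return len(rows)
```

In the Flask app, the same all-or-nothing behaviour comes from `bulk_insert_mappings()` followed by a single `commit()`, with `rollback()` on failure.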

Code

```python
import threading

from flask import Flask, jsonify
from apscheduler.schedulers.background import BackgroundScheduler
from confluent_kafka import Consumer, KafkaError
from sqlalchemy import create_engine, Column, Integer, String, Sequence
from sqlalchemy.orm import declarative_base, sessionmaker

# Flask App
app = Flask(__name__)

# Database Configuration
DATABASE_URL = 'postgresql://user:password@localhost:5432/mydatabase'
engine = create_engine(DATABASE_URL)
Base = declarative_base()
# Session factory: each insert opens its own session, because one Session must
# not be shared between the consumer, scheduler and Flask request threads
Session = sessionmaker(bind=engine)

# Kafka Configuration
KAFKA_BROKER = 'localhost:9092'
KAFKA_TOPIC = 'example_topic'

consumer = Consumer({
    'bootstrap.servers': KAFKA_BROKER,
    'group.id': 'example_group',
    'auto.offset.reset': 'earliest'
})

# Message Model for SQLAlchemy
class Message(Base):
    __tablename__ = 'messages'
    id = Column(Integer, Sequence('message_id_seq'), primary_key=True)
    key = Column(String(50))
    value = Column(String(255))

Base.metadata.create_all(engine)

# In-Memory Batch, shared by the consumer thread, the scheduler and Flask requests
batch = []
batch_lock = threading.Lock()  # guards every read and write of `batch`
BATCH_SIZE = 100
BATCH_INTERVAL = 10  # Seconds

def consume_messages():
    """
    Consume messages from Kafka and store them in an in-memory batch.
    """
    consumer.subscribe([KAFKA_TOPIC])
    try:
        while True:
            msg = consumer.poll(1.0)  # Poll timeout of 1 second
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() != KafkaError._PARTITION_EOF:
                    print(f"Kafka Error: {msg.error()}")
                continue

            # Key and value can be None (e.g. unkeyed messages), so decode defensively
            key = msg.key().decode('utf-8') if msg.key() is not None else None
            value = msg.value().decode('utf-8') if msg.value() is not None else None

            # Add message to batch
            with batch_lock:
                batch.append({"key": key, "value": value})
                is_full = len(batch) >= BATCH_SIZE
            if is_full:
                insert_batch()
    finally:
        # Leave the consumer group cleanly if the loop exits with an exception
        consumer.close()

def insert_batch():
    """
    Insert the in-memory batch into the database and clear the batch.
    """
    global batch
    # Take the current batch under the lock so messages appended while the
    # insert is running go into a fresh list instead of being cleared
    with batch_lock:
        if not batch:
            return
        pending, batch = batch, []

    session = Session()
    try:
        session.bulk_insert_mappings(Message, pending)
        session.commit()
        print(f"Inserted {len(pending)} messages into the database.")
    except Exception as e:
        session.rollback()
        print(f"Error during batch insert: {e}")
        # Re-queue the rows so the next flush retries them
        with batch_lock:
            batch = pending + batch
    finally:
        session.close()

# APScheduler Job
def schedule_batch_insert():
    """
    Periodically insert batch data into the database.
    """
    insert_batch()  # returns immediately when the batch is empty

scheduler = BackgroundScheduler()
scheduler.add_job(schedule_batch_insert, 'interval', seconds=BATCH_INTERVAL)
scheduler.start()

def current_batch_size():
    with batch_lock:
        return len(batch)

# Flask Routes
@app.route('/health', methods=['GET'])
def health_check():
    return jsonify({"status": "running", "batch_size": current_batch_size()})

@app.route('/flush', methods=['POST'])
def flush_batch():
    """
    Manually trigger batch insert.
    """
    insert_batch()
    return jsonify({"status": "flushed", "batch_size": current_batch_size()})

if __name__ == "__main__":
    # Start Kafka Consumer in a background thread. This only happens when the
    # file is run directly; under a WSGI server such as gunicorn it would not start.
    consumer_thread = threading.Thread(target=consume_messages, daemon=True)
    consumer_thread.start()

    # Run Flask App
    app.run(host='0.0.0.0', port=5000)
```

How It Works

  1. Kafka Consumer: Continuously polls Kafka for new messages and adds them to an in-memory batch.
  2. Batch Insert: Inserts data into the database when the batch size reaches the threshold (BATCH_SIZE) or when the scheduler triggers a flush.
  3. Scheduler: Flushes the batch to the database every BATCH_INTERVAL seconds if there are messages.
  4. Flask Endpoints:
    • /health: Monitor the system status and batch size.
    • /flush: Manually trigger a batch insert.
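
Because the batch is touched by three threads (the Kafka consumer, the APScheduler job and Flask requests), appending, flushing and clearing need to be serialized, and rows should be swapped out before the slow database call so that messages arriving during an insert are not cleared along with it. Here is a minimal, framework-free sketch of that pattern; `Batcher` is a hypothetical class, and its `flush_fn` stands in for the database insert.

```python
import threading


class Batcher:
    """Thread-safe buffer that hands batches of items to `flush_fn`.

    A flush happens automatically once `batch_size` items have accumulated,
    or whenever flush() is called (e.g. by a scheduler job or an HTTP route).
    """

    def __init__(self, flush_fn, batch_size=100):
        self._flush_fn = flush_fn
        self._batch_size = batch_size
        self._lock = threading.Lock()
        self._items = []

    @property
    def size(self):
        with self._lock:
            return len(self._items)

    def add(self, item):
        with self._lock:
            self._items.append(item)
            is_full = len(self._items) >= self._batch_size
        if is_full:
            self.flush()

    def flush(self):
        """Flush everything buffered so far; returns the number of items flushed."""
        # Swap the buffer out under the lock, then call flush_fn without holding it,
        # so add() is never blocked by a slow database insert
        with self._lock:
            items, self._items = self._items, []
        if not items:
            return 0
        try:
            self._flush_fn(items)
        except Exception:
            # Put the items back in front of anything added meanwhile, then re-raise
            with self._lock:
                self._items = items + self._items
            raise
        return len(items)
```

In the Flask app, the consumer loop would call `batcher.add(row)`, and both the APScheduler job and the `/flush` route would call `batcher.flush()`. Calling `flush_fn` outside the lock keeps the consumer from waiting on a slow insert; the trade-off is that two flushes can run at the same time, so `flush_fn` itself must be thread-safe (for example, by opening its own database session).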

Enhancements

  • Add retries and error handling for Kafka and database operations.
  • Read configuration (database URL, Kafka broker, topic) from environment variables, e.g. with python-dotenv.
  • Consider moving to an external task queue like Celery for scalability.
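
For the first enhancement, a small retry helper with exponential backoff is one option. This is a generic sketch, not part of any library used here: `retry` is a hypothetical function, and the delays, attempt count and jitter strategy are assumptions to tune. The `sleep` parameter is injectable so tests do not have to wait.

```python
import random
import time


def retry(fn, attempts=5, base_delay=0.5, max_delay=10.0,
          retry_on=(Exception,), sleep=time.sleep):
    """Call fn() until it succeeds, re-raising the last error after `attempts` tries.

    Between attempts it sleeps a random time in [0, min(max_delay, base_delay * 2**n)]
    ("full jitter" exponential backoff), where n is the 0-based index of the failed attempt.
    Exceptions not listed in `retry_on` propagate immediately.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return fn()
        except retry_on:
            if attempt == attempts - 1:
                raise
            delay = min(max_delay, base_delay * (2 ** attempt))
            sleep(random.uniform(0, delay))
```

Wrapping the database insert in `retry(...)` and restricting `retry_on` to transient errors such as SQLAlchemy's `OperationalError` would retry connection failures while letting data errors (for example, a value longer than the `String(255)` column) surface immediately. The retried function should open a fresh session, or roll back, on each attempt.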

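This setup is a simple, modular starting point for moving bulk data from Kafka into a database. Keep its limits in mind: it runs a single consumer and buffers rows in memory, so rows that have not been inserted yet are lost if the process stops, and because Kafka offsets are auto-committed on a timer rather than after each insert, those messages may not be redelivered on restart.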
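
One way to get at-least-once delivery, sketched here as an assumption rather than the design used above, is to set `'enable.auto.commit': False` (librdkafka otherwise commits offsets automatically, every 5 seconds by default), pull a batch with `Consumer.consume()`, insert it, and only then call `Consumer.commit()`. `process_one_batch` and its `insert_rows` callback are hypothetical names; the consumer is passed in so the logic can be checked without a broker.

```python
def process_one_batch(consumer, insert_rows, batch_size=100, timeout=1.0):
    """Fetch up to `batch_size` messages, insert them, then commit offsets.

    Expects a confluent_kafka.Consumer created with 'enable.auto.commit': False.
    Offsets are committed only after insert_rows() returns, so a crash before the
    commit leads to redelivery instead of loss. If insert_rows() raises, nothing
    is committed; the caller should then stop or restart the consumer rather than
    keep consuming, because its in-memory position has already moved past these
    messages.
    """
    messages = consumer.consume(num_messages=batch_size, timeout=timeout)
    rows = []
    for msg in messages:
        if msg.error():
            # Skip error events such as partition EOF; real code may want to log them
            continue
        key, value = msg.key(), msg.value()
        rows.append({
            "key": key.decode("utf-8") if key is not None else None,
            "value": value.decode("utf-8") if value is not None else None,
        })
    if rows:
        insert_rows(rows)
        # Synchronously commit the offsets of the messages consumed so far
        consumer.commit(asynchronous=False)
    return len(rows)
```

A real consumer would use the example's configuration plus `'enable.auto.commit': False`, with `process_one_batch` called in a loop from the consumer thread. Because `consume()` returns after `timeout` seconds even with fewer than `batch_size` messages, this also bounds how long rows wait before insertion. The cost is possible duplicates after a crash, so inserts should tolerate redelivery (for example, with a unique constraint on topic, partition and offset).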


