Introduction

Apache Kafka is a distributed streaming platform designed for building real-time data pipelines and streaming applications. It provides a high-throughput, fault-tolerant, publish-subscribe messaging system that can handle hundreds of thousands of messages per second.

Core Concepts

Topics and Partitions

Topics are the fundamental organizational unit in Kafka, similar to tables in a database or folders in a file system. A topic is a named channel to which messages are published.

Partitions are the way Kafka achieves scalability. Each topic is split into one or more partitions, which are ordered, immutable sequences of messages.

Key characteristics:

  • Each partition is an ordered, immutable sequence of records
  • Messages in a partition are assigned a sequential ID number called an offset
  • Partitions allow Kafka to scale horizontally
  • Multiple consumers can read from a topic in parallel (within a consumer group, each partition is read by at most one consumer)
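
The offset behaviour described above can be illustrated with a toy in-memory model. This is only a sketch: the PartitionLog class and its method names are invented for illustration and are not part of any Kafka client.

```python
class PartitionLog:
    """Toy model of one partition: an append-only list of records."""

    def __init__(self):
        self._records = []

    def append(self, record):
        """Append a record and return the offset assigned to it."""
        self._records.append(record)
        return len(self._records) - 1

    def read(self, offset, max_records=10):
        """Return up to max_records records starting at the given offset."""
        return self._records[offset:offset + max_records]


log = PartitionLog()
offsets = [log.append(m) for m in ("a", "b", "c")]
print(offsets)      # [0, 1, 2]
print(log.read(1))  # ['b', 'c']
```

Records are never modified in place; a reader only needs to remember the next offset it wants to fetch.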

Brokers

Brokers are the Kafka servers that store the data and handle client requests. A Kafka cluster consists of one or more brokers.

Key characteristics:

  • Each broker hosts some of the topic partitions
  • Brokers are identified by an integer ID
  • When a client connects to any broker, it connects to the entire cluster
  • Production clusters commonly run at least three brokers for high availability

ZooKeeper

ZooKeeper is a distributed coordination service that Kafka uses to manage the cluster. It maintains configuration information and naming, and provides distributed synchronization and group services.

Key responsibilities:

  • Maintains the list of brokers in the cluster
  • Tracks topic creation and deletion
  • Elects a controller (one of the brokers)
  • Manages consumer groups (in older versions)
  • Stores ACLs (Access Control Lists)

Note: Newer Kafka versions replace the ZooKeeper dependency with KRaft, a built-in Raft-based metadata quorum (KIP-500).

Producers

Producers publish messages to topics. Producers can choose to send messages to specific partitions or let Kafka handle the distribution.

Key characteristics:

  • Can send messages synchronously or asynchronously
  • Can specify a key with each message for consistent routing
  • Support idempotent and transactional semantics
  • Include configurable retry and batching mechanisms
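
Key-based routing can be pictured as hashing the key and taking the result modulo the partition count. The sketch below uses zlib.crc32 purely as a stand-in hash; actual clients use their own hash function, so the partition numbers produced here will not match a real cluster. The function name choose_partition is hypothetical.

```python
import zlib


def choose_partition(key: bytes, num_partitions: int) -> int:
    """Map a key to a partition index (stand-in hash, not Kafka's own)."""
    return zlib.crc32(key) % num_partitions


keys = [b"user1", b"user2", b"user1"]
parts = [choose_partition(k, 3) for k in keys]

# The same key always lands on the same partition.
print(parts[0] == parts[2])  # True
```

Because the result depends on the partition count, adding partitions to a topic can change which partition a given key maps to.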

Consumers

Consumers subscribe to topics and process the published messages. Each consumer tracks its position in the log with an offset.

Key characteristics:

  • Pull-based consumption model
  • Maintain their position in the log through offsets
  • Can read from multiple partitions
  • Can be part of a consumer group for parallel processing

Consumer Groups

Consumer groups allow a set of consumers to cooperatively consume messages from topics. Each partition is assigned to exactly one consumer in the group, so every consumer reads from its own exclusive set of partitions.

Key characteristics:

  • Enables horizontal scaling of consumption
  • Provides fault tolerance
  • Automatic rebalancing when consumers join or leave
  • At most one consumer per partition within a group
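
The one-consumer-per-partition rule can be sketched with a simplified round-robin assignment. Real assignors (range, round-robin, sticky, cooperative) are more involved; assign_round_robin is a hypothetical helper used only to show the idea.

```python
def assign_round_robin(partitions, consumers):
    """Give each partition to exactly one consumer, cycling through consumers."""
    assignment = {c: [] for c in consumers}
    for i, partition in enumerate(sorted(partitions)):
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment


partitions = [0, 1, 2]
two = assign_round_robin(partitions, ["c1", "c2"])
four = assign_round_robin(partitions, ["c1", "c2", "c3", "c4"])
print(two)   # {'c1': [0, 2], 'c2': [1]}
print(four)  # {'c1': [0], 'c2': [1], 'c3': [2], 'c4': []}
```

With more consumers than partitions, the extra consumer receives nothing, which is why adding consumers beyond the partition count does not increase parallelism.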

Kafka Commands

kafka-topics.sh

This command manages Kafka topics (create, delete, describe, alter).

Common Options:

  • --bootstrap-server: Specifies the Kafka broker(s) to connect to
  • --topic: Specifies the topic name
  • --create: Creates a new topic
  • --delete: Deletes a topic
  • --describe: Shows details of a topic
  • --list: Lists all available topics
  • --partitions: Sets the number of partitions
  • --replication-factor: Sets the replication factor

Examples:

Create a topic:

kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --partitions 3 --replication-factor 1

List all topics:

kafka-topics.sh --bootstrap-server localhost:9092 --list

Describe a topic:

kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic

Delete a topic:

kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic my-topic

Increase the number of partitions (the partition count can be raised but not lowered):

kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic my-topic --partitions 5

kafka-console-producer.sh

This command creates a producer for sending messages to a Kafka topic from the command line.

Common Options:

  • --bootstrap-server: Specifies the Kafka broker(s) to connect to
  • --topic: Specifies the topic to produce messages to
  • --property: Sets message reader properties such as parse.key and key.separator (producer configs such as compression or batch size go through --producer-property)

Examples:

Basic producer:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic

Producer with custom properties:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic \
  --property "parse.key=true" \
  --property "key.separator=:"

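After running the command, type one message per line. The first two lines below suit the basic producer; the last line uses the key:value form expected when parse.key=true is set: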

> This is message 1
> This is message 2
> key:This is a message with a key
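
How parse.key and key.separator turn an input line into a key and a value can be approximated as follows. This is a sketch of the idea, not the console tool's code; parse_line is a hypothetical function, and raising an error on a missing separator is an assumption of this sketch.

```python
def parse_line(line, parse_key=False, separator="\t"):
    """Split one input line into (key, value); key is None when parse_key is off."""
    if not parse_key:
        return None, line
    key, sep, value = line.partition(separator)
    if not sep:
        # Assumption: treat a line without the separator as an error.
        raise ValueError(f"No key separator {separator!r} in line: {line!r}")
    return key, value


print(parse_line("This is message 1"))
# (None, 'This is message 1')
print(parse_line("key:This is a message with a key", parse_key=True, separator=":"))
# ('key', 'This is a message with a key')
```

Only the first occurrence of the separator splits the line, so the value itself may contain the separator character.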

kafka-console-consumer.sh

This command creates a consumer for reading messages from a Kafka topic via the command line.

Common Options:

  • --bootstrap-server: Specifies the Kafka broker(s) to connect to
  • --topic: Specifies the topic to consume messages from
  • --from-beginning: Reads messages from the beginning of the topic
  • --group: Specifies a consumer group name
  • --property: Sets message formatter properties such as print.key and key.separator (consumer configs go through --consumer-property)

Examples:

Basic consumer (only new messages):

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic

Consumer reading from the beginning:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning

Consumer with a specific group:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --group my-group

Consumer showing keys:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning \
  --property "print.key=true" \
  --property "key.separator=: "

Python Kafka Programming

Setup and Installation

Prerequisites:

  • Python 3.6+
  • Access to a Kafka cluster

Installing the Kafka Python Client:

pip install kafka-python

For more advanced use cases (better performance, more features):

pip install confluent-kafka

Basic Project Structure:

kafka-python-project/
├── requirements.txt
├── producer.py
├── consumer.py
└── config.py

Example requirements.txt:

kafka-python==2.0.2
# or
# confluent-kafka==2.1.1

Example config.py:

KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'
KAFKA_TOPIC = 'my-topic'
CONSUMER_GROUP = 'my-consumer-group'
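
One possible variation is to let environment variables override these defaults, so the same code runs against different clusters. The load_config helper and the choice of variable names (reusing the constant names above) are assumptions made for this sketch.

```python
import os


def load_config(env=os.environ):
    """Build client settings from environment variables, with local defaults."""
    servers = env.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
    return {
        # A comma-separated list becomes a list of host:port strings.
        "bootstrap_servers": [s.strip() for s in servers.split(",")],
        "topic": env.get("KAFKA_TOPIC", "my-topic"),
        "group_id": env.get("CONSUMER_GROUP", "my-consumer-group"),
    }


cfg = load_config({"KAFKA_BOOTSTRAP_SERVERS": "broker1:9092, broker2:9092"})
print(cfg["bootstrap_servers"])  # ['broker1:9092', 'broker2:9092']
print(cfg["topic"])              # my-topic
```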

Producer Examples

Basic Producer Example:

from kafka import KafkaProducer
import json
import time
 
# Create a producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)
 
# Send some messages
for i in range(10):
    data = {'number': i, 'timestamp': time.time()}
    
    # Send asynchronously
    future = producer.send('my-topic', value=data)
    
    # Block until message is sent (optional)
    result = future.get(timeout=60)
    
    print(f"Message sent to {result.topic}, partition {result.partition}, offset {result.offset}")
    
    time.sleep(1)
 
# Flush and close
producer.flush()
producer.close()
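
The value_serializer used above can be checked without a broker by pairing it with the matching deserializer. The function names serialize and deserialize are illustrative only; the logic mirrors the lambdas used in the producer and consumer examples.

```python
import json


def serialize(value):
    """Dict -> UTF-8 encoded JSON bytes, as passed to value_serializer."""
    return json.dumps(value).encode("utf-8")


def deserialize(raw):
    """UTF-8 encoded JSON bytes -> dict, as passed to value_deserializer."""
    return json.loads(raw.decode("utf-8"))


payload = {"number": 1, "timestamp": 1700000000.5}
raw = serialize(payload)
restored = deserialize(raw)

print(type(raw).__name__)   # bytes
print(restored == payload)  # True
```

Testing the pair in isolation catches values that json cannot encode (for example datetime objects) before they reach the producer.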

Producer with Key:

from kafka import KafkaProducer
import json
 
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=lambda x: x.encode('utf-8'),
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)
 
# Messages with the same key go to the same partition
user_ids = ['user1', 'user2', 'user3', 'user1', 'user2']
 
for user_id in user_ids:
    data = {'user_id': user_id, 'action': 'login'}
    
    producer.send('user-events', key=user_id, value=data)
 
producer.flush()
producer.close()

Producer with Error Handling:

from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
 
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8'),
    retries=5,  # Number of retries if sending fails
    acks='all'  # Wait for all replicas to acknowledge
)
 
def on_send_success(record_metadata):
    print(f"Message sent to {record_metadata.topic} partition {record_metadata.partition} offset {record_metadata.offset}")
 
def on_send_error(exc):
    print(f"Failed to send message: {exc}")
 
# Send with callbacks
for i in range(10):
    data = {'number': i}
    
    # Send with callbacks for success and failure
    producer.send('my-topic', value=data).add_callback(on_send_success).add_errback(on_send_error)
 
producer.flush()
producer.close()

Using Confluent Kafka Producer (Better Performance):

from confluent_kafka import Producer
import json
import socket
 
# Configure producer
conf = {
    'bootstrap.servers': 'localhost:9092',
    'client.id': socket.gethostname()
}
 
producer = Producer(conf)
 
def delivery_report(err, msg):
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')
 
# Produce messages
for i in range(10):
    data = {'number': i}
    producer.produce(
        'my-topic',
        key=str(i),
        value=json.dumps(data),
        callback=delivery_report
    )
    
    # Trigger any callbacks for messages that have been delivered
    producer.poll(0)
 
# Wait for any outstanding messages to be delivered
producer.flush()

Consumer Examples

Basic Consumer Example:

from kafka import KafkaConsumer
import json
 
# Create a consumer
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',  # 'latest' for only new messages
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
 
# Consume messages
for message in consumer:
    print(f"Received: {message.value} from partition {message.partition} at offset {message.offset}")
    
    # Access message metadata
    print(f"Key: {message.key}, Topic: {message.topic}, Timestamp: {message.timestamp}")

Manual Offset Commit:

from kafka import KafkaConsumer
import json
 
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=False,  # Disable auto commit
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
 
try:
    for message in consumer:
        print(f"Processing message: {message.value}")
        
        # Process the message (your business logic here)
        # ...
        
        # Commit offsets manually after processing
        consumer.commit()
except Exception as e:
    print(f"Error: {e}")
finally:
    consumer.close()
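
Committing only after processing gives at-least-once behaviour: if the consumer fails before the commit, the record is delivered again. The simulation below uses a plain list as the log and needs no broker; consume and handler are hypothetical names for this sketch.

```python
def consume(log, committed, handler):
    """Process records from `committed` onward, committing after each success."""
    for offset in range(committed, len(log)):
        try:
            handler(log[offset])
        except Exception:
            return committed  # Failure before commit: offset is not advanced
        committed = offset + 1
    return committed


log = ["a", "b", "c"]
seen = []
fail_once = {"armed": True}


def handler(record):
    seen.append(record)  # The side effect happens before the simulated crash
    if record == "b" and fail_once["armed"]:
        fail_once["armed"] = False
        raise RuntimeError("simulated crash after processing")


committed = consume(log, 0, handler)          # Stops at 'b', committed stays 1
committed = consume(log, committed, handler)  # Restarts from 'b'
print(seen)       # ['a', 'b', 'b', 'c']
print(committed)  # 3
```

The duplicate 'b' is the cost of at-least-once delivery, so downstream processing should tolerate repeated records.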

Subscribing to Multiple Topics:

from kafka import KafkaConsumer
import json
 
consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
 
# Subscribe to multiple topics
consumer.subscribe(['topic1', 'topic2', 'topic3'])
 
for message in consumer:
    print(f"Topic: {message.topic}, Value: {message.value}")

Using Confluent Kafka Consumer:

from confluent_kafka import Consumer, KafkaError
import json
 
# Consumer configuration
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False  # Offsets are committed manually below
}
 
consumer = Consumer(conf)
 
# Subscribe to topic
consumer.subscribe(['my-topic'])
 
try:
    while True:
        # Poll for messages
        msg = consumer.poll(timeout=1.0)
        
        if msg is None:
            continue
            
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event - not an error
                print(f"Reached end of partition {msg.partition()}")
            else:
                print(f"Error: {msg.error()}")
        else:
            # Process message
            value = json.loads(msg.value().decode('utf-8'))
            print(f"Received message: {value}")
            
            # Manual commit
            consumer.commit(msg)
            
except KeyboardInterrupt:
    print("Interrupted")
finally:
    # Close consumer
    consumer.close()

Advanced Configuration

Producer Configuration:

from kafka import KafkaProducer
 
producer = KafkaProducer(
    bootstrap_servers=['broker1:9092', 'broker2:9092'],  # Multiple brokers for fault tolerance
    acks='all',                # Wait for all replicas
    retries=5,                 # Retry up to 5 times
    batch_size=16384,          # Batch size in bytes
    linger_ms=50,              # Wait up to 50ms to batch messages
    buffer_memory=33554432,    # 32MB producer buffer
    compression_type='snappy', # Compress messages (snappy, gzip, lz4); snappy needs the python-snappy package
    max_in_flight_requests_per_connection=5,
    key_serializer=lambda x: x.encode('utf-8'),
    value_serializer=lambda x: x.encode('utf-8')
)
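
The interaction between batch_size and linger_ms can be pictured with a small simulation: a batch is sent when it is full or when it has waited long enough, whichever comes first. This is a simplified model with a hand-driven clock, not the client's real accumulator (which batches per partition); Batcher, add, and tick are hypothetical names.

```python
class Batcher:
    """Toy batcher: flush when the buffer is full or has lingered long enough."""

    def __init__(self, batch_size, linger_ms):
        self.batch_size = batch_size
        self.linger_ms = linger_ms
        self.buffer = []
        self.buffer_bytes = 0
        self.first_ts = None
        self.sent = []  # Each entry is one "network request"

    def add(self, payload, now_ms):
        if not self.buffer:
            self.first_ts = now_ms
        self.buffer.append(payload)
        self.buffer_bytes += len(payload)
        if self.buffer_bytes >= self.batch_size:
            self._flush()

    def tick(self, now_ms):
        if self.buffer and now_ms - self.first_ts >= self.linger_ms:
            self._flush()

    def _flush(self):
        self.sent.append(list(self.buffer))
        self.buffer = []
        self.buffer_bytes = 0
        self.first_ts = None


b = Batcher(batch_size=10, linger_ms=50)
b.add(b"12345", now_ms=0)
b.add(b"67890", now_ms=5)  # Buffer reaches 10 bytes: flushed by size
b.add(b"abc", now_ms=10)
b.tick(now_ms=40)          # Only 30 ms waited: nothing sent
b.tick(now_ms=60)          # 50 ms waited: flushed by linger
print(b.sent)  # [[b'12345', b'67890'], [b'abc']]
```

A larger linger_ms gives batches more time to fill, which tends to raise throughput at the cost of added latency for lightly loaded producers.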

Consumer Configuration:

from kafka import KafkaConsumer
 
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['broker1:9092', 'broker2:9092'],
    group_id='my-group',
    enable_auto_commit=True,
    auto_commit_interval_ms=5000,   # Commit every 5 seconds
    auto_offset_reset='earliest',   # Where to start if no offset
    max_poll_records=500,           # Max records per poll
    max_poll_interval_ms=300000,    # Max time between polls (5 min)
    session_timeout_ms=10000,       # Session timeout (10 sec)
    heartbeat_interval_ms=3000,     # Heartbeat interval (3 sec)
    key_deserializer=lambda x: x.decode('utf-8') if x else None,
    value_deserializer=lambda x: x.decode('utf-8') if x else None
)
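
The heartbeat interval must be lower than the session timeout, and the Kafka documentation suggests keeping it at no more than one third of that value. A small pre-flight check along those lines might look like this; check_timeouts is a hypothetical helper, not a client API.

```python
def check_timeouts(session_timeout_ms, heartbeat_interval_ms):
    """Return a list of warnings about the two timeout settings."""
    warnings = []
    if heartbeat_interval_ms >= session_timeout_ms:
        warnings.append("heartbeat interval must be lower than session timeout")
    elif heartbeat_interval_ms * 3 > session_timeout_ms:
        warnings.append("heartbeat interval is above one third of session timeout")
    return warnings


ok = check_timeouts(session_timeout_ms=10000, heartbeat_interval_ms=3000)
risky = check_timeouts(session_timeout_ms=10000, heartbeat_interval_ms=5000)
print(ok)     # []
print(risky)  # ['heartbeat interval is above one third of session timeout']
```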

SSL Configuration for Secure Connections:

from kafka import KafkaProducer, KafkaConsumer
 
# Security configuration for both producer and consumer
security_config = {
    'security_protocol': 'SSL',
    'ssl_cafile': '/path/to/ca.pem',  # CA certificate
    'ssl_certfile': '/path/to/client.pem',  # Client certificate
    'ssl_keyfile': '/path/to/client.key',  # Client private key
    'ssl_password': 'password',  # Private key password if any
}
 
# Create secure producer
producer = KafkaProducer(
    bootstrap_servers=['secure-broker:9093'],
    **security_config
)
 
# Create secure consumer
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['secure-broker:9093'],
    group_id='my-group',
    **security_config
)

SASL Authentication:

from kafka import KafkaProducer
 
# SASL/PLAIN authentication
producer = KafkaProducer(
    bootstrap_servers=['broker:9093'],
    security_protocol='SASL_SSL',
    sasl_mechanism='PLAIN',
    sasl_plain_username='username',
    sasl_plain_password='password',
    ssl_cafile='/path/to/ca.pem'
)
 
# For SASL/SCRAM authentication
producer = KafkaProducer(
    bootstrap_servers=['broker:9093'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-256',
    sasl_plain_username='username',
    sasl_plain_password='password',
    ssl_cafile='/path/to/ca.pem'
)

Common Use Cases

Real-time Data Pipeline

# producer.py
from kafka import KafkaProducer
import json
import time
import random
 
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)
 
# Simulate IoT sensor data
while True:
    sensor_id = f"sensor-{random.randint(1, 10)}"
    temperature = round(random.uniform(20, 35), 1)
    humidity = round(random.uniform(30, 80), 1)
    
    data = {
        'sensor_id': sensor_id,
        'temperature': temperature,
        'humidity': humidity,
        'timestamp': int(time.time())
    }
    
    producer.send('sensor-data', value=data)
    print(f"Sent: {data}")
    
    time.sleep(1)

# consumer.py
from kafka import KafkaConsumer
import json
 
consumer = KafkaConsumer(
    'sensor-data',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='latest',
    enable_auto_commit=True,
    group_id='monitoring-app',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
 
# Process and analyze real-time data
for message in consumer:
    data = message.value
    
    # Alert on high temperatures
    if data['temperature'] > 30:
        print(f"ALERT! High temperature: {data['temperature']}°C from {data['sensor_id']}")
    
    # Store data or forward to another system
    # ...
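
One way to fill in the processing step is to alert on a rolling average per sensor rather than on single readings, so that one spike does not trigger an alert. This is a sketch of one option, not part of the original pipeline; RollingAverage and the window size are assumptions.

```python
from collections import defaultdict, deque


class RollingAverage:
    """Keep the last `window` temperatures per sensor and return their mean."""

    def __init__(self, window=3):
        self.readings = defaultdict(lambda: deque(maxlen=window))

    def update(self, sensor_id, temperature):
        values = self.readings[sensor_id]
        values.append(temperature)  # Oldest reading drops out automatically
        return sum(values) / len(values)


avg = RollingAverage(window=3)
for temperature in (29.0, 31.0, 33.0, 35.0):
    latest = avg.update("sensor-1", temperature)

print(latest)  # 33.0
```

Inside the consumer loop, the returned average would replace data['temperature'] in the threshold comparison.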

Event-Driven Microservices

# order-service.py
from kafka import KafkaProducer
import json
import time
import uuid
 
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)
 
def create_order(user_id, items, total_amount):
    order_id = str(uuid.uuid4())
    
    order = {
        'order_id': order_id,
        'user_id': user_id,
        'items': items,
        'total_amount': total_amount,
        'status': 'created',
        'timestamp': int(time.time())
    }
    
    # Publish order created event
    producer.send('orders', value=order)
    
    return order_id
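
# payment-service.py
from kafka import KafkaConsumer, KafkaProducer
import json
import time


def process_payment(order):
    # Placeholder, not a real payment integration: replace with an actual payment call.
    return order['total_amount'] > 0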
 
consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='latest',
    group_id='payment-service',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
 
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)
 
for message in consumer:
    order = message.value
    
    if order['status'] == 'created':
        # Process payment
        payment_successful = process_payment(order)
        
        if payment_successful:
            # Publish payment success event
            payment_event = {
                'order_id': order['order_id'],
                'status': 'payment_successful',
                'timestamp': int(time.time())
            }
        else:
            # Publish payment failed event
            payment_event = {
                'order_id': order['order_id'],
                'status': 'payment_failed',
                'timestamp': int(time.time())
            }
        
        producer.send('payment-events', value=payment_event)
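
Because an order event may be delivered more than once (for example after a consumer restart), the payment step is usually made idempotent. A minimal in-memory sketch follows; PaymentDeduplicator is a hypothetical helper, and a real service would keep the processed IDs in durable storage rather than in a set.

```python
class PaymentDeduplicator:
    """Remember which order IDs were already handled (in memory only)."""

    def __init__(self):
        self._processed = set()

    def should_process(self, order):
        order_id = order["order_id"]
        if order_id in self._processed:
            return False  # Duplicate delivery: skip the payment
        self._processed.add(order_id)
        return True


dedup = PaymentDeduplicator()
events = [
    {"order_id": "o-1", "status": "created"},
    {"order_id": "o-2", "status": "created"},
    {"order_id": "o-1", "status": "created"},  # Redelivered event
]
decisions = [dedup.should_process(e) for e in events]
print(decisions)  # [True, True, False]
```

In the loop above, the check would wrap the call to process_payment so that a redelivered order is not charged twice.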

Best Practices

  1. Producer Best Practices:

    • Use appropriate acknowledgment levels (acks)
    • Enable idempotent producers to prevent duplicates
    • Configure appropriate batch size and linger time
    • Implement proper error handling and retries
    • Use compression for large messages or high throughput
  2. Consumer Best Practices:

    • Choose appropriate offset reset behavior
    • Configure proper session timeouts and heartbeat intervals
    • Handle consumer rebalancing gracefully
    • Consider using manual offset commits for at-least-once processing
    • Size consumer groups appropriately (at most one consumer per partition; any extra consumers sit idle)
  3. Schema Management:

    • Consider using a schema registry (like Confluent Schema Registry)
    • Use Apache Avro, Protobuf, or JSON Schema for structured data
    • Implement schema evolution strategy
  4. Performance Tuning:

    • Monitor consumer lag
    • Adjust partition count based on throughput needs
    • Consider using the Confluent Python client for better performance
    • Configure appropriate JVM settings for brokers
  5. Security:

    • Enable SSL/TLS encryption
    • Use SASL authentication
    • Implement proper ACLs
    • Regularly rotate credentials

Troubleshooting

Common Issues and Solutions

  1. Connection Refused:

    • Verify broker addresses and ports
    • Check network connectivity
    • Ensure the Kafka cluster is running
  2. Consumer Not Receiving Messages:

    • Verify topic exists and has messages
    • Check consumer group ID
    • Verify auto.offset.reset configuration
    • Check consumer lag
  3. Producer Failures:

    • Verify broker connectivity
    • Check topic exists and is writable
    • Ensure proper serialization
  4. High Latency:

    • Review producer batch size (larger batches favor throughput over latency)
    • Lower linger.ms if messages wait too long to be batched
    • Check network saturation
    • Monitor broker resources

Debugging Tools

  1. Consumer Group Information:

    kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
  2. Topic Lag Monitoring (read the LAG column of the same output):

    kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
  3. Consumer Offset Inspection:

    kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
  4. Python Debugging:

    import logging
    logging.basicConfig(level=logging.DEBUG)  # Set to DEBUG for Kafka client logs
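
Consumer lag, as reported by the consumer-groups tool, is the gap between the latest offset in each partition and the group's committed offset. The arithmetic can be sketched as below; compute_lag and the sample numbers are illustrative, and treating a partition with no commit as offset 0 is an assumption of this sketch.

```python
def compute_lag(end_offsets, committed_offsets):
    """Per-partition lag = log end offset minus committed offset."""
    return {
        partition: end - committed_offsets.get(partition, 0)
        for partition, end in end_offsets.items()
    }


end_offsets = {0: 120, 1: 95, 2: 40}  # Made-up sample values
committed = {0: 100, 1: 95}           # Partition 2 has no commit yet
lag = compute_lag(end_offsets, committed)
print(lag)                # {0: 20, 1: 0, 2: 40}
print(sum(lag.values()))  # 60
```

A lag that keeps growing over time suggests the consumers cannot keep up with the producers.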

This documentation provides a comprehensive overview of Apache Kafka, its core concepts, common commands, and Python integration. For more detailed information, refer to the official Apache Kafka documentation or the specific Python client libraries' documentation.