Introduction
Apache Kafka is a distributed streaming platform designed for building real-time data pipelines and streaming applications. It provides a high-throughput, fault-tolerant publish-subscribe messaging system that can handle hundreds of thousands of messages per second.
Core Concepts
Topics and Partitions
Topics are the fundamental unit of organization in Kafka, similar to tables in a database or folders in a file system. A topic is a named channel to which messages are published.
Partitions are the way Kafka achieves scalability. Each topic is split into one or more partitions, which are ordered, immutable sequences of messages.
Key characteristics:
- Each partition is an ordered, immutable sequence of records
- Messages in a partition are assigned a sequential ID number called an offset
- Partitions allow Kafka to scale horizontally
- Multiple consumers can read from a topic in parallel (one consumer per partition)
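To make the offset idea concrete, here is a toy sketch (plain Python, not Kafka client code) that models a single partition as an append-only list, where the offset is simply the record's index:
# Toy model of one partition: an append-only log where a record's offset
# is just its index. Illustration only, not real Kafka code.
partition = []

def append(record):
    partition.append(record)
    return len(partition) - 1   # the offset assigned to this record

for payload in ["a", "b", "c"]:
    print(f"offset={append(payload)} value={payload}")

# A consumer does not delete records; it only remembers the next offset to read.
next_offset = 0
while next_offset < len(partition):
    print(f"consume offset={next_offset} value={partition[next_offset]}")
    next_offset += 1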
Brokers
Brokers are the Kafka servers that store the data and handle client requests. A Kafka cluster consists of one or more brokers.
Key characteristics:
- Each broker hosts some of the topic partitions
- Brokers are identified by an integer ID
- Connecting to any broker (a bootstrap broker) gives a client access to the entire cluster
- A typical cluster has 3-5 brokers for high availability
ZooKeeper
ZooKeeper is a distributed coordination service that Kafka uses to manage the cluster. It helps in maintaining configuration information, naming, providing distributed synchronization, and group services.
Key responsibilities:
- Maintains the list of brokers in the cluster
- Tracks topic creation and deletion
- Elects a controller (one of the brokers)
- Manages consumer groups (in older versions)
- Stores ACLs (Access Control Lists)
Note: Kafka is removing its ZooKeeper dependency in newer versions (KIP-500, also known as KRaft mode).
Producers
Producers publish messages to topics. Producers can choose to send messages to specific partitions or let Kafka handle the distribution.
Key characteristics:
- Can send messages synchronously or asynchronously
- Can specify a key with each message for consistent routing
- Support idempotent and transactional semantics (see the sketch after this list)
- Include configurable retry and batching mechanisms
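As a rough illustration of the transactional semantics mentioned above, the confluent-kafka client exposes a transactions API. The following is a minimal, hedged sketch, not a production recipe; the topic name and transactional.id are placeholders, and the broker must support transactions:
from confluent_kafka import Producer, KafkaException
import json

# Minimal sketch of a transactional producer with confluent-kafka.
# 'my-topic' and the transactional.id below are placeholder values.
producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'transactional.id': 'demo-transactional-producer',  # enables transactions (implies idempotence)
})

producer.init_transactions()       # register the transactional.id with the cluster
producer.begin_transaction()
try:
    for i in range(5):
        producer.produce('my-topic', value=json.dumps({'number': i}))
    producer.commit_transaction()  # all five messages become visible atomically
except KafkaException:
    producer.abort_transaction()   # on failure, none of the messages become visible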
Consumers
Consumers subscribe to topics and process the published messages. Each consumer tracks its position in the log with an offset.
Key characteristics:
- Pull-based consumption model (see the sketch after this list)
- Maintain their position in the log through offsets
- Can read from multiple partitions
- Can be part of a consumer group for parallel processing
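To make the pull-based model concrete, here is a small sketch that fetches records explicitly with kafka-python's poll() instead of iterating over the consumer; the topic and group names are placeholders:
from kafka import KafkaConsumer

# Sketch of explicit pull-based consumption; 'my-topic' and 'demo-group'
# are placeholder names.
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='demo-group',
    auto_offset_reset='earliest'
)

while True:
    # The consumer asks the brokers for records; nothing is pushed to it.
    records = consumer.poll(timeout_ms=1000, max_records=100)
    for tp, messages in records.items():
        for message in messages:
            print(f"partition={tp.partition} offset={message.offset} value={message.value}")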
Consumer Groups
Consumer groups allow a set of consumers to cooperatively consume messages from topics. Each consumer in a group reads from an exclusive subset of the partitions.
Key characteristics:
- Enables horizontal scaling of consumption
- Provides fault tolerance
- Automatic rebalancing when consumers join or leave
- At most one consumer per partition within a group (see the sketch below)
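A quick way to see group membership in action, sketched with kafka-python (topic and group names are placeholders): run the script below twice with the same group_id and, assuming the topic has more than one partition, the partitions are split between the two processes.
from kafka import KafkaConsumer

# Sketch: inspect which partitions this group member was assigned.
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='demo-group'
)

consumer.poll(timeout_ms=5000)      # joining the group happens on the first poll
for tp in consumer.assignment():    # TopicPartition objects owned by this member
    print(f"assigned {tp.topic}[{tp.partition}]")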
Kafka Commands
kafka-topics.sh
This command manages Kafka topics (create, delete, describe, alter).
Common Options:
--bootstrap-server: Specifies the Kafka broker(s) to connect to
--topic: Specifies the topic name
--create: Creates a new topic
--delete: Deletes a topic
--describe: Shows details of a topic
--list: Lists all available topics
--partitions: Sets the number of partitions
--replication-factor: Sets the replication factor
Examples:
Create a topic:
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --partitions 3 --replication-factor 1
List all topics:
kafka-topics.sh --bootstrap-server localhost:9092 --list
Describe a topic:
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic
Delete a topic:
kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic my-topic
Alter topic configuration (increase the partition count; partitions can only be increased, never decreased):
kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic my-topic --partitions 5
kafka-console-producer.sh
This command creates a producer for sending messages to a Kafka topic from the command line.
Common Options:
--bootstrap-server: Specifies the Kafka broker(s) to connect to
--topic: Specifies the topic to produce messages to
--property: Sets producer properties like compression, batch size, etc.
Examples:
Basic producer:
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic
Producer with custom properties:
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic \
--property "parse.key=true" \
--property "key.separator=:"
After running the command, you can enter messages line by line:
> This is message 1
> This is message 2
> key:This is a message with a key
kafka-console-consumer.sh
This command creates a consumer for reading messages from a Kafka topic via the command line.
Common Options:
--bootstrap-server: Specifies the Kafka broker(s) to connect to
--topic: Specifies the topic to consume messages from
--from-beginning: Reads messages from the beginning of the topic
--group: Specifies a consumer group name
--property: Sets consumer properties
Examples:
Basic consumer (only new messages):
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic
Consumer reading from the beginning:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning
Consumer with a specific group:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --group my-group
Consumer showing keys:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning \
--property "print.key=true" \
--property "key.separator=: "
Python Kafka Programming
Setup and Installation
Prerequisites:
- Python 3.6+
- Access to a Kafka cluster
Installing the Kafka Python Client:
pip install kafka-python
For more advanced use cases (better performance, more features):
pip install confluent-kafka
Basic Project Structure:
kafka-python-project/
├── requirements.txt
├── producer.py
├── consumer.py
└── config.py
Example requirements.txt:
kafka-python==2.0.2
# or
# confluent-kafka==2.1.1
Example config.py:
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'
KAFKA_TOPIC = 'my-topic'
CONSUMER_GROUP = 'my-consumer-group'
Producer Examples
Basic Producer Example:
from kafka import KafkaProducer
import json
import time
# Create a producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)

# Send some messages
for i in range(10):
    data = {'number': i, 'timestamp': time.time()}
    # Send asynchronously
    future = producer.send('my-topic', value=data)
    # Block until message is sent (optional)
    result = future.get(timeout=60)
    print(f"Message sent to {result.topic}, partition {result.partition}, offset {result.offset}")
    time.sleep(1)

# Flush and close
producer.flush()
producer.close()
Producer with Key:
from kafka import KafkaProducer
import json
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=lambda x: x.encode('utf-8'),
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)

# Messages with the same key go to the same partition
user_ids = ['user1', 'user2', 'user3', 'user1', 'user2']
for user_id in user_ids:
    data = {'user_id': user_id, 'action': 'login'}
    producer.send('user-events', key=user_id, value=data)

producer.flush()
producer.close()
Producer with Error Handling:
from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8'),
    retries=5,   # Number of retries if sending fails
    acks='all'   # Wait for all replicas to acknowledge
)

def on_send_success(record_metadata):
    print(f"Message sent to {record_metadata.topic} partition {record_metadata.partition} offset {record_metadata.offset}")

def on_send_error(exc):
    print(f"Failed to send message: {exc}")

# Send with callbacks for success and failure
for i in range(10):
    data = {'number': i}
    producer.send('my-topic', value=data).add_callback(on_send_success).add_errback(on_send_error)

producer.flush()
producer.close()
Using Confluent Kafka Producer (Better Performance):
from confluent_kafka import Producer
import json
import socket
# Configure producer
conf = {
    'bootstrap.servers': 'localhost:9092',
    'client.id': socket.gethostname()
}
producer = Producer(conf)

def delivery_report(err, msg):
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')

# Produce messages
for i in range(10):
    data = {'number': i}
    producer.produce(
        'my-topic',
        key=str(i),
        value=json.dumps(data),
        callback=delivery_report
    )
    # Trigger any callbacks for messages that have been delivered
    producer.poll(0)

# Wait for any outstanding messages to be delivered
producer.flush()
Consumer Examples
Basic Consumer Example:
from kafka import KafkaConsumer
import json
# Create a consumer
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',   # 'latest' for only new messages
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

# Consume messages
for message in consumer:
    print(f"Received: {message.value} from partition {message.partition} at offset {message.offset}")
    # Access message metadata
    print(f"Key: {message.key}, Topic: {message.topic}, Timestamp: {message.timestamp}")
Manual Offset Commit:
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=False,   # Disable auto commit
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

try:
    for message in consumer:
        print(f"Processing message: {message.value}")
        # Process the message (your business logic here)
        # ...
        # Commit offsets manually after processing
        consumer.commit()
except Exception as e:
    print(f"Error: {e}")
finally:
    consumer.close()
Subscribing to Multiple Topics:
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

# Subscribe to multiple topics
consumer.subscribe(['topic1', 'topic2', 'topic3'])

for message in consumer:
    print(f"Topic: {message.topic}, Value: {message.value}")
Using Confluent Kafka Consumer:
from confluent_kafka import Consumer, KafkaError
import json
# Consumer configuration
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False   # commit manually after processing each message
}
consumer = Consumer(conf)

# Subscribe to topic
consumer.subscribe(['my-topic'])

try:
    while True:
        # Poll for messages
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event - not an error
                print(f"Reached end of partition {msg.partition()}")
            else:
                print(f"Error: {msg.error()}")
        else:
            # Process message
            value = json.loads(msg.value().decode('utf-8'))
            print(f"Received message: {value}")
            # Manual commit
            consumer.commit(msg)
except KeyboardInterrupt:
    print("Interrupted")
finally:
    # Close consumer
    consumer.close()
Advanced Configuration
Producer Configuration:
from kafka import KafkaProducer
producer = KafkaProducer(
    bootstrap_servers=['broker1:9092', 'broker2:9092'],  # Multiple brokers for fault tolerance
    acks='all',                    # Wait for all replicas
    retries=5,                     # Retry up to 5 times
    batch_size=16384,              # Batch size in bytes
    linger_ms=50,                  # Wait up to 50ms to batch messages
    buffer_memory=33554432,        # 32MB producer buffer
    compression_type='snappy',     # Compress messages (snappy, gzip, lz4)
    max_in_flight_requests_per_connection=5,
    key_serializer=lambda x: x.encode('utf-8'),
    value_serializer=lambda x: x.encode('utf-8')
)
Consumer Configuration:
from kafka import KafkaConsumer
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['broker1:9092', 'broker2:9092'],
    group_id='my-group',
    enable_auto_commit=True,
    auto_commit_interval_ms=5000,   # Commit every 5 seconds
    auto_offset_reset='earliest',   # Where to start if no offset
    max_poll_records=500,           # Max records per poll
    max_poll_interval_ms=300000,    # Max time between polls (5 min)
    session_timeout_ms=10000,       # Session timeout (10 sec)
    heartbeat_interval_ms=3000,     # Heartbeat interval (3 sec)
    key_deserializer=lambda x: x.decode('utf-8') if x else None,
    value_deserializer=lambda x: x.decode('utf-8') if x else None
)
SSL Configuration for Secure Connections:
from kafka import KafkaProducer, KafkaConsumer
# Security configuration for both producer and consumer
security_config = {
    'security_protocol': 'SSL',
    'ssl_cafile': '/path/to/ca.pem',         # CA certificate
    'ssl_certfile': '/path/to/client.pem',   # Client certificate
    'ssl_keyfile': '/path/to/client.key',    # Client private key
    'ssl_password': 'password',              # Private key password if any
}

# Create secure producer
producer = KafkaProducer(
    bootstrap_servers=['secure-broker:9093'],
    **security_config
)

# Create secure consumer
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['secure-broker:9093'],
    group_id='my-group',
    **security_config
)
SASL Authentication:
from kafka import KafkaProducer
# SASL/PLAIN authentication
producer = KafkaProducer(
    bootstrap_servers=['broker:9093'],
    security_protocol='SASL_SSL',
    sasl_mechanism='PLAIN',
    sasl_plain_username='username',
    sasl_plain_password='password',
    ssl_cafile='/path/to/ca.pem'
)

# For SASL/SCRAM authentication
producer = KafkaProducer(
    bootstrap_servers=['broker:9093'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-256',
    sasl_plain_username='username',
    sasl_plain_password='password',
    ssl_cafile='/path/to/ca.pem'
)
Common Use Cases
Real-time Data Pipeline
# producer.py
from kafka import KafkaProducer
import json
import time
import random
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)

# Simulate IoT sensor data
while True:
    sensor_id = f"sensor-{random.randint(1, 10)}"
    temperature = round(random.uniform(20, 35), 1)
    humidity = round(random.uniform(30, 80), 1)
    data = {
        'sensor_id': sensor_id,
        'temperature': temperature,
        'humidity': humidity,
        'timestamp': int(time.time())
    }
    producer.send('sensor-data', value=data)
    print(f"Sent: {data}")
    time.sleep(1)
# consumer.py
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
    'sensor-data',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='latest',
    enable_auto_commit=True,
    group_id='monitoring-app',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

# Process and analyze real-time data
for message in consumer:
    data = message.value
    # Alert on high temperatures
    if data['temperature'] > 30:
        print(f"ALERT! High temperature: {data['temperature']}°C from {data['sensor_id']}")
    # Store data or forward to another system
    # ...
Event-Driven Microservices
# order-service.py
from kafka import KafkaProducer
import json
import time
import uuid

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)

def create_order(user_id, items, total_amount):
    order_id = str(uuid.uuid4())
    order = {
        'order_id': order_id,
        'user_id': user_id,
        'items': items,
        'total_amount': total_amount,
        'status': 'created',
        'timestamp': int(time.time())
    }
    # Publish order created event
    producer.send('orders', value=order)
    return order_id
# payment-service.py
from kafka import KafkaConsumer, KafkaProducer
import json
import time

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='latest',
    group_id='payment-service',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)

def process_payment(order):
    # Placeholder for the real payment logic
    return True

for message in consumer:
    order = message.value
    if order['status'] == 'created':
        # Process payment
        payment_successful = process_payment(order)
        if payment_successful:
            # Publish payment success event
            payment_event = {
                'order_id': order['order_id'],
                'status': 'payment_successful',
                'timestamp': int(time.time())
            }
        else:
            # Publish payment failed event
            payment_event = {
                'order_id': order['order_id'],
                'status': 'payment_failed',
                'timestamp': int(time.time())
            }
        producer.send('payment-events', value=payment_event)
Best Practices
Producer Best Practices:
- Use appropriate acknowledgment levels (acks)
- Enable idempotent producers to prevent duplicates (see the sketch after this list)
- Configure appropriate batch size and linger time
- Implement proper error handling and retries
- Use compression for large messages or high throughput
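As an illustration of the points above, a producer hardened along these lines might be configured roughly like this with the confluent-kafka client (the values are illustrative, not recommendations):
from confluent_kafka import Producer

# Sketch of a producer tuned along the lines above; values are illustrative.
producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'enable.idempotence': True,   # broker de-duplicates retried sends
    'acks': 'all',                # wait for all in-sync replicas
    'compression.type': 'lz4',    # compress batches
    'linger.ms': 20,              # wait up to 20 ms to fill a batch
    'batch.size': 32768,          # target batch size in bytes
})
The kafka-python equivalents of most of these knobs (acks, retries, compression_type, linger_ms, batch_size) appear in the Advanced Configuration section above.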
Consumer Best Practices:
- Choose appropriate offset reset behavior
- Configure proper session timeouts and heartbeat intervals
- Handle consumer rebalancing gracefully (see the sketch after this list)
- Consider using manual offset commits for at-least-once processing
- Size consumer groups appropriately (usually equal to partition count)
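One way to handle rebalances gracefully is to commit progress when partitions are revoked. A hedged sketch using kafka-python's ConsumerRebalanceListener (topic and group names are placeholders):
from kafka import KafkaConsumer, ConsumerRebalanceListener

class CommitOnRebalance(ConsumerRebalanceListener):
    """Commit processed offsets before partitions are reassigned."""
    def __init__(self, consumer):
        self.consumer = consumer

    def on_partitions_revoked(self, revoked):
        print(f"Partitions revoked: {revoked}")
        self.consumer.commit()   # commit what has been processed so far

    def on_partitions_assigned(self, assigned):
        print(f"Partitions assigned: {assigned}")

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    enable_auto_commit=False
)
consumer.subscribe(['my-topic'], listener=CommitOnRebalance(consumer))

for message in consumer:
    print(message.value)
    consumer.commit()   # at-least-once: commit after processing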
Schema Management:
- Consider using a schema registry (like Confluent Schema Registry)
- Use Apache Avro, Protobuf, or JSON Schema for structured data (see the sketch after this list)
- Implement a schema evolution strategy
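As a lightweight illustration of the JSON Schema option (this is not the Confluent Schema Registry; it only validates messages locally using the jsonschema package, an extra dependency, and the schema itself is a made-up example):
from jsonschema import validate, ValidationError   # pip install jsonschema
from kafka import KafkaProducer
import json

# Hand-rolled JSON Schema check before producing. A schema registry would
# normally own and version this schema; here it is just a local constant.
ORDER_SCHEMA = {
    "type": "object",
    "properties": {
        "order_id": {"type": "string"},
        "total_amount": {"type": "number"},
    },
    "required": ["order_id", "total_amount"],
}

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)

event = {"order_id": "abc-123", "total_amount": 42.5}
try:
    validate(instance=event, schema=ORDER_SCHEMA)   # reject malformed events early
    producer.send('orders', value=event)
except ValidationError as e:
    print(f"Schema violation, message not sent: {e.message}")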
Performance Tuning:
- Monitor consumer lag (see the sketch after this list)
- Adjust partition count based on throughput needs
- Consider using the Confluent Python client for better performance
- Configure appropriate JVM settings for brokers
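Consumer lag can be measured from a client as the gap between each partition's end offset and the group's committed offset. A rough sketch with kafka-python (topic and group names are placeholders):
from kafka import KafkaConsumer, TopicPartition

# Sketch: compute lag for one consumer group on one topic.
consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    enable_auto_commit=False
)

partitions = [TopicPartition('my-topic', p)
              for p in consumer.partitions_for_topic('my-topic')]
consumer.assign(partitions)

end_offsets = consumer.end_offsets(partitions)   # latest offset per partition
for tp in partitions:
    committed = consumer.committed(tp) or 0      # group's last committed offset
    print(f"{tp.topic}[{tp.partition}] lag={end_offsets[tp] - committed}")
This mirrors the LAG column reported by kafka-consumer-groups.sh in the Debugging Tools section below.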
Security:
- Enable SSL/TLS encryption
- Use SASL authentication
- Implement proper ACLs
- Regularly rotate credentials
Troubleshooting
Common Issues and Solutions
Connection Refused:
- Verify broker addresses and ports
- Check network connectivity
- Ensure the Kafka cluster is running
Consumer Not Receiving Messages:
- Verify the topic exists and has messages
- Check the consumer group ID
- Verify the auto.offset.reset configuration
- Check consumer lag
Producer Failures:
- Verify broker connectivity
- Check that the topic exists and is writable
- Ensure proper serialization
High Latency:
- Increase producer batch size
- Adjust linger.ms
- Check network saturation
- Monitor broker resources
Debugging Tools
Consumer Group Information and Topic Lag Monitoring:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
The LAG column in the describe output shows how far each consumer is behind the latest offset.
Consumer Offset Inspection:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
Python Debugging:
import logging
logging.basicConfig(level=logging.DEBUG)  # Set to DEBUG for Kafka client logs
This documentation provides a comprehensive overview of Apache Kafka, its core concepts, common commands, and Python integration. For more detailed information, refer to the official Apache Kafka documentation or the specific Python client libraries' documentation.