Prerequisites
Technical Requirements
- Python 3.7+ installed (PySpark 3.3 and later no longer support Python 3.6)
- Java 8+ installed (required for Spark)
- Apache Spark installation (for PySpark)
- Hadoop ecosystem and Hive setup (for PyHive)
- pip (Python package manager)
Basic Knowledge
- Familiarity with Python programming
- Understanding of distributed computing concepts
- Basic SQL knowledge (especially for Hive interactions)
Installation Guide
Installing PySpark
# Install PySpark using pip
pip install pyspark
# To install a specific version
pip install pyspark==3.3.0
Environment variables you might need to set:
export JAVA_HOME=/path/to/java
export SPARK_HOME=/path/to/spark
export PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH
export PYSPARK_PYTHON=python3
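A quick way to verify that PySpark and Java are wired up correctly is to start a throwaway local session (a minimal sanity check; the printed version depends on what you installed):
# Verify the installation by starting a local Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("install-check").getOrCreate()
print(spark.version)  # e.g. 3.3.0
spark.stop()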
Installing PyHive
# Install PyHive and its dependencies
pip install pyhive
pip install thrift
pip install sasl # May need system SASL headers (e.g., libsasl2-dev on Debian/Ubuntu)
pip install thrift-sasl
# For optional dependencies
pip install pandas # For DataFrame support
pip install sqlalchemy # For SQLAlchemy integration
PySpark
PySpark is the Python API for Apache Spark, a unified analytics engine for large-scale data processing.
SparkContext and SparkSession
SparkContext
The entry point to any Spark functionality, representing the connection to a Spark cluster.
from pyspark import SparkContext
# Create a SparkContext
sc = SparkContext("local", "My App")
# Use the SparkContext
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
sum_result = distData.sum() # 15
# Stop the SparkContext when finished
sc.stop()
SparkSession
The newer entry point for DataFrame and SQL functionality in Spark 2.0+.
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
    .appName("My Spark Application") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
# Use the SparkSession
df = spark.createDataFrame([(1, "John"), (2, "Jane")], ["id", "name"])
df.show()
# Stop the SparkSession when finished
spark.stop()
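The two entry points are related: a SparkSession wraps a SparkContext, so in Spark 2.0+ you normally create a SparkSession and reach the underlying context through its sparkContext attribute when you need RDD operations. A minimal sketch:
from pyspark.sql import SparkSession
# Create a SparkSession and reuse its underlying SparkContext
spark = SparkSession.builder.appName("Context from Session").getOrCreate()
sc = spark.sparkContext  # no separate SparkContext construction needed
print(sc.parallelize([1, 2, 3, 4, 5]).sum())  # 15
spark.stop()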
RDD vs DataFrame API
RDD (Resilient Distributed Dataset)
- Low-level, fundamental data structure in Spark
- Immutable, distributed collection of objects
- Offers fine-grained control but requires more manual optimization
# RDD example
rdd = sc.parallelize([1, 2, 3, 4, 5])
squared_rdd = rdd.map(lambda x: x * x)
print(squared_rdd.collect()) # [1, 4, 9, 16, 25]
DataFrame API
- Higher-level abstraction built on top of RDDs
- Organized into named columns (like a table)
- Optimized execution through Catalyst optimizer
- Better interoperability with external data sources
# DataFrame example
from pyspark.sql import Row
# Create a DataFrame from a list of rows
data = [Row(id=1, name="Alice", age=25),
        Row(id=2, name="Bob", age=30),
        Row(id=3, name="Charlie", age=35)]
df = spark.createDataFrame(data)
# Show the DataFrame
df.show()
# Filter and select operations
df.filter(df.age > 30).select("id", "name").show()
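As noted in the list above, DataFrames also interoperate with external data sources. A minimal sketch of reading a CSV file, assuming a local file named sales.csv with a header row:
# Read an external CSV file into a DataFrame (sales.csv is a placeholder path)
csv_df = spark.read.csv("sales.csv", header=True, inferSchema=True)
csv_df.printSchema()
csv_df.show(5)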
Transformations vs Actions
Transformations
- Lazy operations that define a new dataset from an existing one
- Don’t execute immediately; only create a lineage of operations
- Examples: map, filter, groupBy, join
# Transformation examples
rdd = sc.parallelize([1, 2, 3, 4, 5])
filtered_rdd = rdd.filter(lambda x: x % 2 == 0) # Doesn't execute yet
mapped_rdd = filtered_rdd.map(lambda x: x * 10) # Still no execution
Actions
- Operations that trigger computation and return results
- Force evaluation of the transformation lineage
- Examples: collect, count, first, take, reduce
# Action examples (these trigger actual computation)
result = mapped_rdd.collect() # [20, 40]
count = mapped_rdd.count() # 2
first_element = mapped_rdd.first() # 20
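Because the full transformation lineage is re-evaluated every time an action runs, an RDD that feeds several actions is commonly cached. A small sketch using cache():
# Cache an RDD so repeated actions don't recompute the lineage
rdd = sc.parallelize(range(1, 1001))
expensive_rdd = rdd.map(lambda x: x * x).filter(lambda x: x % 3 == 0)
expensive_rdd.cache()          # mark for in-memory storage
print(expensive_rdd.count())   # first action computes and caches the data (333)
print(expensive_rdd.take(5))   # later actions reuse the cached partitions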
Example Transformations
map
Applies a function to each element in the dataset.
# Double each number
rdd = sc.parallelize([1, 2, 3, 4, 5])
doubled_rdd = rdd.map(lambda x: x * 2)
print(doubled_rdd.collect()) # [2, 4, 6, 8, 10]
filter
Returns a new dataset with elements that pass a predicate function.
# Keep only even numbers
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8])
even_rdd = rdd.filter(lambda x: x % 2 == 0)
print(even_rdd.collect()) # [2, 4, 6, 8]
flatMap
Similar to map, but each input item can be mapped to 0 or more output items.
# Split sentences into words
rdd = sc.parallelize(["Hello world", "How are you"])
words_rdd = rdd.flatMap(lambda x: x.split(" "))
print(words_rdd.collect()) # ["Hello", "world", "How", "are", "you"]
reduceByKey
Combines values with the same key using a specified function.
# Count word occurrences
words = ["apple", "banana", "apple", "orange", "banana", "apple"]
pairs_rdd = sc.parallelize(words).map(lambda word: (word, 1))
counts_rdd = pairs_rdd.reduceByKey(lambda a, b: a + b)
print(counts_rdd.collect()) # [('apple', 3), ('banana', 2), ('orange', 1)] (order may vary)
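groupBy and join, listed earlier as transformations, follow the same lazy pattern. A minimal join sketch on two (key, value) RDDs; only keys present on both sides appear in the result:
# Join two (key, value) RDDs on their keys
users_rdd = sc.parallelize([(1, "Alice"), (2, "Bob"), (3, "Charlie")])
orders_rdd = sc.parallelize([(1, "Laptop"), (3, "Monitor"), (3, "Mouse")])
joined_rdd = users_rdd.join(orders_rdd)
print(joined_rdd.collect())  # [(1, ('Alice', 'Laptop')), (3, ('Charlie', 'Monitor')), (3, ('Charlie', 'Mouse'))] (order may vary)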
UDFs in PySpark
User-Defined Functions (UDFs) allow you to apply custom Python functions to DataFrame columns.
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
# Define a UDF
def square(x):
    return x * x
# Register the UDF with return type
square_udf = udf(square, IntegerType())
# Create a DataFrame
df = spark.createDataFrame([(1,), (2,), (3,), (4,)], ["value"])
# Apply the UDF
result_df = df.withColumn("squared", square_udf(df["value"]))
result_df.show()
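Plain Python UDFs move data row by row between the JVM and the Python interpreter, so on larger DataFrames a vectorized pandas_udf is usually faster. A minimal sketch of the same squaring logic, assuming Spark 3.x with pyarrow installed:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import LongType
# Vectorized UDF: processes whole batches of values via Apache Arrow
@pandas_udf(LongType())
def square_pandas(values: pd.Series) -> pd.Series:
    return values * values
df.withColumn("squared_vec", square_pandas(df["value"])).show()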
Complete PySpark Example
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, sum as spark_sum
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Sales Analysis Example") \
    .getOrCreate()
# Create sample data
data = [
    ("2023-01-01", "Product A", 100, 10),
    ("2023-01-02", "Product B", 200, 5),
    ("2023-01-03", "Product A", 120, 12),
    ("2023-01-04", "Product C", 300, 8),
    ("2023-01-05", "Product B", 150, 6)
]
# Define schema
columns = ["date", "product", "amount", "quantity"]
# Create DataFrame
df = spark.createDataFrame(data, columns)
# Show the DataFrame
print("Original DataFrame:")
df.show()
# Perform transformations
result = df.groupBy("product") \
    .agg(count("*").alias("sales_count"),
         avg("amount").alias("avg_amount"),
         spark_sum("amount").alias("total_amount")) \
    .filter(col("avg_amount") > 100) \
    .orderBy(col("total_amount").desc())
# Show the result
print("Analysis Result:")
result.show()
# Stop the SparkSession
spark.stop()
PyHive
PyHive is a collection of Python DB-API and SQLAlchemy interfaces for Hive and Presto.
Connecting to Hive using PyHive
Basic connection to Hive:
from pyhive import hive
# Connect to Hive server
conn = hive.Connection(
    host='hive-server-hostname',
    port=10000,             # default HiveServer2 port
    username='username',
    password='password',    # only allowed with auth='LDAP' or 'CUSTOM'; omit for unsecured servers
    database='default'      # optional, default database
)
# Create a cursor
cursor = conn.cursor()
Query Execution and Results Fetching
Basic Query Execution
# Execute a query
cursor.execute('SHOW TABLES')
# Fetch all results
tables = cursor.fetchall()
print("Tables:", tables)
# Execute a more complex query
cursor.execute('''
SELECT
product_id,
product_name,
SUM(quantity) as total_quantity,
AVG(price) as avg_price
FROM sales
GROUP BY product_id, product_name
HAVING SUM(quantity) > 100
ORDER BY total_quantity DESC
LIMIT 10
''')
# Process results
for row in cursor.fetchall():
    print(row)
# Close the connection
conn.close()
Parameterized Queries
# Execute a parameterized query
min_price = 50
cursor.execute('SELECT * FROM products WHERE price > %s', (min_price,))
# Fetch results
products = cursor.fetchall()
print(f"Products more expensive than ${min_price}:", products)
Working with Pandas
import pandas as pd
# Execute a query
cursor.execute('SELECT * FROM sales WHERE date > "2023-01-01"')
# Convert results to a pandas DataFrame
df = pd.DataFrame(cursor.fetchall(), columns=[desc[0] for desc in cursor.description])
# Now you can use pandas functionality
print("Summary statistics:")
print(df.describe())
# Export to CSV
df.to_csv('sales_data.csv', index=False)
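If you installed the optional sqlalchemy dependency, pandas can also read from Hive through PyHive's SQLAlchemy dialect instead of a raw cursor. A minimal sketch, assuming the same HiveServer2 host used above:
import pandas as pd
from sqlalchemy import create_engine
# PyHive registers the "hive" dialect with SQLAlchemy
engine = create_engine('hive://username@hive-server-hostname:10000/default')
df = pd.read_sql('SELECT * FROM sales LIMIT 100', engine)
print(df.head())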
Complete PyHive Example
from pyhive import hive
import pandas as pd
import matplotlib.pyplot as plt
# Connect to Hive
conn = hive.Connection(
    host='hive-server',
    port=10000,
    username='hive_user'
)
try:
    cursor = conn.cursor()
    # Create a table if it doesn't exist
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS sales_data (
            sale_date STRING,
            product_id INT,
            product_name STRING,
            category STRING,
            quantity INT,
            unit_price FLOAT,
            total_amount FLOAT
        )
        ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    ''')
    # Insert some sample data
    cursor.execute('''
        INSERT INTO sales_data VALUES
        ('2023-01-01', 1, 'Laptop', 'Electronics', 5, 899.99, 4499.95),
        ('2023-01-02', 2, 'Smartphone', 'Electronics', 10, 499.99, 4999.90),
        ('2023-01-03', 3, 'Headphones', 'Accessories', 20, 59.99, 1199.80),
        ('2023-01-04', 1, 'Laptop', 'Electronics', 3, 899.99, 2699.97),
        ('2023-01-05', 4, 'Monitor', 'Electronics', 8, 249.99, 1999.92)
    ''')
    # Query for analysis
    cursor.execute('''
        SELECT
            category,
            SUM(total_amount) as revenue,
            SUM(quantity) as units_sold
        FROM sales_data
        GROUP BY category
    ''')
    # Convert to DataFrame
    result = pd.DataFrame(cursor.fetchall(), columns=['Category', 'Revenue', 'Units Sold'])
    print("Sales Analysis by Category:")
    print(result)
    # Further analysis with pandas
    total_revenue = result['Revenue'].sum()
    print(f"\nTotal Revenue: ${total_revenue:.2f}")
    # Calculate percentage
    result['Revenue_Percent'] = (result['Revenue'] / total_revenue) * 100
    print("\nRevenue Breakdown:")
    for _, row in result.iterrows():
        print(f"{row['Category']}: ${row['Revenue']:.2f} ({row['Revenue_Percent']:.1f}%)")
except Exception as e:
    print(f"Error: {e}")
finally:
    # Close the connection
    conn.close()
Integration of PySpark with PyHive
You can integrate PySpark with PyHive to leverage the strengths of both:
from pyspark.sql import SparkSession
from pyhive import hive
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Spark-Hive Integration") \
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
    .enableHiveSupport() \
    .getOrCreate()
# Read data from Hive into a Spark DataFrame
df = spark.sql("SELECT * FROM hive_database.sales_table")
# Process the data with Spark
result_df = df.filter(df.amount > 1000) \
    .groupBy("product_category") \
    .agg({"amount": "sum", "quantity": "avg"})
# Show the results
result_df.show()
# Write the results back to Hive
result_df.write.mode("overwrite").saveAsTable("hive_database.sales_summary")
# Alternative approach: Use PyHive to query and load the data
conn = hive.Connection(host='hive-server', port=10000, username='user')
cursor = conn.cursor()
cursor.execute("SELECT * FROM sales_table WHERE date > '2023-01-01'")
data = cursor.fetchall()
# Create a Spark DataFrame from the PyHive result
columns = [desc[0] for desc in cursor.description]
spark_df = spark.createDataFrame(data, columns)
# Process with Spark
spark_df.show()
# Clean up
conn.close()
spark.stop()
This documentation provides a comprehensive overview of PySpark and PyHive, including prerequisites, installation guides, key concepts, and practical examples with code snippets. Both tools are powerful for big data processing and analytics, with PySpark focusing on distributed data processing and PyHive enabling Python-based access to Hive databases.