Optimize Spark: Speed Up Your Code
Tune Spark performance with partitioning, caching, broadcast joins, predicate pushdown, and pool configuration for faster notebook execution.
Making Spark faster
Think of moving house with a team of helpers.
If one person carries all the heavy boxes while others carry nothing, the move takes forever (data skew). If helpers keep going back to the old house to look at the same boxes (no caching), they waste time. If you ship a grand piano across town when you could carry it next door (shuffle), you're doing unnecessary work.
Spark optimization is about distributing work evenly, caching things you reuse, and minimizing data movement across the cluster.
Partitioning
Data partitioning (storage-level)
Partition your Delta tables by columns used in WHERE filters:
df.write.format("delta") \
.partitionBy("year", "month") \
.mode("overwrite") \
.save("Tables/FactOrders")
Effect: Spark reads only the partitions needed. WHERE year = 2026 AND month = 4 reads one folder instead of scanning the entire table.
When to partition: Low-cardinality columns such as year/month, or columns always present in WHERE filters. Don't over-partition: 10,000 partitions with 1 MB each is worse than 100 partitions with 100 MB each (the small-file problem).
Shuffle partitioning (runtime-level)
# Default: 200 shuffle partitions (often too many for small datasets)
spark.conf.set("spark.sql.shuffle.partitions", 50)
Rule of thumb: Set shuffle partitions to 2-3x the number of executor cores.
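That rule of thumb can be sketched as a small helper (the function and the example pool size are illustrative assumptions, not Spark APIs):

```python
# Sketch: derive a shuffle-partition count of roughly 2-3x the total
# executor cores. The multiplier and the 32-core example are assumptions.

def recommended_shuffle_partitions(total_executor_cores: int, factor: int = 2) -> int:
    """Return a shuffle-partition count of about factor x total cores."""
    return max(1, total_executor_cores * factor)

# Example: a pool with 4 executors x 8 cores = 32 cores total
partitions = recommended_shuffle_partitions(32, factor=2)  # 64

# Apply it to the session (uncomment inside a running notebook):
# spark.conf.set("spark.sql.shuffle.partitions", partitions)
```

Recomputing this per workload beats leaving the 200-partition default in place for small datasets.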
Caching
Cache DataFrames you read multiple times:
# Cache in memory
df_customers = spark.table("lakehouse.DimCustomer").cache()
# Use it multiple times
joined_orders = df_orders.join(df_customers, "customer_id")
joined_returns = df_returns.join(df_customers, "customer_id")
# Uncache when done
df_customers.unpersist()
| Storage Level | Where | When to Use |
|---|---|---|
| MEMORY_ONLY | Executor RAM | Small DataFrames read multiple times |
| MEMORY_AND_DISK (the default for DataFrame .cache()) | RAM first, spill to disk | Medium DataFrames that might not fit entirely in memory |
| DISK_ONLY | Executor disk | Large DataFrames read multiple times, where re-reading source is expensive |
Join optimization
Broadcast joins
When one table is small (under ~10 MB), Spark can broadcast it to every executor, eliminating the expensive shuffle of the large table.
from pyspark.sql.functions import broadcast
# Force broadcast of the small dimension table
df_result = df_orders.join(
broadcast(df_products), # Send products to every executor
"product_id"
)
Effect: Instead of shuffling 500M order rows, Spark sends the 5,000-row product table to every executor. Each executor joins locally. Massive speed improvement.
Sort-merge joins (default for large tables)
When both tables are large, Spark sorts both by the join key, then merges. This requires a shuffle (expensive but necessary).
Optimization: Ensure both tables are partitioned or bucketed by the join key to reduce shuffle data volume.
| Join Type | When | Performance |
|---|---|---|
| Broadcast join | One table is small (<10 MB default threshold) | Fast (no shuffle of the large table) |
| Sort-merge join | Both tables are large | Moderate (requires shuffle and sort of both tables) |
| Shuffle hash join | One table is moderately sized | Between broadcast and sort-merge |
Exam tip: Broadcast join threshold
The default broadcast threshold is 10 MB (spark.sql.autoBroadcastJoinThreshold). Tables under this size are automatically broadcast.
Exam pattern: "A join is slow despite one table being small." Check whether the small table is just above the threshold. Fix: increase the threshold or explicitly use broadcast().
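A minimal sketch of that exam pattern, with a hypothetical 12 MB dimension table (the sizes are assumptions for illustration):

```python
# autoBroadcastJoinThreshold is expressed in bytes; the default is 10 MB.
DEFAULT_THRESHOLD_BYTES = 10 * 1024 * 1024

table_size_bytes = 12 * 1024 * 1024  # hypothetical table, just over the line

# False: the table misses the automatic broadcast and falls back to sort-merge
auto_broadcast = table_size_bytes <= DEFAULT_THRESHOLD_BYTES

# Fix 1: raise the threshold for the session (inside a notebook):
# spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 32 * 1024 * 1024)

# Fix 2: force the plan regardless of the threshold:
# from pyspark.sql.functions import broadcast
# df_result = df_orders.join(broadcast(df_products), "product_id")
```

Either fix turns the slow sort-merge join back into a broadcast join.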
Predicate pushdown
Spark pushes WHERE filters down to the file level, reading only relevant data.
from pyspark.sql.functions import col

# Good: filter early, so Spark reads fewer files
df = spark.table("lakehouse.FactOrders") \
    .filter(col("order_date") >= "2026-04-01") \
    .filter(col("status") == "completed") \
    .join(df_customers, "customer_id")

# Bad: join first, filter later; the entire table is read before filtering
df = spark.table("lakehouse.FactOrders") \
    .join(df_customers, "customer_id") \
    .filter(col("order_date") >= "2026-04-01")
Rule: Filter as early as possible in the transformation chain.
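The principle can be illustrated with a toy, pure-Python stand-in (the row counts are made up, and real Spark gains come from skipping files, not from Python loops):

```python
# 10,000 toy orders, half "completed"; the join key cycles over 100 customers.
orders = [{"customer_id": i % 100, "status": "completed" if i % 2 else "open"}
          for i in range(10_000)]

# Filter early: only completed orders reach the join step.
rows_into_join_early = sum(o["status"] == "completed" for o in orders)  # 5,000

# Filter late: every order participates in the join before filtering.
rows_into_join_late = len(orders)                                       # 10,000
```

Halving the rows entering the join here mirrors how an early filter lets Spark skip whole files and partitions before the expensive shuffle.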
Pool configuration
| Setting | Effect | Recommendation |
|---|---|---|
| Node count | More nodes = more parallelism | Scale up for large jobs, scale down for cost |
| Node size | More memory/cores per node | Increase for memory-heavy operations (large broadcasts, wide joins) |
| Auto-scale | Adds/removes nodes based on demand | Enable for variable workloads |
| Timeout | How long idle sessions stay alive | 10-30 min for interactive; shorter for automated runs |
Carlos's notebook joins a 500M-row FactProduction table with a 3,000-row DimProduct table. The join takes 8 minutes. What optimization would have the most impact?
A Spark notebook filters FactOrders (1B rows) after joining with DimCustomer. Moving the filter BEFORE the join reduces execution time by 70%. What optimization principle is this?
Next up: Optimize Pipelines & Warehouses: tune pipeline performance and warehouse query execution.