PySpark Transformations: Code Your Pipeline
Write PySpark to transform data at scale: joins, aggregations, window functions, denormalization, and group-by patterns, with line-by-line explanations.
PySpark in Fabric
Think of PySpark as Python with superpowers.
Regular Python processes data on one computer. PySpark distributes the work across many computers in parallel. A transformation that takes 2 hours in Python might take 3 minutes in PySpark, because 40 machines share the load.
In Fabric, you write PySpark in notebooks. The notebook connects to a Spark cluster automatically. You read from lakehouses, transform the data, and write back to lakehouses, all using DataFrame operations that look a lot like pandas.
Reading and writing data
Read from a lakehouse
# Read a Delta table
df_orders = spark.read.format("delta").load("Tables/FactOrders")
# Or use the table name directly
df_orders = spark.table("lakehouse.FactOrders")
# Read raw files
df_csv = spark.read.format("csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.load("Files/raw/orders/*.csv")
Write to a lakehouse
# Write as Delta table (overwrite)
df_result.write.format("delta") \
.mode("overwrite") \
.save("Tables/FactOrdersCleaned")
# Write as Delta table (append)
df_new_rows.write.format("delta") \
.mode("append") \
.save("Tables/FactOrders")
# Save as managed table
df_result.write.saveAsTable("lakehouse.FactOrdersCleaned")
Common transformations
Filtering rows
from pyspark.sql.functions import col, year

# Keep only completed orders from 2026
df_filtered = df_orders.filter(
(col("status") == "completed") & # What's happening: keep only completed
(year(col("order_date")) == 2026) # AND only from 2026
)
Selecting and renaming columns
from pyspark.sql.functions import col, upper

df_clean = df_orders.select(
col("order_id"), # Keep as-is
col("customer_name").alias("customer"), # Rename
col("total_amount").cast("decimal(10,2)"), # Cast type
upper(col("country")).alias("country_code") # Transform + rename
)
Joins (denormalization)
Denormalization in PySpark is a join: combine normalised source tables into wider analytics tables.
# Join orders with customers and products
df_denormalized = df_orders \
.join(df_customers, df_orders.customer_id == df_customers.customer_id, "left") \
.join(df_products, df_orders.product_id == df_products.product_id, "left") \
.select(
df_orders.order_id,
df_orders.order_date,
df_customers.customer_name, # From customers table
df_customers.city, # Flattened into one row
df_products.product_name, # From products table
df_products.category, # Flattened into one row
df_orders.quantity,
df_orders.revenue
)
What's happening: Three normalised tables (orders, customers, products) become one wide fact table with customer and product attributes included. This is denormalization.
Scenario: Carlos denormalizes production data
Carlos needs to create a FactProduction table for Precision Manufacturing's Power BI reports. The source data is normalised across 5 SAP tables: ProductionBatches, Machines, Products, Factories, Shifts.
df_fact = df_batches \
.join(df_machines, "machine_id", "left") \
.join(df_products, "product_id", "left") \
.join(df_factories, "factory_id", "left") \
.join(df_shifts, "shift_id", "left") \
.select(
"batch_id", "production_date",
"factory_name", "factory_region",
"machine_type", "product_name", "product_category",
"shift_name", "units_produced", "units_defective"
    )

One wide table. No joins needed at query time. Power BI reports load instantly.
Grouping and aggregation
from pyspark.sql.functions import sum, avg, count, max, year, month
# Revenue by product category per month
df_summary = df_orders \
.groupBy(
year("order_date").alias("year"), # Group by year
month("order_date").alias("month"), # and month
"product_category" # and category
) \
.agg(
sum("revenue").alias("total_revenue"), # Sum revenue
count("order_id").alias("order_count"), # Count orders
avg("revenue").alias("avg_order_value") # Average order value
) \
.orderBy("year", "month", "product_category") # Sort results
Window functions
Window functions calculate values across a set of rows related to the current row, without collapsing them (unlike groupBy).
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, lag, sum as spark_sum
# Running total of revenue per customer, ordered by date
window_spec = Window \
.partitionBy("customer_id") \
.orderBy("order_date") \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_with_running_total = df_orders.withColumn(
"running_revenue",
spark_sum("revenue").over(window_spec) # Running sum within each customer
)
# Previous order date (for calculating days between orders)
df_with_prev = df_orders.withColumn(
"prev_order_date",
lag("order_date", 1).over(
Window.partitionBy("customer_id").orderBy("order_date")
)
)
What's happening: Window functions explained
Window functions let you compute values across rows without losing detail:
- partitionBy("customer_id"): calculate separately for each customer (like groupBy, but keeps all rows)
- orderBy("order_date"): within each partition, rows are ordered by date
- rowsBetween(unboundedPreceding, currentRow): the window frame, from the first row in the partition up to the current row
- sum("revenue").over(window_spec): running total, adding up revenue from the first order to the current one
Result: every row keeps its individual data AND gets a running total column. GroupBy would collapse rows into summaries.
Handling nulls
# Drop rows where critical columns are null
df_clean = df_orders.dropna(subset=["customer_id", "order_date"])
# Fill nulls with defaults
df_filled = df_orders.fillna({
"discount": 0.0, # Missing discount = no discount
"shipping_method": "standard" # Default shipping
})
from pyspark.sql.functions import when, col

# Replace specific values
df_fixed = df_orders.withColumn(
"status",
when(col("status").isNull(), "unknown") # Null β "unknown"
.otherwise(col("status")) # Keep existing
)
Writing patterns comparison
| Write Mode | Behaviour | Use Case |
|---|---|---|
| overwrite | Replace entire table/partition | Full refresh of small tables, rebuilding fact tables |
| append | Add new rows to existing table | Adding new daily data, streaming micro-batches |
| MERGE INTO | Update existing + insert new (upsert) | Incremental loads, SCD Type 1, deduplication |
| Dynamic partition overwrite (overwrite with partitionOverwriteMode set to dynamic) | Replace only the partitions present in the write data | Refreshing specific date partitions without touching others |
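The MERGE INTO row above is Delta's upsert statement. A hedged sketch of what it looks like; the table and column names (FactOrders, StagedOrders) are illustrative, and against a Fabric lakehouse you would run the string with spark.sql(merge_sql):

```python
# Upsert pattern: update rows that already exist, insert the rest.
# Requires both names to resolve to Delta tables in the lakehouse;
# execute with spark.sql(merge_sql).
merge_sql = """
MERGE INTO FactOrders AS target
USING StagedOrders AS source
    ON target.order_id = source.order_id
WHEN MATCHED THEN
    UPDATE SET target.status = source.status,
               target.revenue = source.revenue
WHEN NOT MATCHED THEN
    INSERT (order_id, order_date, status, revenue)
    VALUES (source.order_id, source.order_date, source.status, source.revenue)
"""
```

Because the ON condition keys on order_id, running the same staged batch twice is safe: the second run matches every row and simply re-applies the updates, which is what makes MERGE the usual choice for incremental loads.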
Carlos needs to calculate a 7-day rolling average of defect rates per factory, keeping every individual row in the output. Which PySpark approach should he use?
A PySpark notebook reads data from three normalised source tables (Orders, Customers, Products), joins them, and writes a single wide table to the lakehouse. What data engineering technique is this?
🎬 Video coming soon
Next up: Transform Data with SQL & KQL, using T-SQL and Kusto Query Language for transformations in warehouses and Eventhouses.