Ingesting Data: SQL Methods & CDC
CTAS, CREATE OR REPLACE TABLE, COPY INTO, and change data capture feeds: the SQL-based ingestion methods the exam tests explicitly.
SQL-based ingestion methods
SQL gives you three ways to move data into a table, each with different superpowers.
- CTAS (CREATE TABLE AS SELECT): creates a brand-new table from a query. Like photocopying a document into a new folder.
- CREATE OR REPLACE TABLE: creates the table, or completely overwrites it if it exists. Like printing a fresh copy every time.
- COPY INTO: appends data from files into an existing table, skipping files already loaded. Like a mailroom that stamps each letter "processed."
CTAS (CREATE TABLE AS SELECT)
-- Create a new table from a query
CREATE TABLE silver.clean_orders AS
SELECT order_id, customer_id, amount, order_date
FROM bronze.raw_orders
WHERE amount > 0 AND order_date IS NOT NULL;
Key facts:
- Creates a new Delta table
- Fails if the table already exists (use IF NOT EXISTS or CREATE OR REPLACE)
- Schema is inferred from the SELECT query
- Data is copied: the new table is independent of the source
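The CTAS semantics above can be sketched with SQLite, which also supports CREATE TABLE ... AS SELECT (Delta-specific behavior differs, but schema inference and data independence work the same way). The table and column names here are illustrative:

```python
import sqlite3

# In-memory database standing in for a lakehouse source
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE raw_orders (order_id INT, amount REAL, order_date TEXT)")
conn.executemany(
    "INSERT INTO raw_orders VALUES (?, ?, ?)",
    [(1, 50.0, "2026-04-01"), (2, -5.0, "2026-04-02"), (3, 20.0, None)],
)

# CTAS: schema is inferred from the SELECT, and data is copied
conn.execute("""
    CREATE TABLE clean_orders AS
    SELECT order_id, amount, order_date
    FROM raw_orders
    WHERE amount > 0 AND order_date IS NOT NULL
""")

# The new table is independent: clearing the source doesn't touch it
conn.execute("DELETE FROM raw_orders")
rows = conn.execute("SELECT order_id FROM clean_orders").fetchall()
print(rows)  # [(1,)] -- only order 1 passed both filters
```

Running the CTAS statement a second time raises an error because `clean_orders` already exists, which is exactly the non-idempotent behavior the exam asks about.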
CREATE OR REPLACE TABLE
-- Full refresh: replace the table contents entirely
CREATE OR REPLACE TABLE gold.daily_summary AS
SELECT
order_date,
COUNT(*) as order_count,
SUM(amount) as total_revenue
FROM silver.clean_orders
GROUP BY order_date;
Key facts:
- Idempotent: running it twice produces the same result
- Replaces both schema AND data
- Preserves table history (you can time travel to previous versions)
- Perfect for full-refresh aggregation tables
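To see why this pattern is idempotent, here is a sketch using SQLite. SQLite has no CREATE OR REPLACE TABLE, so the example emulates it with DROP + CTAS; Delta additionally preserves table history for time travel, which this emulation does not:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE clean_orders (order_date TEXT, amount REAL)")
conn.executemany(
    "INSERT INTO clean_orders VALUES (?, ?)",
    [("2026-04-01", 10.0), ("2026-04-01", 15.0), ("2026-04-02", 7.0)],
)

def full_refresh(conn):
    # Emulates CREATE OR REPLACE TABLE: drop, then rebuild from the query
    conn.execute("DROP TABLE IF EXISTS daily_summary")
    conn.execute("""
        CREATE TABLE daily_summary AS
        SELECT order_date, COUNT(*) AS order_count, SUM(amount) AS total_revenue
        FROM clean_orders
        GROUP BY order_date
    """)

# Idempotent: running the refresh twice yields the same table
full_refresh(conn)
full_refresh(conn)
rows = conn.execute("SELECT * FROM daily_summary ORDER BY order_date").fetchall()
print(rows)  # [('2026-04-01', 2, 25.0), ('2026-04-02', 1, 7.0)]
```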
COPY INTO
-- Incrementally load new CSV files into an existing table
COPY INTO bronze.raw_sales
FROM 'abfss://landing@storage.dfs.core.windows.net/sales/'
FILEFORMAT = CSV
FORMAT_OPTIONS ('header' = 'true', 'inferSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
Key facts:
- Only processes files not already loaded (tracks processed files internally)
- Idempotent: safe to run multiple times without duplicates
- Appends to an existing table (doesn't overwrite)
- Supports CSV, JSON, Parquet, Avro, ORC, and text files
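The file-tracking behavior that makes COPY INTO idempotent can be sketched in plain Python. The real command keeps this state in the target Delta table's metadata; the set below is only an analogy:

```python
# State COPY INTO maintains per target table: which files were loaded
loaded_files: set[str] = set()
table: list[dict] = []

def copy_into(files: dict[str, list[dict]]) -> int:
    """Append rows from files not yet loaded; return count of rows appended."""
    appended = 0
    for name, rows in files.items():
        if name in loaded_files:   # skip files already processed
            continue
        table.extend(rows)
        loaded_files.add(name)
        appended += len(rows)
    return appended

landing = {
    "sales_0401.csv": [{"id": 1}, {"id": 2}],
    "sales_0402.csv": [{"id": 3}],
}
assert copy_into(landing) == 3   # first run loads everything
assert copy_into(landing) == 0   # rerun appends nothing: idempotent

landing["sales_0403.csv"] = [{"id": 4}]
assert copy_into(landing) == 1   # only the new file is loaded
```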
| Feature | CTAS | CREATE OR REPLACE | COPY INTO |
|---|---|---|---|
| Creates table? | Yes (new) | Yes (replaces) | No (existing table) |
| Operation | One-time creation | Full refresh | Incremental append |
| Idempotent? | No (fails if exists) | Yes | Yes |
| Tracks loaded files? | No | No | Yes |
| Best for | Initial table creation | Full-refresh aggregates | Incremental file loading |
COPY INTO vs Auto Loader
Both handle incremental file loading, but differently:
| Feature | COPY INTO | Auto Loader |
|---|---|---|
| File tracking | Internal state per table | Checkpoint directory |
| Streaming | No (batch only) | Yes (continuous or triggered) |
| File discovery | Lists directory each run | Event-based notifications |
| Scale | Good for thousands of files | Better for millions of files |
| Schema evolution | mergeSchema option | Built-in schema inference |
Exam pattern: small-to-medium file volumes → COPY INTO; large-scale, continuous file ingestion → Auto Loader.
Change Data Capture (CDC)
CDC captures row-level changes (inserts, updates, and deletes) from a source system. In Delta Lake, this is exposed through the change data feed.
Enabling the Delta change data feed
-- Enable change data feed on a table
ALTER TABLE silver.customers
SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');
-- Read changes since version 5
SELECT * FROM table_changes('silver.customers', 5);
-- Read changes between timestamps
SELECT * FROM table_changes('silver.customers', '2026-04-01', '2026-04-21');
The change data feed adds metadata columns to each change row:
| Column | Values | Meaning |
|---|---|---|
| _change_type | insert, update_preimage, update_postimage, delete | What happened |
| _commit_version | Version number | Which transaction |
| _commit_timestamp | Timestamp | When it happened |
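A quick sketch of what these rows look like and how a consumer typically filters them. The column names match the real feed; the rows themselves are invented:

```python
# Invented change feed rows: one insert, one update (pre + post image),
# and one delete, across three commit versions
changes = [
    {"id": 1, "city": "Oslo",  "_change_type": "insert",           "_commit_version": 6},
    {"id": 2, "city": "Lima",  "_change_type": "update_preimage",  "_commit_version": 7},
    {"id": 2, "city": "Quito", "_change_type": "update_postimage", "_commit_version": 7},
    {"id": 3, "city": "Pune",  "_change_type": "delete",           "_commit_version": 8},
]

# Downstream consumers usually drop preimages, keeping only the final
# state of each update plus the inserts and deletes
actionable = [r for r in changes if r["_change_type"] != "update_preimage"]
print([r["_change_type"] for r in actionable])
# ['insert', 'update_postimage', 'delete']
```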
Processing CDC feeds
Tomás processes NovaPay's transaction CDC feed to keep the fraud detection table in sync:
# Read CDC changes as a stream
changes = (spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)
    .table("bronze.transactions"))

# Apply each micro-batch of changes to the target table
def apply_changes(batch_df, batch_id):
    batch_df.createOrReplaceTempView("changes")
    batch_df.sparkSession.sql("""
        MERGE INTO silver.fraud_transactions AS target
        USING changes AS source
        ON target.txn_id = source.txn_id
        WHEN MATCHED AND source._change_type = 'update_postimage'
            THEN UPDATE SET *
        WHEN MATCHED AND source._change_type = 'delete'
            THEN DELETE
        WHEN NOT MATCHED AND source._change_type = 'insert'
            THEN INSERT *
    """)

changes.writeStream.foreachBatch(apply_changes).start()
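The MERGE logic above can be mirrored in plain Python to make the per-row decisions visible. This is illustrative only (the real work happens inside Delta's MERGE), and the batch rows are invented:

```python
def apply_cdc_batch(target: dict, batch: list[dict]) -> None:
    """Apply one CDC micro-batch to a target keyed by txn_id."""
    for row in batch:
        change, key = row["_change_type"], row["txn_id"]
        data = {k: v for k, v in row.items() if not k.startswith("_")}
        if change == "insert" and key not in target:
            target[key] = data                    # WHEN NOT MATCHED ... INSERT
        elif change == "update_postimage" and key in target:
            target[key] = data                    # WHEN MATCHED ... UPDATE
        elif change == "delete":
            target.pop(key, None)                 # WHEN MATCHED ... DELETE
        # update_preimage rows carry the old values and are ignored

target = {}
batch = [
    {"txn_id": "t1", "amount": 100, "_change_type": "insert"},
    {"txn_id": "t1", "amount": 100, "_change_type": "update_preimage"},
    {"txn_id": "t1", "amount": 250, "_change_type": "update_postimage"},
]
apply_cdc_batch(target, batch)
print(target)  # {'t1': {'txn_id': 't1', 'amount': 250}}
```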
Knowledge check
Ravi rebuilds DataPulse's daily revenue summary every morning from scratch. If he runs the job twice by accident, the result should be identical. Which SQL method should he use?
Freshmart suppliers upload new CSV files to ADLS daily. Mei Lin needs to load these files into a bronze table. Files should only be loaded once, even if the job runs multiple times. Which method should she use?
Next up: Streaming Ingestion: Structured Streaming & Event Hubs, covering real-time data processing with Spark and Azure Event Hubs.