Data Movement: ADF, Kafka, and Spark Connectors
Choose and implement the right data movement strategy for Cosmos DB: Azure Data Factory copy activities, Kafka connectors, Spark connectors, bulk executor, and live migration patterns.
Moving data in and out of Cosmos DB
Think of Cosmos DB as a warehouse. Sometimes you need to bring stock in (migrate from an old database), ship stock out (feed analytics), or continuously transfer between warehouses (streaming). Different tools are like different trucks: you pick the one that fits the load.
Azure Data Factory is the general-purpose truck. Kafka is the conveyor belt for real-time streams. Spark is the heavy-lifter for big-data processing.
Amara's data integration challenge
💡 Amara at SensorFlow has three data movement needs:
- Migrate 2TB of historical sensor data from MongoDB to Cosmos DB
- Stream real-time sensor events from Kafka into Cosmos DB
- Export daily aggregates to a data lake for Tomás's ML models
Each need calls for a different tool.
Choosing the right tool
| Tool | Pattern | Throughput | Coding Required | Best For |
|---|---|---|---|---|
| Azure Data Factory | Batch copy, scheduled pipelines | High (parallel copy) | No-code/low-code | ETL/ELT, scheduled migrations, cross-service copy |
| Kafka Connector | Real-time streaming (source + sink) | Very high (continuous) | Configuration + some code | Event streaming, CDC, real-time integration |
| Spark Connector | Batch + streaming read/write | Very high (distributed) | Spark code (Scala/Python) | Big-data processing, ML pipelines, complex transforms |
| Bulk Executor | High-speed batch import | Maximum (SDK-level) | C#/Java code | Initial data load, one-time migration |
| Data Migration Tool | One-time import | Moderate | No-code (GUI/CLI) | Simple one-time migrations from JSON, CSV, MongoDB, SQL |
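The comparison table can be boiled down to a simple decision rule. The helper below is purely illustrative (the function name and pattern labels are ours, not any official API), but it captures the table's logic:

```python
def pick_tool(pattern: str, coding_ok: bool = True) -> str:
    """Illustrative tool picker based on the comparison table above."""
    if pattern == "one-time-migration":
        # Bulk executor maximises throughput but requires C#/Java code;
        # the Data Migration Tool is the no-code fallback.
        return "Bulk Executor" if coding_ok else "Data Migration Tool"
    if pattern == "scheduled-batch":
        return "Azure Data Factory"   # no-code/low-code ETL/ELT pipelines
    if pattern == "streaming":
        return "Kafka Connector"      # continuous source/sink integration
    if pattern == "big-data-transform":
        return "Spark Connector"      # distributed processing, ML pipelines
    raise ValueError(f"unknown pattern: {pattern!r}")
```

For Amara's three needs: `pick_tool("one-time-migration", coding_ok=False)` suggests the Data Migration Tool (or ADF for 2TB at scale), `pick_tool("streaming")` the Kafka connector, and `pick_tool("big-data-transform")` the Spark connector.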
Azure Data Factory copy activity
ADF is the go-to for scheduled, no-code data movement:
```json
{
  "type": "Copy",
  "source": {
    "type": "MongoDbV2Source",
    "query": "{ 'timestamp': { '$gte': '2024-01-01' } }"
  },
  "sink": {
    "type": "CosmosDbSqlApiSink",
    "writeBehavior": "upsert",
    "writeBatchSize": 500
  },
  "settings": {
    "parallelCopies": 8,
    "dataIntegrationUnits": 32
  }
}
```
Key ADF settings for Cosmos DB:
- writeBehavior: `insert` or `upsert` (upsert is safer for retries)
- writeBatchSize: number of documents per batch (default 10,000)
- parallelCopies: number of concurrent copy threads
- Throughput impact: ADF writes consume container RU/s, so monitor for 429 errors
Exam tip: ADF and RU consumption
ADF copy activities consume your container's provisioned RU/s. If you run a large migration during peak hours, you can starve your application. Best practices:
- Run migrations during off-peak hours
- Temporarily increase RU/s (autoscale helps)
- Use ADF's write batch size and parallel copies settings to throttle
- Monitor the `429 TooManyRequests` metric during migration
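To see why off-peak scheduling and extra RU/s matter, estimate the sustained throughput a migration will demand. A back-of-envelope sketch; the ~5 RU per upsert figure is a rough rule of thumb for small documents, not a guarantee (actual cost depends on document size and indexing policy):

```python
def required_ru_per_sec(doc_count: int, ru_per_write: float, hours: float) -> float:
    """Estimate the sustained RU/s a bulk copy will consume."""
    seconds = hours * 3600
    return doc_count * ru_per_write / seconds

# 10 million documents at ~5 RU per upsert, spread over 4 hours:
demand = required_ru_per_sec(10_000_000, 5, 4)  # ~3472 RU/s
```

If the container is provisioned at 4,000 RU/s and the application needs 2,000 RU/s at peak, this migration will trigger 429s unless you raise throughput, slow the copy, or run it off-peak.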
Kafka connector
The Cosmos DB Kafka connector supports both source (read from change feed) and sink (write to Cosmos DB):
```properties
# Kafka sink connector configuration
name=cosmosdb-sink
connector.class=com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector
topics=sensor-readings
connect.cosmos.connection.endpoint=https://sensorflow-cosmos.documents.azure.com:443/
connect.cosmos.master.key=<key>
connect.cosmos.databasename=sensorflow
connect.cosmos.containers.topicmap=sensor-readings#readings
```
Source connector uses the change feed to stream Cosmos DB changes into Kafka topics β useful for building event-driven architectures where downstream systems subscribe to data changes.
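The `connect.cosmos.containers.topicmap` setting above maps Kafka topics to Cosmos DB containers as comma-separated `topic#container` pairs. A small helper to sanity-check such a mapping before deploying a connector (a convenience sketch of ours, not part of the connector itself):

```python
def parse_topic_map(topic_map: str) -> dict:
    """Parse a 'topic#container,topic2#container2' string into a dict."""
    mapping = {}
    for pair in topic_map.split(","):
        topic, _, container = pair.strip().partition("#")
        if not topic or not container:
            raise ValueError(f"malformed pair: {pair!r}")
        mapping[topic] = container
    return mapping

# parse_topic_map("sensor-readings#readings")
# -> {"sensor-readings": "readings"}
```

Catching a malformed mapping in a unit test is cheaper than debugging a connector that silently drops events.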
Spark connector
For big-data scenarios, the Spark connector reads and writes directly:
```python
# Read from Cosmos DB in PySpark
from pyspark.sql.functions import avg

df = spark.read.format("cosmos.oltp") \
    .option("spark.cosmos.accountEndpoint", endpoint) \
    .option("spark.cosmos.accountKey", key) \
    .option("spark.cosmos.database", "sensorflow") \
    .option("spark.cosmos.container", "readings") \
    .load()

# Transform and write back
daily_agg = df.groupBy("deviceId", "date").agg(avg("temperature"))
daily_agg.write.format("cosmos.oltp") \
    .option("spark.cosmos.accountEndpoint", endpoint) \
    .option("spark.cosmos.accountKey", key) \
    .option("spark.cosmos.database", "sensorflow") \
    .option("spark.cosmos.container", "daily-aggregates") \
    .option("spark.cosmos.write.strategy", "ItemOverwrite") \
    .mode("append") \
    .save()
```
Live migration pattern (zero downtime)
For migrating from an existing database to Cosmos DB without downtime:
Phase 1: Bulk copy (historical data)

```
Source DB --[ADF/Spark bulk copy]--> Cosmos DB
```

Phase 2: Change data capture (ongoing changes)

```
Source DB --[CDC stream]--> Cosmos DB
```

(Runs in parallel with Phase 1, catching changes made during the copy)

Phase 3: Cutover
- Verify data consistency between source and target
- Switch the application connection string to Cosmos DB
- Keep CDC running briefly to catch final stragglers
- Decommission the source database
Exam tip: migration validation
The exam may ask about validating a migration. Key checks:
- Document count comparison between source and target
- Spot-check specific documents for data integrity
- Application testing against the new Cosmos DB instance before cutover
- Throughput monitoring β ensure provisioned RU/s can handle production traffic
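The count-comparison check can be automated. A hedged sketch: the function below only compares per-key counts you have already fetched (for example, document counts per device or per partition key from each system); how you obtain those counts is up to you:

```python
def diff_counts(source: dict, target: dict) -> dict:
    """Return {key: (source_count, target_count)} for every key that disagrees."""
    mismatches = {}
    for key in source.keys() | target.keys():
        s, t = source.get(key, 0), target.get(key, 0)
        if s != t:
            mismatches[key] = (s, t)
    return mismatches

# An empty result means the counts line up; a non-empty result pinpoints
# exactly which device/partition to spot-check before cutover.
```

Running this per partition key, rather than comparing one grand total, narrows a discrepancy down to a specific slice of data instead of forcing a full re-scan.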
🎬 Video walkthrough
Video coming soon: Data Movement (DP-420 Module 17)
Knowledge Check
- Amara needs to migrate 2TB of historical data from MongoDB to Cosmos DB with minimal coding. Which tool is best?
- SensorFlow runs an ADF migration job that triggers frequent 429 errors. What should Amara do?
- Amara needs real-time, continuous data flow from Cosmos DB to downstream consumers. Which approach is most appropriate?
Next up: Indexing Policies, covering how to tune Range, Spatial, and Composite indexes to optimise query performance and reduce write costs.