Denormalizing via embedding when migrating relational data from SQL to Cosmos DB

Example Scenario

Order and OrderDetails SQL Tables
SELECT
    o.OrderID,
    o.OrderDate,
    o.FirstName,
    o.LastName,
    o.Address,
    o.City,
    o.State,
    o.PostalCode,
    o.Country,
    o.Phone,
    o.Total,
    (SELECT OrderDetailId, ProductId, UnitPrice, Quantity
     FROM OrderDetails od
     WHERE od.OrderId = o.OrderId
     FOR JSON AUTO) AS OrderDetails
FROM Orders o;
Query results including embedded JSON
ADF Copy Activity Source
When the ADF Copy Activity uses this query directly as its source and writes straight to Cosmos DB, the OrderDetails JSON is written as a string property instead of a nested JSON array.
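An abridged, hypothetical example of a document copied this way shows the problem; the order details arrive as one escaped JSON string:

{
  "OrderID": 1,
  "FirstName": "Alan",
  "LastName": "Turing",
  "Total": 19.98,
  "OrderDetails": "[{\"OrderDetailId\":1,\"ProductId\":5,\"UnitPrice\":9.99,\"Quantity\":2}]"
}

There are a couple of ways to work around this and get a properly nested array: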
  • Use Azure Data Factory with two Copy Activities: (1) get JSON-formatted data from SQL to a text file in an intermediary blob storage location, and (2) load from the JSON text file to the Cosmos DB collection.
  • Use Azure Databricks Spark to read from SQL and write to Cosmos DB after applying the proper nested schema with from_json().
  • There is also a great blog post from Azure Cosmos DB showing how to use Azure Databricks with Python to perform the embedding transformation - see “Migrating Relational Data with one-to-few relationships into Azure Cosmos DB SQL API”.

ADF with Two Copy Activities

SELECT [value] FROM OPENJSON(
    (SELECT
        id = o.OrderID,
        o.OrderDate,
        o.FirstName,
        o.LastName,
        o.Address,
        o.City,
        o.State,
        o.PostalCode,
        o.Country,
        o.Phone,
        o.Total,
        (SELECT OrderDetailId, ProductId, UnitPrice, Quantity
         FROM OrderDetails od
         WHERE od.OrderId = o.OrderId
         FOR JSON AUTO) AS OrderDetails
    FROM Orders o
    FOR JSON PATH)
)
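The inner FOR JSON PATH collapses the whole result set into a single JSON array, and because nested FOR JSON subqueries are treated as raw JSON rather than escaped strings, OrderDetails comes through as a real nested array. Wrapping it all in OPENJSON then shreds that array back into one row per order, so the [value] column holds one complete JSON object per row (note OrderID is aliased to id, the document key Cosmos DB expects). The staged text file therefore contains one JSON object per line, roughly like this (hypothetical values):

{"id":1,"OrderDate":"2005-07-01T00:00:00","FirstName":"Alan","LastName":"Turing","Address":"1 Main St","City":"Redmond","State":"WA","PostalCode":"98052","Country":"USA","Phone":"555-0100","Total":19.98,"OrderDetails":[{"OrderDetailId":1,"ProductId":5,"UnitPrice":9.99,"Quantity":2}]}
{"id":2,"OrderDate":"2005-07-02T00:00:00","FirstName":"Grace","LastName":"Hopper","Address":"2 Main St","City":"Seattle","State":"WA","PostalCode":"98101","Country":"USA","Phone":"555-0101","Total":9.99,"OrderDetails":[{"OrderDetailId":2,"ProductId":7,"UnitPrice":9.99,"Quantity":1}]}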
ADF Copy Activity Preview Showing Results of SQL JSON Query
Configuration of the Delimited Text Container JSON Lines
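As a rough sketch of that configuration (property names assume the standard ADF DelimitedText dataset schema; the dataset name and location values are placeholders, and the linked service is omitted), the key idea is no quote or escape character and a column delimiter that never occurs in the data, so each row is written verbatim as a single JSON line:

{
  "name": "JsonLinesStaging",
  "properties": {
    "type": "DelimitedText",
    "typeProperties": {
      "location": {
        "type": "AzureBlobStorageLocation",
        "container": "staging",
        "fileName": "orders.json"
      },
      "columnDelimiter": "\u0001",
      "quoteChar": "",
      "escapeChar": "",
      "firstRowAsHeader": false
    }
  }
}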
Configuration of the JSON file source
ADF Pipeline With Two Copy Activities
Text File with one JSON-object per row in intermediary blob storage
Cosmos DB Order Document with Proper Nested JSON Array of Order Details

Azure Databricks Spark with from_json()

Databricks Cluster Libraries Include SQL DB and Cosmos DB Connectors
// Language: Scala
// Connect to Azure SQL: https://docs.databricks.com/data/data-sources/sql-databases-azure.html
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val configSql = Config(Map(
  "url" -> "xxxx.database.windows.net",
  "databaseName" -> "xxxx",
  "queryCustom" -> "SELECT o.OrderID, o.OrderDate, o.FirstName, o.LastName, o.Address, o.City, o.State, o.PostalCode, o.Country, o.Phone, o.Total, (SELECT OrderDetailId, ProductId, UnitPrice, Quantity FROM OrderDetails od WHERE od.OrderId = o.OrderId FOR JSON AUTO) as OrderDetails FROM Orders o",
  "user" -> "xxxx",
  // NOTE: For clarity and simplicity, this example includes secrets explicitly as strings, but you should always use Databricks secrets
  "password" -> "xxxx"
))

// Create DataFrame from the Azure SQL query
val orders = sqlContext.read.sqlDB(configSql)
display(orders)
Azure Databricks SQL Query Results
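The azure-sqldb-spark connector used above is tied to older Databricks runtimes. As an alternative sketch (server, database, and credentials are placeholders; assumes the Microsoft SQL Server JDBC driver is available on the cluster, as it is on standard Databricks runtimes), the same query can also be issued through Spark's built-in JDBC source:

// Alternative: Spark's generic JDBC reader with the same FOR JSON query
val query = "SELECT o.OrderID, o.OrderDate, o.FirstName, o.LastName, o.Address, o.City, o.State, o.PostalCode, o.Country, o.Phone, o.Total, (SELECT OrderDetailId, ProductId, UnitPrice, Quantity FROM OrderDetails od WHERE od.OrderId = o.OrderId FOR JSON AUTO) as OrderDetails FROM Orders o"
val ordersJdbc = spark.read
  .format("jdbc")
  .option("url", "jdbc:sqlserver://xxxx.database.windows.net:1433;databaseName=xxxx")
  .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
  .option("query", query)
  .option("user", "xxxx")
  .option("password", "xxxx") // use Databricks secrets in real code
  .load()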
// Language: Scala
// Connect to Cosmos DB: https://docs.databricks.com/data/data-sources/azure/cosmosdb-connector.html
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
val configMap = Map(
  "Endpoint" -> "https://xxxx.documents.azure.com:443/",
  // NOTE: For clarity and simplicity, this example includes secrets explicitly as strings, but you should always use Databricks secrets
  "Masterkey" -> "xxxx",
  "Database" -> "StoreDatabase",
  "Collection" -> "Orders")
val configCosmos = Config(configMap)
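If the migration may be re-run, the write can be made idempotent; to my knowledge the azure-cosmosdb-spark connector accepts an Upsert option, though you should verify it against your connector version:

// Optional (assumption: supported by this connector version): upsert instead of insert,
// so re-running the pipeline overwrites documents rather than failing on duplicate ids
val configCosmosUpsert = Config(configMap + ("Upsert" -> "true"))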
// Language: Scala
// Convert DataFrame to the proper nested schema
import org.apache.spark.sql.types._

// Schema for the OrderDetails JSON string: an array of order-detail structs
val orderDetailsSchema = ArrayType(StructType(Array(
  StructField("OrderDetailId", IntegerType, true),
  StructField("ProductId", IntegerType, true),
  StructField("UnitPrice", DoubleType, true),
  StructField("Quantity", IntegerType, true)
)))
val ordersWithSchema = orders.select(
  col("OrderId").cast("string").as("id"), // Cosmos DB requires a string id property
  col("OrderDate").cast("string"),
  col("FirstName").cast("string"),
  col("LastName").cast("string"),
  col("Address").cast("string"),
  col("City").cast("string"),
  col("State").cast("string"),
  col("PostalCode").cast("string"),
  col("Country").cast("string"),
  col("Phone").cast("string"),
  col("Total").cast("double"),
  // Parse the OrderDetails JSON string into a real nested array column
  from_json(col("OrderDetails"), orderDetailsSchema).as("OrderDetails")
)
display(ordersWithSchema)

// Save the nested data to Cosmos DB
CosmosDBSpark.save(ordersWithSchema, configCosmos)
Azure Databricks Schematized DataFrame Saved to Cosmos DB
Cosmos DB Collection with Embedded Order Details
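As a quick sanity check, printSchema on the schematized DataFrame should show OrderDetails as an array of structs rather than a string. Based on the select above, the output should look roughly like this:

// Verify that from_json produced a nested array rather than a plain string
ordersWithSchema.printSchema()
// root
//  |-- id: string (nullable = true)
//  |-- OrderDate: string (nullable = true)
//  |-- FirstName: string (nullable = true)
//  |-- LastName: string (nullable = true)
//  |-- Address: string (nullable = true)
//  |-- City: string (nullable = true)
//  |-- State: string (nullable = true)
//  |-- PostalCode: string (nullable = true)
//  |-- Country: string (nullable = true)
//  |-- Phone: string (nullable = true)
//  |-- Total: double (nullable = true)
//  |-- OrderDetails: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- OrderDetailId: integer (nullable = true)
//  |    |    |-- ProductId: integer (nullable = true)
//  |    |    |-- UnitPrice: double (nullable = true)
//  |    |    |-- Quantity: integer (nullable = true)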

