Denormalizing via embedding when migrating relational data from SQL to Cosmos DB

When migrating from a relational database (e.g. SQL Server) to a NoSQL database like Azure Cosmos DB, it is often necessary to make changes to the data model to optimize it for the NoSQL use cases.

One common transformation is denormalizing the data by embedding all of the related sub-items (when the sub-item list is bounded) within one JSON document (e.g. including all of the Order Line Items within their Order).

In this post we look at a few options for performing this type of denormalization using Azure Data Factory or Azure Databricks.

For general guidance on data modeling for Cosmos DB, please review “Data modeling in Azure Cosmos DB”.

Update February 3, 2020: Please see the new official Azure Cosmos DB documentation covering the same scenario: “Migrate one-to-few relational data into Azure Cosmos DB SQL API account”.

Example Scenario

Let’s say we have the following two tables in our SQL database, Orders and OrderDetails, and during the relational data migration to Cosmos DB we want to combine this one-to-few relationship into one JSON document via embedding.

[Image: Order and OrderDetails SQL tables]
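For reference, the two tables look roughly like the following sketch. The column names match the queries used throughout this post; the data types are assumptions, so adjust them to your actual source schema.

-- Parent table: one row per Order
CREATE TABLE Orders (
    OrderID    INT IDENTITY PRIMARY KEY,
    OrderDate  DATETIME,
    FirstName  NVARCHAR(50),
    LastName   NVARCHAR(50),
    Address    NVARCHAR(100),
    City       NVARCHAR(50),
    State      NVARCHAR(50),
    PostalCode NVARCHAR(20),
    Country    NVARCHAR(50),
    Phone      NVARCHAR(30),
    Total      DECIMAL(18, 2)
);

-- Child table: a bounded list of line items per Order (one-to-few)
CREATE TABLE OrderDetails (
    OrderDetailId INT IDENTITY PRIMARY KEY,
    OrderId       INT REFERENCES Orders (OrderID),
    ProductId     INT,
    UnitPrice     DECIMAL(18, 2),
    Quantity      INT
);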

We can create the proper denormalization in a T-SQL query using “FOR JSON” like this:
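A query along the following lines does the job; it has the same shape as the queryCustom used in the Databricks section later in this post, with a correlated subquery returning the related OrderDetails as a JSON array:

-- One row per Order, with the related OrderDetails embedded as a JSON array
SELECT o.OrderID, o.OrderDate, o.FirstName, o.LastName, o.Address, o.City, o.State,
       o.PostalCode, o.Country, o.Phone, o.Total,
       (SELECT OrderDetailId, ProductId, UnitPrice, Quantity
        FROM OrderDetails od
        WHERE od.OrderId = o.OrderId
        FOR JSON AUTO) AS OrderDetails
FROM Orders o;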

[Image: Query results including embedded JSON]

Ideally, it would be great to use a single Azure Data Factory (ADF) Copy Activity to query SQL as the source and write the output directly to the Cosmos DB sink as proper JSON objects. However, it is currently not possible to perform the needed JSON transformation in one Copy Activity. So, if we copy the results of the query above into a Cosmos DB SQL API collection, the JSON of the OrderDetails sub-items appears as a string property of our document instead of a proper JSON array:

[Image: ADF Copy Activity source]
[Image: OrderDetails JSON is written as a string property instead of a nested JSON array]

We can work around this limitation in one of the following ways:

  • Use Azure Data Factory with two Copy Activities: (1) get JSON-formatted data from SQL to a text file in an intermediary blob storage location, and (2) load from the JSON text file to the Cosmos DB collection.
  • Use Azure Databricks Spark to read from SQL and write to Cosmos DB after applying proper schema with from_json().
  • There is also a great blog post from the Azure Cosmos DB team showing how to use Azure Databricks with Python to perform the embedding transformation; see “Migrating Relational Data with one-to-few relationships into Azure Cosmos DB SQL API”.

Let’s look at the first two approaches in a bit more detail.

ADF with Two Copy Activities

Although the current ADF Copy Activity capabilities do not let us embed OrderDetails as a JSON array in the destination Cosmos DB document, we can work around the issue by using two separate Copy Activities.

Copy Activity #1: SqlJsonToBlobText

For the source, we use a SQL query to get the result set as a single column with one JSON object (representing the Order) per row using SQL Server’s OPENJSON and FOR JSON PATH capabilities:
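As a sketch of the idea (the exact query in the screenshot below may differ slightly), the inner FOR JSON PATH builds one JSON array of Orders with their nested OrderDetails, and OPENJSON then splits that array back into one row per JSON object:

-- FOR JSON PATH builds one JSON array of Orders (with nested OrderDetails);
-- OPENJSON then returns one row per object in that array
SELECT [value]
FROM OPENJSON(
    (SELECT o.OrderID, o.OrderDate, o.FirstName, o.LastName, o.Address, o.City, o.State,
            o.PostalCode, o.Country, o.Phone, o.Total,
            (SELECT OrderDetailId, ProductId, UnitPrice, Quantity
             FROM OrderDetails od
             WHERE od.OrderId = o.OrderId
             FOR JSON PATH) AS OrderDetails
     FROM Orders o
     FOR JSON PATH)
);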

[Image: ADF Copy Activity preview showing results of the SQL JSON query]

For the sink of the SqlJsonToBlobText copy activity, we choose “Delimited Text” and point it to a specific folder in Azure Blob Storage with a dynamically generated unique file name (e.g. @concat(pipeline().RunId, '.json')).

Since our text file is not really “delimited”, we do not want it to be parsed into separate columns on commas, and we want to preserve the double quotes (“). So we set “Column delimiter” to a Tab (“\t”), or another character that does not occur in the data, and “Quote character” to “No quote character”.

[Image: Configuration of the Delimited Text sink containing JSON lines]

Copy Activity #2: BlobJsonToCosmos

Next, we modify our ADF pipeline by adding a second Copy Activity that looks in Azure Blob Storage for the text file created by the first activity and processes it as a “JSON” source, inserting one document into the Cosmos DB sink per JSON row found in the text file.

[Image: Configuration of the JSON file source]

Optionally, we also add a “Delete” activity to the pipeline so that it deletes all of the previous files remaining in the /Orders/ folder prior to each run. Our ADF pipeline now looks something like this:

[Image: ADF pipeline with two Copy Activities]

After we trigger the pipeline above, we see a file created in our intermediary Azure Blob Storage location containing one JSON-object per row:

[Image: Text file with one JSON object per row in the intermediary blob storage]

We also see Orders documents with properly embedded OrderDetails inserted into our Cosmos DB collection:

[Image: Cosmos DB Order document with a proper nested JSON array of OrderDetails]

Azure Databricks Spark with from_json()

As another approach, we can use Azure Databricks Spark to copy the data from the SQL Database source to the Cosmos DB destination without creating intermediary text/JSON files in Azure Blob Storage.

NOTE: For clarity and simplicity, Scala code snippets below include dummy database passwords explicitly inline, but you should always use Azure Databricks secrets.

First, we create and attach the required SQL connector and Cosmos DB connector libraries to our Azure Databricks cluster, and restart the cluster to make sure the libraries are loaded.

[Image: Databricks cluster libraries including the SQL DB and Cosmos DB connectors]

Next, we get the results of the SQL query with “FOR JSON” output into a DataFrame:

// Connect to Azure SQL https://docs.databricks.com/data/data-sources/sql-databases-azure.html
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val configSql = Config(Map(
  "url" -> "xxxx.database.windows.net",
  "databaseName" -> "xxxx",
  "queryCustom" -> "SELECT o.OrderID, o.OrderDate, o.FirstName, o.LastName, o.Address, o.City, o.State, o.PostalCode, o.Country, o.Phone, o.Total, (SELECT OrderDetailId, ProductId, UnitPrice, Quantity FROM OrderDetails od WHERE od.OrderId = o.OrderId FOR JSON AUTO) as OrderDetails FROM Orders o",
  "user" -> "xxxx",
  "password" -> "xxxx" // NOTE: For clarity and simplicity, this example includes the secret explicitly as a string, but you should always use Databricks secrets
))

// Create DataFrame from the Azure SQL query
val orders = sqlContext.read.sqlDB(configSql)
display(orders)

[Image: Azure Databricks SQL query results]

Next, we connect to our Cosmos DB database and collection:

// Connect to Cosmos DB https://docs.databricks.com/data/data-sources/azure/cosmosdb-connector.html
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._

val configMap = Map(
  "Endpoint" -> "https://xxxx.documents.azure.com:443/",
  // NOTE: For clarity and simplicity, this example includes the secret explicitly as a string, but you should always use Databricks secrets
  "Masterkey" -> "xxxx",
  "Database" -> "StoreDatabase",
  "Collection" -> "Orders")
val configCosmos = Config(configMap)

Finally, we define the schema for the nested OrderDetails and use from_json to apply it to the DataFrame before saving the result to the Cosmos DB collection.

// Convert DataFrame to the proper nested schema
import org.apache.spark.sql.types._

// Schema of the embedded OrderDetails array
val orderDetailsSchema = ArrayType(StructType(Array(
  StructField("OrderDetailId", IntegerType, true),
  StructField("ProductId", IntegerType, true),
  StructField("UnitPrice", DoubleType, true),
  StructField("Quantity", IntegerType, true)
)))

val ordersWithSchema = orders.select(
  col("OrderId").cast("string").as("id"),
  col("OrderDate").cast("string"),
  col("FirstName").cast("string"),
  col("LastName").cast("string"),
  col("Address").cast("string"),
  col("City").cast("string"),
  col("State").cast("string"),
  col("PostalCode").cast("string"),
  col("Country").cast("string"),
  col("Phone").cast("string"),
  col("Total").cast("double"),
  // Parse the OrderDetails JSON string into a proper nested array
  from_json(col("OrderDetails"), orderDetailsSchema).as("OrderDetails")
)
display(ordersWithSchema)

// Save the nested data to Cosmos DB
CosmosDBSpark.save(ordersWithSchema, configCosmos)

[Image: Azure Databricks schematized DataFrame saved to Cosmos DB]

At the end, we get properly embedded OrderDetails within each Order document in the Cosmos DB collection:

[Image: Cosmos DB collection with embedded OrderDetails]

Next Steps

Thank you!

Please leave feedback and questions below or on Twitter https://twitter.com/ArsenVlad

Written by

Principal Engineer / Architect, FastTrack for Azure at Microsoft
