Denormalizing via embedding when migrating relational data from SQL to Cosmos DB

Arsen Vladimirskiy
7 min read · Dec 10, 2019


When migrating from a relational database (e.g. SQL Server) to a NoSQL database such as Azure Cosmos DB, it is often necessary to change the data model to optimize it for the NoSQL use cases.

One of the common transformations is denormalizing the data by embedding all of the related sub-items (when the sub-item list is bounded/limited) within one JSON document (e.g. including all of the Order Line Items within their Order).

In this post we look at a few options for performing this type of denormalization using Azure Data Factory or Azure Databricks.

For general guidance on data modeling for Cosmos DB, please review “Data modeling in Azure Cosmos DB”.

Update February 3, 2020: Please see the new official Azure Cosmos DB documentation covering the same scenario: “Migrate one-to-few relational data into Azure Cosmos DB SQL API account”.

Example Scenario

Let’s say we have the following two tables in our SQL database, Orders and OrderDetails, and during the relational data migration to Cosmos DB we want to combine this one-to-few relationship into a single JSON document via embedding.

Order and OrderDetails SQL Tables
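The goal is to end up with one document per Order, with the related OrderDetails rows embedded as a nested JSON array, roughly like this (field values are illustrative, and columns such as Address and City are omitted for brevity):

{
  "OrderId": 1,
  "OrderDate": "2019-09-01T00:00:00",
  "FirstName": "Alice",
  "LastName": "Smith",
  "Total": 52.99,
  "OrderDetails": [
    { "OrderDetailId": 1, "ProductId": 23, "UnitPrice": 24.99, "Quantity": 1 },
    { "OrderDetailId": 2, "ProductId": 48, "UnitPrice": 14.00, "Quantity": 2 }
  ]
}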

We can create the proper denormalization in a T-SQL query using “FOR JSON” like this:

SELECT
    o.OrderID,
    o.OrderDate,
    o.FirstName,
    o.LastName,
    o.Address,
    o.City,
    o.State,
    o.PostalCode,
    o.Country,
    o.Phone,
    o.Total,
    (SELECT OrderDetailId, ProductId, UnitPrice, Quantity
     FROM OrderDetails od
     WHERE od.OrderId = o.OrderId
     FOR JSON AUTO) AS OrderDetails
FROM Orders o;
Query results including embedded JSON

Ideally, it would be great to use a single Azure Data Factory (ADF) Copy Activity to query SQL as the source and write the output directly to the Cosmos DB sink as proper JSON objects. However, it is currently not possible to perform the needed JSON transformation within one Copy Activity. So, if we try to copy the results of the query above into a Cosmos DB SQL API collection, we will see the JSON of the OrderDetails sub-items stored as a string property of our document (instead of a proper JSON array):

ADF Copy Activity Source
OrderDetails JSON is written as string property instead of a nested JSON array
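That is, the stored document contains an escaped JSON string, for example (illustrative):

"OrderDetails": "[{\"OrderDetailId\":1,\"ProductId\":23,\"UnitPrice\":24.99,\"Quantity\":1}]"

instead of the desired nested array:

"OrderDetails": [{ "OrderDetailId": 1, "ProductId": 23, "UnitPrice": 24.99, "Quantity": 1 }]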

We can work around this current limitation in one of the following ways:

  • Use Azure Data Factory with two Copy Activities: (1) get JSON-formatted data from SQL to a text file in an intermediary blob storage location, and (2) load from the JSON text file to the Cosmos DB collection.
  • Use Azure Databricks Spark to read from SQL and write to Cosmos DB after applying proper schema with from_json().
  • There is also a great blog post from the Azure Cosmos DB team showing how to use Azure Databricks with Python to perform the embedding transformation: see “Migrating Relational Data with one-to-few relationships into Azure Cosmos DB SQL API”.

Let’s look at the first two approaches in a bit more detail.

ADF with Two Copy Activities

Although with the current ADF Copy Activity capabilities we cannot embed OrderDetails as a JSON array in the destination Cosmos DB document, we can work around the issue by using two separate Copy Activities.

Copy Activity #1: SqlJsonToBlobText

For the source, we use a SQL query to get the result set as a single column with one JSON object (representing the Order) per row using SQL Server’s OPENJSON and FOR JSON PATH capabilities:

SELECT [value] FROM OPENJSON(
    (SELECT
        id = o.OrderID,
        o.OrderDate,
        o.FirstName,
        o.LastName,
        o.Address,
        o.City,
        o.State,
        o.PostalCode,
        o.Country,
        o.Phone,
        o.Total,
        (SELECT OrderDetailId, ProductId, UnitPrice, Quantity
         FROM OrderDetails od
         WHERE od.OrderId = o.OrderId
         FOR JSON AUTO) AS OrderDetails
     FROM Orders o FOR JSON PATH)
)
ADF Copy Activity Preview Showing Results of SQL JSON Query

For the sink of the SqlJsonToBlobText copy activity, we choose “Delimited Text” and point it to a specific folder in Azure Blob Storage with a dynamically generated unique file name (e.g. @concat(pipeline().RunId, '.json')).

Since our text file is not really “delimited”, we do not want it parsed into separate columns on commas, and we want to preserve the double quotes ("). So we set “Column delimiter” to a Tab (“\t”), or another character that does not occur in the data, and “Quote character” to “No quote character”.

Configuration of the Delimited Text sink containing JSON lines

Copy Activity #2: BlobJsonToCosmos

Next, we modify our ADF pipeline by adding a second Copy Activity that looks in Azure Blob Storage for the text file created by the first activity and processes it as a “JSON” source, inserting one document into the Cosmos DB sink for each JSON row found in the text file.

Configuration of the JSON file source

Optionally, we also add a “Delete” activity to the pipeline so that it deletes all of the previous files remaining in the /Orders/ folder prior to each run. Our ADF pipeline now looks something like this:

ADF Pipeline With Two Copy Activities

After we trigger the pipeline above, we see a file created in our intermediary Azure Blob Storage location containing one JSON object per row:

Text File with one JSON-object per row in intermediary blob storage

We also see Orders documents with properly embedded OrderDetails inserted into our Cosmos DB collection:

CosmosDB Order Document with Proper Nested JSON Array of Order Details

Azure Databricks Spark with from_json()

As another approach, we can use Azure Databricks Spark to copy the data from SQL Database source to Cosmos DB destination without creating the intermediary text/JSON files in Azure Blob Storage.

NOTE: For clarity and simplicity, Scala code snippets below include dummy database passwords explicitly inline, but you should always use Azure Databricks secrets.

First, we create and attach the required SQL connector and Cosmos DB connector libraries to our Azure Databricks cluster, then restart the cluster to make sure the libraries are loaded.

Databricks Cluster Libraries Include SQL DB and Cosmos DB Connectors
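For reference, these are typically attached as Maven libraries on the cluster; the coordinates below are illustrative, and the exact artifact and version must match the cluster’s Spark and Scala versions:

  • com.microsoft.azure:azure-sqldb-spark (Azure SQL DB Spark connector)
  • com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11 (Cosmos DB Spark connector for Spark 2.4 / Scala 2.11)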

Next, we get the results of the SQL query with “FOR JSON” output into a DataFrame:

// Language: Scala
// Connect to Azure SQL https://docs.databricks.com/data/data-sources/sql-databases-azure.html
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val configSql = Config(Map(
  "url" -> "xxxx.database.windows.net",
  "databaseName" -> "xxxx",
  "queryCustom" -> "SELECT o.OrderID, o.OrderDate, o.FirstName, o.LastName, o.Address, o.City, o.State, o.PostalCode, o.Country, o.Phone, o.Total, (SELECT OrderDetailId, ProductId, UnitPrice, Quantity FROM OrderDetails od WHERE od.OrderId = o.OrderId FOR JSON AUTO) as OrderDetails FROM Orders o",
  "user" -> "xxxx",
  // NOTE: For clarity and simplicity, this example includes the secret explicitly as a string, but you should always use Databricks secrets
  "password" -> "xxxx"
))

// Create DataFrame from the Azure SQL query
val orders = sqlContext.read.sqlDB(configSql)
display(orders)
Azure Databricks SQL Query Results
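At this point the OrderDetails column is just a JSON-formatted string (the FOR JSON subquery returns text), which is why we apply from_json in a later step. A quick, optional way to confirm this in the notebook:

// Language: Scala
// Optional sanity check: OrderDetails arrives as a plain string column until from_json is applied
orders.printSchema()
orders.select("OrderDetails").show(3, false)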

Next, we connect to our Cosmos DB database and collection:

// Language: Scala
// Connect to Cosmos DB https://docs.databricks.com/data/data-sources/azure/cosmosdb-connector.html
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._

val configMap = Map(
  "Endpoint" -> "https://xxxx.documents.azure.com:443/",
  // NOTE: For clarity and simplicity, this example includes the secret explicitly as a string, but you should always use Databricks secrets
  "Masterkey" -> "xxxx",
  "Database" -> "StoreDatabase",
  "Collection" -> "Orders")
val configCosmos = Config(configMap)

Finally, we define the schema for the OrderDetails array and use from_json to apply it to the DataFrame prior to saving it to the Cosmos DB collection.

// Language: Scala
// Convert the DataFrame to the proper nested schema
import org.apache.spark.sql.types._

val orderDetailsSchema = ArrayType(StructType(Array(
  StructField("OrderDetailId", IntegerType, true),
  StructField("ProductId", IntegerType, true),
  StructField("UnitPrice", DoubleType, true),
  StructField("Quantity", IntegerType, true)
)))

val ordersWithSchema = orders.select(
  col("OrderID").cast("string").as("id"),
  col("OrderDate").cast("string"),
  col("FirstName").cast("string"),
  col("LastName").cast("string"),
  col("Address").cast("string"),
  col("City").cast("string"),
  col("State").cast("string"),
  col("PostalCode").cast("string"),
  col("Country").cast("string"),
  col("Phone").cast("string"),
  col("Total").cast("double"),
  from_json(col("OrderDetails"), orderDetailsSchema).as("OrderDetails")
)
display(ordersWithSchema)

// Save the nested data to Cosmos DB
CosmosDBSpark.save(ordersWithSchema, configCosmos)
Azure Databricks Schematized DataFrame Saved to Cosmos DB

In the end, we get properly embedded OrderDetails within each Order document in the Cosmos DB collection:

Cosmos DB Collection with Embedded Order Details
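If we also want to verify the result from within Databricks, we can read the collection back with the same connector. This is a minimal sketch that reuses the configCosmos defined above:

// Language: Scala
// Optional verification: read documents back from Cosmos DB and confirm OrderDetails is a nested array
import com.microsoft.azure.cosmosdb.spark.schema._ // already imported above
val savedOrders = spark.read.cosmosDB(configCosmos)
savedOrders.printSchema()
savedOrders.select("id", "OrderDetails").show(3, false)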

Next Steps

Thank you!

Please leave feedback and questions below or on Twitter at https://twitter.com/ArsenVlad.


Written by Arsen Vladimirskiy

Principal Engineer / Architect, FastTrack for Azure at Microsoft