Using Snowflake on Azure for Querying Azure Event Hubs Capture Avro Files

Video Walkthrough

Tip: Play the video full screen.

Table of Contents

00:00 Beginning of video
01:58 Create resource group, storage account for files, and storage queue
03:15 View storage account in Microsoft Storage Explorer
04:15 Create Azure Event Hubs Namespace and Event Hub
05:50 Create Event Grid subscription blob2queue using Azure CLI
06:45 Create Event Grid subscription blob2eventhub using the Azure Portal
08:40 Uploading files to Azure Blob Storage using Azure CLI upload-batch
10:40 Viewing messages in the queue
12:10 Using Snowflake on Azure worksheet
13:28 Create Snowflake stage pointing to the Azure Blob Storage container
17:20 Querying number of records in Avro files
20:40 Decoding message body
23:00 Inserting decoded data into Snowflake table
24:50 Querying captured events using Snowflake JSON parsing capability
27:45 Comparing events captured in queue with ones captured in Event Hub
29:30 Grouping query to view events by minute
30:25 Deleting files and querying deletion events
34:20 Comparing total number of events in queue vs Event Hub after deleting all files

Create Storage Account For Files and Queue

# Resource group that holds every resource in this walkthrough
az group create --name avehc1 --location eastus2
# General-purpose v2 storage account: it receives the uploaded files and also hosts the queue
az storage account create --resource-group avehc1 --name avmyfiles1 --sku Standard_LRS --location eastus2 --kind StorageV2
# Blob container that the files are uploaded into
az storage container create --name myfiles --account-name avmyfiles1

# Storage queue that will receive the blob events from Event Grid
az storage queue create --name queue1 --account-name avmyfiles1

Create Azure Event Hub

# Create the Event Hubs namespace (Standard tier)
az eventhubs namespace create -g avehc1 -n avehc1ns -l eastus2 --sku Standard
# Create the event hub inside that namespace
az eventhubs eventhub create -g avehc1 -n avehc1 --namespace-name avehc1ns
# Create a separate storage account and container for the event hub capture files.
# These are two independent commands and must be run separately: on a single line the
# second command would be passed to the first as (invalid) extra arguments.
az storage account create -g avehc1 -n avmycapture1 --sku Standard_LRS -l eastus2 --kind StorageV2
az storage container create -n mycapture --account-name avmycapture1

Create Event Grid Subscription

# Subscribe queue1 to the blob events raised by the avmyfiles1 storage account.
# Replace SUBSCRIPTION_ID (both occurrences) with your Azure subscription id,
# e.g. the value printed by: az account show --query id -o tsv
az eventgrid event-subscription create \
  --resource-id /subscriptions/SUBSCRIPTION_ID/resourceGroups/avehc1/providers/Microsoft.Storage/storageAccounts/avmyfiles1 \
  --name blob2queue \
  --endpoint-type storagequeue \
  --endpoint /subscriptions/SUBSCRIPTION_ID/resourceGroups/avehc1/providers/Microsoft.Storage/storageAccounts/avmyfiles1/queueservices/default/queues/queue1

Upload Batch of Files to Generate Events

# Upload every file from a local folder into the myfiles container; each upload raises a
# blob-created event. The source path is the author's local folder — adjust it to your own.
az storage blob upload-batch \
  --account-name avmyfiles1 \
  --destination myfiles \
  --source /mnt/c/Python36 \
  --pattern "*.*"

Snowflake Queries

-- All objects below are created in the TEST_DB database.
use database TEST_DB;

-- Named file format used to read the Event Hubs Capture output as Avro.
-- compression = 'NONE' tells Snowflake the staged files are not compressed.
create or replace file format av_avro_format
    type = 'AVRO'
    compression = 'NONE';

-- Confirm the file format now exists.
show file formats;
-- Create a Snowflake stage pointing to the container with the captured Avro files.
-- NOTE: the SAS token is a placeholder. Substitute a real token (or use a storage
-- integration instead) and never commit real credentials.
create or replace stage aveventgrid_capture
    url = 'azure://avmycapture1.blob.core.windows.net/mycapture'
    credentials = (azure_sas_token = '?st=xxxxxxxxxxxxxxxxxxxxxxx')
    file_format = av_avro_format;

-- List all Avro files in the stage.
list @aveventgrid_capture;

-- Count records across all Avro files in the stage.
select count(*) from @aveventgrid_capture;

-- Look at the raw data in one Avro file (this path is specific to the author's capture run).
select * from @aveventgrid_capture/avehc1ns/avehc1/0/2018/12/27/01/38/26.avro;

-- Decode the hex-encoded message body into text.
select HEX_DECODE_STRING($1:Body) as jsontext
from @aveventgrid_capture/avehc1ns/avehc1/0/2018/12/27/01/38/26.avro;

-- Parse the other fields of the Avro records.
-- EnqueuedTimeUtc is cast to string (yielding the bare text) and parsed with a 12-hour
-- mask: HH12 is the format element documented for use together with AM/PM.
-- TO_TIMESTAMP_NTZ is used explicitly so the result does not depend on the session's
-- TIMESTAMP_TYPE_MAPPING setting.
select
    HEX_DECODE_STRING($1:Body)                                                as jsontext,
    TO_TIMESTAMP_NTZ($1:EnqueuedTimeUtc::string, 'MM/DD/YYYY HH12:MI:SS AM') as eh_enqueued_time_utc,
    TO_NUMBER($1:Offset)                                                      as eh_offset,
    $1:Properties                                                             as eh_properties,
    TO_NUMBER($1:SequenceNumber)                                              as eh_sequence_number,
    $1:SystemProperties                                                       as eh_system_properties
from @aveventgrid_capture/avehc1ns/avehc1/0/2018/12/27/01/38/26.avro;
-- Create a table to store the parsed Avro capture records.
create or replace table aveventgrid_capture
(
    jsontext             variant,        -- decoded message body (presumably Event Grid JSON; confirm)
    eh_enqueued_time_utc timestamp_ntz,  -- EnqueuedTimeUtc, stored without a time zone
    eh_offset            int,            -- Offset field of the capture record
    eh_properties        variant,        -- Properties field of the capture record
    eh_sequence_number   int,            -- SequenceNumber field of the capture record
    eh_system_properties variant         -- SystemProperties field of the capture record
);

-- Review the table, which is initially empty.
select * from aveventgrid_capture;

-- Load data from every Avro file in the stage into the table, decoding and parsing on the way in.
-- EnqueuedTimeUtc: cast to string, then parse with HH12 (the element documented for AM/PM
-- input) into TIMESTAMP_NTZ, matching the column type regardless of session settings.
copy into aveventgrid_capture (
    jsontext,
    eh_enqueued_time_utc,
    eh_offset,
    eh_properties,
    eh_sequence_number,
    eh_system_properties
)
from (
    select
        HEX_DECODE_STRING($1:Body),
        TO_TIMESTAMP_NTZ($1:EnqueuedTimeUtc::string, 'MM/DD/YYYY HH12:MI:SS AM'),
        TO_NUMBER($1:Offset),
        $1:Properties,
        TO_NUMBER($1:SequenceNumber),
        $1:SystemProperties
    from @aveventgrid_capture
);
-- Review how LATERAL FLATTEN breaks a JSON array into individual records (one row per element).
select f.value
from aveventgrid_capture as c,
     lateral flatten(input => c.jsontext) as f;

-- Query Event Grid blob storage events by parsing the JSON with Snowflake's built-in functions.
-- Elements without an eventType are skipped.
select
    f.value:eventType::string    as eventType,
    f.value:eventTime::timestamp as eventTime,
    f.value:subject::string      as subject,
    f.value:id::string           as id
from aveventgrid_capture as c,
     lateral flatten(input => c.jsontext) as f
where f.value:eventType::string is not null;

-- Look up one specific event by its id (the id literal comes from the author's run).
select
    f.value:eventType::string    as eventType,
    f.value:eventTime::timestamp as eventTime,
    f.value:subject::string      as subject,
    f.value:id::string           as id
from aveventgrid_capture as c,
     lateral flatten(input => c.jsontext) as f
where f.value:id::string = '50eac4d4-e01e-00b5-5584-9da1d9063d9d';

-- Count events per minute and event type, newest minute first.
-- eventType is a secondary sort key so rows within the same minute come back in a stable order.
select
    date_trunc('MINUTE', f.value:eventTime::timestamp) as eventTimeWindow,
    f.value:eventType::string                          as eventType,
    count(*)                                           as eventCount
from aveventgrid_capture as c,
     lateral flatten(input => c.jsontext) as f
where f.value:eventType::string is not null
group by eventTimeWindow, eventType
order by eventTimeWindow desc, eventType;

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Arsen Vladimirskiy

Arsen Vladimirskiy

149 Followers

Principal Engineer / Architect, FastTrack for Azure at Microsoft