Often our Spark jobs need to access tables or data in various formats from different sources. Depending on the source type and the use case, we choose different approaches. A few of those approaches are described below with sample code; the detailed end-to-end configurations are not shown.

Spark Table with Selective Source Files from Azure Data Lake Storage

The metadata is stored on the Spark side (in the Hive metastore), while the actual data is kept in ADLS.

Problem to be solved:

  1. We have existing records in an object store or data lake and want to create a Hive table on top of that data.
(Image: Files inside date-wise folders at ADLS)

Configuration / Sample Code:

  1. The following code is written in Azure Databricks, with the actual data held in Azure Data Lake Store (ADLS).

%sql
CREATE TABLE MyFactTable1
(
  Date INT,
  DimId1 STRING,
  DimId2 STRING,
  CurrencyCode STRING,
  Country STRING
)
USING CSV
OPTIONS (header 'false', inferSchema 'true', delimiter ',')
LOCATION '/mnt/My_Facts/*/*/20*.csv'
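
The LOCATION above assumes the ADLS store is already mounted under /mnt/My_Facts. A minimal mount sketch for ADLS Gen1 with a service principal is shown below; the account name, credentials, tenant id and even the exact OAuth configuration keys are assumptions that should be checked against the Databricks documentation for your runtime.

%scala
// A minimal sketch (assumptions: ADLS Gen1, service principal credentials, dfs.adls.* OAuth keys)
dbutils.fs.mount(
  source = "adl://<adls_account_name>.azuredatalakestore.net/My_Facts",
  mountPoint = "/mnt/My_Facts",
  extraConfigs = Map(
    "dfs.adls.oauth2.access.token.provider.type" -> "ClientCredential",
    "dfs.adls.oauth2.client.id" -> "<application_id>",
    "dfs.adls.oauth2.credential" -> "<service_principal_key>",
    "dfs.adls.oauth2.refresh.url" -> "https://login.microsoftonline.com/<tenant_id>/oauth2/token"
  )
)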

Cautions:

  1. If the underlying files/records are deleted, Spark will throw an exception; spark.catalog.refreshTable("MyFactTable1") needs to be executed to refresh the Spark catalog. The same refreshTable call is also needed to pick up any newly added files in the source location, as in the sketch below.
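
For example, after new CSV files land in the mounted folders, the refresh could look like this (a minimal sketch; display is the Databricks notebook helper):

%scala
// Refresh the cached metadata so newly added (or removed) files are picked up
spark.catalog.refreshTable("MyFactTable1")
// Subsequent reads of the table see the current set of files
display(spark.table("MyFactTable1"))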

Spark Table with Partition based on Source Folder Structure at Azure Data Lake Storage

The metadata is stored on the Spark side (in the Hive metastore), while the actual data is kept in ADLS. Partition information is defined based on the source data folder structure.

Problem to be solved:

  1. We have existing records in an object store or data lake and want to create a Hive table partitioned according to the folder structure.

Source Folder Structure (example):

(Image: Files inside date-wise folders at ADLS)

Configuration / Sample Code:

%sql
CREATE EXTERNAL TABLE MyFactTable2
(
  Date INT,
  DimId1 STRING,
  DimId2 STRING,
  CurrencyCode STRING,
  Country STRING
)
PARTITIONED BY (Year STRING, Month STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/mnt/My_Facts/'
TBLPROPERTIES ('header' = 'false', 'inferSchema' = 'false');

ALTER TABLE MyFactTable2 ADD PARTITION (Year = '2017', Month = '11') LOCATION '/mnt/My_Facts/2017/11';
ALTER TABLE MyFactTable2 ADD PARTITION (Year = '2017', Month = '12') LOCATION '/mnt/My_Facts/2017/12';

Now we can write queries like:

SELECT * FROM MyFactTable2 WHERE Year = 2017
SELECT * FROM MyFactTable2 WHERE Month = 12 AND Year = 2017


Data from any new files added under the already-registered partition folders (in our example, under Month = 11 and 12) can be queried; a newly arrived folder, however, has to be registered as a partition first, as in the sketch below.
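
A minimal sketch of registering such a new folder (the 2018/01 folder below is hypothetical):

%scala
// Hypothetical: a new folder /mnt/My_Facts/2018/01 has arrived at ADLS
spark.sql("ALTER TABLE MyFactTable2 ADD PARTITION (Year = '2018', Month = '01') LOCATION '/mnt/My_Facts/2018/01'")
// Verify which partitions the metastore currently knows about
display(spark.sql("SHOW PARTITIONS MyFactTable2"))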

Cautions:

  1. If the data folders (in this example, under MM) contain files that we don't want to load (selective data load), this syntax cannot help: no wildcard/glob pattern is supported in the partition LOCATION (check the first example, which does support glob patterns in the path).

For further details, refer to: https://docs.databricks.com/spark/latest/spark-sql/language-manual/create-table.html

If the data source is in a different cloud region than the one hosting the running Spark job, we could incur extra charges for cross-region data transfer. Network latency will also be higher than for intra-region communication (e.g. when Azure Data Lake Storage and Azure Databricks are both in East US 2, traffic stays within the region).

Spark Table with Azure SQL Database

Problem to be solved:

  1. We don't want to write Azure SQL Database connectivity code in each Spark job / Databricks notebook; instead, we can create a Hive table once and refer to it in our code/Databricks notebooks.

Configuration / Sample Code:

%sql
CREATE TABLE flights2
USING org.apache.spark.sql.jdbc
OPTIONS (
  url 'jdbc:sqlserver://<sql_database_server_name>:1433;database=<database_name>;user=<user>;password=<password>;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;',
  database '<database_name>',
  dbtable '<schema_name>.<table_name>'
)
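
Once the table exists in the metastore, any notebook can query it without repeating the JDBC details; a minimal usage sketch:

%scala
// No JDBC connection details are needed here; the metastore table definition carries them
val flightsDf = spark.table("flights2")
display(flightsDf.limit(10))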

Cautions:

  1. Check whether the Azure SQL Database pricing tier (measured in Database Transaction Units, DTUs) is sufficient to support the Spark job; increasing DTUs will increase cost.

Spark Table with Azure SQL Data Warehouse

Problem to be solved:

  1. We don't want to write Azure SQL DW connectivity (boilerplate) code in each Spark job / Databricks notebook.

Configuration / Sample Code:

%sql
CREATE TABLE flights
USING com.databricks.spark.sqldw
OPTIONS (
  url 'jdbc:sqlserver://<server-name>:1433;database=<database_name>;user=<user>;password=<password>;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;',
  database '<database_name>',
  dbtable '<schema>.<table_name>',
  forward_spark_azure_storage_credentials 'true',
  tempdir 'wasbs://<container>@<storage_account>.blob.core.windows.net/<container>'
)

We also need to set the following Spark configuration once, for the Blob storage account that backs tempdir:

%scala
spark.conf.set("fs.azure.account.key.<storage_account>.blob.core.windows.net", "<storage_account_access_key>")
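
As an alternative to the metastore table, the same connector can also be used directly from Scala. A minimal sketch; the camelCase option names below are assumed to be the Scala-side equivalents of the SQL OPTIONS above, so verify them against the Databricks SQL DW connector documentation:

%scala
// Read through the SQL DW connector; placeholders mirror the OPTIONS above
val dwDf = spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<server-name>:1433;database=<database_name>;user=<user>;password=<password>;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;")
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("tempDir", "wasbs://<container>@<storage_account>.blob.core.windows.net/<container>")
  .option("dbTable", "<schema>.<table_name>")
  .load()
display(dwDf)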

Cautions:

  1. Check whether the Azure SQL Data Warehouse units (DWUs) are sufficient to support our Spark job; scaling to more DWUs will increase cost.

Spark Table with Azure Cosmos DB

Problem to be solved:

  1. Visualize Azure Cosmos DB records using Power BI (for details, refer: https://github.com/Azure/azure-cosmosdb-spark/wiki/Configuring-Power-BI-Direct-Query-to-Azure-Cosmos-DB-via-Apache-Spark-(HDI))

Configuration / Sample Code:

%sql
CREATE TABLE flights
USING com.microsoft.azure.cosmosdb.spark
OPTIONS (
  endpoint 'https://<endpoint_url>:443/',
  database '<database_name>',
  collection '<collection_name>',
  masterkey '<masterkey>'
)

Cautions:

  1. While using Azure Cosmos DB we may face a RequestRateTooLargeException on the Spark side. We can increase the Cosmos DB throughput (which will increase cost), or we can use the 'query_pagesize' option (refer to: https://docs.microsoft.com/en-us/azure/cosmos-db/spark-connector), as in the sketch below.
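
A minimal sketch of reading the same collection from Scala with query_pagesize set; the option names follow the azure-cosmosdb-spark connector documentation and the values are placeholders:

%scala
// Read the Cosmos DB collection with a bounded page size to stay under the provisioned throughput
val cosmosDf = spark.read
  .format("com.microsoft.azure.cosmosdb.spark")
  .options(Map(
    "Endpoint" -> "https://<endpoint_url>:443/",
    "Masterkey" -> "<masterkey>",
    "Database" -> "<database_name>",
    "Collection" -> "<collection_name>",
    "query_pagesize" -> "1000" // tune this to avoid RequestRateTooLargeException
  ))
  .load()
display(cosmosDf)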

Spark Table with Azure Blob Storage

Problem to be solved:

  1. We want to create a Spark table backed by Azure Blobs.

Configuration / Sample Code:

  1. Configure the container holding the blobs with 'Private' access (i.e. no anonymous access).
(Image: Container access level)

2. Set the Storage Account access key in the Spark configuration:

%scala
spark.conf.set("fs.azure.account.key.<storage_account>.blob.core.windows.net", "<storage_account_access_key>")

3. Create the table in Spark:

%sql
CREATE TABLE MyManagedFactTableBlob
(
  Date INT,
  DimId1 STRING,
  DimId2 STRING,
  CurrencyCode STRING,
  Country STRING
)
USING CSV
OPTIONS (header 'false', inferSchema 'false', delimiter ',')
LOCATION 'wasbs://<container>@<storage_account>.blob.core.windows.net/'

Cautions:

  1. spark.catalog.refreshTable("MyManagedFactTableBlob") needs to be executed to refresh the Spark catalog when new blobs are added to the container.

Spark Table with Databricks

DBFS (Databricks File System) is a distributed file system installed on Databricks Runtime clusters. On Azure, files in DBFS persist to Azure Blob storage, so we will not lose data even after we terminate a cluster. In this approach, the data is managed by Spark itself.

Problem to be solved:

  1. We want to create the table and load the records on the Databricks/Spark side. Azure Databricks stores the records in Parquet format by default, with the Snappy compression codec.

Configuration / Sample Code:

%scala
import org.apache.spark.sql.SaveMode

// Note: with header = false the inferred column names are _c0, _c1, ...;
// they need to be renamed (e.g. via toDF) so that the DateKey partition column exists before saving
val df = spark.read
  .option("sep", "\t")
  .option("header", "false")
  .option("inferSchema", "true")
  .csv("/mnt/My_Facts/*/*/20*.tsv")
df.write.mode(SaveMode.Overwrite).partitionBy("DateKey").saveAsTable("<dbName>.<tableName>")
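
To confirm how the managed table was stored, its format and DBFS location can be inspected (a minimal sketch, using the same <dbName>.<tableName> placeholder):

%scala
// The Provider and Location rows show the storage format (parquet) and the DBFS path of the managed table
display(spark.sql("DESCRIBE EXTENDED <dbName>.<tableName>"))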

Cautions:

  1. A plain Spark table doesn't support UPDATE/DELETE/MERGE operations, so in some cases we may need to perform multiple operations rather than the single command available in Databricks Delta (e.g. https://docs.azuredatabricks.net/spark/latest/spark-sql/language-manual/merge-into.html), as sketched below.
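
For example, correcting records in a plain (non-Delta) Spark table typically means rewriting the data. A minimal sketch of that multi-step workaround; the Country correction and the staging table name are hypothetical:

%scala
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._

// Hypothetical correction: there is no in-place UPDATE, so read, transform and rewrite
val corrected = spark.table("<dbName>.<tableName>")
  .withColumn("Country", when(col("Country") === "UK", lit("United Kingdom")).otherwise(col("Country")))

// Write to a staging table first; a table that is being read cannot be overwritten in place
corrected.write.mode(SaveMode.Overwrite).saveAsTable("<dbName>.<tableName>_corrected")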

Spark with Databricks Delta Table

At the time of writing, Databricks Delta is in preview mode (refer to: https://azure.microsoft.com/en-in/updates/azure-databricks-delta-now-in-preview/).

Problem to be solved:

  1. We want to create the table and load the records on the Databricks/Spark side, and we want to run CRUD operations on the table. This is great for updating/correcting a table with a very large record set, which is not possible with a normal Spark table.

Configuration / Sample Code:

%scala
import org.apache.spark.sql.SaveMode

val df = spark.read
  .option("sep", "\t")
  .option("header", "false")
  .option("inferSchema", "true")
  .csv("/mnt/My_Facts/*/*/20*.tsv")
df.write.format("delta").mode(SaveMode.Overwrite).partitionBy("DateKey").saveAsTable("<dbName>.<tableName>")
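
With the data stored as Delta, row-level changes become single commands (MERGE INTO is also available, per the link above). A minimal sketch; the predicates and column names below are hypothetical:

%scala
// Hypothetical row-level operations on the Delta table; not possible on a plain Spark table
spark.sql("UPDATE <dbName>.<tableName> SET Country = 'United Kingdom' WHERE Country = 'UK'")
spark.sql("DELETE FROM <dbName>.<tableName> WHERE DateKey < 20170101")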

Cautions:

  1. Databricks Delta is available in the Azure Databricks Premium tier only, which is costlier than the Standard tier.

The above code was tested with Azure Databricks Runtime Version 4.2 (which includes Apache Spark 2.3.1 and Scala 2.11).
