Implementing Data Quality with Amazon Deequ & Apache Spark


Data quality is an important aspect whenever we ingest data. In a big data scenario this becomes very challenging given the high volume, velocity & variety of the data. Incomplete or wrong data can lead to more false predictions by a machine learning algorithm, we may lose opportunities to monetize our data because of data issues, and the business can lose confidence in the data.

Apache Spark has become the de facto technology for big data ingestion & transformation. It becomes even more robust with the managed service provided by Databricks and data pipelines built in Azure Data Factory v2 (other variations exist as well).

If I want to inject a data quality validation black box into my data ingestion pipeline, the pipeline will probably look like the diagram below. Once the data lands in my raw or staging zone, I may want to pass it through the black boxes before I consume or transform it.

[Image: A typical data ingestion pipeline with data quality & other functionalities (data lake layers have not been shown).]

The data flow may differ based on our use cases; however, for this blog we’ll concentrate on designing the Data Quality & Bad Record Identification black boxes only.

[Image: The ‘in-scope’ black boxes we’ll discuss today.]

Data Quality Black Box — Expectations

  1. To highlight the overall data quality for the data ingestion (either batch or streaming).
  2. Unified process to define, measure & report quality.
  3. Profile the input data in terms of completeness, accuracy, uniqueness, distinctness.
  4. Data type validations for non-structured, semi-structured & structured data.
  5. User defined validations, and validations using pattern matching for email IDs, phone numbers or different government-issued identities like SSN, NINO, Aadhaar etc.
  6. Should be extensible; new validations can be added mostly through configuration changes with little or no code change.
  7. Examples: uniqueness of employee ID, a valid country of residence for >80% of records, etc.

Bad Record Identification Black Box — Expectations

  1. To filter out individual records failing the validations (bad data).
  2. Forward the ‘good data’ for further processing and save the ‘bad data’ into a staging area, where manual or automatic data cleansing steps can be applied.
  3. Configurable, as we may not want to correct all of the data failing the previous data quality box.
  4. Examples: duplicate employee IDs, invalidly formatted emails, employee names containing junk characters, etc.

We have different options to realize the above black boxes. For the first one, the data quality black box, I found the following interesting tools/frameworks:

  1. Apache Griffin — An open source data quality framework for big data. Built by eBay, it’s now an Apache Top Level Project. It comes with a data quality service platform comprising a model engine, a data collection layer, a data processing and storage layer, and a RESTful Griffin service.
  2. Amazon Deequ — An open source tool developed & used at Amazon. It is a library built on top of Apache Spark and works on tabular data, i.e. anything that can be represented as a Spark DataFrame.

Now, considering my situation, I would like the data quality box to sit in my ingestion pipeline (built in Azure Data Factory), produce the data quality report, continue the existing flows with the good data & filter out the bad data for further processing. Though Griffin gives me a robust end-to-end solution, based on my use case Deequ fits better.

[Image: The black boxes realized with Amazon Deequ, Apache Spark & Scala.]

Amazon Deequ Installation

The latest Deequ jar can be downloaded from the Maven repository. Once downloaded, it can easily be installed into a Databricks cluster as a cluster library.
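Alternatively, if you build your job with sbt, the library can be pulled straight from Maven Central instead of uploading the jar by hand. A minimal sketch (the <version> placeholder is mine; pick a Deequ release that matches your Spark version):

// build.sbt — add Deequ as a managed dependency instead of installing the jar manually
// (replace <version> with a Deequ release compatible with your Spark version)
libraryDependencies += "com.amazon.deequ" % "deequ" % "<version>"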


Sample Dataset

For this POC I’ll take sample human resources datasets from here. The datasets contain the fields below:

[Image: The dataset fields. Source: http://eforexcel.com/wp/downloads-16-sample-csv-files-data-sets-for-testing/.]

Data Quality Validation

Deequ comes with a huge list of validators we can readily use. I have listed the checks/constraints below along with a one-line description of each. For further details, refer here.

  1. hasSize — calculates the data frame size and runs the assertion on it.
  2. isComplete — asserts that a column is complete (no missing values).
  3. hasCompleteness — runs a custom assertion on a column’s completeness (fraction of non-null values).
  4. isUnique — asserts on a column uniqueness.
  5. isPrimaryKey — asserts on a column(s) primary key characteristics.
  6. hasUniqueness — asserts on uniqueness in a single or combined set of key columns.
  7. hasDistinctness — asserts on the distinctness in a single or combined set of key columns.
  8. hasUniqueValueRatio — asserts on the unique value ratio in a single or combined set of key columns.
  9. hasNumberOfDistinctValues — asserts on the number of distinct values a column has.
  10. hasHistogramValues — asserts on column’s value distribution.
  11. hasEntropy — asserts on a column entropy.
  12. hasMutualInformation — asserts on a mutual information between two columns.
  13. hasApproxQuantile — asserts on an approximated quantile.
  14. hasMinLength — asserts on the minimum length of the column.
  15. hasMaxLength — asserts on the maximum length of the column.
  16. hasMin — asserts on the minimum of the column.
  17. hasMax — asserts on the maximum of the column.
  18. hasMean — asserts on the mean of the column.
  19. hasSum — asserts on the sum of the column.
  20. hasStandardDeviation — asserts on the standard deviation of the column.
  21. hasApproxCountDistinct — asserts on the approximate count distinct of the given column.
  22. hasCorrelation — asserts on the Pearson correlation between two columns.
  23. satisfies — runs the given condition on the data frame.
  24. hasPattern — checks for pattern compliance.
  25. containsCreditCardNumber - verifies against a Credit Card pattern.
  26. containsEmail — verifies against an e-mail pattern.
  27. containsURL — verifies against an URL pattern.
  28. containsSocialSecurityNumber — verifies against the Social security number pattern for the US.
  29. hasDataType — verifies against the fraction of rows that conform to the given data type.
  30. isNonNegative — asserts that a column contains no negative values.
  31. isPositive — asserts that a column contains only positive values.
  32. isLessThan — asserts that, in each row, the value of columnA < the value of columnB.
  33. isLessThanOrEqualTo — asserts that, in each row, the value of columnA ≤ the value of columnB.
  34. isGreaterThan — asserts that, in each row, the value of columnA > the value of columnB.
  35. isGreaterThanOrEqualTo — asserts that, in each row, the value of columnA ≥ the value of columnB.
  36. isContainedIn — asserts that every non-null value in a column is contained in a set of predefined values.

Deequ supports adding the constraints programmatically and running them against the input data set. If we can represent the data set as a Spark DataFrame, that’s enough! Find a sample code snippet below:

import scala.util.matching.Regex
import com.amazon.deequ.{VerificationSuite, VerificationResult}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}
import com.amazon.deequ.constraints.ConstrainableDataTypes

// Define the constraints to validate against the input data
val _check = Check(CheckLevel.Error, "Data Validation Check")
  .isComplete("First_Name")
  .isComplete("Month_Name_of_Joining")
  .isContainedIn("Month_Name_of_Joining", Array("August", "July", "January", "April", "December", "November", "February", "March", "June", "September", "May", "October"))
  .isComplete("Phone_No")
  .containsEmail("E_Mail")
  .containsSocialSecurityNumber("SSN")
  .isContainedIn("Day_of_Joining", 1, 31, includeLowerBound = true, includeUpperBound = true)
  .hasPattern("Phone_No", """^[+]*[(]{0,1}[0-9]{1,4}[)]{0,1}[-\s\./0-9]*$""".r)
  .hasDataType("Emp_ID", ConstrainableDataTypes.Integral)
  .isUnique("Emp_ID")

// Run the checks against the input data
val verificationResult: VerificationResult = {
  VerificationSuite()
    .onData(<input data as Spark DataFrame>)
    .addCheck(_check)
    .run()
}

// Convert the verification result into a Spark DataFrame for inspection
val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)
display(resultDataFrame)

As I want to run the constraints based on some metadata, I loaded them into a configurable metadata table as follows:

[Image: The metadata table with columns & constraints to execute.]

Once I load the DQRules from the metadata table and parse them successfully, I can call _check.<method name> based on the rule type. As the methods’ return type is com.amazon.deequ.checks.Check, i.e. they support a method-chaining style, we can easily construct a sequence of constraints, as sketched below.
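As an illustration only (the DQRule case class and the rule-type strings below are hypothetical stand-ins for however the metadata rows are parsed, not the exact code behind this pipeline), the chaining could be driven from the metadata along these lines, reusing the Check and CheckLevel imports shown above:

// Hypothetical representation of a parsed metadata row
case class DQRule(column: String, ruleType: String)

// Fold the parsed rules into a single Check by chaining the corresponding Deequ methods
def buildCheck(rules: Seq[DQRule]): Check = {
  rules.foldLeft(Check(CheckLevel.Error, "Data Validation Check")) { (check, rule) =>
    rule.ruleType match {
      case "isComplete"    => check.isComplete(rule.column)
      case "isUnique"      => check.isUnique(rule.column)
      case "containsEmail" => check.containsEmail(rule.column)
      case "isNonNegative" => check.isNonNegative(rule.column)
      case _               => check // unknown rule types are ignored in this sketch
    }
  }
}

Each Deequ method returns the check itself, so the fold simply keeps appending constraints to the same Check object.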

Once that is done, we can run the following section to execute the verifier and get back the com.amazon.deequ.VerificationResult.

val verificationResult: VerificationResult = {
  VerificationSuite()
    .onData(<input data as Spark DataFrame>)
    .addCheck(_check)
    .run()
}

The VerificationResult can be converted into a Spark DataFrame, which I can then save into a table against a RunId for manual inspection.

val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)
display(resultDataFrame)
[Image: The ‘resultDataFrame’ showing the data quality verification result.]
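A minimal sketch for persisting the result against a RunId (the dq_results table name and the RunId/RunTimestamp columns are illustrative assumptions, not from the original pipeline):

import org.apache.spark.sql.functions.{lit, current_timestamp}

// Tag each verification result with a run identifier and timestamp, then append to a results table
val runId = java.util.UUID.randomUUID().toString  // hypothetical RunId generation
resultDataFrame
  .withColumn("RunId", lit(runId))
  .withColumn("RunTimestamp", current_timestamp())
  .write
  .mode("append")
  .saveAsTable("dq_results")  // hypothetical table name

Appending one batch of results per run keeps the history available for the week-on-week comparisons discussed later.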

Constraints Identification

From the previous section, you may wonder why I used the Deequ constraint method names in the DQRule column of my metadata table!

If I had a table with hundreds of columns and I wanted to run different DQ rules against those, it would be a huge task to add individual constraints for each column. But nothing to worry about — Deequ comes to the rescue!

Deequ has a ConstraintSuggestionRunner which can generate constraints for most of the columns. Though we may not want to use all of the generated constraints, and we may want to add some extra ones, it’s worth starting with the auto-suggested constraints and amending them as required! If we run the ConstraintSuggestionRunner on a ‘good’ data set, the proposed constraints will be closer to what we want.

import com.amazon.deequ.suggestions.{ConstraintSuggestionRunner, Rules}
import spark.implicits._ // for the toDS method

// Ask Deequ to compute constraint suggestions for us on the data
val suggestionResult = {
  ConstraintSuggestionRunner()
    // data to suggest constraints for
    .onData(<input data as Spark DataFrame>)
    // default set of rules for constraint suggestion
    .addConstraintRules(Rules.DEFAULT)
    // run data profiling and constraint suggestion
    .run()
}

// We can now investigate the constraints that Deequ suggested
val suggestionDataFrame = suggestionResult.constraintSuggestions.flatMap {
  case (column, suggestions) =>
    suggestions.map { constraint =>
      (column, constraint.description, constraint.codeForConstraint)
    }
}.toSeq.toDS()

display(suggestionDataFrame)
[Image: The suggested constraints. We may not require all of them; the constraints marked in ‘red’ we would like to skip here.]

Bad Records Identification

Though Deequ provides an overall data quality report, it doesn’t fetch the individual bad records that failed the constraints. However, we can construct methods that create dynamic queries to identify bad records. We can borrow ideas from a few Deequ constraint implementations, e.g. isContainedIn, isNonNegative, etc.; otherwise, we can use different Spark features, e.g. the rlike function.

[Images: Deequ constraint implementations (e.g. isContainedIn, isNonNegative) from https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/checks/Check.scala]

Using these approaches, we can create a set of methods to identify the bad records, for example along the lines of the sketch below.
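A minimal sketch of one such method based on Spark’s rlike (the helper name and its exact shape are illustrative assumptions, not the actual implementation; the RuleFailed column mirrors the output shown further below):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}

// Flag rows whose column value is null or does not match the given regex pattern
def badRecordsByPattern(df: DataFrame, column: String, pattern: String, ruleName: String): DataFrame = {
  df.filter(col(column).isNull || !col(column).rlike(pattern))
    .withColumn("RuleFailed", lit(ruleName))
}

// Example: phone numbers failing the hasPattern rule used earlier
val badPhones = badRecordsByPattern(
  <input data as Spark DataFrame>,
  "Phone_No",
  """^[+]*[(]{0,1}[0-9]{1,4}[)]{0,1}[-\s\./0-9]*$""",
  "hasPattern(Phone_No)")

Similar helpers can cover uniqueness (e.g. grouping by the key column and keeping counts > 1) or set membership (filtering with isin), so that each failing record carries the name of the rule it violated.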

As mentioned earlier, though I’m running different data quality constraints on most of the ingested fields, I may not want to flag records against all of those constraints. So, going back to the metadata table described earlier, I have added a new column, BRIdentificationRequired, which must be ‘Y’ for a constraint to be picked up by the bad record identification process.

[Image: The metadata table extended to identify the constraints that need to pass through the bad record identification box.]

Once my metadata is updated & the bad record identification logic is built, I can introduce some corrupt records into the HR data and run my code. After execution, I can extract the records that failed the rules.

[Image: The ‘bad’ records with the ‘RuleFailed’ reason.]

As soon as the bad records are separated from the good ones, the data cleansing jobs (manual or automated) can be started; however, that is out of scope for today’s discussion.

If we store the daily data quality reports, we can use Spark, or build a UI, to show week-on-week quality comparisons, green/amber/red flags based on permissible thresholds, etc.; a small aggregation sketch follows the dashboard image below.

[Image: A sample data quality dashboard.]

Metrics Report

Along with the data quality checks, Deequ also provides metrics reporting. Refer here & here for the available metrics.

Extracting the metrics also takes just a few lines of code:

// Use the Deequ library to calculate overall metrics of the ingested data
import com.amazon.deequ.analyzers.runners.{AnalysisRunner, AnalyzerContext}
import com.amazon.deequ.analyzers.runners.AnalyzerContext.successMetricsAsDataFrame
import com.amazon.deequ.analyzers.{Compliance, Correlation, Size, Completeness, Mean, ApproxCountDistinct}

val analysisResult: AnalyzerContext = {
  AnalysisRunner
    // data to run the analysis on
    .onData(<input data as Spark DataFrame>)
    // define analyzers that compute metrics
    .addAnalyzer(Size())
    .addAnalyzer(Completeness("Emp_ID"))
    .addAnalyzer(ApproxCountDistinct("Emp_ID"))
    .addAnalyzer(Mean("Age_in_Yrs"))
    .addAnalyzer(Compliance("Emp Age 18+", "Age_in_Yrs >= 18.0"))
    .addAnalyzer(Correlation("Age_in_Yrs", "Weight_in_Kgs"))
    .addAnalyzer(Correlation("Age_in_Company_Years", "Salary"))
    // compute metrics
    .run()
}

// retrieve successfully computed metrics as a Spark data frame
val metrics = successMetricsAsDataFrame(spark, analysisResult)
display(metrics)
[Image: A few sample metrics.]

The computed metrics can give insights about the ingested data, e.g. the two positive Pearson correlations above highlight the statistical relationships between those column pairs!

Additional Information

  • Deequ — Anomaly Detection — We can look for anomalies in our ingested data by storing the metrics of our data in a MetricsRepository and then using the VerificationSuite (a small sketch follows this list). For more information follow this.
  • Deequ — Single Column Profiling — In case we want to make sense of a large dataset, we can follow this.
  • In case you want to go for automatic data cleansing, you can evaluate the Spark SQL regexp_replace function.
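A minimal anomaly detection sketch, closely following the examples in the Deequ documentation (the in-memory repository, the strategy and the 2x threshold are illustrative choices, not part of the pipeline described above):

import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.analyzers.Size
import com.amazon.deequ.anomalydetection.RelativeRateOfChangeStrategy
import com.amazon.deequ.checks.CheckStatus
import com.amazon.deequ.repository.ResultKey
import com.amazon.deequ.repository.memory.InMemoryMetricsRepository

// Keep metrics of previous runs so that today's metrics can be compared against them
val metricsRepository = new InMemoryMetricsRepository()

val anomalyResult = VerificationSuite()
  .onData(<input data as Spark DataFrame>)
  .useRepository(metricsRepository)
  .saveOrAppendResult(ResultKey(System.currentTimeMillis()))
  // flag an anomaly if the data set size more than doubles compared to the previous run
  .addAnomalyCheck(RelativeRateOfChangeStrategy(maxRateIncrease = Some(2.0)), Size())
  .run()

if (anomalyResult.status != CheckStatus.Success) {
  println("Anomaly detected in the Size() metric!")
}

In a real pipeline a persistent repository (e.g. a file-system backed one) would replace the in-memory repository so that metrics survive across runs.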

Thanks for reading. In case you want to share your case studies or want to connect, please ping me via LinkedIn.

Written by

Tech enthusiast, Azure Big Data Architect.
