### Install PyDeequ Module

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/test_data_quality_at_scale.ipynb

Install the PyDeequ library using pip. Pinning a version is recommended for consistent behavior.

```bash
pip install pydeequ==1.2.0
```

--------------------------------

### Setup SparkSession for Deequ

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/constraint_suggestion_example.ipynb

Configure and create a SparkSession with the packages and classpath that Deequ requires. This setup is necessary before performing data profiling or constraint suggestion.

```python
from pyspark.sql import SparkSession, Row, DataFrame
import json
import pandas as pd
import sagemaker_pyspark
import pydeequ

classpath = ":".join(sagemaker_pyspark.classpath_jars())

spark = (SparkSession
    .builder
    .config("spark.driver.extraClassPath", classpath)
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())
```

--------------------------------

### Install SDKMAN

Source: https://github.com/awslabs/python-deequ/blob/master/README.md

Installs SDKMAN, a tool for managing parallel versions of software development kits. If the environment needs tweaking for SDKMAN to be installed, the installer will prompt you accordingly and ask you to restart. Next, open a new terminal or source the init script, then run `sdk version` to confirm that the installation succeeded.

```bash
$ curl -s https://get.sdkman.io | bash
$ source "$HOME/.sdkman/bin/sdkman-init.sh"
$ sdk version
```

--------------------------------

### Install SDKMAN

Source: https://github.com/awslabs/python-deequ/blob/master/docs/source/README.md

Installs SDKMAN, a tool for managing parallel versions of software development kits.
Follow the prompts to restart the terminal if needed, then source the init script and verify the installation.

```bash
$ curl -s https://get.sdkman.io | bash
```

```bash
$ source "$HOME/.sdkman/bin/sdkman-init.sh"
```

```bash
$ sdk version
```

--------------------------------

### List and Install Java Versions with SDKMAN

Source: https://github.com/awslabs/python-deequ/blob/master/README.md

Lists the available AdoptOpenJDK OpenJDK versions and installs specific Java versions using SDKMAN.

```bash
# List the AdoptOpenJDK OpenJDK versions
$ sdk list java

# Install Java 11
$ sdk install java 11.0.10.hs-adpt

# Install Java 8
$ sdk install java 8.0.292.hs-adpt
```

--------------------------------

### List and Install Java Versions with SDKMAN

Source: https://github.com/awslabs/python-deequ/blob/master/docs/source/README.md

Lists the available AdoptOpenJDK OpenJDK versions and provides commands to install specific Java versions, such as Java 11 or Java 8.

```bash
$ sdk list java
```

```bash
$ sdk install java 11.0.10.hs-adpt
```

```bash
$ sdk install java 8.0.292.hs-adpt
```

--------------------------------

### Start PySpark Session with PyDeequ

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/test_data_quality_at_scale.ipynb

Initializes a PySpark session with the configuration PyDeequ needs: the driver classpath, the Deequ Maven package, dependency exclusions, driver memory, and the Parquet INT96 rebase mode used when reading datetime values. Ensure `sagemaker_pyspark` and `pydeequ` are installed.
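The session code fails at import time if either package is missing, so a quick pre-flight check can save a confusing traceback. This is a minimal sketch using only the standard library's `importlib.util.find_spec`; the helper name `missing_modules` and the module list are illustrative and not part of PyDeequ.

```python
import importlib.util

# Top-level packages the session setup imports; the list is an illustrative assumption.
REQUIRED_MODULES = ("pyspark", "sagemaker_pyspark", "pydeequ")


def missing_modules(names=REQUIRED_MODULES):
    """Return the names (top-level packages only) that cannot be found for import."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    missing = missing_modules()
    if missing:
        print("Install these before creating the SparkSession:", ", ".join(missing))
    else:
        print("All required modules are importable.")
```

`find_spec` only locates a module without importing it, so the check is cheap and does not start a JVM.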
```python
from pyspark.sql import SparkSession, Row, DataFrame
import json
import pandas as pd
import sagemaker_pyspark
import pydeequ

classpath = ":".join(sagemaker_pyspark.classpath_jars())

spark = (
    SparkSession
    .builder
    .config("spark.driver.extraClassPath", classpath)
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .config("spark.driver.memory", "15g")
    .config("spark.sql.parquet.int96RebaseModeInRead", "CORRECTED")
    .getOrCreate()
)
```

--------------------------------

### List and Install Apache Spark Versions with SDKMAN

Source: https://github.com/awslabs/python-deequ/blob/master/README.md

Lists the available Apache Spark versions and installs a specific version using SDKMAN.

```bash
# List the Apache Spark versions
$ sdk list spark

# Install Spark 3
$ sdk install spark 3.0.2
```

--------------------------------

### Initialize SparkSession and PyDeequ

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/anomaly_detection.ipynb

Imports the necessary libraries and initializes a SparkSession with PyDeequ configuration. `sagemaker_pyspark` must be installed, because it supplies the classpath JARs.

```python
from pyspark.sql import SparkSession, Row, DataFrame
import pandas as pd
import sagemaker_pyspark
import pydeequ

classpath = ":".join(sagemaker_pyspark.classpath_jars())

spark = (
    SparkSession
    .builder
    .config("spark.driver.extraClassPath", classpath)
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate()
)
sc = spark.sparkContext
```

--------------------------------

### List and Install Apache Spark Versions with SDKMAN

Source: https://github.com/awslabs/python-deequ/blob/master/docs/source/README.md

Lists the available Apache Spark versions and provides a command to install a specific version, such as Spark 3.0.2.
```bash
$ sdk list spark
```

```bash
$ sdk install spark 3.0.2
```

--------------------------------

### Install and Update Project Dependencies with Poetry

Source: https://github.com/awslabs/python-deequ/blob/master/README.md

Installs or updates all project dependencies using Poetry.

```bash
poetry install
poetry update
```

--------------------------------

### Install Required Libraries

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/synthetic_data/02-synthetic-data-jewelry.ipynb

Installs the awswrangler and essential_generators libraries from a notebook cell (the `!` prefix runs a shell command). The versions are pinned for compatibility.

```python
!python3 -m pip install awswrangler==3.7.2
!python3 -m pip install essential_generators==1.0
```

--------------------------------

### Example Column Profile Output

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/profiles.ipynb

Shows an example of the detailed profile generated for each column, including completeness, approximate number of distinct values, data type, type counts, and histogram. The output covers several columns of the reviews dataset, starting with `insight`.
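Each histogram entry in a profile reads as `[value, absoluteCount, ratio]`, where the ratio is the count divided by the number of records: for `insight`, 1701683 / 3010972 ≈ 0.5652, and the two counts sum to the 3010972 records shown in `typeCounts`. A minimal pure-Python sketch of how completeness, distinct count, and histogram ratios relate, assuming an in-memory list instead of a Spark column. The `NullValue` bucket for missing values is an assumption of this sketch, and Deequ's distinct count is approximate whereas this one is exact.

```python
from collections import Counter


def profile_column(values):
    """Compute a few profile-style statistics for an in-memory column.

    Illustrative only: Deequ computes these with Spark jobs and an
    approximate distinct count; this sketch is exact and single-machine.
    """
    total = len(values)
    non_null = sum(1 for v in values if v is not None)
    # Missing values go into a placeholder bucket (an assumption of this sketch).
    counts = Counter("NullValue" if v is None else v for v in values)
    return {
        "completeness": non_null / total if total else 0.0,
        "numDistinctValues": sum(1 for key in counts if key != "NullValue"),
        # Each entry mirrors the [value, absoluteCount, ratio] triples in Deequ's output.
        "histogram": [[value, n, n / total] for value, n in counts.most_common()],
    }


print(profile_column(["N", "Y", "N", None]))
```

Dividing by the total record count, rather than by the non-null count, is what makes the ratios in a fully complete column such as `insight` sum to 1.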
```text
StandardProfiles for column: insight: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 2,
    "dataType": "String",
    "isDataTypeInferred": false,
    "typeCounts": {
        "Boolean": 0,
        "Fractional": 0,
        "Integral": 0,
        "Unknown": 0,
        "String": 3010972
    },
    "histogram": [
        [ "N", 1701683, 0.5651606856523408 ],
        [ "Y", 1309289, 0.43483931434765916 ]
    ]
}
StandardProfiles for column: review_id: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 3160409,
    "dataType": "String",
    "isDataTypeInferred": false,
    "typeCounts": {
        "Boolean": 0,
        "Fractional": 0,
        "Integral": 0,
        "Unknown": 0,
        "String": 3010972
    },
    "histogram": null
}
NumericProfiles for column: customer_id: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 866021,
    "dataType": "Integral",
    "isDataTypeInferred": false,
    "typeCounts": {
        "Boolean": 0,
        "Fractional": 0,
        "Integral": 3010972,
        "Unknown": 0,
        "String": 0
    },
    "histogram": null,
    "kll": "None",
    "mean": 514989.4746510429,
    "maximum": 929121.0,
    "minimum": 100000.0,
    "sum": 1550618888469.0,
    "stdDev": 239465.84713597817,
    "approxPercentiles": []
}
StandardProfiles for column: review_date: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 7916,
    "dataType": "String",
    "isDataTypeInferred": false,
    "typeCounts": {},
    "histogram": null
}
NumericProfiles for column: helpful_votes: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 24,
    "dataType": "Integral",
    "isDataTypeInferred": false,
    "typeCounts": {},
    "histogram": [
        [ "12", 287924, 0.09562493440656372 ],
        [ "8", 8337, 0.0027688733073572254 ],
        [ "19", 72602, 0.02411247929240126 ],
        [ "23", 434, 0.00014413950046695883 ],
        [ "4", 13, 4.317542640715357e-06 ],
        [ "15", 520006, 0.1727036983406023 ],
        [ "11", 159423, 0.052947353877751104 ],
        [ "9", 27286, 0.009062189884196863 ],
        [ "22", 2218, 0.000736639198238974 ],
        [ "26", 2, 6.642373293408242e-07 ],
        [ "13", 427526, 0.1419893642318826 ],
        [ "24", 70, 2.3248306526928844e-05 ],
        [ "16", 426940, 0.14179474269438574 ],
        [ "5", 87, 2.889432382632585e-05 ],
        [ "10", 72476, 0.024070632340652785 ],
        [ "21", 8287, 0.0027522673741237048 ],
        [ "6", 450, 0.00014945339910168542 ],
        [ "17", 288496, 0.0958149062827552 ],
        [ "25", 12, 3.985423976044945e-06 ],
        [ "14", 519410, 0.17250575561645873 ],
        [ "20", 27100, 0.009000415812568167 ],
        [ "18", 159692, 0.053036693798547446 ],
        [ "7", 2180, 0.0007240186889814984 ],
        [ "3", 1,
```

--------------------------------

### Example Constraint Suggestions for 'valuable' column

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/constraint_suggestion_example.ipynb

Illustrates the constraint suggestions for the 'valuable' column, covering completeness and data type.

```text
Constraint suggestion for 'valuable': 'valuable' has less than 62% missing values
The corresponding Python code is: .hasCompleteness("valuable", lambda x: x >= 0.38, "It should be above 0.38!")

Constraint suggestion for 'valuable': 'valuable' has type Boolean
The corresponding Python code is: .hasDataType("valuable", ConstrainableDataTypes.Boolean)
```

--------------------------------

### Example Constraint Suggestions for 'productName' column

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/constraint_suggestion_example.ipynb

Presents the constraint suggestions for the 'productName' column, covering value containment and completeness.

```text
Constraint suggestion for 'productName': 'productName' has value range 'thingC', 'thingA', 'thingB', 'thingE', 'thingD'
The corresponding Python code is: .isContainedIn("productName", ["thingC", "thingA", "thingB", "thingE", "thingD"])

Constraint suggestion for 'productName': 'productName' is not null
The corresponding Python code is: .isComplete("productName")
```

--------------------------------

### Build and Run Tests with Docker

Source: https://github.com/awslabs/python-deequ/blob/master/README.md

Build a Docker image for testing and run the tests inside a container. This is an alternative if installing the dependencies locally fails.

```bash
docker build . -t spark-3.3-docker-test
docker run spark-3.3-docker-test
```

--------------------------------

### Load Analysis Results from Metrics Repository

Source: https://github.com/awslabs/python-deequ/blob/master/README.md

Load previous analysis results from the FileSystemMetricsRepository. This example loads the results of a specific analyzer that were saved before the current time.

```python
from pydeequ.analyzers import ApproxCountDistinct
from pydeequ.repository import ResultKey

# `repository` is the FileSystemMetricsRepository created earlier
result_metrep_df = repository.load() \
    .before(ResultKey.current_milli_time()) \
    .forAnalyzers([ApproxCountDistinct('b')]) \
    .getSuccessMetricsAsDataFrame()
```

--------------------------------

### Generate Example Spark DataFrame

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/constraint_suggestion_example.ipynb

Creates a sample Spark DataFrame with string-typed columns and some missing values for testing constraint suggestion.

```python
df = spark.sparkContext.parallelize([
    Row(productName="thingA", totalNumber="13.0", status="IN_TRANSIT", valuable="true"),
    Row(productName="thingA", totalNumber="5", status="DELAYED", valuable="false"),
    Row(productName="thingB", totalNumber=None, status="DELAYED", valuable=None),
    Row(productName="thingC", totalNumber=None, status="IN_TRANSIT", valuable="false"),
    Row(productName="thingD", totalNumber="1.0", status="DELAYED", valuable="true"),
    Row(productName="thingC", totalNumber="7.0", status="UNKNOWN", valuable=None),
    Row(productName="thingC", totalNumber="20", status="UNKNOWN", valuable=None),
    Row(productName="thingE", totalNumber="20", status="DELAYED", valuable="false"),
    Row(productName="thingA", totalNumber="13.0", status="IN_TRANSIT", valuable="true"),
    Row(productName="thingA", totalNumber="5", status="DELAYED", valuable="false"),
    Row(productName="thingB", totalNumber=None, status="DELAYED", valuable=None),
    Row(productName="thingC", totalNumber=None, status="IN_TRANSIT", valuable="false"),
    Row(productName="thingD", totalNumber="1.0", status="DELAYED", valuable="true"),
    Row(productName="thingC", totalNumber="17.0",
        status="UNKNOWN", valuable=None),
    Row(productName="thingC", totalNumber="22", status="UNKNOWN", valuable=None),
    Row(productName="thingE", totalNumber="23", status="DELAYED", valuable="false")]).toDF()
```

--------------------------------

### Poetry Dependency Management

Source: https://github.com/awslabs/python-deequ/blob/master/docs/source/README.md

Provides the essential Poetry commands for installing and updating project dependencies and for displaying dependency information.

```bash
poetry install
```

```bash
poetry update
```

```bash
# --tree: List the dependencies as a tree.
# --latest (-l): Show the latest version.
# --outdated (-o): Show the latest version but only for packages that are outdated.
poetry show -o
```

--------------------------------

### Example Constraint Suggestions for 'totalNumber' column

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/constraint_suggestion_example.ipynb

Shows the constraint suggestions for the 'totalNumber' column, covering non-negativity, completeness, and data type.

```text
Constraint suggestion for 'totalNumber': 'totalNumber' has no negative values
The corresponding Python code is: .isNonNegative("totalNumber")

Constraint suggestion for 'totalNumber': 'totalNumber' has less than 47% missing values
The corresponding Python code is: .hasCompleteness("totalNumber", lambda x: x >= 0.53, "It should be above 0.53!")

Constraint suggestion for 'totalNumber': 'totalNumber' has type Fractional
The corresponding Python code is: .hasDataType("totalNumber", ConstrainableDataTypes.Fractional)
```

--------------------------------

### Display Generated Document Example

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/synthetic_data/01-synthetic-data-electronics.ipynb

Displays the generated document at a given index (here, 0) of the `documents` list.
```python
documents[0]
```

--------------------------------

### Deequ Dependency Resolution Details

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/analyzers.ipynb

Detailed output of the Ivy dependency resolution that runs when Spark fetches Deequ, listing the modules found, the artifacts retrieved, and any evicted modules. Output like this confirms that the setup succeeded.

```text
Ivy Default Cache set to: /home/ec2-user/.ivy2/cache
The jars for the packages stored in: /home/ec2-user/.ivy2/jars
com.amazon.deequ#deequ added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-23421fea-77b3-4d69-9251-54adf6371fd9;1.0
    confs: [default]
    found com.amazon.deequ#deequ;2.0.3-spark-3.3 in central
    found org.scala-lang#scala-reflect;2.12.10 in central
    found org.scalanlp#breeze_2.12;0.13.2 in central
    found org.scalanlp#breeze-macros_2.12;0.13.2 in central
    found com.github.fommil.netlib#core;1.1.2 in central
    found net.sf.opencsv#opencsv;2.3 in central
    found com.github.rwl#jtransforms;2.4.0 in central
    found junit#junit;4.8.2 in central
    found org.apache.commons#commons-math3;3.2 in central
    found org.spire-math#spire_2.12;0.13.0 in central
    found org.spire-math#spire-macros_2.12;0.13.0 in central
    found org.typelevel#machinist_2.12;0.6.1 in central
    found com.chuusai#shapeless_2.12;2.3.2 in central
    found org.typelevel#macro-compat_2.12;1.1.1 in central
    found org.slf4j#slf4j-api;1.7.5 in central
:: resolution report :: resolve 435ms :: artifacts dl 12ms
    :: modules in use:
    com.amazon.deequ#deequ;2.0.3-spark-3.3 from central in [default]
    com.chuusai#shapeless_2.12;2.3.2 from central in [default]
    com.github.fommil.netlib#core;1.1.2 from central in [default]
    com.github.rwl#jtransforms;2.4.0 from central in [default]
    junit#junit;4.8.2 from central in [default]
    net.sf.opencsv#opencsv;2.3 from central in [default]
    org.apache.commons#commons-math3;3.2 in [default]
    org.scala-lang#scala-reflect;2.12.10 from central in [default]
    org.scalanlp#breeze-macros_2.12;0.13.2 from central in [default]
    org.scalanlp#breeze_2.12;0.13.2 from central in [default]
    org.slf4j#slf4j-api;1.7.5 from central in [default]
    org.spire-math#spire-macros_2.12;0.13.0 from central in [default]
    org.spire-math#spire_2.12;0.13.0 from central in [default]
    org.typelevel#machinist_2.12;0.6.1 from central in [default]
    org.typelevel#macro-compat_2.12;1.1.1 from central in [default]
    :: evicted modules:
    org.scala-lang#scala-reflect;2.12.1 by [org.scala-lang#scala-reflect;2.12.10] in [default]
    org.scala-lang#scala-reflect;2.12.0 by [org.scala-lang#scala-reflect;2.12.10] in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   17  |   0   |   0   |   2   ||   15  |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-23421fea-77b3-4d69-9251-54adf6371fd9
    confs: [default]
    0 artifacts copied, 15 already retrieved (0kB/9ms)
```

--------------------------------

### Display a Further Generated Document Example

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/synthetic_data/01-synthetic-data-electronics.ipynb

Displays the generated document at a given index (here, 10000) of the `documents` list.

```python
documents[10000]
```

--------------------------------

### Display Another Generated Document Example

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/synthetic_data/01-synthetic-data-electronics.ipynb

Displays the generated document at a given index (here, 100) of the `documents` list.
```python
documents[100]
```

--------------------------------

### Example Constraint Suggestions for 'status' column

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/constraint_suggestion_example.ipynb

Details the constraint suggestions for the 'status' column, covering the value range and completeness.

```text
Constraint suggestion for 'status': 'status' has value range 'DELAYED', 'UNKNOWN', 'IN_TRANSIT'
The corresponding Python code is: .isContainedIn("status", ["DELAYED", "UNKNOWN", "IN_TRANSIT"])

Constraint suggestion for 'status': 'status' is not null
The corresponding Python code is: .isComplete("status")
```

--------------------------------

### Import PyDeequ and SageMaker PySpark Modules

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/test_data_quality_at_scale.ipynb

After installation, import `sagemaker_pyspark` and `pydeequ`. Both are needed to use PyDeequ within a SageMaker environment.

```python
import sagemaker_pyspark
import pydeequ
```

--------------------------------

### Initialize PyDeequ Repository and Run Verification

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/glue_tutorials/basic_anomalies_script_demo.ipynb

Sets up a FileSystemMetricsRepository to store metrics and runs a verification suite with an anomaly check on the previous day's data. The check uses a RelativeRateOfChangeStrategy to detect anomalies in the Size metric.
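The idea behind `RelativeRateOfChangeStrategy(maxRateIncrease=2.0)` can be sketched in plain Python: compare each metric value with the previous one and flag the point when the ratio exceeds the allowed increase (or, if configured, falls below an allowed decrease). This is a simplified, first-order reading of the strategy, not PyDeequ's implementation; the function name and the Size values are hypothetical.

```python
def relative_rate_anomalies(values, max_rate_increase=None, max_rate_decrease=None):
    """Flag indices whose value changed too much relative to the previous value.

    Simplified sketch: rate = values[i] / values[i - 1]; a point is anomalous
    when the rate is above max_rate_increase or below max_rate_decrease.
    """
    anomalies = []
    for i in range(1, len(values)):
        prev, curr = values[i - 1], values[i]
        if prev == 0:
            continue  # the ratio is undefined; this sketch simply skips such points
        rate = curr / prev
        if max_rate_increase is not None and rate > max_rate_increase:
            anomalies.append((i, rate))
        elif max_rate_decrease is not None and rate < max_rate_decrease:
            anomalies.append((i, rate))
    return anomalies


# Hypothetical Size metric: yesterday's row count, then today's.
sizes = [1000, 2500]
print(relative_rate_anomalies(sizes, max_rate_increase=2.0))
```

With `maxRateIncrease=2.0`, today's data may be at most twice as large as yesterday's; 2500 rows after 1000 is a rate of 2.5 and would be flagged.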
```python
import pydeequ
from pydeequ.analyzers import Size
from pydeequ.anomaly_detection import RelativeRateOfChangeStrategy
from pydeequ.repository import FileSystemMetricsRepository, ResultKey
from pydeequ.verification import VerificationSuite

# `session`, `s3_write_path`, and `previous_dataframe` are defined in earlier notebook cells
metricsRepository = FileSystemMetricsRepository(session, s3_write_path)

key_tags = {'tag': 'yesterday'}
resultKey_yesterday = ResultKey(session, ResultKey.current_milli_time(), key_tags)

# Parentheses let the builder chain span several lines
prev_Result = (VerificationSuite(session)
    .onData(previous_dataframe)
    .useRepository(metricsRepository)
    .saveOrAppendResult(resultKey_yesterday)
    .addAnomalyCheck(RelativeRateOfChangeStrategy(maxRateIncrease=2.0), Size())
    .run())
```

--------------------------------

### Initialize Spark Session for PyDeequ

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/KLLExample.ipynb

Sets up a SparkSession with the configuration PyDeequ needs: the AWS-specific JARs from `sagemaker_pyspark` on the driver classpath, the Deequ Maven package, and an exclusion for the f2j artifact (`pydeequ.f2j_maven_coord`).

```python
import pydeequ
import sagemaker_pyspark
from pyspark.sql import SparkSession, Row

classpath = ":".join(sagemaker_pyspark.classpath_jars())  # aws-specific jars

spark = (
    SparkSession
    .builder
    .config("spark.driver.extraClassPath", classpath)
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())
```

--------------------------------

### Create FileSystemMetricsRepository with autogenerated path

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/repository_file_dbfs.ipynb

Initialize `FileSystemMetricsRepository` without a path to let it generate one automatically. Metrics from consecutive PyDeequ runs are materialized at this path.

```python
from pydeequ.repository import FileSystemMetricsRepository

repository = FileSystemMetricsRepository(spark)
print(repository.path)
```

--------------------------------

### Get DataFrame Shape

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/synthetic_data/01-synthetic-data-electronics.ipynb

Returns the dimensions (number of rows and columns) of the DataFrame.
```python
dat.shape
```

--------------------------------

### Initialize Spark Session with PyDeequ

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/analyzers.ipynb

Configure and create a SparkSession with the PyDeequ dependencies and classpath. This is required before using any PyDeequ functionality.

```python
from pyspark.sql import SparkSession, Row, DataFrame
import json
import pandas as pd
import sagemaker_pyspark
import pydeequ

classpath = ":".join(sagemaker_pyspark.classpath_jars())

spark = (
    SparkSession
    .builder
    .config("spark.driver.extraClassPath", classpath)
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate()
)
```

--------------------------------

### Get Total Number of Years

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/synthetic_data/01-synthetic-data-electronics.ipynb

Returns the total number of elements in the `years` array.

```python
years.shape[0]
```

--------------------------------

### Run Pytest Locally

Source: https://github.com/awslabs/python-deequ/blob/master/README.md

Execute the tests using pytest. Ensure Poetry is installed and configured first.

```bash
$ poetry run pytest
```

--------------------------------

### Set up PySpark Session with PyDeequ

Source: https://github.com/awslabs/python-deequ/blob/master/README.md

Initialize a SparkSession configured for PyDeequ, adding the Deequ Maven package and excluding the f2j artifact (`pydeequ.f2j_maven_coord`). Then create a sample DataFrame.
```python
from pyspark.sql import SparkSession, Row
import pydeequ

spark = (SparkSession
    .builder
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())

df = spark.sparkContext.parallelize([
    Row(a="foo", b=1, c=5),
    Row(a="bar", b=2, c=6),
    Row(a="baz", b=3, c=None)]).toDF()
```

--------------------------------

### VerificationSuite Methods

Source: https://github.com/awslabs/python-deequ/blob/master/docs/verification.md

Methods for setting up and running a verification suite.

```APIDOC
## VerificationSuite

### Description
Represents a suite of verification checks to be performed on data.

### Methods
- **VerificationSuite(spark_session)**
  - Description: Constructor for VerificationSuite.
  - Parameters:
    - spark_session (SparkSession) - Required - The Spark session.
- **onData(data)**
  - Description: Specifies the DataFrame to perform verification on.
  - Parameters:
    - data (DataFrame) - Required - The input data.
```

--------------------------------

### Get Unique Marketplaces

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/synthetic_data/01-synthetic-data-electronics.ipynb

Finds and displays all unique values in the `marketplace` column of the DataFrame.

```python
dat["marketplace"].unique()
```

--------------------------------

### Get Number of Generated Documents

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/synthetic_data/01-synthetic-data-electronics.ipynb

Returns the total number of documents generated by the `DocumentGenerator`.

```python
len(documents)
```

--------------------------------

### Create Sample DataFrame

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/hasPattern_check.ipynb

Creates a Spark DataFrame with sample data in the columns `a`, `creditCard`, `email`, `ssn`, and `URL` for testing verification rules.
```python
df = spark.sparkContext.parallelize([
    Row(a="foo", creditCard="5130566665286573", email="foo@example.com",
        ssn="123-45-6789", URL="http://userid@example.com:8080"),
    Row(a="bar", creditCard="4532677117740914", email="bar@example.com",
        ssn="123456789", URL="http://foo.com/(something)?after=parens"),
    Row(a="baz", creditCard="3401453245217421", email="foobar@baz.com",
        ssn="000-00-0000", URL="http://userid@example.com:8080")]).toDF()
```

--------------------------------

### Get Number of Records

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/profiles.ipynb

Prints the total number of records in the result, which shows the size of the dataset being analyzed.

```python
print(result.numRecords)
```

--------------------------------

### Get Number of Columns in Customer Insight Data

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/synthetic_data/02-synthetic-data-jewelry.ipynb

Extracts the number of columns from the shape of the customer insight data.

```python
n = cust_ins_ready.shape[1]
n
```

--------------------------------

### Initialize Glue and Spark Context

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/glue_tutorials/data_cleaning_tutorial.ipynb

Sets up the Glue and Spark contexts for processing data in an AWS Glue environment. This is a standard initialization step for Glue jobs.

```python
import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from pyspark.context import SparkContext

glueContext = GlueContext(SparkContext.getOrCreate())
session = glueContext.spark_session
```

--------------------------------

### Initialize Spark Session with PyDeequ

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/hasPattern_check.ipynb

Sets up a SparkSession configured for PyDeequ, adding the Deequ Maven package and excluding the f2j artifact (`pydeequ.f2j_maven_coord`).
```python
import pydeequ
from pyspark.sql import SparkSession, Row

spark = (SparkSession
    .builder
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())
```

--------------------------------

### Create Synthetic Office Products Dataset

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/synthetic_data/03-synthetic-data-other-products.ipynb

Generates a synthetic dataset for 'Office_Products' with parameters such as size factor, votes, scale, and review years. It uses the predefined product component pools.

```python
dat = rgh.create_dataset(size_factor=3,
                         total_votes=np.arange(2, 44, 2),
                         helpful_votes=np.arange(0, 32),
                         scale=0.4,
                         review_years=np.arange(1995, 2014),
                         product_category='Office_Products',
                         product_components=[product_prefix_pool, product_pool, product_suffix_pool],
                         marketplace_factor=0.4)
```

--------------------------------

### Initialize FileSystemMetricsRepository

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/glue_tutorials/simple_metrics_repository_tutorial.ipynb

Sets up a FileSystemMetricsRepository that stores metrics as JSON on S3. For in-memory storage, use InMemoryMetricsRepository instead.

```python
s3_write_path = "s3://joanpydeequ/tmp/simple_metrics_tutorial.json"

import pydeequ
from pydeequ.repository import *

repository = FileSystemMetricsRepository(session, s3_write_path)
```

--------------------------------

### Get Shape of Customer Insight Data

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/synthetic_data/02-synthetic-data-jewelry.ipynb

Retrieves the dimensions (number of rows and columns) of the shuffled customer insight data.
```python
cust_ins_ready.shape
```

--------------------------------

### Initialize FileSystemMetricsRepository

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/repository.ipynb

Initializes a FileSystemMetricsRepository that stores metrics as JSON on local disk. InMemoryMetricsRepository is an alternative that needs no file path.

```python
from pydeequ.repository import *

metrics_file = FileSystemMetricsRepository.helper_metrics_file(spark, 'metrics.json')
print(f'metrics_file path: {metrics_file}')
repository = FileSystemMetricsRepository(spark, metrics_file)
```

--------------------------------

### Set up PySpark Session with PyDeequ

Source: https://github.com/awslabs/python-deequ/blob/master/docs/source/README.md

Initializes a PySpark session configured for PyDeequ, adding the Deequ Maven package and excluding the f2j artifact (`pydeequ.f2j_maven_coord`). It also creates a sample Spark DataFrame for subsequent operations.

```python
from pyspark.sql import SparkSession, Row
import pydeequ

spark = (
    SparkSession
    .builder
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate()
)

df = spark.sparkContext.parallelize([
    Row(a="foo", b=1, c=5),
    Row(a="bar", b=2, c=6),
    Row(a="baz", b=3, c=None)]).toDF()
```

--------------------------------

### Run KLL Sketch Check with Default Parameters

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/KLLCheckExample.ipynb

This snippet sets up and runs a PyDeequ VerificationSuite with KLL sketch constraints. It checks the data size, the maximum value, and a KLL sketch property of the 'numViews' column, using the default sketch parameters.
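The size and maximum constraints in this check follow the general shape of a Deequ constraint: a metric is computed over the data and passed to an assertion lambda such as `lambda x: x == 5`. A plain-Python sketch of that evaluation pattern, with a hypothetical `numViews` column and a hypothetical `evaluate_constraints` helper; it mirrors the shape of a check, not PyDeequ's implementation, and it leaves out the KLL constraint, whose metric is a sketch object rather than a number.

```python
def evaluate_constraints(values, constraints):
    """Evaluate (name, metric_fn, assertion) triples against an in-memory column.

    Returns {name: (metric, passed)}, loosely mirroring a constraint result.
    """
    results = {}
    for name, metric_fn, assertion in constraints:
        metric = metric_fn(values)
        results[name] = (metric, bool(assertion(metric)))
    return results


# Hypothetical numViews values; the notebook's actual data is not reproduced here.
num_views = [1, 3, 4, 7, 10]

results = evaluate_constraints(num_views, [
    ("hasSize", len, lambda x: x == 5),
    ("hasMax(numViews)", max, lambda x: x <= 10),
])
print(results)
```

Keeping the assertion as a separate callable is what lets one metric (for example, the maximum) be reused with different thresholds in different checks.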
```python
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite

check = Check(spark, CheckLevel.Error, "KLL Checks")

result = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        check.hasSize(lambda x: x == 5)
        .hasMax("numViews", lambda x: x <= 10)
        .kllSketchSatisfies("numViews", lambda x: x.parameters().apply(0) <= 0)) \
    .run()
```

--------------------------------

### Define Product Name, Prefix, and Suffix Pools

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/synthetic_data/01-synthetic-data-electronics.ipynb

Initializes lists of product names, descriptive prefixes, and feature suffixes used to generate synthetic product titles.

```python
# pool of product names, prefixes and suffixes from which we will generate the product titles
product_pool = ["fax machine", "banknote counter", "electronic alarm clock", "electric pencil sharpener",
                "blu-ray", "floor lamp", "hair dryer", "paper copier", "electric drill", "video camera",
                "radio", "air purifier", "floor heater", "cd player", "iron", "kettle", "mp3 player",
                "video player", "electric stove", "electric razor", "dvd", "curling iron",
                "office printer", "wireless speaker", "kitchen scale", "theater receiver",
                "electronic cigarettes", "computer", "television", "smartphone", "surge protector",
                "remote control", "headset", "game controller", "cellular phone", "bluetooth speaker"]

product_prefix_pool = ["large", "red", "small", "orange", "green", "black", "silver", "yellow",
                       "compact", "energy-efficient", "vintage", "pink", "portable", "white",
                       "metal", "stainless-steel"]

product_suffix_pool = ["newest model", "refurbished", "renewed", "1996 model", "with the storage case",
                       "charger included", "charger not included", "batteries not included",
                       "waterproof", "with adapter", "with wooden inlays", "with silver details",
                       "with black handle", "EU compatible", "Japan compatible", "US compatible"]
```

--------------------------------

### Get Row Marker

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/synthetic_data/03-synthetic-data-other-products.ipynb

Determines the number of rows in the generated synthetic dataset, which is useful for slicing other DataFrames to match its size.

```python
row_marker = dat.shape[0]
row_marker
```

--------------------------------

### Load Data into DynamicFrame

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/glue_tutorials/data_cleaning_tutorial.ipynb

Loads data from a Glue Data Catalog table into a DynamicFrame, the starting point for processing data with AWS Glue.

```python
from awsglue.dynamicframe import DynamicFrame

medicare_dynamicframe = glueContext.create_dynamic_frame.from_catalog(
    database="medical_billing_demo",
    table_name="medicare_hospital_provider_csv")
medicare_dynamicframe.printSchema()
```

--------------------------------

### Show Project Dependencies with Poetry

Source: https://github.com/awslabs/python-deequ/blob/master/README.md

Displays project dependencies, with options to show them as a tree, list the latest versions, or show only outdated packages.

```bash
# --tree: List the dependencies as a tree.
# --latest (-l): Show the latest version.
# --outdated (-o): Show the latest version but only for packages that are outdated.
poetry show -o
```

--------------------------------

### Create Synthetic Dataset for Tools & Home Improvement

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/synthetic_data/03-synthetic-data-other-products.ipynb

Generates a synthetic dataset for the 'Tools_Home_Improvement' category with parameters for size factor, votes, scale, review years, and product components. The `marketplace_factor` influences the distribution of marketplace-related attributes.
```python
dat = rgh.create_dataset(size_factor = 2,
                         total_votes = np.arange(10, 66, 2),
                         helpful_votes = np.arange(2, 48),
                         scale = 1,
                         review_years = np.arange(1996, 2010),
                         product_category = 'Tools_Home_Improvement',
                         product_components = [product_prefix_pool, product_pool, product_suffix_pool],
                         marketplace_factor = 0.6)
```

--------------------------------

### Create ResultKey with Tags

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/repository.ipynb

Sets up a ResultKey for storing metrics. The key combines a timestamp with custom tags for organizing and retrieving specific metric sets.

```python
key_tags = {'tag': 'electronics'}
resultKey = ResultKey(spark, ResultKey.current_milli_time(), key_tags)
```

--------------------------------

### Output Analysis Results as JSON

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/test_data_quality_at_scale.ipynb

Use `AnalyzerContext.successMetricsAsJson` to get analysis results in JSON format. This is useful for programmatic consumption of data quality metrics.

```python
analysisResult_json = AnalyzerContext.successMetricsAsJson(spark, analysisResult)
analysisResult_json
```

--------------------------------

### Initialize InMemoryMetricsRepository

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/anomaly_detection.ipynb

Imports the repository module and creates an InMemoryMetricsRepository to store metrics for anomaly detection.

```python
from pydeequ.repository import *

metricsRepository = InMemoryMetricsRepository(spark)
```

--------------------------------

### Set Spark Version for PyDeequ

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/test_data_quality_at_scale.ipynb

Set the `SPARK_VERSION` environment variable before importing PyDeequ to ensure compatibility. This is often necessary in environments such as SageMaker.
```python
import os

os.environ["SPARK_VERSION"] = '3.3'
```

--------------------------------

### Get Lengths of Customer Cohorts

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/synthetic_data/01-synthetic-data-electronics.ipynb

Retrieves the number of customers in each generated cohort, showing how the synthetic customers are distributed across groups.

```python
[len(x) for x in customer_cohorts]
```

--------------------------------

### Run VerificationSuite and Display Results

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/basic_example.ipynb

Defines and runs a VerificationSuite with multiple checks on the DataFrame, then converts the results to a DataFrame and displays them. The checks cover size, minimum value, completeness, uniqueness, containment, and non-negativity.

```python
from pydeequ.checks import *
from pydeequ.verification import *

check = Check(spark, CheckLevel.Error, "Integrity checks")

checkResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        check.hasSize(lambda x: x >= 3) \
        .hasMin("b", lambda x: x == 0) \
        .isComplete("c") \
        .isUnique("a") \
        .isContainedIn("a", ["foo", "bar", "baz"]) \
        .isNonNegative("b")) \
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()
```

--------------------------------

### Create Synthetic Dataset

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/synthetic_data/03-synthetic-data-other-products.ipynb

Generates a synthetic dataset of product reviews using the specified parameters. Ensure that numpy and the dataset generation utility are imported.
```python
dat = rgh.create_dataset(size_factor = 1,
                         total_votes = np.arange(0, 150),
                         helpful_votes = np.arange(0, 60),
                         scale = 0.5,
                         review_years = np.arange(1998, 2015),
                         product_category = 'Beauty_Personal_Care',
                         product_components = [product_prefix_pool, product_pool, product_suffix_pool],
                         marketplace_factor = 0.5)
```

--------------------------------

### Get Row-Level Constraint Verification Results

Source: https://github.com/awslabs/python-deequ/blob/master/README.md

Obtain row-level results from a VerificationSuite run to identify which rows pass or fail the constraints. This is useful for debugging data quality issues.

```python
rowLevelResult_df = VerificationResult.rowLevelResultsAsDataFrame(spark, checkResult, df)
rowLevelResult_df.show()
```

--------------------------------

### Configure and Generate Documents with Essential Generators

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/synthetic_data/01-synthetic-data-electronics.ipynb

Initializes the DocumentGenerator and sets a template that produces synthetic review data with marketplace, review headline, and review body fields. The `documents` method then creates the specified number of entries.

```python
from essential_generators import DocumentGenerator

gen = DocumentGenerator()
template = {'marketplace': ['US', 'UK', 'DE', 'JP', 'FR', None, ''],
            'review_headline': 'sentence',
            'review_body': 'paragraph'}
gen.set_template(template)
documents = gen.documents(3010972)
```

--------------------------------

### Generate Unique Review IDs

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/synthetic_data/01-synthetic-data-electronics.ipynb

Generates `n` synthetic review IDs, each consisting of 'R' followed by 14 random uppercase letters or digits. The IDs are random rather than guaranteed unique, but with 36^14 possible suffixes a collision is extremely unlikely.
```python
import random
import string

# n is the number of review IDs to generate
reviews_unique = ['R' + ''.join(random.choices(string.ascii_uppercase + string.digits, k=14))
                  for x in range(n)]
```

--------------------------------

### Initialize FileSystemMetricsRepository with managed path

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/repository_file_dbfs.ipynb

Initialize `FileSystemMetricsRepository` with a specific managed path on DBFS. Note the 'dbfs:/' prefix, which is the path format expected by the Spark API.

```python
metrics_spark_api = f"dbfs:/{target_metrics_file_path}"
repository = FileSystemMetricsRepository(spark, metrics_spark_api)
print(repository.path)
```

--------------------------------

### Create Synthetic Product Dataset

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/synthetic_data/03-synthetic-data-other-products.ipynb

Generates a synthetic product dataset, specifying parameters such as size factor, vote counts, review years, and product components. Ensure that numpy and pandas are imported.

```python
dat = rgh.create_dataset(size_factor = 3,
                         total_votes = np.arange(11, 91),
                         helpful_votes = np.arange(3, 71),
                         scale = 0.4,
                         review_years = np.arange(1996, 2018),
                         product_category = 'Music',
                         product_components = [product_prefix_pool, product_pool, product_suffix_pool],
                         marketplace_factor = 1)
```

--------------------------------

### Detect Anomalies and Send SNS Notifications

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/glue_tutorials/advanced_anomaly_detection_tutorial.ipynb

Iterates through monthly data, runs Holt-Winters anomaly detection, and sends an SNS notification if an anomaly is found. Requires an AWS SNS topic ARN and a boto3 SNS client.
```python
# Use AWS SNS
import boto3
import json

from pydeequ.analyzers import *
from pydeequ.anomaly_detection import *
from pydeequ.repository import *
from pydeequ.verification import *

# Topic for AWS SNS
topicArn = 'arn:aws:sns:us-west-2:498730894712:jewelry_hw'
snsClient = boto3.client('sns', region_name = 'us-west-2')

for month in range(1, 9):
    # Builds a quoted SQL literal such as '2015/1'
    date = "'2015" + '/' + str(month) + "'"
    df = df_2015.filter("review_date =" + date)

    key_tags = {'tag': date}
    result_key_2015 = ResultKey(session, ResultKey.current_milli_time(), key_tags)

    # Parentheses let the builder chain span several lines
    jewelry_result = (VerificationSuite(session)
        .onData(df)
        .useRepository(metricsRepository)
        .saveOrAppendResult(result_key_2015)
        .addAnomalyCheck(HoltWinters(session, MetricInterval.Monthly, SeriesSeasonality.Yearly), Sum('total_votes'))
        .run())

    result_df = VerificationResult.checkResultsAsDataFrame(session, jewelry_result)

    if jewelry_result.status != "Success":
        print("Anomaly for total_votes has been detected")
        print(date)
        message = result_df.select("constraint_message").collect()
        # String.Array attributes must carry a JSON-formatted array as their value
        response = snsClient.publish(
            TopicArn = topicArn,
            Message = "anomaly detected in data frame: \n" + json.dumps(message),
            Subject = "Anomaly Detected in the jewelry dataset:" + date,
            MessageAttributes = {"TransactionType": {"DataType": "String.Array",
                                                     "StringValue": '["Anomaly Detected in Glue"]'}})
        break
```

--------------------------------

### Analyze Data with PyDeequ Analyzers

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/glue_tutorials/data_cleaning_tutorial.ipynb

Uses PyDeequ's `AnalysisRunner` to run a data quality analysis on a Spark DataFrame. It computes the dataset size, the completeness of the "Provider Name" column, and the data types found in the "Provider Id" column. Requires PyDeequ and Spark to be installed.
```python
import pydeequ
from pydeequ.analyzers import *

analysisResult = AnalysisRunner(session) \
    .onData(medicare) \
    .addAnalyzer(Size()) \
    .addAnalyzer(Completeness("Provider Name")) \
    .addAnalyzer(DataType("Provider Id")) \
    .run()

# Use the same Spark session that ran the analysis
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(session, analysisResult)
analysisResult_df.show(n=13, truncate=False, vertical=False)
```

--------------------------------

### Create Synthetic Computer Dataset

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/synthetic_data/03-synthetic-data-other-products.ipynb

Generates a synthetic dataset for computer products with the specified vote ranges, review years, and marketplace factor. Requires numpy and the `rgh` dataset generation utility.

```python
dat = rgh.create_dataset(size_factor = 1,
                         total_votes = np.arange(20, 120),
                         helpful_votes = np.arange(10, 60),
                         scale = 0.8,
                         review_years = np.arange(1996, 2017),
                         product_category = 'Computers',
                         product_components = [product_prefix_pool, product_pool, product_suffix_pool],
                         marketplace_factor = 0.4)
```

--------------------------------

### Create Sample DataFrame

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/KLLCheckExample.ipynb

Creates a small Spark DataFrame with sample data, including several data types and null values, to be used for data quality checks.
```python
from pyspark.sql import Row

df = spark.sparkContext.parallelize([
    Row(idx=1, name="Thingy A", description="awesome thing.", rating="high", numViews=0),
    Row(idx=2, name="Thingy B", description="available at http://thingb.com", rating=None, numViews=0),
    Row(idx=3, name=None, description=None, rating="low", numViews=5),
    Row(idx=4, name="Thingy D", description="checkout https://thingd.ca", rating="low", numViews=10),
    Row(idx=5, name="Thingy E", description=None, rating="high", numViews=12)]).toDF()
```

--------------------------------

### Initialize FileSystemMetricsRepository

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/glue_tutorials/advanced_anomaly_detection_tutorial.ipynb

Initializes the FileSystemMetricsRepository to store data quality metrics in JSON format on S3. Specify the S3 write path.

```python
# S3 write path
s3_write_path = "s3://devpydeequ/tmp/holt_winters_tutorial.json"

import pydeequ
from pydeequ.repository import *

metricsRepository = FileSystemMetricsRepository(session, s3_write_path)
```

--------------------------------

### Initialize Document Generator

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/synthetic_data/02-synthetic-data-jewelry.ipynb

Initializes a `DocumentGenerator` and sets a template for generating review headlines and bodies. The template specifies 'sentence' for headlines and 'paragraph' for bodies.

```python
from essential_generators import DocumentGenerator

gen = DocumentGenerator()
template = {'review_headline': 'sentence', 'review_body': 'paragraph'}
gen.set_template(template)
documents = gen.documents(n)
```

```python
len(documents)
```

```python
documents[0:3]
```

--------------------------------

### Initialize GlueContext and SparkSession

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/glue_tutorials/basic_anomalies_script_demo.ipynb

Initializes the GlueContext and SparkSession needed to run AWS Glue jobs and PySpark applications, and sets the S3 path for writing results.
```python
# Get the job name and access the job parameters
import sys
from awsglue.utils import getResolvedOptions

# Access the SparkContext in order to create a GlueContext
from pyspark.context import SparkContext

# Wraps the running SparkContext in a GlueContext for use in a Python script
from awsglue.context import GlueContext

glueContext = GlueContext(SparkContext.getOrCreate())
session = glueContext.spark_session

s3_write_path = "s3://joanpydeequ/tmp/demo_anomaly_detection.json"
```

--------------------------------

### Iterate and Print Constraint Suggestions

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/constraint_suggestion_example.ipynb

Loops through the results from the ConstraintSuggestionRunner and displays each suggested constraint's description together with the corresponding Python code.

```python
for sugg in suggestionResult['constraint_suggestions']:
    print(f"Constraint suggestion for '{sugg['column_name']}': {sugg['description']}")
    print(f"The corresponding Python code is: {sugg['code_for_constraint']}")
```

--------------------------------

### Verify metrics.json persistence to DBFS

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/repository_file_dbfs.ipynb

After `VerificationSuite.run()` completes, the metrics are persisted to the file underlying `FileSystemMetricsRepository`. Read this file to verify its content.

```python
historical_repository_path = repository.path

with open(f"/dbfs{historical_repository_path}", "r") as f:
    print(f.read())
```

--------------------------------

### Load and Display Metrics

Source: https://github.com/awslabs/python-deequ/blob/master/tutorials/glue_tutorials/basic_anomalies_script_demo.ipynb

Loads metrics from the repository and displays the Size metric for the previous dataset as a DataFrame.

```python
metrics_dataframe = metricsRepository.load() \
    .forAnalyzers([Size()]) \
    .getSuccessMetricsAsDataFrame()
metrics_dataframe.show()
```
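
--------------------------------

### Group Constraint Suggestions by Column (sketch)

The constraint suggestion loop above prints one suggestion at a time. For wide tables it can be easier to review the suggested constraint code per column. The following is a minimal pure-Python sketch, assuming `suggestionResult` has the shape used in that loop: a `'constraint_suggestions'` list whose entries carry `'column_name'`, `'description'` and `'code_for_constraint'` keys. The helper name `group_suggestions_by_column` and the `sample_result` data are hypothetical, for illustration only; they are not part of PyDeequ and are not real PyDeequ output.

```python
from collections import defaultdict


def group_suggestions_by_column(suggestion_result):
    """Hypothetical helper: map each column name to its suggested constraint code.

    Assumes the dict shape used by the suggestion loop above.
    """
    grouped = defaultdict(list)
    for sugg in suggestion_result.get("constraint_suggestions", []):
        grouped[sugg["column_name"]].append(sugg["code_for_constraint"])
    # Return a plain dict; columns keep the order in which they first appear
    return dict(grouped)


# Hypothetical sample shaped like suggestionResult (not real PyDeequ output)
sample_result = {
    "constraint_suggestions": [
        {"column_name": "a", "description": "'a' is not null",
         "code_for_constraint": '.isComplete("a")'},
        {"column_name": "b", "description": "'b' has no negative values",
         "code_for_constraint": '.isNonNegative("b")'},
        {"column_name": "a", "description": "'a' is unique",
         "code_for_constraint": '.isUnique("a")'},
    ]
}

for column, codes in group_suggestions_by_column(sample_result).items():
    print(f"{column}: {' '.join(codes)}")
```

Because the helper only reads plain dictionaries, it can be tried without a Spark session; with real results, pass `suggestionResult` in place of `sample_result`.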