PyDeequ is a Python API for Deequ, a library built on top of Apache Spark for defining "unit tests for data", which measure data quality in large datasets.
PyDeequ 2.0 introduces a new architecture using Spark Connect, bringing significant improvements:
| Feature | PyDeequ 1.x | PyDeequ 2.0 |
|---|---|---|
| Communication | Py4J (JVM bridge) | Spark Connect (gRPC) |
| Assertions | Python lambdas | Serializable predicates |
| Spark Session | Local only | Local or Remote |
| Architecture | Tight JVM coupling | Clean client-server |
Key Benefits:
- No Py4J dependency - Uses Spark Connect protocol for communication
- Serializable predicates - Replace Python lambdas with predicate objects (`eq`, `gte`, `between`, etc.)
- Remote execution - Connect to remote Spark clusters via Spark Connect
- Cleaner API - Simplified imports and more Pythonic interface
```mermaid
flowchart LR
    subgraph CLIENT["Python Client"]
        A["Python Code"] --> B["Protobuf<br/>Serialization"]
    end
    B -- gRPC --> C["Spark Connect (gRPC)"]
    subgraph SERVER["Spark Connect Server"]
        D["DeequRelationPlugin"] --> E["Deequ Core"] --> F["Spark DataFrame API"] --> G["(Data)"]
    end
    G --> H["Results"] -- gRPC --> I["Python DataFrame"]
    %% Styling for compactness and distinction
    classDef code fill:#C8F2FB,stroke:#35a7c2,color:#13505B,font-weight:bold;
    class A code;
```
How it works:
- Client Side: PyDeequ 2.0 builds checks and analyzers as Protobuf messages
- Transport: Messages are sent via gRPC to the Spark Connect server
- Server Side: The `DeequRelationPlugin` deserializes messages and executes Deequ operations
- Results: Verification results are returned as a Spark DataFrame
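The real client encodes checks and analyzers as Protobuf extension messages, but the round trip has the same shape as this toy JSON sketch (the field names here are illustrative, not the actual message schema): build a declarative spec, serialize it to bytes, ship it, and get rows back.

```python
import json

# Toy illustration only -- PyDeequ 2.0 actually uses Protobuf over gRPC,
# not JSON. The point is that the check is *data*, not a Python closure.
check_spec = {
    "level": "Error",
    "description": "Data quality checks",
    "constraints": [
        {"type": "size", "predicate": {"op": "eq", "value": 3}},
        {"type": "completeness", "column": "age",
         "predicate": {"op": "gte", "value": 0.5}},
    ],
}

# "Client side": serialize the spec into a byte payload.
payload = json.dumps(check_spec).encode("utf-8")

# "Server side": deserialize and dispatch -- in the real system this is
# where the plugin translates messages into Deequ operations.
received = json.loads(payload.decode("utf-8"))
print(received["constraints"][0]["predicate"]["op"])  # eq
```

Because the whole check is serializable data, it can cross the gRPC boundary to a remote cluster, which a Python lambda never could.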
| Feature | PyDeequ 1.x | PyDeequ 2.0 |
|---|---|---|
| **Constraint Verification** | | |
| VerificationSuite | Yes | Yes |
| Check constraints | Yes | Yes |
| Custom SQL expressions | Yes | Yes |
| **Metrics & Analysis** | | |
| AnalysisRunner | Yes | Yes |
| All standard analyzers | Yes | Yes |
| **Column Profiling** | | |
| ColumnProfilerRunner | Yes | Yes |
| Numeric statistics | Yes | Yes |
| KLL sketch profiling | Yes | Yes |
| Low-cardinality histograms | Yes | Yes |
| **Constraint Suggestions** | | |
| ConstraintSuggestionRunner | Yes | Yes |
| Rule sets (DEFAULT, EXTENDED, etc.) | Yes | Yes |
| Train/test split evaluation | Yes | Yes |
| **Metrics Repository** | | |
| FileSystemMetricsRepository | Yes | Planned |
| **Execution Mode** | | |
| Local Spark | Yes | No |
| Spark Connect (remote) | No | Yes |
- Python 3.9+
- Apache Spark 3.5.0+
- Java 17 (Java 21+ has known compatibility issues with Spark 3.5)
Download the pre-compiled Deequ JAR with Spark Connect support from the GitHub pre-releases:
```shell
mkdir -p ~/deequ-beta && cd ~/deequ-beta
curl -L -o deequ_2.12-2.1.0b-spark-3.5.jar \
  https://github.com/awslabs/python-deequ/releases/download/v2.0.0b1/deequ_2.12-2.1.0b-spark-3.5.jar
```

Downloading Spark itself is optional and should only be needed for quick local testing:

```shell
# Download Spark 3.5
curl -L -o spark-3.5.0-bin-hadoop3.tgz \
  https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
tar -xzf spark-3.5.0-bin-hadoop3.tgz
export SPARK_HOME=~/deequ-beta/spark-3.5.0-bin-hadoop3
export PATH=$SPARK_HOME/bin:$PATH
```

Spark Connect is a client-server architecture introduced in Spark 3.4 that allows remote connectivity to Spark clusters. For more details, see the Spark Connect Overview.
```shell
export JAVA_HOME=/path/to/java17
$SPARK_HOME/sbin/start-connect-server.sh \
  --packages org.apache.spark:spark-connect_2.12:3.5.0 \
  --jars ~/deequ-beta/deequ_2.12-2.1.0b-spark-3.5.jar \
  --conf spark.connect.extensions.relation.classes=com.amazon.deequ.connect.DeequRelationPlugin
```

Command explanation:

| Option | Description |
|---|---|
| `--packages` | Downloads the Spark Connect package from Maven |
| `--jars` | Loads the Deequ JAR with Spark Connect support |
| `--conf spark.connect.extensions.relation.classes` | Registers the Deequ plugin to handle custom operations |
The server starts on `localhost:15002` by default. You can verify it's running:

```shell
ps aux | grep SparkConnectServer
```

Install the beta wheel directly from the GitHub release:

```shell
pip install https://github.com/awslabs/python-deequ/releases/download/v2.0.0b1/pydeequ-2.0.0b1-py3-none-any.whl
pip install pyspark[connect]==3.5.0

# Python 3.12+ users: install setuptools (provides distutils removed in 3.12)
pip install setuptools
```

```python
from pyspark.sql import SparkSession, Row

from pydeequ.v2.checks import Check, CheckLevel
from pydeequ.v2.verification import VerificationSuite
from pydeequ.v2.predicates import eq, gte

# Connect to Spark Connect server
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

# Create sample data
df = spark.createDataFrame([
    Row(id=1, name="Alice", age=25),
    Row(id=2, name="Bob", age=30),
    Row(id=3, name="Charlie", age=None),
])

# Define checks using the new predicate API
check = (Check(CheckLevel.Error, "Data quality checks")
         .hasSize(eq(3))
         .isComplete("id")
         .isComplete("name")
         .hasCompleteness("age", gte(0.5))
         .isUnique("id"))

# Run verification
result = (VerificationSuite(spark)
          .onData(df)
          .addCheck(check)
          .run())

result.show(truncate=False)
spark.stop()
```

When you are done, stop the server:

```shell
$SPARK_HOME/sbin/stop-connect-server.sh
```

For a comprehensive example covering data analysis, constraint verification, column profiling, and constraint suggestions, see `tutorials/data_quality_example_v2.py`.
```python
from pydeequ.v2.predicates import eq, gt, gte, lt, lte, between

check.hasSize(eq(3))                     # size == 3
check.hasCompleteness("col", gte(0.9))   # completeness >= 0.9
check.hasMean("value", between(10, 20))  # 10 <= mean <= 20
```

| Predicate | Description | Example |
|---|---|---|
| `eq(v)` | Equal to v | `eq(1.0)` |
| `gt(v)` | Greater than v | `gt(0)` |
| `gte(v)` | Greater than or equal to v | `gte(0.9)` |
| `lt(v)` | Less than v | `lt(100)` |
| `lte(v)` | Less than or equal to v | `lte(1.0)` |
| `between(a, b)` | Between a and b (inclusive) | `between(0, 1)` |
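Conceptually, each predicate is a small record (an operator plus its bounds) with a comparison applied to the computed metric. This pure-Python sketch is not the real `pydeequ.v2.predicates` implementation, but it shows why such objects can be serialized across the gRPC boundary while a lambda cannot:

```python
from dataclasses import dataclass

# Conceptual model only -- the real predicates are defined by PyDeequ
# and serialized to Protobuf; these classes just mimic their semantics.

@dataclass(frozen=True)
class Gte:
    value: float
    def evaluate(self, metric: float) -> bool:
        return metric >= self.value

@dataclass(frozen=True)
class Between:
    low: float
    high: float
    def evaluate(self, metric: float) -> bool:
        return self.low <= metric <= self.high

print(Gte(0.9).evaluate(0.95))       # True
print(Between(10, 20).evaluate(25))  # False
```

Because the predicate carries only plain values, the server can reconstruct and apply it without executing any client-side Python.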
```python
from pydeequ.v2.verification import AnalysisRunner
from pydeequ.v2.analyzers import (
    Size, Completeness, Mean, Sum, Minimum, Maximum,
    StandardDeviation, ApproxCountDistinct, Distinctness,
    Uniqueness, Entropy, Correlation
)

result = (AnalysisRunner(spark)
          .onData(df)
          .addAnalyzer(Size())
          .addAnalyzer(Completeness("name"))
          .addAnalyzer(Mean("age"))
          .run())

result.show()
```

| Method | Description |
|---|---|
| `hasSize(predicate)` | Check total row count |
| `isComplete(column)` | Check column has no nulls |
| `hasCompleteness(column, predicate)` | Check completeness ratio |
| `areComplete(columns)` | Check multiple columns have no nulls |
| `isUnique(column)` | Check column values are unique |
| `hasUniqueness(columns, predicate)` | Check uniqueness ratio |
| `hasDistinctness(columns, predicate)` | Check distinctness ratio |
| `hasMin(column, predicate)` | Check minimum value |
| `hasMax(column, predicate)` | Check maximum value |
| `hasMean(column, predicate)` | Check mean value |
| `hasSum(column, predicate)` | Check sum |
| `hasStandardDeviation(column, predicate)` | Check standard deviation |
| `hasApproxCountDistinct(column, predicate)` | Check approximate distinct count |
| `hasCorrelation(col1, col2, predicate)` | Check correlation between columns |
| `hasEntropy(column, predicate)` | Check entropy |
| `hasApproxQuantile(column, quantile, predicate)` | Check approximate quantile |
| `satisfies(expression, name, predicate)` | Custom SQL expression |
| `hasPattern(column, pattern, predicate)` | Check regex pattern match ratio |
| `containsEmail(column, predicate)` | Check email format ratio |
| `containsCreditCardNumber(column, predicate)` | Check credit card format ratio |
| `isNonNegative(column)` | Check all values >= 0 |
| `isPositive(column)` | Check all values > 0 |
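To make the ratio-based checks concrete, here is what the underlying metrics mean, computed on plain Python rows (the real computation runs distributed on Spark; the sample data is made up):

```python
# Illustrative only: completeness and uniqueness as Deequ defines them,
# evaluated on an in-memory list instead of a Spark DataFrame.
rows = [
    {"id": 1, "name": "Alice", "age": 25},
    {"id": 2, "name": "Bob", "age": 30},
    {"id": 3, "name": "Charlie", "age": None},
]

def completeness(rows, column):
    # hasCompleteness: fraction of non-null values in the column
    return sum(r[column] is not None for r in rows) / len(rows)

def uniqueness(rows, column):
    # isUnique passes only when every value occurs exactly once
    values = [r[column] for r in rows]
    return sum(values.count(v) == 1 for v in values) / len(values)

print(completeness(rows, "age"))  # ~0.667, so hasCompleteness("age", gte(0.5)) passes
print(uniqueness(rows, "id"))     # 1.0, so isUnique("id") passes
```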
Profile column distributions and statistics across your dataset:
```python
from pydeequ.v2.profiles import ColumnProfilerRunner, KLLParameters

# Basic profiling
profiles = (ColumnProfilerRunner(spark)
            .onData(df)
            .run())
profiles.show()

# Advanced profiling with options
profiles = (ColumnProfilerRunner(spark)
            .onData(df)
            .restrictToColumns(["id", "name", "age"])   # Profile specific columns
            .withLowCardinalityHistogramThreshold(100)  # Generate histograms for low-cardinality columns
            .withKLLProfiling()                         # Enable KLL sketch for approximate quantiles
            .setKLLParameters(KLLParameters(
                sketch_size=2048,
                shrinking_factor=0.64,
                num_buckets=64
            ))
            .run())
```

Profile Result Schema:
| Column | Type | Description |
|---|---|---|
| `column` | STRING | Column name |
| `completeness` | DOUBLE | Non-null ratio (0.0-1.0) |
| `approx_distinct_values` | LONG | Approximate cardinality |
| `data_type` | STRING | Detected data type |
| `is_data_type_inferred` | BOOLEAN | Whether type was inferred |
| `type_counts` | STRING | JSON of type distribution |
| `histogram` | STRING | JSON histogram (low cardinality only) |
| `mean` | DOUBLE | Mean (numeric columns only) |
| `minimum` | DOUBLE | Minimum value (numeric only) |
| `maximum` | DOUBLE | Maximum value (numeric only) |
| `sum` | DOUBLE | Sum (numeric only) |
| `std_dev` | DOUBLE | Standard deviation (numeric only) |
| `approx_percentiles` | STRING | JSON percentiles (numeric only) |
| `kll_buckets` | STRING | JSON KLL buckets (if enabled) |
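Several columns in the schema above hold JSON-encoded strings, so after collecting the result DataFrame you typically decode them. A sketch with a hypothetical profile row (the values are invented for illustration):

```python
import json

# Hypothetical row shaped like the profile schema above -- in practice
# you would get this from profiles.collect() on the result DataFrame.
profile_row = {
    "column": "status",
    "completeness": 1.0,
    "approx_distinct_values": 3,
    "data_type": "String",
    "histogram": json.dumps({"active": 70, "inactive": 25, "banned": 5}),
}

# Decode the JSON histogram column and report value shares.
histogram = json.loads(profile_row["histogram"])
total = sum(histogram.values())
for value, count in sorted(histogram.items()):
    print(f"{value}: {count / total:.0%}")
```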
Auto-generate data quality constraints based on your data:
```python
from pydeequ.v2.suggestions import ConstraintSuggestionRunner, Rules

# Basic suggestion generation
suggestions = (ConstraintSuggestionRunner(spark)
               .onData(df)
               .addConstraintRules(Rules.DEFAULT)
               .run())
suggestions.show(truncate=False)

# Advanced usage with train/test evaluation
suggestions = (ConstraintSuggestionRunner(spark)
               .onData(df)
               .addConstraintRules(Rules.DEFAULT)
               .addConstraintRules(Rules.EXTENDED)
               .restrictToColumns(["id", "status", "score"])
               .useTrainTestSplitWithTestsetRatio(0.2, seed=42)  # Evaluate suggestions on test set
               .run())
```

Available Rule Sets:
| Rule Set | Description |
|---|---|
| `Rules.DEFAULT` | Completeness, type, categorical range, non-negative |
| `Rules.STRING` | String length constraints (min/max length) |
| `Rules.NUMERICAL` | Numeric constraints (min, max, mean, stddev) |
| `Rules.COMMON` | Uniqueness for approximately unique columns |
| `Rules.EXTENDED` | All rules combined |
Suggestion Result Schema:
| Column | Type | Description |
|---|---|---|
| `column_name` | STRING | Column the constraint applies to |
| `constraint_name` | STRING | Type of constraint |
| `current_value` | STRING | Current metric value |
| `description` | STRING | Human-readable description |
| `suggesting_rule` | STRING | Rule that generated this |
| `code_for_constraint` | STRING | Python code snippet |
| `evaluation_status` | STRING | "Success" or "Failure" (if train/test enabled) |
| `evaluation_metric_value` | DOUBLE | Metric value on test set |
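With train/test evaluation enabled, a common pattern is to keep only the suggestions that held up on the test split. A sketch using hypothetical suggestion rows shaped like the schema above (in practice you would get them from `suggestions.collect()`):

```python
# Invented example rows -- field names follow the suggestion schema,
# but the constraint/rule values are illustrative only.
suggestion_rows = [
    {"column_name": "id",
     "code_for_constraint": '.isComplete("id")',
     "evaluation_status": "Success"},
    {"column_name": "score",
     "code_for_constraint": '.isNonNegative("score")',
     "evaluation_status": "Failure"},
]

# Keep only constraints that passed evaluation on the held-out test set.
accepted = [s["code_for_constraint"] for s in suggestion_rows
            if s["evaluation_status"] == "Success"]
print(accepted)  # ['.isComplete("id")']
```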
Import changes:
```python
# Before (1.x)
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite

# After (2.0)
from pydeequ.v2.checks import Check, CheckLevel
from pydeequ.v2.verification import VerificationSuite
from pydeequ.v2.predicates import eq, gte, between
```

Lambda to predicate:

```python
# Before (1.x)
check.hasSize(lambda x: x == 3)
check.hasCompleteness("col", lambda x: x >= 0.9)

# After (2.0)
check.hasSize(eq(3))
check.hasCompleteness("col", gte(0.9))
```

Profiler changes:

```python
# Before (1.x) - returns Python object
from pydeequ.profiles import ColumnProfilerRunner
result = ColumnProfilerRunner(spark).onData(df).run()
for col, profile in result.profiles.items():
    print(profile)

# After (2.0) - returns DataFrame
from pydeequ.v2.profiles import ColumnProfilerRunner
result = ColumnProfilerRunner(spark).onData(df).run()
result.show()
```

Suggestions changes:

```python
# Before (1.x) - returns Python object
from pydeequ.suggestions import ConstraintSuggestionRunner, DEFAULT
result = ConstraintSuggestionRunner(spark).onData(df).addConstraintRule(DEFAULT()).run()
print(result)

# After (2.0) - returns DataFrame
from pydeequ.v2.suggestions import ConstraintSuggestionRunner, Rules
result = ConstraintSuggestionRunner(spark).onData(df).addConstraintRules(Rules.DEFAULT).run()
result.show()
```

- Check Java version: `java -version` (must be Java 17, not 21+)
- Check port availability: `lsof -i :15002`
- Check logs: `tail -f $SPARK_HOME/logs/spark-*-SparkConnectServer-*.out`
Ensure the Spark Connect server is running:

```shell
ps aux | grep SparkConnectServer
```

Ensure the Deequ JAR is correctly specified in `--jars` when starting the server.

This error occurs when using Java 21+ with Spark 3.5. Use Java 17 instead:

```shell
export JAVA_HOME=/path/to/java17
```

This occurs on Python 3.12+ because distutils was removed. Install setuptools:

```shell
pip install setuptools
```

The legacy PyDeequ API uses Py4J for JVM communication. It is still available for backward compatibility.

```shell
pip install pydeequ
```

Note: Set the SPARK_VERSION environment variable to match your Spark version.
```python
from pyspark.sql import SparkSession, Row
import pydeequ

spark = (SparkSession
         .builder
         .config("spark.jars.packages", pydeequ.deequ_maven_coord)
         .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
         .getOrCreate())

df = spark.sparkContext.parallelize([
    Row(a="foo", b=1, c=5),
    Row(a="bar", b=2, c=6),
    Row(a="baz", b=3, c=None)
]).toDF()
```

```python
from pydeequ.analyzers import *

analysisResult = AnalysisRunner(spark) \
    .onData(df) \
    .addAnalyzer(Size()) \
    .addAnalyzer(Completeness("b")) \
    .run()

analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()
```

```python
from pydeequ.checks import *
from pydeequ.verification import *

check = Check(spark, CheckLevel.Warning, "Review Check")

checkResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        check.hasSize(lambda x: x >= 3) \
        .hasMin("b", lambda x: x == 0) \
        .isComplete("c") \
        .isUnique("a") \
        .isContainedIn("a", ["foo", "bar", "baz"]) \
        .isNonNegative("b")) \
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()
```

```python
from pydeequ.profiles import *

result = ColumnProfilerRunner(spark) \
    .onData(df) \
    .run()

for col, profile in result.profiles.items():
    print(profile)
```

```python
from pydeequ.suggestions import *

suggestionResult = ConstraintSuggestionRunner(spark) \
    .onData(df) \
    .addConstraintRule(DEFAULT()) \
    .run()

print(suggestionResult)
```

```python
from pydeequ.repository import *
from pydeequ.analyzers import *

metrics_file = FileSystemMetricsRepository.helper_metrics_file(spark, 'metrics.json')
repository = FileSystemMetricsRepository(spark, metrics_file)
key_tags = {'tag': 'pydeequ hello world'}
resultKey = ResultKey(spark, ResultKey.current_milli_time(), key_tags)

analysisResult = AnalysisRunner(spark) \
    .onData(df) \
    .addAnalyzer(ApproxCountDistinct('b')) \
    .useRepository(repository) \
    .saveOrAppendResult(resultKey) \
    .run()
```

```python
spark.sparkContext._gateway.shutdown_callback_server()
spark.stop()
```

There are 4 main components of Deequ:
- Metrics Computation
  - `Profiles` leverages `Analyzers` to analyze each column of a dataset. `Analyzers` compute metrics for data profiling and validation at scale.
- Constraint Suggestion
- Specify rules for Analyzers to return suggested constraints.
- Constraint Verification
- Validate data against constraints you define.
- Metrics Repository
- Persist and track Deequ runs over time.
Please report any issues or feedback to:
- GitHub Issues: https://github.com/awslabs/deequ/issues
- Tag PyDeequ 2.0 issues with `pydeequ-2.0`
When reporting issues, include:
- Python version
- Spark version
- Java version
- Operating system
- Full error message and stack trace
- Minimal code to reproduce
Please refer to the contributing doc for how to contribute to PyDeequ.
This library is licensed under the Apache 2.0 License.
- Setup SDKMAN
- Setup Java
- Setup Apache Spark
- Install Poetry
- Run tests locally
```shell
curl -s https://get.sdkman.io | bash
source "$HOME/.sdkman/bin/sdkman-init.sh"
sdk version
```

```shell
sdk list java
sdk install java 17.0.9-amzn      # For PyDeequ 2.0
sdk install java 11.0.10.hs-adpt  # For PyDeequ 1.x
```

```shell
sdk list spark
sdk install spark 3.5.0
```

```shell
poetry install
poetry update
poetry show -o
```

```shell
poetry run pytest
```

```shell
docker build . -t spark-3.5-docker-test
docker run spark-3.5-docker-test
```