Dagster Integration:
Dagster + Great Expectations
Yield an expectation and its output with all relevant metadata.
About this integration
With this integration, you can build Great Expectations validations inside Dagster ops.
Installation
pip install dagster-ge
Example
from dagster_ge.factory import ge_data_context, ge_validation_op_factory
from pandas import read_csv

from dagster import job, op


@op
def read_in_datafile(csv_path):
    # Load the CSV file into a pandas DataFrame.
    return read_csv(csv_path)


@op
def process_payroll(df):
    # Placeholder processing step: count the rows.
    return len(df)


@op
def postprocess_payroll(numrows, expectation):
    # Pass the row count through only if the validation succeeded.
    if expectation["success"]:
        return numrows
    else:
        raise ValueError


# Build an op that validates its input against the "basic.warning" suite.
payroll_expectations = ge_validation_op_factory(
    name="ge_validation_op", datasource_name="getest", suite_name="basic.warning"
)


@job(
    resource_defs={"ge_data_context": ge_data_context},
    config={
        "resources": {
            "ge_data_context": {
                "config": {"ge_root_dir": "./great_expectations"}
            }
        },
        "ops": {
            "read_in_datafile": {
                "inputs": {
                    "csv_path": {"value": "./data/succeed.csv"}
                }
            }
        },
    },
)
def payroll_data():
    output_df = read_in_datafile()
    # The same DataFrame feeds both the processing op and the validation op.
    postprocess_payroll(process_payroll(output_df), payroll_expectations(output_df))
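The gating step in postprocess_payroll can be tried out on its own, without Dagster or a Great Expectations project. The following is a minimal sketch in plain Python. It assumes only what the example above relies on: that the validation result can be indexed with "success" and that the value is truthy when validation passes. The helper name gate_on_validation and the hand-built dictionaries are hypothetical stand-ins, not part of dagster-ge.

```python
from typing import Any, Mapping


def gate_on_validation(numrows: int, expectation: Mapping[str, Any]) -> int:
    """Return numrows only when the validation result reports success.

    Hypothetical helper mirroring the body of postprocess_payroll.
    """
    if expectation["success"]:
        return numrows
    # A message makes the failure easier to spot in run logs.
    raise ValueError("Great Expectations validation failed")


# Hand-built stand-ins for a validation result (illustrative only).
passing = {"success": True}
failing = {"success": False}

# A passing result lets the row count through unchanged.
print(gate_on_validation(42, passing))

# A failing result raises, which would fail the op inside a Dagster job.
try:
    gate_on_validation(42, failing)
except ValueError as err:
    print(f"blocked: {err}")
```

Raising an exception, rather than returning a sentinel value, means the downstream op fails visibly when the data does not meet the suite's expectations.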
About Great Expectations
Great Expectations is a shared, open standard for data quality. It helps data teams eliminate pipeline debt through data testing, documentation, and profiling.