Dagster Data Engineering Glossary:
Data Derivation
Deriving data definition:
In data engineering, "deriving data" refers to the process of extracting, transforming, and generating new data from existing datasets.
This derived data is often used to enhance understanding, improve data quality, support decision-making, or meet specific business needs. The derived data is usually a result of applying some transformation, computation, or aggregation on the source data.
Overall, deriving data is a fundamental aspect of data engineering, enabling organizations to extract greater value from their data assets.
Key elements of data derivation:
Source Data: This is the original dataset you start with. It could be ingested from databases, data lakes, external APIs, or other sources.
Transformation: This involves applying operations to the source data to generate new information. Transformations can be simple arithmetic operations, aggregations, filtering, joining multiple datasets, or even applying complex algorithms.
Derived Data: The materialized result of the transformation. This is the new data that was not present in its original form in the source data but has been extracted or computed based on it.
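As a minimal, hypothetical sketch of these three elements, the PySpark snippet below starts from a small in-memory source dataset, applies an aggregation as the transformation, and materializes the result as derived data. The table and column names are invented for illustration.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("DerivationElements").getOrCreate()

# Source data: the original dataset (here, a tiny in-memory example)
source = spark.createDataFrame(
    [("2023-09-01", 120.00), ("2023-09-01", 80.00), ("2023-09-02", 45.50)],
    ["sale_date", "sale_amount"],
)

# Transformation: aggregate sales by day
derived = source.groupBy("sale_date").agg(
    F.sum("sale_amount").alias("daily_sales_total")
)

# Derived data: the materialized result of the transformation
derived.show()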
Example use cases of deriving data:
Aggregations: Summing up sales figures by month to generate a monthly sales report.
Feature Engineering in Machine Learning: Creating new predictive features from existing dataset columns. For example, from a dataset containing "date of birth", you can derive an "age" column (see the sketch after this list).
Data Enrichment: Combining data from different sources. If you have a list of users with their email addresses and another list with email addresses and purchase history, you could derive a combined dataset showing user activity.
Data Normalization: Transforming data to fit a standard scale or format. For instance, converting temperature readings from Fahrenheit to Celsius.
Data Imputation: Filling missing values based on certain rules or algorithms derived from the available data.
Temporal Derivations: Generating data like "Month to Date" or "Year to Date" metrics.
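A few of these use cases can be sketched in a handful of PySpark expressions. The snippet below is purely illustrative: the column names (date_of_birth, temp_f, transaction_amount) and sample rows are invented for the example.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("DerivationUseCases").getOrCreate()

df = spark.createDataFrame(
    [("1990-05-01", 98.6, 20.0), ("1985-11-23", 101.2, None)],
    ["date_of_birth", "temp_f", "transaction_amount"],
)

# Feature engineering: derive an age column from date_of_birth
df = df.withColumn(
    "age",
    F.floor(F.months_between(F.current_date(), F.to_date("date_of_birth")) / 12),
)

# Normalization: convert Fahrenheit readings to Celsius
df = df.withColumn("temp_c", (F.col("temp_f") - 32) * 5 / 9)

# Imputation: fill missing transaction amounts with the column mean
mean_amount = df.select(F.avg("transaction_amount")).first()[0]
df = df.na.fill({"transaction_amount": mean_amount})

df.show()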
It's important to note the following when working with derived data:
Data Lineage: It's essential to track where the derived data comes from and understand the transformations applied. This helps in ensuring transparency, reproducibility, and trust in the derived data.
Quality Checks: Derived data should undergo quality checks to ensure its accuracy and relevance, since incorrect transformations can lead to misleading conclusions (a minimal example follows this list).
Storage and Management: Depending on the size and usage, derived data might be stored differently from the source data. Considerations about storage, retrieval efficiency, and cost are important.
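To make the quality-check point concrete, here is a minimal, hypothetical PySpark sketch that validates a derived dataset before publishing it downstream; the column names and rules are invented for the example.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("DerivedDataChecks").getOrCreate()

# Assume `derived` is the output of an earlier derivation step (hypothetical columns)
derived = spark.createDataFrame(
    [(1, 45.00, 1), (2, 125.50, 2)],
    ["user_id", "total_amount_7d", "transaction_count_7d"],
)

# Basic sanity checks on the derived data before it is published
null_keys = derived.filter(F.col("user_id").isNull()).count()
negative_totals = derived.filter(F.col("total_amount_7d") < 0).count()

assert null_keys == 0, "Derived data contains rows with a missing user_id"
assert negative_totals == 0, "Derived data contains negative 7-day totals"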
An example scenario of data derivation in data engineering:
To illustrate the concept of data derivation, let's consider a scenario: Real-time Stream Processing for Anomaly Detection.
Say you work for a fintech company that processes millions of transactional events every minute through its payment gateway. You aim to derive new, critical data assets in real time to identify anomalous transaction patterns, which might indicate fraudulent activity. These data assets will be used by downstream teams (data consumers) to run anti-fraud processes.
Going back to our three elements of data derivation:
1. Source Data: Real-time streaming data of transactions, ingested through tools like Apache Kafka. Each transaction record consists of attributes like user_id, transaction_id, transaction_amount, location, merchant_id, and timestamp.
2. Transformation & Derivation:
You would use several techniques for transforming the data and deriving your new key data assets:
Windowing: Using Apache Flink or Apache Spark Streaming, the data stream is divided into time-based windows, e.g., tumbling windows of 5 minutes (a streaming sketch follows this list).
Feature Engineering: For each window, derive new features such as:
- Average transaction amount for the user in the last 24 hours.
- Number of distinct locations from which transactions originated in the last 30 minutes.
- Count of transactions for the user in the last 5 minutes.
Joining with Reference Data: Join the real-time data stream with static datasets (residing in a distributed file system like HDFS or a database) that might include:
- User's historical transaction behavior.
- Merchant risk scores.
Anomaly Scoring: Use a pre-trained machine learning model (like an Isolation Forest or One-Class SVM) to score each transaction for its likelihood of being anomalous. This model would have been previously trained on historical data and continuously retrained at regular intervals.
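The windowing and feature-engineering steps can be sketched with PySpark Structured Streaming. The snippet below is a simplified illustration rather than a production pipeline: the Kafka broker, topic name, and event schema are placeholders, it requires the spark-sql-kafka connector package, and it omits the reference-data join and the model-based anomaly score described above.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, StringType, StructField, StructType, TimestampType

spark = SparkSession.builder.appName("TxnWindowedFeatures").getOrCreate()

# Schema of the incoming transaction events (attribute names from the scenario above)
schema = StructType([
    StructField("user_id", StringType()),
    StructField("transaction_id", StringType()),
    StructField("transaction_amount", DoubleType()),
    StructField("location", StringType()),
    StructField("merchant_id", StringType()),
    StructField("timestamp", TimestampType()),
])

# Source: a Kafka topic of transaction events (broker and topic are placeholders)
raw = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "transactions")
    .load()
)
events = raw.select(F.from_json(F.col("value").cast("string"), schema).alias("e")).select("e.*")

# Windowing + feature engineering: 5-minute tumbling windows per user
features = (
    events.withWatermark("timestamp", "10 minutes")
    .groupBy(F.window("timestamp", "5 minutes"), "user_id")
    .agg(
        F.count("transaction_id").alias("txn_count_5m"),
        F.avg("transaction_amount").alias("avg_amount_5m"),
        F.approx_count_distinct("location").alias("distinct_locations_5m"),
    )
)

# Sink the derived features (console sink here; a real pipeline would write elsewhere)
query = features.writeStream.outputMode("update").format("console").start()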
3. Derived Data:
For each transaction in the current time window, a derived record is produced that includes the original attributes, the newly derived features, and an anomaly score.
Aggregated metrics are also produced, such as the total number of high-risk scores in the current window and the average risk score per merchant.
4. Sink & Action:
You store derived insights in a real-time analytics database like Druid or ClickHouse for dashboarding and alerting.
You use the anomaly score to trigger alerts or even block transactions in real-time, using thresholds or business rules.
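The alerting half of this step can be sketched with Structured Streaming's foreachBatch hook. This is a hedged illustration: scored is assumed to be a streaming DataFrame carrying the derived anomaly_score column, the threshold is a placeholder business rule, and a real pipeline would write to a store such as Druid or ClickHouse and route alerts through a notification system rather than printing them.

# `scored` is assumed to be a streaming DataFrame with an `anomaly_score` column
ANOMALY_THRESHOLD = 0.8  # placeholder business rule


def alert_on_anomalies(batch_df, batch_id):
    # Within each micro-batch, pick out the high-risk transactions
    high_risk = batch_df.filter(batch_df.anomaly_score > ANOMALY_THRESHOLD)
    for row in high_risk.collect():
        # Placeholder action: in practice, publish to an alerting topic or block the transaction
        print(f"ALERT batch={batch_id} txn={row.transaction_id} score={row.anomaly_score:.2f}")


query = (
    scored.writeStream
    .foreachBatch(alert_on_anomalies)
    .outputMode("update")
    .start()
)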
Considerations:
For this scenario to work, you would want to invest time in several aspects of the project:
State Management: You need to maintain the state across windows for continuity, especially for continuous queries that rely on historical data.
Scalability: As data rates fluctuate, the processing system needs to scale with them. Running Dagster on Kubernetes lets you orchestrate such streaming applications and scale them on demand.
Model Management: The ML model used for anomaly scoring needs regular retraining to stay up-to-date and relevant. This involves model versioning, continuous training, and strategies for deploying new model versions without interrupting the streaming process, all of which can be orchestrated with Dagster data pipelines.
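As a rough sketch of how such retraining could be orchestrated with Dagster, the snippet below defines a hypothetical asset for the anomaly model and puts its materialization on a daily schedule; the training logic is stubbed out and the names are invented for illustration.

from dagster import Definitions, ScheduleDefinition, asset, define_asset_job


@asset
def anomaly_detection_model():
    # Placeholder for the real training logic (e.g., fitting an Isolation Forest
    # on recent historical transactions and persisting the model artifact)
    return "trained-model-artifact"


# A job that materializes the model asset, rerun nightly at 02:00
retrain_job = define_asset_job("retrain_anomaly_model", selection=[anomaly_detection_model])

defs = Definitions(
    assets=[anomaly_detection_model],
    jobs=[retrain_job],
    schedules=[ScheduleDefinition(job=retrain_job, cron_schedule="0 2 * * *")],
)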
This example embodies advanced data engineering concepts, blending real-time processing, machine learning, and scalable architectures to derive valuable insights from raw transactional data.
An example of deriving data using Python and Spark
Let's consider a scenario involving Apache Spark, one of the most widely used distributed data processing frameworks. In this example, we'll derive features from user transaction data for further analytics or machine learning.
Again, we'll process a dataset of user transactions to derive user behavior metrics over a time window. We won't replicate the entire scenario described above, but simply demonstrate the derivation concept.
First, let's create two dummy source data files. Naturally, in a real-world scenario, data sources would likely be more diverse (e.g., databases, data lakes, streaming sources), and configurations would be more intricate.
Create the following two files locally (in the same folder as your demo code):
transactions.csv, containing the columns user_id, transaction_id, transaction_amount, and timestamp:
user_id,transaction_id,transaction_amount,timestamp
1,1001,50.25,2023-09-13 10:00:00
1,1002,75.00,2023-09-12 14:15:10
2,1003,100.00,2023-09-15 12:30:45
2,1004,25.50,2023-09-14 09:05:00
3,1005,110.75,2023-09-11 16:20:20
1,1006,45.00,2023-09-17 15:30:00
3,1007,95.25,2023-09-18 11:00:00
user_profiles.csv, containing the columns user_id, signup_date, and user_type:
user_id,signup_date,user_type
1,2023-01-01,Dagster Team
2,2023-06-15,Dagster Enterprise
3,2022-11-20,Dagster Team
Using these as our source, we will derive the following data:
- Total transaction amounts for each user in the last 7 days.
- Total transactions count for each user in the last 7 days.
- Days since each user signed up, as of today.
Note: you need a working Spark environment and the PySpark library installed in your Python environment to run this code.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from datetime import datetime, timedelta

# Initialize a local SparkSession ("local[*]" uses all available cores) and reduce log noise
spark = (
    SparkSession.builder
    .appName("DeriveDataExample")
    .master("local[*]")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")
# Load data
transactions = spark.read.csv('transactions.csv', header=True, inferSchema=True)
user_profiles = spark.read.csv('user_profiles.csv', header=True, inferSchema=True)
# Calculate the timestamp for 7 days ago
seven_days_ago = (datetime.now() - timedelta(days=7)).strftime('%Y-%m-%d %H:%M:%S')
# Filter transactions in the last 7 days
recent_transactions = transactions.filter(transactions.timestamp > seven_days_ago)
# Derive total transaction amounts and counts for each user in the last 7 days
derived_metrics = recent_transactions.groupBy('user_id').agg(
F.sum('transaction_amount').alias('total_amount_7d'),
F.count('transaction_id').alias('transaction_count_7d')
)
# Derive days since user signup
user_derived = user_profiles.withColumn('days_since_signup',
F.datediff(F.current_date(), F.col('signup_date')))
# Join the derived metrics with user profiles
final_derived_data = user_derived.join(derived_metrics, on='user_id', how='left').fillna(0)
# Display or save the derived data
final_derived_data.show()
# Optionally, save to a destination
# final_derived_data.write.csv('output_derived_data.csv', header=True)
# Stop the Spark session when done
spark.stop()
The output of the example will be the derived data shown below (exact values depend on when you run the code, since the metrics are computed relative to the current date):
+-------+-----------+------------------+-----------------+---------------+--------------------+
|user_id|signup_date| user_type|days_since_signup|total_amount_7d|transaction_count_7d|
+-------+-----------+------------------+-----------------+---------------+--------------------+
| 1| 2023-01-01| Dagster Team| 262| 45.0| 1|
| 2| 2023-06-15|Dagster Enterprise| 97| 125.5| 2|
| 3| 2022-11-20| Dagster Team| 304| 95.25| 1|
+-------+-----------+------------------+-----------------+---------------+--------------------+
Using Apache Spark and Python (PySpark), this example of data derivation demonstrates:
- Distributed data processing with Apache Spark.
- Aggregation methods on large datasets.
- Data transformations and enrichments (joining, date calculations).
- Scalability: the same code can run unchanged on a cluster to process much larger datasets.