September 14, 2023 · 14 minute read

A Dagster-Powered Spam Filter

With Dagster, you can maintain data trust and protect the integrity of any service that handles user-generated content by building this powerful spam filter.
James Timmins (@jamestimmins)

Every product that lets users send messages to each other quickly discovers that spam is a real problem, even at modest scale. When a site allows malicious users to send spam, user trust quickly erodes and people start to leave for competitors.

Effective spam filtering, which hides spam messages while letting legitimate messages through, is an essential part of any email or messaging system.

Spam filters use a number of tricks to do this, but one of the most common is a simple model known as a Bag of Words. This approach builds a model from the words in previous messages, tracking whether each word is more common in spam or in non-spam messages.
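To make that concrete, here’s a toy illustration (the messages are made up) of the per-word frequencies a Bag of Words model relies on:

from collections import Counter

# Hypothetical labeled messages, purely for illustration
spam_messages = ["win a free prize now", "free cash if you reply now"]
ham_messages = ["are you free for lunch", "see you at the meeting"]

def word_frequencies(messages):
    words = " ".join(messages).split()
    counts = Counter(words)
    return {word: count / len(words) for word, count in counts.items()}

# "free" appears in both, but far more often in the spam messages
print(word_frequencies(spam_messages)["free"])  # ~0.18
print(word_frequencies(ham_messages)["free"])   # 0.1

A real filter combines these per-word frequencies across every word in a message to decide which label is more likely, which is exactly what the model we build below does.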

In this walkthrough, we’ll build a simple model with Dagster and expose it with a Flask API endpoint that can be used to test new messages. Then we’ll create a job that integrates new messages to improve the model over time and redeploys the updated model.

What we’re building

We’re building a simple (yet powerful) spam filter built on a Bag of Words model. We’ll seed the data with the SMS Spam Collection Dataset from Kaggle and base our approach on YouTuber Ritvikmath’s spam filter. This approach takes lists of spam and non-spam messages, looks at each word in those messages, and determines whether the presence of a given word suggests that a message is or is not spam.

We’ll take this simple approach and modify it by adding messages sent by our own users, and on a monthly schedule we’ll rebuild the model, compare it against our previous month’s model, and promote the new model to production if it scores more highly than the prior month’s.

The live model will then be surfaced via a simple Flask API endpoint that accepts a message body as input and returns whether or not it’s flagged as spam.

Our workflow will create the following seven assets. The final four assets will then be saved to a JSON file where they’ll be accessible to the API.

A Dagster asset graph showing the steps in detecting spam messages to improve data quality.

Setup

Start by building the default Dagster skeleton project.

pip install dagster
dagster project scaffold --name spam-filter
cd spam-filter
pip install -e ".[dev]"

Next, add the extra dependencies used in this project. These will allow us to build the Bag of Words model and serve the API endpoint.

pip install dagster-duckdb~=0.20 dagster-duckdb-pandas~=0.20 Flask~=2.3 pandas~=2.0 numpy~=1.25 scikit-learn~=1.3

Integrating the SMS spam dataset

Our first asset will read the sample spam messages from a file and convert them into a Pandas DataFrame.

You can download the spam dataset from Kaggle or this project’s GitHub. This file contains example SMS messages where spam messages are marked “spam” and non-spam messages are marked “ham.”

v1      v2
ham     "Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat..."
ham     Ok lar... Joking wif u oni...
spam    Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's
ham     U dun say so early hor... U c already then say...

The asset will read the file contents, rename the relevant columns to “spam” and “text,” and convert spam/ham to boolean values.

from dagster import asset
import pandas as pd

@asset
def sms_spam_examples() -> pd.DataFrame:
    spam_df = pd.read_csv("./spam.csv", encoding="ISO-8859-1", usecols=["v1", "v2"])
    spam_df = spam_df.rename(columns={"v1": "spam", "v2": "text"})
    spam_df["spam"] = spam_df["spam"] == "spam"
    return spam_df

We’ve defined our first asset; now we need to make it accessible to the Dagster UI. We’ll do this by adding it to the Dagster Definitions object. Think of Definitions as the Dagster global scope object: everything it’s aware of can be viewed and operated on by Dagster.

We’ll also define an "io_manager" resource. Setting the base directory to "data" tells Dagster to put working assets into a "data" directory.

### __init__.py
from dagster import (
    Definitions,
    FilesystemIOManager,
    load_assets_from_modules,
)

from . import assets
all_assets = load_assets_from_modules([assets])

io_manager = FilesystemIOManager(
    base_dir="data",
)

defs = Definitions(
    assets=all_assets,
    resources={
        "io_manager": io_manager,
    },
)
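
With the Definitions object in place, you can launch the local Dagster UI and materialize the asset to confirm everything is wired up:

dagster dev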

Building the spam model

For our first pass at a spam model, we’ll build the Bag of Words model based only on the SMS spam dataset. Later, we’ll integrate user messages from our application.

Beyond the Dagster code, we won’t dig into the specifics of the model code. If you’re curious about how to build a Bag of Words model from scratch, check out Ritvikmath’s excellent YouTube video.

The following asset uses Dagster’s "multi_asset" to create multiple output assets. spam_model builds training and testing DataFrames as well as spam and non-spam bags of words. Creating multiple outputs may seem like overkill, but it lets us monitor each asset individually in the Dagster UI.

### assets.py

import string
from collections import Counter

from dagster import AssetOut, multi_asset
from sklearn.model_selection import train_test_split
...

@multi_asset(
    outs={
        "spam_bag_of_words": AssetOut(),
        "non_spam_bag_of_words": AssetOut(),
        "spam_training_dataframe": AssetOut(),
        "spam_testing_dataframe": AssetOut(),
    }
)
def spam_model(sms_spam_examples: pd.DataFrame):
    def preprocess_text(text: str) -> str:
        return text.lower().translate(str.maketrans("", "", string.punctuation))

    def calculate_word_frequencies(words: list, shared_words: set) -> dict:
        word_counts = Counter(words)
        total_words = len(words)
        return {word: word_counts[word] / total_words for word in shared_words}

    # Normalize the message text, then shuffle before splitting
    spam_dataset = sms_spam_examples
    spam_dataset["text"] = spam_dataset["text"].apply(preprocess_text)

    spam_dataset = spam_dataset.sample(frac=1)

    # Hold out 30% of the data for scoring the model later
    train_data, test_data = train_test_split(spam_dataset, test_size=0.3)

    spam_training_words = " ".join(train_data[train_data["spam"]].text).split()
    non_spam_training_words = " ".join(train_data[~train_data["spam"]].text).split()

    shared_words = set(spam_training_words).intersection(set(non_spam_training_words))
    spam_bag_of_words = calculate_word_frequencies(spam_training_words, shared_words)
    non_spam_bag_of_words = calculate_word_frequencies(
        non_spam_training_words, shared_words
    )

    return spam_bag_of_words, non_spam_bag_of_words, train_data, test_data

Scoring the new model

Once our model exists, we can test it against the test dataset. We’ll calculate the true positive rate (spam messages accurately identified as spam) and the false positive rate (non-spam messages incorrectly identified as spam), allowing us to compare the new model with previously created models.

This asset uses all of the assets created by the spam_model function. model_score returns a dictionary of values rather than using a multi_asset because the values serve a single purpose and are passed to only one downstream asset. This contrasts with spam_model, whose outputs don’t move as a single unit: as we’ll see with the model_file asset created shortly, it only makes use of two of the assets created by spam_model.

### assets.py

from typing import Dict

import numpy as np

from . import spam
...

@asset
def model_score(
    spam_bag_of_words: Dict,
    non_spam_bag_of_words: Dict,
    spam_training_dataframe: pd.DataFrame,
    spam_testing_dataframe: pd.DataFrame,
) -> Dict:
    def get_prediction_rate(predictions: pd.Series, true_values: pd.Series) -> float:
        return np.sum(predictions & true_values) / np.sum(true_values)

    # Fraction of training messages labeled spam, used as the model's prior
    percent_spam = spam_training_dataframe["spam"].mean()

    def get_spam_probability(text: str) -> bool:
        return spam.compare_text_to_bow(
            text.split(),
            spam_bag_of_words=spam_bag_of_words,
            non_spam_bag_of_words=non_spam_bag_of_words,
            percent_spam=percent_spam,
        )

    # Classify every test message, then measure hit and false-alarm rates
    predictions = spam_testing_dataframe["text"].apply(get_spam_probability)
    return {
        "true_positives": get_prediction_rate(predictions, spam_testing_dataframe["spam"]),
        "false_positives": get_prediction_rate(predictions, ~spam_testing_dataframe["spam"]),
        "percent_spam": percent_spam,
    }

We’ll need to create the file spam.py as well to store compare_text_to_bow(), which determines whether a piece of text is spam or not. This is placed in a standalone file, where it’s also accessible to our API.

### spam.py

import numpy as np

def compare_text_to_bow(text, spam_bag_of_words, non_spam_bag_of_words, percent_spam):
    valid_words = [word for word in text if word in spam_bag_of_words]

    spam_probs, non_spam_probs = [], []
    for word in valid_words:
        spam_probs.append(spam_bag_of_words[word])
        non_spam_probs.append(non_spam_bag_of_words[word])

    spam_score = sum(map(np.log, spam_probs)) + np.log(percent_spam)
    non_spam_score = sum(map(np.log, non_spam_probs)) + np.log(1 - percent_spam)

    return bool(non_spam_score < spam_score)
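
To get a feel for the function, here’s a hypothetical call with tiny hand-built bags of words (the frequencies are invented for illustration; the real ones come from the spam_model asset):

spam_bow = {"free": 0.05, "winner": 0.04, "meeting": 0.001}
non_spam_bow = {"free": 0.005, "winner": 0.001, "meeting": 0.03}

# "free" and "winner" lean heavily toward spam, so this prints True
print(compare_text_to_bow("you are a free winner".split(), spam_bow, non_spam_bow, percent_spam=0.13))

# "meeting" leans heavily toward non-spam, so this prints False
print(compare_text_to_bow("meeting at noon".split(), spam_bow, non_spam_bow, percent_spam=0.13))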

Tracking models in the database

We have all of the necessary pieces to create and score our spam filter model, and our next step is to save the model weights and create the database table to keep track of the current model to use. The first time we build the model, the model is automatically set to “active”, meaning it’s used in our spam filter. In the future, every time the model is built, it will be compared to the previous model and only promoted to production if performance exceeds the current model’s.

Before working in Dagster, we need to create an application database to keep track of the current model. Our API will be able to query this database to find the current model data, which is why we need to do more than create a Dagster asset that returns a model value.

A production database would likely be an RDBMS like Postgres or MySQL, but we’ll use DuckDB. Since our project will need two tables (“models” and, later, “messages”), let’s create both now.

Create the file “database.py” inside the spam_filter directory. This will create the database in your project’s root directory, which will be accessible to the API.

### database.py

import duckdb


def create_database():
    conn = duckdb.connect("./database.duckdb")

    conn.execute(
        """
    CREATE TABLE IF NOT EXISTS messages (
        spam BOOLEAN,
        body VARCHAR
    );
    """
    )

    conn.execute(
        """
    CREATE TABLE IF NOT EXISTS models (
        path VARCHAR,
        status VARCHAR,
        true_positive_rate FLOAT,
        false_positive_rate FLOAT
    );
    """
    )


if __name__ == "__main__":
    create_database()
    conn = duckdb.connect("./database.duckdb")
    tables = conn.execute("SHOW TABLES").fetchall()
    print(tables)

Run “python spam_filter/database.py” from the project root to create the DuckDB database and tables.
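
If the tables were created successfully, the script should print something like [('messages',), ('models',)].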

It’s necessary to create a Dagster resource so that the database is also accessible within our workflow. The following resource looks confusing, but it serves two (hopefully) straightforward purposes.

Database.query() is a standalone method that connects to the database, runs an SQL query, and returns the responses as a Pandas DataFrame. We’ll use this method to look up individual records in the database.

When it’s time to add a new active record to the database, we need to run two separate commands — updating the previous active record to inactive and adding a new active record — as a single transaction. The Database.__enter__() and Database.__exit__() methods allow us to use the database resource as a context manager. This lets us create a connection and then call Database.execute() as many times as we want before committing all of the commands at once.

If you’re unfamiliar with Python’s context managers, take a look at the official Python documentation.

### resources.py

import json
import os

import duckdb
import pandas as pd
from dagster import ConfigurableResource
from duckdb import DuckDBPyConnection
from pydantic import PrivateAttr


class Database(ConfigurableResource):
    path: str
    _conn: DuckDBPyConnection = PrivateAttr()

    def __enter__(self):
        self._conn = duckdb.connect(self.path)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if self._conn is not None:
            self._conn.commit()
            self._conn.close()

    def execute(self, body):
        if self._conn is None:
            raise Exception(
                "Database connection not initialized. Use the object within a 'with' block."
            )

        self._conn.execute(body)

    def query(self, body: str):
        with duckdb.connect(self.path) as conn:
            result = conn.query(body)
            if result is None:
                return pd.DataFrame()
            else:
                return result.to_df()
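
Here’s the pattern this enables, which we’ll use later in the model_file asset: both statements run on the same connection and commit together when the with block exits (the INSERT values below are placeholders).

with database as conn:
    conn.execute("UPDATE models SET status = 'inactive' WHERE status = 'active'")
    conn.execute("INSERT INTO models VALUES ('model.json', 'active', 0.9, 0.01)")  # placeholder row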

Make this new resource accessible to Dagster by adding it to the Definitions object.

### __init__.py
from dagster import (
    Definitions,
)

from .resources import Database
...
defs = Definitions(
    ...
    resources={
       ...
        "database": Database(path="./database.duckdb"),
    },
)

Look for pre-built integrations before writing your own resources

We defined a custom Database resource, but this often isn’t necessary. Dagster has dozens of existing integrations, including pre-built resources, that you can use out of the box. You can find these on the Dagster integrations page.

We built this to illustrate how to create a context manager that works on any database type. If we knew we wanted to use DuckDB in both dev and prod, we could have used Dagster’s DuckDB integration.
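
For reference, a minimal sketch using the dagster-duckdb package we installed earlier might look like this, with the integration’s DuckDBResource handing out connections:

from dagster_duckdb import DuckDBResource

database_resource = DuckDBResource(database="./database.duckdb")

# Inside an asset, get_connection() yields a DuckDB connection as a context manager
with database_resource.get_connection() as conn:
    models_df = conn.execute("SELECT * FROM models").fetchdf()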

Saving models to a file

In addition to saving a record of our new model, we need to save the actual model weights to a file accessible to our API code. This means defining a new resource for storing our model file.

ModelStorage.setup_for_execution() runs once when Dagster initializes the resource, so it’s the perfect place to make sure our model storage directory exists. The .write() and .read() methods convert data to and from JSON.

### resources.py
import json
import os

class ModelStorage(ConfigurableResource):
    dir: str

    def setup_for_execution(self, context) -> None:
        os.makedirs(self.dir, exist_ok=True)

    def write(self, filename, data):
        with open(f"{self.dir}/{filename}", "w") as f:
            json.dump(data, f)

    def read(self, filename):
        with open(f"{self.dir}/{filename}", "r") as f:
            return json.load(f)

Like our prior resources, we need to make ModelStorage accessible to Dagster.

### __init__.py

from dagster import (
    Definitions
)

from .resources import ModelStorage
...
defs = Definitions(
    ...
    resources={
       ...
        "model_storage": ModelStorage(dir="./weights"),
    },
)

Connecting our model with the new resources

It’s finally time to save our model to a file that can be accessed by the API and create a database record if the model improves on existing models.

The following asset does a few key things. We’ll address the high-level purpose of the asset and then look at the Dagster-specific components.

  1. Fetches the record of the currently active model from the database.
  2. If no active record exists, or if the new model’s true positive rate is at least as high and its false positive rate is no higher than the active model’s, proceeds to save the new model.
  3. Creates a file to store the new model.
  4. Updates the existing database record to “inactive” and creates a record for the new model. Note that we can use the database object as a context manager and call conn.execute() multiple times due to how we defined the Database resource.
### assets.py

import time

from .resources import Database, ModelStorage
...

@asset
def model_file(
    model_score: Dict,
    spam_bag_of_words: Dict,
    non_spam_bag_of_words: Dict,
    database: Database,
    model_storage: ModelStorage,
):
    df = database.query("SELECT * FROM models WHERE status = 'active'")
    if (
        df.empty
        or (df.iloc[0].true_positive_rate <= model_score["true_positives"])
        and (df.iloc[0].false_positive_rate >= model_score["false_positives"])
    ):
        filename = f"{int(time.time())}.json"
        model_storage.write(
            filename,
            {
                "spam_bow": spam_bag_of_words,
                "non_spam_bow": non_spam_bag_of_words,
                "percent_spam": model_score["percent_spam"],
            },
        )
        with database as conn:
            conn.execute(
                "UPDATE models SET status = 'inactive' WHERE status = 'active'"
            )
            conn.execute(
                f"""
                INSERT INTO models (path, status, true_positive_rate, false_positive_rate) 
                VALUES ('{filename}', 'active', {model_score["true_positives"]}, {model_score["false_positives"]})
                """
            )

How model_file is called by a Dagster job

To run model_file, we need to add it to a “job,” which is Dagster’s internal term for a pipeline. There are a few ways to define a job, but we’ll do it directly in our project’s __init__.py file.

### __init__.py

from dagster import (
    define_asset_job,
)
...
create_spam_model_job = define_asset_job(name="create_spam_model_job")

define_asset_job grabs all of the assets in our project and creates a job based on the asset graph.
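
If you later want the job to rebuild only part of the graph, define_asset_job also accepts an explicit selection; here’s a sketch using Dagster’s AssetSelection helper:

from dagster import AssetSelection, define_asset_job

create_spam_model_job = define_asset_job(
    name="create_spam_model_job",
    selection=AssetSelection.all(),  # the default; narrow this to target a subset
)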

In order to run this job on a recurring basis, we’ll need to create a schedule and pass the job and schedule objects to Dagster’s Definitions object, as we did with the resources.

### __init__.py

from dagster import (
    Definitions,
    ScheduleDefinition,
)
...

create_spam_model_schedule = ScheduleDefinition(
    job=create_spam_model_job,
    cron_schedule="0 0 1 * *",  # midnight on the first day of every month
)

defs = Definitions(
    ...
    schedules=[create_spam_model_schedule],
    jobs=[create_spam_model_job],
)

Adding user messages to our spam filter

We want our spam filter to get better over time, which means that every time the model is rebuilt, it should include new user messages in the training data. We already created a “messages” table in the database, now it’s time to create two new assets to make use of those messages.

To generate sample messages, copy sample_data.py from GitHub. It’s a simple script that creates messages containing either “spammy” terms like “winner”, “urgent”, and “free”, or innocuous terms such as “schedule” and “meeting”.
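
If you’d rather not copy the script, a minimal stand-in (a hypothetical sketch, not the exact file from the repo) only needs to insert labeled rows into the messages table:

import duckdb

conn = duckdb.connect("./database.duckdb")
conn.executemany(
    "INSERT INTO messages (spam, body) VALUES (?, ?)",
    [
        (True, "URGENT! You are a winner, claim your free prize now"),
        (False, "Can we move our meeting to Thursday afternoon?"),
    ],
)
conn.close()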

We’ll now create a new asset to fetch all user messages from the database.

### assets.py

from dagster import asset
from .resources import Database
import pandas as pd
...

@asset
def user_messages(database: Database) -> pd.DataFrame:
    results = database.query(
        """
        SELECT
            spam,
            body
        FROM
            messages
        """
    )
    results.rename(columns={"body": "text"}, inplace=True)
    return results

We were previously passing the output of the sms_spam_examples() asset directly into the spam_model() asset. We’re going to create an intermediary asset to combine the output of both sms_spam_examples() and user_messages(), which can be passed into spam_model().

import pandas as pd
...

@asset
def spam_dataset(sms_spam_examples, user_messages) -> pd.DataFrame:
    joined_dataframe = pd.concat([sms_spam_examples, user_messages], ignore_index=True)
    return joined_dataframe.sample(frac=1).reset_index(drop=True)

This simple asset concatenates the two upstream assets and shuffles their contents.

Now pass the output of spam_dataset() into spam_model().

@multi_asset(
    outs={
        "spam_bag_of_words": AssetOut(),
        "non_spam_bag_of_words": AssetOut(),
        "spam_training_dataframe": AssetOut(),
        "spam_testing_dataframe": AssetOut(),
    }
)
def spam_model(spam_dataset: pd.DataFrame):
    def preprocess_text(text: str) -> str:
        return text.lower().translate(str.maketrans("", "", string.punctuation))

    def calculate_word_frequencies(words: list, shared_words: set) -> dict:
        word_counts = Counter(words)
        total_words = len(words)
        return {word: word_counts[word] / total_words for word in shared_words}

    spam_dataset["text"] = spam_dataset["text"].apply(preprocess_text)

Exposing our model with an API

We previously installed Flask, which we’ll use to create a simple spam endpoint.

### api.py

from flask import Flask, jsonify, request
from spam_filter import spam

app = Flask(__name__)

@app.route("/api/find_spam", methods=["POST"])
def find_spam():
    payload = request.get_json()
    text_value = payload.get("text", None)

    if text_value is None:
        return jsonify({"success": False, "error": "Missing 'text' parameter"})

    try:
        is_spam = spam.is_spam(text_value)
        return jsonify({"success": True, "is_spam": is_spam})

    except Exception as e:
        return jsonify({"success": False, "error": str(e)})

if __name__ == "__main__":
    app.run(port=5000)

Note that our API uses the spam.py file from before, but we’ll need to add the is_spam() function. This function queries the database to find the path to the currently active model and reads the file before passing the contents into compare_text_to_bow().

### spam.py

import json

import duckdb
...

def is_spam(text):
    with duckdb.connect("./database.duckdb") as conn:
        df = conn.execute("SELECT * FROM models WHERE status = 'active'").fetchdf()
        path_value = df.loc[0, "path"]
        with open(f"./weights/{path_value}", "r") as f:
            data = json.load(f)
            spam_bow = data.get("spam_bow")
            non_spam_bow = data.get("non_spam_bow")
            percent_spam = data.get("percent_spam")

    return compare_text_to_bow(text.split(), spam_bow, non_spam_bow, percent_spam)

Testing our spam filter

To test the results of our spam filter, run the API script in the terminal.

$ python api.py
 * Serving Flask app 'api'
 * Debug mode: off
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
 * Running on http://127.0.0.1:5000
Press CTRL+C to quit
127.0.0.1 - - [06/Sep/2023 03:15:13] "POST /api/find_spam HTTP/1.1" 200 -

Next, use cURL in another terminal window to try different messages and see whether they get flagged as spam.

curl -X POST -H "Content-Type: application/json" -d '{"text": "you are the winner!"}' http://127.0.0.1:5000/api/find_spam

{"is_spam":true,"success":true}

Moving the pipeline into production

The core Bag of Words algorithm, though simple, is quite powerful and can be effective in production. But as with any sample project, it’s a good idea to consider what would need to change in order to move this into production. We’ll touch on a few pieces here, but for a more in-depth guide, take a look at Dagster’s guide on transitioning data pipelines from development to production.

Swapping local resources for production resources

We’re using local file storage to store models and DuckDB as an application database. In production, you can swap out local file storage for a service like Amazon S3 buckets. To reduce latency, the newest version of the model can be deployed in a Docker container along with your application.

When accessing production message data, you’ll probably use a database like Postgres or MySQL. If your Dagster jobs read large volumes of data, consider a read replica of the relevant tables.

I/O Manager

As with the file storage and the application database, you may need an alternative to the local I/O manager in production. The post “Poor Man’s Data Lake with MotherDuck” walks through porting a local project to MotherDuck, which allows a one-line change from local DuckDB development to a serverless DuckDB platform.

You can also use tools like Snowflake or Databricks.

Sampling message data

Our model uses all existing user messages to build each month’s new model. This won’t scale for a large application. An alternative is to sample the messages and build the model from a relevant subset.
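
For example, DuckDB supports sampling directly in SQL, so the user_messages query could pull a random subset instead of every row (the sample size here is arbitrary):

results = database.query(
    """
    SELECT
        spam,
        body
    FROM
        messages
    USING SAMPLE 10000 ROWS
    """
)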

Testing and logging

We didn’t dig into testing or logging in this tutorial, but these are essential parts of a robust production pipeline. Take a look at Dagster’s guides on effective testing and logging.

Wrapping up

Improving data quality is an ongoing battle for any data team, and spam erodes both data quality and data trust. When we ingest user-provided data, we should always consider the challenge of abuse and spam. Hopefully this tutorial helps pave the way for better data quality in your organization, as this approach can be applied to many more use cases than just messaging.



We're always happy to hear your feedback, so please reach out to us! If you have any questions, ask them in the Dagster community Slack (join here!) or start a Github discussion. If you run into any bugs, let us know with a Github issue. And if you're interested in working with us, check out our open roles!
