Dagster + Salesforce

Integrate Salesforce CRM data into your Dagster pipelines. Query records, run bulk operations, and sync customer data, authenticating with username/password or OAuth.

About this integration

The dagster-salesforce package lets your Dagster pipelines read from and write to Salesforce. With dagster-salesforce, you can:

- Query and export Salesforce data (Accounts, Leads, Opportunities) for analysis

- Perform bulk data operations using Salesforce Bulk API 2.0 for efficiency

- Create, update, and manage Salesforce records programmatically

- Sync CRM data with your data warehouse or analytics platform

- Trigger workflows when Salesforce data changes

- Authenticate with multiple methods, including username/password and OAuth

This community-maintained integration provides Dagster resources for the Salesforce API, so you can bring CRM data into your orchestration workflows and keep your analytics in step with your customer records.
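To sync only the records that changed since a previous run, one common approach is to filter a SOQL query on a modification timestamp. The sketch below is a hypothetical helper, not part of dagster-salesforce: the name build_incremental_query and its parameters are illustrative, and it assumes the target object exposes the standard SystemModstamp field. The resulting string could then be passed as the query to a method such as query_to_csv.

```python
from datetime import datetime, timezone


def build_incremental_query(
    sobject: str,
    fields: list[str],
    since: datetime,
    cursor_field: str = "SystemModstamp",  # assumption: present on the object
) -> str:
    """Build a SOQL query for records modified after `since`."""
    if since.tzinfo is None:
        raise ValueError("`since` must be timezone-aware")
    # SOQL datetime literals are unquoted ISO 8601 timestamps.
    since_utc = since.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return (
        f"SELECT {', '.join(fields)} FROM {sobject} "
        f"WHERE {cursor_field} > {since_utc}"
    )


query = build_incremental_query(
    "Account",
    ["Id", "Name", "Type", "Industry"],
    since=datetime(2024, 1, 1, tzinfo=timezone.utc),
)
print(query)
```

How the since value is persisted between runs (for example, in a Dagster sensor cursor) is left out of this sketch.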

Installation

uv add dagster-salesforce

Or with pip:

pip install dagster-salesforce

Example

from pathlib import Path
from dagster_salesforce import SalesforceResource
from dagster_salesforce.credentials import SalesforceUserPasswordCredentials
import dagster as dg

@dg.asset(kinds={"salesforce"})
def salesforce_accounts(
    context: dg.AssetExecutionContext, 
    salesforce: SalesforceResource
):
    """Query and export Account records from Salesforce."""
    # Get a client for the Account object.
    account_obj = salesforce.get_object_client("Account")

    # Export the query results as CSV files under output_dir.
    output_dir = Path("/tmp/salesforce_accounts")
    results = account_obj.query_to_csv(
        query="SELECT Id, Name, Type, Industry FROM Account LIMIT 1000",
        output_directory=output_dir,
        batch_size=10000,
    )
    
    # Sum the record counts across the returned results once and reuse the total.
    accounts_exported = sum(r.number_of_records for r in results)
    context.log.info(f"Downloaded {accounts_exported} accounts")

    # Create a new Lead. This runs on every materialization, so each run
    # creates another Lead record in Salesforce.
    lead_obj = salesforce.get_object_client("Lead")
    lead_id = lead_obj.create_record({
        "FirstName": "John",
        "LastName": "Doe",
        "Company": "Acme Corp",
        "Email": "john.doe@acmecorp.com",
    })

    context.log.info(f"Created new lead with ID: {lead_id}")

    return {
        "accounts_exported": accounts_exported,
        "new_lead_id": lead_id,
    }

defs = dg.Definitions(
    assets=[salesforce_accounts],
    resources={
        "salesforce": SalesforceResource(
            credentials=SalesforceUserPasswordCredentials(
                username=dg.EnvVar("SALESFORCE_USERNAME"),
                password=dg.EnvVar("SALESFORCE_PASSWORD"),
                security_token=dg.EnvVar("SALESFORCE_SECURITY_TOKEN"),
                domain="login",
            )
        ),
    },
)
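After an export like the one above, a downstream step often needs to check what was written before loading it into a warehouse. The following is a minimal sketch using only the standard library. It assumes that query_to_csv writes files with a .csv extension directly into the output directory and that each file starts with a header row; neither detail is confirmed here, and count_exported_rows is a hypothetical helper, not part of dagster-salesforce.

```python
import csv
from pathlib import Path


def count_exported_rows(output_dir: Path) -> dict[str, int]:
    """Return the number of data rows in each CSV file under `output_dir`."""
    counts: dict[str, int] = {}
    for csv_path in sorted(output_dir.glob("*.csv")):
        # newline="" lets the csv module handle line endings itself.
        with csv_path.open(newline="", encoding="utf-8") as handle:
            reader = csv.reader(handle)
            next(reader, None)  # assumption: the first row is a header
            counts[csv_path.name] = sum(1 for _ in reader)
    return counts
```

Calling count_exported_rows(Path("/tmp/salesforce_accounts")) after the asset runs would give a per-file row count that can be compared against the total logged by the asset.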

About Salesforce

Salesforce is a cloud-based customer relationship management (CRM) platform that helps businesses connect with and understand their customers. It provides tools for sales, customer service, marketing automation, analytics, and application development.