Dagster + SFTP

A high-performance integration for secure file transfer, with support for parallel transfers, batch operations, and advanced filtering. It is built on AsyncSSH, an asyncio-based SSH library.

Integrating SFTP into your Dagster pipelines lets you automate secure file transfers. With dagster-sftp, you can:

- Automatically detect and process new files landing on SFTP servers using sensors

- Perform parallel file transfers for improved performance

- Use advanced filtering with pattern matching and date-based file discovery

- Execute batch operations for efficient bulk file management

- Rely on AsyncSSH's asynchronous I/O for efficient connection handling and throughput

This community-maintained integration provides a high-performance resource for file transfer operations, making it easy to orchestrate secure data exchange workflows within your Dagster pipelines.
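Parallel transfers are an instance of bounded concurrency. Many transfers start at once, but only a fixed number can be in flight, so a large batch cannot open an unbounded number of SFTP requests. The sketch below shows that pattern with plain `asyncio`. It illustrates the general technique under stated assumptions and is not dagster-sftp's internal implementation. `fake_download` is a hypothetical stand-in for a real download coroutine; with AsyncSSH, that would roughly be an awaited `SFTPClient.get(...)` call. The default limit of 4 is arbitrary.

```python
import asyncio


class TransferStats:
    """Tracks how many transfers run at the same time (for demonstration only)."""

    def __init__(self) -> None:
        self.in_flight = 0
        self.peak = 0


async def fake_download(remote_path: str, stats: TransferStats) -> str:
    """Hypothetical stand-in for one SFTP download; returns the local file name."""
    stats.in_flight += 1
    stats.peak = max(stats.peak, stats.in_flight)
    await asyncio.sleep(0.01)  # simulate network I/O
    stats.in_flight -= 1
    return remote_path.rsplit("/", 1)[-1]


async def download_all(paths: list[str], max_concurrent: int = 4) -> tuple[list[str], int]:
    """Run all downloads concurrently, never more than max_concurrent at once."""
    semaphore = asyncio.Semaphore(max_concurrent)
    stats = TransferStats()

    async def bounded(path: str) -> str:
        # A transfer only starts once it holds one of the semaphore's slots.
        async with semaphore:
            return await fake_download(path, stats)

    # gather returns results in the same order as its arguments.
    results = await asyncio.gather(*(bounded(p) for p in paths))
    return list(results), stats.peak
```

For example, `asyncio.run(download_all(paths, max_concurrent=4))` on ten paths returns the ten file names in input order, together with the peak number of simultaneous transfers. That peak never exceeds 4 here. A semaphore keeps the limit independent of batch size. This matters because SFTP servers commonly restrict how many sessions or outstanding requests a client may hold.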

Installation

uv add dagster-sftp

Or with pip:

pip install dagster-sftp

Example

import tempfile
from datetime import datetime
from pathlib import Path, PurePosixPath

import dagster as dg
from dagster_sftp import SFTPFileInfoConfig, SFTPResource


@dg.asset(kinds={"sftp"})
def process_sftp_file(
    context: dg.AssetExecutionContext,
    config: SFTPFileInfoConfig,
    sftp: SFTPResource,
):
    """Download one file from the SFTP server and count its lines."""
    context.log.info(f"Processing file {config.path}")

    # Download into a temporary directory rather than an open NamedTemporaryFile,
    # so the local path can be reopened on every platform (including Windows).
    with tempfile.TemporaryDirectory() as tmp_dir:
        local_path = Path(tmp_dir) / PurePosixPath(config.path).name
        sftp.get_file(config.path, str(local_path))
        with open(local_path) as f:
            line_count = sum(1 for _ in f)  # stream lines instead of loading the whole file

    context.log.info(f"Processed file with {line_count} lines")
    return {"file_processed": config.path, "lines": line_count}


@dg.sensor(name="sftp_file_sensor", target=process_sftp_file)
def sftp_file_sensor(context: dg.SensorEvaluationContext, sftp: SFTPResource):
    """Detect new CSV files in /incoming and request one run per file."""
    # The cursor stores the ISO timestamp of the previous evaluation.
    last_check = datetime.fromisoformat(context.cursor) if context.cursor else None
    # Capture the time before listing, so files that land mid-listing are not missed.
    current_check = datetime.now()

    new_files = sftp.list_files(
        base_path="/incoming",
        pattern="*.csv",
        files_only=True,
        modified_after=last_check,
    )

    if not new_files:
        return dg.SkipReason("No new files found")

    return dg.SensorResult(
        run_requests=[
            dg.RunRequest(
                asset_selection=[process_sftp_file.key],
                # Dagster skips a request whose run_key this sensor has already used.
                run_key=file_info.id,
                run_config=dg.RunConfig(
                    ops={
                        process_sftp_file.key.to_python_identifier(): {
                            "config": file_info.to_config_dict()
                        }
                    }
                ),
            )
            for file_info in new_files
        ],
        cursor=current_check.isoformat(),
    )
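The sensor delegates filtering to `list_files(pattern=..., files_only=..., modified_after=...)` and keeps the time of its last check in the cursor. The pure-Python sketch below mirrors that bookkeeping, so the selection logic can be reasoned about and tested without a server. It assumes that the pattern is a shell-style glob matched against the file name and that "modified after" is a strict comparison. It is not dagster-sftp's actual implementation, and `RemoteFile` and `select_new_files` are hypothetical names.

```python
from dataclasses import dataclass
from datetime import datetime
from fnmatch import fnmatchcase
from typing import Optional


@dataclass(frozen=True)
class RemoteFile:
    """Hypothetical record describing one remote directory entry."""
    path: str
    modified: datetime
    is_dir: bool = False


def select_new_files(
    entries: list[RemoteFile], pattern: str, cursor: Optional[str]
) -> list[RemoteFile]:
    """Keep regular files whose name matches `pattern` and that changed after the cursor time."""
    last_check = datetime.fromisoformat(cursor) if cursor else None
    selected = []
    for entry in entries:
        if entry.is_dir:  # assumed equivalent of files_only=True
            continue
        name = entry.path.rsplit("/", 1)[-1]
        if not fnmatchcase(name, pattern):  # case-sensitive glob on every OS
            continue
        if last_check is not None and entry.modified <= last_check:
            continue  # already seen on a previous evaluation
        selected.append(entry)
    return selected


if __name__ == "__main__":
    entries = [
        RemoteFile("/incoming/a.csv", datetime(2024, 1, 1, 12, 0)),
        RemoteFile("/incoming/b.csv", datetime(2024, 1, 2, 12, 0)),
        RemoteFile("/incoming/c.txt", datetime(2024, 1, 2, 12, 0)),
        RemoteFile("/incoming/archive.csv", datetime(2024, 1, 3), is_dir=True),
    ]
    cursor = datetime(2024, 1, 1, 18, 0).isoformat()
    print([f.path for f in select_new_files(entries, "*.csv", cursor)])  # ['/incoming/b.csv']
```

Only `b.csv` passes. `a.csv` is older than the cursor, `c.txt` fails the glob, and `archive.csv` is a directory. Because the sensor records its timestamp before listing, a file that lands mid-listing can be returned on two consecutive evaluations. The per-file `run_key` is what keeps Dagster from launching a duplicate run, assuming `file.id` is stable for a given file.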

About SFTP

SFTP (SSH File Transfer Protocol) provides file access, file transfer, and file management over an encrypted SSH connection. It is widely used in enterprise environments for secure data exchange and automated file transfers.