Dagster + SharePoint

Connect Dagster to Microsoft SharePoint document libraries through the Microsoft Graph API to automate file operations, folder management, and data extraction.

About this integration

Including SharePoint in your Dagster pipelines lets you build document management and data extraction workflows around the files your team already shares. With dagster-sharepoint, you can:

- Automatically detect and process new files added to SharePoint document libraries

- Download and process documents as they're uploaded by your team

- Manage folder structures and file metadata programmatically

- Trigger downstream pipelines when critical documents are updated

- Integrate SharePoint data into your broader data orchestration workflows

This community-maintained integration uses the Microsoft Graph API for file operations and folder management within SharePoint, so collaborative documents can feed directly into your data pipelines.

Installation

uv add dagster-sharepoint

Or with pip:

pip install dagster-sharepoint

Example

from datetime import datetime, timedelta
from dagster_sharepoint import FileInfoConfig, SharePointResource
import dagster as dg

@dg.asset(compute_kind="sharepoint")
def process_sharepoint_file(
    context: dg.AssetExecutionContext,
    sharepoint: SharePointResource,
    config: FileInfoConfig,
):
    """Process SharePoint files."""
    context.log.info(f"Processing file from SharePoint {config}")
    
    contents = sharepoint.download_file(config.id)
    context.log.info(f"Downloaded file {config.parent_path}/{config.name}")
    
    if config.name.endswith(".csv"):
        lines = contents.decode("utf-8").splitlines()
        context.log.info(f"CSV file has {len(lines)} lines")
        return {"file_name": config.name, "lines": len(lines)}
    
    return {"file_name": config.name, "size": len(contents)}

@dg.sensor(
    name="sharepoint_new_files",
    minimum_interval_seconds=600,
    target=[process_sharepoint_file],
)
def sharepoint_new_files(
    context: dg.SensorEvaluationContext,
    sharepoint: SharePointResource,
) -> dg.SensorResult:
    """Sensor that checks for new files in SharePoint."""
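    # The cursor stores the timestamp of the previous evaluation. On the first
    # evaluation there is no cursor yet, so look far enough back to pick up
    # every file that already exists.
    last_check = (
        datetime.fromisoformat(context.cursor)
        if context.cursor
        else datetime.now() - timedelta(weeks=999)
    )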
    current_check = datetime.now()
    
    newly_created_files = sharepoint.list_newly_created_files(
        since_timestamp=last_check,
        file_name_glob_pattern="*/Reports/*.csv",
        recursive=True,
    )
    
    if not newly_created_files:
        return dg.SensorResult(skip_reason="No new files found")
    
    return dg.SensorResult(
        run_requests=[
            dg.RunRequest(
                asset_selection=[process_sharepoint_file.key],
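                # Using the file ID as the run key means Dagster will not
                # launch a second run for a file it has already requested.
                run_key=file.id,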
                run_config=dg.RunConfig(
                    ops={
                        process_sharepoint_file.key.to_python_identifier(): {
                            "config": file.to_config_dict()
                        }
                    }
                ),
            )
            for file in newly_created_files
        ],
        cursor=current_check.isoformat(),
    )
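
The example above mixes SharePoint calls with plain Python logic. As a minimal sketch, the two pure pieces (summarizing the downloaded bytes and deriving the "since" timestamp from the sensor cursor) can be pulled into helpers and unit tested without a SharePoint connection. The names `summarize_file` and `parse_cursor`, and the one-year default lookback, are illustrative assumptions and are not part of dagster-sharepoint.

```python
from datetime import datetime, timedelta
from typing import Optional


def summarize_file(name: str, contents: bytes) -> dict:
    """Mirror the asset's branching: count lines for CSVs, bytes otherwise.

    Hypothetical helper, not part of dagster-sharepoint.
    """
    if name.endswith(".csv"):
        lines = contents.decode("utf-8").splitlines()
        return {"file_name": name, "lines": len(lines)}
    return {"file_name": name, "size": len(contents)}


def parse_cursor(
    cursor: Optional[str],
    now: datetime,
    lookback: timedelta = timedelta(days=365),  # assumed default, adjust as needed
) -> datetime:
    """Return the timestamp to search from.

    Uses the ISO-8601 cursor when one exists; otherwise falls back to a
    fixed lookback window before `now`. Hypothetical helper.
    """
    if cursor:
        return datetime.fromisoformat(cursor)
    return now - lookback
```

Inside the asset, the return value could then be `summarize_file(config.name, contents)`, and the sensor could compute `last_check = parse_cursor(context.cursor, datetime.now())`. Passing `now` in explicitly keeps the helper deterministic in tests.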

About SharePoint

Microsoft SharePoint is a web-based collaborative platform that integrates with Microsoft Office. It's primarily used as a document management and storage system, enabling teams to store, organize, share, and access information from any device. SharePoint is widely used in enterprises for intranets, document management, collaboration spaces, and business intelligence tools.