About this integration
Including SharePoint in your Dagster pipelines enables seamless document management and data extraction workflows. With dagster-sharepoint, you can:
- Automatically detect and process new files added to SharePoint document libraries
- Download and process documents as they're uploaded by your team
- Manage folder structures and file metadata programmatically
- Trigger downstream pipelines when critical documents are updated
- Integrate SharePoint data into your broader data orchestration workflows
This community-maintained integration uses the Microsoft Graph API to provide reliable file operations and folder management within SharePoint, making it easy to incorporate collaborative documents into your data pipelines.
Installation
uv add dagster-sharepoint
Or with pip:
pip install dagster-sharepoint
Example
from datetime import datetime, timedelta
from dagster_sharepoint import FileInfoConfig, SharePointResource
import dagster as dg
@dg.asset(compute_kind="sharepoint")
def process_sharepoint_file(
    context: dg.AssetExecutionContext,
    sharepoint: SharePointResource,
    config: FileInfoConfig,
):
    """Download the SharePoint file described by ``config`` and summarize it.

    Args:
        context: Asset execution context; used here only for logging.
        sharepoint: Resource used to download the file by ``config.id``.
        config: Description of the file to process; this asset reads its
            ``id``, ``name`` and ``parent_path`` fields.

    Returns:
        A dict with ``file_name`` plus ``lines`` (number of text lines) when
        the file name ends with ``.csv``; otherwise a dict with ``file_name``
        plus ``size`` (length of the downloaded contents).
    """
    context.log.info(f"Processing file from SharePoint {config}")
    payload = sharepoint.download_file(config.id)
    context.log.info(f"Downloaded file {config.parent_path}/{config.name}")

    # Anything that is not a CSV is only measured, never decoded.
    if not config.name.endswith(".csv"):
        return {"file_name": config.name, "size": len(payload)}

    rows = payload.decode("utf-8").splitlines()
    context.log.info(f"CSV file has {len(rows)} lines")
    return {"file_name": config.name, "lines": len(rows)}
@dg.sensor(
name="sharepoint_new_files",
minimum_interval_seconds=600,
target=[process_sharepoint_file],
)
def sharepoint_new_files(
context: dg.SensorEvaluationContext,
sharepoint: SharePointResource,
) -> dg.SensorResult:
"""Sensor that checks for new files in SharePoint."""
last_check = (
datetime.fromisoformat(context.cursor)
if context.cursor
else datetime.now() - timedelta(weeks=999)
)
current_check = datetime.now()
newly_created_files = sharepoint.list_newly_created_files(
since_timestamp=last_check,
file_name_glob_pattern="*/Reports/*.csv",
recursive=True,
)
if not newly_created_files:
return dg.SkipReason(f"No new files found")
return dg.SensorResult(
run_requests=[
dg.RunRequest(
asset_selection=[process_sharepoint_file.key],
run_key=file.id,
run_config=dg.RunConfig(
ops={
process_sharepoint_file.key.to_python_identifier(): {
"config": file.to_config_dict()
}
}
),
)
for file in newly_created_files
],
cursor=current_check.isoformat(),
)About SharePoint
Microsoft SharePoint is a web-based collaborative platform that integrates with Microsoft Office. It's primarily used as a document management and storage system, enabling teams to store, organize, share, and access information from any device. SharePoint is widely used in enterprises for intranets, document management, collaboration spaces, and business intelligence tools.
