# dagster-development > Expert guidance for Dagster data orchestration including assets, resources, schedules, sensors, partitions, testing, and ETL patterns. Use when building or extending Dagster projects, writing assets, configuring automation, or integrating with dbt/dlt/Sling. - Author: Alex Noonan - Repository: C00ldudeNoonan/economic-data-project - Version: 20251217204602 - Stars: 37 - Forks: 0 - Last Updated: 2026-02-07 - Source: https://github.com/C00ldudeNoonan/economic-data-project - Web: https://mule.run/skillshub/@@C00ldudeNoonan/economic-data-project~dagster-development:20251217204602 --- --- name: dagster-development description: Expert guidance for Dagster data orchestration including assets, resources, schedules, sensors, partitions, testing, and ETL patterns. Use when building or extending Dagster projects, writing assets, configuring automation, or integrating with dbt/dlt/Sling. --- # Dagster Development Expert ## Quick Reference | If you're writing... | Check this section/reference | | ------------------------------------- | ------------------------------------------------------------- | | `@dg.asset` | [Assets](#assets-quick-reference) or `references/assets.md` | | `ConfigurableResource` | [Resources](#resources-quick-reference) or `references/resources.md` | | `@dg.schedule` or `ScheduleDefinition`| [Automation](#automation-quick-reference) or `references/automation.md` | | `@dg.sensor` | [Sensors](#sensors-quick-reference) or `references/automation.md` | | `PartitionsDefinition` | [Partitions](#partitions-quick-reference) or `references/automation.md` | | Tests with `dg.materialize()` | [Testing](#testing-quick-reference) or `references/testing.md` | | `@asset_check` | `references/testing.md#asset-checks` | | `@dlt_assets` or `@sling_assets` | `references/etl-patterns.md` | | `@dbt_assets` | [dbt Integration](#dbt-integration) or `dbt-development` skill | | `Definitions` or code locations | `references/project-structure.md` | --- ## Core Concepts **Asset**: A persistent object (table, file, model) that your pipeline produces. Define with `@dg.asset`. **Resource**: External services/tools (databases, APIs) shared across assets. Define with `ConfigurableResource`. **Job**: A selection of assets to execute together. Create with `dg.define_asset_job()`. **Schedule**: Time-based automation for jobs. Create with `dg.ScheduleDefinition`. **Sensor**: Event-driven automation that watches for changes. Define with `@dg.sensor`. **Partition**: Logical divisions of data (by date, category). Define with `PartitionsDefinition`. **Definitions**: The container for all Dagster objects in a code location. --- ## Assets Quick Reference ### Basic Asset ```python import dagster as dg @dg.asset def my_asset() -> None: """Asset description appears in the UI.""" # Your computation logic here pass ``` ### Asset with Dependencies ```python @dg.asset def downstream_asset(upstream_asset) -> dict: """Depends on upstream_asset by naming it as a parameter.""" return {"processed": upstream_asset} ``` ### Asset with Metadata ```python @dg.asset( group_name="analytics", key_prefix=["warehouse", "staging"], description="Cleaned customer data", ) def customers() -> None: pass ``` **Naming**: Use nouns describing what is produced (`customers`, `daily_revenue`), not verbs (`load_customers`). --- ## Resources Quick Reference ### Define a Resource ```python from dagster import ConfigurableResource class DatabaseResource(ConfigurableResource): connection_string: str def query(self, sql: str) -> list: # Implementation here pass ``` ### Use in Assets ```python @dg.asset def my_asset(database: DatabaseResource) -> None: results = database.query("SELECT * FROM table") ``` ### Register in Definitions ```python dg.Definitions( assets=[my_asset], resources={"database": DatabaseResource(connection_string="...")}, ) ``` --- ## Automation Quick Reference ### Schedule ```python import dagster as dg from my_project.defs.jobs import my_job my_schedule = dg.ScheduleDefinition( job=my_job, cron_schedule="0 0 * * *", # Daily at midnight ) ``` ### Common Cron Patterns | Pattern | Meaning | | ------------- | -------------------------- | | `0 * * * *` | Every hour | | `0 0 * * *` | Daily at midnight | | `0 0 * * 1` | Weekly on Monday | | `0 0 1 * *` | Monthly on the 1st | | `0 0 5 * *` | Monthly on the 5th | --- ## Sensors Quick Reference ### Basic Sensor Pattern ```python @dg.sensor(job=my_job) def my_sensor(context: dg.SensorEvaluationContext): # 1. Read cursor (previous state) previous_state = json.loads(context.cursor) if context.cursor else {} current_state = {} runs_to_request = [] # 2. Check for changes for item in get_items_to_check(): current_state[item.id] = item.modified_at if item.id not in previous_state or previous_state[item.id] != item.modified_at: runs_to_request.append(dg.RunRequest( run_key=f"run_{item.id}_{item.modified_at}", run_config={...} )) # 3. Return result with updated cursor return dg.SensorResult( run_requests=runs_to_request, cursor=json.dumps(current_state) ) ``` **Key**: Use cursors to track state between sensor evaluations. --- ## Partitions Quick Reference ### Time-Based Partition ```python weekly_partition = dg.WeeklyPartitionsDefinition(start_date="2023-01-01") @dg.asset(partitions_def=weekly_partition) def weekly_data(context: dg.AssetExecutionContext) -> None: partition_key = context.partition_key # e.g., "2023-01-01" # Process data for this partition ``` ### Static Partition ```python region_partition = dg.StaticPartitionsDefinition(["us-east", "us-west", "eu"]) @dg.asset(partitions_def=region_partition) def regional_data(context: dg.AssetExecutionContext) -> None: region = context.partition_key ``` ### Partition Types | Type | Use Case | | ---- | -------- | | `DailyPartitionsDefinition` | One partition per day | | `WeeklyPartitionsDefinition` | One partition per week | | `MonthlyPartitionsDefinition` | One partition per month | | `StaticPartitionsDefinition` | Fixed set of partitions | | `MultiPartitionsDefinition` | Combine multiple partition dimensions | --- ## Testing Quick Reference ### Direct Function Testing ```python def test_my_asset(): result = my_asset() assert result == expected_value ``` ### Testing with Materialization ```python def test_asset_graph(): result = dg.materialize( assets=[asset_a, asset_b], resources={"database": mock_database}, ) assert result.success assert result.output_for_node("asset_b") == expected ``` ### Mocking Resources ```python from unittest.mock import Mock def test_with_mocked_resource(): mocked_resource = Mock() mocked_resource.query.return_value = [{"id": 1}] result = dg.materialize( assets=[my_asset], resources={"database": mocked_resource}, ) assert result.success ``` ### Asset Checks ```python @dg.asset_check(asset=my_asset) def validate_non_empty(my_asset): return dg.AssetCheckResult( passed=len(my_asset) > 0, metadata={"row_count": len(my_asset)}, ) ``` --- ## dbt Integration For dbt integration, use the minimal pattern below. For comprehensive dbt patterns, see the `dbt-development` skill. ### Basic dbt Assets ```python from dagster_dbt import DbtCliResource, dbt_assets from pathlib import Path dbt_project_dir = Path(__file__).parent / "dbt_project" @dbt_assets(manifest=dbt_project_dir / "target" / "manifest.json") def my_dbt_assets(context: dg.AssetExecutionContext, dbt: DbtCliResource): yield from dbt.cli(["build"], context=context).stream() ``` ### dbt Resource ```python dg.Definitions( assets=[my_dbt_assets], resources={"dbt": DbtCliResource(project_dir=dbt_project_dir)}, ) ``` **Full patterns**: See [Dagster dbt docs](https://docs.dagster.io/integrations/libraries/dbt) --- ## When to Load References ### Load `references/assets.md` when: - Defining complex asset dependencies - Adding metadata, groups, or key prefixes - Working with asset factories - Understanding asset materialization patterns ### Load `references/resources.md` when: - Creating custom `ConfigurableResource` classes - Integrating with databases, APIs, or cloud services - Understanding resource scoping and lifecycle ### Load `references/automation.md` when: - Creating schedules with complex cron patterns - Building sensors with cursors and state management - Implementing partitions and backfills - Automating dbt or other integration runs ### Load `references/testing.md` when: - Writing unit tests for assets - Mocking resources and dependencies - Using `dg.materialize()` for integration tests - Creating asset checks for data validation ### Load `references/etl-patterns.md` when: - Using dlt for embedded ETL - Using Sling for database replication - Loading data from files or APIs - Integrating external ETL tools ### Load `references/project-structure.md` when: - Setting up a new Dagster project - Configuring `Definitions` and code locations - Using `dg` CLI for scaffolding - Organizing large projects with Components --- ## Project Structure ### Recommended Layout ``` my_project/ ├── pyproject.toml ├── src/ │ └── my_project/ │ ├── definitions.py # Main Definitions │ └── defs/ │ ├── assets/ │ │ ├── __init__.py │ │ └── my_assets.py │ ├── jobs.py │ ├── schedules.py │ ├── sensors.py │ └── resources.py └── tests/ └── test_assets.py ``` ### Definitions Pattern (Modern) ```python # src/my_project/definitions.py from pathlib import Path from dagster import definitions, load_from_defs_folder @definitions def defs(): return load_from_defs_folder(project_root=Path(__file__).parent.parent.parent) ``` ### Scaffolding with dg CLI ```bash # Create new project uvx create-dagster my_project # Scaffold new asset file dg scaffold defs dagster.asset assets/new_asset.py # Scaffold schedule dg scaffold defs dagster.schedule schedules.py # Scaffold sensor dg scaffold defs dagster.sensor sensors.py # Validate definitions dg check defs ``` --- ## Common Patterns ### Job Definition ```python trip_update_job = dg.define_asset_job( name="trip_update_job", selection=["taxi_trips", "taxi_zones"], ) ``` ### Run Configuration ```python from dagster import Config class MyAssetConfig(Config): filename: str limit: int = 100 @dg.asset def configurable_asset(config: MyAssetConfig) -> None: print(f"Processing {config.filename} with limit {config.limit}") ``` ### Asset Dependencies with External Sources ```python @dg.asset(deps=["external_table"]) def derived_asset() -> None: """Depends on external_table which isn't managed by Dagster.""" pass ``` --- ## Anti-Patterns to Avoid | Anti-Pattern | Better Approach | | ------------ | --------------- | | Hardcoding credentials in assets | Use `ConfigurableResource` with env vars | | Giant assets that do everything | Split into focused, composable assets | | Ignoring asset return types | Use type annotations for clarity | | Skipping tests for assets | Test assets like regular Python functions | | Not using partitions for time-series | Use `DailyPartitionsDefinition` etc. | | Putting all assets in one file | Organize by domain in separate modules | --- ## CLI Quick Reference ```bash # Development dg dev # Start Dagster UI dg check defs # Validate definitions # Scaffolding dg scaffold defs dagster.asset assets/file.py dg scaffold defs dagster.schedule schedules.py dg scaffold defs dagster.sensor sensors.py # Production dagster job execute -j my_job # Execute a job dagster asset materialize -a my_asset # Materialize an asset ``` --- ## References - **Assets**: `references/assets.md` - Detailed asset patterns - **Resources**: `references/resources.md` - Resource configuration - **Automation**: `references/automation.md` - Schedules, sensors, partitions - **Testing**: `references/testing.md` - Testing patterns and asset checks - **ETL Patterns**: `references/etl-patterns.md` - dlt, Sling, file/API ingestion - **Project Structure**: `references/project-structure.md` - Definitions, Components - **Official Docs**: https://docs.dagster.io - **API Reference**: https://docs.dagster.io/api/dagster