# dagster-local > Interact with Dagster data orchestration platform running locally or on Kubernetes. Use when Claude needs to monitor Dagster runs, get run logs, list assets/jobs, materialize assets, launch jobs, or debug pipeline failures. Supports both local Dagster dev server and Kubernetes deployments where each job run is a separate pod. - Author: alexismanuel - Repository: alexismanuel/dotfiles - Version: 20260105203939 - Stars: 0 - Forks: 0 - Last Updated: 2026-02-06 - Source: https://github.com/alexismanuel/dotfiles - Web: https://mule.run/skillshub/@@alexismanuel/dotfiles~dagster-local:20260105203939 --- --- name: dagster-local description: Interact with Dagster data orchestration platform running locally or on Kubernetes. Use when Claude needs to monitor Dagster runs, get run logs, list assets/jobs, materialize assets, launch jobs, or debug pipeline failures. Supports both local Dagster dev server and Kubernetes deployments where each job run is a separate pod. --- # Dagster Local Skill Programmatic interaction with Dagster via GraphQL API and Kubernetes for pod-level logs. ## Quick Start ```python from scripts.dagster_client import DagsterClient client = DagsterClient() # defaults to http://localhost:3000/graphql # List all assets assets = client.list_assets() # Get recent runs runs = client.get_recent_runs(limit=10) # Get logs for a specific run logs = client.get_run_logs(run_id="abc123") ``` ## Configuration ```python client = DagsterClient(graphql_url="http://localhost:3000/graphql") ``` ## Available Operations ### Query Operations | Function | Purpose | |----------|---------| | `list_repositories()` | List all code locations/repositories | | `list_jobs(repo_location, repo_name)` | List jobs in a repository | | `list_assets(repo_location, repo_name)` | List assets in a repository | | `get_recent_runs(limit)` | Get recent run history | | `get_run_info(run_id)` | Get detailed run info and status | | `get_run_logs(run_id)` | Get event logs for a run | | `get_asset_info(asset_key)` | Get asset details and dependencies | ### Mutation Operations | Function | Purpose | |----------|---------| | `launch_job(repo_location, repo_name, job_name, config)` | Launch a job run | | `materialize_asset(asset_key, repo_location, repo_name)` | Materialize an asset | | `terminate_run(run_id)` | Terminate an in-progress run | ## Kubernetes Integration When Dagster runs on K8s (each run = separate pod): ```python from scripts.k8s_logs import get_pod_logs_for_run, get_dagster_pods # Get pod logs for a specific run logs = get_pod_logs_for_run(run_id="abc123", namespace="dagster") # List all Dagster-related pods pods = get_dagster_pods(namespace="dagster") ``` ## Debugging Workflow 1. **Check recent runs**: `get_recent_runs()` to find failed runs 2. **Get run details**: `get_run_info(run_id)` for status and error summary 3. **Get Dagster logs**: `get_run_logs(run_id)` for step-level events 4. **Get pod logs** (K8s): `get_pod_logs_for_run(run_id)` for stdout/stderr ## Common Patterns ### Wait for Run Completion ```python import time def wait_for_run(client, run_id, timeout=300): start = time.time() while time.time() - start < timeout: info = client.get_run_info(run_id) status = info.get("status") if status in ["SUCCESS", "FAILURE", "CANCELED"]: return info time.sleep(5) raise TimeoutError(f"Run {run_id} did not complete") ``` ### Get Failure Reason ```python def get_failure_reason(client, run_id): logs = client.get_run_logs(run_id) failures = [e for e in logs.get("events", []) if "error" in e] return failures[-1] if failures else None ``` ## GraphQL Reference For advanced queries, see `references/graphql_queries.md`.