# nightly-jobs > Add scripts to the consolidated nightly job runner - Author: Justinsato - Repository: Justinsato/brickston-ai - Version: 20260203201127 - Stars: 0 - Forks: 0 - Last Updated: 2026-02-06 - Source: https://github.com/Justinsato/brickston-ai - Web: https://mule.run/skillshub/@@Justinsato/brickston-ai~nightly-jobs:20260203201127 --- --- name: nightly-jobs description: Add scripts to the consolidated nightly job runner --- # Nightly Jobs Skill ## Overview This skill guides you through integrating scripts into the brickston-ai consolidated nightly job system. ## File Locations - **Scripts**: `scripts/` - **Job Runner**: `apps/api/app/tasks/nightly_runner.py` (or equivalent) - **Scheduler Config**: Check Cloud Scheduler jobs in GCP Console ## Adding a New Nightly Job ### Step 1: Create the Script Location: `scripts/.py` ```python #!/usr/bin/env python3 """ Description: Brief description of what this job does. Schedule: Nightly at 2:00 AM PST """ import asyncio import logging from datetime import datetime logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) async def main(): """Main job execution.""" start_time = datetime.now() logger.info(f"Starting job at {start_time}") try: # Job logic here await process_data() elapsed = datetime.now() - start_time logger.info(f"Job completed successfully in {elapsed}") return True except Exception as e: logger.error(f"Job failed: {e}") raise async def process_data(): """Core processing logic.""" # Implementation pass if __name__ == "__main__": asyncio.run(main()) ``` ### Step 2: Add Database Connection (if needed) ```python import os from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import sessionmaker DATABASE_URL = os.environ.get("DATABASE_URL") async def get_db_session(): engine = create_async_engine(DATABASE_URL) async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) async with async_session() as session: yield session ``` ### Step 3: Register in Job Runner Add to the consolidated job runner configuration: ```python NIGHTLY_JOBS = [ { "name": "new_job_name", "script": "scripts/new_job.py", "schedule": "0 2 * * *", # 2:00 AM daily "enabled": True, "timeout_minutes": 30, }, # ... other jobs ] ``` ### Step 4: Add to Cloud Scheduler (for GCP deployments) ```bash # Create a Cloud Scheduler job that triggers the Cloud Run cron service gcloud scheduler jobs create http new-nightly-job \ --location us-west1 \ --schedule "0 10 * * *" \ --uri "https://brickston-cron-job-HASH-uw.a.run.app/run/new-job" \ --http-method POST \ --oidc-service-account-email brickston-scheduler@graphic-iridium-485814-b2.iam.gserviceaccount.com \ --time-zone "UTC" ``` ## Job Patterns ### Data Sync Job ```python async def sync_external_data(): """Sync data from external source.""" async with get_db_session() as db: # Fetch from external API external_data = await fetch_external_api() # Upsert into database for record in external_data: await upsert_record(db, record) await db.commit() ``` ### Cleanup Job ```python async def cleanup_old_records(): """Remove records older than retention period.""" cutoff_date = datetime.now() - timedelta(days=90) async with get_db_session() as db: result = await db.execute( text("DELETE FROM temp_data WHERE created_at < :cutoff"), {"cutoff": cutoff_date} ) logger.info(f"Deleted {result.rowcount} old records") await db.commit() ``` ### Report Generation Job ```python async def generate_daily_report(): """Generate and store daily summary report.""" async with get_db_session() as db: # Aggregate data metrics = await calculate_daily_metrics(db) # Store report await db.execute( text(""" INSERT INTO daily_reports (report_date, metrics) VALUES (:date, :metrics) """), {"date": datetime.now().date(), "metrics": json.dumps(metrics)} ) await db.commit() ``` ## Error Handling & Alerting ```python import traceback async def main(): try: await run_job() except Exception as e: error_msg = f"Job failed: {e}\n{traceback.format_exc()}" logger.error(error_msg) # Optionally send alert await send_slack_alert(error_msg) raise ``` ## Testing ```bash # Run job manually cd /path/to/repo python scripts/new_job.py # With environment DATABASE_URL=postgresql://... python scripts/new_job.py ``` ## Checklist - [ ] Script created in `scripts/` - [ ] Proper logging configured - [ ] Database connection (if needed) - [ ] Error handling with clear messages - [ ] Registered in job runner - [ ] Added to Cloud Scheduler for deployment - [ ] Tested locally - [ ] Documented execution time expectations