The extension point is a Python module that exposes register(registry), where registry is a JobRegistry. The CLI loads that module and calls register; job bodies live in your package or a standalone module.
-
Create a module (for example
mycompany/scheduler_jobs.py) with no-arg callables (or defaults-only parameters) that APScheduler can invoke, plusregister:from py_scheduler.registry import JobRegistry def daily_backup() -> None: ... def register(registry: JobRegistry) -> None: registry.register("daily_backup", daily_backup)
-
Point the scheduler at the module in either way:
- Environment variable
PY_SCHEDULER_JOBS_MODULE=mycompany.scheduler_jobs(handy in Docker/Kubernetes). - Or root YAML key
jobs_register_module: mycompany.scheduler_jobs(wins over the environment variable when set).
- Environment variable
-
Declare jobs in YAML with
nameequal to the string passed toregistry.register(...), and configureinterval/retryas documented in configuration.md.
You do not need to edit main.py when adding tasks: ship a new module plus YAML entries (and install any extra dependencies in the environment, for example pip install -e . from your own package).
Mount the config file at /app/config/config.yaml. If you use a private package, install it in the image (multi-stage build or pip install) or mount code onto PYTHONPATH.
Example:
docker build -t py-scheduler .
docker run --rm -p 9100:9100 \
-v /path/to/config.yaml:/app/config/config.yaml:ro \
-v py-scheduler-data:/data \
-e PY_SCHEDULER_JOBS_MODULE=mycompany.scheduler_jobs \
py-schedulerSet database_path in YAML to /data/scheduler.sqlite if you want durable history inside the volume.
The HTTP server exposes /metrics (port configurable, default 9100).
-
py_scheduler_job_failures_total— labelsjob_id,job_name. Counts terminal failures only (after retries), aligned with failure webhooks. -
py_scheduler_job_execution_seconds— histogram byjob_idandjob_name, one observation per attempt (success or failure). Example average latency in PromQL:sum by (job_id, job_name) ( rate(py_scheduler_job_execution_seconds_sum[5m]) ) / sum by (job_id, job_name) ( rate(py_scheduler_job_execution_seconds_count[5m]) )
Tune the [5m] window to your scrape interval and SLOs.
Each attempt inserts a row into job_executions with outcome in success, failed_retry, or failed_final, duration_ms, and error metadata when applicable. Disable persistence with database_path: "" in YAML.