
FastAPI Background Screenshot Tasks Tutorial — Async Jobs in 10 Minutes

SnapSharp Team·April 26, 2026·4 min read

FastAPI's strengths are async I/O, dependency injection, and Pydantic-validated request bodies. All three pay off when integrating screenshots: HTTP calls to SnapSharp are I/O-bound, the API key is a perfect injectable dependency, and request validation catches bad URLs before they ever reach the backend.

This tutorial builds a FastAPI service that accepts screenshot requests, processes them in background tasks, and returns job IDs the client can poll. By the end you'll have a real async job queue without Celery — just FastAPI's BackgroundTasks and a small in-memory job store (swappable for Redis).

Prerequisites

  • Python 3.11+ and pip.
  • A FastAPI installation (pip install fastapi "uvicorn[standard]"; the quotes keep shells such as zsh from expanding the brackets).
  • A free SnapSharp API key from snapsharp.dev/sign-up.
  • Familiarity with async def, BackgroundTasks, and Depends.

Step 1: install and scaffold

pip install fastapi "uvicorn[standard]" snapsharp httpx pydantic-settings redis

Project layout:

app/
├── __init__.py
├── main.py
├── config.py
├── deps.py
├── jobs.py
└── schemas.py

Step 2: typed settings with pydantic-settings

app/config.py:

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file='.env', extra='ignore')

    snapsharp_api_key: str
    redis_url: str = 'redis://localhost:6379/0'
    snapsharp_timeout: int = 60


settings = Settings()

pydantic-settings reads .env automatically, validates types, and fails fast if snapsharp_api_key is missing. No more os.environ['FOO'] scattered through the code.

Step 3: a typed SnapSharp client dependency

app/deps.py:

from functools import lru_cache
from snapsharp import SnapSharp
from .config import settings


@lru_cache(maxsize=1)
def get_snapsharp() -> SnapSharp:
    return SnapSharp(api_key=settings.snapsharp_api_key, timeout=settings.snapsharp_timeout)

@lru_cache ensures we instantiate the client once per process, not per request. FastAPI's dependency injection handles the rest.
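The once-per-process behavior is easy to check in isolation. The sketch below uses a hypothetical FakeClient in place of the real SnapSharp class (so it runs without the SDK or an API key) and counts how often the constructor fires.

```python
from functools import lru_cache


class FakeClient:
    """Hypothetical stand-in for the SnapSharp client; counts constructions."""

    instances = 0

    def __init__(self, api_key: str) -> None:
        FakeClient.instances += 1
        self.api_key = api_key


@lru_cache(maxsize=1)
def get_client() -> FakeClient:
    # The body runs only on the first call; later calls return the cached object.
    return FakeClient(api_key='sk_test_placeholder')


first = get_client()
second = get_client()

# Clearing the cache forces a fresh instance on the next call.
get_client.cache_clear()
third = get_client()
```

cache_clear() is handy in tests that need a fresh client, for example after overriding settings.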

Step 4: request and response schemas

app/schemas.py:

from typing import Literal, Optional
from pydantic import BaseModel, HttpUrl, Field


class ScreenshotRequest(BaseModel):
    url: HttpUrl
    width: int = Field(1280, ge=320, le=3840)
    height: int = Field(720, ge=240, le=2160)
    format: Literal['png', 'jpeg', 'webp'] = 'png'
    full_page: bool = False
    block_ads: bool = True
    dark_mode: bool = False


class JobCreated(BaseModel):
    job_id: str
    status: Literal['pending', 'processing', 'completed', 'failed']


class JobStatus(BaseModel):
    job_id: str
    status: Literal['pending', 'processing', 'completed', 'failed']
    image_url: Optional[str] = None
    error: Optional[str] = None
    created_at: float
    completed_at: Optional[float] = None

HttpUrl validates the URL format, and Field enforces the minimum and maximum for each dimension. If a client sends width=99999, FastAPI returns a 422 with a clear error message; no manual validation is needed.
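The same validation can be exercised outside of a request, which is useful in unit tests. This is a minimal sketch that assumes Pydantic v2 is installed; it redefines a trimmed copy of ScreenshotRequest so it runs on its own, and invalid_fields is a hypothetical helper name.

```python
from pydantic import BaseModel, Field, HttpUrl, ValidationError


class ScreenshotRequest(BaseModel):
    # Trimmed copy of the schema above, kept small for the example.
    url: HttpUrl
    width: int = Field(1280, ge=320, le=3840)
    height: int = Field(720, ge=240, le=2160)


def invalid_fields(payload: dict) -> list[str]:
    """Return the names of fields that fail validation (hypothetical helper)."""
    try:
        ScreenshotRequest(**payload)
    except ValidationError as exc:
        # Each error carries a 'loc' tuple such as ('width',).
        return sorted({str(err['loc'][0]) for err in exc.errors()})
    return []


ok = invalid_fields({'url': 'https://example.com', 'width': 1280})
too_wide = invalid_fields({'url': 'https://example.com', 'width': 99999})
bad_url = invalid_fields({'url': 'not-a-url'})
```

Inside an endpoint, FastAPI reports the same errors in its 422 response, so a helper like this is only needed when validating payloads by hand.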

Step 5: an in-memory job store (swappable for Redis)

app/jobs.py:

import time
import uuid
from typing import Dict, Optional
from .schemas import JobStatus


class JobStore:
    def __init__(self):
        self._jobs: Dict[str, JobStatus] = {}

    def create(self) -> str:
        job_id = uuid.uuid4().hex
        self._jobs[job_id] = JobStatus(
            job_id=job_id,
            status='pending',
            created_at=time.time(),
        )
        return job_id

    def update(self, job_id: str, **fields) -> None:
        job = self._jobs.get(job_id)
        if not job:
            return
        updated = job.model_copy(update=fields)
        self._jobs[job_id] = updated

    def get(self, job_id: str) -> Optional[JobStatus]:
        return self._jobs.get(job_id)


_store: Optional[JobStore] = None


def get_job_store() -> JobStore:
    global _store
    if _store is None:
        _store = JobStore()
    return _store

This is fine for a single-process dev environment. For production, replace with Redis (we'll do that in Step 8). The interface stays the same.
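Sync background tasks run in worker threads while request handlers read the store, so a lock around the read-modify-write in update is a cheap safeguard. The sketch below is a stdlib-only variant, not the tutorial's class: a frozen dataclass stands in for JobStatus, and LockedJobStore is a hypothetical name.

```python
import threading
import time
import uuid
from dataclasses import dataclass, replace
from typing import Dict, Optional


@dataclass(frozen=True)
class Job:
    """Stand-in for JobStatus so the example has no third-party imports."""

    job_id: str
    status: str
    created_at: float
    error: Optional[str] = None


class LockedJobStore:
    def __init__(self) -> None:
        self._jobs: Dict[str, Job] = {}
        self._lock = threading.Lock()

    def create(self) -> str:
        job_id = uuid.uuid4().hex
        with self._lock:
            self._jobs[job_id] = Job(job_id, 'pending', time.time())
        return job_id

    def update(self, job_id: str, **fields) -> None:
        # Hold the lock across read and write so two threads
        # cannot overwrite each other's changes.
        with self._lock:
            job = self._jobs.get(job_id)
            if job is not None:
                self._jobs[job_id] = replace(job, **fields)

    def get(self, job_id: str) -> Optional[Job]:
        with self._lock:
            return self._jobs.get(job_id)


store = LockedJobStore()
job_id = store.create()
store.update(job_id, status='completed')
store.update('missing-id', status='failed')  # unknown IDs are ignored
```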

Step 6: the main app with BackgroundTasks

app/main.py:

import time
import logging
from pathlib import Path
from fastapi import FastAPI, BackgroundTasks, HTTPException, Depends, status
from fastapi.responses import FileResponse, JSONResponse
from snapsharp import SnapSharp, SnapSharpError

from .deps import get_snapsharp
from .jobs import JobStore, get_job_store
from .schemas import ScreenshotRequest, JobCreated, JobStatus

logger = logging.getLogger(__name__)

app = FastAPI(title='Screenshot Service', version='1.0.0')

STORAGE_DIR = Path('./storage')
STORAGE_DIR.mkdir(exist_ok=True)


@app.get('/health')
async def health():
    return {'ok': True}


def _process_job(
    job_id: str,
    request: ScreenshotRequest,
    snap: SnapSharp,
    store: JobStore,
):
    """Runs in a background thread. Sync because the SDK is sync."""
    store.update(job_id, status='processing')
    try:
        image = snap.screenshot(
            str(request.url),
            width=request.width,
            height=request.height,
            format=request.format,
            full_page=request.full_page,
            block_ads=request.block_ads,
            dark_mode=request.dark_mode,
        )
    except SnapSharpError as exc:
        logger.exception('SnapSharp failure for job %s', job_id)
        store.update(
            job_id,
            status='failed',
            error=f'{exc.status}: {exc.message}',
            completed_at=time.time(),
        )
        return

    file_path = STORAGE_DIR / f'{job_id}.{request.format}'
    file_path.write_bytes(image)

    store.update(
        job_id,
        status='completed',
        image_url=f'/jobs/{job_id}/image',
        completed_at=time.time(),
    )


@app.post('/screenshots', response_model=JobCreated, status_code=status.HTTP_202_ACCEPTED)
async def create_screenshot(
    request: ScreenshotRequest,
    background_tasks: BackgroundTasks,
    snap: SnapSharp = Depends(get_snapsharp),
    store: JobStore = Depends(get_job_store),
):
    job_id = store.create()
    background_tasks.add_task(_process_job, job_id, request, snap, store)
    return JobCreated(job_id=job_id, status='pending')


@app.get('/jobs/{job_id}', response_model=JobStatus)
async def get_job(job_id: str, store: JobStore = Depends(get_job_store)):
    job = store.get(job_id)
    if not job:
        raise HTTPException(404, 'job not found')
    return job


@app.get('/jobs/{job_id}/image')
async def get_job_image(job_id: str, store: JobStore = Depends(get_job_store)):
    job = store.get(job_id)
    if not job or job.status != 'completed':
        raise HTTPException(404, 'image not ready')

    # image_url carries no file extension, so locate the stored file by job ID.
    matches = sorted(STORAGE_DIR.glob(f'{job_id}.*'))
    if not matches:
        raise HTTPException(404, 'file missing')
    file_path = matches[0]
    fmt = file_path.suffix.lstrip('.')  # 'png', 'jpeg', or 'webp'
    return FileResponse(file_path, media_type=f'image/{fmt}')

Run with uvicorn app.main:app --reload --port 8000.

Test it:

# Submit a job
curl -X POST http://localhost:8000/screenshots \
  -H 'Content-Type: application/json' \
  -d '{"url": "https://github.com", "width": 1280, "height": 720}'
# → {"job_id": "abc...", "status": "pending"}

# Poll status
curl http://localhost:8000/jobs/abc...
# → {"job_id": "abc...", "status": "completed", "image_url": "/jobs/abc.../image"}

# Fetch the image
curl http://localhost:8000/jobs/abc.../image -o screenshot.png

The HTTP request returns immediately with 202 Accepted. The screenshot is captured in a background thread after the response is sent. The client polls until the job is completed or failed.
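On the client side, polling usually needs a backoff and a deadline. The following is a minimal sketch using only the standard library; poll_job and fetch_status_http are hypothetical helper names, and the base URL assumes the local server started above.

```python
import json
import time
import urllib.request
from typing import Callable, Dict


def fetch_status_http(job_id: str, base_url: str = 'http://localhost:8000') -> Dict:
    """GET /jobs/{job_id} from the service and decode the JSON body."""
    with urllib.request.urlopen(f'{base_url}/jobs/{job_id}') as resp:
        return json.load(resp)


def poll_job(
    fetch_status: Callable[[], Dict],
    interval: float = 0.5,
    max_interval: float = 5.0,
    timeout: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict:
    """Call fetch_status until the job is completed or failed.

    The delay doubles after each attempt, capped at max_interval.
    Raises TimeoutError if the next wait would pass the deadline.
    """
    deadline = time.monotonic() + timeout
    delay = interval
    while True:
        job = fetch_status()
        if job['status'] in ('completed', 'failed'):
            return job
        if time.monotonic() + delay > deadline:
            raise TimeoutError('job did not finish before the deadline')
        sleep(delay)
        delay = min(delay * 2, max_interval)
```

Against the running service, a call could look like poll_job(lambda: fetch_status_http(job_id)). Passing sleep as a parameter keeps the function easy to test without real waiting.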

Step 7: synchronous endpoint for fast captures

For low-latency cases (well-known URLs, small dimensions, no full_page), bypass the job queue and return the binary inline:

from fastapi.responses import Response


@app.get('/screenshot')
def quick_screenshot(
    url: str,
    width: int = 1280,
    height: int = 720,
    snap: SnapSharp = Depends(get_snapsharp),
):
    # Plain def on purpose: FastAPI runs sync endpoints in a thread pool,
    # so the blocking SDK call does not stall the event loop.
    try:
        image = snap.screenshot(url, width=width, height=height, format='png')
    except SnapSharpError as exc:
        raise HTTPException(502, f'SnapSharp failed: {exc.message}')

    return Response(
        image,
        media_type='image/png',
        headers={'Cache-Control': 'public, max-age=86400, stale-while-revalidate=604800'},
    )

Note that this endpoint is a plain def, not async def. FastAPI runs plain def endpoints in a thread pool, so the sync SDK call does not block the event loop. The same call made directly inside an async def endpoint would block the loop until it returns, and every other request on that worker would wait. If you want true async I/O, use httpx.AsyncClient and call SnapSharp's HTTP API directly (see the screenshot docs).
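One way to keep a blocking call off the event loop from async code is to hand it to a worker thread. The sketch below uses only the standard library: blocking_capture is a hypothetical stand-in for the sync SDK call, and a heartbeat task counts how often the loop gets to run while the capture is in flight.

```python
import asyncio
import time


def blocking_capture(seconds: float) -> bytes:
    """Stand-in for a sync SDK call: blocks the calling thread."""
    time.sleep(seconds)
    return b'fake-image-bytes'


async def heartbeat(ticks: list, stop: asyncio.Event) -> None:
    # Records a timestamp roughly every 10 ms while the loop is free.
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def main() -> tuple[bytes, int]:
    ticks: list = []
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(ticks, stop))

    # to_thread runs the blocking function in a worker thread,
    # so the heartbeat keeps ticking in the meantime.
    image = await asyncio.to_thread(blocking_capture, 0.2)

    stop.set()
    await hb
    return image, len(ticks)


image, tick_count = asyncio.run(main())
```

If blocking_capture were called directly inside main instead, the heartbeat would not tick at all during the 0.2 seconds, which is the stall this pattern avoids.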

Step 8: swap the job store for Redis

For production, store jobs in Redis so multiple worker processes share state:

import time
import uuid
from typing import Optional

import redis

from .schemas import JobStatus


class RedisJobStore:
    def __init__(self, url: str):
        self.r = redis.Redis.from_url(url, decode_responses=True)

    def create(self) -> str:
        job_id = uuid.uuid4().hex
        job = JobStatus(job_id=job_id, status='pending', created_at=time.time())
        # Expire after 24 hours so finished jobs do not pile up.
        self.r.set(f'job:{job_id}', job.model_dump_json(), ex=86400)
        return job_id

    def update(self, job_id: str, **fields) -> None:
        raw = self.r.get(f'job:{job_id}')
        if not raw:
            return
        job = JobStatus.model_validate_json(raw)
        updated = job.model_copy(update=fields)
        self.r.set(f'job:{job_id}', updated.model_dump_json(), ex=86400)

    def get(self, job_id: str) -> Optional[JobStatus]:
        raw = self.r.get(f'job:{job_id}')
        return JobStatus.model_validate_json(raw) if raw else None

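Update get_job_store in app/jobs.py to return the Redis-backed implementation. Keep the JobStore class in place (or update the type hints in main.py) so the existing import still resolves:

from functools import lru_cache

from .config import settings


@lru_cache(maxsize=1)
def get_job_store() -> RedisJobStore:
    return RedisJobStore(settings.redis_url)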

The endpoints don't change — that's what dependency injection bought us.
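If you want the type hints to cover both stores, one option is a typing.Protocol that describes the shared interface. JobStoreLike, DictStore, and mark_failed below are hypothetical names for this sketch, and the toy store holds plain dicts rather than JobStatus models.

```python
from typing import Any, Dict, Optional, Protocol, runtime_checkable


@runtime_checkable
class JobStoreLike(Protocol):
    """Structural type: any class with these three methods qualifies."""

    def create(self) -> str: ...
    def update(self, job_id: str, **fields: Any) -> None: ...
    def get(self, job_id: str) -> Optional[Any]: ...


class DictStore:
    """Toy in-memory store used only to show the protocol check."""

    def __init__(self) -> None:
        self._jobs: Dict[str, Dict[str, Any]] = {}

    def create(self) -> str:
        job_id = f'job-{len(self._jobs) + 1}'
        self._jobs[job_id] = {'job_id': job_id, 'status': 'pending'}
        return job_id

    def update(self, job_id: str, **fields: Any) -> None:
        if job_id in self._jobs:
            self._jobs[job_id].update(fields)

    def get(self, job_id: str) -> Optional[Dict[str, Any]]:
        return self._jobs.get(job_id)


def mark_failed(store: JobStoreLike, job_id: str, reason: str) -> None:
    """Works with any store that satisfies JobStoreLike."""
    store.update(job_id, status='failed', error=reason)


store = DictStore()
job_id = store.create()
mark_failed(store, job_id, 'timeout')
```

Note that the runtime isinstance check on a protocol only confirms the method names exist; a static type checker is what verifies the signatures.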

Step 9: production deployment

uvicorn behind nginx

uvicorn app.main:app --host 0.0.0.0 --port 8000 --workers 4

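Workers are independent OS processes, each with its own SnapSharp client. Four workers on a 2-vCPU box can handle roughly 200 concurrent requests, because most of the time is spent waiting on SnapSharp's response. With more than one worker, use the Redis job store from Step 8: the in-memory store lives inside a single process, so a job created by one worker is invisible to the others.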

Docker

Dockerfile:

FROM python:3.12-slim

WORKDIR /app
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt

COPY app/ ./app/

EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]

Fly.io

fly launch --no-deploy
fly secrets set SNAPSHARP_API_KEY=sk_live_...
fly redis create
fly deploy

Set min_machines_running = 0 and auto_stop_machines = true for cheap idle.

Common pitfalls

Pitfall 1: blocking the event loop. Calling snap.screenshot() (sync) directly inside an async def endpoint blocks the event loop for the full duration of the call, and every other request on that worker waits. FastAPI only offloads plain def endpoints and sync background tasks to a thread pool. For hot paths, consider httpx.AsyncClient directly against the SnapSharp HTTP API.

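Pitfall 2: BackgroundTasks dies with the process. FastAPI's BackgroundTasks runs after the response is sent, inside the same worker process (sync tasks in its thread pool, async tasks on its event loop). Nothing is persisted or retried, so a restart or crash loses in-flight jobs. If your task is long (10+ seconds) or must not be lost, use a real worker (Celery, RQ, dramatiq). For sub-10s screenshots, BackgroundTasks is fine.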

Pitfall 3: Pydantic v1 vs v2 mismatch. This tutorial assumes Pydantic v2 (model_dump_json, model_validate_json). If you're on v1, use .json() and .parse_raw().

Pitfall 4: file storage on ephemeral disks. Fly.io machines and many container hosts have ephemeral file systems. Use S3 or a Fly.io volume for persistent storage. Don't write to ./storage and expect it to survive restarts.

Pitfall 5: missing OpenAPI docs. FastAPI auto-generates /docs (Swagger) and /redoc. Use them when wiring up clients — saves writing API docs by hand.

Final code

Five files:

  • app/config.py — typed settings.
  • app/deps.py — SnapSharp client dependency.
  • app/jobs.py — job store (in-memory or Redis).
  • app/schemas.py — Pydantic models.
  • app/main.py — endpoints and background task logic.

Around 200 lines total. Type-checked end to end via Pydantic and FastAPI's introspection.

Conclusion

FastAPI is the right tool for an async screenshot service. Pydantic catches bad input. Dependency injection handles the SnapSharp client cleanly. BackgroundTasks gives you a job queue without the operational overhead of Celery. Swap the job store for Redis and you have a multi-worker production service.

Next steps: read about webhooks for real-time updates, compare the Django + Celery tutorial, or explore the Python automation guide.


Related: Python tutorial · Pricing · OpenAPI / Postman / ChatGPT integrations
