FastAPI's strengths are async I/O, dependency injection, and Pydantic-validated request bodies. All three pay off when integrating screenshots: HTTP calls to SnapSharp are I/O-bound, the API key is a perfect injectable dependency, and request validation catches bad URLs before they ever reach the backend.
This tutorial builds a FastAPI service that accepts screenshot requests, processes them in background tasks, and returns job IDs the client can poll. By the end you'll have a real async job queue without Celery — just FastAPI's BackgroundTasks and a small in-memory job store (swappable for Redis).
Prerequisites
- Python 3.11+ and pip.
- A FastAPI installation (`pip install fastapi uvicorn[standard]`).
- A free SnapSharp API key from snapsharp.dev/sign-up.
- Familiarity with `async def`, `BackgroundTasks`, and `Depends`.
Step 1: install and scaffold
```shell
pip install fastapi uvicorn[standard] snapsharp httpx pydantic-settings redis
```

Project layout:
```
app/
├── __init__.py
├── main.py
├── config.py
├── deps.py
├── jobs.py
└── schemas.py
```

Step 2: typed settings with pydantic-settings
app/config.py:
```python
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file='.env', extra='ignore')

    snapsharp_api_key: str
    redis_url: str = 'redis://localhost:6379/0'
    snapsharp_timeout: int = 60


settings = Settings()
```

pydantic-settings reads `.env` automatically, validates types, and fails fast if `snapsharp_api_key` is missing. No more `os.environ['FOO']` scattered through the code.
Step 3: a typed SnapSharp client dependency
app/deps.py:
```python
from functools import lru_cache

from snapsharp import SnapSharp

from .config import settings


@lru_cache(maxsize=1)
def get_snapsharp() -> SnapSharp:
    return SnapSharp(
        api_key=settings.snapsharp_api_key,
        timeout=settings.snapsharp_timeout,
    )
```

`@lru_cache` ensures we instantiate the client once per process, not per request. FastAPI's dependency injection handles the rest.
Step 4: request and response schemas
app/schemas.py:
```python
from typing import Literal, Optional

from pydantic import BaseModel, Field, HttpUrl


class ScreenshotRequest(BaseModel):
    url: HttpUrl
    width: int = Field(1280, ge=320, le=3840)
    height: int = Field(720, ge=240, le=2160)
    format: Literal['png', 'jpeg', 'webp'] = 'png'
    full_page: bool = False
    block_ads: bool = True
    dark_mode: bool = False


class JobCreated(BaseModel):
    job_id: str
    status: Literal['pending', 'processing', 'completed', 'failed']


class JobStatus(BaseModel):
    job_id: str
    status: Literal['pending', 'processing', 'completed', 'failed']
    image_url: Optional[str] = None
    error: Optional[str] = None
    created_at: float
    completed_at: Optional[float] = None
```

`HttpUrl` validates URL format. `Field` enforces min/max for dimensions. If a client sends `width=99999`, FastAPI returns a 422 with a clear error message; no manual validation needed.
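You can exercise the validation outside the server too. A quick standalone sanity check (the model is inlined here, trimmed to the validated fields, so the snippet runs on its own; assumes Pydantic v2):

```python
from typing import Literal

from pydantic import BaseModel, Field, HttpUrl, ValidationError


class ScreenshotRequest(BaseModel):
    # Same rules as app/schemas.py, trimmed to the constrained fields.
    url: HttpUrl
    width: int = Field(1280, ge=320, le=3840)
    height: int = Field(720, ge=240, le=2160)
    format: Literal['png', 'jpeg', 'webp'] = 'png'


ok = ScreenshotRequest(url='https://github.com')
print(ok.width, ok.format)  # → 1280 png

try:
    ScreenshotRequest(url='https://github.com', width=99999)
except ValidationError as exc:
    # This is the error FastAPI serializes into the 422 body.
    print(exc.errors()[0]['loc'])  # → ('width',)
```

The same `ValidationError` is what FastAPI catches at the request boundary and turns into the 422 response.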
Step 5: an in-memory job store (swappable for Redis)
app/jobs.py:
```python
import time
import uuid
from typing import Dict, Optional

from .schemas import JobStatus


class JobStore:
    def __init__(self):
        self._jobs: Dict[str, JobStatus] = {}

    def create(self) -> str:
        job_id = uuid.uuid4().hex
        self._jobs[job_id] = JobStatus(
            job_id=job_id,
            status='pending',
            created_at=time.time(),
        )
        return job_id

    def update(self, job_id: str, **fields) -> None:
        job = self._jobs.get(job_id)
        if not job:
            return
        self._jobs[job_id] = job.model_copy(update=fields)

    def get(self, job_id: str) -> Optional[JobStatus]:
        return self._jobs.get(job_id)


_store: Optional[JobStore] = None


def get_job_store() -> JobStore:
    global _store
    if _store is None:
        _store = JobStore()
    return _store
```

This is fine for a single-process dev environment. For production, replace it with Redis (we'll do that in Step 8). The interface stays the same.
Step 6: the main app with BackgroundTasks
app/main.py:
```python
import logging
import time
from pathlib import Path

from fastapi import BackgroundTasks, Depends, FastAPI, HTTPException, status
from fastapi.responses import FileResponse
from snapsharp import SnapSharp, SnapSharpError

from .deps import get_snapsharp
from .jobs import JobStore, get_job_store
from .schemas import JobCreated, JobStatus, ScreenshotRequest

logger = logging.getLogger(__name__)

app = FastAPI(title='Screenshot Service', version='1.0.0')

STORAGE_DIR = Path('./storage')
STORAGE_DIR.mkdir(exist_ok=True)


@app.get('/health')
async def health():
    return {'ok': True}


def _process_job(
    job_id: str,
    request: ScreenshotRequest,
    snap: SnapSharp,
    store: JobStore,
):
    """Runs in a background thread. Sync because the SDK is sync."""
    store.update(job_id, status='processing')
    try:
        image = snap.screenshot(
            str(request.url),
            width=request.width,
            height=request.height,
            format=request.format,
            full_page=request.full_page,
            block_ads=request.block_ads,
            dark_mode=request.dark_mode,
        )
    except SnapSharpError as exc:
        logger.exception('SnapSharp failure for job %s', job_id)
        store.update(
            job_id,
            status='failed',
            error=f'{exc.status}: {exc.message}',
            completed_at=time.time(),
        )
        return
    file_path = STORAGE_DIR / f'{job_id}.{request.format}'
    file_path.write_bytes(image)
    store.update(
        job_id,
        status='completed',
        image_url=f'/jobs/{job_id}/image',
        completed_at=time.time(),
    )


@app.post('/screenshots', response_model=JobCreated, status_code=status.HTTP_202_ACCEPTED)
async def create_screenshot(
    request: ScreenshotRequest,
    background_tasks: BackgroundTasks,
    snap: SnapSharp = Depends(get_snapsharp),
    store: JobStore = Depends(get_job_store),
):
    job_id = store.create()
    background_tasks.add_task(_process_job, job_id, request, snap, store)
    return JobCreated(job_id=job_id, status='pending')


@app.get('/jobs/{job_id}', response_model=JobStatus)
async def get_job(job_id: str, store: JobStore = Depends(get_job_store)):
    job = store.get(job_id)
    if not job:
        raise HTTPException(404, 'job not found')
    return job


@app.get('/jobs/{job_id}/image')
async def get_job_image(job_id: str, store: JobStore = Depends(get_job_store)):
    job = store.get(job_id)
    if not job or job.status != 'completed':
        raise HTTPException(404, 'image not ready')
    # The file extension on disk carries the format (image_url has none).
    matches = list(STORAGE_DIR.glob(f'{job_id}.*'))
    if not matches:
        raise HTTPException(404, 'file missing')
    file_path = matches[0]
    fmt = file_path.suffix.lstrip('.')
    return FileResponse(file_path, media_type=f'image/{fmt}')
```

Run with `uvicorn app.main:app --reload --port 8000`.
Test it:
```shell
# Submit a job
curl -X POST http://localhost:8000/screenshots \
  -H 'Content-Type: application/json' \
  -d '{"url": "https://github.com", "width": 1280, "height": 720}'
# → {"job_id": "abc...", "status": "pending"}

# Poll status
curl http://localhost:8000/jobs/abc...
# → {"job_id": "abc...", "status": "completed", "image_url": "/jobs/abc.../image"}

# Fetch the image
curl http://localhost:8000/jobs/abc.../image -o screenshot.png
```

The POST returns immediately with 202 Accepted. The screenshot is captured in a background thread. The client polls until the job is ready.
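On the client side, the "poll until ready" loop is only a few lines. Here is a minimal sketch with an injectable `fetch_status` callable (so it works with `requests`, `httpx`, or a test stub); `poll_job` is a hypothetical helper, not part of any SDK:

```python
import time


def poll_job(fetch_status, job_id, interval=0.5, timeout=30.0):
    """Poll GET /jobs/{job_id} until the job finishes or the deadline passes.

    fetch_status(job_id) must return a dict shaped like the JobStatus
    schema, e.g. {"status": "completed", "image_url": "..."}.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        job = fetch_status(job_id)
        if job['status'] in ('completed', 'failed'):
            return job
        time.sleep(interval)
    raise TimeoutError(f'job {job_id} did not finish within {timeout}s')
```

With `requests`, `fetch_status` would be something like `lambda jid: requests.get(f'{base_url}/jobs/{jid}').json()`.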
Step 7: synchronous endpoint for fast captures
For low-latency cases (well-known URLs, small dimensions, no full_page), bypass the job queue and return the binary inline:
```python
from fastapi.responses import Response


@app.get('/screenshot')
def quick_screenshot(
    url: str,
    width: int = 1280,
    height: int = 720,
    snap: SnapSharp = Depends(get_snapsharp),
):
    try:
        image = snap.screenshot(url, width=width, height=height, format='png')
    except SnapSharpError as exc:
        raise HTTPException(502, f'SnapSharp failed: {exc.message}')
    return Response(
        image,
        media_type='image/png',
        headers={'Cache-Control': 'public, max-age=86400, stale-while-revalidate=604800'},
    )
```

Note this endpoint is declared with plain `def`, not `async def`: FastAPI runs sync endpoints in its thread pool, so the blocking SDK call doesn't stall the event loop. (Inside an `async def` endpoint, a sync call would block the loop for every other request.) If you want true async I/O, use `httpx.AsyncClient` and call SnapSharp's HTTP API directly (see the screenshot docs).
Step 8: swap the job store for Redis
For production, store jobs in Redis so multiple worker processes share state:
```python
import time
import uuid
from typing import Optional

import redis

from .schemas import JobStatus


class RedisJobStore:
    def __init__(self, url: str):
        self.r = redis.Redis.from_url(url, decode_responses=True)

    def create(self) -> str:
        job_id = uuid.uuid4().hex
        job = JobStatus(job_id=job_id, status='pending', created_at=time.time())
        self.r.set(f'job:{job_id}', job.model_dump_json(), ex=86400)
        return job_id

    def update(self, job_id: str, **fields) -> None:
        # Note: this read-modify-write is not atomic; use WATCH/MULTI
        # if concurrent updates to the same job are possible.
        raw = self.r.get(f'job:{job_id}')
        if not raw:
            return
        job = JobStatus.model_validate_json(raw)
        updated = job.model_copy(update=fields)
        self.r.set(f'job:{job_id}', updated.model_dump_json(), ex=86400)

    def get(self, job_id: str) -> Optional[JobStatus]:
        raw = self.r.get(f'job:{job_id}')
        return JobStatus.model_validate_json(raw) if raw else None
```

Update `get_job_store` to return the Redis-backed implementation:

```python
from functools import lru_cache

from .config import settings


@lru_cache(maxsize=1)
def get_job_store() -> RedisJobStore:
    return RedisJobStore(settings.redis_url)
```

The endpoints don't change; that's what dependency injection bought us.
Step 9: production deployment
uvicorn behind nginx
```shell
uvicorn app.main:app --host 0.0.0.0 --port 8000 --workers 4
```

Workers are independent OS processes, each with its own SnapSharp client. Four workers on a 2-vCPU box can comfortably serve a couple hundred concurrent requests, since most of each request is spent waiting on SnapSharp's response. Note that running multiple workers requires the Redis job store from Step 8: the in-memory store isn't shared across processes, so a job created by one worker would be invisible to the others.
Docker
Dockerfile:
```dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
COPY app/ ./app/
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
```

Fly.io
```shell
fly launch --no-deploy
fly secrets set SNAPSHARP_API_KEY=sk_live_...
fly redis create
fly deploy
```

Set `min_machines_running = 0` and `auto_stop_machines = true` in fly.toml for cheap idle.
Common pitfalls
Pitfall 1: blocking the event loop. `snap.screenshot()` is synchronous; inside an `async def` endpoint it blocks the event loop for the whole call, because FastAPI only offloads plain `def` endpoints (and sync background tasks) to its thread pool. Declare blocking endpoints with `def`, wrap the call with `fastapi.concurrency.run_in_threadpool` or `asyncio.to_thread`, or go fully async with `httpx.AsyncClient` against the SnapSharp HTTP API.
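If you'd rather keep `async def` endpoints, offload the blocking call explicitly. A minimal sketch using the stdlib's `asyncio.to_thread` (Python 3.9+), where `blocking_capture` is a stand-in for the sync `snap.screenshot()` call:

```python
import asyncio
import time


def blocking_capture(url: str) -> bytes:
    # Stand-in for the synchronous snap.screenshot() call.
    time.sleep(0.1)  # simulate network I/O
    return b'fake-image-bytes'


async def capture(url: str) -> bytes:
    # Runs the blocking call in a worker thread; the event loop stays free
    # to serve other requests while this one waits.
    return await asyncio.to_thread(blocking_capture, url)


print(asyncio.run(capture('https://github.com')))  # → b'fake-image-bytes'
```

Inside FastAPI, `fastapi.concurrency.run_in_threadpool` does the same job and integrates with the framework's existing thread pool.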
Pitfall 2: BackgroundTasks dies with the process. FastAPI's `BackgroundTasks` runs after the response is sent, but inside the same server process (sync tasks on the thread pool, async tasks on the event loop), so jobs in flight are lost if the process restarts. If your task is long (10+ seconds) or must survive restarts, use a real worker (Celery, RQ, dramatiq). For sub-10-second screenshots, `BackgroundTasks` is fine.
Pitfall 3: Pydantic v1 vs v2 mismatch. This tutorial assumes Pydantic v2 (`model_dump_json`, `model_validate_json`). If you're on v1, use `.json()` and `.parse_raw()`.
Pitfall 4: file storage on ephemeral disks. Fly.io machines and many container hosts have ephemeral file systems. Use S3 or a Fly.io volume for persistent storage. Don't write to `./storage` and expect it to survive restarts.
Pitfall 5: missing OpenAPI docs. FastAPI auto-generates `/docs` (Swagger UI) and `/redoc`. Use them when wiring up clients; it saves writing API docs by hand.
Final code
Five files:
- `app/config.py`: typed settings.
- `app/deps.py`: SnapSharp client dependency.
- `app/jobs.py`: job store (in-memory or Redis).
- `app/schemas.py`: Pydantic models.
- `app/main.py`: endpoints and background-task logic.
Around 200 lines total. Type-checked end to end via Pydantic and FastAPI's introspection.
Conclusion
FastAPI is the right tool for an async screenshot service. Pydantic catches bad input. Dependency injection handles the SnapSharp client cleanly. BackgroundTasks gives you a job queue without the operational overhead of Celery. Swap the job store for Redis and you have a multi-worker production service.
Next steps: read about webhooks for real-time updates, compare the Django + Celery tutorial, or explore the Python automation guide.