Pipeline

pyfetcher includes an event-driven pipeline that connects three stages:

  1. Crawl – Discover URLs, fetch pages, store in Postgres

  2. Scrape – Extract content, metadata, and media references

  3. Download – Acquire binary assets via yt-dlp, gallery-dl, or HTTP

Architecture

        flowchart LR
    Seeds["Seeds / RSS / Sitemap"] --> Crawl

    subgraph Pipeline
        Crawl["🔍 Crawl Stage"]
        Scrape["📄 Scrape Stage"]
        Download["💾 Download Stage"]
        Crawl -- "PG NOTIFY" --> Scrape
        Scrape -- "PG NOTIFY" --> Download
    end

    Crawl --> Pages[(pages table)]
    Crawl --> NewJobs["new crawl jobs"]
    Scrape --> Enriched[(pages enriched)]
    Scrape --> DlJobs["download jobs"]
    Download --> Media[(media_assets)]
    Download --> MinIO[("MinIO objects")]

    style Crawl fill:#4A90D9,color:#fff
    style Scrape fill:#7B68EE,color:#fff
    style Download fill:#2ECC71,color:#fff
    style MinIO fill:#F39C12,color:#fff
    

Data Flow

        sequenceDiagram
    participant S as Seeds
    participant C as Crawl Worker
    participant PG as PostgreSQL
    participant Sc as Scrape Worker
    participant D as Download Worker
    participant M as MinIO

    S->>PG: Insert crawl jobs
    PG-->>C: NOTIFY crawl_jobs
    C->>PG: Claim job (SELECT FOR UPDATE SKIP LOCKED)
    C->>C: Fetch page via FetchService
    C->>PG: Store page + create scrape job
    PG-->>Sc: NOTIFY scrape_jobs
    Sc->>PG: Claim job
    Sc->>Sc: Extract text, metadata, media URLs
    Sc->>PG: Update page + create download jobs
    PG-->>D: NOTIFY download_jobs
    D->>PG: Claim job
    D->>D: Download via yt-dlp / gallery-dl / HTTP
    D->>M: Upload to MinIO
    D->>PG: Record media_asset
    

Each stage:

  • Claims jobs from Postgres using SELECT FOR UPDATE SKIP LOCKED

  • Processes them with configurable concurrency

  • Writes results and emits NOTIFY for the next stage

Quick Start

# Start infrastructure
make infra-up
make migrate

# Run the pipeline
make pipeline

Programmatic Usage

from pyfetcher.pipeline.runner import PipelineRunner
from pyfetcher.config import PyfetcherConfig

config = PyfetcherConfig(
    crawl_concurrency=10,
    scrape_concurrency=20,
    download_concurrency=5,
)
runner = PipelineRunner(config)
await runner.start()

Seeding URLs

from pyfetcher.crawler.frontier import Frontier
from pyfetcher.db.engine import build_async_engine, build_session_factory

engine = build_async_engine()
session_factory = build_session_factory(engine)

async with session_factory() as session:
    frontier = Frontier()
    await frontier.add_urls(session, [
        "https://example.com",
        "https://example.com/blog",
    ])
    await session.commit()

Custom Spider

from pyfetcher.crawler.spider import Spider, SpiderResult
from pyfetcher.contracts.response import FetchResponse
from pyfetcher.scrape.links import extract_links

spider = Spider(name="my-spider")

@spider.router.default
async def handle_page(url: str, response: FetchResponse) -> SpiderResult:
    links = extract_links(response.text or "", base_url=url)
    return SpiderResult(
        discovered_urls=[l.url for l in links if not l.is_external],
        items=[{"url": url, "title": "..."}],
    )