Transports

pyfetcher supports multiple HTTP backends. Each transport implements the SyncTransport and/or AsyncTransport protocols.

Backend Capabilities

Backend        Sync  Async  Stream  TLS Impersonation  Cloudflare Bypass
httpx          Yes   Yes    Yes     No                 No
aiohttp        No    Yes    Yes     No                 No
curl_cffi      Yes   Yes    Yes     Yes                No
cloudscraper   Yes   No     No      No                 Yes
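The table can also be encoded as data when a caller needs to pick a backend programmatically. This is a minimal sketch: `CAPABILITIES` and `pick_backends` are hypothetical names, not part of pyfetcher, and the flags are copied by hand from the table above.

```python
from __future__ import annotations

# Hypothetical helper: encodes the capability table above; not a pyfetcher API.
CAPABILITIES = {
    "httpx": {"sync": True, "async": True, "stream": True, "tls_impersonation": False, "cloudflare_bypass": False},
    "aiohttp": {"sync": False, "async": True, "stream": True, "tls_impersonation": False, "cloudflare_bypass": False},
    "curl_cffi": {"sync": True, "async": True, "stream": True, "tls_impersonation": True, "cloudflare_bypass": False},
    "cloudscraper": {"sync": True, "async": False, "stream": False, "tls_impersonation": False, "cloudflare_bypass": True},
}
KNOWN = {"sync", "async", "stream", "tls_impersonation", "cloudflare_bypass"}


def pick_backends(*required: str) -> list[str]:
    """Return the backends that support every requested capability, in table order."""
    unknown = set(required) - KNOWN
    if unknown:
        raise ValueError(f"unknown capabilities: {sorted(unknown)}")
    return [name for name, caps in CAPABILITIES.items() if all(caps[r] for r in required)]


print(pick_backends("sync", "stream"))  # ['httpx', 'curl_cffi']
```

Requiring both synchronous and streaming support, for instance, narrows the choice to httpx and curl_cffi.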

Base Protocols

Base transport protocols for pyfetcher.

Purpose:

Define the minimal sync/async transport interfaces consumed by fetch services. Implementations provide backend-specific HTTP execution while conforming to these duck-typed protocols.

Design:
  • Sync and async protocols are distinct to avoid forcing implementers to provide both.

  • Streaming is modeled explicitly for async transports.

  • Implementations own backend-specific session/client lifecycles.
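Because the protocols are duck-typed, any class that defines the right methods conforms, and a sync-only class does not have to stub out the async side. A minimal sketch with `typing.Protocol`: the `FetchRequest`/`FetchResponse` dataclasses and `EchoSyncTransport` are hypothetical stand-ins, and the `@runtime_checkable` decorator is an assumption added only so `isinstance` can illustrate the split; pyfetcher's own protocols may not be declared that way.

```python
from dataclasses import dataclass
from typing import Protocol, runtime_checkable


# Hypothetical stand-ins for pyfetcher's request/response models.
@dataclass
class FetchRequest:
    url: str
    method: str = "GET"


@dataclass
class FetchResponse:
    url: str
    status: int
    body: bytes = b""


# Assumption: @runtime_checkable is used here only to demonstrate isinstance().
@runtime_checkable
class SyncTransport(Protocol):
    def fetch(self, request: FetchRequest) -> FetchResponse: ...


@runtime_checkable
class AsyncTransport(Protocol):
    async def afetch(self, request: FetchRequest) -> FetchResponse: ...

    def astream(self, request: FetchRequest): ...


class EchoSyncTransport:
    """Sync-only transport; it satisfies SyncTransport without inheriting from it."""

    def fetch(self, request: FetchRequest) -> FetchResponse:
        return FetchResponse(url=request.url, status=200, body=request.url.encode())


transport = EchoSyncTransport()
print(isinstance(transport, SyncTransport))   # True
print(isinstance(transport, AsyncTransport))  # False: afetch/astream are missing
```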

Examples

>>> hasattr(SyncTransport, "fetch")
True
class pyfetcher.transports.base.SyncTransport(*args, **kwargs)

Protocol for synchronous fetch transports.

Implementations must provide a fetch method that accepts a FetchRequest and returns a normalized FetchResponse.

fetch(request)

Fetch a request synchronously.

Parameters:

request (FetchRequest) – The fetch request to execute.

Returns:

A normalized fetch response.

Return type:

FetchResponse

class pyfetcher.transports.base.AsyncTransport(*args, **kwargs)

Protocol for asynchronous fetch transports.

Implementations must provide afetch for full responses and astream for chunked streaming.

async afetch(request)

Fetch a request asynchronously.

Parameters:

request (FetchRequest) – The fetch request to execute.

Returns:

A normalized fetch response.

Return type:

FetchResponse

async astream(request)

Stream a request asynchronously.

Parameters:

request (FetchRequest) – The fetch request to stream.

Returns:

An async iterator yielding StreamChunk objects.

Return type:

AsyncIterator[StreamChunk]
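The AsyncTransport contract can be exercised without a network by serving a fixed body. A minimal sketch, assuming `StreamChunk` carries the raw bytes plus a zero-based index (as the HTTPX `astream` entry below describes); `InMemoryAsyncTransport`, `read_all`, and the dataclass field names are hypothetical.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator


# Hypothetical stand-ins for pyfetcher's models; real field names may differ.
@dataclass
class FetchRequest:
    url: str


@dataclass
class FetchResponse:
    url: str
    status: int
    body: bytes


@dataclass
class StreamChunk:
    data: bytes
    index: int


class InMemoryAsyncTransport:
    """Serves a fixed body through both afetch() and astream()."""

    def __init__(self, body: bytes, chunk_size: int = 4) -> None:
        self._body = body
        self._chunk_size = chunk_size

    async def afetch(self, request: FetchRequest) -> FetchResponse:
        return FetchResponse(url=request.url, status=200, body=self._body)

    async def astream(self, request: FetchRequest) -> AsyncIterator[StreamChunk]:
        for index, start in enumerate(range(0, len(self._body), self._chunk_size)):
            await asyncio.sleep(0)  # yield control, as a real network read would
            yield StreamChunk(data=self._body[start:start + self._chunk_size], index=index)


async def read_all(transport: InMemoryAsyncTransport, request: FetchRequest) -> bytes:
    parts = []
    async for chunk in transport.astream(request):
        parts.append(chunk.data)
    return b"".join(parts)


transport = InMemoryAsyncTransport(b"hello, stream")
print(asyncio.run(read_all(transport, FetchRequest("https://example.com"))))  # b'hello, stream'
```

Writing `astream` as an `async def` that contains `yield` makes it an async generator, so calling it returns an `AsyncIterator[StreamChunk]` directly, matching the documented return type.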

HTTPX

HTTPX transport implementation for pyfetcher.

Purpose:

Provide pooled synchronous and asynchronous fetching using httpx as the underlying HTTP client library.

Design:
  • One transport instance owns long-lived clients and pooling state.

  • Request conversion stays thin because the main contracts are transport-agnostic.

  • Streaming is supported via httpx.AsyncClient.stream.

  • Clients are lazily initialized on first use with settings derived from the request’s embedded policies.
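Lazy initialization means the first request's policy fixes the client settings for the lifetime of the transport. A minimal sketch of that pattern: `PoolPolicy`, its fields, `httpx_limits_kwargs`, and `LazyClientHolder` are hypothetical, while the keys produced are real `httpx.Limits` parameters (`max_connections`, `max_keepalive_connections`, `keepalive_expiry`).

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class PoolPolicy:
    """Hypothetical pool policy; pyfetcher's real policy fields may differ."""

    max_connections: int = 100
    max_keepalive: int = 20
    keepalive_expiry_s: float = 5.0


def httpx_limits_kwargs(policy: PoolPolicy) -> dict[str, Any]:
    """Map the policy onto keyword arguments accepted by httpx.Limits."""
    return {
        "max_connections": policy.max_connections,
        "max_keepalive_connections": policy.max_keepalive,
        "keepalive_expiry": policy.keepalive_expiry_s,
    }


class LazyClientHolder:
    """Builds a client from the first policy it sees, then reuses that client."""

    def __init__(self, factory: Callable[..., Any]) -> None:
        self._factory = factory
        self._client: Any = None

    def get(self, policy: PoolPolicy) -> Any:
        if self._client is None:
            self._client = self._factory(**httpx_limits_kwargs(policy))
        return self._client


holder = LazyClientHolder(dict)  # dict stands in for a real client factory
print(holder.get(PoolPolicy(max_connections=10))["max_connections"])  # 10
print(holder.get(PoolPolicy(max_connections=99))["max_connections"])  # 10 (first policy wins)
```

With httpx installed, the factory could be `lambda **kw: httpx.Client(limits=httpx.Limits(**kw))`. In this sketch, a later request carrying a different policy gets the already-built client, which is the trade-off of lazy, pooled initialization.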

Examples

>>> transport = HttpxTransport()
>>> hasattr(transport, "fetch")
True
class pyfetcher.transports.httpx.HttpxTransport(*, sync_client=None, async_client=None)

Combined sync/async HTTPX transport with pooled client management.

Manages long-lived httpx.Client and httpx.AsyncClient instances that are lazily initialized from the first request’s embedded policies. Supports synchronous fetch, asynchronous fetch, and async streaming.

Parameters:
  • sync_client (httpx.Client | None) – Optional externally managed sync client (caller owns lifecycle).

  • async_client (httpx.AsyncClient | None) – Optional externally managed async client (caller owns lifecycle).

Examples

>>> transport = HttpxTransport()
>>> hasattr(transport, "afetch")
True
fetch(request)

Fetch a request synchronously.

Parameters:

request (FetchRequest) – The fetch request to execute.

Returns:

A normalized FetchResponse.

Raises:

httpx.HTTPStatusError – If the response status indicates an error.

Return type:

FetchResponse

async afetch(request)

Fetch a request asynchronously.

Parameters:

request (FetchRequest) – The fetch request to execute.

Returns:

A normalized FetchResponse.

Raises:

httpx.HTTPStatusError – If the response status indicates an error.

Return type:

FetchResponse

async astream(request)

Stream a request asynchronously.

Yields chunks of the response body as StreamChunk objects, each carrying the raw bytes and a zero-based index.

Parameters:

request (FetchRequest) – The fetch request to stream.

Yields:

StreamChunk objects.

Raises:

httpx.HTTPStatusError – If the response status indicates an error.

Return type:

AsyncIterator[StreamChunk]

close()

Close the owned sync client if present.

Only closes clients that were created internally (not externally provided via the constructor).

Return type:

None

async aclose()

Close the owned async client if present.

Only closes clients that were created internally (not externally provided via the constructor).

Return type:

None
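The ownership rule shared by close() and aclose() can be modeled as a flag recorded at construction time. A minimal sketch of the documented behavior, not HttpxTransport's source; `FakeClient` stands in for `httpx.Client` and `OwnershipAwareTransport` is hypothetical.

```python
from __future__ import annotations


class FakeClient:
    """Stand-in for httpx.Client; only records whether close() was called."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


class OwnershipAwareTransport:
    """Hypothetical transport that closes only the client it created itself."""

    def __init__(self, *, sync_client: FakeClient | None = None) -> None:
        self._client = sync_client
        self._owns_client = sync_client is None  # injected clients belong to the caller

    def _get_client(self) -> FakeClient:
        if self._client is None:
            self._client = FakeClient()  # created lazily, so this transport owns it
        return self._client

    def close(self) -> None:
        if self._owns_client and self._client is not None:
            self._client.close()
            self._client = None


external = FakeClient()
borrowed = OwnershipAwareTransport(sync_client=external)
borrowed._get_client()
borrowed.close()
print(external.closed)  # False: the caller still owns it
```

An async variant would mirror this with `async def aclose()` awaiting the client's own `aclose()`, which `httpx.AsyncClient` provides.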

Aiohttp

Aiohttp transport implementation for pyfetcher.

Purpose:

Provide an async-first transport backed by aiohttp, suitable for pooled crawling, bounded concurrency, and chunked streaming.

Design:
  • A long-lived aiohttp.ClientSession owns connector pooling.

  • Connector settings come from the shared pool policy embedded in each request.

  • Full fetch and streaming interfaces are both supported.
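For the bounded-concurrency use case, a common approach is to cap in-flight requests with a semaphore on top of the connector's pool limits. A minimal sketch with `asyncio.Semaphore`; `SlowFakeTransport` (standing in for AiohttpTransport), `crawl`, and the `FetchRequest` stand-in are hypothetical, and this is not necessarily how pyfetcher bounds concurrency.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass


@dataclass
class FetchRequest:  # hypothetical stand-in for pyfetcher's FetchRequest
    url: str


class SlowFakeTransport:
    """Stand-in for AiohttpTransport that records the peak number of in-flight requests."""

    def __init__(self) -> None:
        self.in_flight = 0
        self.peak = 0

    async def afetch(self, request: FetchRequest) -> str:
        self.in_flight += 1
        self.peak = max(self.peak, self.in_flight)
        await asyncio.sleep(0.01)  # simulated network latency
        self.in_flight -= 1
        return request.url


async def crawl(transport: SlowFakeTransport, urls: list[str], limit: int) -> list[str]:
    semaphore = asyncio.Semaphore(limit)

    async def fetch_one(url: str) -> str:
        async with semaphore:  # at most `limit` afetch calls run at once
            return await transport.afetch(FetchRequest(url))

    # gather returns results in the same order as the input URLs
    return list(await asyncio.gather(*(fetch_one(u) for u in urls)))


transport = SlowFakeTransport()
urls = [f"https://example.com/page/{i}" for i in range(10)]
asyncio.run(crawl(transport, urls, limit=3))
print(transport.peak)  # 3 at most
```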

Examples

>>> transport = AiohttpTransport()
>>> hasattr(transport, "afetch")
True
class pyfetcher.transports.aiohttp.AiohttpTransport(*, session=None)

Async aiohttp transport with pooled session reuse.

Manages a long-lived aiohttp.ClientSession that is lazily created from the first request’s embedded policies. The session’s TCP connector provides connection pooling and keepalive management.

Parameters:

session (aiohttp.ClientSession | None) – Optional externally managed session (caller owns lifecycle).

Examples

>>> transport = AiohttpTransport()
>>> hasattr(transport, "astream")
True
async afetch(request)

Fetch a request asynchronously.

Parameters:

request (FetchRequest) – The fetch request to execute.

Returns:

A normalized FetchResponse.

Raises:

aiohttp.ClientResponseError – If the response status indicates an error.

Return type:

FetchResponse

async astream(request)

Stream a request asynchronously.

Yields chunks of the response body as StreamChunk objects.

Parameters:

request (FetchRequest) – The fetch request to stream.

Yields:

StreamChunk objects.

Raises:

aiohttp.ClientResponseError – If the response status indicates an error.

Return type:

AsyncIterator[StreamChunk]

async aclose()

Close the owned session if present.

Only closes sessions that were created internally (not externally provided via the constructor).

Return type:

None

curl_cffi

curl_cffi transport implementation for pyfetcher.

Purpose:

Provide a transport backed by curl_cffi that impersonates real browser TLS fingerprints (JA3/JA4), so that requests look like they originate from genuine browsers at the TLS layer. This is aimed at bot detection that fingerprints the TLS handshake (e.g. Cloudflare, Akamai); curl_cffi does not execute JavaScript, so it does not solve JavaScript challenges on its own.

Design:
  • Uses curl_cffi.requests.Session for synchronous requests.

  • Uses curl_cffi.requests.AsyncSession for asynchronous requests.

  • The impersonate parameter selects which browser’s TLS fingerprint to match (e.g. 'chrome120', 'firefox', 'safari').

  • Sessions are lazily initialized on first use.

  • curl_cffi is imported lazily so it remains an optional dependency.
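Keeping curl_cffi optional typically means deferring the import until a session is first needed and turning a missing package into a clear ImportError, which matches the `Raises: ImportError` entries below. A minimal sketch; `require_optional` and `LazyCurlSessionHolder` are hypothetical, and only `curl_cffi.requests.Session(impersonate=...)` is curl_cffi's own API.

```python
from __future__ import annotations

import importlib
from types import ModuleType


def require_optional(module_name: str, install_hint: str) -> ModuleType:
    """Import an optional dependency on first use, with an actionable error message."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:  # ModuleNotFoundError is a subclass of ImportError
        raise ImportError(f"{module_name} is required for this transport; {install_hint}") from exc


class LazyCurlSessionHolder:
    """Hypothetical holder that imports curl_cffi only when a session is first requested."""

    def __init__(self, impersonate: str = "chrome120") -> None:
        self.impersonate = impersonate
        self._session = None  # nothing imported or created yet

    def session(self):
        if self._session is None:
            curl_requests = require_optional("curl_cffi.requests", "install it with: pip install curl_cffi")
            self._session = curl_requests.Session(impersonate=self.impersonate)
        return self._session
```

Constructing the holder never touches curl_cffi, so code paths that use other backends keep working when it is not installed.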

Examples

>>> transport = CurlCffiTransport(impersonate="chrome120")
>>> hasattr(transport, "fetch")
True
pyfetcher.transports.curl_cffi.CURL_CFFI_TARGETS: list[str] = ['chrome99', 'chrome100', 'chrome101', 'chrome104', 'chrome107', 'chrome110', 'chrome116', 'chrome119', 'chrome120', 'chrome123', 'chrome124', 'chrome131', 'chrome133', 'edge99', 'edge101', 'firefox95', 'firefox100', 'firefox102', 'firefox109', 'firefox117', 'safari15_3', 'safari15_5', 'safari17_0', 'safari17_2_1', 'safari18_0']

Supported curl_cffi browser impersonation targets.
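Target names embed version numbers, so picking the newest target for a browser family needs a numeric comparison: compared as plain strings, `'chrome99'` would sort after `'chrome133'`. A minimal sketch; `latest_target` is a hypothetical helper, and the list is copied from `CURL_CFFI_TARGETS` above.

```python
from __future__ import annotations

# Copied from pyfetcher.transports.curl_cffi.CURL_CFFI_TARGETS as documented above.
CURL_CFFI_TARGETS: list[str] = [
    "chrome99", "chrome100", "chrome101", "chrome104", "chrome107", "chrome110",
    "chrome116", "chrome119", "chrome120", "chrome123", "chrome124", "chrome131",
    "chrome133", "edge99", "edge101", "firefox95", "firefox100", "firefox102",
    "firefox109", "firefox117", "safari15_3", "safari15_5", "safari17_0",
    "safari17_2_1", "safari18_0",
]


def _version(target: str, family: str) -> tuple[int, ...]:
    # 'safari17_2_1' -> (17, 2, 1); 'chrome120' -> (120,)
    return tuple(int(part) for part in target[len(family):].split("_"))


def latest_target(family: str) -> str:
    """Return the newest listed target for a family such as 'chrome' (hypothetical helper)."""
    candidates = [
        t for t in CURL_CFFI_TARGETS
        if t.startswith(family) and t[len(family):][:1].isdigit()
    ]
    if not candidates:
        raise ValueError(f"no impersonation targets for {family!r}")
    return max(candidates, key=lambda t: _version(t, family))


print(latest_target("chrome"))  # chrome133
print(latest_target("safari"))  # safari18_0
```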

class pyfetcher.transports.curl_cffi.CurlCffiTransport(*, impersonate='chrome120', sync_session=None, async_session=None)

Transport using curl_cffi for browser TLS fingerprint impersonation.

Matches real browser JA3/JA4 TLS fingerprints at the network level, so that bot detection systems relying on TLS fingerprinting see the same handshake a genuine browser would send.

Parameters:
  • impersonate (str) – Browser TLS fingerprint target (e.g. 'chrome120', 'firefox', 'safari'). See CURL_CFFI_TARGETS for all options.

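  • sync_session (object | None) – Optional externally managed sync session, such as a curl_cffi.requests.Session (caller owns lifecycle).

  • async_session (object | None) – Optional externally managed async session, such as a curl_cffi.requests.AsyncSession (caller owns lifecycle).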

Examples

>>> transport = CurlCffiTransport(impersonate="chrome120")
>>> hasattr(transport, "afetch")
True
fetch(request)

Fetch a request synchronously with browser TLS impersonation.

Parameters:

request (FetchRequest) – The fetch request to execute.

Returns:

A normalized FetchResponse.

Raises:

ImportError – If curl_cffi is not installed.

Return type:

FetchResponse

async afetch(request)

Fetch a request asynchronously with browser TLS impersonation.

Parameters:

request (FetchRequest) – The fetch request to execute.

Returns:

A normalized FetchResponse.

Raises:

ImportError – If curl_cffi is not installed.

Return type:

FetchResponse

async astream(request)

Stream a request asynchronously with browser TLS impersonation.

Parameters:

request (FetchRequest) – The fetch request to stream.

Yields:

StreamChunk objects.

Raises:

ImportError – If curl_cffi is not installed.

Return type:

AsyncIterator[StreamChunk]

close()

Close the owned sync session if present.

Return type:

None

async aclose()

Close the owned async session if present.

Return type:

None

Cloudscraper

Cloudscraper transport implementation for pyfetcher.

Purpose:

Provide synchronous fetching using cloudscraper as the underlying HTTP client library. Cloudscraper attempts to solve Cloudflare’s anti-bot challenges automatically (for example, JavaScript challenges; CAPTCHA challenges generally require a third-party solver), so many Cloudflare-protected sites can be fetched without manual challenge handling.

Design:
  • cloudscraper is imported lazily so the package remains optional.

  • One transport instance owns a long-lived scraper session that is lazily created on first use.

  • Only synchronous fetch is supported because cloudscraper is built on top of requests, which is inherently synchronous.

Examples

>>> transport = CloudscraperTransport()
>>> hasattr(transport, "fetch")
True
class pyfetcher.transports.cloudscraper.CloudscraperTransport(*, browser='chrome')

Synchronous cloudscraper transport for Cloudflare challenge bypass.

Manages a long-lived cloudscraper.create_scraper() session that is lazily initialized on first use. The browser parameter controls which browser profile cloudscraper uses for challenge solving.

Parameters:

browser (str) – Browser profile identifier passed to cloudscraper.create_scraper(). Defaults to "chrome".

Note

This transport only supports synchronous fetch. Cloudscraper is built on requests and does not provide an async API.
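To call this sync-only transport from async code without blocking the event loop, one option is to run `fetch` in a worker thread via `asyncio.to_thread` (Python 3.9+). A minimal sketch; `ThreadedSyncAdapter`, `BlockingFakeTransport`, and the `FetchRequest` stand-in are hypothetical. The lock serializes calls because requests does not guarantee that a shared Session is thread-safe, so this keeps the loop responsive but does not add parallelism.

```python
from __future__ import annotations

import asyncio
import time
from dataclasses import dataclass


@dataclass
class FetchRequest:  # hypothetical stand-in for pyfetcher's FetchRequest
    url: str


class BlockingFakeTransport:
    """Stand-in for CloudscraperTransport: fetch() blocks the calling thread."""

    def fetch(self, request: FetchRequest) -> str:
        time.sleep(0.05)  # simulated blocking requests/cloudscraper call
        return request.url


class ThreadedSyncAdapter:
    """Hypothetical adapter exposing afetch() over a sync-only transport."""

    def __init__(self, transport: BlockingFakeTransport) -> None:
        self._transport = transport
        # Build the adapter inside a running event loop (as main() does) for older Pythons.
        self._lock = asyncio.Lock()

    async def afetch(self, request: FetchRequest) -> str:
        async with self._lock:  # one call at a time on the shared session
            return await asyncio.to_thread(self._transport.fetch, request)


async def main() -> list[str]:
    adapter = ThreadedSyncAdapter(BlockingFakeTransport())
    urls = ["https://example.com/a", "https://example.com/b"]
    return list(await asyncio.gather(*(adapter.afetch(FetchRequest(u)) for u in urls)))


print(asyncio.run(main()))  # ['https://example.com/a', 'https://example.com/b']
```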

Examples

>>> transport = CloudscraperTransport()
>>> hasattr(transport, "fetch")
True
fetch(request)

Fetch a request synchronously using cloudscraper.

Automatically handles Cloudflare anti-bot challenges, retrying internally when a challenge page is encountered.

Parameters:

request (FetchRequest) – The fetch request to execute.

Returns:

A normalized FetchResponse.

Raises:
  • cloudscraper.exceptions.CloudflareChallengeError – If the Cloudflare challenge cannot be solved.

  • requests.exceptions.HTTPError – If the response status indicates an error.

Return type:

FetchResponse

close()

Close the owned scraper session if present.

Releases resources held by the underlying requests session.

Return type:

None