Async Usage¶
pyhaul has a real async engine — not a sync wrapper running in a thread pool.
haul_async() uses async with and async for natively, sharing all
non-I/O logic with the sync path.
Supported async clients¶
| Library | Client type | Install |
|---|---|---|
| httpx | `httpx.AsyncClient` | `pip install pyhaul[httpx]` |
| aiohttp | `aiohttp.ClientSession` | `pip install pyhaul[aiohttp]` |
| niquests | `niquests.AsyncSession` | `pip install pyhaul[niquests]` |
What about urllib3?
urllib3 is sync-only. To use it from async code, run the sync haul() in a
thread with asyncio.to_thread(); see Mixing sync clients with asyncio below.
Basic async download¶
Note
pyhaul sets auto_decompress=False on aiohttp requests internally to
ensure raw bytes for accurate resume. Your session's other settings
(auth, proxies, timeouts) pass through unchanged.
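A minimal example with httpx (the URL and filename are placeholders):

```python
import asyncio

import httpx

from pyhaul import haul_async

async def main():
    async with httpx.AsyncClient() as client:
        # Streams the response body to dest; if a partial file and its
        # checkpoint already exist, the download resumes from there.
        result = await haul_async(
            "https://example.com/dataset.bin", client, dest="dataset.bin"
        )

asyncio.run(main())
```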
Concurrent downloads with TaskGroup¶
asyncio.TaskGroup (Python 3.11+) is the cleanest way to download multiple
files concurrently:
**httpx:**

```python
import asyncio
from pathlib import Path

import httpx

from pyhaul import haul_async, PartialHaulError

URLS = [
    ("https://data.example.edu/census/2024-vol01.csv.gz", Path("data/vol01.csv.gz")),
    ("https://data.example.edu/census/2024-vol02.csv.gz", Path("data/vol02.csv.gz")),
    ("https://data.example.edu/census/2024-vol03.csv.gz", Path("data/vol03.csv.gz")),
]

async def download_one(client: httpx.AsyncClient, url: str, dest: Path):
    for attempt in range(1, 11):
        try:
            await haul_async(url, client, dest=dest)
            return dest
        except PartialHaulError:
            if attempt == 10:
                raise
            await asyncio.sleep(min(2**attempt, 30))

async def main():
    Path("data").mkdir(exist_ok=True)
    async with httpx.AsyncClient() as client:
        async with asyncio.TaskGroup() as tg:
            tasks = [
                tg.create_task(download_one(client, url, dest))
                for url, dest in URLS
            ]
        # The TaskGroup block only exits once every task has finished,
        # so results are safe to read here.
        for task in tasks:
            print(f"done: {task.result()}")

asyncio.run(main())
```
**aiohttp:**

```python
import asyncio
from pathlib import Path

import aiohttp

from pyhaul import haul_async, PartialHaulError

URLS = [
    ("https://data.example.edu/census/2024-vol01.csv.gz", Path("data/vol01.csv.gz")),
    ("https://data.example.edu/census/2024-vol02.csv.gz", Path("data/vol02.csv.gz")),
    ("https://data.example.edu/census/2024-vol03.csv.gz", Path("data/vol03.csv.gz")),
]

async def download_one(session: aiohttp.ClientSession, url: str, dest: Path):
    for attempt in range(1, 11):
        try:
            await haul_async(url, session, dest=dest)
            return dest
        except PartialHaulError:
            if attempt == 10:
                raise
            await asyncio.sleep(min(2**attempt, 30))

async def main():
    Path("data").mkdir(exist_ok=True)
    async with aiohttp.ClientSession() as session:
        async with asyncio.TaskGroup() as tg:
            tasks = [
                tg.create_task(download_one(session, url, dest))
                for url, dest in URLS
            ]
        for task in tasks:
            print(f"done: {task.result()}")

asyncio.run(main())
```
**niquests:**

```python
import asyncio
from pathlib import Path

import niquests

from pyhaul import haul_async, PartialHaulError

URLS = [
    ("https://data.example.edu/census/2024-vol01.csv.gz", Path("data/vol01.csv.gz")),
    ("https://data.example.edu/census/2024-vol02.csv.gz", Path("data/vol02.csv.gz")),
    ("https://data.example.edu/census/2024-vol03.csv.gz", Path("data/vol03.csv.gz")),
]

async def download_one(session: niquests.AsyncSession, url: str, dest: Path):
    for attempt in range(1, 11):
        try:
            await haul_async(url, session, dest=dest)
            return dest
        except PartialHaulError:
            if attempt == 10:
                raise
            await asyncio.sleep(min(2**attempt, 30))

async def main():
    Path("data").mkdir(exist_ok=True)
    async with niquests.AsyncSession() as session:
        async with asyncio.TaskGroup() as tg:
            tasks = [
                tg.create_task(download_one(session, url, dest))
                for url, dest in URLS
            ]
        for task in tasks:
            print(f"done: {task.result()}")

asyncio.run(main())
```
Each haul_async() call manages its own checkpoint independently. A crash
partway through leaves each file in a separately resumable state.
Limiting concurrency with Semaphore¶
When downloading many files, limit concurrency to avoid overwhelming the server or exhausting file descriptors:
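A sketch building on the TaskGroup pattern above; the limit of 4 and the
helper name `download_limited` are illustrative:

```python
import asyncio
from pathlib import Path

import httpx

from pyhaul import haul_async

MAX_CONCURRENT = 4  # arbitrary cap; tune for your server and machine

async def download_limited(
    sem: asyncio.Semaphore, client: httpx.AsyncClient, url: str, dest: Path
):
    # At most MAX_CONCURRENT downloads hold the semaphore at once;
    # the rest wait here without blocking the event loop.
    async with sem:
        return await haul_async(url, client, dest=dest)

async def main(urls: list[tuple[str, Path]]):
    sem = asyncio.Semaphore(MAX_CONCURRENT)
    async with httpx.AsyncClient() as client:
        async with asyncio.TaskGroup() as tg:
            for url, dest in urls:
                tg.create_task(download_limited(sem, client, url, dest))
```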
Async with tenacity¶
tenacity supports async natively. Decorate
an async def and tenacity handles the await:
**httpx:**

```python
from tenacity import (
    retry, retry_if_exception, stop_after_attempt, wait_exponential_jitter,
)
import httpx

from pyhaul import haul_async, PartialHaulError, UnexpectedStatusError

def _retryable(exc: BaseException) -> bool:
    if isinstance(exc, (PartialHaulError, httpx.TransportError)):
        return True
    return isinstance(exc, UnexpectedStatusError) and exc.is_transient

@retry(
    retry=retry_if_exception(_retryable),
    wait=wait_exponential_jitter(initial=2, max=60),
    stop=stop_after_attempt(10),
)
async def download(client: httpx.AsyncClient, url: str, dest: str):
    return await haul_async(url, client, dest=dest)
```
**aiohttp:**

```python
from tenacity import (
    retry, retry_if_exception, stop_after_attempt, wait_exponential_jitter,
)
import aiohttp

from pyhaul import haul_async, PartialHaulError, UnexpectedStatusError

def _retryable(exc: BaseException) -> bool:
    if isinstance(exc, (PartialHaulError, aiohttp.ClientError)):
        return True
    return isinstance(exc, UnexpectedStatusError) and exc.is_transient

@retry(
    retry=retry_if_exception(_retryable),
    wait=wait_exponential_jitter(initial=2, max=60),
    stop=stop_after_attempt(10),
)
async def download(session: aiohttp.ClientSession, url: str, dest: str):
    return await haul_async(url, session, dest=dest)
```
**niquests:**

```python
from tenacity import (
    retry, retry_if_exception, stop_after_attempt, wait_exponential_jitter,
)
import niquests

from pyhaul import haul_async, PartialHaulError, UnexpectedStatusError

def _retryable(exc: BaseException) -> bool:
    if isinstance(exc, (PartialHaulError, niquests.RequestException)):
        return True
    return isinstance(exc, UnexpectedStatusError) and exc.is_transient

@retry(
    retry=retry_if_exception(_retryable),
    wait=wait_exponential_jitter(initial=2, max=60),
    stop=stop_after_attempt(10),
)
async def download(session: niquests.AsyncSession, url: str, dest: str):
    return await haul_async(url, session, dest=dest)
```
Mixing sync clients with asyncio¶
urllib3 and requests.Session are sync-only. If your application is async but
you need one of these clients, use asyncio.to_thread()
to run the sync haul() in a thread without blocking the event loop:
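A sketch with requests; it assumes the sync haul() takes the same
(url, client, dest) arguments as haul_async(), and the URL and filename
are placeholders:

```python
import asyncio

import requests

from pyhaul import haul

async def main():
    with requests.Session() as session:
        # to_thread() runs the blocking haul() in a worker thread,
        # so the event loop stays free while the download runs.
        result = await asyncio.to_thread(
            haul, "https://example.com/dataset.bin", session, dest="dataset.bin"
        )

asyncio.run(main())
```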
Warning
asyncio.to_thread() runs the download in a separate OS thread.
You get non-blocking I/O from the event loop's perspective, but you
lose the benefits of true async streaming (single-threaded concurrency,
lower memory, backpressure). Prefer a native async client when possible.
Progress reporting¶
The on_progress callback is synchronous even in async mode. Pass a
HaulState to track progress, and keep the callback fast:
```python
import asyncio

import httpx

from pyhaul import haul_async, HaulState

state = HaulState()
high_water = 0

def show_progress(state: HaulState):
    global high_water
    # After a retry, valid_length may rewind to the last checkpoint.
    # Use a high-water mark to avoid showing backward progress.
    high_water = max(high_water, state.valid_length)
    if state.reported_length:
        pct = high_water / state.reported_length * 100
        print(f"\r{pct:.1f}%", end="", flush=True)

async def main():
    async with httpx.AsyncClient() as client:
        result = await haul_async(
            url, client, dest="file.bin",
            state=state, on_progress=show_progress,
        )

asyncio.run(main())
```