Using Python's Asyncio for Concurrency: Best Practices and Real-World Applications

August 18, 2025

Discover how to harness Python's asyncio for efficient concurrency with practical, real-world examples. This post walks you from core concepts to production-ready patterns — including web scraping, robust error handling with custom exceptions, and a Singleton session manager — using clear explanations and ready-to-run code.

Introduction

Concurrency — doing multiple things at once — is essential for modern applications: web crawlers, API clients, background jobs, and more. Python's asyncio provides a lightweight, cooperative concurrency model that's ideal for I/O-bound workloads.

Have you ever wanted to run hundreds of network requests concurrently without resorting to threads or complex process pools? Or build a resilient web scraper that plays nicely with rate limits and transient errors? This guide teaches you how to use asyncio effectively, with practical examples and best practices that scale from prototypes to production.

What you'll learn:

  • Core asyncio concepts and when to use them
  • Practical async patterns: tasks, gather, semaphores, timeouts
  • Building an async web scraper (and integrating with Beautiful Soup)
  • Better error handling using custom exceptions
  • A Singleton pattern for a shared aiohttp session
  • Performance tips and common pitfalls

Prerequisites

Before diving in, you should be comfortable with:

  • Python 3.7+ (asyncio.run and other major asyncio improvements landed in 3.7)
  • Basic synchronous I/O and HTTP requests
  • Fundamental Python constructs (functions, classes, exceptions)
Recommended libraries for examples:
  • aiohttp — async HTTP client/server
  • beautifulsoup4 — HTML parsing (works with async clients)
  • requests — mentioned as a synchronous alternative
Install with:
pip install aiohttp beautifulsoup4

Core Concepts (Quick Primer)

  • Event Loop: The scheduler running asynchronous tasks. Most programs use asyncio.run() to create and manage the loop.
  • Coroutine: An async function defined with async def. Calling it returns a coroutine object; you schedule it to run.
  • Task: A coroutine wrapped and scheduled with asyncio.create_task(). Runnable units managed by the loop.
  • await: Pauses the current coroutine until the awaited coroutine or Future completes.
  • Cooperative scheduling (async/await): Only one coroutine runs at a time per thread; each yields control at its await points.
  • I/O-bound vs CPU-bound: Use asyncio for I/O-bound concurrency. For CPU-bound work, use ProcessPoolExecutor or separate processes.
Analogy: Think of the event loop as a single chef who switches tasks while waiting for the oven — efficient when waiting (I/O), less effective when actively chopping nonstop (CPU).

Simple asyncio Example

Let's start with a simple example to illustrate tasks and gathering.

import asyncio
import random

async def fetch(id: int):
    delay = random.uniform(0.5, 2.0)
    print(f"Task {id} sleeping for {delay:.2f}s")
    await asyncio.sleep(delay)
    result = f"result-{id}"
    print(f"Task {id} done")
    return result

async def main():
    tasks = [asyncio.create_task(fetch(i)) for i in range(5)]
    results = await asyncio.gather(*tasks)
    print("Results:", results)

if __name__ == "__main__":
    asyncio.run(main())

Line-by-line explanation:

  • import asyncio, random: standard libs for async and random delays.
  • async def fetch(id): defines a coroutine that simulates I/O with asyncio.sleep.
  • await asyncio.sleep(delay): non-blocking sleep; other tasks can run.
  • asyncio.create_task(fetch(i)): schedules the coroutine to run concurrently.
  • await asyncio.gather(*tasks): unpacks the task list, waits for all tasks, and collects their results in order.
  • asyncio.run(main()): entry point; creates event loop and runs main.
Edge cases:
  • If any task raises, gather will propagate the exception by default (unless return_exceptions=True).
  • Tasks scheduled via create_task continue running if not awaited; ensure you gather or manage them.
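
As a quick sketch of the first point, here is a minimal, self-contained example (the might_fail coroutine is invented for illustration) showing how return_exceptions=True turns failures into inspectable values instead of aborting the batch:
import asyncio

async def might_fail(i: int) -> int:
    if i == 2:
        raise ValueError(f"task {i} failed")
    await asyncio.sleep(0.1)
    return i

async def main():
    tasks = [asyncio.create_task(might_fail(i)) for i in range(4)]
    # With return_exceptions=True, exceptions are returned in the results
    # list rather than propagated, so the other tasks still complete.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for r in results:
        print("failed:" if isinstance(r, Exception) else "ok:", r)

if __name__ == "__main__":
    asyncio.run(main())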

Building an Async Web Scraper (Real-World)

You probably know the classic synchronous approach: Building a Web Scraper with Beautiful Soup and Requests: A Step-by-Step Guide uses requests + BeautifulSoup for scraping one page at a time. But when scraping many pages, asyncio + aiohttp provides much better throughput.

Below is an async scraper that:

  • Uses a Singleton-managed aiohttp ClientSession
  • Limits concurrency with a semaphore
  • Parses HTML with BeautifulSoup
  • Uses custom exceptions for robust error handling
# async_scraper.py
import asyncio
from typing import List
import aiohttp
from bs4 import BeautifulSoup

# Singleton for aiohttp.ClientSession
class SingletonSession:
    _instance = None

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._session = None
        return cls._instance

    async def get_session(self) -> aiohttp.ClientSession:
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession()
        return self._session

    async def close(self):
        if self._session and not self._session.closed:
            await self._session.close()

# Custom exceptions
class ScraperError(Exception):
    """Base class for scraper errors."""

class FetchError(ScraperError):
    """Raised when fetching a page fails."""

async def fetch_html(url: str, sem: asyncio.Semaphore, timeout: int = 10) -> str:
    session_manager = SingletonSession()
    session = await session_manager.get_session()
    async with sem:  # rate limiting
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=timeout)) as resp:
                if resp.status != 200:
                    raise FetchError(f"Non-200 status: {resp.status} for {url}")
                return await resp.text()
        except asyncio.TimeoutError:
            raise FetchError(f"Timeout fetching {url}")
        except aiohttp.ClientError as e:
            raise FetchError(f"Client error {e} for {url}")

def parse_title(html: str) -> str:
    soup = BeautifulSoup(html, "html.parser")
    title = soup.title.string.strip() if soup.title and soup.title.string else "No Title"
    return title

async def scrape(urls: List[str], concurrency: int = 10) -> List[tuple]:
    sem = asyncio.Semaphore(concurrency)
    tasks = [asyncio.create_task(_scrape_one(url, sem)) for url in urls]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results

async def _scrape_one(url: str, sem: asyncio.Semaphore):
    try:
        html = await fetch_html(url, sem)
        title = parse_title(html)
        return (url, title)
    except ScraperError as e:
        return (url, f"ERROR: {e}")

async def main():
    urls = [
        "https://example.com",
        "https://httpbin.org/status/404",
        "https://httpbin.org/delay/2",
    ]
    results = await scrape(urls, concurrency=3)
    for url, title in results:
        print(url, "->", title)
    # Close the singleton session to free connections
    await SingletonSession().close()

if __name__ == "__main__":
    asyncio.run(main())

Explanation:

  • SingletonSession: Implements a simple Singleton to share a single aiohttp.ClientSession across tasks (reduces connection overhead).
- This is an example of implementing the Singleton pattern in Python: centralizes resource management and avoids creating multiple ClientSession objects.
  • Custom exceptions (ScraperError, FetchError): allow clear error semantics and tailored handling.
  • fetch_html: uses a semaphore so that at most concurrency requests are in flight at once.
  • parse_title: uses BeautifulSoup to parse titles — this integrates the familiar Beautiful Soup parsing into an async workflow.
  • scrape and _scrape_one: schedule tasks and use gather(..., return_exceptions=True) to ensure one failing URL doesn't cancel the whole batch.
  • main: demo URLs include examples that produce normal results, 404s, and delays.
Lines to watch:
  • return_exceptions=True: gather collects exceptions rather than propagating them. We convert exceptions to readable results in _scrape_one.
  • await SingletonSession().close(): ensures session is closed at the end to free resources.
Edge cases and notes:
  • aiohttp's ClientSession should be closed to avoid warnings.
  • Parsing large HTML in the event loop can block; consider offloading to thread pool if parsing is heavy: await loop.run_in_executor(...).
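
If parsing ever becomes the heavy part, that offloading could look roughly like this (a sketch reusing the scraper's fetch_html and parse_title; the _scrape_one_offloaded name is made up for illustration):
async def _scrape_one_offloaded(url: str, sem: asyncio.Semaphore):
    # Fetch asynchronously, but run BeautifulSoup parsing in the default
    # thread pool so a large document doesn't stall the event loop.
    html = await fetch_html(url, sem)
    loop = asyncio.get_running_loop()
    title = await loop.run_in_executor(None, parse_title, html)
    return (url, title)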

Handling Errors and Timeouts

Robust async programs need careful error handling. Patterns to consider:

  • Use custom exceptions for clarity (as above).
  • Use asyncio.wait_for() to add an explicit timeout around coroutines.
  • Use gather(..., return_exceptions=True) when you want partial success.
  • Cancel tasks with task.cancel() and handle asyncio.CancelledError inside coroutines if cleanup is needed.
Example: adding a per-task timeout and handling cancellations.
async def safe_fetch(url, sem, timeout=5):
    try:
        return await asyncio.wait_for(fetch_html(url, sem), timeout=timeout)
    except asyncio.TimeoutError:
        raise FetchError(f"Timed out {url}")
    except asyncio.CancelledError:
        # Optional cleanup
        raise

Explanation:

  • wait_for wraps the coroutine with a timeout. If the inner coroutine is still running when the timeout expires, it is cancelled and asyncio.TimeoutError is raised.
  • Cancelling tasks sends CancelledError into the coroutine; handle for cleanup if necessary.
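
Here is a small sketch of cancellation with cleanup; the worker coroutine is purely illustrative:
import asyncio

async def worker():
    try:
        while True:
            await asyncio.sleep(1)  # stand-in for polling or streaming
    except asyncio.CancelledError:
        print("worker: cleaning up before exit")
        raise  # re-raise so the task is actually marked as cancelled

async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(2)
    task.cancel()  # sends CancelledError into the coroutine
    try:
        await task
    except asyncio.CancelledError:
        print("main: worker cancelled")

if __name__ == "__main__":
    asyncio.run(main())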

When Not to Use asyncio

Ask yourself: Is the workload CPU-bound or I/O-bound?

  • If CPU-bound (heavy data processing), prefer ProcessPoolExecutor:
# CPU-bound example
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_heavy(n):
    # heavy computation
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_heavy, 10_000_000)
        print(result)

if __name__ == "__main__":  # required so worker processes don't re-run main()
    asyncio.run(main())

Explanation: run_in_executor offloads CPU work to a separate process, avoiding blocking the event loop.

Best Practices (Quick Reference)

  • Prefer a single shared aiohttp.ClientSession per program (Singleton pattern fits here).
  • Use semaphores or rate-limiting to avoid hammering remote servers.
  • Use asyncio.create_task() for background coroutines and keep references so they don't get garbage-collected (see the sketch after this list).
  • Close sessions and resources cleanly (use async with where possible).
  • Use timeouts and retry policies for network reliability.
  • Avoid heavy CPU work on the event loop — use executors or separate processes.
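
The task-reference bullet is easy to get wrong, so here is one common shape for it (a sketch; background_job and spawn are illustrative names, not part of the scraper):
import asyncio

background_tasks = set()

async def background_job(n: int):
    await asyncio.sleep(n)
    print(f"job {n} done")

def spawn(coro):
    # Keep a strong reference so the task isn't garbage-collected mid-flight,
    # then drop the reference automatically once the task completes.
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return task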

Common Pitfalls and How to Avoid Them

  • Blocking the event loop: never call blocking I/O (like file.read() or requests.get()) directly inside async code. Use aiofiles or run_in_executor.
  • Creating many ClientSession objects: leads to resource exhaustion. Use shared sessions.
  • Forgetting to await tasks: scheduled tasks may be cancelled on program exit or leak.
  • Overusing concurrency: more tasks isn't always better; tune concurrency with profiling.
  • Misunderstanding gather semantics: gather by default will raise the first exception — use return_exceptions=True when appropriate.

Advanced Tips

  • Exponential backoff with jitter for retries: prevents thundering herd.
  • Use structured concurrency libraries (e.g., anyio, trio) for stricter guarantees; asyncio only gained a native structured-concurrency primitive with asyncio.TaskGroup in Python 3.11 (see the sketch at the end of this section).
  • Profiling: use asyncio.all_tasks() and logging to inspect running tasks.
  • Integrating synchronous libraries:
- If a dependency only provides a synchronous API (e.g., legacy scraping with requests + Beautiful Soup), run that code in a thread pool via run_in_executor.
- Example: reusing "Building a Web Scraper with Beautiful Soup and Requests" logic inside async code:
import requests
from bs4 import BeautifulSoup
import asyncio

def sync_fetch_and_parse(url):
    r = requests.get(url, timeout=5)
    r.raise_for_status()
    return BeautifulSoup(r.text, "html.parser").title.string

async def async_wrapper(url):
    loop = asyncio.get_running_loop()
    title = await loop.run_in_executor(None, sync_fetch_and_parse, url)
    return title

Explanation:
  • run_in_executor runs blocking requests calls in a thread, letting event loop continue.
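
On Python 3.11+, the TaskGroup mentioned above provides structured concurrency without extra libraries. A minimal sketch (fetch_number is a stand-in coroutine):
import asyncio

async def fetch_number(i: int) -> int:
    await asyncio.sleep(0.1)  # pretend I/O
    return i * 10

async def main():
    # Tasks created in the group are awaited when the async with block exits;
    # if one fails, the others are cancelled and the errors are re-raised.
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_number(i)) for i in range(5)]
    print([t.result() for t in tasks])

if __name__ == "__main__":
    asyncio.run(main())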

Performance Considerations

  • Connection pooling via ClientSession greatly improves throughput.
  • DNS resolution can be a bottleneck — ensure proper connector settings if needed (aiohttp.TCPConnector).
  • Keep an eye on memory: thousands of concurrent coroutines holding large data leads to high memory usage.
  • Use backpressure and queue-based producers/consumers when scraping huge link graphs.
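
As a sketch of the last point, a bounded asyncio.Queue gives natural backpressure because producers block once the queue is full (the URLs, delays, and worker count here are illustrative):
import asyncio

async def producer(queue: asyncio.Queue, urls):
    for url in urls:
        await queue.put(url)  # blocks when the queue is full: backpressure

async def consumer(queue: asyncio.Queue, name: str):
    while True:
        url = await queue.get()
        try:
            await asyncio.sleep(0.1)  # stand-in for fetch + parse
            print(f"{name} processed {url}")
        finally:
            queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=100)  # bounded queue limits memory use
    urls = [f"https://example.com/page/{i}" for i in range(10)]
    workers = [asyncio.create_task(consumer(queue, f"worker-{i}")) for i in range(3)]
    await producer(queue, urls)
    await queue.join()  # wait until every queued URL has been processed
    for w in workers:
        w.cancel()      # consumers loop forever; cancel them when done

if __name__ == "__main__":
    asyncio.run(main())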

Example: Retry with Exponential Backoff

A small helper implementing retries with jitter:

import random
import asyncio

async def retry(coro_func, *args, retries=3, base=0.5, factor=2):
    attempt = 0
    while True:
        try:
            return await coro_func(*args)
        except Exception:
            attempt += 1
            if attempt > retries:
                raise
            sleep = base * (factor ** (attempt - 1))     # exponential backoff
            sleep = sleep * (1 + random.random() * 0.1)  # jitter
            await asyncio.sleep(sleep)

Explanation:

  • Attempts coro_func once, then retries up to retries more times before re-raising the last exception.
  • Uses exponential backoff with jitter to spread retries.
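
Assuming the helper above plus the scraper's fetch_html, usage might look like this:
async def fetch_with_retry(url: str, sem: asyncio.Semaphore) -> str:
    # Retries fetch_html up to three extra times with backoff and jitter.
    return await retry(fetch_html, url, sem, retries=3)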

Conclusion

Asyncio unlocks efficient concurrency for I/O-bound Python programs. Use it to scale web scrapers, API clients, and any network-heavy workloads. Combine asyncio with:

  • aiohttp for async HTTP,
  • BeautifulSoup (or run sync parsing in executors if needed),
  • custom exceptions for clear error semantics,
  • and Singleton patterns to manage shared resources like ClientSession.
Try adapting the scraper example to scrape a list of sites you care about. Measure, tune concurrency, and add retry/backoff to handle real-world networks.

Call to action: Try converting a synchronous scraping script you already wrote into asyncio + aiohttp using the patterns above — drop your code in a comment or reply here if you want a review or help optimizing it.
