
Using Python's Multiprocessing for CPU-Bound Tasks: A Practical Guide
Learn how to accelerate CPU-bound workloads in Python using the multiprocessing module. This practical guide walks you through concepts, runnable examples, pipeline integration, and best practices — including how to chunk data with itertools and optimize database writes with SQLAlchemy.
Introduction
Python's Global Interpreter Lock (GIL) makes CPU-bound parallelism with threads ineffective for many workloads. Enter multiprocessing: a powerful standard-library module that lets you run Python code in separate processes, bypassing the GIL and using multiple CPU cores.
In this guide you'll learn:
- Core concepts of Python multiprocessing and why it's ideal for CPU-bound tasks.
- Practical, step-by-step code examples (with line-by-line explanations).
- How to integrate multiprocessing into a data pipeline: ingestion → processing → storage.
- How to use itertools to manipulate streaming data efficiently.
- Tips on optimizing writes and queries with SQLAlchemy when storing processed results.
Prerequisites
- Python 3.7+ (examples use modern multiprocessing APIs).
- Basic familiarity with processes, functions, and modules.
- Intermediate Python: functions, iterators, and context managers.
- Optional: SQLAlchemy and a local DB (for the persistence examples).
- The if __name__ == '__main__': guard — the examples below explain why.
Core Concepts — Quick Primer
- GIL (Global Interpreter Lock): prevents multiple native threads from executing Python bytecode simultaneously in a single process; threads are good for I/O-bound tasks but not CPU-heavy ones.
- Process vs Thread: processes have independent memory spaces; communication typically uses queues, pipes, or shared memory; processes avoid the GIL.
- Pool vs Process: Pool is a high-level API for a pool of worker processes (Pool.map, Pool.imap_unordered); Process gives low-level control (a minimal sketch follows this list).
- Pickling: arguments and return values are serialized (pickled) for inter-process communication — keep arguments picklable (no lambda closures).
- Chunking: splitting work into units; choose an appropriate chunk size to balance overhead vs load balancing.
- Shared memory: multiprocessing.shared_memory (Python 3.8+) or multiprocessing.Array for reducing copying when working with large binary arrays (e.g., numpy).
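To make the Pool vs Process distinction concrete, here is a minimal sketch of the lower-level Process API with a Queue for collecting results; the function and inputs are purely illustrative.
# process_basics.py — low-level Process + Queue sketch (illustrative)
from multiprocessing import Process, Queue

def square(n, out_queue):
    # compute in a child process and send the result back through the queue
    out_queue.put((n, n * n))

if __name__ == "__main__":
    q = Queue()
    procs = [Process(target=square, args=(n, q)) for n in range(4)]
    for p in procs:
        p.start()
    results = [q.get() for _ in procs]  # one (input, output) pair per process
    for p in procs:
        p.join()
    print(sorted(results))
With Pool, process creation, task distribution, and result collection are handled for you; reach for raw Process only when you need this kind of explicit control.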
Practical Example — CPU-Bound Task: Prime Checking
We'll use a CPU-bound example: checking whether numbers are prime across a large range. This is contrived but illustrates CPU parallelism clearly.
Example 1 — Simple Pool.map
# prime_mp_simple.py
import math
from multiprocessing import Pool, cpu_count

def is_prime(n: int) -> bool:
    """Return True if n is prime (simple trial division)."""
    if n <= 1:
        return False
    if n <= 3:
        return True
    if n % 2 == 0:
        return False
    limit = int(math.sqrt(n)) + 1
    for i in range(3, limit, 2):
        if n % i == 0:
            return False
    return True

def main():
    numbers = list(range(10_000_000, 10_000_200))  # CPU-heavy slice
    workers = cpu_count() - 1 or 1  # leave one core free
    with Pool(processes=workers) as pool:
        results = pool.map(is_prime, numbers)
    primes = [n for n, prime in zip(numbers, results) if prime]
    print(f"Found {len(primes)} primes in the range.")

if __name__ == "__main__":
    main()
Line-by-line explanation:
- import math, Pool, cpu_count: required tools.
- is_prime(n): naive but illustrative prime test; note it uses only pure Python arithmetic (CPU-bound).
- numbers: a list of integers to test — large enough to take noticeable CPU time.
- workers: set to the number of CPU cores minus one to avoid 100% saturation; or 1 ensures at least one worker.
- with Pool(...) as pool: create a pool; the context manager ensures clean termination.
- pool.map(is_prime, numbers): distributes tasks (each number) among worker processes.
- primes = [...]: collect results locally.
- The if __name__ == "__main__": guard is required on Windows to avoid recursive spawning.
- map will pickle and send each number to worker processes — for many tiny tasks the overhead is significant.
- For many small items, prefer chunksize or imap_unordered (next example).
Example 2 — Using chunksize and imap_unordered for throughput
Imbalanced tasks or lots of small tasks benefit from imap_unordered and a tuned chunksize to reduce IPC overhead and improve throughput.
# prime_mp_chunks.py
import math
from multiprocessing import Pool, cpu_count

def is_prime(n):
    # same implementation as in Example 1
    if n <= 1:
        return False
    if n <= 3:
        return True
    if n % 2 == 0:
        return False
    limit = int(math.sqrt(n)) + 1
    for i in range(3, limit, 2):
        if n % i == 0:
            return False
    return True

def check(n):
    """Return (n, is_prime(n)) so results can safely arrive out of order."""
    return n, is_prime(n)

def chunked_prime_check(numbers):
    workers = cpu_count()
    chunk_size = max(1, len(numbers) // (workers * 4))  # heuristic
    with Pool(workers) as p:
        for n, prime in p.imap_unordered(check, numbers, chunksize=chunk_size):
            if prime:
                yield n

if __name__ == "__main__":
    numbers = list(range(1_000_000, 1_000_500))
    primes = list(chunked_prime_check(numbers))
    print("Primes found:", len(primes))
Explanation:
- chunk_size: a heuristic that reduces overhead by grouping several numbers per IPC transfer.
- imap_unordered: yields results as workers finish (not preserving input order), which improves responsiveness and reduces waiting time for skewed tasks.
- yield n: streams results to the caller — memory efficient for large result sets.
Building a Data Pipeline: Ingestion → Processing → Storage
A common real-world scenario: ingest raw records from files or streams, perform CPU-heavy transformations, and persist results to a database. We'll sketch a pipeline integrating multiprocessing, itertools for chunking, and SQLAlchemy for optimized DB writes.
Pipeline design
- Ingest: stream data from CSV/JSON or message queue.
- Transform: CPU-heavy computation per record (e.g., feature extraction, compression, cryptographic hashing, expensive calculations).
- Load: batch insert results into a DB efficiently.
- Use iterators and itertools.islice to chunk streamed data without loading the whole dataset into memory.
- Offload the CPU-heavy transform to a Pool.
- Use bulk operations and transaction management in SQLAlchemy to minimize DB overhead.
Example 3 — Pipeline with itertools chunking and SQLAlchemy bulk insert
# pipeline_mp_sqlalchemy.py
import csv
import time
from itertools import islice
from multiprocessing import Pool, cpu_count
from typing import Any, Dict, Iterable

# --- Dummy CPU-bound transform ---
def compute_heavy(record: Dict[str, Any]) -> Dict[str, Any]:
    # Simulate heavy CPU work, e.g., complex parsing or numeric transformations
    x = int(record['value'])
    total = 0
    for i in range(1, 10000):
        total += (x * i) % (i + 1)
    record['processed'] = total
    return record

def chunked_iterator(iterator: Iterable, size: int):
    """Yield lists of at most `size` elements from `iterator`."""
    it = iter(iterator)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            break
        yield chunk

# --- Saving to DB via SQLAlchemy (simplified) ---
def save_batch_to_db(session_factory, rows: Iterable[Dict[str, Any]]):
    # session_factory() returns a SQLAlchemy session; keep this function DB-agnostic.
    # Assume a mapped model class 'MyModel' is available in scope (e.g., from your models module).
    session = session_factory()
    try:
        session.bulk_insert_mappings(MyModel, rows)  # efficient bulk insert
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

def process_csv_in_parallel(csv_path: str, session_factory, batch_size=1000):
    workers = max(1, cpu_count() - 1)
    with Pool(workers) as pool:
        with open(csv_path, newline='') as f:
            reader = csv.DictReader(f)
            for batch in chunked_iterator(reader, batch_size):
                # plain dicts are picklable, so pool.map can ship each batch to the workers
                processed = pool.map(compute_heavy, batch)
                save_batch_to_db(session_factory, processed)

# Example usage (requires SQLAlchemy setup not shown)
if __name__ == "__main__":
    start = time.time()
    process_csv_in_parallel('large_file.csv', session_factory=my_session_factory, batch_size=500)
    print("Elapsed:", time.time() - start)
Line-by-line highlights:
- compute_heavy: simulates CPU work and returns the mutated record.
- chunked_iterator: uses itertools.islice to stream fixed-size batches from the CSV reader — avoids loading all input into memory.
- process_csv_in_parallel: for each chunk, maps compute_heavy across workers and then saves the results with an efficient bulk insert.
- session.bulk_insert_mappings(...): a SQLAlchemy method designed for fast bulk inserts; prefer it over inserting rows one by one.
- Use database transactions and commit per batch, not per row.
- Create appropriate indexes on columns you query often — this reduces read/query time later.
- For inserts, disabling autocommit and using executemany/bulk operations is faster.
- If using PostgreSQL, consider COPY for massive imports.
- SQLAlchemy docs: bulk operations and session management (see the official docs for bulk_insert_mappings and Core executemany).
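The pipeline example deliberately leaves the SQLAlchemy setup out. For orientation, here is a minimal sketch of what MyModel and my_session_factory could look like; the table name, columns, and SQLite URL are hypothetical stand-ins for your real schema (declarative_base lives in sqlalchemy.orm on SQLAlchemy 1.4+).
# db_setup_sketch.py — hypothetical setup for the pipeline example
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class MyModel(Base):
    __tablename__ = "processed_records"  # hypothetical table name
    id = Column(Integer, primary_key=True)
    value = Column(String)       # raw input value from the CSV
    processed = Column(Integer)  # result added by compute_heavy

engine = create_engine("sqlite:///pipeline.db")  # replace with your real DB URL
Base.metadata.create_all(engine)

# pass this as session_factory to process_csv_in_parallel
my_session_factory = sessionmaker(bind=engine)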
Leveraging itertools for Efficient Data Manipulation
itertools is a treasure trove for memory-efficient streaming operations. Use it to:
- Chunk large iterators (the example above used islice).
- Get map-like behavior for functions that take multiple arguments via itertools.starmap.
- Combine multiple iterables with chain.from_iterable to flatten nested iterables.
from itertools import tee, islice

def pairwise(iterable):
    a, b = tee(iterable)
    next(b, None)
    return zip(a, b)

# Usage
for a, b in pairwise([1, 2, 3, 4]):
    print(a, b)  # 1 2, then 2 3, then 3 4
Why this matters in multiprocessing:
- Constructing batches with itertools keeps memory usage low.
- It reduces pickling overhead by sending only the necessary chunk to worker processes.
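Putting those pieces together, here is a small sketch that batches a stream with islice (via chunked_iterator), processes each batch in a Pool, and flattens the per-batch results with chain.from_iterable. It assumes the compute_heavy and chunked_iterator helpers from the pipeline example are importable.
# itertools_mp_sketch.py — batch, process, flatten (illustrative)
from itertools import chain
from multiprocessing import Pool, cpu_count
from pipeline_mp_sqlalchemy import compute_heavy, chunked_iterator  # Example 3 helpers

def process_stream(records, batch_size=1000):
    # Yield processed records one at a time while only one batch is in flight.
    with Pool(max(1, cpu_count() - 1)) as pool:
        batches = (pool.map(compute_heavy, batch)
                   for batch in chunked_iterator(records, batch_size))
        yield from chain.from_iterable(batches)
Because process_stream is itself a generator, downstream code (such as a DB writer) can consume results lazily without ever holding the full dataset in memory.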
Best Practices and Performance Considerations
- Use cpu_count() to size pools but avoid oversubscription. A good rule: number of workers ≈ number of physical cores (or logical cores minus 1).
- Measure and tune chunksize: too small — too much IPC; too large — poor load balancing (a small timing sketch follows this list).
- Avoid sending huge objects to workers repeatedly. Use shared memory (e.g., multiprocessing.shared_memory or Array) for big binary buffers or numpy arrays.
- Favor Pool.imap_unordered when order doesn't matter — it yields results as soon as they're available.
- Avoid lambdas or closures for worker functions (they aren't picklable). Define top-level functions.
- Use if __name__ == '__main__' for cross-platform safety. On Windows, multiprocessing spawns new processes that import the main module — the guard prevents re-execution.
- Handle exceptions: worker exceptions are re-raised on the main side when retrieving results — catch and log them.
- Profile first: use cProfile, timeit, or simple time measurements to ensure the CPU-bound work justifies parallelization.
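Since chunksize tuning is empirical, a quick timing harness is often all you need. This sketch assumes the is_prime function from Example 1 is importable; the candidate chunksize values are arbitrary starting points.
# chunksize_timing_sketch.py — compare chunksize values empirically
import time
from multiprocessing import Pool, cpu_count
from prime_mp_simple import is_prime  # Example 1

def time_chunksize(func, items, chunksize):
    # Wall-clock seconds for one pool.map run with the given chunksize.
    start = time.perf_counter()
    with Pool(max(1, cpu_count() - 1)) as pool:
        pool.map(func, items, chunksize=chunksize)
    return time.perf_counter() - start

if __name__ == "__main__":
    numbers = list(range(10_000_000, 10_010_000))
    for cs in (1, 8, 64, 256):
        print(f"chunksize={cs}: {time_chunksize(is_prime, numbers, cs):.2f}s")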
Common Pitfalls and How to Avoid Them
- "Nothing speeds up — I still see single-core usage": likely using threads for CPU-bound tasks or hitting pickling overhead. Use processes for CPU-bound tasks.
- Overhead-dominated tasks: tiny tasks can take longer when parallelized. Group small tasks into chunks.
- Memory blowup: sending large lists/objects every task to workers duplicates memory. Use shared memory or minimize payload size.
- Unpicklable objects: file handles, locks, and many runtime objects cannot be sent to workers. Use process-safe serializable objects or initializers.
- Database contention: many processes doing DB writes concurrently can overwhelm the DB. Use batching, connection pools, or a single writer process if necessary.
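One way to relieve database contention is the single-writer pattern sketched below: workers only compute, and a dedicated process drains a Queue and performs all DB writes. It reuses compute_heavy, chunked_iterator, and save_batch_to_db from the pipeline example; the CSV path and DB URL are placeholders.
# single_writer_sketch.py — many compute workers, one DB writer (illustrative)
import csv
from multiprocessing import Process, Queue, Pool, cpu_count
from pipeline_mp_sqlalchemy import compute_heavy, chunked_iterator, save_batch_to_db

def writer(queue, db_url):
    # The only process that touches the database.
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker
    session_factory = sessionmaker(bind=create_engine(db_url))
    while True:
        batch = queue.get()
        if batch is None:  # sentinel: no more batches
            break
        save_batch_to_db(session_factory, batch)

if __name__ == "__main__":
    queue = Queue()
    w = Process(target=writer, args=(queue, "sqlite:///pipeline.db"))  # placeholder DB URL
    w.start()
    with Pool(max(1, cpu_count() - 1)) as pool, open("large_file.csv", newline="") as f:
        for batch in chunked_iterator(csv.DictReader(f), 500):
            queue.put(pool.map(compute_heavy, batch))
    queue.put(None)  # tell the writer to stop
    w.join()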
Advanced Tips
- Use multiprocessing.Pool(initializer=..., initargs=...) to run per-process initialization (e.g., open a persistent connection, or load a large model into each worker once).
- For numpy-heavy workloads, consider shared memory arrays with multiprocessing.shared_memory to avoid copying large arrays (see the sketch after this list).
- For heterogeneous workloads, consider concurrent.futures.ProcessPoolExecutor (higher-level) — it's often friendlier and integrates with the concurrent.futures API.
- Use psutil to monitor CPU and memory use when tuning worker counts.
- Consider numba or C extensions for extreme CPU-bound hotspots — they can often obviate the need for multiprocessing.
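Here is a minimal sketch of the shared-memory approach for a numpy array: the parent copies the data into a SharedMemory block once, and workers attach to it by name and read their slice without per-task copying. It assumes numpy is installed; the array size and slicing scheme are arbitrary.
# shared_memory_sketch.py — numpy array in shared memory (illustrative)
import numpy as np
from multiprocessing import Pool, cpu_count, shared_memory

def partial_sum(args):
    # Attach to the shared block by name and sum one slice without copying the array.
    shm_name, shape, dtype, start, stop = args
    shm = shared_memory.SharedMemory(name=shm_name)
    try:
        arr = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
        return float(arr[start:stop].sum())
    finally:
        shm.close()  # detach; the parent owns the block

if __name__ == "__main__":
    data = np.random.rand(1_000_000)
    shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
    shared = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
    shared[:] = data  # one copy into shared memory

    workers = max(1, cpu_count() - 1)
    step = len(data) // workers + 1
    tasks = [(shm.name, data.shape, data.dtype, i, min(i + step, len(data)))
             for i in range(0, len(data), step)]
    try:
        with Pool(workers) as pool:
            print("Total:", sum(pool.map(partial_sum, tasks)))
    finally:
        shm.close()
        shm.unlink()  # free the block when done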
Example: Pool initializer for per-worker setup
# initializer_example.py
from multiprocessing import Pool, cpu_count

_GLOBAL_MODEL = None

def init_worker(model_path):
    global _GLOBAL_MODEL
    # hypothetical heavy model load, e.g., an ML model pulled into memory
    _GLOBAL_MODEL = load_model_from_disk(model_path)

def process_record(record):
    # use _GLOBAL_MODEL inside the worker without reloading it per task
    return _GLOBAL_MODEL.predict(record)

if __name__ == "__main__":
    model_path = "/data/heavy_model.bin"
    # 'records' is assumed to be an iterable of inputs prepared elsewhere
    with Pool(processes=cpu_count(), initializer=init_worker, initargs=(model_path,)) as p:
        results = p.map(process_record, records)
Benefit: you avoid reloading a heavy model on every function invocation.
Error Handling Patterns
- Catch exceptions in the worker and wrap results with status metadata:
def safe_worker(item):
    try:
        return {"item": item, "result": compute_heavy(item), "error": None}
    except Exception as exc:
        return {"item": item, "result": None, "error": str(exc)}
This pattern lets the master process continue and inspect failures without losing the entire job.
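For example, the driver can stream outcomes with imap_unordered and log failures as they arrive; items and handle_result below are placeholders for your own inputs and result handling.
import logging
from multiprocessing import Pool

if __name__ == "__main__":
    with Pool() as pool:
        for outcome in pool.imap_unordered(safe_worker, items):  # items: your inputs
            if outcome["error"] is not None:
                logging.warning("failed %r: %s", outcome["item"], outcome["error"])
            else:
                handle_result(outcome["result"])  # handle_result: placeholder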
Conclusion
Multiprocessing is a central tool for making CPU-bound Python programs scalable. Combine it with efficient streaming from itertools, careful chunking, and database bulk operations (SQLAlchemy bulk inserts) to build practical and maintainable data pipelines.
Key takeaways:
- Use a multiprocessing Pool for common parallel patterns and tune chunksize.
- Guard with if __name__ == '__main__' for cross-platform compatibility.
- Use itertools to keep memory usage low while batching work.
- Optimize persistence with SQLAlchemy bulk operations, and avoid per-row commits.
- Profile and iterate: always measure before and after parallelizing.
Experiment with different worker counts and chunksize values, measure CPU usage and throughput, and watch your processing time drop.
Further Reading and References
- Python official docs: multiprocessing — https://docs.python.org/3/library/multiprocessing.html
- SQLAlchemy documentation: Bulk Inserts and Performance — https://docs.sqlalchemy.org/
- itertools docs: https://docs.python.org/3/library/itertools.html
- multiprocessing.shared_memory: https://docs.python.org/3/library/multiprocessing.shared_memory.html
Try refactoring one of your own CPU-bound scripts to use Pool.imap_unordered and itertools.islice — then share your benchmark results!