
Efficient Data Processing with Python's Multiprocessing: A Step-by-Step Guide
Learn how to speed up CPU-bound data processing in Python using the multiprocessing module. This step-by-step guide walks intermediate Python developers through core concepts, practical examples (including producer-consumer pipelines), performance considerations, and advanced tips like shared memory and integration with itertools and asyncio.
Introduction
Python's Global Interpreter Lock (GIL) can make parallel CPU-bound work tricky with threads. Enter multiprocessing: a standard library module that spins up multiple processes to run Python code in parallel, bypassing the GIL. This guide gives you a practical, step-by-step approach to using multiprocessing for efficient data processing, from simple parallel maps to full producer-consumer real-time pipelines.
You'll learn:
- When and why to use multiprocessing.
- How to build common patterns: parallel map, process pools, and queued pipelines.
- Performance tips: chunk sizes, serialization overhead, and shared memory.
- How itertools and async/await can complement multiprocessing.
- Best practices and common pitfalls.
Prerequisites
Before we dive in, ensure you're comfortable with:
- Python 3.x (examples assume 3.8+ for shared_memory features).
- Basic familiarity with functions, iterators, and exceptions.
- Basic terminal usage and installing packages (if needed).
We'll lean on these standard library modules (official docs linked for reference):
- multiprocessing: https://docs.python.org/3/library/multiprocessing.html
- itertools: https://docs.python.org/3/library/itertools.html
- asyncio: https://docs.python.org/3/library/asyncio.html
Core Concepts
Let's break the topic into core ideas:
- Process vs Thread: Processes run in separate memory spaces and bypass the GIL, making them suitable for CPU-bound tasks. Threads are lighter but still subject to the GIL for CPU-bound code.
- Serialization (Pickle): Data passed between processes is serialized (pickled), which costs time. Large objects can be expensive to send.
- Process startup cost: Creating processes has overhead. Use pools for repeated work.
- Shared memory: Modern Python provides shared_memory and multiprocessing.Value/Array to reduce copying (a small Value sketch appears at the end of this section).
- I/O vs CPU-bound: For I/O-bound workloads, asyncio (async/await) or threads often perform better. For CPU-bound work, multiprocessing is usually best.
- Pipelines and Queues: Use multiprocessing.Queue or Manager to build producer-consumer pipelines for streaming data.
How asyncio fits: async/await is excellent for high-concurrency I/O and can be combined with multiprocessing: use async producers/consumers and delegate CPU-heavy tasks to process pools (e.g., with loop.run_in_executor).
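Before the full examples, here is a taste of the Value/Array primitives mentioned above: a minimal sketch of a counter shared between processes through multiprocessing.Value (shared_memory itself appears in Example 5).
# Sketch: a shared counter using multiprocessing.Value
import multiprocessing as mp

def bump(counter, times: int):
    for _ in range(times):
        with counter.get_lock():  # the synchronized Value carries its own lock
            counter.value += 1

if __name__ == "__main__":
    counter = mp.Value("i", 0)  # shared signed int, initialized to 0
    procs = [mp.Process(target=bump, args=(counter, 1000)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(counter.value)  # 4000: increments from all processes are visible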
Step-by-Step Examples
We'll start with small examples and build up to a real-time pipeline.
Example 1 — Parallel map with multiprocessing.Pool
Use Pool.map for simple parallelism.
# parallel_map_example.py
import multiprocessing as mp
import math
from time import perf_counter

def is_prime(n: int) -> bool:
    if n <= 1:
        return False
    if n <= 3:
        return True
    if n % 2 == 0:
        return False
    r = int(math.sqrt(n))
    for i in range(3, r + 1, 2):
        if n % i == 0:
            return False
    return True

if __name__ == "__main__":
    nums = [10**6 + i for i in range(2000)]  # CPU-intensive checks
    start = perf_counter()
    with mp.Pool(processes=mp.cpu_count()) as pool:
        results = pool.map(is_prime, nums)
    print("Primes found:", sum(results))
    print("Elapsed:", perf_counter() - start)
Line-by-line:
- Import multiprocessing as mp, math, and perf_counter for timing.
- is_prime: CPU-bound function to test primality.
- nums: list of numbers to test.
- Use mp.Pool(...) with a number of processes equal to the CPU core count.
- pool.map(is_prime, nums): distributes work across processes, returning a list of booleans.
- Print the prime count and elapsed time.
- Input: the nums list. Output: the results list of booleans.
- If nums is very large, pool.map will try to send the entire list; consider imap or chunk sizes (a lazy variant is sketched below).
- Use the chunksize parameter: pool.map(is_prime, nums, chunksize=20) can improve throughput by reducing IPC overhead.
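If the input is very large or you want results as soon as they finish, a lazy variant avoids materializing the full result list up front. A minimal sketch, assuming the is_prime function and nums list from the example above:
# Sketch: lazy, unordered result consumption (assumes is_prime and nums from above)
import multiprocessing as mp

if __name__ == "__main__":
    with mp.Pool() as pool:
        prime_count = 0
        for is_p in pool.imap_unordered(is_prime, nums, chunksize=50):
            prime_count += is_p  # booleans sum as 0/1
    print("Primes found:", prime_count)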
Example 2 — Using itertools to chunk data
When sending many small tasks, batching reduces overhead. Use itertools.islice to chunk an iterator.
# chunking_with_itertools.py
import itertools
from typing import Iterable

def chunked(iterable: Iterable, size: int):
    it = iter(iterable)
    while True:
        chunk = list(itertools.islice(it, size))
        if not chunk:
            break
        yield chunk

Usage example:
if __name__ == "__main__":
    data = range(1, 101)
    for c in chunked(data, 10):
        print(c)
Explanation:
- chunked yields lists of up to size items from iterable using itertools.islice.
- This is memory-efficient for streaming inputs.
- Combine with Pool.map by mapping a function that processes a whole chunk, as sketched below.
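For example, each task can carry a whole chunk and return a list of results, which reduces the number of pickled messages. A minimal sketch, assuming the is_prime function from Example 1 alongside the chunked helper above:
# Sketch: one pool task per chunk (assumes is_prime from Example 1 and chunked from above)
import multiprocessing as mp

def process_chunk(chunk):
    # A single pickled task per chunk instead of one per item
    return [is_prime(n) for n in chunk]

if __name__ == "__main__":
    data = range(10**6, 10**6 + 2000)
    with mp.Pool() as pool:
        chunk_results = pool.map(process_chunk, chunked(data, 100))
    results = [r for chunk in chunk_results for r in chunk]
    print("Primes found:", sum(results))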
Example 3 — Producer-Consumer Pipeline (Multiprocessing Queues)
This pattern is ideal for real-time data pipelines: a producer reads data (e.g., from sockets or sensors), workers process it, and a consumer persists/aggregates results.
# pipeline_example.py
import multiprocessing as mp
import time
import random

def producer(out_queue: mp.Queue, n_items: int):
    for i in range(n_items):
        item = {"id": i, "value": random.random()}
        out_queue.put(item)
        time.sleep(0.01)  # simulate I/O or incoming stream
    # Signal consumers to stop
    for _ in range(mp.cpu_count()):
        out_queue.put(None)

def worker(in_queue: mp.Queue, out_queue: mp.Queue):
    while True:
        item = in_queue.get()
        if item is None:  # shutdown signal
            out_queue.put(None)
            break
        # CPU-bound processing simulation
        item["processed"] = item["value"] ** 2
        out_queue.put(item)

def consumer(in_queue: mp.Queue):
    shutdown_signals = 0
    while True:
        item = in_queue.get()
        if item is None:
            shutdown_signals += 1
            if shutdown_signals >= mp.cpu_count():
                break
            continue
        print("Consumed:", item)

if __name__ == "__main__":
    q1 = mp.Queue(maxsize=100)
    q2 = mp.Queue(maxsize=100)
    p = mp.Process(target=producer, args=(q1, 500))
    p.start()
    workers = []
    for _ in range(mp.cpu_count()):
        w = mp.Process(target=worker, args=(q1, q2))
        w.start()
        workers.append(w)
    consumer_proc = mp.Process(target=consumer, args=(q2,))
    consumer_proc.start()
    p.join()
    for w in workers:
        w.join()
    consumer_proc.join()
Line-by-line:
- producer pushes items to q1 and signals shutdown by sending one None per worker.
- worker consumes from q1, processes data, and sends results to q2. On None, it forwards the shutdown signal to the consumer.
- consumer reads q2 and exits after receiving shutdown signals equal to the worker count.
- Processes are created and started under __main__ to be safe on Windows.
- maxsize provides a form of backpressure; the producer blocks when q1 fills.
- Use mp.Manager().Queue() if you need a proxy queue for shared state; a plain mp.Queue is faster in many cases. A timeout-based worker variant is sketched below.
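Sentinels work well when every producer shuts down cleanly; as an extra safeguard, a worker can also poll the queue with a timeout so it never blocks forever if the producer dies. A minimal sketch of that variant, assuming the same queue layout as the pipeline above (worker_with_timeout is a hypothetical name):
# Sketch: worker loop combining the None sentinel with a get() timeout
import queue  # mp.Queue.get(timeout=...) raises queue.Empty on timeout

def worker_with_timeout(in_queue, out_queue, idle_timeout: float = 5.0):
    while True:
        try:
            item = in_queue.get(timeout=idle_timeout)
        except queue.Empty:
            break  # no work for a while: assume the producer is gone and stop
        if item is None:  # normal shutdown signal
            out_queue.put(None)
            break
        item["processed"] = item["value"] ** 2
        out_queue.put(item)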
Example 4 — Using concurrent.futures.ProcessPoolExecutor
concurrent.futures provides a higher-level, futures-based API.
# processpool_executor.py
from concurrent.futures import ProcessPoolExecutor, as_completed

def fib(n: int) -> int:
    if n <= 1:
        return n
    a, b = 0, 1
    for _ in range(n - 1):
        a, b = b, a + b
    return b

if __name__ == "__main__":
    with ProcessPoolExecutor() as exe:
        futures = [exe.submit(fib, n) for n in range(30, 40)]
        for fut in as_completed(futures):
            print(fut.result())
Explanation:
- ProcessPoolExecutor handles the process pool lifecycle.
- submit returns Future objects; use as_completed to process results as they arrive.
- Great when you need exception handling and timeouts with futures, as sketched below.
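Exceptions raised inside a worker are re-raised in the parent when you call result(), and result() also accepts a timeout. A minimal sketch of both, reusing the fib function from the example above:
# Sketch: per-future error handling and timeouts (assumes fib from above)
from concurrent.futures import ProcessPoolExecutor, TimeoutError

if __name__ == "__main__":
    with ProcessPoolExecutor() as exe:
        futures = {exe.submit(fib, n): n for n in range(30, 40)}
        for fut, n in futures.items():
            try:
                # result() re-raises worker exceptions and honors the timeout
                print(n, "->", fut.result(timeout=10))
            except TimeoutError:
                print(n, "timed out")
            except Exception as exc:
                print(n, "raised:", exc)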
Example 5 — Shared memory for large data (numpy arrays)
Large arrays are expensive to copy between processes. Use multiprocessing.shared_memory (Python 3.8+) to share a NumPy array without copying.
# shared_memory_example.py
import numpy as np
from multiprocessing import Process
from multiprocessing import shared_memory

def worker(name, shape, dtype):
    existing_shm = shared_memory.SharedMemory(name=name)
    arr = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
    arr *= 2  # modify in place
    existing_shm.close()

if __name__ == "__main__":
    arr = np.arange(10, dtype=np.int64)
    shm = shared_memory.SharedMemory(create=True, size=arr.nbytes)
    shared_arr = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
    shared_arr[:] = arr[:]  # copy initial data
    p = Process(target=worker, args=(shm.name, arr.shape, arr.dtype))
    p.start()
    p.join()
    print("Modified:", shared_arr)  # should be doubled
    shm.close()
    shm.unlink()
Explanation:
- Create a shared memory block with SharedMemory(create=True).
- Create a NumPy ndarray backed by the shared buffer.
- The worker opens the existing shared memory by name and manipulates the array in place.
- There is no pickling/copy overhead for the array contents.
- You must manage the lifecycle (close/unlink) carefully to avoid leaks; a cleanup sketch follows after this list.
- Works across processes on the same machine only.
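One way to make that lifecycle management robust is to wrap the block in try/finally so it is released even if something fails mid-run. A minimal sketch, reusing the worker function from the example above:
# Sketch: guaranteed cleanup of the shared memory block (assumes worker from above)
import numpy as np
from multiprocessing import Process, shared_memory

if __name__ == "__main__":
    arr = np.arange(10, dtype=np.int64)
    shm = shared_memory.SharedMemory(create=True, size=arr.nbytes)
    try:
        shared_arr = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
        shared_arr[:] = arr[:]
        p = Process(target=worker, args=(shm.name, arr.shape, arr.dtype))
        p.start()
        p.join()
        print("Modified:", shared_arr)
    finally:
        shm.close()   # drop this process's view of the block
        shm.unlink()  # free the block itself (the creator's responsibility)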
Performance Considerations
- Measure first: Use perf_counter or timeit to confirm bottlenecks before parallelizing (a timing sketch follows after this list).
- Number of processes: Usually equal to CPU cores for CPU-bound tasks. For hyperthreaded CPUs, testing helps.
- Chunking: Smaller tasks add overhead; batching reduces overhead.
- IPC cost: Passing large objects can be slower than doing work sequentially. Consider shared memory or memory-mapped files.
- Process startup: Starting many short-lived processes is costly. Use pools.
- Pickle overhead: Complex Python objects can be expensive to serialize. Prefer simple types or NumPy arrays via shared memory.
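Before reaching for a pool, time the sequential baseline against the parallel version on the same data, as mentioned in the first point above. A minimal sketch, assuming the is_prime function and nums list from Example 1:
# Sketch: compare sequential vs. pooled timing (assumes is_prime and nums from Example 1)
import multiprocessing as mp
from time import perf_counter

if __name__ == "__main__":
    start = perf_counter()
    sequential = [is_prime(n) for n in nums]
    print("Sequential:", round(perf_counter() - start, 3), "s")

    start = perf_counter()
    with mp.Pool() as pool:
        parallel = pool.map(is_prime, nums, chunksize=50)
    print("Parallel:  ", round(perf_counter() - start, 3), "s")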
Error Handling and Robustness
- Catch exceptions inside worker functions and log context; unhandled exceptions in child processes might be silent.
- When using Pool, exceptions propagate to the parent when retrieving results (e.g., map raises).
- Use timeouts with get (on queues) or with futures (future.result(timeout=...)) to avoid indefinite blocks.
- For production, consider restarts: monitor worker processes and respawn them on failure.
def safe_worker(item):
    try:
        return do_work(item)
    except Exception as e:
        # Log and return a sentinel or re-raise
        return {"error": str(e), "input": item}
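Used with a pool, this keeps one bad item from failing the whole batch, and the parent can separate successes from failures afterwards. A minimal sketch, assuming safe_worker above and a hypothetical items iterable (do_work remains the placeholder from the snippet):
# Sketch: splitting failures from successes (assumes safe_worker and an items iterable)
import multiprocessing as mp

if __name__ == "__main__":
    with mp.Pool() as pool:
        results = pool.map(safe_worker, items)
    failures = [r for r in results if isinstance(r, dict) and "error" in r]
    successes = [r for r in results if not (isinstance(r, dict) and "error" in r)]
    print(len(successes), "succeeded,", len(failures), "failed")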
Combining Multiprocessing with itertools and asyncio
- Use itertools to efficiently prepare iterables for workers (e.g., chunking or producing sliding windows).
- For real-time pipelines, you might have asynchronous I/O producers (websockets, HTTP requests) handled with asyncio while delegating CPU tasks to process pools.
# async_with_process_pool.py (concept)
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_task(n):
    # expensive CPU-bound work
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        tasks = [loop.run_in_executor(pool, cpu_task, 10_000_000) for _ in range(5)]
        results = await asyncio.gather(*tasks)
        print(results)

if __name__ == "__main__":
    asyncio.run(main())
Benefits:
- Async handles many concurrent I/O-bound producers.
- CPU tasks are delegated to processes via the executor.
- Ideal for building real-time data pipelines where ingestion is async and processing is CPU-heavy.
Best Practices
- Use if __name__ == "__main__": to guard process creation (required on Windows).
- Prefer ProcessPoolExecutor for simpler code unless you need custom Process behavior.
- Use chunking and tune chunksize.
- Avoid sending big objects repeatedly; use shared memory, files, or databases for large data.
- Handle shutdown gracefully with sentinel values and timeouts.
- Use logging instead of print for production systems and include process IDs in logs (see the sketch after this list).
- Benchmark with realistic workloads and measure end-to-end latency, not just CPU time.
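Python's logging module can stamp every record with the emitting process, which makes interleaved output from a pool much easier to read. A minimal sketch using the standard processName and process format fields:
# Sketch: logging with process names/IDs instead of print
import logging
import multiprocessing as mp

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(processName)s[%(process)d] %(levelname)s %(message)s",
)

def square(n):
    logging.info("processing %s", n)
    return n * n

if __name__ == "__main__":
    with mp.Pool(2) as pool:
        print(pool.map(square, range(5)))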
Common Pitfalls
- Forgetting the __main__ guard, which causes recursive process spawning on Windows.
- Assuming threads run in parallel for CPU-bound work: the GIL prevents true parallel CPU execution.
- Passing unpicklable objects (e.g., open file handles or local functions) to worker processes.
- Not managing process lifecycle (leaking shared memory or orphaned processes).
- Blocking the event loop in asyncio by performing CPU-heavy tasks there instead of offloading them.
Advanced Tips
- Use multiprocessing.shared_memory for large NumPy arrays and use read-only memoryviews where possible.
- For very large data, consider memory-mapped files (numpy.memmap) to avoid transfers; a sketch follows after this list.
- Use multiprocessing.SimpleQueue() for faster inter-process messaging in some contexts.
- On Linux, the fork start method shares memory pages copy-on-write; leveraging that can reduce startup copy costs, but be careful with threads and resource handles.
- For cross-machine scalability, use specialized tools (Dask, Ray, Apache Kafka) rather than multiprocessing.
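As noted above, numpy.memmap lets each worker read slices of a large on-disk array without pushing the data through pickling. A minimal sketch, where the file name, shape, and chunking are illustrative assumptions:
# Sketch: sharing a large array via a memory-mapped file (path and shape are hypothetical)
import numpy as np
import multiprocessing as mp

PATH, SHAPE, DTYPE = "big_array.dat", (1_000_000,), np.float64

def partial_sum(bounds):
    start, stop = bounds
    arr = np.memmap(PATH, dtype=DTYPE, mode="r", shape=SHAPE)  # no copy into the worker
    return float(arr[start:stop].sum())

if __name__ == "__main__":
    # Create and fill the file once in the parent
    arr = np.memmap(PATH, dtype=DTYPE, mode="w+", shape=SHAPE)
    arr[:] = 1.0
    arr.flush()

    step = SHAPE[0] // 4
    bounds = [(i * step, (i + 1) * step) for i in range(4)]
    with mp.Pool(4) as pool:
        print(sum(pool.map(partial_sum, bounds)))  # expect 1_000_000.0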
Building Real-Time Data Pipelines: Techniques and Tools
Multiprocessing is one building block for real-time pipelines. Combine it with:
- asyncio for high-throughput I/O ingestion (websockets, HTTP clients).
- itertools for efficient iteration and windowing.
- multiprocessing queues/shared memory for transferring work to CPU processes.
- External systems (Redis, Kafka) for buffering, persistence, and scaling across machines.
- Frameworks like Dask or Ray for distributed processing if you outgrow single-machine multiprocessing.
A typical flow looks like this:
- Async producers ingest data concurrently.
- Items are placed into an in-memory queue or buffer.
- Worker processes consume, transform, or enrich items.
- Results are aggregated or forwarded to a sink (DB, filesystem, streaming service).
Conclusion
Multiprocessing is a powerful tool in the Python toolbox for CPU-bound data processing. Use pools to manage worker processes, itertools to simplify iteration and batching, and asyncio when combining high-concurrency I/O with CPU-heavy tasks delegated to processes. Mind serialization costs, use shared memory for large arrays, and always measure performance.
Try the examples:
- Run the Pool and pipeline scripts with realistic data.
- Experiment with chunk sizes, process counts, and shared memory for real gains.
- Combine async producers with process pools for real-time ingestion and CPU-bound processing.
Further Reading
- Official multiprocessing docs: https://docs.python.org/3/library/multiprocessing.html
- itertools docs: https://docs.python.org/3/library/itertools.html
- asyncio docs: https://docs.python.org/3/library/asyncio.html
- multiprocessing.shared_memory: https://docs.python.org/3/library/multiprocessing.shared_memory.html
- Dask: https://dask.org
- Ray: https://docs.ray.io