
Implementing a Batch Processing System in Python: Techniques for Handling Large Data Sets Efficiently
Learn pragmatic techniques for building robust, memory-efficient batch processing systems in Python. This post covers chunking, streaming, parallelism (with multiprocessing), custom context managers, scheduling scripts with cron/Windows Task Scheduler, and production-ready best practices—with clear code examples and thorough explanations.
Introduction
Processing large data sets can quickly overwhelm memory and slow down pipelines if not designed carefully. Whether you're transforming CSV logs, aggregating metrics, or loading millions of records into a database, a batch processing system helps you process data in manageable, efficient chunks.
In this post you'll learn how to:
- Design a chunked, streaming pipeline that keeps memory low.
- Implement parallel processing for CPU-bound tasks using multiprocessing.
- Create a custom context manager to manage resources elegantly with the with statement.
- Automate daily runs with cron and Windows Task Scheduler.
- Apply production-focused best practices: logging, error handling, checkpointing, and monitoring.
Prerequisites
Before diving into code, you should be comfortable with:
- Python 3.x basics (functions, iterators, generators).
- File I/O and the standard library (csv, os, itertools, concurrent.futures, multiprocessing, contextlib).
- Basic shell commands (for cron scheduling) or familiarity with Task Scheduler on Windows.
- pandas for data-frame style operations (we'll show both pandas and pure-Python approaches).
- A basic understanding of databases (for batch inserts).
Core Concepts
Breaking the problem into focused concepts helps:
- Streaming and chunking: Read a large data source in small chunks (e.g., 1,000 lines at a time) rather than loading everything into memory (a minimal sketch follows this list).
- Idempotent processing: Design jobs so re-running them is safe (helps with retries).
- Checkpointing: Save progress at intervals to resume on failure.
- Parallelism: Use multiprocessing for CPU-bound tasks or threading/async for I/O-bound tasks.
- Resource management: Use context managers (with) to ensure files, locks, and connections are closed properly.
- Scheduling: Automate runs with cron (Linux/macOS) or Task Scheduler (Windows).
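To make the streaming-and-chunking idea concrete before the full CSV example below, here is a minimal, generic sketch that chunks any iterable with itertools.islice (the helper name chunked is our own):

from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")

def chunked(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield lists of up to `size` items from any iterable, lazily."""
    it = iter(items)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch

# Example: walk a million numbers 1,000 at a time without materializing them all.
for batch in chunked(range(1_000_000), 1_000):
    pass  # process(batch)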
Step-by-Step Examples
We'll create a small system that:
- Reads a very large CSV file in chunks.
- Processes each row (simulate CPU-bound work).
- Writes results out in batches.
- Uses a custom context manager for safe output handling.
- Optionally parallelizes per-chunk processing with multiprocessing.
1) Chunked CSV Reader (generator pattern)
Code:
import csv
from pathlib import Path
from typing import Iterator, List, Dict

def read_csv_in_chunks(path: str, chunk_size: int = 1000) -> Iterator[List[Dict[str, str]]]:
    """
    Yield successive chunks of rows from a CSV file as lists of dicts.
    """
    p = Path(path)
    with p.open('r', newline='', encoding='utf-8') as fh:
        reader = csv.DictReader(fh)
        chunk = []
        for row in reader:
            chunk.append(row)
            if len(chunk) >= chunk_size:
                yield chunk
                chunk = []
        if chunk:
            yield chunk
Explanation (line-by-line):
- import csv, Path, typing: imports.
- read_csv_in_chunks(path, chunk_size): defines a generator that yields lists (chunks) of rows.
- with p.open(...) as fh: opens the file safely; with ensures file closure.
- reader = csv.DictReader(fh): yields rows as dicts keyed by header.
- We accumulate rows into chunk. When chunk_size is reached, yield chunk and reset.
- After the loop, if any leftover rows remain, yield them.
- Input: path to a CSV file with headers.
- Output: iterator yielding lists of rows (as dicts).
- Edge cases: If the file is empty, the generator yields nothing. If the CSV is malformed, csv may raise csv.Error; catch it in calling code.
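If you prefer data-frame operations (as mentioned in the prerequisites), pandas offers the same streaming pattern through the chunksize parameter of read_csv, which returns an iterator of DataFrames instead of loading the whole file. A minimal sketch; the function name is our own, not part of pandas:

import pandas as pd

def read_csv_in_chunks_pandas(path: str, chunk_size: int = 1000):
    """Yield DataFrames of up to chunk_size rows from a CSV file."""
    # chunksize makes read_csv return an iterator of DataFrames.
    for df in pd.read_csv(path, chunksize=chunk_size):
        yield df

# Usage: iterate and work on each small frame.
# for df in read_csv_in_chunks_pandas('big_input.csv'):
#     print(len(df))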
2) CPU-bound Row Processing Function (simulate work)
We'll write a CPU-bound function (e.g., heavy math or string processing). For demo, we'll use a prime-related computation to simulate CPU work.
Code:
import math

def heavy_computation(x: int) -> int:
    """Simulate CPU-bound work: count primes <= x (naive)."""
    def is_prime(n: int) -> bool:
        if n <= 1:
            return False
        if n <= 3:
            return True
        if n % 2 == 0:
            return False
        r = int(math.sqrt(n))
        for i in range(3, r + 1, 2):
            if n % i == 0:
                return False
        return True

    count = 0
    for n in range(2, x + 1):
        if is_prime(n):
            count += 1
    return count
Explanation:
- The heavy_computation function counts primes up to x using a naive test, making it intentionally CPU-heavy.
- Input: integer x. Output: count of primes <= x.
- Edge case: small x returns 0 or the correct small count.
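To confirm the function behaves as described and is genuinely CPU-bound (and to get a baseline before parallelizing later), a quick check and timing sketch:

import time

assert heavy_computation(10) == 4          # primes <= 10 are 2, 3, 5, 7

start = time.perf_counter()
heavy_computation(100_000)                 # pure Python loops, no I/O
print(f"counted primes up to 100,000 in {time.perf_counter() - start:.2f}s")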
3) Processing a Chunk (single-threaded) and Batch Output
We'll map the heavy function on each row and write outputs in batches.
Code:
from typing import List, Dict
import json

def process_chunk(chunk: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """
    Process a chunk of rows and return results as a list of dicts.
    Expects row['value'] to be parseable as int.
    """
    results = []
    for row in chunk:
        try:
            x = int(row['value'])
        except (KeyError, ValueError):
            # Skip malformed or missing values; in production, log this
            continue
        result = heavy_computation(x)
        results.append({'id': row.get('id'), 'value': x, 'primes_up_to_value': result})
    return results

def append_results_to_jsonl(path: str, results: List[Dict[str, str]]):
    """
    Append results as JSON Lines for easy streaming consumption.
    """
    with open(path, 'a', encoding='utf-8') as fh:
        for r in results:
            fh.write(json.dumps(r, ensure_ascii=False) + '\n')
Explanation:
- process_chunk converts 'value' to int, runs the heavy computation, and collects outputs.
- We handle malformed rows by catching KeyError/ValueError and skipping (could be logged instead).
- append_results_to_jsonl appends JSON Lines (one JSON object per line) to a file. JSONL is convenient for downstream processing.
- If results is empty, the function simply opens and closes the file, a cheap operation. You might skip the write entirely when it's empty.
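A quick usage sketch tying the two functions together; the sample rows and output file name are made up for illustration:

sample_chunk = [
    {'id': '1', 'value': '100'},
    {'id': '2', 'value': 'not-a-number'},  # malformed: will be skipped
    {'id': '3', 'value': '50'},
]
results = process_chunk(sample_chunk)
append_results_to_jsonl('results.jsonl', results)
print(results)  # two result dicts; the malformed row was dropped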
4) Custom Context Manager for Output (resource management)
Using the with statement makes resource handling explicit. We'll create a custom context manager that stages output in a temporary file so partially written data never reaches the target. This is also a good spot to showcase a practical use case for creating custom Python context managers with the with statement for enhanced resource management.
Code:
import os
from contextlib import AbstractContextManager
from typing import Optional, TextIO

class AtomicAppendFile(AbstractContextManager):
    """
    Context manager that writes to a temporary file and, on a clean exit,
    appends its contents to the target file so partial writes are never
    visible. Good for short batch writes to shared storage.
    """
    def __init__(self, target_path: str, encoding: str = 'utf-8'):
        self.target_path = target_path
        self.temp_path = f"{target_path}.tmp-{os.getpid()}"
        self.encoding = encoding
        self._fh: Optional[TextIO] = None

    def __enter__(self) -> TextIO:
        # Write to a temporary file first; its contents are appended to the
        # target only if the with-block completes without an exception.
        self._fh = open(self.temp_path, 'w', encoding=self.encoding)
        return self._fh

    def __exit__(self, exc_type, exc, tb):
        try:
            self._fh.flush()
            os.fsync(self._fh.fileno())
        except Exception:
            pass
        finally:
            self._fh.close()
        if exc_type is None:
            # No exception: append temp contents to the target, then remove temp
            with open(self.target_path, 'a', encoding=self.encoding) as target_fh, \
                    open(self.temp_path, 'r', encoding=self.encoding) as tmp_fh:
                for line in tmp_fh:
                    target_fh.write(line)
            os.remove(self.temp_path)
        else:
            # On exception, remove the temp file to avoid leaving garbage
            try:
                os.remove(self.temp_path)
            except Exception:
                pass
        # Return False to propagate exceptions, if any
        return False
Explanation:
- AtomicAppendFile is a custom context manager subclassing AbstractContextManager. It implements __enter__ and __exit__.
- On enter: opens a temporary file (unique by PID).
- On exit: flushes and syncs, closes temp. If no exception occurred, appends temp contents to target file and removes temp. If exception occurred, temp file is removed.
- Why use this? To avoid leaving partially written output on failure and provide cleaner atomicity for batch writes.
- Edge cases: This simple strategy is fine for single-process writers. For multiple concurrent writers you might use file locks or specialized append mechanisms.
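For simpler cases, the same "set up, yield, clean up" pattern can be written with contextlib.contextmanager instead of a full class. A minimal sketch of the generator-based style, not a drop-in replacement for AtomicAppendFile:

from contextlib import contextmanager

@contextmanager
def open_with_cleanup(path: str, mode: str = 'a', encoding: str = 'utf-8'):
    """Generator-based context manager: open a file, always close it."""
    fh = open(path, mode, encoding=encoding)
    try:
        yield fh          # the body of the with-block runs here
    finally:
        fh.close()        # runs on success or exception

# Usage:
# with open_with_cleanup('results.jsonl') as fh:
#     fh.write('{"ok": true}\n')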
5) Putting it Together: Single-threaded Batch Runner
Now combine reader, processor, and context manager into a runner.
Code:
def run_batch(input_csv: str, output_jsonl: str, chunk_size: int = 1000):
    for chunk in read_csv_in_chunks(input_csv, chunk_size=chunk_size):
        results = process_chunk(chunk)
        if not results:
            continue
        with AtomicAppendFile(output_jsonl) as fh:
            for r in results:
                fh.write(json.dumps(r, ensure_ascii=False) + '\n')
Explanation:
- For each chunk, process and write results using AtomicAppendFile.
- If results is empty, skip writing (avoids unnecessary file operations); a minimal entry point for this runner follows.
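A minimal entry point for the single-threaded runner (file names are placeholders). The __main__ guard is not strictly required here, but it becomes important once multiprocessing enters in the next step, so it's a good habit:

if __name__ == '__main__':
    run_batch('big_input.csv', 'results.jsonl', chunk_size=1000)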
What if heavy_computation becomes the bottleneck? Enter parallelism.
6) Leveraging Multiprocessing for CPU-Bound Tasks
If the per-row processing is CPU-bound (like our prime counting), Python's multiprocessing can leverage multiple cores, a practical approach to optimizing CPU-bound tasks.
We’ll create a worker pool per chunk and map across rows.
Code:
from multiprocessing import Pool, cpu_count

def process_row_safe(row: Dict[str, str]):
    try:
        x = int(row['value'])
    except (KeyError, ValueError):
        return None
    return {'id': row.get('id'), 'value': x, 'primes_up_to_value': heavy_computation(x)}

def process_chunk_parallel(chunk, processes=None):
    processes = processes or max(1, cpu_count() - 1)
    with Pool(processes=processes) as pool:
        results = pool.map(process_row_safe, chunk)
    # Filter out None results
    return [r for r in results if r is not None]
Explanation:
- process_row_safe wraps row processing and handles conversion errors, returning None for malformed rows.
- process_chunk_parallel creates a pool and maps the rows in parallel. We use cpu_count() - 1 to leave one core free (a common practice).
- Edge cases: Multiprocessing pickles arguments, so ensure functions are defined at the top level (they are). Avoid repeatedly sending very large objects to workers; prefer sending small rows.
- For very small chunk sizes, process creation/communication overhead can dominate. Increase the chunk size or use persistent worker processes.
- You can benchmark with time to find the optimal chunk size and number of processes (a rough sketch follows).
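A rough benchmark sketch comparing the serial and parallel paths on synthetic rows; the row count and value are arbitrary, so adjust them for your machine:

import time

def benchmark():
    # Synthetic chunk: 50 rows, each triggering a fairly heavy prime count.
    chunk = [{'id': str(i), 'value': '20000'} for i in range(50)]

    start = time.perf_counter()
    process_chunk(chunk)
    print(f"serial:   {time.perf_counter() - start:.2f}s")

    start = time.perf_counter()
    process_chunk_parallel(chunk)
    print(f"parallel: {time.perf_counter() - start:.2f}s")

if __name__ == '__main__':
    benchmark()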
7) End-to-end Runner with Parallelism and Checkpointing
Add checkpointing so the job can resume if interrupted. A simple approach: write processed output to JSONL and keep track of last processed input offset (e.g., number of rows processed).
Code:
import shelve

def run_batch_with_checkpoint(input_csv: str, output_jsonl: str, checkpoint_db: str,
                              chunk_size: int = 1000, processes: int = None):
    # checkpoint stores 'rows_processed'
    with shelve.open(checkpoint_db) as db:
        rows_processed = db.get('rows_processed', 0)
    current = 0
    for chunk in read_csv_in_chunks(input_csv, chunk_size=chunk_size):
        # Skip chunks we've already processed
        if current + len(chunk) <= rows_processed:
            current += len(chunk)
            continue
        # If partially through a chunk, we could slice, but for simplicity we require chunks to be processed whole.
        results = process_chunk_parallel(chunk, processes=processes)
        if results:
            with AtomicAppendFile(output_jsonl) as fh:
                for r in results:
                    fh.write(json.dumps(r, ensure_ascii=False) + '\n')
        current += len(chunk)
        # Persist the checkpoint
        with shelve.open(checkpoint_db) as db:
            db['rows_processed'] = current
Explanation:
- shelve stores a lightweight checkpoint database.
- Before processing each chunk, we skip chunks already processed.
- After processing a chunk, we update the checkpoint to the new row count.
- This allows resuming after failure without reprocessing already-completed chunks.
- Edge cases: If the input file changes (rows inserted/deleted), a checkpoint by row count may become invalid. For stable input it's fine; for dynamic sources, prefer checkpointing by unique IDs or offsets (a minimal sketch follows).
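If each row carries a stable unique ID, a set of processed IDs is a more robust checkpoint than a row count. A minimal sketch of the idea; the storage format (a plain text file of IDs) and file name are assumptions:

def load_processed_ids(path: str) -> set:
    """Read previously processed row IDs, one per line; empty set if no file yet."""
    try:
        with open(path, 'r', encoding='utf-8') as fh:
            return {line.strip() for line in fh if line.strip()}
    except FileNotFoundError:
        return set()

def mark_processed(path: str, ids):
    """Append newly processed IDs so a rerun can skip them."""
    with open(path, 'a', encoding='utf-8') as fh:
        for row_id in ids:
            fh.write(f"{row_id}\n")

# In the runner, filter each chunk before processing:
# done = load_processed_ids('checkpoint_ids.txt')
# todo = [row for row in chunk if row.get('id') not in done]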
8) Automating Daily Runs (cron and Windows Task Scheduler)
Once your script works, you’ll likely want it to run regularly. Here are simple instructions:
- Cron (Linux/macOS): edit your crontab with crontab -e.
- Example entry to run daily at 2:00 AM:
- 0 2 * * * /usr/bin/python3 /path/to/your/script.py >> /var/log/batch_job.log 2>&1
- Ensure environment variables, PATH, and virtualenvs are handled (cron has a minimal environment). You may prefer a wrapper shell script that activates a virtualenv first.
- Windows Task Scheduler: create a Basic Task, set a Daily trigger, and use the "Start a program" action with the path to python.exe and your script as the argument.
Whichever scheduler you use, make sure your script writes robust logs and handles being re-run safely (idempotency).
Best Practices
- Use streaming formats (CSV reader, JSONL) and generators to keep memory low.
- Choose chunk size by benchmarking — too small adds overhead, too large uses memory.
- For CPU-bound tasks, use multiprocessing. For I/O-bound tasks, concurrent.futures.ThreadPoolExecutor or asyncio may be better.
- Log extensively (structured logs) for diagnosability. Consider Python's logging module with rotating logs (a minimal setup sketch follows this list).
- Use retries/backoff for transient errors (network or DB).
- Make processing idempotent; use unique IDs to avoid duplicate results after retries.
- Secure resource handling with with and custom context managers for complex flows.
- Add early validation of input schema to fail fast on broken sources.
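A minimal logging setup with rotation, roughly what the logging bullet above refers to; the file name and size limits are arbitrary choices:

import logging
from logging.handlers import RotatingFileHandler

def get_logger(name: str = 'batch_job') -> logging.Logger:
    logger = logging.getLogger(name)
    if not logger.handlers:  # avoid adding duplicate handlers on repeated calls
        handler = RotatingFileHandler('batch_job.log', maxBytes=5_000_000, backupCount=3)
        handler.setFormatter(logging.Formatter(
            '%(asctime)s %(levelname)s %(name)s %(message)s'))
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
    return logger

# Usage inside the runner:
# log = get_logger()
# log.info("processed chunk rows=%d", len(chunk))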
Common Pitfalls
- Forgetting pickle limitations with multiprocessing (functions must be importable at top-level).
- Underestimating IPC overhead — not every problem benefits from parallelization.
- Not testing with dataset sizes representative of production — testing on tiny data can hide scaling issues.
- Using naive checkpointing that breaks if input changes; prefer stable identifiers.
- Not handling partial writes — use techniques like atomic writes or write-then-rename.
Advanced Tips
- For very large, random-access binary files, consider memory-mapped I/O (mmap) to reduce copying overhead.
- When writing to databases, use bulk/batched inserts to minimize round trips.
- Evaluate specialized tools such as Apache Spark, Dask, or cloud-managed batch services if data grows beyond what a single machine should handle.
- Use profiling (cProfile, line_profiler) to find real hotspots (a quick sketch follows this list).
- If the overhead of pickling is heavy, consider multiprocessing.shared_memory or writing C extensions.
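A quick profiling sketch using cProfile on a single chunk, as mentioned in the profiling tip above; the sample rows are made up:

import cProfile
import pstats

chunk = [{'id': str(i), 'value': '20000'} for i in range(50)]

profiler = cProfile.Profile()
profiler.enable()
process_chunk(chunk)
profiler.disable()

# Show the 10 most time-consuming functions.
pstats.Stats(profiler).sort_stats('cumulative').print_stats(10)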
Monitoring and Observability
- Emit metrics: number of rows processed, latency per chunk, errors.
- Integrate with monitoring (Prometheus, CloudWatch) or at minimum write metrics to a file (a minimal sketch follows this list).
- Add alerts on failure or when processed rate deviates significantly.
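At minimum, writing per-chunk metrics to a file gives you something to plot or alert on later. A minimal sketch; the file name and fields are assumptions:

import json
import time

def emit_metrics(path: str, rows: int, seconds: float, errors: int = 0):
    """Append one metrics record per processed chunk as a JSON line."""
    record = {
        'timestamp': time.time(),
        'rows_processed': rows,
        'chunk_seconds': round(seconds, 3),
        'errors': errors,
    }
    with open(path, 'a', encoding='utf-8') as fh:
        fh.write(json.dumps(record) + '\n')

# In the runner:
# start = time.perf_counter()
# results = process_chunk(chunk)
# emit_metrics('metrics.jsonl', rows=len(chunk), seconds=time.perf_counter() - start)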
Conclusion
Implementing a robust batch processing system in Python involves more than looping over files. Use streaming and chunking to control memory usage, use multiprocessing for CPU-bound tasks, and manage resources with context managers such as custom with-based classes to avoid leaks and inconsistent states. Automate your pipeline with cron or Windows Task Scheduler and make it production-ready with logging, checkpointing, and monitoring.
Try the example code with a realistic dataset, experiment with chunk sizes and number of processes, and incorporate checkpointing to make the system resilient. If you'd like, copy the examples and adapt them to your data source, or ask for a ready-made template for CSV → database pipelines.
Call to action: Clone these snippets into a script, run them on a sample large CSV, and profile the performance. Share your bottlenecks and I'll help optimize them.
Further Reading and References
- Python CSV module: https://docs.python.org/3/library/csv.html
- contextlib: https://docs.python.org/3/library/contextlib.html
- multiprocessing: https://docs.python.org/3/library/multiprocessing.html
- concurrent.futures: https://docs.python.org/3/library/concurrent.futures.html
- Cron introduction: https://en.wikipedia.org/wiki/Cron
- Windows Task Scheduler docs: https://docs.microsoft.com/en-us/windows/win32/taskschd/task-scheduler-start-page