Implementing a Batch Processing System in Python: Techniques for Handling Large Data Sets Efficiently


Learn pragmatic techniques for building robust, memory-efficient batch processing systems in Python. This post covers chunking, streaming, parallelism (with multiprocessing), custom context managers, scheduling scripts with cron/Windows Task Scheduler, and production-ready best practices—with clear code examples and thorough explanations.

Introduction

Processing large data sets can quickly overwhelm memory and slow down pipelines if not designed carefully. Whether you're transforming CSV logs, aggregating metrics, or loading millions of records into a database, a batch processing system helps you process data in manageable, efficient chunks.

In this post you'll learn how to:

  • Design a chunked, streaming pipeline that keeps memory low.
  • Implement parallel processing for CPU-bound tasks using multiprocessing.
  • Create a custom context manager to manage resources elegantly with the with statement.
  • Automate daily runs with cron and Windows Task Scheduler.
  • Apply production-focused best practices: logging, error handling, checkpointing, and monitoring.
This guide is aimed at intermediate Python developers and emphasizes pragmatic, ready-to-run examples. Let's begin by laying out the prerequisites and core concepts.

Prerequisites

Before diving into code, you should be comfortable with:

  • Python 3.x basics (functions, iterators, generators).
  • File I/O and the standard library (csv, os, itertools, concurrent.futures, multiprocessing, contextlib).
  • Basic shell commands (for cron scheduling) or familiarity with Task Scheduler on Windows.
Optional but helpful:
  • pandas for data-frame style operations (we'll show both pandas and pure-Python approaches).
  • A basic understanding of databases (for batch inserts).

Core Concepts

Breaking the problem into focused concepts helps:

  1. Streaming and chunking: Read a large data source in small chunks (e.g., 1,000 lines at a time) rather than loading everything into memory.
  2. Idempotent processing: Design jobs so re-running them is safe (helps with retries).
  3. Checkpointing: Save progress at intervals to resume on failure.
  4. Parallelism: Use multiprocessing for CPU-bound tasks or threading/async for I/O-bound tasks.
  5. Resource management: Use context managers (with) to ensure files, locks, and connections are closed properly.
  6. Scheduling: Automate runs with cron (Linux/macOS) or Task Scheduler (Windows).
Now let’s implement step-by-step examples.

Step-by-Step Examples

We'll create a small system that:

  • Reads a very large CSV file in chunks.
  • Processes each row (simulate CPU-bound work).
  • Writes results out in batches.
  • Uses a custom context manager for safe output handling.
  • Optionally parallelizes per-chunk processing with multiprocessing.
Assume a CSV with the header: id, value. We'll compute a CPU-intensive function (simulated) on value.

1) Chunked CSV Reader (generator pattern)

Code:

import csv
from pathlib import Path
from typing import Iterator, List, Dict

def read_csv_in_chunks(path: str, chunk_size: int = 1000) -> Iterator[List[Dict[str, str]]]:
    """Yield successive chunks of rows from a CSV file as lists of dicts."""
    p = Path(path)
    with p.open('r', newline='', encoding='utf-8') as fh:
        reader = csv.DictReader(fh)
        chunk = []
        for row in reader:
            chunk.append(row)
            if len(chunk) >= chunk_size:
                yield chunk
                chunk = []
        if chunk:
            yield chunk

Explanation (line-by-line):

  • import csv, Path, typing: imports.
  • read_csv_in_chunks(path, chunk_size): defines a generator that yields lists (chunks) of rows.
  • with p.open(...) as fh: opens file safely; with ensures file closure.
  • reader = csv.DictReader(fh): yields rows as dicts keyed by header.
  • We accumulate rows into chunk. When chunk_size reached, yield chunk and reset.
  • After the loop, if any leftover rows, yield them.
Inputs/Outputs/Edge Cases:
  • Input: path to a CSV file with headers.
  • Output: iterator yielding lists of rows (as dicts).
  • Edge cases: If file is empty, generator yields nothing. If CSV is malformed, csv may raise csv.Error — catch in calling code.
Why use this? A generator avoids loading whole file into memory and gives you control per chunk.
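The edge-case note above suggests catching csv.Error in the caller. Here is a minimal usage sketch, assuming a file at the hypothetical path data/input.csv:

import csv

try:
    for chunk in read_csv_in_chunks('data/input.csv', chunk_size=500):
        # Each chunk is a list of row dicts, e.g. {'id': '1', 'value': '42'}
        print(f'Read {len(chunk)} rows')
except csv.Error as exc:
    print(f'Malformed CSV: {exc}')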

2) CPU-bound Row Processing Function (simulate work)

We'll write a CPU-bound function (e.g., heavy math or string processing). For demo, we'll use a prime-related computation to simulate CPU work.

Code:

import math

def heavy_computation(x: int) -> int:
    """Simulate CPU-bound work: count primes <= x (naive)."""
    def is_prime(n: int) -> bool:
        if n <= 1:
            return False
        if n <= 3:
            return True
        if n % 2 == 0:
            return False
        r = int(math.sqrt(n))
        for i in range(3, r + 1, 2):
            if n % i == 0:
                return False
        return True

    count = 0
    for n in range(2, x + 1):
        if is_prime(n):
            count += 1
    return count

Explanation:

  • The heavy_computation function counts primes up to x using a naive test, making it intentionally CPU-heavy.
  • Input: integer x. Output: count of primes <= x.
  • Edge case: small x returns 0 or correct small counts.
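A quick sanity check (there are four primes less than or equal to 10: 2, 3, 5, and 7):

assert heavy_computation(10) == 4
assert heavy_computation(1) == 0  # no primes <= 1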

3) Processing a Chunk (single-threaded) and Batch Output

We'll map the heavy function on each row and write outputs in batches.

Code:

from typing import List, Dict
import json

def process_chunk(chunk: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """
    Process a chunk of rows and return results as a list of dicts.
    Expects row['value'] to be parseable as int.
    """
    results = []
    for row in chunk:
        try:
            x = int(row['value'])
        except (KeyError, ValueError):
            # Skip malformed or missing values; in production, log this
            continue
        result = heavy_computation(x)
        results.append({'id': row.get('id'), 'value': x, 'primes_up_to_value': result})
    return results

def append_results_to_jsonl(path: str, results: List[Dict[str, str]]):
    """Append results as JSON Lines for easy streaming consumption."""
    with open(path, 'a', encoding='utf-8') as fh:
        for r in results:
            fh.write(json.dumps(r, ensure_ascii=False) + '\n')

Explanation:

  • process_chunk converts 'value' to int, runs heavy computation, and collects outputs.
  • We handle malformed rows by catching KeyError/ValueError and skipping them (they could be logged instead; see the sketch after these notes).
  • append_results_to_jsonl appends JSON Lines (one JSON per line) to a file. JSONL is convenient for downstream processing.
Edge cases:
  • If results is empty, the function simply opens and closes the file, which is cheap. You might skip the write entirely when it is empty.
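If you prefer logging skipped rows instead of silently dropping them, here is a small variation as a sketch; the logger name is an arbitrary choice:

import logging

logger = logging.getLogger('batch')

def process_chunk_logged(chunk: List[Dict[str, str]]) -> List[Dict[str, str]]:
    # Same as process_chunk, but records malformed rows instead of silently skipping them.
    results = []
    for row in chunk:
        try:
            x = int(row['value'])
        except (KeyError, ValueError):
            logger.warning('Skipping malformed row: %r', row)
            continue
        results.append({'id': row.get('id'), 'value': x,
                        'primes_up_to_value': heavy_computation(x)})
    return results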

4) Custom Context Manager for Output (resource management)

Using the with statement makes resource handling explicit. We'll create a custom context manager that opens the output file and ensures atomic writes and cleanup, a practical use case for enhanced resource management with custom context managers.

Code:

import os
from contextlib import AbstractContextManager
from typing import Optional, TextIO

class AtomicAppendFile(AbstractContextManager):
    """
    Context manager that writes to a temporary file and merges its contents
    into the target on a clean exit, so partial writes are never visible.
    Good for short batch writes to shared storage.
    """
    def __init__(self, target_path: str, mode: str = 'a', encoding: str = 'utf-8'):
        self.target_path = target_path
        self.temp_path = f"{target_path}.tmp-{os.getpid()}"
        self.mode = mode
        self.encoding = encoding
        self._fh: Optional[TextIO] = None

    def __enter__(self) -> TextIO:
        # Open a temporary file for writing; its contents are appended to the
        # target on a clean exit.
        self._fh = open(self.temp_path, 'w', encoding=self.encoding)
        return self._fh

    def __exit__(self, exc_type, exc, tb):
        try:
            self._fh.flush()
            os.fsync(self._fh.fileno())
        except Exception:
            pass
        finally:
            self._fh.close()
        if exc_type is None:
            # No exception: append temp contents to the target, then remove the temp file
            with open(self.target_path, 'a', encoding=self.encoding) as target_fh, \
                 open(self.temp_path, 'r', encoding=self.encoding) as tmp_fh:
                for line in tmp_fh:
                    target_fh.write(line)
            os.remove(self.temp_path)
        else:
            # On exception, remove the temp file to avoid leaving garbage
            try:
                os.remove(self.temp_path)
            except Exception:
                pass
        # Return False to propagate exceptions, if any
        return False

Explanation:

  • AtomicAppendFile is a custom context manager subclassing AbstractContextManager. It implements __enter__ and __exit__.
  • On enter: opens a temporary file (unique by PID).
  • On exit: flushes and syncs, closes temp. If no exception occurred, appends temp contents to target file and removes temp. If exception occurred, temp file is removed.
  • Why use this? To avoid leaving partially written output on failure and provide cleaner atomicity for batch writes.
  • Edge cases: This simple strategy is fine for single-process writers. For multiple concurrent writers you might use file locks or specialized append mechanisms.
Related reading: See Python's official contextlib docs for patterns: https://docs.python.org/3/library/contextlib.html
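If a full class feels heavy, the same idea can be written with contextlib.contextmanager as a generator-based context manager. A rough sketch under the same single-writer assumption:

import os
from contextlib import contextmanager

@contextmanager
def atomic_append(target_path: str, encoding: str = 'utf-8'):
    # Write to a temp file; merge it into the target only if the with-block succeeds.
    temp_path = f"{target_path}.tmp-{os.getpid()}"
    fh = open(temp_path, 'w', encoding=encoding)
    try:
        yield fh
        fh.close()
        with open(target_path, 'a', encoding=encoding) as target_fh, \
             open(temp_path, 'r', encoding=encoding) as tmp_fh:
            for line in tmp_fh:
                target_fh.write(line)
    except Exception:
        fh.close()
        raise
    finally:
        # Remove the temp file whether the block succeeded or failed.
        if os.path.exists(temp_path):
            os.remove(temp_path)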

5) Putting it Together: Single-threaded Batch Runner

Now combine reader, processor, and context manager into a runner.

Code:

def run_batch(input_csv: str, output_jsonl: str, chunk_size: int = 1000):
    for chunk in read_csv_in_chunks(input_csv, chunk_size=chunk_size):
        results = process_chunk(chunk)
        if not results:
            continue
        with AtomicAppendFile(output_jsonl) as fh:
            for r in results:
                fh.write(json.dumps(r, ensure_ascii=False) + '\n')

Explanation:

  • For each chunk, process and write results using AtomicAppendFile.
  • If results is empty, skip writing (this avoids unnecessary file operations).
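For completeness, a minimal entry point for running this as a script (the file names are placeholders):

if __name__ == '__main__':
    run_batch('input.csv', 'output.jsonl', chunk_size=1000)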
This single-threaded approach is memory-efficient and reliable. But what if heavy_computation becomes the bottleneck? Enter parallelism.

6) Leveraging Multiprocessing for CPU-Bound Tasks

If the per-row processing is CPU-bound (like our prime counting), Python's multiprocessing module can leverage multiple cores, a practical approach to optimizing CPU-bound tasks.

We’ll create a worker pool per chunk and map across rows.

Code:

from multiprocessing import Pool, cpu_count

def process_row_safe(row: Dict[str, str]):
    try:
        x = int(row['value'])
    except (KeyError, ValueError):
        return None
    return {'id': row.get('id'), 'value': x, 'primes_up_to_value': heavy_computation(x)}

def process_chunk_parallel(chunk, processes=None):
    processes = processes or max(1, cpu_count() - 1)
    with Pool(processes=processes) as pool:
        results = pool.map(process_row_safe, chunk)
    # Filter out None results
    return [r for r in results if r is not None]

Explanation:

  • process_row_safe wraps row processing and handles conversion errors, returning None for malformed rows.
  • process_chunk_parallel creates a pool and maps the rows in parallel. We use cpu_count() - 1 to leave one core free (a common practice).
  • Edge cases: Multiprocessing pickles arguments — ensure functions are defined at top level (they are). Avoid sending very large objects to workers regularly; prefer sending small rows.
Performance considerations:
  • For very small chunk sizes, process creation/communication overhead can dominate. Increase chunk size or use persistent worker processes.
  • You can benchmark with time to find optimal chunk size and number of processes.
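One simple way to find good settings is to time a fixed input with time.perf_counter. A crude benchmarking sketch (the parameter grids are arbitrary examples):

import time

def benchmark(input_csv: str,
              chunk_sizes=(500, 1000, 5000),
              process_counts=(1, 2, 4)):
    # Compare chunk sizes and pool sizes on the same input and print elapsed times.
    for chunk_size in chunk_sizes:
        for processes in process_counts:
            start = time.perf_counter()
            for chunk in read_csv_in_chunks(input_csv, chunk_size=chunk_size):
                process_chunk_parallel(chunk, processes=processes)
            elapsed = time.perf_counter() - start
            print(f'chunk_size={chunk_size} processes={processes}: {elapsed:.2f}s')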

7) End-to-end Runner with Parallelism and Checkpointing

Add checkpointing so the job can resume if interrupted. A simple approach: write processed output to JSONL and keep track of last processed input offset (e.g., number of rows processed).

Code:

import shelve

def run_batch_with_checkpoint(input_csv: str, output_jsonl: str, checkpoint_db: str,
                              chunk_size: int = 1000, processes: int = None):
    # The checkpoint stores 'rows_processed'
    with shelve.open(checkpoint_db) as db:
        rows_processed = db.get('rows_processed', 0)

    current = 0
    for chunk in read_csv_in_chunks(input_csv, chunk_size=chunk_size):
        # Skip chunks we've already processed
        if current + len(chunk) <= rows_processed:
            current += len(chunk)
            continue

        # If partially through a chunk, we could slice, but for simplicity
        # require chunks to be processed whole.
        results = process_chunk_parallel(chunk, processes=processes)
        if results:
            with AtomicAppendFile(output_jsonl) as fh:
                for r in results:
                    fh.write(json.dumps(r, ensure_ascii=False) + '\n')

        current += len(chunk)
        # Persist the checkpoint
        with shelve.open(checkpoint_db) as db:
            db['rows_processed'] = current

Explanation:

  • shelve stores a lightweight checkpoint database.
  • Before processing each chunk, we skip chunks already processed.
  • After processing a chunk, we update the checkpoint to the new row count.
  • This allows resuming after failure without reprocessing already-completed chunks.
  • Edge cases: If input file changes (rows inserted/deleted), checkpoint by row count may become invalid. For stable input, it's fine; for dynamic sources, prefer checkpointing by unique IDs or offsets.
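For dynamic sources, here is a sketch of ID-based checkpointing with shelve, assuming each row carries a stable, unique 'id' (note the seen-ID set grows with the input):

import shelve

def load_seen_ids(checkpoint_db: str) -> set:
    # Return the set of row IDs that have already been processed.
    with shelve.open(checkpoint_db) as db:
        return db.get('seen_ids', set())

def mark_processed(checkpoint_db: str, ids) -> None:
    # Record IDs only after their results have been written successfully.
    with shelve.open(checkpoint_db) as db:
        seen = db.get('seen_ids', set())
        seen.update(ids)
        db['seen_ids'] = seen

# Usage inside the chunk loop (sketch):
#   seen = load_seen_ids('checkpoint.db')
#   fresh = [row for row in chunk if row['id'] not in seen]
#   ... process and write fresh ...
#   mark_processed('checkpoint.db', [row['id'] for row in fresh])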

8) Automating Daily Runs (cron and Windows Task Scheduler)

Once your script works, you’ll likely want it to run regularly. Here are simple instructions:

  • Cron (Linux/macOS):
    - Edit your crontab: crontab -e
    - Example entry to run daily at 2:00 AM:
      0 2 * * * /usr/bin/python3 /path/to/your/script.py >> /var/log/batch_job.log 2>&1
    - Ensure environment variables, PATH, and virtualenvs are handled (cron has a minimal environment). You may prefer a wrapper shell script that activates a virtualenv first.
  • Windows Task Scheduler:
    - Open Task Scheduler > Create Basic Task.
    - Give it a name, a daily schedule, and an action: Start a program.
    - Program/script: path to the Python executable (e.g., C:\Python39\python.exe).
    - Add arguments: C:\path\to\script.py
    - Start in: C:\path\to
    - Set the task to run whether the user is logged in or not, and handle log output inside your script.

Whichever scheduler you use, make sure your script writes robust logs and handles being re-run safely (idempotency).
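For scheduled runs, configuring logging inside the script keeps behaviour consistent across cron and Task Scheduler. A minimal sketch using the standard logging module; the file name, size limits, and format are arbitrary choices:

import logging
from logging.handlers import RotatingFileHandler

def setup_logging(log_path: str = 'batch_job.log') -> None:
    # Rotate at roughly 5 MB and keep 3 backups so scheduled runs can't fill the disk.
    handler = RotatingFileHandler(log_path, maxBytes=5_000_000, backupCount=3)
    handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(name)s %(message)s'))
    logging.basicConfig(level=logging.INFO, handlers=[handler])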

Best Practices

  • Use streaming formats (CSV reader, JSONL) and generators to keep memory low.
  • Choose chunk size by benchmarking — too small adds overhead, too large uses memory.
  • For CPU-bound tasks, use multiprocessing. For I/O-bound tasks, concurrent.futures.ThreadPoolExecutor or asyncio may be better.
  • Log extensively (structured logs) for diagnosability. Consider Python's logging module with rotating logs.
  • Use retries/backoff for transient errors (network or DB); a sketch follows this list.
  • Make processing idempotent; use unique IDs to avoid duplicate results after retries.
  • Secure resource handling with with and custom context managers for complex flows.
  • Add early validation of input schema to fail fast on broken sources.
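As promised in the retries/backoff bullet, here is a minimal retry decorator with exponential backoff (a sketch; the delays and exception types are illustrative, and flush_to_remote is a hypothetical function):

import time
import functools

def retry(times: int = 3, base_delay: float = 1.0, exceptions=(OSError,)):
    # Retry the wrapped function with exponential backoff on the given exceptions.
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(times):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == times - 1:
                        raise
                    time.sleep(base_delay * (2 ** attempt))
        return wrapper
    return decorator

@retry(times=3, base_delay=0.5, exceptions=(ConnectionError,))
def flush_to_remote(batch):
    # Hypothetical network call that may fail transiently.
    ...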

Common Pitfalls

  • Forgetting pickle limitations with multiprocessing (functions must be importable at top-level).
  • Underestimating IPC overhead — not every problem benefits from parallelization.
  • Not testing with dataset sizes representative of production — testing on tiny data can hide scaling issues.
  • Using naive checkpointing that breaks if input changes; prefer stable identifiers.
  • Not handling partial writes — use techniques like atomic writes or write-then-rename.

Advanced Tips

  • For very large, random-access binary files, consider memory-mapped I/O (mmap) to reduce copying overhead.
  • When writing to databases, use bulk/batched inserts to minimize round trips (see the sqlite3 sketch after this list).
  • Evaluate specialized tools: Apache Spark, Dask, or cloud-managed batch services, if data grows beyond what a single machine should handle.
  • Use profiling (cProfile, line_profiler) to find real hotspots.
  • If the overhead of pickling is heavy, consider multiprocessing.shared_memory or writing C extensions.
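As an example of batched inserts, here is a sketch using the standard-library sqlite3 module; the table schema is hypothetical:

import sqlite3

def insert_results_batched(db_path: str, results, batch_size: int = 500):
    # executemany sends each batch in one call instead of one INSERT per row.
    conn = sqlite3.connect(db_path)
    try:
        conn.execute(
            'CREATE TABLE IF NOT EXISTS results (id TEXT, value INTEGER, primes INTEGER)')
        rows = [(r['id'], r['value'], r['primes_up_to_value']) for r in results]
        for i in range(0, len(rows), batch_size):
            conn.executemany(
                'INSERT INTO results (id, value, primes) VALUES (?, ?, ?)',
                rows[i:i + batch_size])
            conn.commit()
    finally:
        conn.close()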

Monitoring and Observability

  • Emit metrics: number of rows processed, latency per chunk, errors.
  • Integrate with monitoring (Prometheus, CloudWatch) or at minimum write metrics to a file; a minimal sketch follows this list.
  • Add alerts on failure or when processed rate deviates significantly.
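A minimal per-chunk metrics record written as a JSON line (a sketch; field names are arbitrary):

import json
import time

def emit_metrics(metrics_path: str, rows_processed: int, errors: int, chunk_seconds: float):
    # Append one JSON line per chunk so dashboards or scripts can tail the file.
    record = {
        'timestamp': time.time(),
        'rows_processed': rows_processed,
        'errors': errors,
        'chunk_seconds': round(chunk_seconds, 3),
    }
    with open(metrics_path, 'a', encoding='utf-8') as fh:
        fh.write(json.dumps(record) + '\n')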

Conclusion

Implementing a robust batch processing system in Python involves more than looping over files. Use streaming and chunking to control memory usage, use multiprocessing for CPU-bound tasks, and manage resources with context managers such as custom with-based classes to avoid leaks and inconsistent states. Automate your pipeline with cron or Windows Task Scheduler and make it production-ready with logging, checkpointing, and monitoring.

Try the example code with a realistic dataset, experiment with chunk sizes and number of processes, and incorporate checkpointing to make the system resilient. If you'd like, copy the examples and adapt them to your data source, or ask for a ready-made template for CSV → database pipelines.

Call to action: Clone these snippets into a script, run them on a sample large CSV, and profile the performance. Share your bottlenecks and I'll help optimize them.

Further Reading and References

  • Python contextlib documentation: https://docs.python.org/3/library/contextlib.html
  • Python multiprocessing documentation: https://docs.python.org/3/library/multiprocessing.html

Happy batching — and if you want a downloadable example repo or a template that writes to PostgreSQL with batched inserts, ask and I’ll provide one.
