Building an ETL Pipeline with Python: Techniques for Data Extraction, Transformation, and Loading

October 11, 2025 · 11 min read

Learn how to design and implement a robust ETL pipeline in Python. This guide walks you through extraction from APIs and databases, transformation with pandas, best practices for pagination, caching with functools, advanced f-string usage, and reliable loading into a database — complete with production-ready patterns and code examples.

Introduction

Extract-Transform-Load (ETL) pipelines are the backbone of data engineering. They move data from sources (APIs, databases, files) into a destination (data warehouse, analytics DB, S3), while applying cleaning and enrichment along the way. In this post you'll learn practical strategies and production-ready Python techniques to build ETL pipelines that are reliable, maintainable, and performant.

We'll cover:

  • Key concepts and prerequisites
  • A step-by-step ETL example: extracting from a paginated API, transforming data with pandas, and loading into PostgreSQL
  • Best practices: batching, retries, idempotency, logging
  • Advanced Python tips: using f-strings, functools caching and partials
  • Common pitfalls and how to avoid them
Whether you're preparing data for analytics, machine learning, or internal dashboards, this guide gives you patterns you can reuse and adapt.

Prerequisites

Before starting, make sure you have a working Python 3.x environment and these libraries (install via pip if needed):

  • requests
  • pandas
  • sqlalchemy (and a DB driver like psycopg2 for PostgreSQL)
  • tqdm (optional, for progress bars)
Basic familiarity with:
  • HTTP APIs and pagination
  • SQL and basic database operations
  • pandas DataFrame operations

Core Concepts: What Makes a Good ETL Pipeline?

Think of an ETL pipeline as a series of small, testable functions wired together. Key principles:

  • Idempotency: Re-running the pipeline should not create duplicates or corrupt state.
  • Fault tolerance: Handle transient errors with retries and backoff.
  • Observability: Clear logging and metrics to know what's happening.
  • Efficiency: Batch network and DB operations, process data in streaming/chunks if large.
  • Modularity: Separate extraction, transformation, and loading steps so they’re easy to test.
The high-level flow, in words:
  • Source(s) -> Extractor(s) -> Transformer(s) -> Loader(s) -> Target (DB or files)
  • Alongside: logging, retries, and monitoring.

Step-by-Step Example: ETL from Paginated API to PostgreSQL

We'll implement an ETL that:

  1. Extracts data from a paginated REST API
  2. Transforms and cleans it with pandas
  3. Loads it into PostgreSQL in batches
This example demonstrates pagination best practices and uses functools and f-strings where they add value.

1) Extraction: Handling Pagination Robustly

Many APIs return results in pages. Implementing pagination correctly avoids missed data, rate-limit surprises, and excessive memory usage.

Key pagination best practices:

  • Use server-provided pagination tokens or next links if available.
  • Request only required fields (reduce payload).
  • Implement exponential backoff on 429/5xx responses.
  • Use streaming or batching to avoid high memory use.
Example extractor for a hypothetical API that returns JSON with next URL:

import requests
from time import sleep
from typing import Iterator, Dict

def fetch_paginated(url: str, params: dict = None, headers: dict = None, max_retries: int = 5) -> Iterator[Dict]:
    """
    Generator that yields items from a paginated API that returns JSON
    with 'results' and 'next'. Implements simple exponential backoff on
    transient errors.
    """
    params = params or {}
    headers = headers or {}
    attempt = 0
    while url:
        try:
            resp = requests.get(url, params=params, headers=headers, timeout=10)
            if resp.status_code == 429:  # Too Many Requests: rate limited
                attempt += 1
                sleep_time = min(60, 2 ** attempt)
                sleep(sleep_time)
                continue
            resp.raise_for_status()
            data = resp.json()
            for item in data.get("results", []):
                yield item
            url = data.get("next")  # API provides next page URL or None
            params = {}  # subsequent pages usually encode params in the next URL
            attempt = 0  # reset retries after a successful page
        except requests.RequestException:
            attempt += 1
            if attempt > max_retries:
                raise
            sleep(min(60, 2 ** attempt))

Explanation (line-by-line):

  • import requests, sleep, typing Iterator/Dict: dependencies.
  • fetch_paginated signature: returns a generator of dicts.
  • Initialize params/headers and retry counter.
  • Loop while url exists: perform GET request with timeout.
  • On 429 (rate limiting): back off and retry.
  • raise_for_status() raises for other HTTP errors (4xx/5xx); on success we parse the JSON body.
  • Yield each item from results.
  • Obtain the next page link from next.
  • On transient RequestException, retry with exponential backoff up to max_retries.
Edge cases:
  • If the API uses page numbers instead of next, you can construct the next URL yourself (see the sketch after this list).
  • If the API returns streaming JSON lines (NDJSON), use resp.iter_lines().
Note: This extractor is memory-efficient because it yields items; we avoid loading all pages at once.
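
For the page-number case, here is a minimal sketch. It assumes the API accepts page and per_page query parameters and signals the last page with an empty results list (both assumptions; adapt them to your API). Retry and backoff are omitted for brevity; the pattern from fetch_paginated applies unchanged.

import requests
from typing import Dict, Iterator

def fetch_by_page_number(base_url: str, per_page: int = 100, headers: dict = None) -> Iterator[Dict]:
    """Yield items from an API paginated via ?page=N&per_page=M (hypothetical parameters)."""
    headers = headers or {}
    page = 1
    while True:
        resp = requests.get(
            base_url,
            params={"page": page, "per_page": per_page},
            headers=headers,
            timeout=10,
        )
        resp.raise_for_status()
        results = resp.json().get("results", [])
        if not results:  # an empty page means we are past the last page
            break
        yield from results
        page += 1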

2) Transformation: Cleaning with pandas

Once extracted, transform the data. For medium-sized datasets (fits in memory), pandas is a great tool. For larger datasets, do chunked processing.

Example transform pipeline:

  • Normalize nested JSON
  • Convert timestamps
  • Deduplicate and validate required fields
import pandas as pd
from datetime import datetime

def transform_records(records):
    """
    Transform a list of dict records into a cleaned pandas DataFrame.
    """
    df = pd.json_normalize(records)  # flatten nested JSON

    # Standardize column names (example)
    df.columns = [c.replace(" ", "_").lower() for c in df.columns]

    # Parse timestamps robustly
    df['created_at'] = pd.to_datetime(df['created_at'], errors='coerce', utc=True)

    # Drop rows missing required columns
    df = df.dropna(subset=['id', 'created_at'])

    # Remove duplicates by id, keeping the latest
    df = df.sort_values('created_at').drop_duplicates(subset='id', keep='last')

    # Derived column using an f-string, handy for logging or output
    df['summary'] = df.apply(
        lambda row: f"id={row['id']} created={row['created_at'].isoformat()}",
        axis=1
    )
    return df

Explanation:

  • pd.json_normalize flattens nested JSON into columns.
  • Normalize column names for consistency.
  • pd.to_datetime(..., errors='coerce') converts timestamps, turning invalid ones into NaT.
  • dropna removes rows missing required fields (enforces schema).
  • Deduplicate based on id.
  • Use an f-string in summary to create a readable summary per row — this demonstrates f-strings for dynamic string formatting (fast and readable). See Python docs for f-strings for more tips.
Edge cases:
  • If the dataset is huge, consider processing in batches: collect N items, convert to DataFrame, transform, and load, then repeat.

3) Loading: Batch Insert into PostgreSQL

Efficient loading means batching and using transactions. You can use SQLAlchemy to handle connections and DataFrame.to_sql for convenience or use psycopg2.extras.execute_values for speed.

Example using SQLAlchemy + to_sql (simpler, good for moderate volumes):

from sqlalchemy import create_engine
from sqlalchemy.engine import Engine

def get_engine(user, password, host, db, port=5432) -> Engine:
    url = f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{db}"
    return create_engine(url, pool_size=5, max_overflow=10)

def load_to_postgres(df: pd.DataFrame, engine: Engine, table_name: str, if_exists='append', chunksize: int = 1000):
    """
    Load a DataFrame into PostgreSQL. method='multi' batches many rows per
    INSERT statement and chunksize controls how many rows go in each batch.
    """
    df.to_sql(table_name, con=engine, if_exists=if_exists, index=False,
              method='multi', chunksize=chunksize)

Line-by-line:

  • create_engine: configures the DB connection; the f-string builds the DSN (in real systems, load credentials from environment variables or a secrets manager rather than hardcoding them).
  • to_sql with method='multi' instructs pandas to insert multiple rows per statement; chunksize controls batch size.
For high-volume inserts, use psycopg2's execute_values:

import psycopg2
from psycopg2.extras import execute_values

def load_using_execute_values(conn, df: pd.DataFrame, table_name: str):
    tuples = [tuple(x) for x in df.to_numpy()]
    cols = ','.join(df.columns)
    # table/column names come from trusted config, never from user input;
    # replace '...' with explicit column assignments for your schema
    query = f"INSERT INTO {table_name} ({cols}) VALUES %s ON CONFLICT (id) DO UPDATE SET ...;"
    with conn.cursor() as cur:
        execute_values(cur, query, tuples, page_size=1000)
    conn.commit()

Notes:

  • Use ON CONFLICT (Postgres) for idempotent upserts (a filled-in example follows below).
  • Avoid raw string formatting for queries with user input to prevent SQL injection — use parameters or ORM.
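
To make the elided DO UPDATE SET clause concrete, here is a hedged sketch for a hypothetical events(id, created_at, status) table; the column names are illustrative, and the conflict target must match a primary key or unique constraint in your schema.

from psycopg2.extras import execute_values

def upsert_events(conn, rows):
    """Idempotent batch upsert into a hypothetical events(id, created_at, status) table."""
    query = """
        INSERT INTO events (id, created_at, status)
        VALUES %s
        ON CONFLICT (id) DO UPDATE
        SET created_at = EXCLUDED.created_at,
            status     = EXCLUDED.status;
    """
    with conn.cursor() as cur:
        execute_values(cur, query, rows, page_size=1000)
    conn.commit()

Re-running the same batch now updates existing rows instead of duplicating them, which is exactly the idempotency property discussed earlier.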

Putting It Together: Full ETL Flow

High-level flow:

  1. Use fetch_paginated to stream items in batches (e.g., collect 1000 items).
  2. For each batch, call transform_records.
  3. Load transformed DataFrame to DB using batch insert/upsert.
Example orchestration:

from itertools import islice
from tqdm import tqdm  # progress bar optional

def batched_iterator(iterator, batch_size):
    """Yield lists of items up to batch_size from an iterator."""
    it = iter(iterator)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            break
        yield batch

def run_etl(api_url, db_engine, table, batch_size=1000):
    items = fetch_paginated(api_url)
    for batch in batched_iterator(items, batch_size):
        df = transform_records(batch)
        if not df.empty:
            load_to_postgres(df, db_engine, table)

This architecture keeps memory bounded and enables checkpointing and retries per batch.

Advanced Python Optimizations

  • Caching with functools.lru_cache: If your pipeline enriches records with repeated lookups (e.g., fetching metadata for an ID), functools.lru_cache can avoid redundant network calls:
from functools import lru_cache

@lru_cache(maxsize=1024)
def fetch_metadata(id):
    # expensive network call
    ...

Remember lru_cache works with hashable arguments and lives in memory — monitor cache size.
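
One concrete way to monitor it: every lru_cache-wrapped function exposes cache_info() and cache_clear(), so you can log hit rates per run and reset the cache when the upstream data may have changed.

stats = fetch_metadata.cache_info()   # e.g. CacheInfo(hits=940, misses=84, maxsize=1024, currsize=84)
print(f"metadata cache: {stats.hits} hits, {stats.misses} misses, {stats.currsize} entries")
fetch_metadata.cache_clear()          # reset between runs if the source data can change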

  • Partial functions with functools.partial: Useful to create specialized functions with preset arguments when wiring pipeline components:
from functools import partial

load_small_batch = partial(load_to_postgres, chunksize=500, if_exists='append')

  • Use generators and streaming transforms to process data without loading whole datasets into memory.
  • Use multiprocessing for CPU-bound transforms, or concurrent.futures for I/O-bound enrichments; be mindful of DB locks and API rate limits. A short sketch follows this list.
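
A minimal sketch of the I/O-bound case with concurrent.futures: enrich_fn is any per-record lookup (for example the cached fetch_metadata above), and the worker count is an assumption you should tune against your API's rate limits.

from concurrent.futures import ThreadPoolExecutor

def enrich_batch(records, enrich_fn, max_workers: int = 8):
    """Apply an I/O-bound enrichment function to a batch of records in parallel threads."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(enrich_fn, records))

# Usage: enriched = enrich_batch(batch, fetch_metadata)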

Best Practices

  • Log at key points: batch start/end, records processed, errors.
  • Make operations idempotent using upserts or watermarking (track the last processed timestamp; see the sketch after this list).
  • Use configuration for credentials and environment variables (avoid hardcoding).
  • Validate schemas early (e.g., using pandera or simple asserts).
  • Use retry libraries (tenacity) for robust retry logic with jitter/backoff.
  • Monitor performance: measure throughput (records/sec), DB latency, and API response times.
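
As a sketch of the watermarking idea, assuming the target table has the created_at column from the transform step and the source API accepts a hypothetical created_after filter:

from sqlalchemy import text

def get_watermark(engine, table_name: str):
    """Return the newest created_at already loaded, or None on the first run."""
    # table_name comes from configuration, not user input
    with engine.connect() as conn:
        return conn.execute(text(f"SELECT MAX(created_at) FROM {table_name}")).scalar()

# Usage: only ask the source for records newer than the watermark
# watermark = get_watermark(engine, "events")
# params = {"created_after": watermark.isoformat()} if watermark else {}
# for item in fetch_paginated(api_url, params=params): ...

Combined with upserts, a crashed run can simply be restarted: already-loaded rows are updated in place and extraction resumes from the watermark.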

Implementing Pagination in Python Web Applications: Best Practices (Contextual)

When your pipeline consumes or exposes paginated endpoints (e.g., REST APIs for dashboards), follow these practices:

  • Prefer cursor-based pagination over offset-based for stable and performant paging on large datasets (sketched after this section).
  • Return a next link or token instead of page numbers; it's easier for clients to follow.
  • Support limit and fields parameters so clients request only what they need.
  • Document rate limits and provide clear HTTP status codes (429).
  • For web apps, combine server-side pagination with client-side lazy loading or infinite scroll depending on UX; remember to support accessible alternatives.
These API design and client-handling practices directly affect ETL extraction reliability.
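
To illustrate the cursor-based approach, here is a hedged Flask sketch that pages over a hypothetical events table with an increasing integer id; the table, column names, and DSN are assumptions, and a production endpoint would add authentication and rate limiting.

from flask import Flask, jsonify, request, url_for
from sqlalchemy import create_engine, text

app = Flask(__name__)
engine = create_engine("postgresql+psycopg2://user:pass@host/db")  # placeholder DSN

@app.route("/events")
def list_events():
    # The cursor is the last id the client saw; unlike OFFSET, it stays stable
    # while new rows are being inserted.
    cursor = request.args.get("cursor", default=0, type=int)
    limit = min(request.args.get("limit", default=100, type=int), 1000)
    with engine.connect() as conn:
        rows = conn.execute(
            text("SELECT id, created_at FROM events WHERE id > :cursor ORDER BY id LIMIT :limit"),
            {"cursor": cursor, "limit": limit},
        ).mappings().all()
    next_url = url_for("list_events", cursor=rows[-1]["id"], limit=limit) if len(rows) == limit else None
    return jsonify({"results": [dict(r) for r in rows], "next": next_url})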

Common Pitfalls and How to Avoid Them

  • Schema drift: Sources change column names/types. Use schema checks and fail fast, or build flexible parsers that map new fields.
  • Duplicate records: Use primary keys and idempotent upserts with ON CONFLICT.
  • Memory exhaustion: Process in batches or streaming; prefer generators.
  • Hidden data loss: Dropping rows with dropna can silently remove important records; log counts and sample the dropped rows for inspection (see the snippet after this list).
  • Hard-to-debug failures: Add rich logging and sample payloads for failed batches.
  • Blocking operations: Avoid long synchronous calls in loops; use async requests or thread pools for parallel I/O.
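
A small sketch of that idea, mirroring the required columns used in transform_records above:

import logging

logger = logging.getLogger("etl")

def drop_invalid_rows(df, required=("id", "created_at")):
    """dropna with visibility: log how many rows are removed and show a small sample."""
    invalid = df[df[list(required)].isna().any(axis=1)]
    if not invalid.empty:
        logger.warning(
            "Dropping %d of %d rows missing %s; sample:\n%s",
            len(invalid), len(df), list(required), invalid.head(3),
        )
    return df.dropna(subset=list(required))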

Advanced Tips and Tools

  • Orchestration: Use Airflow, Prefect, or Dagster for scheduling, retries, and lineage.
  • Data validation: Use pandera, pydantic, or Great Expectations to validate intermediate DataFrames.
  • Caching: Use functools.lru_cache for CPU or network-bound repeated calls. For distributed caching use Redis.
  • Use psycopg2.extras.execute_values or COPY FROM for the highest throughput into Postgres (COPY sketch after this list).
  • For large scale, store transformed data as Parquet on S3 and use SQL engines (Snowflake, Redshift, BigQuery) for analytics.
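
A sketch of the COPY path with psycopg2's copy_expert, assuming the DataFrame's columns match the target table and that upserts are not needed (COPY appends; it does not resolve conflicts):

import io

def copy_dataframe(conn, df, table_name: str):
    """Bulk-load a DataFrame into Postgres via COPY, typically the fastest path."""
    buf = io.StringIO()
    df.to_csv(buf, index=False, header=False)
    buf.seek(0)
    cols = ",".join(df.columns)
    with conn.cursor() as cur:
        cur.copy_expert(f"COPY {table_name} ({cols}) FROM STDIN WITH (FORMAT csv)", buf)
    conn.commit()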

Example: Using functools for caching & partials (detailed)

from functools import lru_cache, partial
import requests

@lru_cache(maxsize=512)
def get_country_name(country_code: str) -> str:
    resp = requests.get(f"https://restcountries.com/v3.1/alpha/{country_code}")
    resp.raise_for_status()
    return resp.json()[0]['name']['common']

def enrich_df_with_country(df):
    df['country_name'] = df['country_code'].map(get_country_name)
    return df

Example of using partial to preconfigure the loader:

from sqlalchemy import create_engine

engine = create_engine("postgresql+psycopg2://user:pass@host/db")
configured_loader = partial(load_to_postgres, engine=engine, table_name="events")

Notes:

  • get_country_name uses lru_cache to avoid repeated network calls for the same country code.
  • partial creates a preconfigured loader to simplify orchestration code.

Mastering Python's f-strings: Tips and Advanced Usage for Dynamic String Formatting

f-strings are concise and powerful:

  • Basic: f"{value}"
  • Format specifiers: f"{value:.2f}" for floats, f"{date:%Y-%m-%d}" for dates
  • Expressions allowed: f"{x + y}"
  • Be careful with SQL: never interpolate user input into queries via f-strings; bind parameters instead (contrast shown after the example below).
Example f-string with datetime formatting:

from datetime import datetime, timezone
now = datetime.now(timezone.utc)  # timezone-aware UTC timestamp
message = f"ETL run started at {now:%Y-%m-%d %H:%M:%S} UTC"
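
To make the SQL caveat above concrete, here is a short contrast using a hypothetical events table and a value that came from user input:

from sqlalchemy import create_engine, text

engine = create_engine("postgresql+psycopg2://user:pass@host/db")  # placeholder DSN
user_status = "active"  # imagine this arrived from a request or CLI argument

# Unsafe: interpolating the value with an f-string invites SQL injection
# query = f"SELECT id FROM events WHERE status = '{user_status}'"

# Safe: bind the value as a parameter and let the driver handle quoting
with engine.connect() as conn:
    rows = conn.execute(
        text("SELECT id FROM events WHERE status = :status"),
        {"status": user_status},
    ).all()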

Conclusion

Building a reliable ETL pipeline in Python is largely about composing small, well-tested components: robust extraction (including pagination handling), clear and efficient transformations, and safe, batched loading. Use Python features like f-strings for readable formatting, functools for caching and partial functions to simplify wiring, and follow best practices for batching, retries, and idempotency.

Call to action: Try the code snippets above with a small paginated API or mock service. Experiment with batching sizes and observe performance. If you need, adapt the loaders to your target DB or data lake.

Next Steps

Natural extensions of this pipeline include:
  • A Dockerized example project
  • An Airflow DAG implementing this ETL
  • A version that writes to S3/Parquet instead of PostgreSQL
Happy coding — go build something reliable!
