
Building an ETL Pipeline with Python: Techniques for Data Extraction, Transformation, and Loading
Learn how to design and implement a robust ETL pipeline in Python. This guide walks you through extraction from APIs and databases, transformation with pandas, best practices for pagination, caching with functools, advanced f-string usage, and reliable loading into a database — complete with production-ready patterns and code examples.
Introduction
Extract-Transform-Load (ETL) pipelines are the backbone of data engineering. They move data from sources (APIs, databases, files) into a destination (data warehouse, analytics DB, S3), while applying cleaning and enrichment along the way. In this post you'll learn practical strategies and production-ready Python techniques to build ETL pipelines that are reliable, maintainable, and performant.
We'll cover:
- Key concepts and prerequisites
- A step-by-step ETL example: extracting from a paginated API, transforming data with pandas, and loading into PostgreSQL
- Best practices: batching, retries, idempotency, logging
- Advanced Python tips: using f-strings, functools caching, and partials
- Common pitfalls and how to avoid them
Prerequisites
Before starting, make sure you have a working Python 3.x environment and these libraries (install via pip if needed):
- requests
- pandas
- sqlalchemy (and a DB driver like psycopg2 for PostgreSQL)
- tqdm (optional, for progress bars)
You should also be familiar with:
- HTTP APIs and pagination
- SQL and basic database operations
- pandas DataFrame operations
Core Concepts: What Makes a Good ETL Pipeline?
Think of an ETL pipeline as a series of small, testable functions wired together. Key principles:
- Idempotency: Re-running the pipeline should not create duplicates or corrupt state.
- Fault tolerance: Handle transient errors with retries and backoff.
- Observability: Clear logging and metrics to know what's happening.
- Efficiency: Batch network and DB operations, process data in streaming/chunks if large.
- Modularity: Separate extraction, transformation, and loading steps so they’re easy to test.
A typical pipeline architecture looks like this:
- Source(s) -> Extractor(s) -> Transformer(s) -> Loader(s) -> Target (DB or files)
- Alongside: logging, retries, and monitoring
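Observability deserves a little code of its own. Below is a minimal sketch of a logging setup for a pipeline run, using only the standard library; the logger name and the fields in the log message are illustrative assumptions, not part of the example pipeline that follows.

import logging

# Configure logging once, at pipeline startup.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
logger = logging.getLogger("etl")

def log_batch(batch_number: int, record_count: int) -> None:
    """Emit one line per batch so throughput and failures are easy to trace."""
    logger.info(f"batch={batch_number} records={record_count}")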
Step-by-Step Example: ETL from Paginated API to PostgreSQL
We'll implement an ETL that:
- Extracts data from a paginated REST API
- Transforms and cleans it with pandas
- Loads it into PostgreSQL in batches
Along the way, we'll use functools and f-strings where they add value.
1) Extraction: Handling Pagination Robustly
Many APIs return results in pages. Implementing pagination correctly avoids missed data, rate-limit surprises, and excessive memory usage.
Key pagination best practices:
- Use server-provided pagination tokens or next links if available.
- Request only required fields (reduce payload).
- Implement exponential backoff on 429/5xx responses.
- Use streaming or batching to avoid high memory use.
Here's a generator that follows the API's next URL:
import requests
from time import sleep
from typing import Dict, Iterator, Optional

def fetch_paginated(url: str, params: Optional[dict] = None, headers: Optional[dict] = None, max_retries: int = 5) -> Iterator[Dict]:
    """
    Generator that yields items from a paginated API that returns JSON with 'results' and 'next'.
    Implements simple exponential backoff on transient errors.
    """
    params = params or {}
    headers = headers or {}
    attempt = 0
    while url:
        try:
            resp = requests.get(url, params=params, headers=headers, timeout=10)
            if resp.status_code == 429:  # Too Many Requests: rate limit
                attempt += 1
                sleep_time = min(60, 2 ** attempt)
                sleep(sleep_time)
                continue
            resp.raise_for_status()
            data = resp.json()
            for item in data.get("results", []):
                yield item
            url = data.get("next")  # API provides next page URL or None
            params = {}  # subsequent pages often encode params in the next URL
            attempt = 0  # reset retries after success
        except requests.RequestException as exc:
            attempt += 1
            if attempt > max_retries:
                raise
            sleep(min(60, 2 ** attempt))
Explanation (line-by-line):
- import requests, sleep, and typing's Iterator/Dict: dependencies.
- fetch_paginated signature: returns a generator of dicts.
- Initialize params/headers and the retry counter.
- Loop while url exists: perform a GET request with a timeout.
- On 429 (rate limiting): back off and retry.
- raise_for_status() raises for HTTP errors (4xx, 5xx); then parse the JSON.
- Yield each item from results.
- Obtain the next page link from next.
- On a transient RequestException, retry with exponential backoff up to max_retries.
- If the API uses page numbers instead of next, you can construct the next URL yourself (see the sketch after this list).
- If the API returns streaming JSON lines (NDJSON), use resp.iter_lines().
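For APIs that paginate by page number rather than a next link, a minimal sketch could look like the following; the page and per_page parameter names and the empty-page stop condition are assumptions about a hypothetical API, so adapt them to the service you're calling.

import requests
from typing import Dict, Iterator

def fetch_by_page_number(base_url: str, per_page: int = 100) -> Iterator[Dict]:
    """Yield items from a hypothetical API paginated with ?page=N&per_page=M."""
    page = 1
    while True:
        resp = requests.get(base_url, params={"page": page, "per_page": per_page}, timeout=10)
        resp.raise_for_status()
        items = resp.json().get("results", [])
        if not items:
            break  # an empty page signals the end of the data set
        yield from items
        page += 1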
2) Transformation: Cleaning with pandas
Once extracted, transform the data. For medium-sized datasets (fits in memory), pandas is a great tool. For larger datasets, do chunked processing.
Example transform pipeline:
- Normalize nested JSON
- Convert timestamps
- Deduplicate and validate required fields
import pandas as pd

def transform_records(records):
    """
    Transform a list of dict records into a cleaned pandas DataFrame.
    """
    df = pd.json_normalize(records)  # flatten nested JSON
    # Standardize column names (example)
    df.columns = [c.replace(" ", "_").lower() for c in df.columns]
    # Parse timestamps robustly
    df['created_at'] = pd.to_datetime(df['created_at'], errors='coerce', utc=True)
    # Drop rows missing required columns
    df = df.dropna(subset=['id', 'created_at'])
    # Remove duplicates by id, keeping the latest
    df = df.sort_values('created_at').drop_duplicates(subset='id', keep='last')
    # Derived column using an f-string, handy for logging or output
    df['summary'] = df.apply(
        lambda row: f"id={row['id']} created={row['created_at'].isoformat()}", axis=1
    )
    return df
Explanation:
- pd.json_normalize flattens nested JSON into columns (see the short example after this list).
- Normalize column names for consistency.
- pd.to_datetime(..., errors='coerce') converts timestamps, turning invalid ones into NaT.
- dropna removes rows missing required fields (enforces the schema).
- Deduplicate based on id.
- The f-string in summary creates a readable per-row summary; f-strings are a fast, readable way to format strings dynamically. See the Python docs on f-strings for more tips.
- If the dataset is huge, consider processing in batches: collect N items, convert to a DataFrame, transform, load, then repeat.
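To make the flattening step concrete, here is a tiny sketch with made-up sample records; the field names are purely illustrative.

import pandas as pd

# Hypothetical nested records, as a paginated API might return them.
records = [
    {"id": 1, "created_at": "2024-01-02T03:04:05Z", "user": {"name": "Ada", "country_code": "GB"}},
    {"id": 2, "created_at": "2024-01-03T04:05:06Z", "user": {"name": "Linus", "country_code": "FI"}},
]

df = pd.json_normalize(records)
# Nested keys become dotted column names:
# ['id', 'created_at', 'user.name', 'user.country_code']
print(df.columns.tolist())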
3) Loading: Batch Insert into PostgreSQL
Efficient loading means batching and using transactions. You can use SQLAlchemy to handle connections and DataFrame.to_sql for convenience or use psycopg2.extras.execute_values for speed.
Example using SQLAlchemy + to_sql (simpler, good for moderate volumes):
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine

def get_engine(user, password, host, db, port=5432) -> Engine:
    url = f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{db}"
    return create_engine(url, pool_size=5, max_overflow=10)

def load_to_postgres(df: pd.DataFrame, engine: Engine, table_name: str, if_exists='append', chunksize=1000):
    """
    Load a DataFrame into PostgreSQL. For large data, method='multi' and chunking help.
    """
    df.to_sql(table_name, con=engine, if_exists=if_exists, index=False, method='multi', chunksize=chunksize)
Line-by-line:
- create_engine configures the DB connection; an f-string constructs the DSN (handle credentials securely in real systems).
- to_sql with method='multi' instructs pandas to insert multiple rows per statement; chunksize controls the batch size.
Example using psycopg2's execute_values (faster for high volumes):
import psycopg2
from psycopg2.extras import execute_values

def load_using_execute_values(conn, df: pd.DataFrame, table_name: str):
    tuples = [tuple(x) for x in df.to_numpy()]
    cols = ','.join(df.columns)
    query = f"INSERT INTO {table_name} ({cols}) VALUES %s ON CONFLICT (id) DO UPDATE SET ...;"
    with conn.cursor() as cur:
        execute_values(cur, query, tuples, page_size=1000)
    conn.commit()
Notes:
- Use ON CONFLICT (Postgres) for idempotent upserts (a fuller sketch follows below).
- Avoid raw string formatting for queries with user input to prevent SQL injection; use parameters or an ORM.
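The DO UPDATE SET ... placeholder above is left for you to fill in for your own schema. As a hedged sketch, assuming a hypothetical events table with columns id, created_at, and summary, a complete upsert could look like this:

from psycopg2.extras import execute_values

def upsert_events(conn, rows):
    """Upsert (id, created_at, summary) tuples into a hypothetical 'events' table."""
    query = """
        INSERT INTO events (id, created_at, summary)
        VALUES %s
        ON CONFLICT (id) DO UPDATE SET
            created_at = EXCLUDED.created_at,
            summary = EXCLUDED.summary;
    """
    with conn.cursor() as cur:
        execute_values(cur, query, rows, page_size=1000)
    conn.commit()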
Putting It Together: Full ETL Flow
High-level flow:
- Use fetch_paginated to stream items in batches (e.g., collect 1000 items).
- For each batch, call transform_records.
- Load the transformed DataFrame into the DB using a batch insert/upsert.
from itertools import islice
from tqdm import tqdm  # progress bar, optional

def batched_iterator(iterator, batch_size):
    """Yield lists of items up to batch_size from an iterator."""
    it = iter(iterator)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            break
        yield batch

def run_etl(api_url, db_engine, table, batch_size=1000):
    items = fetch_paginated(api_url)
    for batch in tqdm(batched_iterator(items, batch_size)):
        df = transform_records(batch)
        if not df.empty:
            load_to_postgres(df, db_engine, table)
This architecture keeps memory bounded and enables checkpointing and retries per batch.
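Checkpointing is not shown in run_etl. One hedged way to add it, assuming the records carry a created_at timestamp and you keep a small state file (a DB table works just as well), is a watermark recording the latest timestamp successfully loaded:

import json
from pathlib import Path
from typing import Optional

STATE_FILE = Path("etl_watermark.json")  # hypothetical local state for illustration

def read_watermark() -> Optional[str]:
    """Return the ISO timestamp of the last successfully loaded record, if any."""
    if STATE_FILE.exists():
        return json.loads(STATE_FILE.read_text()).get("last_created_at")
    return None

def write_watermark(last_created_at: str) -> None:
    """Persist the watermark after a batch commits, so re-runs can skip loaded data."""
    STATE_FILE.write_text(json.dumps({"last_created_at": last_created_at}))

On each run, pass the watermark to the API (for example as a created_after filter, if the API supports one) or drop already-seen rows before loading.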
Advanced Python Optimizations
- Caching with functools.lru_cache: if your pipeline enriches records with repeated lookups (e.g., fetching metadata for an ID), functools.lru_cache can avoid redundant network calls:
from functools import lru_cache

@lru_cache(maxsize=1024)
def fetch_metadata(id):
    # expensive network call
    ...
Remember lru_cache works with hashable arguments and lives in memory — monitor cache size.
- Partial functions with functools.partial: Useful to create specialized functions with preset arguments when wiring pipeline components:
from functools import partial
load_small_batch = partial(load_to_postgres, chunksize=500, if_exists='append')
- Use generators and streaming transforms to process data without loading whole datasets into memory.
- Use multiprocessing for CPU-bound transforms, or concurrent.futures for I/O-bound enrichments — be mindful of DB locks and API rate limits.
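For the I/O-bound enrichment case in the last bullet, a hedged sketch with concurrent.futures might look like this; fetch_metadata is the cached lookup defined above, and the worker count is an arbitrary assumption you should tune against API rate limits.

from concurrent.futures import ThreadPoolExecutor

def enrich_batch(ids):
    """Fetch metadata for a batch of IDs concurrently (I/O-bound, so threads help)."""
    with ThreadPoolExecutor(max_workers=8) as pool:
        # map preserves input order; calls run concurrently across the pool
        return list(pool.map(fetch_metadata, ids))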
Best Practices
- Log at key points: batch start/end, records processed, errors.
- Make operations idempotent using upserts or watermarking (track last processed timestamp).
- Use configuration for credentials and environment variables (avoid hardcoding).
- Validate schemas early (e.g., using pandera or simple asserts).
- Use retry libraries (tenacity) for robust retry logic with jitter/backoff (see the sketch after this list).
- Monitor performance: measure throughput (records/sec), DB latency, and API response times.
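As a hedged illustration of the retry item above, here is what a tenacity-decorated fetch could look like; the decorator arguments shown are one reasonable configuration, not the only one.

import requests
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_random_exponential

@retry(
    retry=retry_if_exception_type(requests.RequestException),
    wait=wait_random_exponential(multiplier=1, max=60),  # exponential backoff with jitter
    stop=stop_after_attempt(5),
)
def get_json(url: str) -> dict:
    resp = requests.get(url, timeout=10)
    resp.raise_for_status()
    return resp.json()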
Implementing Pagination in Python Web Applications: Best Practices (Contextual)
When your pipeline consumes or exposes paginated endpoints (e.g., REST APIs for dashboards), follow these practices:
- Prefer cursor-based pagination over offset-based pagination for stable, performant paging on large datasets (see the sketch after this list).
- Return a next link or token instead of page numbers; it's easier for clients to follow.
- Support limit and fields parameters so clients request only what they need.
- Document rate limits and return clear HTTP status codes (429).
- For web apps, combine server-side pagination with client-side lazy loading or infinite scroll depending on UX; remember to support accessible alternatives.
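To ground the cursor-based recommendation, here is a hedged sketch of keyset (cursor) pagination at the SQL layer using SQLAlchemy; the events table and its id column are assumptions for illustration only.

from sqlalchemy import text
from sqlalchemy.engine import Engine

def fetch_page_after(engine: Engine, cursor_id: int, limit: int = 100):
    """Return up to `limit` rows with id greater than the cursor, plus the next cursor."""
    query = text(
        "SELECT id, created_at, summary FROM events "
        "WHERE id > :cursor ORDER BY id LIMIT :limit"
    )
    with engine.connect() as conn:
        rows = conn.execute(query, {"cursor": cursor_id, "limit": limit}).fetchall()
    next_cursor = rows[-1].id if rows else None  # client passes this back to get the next page
    return rows, next_cursor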
Common Pitfalls and How to Avoid Them
- Schema drift: Sources change column names/types. Use schema checks and fail fast, or build flexible parsers that map new fields.
- Duplicate records: Use primary keys and idempotent upserts with ON CONFLICT.
- Memory exhaustion: Process in batches or streaming; prefer generators.
- Hidden data loss: Dropping rows with dropna can remove important records; log counts and sample dropped rows for inspection.
- Hard-to-debug failures: Add rich logging and sample payloads for failed batches.
- Blocking operations: Avoid long synchronous calls in loops; use async requests or thread pools for parallel I/O.
Advanced Tips and Tools
- Orchestration: Use Airflow, Prefect, or Dagster for scheduling, retries, and lineage.
- Data validation: Use pandera, pydantic, or Great Expectations to validate intermediate DataFrames.
- Caching: Use functools.lru_cache for CPU- or network-bound repeated calls. For distributed caching, use Redis.
- Use psycopg2.extras.execute_values or COPY FROM for the highest throughput into Postgres (a COPY sketch follows below).
- For large scale, store transformed data as Parquet on S3 and use SQL engines (Snowflake, Redshift, BigQuery) for analytics.
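As a hedged sketch of the COPY path, psycopg2's copy_expert can stream a DataFrame serialized as CSV; the table name is assumed, and in production you would want to pin the column order explicitly.

import io
import pandas as pd

def copy_df_to_postgres(conn, df: pd.DataFrame, table_name: str):
    """Stream a DataFrame into Postgres via COPY, usually the fastest bulk path."""
    buffer = io.StringIO()
    df.to_csv(buffer, index=False, header=False)
    buffer.seek(0)
    cols = ','.join(df.columns)
    with conn.cursor() as cur:
        cur.copy_expert(f"COPY {table_name} ({cols}) FROM STDIN WITH (FORMAT csv)", buffer)
    conn.commit()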
Example: Using functools for caching & partials (detailed)
from functools import lru_cache, partial
import requests

@lru_cache(maxsize=512)
def get_country_name(country_code: str) -> str:
    resp = requests.get(f"https://restcountries.com/v3.1/alpha/{country_code}")
    resp.raise_for_status()
    return resp.json()[0]['name']['common']

def enrich_df_with_country(df):
    df['country_name'] = df['country_code'].map(get_country_name)
    return df

Example: using partial to preconfigure a loader

from sqlalchemy import create_engine

engine = create_engine("postgresql+psycopg2://user:pass@host/db")
configured_loader = partial(load_to_postgres, engine=engine, table_name="events")
Notes:
- get_country_name uses lru_cache to avoid repeated network calls for the same country code.
- partial creates a preconfigured loader to simplify orchestration code.
Mastering Python's f-strings: Tips and Advanced Usage for Dynamic String Formatting
f-strings are concise and powerful:
- Basic: f"{value}"
- Format specifiers: f"{value:.2f}" for floats, f"{date:%Y-%m-%d}" for dates
- Expressions allowed: f"{x + y}"
- Use for SQL fragments carefully — never inject user input directly into SQL with f-strings.
from datetime import datetime, timezone

now = datetime.now(timezone.utc)
message = f"ETL run started at {now:%Y-%m-%d %H:%M:%S} UTC"
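A few more f-string patterns that tend to show up in ETL logging; the variable names here are illustrative.

records_loaded = 10_532
duration_s = 12.3456
table = "events"

# Thousands separator and fixed precision
print(f"Loaded {records_loaded:,} rows into {table} in {duration_s:.2f}s")

# Padding/alignment for tabular log output
print(f"{table:<12} | {records_loaded:>10,}")

# The '=' specifier (Python 3.8+) echoes the expression, handy when debugging
print(f"{records_loaded=}")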
Conclusion
Building a reliable ETL pipeline in Python is largely about composing small, well-tested components: robust extraction (including pagination handling), clear and efficient transformations, and safe, batched loading. Use Python features like f-strings for readable formatting, functools for caching and partial functions to simplify wiring, and follow best practices for batching, retries, and idempotency.
Call to action: Try the code snippets above with a small paginated API or mock service. Experiment with batching sizes and observe performance. If you need, adapt the loaders to your target DB or data lake.
Further Reading and References
- requests documentation: https://docs.python-requests.org/
- pandas documentation: https://pandas.pydata.org/docs/
- SQLAlchemy documentation: https://docs.sqlalchemy.org/
- psycopg2 documentation: https://www.psycopg.org/docs/
- functools docs: https://docs.python.org/3/library/functools.html
- Python f-strings: https://docs.python.org/3/reference/lexical_analysis.html#f-strings
- Airflow, Prefect, Dagster for orchestration
Possible extensions to this pipeline:
- A Dockerized example project
- An Airflow DAG implementing this ETL
- A version that writes to S3/Parquet instead of PostgreSQL