
Implementing the Observer Pattern in Python: Event-Driven Programming Techniques
Learn how to implement the **Observer Pattern** in Python to build clean, event-driven systems. This guide walks you from foundational concepts to practical examples (including thread-safe and asyncio variants), plus best practices for memory optimization, automation scripts, and ETL pipelines.
Introduction
Have you ever wanted parts of your program to react automatically when something changes, without tightly coupling components? That's the core idea behind the Observer Pattern, a close relative of publish/subscribe (pub/sub). It's an essential tool for event-driven programming and is useful in GUI frameworks, automation scripts, and data pipelines.
In this post you'll learn:
- What the Observer Pattern is and when to use it
- Several Python implementations (simple, memory-friendly, thread-safe, and asyncio-based)
- Real-world examples: automating daily tasks and building ETL pipelines
- Performance and memory optimizations, plus common pitfalls and advanced tips
Why use the Observer Pattern?
Think of a newspaper subscription: the publisher releases news, and many subscribers receive updates independently. Observers subscribe to a subject and get notified when an event occurs. Benefits include:
- Loose coupling: publishers don't need to know subscriber internals.
- Flexibility: add/remove observers at runtime.
- Separation of concerns: the logic that raises events and the logic that responds to them are kept separate.
Common use cases:
- GUI event handling
- Task automation (e.g., notify routines when a file changes)
- ETL pipelines (stages emit events when data is ready)
- Logging, metrics, and monitoring hooks
Core Concepts
Key terms:
- Subject (a.k.a. Publisher): Maintains a list of observers and notifies them.
- Observer (a.k.a. Subscriber): Implements a callback to respond to notifications.
- Event: Payload or reason for notification. Can be simple signals or rich objects.
Design questions to settle early:
- How are observers identified and stored? (strong references vs. weak references)
- Should notifications be synchronous or asynchronous?
- How to handle exceptions raised by observers?
- Thread-safety: will notifications happen across threads?
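Most examples in this post treat any callable as an observer. If you prefer the classic object-oriented form, where each observer exposes an `update()` method, the following is a minimal sketch using `typing.Protocol` (Python 3.8+). The names `Observer`, `update`, `attach`, `detach`, `ClassicSubject`, and `PrintObserver` are illustrative choices, not part of any library.

```python
from typing import Any, List, Protocol


class Observer(Protocol):
    """Structural interface: any object with a matching update() qualifies."""

    def update(self, event: Any) -> None: ...


class ClassicSubject:
    def __init__(self) -> None:
        self._observers: List[Observer] = []

    def attach(self, observer: Observer) -> None:
        if observer not in self._observers:
            self._observers.append(observer)

    def detach(self, observer: Observer) -> None:
        if observer in self._observers:
            self._observers.remove(observer)

    def notify(self, event: Any) -> None:
        # Iterate over a copy so observers may detach themselves safely
        for observer in list(self._observers):
            observer.update(event)


class PrintObserver:
    """Satisfies Observer without inheriting from it."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.received: List[Any] = []

    def update(self, event: Any) -> None:
        self.received.append(event)
        print(f"[{self.name}] {event}")


subject = ClassicSubject()
watcher = PrintObserver("watcher")
subject.attach(watcher)
subject.notify("file_saved")  # prints: [watcher] file_saved
```

`Protocol` uses structural typing, so a type checker accepts `PrintObserver` without inheritance. Nothing is enforced at runtime.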
Simple synchronous implementation
Let's start with the simplest, most explicit implementation.
```python
# simple_observer.py
from typing import Callable, Any, List


class Subject:
    def __init__(self) -> None:
        self._observers: List[Callable[[Any], None]] = []

    def subscribe(self, observer: Callable[[Any], None]) -> None:
        """Register an observer (a callable that accepts one argument)."""
        if observer not in self._observers:
            self._observers.append(observer)

    def unsubscribe(self, observer: Callable[[Any], None]) -> None:
        """Unregister an observer."""
        try:
            self._observers.remove(observer)
        except ValueError:
            pass  # Ignore if observer wasn't registered

    def notify(self, event: Any) -> None:
        """Notify all observers of the event (synchronous)."""
        for observer in list(self._observers):
            observer(event)
```
Line-by-line explanation:
- `from typing import ...`: type hints help readability and tooling.
- `class Subject`: this is the publisher.
- `self._observers`: a list storing observer callables.
- `subscribe`: adds a callable if it's not already present.
- `unsubscribe`: removes a callable; uses try/except to ignore missing observers.
- `notify`: iterates over a shallow copy (`list(self._observers)`) to avoid issues if observers modify the list during notification.
Example usage:
```python
def logger(event):
    print(f"[LOGGER] Received: {event}")


def audit(event):
    print(f"[AUDIT] Event: {event}")


s = Subject()
s.subscribe(logger)
s.subscribe(audit)
s.notify({"type": "file_saved", "path": "/tmp/data.csv"})
```
Output:
```
[LOGGER] Received: {'type': 'file_saved', 'path': '/tmp/data.csv'}
[AUDIT] Event: {'type': 'file_saved', 'path': '/tmp/data.csv'}
```
Edge cases:
- If an observer raises an exception, it propagates out of `notify()` and the remaining observers are never called. We'll handle this below.
Handling observer exceptions and isolation
You usually don't want one failing observer to prevent others from receiving events. Wrap calls in try/except and optionally collect failures.
```python
# A method to add to the Subject class above
def notify_safe(self, event: Any) -> None:
    """Notify every observer, even if some of them raise."""
    errors = []
    for observer in list(self._observers):
        try:
            observer(event)
        except Exception as exc:
            errors.append((observer, exc))
    if errors:
        # Handle errors centrally: log them or raise a combined exception
        print(f"Warning: {len(errors)} observers failed.")
```
Explanation:
- We collect exceptions to avoid losing failures and to decide how to handle them centrally.
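One way to act on the collected failures is to log each one with its traceback and return the list to the caller. The following is a minimal sketch that assumes the standard `logging` module fits your application. `SafeSubject` is a hypothetical name, and returning the errors (instead of printing a count) is a design choice made here, not something the original code does.

```python
import logging
from typing import Any, Callable, List, Tuple

logger = logging.getLogger(__name__)
Observer = Callable[[Any], None]


class SafeSubject:
    def __init__(self) -> None:
        self._observers: List[Observer] = []

    def subscribe(self, observer: Observer) -> None:
        if observer not in self._observers:
            self._observers.append(observer)

    def notify_safe(self, event: Any) -> List[Tuple[Observer, Exception]]:
        """Call every observer; log and return failures instead of stopping."""
        errors: List[Tuple[Observer, Exception]] = []
        for observer in list(self._observers):
            try:
                observer(event)
            except Exception as exc:
                # logger.exception logs at ERROR level and includes the traceback
                logger.exception("Observer %r failed on event %r", observer, event)
                errors.append((observer, exc))
        return errors


def broken(event: Any) -> None:
    raise ValueError("boom")


received: List[Any] = []
subject = SafeSubject()
subject.subscribe(broken)
subject.subscribe(received.append)
failures = subject.notify_safe("tick")
# received == ["tick"] and failures holds one (broken, ValueError) pair
```

The caller can then decide whether to retry, alert, or re-raise.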
Avoiding memory leaks: weak references
If observers are bound methods of objects, storing strong references in the subject prevents those objects from being garbage-collected. Use weakref to avoid that.
```python
# weak_observer.py
import weakref
from typing import Callable, Any


class WeakMethod:
    """A minimal weak reference wrapper for bound methods."""

    def __init__(self, method: Callable):
        try:
            # method.__func__ is the function, __self__ is the instance
            self._func = method.__func__
            self._obj_ref = weakref.ref(method.__self__)
        except AttributeError:
            # Not a bound method: store strong reference to function
            self._func = method
            self._obj_ref = None

    def __call__(self):
        """Return a live callable, or None if the instance was collected."""
        if self._obj_ref is None:
            return self._func
        obj = self._obj_ref()
        if obj is None:
            return None
        # Re-bind the function to the still-living instance
        return self._func.__get__(obj, obj.__class__)


class Subject:
    def __init__(self) -> None:
        self._observers = []

    def subscribe(self, callback: Callable) -> None:
        self._observers.append(WeakMethod(callback))

    def unsubscribe(self, callback: Callable) -> None:
        self._observers = [w for w in self._observers if w() != callback]

    def notify(self, event: Any) -> None:
        # Iterate over a snapshot so observers may (un)subscribe during notify
        for weak_cb in list(self._observers):
            cb = weak_cb()
            if cb is None:
                continue  # target collected, skip
            try:
                cb(event)
            except Exception as exc:
                # Isolate failures so the remaining observers still run
                print(f"Observer error: {exc}")
        # Prune wrappers whose target objects have been garbage-collected
        self._observers = [w for w in self._observers if w() is not None]
```
Explanation:
- `WeakMethod`: wraps a bound method with a weak reference to its instance; plain functions are stored as strong references.
- `subscribe`: stores weak wrappers.
- `notify`: skips callbacks whose target objects have been garbage-collected, then drops the dead wrappers.
The standard library already provides `weakref.WeakMethod` (Python 3.4+) for this purpose. See the weakref module docs (https://docs.python.org/3/library/weakref.html).
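The hand-rolled wrapper can be replaced with the standard library's `weakref.WeakMethod`. The following is a minimal sketch that holds bound methods weakly and plain functions strongly, mirroring the wrapper above. `StdlibWeakSubject` and `Dashboard` are illustrative names.

```python
import gc
import inspect
import weakref
from typing import Any, Callable, List


class StdlibWeakSubject:
    """Holds bound methods via weakref.WeakMethod so instances stay collectable."""

    def __init__(self) -> None:
        # Each entry is a zero-argument callable returning the observer or None
        self._refs: List[Callable[[], Any]] = []

    def subscribe(self, callback: Callable[[Any], None]) -> None:
        if inspect.ismethod(callback):
            self._refs.append(weakref.WeakMethod(callback))
        else:
            # Plain functions and lambdas are held strongly; a weak ref to a
            # lambda with no other references would die immediately
            self._refs.append(lambda cb=callback: cb)

    def notify(self, event: Any) -> None:
        for ref in list(self._refs):
            cb = ref()
            if cb is not None:
                cb(event)
        # Drop references whose instances have been garbage-collected
        self._refs = [r for r in self._refs if r() is not None]


class Dashboard:
    def __init__(self) -> None:
        self.seen: List[Any] = []

    def on_event(self, event: Any) -> None:
        self.seen.append(event)


subject = StdlibWeakSubject()
board = Dashboard()
subject.subscribe(board.on_event)
subject.notify("first")
seen = board.seen  # keeps the list alive, not the Dashboard instance
del board
gc.collect()  # CPython frees board on del already; this is a safety net
subject.notify("second")  # the dead observer is skipped and pruned
```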
Thread-safe observer pattern
If notifications and subscriptions can happen from multiple threads, use a lock.
```python
# thread_safe_observer.py
import threading
from typing import Callable, Any, List


class ThreadSafeSubject:
    def __init__(self):
        self._observers: List[Callable[[Any], None]] = []
        self._lock = threading.RLock()

    def subscribe(self, observer: Callable[[Any], None]) -> None:
        with self._lock:
            if observer not in self._observers:
                self._observers.append(observer)

    def unsubscribe(self, observer: Callable[[Any], None]) -> None:
        with self._lock:
            try:
                self._observers.remove(observer)
            except ValueError:
                pass

    def notify(self, event: Any) -> None:
        with self._lock:
            observers_snapshot = list(self._observers)
        for observer in observers_snapshot:
            try:
                observer(event)
            except Exception as exc:
                # handle per-app policy (logging, continue, re-raise, etc.)
                print(f"Observer error: {exc}")
```
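Explanation:
- `RLock` lets the same thread re-acquire the lock. This matters if code that already holds it (for example, a subclass method) calls `subscribe`. Observers are called outside the lock, so a callback that resubscribes does not itself need reentrancy, and a plain `Lock` would also work here.
- Take the snapshot under the lock, then call observers outside it. This avoids deadlocks and lets long-running observers run without blocking subscriptions.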
Asynchronous (asyncio) observer pattern
For event-driven async apps, use async def callbacks and await them.
```python
# async_observer.py
import asyncio
from typing import Callable, Any, Awaitable, List


class AsyncSubject:
    def __init__(self) -> None:
        self._observers: List[Callable[[Any], Awaitable[None]]] = []

    def subscribe(self, observer: Callable[[Any], Awaitable[None]]) -> None:
        if observer not in self._observers:
            self._observers.append(observer)

    def unsubscribe(self, observer: Callable[[Any], Awaitable[None]]) -> None:
        try:
            self._observers.remove(observer)
        except ValueError:
            pass

    async def notify(self, event: Any) -> None:
        # Unpack with * so gather receives each coroutine as its own argument
        await asyncio.gather(
            *[obs(event) for obs in list(self._observers)], return_exceptions=True
        )
```
Explanation:
- `notify` uses `asyncio.gather` to run observers concurrently.
- `return_exceptions=True` ensures one failing observer doesn't cancel the others. Exceptions come back as values in gather's result list, which you can capture and inspect if needed.
Example usage:
```python
async def subscriber(name, event):
    await asyncio.sleep(0.1)
    print(f"{name} got {event}")


async def main():
    s = AsyncSubject()
    s.subscribe(lambda e: subscriber("A", e))
    s.subscribe(lambda e: subscriber("B", e))
    await s.notify({"job": "ETL", "status": "complete"})


asyncio.run(main())
```
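To act on those returned exceptions, `notify` can keep gather's results and pair them with the observers. `gather` preserves argument order, so each result lines up with its observer. The following is a minimal sketch; `ReportingAsyncSubject` and returning the failures list are illustrative choices.

```python
import asyncio
import logging
from typing import Any, Awaitable, Callable, List

logger = logging.getLogger(__name__)
AsyncObserver = Callable[[Any], Awaitable[None]]


class ReportingAsyncSubject:
    def __init__(self) -> None:
        self._observers: List[AsyncObserver] = []

    def subscribe(self, observer: AsyncObserver) -> None:
        if observer not in self._observers:
            self._observers.append(observer)

    async def notify(self, event: Any) -> List[BaseException]:
        observers = list(self._observers)
        results = await asyncio.gather(
            *(obs(event) for obs in observers), return_exceptions=True
        )
        failures: List[BaseException] = []
        # Results are in the same order as observers
        for obs, result in zip(observers, results):
            if isinstance(result, BaseException):
                logger.error("Async observer %r failed: %r", obs, result)
                failures.append(result)
        return failures


async def ok(event: Any) -> None:
    await asyncio.sleep(0)


async def fails(event: Any) -> None:
    raise RuntimeError(f"cannot handle {event}")


async def main() -> List[BaseException]:
    subject = ReportingAsyncSubject()
    subject.subscribe(ok)
    subject.subscribe(fails)
    return await subject.notify("batch-1")


failures = asyncio.run(main())
```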
Real-world example 1: Automating daily tasks
Suppose you have a daily automation script that performs backups, notifications, and cleanup. With the Observer Pattern, the scheduler emits a "daily_run" event and each task subscribes to it independently.
```python
# daily_tasks.py (simplified)
import time
from datetime import datetime

from simple_observer import Subject  # the Subject class defined earlier


class DailyScheduler:
    def __init__(self):
        self._subject = Subject()

    def subscribe_task(self, task):
        self._subject.subscribe(task)

    def run_daily(self):
        # In production, you'd use cron or a system scheduler. Here, a simple loop:
        while True:
            now = datetime.now()
            event = {"time": now.isoformat(), "type": "daily_run"}
            self._subject.notify(event)
            time.sleep(24 * 3600)  # sleep one day (shorten this for a demo)
```
Tasks can be small functions that perform backup, send summary email, or archive logs. This pattern makes it easy to add or remove automation tasks without changing the scheduler core.
Practical tip: For real automation scripts, combine with utilities from the standard library (e.g., argparse for CLI, logging for logs) and consider using a lightweight job scheduler library (APScheduler).
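Here is a minimal sketch of the task side that follows this tip. It uses `logging` for output and `argparse` for a small CLI, and leaves the timing to cron or a similar scheduler. The task functions, the `--skip` flag, and `main()` are hypothetical placeholders, and a tiny inline subject stands in for the `Subject` class so the snippet runs on its own.

```python
import argparse
import logging
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional

logging.basicConfig(level=logging.INFO, format="%(levelname)s %(name)s: %(message)s")
log = logging.getLogger("daily_tasks")

Event = Dict[str, Any]


class Subject:
    """Minimal stand-in for the Subject class defined earlier."""

    def __init__(self) -> None:
        self._observers: List[Callable[[Event], None]] = []

    def subscribe(self, observer: Callable[[Event], None]) -> None:
        if observer not in self._observers:
            self._observers.append(observer)

    def notify(self, event: Event) -> None:
        for observer in list(self._observers):
            observer(event)


# Hypothetical tasks: replace the log calls with real work
def backup(event: Event) -> None:
    log.info("backup for run at %s", event["time"])


def send_summary(event: Event) -> None:
    log.info("summary email queued")


def cleanup_logs(event: Event) -> None:
    log.info("old logs archived")


TASKS: Dict[str, Callable[[Event], None]] = {
    "backup": backup,
    "summary": send_summary,
    "cleanup": cleanup_logs,
}


def main(argv: Optional[List[str]] = None) -> List[str]:
    """Fire one daily_run event and return the names of the tasks that ran."""
    parser = argparse.ArgumentParser(description="Run the daily tasks once.")
    parser.add_argument(
        "--skip", action="append", choices=sorted(TASKS), help="task to skip (repeatable)"
    )
    args = parser.parse_args(argv)
    skipped = set(args.skip or [])

    subject = Subject()
    for name, task in TASKS.items():
        if name not in skipped:
            subject.subscribe(task)
    subject.notify({"time": datetime.now().isoformat(), "type": "daily_run"})
    return [name for name in TASKS if name not in skipped]
```

Call `main()` from your entry point (for example, under `if __name__ == "__main__":`) and schedule the script with cron. Adding a task means adding one function and one `TASKS` entry; the dispatch code stays the same.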
Real-world example 2: Building an ETL pipeline
Imagine an ETL pipeline where data ingestion emits an event when a batch is ready. Different observers can take the batch to clean it, transform it, or push it to storage. Observers keep each stage modular, and with an async or threaded subject they can also run in parallel.
Example (synchronous view):
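```python
# etl_pipeline.py
from simple_observer import Subject  # the Subject class defined earlier


class ETLIngest:
    def __init__(self):
        self.events = Subject()

    def ingest_data(self, batch):
        # parse/validate...
        self.events.notify({"batch": batch, "stage": "ingestion_complete"})


class Cleaner:
    def __call__(self, event):
        batch = event["batch"]
        # perform cleaning
        print("Cleaned", len(batch))


# Wire the stages together
ingest = ETLIngest()
ingest.events.subscribe(Cleaner())
ingest.ingest_data([{"id": 1}, {"id": 2}])  # prints: Cleaned 2
```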
Use-case integration:
- In larger ETL systems, you might use message queues (Kafka/RabbitMQ) instead of in-process observers for scalability.
- Observers here could push results to the next stage, or notify a monitoring system.
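To show observers passing results to the next stage, here is a self-contained sketch of a three-stage chain. Each stage owns a subject and republishes its output. The stage classes, the event keys (`stage`, `batch`), and the in-memory `warehouse` list are illustrative assumptions; a real loader would write to a database or file.

```python
from typing import Any, Callable, Dict, List

Event = Dict[str, Any]


class Subject:
    """Minimal stand-in for the Subject class defined earlier."""

    def __init__(self) -> None:
        self._observers: List[Callable[[Event], None]] = []

    def subscribe(self, observer: Callable[[Event], None]) -> None:
        if observer not in self._observers:
            self._observers.append(observer)

    def notify(self, event: Event) -> None:
        for observer in list(self._observers):
            observer(event)


class Cleaner:
    """Strips whitespace, drops blank rows, then publishes downstream."""

    def __init__(self) -> None:
        self.events = Subject()

    def __call__(self, event: Event) -> None:
        cleaned = [row.strip() for row in event["batch"] if row.strip()]
        self.events.notify({"stage": "cleaned", "batch": cleaned})


class Transformer:
    """Upper-cases rows, then publishes downstream."""

    def __init__(self) -> None:
        self.events = Subject()

    def __call__(self, event: Event) -> None:
        transformed = [row.upper() for row in event["batch"]]
        self.events.notify({"stage": "transformed", "batch": transformed})


warehouse: List[str] = []


def loader(event: Event) -> None:
    warehouse.extend(event["batch"])  # stand-in for a database write


ingest_events = Subject()
cleaner, transformer = Cleaner(), Transformer()
ingest_events.subscribe(cleaner)
cleaner.events.subscribe(transformer)
transformer.events.subscribe(loader)

ingest_events.notify({"stage": "ingestion_complete", "batch": [" alice ", "", "bob"]})
print(warehouse)  # ['ALICE', 'BOB']
```

A monitoring hook can subscribe to any stage's `events` without changing the other stages.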
Performance considerations and memory optimization
Key performance points:
- Notification cost is O(n) in the number of observers. Avoid very large observer lists when notification is synchronous.
- Use batching if many events fire quickly: buffer events and emit summaries.
- For long-running apps, prefer weak references to avoid memory leaks (see weakref example).
- Avoid storing large objects inside the observer list; store lightweight references or IDs and resolve lazily.
- When using async, consider concurrency limits — uncontrolled parallel observers may exhaust resources; use semaphores or a bounded task pool.
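For the concurrency-limit point, here is a minimal sketch that wraps each observer call in an `asyncio.Semaphore` so that at most `max_concurrency` observers run at once. `BoundedAsyncSubject` and `max_concurrency` are illustrative names. The semaphore is created inside `notify`, so it binds to the running event loop and the limit applies per `notify` call, not across overlapping calls.

```python
import asyncio
from typing import Any, Awaitable, Callable, List

AsyncObserver = Callable[[Any], Awaitable[None]]


class BoundedAsyncSubject:
    def __init__(self, max_concurrency: int = 2) -> None:
        self._observers: List[AsyncObserver] = []
        self._max = max_concurrency

    def subscribe(self, observer: AsyncObserver) -> None:
        self._observers.append(observer)

    async def notify(self, event: Any) -> List[Any]:
        sem = asyncio.Semaphore(self._max)

        async def run(obs: AsyncObserver) -> None:
            async with sem:  # waits while max_concurrency observers are active
                await obs(event)

        return await asyncio.gather(
            *(run(obs) for obs in list(self._observers)), return_exceptions=True
        )


async def demo() -> int:
    """Return the highest number of observers that ran at the same time."""
    active = 0
    peak = 0

    async def observer(event: Any) -> None:
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)
        active -= 1

    subject = BoundedAsyncSubject(max_concurrency=2)
    for _ in range(5):
        subject.subscribe(observer)
    await subject.notify("tick")
    return peak


peak = asyncio.run(demo())
print(peak)  # 2
```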
Memory optimization tips:
- Use weakrefs for bound methods and large object references.
- Use generators and streaming (yield) when processing large data sets in observers.
- Profile memory with tracemalloc and tools like memory_profiler to find hotspots.
- Avoid cyclic references or break cycles (use weakrefs or explicit cleanup).
Reference documentation:
- weakref module: https://docs.python.org/3/library/weakref.html
- asyncio: https://docs.python.org/3/library/asyncio.html
- threading: https://docs.python.org/3/library/threading.html
Best practices
- Define a clear event schema (use dataclasses or TypedDict): this makes observer expectations explicit.
- Document lifecycle: who subscribes/unsubscribes and when.
- Use weak references for bound methods to avoid leaks.
- Isolate observer errors so one failure doesn't break the system.
- Limit concurrency for resource-intensive observers.
- Log meaningful information including which observers failed and stack traces.
For example, a minimal event schema as a dataclass:
```python
from dataclasses import dataclass


@dataclass
class Event:
    type: str
    payload: dict
    timestamp: float
```
Common pitfalls
- Forgetting to unsubscribe observers belonging to short-lived objects -> memory leaks.
- Observers modifying the observer list during iteration -> skipped or unexpected extra notifications (lists) or a RuntimeError (dicts and sets). Iterate over a snapshot.
- Blocking calls in synchronous observers causing slow notifications — prefer async or move work to worker threads/queues.
- Not handling exceptions in observers -> one failing observer crashes the caller or leaves later observers un-notified.
- Using global subjects without clear ownership -> maintainability issues.
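For the blocking-observer pitfall, one option is to have `notify` only enqueue the event and let a background worker thread deliver it. The following is a minimal sketch using the standard `queue` and `threading` modules. `QueuedSubject` and its `close()` method are illustrative names. Observers run on the worker thread, so they must be thread-safe themselves.

```python
import queue
import threading
from typing import Any, Callable, List

_STOP = object()  # sentinel that tells the worker to exit


class QueuedSubject:
    def __init__(self) -> None:
        self._observers: List[Callable[[Any], None]] = []
        self._lock = threading.Lock()
        self._queue: "queue.Queue[Any]" = queue.Queue()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def subscribe(self, observer: Callable[[Any], None]) -> None:
        with self._lock:
            self._observers.append(observer)

    def notify(self, event: Any) -> None:
        # Returns immediately; delivery happens on the worker thread
        self._queue.put(event)

    def close(self) -> None:
        """Deliver all pending events, then stop the worker."""
        self._queue.put(_STOP)
        self._worker.join()

    def _run(self) -> None:
        while True:
            event = self._queue.get()
            if event is _STOP:
                break
            with self._lock:
                snapshot = list(self._observers)
            for observer in snapshot:
                try:
                    observer(event)
                except Exception as exc:  # keep the worker alive
                    print(f"Observer error: {exc}")


received: List[Any] = []
subject = QueuedSubject()
subject.subscribe(received.append)
for i in range(3):
    subject.notify(i)
subject.close()
print(received)  # [0, 1, 2]
```

`queue.Queue` is FIFO, so events reach observers in the order they were published.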
Advanced tips
- Priorities: support observer priorities to control notification order.
- Filtering: allow observers to register with filters (only receive events matching criteria).
- Rate-limiting: throttle notifications to observers that are slow or rate-sensitive.
- Distributed pub/sub: for cross-process systems, use message brokers (Redis, Kafka) instead of in-process observers.
- Testing: write unit tests that assert that observers are called with expected events. Use mocks for observers.
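For the rate-limiting tip, one simple approach is a wrapper that drops events arriving less than `min_interval` seconds after the last delivered one. The following is a minimal sketch. `throttled` is a hypothetical helper, and dropping (rather than queueing) excess events is an assumed policy. The injectable `clock` exists only to make the wrapper easy to test.

```python
import time
from typing import Any, Callable, List, Optional


def throttled(
    observer: Callable[[Any], None],
    min_interval: float,
    clock: Callable[[], float] = time.monotonic,
) -> Callable[[Any], None]:
    """Wrap observer so it receives at most one event per min_interval seconds."""
    last_delivery: Optional[float] = None

    def wrapper(event: Any) -> None:
        nonlocal last_delivery
        now = clock()
        if last_delivery is not None and now - last_delivery < min_interval:
            return  # too soon: drop this event
        last_delivery = now
        observer(event)

    return wrapper


# Simulated timestamps: 0.0s, 0.2s, 1.5s
ticks = iter([0.0, 0.2, 1.5])
received: List[Any] = []
handler = throttled(received.append, min_interval=1.0, clock=lambda: next(ticks))
for event in ["a", "b", "c"]:
    handler(event)
print(received)  # ['a', 'c']
```

Subscribe the wrapper (for example, `subject.subscribe(handler)`) and keep a reference to it, since that wrapper, not the original observer, is what you will need to unsubscribe.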
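A sketch combining priorities and filtering. It stores `(priority, filter_fn, observer)` tuples, so it also overrides `unsubscribe`:
```python
from simple_observer import Subject  # the Subject class defined earlier


class FilteredSubject(Subject):
    def subscribe(self, observer, filter_fn=None, priority=0):
        # Entries are (priority, filter_fn, observer); higher priority runs first.
        self._observers.append((priority, filter_fn, observer))
        # sort() is stable, so equal priorities keep subscription order
        self._observers.sort(key=lambda x: x[0], reverse=True)

    def unsubscribe(self, observer):
        self._observers = [e for e in self._observers if e[2] != observer]

    def notify(self, event):
        for _, filter_fn, observer in list(self._observers):
            if filter_fn and not filter_fn(event):
                continue  # observer isn't interested in this event
            observer(event)
```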
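For the testing tip, a `unittest.mock.Mock` works as an observer because it is callable and records every call. The following is a minimal sketch, with a tiny inline subject standing in for the `Subject` class defined earlier so the tests run on their own.

```python
import unittest
from typing import Any, Callable, List
from unittest.mock import Mock


class Subject:
    """Minimal stand-in for the Subject class defined earlier."""

    def __init__(self) -> None:
        self._observers: List[Callable[[Any], None]] = []

    def subscribe(self, observer: Callable[[Any], None]) -> None:
        if observer not in self._observers:
            self._observers.append(observer)

    def unsubscribe(self, observer: Callable[[Any], None]) -> None:
        if observer in self._observers:
            self._observers.remove(observer)

    def notify(self, event: Any) -> None:
        for observer in list(self._observers):
            observer(event)


class SubjectTests(unittest.TestCase):
    def test_observer_receives_event(self) -> None:
        subject, observer = Subject(), Mock()
        subject.subscribe(observer)
        subject.notify({"type": "file_saved"})
        observer.assert_called_once_with({"type": "file_saved"})

    def test_unsubscribed_observer_is_not_called(self) -> None:
        subject, observer = Subject(), Mock()
        subject.subscribe(observer)
        subject.unsubscribe(observer)
        subject.notify("ignored")
        observer.assert_not_called()

    def test_duplicate_subscription_notifies_once(self) -> None:
        subject, observer = Subject(), Mock()
        subject.subscribe(observer)
        subject.subscribe(observer)
        subject.notify("tick")
        self.assertEqual(observer.call_count, 1)


# Run with: python -m unittest <module_name>
```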
Putting it all together — checklist before deploying
- [ ] Define event shapes and document them
- [ ] Choose synchronous vs asynchronous notifications
- [ ] Ensure thread-safety if needed
- [ ] Use weakrefs when observers are instance methods
- [ ] Implement error handling and logging for observer failures
- [ ] Profile for performance and memory usage
- [ ] Add unit tests for observer behavior
Conclusion
The Observer Pattern is a small but powerful tool in your Python toolbox. It encourages modular design, simplifies automation scripts (e.g., for daily tasks), and helps structure ETL stages and monitoring hooks. When implementing observers, be mindful of memory (use weakrefs), concurrency, and error handling. For complex or distributed scenarios, consider message brokers or full-featured event systems.
Try it now: pick a small automation task or a step in your ETL pipeline and refactor it to use an observer-based approach. Experiment with the async variant if you need concurrency.
Further reading and resources
- Official Python docs: weakref, asyncio, threading (see https://docs.python.org/3/)
- Patterns and idioms: "Design Patterns" (Gamma et al.)
- Tools and libraries: APScheduler (for scheduling), Redis/Kafka (for distributed pub/sub), RxPy (reactive programming)