
Implementing Observer Pattern in Python: Real-World Applications and Code Examples
Learn how to implement the Observer pattern in Python with clear, practical examples—from a minimal, thread-safe implementation to real-world uses like automation scripts and event buses. This post walks you through code, edge cases, unit tests, and performance tips (including functools memoization) so you can apply the pattern confidently.
Introduction
Have you ever wanted a clean way to notify many parts of your application when something changes — without tightly coupling components? The Observer pattern solves exactly that. It lets an object (the subject) broadcast state changes to interested observers. This decoupling makes code easier to extend, test, and reuse.
In this post you'll get:
- A clear explanation of core concepts and a textual diagram.
- Several practical, working Python implementations (simple to advanced).
- Examples of real-world usage: automation scripts, event buses, and UI-like updates.
- Performance and testing guidance, including using functools for memoization and best practices with unittest and pytest.
Prerequisites
Before implementing Observer, make sure you understand:
- Python 3 class definitions and instance methods.
- The difference between composition and inheritance.
- Basic concurrency (threading.Lock) if your observers run across threads.
- The standard library modules we'll reference: functools, weakref, threading, collections, and logging.
Core Concepts
Key terms:
- Subject (or Observable): the source of truth. It maintains state and notifies observers on changes.
- Observer: anything that wants to be notified (callbacks, objects with update() methods).
- Attach / Detach: register/unregister observers.
- Notify: send updates to all registered observers.
Textual sequence diagram:
- Observer registers with Subject.
- Subject's state changes (e.g., new data).
- Subject calls notify().
- Each Observer receives the update and reacts.
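The four steps above can be sketched with object-style observers, i.e. objects that expose an update() method. This is a minimal illustration, not the only way to do it: the Observer base class and the Recorder class are hypothetical names introduced here, and the Subject is a stripped-down stand-in.

```python
import abc
from typing import Any, List


class Observer(abc.ABC):
    """Hypothetical interface: anything with an update() method."""

    @abc.abstractmethod
    def update(self, data: Any) -> None:
        ...


class Subject:
    """Stripped-down subject that calls update() on each observer."""

    def __init__(self) -> None:
        self._observers: List[Observer] = []

    def attach(self, observer: Observer) -> None:
        if observer not in self._observers:
            self._observers.append(observer)

    def detach(self, observer: Observer) -> None:
        if observer in self._observers:
            self._observers.remove(observer)

    def notify(self, data: Any = None) -> None:
        for observer in list(self._observers):
            observer.update(data)


class Recorder(Observer):
    """Concrete observer that remembers every payload it receives."""

    def __init__(self) -> None:
        self.seen: List[Any] = []

    def update(self, data: Any) -> None:
        self.seen.append(data)


recorder = Recorder()
subject = Subject()
subject.attach(recorder)      # step 1: the observer registers
subject.notify("new data")    # steps 2-3: state changes, subject notifies
print(recorder.seen)          # step 4: the observer has reacted
```

The rest of this post uses plain callables instead of update() objects, which is usually the lighter option in Python.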
Minimal, Idiomatic Implementation
Let's implement a compact version first.
# observer_simple.py
from typing import Callable, List, Any


class Subject:
    def __init__(self):
        self._observers: List[Callable[[Any], None]] = []

    def attach(self, observer: Callable[[Any], None]) -> None:
        """Register an observer callable."""
        if observer not in self._observers:
            self._observers.append(observer)

    def detach(self, observer: Callable[[Any], None]) -> None:
        """Unregister an observer callable."""
        if observer in self._observers:
            self._observers.remove(observer)

    def notify(self, data: Any = None) -> None:
        """Notify all observers with provided data."""
        for observer in list(self._observers):  # copy to allow modification during iteration
            observer(data)
Line-by-line explanation:
- from typing import ...: type hints for readability.
- class Subject: defines the observable object.
- self._observers: list of callables (functions or lambdas) that accept a single argument.
- attach: adds an observer only if not already registered.
- detach: removes an observer safely.
- notify: iterates a copy of the observers list to avoid modification problems while iterating (an important edge case).
def print_observer(data):
    print("Received:", data)


s = Subject()
s.attach(print_observer)
s.notify("Hello observers!")  # prints: Received: Hello observers!
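To see why notify iterates over a copy, here is a small sketch with a one-shot observer that detaches itself while a notification is in progress. The Subject is repeated so the snippet runs on its own; the observer names once and always are made up for the illustration.

```python
from typing import Any, Callable, List


class Subject:
    """Same shape as the minimal Subject, repeated so this runs alone."""

    def __init__(self) -> None:
        self._observers: List[Callable[[Any], None]] = []

    def attach(self, observer: Callable[[Any], None]) -> None:
        if observer not in self._observers:
            self._observers.append(observer)

    def detach(self, observer: Callable[[Any], None]) -> None:
        if observer in self._observers:
            self._observers.remove(observer)

    def notify(self, data: Any = None) -> None:
        for observer in list(self._observers):  # iterate over a snapshot
            observer(data)


subject = Subject()
log: List[str] = []


def once(data: Any) -> None:
    """Hypothetical one-shot observer: reacts, then unregisters itself."""
    log.append(f"once:{data}")
    subject.detach(once)


def always(data: Any) -> None:
    log.append(f"always:{data}")


subject.attach(once)
subject.attach(always)
subject.notify(1)
subject.notify(2)
print(log)  # ['once:1', 'always:1', 'always:2']
```

If notify looped over the live list instead, removing once mid-iteration would shift the remaining items and always would be skipped on the first notification.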
Edge cases:
- Observers raising exceptions will stop the loop. In production you probably want to handle exceptions per observer (see advanced version).
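A quick way to convince yourself of this edge case is the sketch below. It uses a bare helper function, notify_all (a name invented here), that mirrors the plain loop in the minimal notify.

```python
from typing import Any, Callable, List, Optional


def notify_all(observers: List[Callable[[Any], None]], data: Any) -> None:
    """Plain loop with no per-observer error handling."""
    for observer in list(observers):
        observer(data)


received: List[Any] = []
error: Optional[str] = None


def broken(data: Any) -> None:
    raise ValueError("observer failed")


def healthy(data: Any) -> None:
    received.append(data)


try:
    notify_all([broken, healthy], "tick")
except ValueError as exc:
    error = str(exc)

# The exception escaped the loop, so healthy never ran.
print(error, received)  # observer failed []
```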
Robust Implementation: Thread-safety and Fault Isolation
In real systems, notifications may come from different threads, and an observer raising an exception should not prevent others from receiving updates.
# observer_threadsafe.py
import threading
import weakref
import logging
from typing import Callable, Any

logger = logging.getLogger(__name__)


class Subject:
    def __init__(self):
        # WeakSet holds observers weakly, so the subject never keeps them alive.
        # Caveat: the caller must hold a strong reference to each observer.
        # A lambda or bound method passed straight to attach() is dropped at once.
        self._observers = weakref.WeakSet()
        self._lock = threading.RLock()

    def attach(self, observer: Callable[[Any], None]) -> None:
        with self._lock:
            self._observers.add(observer)

    def detach(self, observer: Callable[[Any], None]) -> None:
        with self._lock:
            try:
                self._observers.remove(observer)
            except KeyError:
                logger.debug("Observer not found during detach")

    def notify(self, data: Any = None) -> None:
        # Snapshot observers under lock, then notify without holding lock
        with self._lock:
            observers = list(self._observers)
        for obs in observers:
            try:
                obs(data)
            except Exception:
                logger.exception("Observer raised an exception; continuing")
Explanation:
- weakref.WeakSet() lets observers be garbage collected when no strong references remain, which prevents memory leaks caused by subjects holding references. The flip side is that the caller must keep each observer alive: a lambda or a bound method passed straight to attach is collected almost immediately, so this version suits module-level functions and callables you store yourself.
- threading.RLock() protects internal state when multiple threads attach/detach/notify.
- In notify, we take a snapshot under the lock and then call observers outside the lock to reduce lock contention and avoid deadlocks if observers call back into the Subject.
- Each observer call is wrapped in try/except to isolate failures; a misbehaving observer will not stop others.
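Weak references and bound methods interact in a way that is easy to miss, so here is a small sketch of the behaviour. It shows a WeakSet losing a bound method right away and weakref.WeakMethod as one possible alternative. The Listener class is hypothetical, and the gc.collect() calls are only there for interpreters without reference counting.

```python
import gc
import weakref


class Listener:
    def __init__(self):
        self.received = []

    def on_event(self, data):
        self.received.append(data)


listener = Listener()
received = listener.received  # keep the list so we can inspect it later

# A bound method object is created fresh on every attribute access,
# so a WeakSet loses it as soon as that temporary object goes away.
plain = weakref.WeakSet()
plain.add(listener.on_event)
gc.collect()
lost = len(plain) == 0

# WeakMethod tracks the instance and the function separately instead.
ref = weakref.WeakMethod(listener.on_event)
callback = ref()      # a live bound method while listener exists
callback("ping")

del callback, listener
gc.collect()
dead = ref() is None  # the reference clears once the instance is gone

print(lost, received, dead)
```

A subject built on this idea would store WeakMethod objects for bound methods and skip (or prune) any entry whose reference returns None at notify time.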
Real-World Example 1: Stock Price Notifier (Event Bus)
Imagine a simple event bus where multiple components (UI, logger, alert system) subscribe to stock price changes.
# stock_notifier.py
from dataclasses import dataclass
from typing import Dict

from observer_threadsafe import Subject  # assumption: either Subject above works here


@dataclass
class StockPrice:
    symbol: str
    price: float


class StockTicker(Subject):
    def __init__(self):
        super().__init__()
        self._prices: Dict[str, float] = {}

    def update_price(self, symbol: str, price: float) -> None:
        old = self._prices.get(symbol)
        self._prices[symbol] = price
        if old != price:
            # Notify observers with a StockPrice instance
            self.notify(StockPrice(symbol, price))
Observer examples:
def ui_update(stock: StockPrice):
    print(f"UI: {stock.symbol} = ${stock.price:.2f}")


def alert_system(stock: StockPrice):
    if stock.price < 10:
        print(f"ALERT: {stock.symbol} dropped below $10!")


ticker = StockTicker()
ticker.attach(ui_update)
ticker.attach(alert_system)
ticker.update_price("ACME", 12.5)
ticker.update_price("ACME", 9.99)  # both observers are invoked
Explanation:
- StockTicker inherits Subject and maintains internal self._prices.
- update_price sets state and notifies observers only on change (efficiency).
- Observers receive a structured StockPrice object (clarity and typing).
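StockTicker broadcasts every price change to every observer. If you want something closer to an event bus, one option is to key observers by topic so each subscriber only hears what it asked for. The sketch below is an assumption-laden illustration: EventBus, subscribe, unsubscribe, publish, and the "price.ACME" topic naming are all invented here, not part of the code above.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, List

Handler = Callable[[Any], None]


class EventBus:
    """Hypothetical topic-based bus: observers subscribe per topic name."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        if handler not in self._handlers[topic]:
            self._handlers[topic].append(handler)

    def unsubscribe(self, topic: str, handler: Handler) -> None:
        if handler in self._handlers[topic]:
            self._handlers[topic].remove(handler)

    def publish(self, topic: str, payload: Any = None) -> None:
        # .get avoids creating empty entries for topics nobody listens to
        for handler in list(self._handlers.get(topic, [])):
            handler(payload)


bus = EventBus()
acme_prices: List[float] = []

bus.subscribe("price.ACME", acme_prices.append)
bus.publish("price.ACME", 12.5)
bus.publish("price.INIT", 3.0)  # no subscribers, silently ignored
print(acme_prices)  # [12.5]
```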
Real-World Example 2: Automating Daily Tasks with Observer Pattern
Suppose you’re building a small automation runner that triggers different tasks daily. You can create a DailyScheduler (Subject) that notifies tasks (Observers) at certain times.
High-level script structure:
- DailyScheduler emits events at scheduled times.
- Task functions attach to the scheduler.
- The main script runs continuously (or can be executed via cron).
# daily_automation.py
import threading
from datetime import datetime

from observer_threadsafe import Subject  # assumption: the thread-safe Subject above


class DailyScheduler(Subject):
    def __init__(self, check_interval: float = 1.0):
        super().__init__()
        # Keep this well under 60 seconds, or the midnight minute can be missed
        self.check_interval = check_interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def stop(self):
        self._stop.set()
        self._thread.join()

    def _run(self):
        last_date = None
        while not self._stop.is_set():
            now = datetime.now()
            if last_date != now.date() and now.hour == 0 and now.minute == 0:
                # New day at midnight - notify
                self.notify({"event": "daily_start", "time": now})
                last_date = now.date()
            # Event.wait pauses like time.sleep but returns early once stop() is called
            self._stop.wait(self.check_interval)
Task examples:
def backup_task(event):
    print("Running backup at", event["time"])


def daily_report(event):
    print("Generating report at", event["time"])


Wiring:
scheduler = DailyScheduler(check_interval=10.0)
scheduler.attach(backup_task)
scheduler.attach(daily_report)
scheduler.start()
In a real script, you'd keep the main thread alive or use a proper service manager.
Notes and best practices:
- For production, use robust schedulers (APScheduler) or system cron. This example illustrates how Observer fits into an automation script.
- To build a daily automation script, structure it with modular observers (tasks), make them idempotent, include logging, and test them with unit tests and dry runs.
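One way to make the scheduler friendlier to unit tests and dry runs is to pull the midnight check out of _run into a pure function that takes the current time as an argument. The sketch below assumes that refactoring; should_fire is a hypothetical helper name, not part of the class above.

```python
from datetime import date, datetime
from typing import Optional


def should_fire(now: datetime, last_date: Optional[date]) -> bool:
    """Return True in the first minute of a day that has not fired yet."""
    return last_date != now.date() and now.hour == 0 and now.minute == 0


# Fixed timestamps stand in for datetime.now(), so nothing waits for midnight.
midnight = datetime(2024, 1, 2, 0, 0, 30)
noon = datetime(2024, 1, 2, 12, 0, 0)

first = should_fire(midnight, None)               # new day, not yet fired
repeat = should_fire(midnight, midnight.date())   # already fired today
midday = should_fire(noon, None)                  # wrong time of day

print(first, repeat, midday)  # True False False
```

With the decision isolated like this, the thread loop only has to call should_fire(datetime.now(), last_date), and the timing rules can be tested without threads or sleeps.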
Performance Optimization: Using functools for Memoization
Sometimes observers perform expensive computations based on the notification payload. You can use functools.lru_cache to memoize results and avoid repeated work.
Example:
import time
from functools import lru_cache


@lru_cache(maxsize=128)
def heavy_calculation(symbol: str, price: float) -> float:
    # Pretend this is CPU-bound / expensive
    time.sleep(0.5)
    return price * 1.05


def observer_with_cache(stock):
    result = heavy_calculation(stock.symbol, stock.price)
    print(f"Cached calc for {stock.symbol}: {result}")
Explanation:
- @lru_cache caches results keyed by function arguments. If the same symbol/price appears repeatedly, subsequent calls are fast.
- Be mindful: unbounded or large caches can consume memory. Choose an appropriate maxsize.
- lru_cache requires hashable arguments and only pays off when the same inputs repeat.
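To check whether the cache is actually earning its keep, you can inspect cache_info(), which functions decorated with lru_cache expose. The sketch below swaps the sleep for a call counter so it runs instantly; the calls counter is added purely for illustration.

```python
from functools import lru_cache

calls = 0  # counts how often the function body really runs


@lru_cache(maxsize=128)
def heavy_calculation(symbol: str, price: float) -> float:
    global calls
    calls += 1
    return price * 1.05


heavy_calculation("ACME", 10.0)
heavy_calculation("ACME", 10.0)  # served from the cache
heavy_calculation("ACME", 11.0)  # new arguments, computed again

info = heavy_calculation.cache_info()
print(calls, info.hits, info.misses)  # 2 1 2

heavy_calculation.cache_clear()  # drop cached results, e.g. between test cases
```

A low hit count relative to misses is a hint that the payloads rarely repeat and the cache is mostly overhead.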
Unit Testing Observers: Best Practices with unittest and pytest
Testing Observers should assert:
- Registration and deregistration behavior.
- That notifications reach all observers.
- That exceptions in one observer don't suppress others.
- Thread-safety where applicable.
# test_observer_unittest.py
import unittest

from observer_threadsafe import Subject


class TestObserver(unittest.TestCase):
    def test_attach_notify(self):
        subject = Subject()
        called = []

        def obs(data):
            called.append(data)

        subject.attach(obs)
        subject.notify(42)
        self.assertEqual(called, [42])

    def test_exception_in_observer(self):
        subject = Subject()
        called = []

        def bad(data):
            raise ValueError("oops")

        def good(data):
            called.append(data)

        subject.attach(bad)
        subject.attach(good)
        subject.notify("hi")
        self.assertEqual(called, ["hi"])


if __name__ == '__main__':
    unittest.main()
pytest equivalent:
# test_observer_pytest.py
import pytest

from observer_threadsafe import Subject


def test_attach_notify():
    subject = Subject()
    called = []

    def obs(data):
        called.append(data)

    subject.attach(obs)
    subject.notify(100)
    assert called == [100]


def test_exception_in_observer(caplog):
    subject = Subject()

    def bad(data):
        raise RuntimeError("bad")

    def good(data):
        return "ok"

    subject.attach(bad)
    subject.attach(good)
    subject.notify("payload")
    # Ensure observers continued; no exception propagated.
    # Depending on logger usage, you can assert log messages count:
    assert any("Observer raised an exception" in rec.message for rec in caplog.records)
Best testing practices:
- Use fixtures to create Subjects and ephemeral observers.
- Use caplog (pytest) or logging capture to assert failures were logged.
- Mock external dependencies (I/O, network) to keep tests deterministic.
- Test concurrency with caution: deterministic timing is hard — prefer to encapsulate concurrency logic and test deterministically if possible.
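As a rough sketch of the last two points, the example below uses unittest.mock.Mock objects as observers and attaches them from several threads, joining every thread before asserting so the outcome does not depend on timing. The Subject here is a minimal lock-protected stand-in written for this snippet, not the WeakSet version above.

```python
import threading
from typing import Any, Callable, List
from unittest.mock import Mock


class Subject:
    """Minimal lock-protected stand-in so the sketch runs on its own."""

    def __init__(self) -> None:
        self._observers: List[Callable[[Any], None]] = []
        self._lock = threading.Lock()

    def attach(self, observer: Callable[[Any], None]) -> None:
        with self._lock:
            if observer not in self._observers:
                self._observers.append(observer)

    def notify(self, data: Any = None) -> None:
        with self._lock:
            observers = list(self._observers)
        for observer in observers:
            observer(data)


subject = Subject()
observers = [Mock(name=f"observer_{i}") for i in range(20)]

# Attach from many threads at once, then join so the result is deterministic.
threads = [threading.Thread(target=subject.attach, args=(obs,)) for obs in observers]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

subject.notify("payload")

# Every mock should have been called exactly once with the payload.
for obs in observers:
    obs.assert_called_once_with("payload")
```

This does not prove the absence of races, but it does catch lost registrations, and the mocks keep the test free of real I/O.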
Common Pitfalls and How to Avoid Them
- Observers causing memory leaks: use weakref (weakref.WeakMethod for bound methods) or ensure detach on shutdown.
- Observer exceptions stopping notifications: catch exceptions per observer.
- Race conditions in multi-threaded code: use locks and design to minimize lock contention.
- Ordering dependencies: if notifications require ordering, document and implement deterministic registration if necessary (but this couples observers).
- Late binding closure problem: when attaching lambdas in loops, bind variables correctly (use default args).
callbacks = []
for i in range(3):
    callbacks.append(lambda: print(i))  # all will print 2
Fix:
for i in range(3):
    callbacks.append(lambda i=i: print(i))
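If default arguments feel too implicit, functools.partial is another way to bind the loop variable at creation time. A small sketch, with record as a made-up observer that takes the bound index plus the notification payload:

```python
from functools import partial
from typing import List, Tuple

results: List[Tuple[int, str]] = []


def record(index: int, data: str) -> None:
    results.append((index, data))


# partial captures the current value of i when each callback is created.
callbacks = [partial(record, i) for i in range(3)]

for callback in callbacks:
    callback("tick")

print(results)  # [(0, 'tick'), (1, 'tick'), (2, 'tick')]
```

Keep a reference to each partial object if you plan to detach it later, since a second partial(record, i) call creates a different object.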
Advanced Tips
- Use Abstract Base Classes (ABC) to formalize observer interfaces: define class Observer(abc.ABC) with an update(self, data) method marked @abc.abstractmethod.
- Combine with asyncio for async observers: await inside notify; consider asyncio.gather.
- Use priority queues if some observers need to run before others.
- For complex event systems, consider libraries (pydispatcher, RxPY) that provide richer semantics.
- Document event payload schemas; using dataclasses or TypedDicts helps consumers know what to expect.
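For the asyncio tip, a minimal sketch might look like the following. AsyncSubject and the two observers are hypothetical names; the only library calls used are asyncio.gather, asyncio.sleep, and asyncio.run.

```python
import asyncio
from typing import Any, Awaitable, Callable, List

AsyncObserver = Callable[[Any], Awaitable[None]]


class AsyncSubject:
    """Hypothetical async variant: observers are coroutine functions."""

    def __init__(self) -> None:
        self._observers: List[AsyncObserver] = []

    def attach(self, observer: AsyncObserver) -> None:
        if observer not in self._observers:
            self._observers.append(observer)

    async def notify(self, data: Any = None) -> List[Any]:
        # return_exceptions=True collects failures as values in the result
        # list instead of raising the first one out of notify().
        return await asyncio.gather(
            *(observer(data) for observer in list(self._observers)),
            return_exceptions=True,
        )


received: List[str] = []


async def slow(data: Any) -> None:
    await asyncio.sleep(0.01)
    received.append(f"slow:{data}")


async def failing(data: Any) -> None:
    raise RuntimeError("boom")


async def main() -> List[Any]:
    subject = AsyncSubject()
    subject.attach(slow)
    subject.attach(failing)
    return await subject.notify("tick")


results = asyncio.run(main())
print(received, results)
```

The result list lines up with registration order, so a caller can log or re-raise any entry that is an exception instance.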
Conclusion
The Observer pattern is a powerful tool to decouple producers from consumers, making code modular and extensible. In Python, a few thoughtful choices — using weakrefs to avoid leaks, locks for thread-safety, and memoization via functools for heavy observers — make implementations robust and performant.
Try it yourself:
- Refactor a small automation script into an event-driven structure using Subject/Observer.
- Add tests using unittest or pytest to verify behavior and edge cases.
- Experiment with functools.lru_cache to speed up expensive observer computations.
- Implement an async observer bus with asyncio.
- Plug in file system events (watchdog) and subscribe tasks to file changes.
- Build a small dashboard that listens to a StockTicker and renders updates.
Further Reading and References
- Observer pattern (Gang of Four): classic design pattern description.
Happy coding! If you want, paste a small snippet of your current project and I’ll show how to refactor it using Observer and provide unit tests.