
Implementing the Observer Pattern in Python: Enhancing Code Flexibility and Responsiveness
Learn how to implement the classic Observer pattern in Python to decouple components, build reactive systems, and improve code flexibility. This guide covers practical, thread-safe and async-ready implementations, integrates context-aware techniques using Python's contextvars, and relates the pattern to microservices and interactive data visualization workflows.
Introduction
Have you ever wanted to decouple parts of your application so changes in one component automatically notify others — without hard wiring them together? That's exactly the problem the Observer pattern solves. In Python, implementing observers lets you build responsive, maintainable systems: GUI widgets that update when data changes, logging subsystems that react to state transitions, or microservices that publish domain events.
In this post you'll learn:
- What the Observer pattern is and when to use it,
- Practical, production-ready Python implementations (synchronous, thread-safe, and async),
- How to use contextvars for thread-safe context management with observers,
- How this pattern relates to building scalable microservices and event-driven architectures,
- A demo showing observers updating a Matplotlib/Seaborn visualization.
Prerequisites
This guide assumes intermediate Python knowledge:
- Classes, functions, decorators
- The
weakref,threading, andasynciomodules - Basic familiarity with Matplotlib/Seaborn (for the visualization demo)
- Awareness of microservice concepts (REST, pub/sub)
contextvars; Python 3.8+ is recommended).
Core Concepts
- Subject (Observable): The object that maintains a list of observers and sends notifications when its state changes.
- Observer: A callable or object that receives updates from the subject.
- Decoupling: Observers don't need to know the internal mechanics of the subject — only what information they receive.
- Event-driven: Systems using observers often become event-driven, enabling reactive/asynchronous interactions.
- Improve modularity and testability.
- Make code responsive and easier to extend (add observers without changing subjects).
- Replace fragile if/else or polling-based checks.
- Overuse can lead to hard-to-follow control flow. Use for clear producer → many consumers scenarios.
Basic Observer Implementation (Synchronous)
We'll start with a minimal, easy-to-understand implementation.
# observer_basic.py
import weakref
from typing import Callable, Any
class Subject:
def __init__(self):
# Use WeakSet so observers don't prevent GC of bound methods/objects
self._observers = weakref.WeakSet()
def register(self, observer: Callable[[Any], None]) -> None:
"""Register an observer callable (function or bound method)."""
self._observers.add(observer)
def unregister(self, observer: Callable[[Any], None]) -> None:
"""Unregister an observer; no-op if not present."""
self._observers.discard(observer)
def notify(self, data: Any) -> None:
"""Notify all observers with the data payload."""
for obs in list(self._observers):
obs(data)
Usage example
if __name__ == "__main__":
class Logger:
def __call__(self, data):
print(f"Logger received: {data}")
def print_observer(data):
print("Print observer:", data)
s = Subject()
s.register(print_observer)
s.register(Logger())
s.notify({"event": "update", "value": 42})
Line-by-line explanation:
weakref.WeakSet()stores observers without creating strong references. If an observer object is deleted elsewhere, it won't leak memory.register/unregistermethods let clients add or remove observers.notifyiterates over observers and calls them with adatapayload.- Example usage demonstrates a function observer and a callable object observer.
- Input: arbitrary
datapassed to observers (could be primitive, dict, custom object). - Output: observers receive
data. If an observer raises an exception, thenotifyloop will abort unless handled (we'll handle that below). - Edge case: If observers mutate the set while iterating, we convert to
list(self._observers)to avoid runtime errors.
Thread-Safe Observer with Error Handling
In multi-threaded programs, observers and the subject might be accessed concurrently. Use a lock to protect the observer collection, and handle exceptions per-observer so one failing observer won't break others.
# observer_threadsafe.py
import weakref
import threading
from typing import Callable, Any
class ThreadSafeSubject:
def __init__(self):
self._observers = weakref.WeakSet()
self._lock = threading.RLock()
def register(self, observer: Callable[[Any], None]) -> None:
with self._lock:
self._observers.add(observer)
def unregister(self, observer: Callable[[Any], None]) -> None:
with self._lock:
self._observers.discard(observer)
def notify(self, data: Any) -> None:
# Snapshot observers under lock
with self._lock:
observers = list(self._observers)
for obs in observers:
try:
obs(data)
except Exception as exc:
# Handle per-observer errors (log, alert, or swallow)
print(f"Observer {obs} raised {exc!r}; continuing")
Usage: same as before, but safe across threads
Key points:
threading.RLock()ensures reentrant safety.- We copy observers while holding the lock to avoid long holds during notifications.
- Per-observer try/except isolates failures.
Using contextvars for Thread-Safe Context Management
If observers need access to request-scoped or task-scoped context (e.g., user id, correlation id), don't rely on global variables. Use contextvars to propagate context in synchronous and asynchronous workflows.
Example: a subject notifies observers, and observers fetch a trace id from a contextvar.
# observer_contextvars.py
import contextvars
import weakref
Define a context variable for trace_id
trace_id_var = contextvars.ContextVar("trace_id", default=None)
class ContextAwareSubject:
def __init__(self):
self._observers = weakref.WeakSet()
def register(self, observer):
self._observers.add(observer)
def notify(self, data):
# Observers can read trace_id_var.get()
for obs in list(self._observers):
obs(data)
Observer that accesses context
def audit_observer(data):
trace = trace_id_var.get()
print(f"[trace={trace}] audit: {data}")
Usage
if __name__ == "__main__":
s = ContextAwareSubject()
s.register(audit_observer)
# Set context var locally
token = trace_id_var.set("req-123")
s.notify({"action": "create"})
trace_id_var.reset(token)
Explanation:
contextvars.ContextVarstores context data local to the current context (thread or async task).- Observers call
trace_id_var.get()to obtain the current trace id. This works across threads and asyncio tasks when used correctly. - Use
set()/reset()to manage context scope.
- In microservices or web applications, a request correlation id can be attached to events without passing it explicitly in every call.
- It replaces thread-local storage with a more robust mechanism that works with
asyncio.
Async Observer (asyncio-aware)
Modern Python services often use asyncio. Let's build an async-friendly observer pattern where observers can be sync or async callables.
# observer_async.py
import asyncio
import weakref
from typing import Callable, Any, Awaitable
def _is_awaitable(obj):
return asyncio.iscoroutine(obj) or isinstance(obj, Awaitable)
class AsyncSubject:
def __init__(self):
self._observers = weakref.WeakSet()
def register(self, observer: Callable[[Any], Any]) -> None:
self._observers.add(observer)
def unregister(self, observer: Callable[[Any], Any]) -> None:
self._observers.discard(observer)
async def notify(self, data: Any) -> None:
# Gather tasks for async observers, call sync observers directly
tasks = []
for obs in list(self._observers):
try:
res = obs(data)
if asyncio.iscoroutine(res):
tasks.append(res)
except Exception as exc:
print(f"Sync observer error: {exc!r}")
if tasks:
# Await all async observers concurrently and handle exceptions
done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
for t in done:
if t.exception():
print(f"Async observer raised: {t.exception()!r}")
Usage
async def main():
subj = AsyncSubject()
async def async_obs(data):
await asyncio.sleep(0.1)
print("async observer", data)
def sync_obs(data):
print("sync observer", data)
subj.register(async_obs)
subj.register(sync_obs)
await subj.notify({"msg": "hello"})
if __name__ == "__main__":
asyncio.run(main())
Explanation:
notifydetects if an observer returned a coroutine and awaits it concurrently.- We handle both sync and async observers.
- Use
asyncio.waitto schedule concurrent execution; gather could also be used butwaitdemonstrates handling of partial failures.
- If an observer intentionally blocks (sync), it blocks the caller — prefer async observers in async contexts.
- Cancelled tasks and exceptions should be logged and handled.
Real-world Example: Observers Updating a Plot (Matplotlib + Seaborn)
Imagine a data source (subject) that streams measurements. Observers visualize the data. We'll create a simple example that collects data and, on each update, saves a plot using Matplotlib/Seaborn.
Note: For interactive updating you may use plt.ion() and GUI event loop integration. Here we'll generate static images on each notification for clarity.
# observer_plot.py
import weakref
import random
import time
from typing import Any, List
import matplotlib.pyplot as plt
import seaborn as sns
sns.set(style="whitegrid")
class DataSubject:
def __init__(self):
self._observers = weakref.WeakSet()
self._data: List[float] = []
def register(self, obs):
self._observers.add(obs)
def add_sample(self, value: float):
self._data.append(value)
for obs in list(self._observers):
obs(self._data) # pass entire series
def save_line_plot(data, filename="plot.png"):
plt.figure(figsize=(6, 4))
sns.lineplot(x=list(range(len(data))), y=data, marker="o")
plt.title("Live Data")
plt.xlabel("sample")
plt.ylabel("value")
plt.tight_layout()
plt.savefig(filename)
plt.close()
print(f"Saved plot to {filename}")
if __name__ == "__main__":
subject = DataSubject()
subject.register(lambda data: save_line_plot(data, filename=f"plot_{len(data)}.png"))
for _ in range(5):
subject.add_sample(random.random())
time.sleep(0.2)
Explanation:
- Each time
add_sampleis called, observers get the full data series. - The observer
save_line_plotgenerates and saves an image using Matplotlib + Seaborn. - Inputs: list of numerical samples. Outputs: saved images.
- Edge cases: very large data lists will increase memory and plotting time — consider streaming windows.
Integrating Observer Pattern into Microservices and Scalable Architectures
The Observer pattern maps well to event-driven microservices:
- A service acts as a publisher (subject) of domain events.
- Consumers (observers) are independent microservices subscribing to events via message brokers (RabbitMQ, Kafka, Redis Pub/Sub).
- FastAPI or Flask for HTTP endpoints; prefer FastAPI for async-first microservices.
- Use frameworks/libraries to dispatch events to message brokers, or use serverless/event bus options.
- For in-process observer-style behavior within a service, use the patterns shown above.
- For cross-process or cross-machine observers, migrate to a message broker (pub/sub). Observers in different processes won't share memory — that's expected.
- Observers as microservices should be idempotent, resilient, and observable.
- Use correlation ids (contextvars locally, propagate via message headers across services).
- For large scale, employ backpressure and partitioning (Kafka topics, consumer groups).
- OrderService (subject) publishes "order.created" events to Kafka.
- BillingService, InventoryService (observers) consume events asynchronously and act.
- Observability components attach trace IDs to events — use context variables locally to add trace id to outgoing messages.
Best Practices
- Use weak references for in-process observers to avoid memory leaks.
- Protect observer lists with locks in threaded contexts.
- Use
contextvarsfor request/task-scoped context (e.g., trace ids). - Prefer async observers in async apps; avoid mixing blocking sync observers unless offloaded to thread executors.
- Keep observer execution fast; offload heavy work (I/O, long CPU tasks) to background workers or queues.
- Handle exceptions per-observer to avoid cascading failures.
- For distributed systems, prefer robust messaging (Kafka, RabbitMQ) over in-process observer lists.
Common Pitfalls
- Memory leaks: holding strong references to listeners prevents garbage collection.
- Unhandled exceptions in observers can break the notification loop.
- Blocking operations in observers delay or block the subject and other observers.
- Misuse of globals instead of
contextvarsin async environments leads to incorrect context propagation.
Advanced Tips
- Compose Observers: Allow observers to register filters or predicates so they only receive relevant events.
- Use typed events (with pydantic models) to add structure and validation to notifications.
- Integrate with reactive libraries (RxPY) if you need advanced event composition, backpressure, and operators.
- Use instrumentation (logging/metrics) within the subject to track observer counts, latencies, and failures.
def filter_event(predicate):
def decorator(fn):
def wrapper(data):
if predicate(data):
return fn(data)
return wrapper
return decorator
Usage
@filter_event(lambda d: d.get("priority") == "high")
def high_priority_handler(data):
print("Handle high priority:", data)
Performance Considerations
- Copying observer lists is O(n) per notification — acceptable for small sets but expensive at scale. Consider more efficient data structures or partitioning.
- If notifications are frequent and observers heavy, use batching or throttling.
- In async environments, prefer concurrency (await multiple coroutines) to reduce overall latency.
Conclusion
The Observer pattern is a simple yet powerful tool in your design toolbox. Implemented carefully in Python, it enables decoupled, responsive systems — from desktop applications to microservices and real-time visualizations. Use weakref to avoid leaks, threading locks for concurrency, and contextvars for robust context management across threads and async tasks. When scaling beyond a single process, combine the in-process observer pattern with proven messaging systems and service frameworks like FastAPI and Kafka.
Try it yourself: implement the observer examples above, swap in your own data sources, and connect observers that log, visualize (Matplotlib/Seaborn), or publish to message brokers.
Further Reading & References
- Gang of Four — Design Patterns: Observer
- Python contextvars: https://docs.python.org/3/library/contextvars.html
- asyncio official docs: https://docs.python.org/3/library/asyncio.html
- Matplotlib docs: https://matplotlib.org/stable/contents.html
- Seaborn docs: https://seaborn.pydata.org/
- FastAPI: https://fastapi.tiangolo.com/
- Kafka: https://kafka.apache.org/
- RxPY (ReactiveX for Python): https://rxpy.readthedocs.io/
- Extending the async observer to publish events to a Kafka topic,
- Using contextvars to propagate trace IDs across service boundaries,
- Or building a live dashboard that updates via WebSocket when subjects notify a plotting observer.
Was this article helpful?
Your feedback helps us improve our content. Thank you!