
Implementing the Observer Pattern in Python: Practical Use Cases, Dataclasses, Flask WebSockets & Dask Integrations
Learn how to implement the **Observer pattern** in Python with clean, production-ready examples. This post walks through core concepts, thread-safe and dataclass-based implementations, a real-time chat example using Flask and WebSockets, and how to hook observers into Dask-powered pipelines for monitoring and progress updates.
Introduction
Have you ever needed one object to notify many other objects when something changes — for example, a data producer broadcasting updates to dashboards, logs, or WebSocket clients? That's the job of the Observer pattern: a classic behavioral design pattern that decouples the producer (Subject) from consumers (Observers).
In this article you'll learn:
- What the Observer pattern is and when to use it.
- How to implement it idiomatically in Python.
- How to use `dataclasses` for clean data handling.
- A production-minded, thread-safe implementation.
- A practical real-time chat example with Flask and WebSockets.
- How to integrate Observers with Dask to monitor and optimize pipelines.
Prerequisites
- Python 3.7+ (for `dataclasses`); the thread-safe example also uses `typing.Protocol`, which requires 3.8+. If using 3.10+ you'll get more typing niceties.
- Basic knowledge of classes, functions, and concurrency primitives (`threading`, `asyncio`).
- For the WebSockets example: Flask and a WebSocket library such as Flask-SocketIO.
- For the Dask example: familiarity with Dask and its task scheduling model.
Core Concepts: What is the Observer Pattern?
- Subject: Maintains a list of observers and notifies them of state changes.
- Observer: Defines an interface for objects that should be notified.
- Decoupling: Observers can be added/removed at runtime; Subjects don't need to know details.
Minimal Implementation — Learning the Basics
Let's start with a simple, synchronous implementation to see the skeleton.
```python
# simple_observer.py
class Subject:
    def __init__(self):
        self._observers = []

    def register(self, observer):
        self._observers.append(observer)

    def unregister(self, observer):
        # Raises ValueError if the observer was never registered
        self._observers.remove(observer)

    def notify(self, message):
        # Iterate over a copy so observers can unregister during notification
        for obs in list(self._observers):
            obs.update(message)


class Observer:
    def update(self, message):
        raise NotImplementedError("Observer must implement update()")
```
Usage
```python
class PrintObserver(Observer):
    def update(self, message):
        print("Received:", message)


s = Subject()
p = PrintObserver()
s.register(p)
s.notify("Hello observers")
```
Line-by-line explanation:
- `Subject` keeps a list of observers.
- `register`/`unregister` manage subscriptions.
- `notify` iterates over observers and calls their `update` method.
- `Observer` is a base class declaring `update`.
- `PrintObserver` implements `update` to print messages.
- If an observer's `update` raises an exception, `notify` stops and the remaining observers are never called; we'll address that in the robust implementation.
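To make the last point concrete, here is a small sketch of how one failing observer cuts a broadcast short. The minimal `Subject` is repeated so the snippet runs on its own; `BrokenObserver` and `RecordingObserver` are hypothetical names introduced only for this illustration.

```python
class Subject:
    """Same minimal Subject as above, repeated so the sketch is self-contained."""

    def __init__(self):
        self._observers = []

    def register(self, observer):
        self._observers.append(observer)

    def notify(self, message):
        for obs in list(self._observers):
            obs.update(message)


class BrokenObserver:
    def update(self, message):
        raise RuntimeError("boom")


class RecordingObserver:
    def __init__(self):
        self.received = []

    def update(self, message):
        self.received.append(message)


subject = Subject()
recorder = RecordingObserver()
subject.register(BrokenObserver())  # registered first, so it runs first
subject.register(recorder)

try:
    subject.notify("hello")
except RuntimeError as exc:
    print("notify aborted:", exc)  # notify aborted: boom

print(recorder.received)  # []: the recorder was never reached
```

Because observers are called in registration order, everything registered after the failing one silently misses the message.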
Using dataclasses for Clean and Efficient Data Management
When your notifications carry structured data, `dataclasses` simplify creation and maintenance of message payloads.
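```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class Notification:
    event: str
    payload: dict
    # default_factory runs for every instance; a bare datetime.utcnow() default
    # would be evaluated only once, when the class is defined
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
```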
Explanation:
- `@dataclass` creates `__init__`, `__repr__`, and `__eq__` automatically.
- Use `Notification` as the message object passed to observers for consistency and easier debugging.
- Use `field(default_factory=...)` for defaults that are mutable or that must be computed fresh for each instance, such as a timestamp.
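The difference between a plain default and `default_factory` is easy to miss, so here is a minimal sketch of it. `StaleEvent` and `FreshEvent` are hypothetical classes used only for this comparison.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class StaleEvent:
    # Evaluated once, when the class body runs
    timestamp: datetime = datetime.now(timezone.utc)


@dataclass
class FreshEvent:
    # Evaluated again for every new instance
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    payload: dict = field(default_factory=dict)


a, b = StaleEvent(), StaleEvent()
print(a.timestamp is b.timestamp)  # True: both share the class-level default

c, d = FreshEvent(), FreshEvent()
print(c.payload is d.payload)      # False: each instance gets its own dict
```

A shared timestamp is harmless in a quick demo but misleading in logs, where every event would appear to have happened at import time.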
A Robust, Thread-Safe Implementation
Production systems often have concurrency (background workers, WebSocket threads). Here's a thread-safe implementation using `threading.RLock`, `weakref.WeakSet` to avoid memory leaks, and safe notification (catching observer exceptions).
```python
# robust_observer.py
import threading
import weakref
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Protocol


@dataclass
class Notification:
    event: str
    payload: dict
    # default_factory gives every notification its own creation time
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


class ObserverProtocol(Protocol):
    def update(self, notification: Notification) -> None:
        ...


class Subject:
    def __init__(self):
        # Weak references: the caller must keep each observer alive
        self._observers = weakref.WeakSet()
        self._lock = threading.RLock()

    def register(self, observer: ObserverProtocol):
        with self._lock:
            self._observers.add(observer)

    def unregister(self, observer: ObserverProtocol):
        with self._lock:
            self._observers.discard(observer)

    def notify(self, notification: Notification):
        # Snapshot to avoid modification during iteration
        with self._lock:
            observers = list(self._observers)
        for obs in observers:
            try:
                obs.update(notification)
            except Exception as ex:
                # Log and continue notifying other observers
                print(f"Observer {obs} raised: {ex}")
```
Line-by-line highlights:
- `WeakSet` means if an observer is garbage-collected elsewhere, it won't hang in the Subject. The flip side is that the Subject never keeps an observer alive: callers must hold their own reference, and observers must be hashable.
- `RLock` ensures register/unregister/notify coordination across threads.
- `notify` takes a snapshot of observers to avoid issues if the set changes during iteration.
- Exceptions from any observer are caught, logged (print here; replace with logging), and notification continues.
- Lock scope is minimal; acquiring it only to snapshot observers reduces contention.
- Observers that block slow down `notify`. If `update` methods are slow, consider dispatching notifications asynchronously (threads or an event loop).
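The weak-reference behavior is worth seeing in isolation. The following sketch uses a bare `weakref.WeakSet` rather than the full Subject; `Listener` is a hypothetical observer class, and the explicit `gc.collect()` calls are there only to make the timing obvious (CPython typically frees unreferenced objects immediately).

```python
import gc
import weakref


class Listener:
    def update(self, notification):
        print("got", notification)


observers = weakref.WeakSet()

kept = Listener()
observers.add(kept)
observers.add(Listener())  # no other reference exists, so it can be collected at once

gc.collect()
print(len(observers))  # 1: only `kept` survives

del kept
gc.collect()
print(len(observers))  # 0: nothing is left to notify
```

In practice this means an observer registered as a temporary expression never receives anything, so assign observers to a variable or attribute that lives as long as the subscription should.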
Example: Weather Station (Practical, synchronous use case)
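```python
# weather_station.py
# Assumes the previous listing is saved as robust_observer.py
from robust_observer import Notification, Subject


class ConsoleObserver:
    def update(self, notification):
        print(f"[{notification.timestamp.isoformat()}] {notification.event}: {notification.payload}")


class FileObserver:
    def __init__(self, path):
        self.path = path

    def update(self, notification):
        with open(self.path, "a") as f:
            f.write(f"{notification.timestamp.isoformat()} {notification.event} {notification.payload}\n")


subject = Subject()
# Keep references in variables: the Subject stores observers in a WeakSet,
# so an observer created inline would be garbage-collected before notify() runs
console = ConsoleObserver()
file_log = FileObserver("weather.log")
subject.register(console)
subject.register(file_log)
subject.notify(Notification(event="temperature", payload={"celsius": 23.5}))
```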
Explanation:
- Two observers: one prints, another appends to a file.
- Input: Notification object.
- Output: Console print and appended log line.
- Edge cases: `FileObserver` raises if file permissions are denied. The Subject catches and logs that, but real apps should handle such errors inside `update`.
Advanced: Real-Time Chat Application with Flask and WebSockets
The Observer pattern maps naturally to WebSocket clients subscribing to messages. Below is a simplified pattern using Flask-SocketIO (which runs on eventlet or gevent when one of them is installed, and otherwise falls back to a threading mode). The code shows the concept only; a real app needs proper setup and a production server.
Install: pip install Flask Flask-SocketIO
Server skeleton:
```python
# chat_server.py
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone

from flask import Flask, request
from flask_socketio import SocketIO, join_room, leave_room

app = Flask(__name__)
socketio = SocketIO(app)


@dataclass
class ChatNotification:
    room: str
    user: str
    message: str
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    def to_payload(self) -> dict:
        # datetime is not JSON-serializable, so send it as an ISO 8601 string
        payload = asdict(self)
        payload["timestamp"] = self.timestamp.isoformat()
        return payload


# Subject: ChatRoomBroker
class ChatRoomBroker:
    def __init__(self):
        self.rooms = {}  # room -> set of session ids

    def register(self, sid, room):
        self.rooms.setdefault(room, set()).add(sid)

    def unregister(self, sid, room):
        if room in self.rooms:
            self.rooms[room].discard(sid)

    def broadcast(self, notification: ChatNotification):
        # Using SocketIO emit to send to room
        socketio.emit("message", notification.to_payload(), room=notification.room)


broker = ChatRoomBroker()


@socketio.on("join")
def on_join(data):
    room = data["room"]
    sid = request.sid  # session id that Flask-SocketIO attaches to the request
    join_room(room)
    broker.register(sid, room)


@socketio.on("leave")
def on_leave(data):
    room = data["room"]
    sid = request.sid
    leave_room(room)
    broker.unregister(sid, room)


@socketio.on("send")
def handle_send(data):
    notification = ChatNotification(room=data["room"], user=data["user"], message=data["message"])
    broker.broadcast(notification)


if __name__ == "__main__":
    socketio.run(app)
```
Explanation:
- `ChatRoomBroker` behaves like a Subject, managing subscriptions (rooms) and broadcasting to all observers (clients in that room).
- Flask-SocketIO manages low-level socket client connections and routing; broker simply instructs SocketIO to emit to a room.
- Clients are web browsers connecting with Socket.IO client. Each client is an observer; the server needn't track per-client callbacks because SocketIO abstracts that.
- Authentication and room authorization are essential in production; validate `user` and `room` on join.
- Use Redis or another message queue for scaling across multiple processes/servers (SocketIO supports message queues).
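As a rough illustration of the validation point above, the sketch below checks the shape of a join payload before any room is joined. Everything here is an assumption for illustration: `validate_join`, `ALLOWED_ROOMS`, and `MAX_NAME_LENGTH` are hypothetical names, and this covers input checking only, not authentication.

```python
from typing import Optional, Tuple

# Hypothetical allow-list; a real app would look rooms up in its own storage
ALLOWED_ROOMS = {"general", "support"}
MAX_NAME_LENGTH = 32


def validate_join(data: object) -> Tuple[Optional[str], Optional[str]]:
    """Return (room, None) for a valid payload, or (None, reason) otherwise."""
    if not isinstance(data, dict):
        return None, "payload must be an object"
    user = data.get("user")
    room = data.get("room")
    if not isinstance(user, str) or not (0 < len(user.strip()) <= MAX_NAME_LENGTH):
        return None, "invalid user"
    if not isinstance(room, str) or room not in ALLOWED_ROOMS:
        return None, "unknown room"
    return room, None


print(validate_join({"user": "ada", "room": "general"}))  # ('general', None)
print(validate_join({"user": "ada", "room": "secret"}))   # (None, 'unknown room')
print(validate_join({"room": "general"}))                 # (None, 'invalid user')
```

A join handler could call a helper like this first and return early on a non-empty reason, so that unvalidated clients never reach `join_room`.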
Call to action: Try building a small chat room using this broker pattern locally and send messages from multiple browser tabs.
Observers for Monitoring Dask Pipelines
When optimizing data processing pipelines (e.g., with Dask), it's helpful to get real-time updates about task progress, errors, and resource usage. You can implement observers that subscribe to pipeline events.
Example: a simple progress reporter that observes Dask futures.
```python
# dask_observer.py
from dataclasses import dataclass, field
from datetime import datetime, timezone

from dask.distributed import Client, as_completed


@dataclass
class TaskStatus:
    key: str
    status: str
    result: object = None
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


class DaskObserver:
    def update(self, task_status: TaskStatus):
        print(f"{task_status.timestamp} - {task_status.key}: {task_status.status}")


if __name__ == "__main__":
    client = Client()  # no address given, so this starts a local cluster
    futures = [client.submit(lambda x: x ** 2, i) for i in range(10)]
    observer = DaskObserver()
    for future in as_completed(futures):
        try:
            res = future.result()
            observer.update(TaskStatus(key=str(future.key), status="finished", result=res))
        except Exception as ex:
            observer.update(TaskStatus(key=str(future.key), status="error", result=str(ex)))
```
Explanation:
- `DaskObserver` receives `TaskStatus` updates as tasks complete.
- `as_completed` yields futures as they finish; we push updates to the observer.
- Integrating observers helps build dashboards, logs, or triggers that adapt pipeline behavior (e.g., retry logic on failure).
- Use observers to collect metrics for bottleneck analysis.
- Send metrics to monitoring systems or dashboards to determine where to apply parallelism or memory tuning.
- For large pipelines, avoid printing every event; aggregate or sample updates to avoid I/O bottlenecks.
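One way to follow the aggregation advice is an observer that counts statuses and reports only every N updates. This is a sketch under assumptions: `SummaryObserver` and its `every` parameter are hypothetical, and a trimmed-down `TaskStatus` stands in for the real one so the snippet runs without Dask.

```python
from collections import Counter
from dataclasses import dataclass


@dataclass
class TaskStatus:
    """Trimmed-down stand-in for the TaskStatus above."""
    key: str
    status: str


class SummaryObserver:
    """Counts statuses and prints a summary once every `every` updates."""

    def __init__(self, every: int = 100):
        self.every = every
        self.counts = Counter()
        self.seen = 0

    def update(self, task_status: TaskStatus) -> None:
        self.counts[task_status.status] += 1
        self.seen += 1
        if self.seen % self.every == 0:
            print(f"{self.seen} tasks seen: {dict(self.counts)}")


observer = SummaryObserver(every=5)
for i in range(10):
    status = "error" if i == 3 else "finished"
    observer.update(TaskStatus(key=f"task-{i}", status=status))
```

With ten simulated tasks and `every=5`, this prints two summary lines instead of ten per-task lines; the same `update` signature means it could be swapped in for `DaskObserver` in the loop over `as_completed`.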
Best Practices
- Use dataclasses for notification payloads to keep data structured and readable.
- Prefer weak references (`WeakSet`) to avoid memory leaks when observers are ephemeral.
- Make notification non-blocking: either have observers perform fast work or dispatch notifications asynchronously (threads, event loop).
- Handle observer exceptions robustly so one bad observer doesn't break the whole broadcast.
- For distributed or multi-process systems, use message brokers (Redis, RabbitMQ) or frameworks (Flask-SocketIO, Dask) to route notifications across processes.
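For the non-blocking advice above, one possible approach is to hand each `update` call to a thread pool. The sketch below is an assumption-laden illustration, not the article's Subject: `PooledSubject` and `Collector` are hypothetical names, and it uses a plain list instead of a `WeakSet` to stay short.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait


class PooledSubject:
    """Hypothetical Subject variant that runs each update() on a worker thread."""

    def __init__(self, max_workers: int = 4):
        self._observers = []
        self._lock = threading.Lock()
        self._pool = ThreadPoolExecutor(max_workers=max_workers)

    def register(self, observer):
        with self._lock:
            self._observers.append(observer)

    def notify(self, message):
        # Snapshot under the lock, then hand the slow work to the pool
        with self._lock:
            observers = list(self._observers)
        return [self._pool.submit(obs.update, message) for obs in observers]

    def close(self):
        self._pool.shutdown(wait=True)


class Collector:
    def __init__(self):
        self.messages = []

    def update(self, message):
        self.messages.append(message)


subject = PooledSubject()
first, second = Collector(), Collector()
subject.register(first)
subject.register(second)

futures = subject.notify("tick")  # returns immediately with Future objects
wait(futures)                     # only needed here to make the demo deterministic
subject.close()
print(first.messages, second.messages)  # ['tick'] ['tick']
```

An exception raised inside `update` is stored on the corresponding `Future` rather than raised in `notify`, so callers that care about failures need to inspect the returned futures.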
Common Pitfalls
- Forgetting to remove observers, causing memory leaks — use weak refs or explicit unregister patterns.
- Blocking in observer update methods — always expect observers might be slow; protect subject performance.
- Race conditions: modifying observer list while notifying — snapshot the list under a lock.
- Tight coupling: if your observers need internals of the subject, you're losing decoupling benefits.
Advanced Tips
- Async observer pattern: Use asyncio to make notifications asynchronous. Observers implement `async def update(...)` and `notify` uses `asyncio.create_task`.
- Back-pressure: For high-velocity streams (e.g., Dask progress), implement batching or rate limiting in the subject.
- Persistence: Persist events to a durable store if the subject's events must survive restarts.
- Monitoring: Integrate with Prometheus or other monitoring systems by turning observer events into metrics.
```python
import asyncio


class AsyncSubject:
    def __init__(self):
        self._observers = set()

    def register(self, observer):
        self._observers.add(observer)

    async def notify(self, message):
        tasks = [asyncio.create_task(o.update(message)) for o in self._observers]
        # Unpack the list: gather() takes awaitables as positional arguments
        await asyncio.gather(*tasks, return_exceptions=True)
```
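To see the async subject in action, here is a self-contained usage sketch. It repeats the class with one deliberate change, labeled in the code: `notify` returns the gathered results so the caller can inspect failures. `SlowObserver` and `FailingObserver` are hypothetical observers for the demo.

```python
import asyncio


class AsyncSubject:
    """Repeated so the sketch runs on its own; notify() here returns the results."""

    def __init__(self):
        self._observers = set()

    def register(self, observer):
        self._observers.add(observer)

    async def notify(self, message):
        tasks = [asyncio.create_task(o.update(message)) for o in self._observers]
        # return_exceptions=True hands failures back as values instead of raising
        return await asyncio.gather(*tasks, return_exceptions=True)


class SlowObserver:
    async def update(self, message):
        await asyncio.sleep(0.01)  # stands in for real I/O
        return f"handled {message}"


class FailingObserver:
    async def update(self, message):
        raise ValueError("cannot handle message")


async def main():
    subject = AsyncSubject()
    subject.register(SlowObserver())
    subject.register(FailingObserver())
    return await subject.notify("ping")


results = asyncio.run(main())
for outcome in results:
    print(type(outcome).__name__, outcome)
```

Observers live in a set, so the order of the results is not guaranteed; the point is that the failing observer shows up as a `ValueError` value while the slow one still completes.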
Conclusion
The Observer pattern is a versatile tool for decoupling producers and consumers in Python applications, from simple logging to real-time chat rooms and monitoring Dask pipelines. Combining idiomatic Python practices (`dataclasses` for payloads, thread-safety with locks and weakrefs, and asynchronous dispatch) yields robust and maintainable designs.
Try these next:
- Implement an asyncio observer chain and connect it to a WebSocket server.
- Instrument a Dask pipeline with observers that feed a small dashboard.
- Replace print-level logging in examples with Python's `logging` module.
Further Reading and References
- Official Python dataclasses documentation: https://docs.python.org/3/library/dataclasses.html
- Flask-SocketIO docs: https://flask-socketio.readthedocs.io/
- Dask documentation (distributed): https://docs.dask.org/
- "Design Patterns: Elements of Reusable Object-Oriented Software" — the canonical description of Observer (Gang of Four)