Implementing the Observer Pattern in Python: Event-Driven Programming Techniques

November 18, 2025 · 11 min read

Learn how to implement the **Observer Pattern** in Python to build clean, event-driven systems. This comprehensive guide walks you from foundational concepts to practical, production-ready examples (including thread-safe and asyncio variants), plus best practices for memory optimization, automation scripts, and ETL pipeline use cases.

Introduction

Have you ever wanted parts of your program to react automatically when something changes, without tightly coupling components? That's the core idea behind the Observer Pattern, a close cousin of publish/subscribe (pub/sub), which typically routes events through a broker or channel instead of a direct subject-to-observer link. It's an essential tool for event-driven programming, useful in GUI frameworks, automation scripts, and data pipelines.

In this post you'll learn:

  • What the Observer Pattern is and when to use it
  • Several Python implementations (simple, memory-friendly, thread-safe, and asyncio-based)
  • Real-world examples: automating daily tasks and building ETL pipelines
  • Performance and memory optimizations, plus common pitfalls and advanced tips
Prerequisites: Familiarity with Python 3.x, basic OOP, and asynchronous programming (optional for advanced sections).

Why use the Observer Pattern?

Think of a newspaper subscription: the publisher releases news, and many subscribers receive updates independently. Observers subscribe to a subject and get notified when an event occurs. Benefits include:

  • Loose coupling: publishers don't need to know subscriber internals.
  • Flexibility: add/remove observers at runtime.
  • Separation of concerns: the code that raises events and the code that responds to them live in separate places.
Use cases:
  • GUI event handling
  • Task automation (e.g., notify routines when a file changes)
  • ETL pipelines (stages emit events when data is ready)
  • Logging, metrics, and monitoring hooks

Core Concepts

Key terms:

  • Subject (a.k.a. Publisher): Maintains a list of observers and notifies them.
  • Observer (a.k.a. Subscriber): Implements a callback to respond to notifications.
  • Event: Payload or reason for notification. Can be simple signals or rich objects.
Design considerations:
  • How are observers identified and stored? (strong references vs weak references)
  • Should notifications be synchronous or asynchronous?
  • How to handle exceptions raised by observers?
  • Thread-safety: will notifications happen across threads?
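The Subject and Observer roles above can also be expressed as explicit types. Here is a minimal sketch using typing.Protocol (Python 3.8+), so any object with a matching update() method counts as an observer without inheriting from anything. The method names attach, detach, and update follow the classic Design Patterns naming and are illustrative assumptions; the callable-based examples in the rest of this post don't depend on them.

```python
from typing import Any, List, Protocol


class Observer(Protocol):
    """Structural type: anything with an update(event) method qualifies."""

    def update(self, event: Any) -> None:
        ...


class Subject:
    """Publisher that stores Observer objects and calls their update() method."""

    def __init__(self) -> None:
        self._observers: List[Observer] = []

    def attach(self, observer: Observer) -> None:
        if observer not in self._observers:
            self._observers.append(observer)

    def detach(self, observer: Observer) -> None:
        if observer in self._observers:
            self._observers.remove(observer)

    def notify(self, event: Any) -> None:
        for observer in list(self._observers):
            observer.update(event)


class PrintObserver:
    """Concrete observer: it never inherits from Observer, it just matches the shape."""

    def __init__(self) -> None:
        self.received: List[Any] = []

    def update(self, event: Any) -> None:
        self.received.append(event)
        print(f"[PrintObserver] {event}")


subject = Subject()
watcher = PrintObserver()
subject.attach(watcher)
subject.notify("config_reloaded")  # prints: [PrintObserver] config_reloaded
```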

Simple synchronous implementation

Let's start with the simplest, most explicit implementation.

# simple_observer.py
from typing import Callable, Any, List

class Subject:
    def __init__(self) -> None:
        self._observers: List[Callable[[Any], None]] = []

    def subscribe(self, observer: Callable[[Any], None]) -> None:
        """Register an observer (a callable that accepts one argument)."""
        if observer not in self._observers:
            self._observers.append(observer)

    def unsubscribe(self, observer: Callable[[Any], None]) -> None:
        """Unregister an observer."""
        try:
            self._observers.remove(observer)
        except ValueError:
            pass  # Ignore if observer wasn't registered

    def notify(self, event: Any) -> None:
        """Notify all observers of the event (synchronous)."""
        for observer in list(self._observers):
            observer(event)

Line-by-line explanation:

  • from typing import ...: Type hints help readability and tooling.
  • class Subject: This is the publisher.
  • self._observers: A list storing observer callables.
  • subscribe: Adds a callable if it's not already present.
  • unsubscribe: Removes a callable; uses try/except to ignore missing observers.
  • notify: Iterates over a shallow copy (list(self._observers)) to avoid issues if observers modify the list during notification.
Example usage:

def logger(event):
    print(f"[LOGGER] Received: {event}")

def audit(event):
    print(f"[AUDIT] Event: {event}")

s = Subject()
s.subscribe(logger)
s.subscribe(audit)

s.notify({"type": "file_saved", "path": "/tmp/data.csv"})

Output:

[LOGGER] Received: {'type': 'file_saved', 'path': '/tmp/data.csv'}
[AUDIT] Event: {'type': 'file_saved', 'path': '/tmp/data.csv'}
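To see why notify() iterates over a snapshot, consider a hypothetical one-shot observer that unsubscribes itself. The sketch below repeats the simple Subject so it runs on its own. With the snapshot, the second observer still receives the first event; iterating over self._observers directly would skip it, because removing an element shifts the list under the running loop.

```python
from typing import Any, Callable, List


class Subject:
    """Same shape as the simple Subject: notify() iterates over a snapshot."""

    def __init__(self) -> None:
        self._observers: List[Callable[[Any], None]] = []

    def subscribe(self, observer: Callable[[Any], None]) -> None:
        if observer not in self._observers:
            self._observers.append(observer)

    def unsubscribe(self, observer: Callable[[Any], None]) -> None:
        try:
            self._observers.remove(observer)
        except ValueError:
            pass

    def notify(self, event: Any) -> None:
        for observer in list(self._observers):  # snapshot of the current list
            observer(event)


s = Subject()
calls: List[str] = []


def once(event: Any) -> None:
    """Handle the first event only, then unsubscribe itself."""
    calls.append(f"once:{event}")
    s.unsubscribe(once)


def always(event: Any) -> None:
    calls.append(f"always:{event}")


s.subscribe(once)
s.subscribe(always)
s.notify(1)
s.notify(2)
print(calls)  # ['once:1', 'always:1', 'always:2']
```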

Edge cases:

  • If an observer raises an exception, it propagates out of notify() and the remaining observers are never called. We'll handle this below.

Handling observer exceptions and isolation

You usually don't want one failing observer to prevent others from receiving events. Wrap calls in try/except and optionally collect failures.

def notify_safe(self, event: Any) -> None:
    """Method for Subject: notify every observer, even if some of them raise."""
    errors = []
    for observer in list(self._observers):
        try:
            observer(event)
        except Exception as exc:
            errors.append((observer, exc))

    if errors:
        # Handle errors: log them or raise a combined exception
        print(f"Warning: {len(errors)} observers failed.")

Explanation:

  • We collect exceptions to avoid losing failures and to decide how to handle them centrally.
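One way to "raise a combined exception" is sketched below, assuming a hypothetical ObserverError class that carries every failure. The function logs each failure with its traceback via logger.exception() from the standard logging module and keeps notifying the remaining observers before raising. On Python 3.11+, the built-in ExceptionGroup can serve the same purpose.

```python
import logging
from typing import Any, Callable, List, Optional

logger = logging.getLogger(__name__)


class ObserverError(Exception):
    """Hypothetical combined exception that carries every observer failure."""

    def __init__(self, errors: List[Exception]) -> None:
        super().__init__(f"{len(errors)} observer(s) failed")
        self.errors = errors


def notify_all(observers: List[Callable[[Any], None]], event: Any) -> None:
    """Call every observer, then raise ObserverError if any of them failed."""
    errors: List[Exception] = []
    for observer in list(observers):
        try:
            observer(event)
        except Exception as exc:
            # Log which observer failed, including the traceback, and keep going
            logger.exception("Observer %r failed", observer)
            errors.append(exc)
    if errors:
        raise ObserverError(errors)


received: List[Any] = []


def broken(event: Any) -> None:
    raise ValueError(f"cannot handle {event!r}")


caught: Optional[ObserverError] = None
try:
    notify_all([broken, received.append], "ping")
except ObserverError as err:
    caught = err

print(received)  # ['ping']: the second observer still ran
print(caught)    # 1 observer(s) failed
```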

Avoiding memory leaks: weak references

If observers are bound methods of objects, storing strong references in the subject prevents those objects from being garbage-collected. Use weakref to avoid that.

# weak_observer.py
import weakref
from typing import Callable, Any

class WeakMethod:
    """A minimal weak reference wrapper for bound methods."""
    def __init__(self, method: Callable):
        try:
            # method.__func__ is the function, __self__ is the instance
            self._func = method.__func__
            self._obj_ref = weakref.ref(method.__self__)
        except AttributeError:
            # Not a bound method: store strong reference to function
            self._func = method
            self._obj_ref = None

    def __call__(self):
        """Return a callable, or None if the instance was garbage-collected."""
        if self._obj_ref is None:
            return self._func
        obj = self._obj_ref()
        if obj is None:
            return None
        # Re-bind the function to the still-alive instance
        return self._func.__get__(obj, obj.__class__)

class Subject:
    def __init__(self) -> None:
        self._observers = []

    def subscribe(self, callback: Callable) -> None:
        self._observers.append(WeakMethod(callback))

    def unsubscribe(self, callback: Callable) -> None:
        self._observers = [w for w in self._observers if w() != callback]

    def notify(self, event: Any) -> None:
        for weak_cb in list(self._observers):
            cb = weak_cb()
            if cb is None:  # target collected, skip
                continue
            try:
                cb(event)
            except Exception as exc:
                # Keep the observer registered, but don't swallow the error silently
                print(f"Observer error: {exc}")
        # Prune wrappers whose targets were collected; observers subscribed
        # during this notify() call are preserved
        self._observers = [w for w in self._observers if w() is not None]

Explanation:

  • WeakMethod: wraps bound methods with weak references to the instance; supports plain functions too.
  • subscribe: stores weak wrappers.
  • notify: skips callbacks whose target objects have been garbage-collected.
Reference: the standard library's weakref module already provides weakref.WeakMethod (Python 3.4+), which implements the same idea for bound methods; see the official docs (https://docs.python.org/3/library/weakref.html).
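Since weakref.WeakMethod ships with the standard library, a subject can rely on it instead of a hand-rolled wrapper. The sketch below assumes callbacks are either bound methods (wrapped in WeakMethod) or long-lived module-level functions (wrapped in weakref.ref); WeakSubject and Widget are illustrative names. A lambda passed directly would be collected almost immediately, because nothing else holds a strong reference to it.

```python
import gc
import weakref
from typing import Any, Callable, List


class WeakSubject:
    """Stores observers as weak references using the standard library helpers."""

    def __init__(self) -> None:
        self._refs: List[weakref.ref] = []

    def subscribe(self, callback: Callable[[Any], None]) -> None:
        # Bound methods need WeakMethod: a plain weakref.ref to a bound method
        # dies immediately, because the method object is created on attribute access.
        if hasattr(callback, "__self__") and hasattr(callback, "__func__"):
            ref = weakref.WeakMethod(callback)
        else:
            ref = weakref.ref(callback)  # fine for module-level functions
        self._refs.append(ref)

    def notify(self, event: Any) -> None:
        for ref in list(self._refs):
            callback = ref()
            if callback is not None:
                callback(event)
        # Drop references whose targets have been garbage-collected
        self._refs = [r for r in self._refs if r() is not None]

    def __len__(self) -> int:
        return len(self._refs)


class Widget:
    def __init__(self) -> None:
        self.seen: List[Any] = []

    def on_event(self, event: Any) -> None:
        self.seen.append(event)


subject = WeakSubject()
widget = Widget()
subject.subscribe(widget.on_event)
subject.notify("first")
print(widget.seen)  # ['first']

del widget    # drop the only strong reference to the instance
gc.collect()  # make collection deterministic outside CPython's refcounting
subject.notify("second")
print(len(subject))  # 0: the dead reference was pruned
```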

Thread-safe observer pattern

If notifications and subscriptions can happen from multiple threads, use a lock.

# thread_safe_observer.py
import threading
from typing import Callable, Any, List

class ThreadSafeSubject:
    def __init__(self):
        self._observers: List[Callable[[Any], None]] = []
        self._lock = threading.RLock()

    def subscribe(self, observer: Callable[[Any], None]) -> None:
        with self._lock:
            if observer not in self._observers:
                self._observers.append(observer)

    def unsubscribe(self, observer: Callable[[Any], None]) -> None:
        with self._lock:
            try:
                self._observers.remove(observer)
            except ValueError:
                pass

    def notify(self, event: Any) -> None:
        # Take a snapshot under the lock...
        with self._lock:
            observers_snapshot = list(self._observers)
        # ...then call the observers outside it
        for observer in observers_snapshot:
            try:
                observer(event)
            except Exception as exc:
                # handle per-app policy (logging, continue, re-raise, etc.)
                print(f"Observer error: {exc}")

Explanation:

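  • RLock lets the same thread re-acquire the lock, which helps if subclass code that already holds it calls subscribe() or unsubscribe(). Because observers run outside the lock, a callback that resubscribes doesn't need reentrancy, so a plain Lock would also work here.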
  • Take snapshot under lock, then call observers outside the lock to avoid deadlocks and allow long-running observers without blocking subscriptions.
Performance note: locks add overhead, but they're essential for correctness in multithreaded contexts.
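The sketch below exercises the snapshot-then-call approach from several threads at once. It illustrates one point worth keeping in mind: the subject's lock only protects the observer list, so an observer with mutable state (the hypothetical Counter here) needs its own lock.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List


class ThreadSafeSubject:
    """Snapshot the list under the lock, then call observers outside it."""

    def __init__(self) -> None:
        self._observers: List[Callable[[Any], None]] = []
        self._lock = threading.RLock()

    def subscribe(self, observer: Callable[[Any], None]) -> None:
        with self._lock:
            if observer not in self._observers:
                self._observers.append(observer)

    def notify(self, event: Any) -> None:
        with self._lock:
            snapshot = list(self._observers)
        for observer in snapshot:
            observer(event)


class Counter:
    """Observer whose own state is guarded by a separate lock."""

    def __init__(self) -> None:
        self.count = 0
        self._lock = threading.Lock()

    def __call__(self, event: Any) -> None:
        with self._lock:  # += on an attribute is not atomic across threads
            self.count += 1


subject = ThreadSafeSubject()
counter = Counter()
subject.subscribe(counter)

# 8 worker threads publish 1,000 events in total
with ThreadPoolExecutor(max_workers=8) as pool:
    for i in range(1000):
        pool.submit(subject.notify, i)

print(counter.count)  # 1000
```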

Asynchronous (asyncio) observer pattern

For event-driven async apps, use async def callbacks and await them.

# async_observer.py
import asyncio
from typing import Callable, Any, Awaitable, List

class AsyncSubject:
    def __init__(self) -> None:
        self._observers: List[Callable[[Any], Awaitable[None]]] = []

    def subscribe(self, observer: Callable[[Any], Awaitable[None]]) -> None:
        if observer not in self._observers:
            self._observers.append(observer)

    def unsubscribe(self, observer: Callable[[Any], Awaitable[None]]) -> None:
        try:
            self._observers.remove(observer)
        except ValueError:
            pass

    async def notify(self, event: Any) -> None:
        # gather() takes awaitables as separate arguments, so unpack them with *
        await asyncio.gather(
            *(obs(event) for obs in list(self._observers)),
            return_exceptions=True,
        )

Explanation:

  • notify uses asyncio.gather to run observers concurrently.
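  • return_exceptions=True makes gather() return exceptions as results instead of propagating the first one to the caller, so notify() completes after every observer has finished. If you capture gather()'s return value, you can inspect those exceptions.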
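Because notify() discards the list that asyncio.gather() returns, failures are invisible to the caller. Here is a variant, assuming you want notify() to report failures back: gather() returns results in the same order as its arguments, so each exception can be paired with the observer that raised it. The ok and boom observers are illustrative.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Tuple

AsyncObserver = Callable[[Any], Awaitable[None]]


class AsyncSubject:
    def __init__(self) -> None:
        self._observers: List[AsyncObserver] = []

    def subscribe(self, observer: AsyncObserver) -> None:
        if observer not in self._observers:
            self._observers.append(observer)

    async def notify(self, event: Any) -> List[Tuple[AsyncObserver, BaseException]]:
        """Run observers concurrently; return (observer, exception) for each failure."""
        observers = list(self._observers)
        results = await asyncio.gather(
            *(obs(event) for obs in observers), return_exceptions=True
        )
        # gather() keeps argument order, so results[i] belongs to observers[i]
        return [
            (obs, res)
            for obs, res in zip(observers, results)
            if isinstance(res, BaseException)
        ]


async def ok(event: Any) -> None:
    await asyncio.sleep(0.01)


async def boom(event: Any) -> None:
    raise RuntimeError(f"cannot handle {event}")


async def main() -> List[Tuple[AsyncObserver, BaseException]]:
    subject = AsyncSubject()
    subject.subscribe(ok)
    subject.subscribe(boom)
    failures = await subject.notify("tick")
    for obs, exc in failures:
        print(f"{obs.__name__} failed: {exc!r}")  # boom failed: RuntimeError('cannot handle tick')
    return failures


failures = asyncio.run(main())
```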
Example:

async def subscriber(name, event):
    await asyncio.sleep(0.1)
    print(f"{name} got {event}")

async def main():
    s = AsyncSubject()
    s.subscribe(lambda e: subscriber("A", e))
    s.subscribe(lambda e: subscriber("B", e))
    await s.notify({"job": "ETL", "status": "complete"})

asyncio.run(main())
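If observers are expensive (network calls, database writes), starting all of them at once may exhaust resources. Here is a sketch of a bounded notify using asyncio.Semaphore; notify_bounded and its limit parameter are hypothetical names, and the active/peak counters exist only to show that at most `limit` observers run concurrently.

```python
import asyncio
from typing import Any, Awaitable, Callable, List

AsyncObserver = Callable[[Any], Awaitable[None]]


async def notify_bounded(observers: List[AsyncObserver], event: Any, limit: int = 2) -> None:
    """Run observers concurrently, but never more than `limit` at once."""
    semaphore = asyncio.Semaphore(limit)

    async def run(obs: AsyncObserver) -> None:
        async with semaphore:  # waits here while `limit` observers are running
            await obs(event)

    await asyncio.gather(*(run(obs) for obs in observers), return_exceptions=True)


active = 0
peak = 0


async def slow_observer(event: Any) -> None:
    """Illustrative observer that records how many copies run at the same time."""
    global active, peak
    active += 1
    peak = max(peak, active)
    await asyncio.sleep(0.05)  # stand-in for slow I/O
    active -= 1


asyncio.run(notify_bounded([slow_observer] * 6, "batch_ready", limit=2))
print(peak)  # 2
```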

Real-world example 1: Automating daily tasks

Suppose you have a daily automation script that performs backups, notifications, and cleanup. With the Observer Pattern, the scheduler emits a "daily_run" event and each task subscribes to it independently.

# daily_tasks.py (simplified)
from datetime import datetime
import time

from simple_observer import Subject  # the Subject class defined earlier

class DailyScheduler:
    def __init__(self):
        self._subject = Subject()

    def subscribe_task(self, task):
        self._subject.subscribe(task)

    def run_daily(self):
        # In production, you'd use cron or a system scheduler. Here, a simple loop:
        while True:
            now = datetime.now()
            event = {"time": now.isoformat(), "type": "daily_run"}
            self._subject.notify(event)
            time.sleep(24 * 3600)  # sleep 1 day (shorten this for a demo)

Tasks can be small functions that perform backup, send summary email, or archive logs. This pattern makes it easy to add or remove automation tasks without changing the scheduler core.

Practical tip: For real automation scripts, combine with utilities from the standard library (e.g., argparse for CLI, logging for logs) and consider using a lightweight job scheduler library (APScheduler).
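A long-running `while True` loop is hard to test and duplicates what cron or APScheduler already do. Here is a sketch of an alternative, assuming a hypothetical run_once() method that performs a single tick so an external scheduler (or a test) decides when it runs. To stay self-contained it keeps tasks in a plain list instead of the Subject class; backup and cleanup are placeholder tasks.

```python
from datetime import datetime
from typing import Any, Callable, Dict, List

Task = Callable[[Dict[str, Any]], None]


class DailyScheduler:
    """Variant where run_once() performs one tick; something else decides when."""

    def __init__(self) -> None:
        self._tasks: List[Task] = []

    def subscribe_task(self, task: Task) -> None:
        if task not in self._tasks:
            self._tasks.append(task)

    def run_once(self) -> Dict[str, Any]:
        event = {"time": datetime.now().isoformat(), "type": "daily_run"}
        for task in list(self._tasks):
            task(event)
        return event


log: List[str] = []


def backup(event: Dict[str, Any]) -> None:
    log.append(f"backup at {event['time']}")  # placeholder for a real backup


def cleanup(event: Dict[str, Any]) -> None:
    log.append("cleanup")  # placeholder for archiving or removing old files


scheduler = DailyScheduler()
scheduler.subscribe_task(backup)
scheduler.subscribe_task(cleanup)
scheduler.run_once()  # e.g. from a script that cron or an APScheduler job runs daily
print(log)
```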

Real-world example 2: Building an ETL pipeline

Imagine an ETL pipeline where data ingestion emits an event when a batch is ready. Different observers can then clean, transform, or push the batch to storage. Observers keep each stage modular, and with the asyncio variant above they can also run concurrently.

Example (synchronous view):

# etl_pipeline.py
from simple_observer import Subject  # the Subject class defined earlier

class ETLIngest:
    def __init__(self):
        self.events = Subject()

    def ingest_data(self, batch):
        # parse/validate...
        self.events.notify({"batch": batch, "stage": "ingestion_complete"})

class Cleaner:
    def __call__(self, event):
        batch = event["batch"]
        # perform cleaning
        print("Cleaned", len(batch))

Use-case integration:

  • In larger ETL systems, you might use message queues (Kafka/RabbitMQ) instead of in-process observers for scalability.
  • Observers here could push results to the next stage, or notify a monitoring system.
Tip: For a broader treatment of pipelines, see the related post "Building a Data Pipeline with Python: Tools and Techniques for ETL Processes". Consider integrating this pattern into frameworks like Airflow or Luigi for scheduling, and use observers to hook in custom monitoring or metrics collection.
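Observers can also chain stages: each stage observes the stage before it, transforms the batch, and re-emits it on its own subject. Here is a minimal in-process sketch with illustrative names (Stage, clean, transform) and toy string transforms standing in for real cleaning logic; the "load" step is just an in-memory list.

```python
from typing import Any, Callable, Dict, List

Event = Dict[str, Any]


class Subject:
    def __init__(self) -> None:
        self._observers: List[Callable[[Event], None]] = []

    def subscribe(self, observer: Callable[[Event], None]) -> None:
        self._observers.append(observer)

    def notify(self, event: Event) -> None:
        for observer in list(self._observers):
            observer(event)


class Stage:
    """A pipeline stage: observes upstream events, transforms the batch, re-emits it."""

    def __init__(self, name: str, fn: Callable[[List[Any]], List[Any]]) -> None:
        self.name = name
        self.fn = fn
        self.events = Subject()  # downstream stages subscribe here

    def __call__(self, event: Event) -> None:
        batch = self.fn(event["batch"])
        self.events.notify({"batch": batch, "stage": f"{self.name}_complete"})


ingest = Subject()
clean = Stage("clean", lambda rows: [r.strip() for r in rows if r.strip()])
transform = Stage("transform", lambda rows: [r.upper() for r in rows])
loaded: List[Event] = []

ingest.subscribe(clean)                    # ingest -> clean
clean.events.subscribe(transform)          # clean -> transform
transform.events.subscribe(loaded.append)  # transform -> "load" (a list here)

ingest.notify({"batch": [" a ", "", "b"], "stage": "ingestion_complete"})
print(loaded)  # [{'batch': ['A', 'B'], 'stage': 'transform_complete'}]
```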

Performance considerations and memory optimization

Key performance points:

  • Notification cost is O(n) in number of observers. Avoid having very large numbers of observers if synchronous notification is used.
  • Use batching if many events fire quickly: buffer events and emit summaries.
  • For long-running apps, prefer weak references to avoid memory leaks (see weakref example).
  • Avoid storing large objects inside the observer list; store lightweight references or IDs and resolve lazily.
  • When using async, consider concurrency limits — uncontrolled parallel observers may exhaust resources; use semaphores or a bounded task pool.
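The batching tip can be sketched as a subject that buffers events and notifies observers with a list once a size threshold is reached. BatchingSubject, publish(), and flush() are hypothetical names; this version batches by count only, and a time-based flush would additionally need a timer or scheduler.

```python
from typing import Any, Callable, List


class BatchingSubject:
    """Buffers events and notifies observers with a list once batch_size is reached."""

    def __init__(self, batch_size: int) -> None:
        self.batch_size = batch_size
        self._buffer: List[Any] = []
        self._observers: List[Callable[[List[Any]], None]] = []

    def subscribe(self, observer: Callable[[List[Any]], None]) -> None:
        self._observers.append(observer)

    def publish(self, event: Any) -> None:
        self._buffer.append(event)
        if len(self._buffer) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        """Emit whatever is buffered (also call this on shutdown)."""
        if not self._buffer:
            return
        batch, self._buffer = self._buffer, []
        for observer in list(self._observers):
            observer(batch)


received: List[List[Any]] = []
subject = BatchingSubject(batch_size=3)
subject.subscribe(received.append)
for i in range(7):
    subject.publish(i)
subject.flush()  # emit the one leftover event
print(received)  # [[0, 1, 2], [3, 4, 5], [6]]
```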
Memory optimization tips:
  • Use weakrefs for bound methods and large object references.
  • Use generators and streaming (yield) when processing large data sets in observers.
  • Profile memory with tracemalloc and tools like memory_profiler to find hotspots.
  • Avoid cyclic references or break cycles (use weakrefs or explicit cleanup).
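The tracemalloc suggestion might look like the sketch below: a hypothetical recorder observer keeps every event it sees along with a 1 KiB buffer (a deliberate leak), and tracemalloc reports how much memory is still allocated after 1,000 notifications, plus the source lines responsible for the largest allocations.

```python
import tracemalloc
from typing import Any, Callable, List

history: List[Any] = []


def recorder(event: Any) -> None:
    """Hypothetical observer that keeps a 1 KiB buffer per event (a deliberate leak)."""
    history.append((event, bytearray(1024)))


observers: List[Callable[[Any], None]] = [recorder]

tracemalloc.start()
for i in range(1000):
    for observer in observers:
        observer(i)
current, peak = tracemalloc.get_traced_memory()
snapshot = tracemalloc.take_snapshot()  # must be taken while tracing is active
tracemalloc.stop()

print(f"current={current / 1024:.0f} KiB, peak={peak / 1024:.0f} KiB")
# Top three source lines by allocated size
for stat in snapshot.statistics("lineno")[:3]:
    print(stat)
```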

Best practices

  • Define a clear event schema (use dataclasses or typed dicts): makes observer expectations explicit.
  • Document lifecycle: who subscribes/unsubscribes and when.
  • Use weak references for bound methods to avoid leaks.
  • Isolate observer errors so one failure doesn't break the system.
  • Limit concurrency for resource-intensive observers.
  • Log meaningful information including which observers failed and stack traces.
Example: Using dataclasses for events
from dataclasses import dataclass

@dataclass
class Event:
    type: str
    payload: dict
    timestamp: float
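Here is a variation on the dataclass above, assuming you want events to be immutable and timestamped automatically. frozen=True prevents observers from reassigning fields (the payload dict itself is still mutable), and field(default_factory=time.time) fills in the timestamp at creation. on_file_saved is an illustrative observer.

```python
import time
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass(frozen=True)
class Event:
    type: str
    payload: Dict[str, Any]
    # Filled in automatically when the event is created
    timestamp: float = field(default_factory=time.time)


def on_file_saved(event: Event) -> None:
    """Illustrative observer that only reacts to one event type."""
    if event.type != "file_saved":
        return
    print(f"saved {event.payload['path']} at {event.timestamp:.0f}")


evt = Event(type="file_saved", payload={"path": "/tmp/data.csv"})
on_file_saved(evt)
# evt.type = "other"  # would raise dataclasses.FrozenInstanceError
```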

Common pitfalls

  • Forgetting to unsubscribe observers belonging to short-lived objects -> memory leaks.
  • Observers modifying the observer list during iteration -> potential runtime errors (use snapshot).
  • Blocking calls in synchronous observers causing slow notifications — prefer async or move work to worker threads/queues.
  • Not handling exceptions in observers -> crashes, or incomplete notifications because later observers never run.
  • Using global subjects without clear ownership -> maintainability issues.
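One common remedy for the blocking-call pitfall is an observer that only puts the event on a queue.Queue, with a worker thread doing the slow work. Here is a sketch under that assumption; enqueue_observer and worker are illustrative names, time.sleep stands in for real I/O, and None serves as a stop sentinel.

```python
import queue
import threading
import time
from typing import Any, List

work: "queue.Queue[Any]" = queue.Queue()
processed: List[Any] = []


def enqueue_observer(event: Any) -> None:
    """The observer stays fast: it only hands the event to the worker queue."""
    work.put(event)


def worker() -> None:
    while True:
        event = work.get()
        try:
            if event is None:  # sentinel: stop the worker
                return
            time.sleep(0.01)  # stand-in for slow I/O (upload, email, ...)
            processed.append(event)
        finally:
            work.task_done()


thread = threading.Thread(target=worker, daemon=True)
thread.start()

for i in range(5):
    enqueue_observer(i)  # would normally be called by subject.notify(i)

work.put(None)  # ask the worker to stop after draining the queue
thread.join()
print(processed)  # [0, 1, 2, 3, 4]
```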

Advanced tips

  • Priorities: support observer priorities to control notification order.
  • Filtering: allow observers to register with filters (only receive events matching criteria).
  • Rate-limiting: throttle notifications to observers that are slow or rate-sensitive.
  • Distributed pub/sub: for cross-process systems, use message brokers (Redis, Kafka) instead of in-process Observers.
  • Testing: write unit tests that assert that observers are called with expected events. Use mocks for observers.
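Testing observers with mocks can be as small as the sketch below, which uses unittest.mock.Mock as the observer and asserts on how it was called. The Subject is repeated so the example runs on its own; in a real project you would import it and run the tests with `python -m unittest` or pytest.

```python
import unittest
from typing import Any, Callable, List
from unittest.mock import Mock


class Subject:
    """Copy of the simple Subject so the tests are self-contained."""

    def __init__(self) -> None:
        self._observers: List[Callable[[Any], None]] = []

    def subscribe(self, observer: Callable[[Any], None]) -> None:
        if observer not in self._observers:
            self._observers.append(observer)

    def unsubscribe(self, observer: Callable[[Any], None]) -> None:
        if observer in self._observers:
            self._observers.remove(observer)

    def notify(self, event: Any) -> None:
        for observer in list(self._observers):
            observer(event)


class SubjectTests(unittest.TestCase):
    def test_observer_receives_event(self) -> None:
        subject = Subject()
        observer = Mock()
        subject.subscribe(observer)
        subject.notify({"type": "file_saved"})
        observer.assert_called_once_with({"type": "file_saved"})

    def test_unsubscribed_observer_is_not_called(self) -> None:
        subject = Subject()
        observer = Mock()
        subject.subscribe(observer)
        subject.unsubscribe(observer)
        subject.notify("ignored")
        observer.assert_not_called()


# Run just this test case programmatically
suite = unittest.TestLoader().loadTestsFromTestCase(SubjectTests)
result = unittest.TextTestRunner(verbosity=2).run(suite)
```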
Example: Observer with filter and priority
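from simple_observer import Subject  # the Subject class defined earlier

class FilteredSubject(Subject):
    def subscribe(self, observer, filter_fn=None, priority=0):
        # Entries are (priority, filter_fn, observer); higher priority runs first
        self._observers.append((priority, filter_fn, observer))
        self._observers.sort(key=lambda x: x[0], reverse=True)

    def unsubscribe(self, observer):
        # Entries are tuples, so match on the observer element
        self._observers = [e for e in self._observers if e[2] != observer]

    def notify(self, event):
        for _, filter_fn, observer in list(self._observers):
            if filter_fn and not filter_fn(event):
                continue  # event doesn't match this observer's filter
            observer(event)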

Putting it all together — checklist before deploying

  • [ ] Define event shapes and document them
  • [ ] Choose synchronous vs asynchronous notifications
  • [ ] Ensure thread-safety if needed
  • [ ] Use weakrefs when observers are instance methods
  • [ ] Implement error handling and logging for observer failures
  • [ ] Profile for performance and memory usage
  • [ ] Add unit tests for observer behavior

Conclusion

The Observer Pattern is a small but powerful tool in your Python toolbox. It encourages modular design, simplifies automation scripts (e.g., for daily tasks), and helps structure ETL stages and monitoring hooks. When implementing observers, be mindful of memory (use weakrefs), concurrency, and error handling. For complex or distributed scenarios, consider message brokers or full-featured event systems.

Try it now: pick a small automation task or a step in your ETL pipeline and refactor it to use an observer-based approach. Experiment with the async variant if you need concurrency.

Further reading and resources

  • Official Python docs: weakref, asyncio, threading (see https://docs.python.org/3/)
  • Patterns and idioms: "Design Patterns" (Gamma et al.)
  • Tools and libraries: APScheduler (for scheduling), Redis/Kafka (for distributed pub/sub), RxPY (reactive programming)
If you enjoyed this guide, try adapting the observer examples to your own daily automation scripts or ETL components. Share your code or questions — I'd love to see what you build!
