Design Patterns in Python: Applying Singleton, Factory, and Observer Patterns in Real Projects

Learn how to apply **Singleton**, **Factory**, and **Observer** design patterns in Python to build robust, maintainable systems. This post walks intermediate developers through concepts, thread-safety, real-world examples (database connections, plugin factories, event systems), and integration tips with Celery, multiprocessing, and data validation tools like pydantic and marshmallow.

Introduction

Design patterns are proven, reusable solutions to common software design problems. In Python projects—from web services to automation frameworks—Singleton, Factory, and Observer patterns frequently appear. Why learn them? Because patterns help you communicate intent, reduce bugs, and structure code for evolution.

Imagine you need:

  • A single database connection object shared across your app.
  • A pluggable system where new components are easily added.
  • An efficient event/broadcast system for state changes.
These are classic use cases for the Singleton, Factory, and Observer patterns. In this post you'll get conceptual explanations, step-by-step code examples with line-by-line walkthroughs, edge cases, best practices, and pointers to related topics like building a Celery + Redis task automation framework, using multiprocessing for CPU-bound tasks, and validating inputs with pydantic or marshmallow.

Prerequisites

You should be comfortable with:

  • Python 3.x basics (classes, modules, multiprocessing basics).
  • Basic concurrency concepts (threads vs. processes).
  • Dependency injection (not required, but helpful).
  • Optional: familiarity with Celery, Redis, pydantic, and marshmallow.

Tools/libraries shown:

  • Python standard library (threading, weakref, abc).
  • pydantic and marshmallow (for validation examples).
  • Celery + Redis (mentioned only for integration guidance; no full Celery setup required here).

Core Concepts — Quick Overview

  • Singleton: Ensures a class has only one instance and provides a global access point. Useful for shared resources (configuration, connection pools). Caveat: singletons are per-process in Python; multiprocessing spawns separate interpreters.
  • Factory: Encapsulates object creation, allowing clients to request objects by type or configuration without specifying exact classes. Useful for plugin systems, task factories, or selecting strategies at runtime.
  • Observer: Allows objects (observers) to subscribe to state changes in another object (subject). Useful for event systems, decoupling components, or broadcasting updates.

Design Considerations & Challenges

  • Thread-safety: When building Singletons or event systems used in threads, guard against race conditions.
  • Processes vs Threads: Python's multiprocessing creates separate processes; a Singleton in one process isn't the same in another. Consider using inter-process resources (Redis, message brokers) when sharing is required.
  • Memory management: Observers should not prevent garbage collection—use weak references or explicit unsubscribe.
  • Validation: When factories accept external input, validate construction parameters (pydantic or marshmallow).
  • Performance: Event dispatching and validation can become bottlenecks; consider async or multiprocessing for CPU-bound handlers.

Step-by-Step Examples

We'll implement the patterns in practical contexts:

  1. Thread-safe Singleton for a DB connection manager.
  2. Factory for creating worker/task objects with pydantic validation.
  3. Observer pattern for an event bus, using weakrefs to avoid leaks.
All examples assume Python 3.8+.

Example 1 — Thread-safe Singleton (Database/Config Manager)

Use case: A shared configuration manager or simple connection pool. We'll implement a metaclass-based Singleton with thread safety.

# db_singleton.py
import threading
from typing import Optional

class SingletonMeta(type):
    """
    Thread-safe Singleton metaclass.
    Any class using this metaclass will have only one instance per process.
    """
    _instances = {}
    _lock = threading.Lock()

    def __call__(cls, *args, **kwargs):
        # Double-checked locking: test once outside the lock, then again inside
        if cls not in cls._instances:
            with cls._lock:
                if cls not in cls._instances:
                    instance = super().__call__(*args, **kwargs)
                    cls._instances[cls] = instance
        return cls._instances[cls]

class ConfigManager(metaclass=SingletonMeta):
    def __init__(self, config_source: Optional[str] = None):
        # A simple initialization that might be expensive
        self.config_source = config_source or "default.yaml"
        self._config = None

    def load(self):
        # Imagine expensive I/O here
        if self._config is None:
            # For demo, just set a dict. Real code would read a file or DB.
            self._config = {"db_host": "localhost", "db_port": 5432}
        return self._config

Line-by-line explanation:

  • import threading, typing: imports for concurrency and type hints.
  • SingletonMeta: a metaclass that implements a thread-safe Singleton using a class-level lock. _instances stores per-class singletons.
  • __call__: when the class is instantiated, check _instances. If not present, acquire _lock, check again (double-checked locking) and create instance safely.
  • ConfigManager uses the metaclass. load() lazily initializes configuration.
Edge cases and notes:
  • Singleton is per-process. If you run multiple workers via multiprocessing or Celery workers, each process will have its own instance.
  • Avoid storing huge mutable state in Singletons unless carefully managed.
  • Use dependency injection or factory wrappers in tests to provide mocked instances.
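
A quick usage check, as a minimal sketch with the ConfigManager above. Note a classic Singleton gotcha: constructor arguments passed after the first instantiation are silently ignored, because the cached instance is returned without re-running __init__.

# usage
a = ConfigManager("prod.yaml")
b = ConfigManager("other.yaml")  # ignored: the cached instance is returned
assert a is b
print(a.config_source)  # prod.yaml
print(a.load())         # {'db_host': 'localhost', 'db_port': 5432}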

Example 1b — Process-aware Singleton Alternative

If you need a single shared resource across processes (e.g., config or cache), prefer a networked store (Redis) or an IPC mechanism. Example: save configuration in Redis — fits well with a Celery + Redis automation architecture.

Related reading: Building a Task Automation Framework in Python Using Celery and Redis: A Step-by-Step Guide. For that kind of system, using Redis for shared state and Celery for distributed workers is the recommended approach, rather than trying to make a true cross-process Singleton.
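
As a minimal sketch of that approach, the class below keeps configuration in a Redis hash so every process (or Celery worker) reads the same values. It assumes a local Redis instance and the redis-py client; RedisConfig and the key app:config are illustrative names.

# redis_config.py
# Sketch: process-shared configuration via a Redis hash (assumes redis-py).
import json

import redis

class RedisConfig:
    """Shared configuration backed by Redis instead of an in-process Singleton."""

    def __init__(self, client: redis.Redis, key: str = "app:config"):
        self.client = client
        self.key = key

    def set(self, name: str, value) -> None:
        # JSON-encode so non-string values round-trip cleanly
        self.client.hset(self.key, name, json.dumps(value))

    def get(self, name: str, default=None):
        raw = self.client.hget(self.key, name)
        return json.loads(raw) if raw is not None else default

# usage (assumes Redis running on localhost:6379)
cfg = RedisConfig(redis.Redis(host="localhost", port=6379, db=0))
cfg.set("db_host", "localhost")
print(cfg.get("db_host"))  # localhost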

Example 2 — Factory Pattern (Plugin/Task Factory with Validation)

Use case: Create different task handler objects based on configuration or user input. We'll add pydantic validation to ensure construction parameters are valid.

First, define schemas with pydantic.

# tasks.py
from abc import ABC, abstractmethod
from typing import Dict

from pydantic import BaseModel, Field

class TaskConfig(BaseModel):
    type: str = Field(..., description="Task type identifier")
    payload: Dict = Field(default_factory=dict)

class Task(ABC):
    def __init__(self, config: TaskConfig):
        self.config = config

    @abstractmethod
    def run(self):
        pass

class EmailTask(Task):
    def run(self):
        # Real code would integrate with an email service
        recipient = self.config.payload.get("recipient")
        return f"Sending email to {recipient}"

class ImageProcessingTask(Task):
    def run(self):
        image_id = self.config.payload.get("image_id")
        return f"Processing image {image_id}"

Now the factory:

# factory.py
from typing import Dict, Type

from tasks import TaskConfig, Task, EmailTask, ImageProcessingTask

class TaskFactory:
    _registry: Dict[str, Type[Task]] = {}

    @classmethod
    def register(cls, name: str, task_cls: Type[Task]):
        cls._registry[name] = task_cls

    @classmethod
    def create(cls, data: dict) -> Task:
        """
        Validate input with pydantic, then instantiate the appropriate Task.
        Raises ValueError on unknown type or ValidationError on schema issues.
        """
        config = TaskConfig(**data)  # raises ValidationError on bad input
        task_cls = cls._registry.get(config.type)
        if not task_cls:
            raise ValueError(f"Unknown task type: {config.type}")
        return task_cls(config)

# Register built-in tasks
TaskFactory.register("email", EmailTask)
TaskFactory.register("image", ImageProcessingTask)

Line-by-line explanation:

  • TaskConfig: pydantic model validating presence of type and optional payload. This prevents invalid data reaching factory logic.
  • Task: abstract base class for tasks.
  • Specific tasks: EmailTask and ImageProcessingTask implement run().
  • TaskFactory: registry-based factory that maps string keys to classes.
  • create: validates input using pydantic, then looks up the class and instantiates it.
Example usage:

# example_usage.py
from factory import TaskFactory
from pydantic import ValidationError

payload = {"type": "email", "payload": {"recipient": "alice@example.com"}}

try:
    task = TaskFactory.create(payload)
    result = task.run()
    print(result)  # Sending email to alice@example.com
except ValidationError as ve:
    print("Invalid task data:", ve)
except ValueError as ve:
    print("Factory error:", ve)

Edge cases and notes:

  • Using pydantic helps with robust input validation. Alternatively, marshmallow can be used if you prefer serialization/deserialization patterns.
  • Factories are excellent for plugin architectures — plugins can register themselves on import (a decorator sketch follows this list).
  • For dynamic plugin discovery, consider using entry points (setuptools) or a plugin registry.
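
Plugins can self-register on import with a small decorator over the registry. A minimal sketch reusing the TaskFactory above (register_task and ReportTask are illustrative names):

# plugins.py
from factory import TaskFactory
from tasks import Task

def register_task(name: str):
    """Class decorator: register a Task subclass when its module is imported."""
    def decorator(task_cls):
        TaskFactory.register(name, task_cls)
        return task_cls
    return decorator

@register_task("report")
class ReportTask(Task):
    def run(self):
        return f"Generating report {self.config.payload.get('report_id')}"

Importing plugins.py anywhere in the application is enough to make the "report" type available to TaskFactory.create.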

Example 3 — Observer Pattern (Event Bus)

Use case: An application needs decoupled components to react to events (e.g., config changes, task completion). We'll implement an event bus where observers subscribe to event types. Use weak references to avoid preventing garbage collection of observers.

# event_bus.py
import weakref
from collections import defaultdict
from typing import Callable, Any, Dict, Set

class EventBus:
    def __init__(self):
        # mapping: event_name -> set of weakrefs to callbacks
        self._subscribers: Dict[str, Set[weakref.WeakMethod]] = defaultdict(set)

    def subscribe(self, event_name: str, callback: Callable):
        """
        Subscribe a bound method as a callback. Weak references let
        subscriber objects be garbage collected even while subscribed.
        """
        if hasattr(callback, "__self__") and hasattr(callback, "__func__"):
            # bound method: WeakMethod won't keep the instance alive
            self._subscribers[event_name].add(weakref.WeakMethod(callback))
        else:
            # Plain functions can't be wrapped in WeakMethod; they're usually
            # module-level globals that wouldn't be GC'd in typical modules anyway.
            raise TypeError("Only bound methods are supported for safe weak refs.")

    def unsubscribe(self, event_name: str, callback: Callable):
        to_remove = None
        for ref in list(self._subscribers.get(event_name, [])):
            # == (not `is`): WeakMethod rebuilds a fresh bound-method object,
            # and bound methods compare equal when instance and function match
            if ref() == callback:
                to_remove = ref
                break
        if to_remove:
            self._subscribers[event_name].remove(to_remove)

    def publish(self, event_name: str, data: Any = None):
        dead = []
        for ref in list(self._subscribers.get(event_name, [])):
            callback = ref()
            if callback is None:
                dead.append(ref)
                continue
            try:
                callback(data)
            except Exception as e:
                # Basic error handling: log or handle subscriber exceptions
                print(f"Error in subscriber for {event_name}: {e}")
        # Clean up dead references
        for ref in dead:
            self._subscribers[event_name].remove(ref)

Example subscriber:

# listeners.py
from event_bus import EventBus

class Logger:
    def __init__(self, name):
        self.name = name

    def on_task_complete(self, data):
        print(f"[{self.name}] Task completed: {data}")

# usage
bus = EventBus()
logger = Logger("Main")
bus.subscribe("task_complete", logger.on_task_complete)
bus.publish("task_complete", {"task_id": 123})
# Output: [Main] Task completed: {'task_id': 123}

Line-by-line explanation:

  • EventBus uses defaultdict(set) to hold event subscriber sets.
  • subscribe registers bound methods using weakref.WeakMethod; ensures subscriber objects can be GC'd.
  • publish iterates subscribers, calls callbacks with data, collects dead references to remove.
  • Exceptions in subscribers are caught and logged; you might want to use structured logging in production.
Edge cases and notes:
  • Functions (unbound) are not weak-referenceable via WeakMethod. You can store them strongly or use wrappers. Be careful: strong references to large closures can prevent GC.
  • Consider dispatching events asynchronously if handlers can be slow: use threads, a worker pool, or integrate with Celery for distributed handling (a thread-pool sketch follows this list).
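
As a minimal sketch of the thread-pool option, a subclass of the EventBus above can hand each callback to a pool so a slow handler never blocks the publisher (ThreadedEventBus is an illustrative name):

# threaded_event_bus.py
from concurrent.futures import ThreadPoolExecutor
from typing import Any

from event_bus import EventBus

class ThreadedEventBus(EventBus):
    """Sketch: dispatch subscriber callbacks on a thread pool instead of inline."""

    def __init__(self, max_workers: int = 4):
        super().__init__()
        self._pool = ThreadPoolExecutor(max_workers=max_workers)

    def publish(self, event_name: str, data: Any = None):
        dead = []
        for ref in list(self._subscribers.get(event_name, [])):
            callback = ref()
            if callback is None:
                dead.append(ref)
                continue
            # Submit instead of calling: the publisher never waits on a handler
            self._pool.submit(callback, data)
        for ref in dead:
            self._subscribers[event_name].remove(ref)

Exceptions raised in pooled handlers end up on the returned futures, so production code should attach a done-callback or log them explicitly.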

Integrating with Celery, multiprocessing, and Validation Pipelines

Patterns often integrate with other systems.

  • Celery + Redis (Task Automation Framework): Use Factory to register Celery tasks dynamically or to create task wrappers. Observers are useful for signaling task start/finish events to a local monitor; for distributed events, publish to Redis channels or use Celery signals. Singleton metaclasses are fine for per-worker singletons; avoid assuming a single global instance across multiple Celery worker processes.
  • multiprocessing (CPU-bound optimization): For CPU-bound handlers in your Observer, avoid blocking the main loop—use multiprocessing.Pool or concurrent.futures.ProcessPoolExecutor. Note: Singletons don't cross process boundaries; use process-safe resources like Manager objects or external services.
  • Validation Pipelines (pydantic and marshmallow): Combine Factory and validation: validate inputs with pydantic models in factories before object creation. Use marshmallow when you need explicit serialization to/from formats (JSON) or schema-driven (de)serialization pipelines (a marshmallow sketch follows this list).
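
For the marshmallow side, here is a minimal schema sketch mirroring the pydantic TaskConfig (TaskSchema is an illustrative name; assumes marshmallow 3.x):

# marshmallow_schema.py
from marshmallow import Schema, ValidationError, fields

class TaskSchema(Schema):
    type = fields.Str(required=True)
    payload = fields.Dict(load_default=dict)

try:
    data = TaskSchema().load({"type": "email", "payload": {"recipient": "alice@example.com"}})
    print(data)  # {'type': 'email', 'payload': {'recipient': 'alice@example.com'}}
except ValidationError as err:
    print("Invalid task data:", err.messages)
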
Example: Using a multiprocess executor to handle heavy observer work:
# observer_with_multiprocessing.py
from concurrent.futures import ProcessPoolExecutor

def heavy_handler(data):
    # CPU-intensive work here
    return f"Processed {data['task_id']}"

if __name__ == "__main__":
    executor = ProcessPoolExecutor(max_workers=4)

    # When publishing an event:
    future = executor.submit(heavy_handler, {"task_id": 1})
    result = future.result()  # wait, or store the future for later
    print(result)  # Processed 1

Notes:

  • Use executor.submit() for non-blocking dispatch.
  • When using ProcessPoolExecutor, guard top-level code with if __name__ == "__main__": (as shown above); this is required on platforms that spawn worker processes, such as Windows and, by default, macOS.

Best Practices

  • Prefer composition over inheritance when it improves clarity.
  • Keep Singletons simple — they’re about global access, not global mutable state.
  • Validate inputs at the boundary — factories are a great place to enforce schemas (pydantic/marshmallow).
  • Use weak references or explicit unsubscribe to prevent memory leaks in Observer implementations.
  • When scaling across processes, use external systems (Redis, message brokers like RabbitMQ/Celery) instead of trying to share in-memory singletons.
  • Document your plugin registration process clearly (e.g., module import order, entry points).

Common Pitfalls

  • Assuming a Singleton is global across processes — it's per-process in Python.
  • Not handling subscriber exceptions — one bad handler can disrupt publishing.
  • Forgetting thread-safety — race conditions during singleton initialization or concurrent registry updates.
  • Overusing patterns — don't force a pattern where a simple function would be clearer.

Advanced Tips

  • Combine patterns: use a Factory to instantiate different Observer implementations (in-memory vs distributed). E.g., a factory can create either a local EventBus or a Redis-backed event publisher depending on environment/config.
  • Use asyncio for event dispatching in IO-bound scenarios instead of threads or processes (see the asyncio sketch after the factory example below).
  • For plugin systems, combine setuptools entry points and a Factory registry to auto-discover plugins.
  • For observability, integrate pattern components with structured logging and metrics (e.g., counters for published events, timing for handler duration).
Example: Factory selects an event bus implementation based on configuration:
# event_factory.py
class LocalEventBusFactory:
    @staticmethod
    def create():
        return EventBus()  # in-memory

class RedisEventBusFactory:
    @staticmethod
    def create():
        # returns a hypothetical Redis-backed event bus
        return RedisEventBus(redis_client)

# usage
if config.use_redis:
    bus = RedisEventBusFactory.create()
else:
    bus = LocalEventBusFactory.create()
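
And for the asyncio tip, a minimal sketch of coroutine-based dispatch. AsyncEventBus is an illustrative name, separate from the weakref-based EventBus in Example 3, and it assumes every handler is a coroutine function:

# async_bus.py
import asyncio
from collections import defaultdict

class AsyncEventBus:
    def __init__(self):
        self._subscribers = defaultdict(list)  # event_name -> [async callbacks]

    def subscribe(self, event_name, callback):
        self._subscribers[event_name].append(callback)

    async def publish(self, event_name, data=None):
        # Run all handlers concurrently; return_exceptions keeps one bad
        # handler from cancelling the rest
        handlers = self._subscribers.get(event_name, [])
        await asyncio.gather(*(h(data) for h in handlers), return_exceptions=True)

async def main():
    bus = AsyncEventBus()

    async def on_ping(data):
        print("got", data)

    bus.subscribe("ping", on_ping)
    await bus.publish("ping", {"n": 1})

asyncio.run(main())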

Conclusion

Design patterns like Singleton, Factory, and Observer are powerful tools in your Python toolbox. They clarify intent and help structure real-world systems: global-like access to shared resources, configurable object creation, and decoupled event handling.

Key takeaways:

  • Use Singleton carefully — it's per-process and requires thread-safety.
  • Use Factory for validation and decoupling creation logic; leverage pydantic or marshmallow to validate input.
  • Use Observer for decoupled event systems; manage subscriber lifecycle with weakrefs and consider async/worker dispatch for heavy handlers.
  • For cross-process coordination and task automation, integrate message brokers like Redis and frameworks like Celery rather than trying to share in-memory singletons.
Try these examples in your projects:
  • Convert a configuration loader into a Singleton and observe thread safety.
  • Build a TaskFactory that uses pydantic to validate incoming job payloads.
  • Implement an EventBus and add subscribers for logging, metrics, and alerts.
If you enjoyed this tutorial, try extending it:
  • Build a small Celery + Redis automation framework where tasks are created via a Factory and task lifecycle events are published via Observer to a monitoring service.
  • Replace synchronous handlers with a multiprocessing or Celery-based worker to optimize CPU-bound workloads.

Next Steps

Pick one example above, adapt it to your codebase, and run the tests. Share your experience or questions — I'd love to help you apply these patterns to a concrete project.
