
Implementing Zero-Dependency Python Microservices: A Step-by-Step Guide
Build fast, maintainable microservices using only Python's standard library. This guide walks you through core concepts, concurrency options, practical zero-dependency examples, and scaling strategies — with clear code, line-by-line explanations, and best practices for production-readiness. Try the examples, adapt them, and learn when to bring in external tools like Dask or multiprocessing for heavier workloads.
Introduction
Microservices are small, focused services that communicate over the network. Many tutorials recommend frameworks (Flask, FastAPI) and external servers (gunicorn), but what if you want a zero-dependency microservice — a service written using only the Python standard library?
Why would you do that?
- Minimal footprint (no pip installs, useful in constrained environments).
- Full control over behavior and dependencies.
- Great for learning core networking and concurrency concepts.
In this guide, you'll learn:
- The basics and prerequisites for zero-dependency microservices.
- Multiple implementation patterns using only Python stdlib.
- How to handle concurrency (threading, multiprocessing, asyncio).
- Practical, production-minded tips: logging, error handling, timeouts.
- When to reach for tools like Dask (large datasets), rate-limiting strategies for web scraping, and CPU-bound multiprocessing.
Prerequisites
Make sure you have:
- Python 3.8+ (examples use features available in 3.8+; where applicable, I’ll note compatibility).
- Basic familiarity with HTTP, JSON, and Python concurrency (threads/processes/asyncio).
- A terminal and ability to run Python scripts locally.
Useful standard library references:
- HTTP server: https://docs.python.org/3/library/http.server.html
- WSGI: https://docs.python.org/3/library/wsgiref.html
- Concurrency: https://docs.python.org/3/library/concurrency.html
- multiprocessing: https://docs.python.org/3/library/multiprocessing.html
- asyncio: https://docs.python.org/3/library/asyncio.html
Core Concepts
Before jumping into code, let's break the topic down.
Key concepts:
- Zero-dependency: Only Python standard library modules (http.server, socketserver, wsgiref, json, threading, concurrent.futures, multiprocessing, asyncio, etc.).
- Routing: Map HTTP paths and methods to handlers.
- Concurrency: Serve multiple requests concurrently using threads, processes, or async.
- Serialization: JSON input/output via built-in json module.
- Robustness: Error handling, timeouts, logging, graceful shutdowns.
Limitations to keep in mind:
- Standard library servers aren't optimized for high-load production. Expect to implement additional logic for timeouts, retries, and monitoring.
- Portability considerations (e.g., SO_REUSEPORT may not be available on all OSes).
- For very large datasets or complex scheduling, external libraries like Dask may be more appropriate.
When to reach for external tools:
- Use Dask for distributed processing and handling very large datasets efficiently ("Practical Strategies for Handling Large Datasets in Python Using Dask").
- Use specialized libraries when complex routing, validation, or async frameworks would save time.
- Use external process managers (systemd, Docker orchestration) for production deployments.
Implementation Patterns (Overview)
We'll cover three patterns:
- Simple WSGI microservice (zero-dep, easy to test).
- Threaded HTTP server with routing and JSON handling.
- Scalable zero-dep pattern: combine a master socket with multiple worker processes using multiprocessing / concurrent.futures (and fallback approaches).
Along the way, we'll also:
- Offload CPU-bound tasks using Python's multiprocessing (see "A Developer's Guide to Using Python's multiprocessing for CPU-bound Tasks").
- Implement a simple rate-limiting/backoff strategy (useful for microservices involved in web scraping; see "Optimizing Web Scraping with Python: Techniques to Bypass Rate Limiting").
Example 1 — Minimal WSGI Microservice (Zero-dep)
This is a small WSGI app using only wsgiref; good for understanding request/response flow.
# simple_wsgi.py
from wsgiref.simple_server import make_server
import json
from urllib.parse import parse_qs

def application(environ, start_response):
    path = environ.get('PATH_INFO', '/')
    method = environ.get('REQUEST_METHOD', 'GET')
    # Basic routing
    if path == '/':
        status = '200 OK'
        body = {'message': 'Hello from zero-dep WSGI microservice'}
    elif path == '/echo' and method == 'GET':
        qs = parse_qs(environ.get('QUERY_STRING', ''))
        body = {'echo': qs}
        status = '200 OK'
    else:
        status = '404 Not Found'
        body = {'error': 'Not found'}
    payload = json.dumps(body).encode('utf-8')
    headers = [('Content-Type', 'application/json'), ('Content-Length', str(len(payload)))]
    start_response(status, headers)
    return [payload]

if __name__ == '__main__':
    with make_server('', 8000, application) as httpd:
        print("Serving on port 8000...")
        httpd.serve_forever()
Line-by-line explanation:
- import modules: use wsgiref, json, urllib.parse.
- application(environ, start_response): WSGI callable.
environ contains the request info.
- Get path and method; do basic routing with if/elif.
- Build JSON body and status. Encode payload and set headers.
- start_response(status, headers) sends HTTP status and headers to server.
- Return an iterable of bytes (here a single bytes item).
- make_server creates a simple WSGI server on port 8000 and runs forever.
Endpoints:
- GET / -> JSON greeting.
- GET /echo?x=1 -> JSON echo of query string.
Notes:
- wsgiref's simple server is single-threaded. It's good for development and demonstration, not heavy production use.
- WSGI illustrates HTTP handling at a lower level than frameworks and is supported in the stdlib.
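You can exercise the service with curl, or with a tiny zero-dep client; here's a minimal sketch (host and port match the example above):
# query the /echo endpoint of the WSGI service above
import json
import urllib.request

with urllib.request.urlopen('http://127.0.0.1:8000/echo?x=1', timeout=5) as resp:
    print(json.load(resp))  # {'echo': {'x': ['1']}}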
Example 2 — Threaded HTTP Server with JSON Routing
A practical microservice that handles JSON POSTs and uses threads for concurrency.
# threaded_server.py
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn
import json

# Python 3.7+ also ships http.server.ThreadingHTTPServer; we define our own for clarity.
class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
    daemon_threads = True  # ensure threads exit on server shutdown

class SimpleHandler(BaseHTTPRequestHandler):
    def _set_json(self, status=200):
        self.send_response(status)
        self.send_header('Content-Type', 'application/json')
        self.end_headers()

    def do_GET(self):
        if self.path == '/health':
            self._set_json(200)
            self.wfile.write(json.dumps({'status': 'ok'}).encode())
        else:
            self._set_json(404)
            self.wfile.write(json.dumps({'error': 'not found'}).encode())

    def do_POST(self):
        if self.path != '/process':
            self._set_json(404)
            self.wfile.write(json.dumps({'error': 'not found'}).encode())
            return
        try:
            content_length = int(self.headers.get('Content-Length', 0))
            raw = self.rfile.read(content_length)
            payload = json.loads(raw) if raw else {}
        except Exception:
            self._set_json(400)
            self.wfile.write(json.dumps({'error': 'invalid json'}).encode())
            return
        # Simple processing: reverse a string field
        text = payload.get('text', '')
        if not isinstance(text, str):
            self._set_json(422)
            self.wfile.write(json.dumps({'error': 'text must be a string'}).encode())
            return
        result = {'original': text, 'reversed': text[::-1]}
        self._set_json(200)
        self.wfile.write(json.dumps(result).encode())

    def log_message(self, format, *args):
        # Override to reduce noisy stdout; integrate with logging in real apps
        pass

if __name__ == '__main__':
    server = ThreadingHTTPServer(('0.0.0.0', 8080), SimpleHandler)
    print("Threaded server listening on 8080...")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("Shutting down...")
    finally:
        server.server_close()  # release the listening socket
Explanation:
- ThreadingMixIn + HTTPServer gives a threaded server: each request handled in a new thread.
- _set_json helper sets headers to JSON.
- do_GET: simple health check.
- do_POST: accepts JSON body, reads Content-Length, parses JSON, does validation, returns processed response.
- log_message overridden to silence default logs (you can integrate Python's logging module instead).
- daemon_threads = True ensures worker threads won't prevent process exit.
- Validates Content-Length; if missing, handles gracefully by defaulting to 0.
- Catches JSON parsing errors and returns 400.
- Returns 422 for semantic validation errors.
- Thread-per-request is simple but can be heavy under many concurrent connections (due to memory and context-switch overhead). For heavy workloads, consider process pools or asyncio.
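A quick way to test the /process endpoint without external tools is a small urllib client; a minimal sketch (endpoint and payload as in the example above):
# POST a JSON body to /process and print the parsed response
import json
import urllib.request

req = urllib.request.Request(
    'http://127.0.0.1:8080/process',
    data=json.dumps({'text': 'hello'}).encode(),
    headers={'Content-Type': 'application/json'},
    method='POST',
)
with urllib.request.urlopen(req, timeout=5) as resp:
    print(json.load(resp))  # {'original': 'hello', 'reversed': 'olleh'}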
Example 3 — Offloading CPU-bound Work with multiprocessing
Use concurrent.futures.ProcessPoolExecutor to offload expensive CPU tasks to worker processes, keeping the HTTP server responsive.
# server_with_cpu_pool.py
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn
from concurrent.futures import ProcessPoolExecutor
import json

# Created in __main__ so worker processes that re-import this module
# (on spawn-based platforms) don't each build their own pool.
executor = None

def expensive_fib(n):
    # CPU-bound naive fibonacci (for demo only)
    if n < 2:
        return n
    return expensive_fib(n - 1) + expensive_fib(n - 2)

class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
    daemon_threads = True

class Handler(BaseHTTPRequestHandler):
    def _send_json(self, obj, status=200):
        data = json.dumps(obj).encode()
        self.send_response(status)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Content-Length', str(len(data)))
        self.end_headers()
        self.wfile.write(data)

    def do_GET(self):
        if self.path.startswith('/fib/'):
            try:
                n = int(self.path.split('/')[-1])
                if n < 0 or n > 35:  # simple guard
                    raise ValueError
            except Exception:
                self._send_json({'error': 'n must be an integer between 0 and 35'}, 400)
                return
            # Submit to process pool and wait for result
            future = executor.submit(expensive_fib, n)
            result = future.result()  # blocks this request thread; thread-per-request keeps the main loop responsive
            self._send_json({'n': n, 'fib': result})
        else:
            self._send_json({'error': 'not found'}, 404)

if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=4)
    server = ThreadingHTTPServer(('0.0.0.0', 8090), Handler)
    print("Server on 8090 for CPU tasks")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        executor.shutdown(wait=False)
        server.server_close()
Explanation:
- ProcessPoolExecutor starts worker processes (via fork or spawn, depending on platform) to compute CPU-bound tasks.
- HTTP server remains responsive because the expensive computations run in separate processes.
- NOTE: naive recursive fib is intentionally expensive to demonstrate offloading; use better algorithms in production.
- The example restricts n <= 35 to avoid long computations. Adjust max_workers based on CPU count: multiprocessing.cpu_count().
- See official docs for concurrent.futures: https://docs.python.org/3/library/concurrent.futures.html
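To try it, here's a minimal zero-dep client sketch (host and port match the example above):
# request fib(25) from the CPU-pool server above
import json
import urllib.request

with urllib.request.urlopen('http://127.0.0.1:8090/fib/25', timeout=30) as resp:
    print(json.load(resp))  # {'n': 25, 'fib': 75025}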
Scaling Strategies (Zero-dep)
If you need more concurrency than threads or want process-based scaling without external tools:
Option A: multiple processes with SO_REUSEPORT
- On Linux (kernel 3.9+), set socket option SO_REUSEPORT to allow multiple processes to bind the same address:port, letting kernel distribute incoming connections.
- This is more advanced and platform-dependent; a minimal sketch follows this list.
Option B: share a pre-bound socket across worker processes
- Bind the listening socket in a parent process and pass the socket fd to child processes (os.dup) before exec or via multiprocessing. Workers accept connections on the same socket.
- This requires careful coding but allows zero-dependency horizontal scaling.
Option C: multiple instances behind a reverse proxy
- Launch N instances of your script on different ports and use an external reverse proxy (even something simple in Python) to load-balance. This is still zero-dep if done in Python but adds complexity.
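Here's a minimal sketch of Option A, assuming a Linux kernel with SO_REUSEPORT support (os.fork is Unix-only, and the worker count is illustrative):
# reuseport_workers.py: several processes bind the same port; the kernel
# load-balances incoming connections between them (Linux 3.9+ only).
import os
import socket
from http.server import BaseHTTPRequestHandler, HTTPServer

class ReusePortHTTPServer(HTTPServer):
    def server_bind(self):
        # allow multiple processes to bind the same address:port
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        super().server_bind()

class Handler(BaseHTTPRequestHandler):
    def do_GET(self):
        body = ('handled by pid %d\n' % os.getpid()).encode()
        self.send_response(200)
        self.send_header('Content-Type', 'text/plain')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

if __name__ == '__main__':
    for _ in range(3):  # fork three extra workers; the parent also serves
        if os.fork() == 0:
            break  # child: stop forking and fall through to serve
    ReusePortHTTPServer(('0.0.0.0', 8081), Handler).serve_forever()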
Rate Limiting and Web Scraping Considerations
If your microservice performs web scraping, you must handle rate-limiting and politeness. Here are zero-dep techniques:
- Respect robots.txt (the stdlib's urllib.robotparser can fetch and parse it for you).
- Implement exponential backoff with jitter (a sketch follows this list).
- Use per-host rate-limiting with an in-memory limiter (sketch below).
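Here's a minimal retry-with-backoff sketch using only the stdlib; the retry limits and delay constants are illustrative assumptions:
# retry transient fetch failures with exponential backoff plus full jitter
import random
import time
import urllib.error
import urllib.request

def fetch_with_backoff(url, max_tries=5, base=0.5, cap=30.0):
    for attempt in range(max_tries):
        try:
            with urllib.request.urlopen(url, timeout=10) as resp:
                return resp.read()
        except urllib.error.URLError:
            if attempt == max_tries - 1:
                raise
            # sleep a random amount up to the capped exponential delay
            delay = min(cap, base * (2 ** attempt))
            time.sleep(random.uniform(0, delay))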
Example per-host limiter sketch (zero-dep); this is a minimal interval-based limiter rather than a full token bucket:
import time
import threading
from collections import defaultdict

class RateLimiter:
    def __init__(self, interval=1.0):
        self.interval = interval
        self.lock = threading.Lock()
        self.last = defaultdict(lambda: 0.0)

    def wait(self, host):
        # Reserve the host's next slot under the lock, but sleep outside it
        # so one slow host doesn't block requests to other hosts.
        with self.lock:
            now = time.monotonic()
            ready = max(self.last[host] + self.interval, now)
            self.last[host] = ready
        if ready > now:
            time.sleep(ready - now)
This simple class ensures at most one request every interval seconds to each host.
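Illustrative usage, assuming the RateLimiter above and a one-second interval:
# fetch politely: wait for the host's slot before each request
from urllib.parse import urlparse
import urllib.request

limiter = RateLimiter(interval=1.0)

def polite_get(url):
    limiter.wait(urlparse(url).netloc)
    with urllib.request.urlopen(url, timeout=10) as resp:
        return resp.read()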
Practical note:
- When scraping at scale, consider specialized strategies and tools. If you need distributed scraping and dataset processing, consider integrating Dask later ("Practical Strategies for Handling Large Datasets in Python Using Dask").
Best Practices
- Use structured logging (Python logging module) rather than print statements.
- Validate and sanitize inputs; return clear HTTP error codes (400, 422, 500).
- Add timeouts to network operations (socket timeouts, urllib.request with timeout).
- Graceful shutdown: handle signals (SIGINT, SIGTERM) and close resources.
- Limit request body sizes and safely parse JSON to avoid OOM attacks.
- Monitor and instrument: expose /metrics or /health endpoints.
- Avoid executing user-provided code.
- Use TLS in production (the standard library ssl module can wrap the server socket; see the sketch after this list).
- Sanitize headers, avoid header injection when reflecting inputs.
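For example, a minimal TLS wrapper sketch, assuming you already have a certificate and key (cert.pem and key.pem are illustrative file names):
# wrap the listening socket in TLS before serving
import ssl
from http.server import BaseHTTPRequestHandler, HTTPServer

server = HTTPServer(('0.0.0.0', 8443), BaseHTTPRequestHandler)
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
context.load_cert_chain('cert.pem', 'key.pem')  # illustrative paths
server.socket = context.wrap_socket(server.socket, server_side=True)
server.serve_forever()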
Common Pitfalls
- Using blocking disk or CPU operations in the main thread — offload to worker threads/processes.
- Relying on wsgiref/simple_server for heavy production loads.
- Not setting Content-Length headers or not handling chunked transfer appropriately.
- Ignoring platform differences (e.g., SO_REUSEPORT availability).
- Not handling JSON decoding errors or malformed requests.
Advanced Tips
- Use asyncio (standard library) for many concurrent I/O-bound tasks if you can write async code. Example: asyncio.start_server for custom protocol servers.
- Combine asyncio for I/O-bound parts with a ProcessPoolExecutor (via loop.run_in_executor) for CPU-bound parts (see the sketch after this list).
- If dataset sizes outgrow memory, consider streaming responses (yielding chunks) instead of building giant JSON strings in memory.
- If you begin processing very large datasets or need distributed computing, bring in Dask. It plays nicely with Python and can orchestrate parallel workloads that are hard to implement manually.
- Complex routing, validation, authentication, or asynchronous HTTP stacks often justify bringing in frameworks (Flask, FastAPI) and servers (gunicorn, uvicorn).
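As a sketch of combining the two, here's an asyncio line-based echo server that offloads a CPU-bound step to a process pool; the protocol, port, and cpu_task function are illustrative:
# asyncio handles many concurrent connections; ProcessPoolExecutor handles CPU work
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_task(data: bytes) -> bytes:
    # stand-in for real CPU-bound work: reverse the payload
    return data[::-1]

async def handle(reader, writer):
    data = await reader.readline()
    loop = asyncio.get_running_loop()
    # run_in_executor keeps the event loop responsive during CPU work
    result = await loop.run_in_executor(pool, cpu_task, data)
    writer.write(result)
    await writer.drain()
    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle, '127.0.0.1', 8888)
    async with server:
        await server.serve_forever()

if __name__ == '__main__':
    pool = ProcessPoolExecutor()
    asyncio.run(main())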
Example: Graceful Shutdown and Signal Handling
# graceful.py (excerpt)
import signal
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler

server = HTTPServer(('0.0.0.0', 9000), BaseHTTPRequestHandler)

def shutdown(sig, frame):
    print("Signal received, shutting down...")
    # shutdown() blocks until serve_forever() exits, so it must run in
    # another thread; calling it directly here would deadlock.
    threading.Thread(target=server.shutdown).start()

signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGTERM, shutdown)
server.serve_forever()
server.server_close()
Key idea: trap signals and call server.shutdown() from a separate thread (it blocks until serve_forever() returns) to stop serving cleanly.
Conclusion
Building zero-dependency Python microservices is a fantastic way to learn underlying web primitives and write deployable services in environments where third-party packages are undesirable. Use:
- wsgiref for simple WSGI apps,
- http.server + ThreadingMixIn for thread-per-request designs,
- concurrent.futures / multiprocessing for CPU-intensive tasks,
- asyncio when scaling many I/O-bound connections.
- For large datasets and distributed workflows, use tools like Dask.
- For web scraping tasks, implement respectful rate-limiting and backoff.
- For CPU-bound tasks, process pools or multiprocessing are essential.
Next steps:
- Run the code locally and exercise endpoints with curl or HTTPie.
- Modify the Threaded server to add authentication or logging.
- Replace the naive fib with a memoized or iterative version (a sketch follows) and observe performance changes.
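For instance, a memoized version is a one-line change with functools:
# memoized fibonacci: exponential time becomes linear
from functools import lru_cache

@lru_cache(maxsize=None)
def fib(n: int) -> int:
    return n if n < 2 else fib(n - 1) + fib(n - 2)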
Further Reading and References
- Python http.server docs: https://docs.python.org/3/library/http.server.html
- WSGI (wsgiref) docs: https://docs.python.org/3/library/wsgiref.html
- concurrent.futures: https://docs.python.org/3/library/concurrent.futures.html
- multiprocessing: https://docs.python.org/3/library/multiprocessing.html
- asyncio: https://docs.python.org/3/library/asyncio.html
- Dask (official): https://docs.dask.org — for handling large datasets at scale
- Rate limiting and scraping best practices: read site terms and robots.txt for each domain