Integrating FastAPI with Structlog
- Author: Daniel Herrmann
The built-in Python logging module does a great job. If you're wondering whether you need an additional logging library such as structlog or loguru, the answer might very well be "no". Before deciding to go down this route, please familiarize yourself with Python logging and read (yes, read!) the logging module tutorial. It's an excellent read!
A Structlog Primer
According to the logs section of the Twelve-Factor App (which is another highly recommended read), logs provide visibility into the behaviour of the running app. When running in local development mode, they should be easy for the developer to read. When running in production, they should be easy for the logging environment to parse. We, for example, use a combination of fluentbit, fluentd and OpenSearch as our logging pipeline. All of this could be done with the standard logging module though. Compare these log entries:
DEBUG:__main__:User example triggered deletion of hero 18bcb9d1-9ecd-4297-a7cc-591d0ededf7a
INFO:__main__:Hero 18bcb9d1-9ecd-4297-a7cc-591d0ededf7a deleted successfully
2024-05-03T18:58:21.443989Z [debug ] Triggering deletion of hero [app.worker] request_id=97133231229d4af2a0588e468049261c hero_id=18bcb9d1-9ecd-4297-a7cc-591d0ededf7a user_id=example
2024-05-03T18:58:21.509100Z [info ] Hero deleted successfully [app.worker] request_id=97133231229d4af2a0588e468049261c hero_id=18bcb9d1-9ecd-4297-a7cc-591d0ededf7a user_id=example
The second example provides additional context for each log entry, allowing us to identify all log entries related to a particular request and see which objects are attached. This is better, and is perfectly fine for local development. For parsing in production, JSON output is even better, as it allows for direct ingestion without the need for sophisticated and potentially error-prone parsing:
{"request_id": "33fe32d22dd44868afbf36b0ed40a104", "logger": "arq.worker", "level": "info", "message": "33fe32d22dd44868afbf36b0ed40a104:print_hero", "asctime": "21:42:51", "timestamp": "2024-05-03T19:42:51.613223Z"}
{"request_id": "21b69ca806ac4e64a9af9f91d3d35ef3", "logger": "app.worker.hero", "level": "info", "timestamp": "2024-05-03T19:42:51.617299Z", "message": "Hero 18bcb9d1-9ecd-4297-a7cc-591d0ededf7a printed"}
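To illustrate why JSON lines need no custom parsing, here is a minimal sketch that uses only the standard library. The two sample lines are abbreviated from the output above, and the helper name entries_for_request is hypothetical; a real pipeline would do the equivalent inside fluentbit, fluentd or OpenSearch.

```python
import json

# Two log lines, abbreviated from the JSON output shown above.
RAW_LINES = [
    '{"request_id": "33fe32d22dd44868afbf36b0ed40a104", "logger": "arq.worker", "level": "info"}',
    '{"request_id": "21b69ca806ac4e64a9af9f91d3d35ef3", "logger": "app.worker.hero", "level": "info"}',
]


def entries_for_request(lines: list[str], request_id: str) -> list[dict]:
    """Parse JSON log lines and keep only the entries of a single request."""
    parsed = (json.loads(line) for line in lines)
    return [entry for entry in parsed if entry.get("request_id") == request_id]


matches = entries_for_request(RAW_LINES, "21b69ca806ac4e64a9af9f91d3d35ef3")
print([entry["logger"] for entry in matches])
```

Every field is addressable by name straight after json.loads, so correlating entries by request_id is a simple filter rather than a regular expression over free text.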
Adding context to log messages can be quite tedious, especially when you need to add the parameters to each logging call. This is where bound loggers come in very handy. The structlog documentation is very good, so please head over there to understand the concepts. In short, it allows you to do this:
import structlog
logger = structlog.get_logger()
log = logger.bind(foo="bar")
Integrating Structlog with FastAPI
With the integration of structlog into FastAPI, the goal is to:
- Be able to switch between human readable output and JSON output
- Attach one request ID to each incoming request and add it to all log messages
- Automatically attach certain request attributes (path, HTTP method, function, ...)
- Make sure the uvicorn logger behaves well with structlog
- Handle exceptions automatically and serialize them properly in the logs
- Optionally, be able to quickly bind SQLAlchemy models to the logger
Log Settings
Assuming we're using pydantic-settings, first extend your configuration to include the necessary variables:
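# src/core/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    LOG_LEVEL: str = "INFO"
    LOG_JSON_FORMAT: bool = False
    LOG_NAME: str = "your_app.app_logs"
    LOG_ACCESS_NAME: str = "your_app.access_logs"
    # Read by the logging middleware further down (passed as stack_info)
    LOG_INCLUDE_STACK: bool = False


# Module-level instance, imported elsewhere as `from .config import settings`
settings = Settings()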
Configure the Logger
Structlog uses processors to format and/or filter the logs. Some of this code is taken from this excellent Gist on GitHub, modified a little and with the Datadog integration removed. Also, we're going to handle exceptions in the middleware (more on the reason for that later), so we're removing that code here as well. The goals are:
- Configure the required preprocessors and processors to make structlog output everything the way we want
- Add a formatter to the Python logging module to convert existing logs to the correct format
- Prepare uvicorn loggers to be able to enrich them with structured information
# src/core/logger.py
import logging
import re
import sys
from typing import Any
import structlog
from structlog.types import EventDict, Processor
from .config import settings


def drop_color_message_key(_, __, event_dict: EventDict) -> EventDict:
    """
    Uvicorn logs the message a second time in the extra `color_message`, but we don't
    need it. This processor drops the key from the event dict if it exists.
    """
    event_dict.pop("color_message", None)
    return event_dict


def setup_logging(json_logs: bool = False, log_level: str = "INFO"):
    timestamper = structlog.processors.TimeStamper(fmt="iso")

    shared_processors: list[Processor] = [
        structlog.contextvars.merge_contextvars,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.stdlib.ExtraAdder(),
        drop_color_message_key,
        timestamper,
        structlog.processors.StackInfoRenderer(),
    ]

    if json_logs:
        # Format the exception only for JSON logs, as we want to pretty-print them when
        # using the ConsoleRenderer
        shared_processors.append(structlog.processors.format_exc_info)

    structlog.configure(
        processors=shared_processors
        + [
            structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
        ],
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
    )

    log_renderer: structlog.types.Processor
    if json_logs:
        log_renderer = structlog.processors.JSONRenderer()
    else:
        log_renderer = structlog.dev.ConsoleRenderer()

    formatter = structlog.stdlib.ProcessorFormatter(
        # These run ONLY on `logging` entries that do NOT originate within
        # structlog.
        foreign_pre_chain=shared_processors,
        # These run on ALL entries after the pre_chain is done.
        processors=[
            # Remove _record & _from_structlog.
            structlog.stdlib.ProcessorFormatter.remove_processors_meta,
            log_renderer,
        ],
    )

    # Reconfigure the root logger to use our structlog formatter, effectively emitting the logs via structlog
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    root_logger = logging.getLogger()
    root_logger.addHandler(handler)
    root_logger.setLevel(log_level.upper())

    for _log in ["uvicorn", "uvicorn.error"]:
        # Make sure the logs are handled by the root logger
        logging.getLogger(_log).handlers.clear()
        logging.getLogger(_log).propagate = True

    # Uvicorn logs are re-emitted with more context. We effectively silence them here
    logging.getLogger("uvicorn.access").handlers.clear()
    logging.getLogger("uvicorn.access").propagate = False
Important notes:
- structlog.processors.format_exc_info is used to consistently parse exception information and stack traces. For this to work, the exception must be passed as the exc_info attribute with the log.
- Native uvicorn access logs are suppressed and re-emitted in the middleware to use structlog and enrich them with structured information
- The json_logs argument determines whether the ConsoleRenderer (for local development) or the JSONRenderer (for production environments) is used
Thin logging wrapper
With native structlog, there are two ways to attach contextual information to log entries:
Bound Logger
See the structlog documentation on bound logger for more details.
import structlog
log1 = structlog.get_logger()
log1.debug("This is a message without context")
log2 = log1.bind(foo="bar")
log2.debug("This is a message with context")
log1 == log2 # Will be false
There are two things I don't like too much about this:
- Binding a value returns a new logger instance which then needs to be used
- The values added to the logger are not context aware, which means binding them means they're bound for all requests
Context Variables
Built on the standard library contextvars module, this integration allows us to bind values to the logger locally to a certain context. This fits very well into FastAPI, as contextvars will be locally relevant for each request. There is a nice writeup by the FastAPI creator on GitHub on this topic.
import structlog
structlog.contextvars.clear_contextvars()
log = structlog.get_logger()
log.debug("This is a message without context")
structlog.contextvars.bind_contextvars(a=1, b=2)
log.debug("This is a message with context")
This looks much better already. At the beginning of the request, we can clear any existing contextvars, assign a unique request ID and other important information and therefore make sure that every log emitted within the request context will contain this information.
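The isolation between requests can be demonstrated with the standard library alone. The following sketch uses a plain ContextVar as a stand-in for the values that structlog.contextvars manages, and two asyncio tasks as stand-ins for concurrent requests; the names request_id_var and handle_request are made up for this example.

```python
import asyncio
import contextvars

# Stand-in for the per-request context that structlog.contextvars manages.
request_id_var = contextvars.ContextVar("request_id", default="unset")


async def handle_request(request_id: str, delay: float) -> str:
    """Simulate a request handler: bind a value, wait, then read it back."""
    request_id_var.set(request_id)
    await asyncio.sleep(delay)  # the other "request" runs while we wait
    return request_id_var.get()


async def main() -> list:
    # gather() wraps each coroutine in a task, and each task gets its own
    # copy of the current context.
    return await asyncio.gather(
        handle_request("req-a", 0.02),
        handle_request("req-b", 0.01),
    )


seen = asyncio.run(main())
print(seen)
```

Although both handlers overlap in time, each one reads back the value it set itself, and the value outside the tasks stays untouched.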
Writing this every time still seems a bit verbose though. Also, one use case we encounter very often is binding SQLAlchemy objects (or rather, their IDs) to the request context. Our goal was to simplify this as much as possible:
# instead of doing this every time
log.bind(user_id=user_object.id)
# we'd want to do this and have the logger figure it out behind the scenes
log.bind(user_object)
Therefore, we usually implement a thin wrapper, which translates the easy-to-use bind and unbind syntax to the _contextvars methods and handles SQLAlchemy models for us. Models will be translated to snake case, so adding a model called MyNiceModel would result in a key my_nice_model and a value containing the id of the model. If you use a different field as primary key, you may need to change this part for your case.
import re
from typing import Any

import structlog

from .config import settings
# Assumption: adjust this import to wherever your SQLAlchemy declarative base is defined
from app.models import Base


class FastAPIStructLogger:
    def __init__(self, log_name=settings.LOG_NAME):
        self.logger = structlog.stdlib.get_logger(log_name)

    @staticmethod
    def _to_snake_case(name):
        return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()

    def bind(self, *args, **new_values: Any):
        # Positional arguments must be SQLAlchemy models; they are bound as <model_name>=<id>
        for arg in args:
            if not isinstance(arg, Base):
                self.logger.error(
                    "Unsupported argument when trying to log. "
                    f"Unnamed argument must be a subclass of Base. Invalid argument: {type(arg).__name__}"
                )
                continue

            key = self._to_snake_case(type(arg).__name__)
            structlog.contextvars.bind_contextvars(**{key: arg.id})

        structlog.contextvars.bind_contextvars(**new_values)

    @staticmethod
    def unbind(*keys: str):
        structlog.contextvars.unbind_contextvars(*keys)

    def debug(self, event: str | None = None, *args: Any, **kw: Any):
        self.logger.debug(event, *args, **kw)

    def info(self, event: str | None = None, *args: Any, **kw: Any):
        self.logger.info(event, *args, **kw)

    def warning(self, event: str | None = None, *args: Any, **kw: Any):
        self.logger.warning(event, *args, **kw)

    warn = warning

    def error(self, event: str | None = None, *args: Any, **kw: Any):
        self.logger.error(event, *args, **kw)

    def critical(self, event: str | None = None, *args: Any, **kw: Any):
        self.logger.critical(event, *args, **kw)

    def exception(self, event: str | None = None, *args: Any, **kw: Any):
        self.logger.exception(event, *args, **kw)
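The key naming used by the wrapper can be checked in isolation. This sketch copies the regular expression from _to_snake_case into a standalone function (to_snake_case is just a local name for this example) and runs it on a few class names. The last one is a made-up name that shows a limitation worth knowing about.

```python
import re


def to_snake_case(name: str) -> str:
    """Insert "_" before every capital letter except the first, then lowercase."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


print(to_snake_case("MyNiceModel"))  # my_nice_model
print(to_snake_case("Hero"))         # hero
print(to_snake_case("HTTPSession"))  # h_t_t_p_session
```

Consecutive capitals are split letter by letter, so models with acronyms in their name may deserve an explicit keyword argument to bind instead.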
Logging Middleware
Most of the actual magic is still to come; it's mainly implemented in the following middleware. Its job is to automatically adjust the contextvars to include request parameters, implement structured uvicorn logging and handle exceptions gracefully.
First, we need a way to uniquely identify requests. For this we're going to use the asgi-correlation-id package, which provides a ready-to-use middleware for ASGI-based web servers such as uvicorn. The default behaviour is to watch for the presence of the X-Request-ID header. If present (e.g. if your frontend generates a unique request ID), it will use this value; if not, it will auto-generate a random value and add it to the HTTP response. This allows for scenarios where you can display the request or correlation ID to the user in case of an error, making it easy to find related log entries.
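The header fallback described above can be sketched in a few lines of plain Python. This only illustrates the behaviour and is not the asgi-correlation-id implementation: the function name resolve_request_id is hypothetical, and the real package may apply additional validation to incoming values.

```python
import uuid


def resolve_request_id(headers: dict, header_name: str = "X-Request-ID") -> str:
    """Return the incoming request ID, or generate one if the header is absent."""
    # HTTP header names are case-insensitive, so compare them in lowercase.
    lowered = {key.lower(): value for key, value in headers.items()}
    incoming = lowered.get(header_name.lower())
    return incoming if incoming else uuid.uuid4().hex


# The ID sent by the client is reused ...
print(resolve_request_id({"x-request-id": "97133231229d4af2a0588e468049261c"}))
# ... otherwise a random 32-character hex string is generated.
print(len(resolve_request_id({})))
```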
The logging middleware itself then simply clears the contextvars at the beginning of each request, adds the detected correlation ID to the bound logger and then adds additional details such as the HTTP path, the HTTP method and so on to the log as well. You can of course customize this to your needs.
We also want to automatically handle exceptions that are raised from within the code. FastAPI does provide built-in exception hooks, which are quite flexible and powerful. These exception hooks are however not used when the exception is raised (a) in a background task or (b) in a middleware. As we would need to re-raise the exceptions in our logging middleware, we cannot rely on the hooks but instead handle exceptions directly in the middleware as well. There is an extremely nice writeup and recommended read about this topic.
import time
from typing import TypedDict

import structlog
from asgi_correlation_id import correlation_id
from starlette.responses import JSONResponse
from starlette.types import ASGIApp, Receive, Scope, Send
from uvicorn.protocols.utils import get_path_with_query_string

from backend.core.config import settings

app_logger = structlog.stdlib.get_logger(settings.LOG_NAME)
access_logger = structlog.stdlib.get_logger(settings.LOG_ACCESS_NAME)


class AccessInfo(TypedDict, total=False):
    status_code: int
    start_time: int  # nanoseconds, from time.perf_counter_ns()


class StructLogMiddleware:
    def __init__(self, app: ASGIApp):
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        # If the request is not an HTTP request, we don't need to do anything special
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        structlog.contextvars.clear_contextvars()
        structlog.contextvars.bind_contextvars(request_id=correlation_id.get())

        info = AccessInfo()

        # Inner send function: records the status code, then forwards the message
        async def inner_send(message):
            if message["type"] == "http.response.start":
                info["status_code"] = message["status"]
            await send(message)

        try:
            info["start_time"] = time.perf_counter_ns()
            await self.app(scope, receive, inner_send)
        except Exception as e:
            app_logger.exception(
                "An unhandled exception was caught by last resort middleware",
                exception_class=e.__class__.__name__,
                exc_info=e,
                stack_info=settings.LOG_INCLUDE_STACK,
            )
            info["status_code"] = 500
            response = JSONResponse(
                status_code=500,
                content={
                    "error": "Internal Server Error",
                    "message": "An unexpected error occurred.",
                },
            )
            await response(scope, receive, send)
        finally:
            # Duration of the request in nanoseconds
            process_time = time.perf_counter_ns() - info["start_time"]
            client_host, client_port = scope["client"]
            http_method = scope["method"]
            http_version = scope["http_version"]
            url = get_path_with_query_string(scope)

            # Recreate the Uvicorn access log format, but add all parameters as structured information
            access_logger.info(
                f"""{client_host}:{client_port} - "{http_method} {scope["path"]} HTTP/{http_version}" {info["status_code"]}""",
                http={
                    "url": str(url),
                    "status_code": info["status_code"],
                    "method": http_method,
                    "request_id": correlation_id.get(),
                    "version": http_version,
                },
                network={"client": {"ip": client_host, "port": client_port}},
                duration=process_time,
            )
Note that this implementation uses two different loggers:
- Application logs (name configured using settings.LOG_NAME): this is mainly used for your own logs. It will automatically include the additional fields listed above
- Uvicorn access logs (name configured using settings.LOG_ACCESS_NAME): this contains the uvicorn access logs, re-emitted using structured information
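The send-wrapping trick that the middleware uses to learn the response status can be tried without a web server. The sketch below is a stripped-down illustration using only asyncio; demo_app, StatusCaptureMiddleware and run_once are hypothetical names, and the scope passed in is a minimal stand-in for what uvicorn would provide.

```python
import asyncio


async def demo_app(scope, receive, send):
    """Tiny ASGI app that always answers 204 No Content."""
    await send({"type": "http.response.start", "status": 204, "headers": []})
    await send({"type": "http.response.body", "body": b""})


class StatusCaptureMiddleware:
    """Wrap `send` to observe the status code while forwarding every message."""

    def __init__(self, app):
        self.app = app
        # Stored on the instance only because this demo handles a single request.
        self.seen_status = None

    async def __call__(self, scope, receive, send):
        async def inner_send(message):
            if message["type"] == "http.response.start":
                self.seen_status = message["status"]
            await send(message)  # always forward the message unchanged

        await self.app(scope, receive, inner_send)


async def run_once():
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message["type"])

    middleware = StatusCaptureMiddleware(demo_app)
    await middleware({"type": "http", "method": "GET", "path": "/"}, receive, send)
    return middleware.seen_status, sent


status, sent_types = asyncio.run(run_once())
print(status, sent_types)
```

In the real middleware the status is kept in a per-request dictionary instead of on the instance, which is what makes it safe under concurrent requests.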
Bring it all together
Finally, the FastAPI app itself needs to be configured accordingly.
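from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager

from asgi_correlation_id import CorrelationIdMiddleware
from fastapi import FastAPI

# Import paths below are assumed from the file headers shown earlier
from .core.config import settings
from .core.logger import setup_logging
from .middleware import StructLogMiddleware

# Configure structlog and the standard logging module before the app starts
setup_logging(json_logs=settings.LOG_JSON_FORMAT, log_level=settings.LOG_LEVEL)


# Do your lifespan stuff here such as opening DB connections, ...
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    yield


app = FastAPI(lifespan=lifespan)
app.add_middleware(StructLogMiddleware)
app.add_middleware(CorrelationIdMiddleware)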
Keep in mind that the order of middlewares is important. They're applied in an "onion" model, where each added middleware gets wrapped around the application, including any previously added middlewares. In the particular example above, the resulting stack would look like this, which makes sure the request ID is generated first.
CorrelationIdMiddleware PRE-Code
StructLogMiddleware PRE-Code
FastAPI App
StructLogMiddleware POST-Code
CorrelationIdMiddleware POST-Code
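The wrapping order can be reproduced with a few plain functions. This is a simplified model of the onion and not how Starlette builds its stack internally; make_middleware and the calls list exist only for this illustration.

```python
calls = []


def make_middleware(name):
    """Build a wrapper that records when it runs around the wrapped handler."""
    def wrap(inner):
        def handler():
            calls.append(f"{name} PRE")
            inner()
            calls.append(f"{name} POST")
        return handler
    return wrap


def app():
    calls.append("App")


# Same registration order as the add_middleware() calls above.
registered = [
    make_middleware("StructLogMiddleware"),
    make_middleware("CorrelationIdMiddleware"),
]

stack = app
for wrap in registered:
    stack = wrap(stack)  # each later middleware wraps everything added before

stack()
print(calls)
```

The middleware registered last ends up outermost, which is why CorrelationIdMiddleware is added after StructLogMiddleware.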
Use the new logging solution
With all this work done, we can finally go ahead and use our new logging ability. This is obviously just a stupid example, but it should show the general use.
import uuid

from fastapi import APIRouter, HTTPException
from fastapi_async_sqlalchemy import db

from app import models, schemas
from app.core import FastAPIStructLogger
from app.crud import crud_hero

log = FastAPIStructLogger()
router = APIRouter(prefix="/hero", tags=["heroes"])


# Demonstrate logger usage
@router.get("/{hero_id}/ability", response_model=schemas.HeroSchema)
async def get_hero_ability(hero_id: uuid.UUID):
    # Bind the requested hero ID to all log entries of this request
    log.bind(requested_hero_id=hero_id)

    hero = await crud_hero.get(db.session, hero_id)
    if not hero:
        log.warning("Hero not found")
        raise HTTPException(status_code=404, detail="Hero not found")

    await db.session.refresh(hero, attribute_names=["ability"])
    log.info("Successfully fetched hero ability")
    return hero.ability
In local development, the output is like this:
# Successful request
2024-05-04T12:39:38.923009Z [info ] Successfully fetched hero ability [mksp.backend.app] request_id=1620de462c854bbf9164404d05997d70 requested_hero_id=UUID('18bcb9d1-9ecd-4297-a7cc-591d0ededf7a')
2024-05-04T12:39:38.924908Z [info ] 127.0.0.1:54518 - "GET /api/v1/hero/18bcb9d1-9ecd-4297-a7cc-591d0ededf7a/ability HTTP/1.1" 200 [mksp.backend.access] duration=82099875 http={'url': '/api/v1/hero/18bcb9d1-9ecd-4297-a7cc-591d0ededf7a/ability', 'status_code': 200, 'method': 'GET', 'request_id': '1620de462c854bbf9164404d05997d70', 'version': '1.1'} network={'client': {'ip': '127.0.0.1', 'port': 54518}} request_id=1620de462c854bbf9164404d05997d70
# Non-existent request
2024-05-04T12:40:11.174939Z [warning ] Hero not found [mksp.backend.app] request_id=1c2ce023eda3490eb92fc4153823e636 requested_hero_id=UUID('e4e242a9-ed29-4adf-b15b-403d25e98e0b')
2024-05-04T12:40:11.177626Z [info ] 127.0.0.1:54529 - "GET /api/v1/hero/e4e242a9-ed29-4adf-b15b-403d25e98e0b/ability HTTP/1.1" 404 [mksp.backend.access] duration=9780292 http={'url': '/api/v1/hero/e4e242a9-ed29-4adf-b15b-403d25e98e0b/ability', 'status_code': 404, 'method': 'GET', 'request_id': '1c2ce023eda3490eb92fc4153823e636', 'version': '1.1'} network={'client': {'ip': '127.0.0.1', 'port': 54529}} request_id=1c2ce023eda3490eb92fc4153823e636
Or, if we enable JSON output by setting the LOG_JSON_FORMAT environment variable to true:
{"event": "Hero not found", "request_id": "6fb1e9a3b27541ffbdd3e95e0376e514", "requested_hero_id": "UUID('18bcb9d1-9ecd-4297-a7cc-591d0edadf7a')", "logger": "mksp.backend.app", "level": "warning", "timestamp": "2024-05-04T12:41:56.312632Z"}
{"http": {"url": "/api/v1/hero/18bcb9d1-9ecd-4297-a7cc-591d0edadf7a/ability", "status_code": 404, "method": "GET", "request_id": "6fb1e9a3b27541ffbdd3e95e0376e514", "version": "1.1"}, "network": {"client": {"ip": "127.0.0.1", "port": 54761}}, "duration": 105338709, "event": "127.0.0.1:54761 - \"GET /api/v1/hero/18bcb9d1-9ecd-4297-a7cc-591d0edadf7a/ability HTTP/1.1\" 404", "request_id": "6fb1e9a3b27541ffbdd3e95e0376e514", "logger": "mksp.backend.access", "level": "info", "timestamp": "2024-05-04T12:41:56.314768Z"}
This can easily be parsed by any log ingestion pipeline, including all contextual information.