Files
orders/orders_app/auth.py
2026-01-07 09:16:11 +07:00

67 lines
2.1 KiB
Python

import logging
from typing import Iterable, Optional
import jwt
from django.conf import settings
from jwt import InvalidTokenError, PyJWKClient
logger = logging.getLogger(__name__)
class LogtoTokenValidator:
"""Validate JWTs issued by Logto using the published JWKS."""
def __init__(self, jwks_url: str, issuer: str):
self._issuer = issuer
self._jwks_client = PyJWKClient(jwks_url)
def decode(self, token: str, audience: Optional[str] = None) -> dict:
"""Decode and verify a JWT, enforcing issuer and optional audience."""
try:
signing_key = self._jwks_client.get_signing_key_from_jwt(token)
header_alg = jwt.get_unverified_header(token).get("alg")
return jwt.decode(
token,
signing_key.key,
algorithms=[header_alg] if header_alg else None,
issuer=self._issuer,
audience=audience,
options={"verify_aud": audience is not None},
)
except InvalidTokenError as exc:
logger.warning("Failed to validate Logto token: %s", exc)
raise
def get_bearer_token(request) -> str:
"""Extract Bearer token from Authorization header."""
auth_header = request.META.get("HTTP_AUTHORIZATION", "")
if not auth_header.startswith("Bearer "):
raise InvalidTokenError("Missing Bearer token")
token = auth_header.split(" ", 1)[1]
if not token or token == "undefined":
raise InvalidTokenError("Empty Bearer token")
return token
def scopes_from_payload(payload: dict) -> list[str]:
"""Split scope string (if present) into a list."""
scope_value = payload.get("scope")
if not scope_value:
return []
if isinstance(scope_value, str):
return scope_value.split()
if isinstance(scope_value, Iterable):
return list(scope_value)
return []
validator = LogtoTokenValidator(
getattr(settings, "LOGTO_JWKS_URL", "https://auth.optovia.ru/oidc/jwks"),
getattr(settings, "LOGTO_ISSUER", "https://auth.optovia.ru/oidc"),
)