import logging from typing import Iterable, Optional import jwt from django.conf import settings from jwt import InvalidTokenError, PyJWKClient logger = logging.getLogger(__name__) class LogtoTokenValidator: """Validate JWTs issued by Logto using the published JWKS.""" def __init__(self, jwks_url: str, issuer: str): self._issuer = issuer self._jwks_client = PyJWKClient(jwks_url) def decode( self, token: str, audience: Optional[str] = None, ) -> dict: """Decode and verify a JWT, enforcing issuer and optional audience.""" try: signing_key = self._jwks_client.get_signing_key_from_jwt(token) header_alg = jwt.get_unverified_header(token).get("alg") return jwt.decode( token, signing_key.key, algorithms=[header_alg] if header_alg else None, issuer=self._issuer, audience=audience, options={"verify_aud": audience is not None}, ) except InvalidTokenError as exc: logger.warning("Failed to validate Logto token: %s", exc) raise def get_bearer_token(request) -> str: """Extract Bearer token from Authorization header.""" auth_header = request.META.get("HTTP_AUTHORIZATION", "") if not auth_header.startswith("Bearer "): raise InvalidTokenError("Missing Bearer token") token = auth_header.split(" ", 1)[1] if not token or token == "undefined": raise InvalidTokenError("Empty Bearer token") return token def scopes_from_payload(payload: dict) -> list[str]: """Split scope string (if present) into a list.""" scope_value = payload.get("scope") if not scope_value: return [] if isinstance(scope_value, str): return scope_value.split() if isinstance(scope_value, Iterable): return list(scope_value) return [] validator = LogtoTokenValidator( getattr(settings, "LOGTO_JWKS_URL", "https://auth.optovia.ru/oidc/jwks"), getattr(settings, "LOGTO_ISSUER", "https://auth.optovia.ru/oidc"), )