74 lines
2.1 KiB
Python
74 lines
2.1 KiB
Python
"""
|
|
JWT authentication utilities for KYC API.
|
|
"""
|
|
import logging
|
|
from typing import Iterable, Optional
|
|
|
|
import jwt
|
|
from django.conf import settings
|
|
from jwt import InvalidTokenError, PyJWKClient
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class LogtoTokenValidator:
|
|
"""Validate JWTs issued by Logto using the published JWKS."""
|
|
|
|
def __init__(self, jwks_url: str, issuer: str):
|
|
self._issuer = issuer
|
|
self._jwks_client = PyJWKClient(jwks_url)
|
|
|
|
def decode(
|
|
self,
|
|
token: str,
|
|
audience: Optional[str] = None,
|
|
) -> dict:
|
|
"""Decode and verify a JWT, enforcing issuer and optional audience."""
|
|
try:
|
|
signing_key = self._jwks_client.get_signing_key_from_jwt(token)
|
|
header_alg = jwt.get_unverified_header(token).get("alg")
|
|
|
|
return jwt.decode(
|
|
token,
|
|
signing_key.key,
|
|
algorithms=[header_alg] if header_alg else None,
|
|
issuer=self._issuer,
|
|
audience=audience,
|
|
options={"verify_aud": audience is not None},
|
|
)
|
|
except InvalidTokenError as exc:
|
|
logger.warning("Failed to validate Logto token: %s", exc)
|
|
raise
|
|
|
|
|
|
def get_bearer_token(request) -> str:
|
|
"""Extract Bearer token from Authorization header."""
|
|
auth_header = request.META.get("HTTP_AUTHORIZATION", "")
|
|
if not auth_header.startswith("Bearer "):
|
|
raise InvalidTokenError("Missing Bearer token")
|
|
|
|
token = auth_header.split(" ", 1)[1]
|
|
if not token or token == "undefined":
|
|
raise InvalidTokenError("Empty Bearer token")
|
|
|
|
return token
|
|
|
|
|
|
def scopes_from_payload(payload: dict) -> list[str]:
|
|
"""Split scope string (if present) into a list."""
|
|
scope_value = payload.get("scope")
|
|
if not scope_value:
|
|
return []
|
|
if isinstance(scope_value, str):
|
|
return scope_value.split()
|
|
if isinstance(scope_value, Iterable):
|
|
return list(scope_value)
|
|
return []
|
|
|
|
|
|
validator = LogtoTokenValidator(
|
|
getattr(settings, "LOGTO_JWKS_URL", "https://auth.optovia.ru/oidc/jwks"),
|
|
getattr(settings, "LOGTO_ISSUER", "https://auth.optovia.ru/oidc"),
|
|
)
|