Initial commit from monorepo
This commit is contained in:
70
billing_app/auth.py
Normal file
70
billing_app/auth.py
Normal file
@@ -0,0 +1,70 @@
|
||||
import logging
|
||||
from typing import Iterable, Optional
|
||||
|
||||
import jwt
|
||||
from django.conf import settings
|
||||
from jwt import InvalidTokenError, PyJWKClient
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LogtoTokenValidator:
|
||||
"""Validate JWTs issued by Logto using the published JWKS."""
|
||||
|
||||
def __init__(self, jwks_url: str, issuer: str):
|
||||
self._issuer = issuer
|
||||
self._jwks_client = PyJWKClient(jwks_url)
|
||||
|
||||
def decode(
|
||||
self,
|
||||
token: str,
|
||||
audience: Optional[str] = None,
|
||||
) -> dict:
|
||||
"""Decode and verify a JWT, enforcing issuer and optional audience."""
|
||||
try:
|
||||
signing_key = self._jwks_client.get_signing_key_from_jwt(token)
|
||||
header_alg = jwt.get_unverified_header(token).get("alg")
|
||||
|
||||
return jwt.decode(
|
||||
token,
|
||||
signing_key.key,
|
||||
algorithms=[header_alg] if header_alg else None,
|
||||
issuer=self._issuer,
|
||||
audience=audience,
|
||||
options={"verify_aud": audience is not None},
|
||||
)
|
||||
except InvalidTokenError as exc:
|
||||
logger.warning("Failed to validate Logto token: %s", exc)
|
||||
raise
|
||||
|
||||
|
||||
def get_bearer_token(request) -> str:
|
||||
"""Extract Bearer token from Authorization header."""
|
||||
auth_header = request.META.get("HTTP_AUTHORIZATION", "")
|
||||
if not auth_header.startswith("Bearer "):
|
||||
raise InvalidTokenError("Missing Bearer token")
|
||||
|
||||
token = auth_header.split(" ", 1)[1]
|
||||
if not token or token == "undefined":
|
||||
raise InvalidTokenError("Empty Bearer token")
|
||||
|
||||
return token
|
||||
|
||||
|
||||
def scopes_from_payload(payload: dict) -> list[str]:
|
||||
"""Split scope string (if present) into a list."""
|
||||
scope_value = payload.get("scope")
|
||||
if not scope_value:
|
||||
return []
|
||||
if isinstance(scope_value, str):
|
||||
return scope_value.split()
|
||||
if isinstance(scope_value, Iterable):
|
||||
return list(scope_value)
|
||||
return []
|
||||
|
||||
|
||||
validator = LogtoTokenValidator(
|
||||
getattr(settings, "LOGTO_JWKS_URL", "https://auth.optovia.ru/oidc/jwks"),
|
||||
getattr(settings, "LOGTO_ISSUER", "https://auth.optovia.ru/oidc"),
|
||||
)
|
||||
Reference in New Issue
Block a user