64 lines
2.2 KiB
Python
64 lines
2.2 KiB
Python
import json
|
|
import logging
|
|
|
|
from django.conf import settings
|
|
from django.utils.deprecation import MiddlewareMixin
|
|
from jwt import InvalidTokenError
|
|
|
|
from .auth import get_bearer_token, scopes_from_payload, validator
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class LogtoJWTMiddleware(MiddlewareMixin):
|
|
"""
|
|
JWT middleware для проверки токенов от Logto
|
|
"""
|
|
|
|
def __init__(self, get_response=None):
|
|
super().__init__(get_response)
|
|
|
|
# Audience validated only for non-introspection API calls
|
|
self.audience = getattr(settings, "LOGTO_ORDERS_AUDIENCE", None)
|
|
|
|
def _is_introspection_query(self, request):
|
|
"""Проверяет, является ли запрос introspection (для GraphQL codegen)"""
|
|
if request.method != 'POST':
|
|
return False
|
|
try:
|
|
body = json.loads(request.body.decode('utf-8'))
|
|
query = body.get('query', '')
|
|
return '__schema' in query or '__type' in query
|
|
except Exception:
|
|
return False
|
|
|
|
def process_request(self, request):
|
|
"""Обрабатывает Team Access Token для Orders API"""
|
|
|
|
# Пропускаем проверку для admin панели и статики
|
|
if request.path.startswith('/admin/') or request.path.startswith('/static/'):
|
|
return None
|
|
|
|
# Пропускаем introspection запросы (для GraphQL codegen)
|
|
if self._is_introspection_query(request):
|
|
return None
|
|
|
|
try:
|
|
token = get_bearer_token(request)
|
|
payload = validator.decode(token, audience=self.audience)
|
|
|
|
# Проверяем scope
|
|
scopes = scopes_from_payload(payload)
|
|
if 'teams:member' not in scopes:
|
|
return None
|
|
|
|
# Извлекаем данные пользователя и команды
|
|
request.user_id = payload.get('sub')
|
|
request.team_uuid = payload.get('team_uuid')
|
|
request.scopes = scopes
|
|
|
|
except InvalidTokenError:
|
|
# Любая ошибка = нет доступа (тихо)
|
|
return None
|
|
|
|
return None
|