""" GraphQL middleware for JWT authentication. Each class is bound to a specific GraphQL endpoint (public/user/team/m2m). """ import logging from django.conf import settings from graphql import GraphQLError from jwt import InvalidTokenError from .auth import get_bearer_token, scopes_from_payload, validator logger = logging.getLogger(__name__) def _is_introspection(info) -> bool: """Возвращает True для любых introspection резолвов.""" field = getattr(info, "field_name", "") parent = getattr(getattr(info, "parent_type", None), "name", "") return field.startswith("__") or parent.startswith("__") class PublicNoAuthMiddleware: """Public endpoint - no authentication required.""" def resolve(self, next, root, info, **kwargs): return next(root, info, **kwargs) class UserJWTMiddleware: """User endpoint - requires ID token.""" def resolve(self, next, root, info, **kwargs): request = info.context if _is_introspection(info): return next(root, info, **kwargs) # Only process auth once (check if already processed) if not hasattr(request, 'user_id'): try: token = get_bearer_token(request) payload = validator.decode(token) request.user_id = payload.get('sub') logger.info(f"[UserJWTMiddleware] user_id set to: {request.user_id}") except InvalidTokenError as exc: logger.warning(f"[UserJWTMiddleware] Token error: {exc}") raise GraphQLError("Unauthorized") from exc return next(root, info, **kwargs) class TeamJWTMiddleware: """Team endpoint - requires Access token for teams audience.""" def resolve(self, next, root, info, **kwargs): request = info.context if _is_introspection(info): return next(root, info, **kwargs) try: token = get_bearer_token(request) payload = validator.decode( token, audience=getattr(settings, 'LOGTO_TEAMS_AUDIENCE', None), ) request.user_id = payload.get('sub') request.team_uuid = payload.get('team_uuid') request.scopes = scopes_from_payload(payload) if not request.team_uuid or 'teams:member' not in request.scopes: raise GraphQLError("Unauthorized") except InvalidTokenError as exc: raise GraphQLError("Unauthorized") from exc return next(root, info, **kwargs) class M2MNoAuthMiddleware: """M2M endpoint - internal services only, no auth for now.""" def resolve(self, next, root, info, **kwargs): return next(root, info, **kwargs)