""" Декоратор для проверки scopes в JWT токене. Используется для защиты GraphQL резолверов. """ from functools import wraps from graphql import GraphQLError def require_scopes(*scopes: str): """ Декоратор для проверки наличия scopes в JWT токене. Использование: @require_scopes("read:teams") def resolve_team(self, info): ... @require_scopes("read:teams", "write:teams") def resolve_update_team(self, info): ... """ def decorator(func): # Сохраняем scopes в метаданных для возможности сбора всех scopes if not hasattr(func, '_required_scopes'): func._required_scopes = [] func._required_scopes.extend(scopes) @wraps(func) def wrapper(self, info, *args, **kwargs): # Получаем scopes из контекста (должны быть добавлены в middleware) user_scopes = set(getattr(info.context, 'scopes', []) or []) missing = set(scopes) - user_scopes if missing: raise GraphQLError(f"Missing required scopes: {', '.join(missing)}") return func(self, info, *args, **kwargs) # Переносим метаданные на wrapper wrapper._required_scopes = func._required_scopes return wrapper return decorator def collect_scopes_from_schema(schema) -> set: """ Собирает все scopes из схемы для синхронизации с Logto. Использование: from .schema import schema scopes = collect_scopes_from_schema(schema) # {'read:team', 'invite:member', ...} """ scopes = set() # Query resolvers if hasattr(schema, 'query') and schema.query: query_type = schema.query for field_name in dir(query_type): if field_name.startswith('resolve_'): resolver = getattr(query_type, field_name, None) if resolver and hasattr(resolver, '_required_scopes'): scopes.update(resolver._required_scopes) # Mutation resolvers if hasattr(schema, 'mutation') and schema.mutation: mutation_type = schema.mutation for field_name, field in mutation_type._meta.fields.items(): if hasattr(field, 'type') and hasattr(field.type, 'mutate'): mutate = field.type.mutate if hasattr(mutate, '_required_scopes'): scopes.update(mutate._required_scopes) return scopes