75 lines
2.6 KiB
Python
75 lines
2.6 KiB
Python
"""
|
||
Декоратор для проверки scopes в JWT токене.
|
||
Используется для защиты GraphQL резолверов.
|
||
"""
|
||
from functools import wraps
|
||
from graphql import GraphQLError
|
||
|
||
|
||
def require_scopes(*scopes: str):
|
||
"""
|
||
Декоратор для проверки наличия scopes в JWT токене.
|
||
|
||
Использование:
|
||
@require_scopes("read:teams")
|
||
def resolve_team(self, info):
|
||
...
|
||
|
||
@require_scopes("read:teams", "write:teams")
|
||
def resolve_update_team(self, info):
|
||
...
|
||
"""
|
||
def decorator(func):
|
||
# Сохраняем scopes в метаданных для возможности сбора всех scopes
|
||
if not hasattr(func, '_required_scopes'):
|
||
func._required_scopes = []
|
||
func._required_scopes.extend(scopes)
|
||
|
||
@wraps(func)
|
||
def wrapper(self, info, *args, **kwargs):
|
||
# Получаем scopes из контекста (должны быть добавлены в middleware)
|
||
user_scopes = set(getattr(info.context, 'scopes', []) or [])
|
||
|
||
missing = set(scopes) - user_scopes
|
||
if missing:
|
||
raise GraphQLError(f"Missing required scopes: {', '.join(missing)}")
|
||
|
||
return func(self, info, *args, **kwargs)
|
||
|
||
# Переносим метаданные на wrapper
|
||
wrapper._required_scopes = func._required_scopes
|
||
return wrapper
|
||
return decorator
|
||
|
||
|
||
def collect_scopes_from_schema(schema) -> set:
|
||
"""
|
||
Собирает все scopes из схемы для синхронизации с Logto.
|
||
|
||
Использование:
|
||
from .schema import schema
|
||
scopes = collect_scopes_from_schema(schema)
|
||
# {'read:team', 'invite:member', ...}
|
||
"""
|
||
scopes = set()
|
||
|
||
# Query resolvers
|
||
if hasattr(schema, 'query') and schema.query:
|
||
query_type = schema.query
|
||
for field_name in dir(query_type):
|
||
if field_name.startswith('resolve_'):
|
||
resolver = getattr(query_type, field_name, None)
|
||
if resolver and hasattr(resolver, '_required_scopes'):
|
||
scopes.update(resolver._required_scopes)
|
||
|
||
# Mutation resolvers
|
||
if hasattr(schema, 'mutation') and schema.mutation:
|
||
mutation_type = schema.mutation
|
||
for field_name, field in mutation_type._meta.fields.items():
|
||
if hasattr(field, 'type') and hasattr(field.type, 'mutate'):
|
||
mutate = field.type.mutate
|
||
if hasattr(mutate, '_required_scopes'):
|
||
scopes.update(mutate._required_scopes)
|
||
|
||
return scopes
|