Files
orders/orders_app/permissions.py
2026-01-07 09:16:11 +07:00

71 lines
2.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Декоратор для проверки scopes в JWT токене.
Используется для защиты GraphQL резолверов.
"""
from functools import wraps
from graphql import GraphQLError
def require_scopes(*scopes: str):
"""
Декоратор для проверки наличия scopes в JWT токене.
Использование:
@require_scopes("read:orders")
def resolve_getTeamOrders(self, info):
...
"""
def decorator(func):
# Сохраняем scopes в метаданных для возможности сбора всех scopes
if not hasattr(func, '_required_scopes'):
func._required_scopes = []
func._required_scopes.extend(scopes)
@wraps(func)
def wrapper(self, info, *args, **kwargs):
# Получаем scopes из контекста (должны быть добавлены в middleware)
user_scopes = set(getattr(info.context, 'scopes', []) or [])
missing = set(scopes) - user_scopes
if missing:
raise GraphQLError(f"Missing required scopes: {', '.join(missing)}")
return func(self, info, *args, **kwargs)
# Переносим метаданные на wrapper
wrapper._required_scopes = func._required_scopes
return wrapper
return decorator
def collect_scopes_from_schema(schema) -> set:
"""
Собирает все scopes из схемы для синхронизации с Logto.
Использование:
from .schema import schema
scopes = collect_scopes_from_schema(schema)
# {'read:orders'}
"""
scopes = set()
# Query resolvers
if hasattr(schema, 'query') and schema.query:
query_type = schema.query
for field_name in dir(query_type):
if field_name.startswith('resolve_'):
resolver = getattr(query_type, field_name, None)
if resolver and hasattr(resolver, '_required_scopes'):
scopes.update(resolver._required_scopes)
# Mutation resolvers
if hasattr(schema, 'mutation') and schema.mutation:
mutation_type = schema.mutation
for field_name, field in mutation_type._meta.fields.items():
if hasattr(field, 'type') and hasattr(field.type, 'mutate'):
mutate = field.type.mutate
if hasattr(mutate, '_required_scopes'):
scopes.update(mutate._required_scopes)
return scopes