Source code for corx.query

import abc
import dataclasses
import datetime
import typing
import uuid

__all__ = [
    'Query',
    'QueryHandler',
    'QueryExecutor',
]

from corx.base import UseCases, HasUseCase, Singleton, Executor
from corx.dispatcher import Dispatchable, Dispatcher

_T = typing.TypeVar('_T')
_C = typing.TypeVar('_C')


[docs] @dataclasses.dataclass class Query(Dispatchable, abc.ABC): created_at: float = dataclasses.field(init=False, default_factory=datetime.datetime.now().timestamp) uuid: str = dataclasses.field(init=False, default_factory=lambda: str(uuid.uuid4()))
[docs] @staticmethod def use_case() -> UseCases: return UseCases.Query
[docs] class QueryHandler(typing.Generic[_T], HasUseCase, metaclass=Singleton): def __init__(self): Dispatcher().register(self.handles(), type(self))
[docs] @staticmethod def use_case() -> UseCases: return UseCases.Query
[docs] @staticmethod @abc.abstractmethod def handles() -> typing.Type[_T]: raise NotImplementedError
[docs] @abc.abstractmethod def handle(self, query: _T) -> typing.Union[typing.Any, typing.Awaitable[typing.Any]]: raise NotImplementedError
[docs] class QueryExecutor(Executor):
[docs] @staticmethod def use_case() -> UseCases: return UseCases.Query
_registry: typing.Dict[typing.Type[Query], typing.Type[QueryHandler]] = dict()
[docs] def register(self, dispatchable: typing.Type[Query], executable: typing.Type[QueryHandler]): if dispatchable in self._registry: raise Exception(f'{dispatchable} is already registered with {self._registry[dispatchable]}.') self._registry[dispatchable] = executable
[docs] def execute(self, dispatchable: Query): query_class = type(dispatchable) handler_class = self._registry.get(query_class) if handler_class is None: raise Exception(f'No handler for {query_class.__name__}') handler = handler_class() process = handler.handle(dispatchable) if isinstance(process, typing.Coroutine): self._loop.push(process)