Source code for corx.command
import abc
import dataclasses
import datetime
import typing
import uuid
from corx.base import UseCases, HasUseCase, Singleton, Executor
from corx.dispatcher import Dispatchable, Dispatcher
__all__ = [
'Command',
'CommandType',
'CommandHandler',
'CommandExecutor'
]
[docs]
@dataclasses.dataclass
class Command(Dispatchable, abc.ABC):
created_at: float = dataclasses.field(init=False, default_factory=datetime.datetime.now().timestamp)
uuid: str = dataclasses.field(init=False, default_factory=lambda: str(uuid.uuid4()))
[docs]
@staticmethod
def use_case() -> UseCases:
return UseCases.Command
CommandType = typing.TypeVar('CommandType', bound=typing.Type[Command])
[docs]
class CommandHandler(HasUseCase, metaclass=Singleton):
[docs]
@staticmethod
def use_case() -> UseCases:
return UseCases.Command
[docs]
@abc.abstractmethod
def handle(self, command: Command):
raise NotImplementedError
CommandHandlerType = typing.TypeVar('CommandHandlerType', bound=typing.Type[CommandHandler])
def handles(command: CommandType):
def wrap(cls):
Dispatcher().register(command, cls)
return cls
return wrap
[docs]
class CommandExecutor(Executor):
[docs]
@staticmethod
def use_case() -> UseCases:
return UseCases.Command
_registry: typing.Dict[CommandType, CommandHandlerType] = dict()
[docs]
def register(self, dispatchable: CommandType, executable: CommandHandlerType):
if dispatchable in self._registry:
raise Exception(f'{dispatchable} is already registered with {self._registry[dispatchable]}.')
self._registry[dispatchable] = executable
[docs]
def execute(self, dispatchable: Command):
command_class: CommandType = type(dispatchable)
handler_class = self._registry.get(command_class)
if handler_class is None:
raise Exception(f'No handler for {command_class.__name__}')
handler = handler_class()
process = handler.handle(dispatchable)
if isinstance(process, typing.Coroutine):
self._loop.push(process)