corx package

Submodules

corx.aggregate module

class corx.aggregate.Aggregate[source]

Bases: object

class corx.aggregate.HasAggregate[source]

Bases: object

abstract static aggregate() Type[Aggregate][source]

corx.base module

class corx.base.AsyncLoop(*args, **kwargs)[source]

Bases: object

is_processing() bool[source]
process() None[source]
propagate_exceptions(status: bool)[source]
push(process: Coroutine) None[source]
class corx.base.Executor(*args, **kwargs)[source]

Bases: HasUseCase

activate(executable)[source]
deactivate(executable)[source]
abstract execute(dispatchable)[source]
abstract register(dispatchable, executable)[source]
class corx.base.HasUseCase[source]

Bases: object

abstract static use_case() UseCases[source]
class corx.base.Singleton[source]

Bases: type

class corx.base.UseCases(value)[source]

Bases: Enum

An enumeration.

Command = 1
Event = 3
Query = 2

corx.bootstrap module

corx.bootstrap.bootstrap()[source]

corx.command module

class corx.command.Command[source]

Bases: Dispatchable, ABC

created_at: float
static use_case() UseCases[source]
uuid: str
class corx.command.CommandExecutor(*args, **kwargs)[source]

Bases: Executor

execute(dispatchable: Command)[source]
register(dispatchable: CommandType, executable: CommandHandlerType)[source]
static use_case() UseCases[source]
class corx.command.CommandHandler(*args, **kwargs)[source]

Bases: HasUseCase

abstract handle(command: Command)[source]
static use_case() UseCases[source]

corx.dispatcher module

class corx.dispatcher.Dispatchable[source]

Bases: HasUseCase, ABC

classmethod dispatch(*args, **kwargs)[source]
class corx.dispatcher.Dispatcher(*args, **kwargs)[source]

Bases: object

activate(executable: Type[HasUseCase]) None[source]
deactivate(executable: Type[HasUseCase]) None[source]
dispatch(*dispatchables: _T) None[source]
propagate_exceptions(status: bool)[source]
register(dispatchable: Type[Dispatchable], executable: Type[_C]) None[source]
register_executors(*executors: Executor)[source]

corx.event module

class corx.event.AnyEvent[source]

Bases: Event, ABC

timestamp: float
uuid: str
version: int
class corx.event.Event[source]

Bases: Dispatchable, ABC

abstract apply(aggregate: Aggregate)[source]
timestamp: float
static use_case() UseCases[source]
uuid: str
version: int
class corx.event.EventExecutor(*args, **kwargs)[source]

Bases: Executor

execute(dispatchable: Event)[source]
register(dispatchable: EventType, executable: EventListenerType)[source]
static use_case() UseCases[source]
class corx.event.EventListener[source]

Bases: HasUseCase

abstract async react(event: Event)[source]
static use_case() UseCases[source]
class corx.event.ProcessManager[source]

Bases: EventListener, ABC

class corx.event.RuntimeEventStore(*args, **kwargs)[source]

Bases: EventListener

clear()[source]
get() List[Event][source]
get_latest_by_type(event_type: EventType) Event[source]
async react(event: Event)[source]
seed_events(*events: Event)[source]

corx.query module

class corx.query.Query[source]

Bases: Dispatchable, ABC

created_at: float
static use_case() UseCases[source]
uuid: str
class corx.query.QueryExecutor(*args, **kwargs)[source]

Bases: Executor

execute(dispatchable: Query)[source]
register(dispatchable: Type[Query], executable: Type[QueryHandler])[source]
static use_case() UseCases[source]
class corx.query.QueryHandler(*args, **kwargs)[source]

Bases: Generic[_T], HasUseCase

abstract handle(query: _T) Any | Awaitable[Any][source]
abstract static handles() Type[_T][source]
static use_case() UseCases[source]

corx.test module

class corx.test.UnitTestCase(methodName='runTest')[source]

Bases: TestCase, ABC

dispatcher: Dispatcher
given(*events: Event)[source]
classmethod setUpClass()[source]

Hook method for setting up class fixture before running tests in the class.

tearDown()[source]

Hook method for deconstructing the test fixture after testing it.

then(*expected_events: EventType | Event, strict: bool = False)[source]
when(*commands: Command)[source]

Module contents