corx package

Submodules

corx.command module

class corx.command.Command[source]

Bases: Dispatchable, ABC

created_at: float
uuid: str
class corx.command.CommandExecutor[source]

Bases: Executor

execute(dispatchable: Command)[source]
register(dispatchable: CommandType, executable: CommandHandlerType)[source]

corx.dispatcher module

class corx.dispatcher.Dispatchable[source]

Bases: ABC

classmethod dispatch(*args, **kwargs)[source]
class corx.dispatcher.Executor[source]

Bases: object

activate(executable)[source]
deactivate(executable)[source]
abstract execute(dispatchable)[source]
abstract register(dispatchable, executable)[source]
corx.dispatcher.get_dispatcher() → __Dispatcher[source]

corx.event module

class corx.event.AnyEvent[source]

Bases: Event, ABC

class corx.event.Event[source]

Bases: Generic[_T], Dispatchable, ABC

abstract apply(aggregate: _T)[source]
timestamp: float
uuid: str
version: int
class corx.event.EventExecutor[source]

Bases: Executor

execute(dispatchable: Event)[source]
register(dispatchable: EventType, executable: EventListenerType)[source]
class corx.event.EventListener[source]

Bases: object

abstract async react(event: Event)[source]
class corx.event.ProcessManager[source]

Bases: EventListener, ABC

class corx.event.RuntimeEventStore[source]

Bases: EventListener

clear()[source]
get() → List[Event][source]
get_latest_by_type(event_type: EventType) → Event[source]
async react(event: Event)[source]
seed_events(*events: Event)[source]

corx.loop module

corx.loop.get_async_loop() → __AsyncLoop[source]

corx.test module

class corx.test.UnitTestCase(methodName='runTest')[source]

Bases: TestCase, ABC

given(*events: Event)[source]
classmethod setUpClass()[source]

Hook method for setting up class fixture before running tests in the class.

tearDown()[source]

Hook method for deconstructing the test fixture after testing it.

then(*expected_events: EventType | Event, strict: bool = False)[source]
when(*commands: Command)[source]

Module contents