Events and Messaging

Vectis has two event systems: the in-process EventBus for synchronous module-to-module communication, and Redpanda (Kafka-compatible) for asynchronous inter-service messaging.

In-Process EventBus

Handlers run inline within the current request. Use the EventBus for work that must complete before the response is sent, such as audit logging, cache invalidation, or denormalization.

from vectis.core.events import Event, event_bus

async def on_order_created(event: Event):
    order_id = event.data["order_id"]
    # write audit log, update cache, etc.

event_bus.subscribe("order.created", on_order_created, priority=50)

Handlers are sorted by priority (lower = earlier, default 50). Emit events from services:

await event_bus.emit(Event(
    type="product.created",
    data={"product_id": product.id, "name": product.name},
    source="product_service",
    user_id=current_user.id,
))

A handler can raise EventCancelled to stop later handlers:

from vectis.core.exceptions import EventCancelled

async def validate_order(event: Event):
    if event.data.get("total", 0) > 100_000:
        raise EventCancelled("Order exceeds maximum amount")

event_bus.subscribe("order.pre_create", validate_order, priority=10)
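To make the ordering and cancellation rules concrete, here is a minimal, self-contained sketch of a bus that sorts handlers by priority and stops at the first EventCancelled. It is not the Vectis implementation: ToyEventBus, the local Event and EventCancelled stand-ins, and the choice to return False from emit on cancellation are all assumptions made for illustration.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable


class EventCancelled(Exception):
    """Local stand-in for vectis.core.exceptions.EventCancelled."""


@dataclass
class Event:
    """Local stand-in carrying only the fields this sketch needs."""
    type: str
    data: dict[str, Any] = field(default_factory=dict)


Handler = Callable[[Event], Awaitable[None]]


class ToyEventBus:
    """Illustrative only; the real EventBus may behave differently."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[tuple[int, Handler]]] = {}

    def subscribe(self, event_type: str, handler: Handler, priority: int = 50) -> None:
        handlers = self._handlers.setdefault(event_type, [])
        handlers.append((priority, handler))
        # Stable sort: handlers with equal priority keep subscription order.
        handlers.sort(key=lambda pair: pair[0])

    async def emit(self, event: Event) -> bool:
        """Run handlers in priority order; return False if one cancelled (assumed)."""
        for _, handler in self._handlers.get(event.type, []):
            try:
                await handler(event)
            except EventCancelled:
                return False  # later handlers are skipped
        return True


calls: list[str] = []


async def validate_order(event: Event) -> None:
    calls.append("validate")
    if event.data.get("total", 0) > 100_000:
        raise EventCancelled("Order exceeds maximum amount")


async def audit_order(event: Event) -> None:
    calls.append("audit")


bus = ToyEventBus()
bus.subscribe("order.pre_create", audit_order)                # default priority 50
bus.subscribe("order.pre_create", validate_order, priority=10)

completed = asyncio.run(bus.emit(Event("order.pre_create", {"total": 250_000})))
```

Although audit_order was subscribed first, validate_order runs first because of its lower priority value, and its cancellation prevents audit_order from running at all.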

Warning

Keep EventBus handlers under 100ms. Anything longer belongs in a Temporal workflow or Redpanda consumer.
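One way to catch handlers that drift past this budget is to wrap them in a timing decorator. The sketch below is a hypothetical helper, not part of Vectis: warn_if_slow, SLOW_HANDLER_SECONDS, and the logger name are assumptions.

```python
import functools
import logging
import time

logger = logging.getLogger("eventbus.timing")  # hypothetical logger name

SLOW_HANDLER_SECONDS = 0.1  # the 100ms budget from the warning


def warn_if_slow(handler):
    """Wrap an async handler and log a warning when it exceeds the budget."""

    @functools.wraps(handler)
    async def wrapper(event):
        start = time.perf_counter()
        try:
            return await handler(event)
        finally:
            # Runs whether the handler returned or raised.
            elapsed = time.perf_counter() - start
            if elapsed > SLOW_HANDLER_SECONDS:
                logger.warning("%s took %.0f ms", handler.__name__, elapsed * 1000)

    return wrapper
```

A wrapped handler can be passed to event_bus.subscribe like any other async callable, since the decorator preserves the handler's name and call signature.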

Redpanda (Async Messaging)

Use Redpanda for inter-service events such as ERP sync, analytics, and warehouse integration.

Topics

Topic              Events
vectis.orders      order.created, order.status_changed, order.cancelled
vectis.inventory   inventory.adjusted, inventory.reserved
vectis.accounts    account.created, account.approved

Publishing

from vectis.core.events import publish_event

await publish_event(
    topic="vectis.orders",
    event_type="order.created",
    payload={"order_id": order.id, "grand_total": str(order.grand_total)},
    key=str(order.id),
)

The producer is lazily initialized on first use. Call shutdown_producer() during application shutdown.

Consuming

Register handlers with @on_event and run the consumer as a separate process:

from vectis.core.events.consumer import on_event

@on_event("order.created")
async def handle_order_created(event_type: str, payload: dict) -> None:
    # sync to ERP, trigger analytics, etc.
    ...

@on_event("inventory.adjusted")
async def handle_inventory_adjusted(event_type: str, payload: dict) -> None:
    # alert if stock below threshold
    ...

python -m vectis.core.events.consumer

Note

If aiokafka is not installed, the producer silently disables itself. The app runs fine in development without Redpanda.
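The silent-disable behaviour usually rests on a guarded import. The following sketch shows one way it could work; load_optional and OptionalPublisher are hypothetical names, and the real producer may detect the missing dependency differently.

```python
import asyncio
import importlib
import logging
from types import ModuleType
from typing import Optional

logger = logging.getLogger("events.producer")  # hypothetical logger name


def load_optional(module_name: str) -> Optional[ModuleType]:
    """Return the imported module, or None if it is not installed."""
    try:
        return importlib.import_module(module_name)
    except ImportError:
        logger.debug("%s not installed; publishing disabled", module_name)
        return None


class OptionalPublisher:
    """Publishes only when the client library is importable."""

    def __init__(self, client_module: str = "aiokafka") -> None:
        self._client = load_optional(client_module)
        self.sent: list[tuple[str, dict]] = []

    @property
    def enabled(self) -> bool:
        return self._client is not None

    async def publish(self, topic: str, payload: dict) -> bool:
        if not self.enabled:
            return False  # no-op: nothing is sent and nothing is raised
        # A real implementation would hand the message to the client here.
        self.sent.append((topic, payload))
        return True


# A module name that is certainly absent forces the disabled path.
disabled = OptionalPublisher("module_that_is_not_installed_xyz")
skipped = asyncio.run(disabled.publish("vectis.orders", {"order_id": 1}))
```

Because the disabled path returns quietly instead of raising, calling code does not need a separate branch for development environments.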

Choosing Between EventBus and Redpanda

Criteria   EventBus          Redpanda
Latency    < 100ms, inline   Async, seconds+
Scope      Same process      Cross-service
Delivery   At-most-once      At-least-once
Use case   Audit, cache      ERP sync, analytics