Events and Messaging¶
Vectis has two event systems: the in-process EventBus for synchronous module-to-module communication, and Redpanda (Kafka-compatible) for asynchronous inter-service messaging.
In-Process EventBus¶
Handlers run inline within the current request. Use for audit logging, cache invalidation, or denormalization that must complete before the response.
from vectis.core.events import Event, event_bus
async def on_order_created(event: Event):
order_id = event.data["order_id"]
# write audit log, update cache, etc.
event_bus.subscribe("order.created", on_order_created, priority=50)
Handlers are sorted by priority (lower = earlier, default 50). Emit events from services:
await event_bus.emit(Event(
type="product.created",
data={"product_id": product.id, "name": product.name},
source="product_service",
user_id=current_user.id,
))
A handler can raise EventCancelled to stop later handlers:
from vectis.core.exceptions import EventCancelled
async def validate_order(event: Event):
if event.data.get("total", 0) > 100_000:
raise EventCancelled("Order exceeds maximum amount")
event_bus.subscribe("order.pre_create", validate_order, priority=10)
Warning
Keep EventBus handlers under 100ms. Anything longer belongs in a Temporal workflow or Redpanda consumer.
Redpanda (Async Messaging)¶
For inter-service events — ERP sync, analytics, warehouse integration.
Topics¶
| Topic | Events |
|---|---|
vectis.orders | order.created, order.status_changed, order.cancelled |
vectis.inventory | inventory.adjusted, inventory.reserved |
vectis.accounts | account.created, account.approved |
Publishing¶
from vectis.core.events import publish_event
await publish_event(
topic="vectis.orders",
event_type="order.created",
payload={"order_id": order.id, "grand_total": str(order.grand_total)},
key=str(order.id),
)
The producer is lazy-initialized. Call shutdown_producer() on app shutdown.
Consuming¶
Register handlers with @on_event and run as a separate process:
from vectis.core.events.consumer import on_event
@on_event("order.created")
async def handle_order_created(event_type: str, payload: dict) -> None:
# sync to ERP, trigger analytics, etc.
...
@on_event("inventory.adjusted")
async def handle_inventory_adjusted(event_type: str, payload: dict) -> None:
# alert if stock below threshold
...
Note
If aiokafka is not installed, the producer silently disables itself. The app runs fine in development without Redpanda.
Choosing Between EventBus and Redpanda¶
| Criteria | EventBus | Redpanda |
|---|---|---|
| Latency | < 100ms, inline | Async, seconds+ |
| Scope | Same process | Cross-service |
| Delivery | At-most-once | At-least-once |
| Use case | Audit, cache | ERP sync, analytics |