Events and Messaging¶
Vectis has two event systems: the in-process EventBus for synchronous module-to-module communication, and Redpanda (Kafka-compatible) for asynchronous inter-service messaging.
In-Process EventBus¶
Handlers run inline within the current request. Use the EventBus for audit logging, cache invalidation, or denormalization that must complete before the response is returned.
from vectis.core.events import Event, event_bus

async def on_order_created(event: Event):
    order_id = event.data["order_id"]
    # write audit log, update cache, etc.

event_bus.subscribe("order.created", on_order_created, priority=50)
Handlers are sorted by priority (lower = earlier, default 50). Emit events from services:
await event_bus.emit(Event(
    type="product.created",
    data={"product_id": product.id, "name": product.name},
    source="product_service",
    user_id=current_user.id,
))
A handler can raise EventCancelled to stop later handlers:
from vectis.core.exceptions import EventCancelled

async def validate_order(event: Event):
    if event.data.get("total", 0) > 100_000:
        raise EventCancelled("Order exceeds maximum amount")

event_bus.subscribe("order.pre_create", validate_order, priority=10)
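The ordering and cancellation rules above can be illustrated with a toy bus. This is a minimal sketch, not the Vectis implementation: `MiniBus`, its local `Event` and `EventCancelled` stand-ins, and the choice to return `False` on cancellation are all assumptions made for illustration.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable


class EventCancelled(Exception):
    """Local stand-in for vectis.core.exceptions.EventCancelled."""


@dataclass
class Event:
    """Local stand-in carrying only the fields this sketch needs."""
    type: str
    data: dict[str, Any] = field(default_factory=dict)


Handler = Callable[[Event], Awaitable[None]]


class MiniBus:
    """Hypothetical bus: priority-ordered dispatch that stops on cancellation."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[tuple[int, Handler]]] = {}

    def subscribe(self, event_type: str, handler: Handler, priority: int = 50) -> None:
        entries = self._handlers.setdefault(event_type, [])
        entries.append((priority, handler))
        # list.sort is stable, so equal priorities keep subscription order.
        entries.sort(key=lambda entry: entry[0])

    async def emit(self, event: Event) -> bool:
        """Run handlers lowest priority first; return False if one cancels."""
        for _, handler in self._handlers.get(event.type, []):
            try:
                await handler(event)
            except EventCancelled:
                # Assumption: cancellation is swallowed and reported as False.
                return False
        return True
```

With a validator subscribed at priority 10 and an audit handler at the default 50, an order over the limit is cancelled by the validator and the audit handler never runs.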
Warning
Keep EventBus handlers under 100ms. Anything longer belongs in a Temporal workflow or Redpanda consumer.
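One way to spot handlers that exceed the budget is to wrap them in a timing decorator during development. The sketch below is an assumption-laden illustration: `warn_if_slow` and the logger name are hypothetical and not part of Vectis.

```python
import asyncio
import functools
import logging
import time

logger = logging.getLogger("eventbus.timing")  # hypothetical logger name


def warn_if_slow(budget_ms: float = 100.0):
    """Log a warning when the wrapped async handler exceeds budget_ms."""

    def decorator(handler):
        @functools.wraps(handler)
        async def wrapper(event):
            start = time.perf_counter()
            try:
                return await handler(event)
            finally:
                # Measured even when the handler raises.
                elapsed_ms = (time.perf_counter() - start) * 1000
                if elapsed_ms > budget_ms:
                    logger.warning(
                        "handler %s took %.1f ms (budget %.0f ms)",
                        handler.__name__, elapsed_ms, budget_ms,
                    )

        return wrapper

    return decorator
```

A decorated handler keeps its original name and return value, so it can be passed to a subscribe call unchanged.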
Redpanda (Async Messaging)¶
Use Redpanda for inter-service events such as ERP sync, analytics, and warehouse integration.
Topic Naming + Schema Registry (Decided #153)¶
Topics follow vectis.<domain>.<event>.v<N>. Every topic has a matching JSON Schema at backend/vectis/events/schemas/<domain>/<event>_v<N>.json. The schema registry (backend/vectis/events/registry.py) loads schemas at boot and validates every emitted payload:
- Development & test: strict — invalid payloads raise.
- Production: warn-and-publish (logs structured warning) so a bad payload never blocks a transaction; you fix the producer and ship.
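The naming rule and the two validation modes can be sketched as follows. This is an illustration only: `schema_path_for` and `check_payload` are hypothetical helpers, and a required-keys check stands in for full JSON Schema validation, which the real registry presumably performs.

```python
import logging
import re
from pathlib import PurePosixPath

logger = logging.getLogger("events.registry")  # hypothetical logger name

TOPIC_RE = re.compile(
    r"^vectis\.(?P<domain>[a-z_]+)\.(?P<event>[a-z_]+)\.v(?P<version>\d+)$"
)
SCHEMA_ROOT = PurePosixPath("backend/vectis/events/schemas")


def schema_path_for(topic: str) -> PurePosixPath:
    """Map vectis.<domain>.<event>.v<N> to <domain>/<event>_v<N>.json."""
    match = TOPIC_RE.match(topic)
    if match is None:
        raise ValueError(f"topic does not follow the naming rule: {topic!r}")
    filename = f"{match['event']}_v{match['version']}.json"
    return SCHEMA_ROOT / match["domain"] / filename


def check_payload(payload: dict, required: list[str], *, strict: bool) -> bool:
    """Simplified check: strict mode raises, non-strict mode logs and continues."""
    missing = [key for key in required if key not in payload]
    if not missing:
        return True
    if strict:
        # Development and test behaviour: fail loudly.
        raise ValueError(f"invalid payload, missing: {missing}")
    # Production behaviour: warn, then let the caller publish anyway.
    logger.warning("invalid payload, missing: %s", missing)
    return False
```

For example, `schema_path_for("vectis.cart.rejected.v1")` resolves to `backend/vectis/events/schemas/cart/rejected_v1.json`.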
The full topic list, schema paths, and consumer ownership live in vectis/docs/REDPANDA_TOPICS.md.
Topics¶
Registered schemas live under backend/vectis/events/schemas/. Current schema set:
| Domain | Schema files | Purpose |
|---|---|---|
| (root) | envelope_v1.json | Common envelope wrapper used by every event |
| cart | submitted_for_approval_v1.json, approved_v1.json, rejected_v1.json | B2B cart approval phase transitions |
| payment | approval_submitted_v1.json, approved_v1.json, rejected_v1.json, cascade_exhausted_v1.json, tender_settled_v1.json, tender_bounced_v1.json | Payment-phase approval lifecycle + tender settlement |
| inventory | stock_changed_v1.json, restocked_v1.json | Stock delta + restock notifications |
| catalog | product_updated_v1.json, category_updated_v1.json | Catalog mutations for downstream cache busting |
| pricing | override_invalidated_v1.json | Automated override invalidation (Decided #168) |
| list | created_v1.json, shared_v1.json, sent_v1.json, transferred_v1.json, reassigned_v1.json, archived_v1.json, conversion_progress_v1.json | Quick-Order list lifecycle |
| affiliate | affiliate_status_changed_v1.json, attribution_recorded_v1.json, commission_accrued_v1.json, commission_clawback_v1.json, payout_initiated_v1.json, payout_paid_v1.json, payout_failed_v1.json, referral_code_created_v1.json | Affiliate program events |
| customer_referral | reward_issued_v1.json | Referral reward issuance |
| platform | approval_resolution_failed_v1.json, cascade_curation_invalid_v1.json, tender_idempotency_collision_v1.json | Cross-domain workflow fault payloads |
| workflow | fault_v1.json | Workflow fault envelope (Decided #200) emitted by Temporal workflows that trap unrecoverable exceptions |
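Because file names mirror topic names, a topic list can be derived from the schema directory. The helper below is a hypothetical sketch that assumes every `<domain>/<event>_v<N>.json` file corresponds to exactly one topic; the root-level envelope file is skipped because it has no domain directory.

```python
import re
from pathlib import Path

SCHEMA_FILE_RE = re.compile(r"^(?P<event>[a-z_]+)_v(?P<version>\d+)\.json$")


def topics_from_schemas(schema_root: Path) -> list[str]:
    """Return topic names implied by <domain>/<event>_v<N>.json files."""
    topics = []
    # "*/*.json" matches only files one directory below the root.
    for path in sorted(schema_root.glob("*/*.json")):
        match = SCHEMA_FILE_RE.match(path.name)
        if match:
            domain = path.parent.name
            topics.append(f"vectis.{domain}.{match['event']}.v{match['version']}")
    return topics
```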
Publishing¶
from vectis.core.events import publish_event

await publish_event(
    topic="vectis.orders",
    event_type="order.created",
    payload={"order_id": order.id, "grand_total": str(order.grand_total)},
    key=str(order.id),
)
The producer is lazy-initialized. Call shutdown_producer() on app shutdown.
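The lazy-initialization pattern can be sketched with a stub client. Everything here is illustrative: `StubProducer` and `get_producer` are hypothetical, and the local `shutdown_producer` only mirrors the name of the real function rather than reproducing it.

```python
import asyncio
from typing import Optional


class StubProducer:
    """Stand-in for a real Kafka client; it only tracks its lifecycle."""

    def __init__(self) -> None:
        self.started = False

    async def start(self) -> None:
        self.started = True

    async def stop(self) -> None:
        self.started = False


_producer: Optional[StubProducer] = None


async def get_producer() -> StubProducer:
    """Create and start the producer on first use, then reuse it."""
    global _producer
    if _producer is None:
        # Not safe against concurrent first calls; a real implementation
        # would guard this section with a lock.
        producer = StubProducer()
        await producer.start()
        _producer = producer
    return _producer


async def shutdown_producer() -> None:
    """Stop the producer if one was created; safe to call repeatedly."""
    global _producer
    if _producer is not None:
        await _producer.stop()
        _producer = None
```

Calling the shutdown function from the application's shutdown hook releases the connection; if no event was ever published, it does nothing.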
Consuming¶
Register handlers with @on_event and run the consumer as a separate process:
from vectis.core.events.consumer import on_event

@on_event("order.created")
async def handle_order_created(event_type: str, payload: dict) -> None:
    # sync to ERP, trigger analytics, etc.
    ...

@on_event("inventory.adjusted")
async def handle_inventory_adjusted(event_type: str, payload: dict) -> None:
    # alert if stock below threshold
    ...
Note
If aiokafka is not installed, the producer silently disables itself. The app runs fine in development without Redpanda.
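A common way to get this behaviour is to probe for the optional dependency and turn publishing into a no-op when it is absent. The sketch below assumes that approach; `is_available` and `publish_or_skip` are hypothetical helpers, and the `send` callable stands in for whatever actually writes to the broker.

```python
import asyncio
import importlib.util
from typing import Awaitable, Callable


def is_available(module_name: str) -> bool:
    """Return True if a top-level module can be imported."""
    return importlib.util.find_spec(module_name) is not None


async def publish_or_skip(
    send: Callable[[str, dict], Awaitable[None]],
    topic: str,
    payload: dict,
    *,
    module_name: str = "aiokafka",
) -> bool:
    """Publish via send() if the client library exists, else do nothing."""
    if not is_available(module_name):
        # Silently disabled: no exception, no publish.
        return False
    await send(topic, payload)
    return True
```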
Choosing Between EventBus and Redpanda¶
| Criteria | EventBus | Redpanda |
|---|---|---|
| Latency | < 100ms, inline | Async, seconds+ |
| Scope | Same process | Cross-service |
| Delivery | At-most-once | At-least-once |
| Use case | Audit, cache | ERP sync, analytics |
Fault Topic (Decided #200)¶
vectis.workflow.fault.v1 is the dedicated channel for workflow-level faults — anything a Temporal workflow trapped that the operator should triage. The schema requires:
{
  "workflow_id": "string",
  "workflow_type": "string",
  "run_id": "string",
  "occurred_at": "ISO-8601",
  "fault_code": "string (kebab-case)",
  "severity": "warn | error | fatal",
  "context": { "...": "domain payload" },
  "remediation_hint": "string (optional)"
}
Faults are persisted to workflow_faults and surface in the admin Workflow Faults inbox (/workflow-faults). Each row links to the originating workflow run for re-execution. Use this channel for B2B approval timeouts, recurate cascade validation failures, refund execution errors, and recurring-order pauses — anything an operator needs to manually unblock.
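The field constraints listed in the schema can be checked with a small validator before publishing. This is a sketch under assumptions: `validate_fault` is a hypothetical helper, every field except `remediation_hint` is treated as required, and the authoritative rules are those in the JSON Schema file.

```python
import re
from datetime import datetime

SEVERITIES = {"warn", "error", "fatal"}
KEBAB_RE = re.compile(r"^[a-z0-9]+(-[a-z0-9]+)*$")
REQUIRED = (
    "workflow_id", "workflow_type", "run_id",
    "occurred_at", "fault_code", "severity", "context",
)


def validate_fault(payload: dict) -> list[str]:
    """Return a list of problems; an empty list means the payload looks valid."""
    errors = [f"missing field: {name}" for name in REQUIRED if name not in payload]
    if errors:
        return errors
    code = payload["fault_code"]
    if not isinstance(code, str) or not KEBAB_RE.match(code):
        errors.append("fault_code must be kebab-case")
    if payload["severity"] not in SEVERITIES:
        errors.append("severity must be one of: " + ", ".join(sorted(SEVERITIES)))
    try:
        # Before Python 3.11, fromisoformat accepts only a subset of
        # ISO-8601 (for example, no trailing "Z").
        datetime.fromisoformat(payload["occurred_at"])
    except (TypeError, ValueError):
        errors.append("occurred_at must be an ISO-8601 timestamp")
    if not isinstance(payload["context"], dict):
        errors.append("context must be an object")
    return errors
```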
Stale-Approval Pricing-Drift Gate (Decided C8)¶
When a cart re-enters the workflow after sitting idle through a price change, the order-placement gate checks it against the approved grand total stored on Cart.cart_approved_grand_total. It writes a CartRejectionEvent row tagged pricing_drift and emits the corresponding vectis.cart.rejected.v1 event. The buyer is shown the new price and asked to confirm; on confirmation, the cart re-enters the cart-approval phase.
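The core of such a gate is a comparison between the approved total and the freshly computed one. The sketch below assumes an exact decimal comparison with no tolerance; `check_pricing_drift` and `DriftResult` are hypothetical names, and the real gate may apply different rules.

```python
from dataclasses import dataclass
from decimal import Decimal
from typing import Union

Amount = Union[str, int, Decimal]


@dataclass(frozen=True)
class DriftResult:
    drifted: bool
    approved_total: Decimal
    current_total: Decimal


def check_pricing_drift(approved_total: Amount, current_total: Amount) -> DriftResult:
    """Compare the approved grand total with the current one."""
    # Going through str keeps values such as "10.10" exact.
    approved = Decimal(str(approved_total))
    current = Decimal(str(current_total))
    # Assumption: any difference counts as drift (no tolerance band).
    return DriftResult(
        drifted=approved != current,
        approved_total=approved,
        current_total=current,
    )
```

When the result reports drift, the caller would record the rejection, publish the event, and show the buyer the new total.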