Skip to content

Events and Messaging

Vectis has two event systems: the in-process EventBus for synchronous module-to-module communication, and Redpanda (Kafka-compatible) for asynchronous inter-service messaging.

In-Process EventBus

Handlers run inline within the current request. Use for audit logging, cache invalidation, or denormalization that must complete before the response.

from vectis.core.events import Event, event_bus

async def on_order_created(event: Event):
    order_id = event.data["order_id"]
    # write audit log, update cache, etc.

event_bus.subscribe("order.created", on_order_created, priority=50)

Handlers are sorted by priority (lower = earlier, default 50). Emit events from services:

await event_bus.emit(Event(
    type="product.created",
    data={"product_id": product.id, "name": product.name},
    source="product_service",
    user_id=current_user.id,
))

A handler can raise EventCancelled to stop later handlers:

from vectis.core.exceptions import EventCancelled

async def validate_order(event: Event):
    if event.data.get("total", 0) > 100_000:
        raise EventCancelled("Order exceeds maximum amount")

event_bus.subscribe("order.pre_create", validate_order, priority=10)

Warning

Keep EventBus handlers under 100ms. Anything longer belongs in a Temporal workflow or Redpanda consumer.

Redpanda (Async Messaging)

For inter-service events — ERP sync, analytics, warehouse integration.

Topic Naming + Schema Registry (Decided #153)

Topics follow vectis.<domain>.<event>.v<N>. Every topic has a matching JSON Schema at backend/vectis/events/schemas/<domain>/<event>_v<N>.json. The schema registry (backend/vectis/events/registry.py) loads schemas at boot and validates every emitted payload:

  • Development & test: strict — invalid payloads raise.
  • Production: warn-and-publish (logs structured warning) so a bad payload never blocks a transaction; you fix the producer and ship.

The full topic list, schema paths, and consumer ownership lives in vectis/docs/REDPANDA_TOPICS.md.

Topics

Registered schemas live under backend/vectis/events/schemas/. Current schema set:

Domain Schema files Purpose
envelope_v1.json (root) Common envelope wrapper used by every event
cart submitted_for_approval_v1.json, approved_v1.json, rejected_v1.json B2B cart approval phase transitions
payment approval_submitted_v1.json, approved_v1.json, rejected_v1.json, cascade_exhausted_v1.json, tender_settled_v1.json, tender_bounced_v1.json Payment-phase approval lifecycle + tender settlement
inventory stock_changed_v1.json, restocked_v1.json Stock delta + restock notifications
catalog product_updated_v1.json, category_updated_v1.json Catalog mutations for downstream cache busting
pricing override_invalidated_v1.json Automated override invalidation (Decided #168)
list created_v1.json, shared_v1.json, sent_v1.json, transferred_v1.json, reassigned_v1.json, archived_v1.json, conversion_progress_v1.json Quick-Order list lifecycle
affiliate affiliate_status_changed_v1.json, attribution_recorded_v1.json, commission_accrued_v1.json, commission_clawback_v1.json, payout_initiated_v1.json, payout_paid_v1.json, payout_failed_v1.json, referral_code_created_v1.json Affiliate program events
customer_referral reward_issued_v1.json Referral reward issuance
platform approval_resolution_failed_v1.json, cascade_curation_invalid_v1.json, tender_idempotency_collision_v1.json Cross-domain workflow fault payloads
workflow fault_v1.json Workflow fault envelope (Decided #200) emitted by Temporal workflows that trap unrecoverable exceptions

Publishing

from vectis.core.events import publish_event

await publish_event(
    topic="vectis.orders",
    event_type="order.created",
    payload={"order_id": order.id, "grand_total": str(order.grand_total)},
    key=str(order.id),
)

The producer is lazy-initialized. Call shutdown_producer() on app shutdown.

Consuming

Register handlers with @on_event and run as a separate process:

from vectis.core.events.consumer import on_event

@on_event("order.created")
async def handle_order_created(event_type: str, payload: dict) -> None:
    # sync to ERP, trigger analytics, etc.
    ...

@on_event("inventory.adjusted")
async def handle_inventory_adjusted(event_type: str, payload: dict) -> None:
    # alert if stock below threshold
    ...
python -m vectis.core.events.consumer

Note

If aiokafka is not installed, the producer silently disables itself. The app runs fine in development without Redpanda.

Choosing Between EventBus and Redpanda

Criteria EventBus Redpanda
Latency < 100ms, inline Async, seconds+
Scope Same process Cross-service
Delivery At-most-once At-least-once
Use case Audit, cache ERP sync, analytics

Fault Topic (Decided #200)

vectis.workflow.fault.v1 is the dedicated channel for workflow-level faults — anything a Temporal workflow trapped that the operator should triage. The schema requires:

{
  "workflow_id": "string",
  "workflow_type": "string",
  "run_id": "string",
  "occurred_at": "ISO-8601",
  "fault_code": "string (kebab-case)",
  "severity": "warn | error | fatal",
  "context": { "...": "domain payload" },
  "remediation_hint": "string (optional)"
}

Faults are persisted to workflow_faults and surface in the admin Workflow Faults inbox (/workflow-faults). Each row links to the originating workflow run for re-execution. Use this channel for B2B approval timeouts, recurate cascade validation failures, refund execution errors, and recurring-order pauses — anything an operator needs to manually unblock.

Stale-Approval Pricing-Drift Gate (Decided C8)

When a cart re-enters the workflow after lying idle through a price change, the order-placement gate writes a CartRejectionEvent row tagged pricing_drift (against the cart's approved grand total stored on Cart.cart_approved_grand_total) and emits the corresponding vectis.cart.rejected.v1 event. The buyer is shown the new price and asked to confirm; on confirmation the cart re-enters the cart-approval phase.