MesoLive Event Hub

Use MesoLiveEventHubClient to receive real-time server→client callbacks (signals, order/position updates, job completion) and to replay historical events after restarts.

Roles

The Event Hub provides:

  1. Client-to-server APIs: subscribe/unsubscribe and history (GetEventsSince)
  2. Server-to-client callbacks: OnEntrySignal, OnOrderUpdate, ...

Signals and events used by automation clients include:

  • Entry/Exit/Adjustment signals (e.g. OnEntrySignal)
  • Error events (OnError)
  • Order/position/leg updates for reconciliation (OnOrderUpdate, OnPositionUpdate, ...)

Handler registration and lifecycle

Use the runnable example to dump incoming server events to stdout:

python -m mesolive_sdk_examples.events_example --help
python -m mesolive_sdk_examples.events_example dump --seconds 30
python -m mesolive_sdk_examples.events_example dump --strategy-id 123 --seconds 30

Callbacks on connect

When your connection opens, the server may proactively send “current active state” to the caller (for example active entry/exit/adjustment signals and active errors). You should assume:

  • callbacks can arrive before you call SubscribeToStrategies
  • callbacks may include items you already processed via history/replay

Design your consumer to be idempotent: treat events as upserts, and dedupe by EventId if needed.
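The idempotency advice above can be sketched as a small in-memory store. This is an illustration only: OrderEvent, OrderStore, OrderId and Status are hypothetical names that do not come from the SDK, and the assumption that EventId is a string is ours. Only EventId and EventSeqId are taken from this page.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class OrderEvent:
    """Hypothetical stand-in for an SDK event payload (not an SDK type)."""
    EventId: str      # assumed to be a string; used as the dedupe key
    EventSeqId: int   # per-user sequence id, used to reject stale updates
    OrderId: int      # hypothetical business key
    Status: str       # hypothetical state field


@dataclass
class OrderStore:
    """Keeps the latest known state per order and ignores repeated events."""
    seen_event_ids: set = field(default_factory=set)
    orders: dict = field(default_factory=dict)

    def apply(self, event: OrderEvent) -> bool:
        """Apply an event once; return False if it was a duplicate."""
        if event.EventId in self.seen_event_ids:
            return False
        self.seen_event_ids.add(event.EventId)
        current = self.orders.get(event.OrderId)
        # Upsert: only overwrite state with an event that is not older.
        if current is None or event.EventSeqId >= current.EventSeqId:
            self.orders[event.OrderId] = event
        return True
```

Calling apply() from both the live callback and the replay loop means an event that arrives twice (once on connect, once via history) changes state only once. A long-running client would probably need to bound or persist seen_event_ids rather than let it grow without limit.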

Handler execution model

Handlers can be sync functions or async def. The SDK:

  • calls handlers sequentially per event callback
  • logs and suppresses handler exceptions (it won’t crash the connection)

If your handler does slow I/O, consider enqueuing work to your own task/queue.
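One way to keep handlers fast is to hand events to an asyncio.Queue and do the slow work in a separate task. The sketch below is a generic asyncio pattern, not SDK code: events are modeled as plain dicts, slow_io and make_handler are hypothetical names, and it assumes the SDK invokes handlers on the same event loop that runs the worker.

```python
import asyncio


async def slow_io(event: dict) -> None:
    """Placeholder for slow work such as a database write or HTTP call."""
    await asyncio.sleep(0.01)


async def worker(queue: asyncio.Queue, processed: list) -> None:
    """Drain the queue until a None sentinel arrives."""
    while True:
        event = await queue.get()
        try:
            if event is None:
                return
            await slow_io(event)
            processed.append(event["EventId"])
        finally:
            queue.task_done()


def make_handler(queue: asyncio.Queue):
    """Build a handler that returns immediately after enqueuing the event."""
    def handler(event: dict) -> None:
        queue.put_nowait(event)
    return handler


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    processed: list = []
    task = asyncio.create_task(worker(queue, processed))
    handler = make_handler(queue)
    for i in range(3):
        handler({"EventId": f"evt-{i}"})  # in practice the SDK would call this
    queue.put_nowait(None)  # sentinel: tell the worker to stop
    await task
    return processed
```

Because a single worker drains the queue in order, events are still processed in arrival order; only the callback itself returns quickly.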

Subscriptions

Most events are delivered according to strategy group membership, so a connection receives events for the strategy groups it has subscribed to.

SubscribeToStrategies

# Subscribe to all strategies (recommended for automation clients)
python -m mesolive_sdk_examples.events_example dump --seconds 30

# Subscribe to specific strategies
python -m mesolive_sdk_examples.events_example dump --strategy-id 1 --strategy-id 2 --seconds 30

Semantics:

  • Strategies=None: subscribe to all strategies (recommended for automation clients)
  • Strategies=[]: no-op
  • Strategies=[1,2,3]: subscribe to those strategies you have access to; results include SuccessfulStrategies and FailedStrategies
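Since an explicit list can succeed only partially, it may be worth checking which requested strategies were not confirmed. The sketch below uses a hypothetical SubscribeResult dataclass in place of the real SDK result type and assumes SuccessfulStrategies is a list of strategy ids. The actual shape of both fields may differ.

```python
from dataclasses import dataclass, field


@dataclass
class SubscribeResult:
    """Hypothetical stand-in for the result of SubscribeToStrategies."""
    SuccessfulStrategies: list = field(default_factory=list)
    FailedStrategies: list = field(default_factory=list)


def unconfirmed_strategies(requested: list, result: SubscribeResult) -> list:
    """Return the requested strategy ids that the result did not confirm."""
    confirmed = set(result.SuccessfulStrategies)
    return [sid for sid in requested if sid not in confirmed]
```

For example, requesting [1, 2, 3] and getting back SuccessfulStrategies=[1, 3] leaves [2] unconfirmed, which an automation client could log or alert on.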

UnsubscribeFromStrategies

Semantics:

  • Strategies=None: unsubscribe from the “all strategies” group for this connection
  • Strategies=[]: no-op
  • Strategies=[...]: unsubscribe from the per-strategy groups listed

Callback catalog

The SDK exposes registration helpers for server-to-client callbacks in these categories:

  • Signals: on_entry_signal, on_exit_signal, on_adjustment_signal
  • Execution updates: on_order_update, on_position_update, on_leg_update
  • Ops and diagnostics: on_agent_status_change, on_error
  • Preparation jobs: on_position_entry_preparation_completed/failed, on_position_exit_preparation_completed/failed, on_position_adjustment_preparation_completed/failed
  • Vars automation: on_auto_update_vars_adjustment_started/completed/failed

Every event carries these common fields:

  • EventId: stable unique id (dedupe key)
  • EventSeqId: monotonically increasing per-user sequence id (ordering and replay cursor)
  • EventTime: server UTC timestamp

Event history and replay

History is accessed via:

  • get_latest_event_seq_id() → latest cursor
  • get_events_since(SinceEventSeqId=..., Limit=..., ContinuationToken=...) → paged results

Basic replay loop

import asyncio
import os

from mesolive_sdk import MesoLiveEventHubClient
from mesolive_sdk import models


def envelope_seq_id(env: models.MesoLiveEventEnvelope) -> int:
    for payload in (
        env.EntrySignal,
        env.ExitSignal,
        env.AdjustmentSignal,
        env.OrderUpdate,
        env.PositionUpdate,
        env.LegUpdate,
        env.AgentStatusChange,
        env.Error,
        env.PositionEntryPreparationCompleted,
        env.PositionEntryPreparationFailed,
        env.PositionExitPreparationCompleted,
        env.PositionExitPreparationFailed,
        env.PositionAdjustmentPreparationCompleted,
        env.PositionAdjustmentPreparationFailed,
        env.AutoUpdateVarsAdjustmentStarted,
        env.AutoUpdateVarsAdjustmentCompleted,
        env.AutoUpdateVarsAdjustmentFailed,
    ):
        if payload is not None:
            return int(payload.EventSeqId)
    raise ValueError(f"Envelope has no payload (Kind={env.Kind})")


async def replay(evt: MesoLiveEventHubClient, *, cursor: int) -> int:
    continuation = None
    while True:
        page = await evt.get_events_since(
            models.GetEventsSinceArgs(
                SinceEventSeqId=cursor,
                Limit=1000,
                ContinuationToken=continuation,
            )
        )

        for envelope in page.Events:
            # Use envelope.Kind to decide which payload is populated.
            # Example: envelope.OrderUpdate, envelope.EntrySignal, ...
            cursor = max(cursor, envelope_seq_id(envelope))

        continuation = page.ContinuationToken
        if not continuation:
            return cursor


async def main() -> None:
    cursor = 0  # load from your durable store

    evt = MesoLiveEventHubClient(
        mesolive_instance=os.environ["MESOLIVE_INSTANCE"],
        mesolive_api_key=os.environ["MESOLIVE_API_KEY"],
    )

    async with evt:
        cursor = await replay(evt, cursor=cursor)
        await evt.subscribe_to_strategies(models.SubscribeToStrategiesArgs(Strategies=None))
        await asyncio.sleep(3600)


asyncio.run(main())

Implementation notes:

  • The server caps Limit (max 1000).
  • ContinuationToken is opaque; keep it exactly as returned.
  • Use EventSeqId as your primary cursor and consider EventId for dedupe.

Note: If you don’t want to depend on history envelopes, you can instead store state derived from callbacks and treat replay as “best effort catch-up”. For resilient automation, prefer a durable cursor and explicit replay.

On startup:

  1. Connect to Event Hub and register handlers.
  2. Replay from your stored EventSeqId via GetEventsSince until caught up.
  3. Subscribe to strategies (often Strategies=None).
  4. Persist your cursor as you process callbacks.
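The "durable store" in the steps above could be as simple as a small JSON file written atomically. The sketch below is one possible approach, not part of the SDK: FileCursorStore and the file layout are hypothetical, and returning 0 when no file exists mirrors the initial cursor used in the replay example.

```python
import json
import os
import tempfile


class FileCursorStore:
    """Persists the last processed EventSeqId in a small JSON file."""

    def __init__(self, path: str) -> None:
        self.path = path

    def load(self) -> int:
        """Return the stored cursor, or 0 when no cursor was saved yet."""
        try:
            with open(self.path, "r", encoding="utf-8") as fh:
                return int(json.load(fh)["EventSeqId"])
        except FileNotFoundError:
            return 0

    def save(self, seq_id: int) -> None:
        """Write the cursor atomically; never move it backwards."""
        seq_id = max(seq_id, self.load())
        directory = os.path.dirname(os.path.abspath(self.path))
        fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            json.dump({"EventSeqId": seq_id}, fh)
            fh.flush()
            os.fsync(fh.fileno())
        # Rename within the same directory so readers never see a partial file.
        os.replace(tmp_path, self.path)
```

On startup you would call load() to obtain the cursor for replay, then call save() after each event (or batch) has been fully processed. Saving only after processing means a crash leads to re-delivery rather than loss, which is safe as long as the consumer is idempotent.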

See Workflows: Reliability for a full pattern, including idempotency recovery for Control Hub operations.
