MesoLive Event Hub
Use MesoLiveEventHubClient to receive real-time server→client callbacks (signals, order/position updates, job completion)
and to replay historical events after restarts.
- Event Hub SignalR reference
- Python client: mesolive_sdk.MesoLiveEventHubClient
Roles
The Event Hub provides:
- Client to server APIs: subscribe/unsubscribe + history (GetEventsSince)
- Server to client callbacks: OnEntrySignal, OnOrderUpdate, ...
Signals and events used by automation clients include:
- Entry/Exit/Adjustment signals (e.g. OnEntrySignal)
- Error events (OnError)
- Order/position/leg updates for reconciliation (OnOrderUpdate, OnPositionUpdate, ...)
Handler registration and lifecycle
Use the runnable example to dump incoming server events to stdout:
python -m mesolive_sdk_examples.events_example --help
python -m mesolive_sdk_examples.events_example dump --seconds 30
python -m mesolive_sdk_examples.events_example dump --strategy-id 123 --seconds 30
Callbacks on connect
When your connection opens, the server may proactively send “current active state” to the caller (for example active entry/exit/adjustment signals and active errors). You should assume:
- callbacks can arrive before you call SubscribeToStrategies
- callbacks may include items you already processed via history/replay
Design your consumer as idempotent (treat events as upserts; dedupe by EventId if needed).
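A minimal sketch of an idempotent handler, assuming each event exposes EventId as described in this document. The SeenEvents helper and the OrderId and Status fields are hypothetical names used only for illustration; they are not part of the SDK, and the events here are stand-in objects rather than SDK models.

```python
from collections import OrderedDict
from types import SimpleNamespace


class SeenEvents:
    """Bounded memory of recently seen EventId values (hypothetical helper)."""

    def __init__(self, max_size=10_000):
        self._seen = OrderedDict()
        self._max_size = max_size

    def first_time(self, event_id):
        """Return True only the first time an EventId is observed."""
        if event_id in self._seen:
            self._seen.move_to_end(event_id)
            return False
        self._seen[event_id] = None
        if len(self._seen) > self._max_size:
            self._seen.popitem(last=False)  # evict the oldest id
        return True


seen = SeenEvents()
orders = {}  # hypothetical order id -> latest status (the upsert target)


def handle_order_update(event):
    if not seen.first_time(str(event.EventId)):
        return  # duplicate delivery: already applied
    orders[event.OrderId] = event.Status  # upsert, not append


# Simulated duplicate delivery using a stand-in object (not an SDK model).
first = SimpleNamespace(EventId="e-1", OrderId="o-1", Status="Working")
handle_order_update(first)
handle_order_update(first)
print(orders)  # prints {'o-1': 'Working'}
```

The bounded set keeps memory flat for long-running consumers; if duplicates can arrive further apart than the window covers, a durable store keyed by EventId would be the safer choice.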
Handler execution model
Handlers can be sync functions or async def. The SDK:
- calls handlers sequentially per event callback
- logs and suppresses handler exceptions (it won’t crash the connection)
If your handler does slow I/O, consider enqueuing work to your own task/queue.
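One way to keep handlers fast is to hand events to an asyncio.Queue and let a separate worker task do the slow part. The sketch below runs without the SDK: the callbacks are simulated with stand-in objects, and the handler name on_order_update is only a local function here, not a registration call.

```python
import asyncio
from types import SimpleNamespace


async def worker(queue, results):
    """Drain the queue; slow I/O belongs here, not in the handler."""
    while True:
        event = await queue.get()
        try:
            await asyncio.sleep(0.01)  # stand-in for a slow database or HTTP call
            results.append(event.EventSeqId)
        finally:
            queue.task_done()


async def demo():
    queue = asyncio.Queue(maxsize=1000)
    results = []

    def on_order_update(event):
        # Returns immediately; put_nowait raises QueueFull if the worker falls behind.
        queue.put_nowait(event)

    task = asyncio.create_task(worker(queue, results))
    for seq in (1, 2, 3):  # simulate three incoming callbacks
        on_order_update(SimpleNamespace(EventSeqId=seq))
    await queue.join()  # wait until the worker has processed everything
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return results


processed = asyncio.run(demo())
print(processed)  # prints [1, 2, 3]
```

A bounded queue is a deliberate choice: it surfaces backpressure as an exception rather than letting memory grow without limit.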
Subscriptions
Most events are delivered by strategy group membership.
SubscribeToStrategies
# Subscribe to all strategies (recommended for automation clients)
python -m mesolive_sdk_examples.events_example dump --seconds 30
# Subscribe to specific strategies
python -m mesolive_sdk_examples.events_example dump --strategy-id 1 --strategy-id 2 --seconds 30
Semantics:
- Strategies=None: subscribe to all strategies (recommended for automation clients)
- Strategies=[]: no-op
- Strategies=[1,2,3]: subscribe to those strategies you have access to; results include SuccessfulStrategies and FailedStrategies
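Because an empty list is a no-op while None means "all", a filter that happens to produce no ids would silently subscribe to nothing. The helper below is a hypothetical guard, not part of the SDK: it builds the value you would pass as Strategies and fails loudly on the empty case.

```python
from typing import Iterable, Optional


def strategies_arg(strategy_ids: Optional[Iterable[int]]) -> Optional[list]:
    """Build a Strategies value: None for "all", else a sorted, de-duplicated list."""
    if strategy_ids is None:
        return None  # subscribe to all strategies
    ids = sorted(set(strategy_ids))
    if not ids:
        # An empty list would be a silent no-op; raise instead of subscribing to nothing.
        raise ValueError("empty strategy list: pass None to subscribe to all")
    return ids


print(strategies_arg(None))       # prints None
print(strategies_arg([2, 1, 2]))  # prints [1, 2]
```

Whether an empty selection should raise or fall back to "all" depends on your client; raising is the conservative option assumed here.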
UnsubscribeFromStrategies
Semantics:
- Strategies=None: unsubscribe from the “all strategies” group for this connection
- Strategies=[]: no-op
- Strategies=[...]: unsubscribe from the per-strategy groups listed
Callback catalog
The SDK exposes registration helpers for server to client callbacks in these categories:
- Signals: on_entry_signal, on_exit_signal, on_adjustment_signal
- Execution updates: on_order_update, on_position_update, on_leg_update
- Ops and diagnostics: on_agent_status_change, on_error
- Preparation jobs: on_position_entry_preparation_completed/failed, on_position_exit_preparation_completed/failed, on_position_adjustment_preparation_completed/failed
- Vars automation: on_auto_update_vars_adjustment_started/completed/failed
Every event implements common fields:
- EventId: stable unique id (dedupe key)
- EventSeqId: monotonically increasing per-user sequence id (ordering and replay cursor)
- EventTime: server UTC timestamp
Event history and replay
History is accessed via:
- get_latest_event_seq_id() → latest cursor
- get_events_since(SinceEventSeqId=..., Limit=..., ContinuationToken=...) → paged results
Basic replay loop
import asyncio
import os

from mesolive_sdk import MesoLiveEventHubClient
from mesolive_sdk import models


def envelope_seq_id(env: models.MesoLiveEventEnvelope) -> int:
    # Exactly one payload field is expected to be populated per envelope.
    for payload in (
        env.EntrySignal,
        env.ExitSignal,
        env.AdjustmentSignal,
        env.OrderUpdate,
        env.PositionUpdate,
        env.LegUpdate,
        env.AgentStatusChange,
        env.Error,
        env.PositionEntryPreparationCompleted,
        env.PositionEntryPreparationFailed,
        env.PositionExitPreparationCompleted,
        env.PositionExitPreparationFailed,
        env.PositionAdjustmentPreparationCompleted,
        env.PositionAdjustmentPreparationFailed,
        env.AutoUpdateVarsAdjustmentStarted,
        env.AutoUpdateVarsAdjustmentCompleted,
        env.AutoUpdateVarsAdjustmentFailed,
    ):
        if payload is not None:
            return int(payload.EventSeqId)
    raise ValueError(f"Envelope has no payload (Kind={env.Kind})")


async def replay(evt: MesoLiveEventHubClient, *, cursor: int) -> int:
    continuation = None
    while True:
        page = await evt.get_events_since(
            models.GetEventsSinceArgs(
                SinceEventSeqId=cursor,
                Limit=1000,
                ContinuationToken=continuation,
            )
        )
        for envelope in page.Events:
            # Use envelope.Kind to decide which payload is populated.
            # Example: envelope.OrderUpdate, envelope.EntrySignal, ...
            cursor = max(cursor, envelope_seq_id(envelope))
        continuation = page.ContinuationToken
        if not continuation:
            # No further pages: caught up.
            return cursor


async def main() -> None:
    cursor = 0  # load from your durable store
    evt = MesoLiveEventHubClient(
        mesolive_instance=os.environ["MESOLIVE_INSTANCE"],
        mesolive_api_key=os.environ["MESOLIVE_API_KEY"],
    )
    async with evt:
        cursor = await replay(evt, cursor=cursor)
        await evt.subscribe_to_strategies(models.SubscribeToStrategiesArgs(Strategies=None))
        await asyncio.sleep(3600)


asyncio.run(main())
Implementation notes:
- The server caps Limit (max 1000).
- ContinuationToken is opaque; keep it exactly as returned.
- Use EventSeqId as your primary cursor and consider EventId for dedupe.
If you don’t want to depend on history envelopes, you can instead store state derived from callbacks and treat replay as “best effort catch-up”. For resilient automation, prefer a durable cursor and explicit replay.
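A durable cursor can be as simple as a small file that is rewritten atomically. The sketch below is one possible approach under that assumption, not something the SDK provides; load_cursor, save_cursor, and the JSON layout are hypothetical. It writes to a temporary file in the same directory and renames it into place, so a crash mid-write leaves the previous cursor intact.

```python
import json
import os
import tempfile
from pathlib import Path


def load_cursor(path: Path) -> int:
    """Return the stored EventSeqId, or 0 when no cursor has been saved yet."""
    try:
        return int(json.loads(path.read_text())["EventSeqId"])
    except FileNotFoundError:
        return 0


def save_cursor(path: Path, seq_id: int) -> None:
    """Write the cursor via a temp file in the same directory, then rename."""
    fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump({"EventSeqId": seq_id}, f)
            f.flush()
            os.fsync(f.fileno())  # make sure the bytes reach disk before renaming
        os.replace(tmp, path)  # rename over the old file (atomic on POSIX)
    except BaseException:
        os.unlink(tmp)  # do not leave a stray temp file behind
        raise


# Usage with a throwaway directory.
with tempfile.TemporaryDirectory() as d:
    cursor_file = Path(d) / "cursor.json"
    print(load_cursor(cursor_file))  # prints 0
    save_cursor(cursor_file, 42)
    print(load_cursor(cursor_file))  # prints 42
```

Save the cursor only after the corresponding event has been fully processed; saving earlier risks skipping an event after a crash, while saving later only causes a replayed duplicate that an idempotent consumer can absorb.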
Recommended reliability pattern
On startup:
- Connect to Event Hub and register handlers.
- Replay from your stored EventSeqId via GetEventsSince until caught up.
- Subscribe to strategies (often Strategies=None).
- Persist your cursor as you process callbacks.
See Workflows: Reliability for a full pattern, including idempotency recovery for Control Hub operations.
References
- SignalR parameters: SignalR Reference