Skip to content

Commit d945ac9

Browse files
committed
Tighten pair parsing and improve init error handling
- Fail closed in _parse_pair for unrecognised pair shapes instead of guessing a 3+N split - Wrap _MarketStreamState init parsing in try/except to surface malformed snapshots as MarketInitialisationException - Raise MarketInitialisationException if stream closes before the initial snapshot is received
1 parent daa6c42 commit d945ac9

File tree

1 file changed

+22
-10
lines changed

1 file changed

+22
-10
lines changed

luno_python/stream_client.py

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
"""
77

88
import asyncio
9-
from decimal import Decimal
9+
from decimal import Decimal, InvalidOperation
1010
import json
1111
from typing import Callable, Dict, List, Optional
1212
import websockets
@@ -34,11 +34,13 @@ def _parse_pair(pair: str) -> 'Pair':
3434
pair = pair.upper()
3535
if not pair.isalpha() or len(pair) < 6:
3636
raise ValueError(f"Invalid pair: '{pair}'.")
37-
if pair[:4] in _FOUR_CHAR_CURRENCIES and len(pair[4:]) >= 3:
37+
if pair[:4] in _FOUR_CHAR_CURRENCIES and len(pair[4:]) in (3, 4):
3838
return Pair(pair[:4], pair[4:])
39-
if pair[-4:] in _FOUR_CHAR_CURRENCIES and len(pair[:-4]) >= 3:
39+
if pair[-4:] in _FOUR_CHAR_CURRENCIES and len(pair[:-4]) in (3, 4):
4040
return Pair(pair[:-4], pair[-4:])
41-
return Pair(pair[:3], pair[3:])
41+
if len(pair) == 6:
42+
return Pair(pair[:3], pair[3:])
43+
raise ValueError(f"Invalid pair: '{pair}'.")
4244

4345

4446
def _flatten_orders(orders, reverse):
@@ -67,13 +69,18 @@ def conv_message(msg):
6769
Decimal(msg['volume']),
6870
)
6971

70-
bids = [conv_message(m) for m in first['bids']]
71-
asks = [conv_message(m) for m in first['asks']]
72-
self._bids = {b.order_id: b for b in bids}
73-
self._asks = {a.order_id: a for a in asks}
74-
self._sequence = first['sequence']
72+
try:
73+
bids = [conv_message(m) for m in first['bids']]
74+
asks = [conv_message(m) for m in first['asks']]
75+
self._bids = {b.order_id: b for b in bids}
76+
self._asks = {a.order_id: a for a in asks}
77+
self._sequence = first['sequence']
78+
self._status = first['status']
79+
except (KeyError, TypeError, InvalidOperation) as exc:
80+
raise MarketInitialisationException(
81+
"Unable to initialise market state from the initial snapshot"
82+
) from exc
7583
self._trades = []
76-
self._status = first['status']
7784

7885
def get_asks(self):
7986
return _flatten_orders(self._asks, False)
@@ -176,6 +183,11 @@ async def _read_from_websocket(ws, pair: Pair, update_f: StateUpdate):
176183

177184
update_f(pair, state.get_snapshot(), body)
178185

186+
if is_first:
187+
raise MarketInitialisationException(
188+
"Stream closed before the initial snapshot was received"
189+
)
190+
179191

180192
async def _write_keep_alive(ws):
181193
while True:

0 commit comments

Comments
 (0)