|
6 | 6 | """ |
7 | 7 |
|
8 | 8 | import asyncio |
9 | | -from decimal import Decimal |
| 9 | +from decimal import Decimal, InvalidOperation |
10 | 10 | import json |
11 | 11 | from typing import Callable, Dict, List, Optional |
12 | 12 | import websockets |
@@ -34,11 +34,13 @@ def _parse_pair(pair: str) -> 'Pair': |
34 | 34 | pair = pair.upper() |
35 | 35 | if not pair.isalpha() or len(pair) < 6: |
36 | 36 | raise ValueError(f"Invalid pair: '{pair}'.") |
37 | | - if pair[:4] in _FOUR_CHAR_CURRENCIES and len(pair[4:]) >= 3: |
| 37 | + if pair[:4] in _FOUR_CHAR_CURRENCIES and len(pair[4:]) in (3, 4): |
38 | 38 | return Pair(pair[:4], pair[4:]) |
39 | | - if pair[-4:] in _FOUR_CHAR_CURRENCIES and len(pair[:-4]) >= 3: |
| 39 | + if pair[-4:] in _FOUR_CHAR_CURRENCIES and len(pair[:-4]) in (3, 4): |
40 | 40 | return Pair(pair[:-4], pair[-4:]) |
41 | | - return Pair(pair[:3], pair[3:]) |
| 41 | + if len(pair) == 6: |
| 42 | + return Pair(pair[:3], pair[3:]) |
| 43 | + raise ValueError(f"Invalid pair: '{pair}'.") |
42 | 44 |
|
43 | 45 |
|
44 | 46 | def _flatten_orders(orders, reverse): |
@@ -67,13 +69,18 @@ def conv_message(msg): |
67 | 69 | Decimal(msg['volume']), |
68 | 70 | ) |
69 | 71 |
|
70 | | - bids = [conv_message(m) for m in first['bids']] |
71 | | - asks = [conv_message(m) for m in first['asks']] |
72 | | - self._bids = {b.order_id: b for b in bids} |
73 | | - self._asks = {a.order_id: a for a in asks} |
74 | | - self._sequence = first['sequence'] |
| 72 | + try: |
| 73 | + bids = [conv_message(m) for m in first['bids']] |
| 74 | + asks = [conv_message(m) for m in first['asks']] |
| 75 | + self._bids = {b.order_id: b for b in bids} |
| 76 | + self._asks = {a.order_id: a for a in asks} |
| 77 | + self._sequence = first['sequence'] |
| 78 | + self._status = first['status'] |
| 79 | + except (KeyError, TypeError, InvalidOperation) as exc: |
| 80 | + raise MarketInitialisationException( |
| 81 | + "Unable to initialise market state from the initial snapshot" |
| 82 | + ) from exc |
75 | 83 | self._trades = [] |
76 | | - self._status = first['status'] |
77 | 84 |
|
78 | 85 | def get_asks(self): |
79 | 86 | return _flatten_orders(self._asks, False) |
@@ -176,6 +183,11 @@ async def _read_from_websocket(ws, pair: Pair, update_f: StateUpdate): |
176 | 183 |
|
177 | 184 | update_f(pair, state.get_snapshot(), body) |
178 | 185 |
|
| 186 | + if is_first: |
| 187 | + raise MarketInitialisationException( |
| 188 | + "Stream closed before the initial snapshot was received" |
| 189 | + ) |
| 190 | + |
179 | 191 |
|
180 | 192 | async def _write_keep_alive(ws): |
181 | 193 | while True: |
|
0 commit comments