-
Notifications
You must be signed in to change notification settings - Fork 41
Merge Streaming Client changes from Adam Hicks #76
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
c69c8f7
1481a99
346d5f4
b2b051d
fccc48c
27efc6b
f30b9bd
af6d000
c2393a5
0b10568
dd6ef14
2173352
3d9d82b
546298b
8be5204
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| import asyncio | ||
|
|
||
| from luno_python.stream_client import stream_market | ||
| from luno_python.api_types import format_state | ||
|
|
||
| def handle_update(pair, state, update): | ||
| print(format_state(pair, state)) | ||
| if update is not None: | ||
| print(update) | ||
|
|
||
| async def main(): | ||
| await stream_market( | ||
| pair="XBTZAR", | ||
| api_key_id="", # API Key goes here | ||
| api_key_secret="", # and API Secret goes here | ||
| update_callback=handle_update, | ||
| ) | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| asyncio.run(main()) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| from collections import namedtuple | ||
| from decimal import Decimal | ||
| from typing import List | ||
|
|
||
| DEC_0 = Decimal("0") | ||
|
|
||
| Order = namedtuple("Order", "order_id price volume") | ||
|
|
||
| MarketState = namedtuple("MarketState", "sequence asks bids status") | ||
|
|
||
| Pair = namedtuple("Pair", "base counter") | ||
|
|
||
| def format_orderbook(pair: Pair, asks: List[Order], bids: List[Order]): | ||
| if not bids or not asks: | ||
| return "Empty Orderbook" | ||
|
|
||
| bid_sum = sum((o.price * o.volume for o in bids), DEC_0) | ||
| ask_sum = sum((o.volume for o in asks), DEC_0) | ||
|
|
||
| mid = (asks[0].price + bids[0].price) / 2 | ||
|
|
||
| return f"{bid_sum} {pair.counter} - {mid} - {ask_sum} {pair.base}" | ||
|
|
||
| def format_state(pair: Pair, state: MarketState): | ||
| orderbook = format_orderbook(pair, state.asks, state.bids) | ||
| return f"[{state.sequence}] {orderbook}" |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,203 @@ | ||||||
| """ | ||||||
| The stream client can be used to receive live updates to the orderbook. | ||||||
| It also maintains a representation of the Luno orderbook correctly updated for each event. | ||||||
|
|
||||||
| For example usage see examples/stream.py | ||||||
| """ | ||||||
|
|
||||||
| import asyncio | ||||||
| from decimal import Decimal | ||||||
| import json | ||||||
| from typing import Callable, Dict, List | ||||||
| import websockets | ||||||
|
|
||||||
| from .api_types import DEC_0, Order, MarketState, Pair | ||||||
|
|
||||||
| DEFAULT_URL = "wss://ws.luno.com" | ||||||
|
|
||||||
| StateUpdate = Callable[[Pair, MarketState, dict], None] | ||||||
|
|
||||||
| class OutOfOrderMessageException(Exception): | ||||||
| pass | ||||||
|
|
||||||
| class MarketInitialisationException(Exception): | ||||||
| pass | ||||||
|
|
||||||
|
|
||||||
| def _flatten_orders(orders, reverse): | ||||||
| return sorted(orders.values(), key=lambda o: o.price, reverse=reverse) | ||||||
|
|
||||||
|
|
||||||
| def _decrement_trade(orders: Dict[str, Order], order_id: str, volume: Decimal): | ||||||
| order = orders.pop(order_id, None) | ||||||
| if order is None: | ||||||
| return | ||||||
|
|
||||||
| new_order = order._replace(volume=order.volume - volume) | ||||||
| if new_order.volume > DEC_0: | ||||||
| orders[order_id] = new_order | ||||||
|
|
||||||
|
|
||||||
| class _MarketStreamState: | ||||||
| def __init__(self, first: dict): | ||||||
| if first is None: | ||||||
| raise MarketInitialisationException("Unable to initialise market state from an empty message") | ||||||
|
|
||||||
| def conv_message(msg): | ||||||
| return Order( | ||||||
| msg['id'], | ||||||
| Decimal(msg['price']), | ||||||
| Decimal(msg['volume']), | ||||||
| ) | ||||||
|
|
||||||
| bids = [conv_message(m) for m in first['bids']] | ||||||
| asks = [conv_message(m) for m in first['asks']] | ||||||
| self._bids = {b.order_id: b for b in bids} | ||||||
| self._asks = {a.order_id: a for a in asks} | ||||||
| self._sequence = first['sequence'] | ||||||
| self._trades = [] | ||||||
| self._status = first['status'] | ||||||
|
|
||||||
| def get_asks(self): | ||||||
| return _flatten_orders(self._asks, False) | ||||||
|
|
||||||
| def get_bids(self): | ||||||
| return _flatten_orders(self._bids, True) | ||||||
|
|
||||||
| def get_status(self): | ||||||
| return self._status | ||||||
|
|
||||||
| def get_sequence(self): | ||||||
| return self._sequence | ||||||
|
|
||||||
| def get_snapshot(self): | ||||||
| return MarketState( | ||||||
| sequence=self.get_sequence(), | ||||||
| asks=self.get_asks(), | ||||||
| bids=self.get_bids(), | ||||||
| status=self.get_status(), | ||||||
| ) | ||||||
|
|
||||||
| def process_update(self, update: dict): | ||||||
| if update is None: | ||||||
| return | ||||||
|
|
||||||
| seq = update['sequence'] | ||||||
| if int(seq) != int(self._sequence)+1: | ||||||
| raise OutOfOrderMessageException() | ||||||
|
|
||||||
| trades = update.get('trade_updates') | ||||||
| if trades: | ||||||
| self._process_trades(trades) | ||||||
|
|
||||||
| create = update.get('create_update') | ||||||
| if create: | ||||||
| self._process_create(create) | ||||||
|
|
||||||
| delete_upd = update.get('delete_update') | ||||||
| if delete_upd: | ||||||
| self._process_delete(delete_upd) | ||||||
|
|
||||||
| status_upd = update.get('status_update') | ||||||
| if status_upd: | ||||||
| self._process_status(status_upd) | ||||||
|
|
||||||
| self._sequence = seq | ||||||
|
|
||||||
| def _process_trades(self, trade_updates: List[dict]): | ||||||
| for t in trade_updates: | ||||||
| maker_id = t['maker_order_id'] | ||||||
| volume = Decimal(t['base']) | ||||||
|
|
||||||
| _decrement_trade(self._asks, maker_id, volume) | ||||||
| _decrement_trade(self._bids, maker_id, volume) | ||||||
|
|
||||||
| def _process_create(self, create_update: dict): | ||||||
| o = Order( | ||||||
| create_update['order_id'], | ||||||
| Decimal(create_update['price']), | ||||||
| Decimal(create_update['volume']), | ||||||
| ) | ||||||
| if create_update['type'] == "ASK": | ||||||
| self._asks[o.order_id] = o | ||||||
| elif create_update['type'] == "BID": | ||||||
| self._bids[o.order_id] = o | ||||||
|
|
||||||
| def _process_delete(self, delete_update: dict): | ||||||
| order_id = delete_update['order_id'] | ||||||
| self._asks.pop(order_id, None) | ||||||
| self._bids.pop(order_id, None) | ||||||
|
|
||||||
| def _process_status(self, status_update: dict): | ||||||
| self._status = status_update['status'] | ||||||
|
|
||||||
|
|
||||||
| async def _read_from_websocket(ws, pair: Pair, update_f: StateUpdate): | ||||||
| state = None | ||||||
| is_first = True | ||||||
|
|
||||||
| async for message in ws: | ||||||
| try: | ||||||
| body = json.loads(message) | ||||||
| except ValueError as e: | ||||||
| raise ValueError(f"Invalid JSON received: {message}") from e | ||||||
|
|
||||||
| if body == "": # Empty update, used as keepalive | ||||||
| body = None | ||||||
|
|
||||||
| if is_first: | ||||||
| is_first = False | ||||||
| state = _MarketStreamState(body) | ||||||
| update_f(pair, state.get_snapshot(), None) | ||||||
| continue | ||||||
|
Comment on lines
+145
to
+152
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ignore keepalive frames until initial snapshot is received. If the first frame is a keepalive ( Proposed guard if body == "": # Empty update, used as keepalive
body = None
+
+ if body is None and is_first:
+ continue
if is_first:
is_first = False
state = _MarketStreamState(body)🤖 Prompt for AI Agents |
||||||
|
|
||||||
| #could raise OutOfOrderMessageException | ||||||
| state.process_update(body) | ||||||
|
|
||||||
| update_f(pair, state.get_snapshot(), body) | ||||||
|
|
||||||
|
|
||||||
| async def _write_keep_alive(ws): | ||||||
| while True: | ||||||
| await ws.send('""') | ||||||
| await asyncio.sleep(60) | ||||||
|
|
||||||
|
|
||||||
| async def stream_market( | ||||||
| pair: str, | ||||||
| api_key_id: str, | ||||||
| api_key_secret: str, | ||||||
| update_callback: StateUpdate, | ||||||
| base_url: str = DEFAULT_URL, | ||||||
| ): | ||||||
| """Opens a stream to /api/1/stream/... | ||||||
|
|
||||||
| Stream orderbook information and maintain an orderbook state. | ||||||
|
|
||||||
| :param pair: str Amount to buy or sell in the pair base currency. | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct the The current text describes an amount, not a market pair, which is misleading for users of this API. Docstring fix- :param pair: str Amount to buy or sell in the pair base currency.
+ :param pair: str Currency pair code (for example, "XBTZAR").📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||
| :param api_key_id: str | ||||||
| :param api_key_secret: str | ||||||
| :param update_callback: an StateUpdate function that will be called with new updates. | ||||||
| """ | ||||||
| if len(pair) != 6: | ||||||
| raise ValueError(f"pair must be length 6, got '{pair}'") | ||||||
|
|
||||||
| p = Pair(pair[:3].upper(), pair[3:].upper()) | ||||||
| url = '/'.join([base_url, 'api/1/stream', p.base + p.counter]) | ||||||
|
|
||||||
| async with websockets.connect( | ||||||
| url, | ||||||
| origin='http://localhost/', | ||||||
| ping_interval=None, | ||||||
| max_size=2**21, | ||||||
| ) as websocket: | ||||||
|
|
||||||
| auth = json.dumps({ | ||||||
| 'api_key_id': api_key_id, | ||||||
| 'api_key_secret': api_key_secret, | ||||||
| }) | ||||||
| await websocket.send(auth) | ||||||
|
|
||||||
| async with asyncio.TaskGroup() as tg: | ||||||
| tg.create_task(_read_from_websocket(websocket, p, update_callback)) | ||||||
| tg.create_task(_write_keep_alive(websocket)) | ||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
# First, let's examine the stream_client.py file structure wc -l luno_python/stream_client.pyRepository: luno/luno-python
Length of output: 92
🏁 Script executed:
Repository: luno/luno-python
Length of output: 764
🏁 Script executed:
Repository: luno/luno-python
Length of output: 561
🏁 Script executed:
Repository: luno/luno-python
Length of output: 659
🏁 Script executed:
Repository: luno/luno-python
Length of output: 42
🏁 Script executed:
Repository: luno/luno-python
Length of output: 260
🏁 Script executed:
Repository: luno/luno-python
Length of output: 42
🏁 Script executed:
Repository: luno/luno-python
Length of output: 857
🏁 Script executed:
# Check what imports are at the top of the file head -20 luno_python/stream_client.pyRepository: luno/luno-python
Length of output: 580
Correct
StateUpdatetyping to acceptNoneas the third parameter.The callback's third argument is typed as
dict, butNoneis explicitly passed at line 151 and can beNoneat line 157 when body is empty. Change the type annotation toOptional[dict]:Required change
🤖 Prompt for AI Agents