Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ from luno_python.client import Client
c = Client(api_key_id='key_id', api_key_secret='key_secret')
try:
res = c.get_ticker(pair='XBTZAR')
print res
print(res)
except Exception as e:
print e
print(e)
```

### License
Expand Down
21 changes: 21 additions & 0 deletions examples/stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import asyncio

from luno_python.stream_client import stream_market
from luno_python.api_types import format_state

def handle_update(pair, state, update):
print(format_state(pair, state))
if update is not None:
print(update)

async def main():
await stream_market(
pair="XBTZAR",
api_key_id="", # API Key goes here
api_key_secret="", # and API Secret goes here
update_callback=handle_update,
)


if __name__ == "__main__":
asyncio.run(main())
26 changes: 26 additions & 0 deletions luno_python/api_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from collections import namedtuple
from decimal import Decimal
from typing import List

DEC_0 = Decimal("0")

Order = namedtuple("Order", "order_id price volume")

MarketState = namedtuple("MarketState", "sequence asks bids status")

Pair = namedtuple("Pair", "base counter")

def format_orderbook(pair: Pair, asks: List[Order], bids: List[Order]):
if not bids or not asks:
return "Empty Orderbook"

bid_sum = sum((o.price * o.volume for o in bids), DEC_0)
ask_sum = sum((o.volume for o in asks), DEC_0)

mid = (asks[0].price + bids[0].price) / 2

return f"{bid_sum} {pair.counter} - {mid} - {ask_sum} {pair.base}"

def format_state(pair: Pair, state: MarketState):
orderbook = format_orderbook(pair, state.asks, state.bids)
return f"[{state.sequence}] {orderbook}"
4 changes: 2 additions & 2 deletions luno_python/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ class Client(BaseClient):
c = Client(api_key_id='key_id', api_key_secret='key_secret')
try:
res = c.get_ticker(pair='XBTZAR')
print res
print(res)
except Exception as e:
print e
print(e)
"""

def cancel_withdrawal(self, id):
Expand Down
203 changes: 203 additions & 0 deletions luno_python/stream_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
"""
The stream client can be used to receive live updates to the orderbook.
It also maintains a representation of the Luno orderbook correctly updated for each event.

For example usage see examples/stream.py
"""

import asyncio
from decimal import Decimal
import json
from typing import Callable, Dict, List
import websockets

from .api_types import DEC_0, Order, MarketState, Pair

DEFAULT_URL = "wss://ws.luno.com"

StateUpdate = Callable[[Pair, MarketState, dict], None]
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# First, let's examine the stream_client.py file structure
wc -l luno_python/stream_client.py

Repository: luno/luno-python

Length of output: 92


🏁 Script executed:

# Read the area around line 18 to see the StateUpdate definition
sed -n '1,30p' luno_python/stream_client.py

Repository: luno/luno-python

Length of output: 764


🏁 Script executed:

# Check lines 151 and 157 as mentioned
sed -n '145,165p' luno_python/stream_client.py

Repository: luno/luno-python

Length of output: 561


🏁 Script executed:

# Search for where StateUpdate callbacks are invoked with None
rg -n "StateUpdate|state_update" luno_python/stream_client.py -A 3

Repository: luno/luno-python

Length of output: 659


🏁 Script executed:

# Search for callback invocations with None as third argument
rg -n "callback\(" luno_python/stream_client.py -B 2 -A 2

Repository: luno/luno-python

Length of output: 42


🏁 Script executed:

# Search for all update_f invocations to see all the places where callbacks are called
rg -n "update_f\(" luno_python/stream_client.py -B 1 -A 1

Repository: luno/luno-python

Length of output: 260


🏁 Script executed:

# Also check for update_callback invocations
rg -n "update_callback\(" luno_python/stream_client.py -B 1 -A 1

Repository: luno/luno-python

Length of output: 42


🏁 Script executed:

# Let me see the complete context around the callback invocations
sed -n '135,165p' luno_python/stream_client.py

Repository: luno/luno-python

Length of output: 857


🏁 Script executed:

# Check what imports are at the top of the file
head -20 luno_python/stream_client.py

Repository: luno/luno-python

Length of output: 580


Correct StateUpdate typing to accept None as the third parameter.

The callback's third argument is typed as dict, but None is explicitly passed at line 151 and can be None at line 157 when body is empty. Change the type annotation to Optional[dict]:

Required change
-from typing import Callable, Dict, List
+from typing import Callable, Dict, List, Optional
 
 ...
-StateUpdate = Callable[[Pair, MarketState, dict], None]
+StateUpdate = Callable[[Pair, MarketState, Optional[dict]], None]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@luno_python/stream_client.py` at line 18, Update the StateUpdate type alias
to accept None for the third parameter by changing its annotation from dict to
Optional[dict] and ensure Optional is imported from typing; i.e., modify
StateUpdate = Callable[[Pair, MarketState, dict], None] to StateUpdate =
Callable[[Pair, MarketState, Optional[dict]], None] and add the necessary typing
import so callbacks that receive None (where None is passed) type-check
correctly.


class OutOfOrderMessageException(Exception):
pass

class MarketInitialisationException(Exception):
pass


def _flatten_orders(orders, reverse):
return sorted(orders.values(), key=lambda o: o.price, reverse=reverse)


def _decrement_trade(orders: Dict[str, Order], order_id: str, volume: Decimal):
order = orders.pop(order_id, None)
if order is None:
return

new_order = order._replace(volume=order.volume - volume)
if new_order.volume > DEC_0:
orders[order_id] = new_order


class _MarketStreamState:
def __init__(self, first: dict):
if first is None:
raise MarketInitialisationException("Unable to initialise market state from an empty message")

def conv_message(msg):
return Order(
msg['id'],
Decimal(msg['price']),
Decimal(msg['volume']),
)

bids = [conv_message(m) for m in first['bids']]
asks = [conv_message(m) for m in first['asks']]
self._bids = {b.order_id: b for b in bids}
self._asks = {a.order_id: a for a in asks}
self._sequence = first['sequence']
self._trades = []
self._status = first['status']

def get_asks(self):
return _flatten_orders(self._asks, False)

def get_bids(self):
return _flatten_orders(self._bids, True)

def get_status(self):
return self._status

def get_sequence(self):
return self._sequence

def get_snapshot(self):
return MarketState(
sequence=self.get_sequence(),
asks=self.get_asks(),
bids=self.get_bids(),
status=self.get_status(),
)

def process_update(self, update: dict):
if update is None:
return

seq = update['sequence']
if int(seq) != int(self._sequence)+1:
raise OutOfOrderMessageException()

trades = update.get('trade_updates')
if trades:
self._process_trades(trades)

create = update.get('create_update')
if create:
self._process_create(create)

delete_upd = update.get('delete_update')
if delete_upd:
self._process_delete(delete_upd)

status_upd = update.get('status_update')
if status_upd:
self._process_status(status_upd)

self._sequence = seq

def _process_trades(self, trade_updates: List[dict]):
for t in trade_updates:
maker_id = t['maker_order_id']
volume = Decimal(t['base'])

_decrement_trade(self._asks, maker_id, volume)
_decrement_trade(self._bids, maker_id, volume)

def _process_create(self, create_update: dict):
o = Order(
create_update['order_id'],
Decimal(create_update['price']),
Decimal(create_update['volume']),
)
if create_update['type'] == "ASK":
self._asks[o.order_id] = o
elif create_update['type'] == "BID":
self._bids[o.order_id] = o

def _process_delete(self, delete_update: dict):
order_id = delete_update['order_id']
self._asks.pop(order_id, None)
self._bids.pop(order_id, None)

def _process_status(self, status_update: dict):
self._status = status_update['status']


async def _read_from_websocket(ws, pair: Pair, update_f: StateUpdate):
state = None
is_first = True

async for message in ws:
try:
body = json.loads(message)
except ValueError as e:
raise ValueError(f"Invalid JSON received: {message}") from e

if body == "": # Empty update, used as keepalive
body = None

if is_first:
is_first = False
state = _MarketStreamState(body)
update_f(pair, state.get_snapshot(), None)
continue
Comment on lines +145 to +152
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Ignore keepalive frames until initial snapshot is received.

If the first frame is a keepalive (""), body becomes None and initialisation fails immediately. Skipping keepalives before initial state avoids false start-up failures.

Proposed guard
         if body == "": # Empty update, used as keepalive
             body = None
+
+        if body is None and is_first:
+            continue
 
         if is_first:
             is_first = False
             state = _MarketStreamState(body)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@luno_python/stream_client.py` around lines 145 - 152, The first-frame
keepalive is being treated as an empty body and causing initialization to fail;
change the handling so that if body == "" and is_first is True you skip the
frame (continue) instead of setting body = None and initializing. Concretely, in
the loop containing body, is_first, _MarketStreamState and update_f, move or add
a guard: if body == "" and is_first: continue; then keep the existing behavior
for non-first keepalives (body = None) and the existing is_first initialization
path that constructs _MarketStreamState and calls update_f(pair,
state.get_snapshot(), None).


#could raise OutOfOrderMessageException
state.process_update(body)

update_f(pair, state.get_snapshot(), body)


async def _write_keep_alive(ws):
while True:
await ws.send('""')
await asyncio.sleep(60)


async def stream_market(
pair: str,
api_key_id: str,
api_key_secret: str,
update_callback: StateUpdate,
base_url: str = DEFAULT_URL,
):
"""Opens a stream to /api/1/stream/...

Stream orderbook information and maintain an orderbook state.

:param pair: str Amount to buy or sell in the pair base currency.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Correct the pair docstring description.

The current text describes an amount, not a market pair, which is misleading for users of this API.

Docstring fix
-        :param pair: str Amount to buy or sell in the pair base currency.
+        :param pair: str Currency pair code (for example, "XBTZAR").
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
:param pair: str Amount to buy or sell in the pair base currency.
:param pair: str Currency pair code (for example, "XBTZAR").
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@luno_python/stream_client.py` at line 177, The docstring for the parameter
named "pair" currently describes an amount instead of the market pair; update
the docstring in luno_python/stream_client.py for the function/method that
accepts the parameter "pair" so it correctly describes it as the market trading
pair (e.g., "market pair in the format 'BASE/QUOTE' or 'XBTZAR'") and remove the
incorrect "Amount to buy or sell..." wording; locate the docstring containing
":param pair:" and replace the description with a concise, accurate explanation
referencing the expected format and purpose of the pair parameter.

:param api_key_id: str
:param api_key_secret: str
:param update_callback: an StateUpdate function that will be called with new updates.
"""
if len(pair) != 6:
raise ValueError(f"pair must be length 6, got '{pair}'")

p = Pair(pair[:3].upper(), pair[3:].upper())
url = '/'.join([base_url, 'api/1/stream', p.base + p.counter])

async with websockets.connect(
url,
origin='http://localhost/',
ping_interval=None,
max_size=2**21,
) as websocket:

auth = json.dumps({
'api_key_id': api_key_id,
'api_key_secret': api_key_secret,
})
await websocket.send(auth)

async with asyncio.TaskGroup() as tg:
tg.create_task(_read_from_websocket(websocket, p, update_callback))
tg.create_task(_write_keep_alive(websocket))