Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions splitio/push/splitsse.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
class SplitSSEClient(object):
"""Split streaming endpoint SSE client."""

KEEPALIVE_TIMEOUT = 70

class _Status(Enum):
IDLE = 0
CONNECTING = 1
Expand Down Expand Up @@ -105,13 +107,13 @@ def start(self, token):
def connect(url):
"""Connect to sse in a blocking manner."""
try:
self._client.start(url)
self._client.start(url, timeout=self.KEEPALIVE_TIMEOUT)
finally:
self._sse_connection_closed.set()
self._status = SplitSSEClient._Status.IDLE

url = self._build_url(token)
task = threading.Thread(target=connect, args=(url,), name='SSeConnection')
task = threading.Thread(target=connect, name='SSEConnection', args=(url,))
task.setDaemon(True)
task.start()
event_group.wait()
Expand Down
29 changes: 16 additions & 13 deletions splitio/push/sse.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def __init__(self, callback):
:param callback: function to call when an event is received
:type callback: callable
"""
self._connection = None
self._conn = None
self._event_callback = callback
self._shutdown_requested = False

Expand All @@ -102,12 +102,11 @@ def _read_events(self):
:rtype: bool
"""
try:
response = self._connection.getresponse()
response = self._conn.getresponse()
event_builder = EventBuilder()
while True:
line = _http_response_readline(response)
if line is None or len(line) <= 0: # connection ended
_LOGGER.info("sse connection has ended.")
break
elif line.startswith(b':'): # comment. Skip
_LOGGER.debug("skipping sse comment")
Expand All @@ -120,15 +119,15 @@ def _read_events(self):
else:
event_builder.process_line(line)
except Exception: #pylint:disable=broad-except
_LOGGER.info('sse connection ended.')
_LOGGER.debug('sse connection ended.')
_LOGGER.debug('stack trace: ', exc_info=True)
finally:
self._connection.close()
self._connection = None # clear so it can be started again
self._conn.close()
self._conn = None # clear so it can be started again

return self._shutdown_requested

def start(self, url, extra_headers=None): #pylint:disable=dangerous-default-value
def start(self, url, extra_headers=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT): #pylint:disable=protected-access
"""
Connect and start listening for events.

Expand All @@ -138,25 +137,29 @@ def start(self, url, extra_headers=None): #pylint:disable=dangerous-default-val
:param extra_headers: additional headers
:type extra_headers: dict[str, str]

:param timeout: connection & read timeout
:type timeout: float

:returns: True if the connection was ended by us. False if it was closed by the serve.
:rtype: bool
"""
if self._connection is not None:
if self._conn is not None:
raise RuntimeError('Client already started.')

self._shutdown_requested = False
url = urlparse(url)
headers = self._DEFAULT_HEADERS.copy()
headers.update(extra_headers if extra_headers is not None else {})
self._connection = HTTPSConnection(url.hostname, url.port) if url.scheme == 'https' \
else HTTPConnection(url.hostname, port=url.port)
self._conn = (HTTPSConnection(url.hostname, url.port, timeout=timeout)
if url.scheme == 'https'
else HTTPConnection(url.hostname, port=url.port, timeout=timeout))

self._connection.request('GET', '%s?%s' % (url.path, url.query), headers=headers)
self._conn.request('GET', '%s?%s' % (url.path, url.query), headers=headers)
return self._read_events()

def shutdown(self):
"""Shutdown the current connection."""
if self._connection is None:
if self._conn is None:
_LOGGER.warn("no sse connection has been started on this SSEClient instance. Ignoring")
return

Expand All @@ -165,4 +168,4 @@ def shutdown(self):
return

self._shutdown_requested = True
self._connection.sock.shutdown(socket.SHUT_RDWR)
self._conn.sock.shutdown(socket.SHUT_RDWR)
6 changes: 3 additions & 3 deletions tests/push/test_sse.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def runner():
SSEEvent('4', 'message', None, 'ghi')
]

assert client._connection is None
assert client._conn is None
server.publish(server.GRACEFUL_REQUEST_END)
server.stop()

Expand Down Expand Up @@ -87,7 +87,7 @@ def runner():
SSEEvent('4', 'message', None, 'ghi')
]

assert client._connection is None
assert client._conn is None

def test_sse_server_disconnects_abruptly(self):
"""Test correct initialization. Server ends connection."""
Expand Down Expand Up @@ -125,4 +125,4 @@ def runner():
SSEEvent('4', 'message', None, 'ghi')
]

assert client._connection is None
assert client._conn is None