From 6ca558f696c4f0a5390d02dd1cb82e62864b2ad0 Mon Sep 17 00:00:00 2001 From: Martin Redolatti Date: Tue, 27 Oct 2020 16:57:21 -0300 Subject: [PATCH] enable keepalive timeouts --- splitio/push/splitsse.py | 6 ++++-- splitio/push/sse.py | 29 ++++++++++++++++------------- tests/push/test_sse.py | 6 +++--- 3 files changed, 23 insertions(+), 18 deletions(-) diff --git a/splitio/push/splitsse.py b/splitio/push/splitsse.py index 4f8cb3f9..cded5756 100644 --- a/splitio/push/splitsse.py +++ b/splitio/push/splitsse.py @@ -13,6 +13,8 @@ class SplitSSEClient(object): """Split streaming endpoint SSE client.""" + KEEPALIVE_TIMEOUT = 70 + class _Status(Enum): IDLE = 0 CONNECTING = 1 @@ -105,13 +107,13 @@ def start(self, token): def connect(url): """Connect to sse in a blocking manner.""" try: - self._client.start(url) + self._client.start(url, timeout=self.KEEPALIVE_TIMEOUT) finally: self._sse_connection_closed.set() self._status = SplitSSEClient._Status.IDLE url = self._build_url(token) - task = threading.Thread(target=connect, args=(url,), name='SSeConnection') + task = threading.Thread(target=connect, name='SSEConnection', args=(url,)) task.setDaemon(True) task.start() event_group.wait() diff --git a/splitio/push/sse.py b/splitio/push/sse.py index e7044e84..7e051a22 100644 --- a/splitio/push/sse.py +++ b/splitio/push/sse.py @@ -90,7 +90,7 @@ def __init__(self, callback): :param callback: function to call when an event is received :type callback: callable """ - self._connection = None + self._conn = None self._event_callback = callback self._shutdown_requested = False @@ -102,12 +102,11 @@ def _read_events(self): :rtype: bool """ try: - response = self._connection.getresponse() + response = self._conn.getresponse() event_builder = EventBuilder() while True: line = _http_response_readline(response) if line is None or len(line) <= 0: # connection ended - _LOGGER.info("sse connection has ended.") break elif line.startswith(b':'): # comment. Skip _LOGGER.debug("skipping sse comment") @@ -120,15 +119,15 @@ def _read_events(self): else: event_builder.process_line(line) except Exception: #pylint:disable=broad-except - _LOGGER.info('sse connection ended.') + _LOGGER.debug('sse connection ended.') _LOGGER.debug('stack trace: ', exc_info=True) finally: - self._connection.close() - self._connection = None # clear so it can be started again + self._conn.close() + self._conn = None # clear so it can be started again return self._shutdown_requested - def start(self, url, extra_headers=None): #pylint:disable=dangerous-default-value + def start(self, url, extra_headers=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT): #pylint:disable=protected-access """ Connect and start listening for events. @@ -138,25 +137,29 @@ def start(self, url, extra_headers=None): #pylint:disable=dangerous-default-val :param extra_headers: additional headers :type extra_headers: dict[str, str] + :param timeout: connection & read timeout + :type timeout: float + :returns: True if the connection was ended by us. False if it was closed by the serve. :rtype: bool """ - if self._connection is not None: + if self._conn is not None: raise RuntimeError('Client already started.') self._shutdown_requested = False url = urlparse(url) headers = self._DEFAULT_HEADERS.copy() headers.update(extra_headers if extra_headers is not None else {}) - self._connection = HTTPSConnection(url.hostname, url.port) if url.scheme == 'https' \ - else HTTPConnection(url.hostname, port=url.port) + self._conn = (HTTPSConnection(url.hostname, url.port, timeout=timeout) + if url.scheme == 'https' + else HTTPConnection(url.hostname, port=url.port, timeout=timeout)) - self._connection.request('GET', '%s?%s' % (url.path, url.query), headers=headers) + self._conn.request('GET', '%s?%s' % (url.path, url.query), headers=headers) return self._read_events() def shutdown(self): """Shutdown the current connection.""" - if self._connection is None: + if self._conn is None: _LOGGER.warn("no sse connection has been started on this SSEClient instance. Ignoring") return @@ -165,4 +168,4 @@ def shutdown(self): return self._shutdown_requested = True - self._connection.sock.shutdown(socket.SHUT_RDWR) + self._conn.sock.shutdown(socket.SHUT_RDWR) diff --git a/tests/push/test_sse.py b/tests/push/test_sse.py index 928fefbf..e27fe13a 100644 --- a/tests/push/test_sse.py +++ b/tests/push/test_sse.py @@ -47,7 +47,7 @@ def runner(): SSEEvent('4', 'message', None, 'ghi') ] - assert client._connection is None + assert client._conn is None server.publish(server.GRACEFUL_REQUEST_END) server.stop() @@ -87,7 +87,7 @@ def runner(): SSEEvent('4', 'message', None, 'ghi') ] - assert client._connection is None + assert client._conn is None def test_sse_server_disconnects_abruptly(self): """Test correct initialization. Server ends connection.""" @@ -125,4 +125,4 @@ def runner(): SSEEvent('4', 'message', None, 'ghi') ] - assert client._connection is None + assert client._conn is None