Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion splitio/api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ def get(self, server, path, apikey, query=None, extra_headers=None): #pylint: d
:rtype: HttpResponse
"""
headers = self._build_basic_headers(apikey)

if extra_headers is not None:
headers.update(extra_headers)

Expand Down
Empty file added splitio/push/__init__.py
Empty file.
126 changes: 126 additions & 0 deletions splitio/push/splitsse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
"""An SSE client wrapper to be used with split endpoint."""
import logging
import threading
from enum import Enum
import six
from splitio.push.sse import SSEClient, SSE_EVENT_ERROR
from splitio.util.threadutil import EventGroup


_LOGGER = logging.getLogger(__name__)


class SplitSSEClient(object):
"""Split streaming endpoint SSE client."""

class _Status(Enum):
IDLE = 0
CONNECTING = 1
ERRORED = 2
CONNECTED = 3

def __init__(self, callback, base_url='https://streaming.split.io'):
"""
Construct a split sse client.

:param callback: fuction to call when an event is received.
:type callback: callable

:param base_url: scheme + :// + host
:type base_url: str
"""
self._client = SSEClient(self._raw_event_handler)
self._callback = callback
self._base_url = base_url
self._status = SplitSSEClient._Status.IDLE
self._sse_first_event = None
self._sse_connection_closed = None

def _raw_event_handler(self, event):
"""
Handle incoming raw sse event.

:param event: Incoming raw sse event.
:type event: splitio.push.sse.SSEEvent
"""
if self._status == SplitSSEClient._Status.CONNECTING:
self._status = SplitSSEClient._Status.CONNECTED if event.event != SSE_EVENT_ERROR \
else SplitSSEClient._Status.ERRORED
self._sse_first_event.set()

if event.data is not None:
self._callback(event)

@staticmethod
def _format_channels(channels):
"""
Format channels into a list from the raw object retrieved in the token.

:param channels: object as extracted from the JWT capabilities.
:type channels: dict[str,list[str]]

:returns: channels as a list of strings.
:rtype: list[str]
"""
regular = [k for (k, v) in six.iteritems(channels) if v == ['subscribe']]
occupancy = ['[?occupancy=metrics.publishers]' + k
for (k, v) in six.iteritems(channels)
if 'channel-metadata:publishers' in v]
return regular + occupancy

def _build_url(self, token):
"""
Build the url to connect to and return it as a string.

:param token: (parsed) JWT
:type token: splitio.models.token.Token

:returns: true if the connection was successful. False otherwise.
:rtype: bool
"""
return '{base}/event-stream?v=1.1&accessToken={token}&channels={channels}'.format(
base=self._base_url,
token=token.token,
channels=','.join(self._format_channels(token.channels)))

def start(self, token):
"""
Open a connection to start listening for events.

:param token: (parsed) JWT
:type token: splitio.models.token.Token

:returns: true if the connection was successful. False otherwise.
:rtype: bool
"""
if self._status != SplitSSEClient._Status.IDLE:
raise Exception('SseClient already started.')

self._status = SplitSSEClient._Status.CONNECTING

event_group = EventGroup()
self._sse_first_event = event_group.make_event()
self._sse_connection_closed = event_group.make_event()

def connect(url):
"""Connect to sse in a blocking manner."""
try:
self._client.start(url)
finally:
self._sse_connection_closed.set()
self._status = SplitSSEClient._Status.IDLE

url = self._build_url(token)
task = threading.Thread(target=connect, args=(url,))
task.setDaemon(True)
task.start()
event_group.wait()
return self._status == SplitSSEClient._Status.CONNECTED

def stop(self, blocking=False, timeout=None):
"""Abort the ongoing connection."""
if self._status == SplitSSEClient._Status.IDLE:
raise Exception('SseClient not running')
self._client.shutdown()
if blocking:
self._sse_connection_closed.wait(timeout)
167 changes: 167 additions & 0 deletions splitio/push/sse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
"""Low-level SSE Client."""
import logging
import socket
import sys
from collections import namedtuple

try: # try to import python3 names. fallback to python2
from http.client import HTTPConnection, HTTPSConnection
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
from httplib import HTTPConnection, HTTPSConnection


_LOGGER = logging.getLogger(__name__)


SSE_EVENT_ERROR = 'error'
SSE_EVENT_MESSAGE = 'message'


SSEEvent = namedtuple('SSEEvent', ['event_id', 'event', 'retry', 'data'])


__ENDING_CHARS = set(['\n', ''])
def __httpresponse_readline_py2(response):
"""
Hacky `readline` implementation to be used with chunked transfers in python2.

This makes syscalls in a loop, so not particularly efficient. Migrate to py3 now!

:param response: HTTPConnection's response after a .request() call
:type response: httplib.HTTPResponse

:returns: a string with the read line
:rtype: str
"""
buf = []
while True:
read = response.read(1)
buf.append(read)
if read in __ENDING_CHARS:
break

return ''.join(buf)


_http_response_readline = (__httpresponse_readline_py2 if sys.version_info.major <= 2 #pylint:disable=invalid-name
else lambda response: response.readline())


class EventBuilder(object):
"""Event builder class."""

_SEPARATOR = b':'

def __init__(self):
"""Construct a builder."""
self._lines = {}

def process_line(self, line):
"""
Process a new line.

:param line: Line to process
:type line: bytes
"""
try:
key, val = line.split(self._SEPARATOR, 1)
self._lines[key.decode('utf8').strip()] = val.decode('utf8').strip()
except ValueError: # key without a value
self._lines[line.decode('utf8').strip()] = None

def build(self):
"""Construct an event with relevant fields."""
return SSEEvent(self._lines.get('id'), self._lines.get('event'),
self._lines.get('retry'), self._lines.get('data'))


class SSEClient(object):
"""SSE Client implementation."""

_DEFAULT_HEADERS = {'Accept': 'text/event-stream'}
_EVENT_SEPARATORS = set([b'\n', b'\r\n'])

def __init__(self, callback):
"""
Construct an SSE client.

:param callback: function to call when an event is received
:type callback: callable
"""
self._connection = None
self._event_callback = callback
self._shutdown_requested = False

def _read_events(self):
"""
Read events from the supplied connection.

:returns: True if the connection was ended by us. False if it was closed by the serve.
:rtype: bool
"""
try:
response = self._connection.getresponse()
event_builder = EventBuilder()
while True:
line = _http_response_readline(response)
if line is None or len(line) <= 0: # connection ended
_LOGGER.info("sse connection has ended.")
break
elif line.startswith(b':'): # comment. Skip
_LOGGER.debug("skipping sse comment")
continue
elif line in self._EVENT_SEPARATORS:
event = event_builder.build()
_LOGGER.debug("dispatching event: %s", event)
self._event_callback(event)
event_builder = EventBuilder()
else:
event_builder.process_line(line)
except Exception: #pylint:disable=broad-except
_LOGGER.info('sse connection ended.')
_LOGGER.debug('stack trace: ', exc_info=True)
finally:
self._connection.close()
self._connection = None # clear so it can be started again

return self._shutdown_requested

def start(self, url, extra_headers=None): #pylint:disable=dangerous-default-value
"""
Connect and start listening for events.

:param url: url to connect to
:type url: str

:param extra_headers: additional headers
:type extra_headers: dict[str, str]

:returns: True if the connection was ended by us. False if it was closed by the serve.
:rtype: bool
"""
if self._connection is not None:
raise RuntimeError('Client already started.')

url = urlparse(url)
headers = self._DEFAULT_HEADERS.copy()
headers.update(extra_headers if extra_headers is not None else {})
self._connection = HTTPSConnection(url.hostname, url.port) if url.scheme == 'https' \
else HTTPConnection(url.hostname, port=url.port)

self._connection.request('GET', '%s?%s' % (url.path, url.query), headers=headers)
return self._read_events()

def shutdown(self):
"""Shutdown the current connection."""
if self._connection is None:
_LOGGER.warn("no sse connection has been started on this SSEClient instance. Ignoring")
return

if self._shutdown_requested:
_LOGGER.warn("shutdown already requested")
return

self._shutdown_requested = True
self._connection.sock.shutdown(socket.SHUT_RDWR)
Empty file added splitio/util/__init__.py
Empty file.
56 changes: 56 additions & 0 deletions splitio/util/threadutil.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""Threading utilities."""
from inspect import isclass
import threading


# python2 workaround
_EventClass = threading.Event if isclass(threading.Event) else threading._Event #pylint:disable=protected-access,invalid-name


class EventGroup(object):
"""EventGroup that can be waited with an OR condition."""

class Event(_EventClass): #pylint:disable=too-few-public-methods
"""Threading event meant to be used in an group."""

def __init__(self, shared_condition):
"""
Construct an event.

:param shared_condition: shared condition varaible.
:type shared_condition: threading.Condition
"""
_EventClass.__init__(self)
self._shared_cond = shared_condition

def set(self):
"""Set the event."""
_EventClass.set(self)
with self._shared_cond:
self._shared_cond.notify()

def __init__(self):
"""Construct an event group."""
self._cond = threading.Condition()

def make_event(self):
"""
Make a new event associated to this waitable group.

:returns: an event that can be awaited as part of a group
:rtype: EventGroup.Event
"""
return EventGroup.Event(self._cond)

def wait(self, timeout=None):
"""
Wait until one of the events is triggered.

:param timeout: how many seconds to wait. None means forever.
:type timeout: int

:returns: True if the condition was notified within the specified timeout. False otherwise.
:rtype: bool
"""
with self._cond:
return self._cond.wait(timeout)
5 changes: 1 addition & 4 deletions tests/models/test_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@

class TokenTests(object):
"""Token model tests."""
raw_false = {
'pushEnabled': False,
'token': 'eyJhbGciOiJIUzI1NiIsImtpZCI6IjVZOU05US45QnJtR0EiLCJ0eXAiOiJKV1QifQ.eyJ4LWFibHktY2FwYWJpbGl0eSI6IntcIk56TTJNREk1TXpjMF9NVGd5TlRnMU1UZ3dOZz09X3NlZ21lbnRzXCI6W1wic3Vic2NyaWJlXCJdLFwiTnpNMk1ESTVNemMwX01UZ3lOVGcxTVRnd05nPT1fc3BsaXRzXCI6W1wic3Vic2NyaWJlXCJdLFwiY29udHJvbF9wcmlcIjpbXCJzdWJzY3JpYmVcIixcImNoYW5uZWwtbWV0YWRhdGE6cHVibGlzaGVyc1wiXSxcImNvbnRyb2xfc2VjXCI6W1wic3Vic2NyaWJlXCIsXCJjaGFubmVsLW1ldGFkYXRhOnB1Ymxpc2hlcnNcIl19IiwieC1hYmx5LWNsaWVudElkIjoiY2xpZW50SWQiLCJleHAiOjE2MDIwODgxMjcsImlhdCI6MTYwMjA4NDUyN30.5_MjWonhs6yoFhw44hNJm3H7_YMjXpSW105DwjjppqE',
}
raw_false = {'pushEnabled': False}

def test_from_raw_false(self):
"""Test token model parsing."""
Expand Down
Empty file added tests/push/__init__.py
Empty file.
Loading