diff --git a/qpython/__init__.py b/qpython/__init__.py index 65d2678..02a887f 100644 --- a/qpython/__init__.py +++ b/qpython/__init__.py @@ -14,7 +14,12 @@ # limitations under the License. # -__all__ = ['qconnection', 'qtype', 'qtemporal', 'qcollection'] +import sys + +if sys.version_info >= (3, 5): + __all__ = ['qconnection', 'qtype', 'qtemporal', 'qcollection'] +else: + __all__ = ['qconnection', 'qaioconnection', 'qtype', 'qtemporal', 'qcollection'] __version__ = '@@VERSION_PLACEHOLDER@@' diff --git a/qpython/qaioconnection.py b/qpython/qaioconnection.py new file mode 100644 index 0000000..7762dde --- /dev/null +++ b/qpython/qaioconnection.py @@ -0,0 +1,317 @@ +# +# Copyright (c) 2011-2014 Exxeleron GmbH +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import asyncio +import struct + + +from qpython.qtype import QException +from qpython.qaioreader import QReader, QReaderException +from qpython.qaiowriter import QWriter, QWriterException +from qpython.qconnection import Connection, QConnectionException, QAuthenticationException, MessageType + + +class QConnection(Connection): + """Connector class for interfacing with the q service. + + Provides methods for synchronous and asynchronous interaction. + + The :class:`.QConnection` class provides a context manager API and can be + used with a ``with`` statement:: + + with qconnection.QConnection(host = 'localhost', port = 5000) as q: + print(q) + print(q('{`int$ til x}', 10)) + + :Parameters: + - `host` (`string`) - q service hostname + - `port` (`integer`) - q service port + - `username` (`string` or `None`) - username for q authentication/authorization + - `password` (`string` or `None`) - password for q authentication/authorization + - `timeout` (`nonnegative float` or `None`) - set a timeout on blocking socket operations + - `encoding` (`string`) - string encoding for data deserialization + - `reader_class` (subclass of `QReader`) - data deserializer + - `writer_class` (subclass of `QWriter`) - data serializer + :Options: + - `raw` (`boolean`) - if ``True`` returns raw data chunk instead of parsed + data, **Default**: ``False`` + - `numpy_temporals` (`boolean`) - if ``False`` temporal vectors are + backed by raw q representation (:class:`.QTemporalList`, + :class:`.QTemporal`) instances, otherwise are represented as + `numpy datetime64`/`timedelta64` arrays and atoms, + **Default**: ``False`` + - `single_char_strings` (`boolean`) - if ``True`` single char Python + strings are encoded as q strings instead of chars, **Default**: ``False`` + """ + + def __init__(self, host, port, username=None, password=None, timeout=None, + encoding='latin-1', reader_class=None, writer_class=None, **options): + self._s_reader = None + self._s_writer = None + + # Todo: Add pands support + self._reader_class = QReader + self._writer_class = QWriter + + if reader_class: + self._reader_class = reader_class + + if writer_class: + self._writer_class = writer_class + + super().__init__(host, port, username, password, timeout, encoding, reader_class, writer_class, **options) + + async def open(self): + """Initialises connection to q service. + + If the connection hasn't been initialised yet, invoking the + :func:`.open` creates a new socket and performs a handshake with a q + service. + + :raises: :class:`.QConnectionException`, :class:`.QAuthenticationException` + """ + if not self._s_writer: + if not self.host: + raise QConnectionException('Host cannot be None') + + await self._init_socket() + await self._initialize() + + self._writer = self._writer_class(self._s_writer, protocol_version=self._protocol_version, + encoding=self._encoding) + self._reader = self._reader_class(self._s_reader, encoding=self._encoding) + + async def _init_socket(self): + """Initialises the socket used for communicating with a q service,""" + try: + self._s_reader, self._s_writer = await asyncio.open_connection(self.host, self.port) + except: + self._s_reader, self._s_writer = None, None + raise + + async def close(self): + """Closes connection with the q service.""" + if self._s_writer: + self._s_writer.close() + await self._s_writer.wait_closed() + + def is_connected(self): + """Checks whether connection with a q service has been established. + + Connection is considered inactive when: + - it has not been initialised, + - it has been closed. + + :returns: `boolean` -- ``True`` if connection has been established, + ``False`` otherwise + """ + return True if self._s_writer else False + + async def _initialize(self): + """Performs a IPC protocol handshake.""" + credentials = (self.username if self.username else '') + ':' + (self.password if self.password else '') + credentials = credentials.encode(self._encoding) + self._s_writer.write(credentials + bytes([self.MAX_PROTOCOL_VERSION, 0])) + await self._s_writer.drain() + response = await self._s_reader.read(1) + + if len(response) != 1: + await self._s_writer.close() + await self._init_socket() + + self._s_writer.write(credentials + b'\0') + await self._s_writer.drain() + response = await self._s_reader.read(1) + if len(response) != 1: + await self.close() + raise QAuthenticationException('Connection denied.') + + self._protocol_version = min(struct.unpack('B', response)[0], self.MAX_PROTOCOL_VERSION) + + async def query(self, msg_type, query, *parameters, **options): + """Performs a query against a q service. + + In typical use case, `query` is the name of the function to call and + `parameters` are its parameters. When `parameters` list is empty, the + query can be an arbitrary q expression (e.g. ``0 +/ til 100``). + + Calls a anonymous function with a single parameter: + + >>> q.query(qconnection.MessageType.SYNC,'{til x}', 10) + + Executes a q expression: + + >>> q.query(qconnection.MessageType.SYNC,'til 10') + + :Parameters: + - `msg_type` (one of the constants defined in :class:`.MessageType`) - + type of the query to be executed + - `query` (`string`) - query to be executed + - `parameters` (`list` or `None`) - parameters for the query + :Options: + - `single_char_strings` (`boolean`) - if ``True`` single char Python + strings are encoded as q strings instead of chars, + **Default**: ``False`` + + :raises: :class:`.QConnectionException`, :class:`.QWriterException` + """ + if not self._s_writer: + raise QConnectionException('Connection is not established.') + + if parameters and len(parameters) > 8: + raise QWriterException('Too many parameters.') + + if not parameters or len(parameters) == 0: + await self._writer.write(query, msg_type, **self._options.union_dict(**options)) + else: + await self._writer.write([query] + list(parameters), msg_type, **self._options.union_dict(**options)) + + async def sendSync(self, query, *parameters, **options): + """Performs a synchronous query against a q service and returns parsed + data. + + In typical use case, `query` is the name of the function to call and + `parameters` are its parameters. When `parameters` list is empty, the + query can be an arbitrary q expression (e.g. ``0 +/ til 100``). + + Executes a q expression: + + >>> print(q.sendSync('til 10')) + [0 1 2 3 4 5 6 7 8 9] + + Executes an anonymous q function with a single parameter: + + >>> print(q.sendSync('{til x}', 10)) + [0 1 2 3 4 5 6 7 8 9] + + Executes an anonymous q function with two parameters: + + >>> print(q.sendSync('{y + til x}', 10, 1)) + [ 1 2 3 4 5 6 7 8 9 10] + + >>> print(q.sendSync('{y + til x}', *[10, 1])) + [ 1 2 3 4 5 6 7 8 9 10] + + The :func:`.sendSync` is called from the overloaded :func:`.__call__` + function. This allows :class:`.QConnection` instance to be called as + a function: + + >>> print(q('{y + til x}', 10, 1)) + [ 1 2 3 4 5 6 7 8 9 10] + + + :Parameters: + - `query` (`string`) - query to be executed + - `parameters` (`list` or `None`) - parameters for the query + :Options: + - `raw` (`boolean`) - if ``True`` returns raw data chunk instead of + parsed data, **Default**: ``False`` + - `numpy_temporals` (`boolean`) - if ``False`` temporal vectors are + backed by raw q representation (:class:`.QTemporalList`, + :class:`.QTemporal`) instances, otherwise are represented as + `numpy datetime64`/`timedelta64` arrays and atoms, + **Default**: ``False`` + - `single_char_strings` (`boolean`) - if ``True`` single char Python + strings are encoded as q strings instead of chars, + **Default**: ``False`` + + :returns: query result parsed to Python data structures + + :raises: :class:`.QConnectionException`, :class:`.QWriterException`, + :class:`.QReaderException` + """ + await self.query(MessageType.SYNC, query, *parameters, **options) + response = await self.receive(data_only=False, **options) + + if response.type == MessageType.RESPONSE: + return response.data + else: + self._writer.write(QException('nyi: qPython expected response message'), MessageType.ASYNC if response.type == MessageType.ASYNC else MessageType.RESPONSE) + raise QReaderException('Received message of type: %s where response was expected') + + async def sendAsync(self, query, *parameters, **options): + """Performs an asynchronous query and returns **without** retrieving of + the response. + + In typical use case, `query` is the name of the function to call and + `parameters` are its parameters. When `parameters` list is empty, the + query can be an arbitrary q expression (e.g. ``0 +/ til 100``). + + Calls a anonymous function with a single parameter: + + >>> q.sendAsync('{til x}', 10) + + Executes a q expression: + + >>> q.sendAsync('til 10') + + :Parameters: + - `query` (`string`) - query to be executed + - `parameters` (`list` or `None`) - parameters for the query + :Options: + - `single_char_strings` (`boolean`) - if ``True`` single char Python + strings are encoded as q strings instead of chars, + **Default**: ``False`` + + :raises: :class:`.QConnectionException`, :class:`.QWriterException` + """ + await self.query(MessageType.ASYNC, query, *parameters, **options) + + async def receive(self, data_only=True, **options): + """Reads and (optionally) parses the response from a q service. + + Retrieves query result along with meta-information: + + >>> q.query(qconnection.MessageType.SYNC,'{x}', 10) + >>> print(q.receive(data_only = False, raw = False)) + QMessage: message type: 2, data size: 13, is_compressed: False, data: 10 + + Retrieves parsed query result: + + >>> q.query(qconnection.MessageType.SYNC,'{x}', 10) + >>> print(q.receive(data_only = True, raw = False)) + 10 + + Retrieves not-parsed (raw) query result: + + >>> from binascii import hexlify + >>> q.query(qconnection.MessageType.SYNC,'{x}', 10) + >>> print(hexlify(q.receive(data_only = True, raw = True))) + fa0a000000 + + :Parameters: + - `data_only` (`boolean`) - if ``True`` returns only data part of the + message, otherwise returns data and message meta-information + encapsulated in :class:`.QMessage` instance + :Options: + - `raw` (`boolean`) - if ``True`` returns raw data chunk instead of + parsed data, **Default**: ``False`` + - `numpy_temporals` (`boolean`) - if ``False`` temporal vectors are + backed by raw q representation (:class:`.QTemporalList`, + :class:`.QTemporal`) instances, otherwise are represented as + `numpy datetime64`/`timedelta64` arrays and atoms, + **Default**: ``False`` + + :returns: depending on parameter flags: :class:`.QMessage` instance, + parsed message, raw data + :raises: :class:`.QReaderException` + """ + result = await self._reader.read(**self._options.union_dict(**options)) + return result.data if data_only else result + + async def __call__(self, *parameters, **options): + result = await self.sendSync(parameters[0], *parameters[1:], **options) + return result diff --git a/qpython/qaioreader.py b/qpython/qaioreader.py new file mode 100644 index 0000000..84715e0 --- /dev/null +++ b/qpython/qaioreader.py @@ -0,0 +1,87 @@ +# +# Copyright (c) 2011-2014 Exxeleron GmbH +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import sys + +from qpython.qtype import * # @UnusedWildImport +from qpython.qreader import Reader, QMessage, QReaderException + +try: + from qpython.fastutils import uncompress +except: + from qpython.utils import uncompress + + +class QReader(Reader): + async def read(self, source=None, **options): + message = await self.read_header(source) + message.data = await self.read_data(message.size, message.compression_mode, **options) + return message + + async def read_header(self, source=None): + if self._stream: + header = await self._read_bytes(8) + self._buffer.wrap(header) + else: + self._buffer.wrap(source) + + self._buffer.endianness = '<' if self._buffer.get_byte() == 1 else '>' + self._is_native = self._buffer.endianness == ('<' if sys.byteorder == 'little' else '>') + message_type = self._buffer.get_byte() + message_compression_mode = self._buffer.get_byte() + message_size_ext = self._buffer.get_byte() + + message_size = self._buffer.get_uint() + message_size += message_size_ext << 32 + return QMessage(None, message_type, message_size, message_compression_mode) + + async def read_data(self, message_size, compression_mode=0, **options): + super().read_data(message_size, compression_mode, **options) + + if compression_mode > 0: + comprHeaderLen = 4 if compression_mode == 1 else 8 + if self._stream: + self._buffer.wrap(await self._read_bytes(comprHeaderLen)) + uncompressed_size = -8 + (self._buffer.get_uint() if compression_mode == 1 else self._buffer.get_long()) + compressed_data = await self._read_bytes(message_size - (8+comprHeaderLen)) if self._stream else self._buffer.raw(message_size - (8+comprHeaderLen)) + + raw_data = numpy.frombuffer(compressed_data, dtype=numpy.uint8) + if uncompressed_size <= 0: + raise QReaderException('Error while data decompression.') + + raw_data = uncompress(raw_data, numpy.int64(uncompressed_size)) + raw_data = numpy.ndarray.tobytes(raw_data) + self._buffer.wrap(raw_data) + elif self._stream: + raw_data = await self._read_bytes(message_size - 8) + self._buffer.wrap(raw_data) + if not self._stream and self._options.raw: + raw_data = self._buffer.raw(message_size - 8) + + return raw_data if self._options.raw else self._read_object() + + async def _read_bytes(self, length): + if not self._stream: + raise QReaderException('There is no input data. QReader requires either stream or data chunk') + + if length == 0: + return b'' + else: + data = await self._stream.read(length) + + if len(data) == 0: + raise QReaderException('Error while reading data') + return data diff --git a/qpython/qaiowriter.py b/qpython/qaiowriter.py new file mode 100644 index 0000000..59207e4 --- /dev/null +++ b/qpython/qaiowriter.py @@ -0,0 +1,33 @@ +# +# Copyright (c) 2011-2014 Exxeleron GmbH +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +try: + from cStringIO import BytesIO +except ImportError: + from io import BytesIO + +from qpython.qwriter import Writer, QWriterException + + +class QWriter(Writer): + async def write(self, data, msg_type, **options): + super().write(data, msg_type, **options) + # write data to stream + if self._stream: + self._stream.write(self._buffer.getvalue()) + await self._stream.drain() + else: + return self._buffer.getvalue() diff --git a/qpython/qconnection.py b/qpython/qconnection.py index ede969b..be78fb7 100644 --- a/qpython/qconnection.py +++ b/qpython/qconnection.py @@ -17,25 +17,24 @@ import socket import struct +from abc import ABC, abstractmethod + from qpython import MetaData, CONVERSION_OPTIONS from qpython.qtype import QException from qpython.qreader import QReader, QReaderException from qpython.qwriter import QWriter, QWriterException - class QConnectionException(Exception): '''Raised when a connection to the q service cannot be established.''' pass - class QAuthenticationException(QConnectionException): '''Raised when a connection to the q service is denied.''' pass - class MessageType(object): '''Enumeration defining IPC protocol message types.''' ASYNC = 0 @@ -43,8 +42,243 @@ class MessageType(object): RESPONSE = 2 +class Connection(ABC): + MAX_PROTOCOL_VERSION = 6 + + @abstractmethod + def __init__(self, host, port, username=None, password=None, timeout=None, + encoding='latin-1', reader_class=None, writer_class=None, **options): + self.host = host + self.port = port + self.username = username + self.password = password + self.timeout = timeout + + self._encoding = encoding + self._protocol_version = None + + self._writer = None + self._reader = None + + self._options = MetaData(**CONVERSION_OPTIONS.union_dict(**options)) + + pass + + def __enter__(self): + self.open() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def __str__(self): + return '%s@:%s:%s' % (self.username, self.host, self.port) \ + if self.username \ + else ':%s:%s' % (self.host, self.port) + + @abstractmethod + def __call__(self, *parameters, **options): + pass + + @property + def protocol_version(self): + """Retrieves established version of the IPC protocol. + + :returns: `integer` -- version of the IPC protocol + """ + return self._protocol_version + + @abstractmethod + def open(self): + pass + + @abstractmethod + def _init_socket(self): + """Initialises the socket used for communicating with a q service,""" + pass + + @abstractmethod + def close(self): + pass + + @abstractmethod + def is_connected(self): + """Checks whether connection with a q service has been established. + + Connection is considered inactive when: + - it has not been initialised, + - it has been closed. + + :returns: `boolean` -- ``True`` if connection has been established, + ``False`` otherwise + """ + pass + + @abstractmethod + def _initialize(self): + """Performs a IPC protocol handshake.""" + pass + + @abstractmethod + def query(self, msg_type, query, *parameters, **options): + """Performs a query against a q service. + + In typical use case, `query` is the name of the function to call and + `parameters` are its parameters. When `parameters` list is empty, the + query can be an arbitrary q expression (e.g. ``0 +/ til 100``). + + Calls a anonymous function with a single parameter: + + >>> q.query(qconnection.MessageType.SYNC,'{til x}', 10) + + Executes a q expression: + + >>> q.query(qconnection.MessageType.SYNC,'til 10') + + :Parameters: + - `msg_type` (one of the constants defined in :class:`.MessageType`) - + type of the query to be executed + - `query` (`string`) - query to be executed + - `parameters` (`list` or `None`) - parameters for the query + :Options: + - `single_char_strings` (`boolean`) - if ``True`` single char Python + strings are encoded as q strings instead of chars, + **Default**: ``False`` + + :raises: :class:`.QConnectionException`, :class:`.QWriterException` + """ + pass + + @abstractmethod + def sendSync(self, query, *parameters, **options): + """Performs a synchronous query against a q service and returns parsed + data. + + In typical use case, `query` is the name of the function to call and + `parameters` are its parameters. When `parameters` list is empty, the + query can be an arbitrary q expression (e.g. ``0 +/ til 100``). + + Executes a q expression: + + >>> print(q.sendSync('til 10')) + [0 1 2 3 4 5 6 7 8 9] + + Executes an anonymous q function with a single parameter: + + >>> print(q.sendSync('{til x}', 10)) + [0 1 2 3 4 5 6 7 8 9] + + Executes an anonymous q function with two parameters: + + >>> print(q.sendSync('{y + til x}', 10, 1)) + [ 1 2 3 4 5 6 7 8 9 10] + + >>> print(q.sendSync('{y + til x}', *[10, 1])) + [ 1 2 3 4 5 6 7 8 9 10] + + The :func:`.sendSync` is called from the overloaded :func:`.__call__` + function. This allows :class:`.QConnection` instance to be called as + a function: + + >>> print(q('{y + til x}', 10, 1)) + [ 1 2 3 4 5 6 7 8 9 10] -class QConnection(object): + + :Parameters: + - `query` (`string`) - query to be executed + - `parameters` (`list` or `None`) - parameters for the query + :Options: + - `raw` (`boolean`) - if ``True`` returns raw data chunk instead of + parsed data, **Default**: ``False`` + - `numpy_temporals` (`boolean`) - if ``False`` temporal vectors are + backed by raw q representation (:class:`.QTemporalList`, + :class:`.QTemporal`) instances, otherwise are represented as + `numpy datetime64`/`timedelta64` arrays and atoms, + **Default**: ``False`` + - `single_char_strings` (`boolean`) - if ``True`` single char Python + strings are encoded as q strings instead of chars, + **Default**: ``False`` + + :returns: query result parsed to Python data structures + + :raises: :class:`.QConnectionException`, :class:`.QWriterException`, + :class:`.QReaderException` + """ + pass + + @abstractmethod + def sendAsync(self, query, *parameters, **options): + """Performs an asynchronous query and returns **without** retrieving of + the response. + + In typical use case, `query` is the name of the function to call and + `parameters` are its parameters. When `parameters` list is empty, the + query can be an arbitrary q expression (e.g. ``0 +/ til 100``). + + Calls a anonymous function with a single parameter: + + >>> q.sendAsync('{til x}', 10) + + Executes a q expression: + + >>> q.sendAsync('til 10') + + :Parameters: + - `query` (`string`) - query to be executed + - `parameters` (`list` or `None`) - parameters for the query + :Options: + - `single_char_strings` (`boolean`) - if ``True`` single char Python + strings are encoded as q strings instead of chars, + **Default**: ``False`` + + :raises: :class:`.QConnectionException`, :class:`.QWriterException` + """ + pass + + @abstractmethod + def receive(self, data_only=True, **options): + """Reads and (optionally) parses the response from a q service. + + Retrieves query result along with meta-information: + + >>> q.query(qconnection.MessageType.SYNC,'{x}', 10) + >>> print(q.receive(data_only = False, raw = False)) + QMessage: message type: 2, data size: 13, is_compressed: False, data: 10 + + Retrieves parsed query result: + + >>> q.query(qconnection.MessageType.SYNC,'{x}', 10) + >>> print(q.receive(data_only = True, raw = False)) + 10 + + Retrieves not-parsed (raw) query result: + + >>> from binascii import hexlify + >>> q.query(qconnection.MessageType.SYNC,'{x}', 10) + >>> print(hexlify(q.receive(data_only = True, raw = True))) + fa0a000000 + + :Parameters: + - `data_only` (`boolean`) - if ``True`` returns only data part of the + message, otherwise returns data and message meta-information + encapsulated in :class:`.QMessage` instance + :Options: + - `raw` (`boolean`) - if ``True`` returns raw data chunk instead of + parsed data, **Default**: ``False`` + - `numpy_temporals` (`boolean`) - if ``False`` temporal vectors are + backed by raw q representation (:class:`.QTemporalList`, + :class:`.QTemporal`) instances, otherwise are represented as + `numpy datetime64`/`timedelta64` arrays and atoms, + **Default**: ``False`` + + :returns: depending on parameter flags: :class:`.QMessage` instance, + parsed message, raw data + :raises: :class:`.QReaderException` + """ + pass + + +class QConnection(Connection): '''Connector class for interfacing with the q service. Provides methods for synchronous and asynchronous interaction. @@ -77,23 +311,10 @@ class QConnection(object): strings are encoded as q strings instead of chars, **Default**: ``False`` ''' - MAX_PROTOCOL_VERSION = 6 - - def __init__(self, host, port, username = None, password = None, timeout = None, encoding = 'latin-1', reader_class = None, writer_class = None, **options): - self.host = host - self.port = port - self.username = username - self.password = password - + def __init__(self, host, port, username=None, password=None, timeout=None, + encoding='latin-1', reader_class=None, writer_class=None, **options): self._connection = None self._connection_file = None - self._protocol_version = None - - self.timeout = timeout - - self._encoding = encoding - - self._options = MetaData(**CONVERSION_OPTIONS.union_dict(**options)) try: from qpython._pandas import PandasQReader, PandasQWriter @@ -109,24 +330,7 @@ def __init__(self, host, port, username = None, password = None, timeout = None, if writer_class: self._writer_class = writer_class - - def __enter__(self): - self.open() - return self - - - def __exit__(self, exc_type, exc_val, exc_tb): - self.close() - - - @property - def protocol_version(self): - '''Retrieves established version of the IPC protocol. - - :returns: `integer` -- version of the IPC protocol - ''' - return self._protocol_version - + super().__init__(host, port, username, password, timeout, encoding, reader_class, writer_class, **options) def open(self): '''Initialises connection to q service. @@ -147,7 +351,6 @@ def open(self): self._writer = self._writer_class(self._connection, protocol_version = self._protocol_version, encoding = self._encoding) self._reader = self._reader_class(self._connection_file, encoding = self._encoding) - def _init_socket(self): '''Initialises the socket used for communicating with a q service,''' try: @@ -160,7 +363,6 @@ def _init_socket(self): self._connection_file = None raise - def close(self): '''Closes connection with the q service.''' if self._connection: @@ -169,7 +371,6 @@ def close(self): self._connection.close() self._connection = None - def is_connected(self): '''Checks whether connection with a q service has been established. @@ -182,7 +383,6 @@ def is_connected(self): ''' return True if self._connection else False - def _initialize(self): '''Performs a IPC protocol handshake.''' credentials = (self.username if self.username else '') + ':' + (self.password if self.password else '') @@ -202,11 +402,6 @@ def _initialize(self): self._protocol_version = min(struct.unpack('B', response)[0], self.MAX_PROTOCOL_VERSION) - - def __str__(self): - return '%s@:%s:%s' % (self.username, self.host, self.port) if self.username else ':%s:%s' % (self.host, self.port) - - def query(self, msg_type, query, *parameters, **options): '''Performs a query against a q service. @@ -245,7 +440,6 @@ def query(self, msg_type, query, *parameters, **options): else: self._writer.write([query] + list(parameters), msg_type, **self._options.union_dict(**options)) - def sendSync(self, query, *parameters, **options): '''Performs a synchronous query against a q service and returns parsed data. @@ -309,7 +503,6 @@ def sendSync(self, query, *parameters, **options): self._writer.write(QException('nyi: qPython expected response message'), MessageType.ASYNC if response.type == MessageType.ASYNC else MessageType.RESPONSE) raise QReaderException('Received message of type: %s where response was expected') - def sendAsync(self, query, *parameters, **options): '''Performs an asynchronous query and returns **without** retrieving of the response. @@ -338,8 +531,7 @@ def sendAsync(self, query, *parameters, **options): ''' self.query(MessageType.ASYNC, query, *parameters, **options) - - def receive(self, data_only = True, **options): + def receive(self, data_only=True, **options): '''Reads and (optionally) parses the response from a q service. Retrieves query result along with meta-information: @@ -381,6 +573,5 @@ def receive(self, data_only = True, **options): result = self._reader.read(**self._options.union_dict(**options)) return result.data if data_only else result - def __call__(self, *parameters, **options): return self.sendSync(parameters[0], *parameters[1:], **options) diff --git a/qpython/qreader.py b/qpython/qreader.py index 32a7eb9..ba20393 100644 --- a/qpython/qreader.py +++ b/qpython/qreader.py @@ -20,6 +20,8 @@ from sys import intern unicode = str +from abc import ABC, abstractmethod + from qpython import MetaData, CONVERSION_OPTIONS from qpython.qtype import * # @UnusedWildImport from qpython.qcollection import qlist, QDictionary, qtable, QTable, QKeyedTable @@ -30,6 +32,7 @@ except: from qpython.utils import uncompress + class QReaderException(Exception): ''' Indicates an error raised during data deserialization. @@ -37,7 +40,6 @@ class QReaderException(Exception): pass - class QMessage(object): ''' Represents a single message parsed from q protocol. @@ -60,38 +62,32 @@ def data(self): def data(self, value): self._data = value - @property def type(self): '''Type of the message.''' return self._type - @property def compression_mode(self): '''Indicates whether source message was compressed.''' return self._compression_mode - @property def size(self): '''Size of the source message.''' return self._size - def __init__(self, data, message_type, message_size, compression_mode): self._data = data self._type = message_type self._size = message_size self._compression_mode = compression_mode - def __str__(self, *args, **kwargs): return 'QMessage: message type: %s, data size: %s, compression_mode: %s, data: %s' % (self._type, self._size, self._compression_mode, self._data) - -class QReader(object): +class Reader(ABC): ''' Provides deserialization from q IPC protocol. @@ -107,14 +103,13 @@ class QReader(object): _reader_map = {} parse = Mapper(_reader_map) - def __init__(self, stream, encoding = 'latin-1'): self._stream = stream self._buffer = QReader.BytesBuffer() self._encoding = encoding - - def read(self, source = None, **options): + @abstractmethod + def read(self, source=None, **options): ''' Reads and optionally parses a single message. @@ -133,12 +128,9 @@ def read(self, source = None, **options): :returns: :class:`.QMessage` - read data (parsed or raw byte form) along with meta information ''' - message = self.read_header(source) - message.data = self.read_data(message.size, message.compression_mode, **options) - - return message - + pass + @abstractmethod def read_header(self, source = None): ''' Reads and parses message header. @@ -152,23 +144,9 @@ def read_header(self, source = None): :returns: :class:`.QMessage` - read meta information ''' - if self._stream: - header = self._read_bytes(8) - self._buffer.wrap(header) - else: - self._buffer.wrap(source) - - self._buffer.endianness = '<' if self._buffer.get_byte() == 1 else '>' - self._is_native = self._buffer.endianness == ('<' if sys.byteorder == 'little' else '>') - message_type = self._buffer.get_byte() - message_compression_mode = self._buffer.get_byte() - message_size_ext = self._buffer.get_byte() - - message_size = self._buffer.get_uint() - message_size += message_size_ext << 32 - return QMessage(None, message_type, message_size, message_compression_mode) - + pass + @abstractmethod def read_data(self, message_size, compression_mode = 0, **options): ''' Reads and optionally parses data part of a message. @@ -192,29 +170,6 @@ def read_data(self, message_size, compression_mode = 0, **options): ''' self._options = MetaData(**CONVERSION_OPTIONS.union_dict(**options)) - if compression_mode > 0: - comprHeaderLen = 4 if compression_mode == 1 else 8 - if self._stream: - self._buffer.wrap(self._read_bytes(comprHeaderLen)) - uncompressed_size = -8 + (self._buffer.get_uint() if compression_mode == 1 else self._buffer.get_long()) - compressed_data = self._read_bytes(message_size - (8+comprHeaderLen)) if self._stream else self._buffer.raw(message_size - (8+comprHeaderLen)) - - raw_data = numpy.frombuffer(compressed_data, dtype = numpy.uint8) - if uncompressed_size <= 0: - raise QReaderException('Error while data decompression.') - - raw_data = uncompress(raw_data, numpy.int64(uncompressed_size)) - raw_data = numpy.ndarray.tobytes(raw_data) - self._buffer.wrap(raw_data) - elif self._stream: - raw_data = self._read_bytes(message_size - 8) - self._buffer.wrap(raw_data) - if not self._stream and self._options.raw: - raw_data = self._buffer.raw(message_size - 8) - - return raw_data if self._options.raw else self._read_object() - - def _read_object(self): qtype = self._buffer.get_byte() @@ -229,38 +184,31 @@ def _read_object(self): raise QReaderException('Unable to deserialize q type: %s' % hex(qtype)) - def _get_reader(self, qtype): return self._reader_map.get(qtype, None) - @parse(QERROR) def _read_error(self, qtype = QERROR): raise QException(self._read_symbol()) - @parse(QSTRING) def _read_string(self, qtype = QSTRING): self._buffer.skip() # ignore attributes length = self._buffer.get_int() return self._buffer.raw(length) if length > 0 else b'' - @parse(QSYMBOL) def _read_symbol(self, qtype = QSYMBOL): return numpy.string_(self._buffer.get_symbol()) - @parse(QCHAR) def _read_char(self, qtype = QCHAR): return chr(self._read_atom(QCHAR)).encode(self._encoding) - @parse(QGUID) def _read_guid(self, qtype = QGUID): return uuid.UUID(bytes = self._buffer.raw(16)) - def _read_atom(self, qtype): try: fmt = STRUCT_MAP[qtype] @@ -269,7 +217,6 @@ def _read_atom(self, qtype): except KeyError: raise QReaderException('Unable to deserialize q type: %s' % hex(qtype)) - @parse(QTIMESPAN, QTIMESTAMP, QTIME, QSECOND, QMINUTE, QDATE, QMONTH, QDATETIME) def _read_temporal(self, qtype): try: @@ -280,7 +227,6 @@ def _read_temporal(self, qtype): except KeyError: raise QReaderException('Unable to deserialize q type: %s' % hex(qtype)) - def _read_list(self, qtype): attr = self._buffer.get_byte() isLongLength = attr & 0x80 != 0 @@ -307,7 +253,6 @@ def _read_list(self, qtype): else: raise QReaderException('Unable to deserialize q type: %s' % hex(qtype)) - @parse(QDICTIONARY) def _read_dictionary(self, qtype = QDICTIONARY): keys = self._read_object() @@ -318,7 +263,6 @@ def _read_dictionary(self, qtype = QDICTIONARY): else: return QDictionary(keys, values) - @parse(QTABLE) def _read_table(self, qtype = QTABLE): self._buffer.skip() # ignore attributes @@ -329,7 +273,6 @@ def _read_table(self, qtype = QTABLE): return qtable(columns, data, qtype = QTABLE) - @parse(QGENERAL_LIST) def _read_general_list(self, qtype = QGENERAL_LIST): self._buffer.skip() # ignore attributes @@ -337,7 +280,6 @@ def _read_general_list(self, qtype = QGENERAL_LIST): return [self._read_object() for x in range(length)] - @parse(QNULL) @parse(QUNARY_FUNC) @parse(QBINARY_FUNC) @@ -346,20 +288,17 @@ def _read_function(self, qtype = QNULL): code = self._buffer.get_byte() return None if qtype == QNULL and code == 0 else QFunction(qtype) - @parse(QLAMBDA) def _read_lambda(self, qtype = QLAMBDA): self._buffer.get_symbol() # skip expression = self._read_object() return QLambda(expression.decode()) - @parse(QCOMPOSITION_FUNC) def _read_function_composition(self, qtype = QCOMPOSITION_FUNC): self._read_projection(qtype) # skip return QFunction(qtype) - @parse(QADVERB_FUNC_106) @parse(QADVERB_FUNC_107) @parse(QADVERB_FUNC_108) @@ -370,28 +309,15 @@ def _read_adverb_function(self, qtype = QADVERB_FUNC_106): self._read_object() # skip return QFunction(qtype) - @parse(QPROJECTION) def _read_projection(self, qtype = QPROJECTION): length = self._buffer.get_int() parameters = [ self._read_object() for x in range(length) ] return QProjection(parameters) - + @abstractmethod def _read_bytes(self, length): - if not self._stream: - raise QReaderException('There is no input data. QReader requires either stream or data chunk') - - if length == 0: - return b'' - else: - data = self._stream.read(length) - - if len(data) == 0: - raise QReaderException('Error while reading data') - return data - - + pass class BytesBuffer(object): ''' @@ -564,3 +490,63 @@ def get_symbols(self, count): return raw.split(b'\x00') +class QReader(Reader): + def read(self, source=None, **options): + message = self.read_header(source) + message.data = self.read_data(message.size, message.compression_mode, **options) + return message + + def read_header(self, source=None): + if self._stream: + header = self._read_bytes(8) + self._buffer.wrap(header) + else: + self._buffer.wrap(source) + + self._buffer.endianness = '<' if self._buffer.get_byte() == 1 else '>' + self._is_native = self._buffer.endianness == ('<' if sys.byteorder == 'little' else '>') + message_type = self._buffer.get_byte() + message_compression_mode = self._buffer.get_byte() + message_size_ext = self._buffer.get_byte() + + message_size = self._buffer.get_uint() + message_size += message_size_ext << 32 + return QMessage(None, message_type, message_size, message_compression_mode) + + def read_data(self, message_size, compression_mode=0, **options): + super().read_data(message_size, compression_mode, **options) + + if compression_mode > 0: + comprHeaderLen = 4 if compression_mode == 1 else 8 + if self._stream: + self._buffer.wrap(self._read_bytes(comprHeaderLen)) + uncompressed_size = -8 + (self._buffer.get_uint() if compression_mode == 1 else self._buffer.get_long()) + compressed_data = self._read_bytes(message_size - (8+comprHeaderLen)) if self._stream else self._buffer.raw(message_size - (8+comprHeaderLen)) + + raw_data = numpy.frombuffer(compressed_data, dtype=numpy.uint8) + if uncompressed_size <= 0: + raise QReaderException('Error while data decompression.') + + raw_data = uncompress(raw_data, numpy.int64(uncompressed_size)) + raw_data = numpy.ndarray.tobytes(raw_data) + self._buffer.wrap(raw_data) + elif self._stream: + raw_data = self._read_bytes(message_size - 8) + self._buffer.wrap(raw_data) + if not self._stream and self._options.raw: + raw_data = self._buffer.raw(message_size - 8) + + return raw_data if self._options.raw else self._read_object() + + def _read_bytes(self, length): + if not self._stream: + raise QReaderException('There is no input data. QReader requires either stream or data chunk') + + if length == 0: + return b'' + else: + data = self._stream.read(length) + + if len(data) == 0: + raise QReaderException('Error while reading data') + return data \ No newline at end of file diff --git a/qpython/qwriter.py b/qpython/qwriter.py index b402a64..df0d565 100644 --- a/qpython/qwriter.py +++ b/qpython/qwriter.py @@ -20,6 +20,8 @@ except ImportError: from io import BytesIO +from abc import ABC, abstractmethod + from qpython import MetaData, CONVERSION_OPTIONS from qpython.qtype import * # @UnusedWildImport from qpython.qcollection import qlist, QList, QTemporalList, QDictionary, QTable, QKeyedTable, get_list_qtype @@ -37,7 +39,7 @@ class QWriterException(Exception): ENDIANNESS = '\1' if sys.byteorder == 'little' else '\0' -class QWriter(object): +class Writer(ABC): ''' Provides serialization to q IPC protocol. @@ -54,13 +56,12 @@ class QWriter(object): _writer_map = {} serialize = Mapper(_writer_map) - - def __init__(self, stream, protocol_version, encoding = 'latin-1'): + def __init__(self, stream, protocol_version, encoding='latin-1'): self._stream = stream self._protocol_version = protocol_version self._encoding = encoding - + @abstractmethod def write(self, data, msg_type, **options): '''Serializes and pushes single data object to a wrapped stream. @@ -90,13 +91,6 @@ def write(self, data, msg_type, **options): self._buffer.seek(4) self._buffer.write(struct.pack('i', data_size)) - # write data to socket - if self._stream: - self._stream.sendall(self._buffer.getvalue()) - else: - return self._buffer.getvalue() - - def _write(self, data): if data is None: self._write_null() @@ -118,15 +112,12 @@ def _write(self, data): else: raise QWriterException('Unable to serialize type: %s' % data.__class__ if isinstance(data, object) else type(data)) - def _get_writer(self, data_type): return self._writer_map.get(data_type, None) - def _write_null(self): self._buffer.write(struct.pack('=bx', QNULL)) - @serialize(Exception) def _write_error(self, data): self._buffer.write(struct.pack('b', QERROR)) @@ -140,7 +131,6 @@ def _write_error(self, data): self._buffer.write(msg.encode(self._encoding)) self._buffer.write(b'\0') - def _write_atom(self, data, qtype): try: self._buffer.write(struct.pack('b', qtype)) @@ -152,14 +142,12 @@ def _write_atom(self, data, qtype): except KeyError: raise QWriterException('Unable to serialize type: %s' % data.__class__ if isinstance(data, object) else type(data)) - @serialize(tuple, list) def _write_generic_list(self, data): self._buffer.write(struct.pack('=bxi', QGENERAL_LIST, len(data))) for element in data: self._write(element) - @serialize(str, bytes) def _write_string(self, data): if not self._options.single_char_strings and len(data) == 1: @@ -171,7 +159,6 @@ def _write_string(self, data): else: self._buffer.write(data) - @serialize(numpy.string_) def _write_symbol(self, data): self._buffer.write(struct.pack('=b', QSYMBOL)) @@ -179,7 +166,6 @@ def _write_symbol(self, data): self._buffer.write(data) self._buffer.write(b'\0') - @serialize(uuid.UUID) def _write_guid(self, data): if self._protocol_version < 3: @@ -188,7 +174,6 @@ def _write_guid(self, data): self._buffer.write(struct.pack('=b', QGUID)) self._buffer.write(data.bytes) - @serialize(QTemporal) def _write_temporal(self, data): try: @@ -201,7 +186,6 @@ def _write_temporal(self, data): except KeyError: raise QWriterException('Unable to serialize type: %s' % type(data)) - @serialize(numpy.datetime64, numpy.timedelta64) def _write_numpy_temporal(self, data): try: @@ -216,28 +200,24 @@ def _write_numpy_temporal(self, data): except KeyError: raise QWriterException('Unable to serialize type: %s' % data.dtype) - @serialize(QLambda) def _write_lambda(self, data): self._buffer.write(struct.pack('=b', QLAMBDA)) self._buffer.write(b'\0') self._write_string(data.expression) - @serialize(QProjection) def _write_projection(self, data): self._buffer.write(struct.pack('=bi', QPROJECTION, len(data.parameters))) for parameter in data.parameters: self._write(parameter) - @serialize(QDictionary, QKeyedTable) def _write_dictionary(self, data): self._buffer.write(struct.pack('=b', QDICTIONARY)) self._write(data.keys) self._write(data.values) - @serialize(QTable) def _write_table(self, data): self._buffer.write(struct.pack('=bxb', QTABLE, QDICTIONARY)) @@ -246,7 +226,6 @@ def _write_table(self, data): for column in data.dtype.names: self._write_list(data[column], data.meta[column]) - @serialize(numpy.ndarray, QList, QTemporalList) def _write_list(self, data, qtype = None): if qtype is not None: @@ -282,3 +261,13 @@ def _write_list(self, data, qtype = None): else: self._buffer.write(data.tobytes()) + +class QWriter(Writer): + def write(self, data, msg_type, **options): + super().write(data, msg_type, **options) + # write data to socket + if self._stream: + self._stream.sendall(self._buffer.getvalue()) + else: + return self._buffer.getvalue() + diff --git a/samples/asyncio_query.py b/samples/asyncio_query.py new file mode 100644 index 0000000..3f39001 --- /dev/null +++ b/samples/asyncio_query.py @@ -0,0 +1,54 @@ +from qpython import qaioconnection +import asyncio +import time + + +async def kdb_query(kdb_port, query): + start_time = time.time() + # create connection object + q = qaioconnection.QConnection(host='localhost', port=kdb_port) + # initialize connection + await q.open() + + print(f"{q} Initialised") + print(f"IPC version: {q.protocol_version}. Port: {kdb_port} Is connected: {q.is_connected()}\n") + + data = await q.sendSync(query) + print(f"{q} Returned {data}") + + await q.close() + print(f"{q} Closed") + end_time = time.time() + print(f"{q} Duration:{end_time - start_time}\n") + + +async def wrapper(kdb_port, work_queue): + while not work_queue.empty(): + query = await work_queue.get() + await kdb_query(kdb_port, query) + + +async def main(): + start_time = time.time() + work_queue = asyncio.Queue() + + for query in [ + 'system"sleep 4"; 1', + 'system"sleep 3"; 2', + 'system"sleep 3"; 3', + 'system"sleep 1"; 4', + ]: + await work_queue.put(query) + + # Run the tasks + await asyncio.gather( + asyncio.create_task(wrapper(5000, work_queue)), + asyncio.create_task(wrapper(5001, work_queue)), + ) + + end_time = time.time() + print(f"Total Duration: {end_time - start_time}") + + +if __name__ == '__main__': + asyncio.run(main())