class StreamScope(_object)
Determines the scope of a stream's durability, if any.
Example:
    scope = tbapi.StreamScope('TRANSIENT')
Possible values:
DURABLE, EXTERNAL_FILE, TRANSIENT, RUNTIME
class WriteMode(_object)
APPEND: Adds only new data to a stream, without truncation.
REPLACE: Adds data to a stream and removes previous data older than the first message time [truncate(first message time + 1)].
REWRITE: Default. Adds data to a stream and removes previous data by truncating at the first message time [truncate(first message time)].
TRUNCATE: The stream is truncated every time the loader writes a message earlier than the last message time.
Example:
    mode = tbapi.WriteMode('TRUNCATE')
Possible values:
APPEND, REPLACE, REWRITE, TRUNCATE
class SelectionOptions(_object)
Options for selecting data from a stream.
Example:
    so = tbapi.SelectionOptions()
    so._from = 0
    so.to = 100000
    so.useCompression = False
    ...
class LoadingOptions(_object)
Options for loading data into a stream.
Example:
    lo = tbapi.LoadingOptions()
    lo.writeMode = tbapi.WriteMode('TRUNCATE')
    lo.space = 'myspace'
    ...
class StreamOptions(_object)
Stream definition attributes.
Example:
    so = tbapi.StreamOptions()
    so.name = key
    so.description = key
    so.scope = tbapi.StreamScope('DURABLE')
    so.distributionFactor = 1
    so.highAvailability = False
    so.polymorphic = False
    so.metadata = schema
    db.createStream(key, so)
def name(name: str = None) -> None
Optional user-readable name.
def description(description: str = None) -> None
Optional multi-line description.
def owner(owner: str = None) -> None
Optional owner of the stream. During stream creation it is set to the authenticated user name.
def location(location: str = None) -> None
Location of the stream (null by default). When defined, this attribute provides an alternative stream location, rather than the default location under QuantServerHome.
def distributionRuleName(distributionRuleName: str = None) -> None
Class name of the distribution rule.
def metadata(metadata: str = None) -> None
Stream metadata in XML format. To build metadata programmatically, use the tbapi.SchemaDef class.
class QueryParameter(_object)
Input parameter definition for a prepared statement.
class TickDb(_object)
The top-level implementation of the methods of the Deltix Tick Database engine. Instances of this class are created by the static method createFromUrl:
    db = tbapi.TickDb.createFromUrl('dxtick://localhost:8011')
or
    db = tbapi.TickDb.createFromUrl('dxtick://localhost:8011', 'user', 'password')
@staticmethod
def createFromUrl(url: str, user: str = None, password: str = None) -> "TickDb"
Creates a new database instance with the specified root folder, or URL.
Arguments:
url (str) - Connection URL.
user (str) - User.
password (str) - Password.
Returns:
TickDb - An unopened TickDb instance.
@staticmethod
@contextmanager
def openFromUrl(url: str, readonly: bool, user: str = None, password: str = None)
Creates a new database instance with the specified root folder, or URL, and opens it.
Arguments:
url (str) - Connection URL.
readonly (bool) - Open the data store in read-only mode.
user (str) - User.
password (str) - Password.
Returns:
TickDb - An opened TickDb instance.
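Because openFromUrl is decorated with @contextmanager, it is presumably meant to be used in a with statement. The sketch below lists stream keys that way. It assumes the database is closed when the with block exits, which the text above does not state. The helper name stream_keys is hypothetical, and tickdb_cls is a parameter (tbapi.TickDb in real use) only so the sketch can be exercised without a running server.

```python
def stream_keys(tickdb_cls, url, user=None, password=None):
    """Return the keys of all streams in the database at `url`.

    `tickdb_cls` is expected to be tbapi.TickDb (assumption: passed in
    rather than imported so this sketch has no hard dependency on tbapi).
    """
    # Open read-only: this helper only enumerates streams.
    with tickdb_cls.openFromUrl(url, True, user, password) as db:
        return [stream.key() for stream in db.listStreams()]
```

A call would then look like stream_keys(tbapi.TickDb, 'dxtick://localhost:8011').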
def isReadOnly() -> bool
Determines whether the store is open as read-only.
def isOpen() -> bool
Determines whether the store is open.
def open(readOnlyMode: bool) -> bool
Opens the data store.
Arguments:
readOnlyMode (bool) - Open the data store in read-only mode.
def close() -> None
Closes the data store.
def format() -> bool
Creates a new object on disk and formats it internally. The data store is left open for read-write at the end of this method.
def listStreams() -> 'list[TickStream]'
Enumerates existing streams.
Returns:
list[TickStream] - An array of existing stream objects.
def getStream(key: str) -> 'TickStream'
Looks up an existing stream by key.
Arguments:
key (str) - Identifies the stream.
Returns:
TickStream - A stream object, or None if the key was not found.
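Since getStream returns None for an unknown key, a common pattern is to fall back to createStream. The following is a minimal sketch of that pattern; the helper name get_or_create_stream is hypothetical, and it assumes db is a TickDb that has been opened in read-write mode.

```python
def get_or_create_stream(db, key, options):
    """Return the stream registered under `key`, creating it if absent.

    `db` is assumed to be an open, writable TickDb; `options` is a
    StreamOptions instance used only when the stream must be created.
    """
    stream = db.getStream(key)
    if stream is None:
        # getStream returns None when the key was not found.
        stream = db.createStream(key, options)
    return stream
```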
def createStream(key: str, options: StreamOptions) -> 'TickStream'
Creates a new stream within the database.
Arguments:
key (str) - A required key later used to identify the stream.
options (StreamOptions) - Options for creating the stream.
Returns:
TickStream - A new instance of TickStream.
def createFileStream(key: str, dataFile: str) -> 'TickStream'
Creates a new stream mounted to the given data file.
Arguments:
key (str) - A required key later used to identify the stream.
dataFile (str) - Path to the data file (on the server side).
Returns:
TickStream - A new instance of TickStream.
def createCursor(stream: 'TickStream', options: SelectionOptions) -> 'TickCursor'
Opens an initially empty cursor for reading data from multiple streams, according to the specified options. The messages are returned from the cursor strictly ordered by time. Within the same exact timestamp, the order of messages is undefined and may vary from call to call, i.e. it is non-deterministic.
The cursor is returned initially empty and must be reset. The TickCursor class provides methods for dynamically re-configuring the subscription, or jumping to a different timestamp.
Arguments:
stream (TickStream) - Stream from which data will be selected.
options (SelectionOptions) - Selection options.
Returns:
TickCursor - A cursor used to read messages.
@contextmanager
def tryCursor(stream: 'TickStream', options: SelectionOptions) -> 'TickCursor'
Context manager version of createCursor. Usage:
    with db.tryCursor(stream, options) as cursor:
        while cursor.next():
            message = cursor.getMessage()
def select(timestamp: int, streams: 'list[TickStream]', options: SelectionOptions, types: 'list[str]', entities: 'list[str]') -> 'TickCursor'
Opens a cursor for reading data from multiple streams, according to the specified options. The messages are returned from the cursor strictly ordered by time. Within the same exact timestamp, the order of messages is undefined and may vary from call to call, i.e. it is non-deterministic.
Note that the arguments of this method only determine the initial configuration of the cursor. The TickCursor class provides methods for dynamically re-configuring the subscription, or jumping to a different timestamp.
Arguments:
timestamp (int) - The start timestamp in millis.
streams (list[TickStream]) - Streams from which data will be selected.
options (SelectionOptions) - Selection options.
types (list[str]) - Message types to subscribe to. If None, all types are subscribed.
entities (list[str]) - Entities to subscribe to. If None, all entities are subscribed.
Returns:
TickCursor - A cursor used to read messages.
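The timestamp argument is an integer number of milliseconds. The sketch below converts a Python datetime to such a value. It assumes the API counts milliseconds from the Unix epoch in UTC, which the text above does not state explicitly; the helper name to_millis is hypothetical.

```python
from datetime import datetime, timedelta, timezone

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def to_millis(moment):
    """Convert a datetime to integer milliseconds since the Unix epoch.

    Assumption: naive datetimes are interpreted as UTC, not local time.
    """
    if moment.tzinfo is None:
        moment = moment.replace(tzinfo=timezone.utc)
    # Integer timedelta division avoids float rounding errors.
    return (moment - _EPOCH) // timedelta(milliseconds=1)
```

The result could then be passed as the first argument, for example db.select(to_millis(start), streams, options, None, None).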
@contextmanager
def trySelect(timestamp: int, streams: 'list[TickStream]', options: SelectionOptions, types: 'list[str]', entities: 'list[str]') -> 'TickCursor'
Context manager version of select. Usage:
    with db.trySelect(timestamp, streams, options, types, entities) as cursor:
        while cursor.next():
            message = cursor.getMessage()
def createLoader(stream: 'TickStream', options: LoadingOptions) -> 'TickLoader'
Creates a channel for loading data. The loader must be closed when the loading process is finished.
Arguments:
stream (TickStream) - Stream to load data into.
options (LoadingOptions) - Loading options.
Returns:
TickLoader - The created loader.
@contextmanager
def tryLoader(stream: 'TickStream', options: LoadingOptions) -> 'TickLoader'
Context manager version of createLoader. Usage:
    with db.tryLoader(stream, options) as loader:
        loader.send(message)
def executeQuery(query: str, options: SelectionOptions = None, timestamp: int = JAVA_LONG_MIN_VALUE, entities: 'list[str]' = None, params: 'list[QueryParameter]' = []) -> 'TickCursor'
Executes a query and creates a message source for reading data from it, according to the specified options. The messages are returned from the cursor strictly ordered by time. Within the same exact timestamp, the order of messages is undefined and may vary from call to call, i.e. it is non-deterministic.
Arguments:
query (str) - Query text.
options (SelectionOptions) - Selection options.
timestamp (int) - The start timestamp in millis.
entities (list[str]) - Entities to subscribe to. If None, all entities are subscribed.
params (list[QueryParameter]) - The parameter values of the query.
Returns:
TickCursor - An iterable message source to read messages.
@contextmanager
def tryExecuteQuery(query: str, options: SelectionOptions = None, timestamp: int = JAVA_LONG_MIN_VALUE, entities: 'list[str]' = None, params: 'list[QueryParameter]' = []) -> 'TickCursor'
Context manager version of executeQuery. Usage:
    with db.tryExecuteQuery('select * from stream') as cursor:
        while cursor.next():
            message = cursor.getMessage()
class TickStream(_object)
The stream is a time series of messages for a number of financial instruments ('entities'). Messages can be price bars, trade ticks, bid/offer ticks, or any of the many more built-in and user-defined types. In the simplest case, a database will have a single stream of data. Multiple streams can be used to represent data of different frequencies, or completely different factors. For instance, separate streams can represent 1-minute price bars and ticks for the same set of entities. Or, you can have price bars and volatility bars in separate streams.
Get stream:
    stream = tickdb.getStream('stream_key')
List streams:
    streams = tickdb.listStreams()
def key() -> str
Returns the key, which uniquely identifies the stream within its database.
def name() -> str
Returns a user-readable short name.
def distributionFactor() -> int
Returns the target number of files to be used for storing data.
def description() -> str
Returns a user-readable multi-line description.
def owner() -> str
Returns the stream owner.
def location() -> str
Returns the stream location.
def metadata() -> str
Returns the stream schema (in XML format).
def scope() -> StreamScope
Returns the stream scope.
def highAvailability() -> bool
Returns the stream memory caching parameter. High availability durable streams are cached on startup.
def unique() -> bool
Unique streams maintain an in-memory cache of recent messages. This concept assumes that stream messages have some field(s) marked as the primary key. The primary key may be a simple field (e.g. symbol) or composite (e.g. symbol and portfolio ID). For each key, the TimeBase runtime maintains a copy of the last message received for this key (the cache). Each new consumer receives a snapshot of the current cache at the beginning of a live data subscription.
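The caching behaviour described for unique streams can be pictured with a small pure-Python model. This is only a conceptual sketch, not how the TimeBase runtime is implemented: messages are modelled as plain dicts, and the class name LastValueCache and its methods are hypothetical.

```python
class LastValueCache:
    """Toy model of a per-key cache holding the most recent message."""

    def __init__(self, key_fields):
        # One field gives a simple key, several fields a composite key.
        self._key_fields = tuple(key_fields)
        self._latest = {}

    def update(self, message):
        # Build the primary key from the configured message fields.
        key = tuple(message[field] for field in self._key_fields)
        # A newer message for the same key replaces the cached one.
        self._latest[key] = message

    def snapshot(self):
        # What a new consumer would be sent before live data begins.
        return list(self._latest.values())
```

With key_fields set to ['symbol', 'portfolio'] the same model covers the composite-key case mentioned above.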
def polymorphic() -> bool
Returns whether the stream is configured as polymorphic.
def periodicity() -> str
Returns the stream periodicity, if known.
def options() -> StreamOptions
Returns the stream options object.
def describe() -> str
Returns the stream DDL description.
def setSchema(options: StreamOptions) -> bool
Changes the stream schema.
Arguments:
options (StreamOptions) - Stream options that contain the new schema XML.
Returns:
bool - True if the schema was changed successfully.
def listEntities() -> 'list[str]'
Returns the entities contained in this stream.
Returns:
list[str] - The selected entities.
def truncate(timestamp: int, entities: 'list[str]' = None) -> bool
Truncates stream data for the given entities from the given time.
Arguments:
timestamp (int) - Timestamp in millis. If the time is less than the stream start time, all stream data will be deleted.
entities (list[str]) - A list of entities. If None, all stream entities will be used.
Returns:
bool - True if the stream was truncated successfully.
def clear(entities: 'list[str]' = None) -> bool
Clears stream data for the given entities.
Arguments:
entities (list[str]) - A list of entities. If None, all stream entities will be used.
def purge(timestamp: int) -> bool
Deletes stream data that is older than the specified time.
Arguments:
timestamp (int) - Purge time in milliseconds.
Returns:
bool - True if the stream was purged successfully.
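A retention policy such as "keep the last N days" reduces to computing a cutoff timestamp and passing it to purge. The sketch below shows that arithmetic. It assumes the purge time is expressed in milliseconds since the Unix epoch; the helper name purge_older_than and its now_millis parameter are hypothetical.

```python
import time

MILLIS_PER_DAY = 24 * 60 * 60 * 1000

def purge_older_than(stream, days, now_millis=None):
    """Purge data older than `days` days and return purge()'s result.

    `now_millis` can be supplied for reproducible runs; by default the
    current wall-clock time is used (assumed to be epoch milliseconds).
    """
    if now_millis is None:
        now_millis = int(time.time() * 1000)
    cutoff = now_millis - days * MILLIS_PER_DAY
    return stream.purge(cutoff)
```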
def deleteStream() -> bool
Deletes this stream.
Returns:
bool - True if the stream was deleted successfully.
def abortBackgroundProcess() -> bool
Aborts the active background process, if any exists.
def select(timestamp: int, options: SelectionOptions, types: 'list[str]', entities: 'list[str]') -> 'TickCursor'
Opens a cursor for reading data from this stream, according to the specified options. The messages are returned from the cursor strictly ordered by time. Within the same exact timestamp, the order of messages is undefined and may vary from call to call, i.e. it is non-deterministic.
Note that the arguments of this method only determine the initial configuration of the cursor. The TickCursor interface provides methods for dynamically re-configuring the subscription, or jumping to a different timestamp.
Arguments:
timestamp (int) - The start timestamp in millis.
options (SelectionOptions) - Selection options.
types (list[str]) - Message types to subscribe to. If None, all types are subscribed.
entities (list[str]) - Entities to subscribe to. If None, all entities are subscribed.
Returns:
TickCursor - A cursor used to read messages.
@contextmanager
def trySelect(timestamp: int, options: SelectionOptions, types: 'list[str]', entities: 'list[str]') -> 'TickCursor'
Context manager version of select. Usage:
    with stream.trySelect(timestamp, options, types, entities) as cursor:
        while cursor.next():
            message = cursor.getMessage()
def createCursor(options: SelectionOptions) -> 'TickCursor'
Creates a cursor for reading data from this stream, according to the specified options, but initially with a fully restricted filter. The user must call TickCursor.reset at least once in order to begin retrieving data. This method is equivalent to (but is slightly more optimal than) calling createCursor(options)
Arguments:
options (SelectionOptions) - Selection options.
Returns:
TickCursor - A cursor used to read messages. Never None.
@contextmanager
def tryCursor(options: SelectionOptions) -> 'TickCursor'
Context manager version of createCursor. Usage:
    with stream.tryCursor(options) as cursor:
        while cursor.next():
            message = cursor.getMessage()
def createLoader(options: LoadingOptions) -> 'TickLoader'
Creates a channel for loading data. The loader must be closed when the loading process is finished.
Arguments:
options (LoadingOptions) - Loading options.
Returns:
TickLoader - The created loader.
@contextmanager
def tryLoader(options: LoadingOptions) -> 'TickLoader'
Context manager version of createLoader. Usage:
    with stream.tryLoader(options) as loader:
        loader.send(message)
def listSpaces() -> 'list[str]'
Returns all "spaces" created for the stream. The default space is returned as "" (empty string). If the backing stream does not support spaces, None is returned.
def renameSpace(newName: str, oldName: str) -> None
Renames an existing space.
Arguments:
newName (str) - New space name.
oldName (str) - Space to rename.
def deleteSpaces(spaces: 'list[str]') -> None
Removes the given spaces permanently.
Arguments:
spaces (list[str]) - List of space names to delete.
def getTimeRange(entities: 'list[str]' = None) -> 'list[int]'
Returns an inclusive range of times for which the specified entities have data in the database.
Arguments:
entities (list[str]) - A list of entities. If empty, the range is returned for all entities.
def getSpaceTimeRange(space: str) -> 'list[int]'
Returns the time range of the data in the given space: an array consisting of two long timestamps (from and to), or None if no data was found.
Arguments:
space (str) - Space name.
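The two-element range is easier to read once converted to datetimes. The following sketch does that conversion and passes None through unchanged. It assumes the timestamps are milliseconds since the Unix epoch in UTC; the helper name range_as_datetimes is hypothetical.

```python
from datetime import datetime, timedelta, timezone

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def range_as_datetimes(time_range):
    """Convert a [from, to] pair of millisecond timestamps to UTC datetimes.

    Returns None when `time_range` is None (no data was found).
    """
    if time_range is None:
        return None
    start, end = time_range
    # timedelta arithmetic keeps millisecond precision without floats.
    return (_EPOCH + timedelta(milliseconds=start),
            _EPOCH + timedelta(milliseconds=end))
```

For example, range_as_datetimes(stream.getSpaceTimeRange('myspace')) would give the first and last message times of that space under the stated assumption.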
class TickCursor(_object)
A cursor (also known as an iterator, or result set) for reading data from a stream. This class provides methods for dynamically reconfiguring the feed, as well as the method reset for essentially re-opening the cursor at a completely different timestamp.
To get a cursor, use the select method of a TickDb or TickStream object, or call executeQuery to open a cursor over a QQL result set.
A cursor can also be created with the createCursor method, but such a cursor is not initialized: it must be configured with types and entities, and positioned at a read time by calling reset:
    options = tbapi.SelectionOptions()
    cursor = tickdb.createCursor(stream, options)
    cursor.subscribeToAllEntities()
    cursor.subscribeToAllTypes()
    cursor.reset(timestamp)
def next() -> bool
Moves the cursor on to the next message. This method blocks until the next message becomes available, or until the cursor is determined to be at the end of the sequence. This method is illegal to call if isAtEnd() returns true.
Returns:
bool - False if at the end of the cursor.
def getMessage() -> 'InstrumentMessage'
Returns the InstrumentMessage object the cursor points at.
def isAtEnd() -> bool
Returns true if the last call to next() returned false. Returns false if next() has not been called yet. This method is legal to call any number of times at any point in the cursor's lifecycle.
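The next()/getMessage() pair maps naturally onto a Python generator. The sketch below wraps the loop and stops as soon as next() returns false, so next() is never called again once the cursor is at its end. The helper name iter_messages and its limit parameter are hypothetical; note that on a live cursor next() blocks, so without a limit the generator may never finish.

```python
def iter_messages(cursor, limit=None):
    """Yield messages from `cursor` until it ends or `limit` is reached."""
    count = 0
    # The limit is checked first so no extra message is consumed.
    while (limit is None or count < limit) and cursor.next():
        yield cursor.getMessage()
        count += 1
```

Typical use would be: for message in iter_messages(cursor, limit=100): print(message).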
def nextIfAvailable() -> int
Moves the cursor on to the next message, but unlike next(), this method does not block until the next message becomes available.
Returns:
int - A NextResult code: OK (0) if a new message is available, END_OF_CURSOR (1) if the cursor was closed, otherwise UNAVAILABLE (2).
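A non-blocking read loop built on nextIfAvailable has to handle all three result codes. The sketch below is one way to do it, assuming the method returns plain integers (or values that compare equal to them) as listed above. The function name poll_messages, the idle-poll limit, and the sleep interval are all hypothetical choices.

```python
import time

# Result codes as listed for nextIfAvailable.
OK, END_OF_CURSOR, UNAVAILABLE = 0, 1, 2

def poll_messages(cursor, max_idle_polls=10, idle_sleep=0.01):
    """Collect messages without blocking on the cursor.

    Stops when the cursor reports END_OF_CURSOR or after
    `max_idle_polls` consecutive UNAVAILABLE results.
    """
    messages = []
    idle = 0
    while idle < max_idle_polls:
        result = cursor.nextIfAvailable()
        if result == OK:
            messages.append(cursor.getMessage())
            idle = 0  # data arrived, so reset the idle counter
        elif result == END_OF_CURSOR:
            break
        else:
            # Nothing available yet: back off briefly and retry.
            idle += 1
            time.sleep(idle_sleep)
    return messages
```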
def isClosed() -> bool
Returns true if the cursor was closed.
def close() -> None
Closes the cursor.
def getCurrentStreamKey() -> str
Returns the key of the stream that is the source of the current message.
def reset(timestamp: int, entities: 'list[str]' = None) -> None
Repositions the message source to a new point in time, while preserving the current subscription.
Arguments:
timestamp (int) - The new position in time, in millis.
entities (list[str]) - List of entities to reset.
def subscribeToAllEntities() -> None
Subscribes to all available entities.
def clearAllEntities() -> None
Switches to selective subscription mode (if necessary) and clears the list.
def addEntity(entity: str) -> None
Adds the specified entity to the subscription. The type and symbol are copied from the incoming object, if necessary, so the argument can be re-used after the call.
Special note about options: the following fragment subscribes to the specific option contract "DAV 100417P00085000":
    cursor.addEntity('DAV 100417P00085000')
while the following subscribes to the option root (and you will get all instruments with this root):
    cursor.addEntity("DAV ")
def addEntities(entities: 'list[str]') -> None
Bulk-adds the specified entities to the subscription. The type and symbol are copied from the incoming objects, if necessary, so the arguments can be re-used after the call.
def removeEntities(entities: 'list[str]') -> None
Removes the specified entities from the subscription. The type and symbol are copied from the incoming objects, if necessary, so the arguments can be re-used after the call.
def removeEntity(entity: str) -> None
Removes the specified entity from the subscription. The type and symbol are copied from the incoming object, if necessary, so the argument can be re-used after the call.
def subscribeToAllTypes() -> None
Subscribes to all available types (no filtering).
def addTypes(types: 'list[str]') -> None
Adds the specified type names to the subscription.
def removeTypes(types: 'list[str]') -> None
Removes the specified types from the subscription.
def setTypes(types: 'list[str]') -> None
Subscribes to the specified types.
def add(types: 'list[str]', entities: 'list[str]') -> None
Adds the specified entities and types to the subscription. The type and symbol are copied from the incoming objects, if necessary, so the arguments can be re-used after the call.
Arguments:
types (list[str]) - Non-null array of type names to subscribe to.
entities (list[str]) - Non-null array of instruments to subscribe to.
def remove(types: 'list[str]', entities: 'list[str]') -> None
Removes the specified entities and types from the subscription. The type and symbol are copied from the incoming objects, if necessary, so the arguments can be re-used after the call.
Arguments:
types (list[str]) - Non-null array of type names to unsubscribe from.
entities (list[str]) - Non-null array of instruments to unsubscribe from.
def addStreams(streams: 'list[TickStream]') -> None
Adds streams to the subscription. The current time and filter are used to query data from the new sources.
Arguments:
streams (list[TickStream]) - Streams to add.
def removeStreams(streams: 'list[TickStream]') -> None
Removes streams from the subscription.
Arguments:
streams (list[TickStream]) - Streams to remove.
def removeAllStreams() -> None
Removes all streams from the subscription.
def setTimeForNewSubscriptions(timestamp: int) -> None
This method affects subsequent "add subscription" methods, such as addEntity(). New subscriptions start at the specified time.
Arguments:
timestamp (int) - The time to use.
class TickLoader(_object)
Object which consumes messages.
Create a loader from TickDb:
    options = tbapi.LoadingOptions()
    loader = tickdb.createLoader(stream, options)
Create a loader from TickStream:
    options = tbapi.LoadingOptions()
    loader = stream.createLoader(options)
def send(message: InstrumentMessage) -> None
Sends a message to the loader.
Arguments:
message (InstrumentMessage) - A temporary buffer with the message. By convention, the message is only valid for the duration of this call.
def flush() -> None
Flushes all buffered messages by sending them to the server. Note that calling the send method alone does not guarantee that all messages have been delivered to and stored on the server.
def close() -> None
Flushes and closes the loader.
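Because a loader must be closed when loading is finished, and close() also flushes, a try/finally wrapper is a simple way to make sure buffered messages are not left behind when sending fails part-way. This is a minimal sketch; the helper name send_all is hypothetical.

```python
def send_all(loader, messages):
    """Send every message, then close the loader even if sending fails.

    Returns the number of messages passed to loader.send().
    """
    sent = 0
    try:
        for message in messages:
            loader.send(message)
            sent += 1
    finally:
        # close() flushes buffered messages before closing the loader.
        loader.close()
    return sent
```

The tryLoader context managers described earlier presumably serve the same purpose; this helper is only an alternative for code that already holds a loader created with createLoader.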
def addListener(listener: 'ErrorListener') -> None
Registers an error listener. All data-writing errors will be delivered to the listener.
Arguments:
listener (ErrorListener) - Error listener to register.
def removeListener(listener: 'ErrorListener') -> None
Unsubscribes a registered error listener.
Arguments:
listener (ErrorListener) - Error listener to unsubscribe.
def nErrorListeners() -> int
Returns the number of registered error listeners.
def registerType(type: str) -> int
Registers the type of the messages to be sent and returns a type id. For performance reasons, you can specify the type id instead of the type name, for example:
    message = tbapi.InstrumentMessage()
    message.typeId = loader.registerType("deltix.timebase.api.messages.universal.PackageHeader")
    # as an alternative, you could write:
    # message.typeName = "deltix.timebase.api.messages.universal.PackageHeader"
    loader.send(message)
Arguments:
type (str) - Name of the type to register.
Returns:
int - Id of the registered type.
def registerInstrument(symbol: str) -> int
Registers the instrument of the messages to be sent and returns an instrument id. For performance reasons, you can specify the instrument id instead of the symbol and instrument type, for example:
    message = tbapi.InstrumentMessage()
    message.instrumentId = loader.registerInstrument('AAPL')
    # as an alternative, you could write:
    # message.symbol = 'AAPL'
    loader.send(message)
Arguments:
symbol (str) - Instrument ticker.
Returns:
int - Id of the registered instrument.
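When many messages are sent for a small set of symbols, the ids returned by registerInstrument can be remembered on the client side so that each symbol is registered only once. The sketch below shows such a cache. Whether repeated registerInstrument calls are costly is not stated above, so treat this as an optional convenience; the class name InstrumentIdCache and its id_for method are hypothetical.

```python
class InstrumentIdCache:
    """Remember instrument ids so each symbol is registered only once."""

    def __init__(self, loader):
        self._loader = loader
        self._ids = {}

    def id_for(self, symbol):
        # Register the symbol on first use, then reuse the cached id.
        if symbol not in self._ids:
            self._ids[symbol] = self._loader.registerInstrument(symbol)
        return self._ids[symbol]
```

A sender would then set message.instrumentId = cache.id_for('AAPL') before each loader.send(message). The cache is tied to one loader, on the assumption that ids are only meaningful for the loader that issued them.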