diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 6040654784c..79c6cbdf515 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -186,6 +186,7 @@ class DataSource(ABC): def __init__( self, + *, event_timestamp_column: Optional[str] = None, created_timestamp_column: Optional[str] = None, field_mapping: Optional[Dict[str, str]] = None, @@ -354,11 +355,12 @@ def get_table_column_names_and_types( def __init__( self, - name: str, - event_timestamp_column: str, - bootstrap_servers: str, - message_format: StreamFormat, - topic: str, + *args, + name: Optional[str] = None, + event_timestamp_column: Optional[str] = "", + bootstrap_servers: Optional[str] = None, + message_format: Optional[StreamFormat] = None, + topic: Optional[str] = None, created_timestamp_column: Optional[str] = "", field_mapping: Optional[Dict[str, str]] = None, date_partition_column: Optional[str] = "", @@ -368,22 +370,62 @@ def __init__( timestamp_field: Optional[str] = "", batch_source: Optional[DataSource] = None, ): + positional_attributes = [ + "name", + "event_timestamp_column", + "bootstrap_servers", + "message_format", + "topic", + ] + _name = name + _event_timestamp_column = event_timestamp_column + _bootstrap_servers = bootstrap_servers or "" + _message_format = message_format + _topic = topic or "" + + if args: + warnings.warn( + ( + "Kafka parameters should be specified as a keyword argument instead of a positional arg." + "Feast 0.23+ will not support positional arguments to construct Kafka sources" + ), + DeprecationWarning, + ) + if len(args) > len(positional_attributes): + raise ValueError( + f"Only {', '.join(positional_attributes)} are allowed as positional args when defining " + f"Kafka sources, for backwards compatibility." + ) + if len(args) >= 1: + _name = args[0] + if len(args) >= 2: + _event_timestamp_column = args[1] + if len(args) >= 3: + _bootstrap_servers = args[2] + if len(args) >= 4: + _message_format = args[3] + if len(args) >= 5: + _topic = args[4] + + if _message_format is None: + raise ValueError("Message format must be specified for Kafka source") + print("Asdfasdf") super().__init__( - event_timestamp_column=event_timestamp_column, + event_timestamp_column=_event_timestamp_column, created_timestamp_column=created_timestamp_column, field_mapping=field_mapping, date_partition_column=date_partition_column, description=description, tags=tags, owner=owner, - name=name, + name=_name, timestamp_field=timestamp_field, ) self.batch_source = batch_source self.kafka_options = KafkaOptions( - bootstrap_servers=bootstrap_servers, - message_format=message_format, - topic=topic, + bootstrap_servers=_bootstrap_servers, + message_format=_message_format, + topic=_topic, ) def __eq__(self, other): @@ -472,32 +514,56 @@ class RequestSource(DataSource): def __init__( self, - name: str, - schema: Union[Dict[str, ValueType], List[Field]], + *args, + name: Optional[str] = None, + schema: Optional[Union[Dict[str, ValueType], List[Field]]] = None, description: Optional[str] = "", tags: Optional[Dict[str, str]] = None, owner: Optional[str] = "", ): """Creates a RequestSource object.""" - super().__init__(name=name, description=description, tags=tags, owner=owner) - if isinstance(schema, Dict): + positional_attributes = ["name", "schema"] + _name = name + _schema = schema + if args: + warnings.warn( + ( + "Request source parameters should be specified as a keyword argument instead of a positional arg." + "Feast 0.23+ will not support positional arguments to construct request sources" + ), + DeprecationWarning, + ) + if len(args) > len(positional_attributes): + raise ValueError( + f"Only {', '.join(positional_attributes)} are allowed as positional args when defining " + f"feature views, for backwards compatibility." + ) + if len(args) >= 1: + _name = args[0] + if len(args) >= 2: + _schema = args[1] + + super().__init__(name=_name, description=description, tags=tags, owner=owner) + if not _schema: + raise ValueError("Schema needs to be provided for Request Source") + if isinstance(_schema, Dict): warnings.warn( "Schema in RequestSource is changing type. The schema data type Dict[str, ValueType] is being deprecated in Feast 0.23. " "Please use List[Field] instead for the schema", DeprecationWarning, ) schemaList = [] - for key, valueType in schema.items(): + for key, valueType in _schema.items(): schemaList.append( Field(name=key, dtype=VALUE_TYPES_TO_FEAST_TYPES[valueType]) ) self.schema = schemaList - elif isinstance(schema, List): - self.schema = schema + elif isinstance(_schema, List): + self.schema = _schema else: raise Exception( "Schema type must be either dictionary or list, not " - + str(type(schema)) + + str(type(_schema)) ) def validate(self, config: RepoConfig): @@ -643,12 +709,13 @@ def get_table_query_string(self) -> str: def __init__( self, - name: str, - event_timestamp_column: str, - created_timestamp_column: str, - record_format: StreamFormat, - region: str, - stream_name: str, + *args, + name: Optional[str] = None, + event_timestamp_column: Optional[str] = "", + created_timestamp_column: Optional[str] = "", + record_format: Optional[StreamFormat] = None, + region: Optional[str] = "", + stream_name: Optional[str] = "", field_mapping: Optional[Dict[str, str]] = None, date_partition_column: Optional[str] = "", description: Optional[str] = "", @@ -657,10 +724,53 @@ def __init__( timestamp_field: Optional[str] = "", batch_source: Optional[DataSource] = None, ): + positional_attributes = [ + "name", + "event_timestamp_column", + "created_timestamp_column", + "record_format", + "region", + "stream_name", + ] + _name = name + _event_timestamp_column = event_timestamp_column + _created_timestamp_column = created_timestamp_column + _record_format = record_format + _region = region or "" + _stream_name = stream_name or "" + if args: + warnings.warn( + ( + "Kinesis parameters should be specified as a keyword argument instead of a positional arg." + "Feast 0.23+ will not support positional arguments to construct kinesis sources" + ), + DeprecationWarning, + ) + if len(args) > len(positional_attributes): + raise ValueError( + f"Only {', '.join(positional_attributes)} are allowed as positional args when defining " + f"kinesis sources, for backwards compatibility." + ) + if len(args) >= 1: + _name = args[0] + if len(args) >= 2: + _event_timestamp_column = args[1] + if len(args) >= 3: + _created_timestamp_column = args[2] + if len(args) >= 4: + _record_format = args[3] + if len(args) >= 5: + _region = args[4] + if len(args) >= 6: + _stream_name = args[5] + + if _record_format is None: + raise ValueError("Record format must be specified for kinesis source") + super().__init__( - name=name, - event_timestamp_column=event_timestamp_column, - created_timestamp_column=created_timestamp_column, + name=_name, + event_timestamp_column=_event_timestamp_column, + created_timestamp_column=_created_timestamp_column, field_mapping=field_mapping, date_partition_column=date_partition_column, description=description, @@ -670,7 +780,7 @@ def __init__( ) self.batch_source = batch_source self.kinesis_options = KinesisOptions( - record_format=record_format, region=region, stream_name=stream_name + record_format=_record_format, region=_region, stream_name=_stream_name ) def __eq__(self, other): @@ -725,9 +835,9 @@ class PushSource(DataSource): def __init__( self, - *, - name: str, - batch_source: DataSource, + *args, + name: Optional[str] = None, + batch_source: Optional[DataSource] = None, description: Optional[str] = "", tags: Optional[Dict[str, str]] = None, owner: Optional[str] = "", @@ -744,10 +854,33 @@ def __init__( maintainer. """ - super().__init__(name=name, description=description, tags=tags, owner=owner) - self.batch_source = batch_source - if not self.batch_source: - raise ValueError(f"batch_source is needed for push source {self.name}") + positional_attributes = ["name", "batch_source"] + _name = name + _batch_source = batch_source + if args: + warnings.warn( + ( + "Push source parameters should be specified as a keyword argument instead of a positional arg." + "Feast 0.23+ will not support positional arguments to construct push sources" + ), + DeprecationWarning, + ) + if len(args) > len(positional_attributes): + raise ValueError( + f"Only {', '.join(positional_attributes)} are allowed as positional args when defining " + f"push sources, for backwards compatibility." + ) + if len(args) >= 1: + _name = args[0] + if len(args) >= 2: + _batch_source = args[1] + + super().__init__(name=_name, description=description, tags=tags, owner=owner) + if not _batch_source: + raise ValueError( + f"batch_source parameter is needed for push source {self.name}" + ) + self.batch_source = _batch_source def __eq__(self, other): if not isinstance(other, PushSource): diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index cea8f619cb1..ea5953e2232 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -137,7 +137,7 @@ def __init__( ValueError: A field mapping conflicts with an Entity or a Feature. """ - positional_attributes = ["name, entities, ttl"] + positional_attributes = ["name", "entities", "ttl"] _name = name _entities = entities diff --git a/sdk/python/feast/infra/offline_stores/bigquery_source.py b/sdk/python/feast/infra/offline_stores/bigquery_source.py index 31b0ed617e9..cb4cd1b5be7 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery_source.py +++ b/sdk/python/feast/infra/offline_stores/bigquery_source.py @@ -16,6 +16,7 @@ class BigQuerySource(DataSource): def __init__( self, + *, event_timestamp_column: Optional[str] = "", table: Optional[str] = None, created_timestamp_column: Optional[str] = "", diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py index 65997040cc0..dc92e08a50e 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py @@ -30,6 +30,7 @@ class SparkSourceFormat(Enum): class SparkSource(DataSource): def __init__( self, + *, name: Optional[str] = None, table: Optional[str] = None, query: Optional[str] = None, diff --git a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino_source.py b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino_source.py index 7d6280746ec..b8fddee89ff 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino_source.py @@ -88,7 +88,6 @@ def __init__( table: Optional[str] = None, created_timestamp_column: Optional[str] = "", field_mapping: Optional[Dict[str, str]] = None, - date_partition_column: Optional[str] = None, query: Optional[str] = None, name: Optional[str] = None, description: Optional[str] = "", diff --git a/sdk/python/feast/infra/offline_stores/file_source.py b/sdk/python/feast/infra/offline_stores/file_source.py index 3df0db69b1c..e177642a32f 100644 --- a/sdk/python/feast/infra/offline_stores/file_source.py +++ b/sdk/python/feast/infra/offline_stores/file_source.py @@ -20,7 +20,8 @@ class FileSource(DataSource): def __init__( self, - path: str, + *args, + path: Optional[str] = None, event_timestamp_column: Optional[str] = "", file_format: Optional[FileFormat] = None, created_timestamp_column: Optional[str] = "", @@ -58,13 +59,31 @@ def __init__( >>> from feast import FileSource >>> file_source = FileSource(path="my_features.parquet", timestamp_field="event_timestamp") """ - if path is None: + positional_attributes = ["path"] + _path = path + if args: + if args: + warnings.warn( + ( + "File Source parameters should be specified as a keyword argument instead of a positional arg." + "Feast 0.23+ will not support positional arguments to construct File sources" + ), + DeprecationWarning, + ) + if len(args) > len(positional_attributes): + raise ValueError( + f"Only {', '.join(positional_attributes)} are allowed as positional args when defining " + f"File sources, for backwards compatibility." + ) + if len(args) >= 1: + _path = args[0] + if _path is None: raise ValueError( 'No "path" argument provided. Please set "path" to the location of your file source.' ) self.file_options = FileOptions( file_format=file_format, - uri=path, + uri=_path, s3_endpoint_override=s3_endpoint_override, ) diff --git a/sdk/python/feast/infra/offline_stores/redshift_source.py b/sdk/python/feast/infra/offline_stores/redshift_source.py index f099e307cc8..dcfcb50aa69 100644 --- a/sdk/python/feast/infra/offline_stores/redshift_source.py +++ b/sdk/python/feast/infra/offline_stores/redshift_source.py @@ -16,6 +16,7 @@ class RedshiftSource(DataSource): def __init__( self, + *, event_timestamp_column: Optional[str] = "", table: Optional[str] = None, schema: Optional[str] = None, diff --git a/sdk/python/feast/infra/offline_stores/snowflake_source.py b/sdk/python/feast/infra/offline_stores/snowflake_source.py index 1d24cba44ae..8f3f2f0bb57 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake_source.py +++ b/sdk/python/feast/infra/offline_stores/snowflake_source.py @@ -15,6 +15,7 @@ class SnowflakeSource(DataSource): def __init__( self, + *, database: Optional[str] = None, warehouse: Optional[str] = None, schema: Optional[str] = None, diff --git a/sdk/python/tests/unit/test_data_sources.py b/sdk/python/tests/unit/test_data_sources.py index 883ab7ddc09..6bd4baf4fa9 100644 --- a/sdk/python/tests/unit/test_data_sources.py +++ b/sdk/python/tests/unit/test_data_sources.py @@ -1,10 +1,17 @@ import pytest from feast import ValueType -from feast.data_source import PushSource, RequestDataSource, RequestSource +from feast.data_format import ProtoFormat +from feast.data_source import ( + KafkaSource, + KinesisSource, + PushSource, + RequestDataSource, + RequestSource, +) from feast.field import Field from feast.infra.offline_stores.bigquery_source import BigQuerySource -from feast.types import Bool, Float32 +from feast.types import Bool, Float32, Int64 def test_push_with_batch(): @@ -71,3 +78,70 @@ def test_hash(): s4 = {push_source_1, push_source_2, push_source_3, push_source_4} assert len(s4) == 3 + + +# TODO(kevjumba): Remove this test in feast 0.23 when positional arguments are removed. +def test_default_data_source_kw_arg_warning(): + # source_class = request.param + with pytest.warns(DeprecationWarning): + source = KafkaSource( + "name", "column", "bootstrap_servers", ProtoFormat("class_path"), "topic" + ) + assert source.name == "name" + assert source.timestamp_field == "column" + assert source.kafka_options.bootstrap_servers == "bootstrap_servers" + assert source.kafka_options.topic == "topic" + with pytest.raises(ValueError): + KafkaSource("name", "column", "bootstrap_servers", topic="topic") + + with pytest.warns(DeprecationWarning): + source = KinesisSource( + "name", + "column", + "c_column", + ProtoFormat("class_path"), + "region", + "stream_name", + ) + assert source.name == "name" + assert source.timestamp_field == "column" + assert source.created_timestamp_column == "c_column" + assert source.kinesis_options.region == "region" + assert source.kinesis_options.stream_name == "stream_name" + + with pytest.raises(ValueError): + KinesisSource( + "name", "column", "c_column", region="region", stream_name="stream_name" + ) + + with pytest.warns(DeprecationWarning): + source = RequestSource( + "name", [Field(name="val_to_add", dtype=Int64)], description="description" + ) + assert source.name == "name" + assert source.description == "description" + + with pytest.raises(ValueError): + RequestSource("name") + + with pytest.warns(DeprecationWarning): + source = PushSource( + "name", + BigQuerySource(name="bigquery_source", table="table"), + description="description", + ) + assert source.name == "name" + assert source.description == "description" + assert source.batch_source.name == "bigquery_source" + + with pytest.raises(ValueError): + PushSource("name") + + # No name warning for DataSource + with pytest.warns(UserWarning): + source = KafkaSource( + event_timestamp_column="column", + bootstrap_servers="bootstrap_servers", + message_format=ProtoFormat("class_path"), + topic="topic", + )