diff --git a/go/internal/feast/ondemandfeatureview.go b/go/internal/feast/ondemandfeatureview.go index 35155355d81..436cb0a3d53 100644 --- a/go/internal/feast/ondemandfeatureview.go +++ b/go/internal/feast/ondemandfeatureview.go @@ -6,7 +6,7 @@ import ( ) type OnDemandFeatureView struct { - base *BaseFeatureView + base *BaseFeatureView sourceFeatureViewProjections map[string]*FeatureViewProjection sourceRequestDataSources map[string]*core.DataSource_RequestDataOptions } diff --git a/protos/feast/core/DataSource.proto b/protos/feast/core/DataSource.proto index 05048c31cf4..f9a68cf098b 100644 --- a/protos/feast/core/DataSource.proto +++ b/protos/feast/core/DataSource.proto @@ -26,7 +26,7 @@ import "feast/core/DataFormat.proto"; import "feast/types/Value.proto"; // Defines a Data Source that can be used source Feature data -// Next available id: 23 +// Next available id: 26 message DataSource { // Field indexes should *not* be reused. Not sure if fields 6-10 were used previously or not, // but they are going to be reserved for backwards compatibility. @@ -53,6 +53,12 @@ message DataSource { // Name of Feast project that this data source belongs to. string project = 21; + string description = 23; + + map<string, string> tags = 24; + + string owner = 25; + SourceType type = 1; // Defines mapping between fields in the sourced data diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 1e0cb47caa9..f5c277aac43 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -162,6 +162,10 @@ class DataSource(ABC): source to feature names in a feature table or view. Only used for feature columns, not entity or timestamp columns. date_partition_column (optional): Timestamp column used for partitioning. + description (optional): A human-readable description. + tags (optional): A dictionary of key-value pairs to store arbitrary metadata. + owner (optional): The owner of the data source, typically the email of the primary + maintainer. 
""" name: str @@ -169,6 +173,9 @@ class DataSource(ABC): created_timestamp_column: str field_mapping: Dict[str, str] date_partition_column: str + description: str + tags: Dict[str, str] + owner: str def __init__( self, @@ -177,8 +184,27 @@ def __init__( created_timestamp_column: Optional[str] = None, field_mapping: Optional[Dict[str, str]] = None, date_partition_column: Optional[str] = None, + description: Optional[str] = "", + tags: Optional[Dict[str, str]] = None, + owner: Optional[str] = "", ): - """Creates a DataSource object.""" + """ + Creates a DataSource object. + Args: + name: Name of data source, which should be unique within a project + event_timestamp_column (optional): Event timestamp column used for point in time + joins of feature values. + created_timestamp_column (optional): Timestamp column indicating when the row + was created, used for deduplicating rows. + field_mapping (optional): A dictionary mapping of column names in this data + source to feature names in a feature table or view. Only used for feature + columns, not entity or timestamp columns. + date_partition_column (optional): Timestamp column used for partitioning. + description (optional): A human-readable description. + tags (optional): A dictionary of key-value pairs to store arbitrary metadata. + owner (optional): The owner of the data source, typically the email of the primary + maintainer. 
+ """ self.name = name self.event_timestamp_column = ( event_timestamp_column if event_timestamp_column else "" @@ -190,6 +216,9 @@ def __init__( self.date_partition_column = ( date_partition_column if date_partition_column else "" ) + self.description = description or "" + self.tags = tags or {} + self.owner = owner or "" def __hash__(self): return hash((id(self), self.name)) @@ -207,6 +236,9 @@ def __eq__(self, other): or self.created_timestamp_column != other.created_timestamp_column or self.field_mapping != other.field_mapping or self.date_partition_column != other.date_partition_column + or self.tags != other.tags + or self.owner != other.owner + or self.description != other.description ): return False @@ -303,6 +335,9 @@ def __init__( created_timestamp_column: Optional[str] = "", field_mapping: Optional[Dict[str, str]] = None, date_partition_column: Optional[str] = "", + description: Optional[str] = "", + tags: Optional[Dict[str, str]] = None, + owner: Optional[str] = "", ): super().__init__( name, @@ -310,6 +345,9 @@ def __init__( created_timestamp_column, field_mapping, date_partition_column, + description=description, + tags=tags, + owner=owner, ) self.kafka_options = KafkaOptions( bootstrap_servers=bootstrap_servers, @@ -346,6 +384,9 @@ def from_proto(data_source: DataSourceProto): event_timestamp_column=data_source.event_timestamp_column, created_timestamp_column=data_source.created_timestamp_column, date_partition_column=data_source.date_partition_column, + description=data_source.description, + tags=dict(data_source.tags), + owner=data_source.owner, ) def to_proto(self) -> DataSourceProto: @@ -354,12 +395,14 @@ def to_proto(self) -> DataSourceProto: type=DataSourceProto.STREAM_KAFKA, field_mapping=self.field_mapping, kafka_options=self.kafka_options.to_proto(), + description=self.description, + tags=self.tags, + owner=self.owner, ) data_source_proto.event_timestamp_column = self.event_timestamp_column data_source_proto.created_timestamp_column = 
self.created_timestamp_column data_source_proto.date_partition_column = self.date_partition_column - return data_source_proto @staticmethod @@ -377,16 +420,25 @@ class RequestDataSource(DataSource): Args: name: Name of the request data source schema: Schema mapping from the input feature name to a ValueType + description (optional): A human-readable description. + tags (optional): A dictionary of key-value pairs to store arbitrary metadata. + owner (optional): The owner of the request data source, typically the email of the primary + maintainer. """ name: str schema: Dict[str, ValueType] def __init__( - self, name: str, schema: Dict[str, ValueType], + self, + name: str, + schema: Dict[str, ValueType], + description: Optional[str] = "", + tags: Optional[Dict[str, str]] = None, + owner: Optional[str] = "", ): """Creates a RequestDataSource object.""" - super().__init__(name) + super().__init__(name, description=description, tags=tags, owner=owner) self.schema = schema def validate(self, config: RepoConfig): @@ -403,7 +455,13 @@ def from_proto(data_source: DataSourceProto): schema = {} for key, val in schema_pb.items(): schema[key] = ValueType(val) - return RequestDataSource(name=data_source.name, schema=schema) + return RequestDataSource( + name=data_source.name, + schema=schema, + description=data_source.description, + tags=dict(data_source.tags), + owner=data_source.owner, + ) def to_proto(self) -> DataSourceProto: schema_pb = {} @@ -414,6 +472,9 @@ def to_proto(self) -> DataSourceProto: name=self.name, type=DataSourceProto.REQUEST_SOURCE, request_data_options=options, + description=self.description, + tags=self.tags, + owner=self.owner, ) return data_source_proto @@ -448,6 +509,9 @@ def from_proto(data_source: DataSourceProto): event_timestamp_column=data_source.event_timestamp_column, created_timestamp_column=data_source.created_timestamp_column, date_partition_column=data_source.date_partition_column, + description=data_source.description, + 
tags=dict(data_source.tags), + owner=data_source.owner, ) @staticmethod @@ -467,6 +531,9 @@ def __init__( stream_name: str, field_mapping: Optional[Dict[str, str]] = None, date_partition_column: Optional[str] = "", + description: Optional[str] = "", + tags: Optional[Dict[str, str]] = None, + owner: Optional[str] = "", ): super().__init__( name, @@ -474,6 +541,9 @@ def __init__( created_timestamp_column, field_mapping, date_partition_column, + description=description, + tags=tags, + owner=owner, ) self.kinesis_options = KinesisOptions( record_format=record_format, region=region, stream_name=stream_name @@ -504,6 +574,9 @@ def to_proto(self) -> DataSourceProto: type=DataSourceProto.STREAM_KINESIS, field_mapping=self.field_mapping, kinesis_options=self.kinesis_options.to_proto(), + description=self.description, + tags=self.tags, + owner=self.owner, ) data_source_proto.event_timestamp_column = self.event_timestamp_column @@ -529,6 +602,9 @@ def __init__( schema: Dict[str, ValueType], batch_source: DataSource, event_timestamp_column="timestamp", + description: Optional[str] = "", + tags: Optional[Dict[str, str]] = None, + owner: Optional[str] = "", ): """ Creates a PushSource object. @@ -539,8 +615,12 @@ def __init__( store to the online store, and when retrieving historical features. event_timestamp_column (optional): Event timestamp column used for point in time joins of feature values. + description (optional): A human-readable description. + tags (optional): A dictionary of key-value pairs to store arbitrary metadata. + owner (optional): The owner of the data source, typically the email of the primary + maintainer. 
""" - super().__init__(name) + super().__init__(name, description=description, tags=tags, owner=owner) self.schema = schema self.batch_source = batch_source if not self.batch_source: @@ -574,6 +654,9 @@ def from_proto(data_source: DataSourceProto): schema=schema, batch_source=batch_source, event_timestamp_column=data_source.event_timestamp_column, + description=data_source.description, + tags=dict(data_source.tags), + owner=data_source.owner, ) def to_proto(self) -> DataSourceProto: @@ -592,6 +675,9 @@ def to_proto(self) -> DataSourceProto: type=DataSourceProto.PUSH_SOURCE, push_options=options, event_timestamp_column=self.event_timestamp_column, + description=self.description, + tags=self.tags, + owner=self.owner, ) return data_source_proto diff --git a/sdk/python/feast/infra/offline_stores/bigquery_source.py b/sdk/python/feast/infra/offline_stores/bigquery_source.py index 07e2e4d26fc..a2831567e6e 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery_source.py +++ b/sdk/python/feast/infra/offline_stores/bigquery_source.py @@ -24,6 +24,9 @@ def __init__( date_partition_column: Optional[str] = "", query: Optional[str] = None, name: Optional[str] = None, + description: Optional[str] = "", + tags: Optional[Dict[str, str]] = None, + owner: Optional[str] = "", ): """Create a BigQuerySource from an existing table or query. @@ -37,6 +40,10 @@ def __init__( date_partition_column (optional): Timestamp column used for partitioning. query (optional): SQL query to execute to generate data for this data source. name (optional): Name for the source. Defaults to the table_ref if not specified. + description (optional): A human-readable description. + tags (optional): A dictionary of key-value pairs to store arbitrary metadata. + owner (optional): The owner of the bigquery source, typically the email of the primary + maintainer. 
Example: >>> from feast import BigQuerySource >>> my_bigquery_source = BigQuerySource(table="gcp_project:bq_dataset.bq_table") @@ -75,6 +82,9 @@ def __init__( created_timestamp_column, field_mapping, date_partition_column, + description=description, + tags=tags, + owner=owner, ) # Note: Python requires redefining hash in child classes that override __eq__ @@ -94,6 +104,9 @@ def __eq__(self, other): and self.event_timestamp_column == other.event_timestamp_column and self.created_timestamp_column == other.created_timestamp_column and self.field_mapping == other.field_mapping + and self.description == other.description + and self.tags == other.tags + and self.owner == other.owner ) @property @@ -117,6 +130,9 @@ def from_proto(data_source: DataSourceProto): created_timestamp_column=data_source.created_timestamp_column, date_partition_column=data_source.date_partition_column, query=data_source.bigquery_options.query, + description=data_source.description, + tags=dict(data_source.tags), + owner=data_source.owner, ) def to_proto(self) -> DataSourceProto: @@ -125,6 +141,9 @@ def to_proto(self) -> DataSourceProto: type=DataSourceProto.BATCH_BIGQUERY, field_mapping=self.field_mapping, bigquery_options=self.bigquery_options.to_proto(), + description=self.description, + tags=self.tags, + owner=self.owner, ) data_source_proto.event_timestamp_column = self.event_timestamp_column diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py index 3ffdf6eda0c..40197b4c011 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py @@ -40,6 +40,9 @@ def __init__( created_timestamp_column: Optional[str] = None, field_mapping: Optional[Dict[str, str]] = None, date_partition_column: Optional[str] = None, + description: Optional[str] = "", + tags: 
Optional[Dict[str, str]] = None, + owner: Optional[str] = "", ): # If no name, use the table_ref as the default name _name = name @@ -54,6 +57,9 @@ def __init__( created_timestamp_column, field_mapping, date_partition_column, + description=description, + tags=tags, + owner=owner, ) warnings.warn( "The spark data source API is an experimental feature in alpha development. " @@ -125,6 +131,9 @@ def from_proto(data_source: DataSourceProto) -> Any: event_timestamp_column=data_source.event_timestamp_column, created_timestamp_column=data_source.created_timestamp_column, date_partition_column=data_source.date_partition_column, + description=data_source.description, + tags=dict(data_source.tags), + owner=data_source.owner, ) def to_proto(self) -> DataSourceProto: @@ -133,6 +142,9 @@ def to_proto(self) -> DataSourceProto: type=DataSourceProto.CUSTOM_SOURCE, field_mapping=self.field_mapping, custom_options=self.spark_options.to_proto(), + description=self.description, + tags=self.tags, + owner=self.owner, ) data_source_proto.event_timestamp_column = self.event_timestamp_column diff --git a/sdk/python/feast/infra/offline_stores/file_source.py b/sdk/python/feast/infra/offline_stores/file_source.py index 756ec2a65e0..d2e91596d37 100644 --- a/sdk/python/feast/infra/offline_stores/file_source.py +++ b/sdk/python/feast/infra/offline_stores/file_source.py @@ -27,6 +27,9 @@ def __init__( date_partition_column: Optional[str] = "", s3_endpoint_override: Optional[str] = None, name: Optional[str] = "", + description: Optional[str] = "", + tags: Optional[Dict[str, str]] = None, + owner: Optional[str] = "", ): """Create a FileSource from a file containing feature data. Only Parquet format supported. @@ -42,6 +45,10 @@ def __init__( date_partition_column (optional): Timestamp column used for partitioning. s3_endpoint_override (optional): Overrides AWS S3 enpoint with custom S3 storage name (optional): Name for the file source. Defaults to the path. 
+ description (optional): A human-readable description. + tags (optional): A dictionary of key-value pairs to store arbitrary metadata. + owner (optional): The owner of the file source, typically the email of the primary + maintainer. Examples: >>> from feast import FileSource @@ -63,6 +70,9 @@ def __init__( created_timestamp_column, field_mapping, date_partition_column, + description=description, + tags=tags, + owner=owner, ) # Note: Python requires redefining hash in child classes that override __eq__ @@ -82,6 +92,9 @@ def __eq__(self, other): and self.field_mapping == other.field_mapping and self.file_options.s3_endpoint_override == other.file_options.s3_endpoint_override + and self.description == other.description + and self.tags == other.tags + and self.owner == other.owner ) @property @@ -102,6 +115,9 @@ def from_proto(data_source: DataSourceProto): created_timestamp_column=data_source.created_timestamp_column, date_partition_column=data_source.date_partition_column, s3_endpoint_override=data_source.file_options.s3_endpoint_override, + description=data_source.description, + tags=dict(data_source.tags), + owner=data_source.owner, ) def to_proto(self) -> DataSourceProto: @@ -110,6 +126,9 @@ def to_proto(self) -> DataSourceProto: type=DataSourceProto.BATCH_FILE, field_mapping=self.field_mapping, file_options=self.file_options.to_proto(), + description=self.description, + tags=self.tags, + owner=self.owner, ) data_source_proto.event_timestamp_column = self.event_timestamp_column diff --git a/sdk/python/feast/infra/offline_stores/redshift_source.py b/sdk/python/feast/infra/offline_stores/redshift_source.py index df42e18910c..b2fa143a860 100644 --- a/sdk/python/feast/infra/offline_stores/redshift_source.py +++ b/sdk/python/feast/infra/offline_stores/redshift_source.py @@ -24,6 +24,9 @@ def __init__( date_partition_column: Optional[str] = "", query: Optional[str] = None, name: Optional[str] = None, + description: Optional[str] = "", + tags: Optional[Dict[str, str]] 
= None, + owner: Optional[str] = "", ): """ Creates a RedshiftSource object. @@ -40,6 +43,10 @@ def __init__( date_partition_column (optional): Timestamp column used for partitioning. query (optional): The query to be executed to obtain the features. name (optional): Name for the source. Defaults to the table_ref if not specified. + description (optional): A human-readable description. + tags (optional): A dictionary of key-value pairs to store arbitrary metadata. + owner (optional): The owner of the redshift source, typically the email of the primary + maintainer. """ # The default Redshift schema is named "public". _schema = "public" if table and not schema else schema @@ -68,6 +75,9 @@ def __init__( created_timestamp_column, field_mapping, date_partition_column, + description=description, + tags=tags, + owner=owner, ) @staticmethod @@ -89,6 +99,9 @@ def from_proto(data_source: DataSourceProto): created_timestamp_column=data_source.created_timestamp_column, date_partition_column=data_source.date_partition_column, query=data_source.redshift_options.query, + description=data_source.description, + tags=dict(data_source.tags), + owner=data_source.owner, ) # Note: Python requires redefining hash in child classes that override __eq__ @@ -109,6 +122,9 @@ def __eq__(self, other): and self.event_timestamp_column == other.event_timestamp_column and self.created_timestamp_column == other.created_timestamp_column and self.field_mapping == other.field_mapping + and self.description == other.description + and self.tags == other.tags + and self.owner == other.owner ) @property @@ -137,6 +153,9 @@ def to_proto(self) -> DataSourceProto: type=DataSourceProto.BATCH_REDSHIFT, field_mapping=self.field_mapping, redshift_options=self.redshift_options.to_proto(), + description=self.description, + tags=self.tags, + owner=self.owner, ) data_source_proto.event_timestamp_column = self.event_timestamp_column diff --git a/sdk/python/feast/infra/offline_stores/snowflake_source.py 
b/sdk/python/feast/infra/offline_stores/snowflake_source.py index a972df191b1..40868ef64d2 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake_source.py +++ b/sdk/python/feast/infra/offline_stores/snowflake_source.py @@ -24,6 +24,9 @@ def __init__( field_mapping: Optional[Dict[str, str]] = None, date_partition_column: Optional[str] = "", name: Optional[str] = None, + description: Optional[str] = "", + tags: Optional[Dict[str, str]] = None, + owner: Optional[str] = "", ): """ Creates a SnowflakeSource object. @@ -41,6 +44,10 @@ def __init__( source to column names in a feature table or view. date_partition_column (optional): Timestamp column used for partitioning. name (optional): Name for the source. Defaults to the table if not specified. + description (optional): A human-readable description. + tags (optional): A dictionary of key-value pairs to store arbitrary metadata. + owner (optional): The owner of the snowflake source, typically the email of the primary + maintainer. """ if table is None and query is None: raise ValueError('No "table" argument provided.') @@ -71,6 +78,9 @@ def __init__( created_timestamp_column, field_mapping, date_partition_column, + description=description, + tags=tags, + owner=owner, ) @staticmethod @@ -93,6 +103,9 @@ def from_proto(data_source: DataSourceProto): created_timestamp_column=data_source.created_timestamp_column, date_partition_column=data_source.date_partition_column, query=data_source.snowflake_options.query, + description=data_source.description, + tags=dict(data_source.tags), + owner=data_source.owner, ) # Note: Python requires redefining hash in child classes that override __eq__ @@ -114,6 +127,9 @@ def __eq__(self, other): and self.event_timestamp_column == other.event_timestamp_column and self.created_timestamp_column == other.created_timestamp_column and self.field_mapping == other.field_mapping + and self.description == other.description + and self.tags == other.tags + and self.owner == other.owner ) 
@property @@ -147,6 +163,9 @@ def to_proto(self) -> DataSourceProto: type=DataSourceProto.BATCH_SNOWFLAKE, field_mapping=self.field_mapping, snowflake_options=self.snowflake_options.to_proto(), + description=self.description, + tags=self.tags, + owner=self.owner, ) data_source_proto.event_timestamp_column = self.event_timestamp_column