From 65e808b1608125f08958283614acb0e8bca8664c Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Mon, 11 Apr 2022 11:22:08 -0700 Subject: [PATCH 1/4] fix: Support passing batch source to streaming sources for backfills Signed-off-by: Achal Shah --- go/embedded/online_features.go | 6 +++--- protos/feast/core/DataSource.proto | 9 ++++++--- sdk/python/feast/data_source.py | 18 ++++++++++++++---- 3 files changed, 23 insertions(+), 10 deletions(-) diff --git a/go/embedded/online_features.go b/go/embedded/online_features.go index 05f9c77e03d..217e53d0607 100644 --- a/go/embedded/online_features.go +++ b/go/embedded/online_features.go @@ -63,13 +63,13 @@ func (s *OnlineFeatureService) GetEntityTypesMap(featureRefs []string) (map[stri joinKeyTypes := make(map[string]int32) - for viewName, _ := range viewNames { + for viewName := range viewNames { view, err := s.fs.GetFeatureView(viewName, true) if err != nil { // skip on demand feature views continue } - for entityName, _ := range view.Entities { + for entityName := range view.Entities { entity := entitiesByName[entityName] joinKeyTypes[entity.JoinKey] = int32(entity.ValueType.Number()) } @@ -98,7 +98,7 @@ func (s *OnlineFeatureService) GetEntityTypesMapByFeatureService(featureServiceN // skip on demand feature views continue } - for entityName, _ := range view.Entities { + for entityName := range view.Entities { entity := entitiesByName[entityName] joinKeyTypes[entity.JoinKey] = int32(entity.ValueType.Number()) } diff --git a/protos/feast/core/DataSource.proto b/protos/feast/core/DataSource.proto index 6425085e61b..93503468c4f 100644 --- a/protos/feast/core/DataSource.proto +++ b/protos/feast/core/DataSource.proto @@ -26,7 +26,7 @@ import "feast/core/DataFormat.proto"; import "feast/types/Value.proto"; // Defines a Data Source that can be used source Feature data -// Next available id: 26 +// Next available id: 27 message DataSource { // Field indexes should *not* be reused. Not sure if fields 6-10 were used previously or not, // but they are going to be reserved for backwards compatibility. @@ -82,6 +82,10 @@ message DataSource { // first party sources as well. string data_source_class_type = 17; + // Optional batch source for streaming sources for historical features and materialization. + DataSource batch_source = 26; + + // Defines options for DataSource that sources features from a file message FileOptions { FileFormat file_format = 1; @@ -128,6 +132,7 @@ message DataSource { // Defines the stream data format encoding feature/entity data in Kafka messages. StreamFormat message_format = 3; + } // Defines options for DataSource that sources features from Kinesis records. @@ -199,8 +204,6 @@ message DataSource { message PushOptions { // Mapping of feature name to type map schema = 1; - // Optional batch source for the push source for historical features and materialization. - DataSource batch_source = 2; } diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 4dde5a4faaf..fd981f1ea3c 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -360,6 +360,7 @@ def __init__( tags: Optional[Dict[str, str]] = None, owner: Optional[str] = "", timestamp_field: Optional[str] = "", + batch_source: Optional[DataSource] = None, ): super().__init__( event_timestamp_column=event_timestamp_column, @@ -372,6 +373,7 @@ def __init__( name=name, timestamp_field=timestamp_field, ) + self.batch_source = batch_source self.kafka_options = KafkaOptions( bootstrap_servers=bootstrap_servers, message_format=message_format, @@ -411,6 +413,7 @@ def from_proto(data_source: DataSourceProto): description=data_source.description, tags=dict(data_source.tags), owner=data_source.owner, + batch_source=DataSource.from_proto(data_source.batch_source), ) def to_proto(self) -> DataSourceProto: @@ -427,6 +430,8 @@ def to_proto(self) -> DataSourceProto: data_source_proto.timestamp_field = self.timestamp_field data_source_proto.created_timestamp_column = self.created_timestamp_column data_source_proto.date_partition_column = self.date_partition_column + if self.batch_source: + data_source_proto.batch_source.MergeFrom(self.batch_source.to_proto()) return data_source_proto @staticmethod @@ -546,6 +551,7 @@ def from_proto(data_source: DataSourceProto): description=data_source.description, tags=dict(data_source.tags), owner=data_source.owner, + batch_source=DataSource.from_proto(data_source.batch_source), ) @staticmethod @@ -569,6 +575,7 @@ def __init__( tags: Optional[Dict[str, str]] = None, owner: Optional[str] = "", timestamp_field: Optional[str] = "", + batch_source: Optional[DataSource] = None, ): super().__init__( name=name, @@ -581,6 +588,7 @@ def __init__( owner=owner, timestamp_field=timestamp_field, ) + self.batch_source = batch_source self.kinesis_options = KinesisOptions( record_format=record_format, region=region, stream_name=stream_name ) @@ -618,6 +626,8 @@ def to_proto(self) -> DataSourceProto: data_source_proto.timestamp_field = self.timestamp_field data_source_proto.created_timestamp_column = self.created_timestamp_column data_source_proto.date_partition_column = self.date_partition_column + if self.batch_source: + data_source_proto.batch_source.MergeFrom(self.batch_source.to_proto()) return data_source_proto @@ -634,6 +644,7 @@ class PushSource(DataSource): def __init__( self, + *, name: str, schema: Dict[str, ValueType], batch_source: DataSource, @@ -694,7 +705,7 @@ def from_proto(data_source: DataSourceProto): schema[key] = ValueType(val) assert data_source.push_options.HasField("batch_source") - batch_source = DataSource.from_proto(data_source.push_options.batch_source) + batch_source = DataSource.from_proto(data_source.batch_source) return PushSource( name=data_source.name, @@ -714,9 +725,7 @@ def to_proto(self) -> DataSourceProto: if self.batch_source: batch_source_proto = self.batch_source.to_proto() - options = DataSourceProto.PushOptions( - schema=schema_pb, batch_source=batch_source_proto - ) + options = DataSourceProto.PushOptions(schema=schema_pb,) data_source_proto = DataSourceProto( name=self.name, type=DataSourceProto.PUSH_SOURCE, @@ -725,6 +734,7 @@ def to_proto(self) -> DataSourceProto: description=self.description, tags=self.tags, owner=self.owner, + batch_source=batch_source_proto, ) return data_source_proto From dc06cc2351bf6eab8394d22e85fcea3bb79da93b Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Mon, 11 Apr 2022 11:44:32 -0700 Subject: [PATCH 2/4] fix tests Signed-off-by: Achal Shah --- sdk/python/tests/unit/test_data_sources.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/tests/unit/test_data_sources.py b/sdk/python/tests/unit/test_data_sources.py index 6e8e44c0e3f..f32089b3b9e 100644 --- a/sdk/python/tests/unit/test_data_sources.py +++ b/sdk/python/tests/unit/test_data_sources.py @@ -12,8 +12,8 @@ def test_push_with_batch(): batch_source=BigQuerySource(table="test.test"), ) push_source_proto = push_source.to_proto() + assert push_source_proto.HasField("batch_source") assert push_source_proto.push_options is not None - assert push_source_proto.push_options.HasField("batch_source") push_source_unproto = PushSource.from_proto(push_source_proto) From 67c41b4168287de18406d712697b4905a2d5f985 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Mon, 11 Apr 2022 11:55:19 -0700 Subject: [PATCH 3/4] fix tests Signed-off-by: Achal Shah --- sdk/python/feast/data_source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index fd981f1ea3c..46df1088db8 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -704,7 +704,7 @@ def from_proto(data_source: DataSourceProto): for key, val in schema_pb.items(): schema[key] = ValueType(val) - assert data_source.push_options.HasField("batch_source") + assert data_source.HasField("batch_source") batch_source = DataSource.from_proto(data_source.batch_source) return PushSource( From d19cf5bb663d2240bfe597ae78ae1b9e01dd9337 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Mon, 11 Apr 2022 12:55:08 -0700 Subject: [PATCH 4/4] chore: Add a source field in the feature view API Signed-off-by: Achal Shah --- sdk/python/feast/feature_view.py | 62 +++++++++++++++++++++++--------- 1 file changed, 45 insertions(+), 17 deletions(-) diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 33fe3fe49b4..c4408786c59 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -18,7 +18,7 @@ from google.protobuf.duration_pb2 import Duration -from feast import utils +from feast import KafkaSource, KinesisSource, utils from feast.base_feature_view import BaseFeatureView from feast.data_source import DataSource, PushSource from feast.entity import Entity @@ -83,6 +83,7 @@ class FeatureView(BaseFeatureView): tags: Dict[str, str] owner: str materialization_intervals: List[Tuple[datetime, datetime]] + source: Optional[DataSource] @log_exceptions def __init__( @@ -98,6 +99,7 @@ def __init__( online: bool = True, description: str = "", owner: str = "", + source: Optional[DataSource] = None, ): """ Creates a FeatureView object. @@ -118,6 +120,8 @@ def __init__( description (optional): A human-readable description. owner (optional): The owner of the feature view, typically the email of the primary maintainer. + source (optional): The source of data for this group of features. May be a stream source, or a batch source. + If a stream source, the source should contain a batch_source for backfills & batch materialization. Raises: ValueError: A field mapping conflicts with an Entity or a Feature. @@ -155,6 +159,8 @@ def __init__( self.name = _name self.entities = _entities if _entities else [DUMMY_ENTITY_NAME] + self._initialize_sources(_name, batch_source, stream_source, source) + if isinstance(_ttl, Duration): self.ttl = timedelta(seconds=int(_ttl.seconds)) warnings.warn( @@ -172,21 +178,6 @@ def __init__( _features = features or [] - if stream_source is not None and isinstance(stream_source, PushSource): - if stream_source.batch_source is None or not isinstance( - stream_source.batch_source, DataSource - ): - raise ValueError( - f"A batch_source needs to be specified for feature view `{name}`" - ) - self.batch_source = stream_source.batch_source - else: - if batch_source is None: - raise ValueError( - f"A batch_source needs to be specified for feature view `{name}`" - ) - self.batch_source = batch_source - cols = [entity for entity in self.entities] + [feat.name for feat in _features] for col in cols: if ( @@ -207,9 +198,46 @@ def __init__( owner=owner, ) self.online = online - self.stream_source = stream_source self.materialization_intervals = [] + def _initialize_sources(self, name, batch_source, stream_source, source): + if source: + if ( + isinstance(source, PushSource) + or isinstance(source, KafkaSource) + or isinstance(source, KinesisSource) + ): + self.stream_source = source + if not source.batch_source: + raise ValueError( + f"A batch_source needs to be specified for stream source `{source.name}`" + ) + else: + self.batch_source = source.batch_source + else: + self.batch_source = source + else: + warnings.warn( + "batch_source and stream_source have been deprecated in favor or `source`." + "The deprecated fields will be removed in Feast 0.23.", + DeprecationWarning, + ) + if stream_source is not None and isinstance(stream_source, PushSource): + if stream_source.batch_source is None or not isinstance( + stream_source.batch_source, DataSource + ): + raise ValueError( + f"A batch_source needs to be specified for feature view `{name}`" + ) + self.batch_source = stream_source.batch_source + else: + if batch_source is None: + raise ValueError( + f"A batch_source needs to be specified for feature view `{name}`" + ) + self.batch_source = batch_source + self.source = source + # Note: Python requires redefining hash in child classes that override __eq__ def __hash__(self): return super().__hash__()