-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathutils.py
More file actions
1572 lines (1337 loc) · 59.4 KB
/
utils.py
File metadata and controls
1572 lines (1337 loc) · 59.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import copy
import itertools
import os
import typing
import warnings
from collections import Counter, defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import (
Any,
Dict,
Iterable,
List,
Mapping,
Optional,
Sequence,
Set,
Tuple,
Union,
cast,
)
import pandas as pd
import pyarrow
from dateutil.tz import tzlocal
from google.protobuf.timestamp_pb2 import Timestamp
from feast.aggregation import aggregation_specs_to_agg_ops
from feast.constants import FEAST_FS_YAML_FILE_PATH_ENV_NAME
from feast.entity import Entity
from feast.errors import (
FeatureNameCollisionError,
FeatureViewNotFoundException,
RequestDataNotFoundInEntityRowsException,
)
from feast.field import Field
from feast.infra.compute_engines.backends.pandas_backend import PandasBackend
from feast.infra.key_encoding_utils import deserialize_entity_key
from feast.protos.feast.serving.ServingService_pb2 import (
FieldStatus,
GetOnlineFeaturesResponse,
)
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import FloatList as FloatListProto
from feast.protos.feast.types.Value_pb2 import RepeatedValue as RepeatedValueProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.type_map import python_values_to_proto_values
from feast.types import ComplexFeastType, PrimitiveFeastType
from feast.value_type import ValueType
from feast.version import get_version
if typing.TYPE_CHECKING:
from feast.base_feature_view import BaseFeatureView
from feast.feature_service import FeatureService
from feast.feature_view import FeatureView
from feast.infra.registry.base_registry import BaseRegistry
from feast.on_demand_feature_view import OnDemandFeatureView
APPLICATION_NAME = "feast-dev/feast"
USER_AGENT = "{}/{}".format(APPLICATION_NAME, get_version())
def get_user_agent():
    """Return the user-agent string identifying this Feast client."""
    return USER_AGENT
def make_tzaware(t: datetime) -> datetime:
    """Attach UTC to tz-naive datetimes; tz-aware values pass through unchanged."""
    if t.tzinfo is not None:
        return t
    return t.replace(tzinfo=timezone.utc)
def make_df_tzaware(t: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *t* whose datetime columns are UTC-localized.

    Non-datetime columns are left intact; the incoming frame is not modified.
    """
    result = t.copy()  # never mutate the caller's dataframe
    datetime_columns = [
        col
        for col in result.columns
        if pd.api.types.is_datetime64_any_dtype(result[col])
    ]
    for col in datetime_columns:
        result[col] = pd.to_datetime(result[col], utc=True)
    return result
def to_naive_utc(ts: datetime) -> datetime:
    """Convert a tz-aware datetime to naive UTC; naive inputs are returned as-is."""
    if ts.tzinfo is None:
        return ts
    return ts.astimezone(timezone.utc).replace(tzinfo=None)
def maybe_local_tz(t: datetime) -> datetime:
    """Attach the local timezone to *t* if it is naive; aware values pass through."""
    if t.tzinfo is not None:
        return t
    return t.replace(tzinfo=tzlocal())
def get_default_yaml_file_path(repo_path: Path) -> Path:
    """Resolve the feature store yaml path, honoring the env-var override.

    Falls back to ``<repo_path>/feature_store.yaml`` when the override env
    variable is not set.
    """
    override = os.environ.get(FEAST_FS_YAML_FILE_PATH_ENV_NAME)
    if override is not None:
        return Path(override)
    return repo_path / "feature_store.yaml"
def _get_requested_feature_views_to_features_dict(
feature_refs: List[str],
feature_views: List["FeatureView"],
on_demand_feature_views: List["OnDemandFeatureView"],
) -> Tuple[Dict["FeatureView", List[str]], Dict["OnDemandFeatureView", List[str]]]:
"""Create a dict of FeatureView -> List[Feature] for all requested features.
Set full_feature_names to True to have feature names prefixed by their feature view name.
"""
feature_views_to_feature_map: Dict["FeatureView", List[str]] = defaultdict(list)
on_demand_feature_views_to_feature_map: Dict["OnDemandFeatureView", List[str]] = (
defaultdict(list)
)
for ref in feature_refs:
ref_parts = ref.split(":")
feature_view_from_ref = ref_parts[0]
feature_from_ref = ref_parts[1]
found = False
for fv in feature_views:
if fv.projection.name_to_use() == feature_view_from_ref:
found = True
feature_views_to_feature_map[fv].append(feature_from_ref)
for odfv in on_demand_feature_views:
if odfv.projection.name_to_use() == feature_view_from_ref:
found = True
on_demand_feature_views_to_feature_map[odfv].append(feature_from_ref)
if not found:
raise ValueError(f"Could not find feature view from reference {ref}")
return feature_views_to_feature_map, on_demand_feature_views_to_feature_map
def _get_column_names(
    feature_view: "FeatureView", entities: List[Entity]
) -> Tuple[List[str], List[str], str, Optional[str]]:
    """
    If a field mapping exists, run it in reverse on the join keys,
    feature names, event timestamp column, and created timestamp column
    to get the names of the relevant columns in the offline feature store table.

    Args:
        feature_view: The feature view whose source columns are being resolved.
        entities: Entities associated with the feature view; their join keys
            are reverse-mapped alongside the feature and timestamp columns.

    Returns:
        Tuple containing the list of reverse-mapped join_keys,
        reverse-mapped feature names, reverse-mapped event timestamp column,
        and reverse-mapped created timestamp column that will be passed into
        the query to the offline store.

    Raises:
        ValueError: If the feature view has no batch source.
    """
    if feature_view.batch_source is None:
        raise ValueError(
            f"Feature view '{feature_view.name}' has no batch_source and cannot be used for offline retrieval."
        )
    # if we have mapped fields, use the original field names in the call to the offline store
    timestamp_field = feature_view.batch_source.timestamp_field
    # For feature views with aggregations, read INPUT columns from aggregations.
    # This applies to StreamFeatureView, BatchFeatureView,
    # or any FeatureView that has aggregations.
    if hasattr(feature_view, "aggregations") and feature_view.aggregations:
        # Extract unique input columns from aggregations, preserving order
        feature_names = list(
            dict.fromkeys(agg.column for agg in feature_view.aggregations)
        )
    else:
        # For regular feature views, use the feature names
        feature_names = [feature.name for feature in feature_view.features]
    created_timestamp_column = feature_view.batch_source.created_timestamp_column
    # Imported locally — presumably to avoid a circular import; confirm.
    from feast.feature_view import DUMMY_ENTITY_ID
    # The dummy entity (used for entity-less feature views) is never a real column.
    join_keys = [
        entity.join_key for entity in entities if entity.join_key != DUMMY_ENTITY_ID
    ]
    if feature_view.batch_source.field_mapping is not None:
        # Invert the (source column -> feast name) mapping so feast-facing
        # names can be translated back to physical source columns.
        reverse_field_mapping = {
            v: k for k, v in feature_view.batch_source.field_mapping.items()
        }
        timestamp_field = (
            reverse_field_mapping[timestamp_field]
            if timestamp_field in reverse_field_mapping.keys()
            else timestamp_field
        )
        created_timestamp_column = (
            reverse_field_mapping[created_timestamp_column]
            if created_timestamp_column
            and created_timestamp_column in reverse_field_mapping.keys()
            else created_timestamp_column
        )
        join_keys = [
            reverse_field_mapping[col] if col in reverse_field_mapping.keys() else col
            for col in join_keys
        ]
        feature_names = [
            reverse_field_mapping[col] if col in reverse_field_mapping.keys() else col
            for col in feature_names
        ]
    # We need to exclude join keys and timestamp columns from the list of features, after they are mapped to
    # their final column names via the `field_mapping` field of the source.
    feature_names = [
        name
        for name in feature_names
        if name not in join_keys
        and name != timestamp_field
        and name != created_timestamp_column
    ]
    return (
        join_keys,
        feature_names,
        timestamp_field,
        created_timestamp_column,
    )
def _run_pyarrow_field_mapping(
    table: pyarrow.Table,
    field_mapping: Dict[str, str],
) -> pyarrow.Table:
    """Rename table columns according to *field_mapping* (forward direction).

    Columns not present in the mapping keep their original names.
    """
    renamed = [field_mapping.get(col, col) for col in table.column_names]
    return table.rename_columns(renamed)
def _get_fields_with_aliases(
fields: List[str],
field_mappings: Dict[str, str],
) -> Tuple[List[str], List[str]]:
"""
Get a list of fields with aliases based on the field mappings.
"""
for field in fields:
if "." in field and field not in field_mappings:
raise ValueError(
f"Feature {field} contains a '.' character, which is not allowed in field names. Use field mappings to rename fields."
)
fields_with_aliases = [
f"{field} AS {field_mappings[field]}" if field in field_mappings else field
for field in fields
]
aliases = [
field_mappings[field] if field in field_mappings else field for field in fields
]
return (fields_with_aliases, aliases)
def _coerce_datetime(ts):
"""
Depending on underlying time resolution, arrow to_pydict() sometimes returns pd
timestamp type (for nanosecond resolution), and sometimes you get standard python datetime
(for microsecond resolution).
While pd timestamp class is a subclass of python datetime, it doesn't always behave the
same way. We convert it to normal datetime so that consumers downstream don't have to deal
with these quirks.
"""
if isinstance(ts, pd.Timestamp):
return ts.to_pydatetime()
else:
return ts
def _columns_to_proto_values(
    table: pyarrow.RecordBatch,
    columns: List[Tuple[str, ValueType]],
    allow_missing: bool = False,
) -> Dict[str, List[ValueProto]]:
    """Convert table columns to proto values dict.

    Args:
        table: PyArrow RecordBatch containing the data.
        columns: List of (column_name, value_type) tuples to convert.
        allow_missing: If True, skip columns not found in table. If False, raise ValueError.

    Returns:
        Dict mapping column names to lists of ValueProto.
    """
    converted: Dict[str, List[ValueProto]] = {}
    available = set(table.column_names)
    for column, value_type in columns:
        if column not in available:
            if allow_missing:
                continue
            raise ValueError(f"Column {column} not found in table")
        converted[column] = python_values_to_proto_values(
            table.column(column).to_numpy(zero_copy_only=False), value_type
        )
    return converted
def _build_entity_keys(
    num_rows: int,
    join_keys: Dict[str, ValueType],
    proto_values: Dict[str, List[ValueProto]],
) -> List[EntityKeyProto]:
    """Build entity key protos for each row.

    Args:
        num_rows: Number of rows to generate entity keys for.
        join_keys: Dict mapping join key names to their value types.
        proto_values: Dict mapping column names to lists of ValueProto values.

    Returns:
        List of EntityKeyProto, one per row.
    """
    key_names = list(join_keys.keys())
    # Only join keys with converted values contribute entity_values.
    present_keys = [k for k in join_keys if k in proto_values]
    entity_keys: List[EntityKeyProto] = []
    for row_idx in range(num_rows):
        entity_keys.append(
            EntityKeyProto(
                join_keys=key_names,
                entity_values=[proto_values[k][row_idx] for k in present_keys],
            )
        )
    return entity_keys
def _convert_arrow_to_proto(
    table: Union[pyarrow.Table, pyarrow.RecordBatch],
    feature_view: Union["FeatureView", "BaseFeatureView", "OnDemandFeatureView"],
    join_keys: Dict[str, ValueType],
) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]:
    """Dispatch arrow-to-proto conversion based on the kind of feature view.

    NOTE: ``isinstance(feature_view, OnDemandFeatureView)`` triggers a circular
    import, so ODFVs are detected by the presence of their characteristic
    attributes instead.
    """
    is_odfv = (
        getattr(feature_view, "source_request_sources", None) is not None
        or getattr(feature_view, "source_feature_view_projections", None) is not None
    )
    if is_odfv:
        return _convert_arrow_odfv_to_proto(table, feature_view, join_keys)  # type: ignore[arg-type]
    return _convert_arrow_fv_to_proto(table, feature_view, join_keys)  # type: ignore[arg-type]
def _convert_arrow_fv_to_proto(
    table: Union[pyarrow.Table, pyarrow.RecordBatch],
    feature_view: "FeatureView",
    join_keys: Dict[str, ValueType],
) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]:
    """Convert arrow data of a regular FeatureView into per-row proto tuples.

    Returns one (entity key, feature name -> value, event timestamp,
    created timestamp) tuple per row of *table*. The created timestamp is
    None when the batch source defines no created_timestamp_column.
    """
    # Avoid ChunkedArrays which guarantees `zero_copy_only` available.
    # NOTE(review): only the first batch is used; a multi-batch Table would be
    # silently truncated here — confirm callers always pass single-batch tables.
    if isinstance(table, pyarrow.Table):
        table = table.to_batches()[0]
    if feature_view.batch_source is None:
        raise ValueError(
            f"Feature view '{feature_view.name}' has no batch_source and cannot be converted to proto."
        )
    # TODO: This will break if the feature view has aggregations or transformations
    columns = [
        (field.name, field.dtype.to_value_type()) for field in feature_view.features
    ] + list(join_keys.items())
    # Missing columns are an error for regular feature views (allow_missing=False).
    proto_values_by_column = _columns_to_proto_values(
        table, columns, allow_missing=False
    )
    entity_keys = _build_entity_keys(table.num_rows, join_keys, proto_values_by_column)
    # Serialize the features per row: pivot the column-oriented proto values
    # into one {feature name -> value} dict per row.
    feature_dict = {
        feature.name: proto_values_by_column[feature.name]
        for feature in feature_view.features
    }
    features = [dict(zip(feature_dict, vars)) for vars in zip(*feature_dict.values())]
    # Convert event_timestamps
    event_timestamps = [
        _coerce_datetime(val)
        for val in pd.to_datetime(
            table.column(feature_view.batch_source.timestamp_field).to_numpy(
                zero_copy_only=False
            )
        )
    ]
    # Convert created_timestamps if they exist
    if feature_view.batch_source.created_timestamp_column:
        created_timestamps = [
            _coerce_datetime(val)
            for val in pd.to_datetime(
                table.column(
                    feature_view.batch_source.created_timestamp_column
                ).to_numpy(zero_copy_only=False)
            )
        ]
    else:
        created_timestamps = [None] * table.num_rows
    return list(zip(entity_keys, features, event_timestamps, created_timestamps))
def _convert_arrow_odfv_to_proto(
    table: Union[pyarrow.Table, pyarrow.RecordBatch],
    feature_view: "OnDemandFeatureView",
    join_keys: Dict[str, ValueType],
) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]:
    """Convert arrow data of an OnDemandFeatureView into per-row proto tuples.

    Unlike the regular feature view path, columns missing from the table are
    tolerated and filled with null proto values, and event/created timestamps
    are synthesized from the current time (ODFV outputs carry no source
    timestamps).
    """
    # Avoid ChunkedArrays which guarantees `zero_copy_only` available.
    if isinstance(table, pyarrow.Table):
        table = table.to_batches()[0]
    columns = [
        (field.name, field.dtype.to_value_type()) for field in feature_view.features
    ] + list(join_keys.items())
    # Convert columns that exist in the table
    proto_values_by_column = _columns_to_proto_values(
        table, columns, allow_missing=True
    )
    # Ensure join keys are included, creating null values if missing from table
    for join_key, value_type in join_keys.items():
        if join_key not in proto_values_by_column:
            if join_key in table.column_names:
                proto_values_by_column[join_key] = python_values_to_proto_values(
                    table.column(join_key).to_numpy(zero_copy_only=False), value_type
                )
            else:
                # Create null proto values directly (no need to build a PyArrow array)
                proto_values_by_column[join_key] = python_values_to_proto_values(
                    [None] * table.num_rows, value_type
                )
    # Cache column names set to avoid recreating list in loop
    column_names = {c[0] for c in columns}
    # Adding On Demand Features that are missing from proto_values
    for feature in feature_view.features:
        if feature.name in column_names and feature.name not in proto_values_by_column:
            # Create null proto values directly (more efficient than building PyArrow array)
            proto_values_by_column[feature.name] = python_values_to_proto_values(
                [None] * table.num_rows, feature.dtype.to_value_type()
            )
    entity_keys = _build_entity_keys(table.num_rows, join_keys, proto_values_by_column)
    # Serialize the features per row
    feature_dict = {
        feature.name: proto_values_by_column[feature.name]
        for feature in feature_view.features
        if feature.name in proto_values_by_column
    }
    if feature_view.write_to_online_store:
        table_columns = {col.name for col in table.schema}
        # NOTE(review): a schema field present in the table but absent from
        # proto_values_by_column (i.e. not among features or join keys) would
        # raise KeyError below — confirm schema fields are always a subset of
        # the converted columns.
        for feature in feature_view.schema:
            if feature.name not in feature_dict and feature.name in table_columns:
                feature_dict[feature.name] = proto_values_by_column[feature.name]
    features = [dict(zip(feature_dict, vars)) for vars in zip(*feature_dict.values())]
    # We need to artificially add event_timestamps and created_timestamps
    now = _utc_now()
    event_timestamps = [
        _coerce_datetime(pd.Timestamp(now)) for _ in range(table.num_rows)
    ]
    # setting them equivalent
    created_timestamps = event_timestamps
    return list(zip(entity_keys, features, event_timestamps, created_timestamps))
def _validate_entity_values(join_key_values: Dict[str, List[ValueProto]]):
    """Check that all entity columns have the same length; return that length."""
    distinct_lengths = {len(values) for values in join_key_values.values()}
    if len(distinct_lengths) > 1:
        raise ValueError("All entity rows must have the same columns.")
    return distinct_lengths.pop()
def _validate_feature_refs(feature_refs: List[str], full_feature_names: bool = False):
"""
Validates that there are no collisions among the feature references.
Args:
feature_refs: List of feature references to validate. Feature references must have format
"feature_view:feature", e.g. "customer_fv:daily_transactions".
full_feature_names: If True, the full feature references are compared for collisions; if False,
only the feature names are compared.
Raises:
FeatureNameCollisionError: There is a collision among the feature references.
"""
collided_feature_refs = []
if full_feature_names:
collided_feature_refs = [
ref for ref, occurrences in Counter(feature_refs).items() if occurrences > 1
]
else:
feature_names = [ref.split(":")[1] for ref in feature_refs]
collided_feature_names = [
ref
for ref, occurrences in Counter(feature_names).items()
if occurrences > 1
]
for feature_name in collided_feature_names:
collided_feature_refs.extend(
[ref for ref in feature_refs if ref.endswith(":" + feature_name)]
)
if len(collided_feature_refs) > 0:
raise FeatureNameCollisionError(collided_feature_refs, full_feature_names)
def _group_feature_refs(
    features: List[str],
    all_feature_views: List["FeatureView"],
    all_on_demand_feature_views: List["OnDemandFeatureView"],
) -> Tuple[
    List[Tuple[Union["FeatureView", "OnDemandFeatureView"], List[str]]],
    List[Tuple["OnDemandFeatureView", List[str]]],
]:
    """Get list of feature views and corresponding feature names based on feature references

    ODFVs with ``write_to_online_store`` set are grouped with the regular
    feature views; other ODFVs additionally pull in the features of their
    source feature view projections.

    Raises:
        FeatureViewNotFoundException: If a reference names an unknown view.
    """
    # view name to view proto
    view_index: Dict[str, Union["FeatureView", "OnDemandFeatureView"]] = {
        view.projection.name_to_use(): view for view in all_feature_views
    }
    # on demand view to on demand view proto
    on_demand_view_index: Dict[str, "OnDemandFeatureView"] = {}
    for view in all_on_demand_feature_views:
        if view.projection and not view.write_to_online_store:
            on_demand_view_index[view.projection.name_to_use()] = view
        elif view.projection and view.write_to_online_store:
            # we insert the ODFV view to FVs for ones that are written to the online store
            view_index[view.projection.name_to_use()] = view
    # view name to feature names
    views_features = defaultdict(set)
    # on demand view name to feature names
    on_demand_view_features = defaultdict(set)
    for ref in features:
        view_name, feat_name = ref.split(":")
        if view_name in view_index:
            # Only ODFVs routed into view_index carry write_to_online_store;
            # resolve the feature name against their schema when present.
            if hasattr(view_index[view_name], "write_to_online_store"):
                tmp_feat_name = [
                    f for f in view_index[view_name].schema if f.name == feat_name
                ]
                if len(tmp_feat_name) > 0:
                    feat_name = tmp_feat_name[0].name
            else:
                view_index[view_name].projection.get_feature(
                    feat_name
                )  # For validation
            views_features[view_name].add(feat_name)
        elif view_name in on_demand_view_index:
            on_demand_view_index[view_name].projection.get_feature(
                feat_name
            )  # For validation
            on_demand_view_features[view_name].add(feat_name)
            # Let's also add in any FV Feature dependencies here.
            for input_fv_projection in on_demand_view_index[
                view_name
            ].source_feature_view_projections.values():
                for input_feat in input_fv_projection.features:
                    views_features[input_fv_projection.name].add(input_feat.name)
        else:
            raise FeatureViewNotFoundException(view_name)
    fvs_result: List[Tuple[Union["FeatureView", "OnDemandFeatureView"], List[str]]] = []
    odfvs_result: List[Tuple["OnDemandFeatureView", List[str]]] = []
    for view_name, feature_names in views_features.items():
        fvs_result.append((view_index[view_name], list(feature_names)))
    for view_name, feature_names in on_demand_view_features.items():
        odfvs_result.append((on_demand_view_index[view_name], list(feature_names)))
    return fvs_result, odfvs_result
def construct_response_feature_vector(
    values_vector: Sequence[Any],
    statuses_vector: Sequence[Any],
    timestamp_vector: Sequence[Any],
    mapping_indexes: Iterable[List[int]],
    output_len: int,
) -> GetOnlineFeaturesResponse.FeatureVector:
    """Scatter per-entity results into a response vector of length *output_len*.

    Args:
        values_vector: Feature values, one per unique entity.
        statuses_vector: Field statuses aligned with ``values_vector``.
        timestamp_vector: Event timestamps aligned with ``values_vector``.
        mapping_indexes: For each unique entity, the output row indexes it
            populates (one source row may fan out to several output rows).
        output_len: Number of rows in the resulting vector.

    Returns:
        A populated ``GetOnlineFeaturesResponse.FeatureVector``.
    """
    # Concrete list annotations so item assignment type-checks without ignores
    # (previously Iterable[Any] + three `# type: ignore[index]` workarounds).
    values_output: List[Any] = [None] * output_len
    statuses_output: List[Any] = [None] * output_len
    timestamp_output: List[Any] = [None] * output_len
    for i, destinations in enumerate(mapping_indexes):
        for idx in destinations:
            values_output[idx] = values_vector[i]
            statuses_output[idx] = statuses_vector[i]
            timestamp_output[idx] = timestamp_vector[i]
    return GetOnlineFeaturesResponse.FeatureVector(
        values=values_output,
        statuses=statuses_output,
        event_timestamps=timestamp_output,
    )
def _apply_aggregations_to_response(
    response_data: Union[pyarrow.Table, Dict[str, List[Any]]],
    aggregations,
    group_keys: Optional[List[str]],
    mode: str,
) -> Union[pyarrow.Table, Dict[str, List[Any]]]:
    """
    Apply aggregations using PandasBackend.

    Args:
        response_data: Either a pyarrow.Table or dict of lists containing the data
        aggregations: List of Aggregation objects to apply
        group_keys: List of column names to group by (optional)
        mode: Transformation mode ("python", "pandas", or "substrait")

    Returns:
        Aggregated data in the same format as input

    TODO: Consider refactoring to support backends other than pandas in the future.
    """
    # Nothing to do without aggregation specs.
    if not aggregations:
        return response_data
    backend = PandasBackend()
    # Normalize the input into a pandas DataFrame.
    if isinstance(response_data, dict):
        frame = pd.DataFrame(response_data)
    else:  # pyarrow.Table
        frame = backend.from_arrow(response_data)
    # Empty inputs pass through untouched.
    if frame.empty:
        return response_data
    # Translate Aggregation specs into backend agg operations; time windows
    # are rejected for online serving.
    agg_ops = aggregation_specs_to_agg_ops(
        aggregations,
        time_window_unsupported_error_message=(
            "Time window aggregation is not supported in online serving."
        ),
    )
    # An empty group-key list aggregates over the entire dataset.
    aggregated = backend.groupby_agg(frame, group_keys or [], agg_ops)
    # Return the result in the same shape the caller provided.
    if mode == "python":
        return {col: aggregated[col].tolist() for col in aggregated.columns}
    return backend.to_arrow(aggregated)  # pandas or substrait
def _augment_response_with_on_demand_transforms(
    online_features_response: GetOnlineFeaturesResponse,
    feature_refs: List[str],
    requested_on_demand_feature_views: List["OnDemandFeatureView"],
    full_feature_names: bool,
):
    """Computes on demand feature values and adds them to the result rows.

    Assumes that 'online_features_response' already contains the necessary request data and input feature
    views for the on demand feature views. Unneeded feature values such as request data and
    unrequested input feature views will be removed from 'online_features_response'.

    Args:
        online_features_response: Protobuf object to populate
        feature_refs: List of all feature references to be returned.
        requested_on_demand_feature_views: List of all odfvs that have been requested.
        full_feature_names: A boolean that provides the option to add the feature view prefixes to the feature names,
            changing them from the format "feature" to "feature_view__feature" (e.g., "daily_transactions" changes to
            "customer_fv__daily_transactions").
    """
    # Imported locally — presumably to avoid a circular import; confirm.
    from feast.online_response import OnlineResponse
    requested_odfv_map = {odfv.name: odfv for odfv in requested_on_demand_feature_views}
    requested_odfv_feature_names = requested_odfv_map.keys()
    # Group requested feature names by their owning ODFV, applying the
    # "view__feature" prefix when full feature names were requested.
    odfv_feature_refs = defaultdict(list)
    for feature_ref in feature_refs:
        view_name, feature_name = feature_ref.split(":")
        if view_name in requested_odfv_feature_names:
            odfv_feature_refs[view_name].append(
                f"{requested_odfv_map[view_name].projection.name_to_use()}__{feature_name}"
                if full_feature_names
                else feature_name
            )
    initial_response = OnlineResponse(online_features_response)
    # Dict/arrow views of the response are materialized lazily, once, and only
    # in the representation(s) the requested ODFV modes actually need.
    initial_response_arrow: Optional[pyarrow.Table] = None
    initial_response_dict: Optional[Dict[str, List[Any]]] = None
    # Apply on demand transformations and augment the result rows
    odfv_result_names = set()
    for odfv_name, _feature_refs in odfv_feature_refs.items():
        odfv = requested_odfv_map[odfv_name]
        if not odfv.write_to_online_store:
            # Apply aggregations if configured.
            if odfv.aggregations:
                if odfv.mode == "python":
                    if initial_response_dict is None:
                        initial_response_dict = initial_response.to_dict()
                    initial_response_dict = _apply_aggregations_to_response(
                        initial_response_dict,
                        odfv.aggregations,
                        odfv.entities,
                        odfv.mode,
                    )
                elif odfv.mode in {"pandas", "substrait"}:
                    if initial_response_arrow is None:
                        initial_response_arrow = initial_response.to_arrow()
                    initial_response_arrow = _apply_aggregations_to_response(
                        initial_response_arrow,
                        odfv.aggregations,
                        odfv.entities,
                        odfv.mode,
                    )
            # Apply transformation. Note: aggregations and transformation configs are mutually exclusive
            # TODO: Fix to make it work for having both aggregation and transformation
            # ticket: https://github.com/feast-dev/feast/issues/5689
            # NOTE(review): when odfv.aggregations is set, neither
            # transformed_features_dict nor transformed_features_arrow is
            # assigned, so the selection below would raise UnboundLocalError —
            # confirm the aggregation path is exercised / intended.
            elif odfv.mode == "python":
                if initial_response_dict is None:
                    initial_response_dict = initial_response.to_dict()
                transformed_features_dict: Dict[str, List[Any]] = odfv.transform_dict(
                    initial_response_dict
                )
            elif odfv.mode in {"pandas", "substrait"}:
                if initial_response_arrow is None:
                    initial_response_arrow = initial_response.to_arrow()
                transformed_features_arrow = odfv.transform_arrow(
                    initial_response_arrow, full_feature_names
                )
            else:
                raise Exception(
                    f"Invalid OnDemandFeatureMode: {odfv.mode}. Expected one of 'pandas', 'python', or 'substrait'."
                )
            # Pick whichever representation the mode produced.
            transformed_features = (
                transformed_features_dict
                if odfv.mode == "python"
                else transformed_features_arrow
            )
            transformed_columns = (
                transformed_features.column_names
                if isinstance(transformed_features, pyarrow.Table)
                else transformed_features
            )
            # Keep only the columns that were actually requested for this ODFV.
            selected_subset = [f for f in transformed_columns if f in _feature_refs]
            proto_values = []
            schema_dict = {k.name: k.dtype for k in odfv.schema}
            for selected_feature in selected_subset:
                feature_vector = transformed_features[selected_feature]
                selected_feature_type = schema_dict.get(selected_feature, None)
                # Resolve the declared feast type to a ValueType for conversion;
                # features missing from the schema fall back to UNKNOWN.
                feature_type: ValueType = ValueType.UNKNOWN
                if selected_feature_type is not None:
                    if isinstance(
                        selected_feature_type, (ComplexFeastType, PrimitiveFeastType)
                    ):
                        feature_type = selected_feature_type.to_value_type()
                    elif not isinstance(selected_feature_type, ValueType):
                        raise TypeError(
                            f"Unexpected type for feature_type: {type(feature_type)}"
                        )
                proto_values.append(
                    python_values_to_proto_values(
                        feature_vector
                        if isinstance(feature_vector, list)
                        else [feature_vector]
                        if odfv.mode == "python"
                        else feature_vector.to_numpy(),
                        feature_type,
                    )
                )
            odfv_result_names |= set(selected_subset)
            # Append the computed ODFV columns to the response; statuses are
            # PRESENT and timestamps are zero-valued placeholders.
            online_features_response.metadata.feature_names.val.extend(selected_subset)
            for feature_idx in range(len(selected_subset)):
                online_features_response.results.append(
                    GetOnlineFeaturesResponse.FeatureVector(
                        values=proto_values[feature_idx],
                        statuses=[FieldStatus.PRESENT] * len(proto_values[feature_idx]),
                        event_timestamps=[Timestamp()] * len(proto_values[feature_idx]),
                    )
                )
def _get_entity_maps(
    registry,
    project,
    feature_views,
) -> Tuple[Dict[str, str], Dict[str, ValueType], Set[str]]:
    """Build entity lookup maps used for online feature retrieval.

    Returns:
        A tuple of (entity name -> join key, join-key column name -> value
        type, set of all join keys).

    Raises:
        EntityNotFoundException: If a feature view references an entity that is
            not registered in the project.
    """
    # TODO(felixwang9817): Support entities that have different types for different feature views.
    entities = registry.list_entities(project, allow_cache=True)
    entity_by_name: Dict[str, "Entity"] = {entity.name: entity for entity in entities}
    entity_name_to_join_key_map: Dict[str, str] = {}
    entity_type_map: Dict[str, ValueType] = {}
    for entity in entities:
        entity_name_to_join_key_map[entity.name] = entity.join_key
    for feature_view in feature_views:
        for entity_name in feature_view.entities:
            entity = entity_by_name.get(entity_name)
            if entity is None:
                from feast.errors import EntityNotFoundException
                raise EntityNotFoundException(entity_name, project=project)
            # User directly uses join_key as the entity reference in the entity_rows for the
            # entity mapping case.
            entity_name = feature_view.projection.join_key_map.get(
                entity.join_key, entity.name
            )
            join_key = feature_view.projection.join_key_map.get(
                entity.join_key, entity.join_key
            )
            entity_name_to_join_key_map[entity_name] = join_key
        for entity_column in feature_view.entity_columns:
            dtype = entity_column.dtype.to_value_type()
            # Entity columns may also be aliased by the view's projection.
            entity_join_key_column_name = feature_view.projection.join_key_map.get(
                entity_column.name, entity_column.name
            )
            entity_type_map[entity_join_key_column_name] = dtype
    return (
        entity_name_to_join_key_map,
        entity_type_map,
        set(entity_name_to_join_key_map.values()),
    )
def _get_table_entity_values(
    table: "FeatureView",
    entity_name_to_join_key_map: Dict[str, str],
    join_key_proto_values: Dict[str, List[ValueProto]],
) -> Dict[str, List[ValueProto]]:
    """Select and rename the join-key columns relevant to one FeatureView.

    Subsets *join_key_proto_values* to the join keys this feature view expects,
    translating any projection aliases back to the canonical join key names.
    """
    # The correct join_keys expected by the OnlineStore for this Feature View.
    expected_join_keys = {
        entity_name_to_join_key_map[entity_name] for entity_name in table.entities
    }
    # If the FeatureView has a Projection then the join keys may be aliased;
    # invert the map so aliases resolve back to the original join keys.
    alias_to_join_key_map = {
        alias: original for original, alias in table.projection.join_key_map.items()
    }
    entity_values: Dict[str, List[ValueProto]] = {}
    for column, values in join_key_proto_values.items():
        join_key = alias_to_join_key_map.get(column, column)
        if join_key in expected_join_keys:
            entity_values[join_key] = values
    return entity_values
def _get_unique_entities(
    table: "FeatureView",
    join_key_values: Dict[str, List[ValueProto]],
    entity_name_to_join_key_map: Dict[str, str],
) -> Tuple[Tuple[Dict[str, ValueProto], ...], Tuple[List[int], ...], int]:
    """Return the set of unique composite Entities for a Feature View and the indexes at which they appear.

    This method allows us to query the OnlineStore for data we need only once
    rather than requesting and processing data for the same combination of
    Entities multiple times.
    """
    # Narrow the provided join-key columns down to the ones this Feature View
    # uses, renamed back from any projection aliases.
    table_entity_values = _get_table_entity_values(
        table,
        entity_name_to_join_key_map,
        join_key_values,
    )
    # Every expected join key (other than the dummy entity) should be present
    # and carry at least one value.
    expected_keys = set(entity_name_to_join_key_map.values())
    expected_keys.discard("__dummy_id")
    missing_keys = sorted(key for key in expected_keys if key not in table_entity_values)
    empty_keys = sorted(key for key in expected_keys if not table_entity_values.get(key))
    if missing_keys or empty_keys:
        # Only hard-fail when no join key column has any values at all.
        if not any(table_entity_values.values()):
            raise KeyError(
                f"Missing join key values for keys: {missing_keys}. "
                f"No values provided for keys: {empty_keys}. "
                f"Provided join_key_values: {list(join_key_values.keys())}"
            )
    join_keys = list(table_entity_values.keys())
    # Row-wise view of the columnar values, tagged with the original row index.
    indexed_rows = list(enumerate(zip(*table_entity_values.values())))
    if not indexed_rows:
        return (), (), 0
    # Place identical composite keys next to each other so groupby can find
    # them; unwrapping the proto value is enough since Entity types cannot be
    # complex (ie. lists).
    indexed_rows.sort(
        key=lambda row: tuple(getattr(v, v.WhichOneof("val")) for v in row[1])
    )
    # Group adjacent rows sharing the same composite join key value.
    grouped = [
        (dict(zip(join_keys, composite_key)), [index for index, _ in rows])
        for composite_key, rows in itertools.groupby(
            indexed_rows, key=lambda row: row[1]
        )
    ]
    # Defensive: no groups should be impossible for non-empty rows.
    if not grouped:
        return (), (), 0
    unique_entities, indexes = tuple(zip(*grouped))
    return unique_entities, indexes, len(indexed_rows)
def _get_unique_entities_from_values(
table_entity_values: Dict[str, List[ValueProto]],
) -> Tuple[Tuple[Dict[str, ValueProto], ...], Tuple[List[int], ...], int]:
"""Return the set of unique composite Entities for a Feature View and the indexes at which they appear.
This method allows us to query the OnlineStore for data we need only once
rather than requesting and processing data for the same combination of
Entities multiple times.
"""
keys = table_entity_values.keys()
# Sort the rowise data to allow for grouping but keep original index. This lambda is
# sufficient as Entity types cannot be complex (ie. lists).
rowise = list(enumerate(zip(*table_entity_values.values())))
rowise.sort(key=lambda row: tuple(getattr(x, x.WhichOneof("val")) for x in row[1]))
# Identify unique entities and the indexes at which they occur.
unique_entities: Tuple[Dict[str, ValueProto], ...]
indexes: Tuple[List[int], ...]
unique_entities, indexes = tuple(
zip(
*[
(dict(zip(keys, k)), [_[0] for _ in g])
for k, g in itertools.groupby(rowise, key=lambda x: x[1])
]
)
)
return unique_entities, indexes, len(rowise)
def _drop_unneeded_columns(
online_features_response: GetOnlineFeaturesResponse,
requested_result_row_names: Set[str],
):
"""
Unneeded feature values such as request data and unrequested input feature views will
be removed from 'online_features_response'.
Args:
online_features_response: Protobuf object to populate
requested_result_row_names: Fields from 'result_rows' that have been requested, and
therefore should not be dropped.
"""
# Drop values that aren't needed
unneeded_feature_indices = [
idx
for idx, val in enumerate(online_features_response.metadata.feature_names.val)
if val not in requested_result_row_names
]
for idx in reversed(unneeded_feature_indices):
del online_features_response.metadata.feature_names.val[idx]
del online_features_response.results[idx]
def _populate_result_rows_from_columnar(
    online_features_response: GetOnlineFeaturesResponse,
    data: Dict[str, List[ValueProto]],
):
    """Append the given columnar feature data to 'online_features_response'.

    Every appended value is marked PRESENT and stamped with a single shared
    (default) event timestamp.
    """
    shared_timestamp = Timestamp()  # One Timestamp instance reused for every value.
    for name, values in data.items():
        row_count = len(values)
        online_features_response.metadata.feature_names.val.append(name)
        vector = GetOnlineFeaturesResponse.FeatureVector(
            values=values,
            statuses=[FieldStatus.PRESENT] * row_count,
            event_timestamps=[shared_timestamp] * row_count,
        )
        online_features_response.results.append(vector)
def get_needed_request_data(
grouped_odfv_refs: List[Tuple["OnDemandFeatureView", List[str]]],
) -> Set[str]: