diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py
index 29897aef430..15e614a5a39 100644
--- a/sdk/python/feast/infra/offline_stores/file.py
+++ b/sdk/python/feast/infra/offline_stores/file.py
@@ -267,7 +267,7 @@ def evaluate_historical_retrieval():
                 )
 
                 entity_df_with_features = _drop_columns(
-                    df_to_join, timestamp_field, created_timestamp_column
+                    df_to_join, features, timestamp_field, created_timestamp_column
                 )
 
                 # Ensure that we delete dataframes to free up memory
@@ -599,6 +599,12 @@ def _normalize_timestamp(
         created_timestamp_column_type = df_to_join_types[created_timestamp_column]
 
     if not hasattr(timestamp_field_type, "tz") or timestamp_field_type.tz != pytz.UTC:
+        # When the event timestamp field is also queried as a feature, its label
+        # is duplicated and the selection is 2-D, so deduplicate first.
+        if len(df_to_join[timestamp_field].shape) > 1:
+            df_to_join, dups = _df_column_uniquify(df_to_join)
+            df_to_join = df_to_join.drop(columns=dups)
+
         # Make sure all timestamp fields are tz-aware. We default tz-naive fields to UTC
         df_to_join[timestamp_field] = df_to_join[timestamp_field].apply(
             lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc),
@@ -609,6 +615,12 @@ def _normalize_timestamp(
             not hasattr(created_timestamp_column_type, "tz")
             or created_timestamp_column_type.tz != pytz.UTC
         ):
+            # When the created timestamp field is also queried as a feature, its
+            # label is duplicated and the selection is 2-D, so deduplicate first.
+            if len(df_to_join[created_timestamp_column].shape) > 1:
+                df_to_join, dups = _df_column_uniquify(df_to_join)
+                df_to_join = df_to_join.drop(columns=dups)
+
             df_to_join[created_timestamp_column] = df_to_join[
                 created_timestamp_column
             ].apply(
@@ -701,14 +713,51 @@ def _drop_duplicates(
 
 def _drop_columns(
     df_to_join: dd.DataFrame,
+    features: List[str],
     timestamp_field: str,
     created_timestamp_column: str,
 ) -> dd.DataFrame:
-    entity_df_with_features = df_to_join.drop([timestamp_field], axis=1).persist()
-
-    if created_timestamp_column:
-        entity_df_with_features = entity_df_with_features.drop(
-            [created_timestamp_column], axis=1
-        ).persist()
+    """Drop the timestamp columns that were not requested as features.
+
+    Falsy column names (e.g. an unset created timestamp column) are skipped,
+    and the frame is returned untouched when there is nothing to drop.
+    """
+    columns_to_drop = [
+        column
+        for column in (timestamp_field, created_timestamp_column)
+        if column and column not in features
+    ]
+    if not columns_to_drop:
+        return df_to_join
 
-    return entity_df_with_features
+    return df_to_join.drop(columns_to_drop, axis=1).persist()
+
+
+def _df_column_uniquify(df: dd.DataFrame) -> Tuple[dd.DataFrame, List[str]]:
+    """Rename repeated column labels so that every label is unique.
+
+    The first occurrence of a label is kept; each repeat gets a "_<n>" suffix
+    that does not clash with any other label. The new labels are assigned to
+    ``df.columns`` on the frame that was passed in.
+
+    Returns the frame and the list of labels generated for the repeats.
+    """
+    taken = set(df.columns)
+    seen = set()
+    new_columns = []
+    duplicate_cols = []
+    for item in df.columns:
+        if item not in seen:
+            seen.add(item)
+            new_columns.append(item)
+            continue
+        counter = 1
+        newitem = "{}_{}".format(item, counter)
+        while newitem in taken:
+            counter += 1
+            newitem = "{}_{}".format(item, counter)
+        taken.add(newitem)
+        duplicate_cols.append(newitem)
+        new_columns.append(newitem)
+    df.columns = new_columns
+    return df, duplicate_cols
diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py
index 48da7c9acfd..a91a6c141d4 100644
--- a/sdk/python/feast/type_map.py
+++ b/sdk/python/feast/type_map.py
@@ -402,7 +402,13 @@ def _python_value_to_proto_value(
             valid_scalar_types,
         ) = PYTHON_SCALAR_VALUE_TYPE_TO_PROTO_VALUE[feast_value_type]
         if valid_scalar_types:
-            assert type(sample) in valid_scalar_types
+            if feast_value_type != ValueType.BOOL and sample == 0:
+                # Numpy converts 0 to int. However, in the feature view definition, the column type may be float.
+                # So, if the value is 0, type validation must pass when the sample is either an int or a float.
+                # BOOL is excluded because False == 0 and bool is not one of the numeric types below.
+                assert type(sample) in [np.int64, int, np.float64, float]
+            else:
+                assert type(sample) in valid_scalar_types
         if feast_value_type == ValueType.BOOL:
             # ProtoValue does not support conversion of np.bool_ so we need to convert it to support np.bool_.
             return [